Hadoop Material

             

   Hadoop Distributed Filesystem

 When a dataset outgrows the storage capacity of a single physical machine, it becomes necessary to partition it across a number of separate machines.

Filesystems that manage the storage across a network of machines are called distributed filesystems.

Since they are network-based, all the complications of network programming kick in, thus making distributed filesystems more complex than regular disk filesystems.

For example, one of the biggest challenges is making the filesystem tolerate node failure without suffering data loss.
Hadoop comes with a distributed filesystem called HDFS, which stands for Hadoop Distributed Filesystem.




Design of HDFS


Very large files
“Very large” in this context means files that are hundreds of megabytes, gigabytes, or terabytes in size. There are Hadoop clusters running today that store petabytes of data.


Streaming data access
HDFS is built around the idea that the most efficient data processing pattern is a write-once, read-many-times pattern. A dataset is typically generated or copied from source, then various analyses are performed on that dataset over time.


Commodity hardware
Hadoop doesn’t require expensive, highly reliable hardware to run on. It’s designed to run on clusters of commodity hardware for which the chance of node failure across the cluster is high, at least for large clusters. HDFS is designed to carry on working without a noticeable interruption to the user in the face of such failure.
 

       Where HDFS is not a good fit

Low-latency data access
Applications that require low-latency access to data, in the tens of milliseconds range, will not work well with HDFS. Remember, HDFS is optimized for delivering a high throughput of data, and this may be at the expense of latency. HBase is currently a better choice for low-latency access.


Lots of small files
Since the namenode holds filesystem metadata in memory, the limit to the number of files in a filesystem is governed by the amount of memory on the namenode. As a rule of thumb, each file, directory, and block takes about 150 bytes. So, for example, if you had one million files, each taking one block, you would need at least 300 MB of memory (each file accounts for two objects, the file itself and its block, at roughly 150 bytes apiece). While storing millions of files is feasible, billions is beyond the capability of current hardware.


Multiple writers, arbitrary file modifications
Files in HDFS may be written to by a single writer. Writes are always made at the end of the file. There is no support for multiple writers, or for modifications at arbitrary offsets in the file.


                Why Is a Block in HDFS So Large?


HDFS blocks are large compared to disk blocks, and the reason is to minimize the cost of seeks.


By making a block large enough, the time to transfer the data from the disk can be made to be significantly larger than the time to seek to the start of the block.


Thus the transfer of a large file made of multiple blocks operates at the disk transfer rate.


A quick calculation shows that if the seek time is around 10 ms, and the transfer rate is 100 MB/s, then to make the seek time 1% of the transfer time, we need to make the block size around 100 MB.


The default is actually 64 MB, although many HDFS installations use 128 MB blocks.
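As a quick check of the arithmetic above, the following sketch recomputes the block size from the quoted figures (the 10 ms seek time and 100 MB/s transfer rate are the illustrative values from this section, not measurements):

public class BlockSizeEstimate {
    public static void main(String[] args) {
        double seekTimeMs = 10.0;        // illustrative seek time from the text
        double transferRateMBps = 100.0; // illustrative transfer rate from the text
        double seekFraction = 0.01;      // target: seek time is about 1% of transfer time

        // The transfer time must be seekTimeMs / seekFraction = 1000 ms = 1 s,
        // so a block must hold one second's worth of data at the transfer rate.
        double transferTimeSec = (seekTimeMs / seekFraction) / 1000.0;
        double blockSizeMB = transferTimeSec * transferRateMBps;
        System.out.println("Suggested block size: " + blockSizeMB + " MB"); // prints 100.0 MB
    }
}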


Advantage of HDFS


Moving Computation is Cheaper than Moving Data

A computation requested by an application is much more efficient if it is executed near the data it operates on. This is especially true when the size of the data set is huge.

The assumption is that it is often better to migrate the computation closer to where the data is located rather than moving the data to where the application is running.

HDFS provides interfaces for applications to move themselves closer to where the data is located.


        HDFS Architecture 

HDFS has a master/slave architecture.

An HDFS cluster consists of a single NameNode, a master server that manages the file system namespace and regulates access to files by clients.

In addition, there are a number of datanodes (the slaves), which store and serve the file blocks. There is also a secondary namenode, which periodically merges the namespace image with the edit log to keep the edit log from growing too large.





A Client Reading Data from HDFS









The client opens the file it wishes to read by calling open() on the FileSystem object, which for HDFS is an instance of DistributedFileSystem (DFS).


DFS calls the namenode, using RPC, to determine the locations of the first few blocks in the file.


For each block, the namenode returns the addresses of the datanodes that have a copy of that block; the datanodes are sorted according to their proximity to the client.


The DFS returns an FSDataInputStream (an input stream that supports file seeks) to the client for it to read data from.





The client then calls read() on the stream

DFSInputStream connects to the first (closest) datanode for the first block in the file.

Data is streamed from the datanode back to the client, which calls read() repeatedly on the stream


When the end of the block is reached, DFSInputStream will close the connection to the datanode, then find the best datanode for the next block


When the client has finished reading, it calls close() on the FSDataInputStream
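Putting the steps above together, here is a minimal read sketch using the public FileSystem API (the hdfs:// URI is illustrative). FileSystem.get() returns the DistributedFileSystem instance for an hdfs:// URI, and open() hands back the FSDataInputStream described above:

import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsReadExample {
    public static void main(String[] args) throws Exception {
        String uri = "hdfs://namenode/user/hadoop/sample.txt"; // illustrative path
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf); // DistributedFileSystem for hdfs:// URIs
        InputStream in = null;
        try {
            in = fs.open(new Path(uri));                    // returns an FSDataInputStream
            IOUtils.copyBytes(in, System.out, 4096, false); // read() is called repeatedly under the hood
        } finally {
            IOUtils.closeStream(in);                        // close() on the FSDataInputStream
        }
    }
}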


Distance between nodes





A client writing data to HDFS




The client creates the file by calling create() on DFS

DFS makes an RPC call to the namenode to create a new file in the filesystem’s namespace, with no blocks associated with it.

The namenode performs various checks to make sure the file doesn’t already exist, and that the client has the right permissions to create the file.

If these checks pass, the namenode makes a record of the new file; otherwise, file creation fails and the client is thrown an IOException.

The DFS returns an FSDataOutputStream for the client to start writing data to.

As the client writes data, DFSOutputStream splits it into packets, which it writes to an internal queue, called the data queue.


The data queue is consumed by the DataStreamer, whose responsibility it is to ask the namenode to allocate new blocks by picking a list of suitable datanodes to store the replicas.

The list of datanodes forms a pipeline; we’ll assume the replication level is three, so there are three nodes in the pipeline.

The DataStreamer streams the packets to the first datanode in the pipeline, which stores the packet and forwards it to the second datanode in the pipeline. Similarly, the second datanode stores the packet and forwards it to the third (and last) datanode in the pipeline.

DFSOutputStream also maintains an internal queue of packets that are waiting to be acknowledged by datanodes, called the ack queue. A packet is removed from the ack queue only when it has been acknowledged by all the datanodes in the pipeline.
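The corresponding write path can be exercised with a short sketch like the one below (the local source file and HDFS destination are illustrative). The packet splitting, data queue, and replication pipeline described above all happen inside the FSDataOutputStream returned by create():

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsWriteExample {
    public static void main(String[] args) throws Exception {
        String localSrc = "sample.txt";                        // illustrative local file
        String dst = "hdfs://namenode/user/hadoop/sample.txt"; // illustrative HDFS path

        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        OutputStream out = fs.create(new Path(dst)); // create() returns an FSDataOutputStream
        IOUtils.copyBytes(in, out, 4096, true);      // data is packetized and pipelined to the datanodes
    }
}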


What if a datanode fails while data is being written to it?



First the pipeline is closed

Any packets in the ack queue are added to the front of the data queue so that datanodes that are downstream from the failed node will not miss any packets.

The current block on the good datanodes is given a new identity, which is communicated to the namenode, so that the partial block on the failed datanode will be deleted if the failed datanode recovers later on.

The failed datanode is removed from the pipeline and the remainder of the block’s data is written to the two good datanodes in the pipeline.

The namenode notices that the block is under-replicated, and it arranges for a further replica to be created on another node

Replication





                            Comparison with Other Systems

Why can’t we use databases with lots of disks to do large-scale batch analysis? Why is MapReduce needed?

The answer to these questions comes from another trend in disk drives: seek time is improving more slowly than transfer rate. Seeking is the process of moving the disk’s head to a particular place on the disk to read or write data. It characterizes the latency of a disk operation, whereas the transfer rate corresponds to a disk’s bandwidth.

On the other hand, for updating a small proportion of records in a database, a traditional B-Tree (the data structure used in relational databases, which is limited by the rate at which it can perform seeks) works well. For updating the majority of a database, a B-Tree is less efficient than MapReduce, which uses Sort/Merge to rebuild the database.


Example 1: Problem Statement




What’s the highest recorded global temperature for each year in the dataset?




Map and Reduce with example








Example 2: Word Count

For a sample input of two lines, "Hello World Bye World" and "Hello Hadoop Goodbye Hadoop", the first map emits:
< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>
The second map emits:
< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>


After applying Combiner and Sorting

The output of the first map:
< Bye, 1>
< Hello, 1>
< World, 2>
The output of the second map:
< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>


Final output of reducer: 

Thus the output of the job is:
< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>
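A minimal sketch of the mapper and reducer behind this example is shown below, written against the org.apache.hadoop.mapreduce API (class names are illustrative). Because addition is commutative and associative, the same reducer class can also be registered as the combiner, which produces the partially summed map outputs shown above.

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {

    // Emits <word, 1> for every token in the input line.
    public static class TokenizerMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, ONE);
            }
        }
    }

    // Sums the counts for each word; also usable as the combiner.
    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
}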


  MapReduce Logical Data Flow

   




Writing MapReduce Program

 
We need three things: a map function, a reduce function, and some code to run the job. For the present example, the input key is a long integer offset, the input value is a line of text, the output key is a year, and the output value is an air temperature (an integer).

The map function is represented by the Mapper class, which declares an abstract map() method.
The Mapper class is a generic type, with four formal type parameters that specify the input key, input value, output key, and output value types of the map function.
The map() method also provides an instance of Context to write the output to. In this case, we write the year as a Text object (since we are just using it as a key), and the temperature is wrapped in an IntWritable.
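A minimal sketch of such a mapper follows. It assumes a simplified record layout of one year and temperature per line, separated by a tab, rather than the fixed-width NCDC weather records used in the original example, so the parsing is illustrative only:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Assumed layout: "year<TAB>temperature"; real NCDC records need fixed-offset parsing.
        String[] fields = value.toString().split("\t");
        String year = fields[0];
        int airTemperature = Integer.parseInt(fields[1]);
        context.write(new Text(year), new IntWritable(airTemperature));
    }
}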

Writing MapReduce Program



The reduce function is represented by the Reducer class, which declares an abstract reduce() method.
Again, four formal type parameters are used to specify the input and output types, this time for the reduce function.
The input types of the reduce function must match the output types of the map function: Text and IntWritable. And in this case, the output types of the reduce function are Text and IntWritable, for a year and its maximum temperature, which we find by iterating through the temperatures and comparing each with a record of the highest found so far.
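A minimal sketch of the corresponding reducer, matching the mapper sketch above, keeps a running maximum while iterating over the values for each year:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            // Compare each temperature with the highest found so far.
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}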







                                     Data Locality Optimization

Hadoop does its best to run the map task on a node where the input data resides in HDFS. This is called the data locality optimization.


Data Flow

A MapReduce job is a unit of work that the client wants to be performed. It consists of:
- The input data
- The MapReduce program
- Configuration information

Hadoop runs the job by dividing it into tasks:
- Map tasks
- Reduce tasks


                    Nodes that control the job Execution Process

The jobtracker coordinates all the jobs run on the system by scheduling tasks to run on tasktrackers.

Tasktrackers run tasks and send progress reports to the jobtracker, which keeps a record of the overall progress of each job.

If a task fails, the jobtracker can reschedule it on a different tasktracker.


Input Splits

Hadoop divides the input to a MapReduce job into fixed-size pieces called input splits, or just splits.

Hadoop creates one map task for each split, which runs the user-defined map function for each record in the split.


What is the Optimal Input Split Size?


optimal split size == block size

If the split spanned two blocks, it would be unlikely that any HDFS node stored both blocks, so some of the split would have to be transferred across the network to the node running the map task, which is clearly less efficient than running the whole map task using local data.



Map tasks write their output to local disk, not to HDFS. Why?

Map output is intermediate output: it’s processed by reduce tasks to produce the final output, and once the job is complete the map output can be thrown away. So storing it in HDFS, with replication, would be overkill.


Reduce Task

Reduce tasks don’t have the advantage of data locality; the input to a single reduce task is normally the output from all mappers.


MapReduce data flow with a single reduce task






Multiple reduce tasks

When there are multiple reducers, the map tasks partition their output, each creating one partition for each reduce task.

There can be many keys (and their associated values) in each partition, but the records for every key are all in a single partition.

The partitioning can be controlled by a user-defined partitioning function, but normally the default partitioner—which buckets keys using a hash function—works very well.
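The default hash partitioner buckets keys essentially like the sketch below (Hadoop already ships a HashPartitioner, so this is only to illustrate the idea): the key’s hash code, masked to stay non-negative, is taken modulo the number of reducers, so all records with the same key land in the same partition.

import org.apache.hadoop.mapreduce.Partitioner;

public class SimpleHashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Same key => same hash => same partition (and therefore same reducer).
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}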


MapReduce data flow with multiple reduce tasks







        How Many reduce Tasks?


The number of reduce tasks is not governed by the size of the input, but is specified independently.
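For example, with the org.apache.hadoop.mapreduce API the reducer count is simply set on the job; the value 4 and the job name here are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ReducerCountExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "max temperature");
        job.setNumReduceTasks(4); // chosen by the user, not derived from the input size
        // job.setNumReduceTasks(0) would give the "no reduce tasks" data flow shown next:
        // map output is written straight to HDFS.
    }
}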


MapReduce data flow with no reduce tasks 




Combiner Function

Many MapReduce jobs are limited by the bandwidth available on the cluster, so it pays to minimize the data transferred between map and reduce tasks.

Hadoop allows the user to specify a combiner function to be run on the map output; the output of the combiner function becomes the input to the reducer.
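As a sketch, wiring a combiner into the max-temperature job needs only one extra call in the driver. Because taking a maximum is commutative and associative, the reducer class can be reused as the combiner (the class names follow the earlier sketches and are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperatureWithCombiner {
    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "max temperature with combiner");
        job.setJarByClass(MaxTemperatureWithCombiner.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setCombinerClass(MaxTemperatureReducer.class); // combiner runs on the map output
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}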




Without Combiner Function





If We Use Combiner 





How MapReduce Works?

At a higher level, there are four independent entities:
- The client, which submits the MapReduce job.
- The jobtracker, which coordinates the job run. The jobtracker is a Java application whose main class is JobTracker.
- The tasktrackers, which run the tasks that the job has been split into. Tasktrackers are Java applications whose main class is TaskTracker.
- The distributed filesystem (normally HDFS), which is used for sharing job files between the other entities.


How Hadoop Runs a MapReduce Job






                     Job Submission Process

The job submission process implemented by JobClient’s submitJob() method does the following:

- Asks the jobtracker for a new job ID (step 2).
- Checks the output specification of the job.
- Computes the input splits for the job. If the splits cannot be computed, because the input paths don’t exist, for example, then the job is not submitted and an error is thrown to the MapReduce program.
- Copies the resources needed to run the job, including the job JAR file, the configuration file, and the computed input splits, to the jobtracker’s filesystem in a directory named after the job ID (step 3). The job JAR is copied with a high replication factor (default mapred.submit.replication = 10), so that there are lots of copies across the cluster for the tasktrackers to access when they run tasks for the job.
- Tells the jobtracker that the job is ready for execution by calling submitJob() on JobTracker (step 4).



      Job initialization

When the JobTracker receives a call to its submitJob() method, it puts the job into an internal queue from where the job scheduler will pick it up and initialize it.
Initialization involves:
- Creating an object to represent the job being run, which encapsulates its tasks.
- Bookkeeping information to keep track of the tasks’ status and progress (step 5).
To create the list of tasks to run, the job scheduler first retrieves the input splits computed by the JobClient from the shared filesystem (step 6).
It then creates one map task for each split.
The number of reduce tasks to create is determined by the mapred.reduce.tasks property in the JobConf.
The scheduler simply creates this number of reduce tasks to be run. Tasks are given IDs at this point.


Task Assignment & Heartbeat


Tasktrackers run a simple loop that periodically sends heartbeat method calls to the jobtracker. Heartbeats tell the jobtracker that a tasktracker is alive.
As part of the heartbeat, a tasktracker will indicate whether it is ready to run a new task, and if it is, the jobtracker will allocate it a task, which it communicates to the tasktracker using the heartbeat return value (step 7).
Tasktrackers have a fixed number of slots for map tasks and for reduce tasks; the default scheduler fills empty map task slots before reduce task slots.
If the tasktracker has at least one empty map task slot, the jobtracker will select a map task; otherwise, it will select a reduce task.


                                            Task Execution

Once the tasktracker has been assigned a task, it localizes the job JAR by copying it from the shared filesystem to the tasktracker’s filesystem. It also copies any files needed from the distributed cache by the application to the local disk (step 8).
It creates a local working directory for the task and un-jars the contents of the JAR into this directory. It then creates an instance of TaskRunner to run the task.


                               Task Runner

TaskRunner launches a new Java Virtual Machine (step 9) to run each task in (step 10).


Why new JVM for each Task ?

Any bugs in the user-defined map and reduce functions don’t affect the tasktracker (by causing it to crash or hang).
The child process communicates with its parent through the umbilical interface. It informs the parent of the task’s progress every few seconds until the task is complete.
User-Defined Counters


Hadoop Streaming

Hadoop provides an API to MapReduce that allows you to write your map and reduce functions in languages other than Java.
Hadoop Streaming uses Unix standard streams as the interface between Hadoop and your program, so you can use any language that can read standard input and write to standard output to write your MapReduce program.

Ruby, Python…


hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/sample.txt \
-output output \
-mapper src/main/ruby/mapper.rb \
-reducer src/main/ruby/reducer.rb


Hadoop Pipes

Hadoop Pipes is the name of the C++ interface to Hadoop MapReduce. Unlike Streaming, which uses standard input and output to communicate with the map and reduce code, Pipes uses sockets as the channel over which the tasktracker communicates with the process running the C++ map or reduce function. JNI is not used.


hadoop fs -put max_temperature bin/max_temperature
hadoop fs -put input/sample.txt sample.txt
hadoop pipes \
-D hadoop.pipes.java.recordreader=true \
-D hadoop.pipes.java.recordwriter=true \
-input sample.txt \
-output output \
-program  bin/max_temperature


Job status update



Data flow for two jobs












HBase

HBase is a distributed, column-oriented database built on top of HDFS. HBase is the Hadoop application to use when you require real-time read/write random access to very large datasets.



Zookeeper

  ZooKeeper allows distributed processes to coordinate with each other



