Hadoop Distributed Filesystem
When a dataset outgrows the storage capacity of a single physical machine, it becomes necessary to partition it across a number of separate machines.
Filesystems that manage the storage across a network of machines are called distributed filesystems.
Since they are network-based, all the complications of network programming kick in, thus making distributed filesystems more complex than regular disk filesystems.
For example, one of the biggest challenges is making the filesystem tolerate node failure without suffering data loss.
Hadoop comes with a distributed filesystem called HDFS, which stands for Hadoop Distributed Filesystem.
Very large files
“Very large” in this context means files that are hundreds of megabytes, gigabytes, or terabytes in size. There are Hadoop clusters running today that store petabytes of data.
Streaming data access
HDFS is built around the idea that the most efficient data processing pattern is a write-once, read-many-times pattern. A dataset is typically generated or copied from source, and then various analyses are performed on that dataset over time.
Commodity hardware
Hadoop doesn’t require expensive, highly reliable hardware
to run on. It’s designed to run on clusters of commodity hardware for which the
chance of node failure across the cluster is high, at least for large clusters.
HDFS is designed to carry on working without a noticeable interruption to the
user in the face of such failure.
Where HDFS is not a good fit
Low-latency data access
Applications that require low-latency access to data, in the tens of milliseconds range, will not work well with HDFS. Remember, HDFS is optimized for delivering a high throughput of data, and this may be at the expense of latency. HBase is currently a better choice for low-latency access.
Lots of small files
Since the namenode holds filesystem metadata in memory, the limit to the
number of files in a filesystem is governed by the amount of memory on the
namenode. As a rule of thumb, each file, directory, and block takes about 150 bytes. So, for example, if you had one million files,
each taking one block, you would need at least 300 MB of memory. While storing millions of files is
feasible, billions is beyond the capability of current hardware.
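As a rough check on the 300 MB figure (assuming one namespace object for each file plus one for its block, at roughly 150 bytes apiece):
$$10^6 \text{ files} \times 2 \text{ objects/file} \times 150 \text{ bytes/object} = 3 \times 10^8 \text{ bytes} \approx 300 \text{ MB}$$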
Multiple writers, arbitrary file modifications
Files in HDFS may be
written to by a single writer. Writes are always made at the end of the file. There is no support for
multiple writers, or for modifications at arbitrary offsets in the file.
Why Is a Block in HDFS So Large?
HDFS blocks are large compared to disk blocks, and the reason is to minimize the cost of seeks. By making a block large enough, the time to transfer the data from the disk can be made significantly larger than the time to seek to the start of the block. Thus the time to transfer a large file made of multiple blocks operates at the disk transfer rate.
A quick calculation shows that if the seek time is around 10 ms and the transfer rate is 100 MB/s, then to make the seek time 1% of the transfer time, we need to make the block size around 100 MB.
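Spelled out: keeping seek overhead at 1% means the transfer must take about one second, so
$$\text{block size} \approx \text{transfer rate} \times \frac{\text{seek time}}{0.01} = 100\ \tfrac{\text{MB}}{\text{s}} \times \frac{10\ \text{ms}}{0.01} = 100\ \tfrac{\text{MB}}{\text{s}} \times 1\ \text{s} = 100\ \text{MB}.$$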
The default is
actually 64 MB, although many HDFS installations use 128 MB blocks.
Advantage of HDFS: “Moving Computation Is Cheaper than Moving Data”
A computation requested by an application is much more efficient if it is executed near the data it operates on. This is especially true when the size of the dataset is huge. The assumption is that it is often better to migrate the computation closer to where the data is located rather than moving the data to where the application is running. HDFS provides interfaces for applications to move themselves closer to where the data is located.
HDFS Architecture
HDFS has a master/slave architecture. An HDFS cluster consists of a single NameNode, a master server that manages the filesystem namespace and regulates access to files by clients. The cluster also includes a secondary namenode, which periodically merges the namespace image with the edit log.
A Client Reading Data from HDFS
The client opens the file it wishes to read by calling open() on the FileSystem object, which for HDFS is an instance of DFS (DistributedFileSystem).
DFS calls the namenode, using RPC, to determine the locations of the first few blocks in the file. For each block, the namenode returns the addresses of the datanodes that have a copy of that block, and the datanodes are sorted according to their proximity to the client.
The DFS returns an FSDataInputStream (an input stream that supports file seeks) to the client for it to read data from.
DFSInputStream connects to the first (closest) datanode for the first block in the file. Data is streamed from the datanode back to the client, which calls read() repeatedly on the stream.
When the end of the block is reached, DFSInputStream will close the connection to the datanode, then find the best datanode for the next block.
When the client has finished reading, it calls close() on the FSDataInputStream.
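To make the read path concrete, here is a minimal client sketch that opens a file in HDFS and copies it to standard output. The class name and the example URI are illustrative assumptions; the FileSystem, Path, and IOUtils calls are the standard Hadoop ones.

import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// Opens a file in HDFS and copies it to standard output.
public class HdfsCat {
    public static void main(String[] args) throws Exception {
        String uri = args[0]; // e.g. hdfs://namenode/user/sample.txt (illustrative)
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf); // DistributedFileSystem for hdfs:// URIs
        InputStream in = null;
        try {
            in = fs.open(new Path(uri)); // open() returns an FSDataInputStream
            IOUtils.copyBytes(in, System.out, 4096, false); // repeated read()s stream data from the datanodes
        } finally {
            IOUtils.closeStream(in); // close() when the client has finished reading
        }
    }
}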
A Client Writing Data to HDFS
DFS makes an RPC call to the namenode to create a new file in the filesystem’s namespace, with no blocks associated with it.
The namenode performs various checks to make sure the file doesn’t already exist and that the client has the right permissions to create the file. If these checks pass, the namenode makes a record of the new file; otherwise, file creation fails and the client is thrown an IOException.
The DFS returns an FSDataOutputStream for the client to start writing data to.
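A minimal sketch of the client side of this write path, copying a local file into HDFS. The class name and paths are illustrative assumptions; create() and copyBytes() are the standard Hadoop calls.

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// Copies a local file into HDFS through the write path described above.
public class HdfsFileCopy {
    public static void main(String[] args) throws Exception {
        String localSrc = args[0];
        String dst = args[1]; // e.g. hdfs://namenode/user/sample-copy.txt (illustrative)
        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        OutputStream out = fs.create(new Path(dst)); // namenode records the new file; returns an FSDataOutputStream
        IOUtils.copyBytes(in, out, 4096, true); // data is split into packets and pushed through the datanode pipeline
    }
}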
As the client writes data, DFSOutputStream splits it into packets, which it writes to an internal queue, called the data queue. The data queue is consumed by the DataStreamer, whose responsibility it is to ask the namenode to allocate new blocks by picking a list of suitable datanodes to store the replicas.
The list of datanodes forms a pipeline; we’ll assume the replication level is three, so there are three nodes in the pipeline.
The DataStreamer streams the packets to the first datanode in the pipeline, which stores the packet and forwards it to the second datanode in the pipeline. Similarly, the second datanode stores the packet and forwards it to the third (and last) datanode in the pipeline.
DFSOutputStream also maintains an internal queue of packets that are waiting to be acknowledged by datanodes, called the ack queue. A packet is removed from the ack queue only when it has been acknowledged by all the datanodes in the pipeline.
What if a datanode fails while data is being written to it?
First, the pipeline is closed.
Any packets in the ack queue are added to the front of the data queue so that datanodes that are downstream from the failed node will not miss any packets.
The current block on the good datanodes is given a new identity,
which is communicated to the namenode, so that the partial block on
the failed datanode will be deleted if the failed datanode recovers later on.
The failed datanode is removed from the pipeline, and the remainder of the block’s data is written to the two good datanodes in the pipeline.
The namenode notices that the block is under-replicated, and it arranges for a further replica to be created on another node.
Replication
Comparison with Other Systems
Why can’t we use databases with lots of disks to do large-scale batch analysis? Why is MapReduce needed?
The answer to these questions comes from another trend in disk drives: seek time is improving more slowly than transfer rate. Seeking is the process of moving the disk’s head to a particular place on the disk to read or write data. It characterizes the latency of a disk operation, whereas the transfer rate corresponds to a disk’s bandwidth.
For updating a small proportion of records in a database, a traditional B-Tree (the data structure used in relational databases, which is limited by the rate at which it can perform seeks) works well. For updating the majority of a database, however, a B-Tree is less efficient than MapReduce, which uses Sort/Merge to rebuild the database.
Example 1: Problem Statement
What’s the highest recorded global temperature for each year in the dataset?
Example 2: Word Count
For the given sample input (say, two lines: “Hello World Bye World” and “Hello Hadoop Goodbye Hadoop”), the first map emits:
<Hello, 1>
<World, 1>
<Bye, 1>
<World, 1>
The second map emits:
<Hello, 1>
<Hadoop, 1>
<Goodbye, 1>
<Hadoop, 1>
After applying the combiner and sorting, the output of the first map is:
<Bye, 1>
<Hello, 1>
<World, 2>
The output of the second map is:
<Goodbye, 1>
<Hadoop, 2>
<Hello, 1>
The final output of the reducer, and thus of the job, is:
<Bye, 1>
<Goodbye, 1>
<Hadoop, 2>
<Hello, 2>
<World, 2>
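For reference, a sketch of a WordCount job that would produce output like the above, written against the org.apache.hadoop.mapreduce API. The class names are illustrative; note that the combiner is the same class as the reducer, which is what produces the pre-aggregated per-map output shown above.

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one); // emits <word, 1>, as in the example above
            }
        }
    }
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) sum += val.get();
            context.write(key, new IntWritable(sum)); // <word, total count>
        }
    }
    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class); // combiner pre-aggregates each map's output
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}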
MapReduce Logical Data Flow
Writing MapReduce Program
We need three things: a map function, a reduce function, and some code to run the job. For the present example, the input key is a long integer offset, the input value is a line of text, the output key is a year, and the output value is an air temperature (an integer).
The map function is represented by the Mapper class, which declares an abstract map() method. The Mapper class is a generic type, with four formal type parameters that specify the input key, input value, output key, and output value types of the map function.
The map() method also provides an instance of Context to write the output to. In this case, we write the year as a Text object (since we are just using it as a key), and the temperature is wrapped in an IntWritable.
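A sketch of such a mapper follows; the substring offsets assume the NCDC weather-record layout used in this example, and the parsing is simplified.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Input key: byte offset of the line (LongWritable); input value: the line (Text).
// Output key: the year (Text); output value: the air temperature (IntWritable).
public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(15, 19);   // assumed NCDC field positions
        String temp = line.substring(87, 92);
        if (temp.startsWith("+")) {
            temp = temp.substring(1);           // drop an explicit plus sign before parsing
        }
        int airTemperature = Integer.parseInt(temp.trim());
        if (airTemperature != MISSING) {
            context.write(new Text(year), new IntWritable(airTemperature)); // write via the Context
        }
    }
}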
The reduce function is represented by the Reducer class, which declares an abstract reduce() method. Again, four formal type parameters are used to specify the input and output types, this time for the reduce function.
The input types of the reduce function must match the output types of the map function: Text and IntWritable. In this case, the output types of the reduce function are Text and IntWritable, for a year and its maximum temperature, which we find by iterating through the temperatures and comparing each with a record of the highest found so far.
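A matching reducer sketch, keeping a running maximum per year:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get()); // keep the highest temperature found so far
        }
        context.write(key, new IntWritable(maxValue));  // output: <year, max temperature>
    }
}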
Data Locality Optimization
Hadoop does its best to run the map task on a node where the input
data resides in HDFS. This is called
the data locality optimization.
Data Flow
A MapReduce job is a unit of work that the client wants to be performed. It consists of:
- The input data
- The MapReduce program
- Configuration information
Hadoop runs the job by dividing it into tasks:
- map tasks
- reduce tasks
Nodes that control the job Execution Process
The
jobtracker
coordinates all the jobs run on the system by scheduling tasks to run on tasktrackers.
Tasktrackers run tasks and send progress reports
to the jobtracker, which keeps a record of the overall
progress of each job.
If a task fails, the jobtracker can reschedule it on a different tasktracker.
Input Splits
Hadoop divides the input to a MapReduce job into fixed-size pieces called input splits, or just splits. Hadoop creates one map task for each split, which runs the user-defined map function for each record in the split.
What is the optimal input split size?
The optimal split size equals the HDFS block size.
If the split spanned two blocks, it
would be unlikely that any HDFS node
stored both blocks, so some of the split would have to be transferred across
the network to the node running the map task, which is clearly less efficient than running the whole map task using local data.
Map tasks write their output to local disk, not to HDFS. Why?
Map output is intermediate output:
it’s processed by reduce tasks to
produce the final output, and once the job is complete the map output can be
thrown away. So storing it in HDFS,
with replication, would be overkill.
Reduce Task
Reduce tasks don’t have the advantage of data locality; the input to a single reduce task is normally the output from all mappers.
MapReduce data flow with a single reduce task
Multiple reduce tasks
When there are multiple reducers, the map tasks partition their output, each creating one partition for each reduce task. There can be many keys (and their associated values) in each partition, but the records for any given key are all in a single partition.
The partitioning can be controlled
by a user-defined partitioning function, but normally the default
partitioner—which buckets keys using a hash function—works very well.
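For illustration, here is a partitioner equivalent to the default hash-based partitioner; the class name is an illustrative assumption.

import org.apache.hadoop.mapreduce.Partitioner;

// Buckets each key by its hash, so all records for a given key land in one partition.
public class HashStylePartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Mask off the sign bit so the partition number is non-negative.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}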
MapReduce data flow with multiple reduce tasks
How Many reduce Tasks?
The number
of reduce tasks
is not governed by the size of the input, but is specified independently.
Combiner Function
Many MapReduce jobs are limited by the bandwidth available on the cluster, so it pays to minimize the data transferred between map and reduce tasks.
Hadoop allows the user to specify a combiner function to be run on the map output. The output of the combiner function forms the input to the reducer.
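In the max-temperature example, taking a maximum is associative, so the reducer class itself can double as the combiner. A sketch of the relevant driver lines, assuming the Job object and MaxTemperatureReducer from the earlier sketches:

// Run the reducer logic on each map's output before it is sent over the network.
job.setCombinerClass(MaxTemperatureReducer.class);
job.setReducerClass(MaxTemperatureReducer.class);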
How MapReduce Works
At the highest level, there are four independent entities:
- The client, which submits the MapReduce job.
- The jobtracker, which coordinates the job run. The jobtracker is a Java application whose main class is JobTracker.
- The tasktrackers, which run the tasks that the job has been split into. Tasktrackers are Java applications whose main class is TaskTracker.
- The distributed filesystem (normally HDFS), which is used for sharing job files between the other entities.
How Hadoop Runs a MapReduce Job
Job Submission Process
- Asks the jobtracker for a new job ID (step 2).
- Checks the output specification of the job.
- Computes the input splits for the job. If the splits cannot be computed, because the input paths don’t exist, for example, then the job is not submitted and an error is thrown to the MapReduce program.
- Copies the resources needed to run the job, including the job JAR file, the configuration file, and the computed input splits, to the jobtracker’s filesystem in a directory named after the job ID (step 3). The job JAR is copied with a high replication factor (mapred.submit.replication, default 10), so that there are lots of copies across the cluster for the tasktrackers to access when they run tasks for the job.
- Tells the jobtracker that the job is ready for execution by calling submitJob() on JobTracker (step 4).
Job initialization
When the JobTracker receives a call to its submitJob() method, it puts the job into an internal queue from where the job scheduler will pick it up and initialize it.
Initialization involves:
- creating an object to represent the job being run, which encapsulates its tasks
- bookkeeping information to keep track of the tasks’ status and progress (step 5)
To create the list of tasks to run, the job scheduler first retrieves the input splits computed by the JobClient from the shared filesystem (step 6). It then creates one map task for each split.
The number of reduce tasks to create is determined by the mapred.reduce.tasks property in the JobConf; the scheduler simply creates this number of reduce tasks to be run. Tasks are given IDs at this point.
Task Assignment & Heartbeat
Tasktrackers run a simple loop that periodically sends heartbeat method calls to the jobtracker. Heartbeats tell the jobtracker that a tasktracker is alive.
As part of the heartbeat, a tasktracker will indicate whether it is ready to run a new task, and if it is, the jobtracker will allocate it a task, which it communicates to the tasktracker using the heartbeat return value (step 7).
Tasktrackers have a fixed number of slots for map tasks and for reduce tasks. The default scheduler fills empty map task slots before reduce task slots: if the tasktracker has at least one empty map task slot, the jobtracker will select a map task; otherwise, it will select a reduce task.
Task Execution
Once the tasktracker has been assigned a task, it localizes the job JAR by copying it from the shared filesystem to the tasktracker’s filesystem. It also copies any files needed from the distributed cache by the application to the local disk (step 8).
It creates a local working directory for the task and un-jars the contents of the JAR into this directory. It then creates an instance of TaskRunner to run the task.
Why a New JVM for Each Task?
Any bugs in the user-defined map and reduce functions don’t affect the tasktracker (by causing it to crash or hang).
The child process communicates with its parent through the umbilical interface. It informs the parent of the task’s progress every few seconds until the task is complete.
User Defined Counters
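A minimal sketch of a user-defined counter, incremented from within a mapper; the enum name and the malformed-record check are illustrative assumptions.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CountingMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    enum Temperature { MALFORMED }   // user-defined counter, reported alongside the job's built-in counters

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (line.length() < 92) {                                   // illustrative "malformed record" check
            context.getCounter(Temperature.MALFORMED).increment(1); // bump the counter
            return;
        }
        // ... parse the record and context.write(...) as in the earlier mapper ...
    }
}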
Hadoop Streaming
Hadoop provides an API to MapReduce that allows you to write your map and reduce functions in languages other than Java. Hadoop Streaming uses Unix standard streams as the interface between Hadoop and your program, so you can use any language that can read standard input and write to standard output (Ruby, Python, and so on) to write your MapReduce program.
hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
  -input input/sample.txt \
  -output output \
  -mapper src/main/ruby/mapper.rb \
  -reducer src/main/ruby/reducer.rb
Hadoop Pipes is the name of the C++ interface to Hadoop MapReduce. Unlike Streaming, which uses standard input and output to communicate with the map and reduce code, Pipes uses sockets as the channel over which the tasktracker communicates with the process running the C++ map or reduce function. JNI is not used.
hadoop fs -put max_temperature bin/max_temperature
hadoop fs -put input/sample.txt sample.txt
hadoop pipes \
  -D hadoop.pipes.java.recordreader=true \
  -D hadoop.pipes.java.recordwriter=true \
  -input sample.txt \
  -output output \
  -program bin/max_temperature
Job status update
HBase
HBase is a distributed, column-oriented database built on top of HDFS. HBase is the Hadoop application to use when you require real-time read/write random access to very large datasets.
Links
http://hadoop.apache.org/common/docs/current/hdfs_design.html
http://hadoop.apache.org/mapreduce/
http://www.cloudera.com/hadoop/
http://www.cloudera.com/
http://www.cloudera.com/hadoop-training/
http://www.cloudera.com/resources/?type=Training
http://blog.adku.com/2011/02/hbase-vs-cassandra.html