Hadoop MapReduce is a software framework for writing applications that process vast amounts of data (multi-terabyte data sets) in parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner. Like HDFS, it runs on commodity hardware and assumes that a node can fail at any time while the job must still complete. MapReduce processes a large volume of data in parallel by dividing a job into independent sub-tasks. Also like HDFS, it has a master-slave architecture.
A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file-system.
The input and output of a MapReduce job, including the intermediate output, are in the form of <key, value> pairs. Keys and values must be serializable. They do not use Java's built-in serialization; instead they implement a Hadoop interface that can be serialized efficiently, because the data being processed has to move from one node to another.
Steps in MapReduce Applications
In big data, you want to break a large data set into many smaller pieces and process them in parallel with the same algorithm. With HDFS, the files are already divided into bite-sized pieces. MapReduce helps you process all the pieces of this data.
MapReduce jobs are complex and involve multiple steps; some steps are performed by Hadoop with default behavior and can be overridden if needed. The following are the mandatory steps performed in MapReduce in sequence:
In MapReduce, the Mapper code should contain logic that can process one block of data independently of the other blocks. Mapper logic should exploit every step of the algorithm that can run in parallel. The input to the Mapper is set in the Driver program: a particular InputFormat type and the file(s) on which the Mapper has to run. The output of the Mapper is a set of <key, value> pairs. This output is not saved in HDFS; instead, an intermediate file is created in the OS space path of the node, and that file is read when shuffle and sorting take place.
Shuffle and sorting (Combine)
Shuffle and sort are intermediate steps in MapReduce between Mapper and Reducer. They are handled by Hadoop and can be overridden if required. The Shuffle process aggregates all the Mapper output by grouping it on key, and the values for each key are appended to a list of values. So, the Shuffle output format will be a map <key, List<list of values>>.
The Reducer is the aggregator process. After shuffle and sort, the data is sent to the Reducer as <key, List<list of values>>, and the Reducer processes the list of values for each key. Different keys can be sent to different Reducers. The Reducer emits a value for each key, and these values are consolidated and saved in HDFS as the final output of the MapReduce job.
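How a key ends up at a particular Reducer can be sketched in plain Java. The arithmetic below mirrors what Hadoop's default HashPartitioner does (mask off the sign bit of the key's hash code, then take the remainder by the number of reducers). The class name `PartitionSketch` and the method `partitionFor` are made up for this illustration, and the sketch uses `String.hashCode()` rather than the hash of a Hadoop key type, so the actual partition numbers in a real job will differ.

```java
import java.util.Arrays;
import java.util.List;

// Illustration only: models reducer selection without any Hadoop classes.
class PartitionSketch {

    // Mask off the sign bit so the result is never negative,
    // then map the hash onto one of the numReducers partitions.
    static int partitionFor(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    public static void main(String[] args) {
        int numReducers = 3; // assumed reducer count for the demo
        List<String> keys = Arrays.asList("hello", "world", "moon", "goodbye", "goodnight");
        for (String key : keys) {
            // Every occurrence of the same key always lands on the same reducer.
            System.out.println(key + " -> reducer " + partitionFor(key, numReducers));
        }
    }
}
```

Because the partition depends only on the key, all values for one key reach the same Reducer, which is what makes per-key aggregation possible.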
Let’s see all the actions in the form of a sequence diagram. This will help in visualizing the whole process much better.
Let’s see an example of the above steps. Consider input that is split across two nodes, with the following file contents:
- file1 content : “hello world hello moon”
- file2 content : “goodbye world goodnight moon”
Now the result of MapReduce after each step will be as follows:
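The three steps can be traced with a small plain-Java simulation of a word count over the two inputs above. This is a minimal sketch and not Hadoop code: the class `WordCountSketch` and its `map`, `shuffle`, and `reduce` methods are names invented for this example, and everything runs in a single JVM instead of on separate nodes.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of map, shuffle/sort, and reduce; no Hadoop classes are used.
class WordCountSketch {

    // Map step: emit a <word, 1> pair for every word in one input split.
    static List<Map.Entry<String, Integer>> map(String split) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : split.trim().split("\\s+")) {
            pairs.add(new SimpleEntry<>(word, 1));
        }
        return pairs;
    }

    // Shuffle and sort step: group values by key; the TreeMap keeps keys sorted.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Reduce step: sum the list of values for each key.
    static Map<String, Integer> reduce(Map<String, List<Integer>> grouped) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (int value : entry.getValue()) {
                sum += value;
            }
            counts.put(entry.getKey(), sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Each call to map() stands in for a Mapper running on its own node.
        List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
        mapped.addAll(map("hello world hello moon"));
        mapped.addAll(map("goodbye world goodnight moon"));
        System.out.println("map:     " + mapped);

        Map<String, List<Integer>> grouped = shuffle(mapped);
        System.out.println("shuffle: " + grouped);

        System.out.println("reduce:  " + reduce(grouped));
    }
}
```

Running the sketch prints the state after each step:

```
map:     [hello=1, world=1, hello=1, moon=1, goodbye=1, world=1, goodnight=1, moon=1]
shuffle: {goodbye=[1], goodnight=[1], hello=[1, 1], moon=[1, 1], world=[1, 1]}
reduce:  {goodbye=1, goodnight=1, hello=2, moon=2, world=2}
```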
Writing MapReduce Applications in Java
To use MapReduce on big data stored in HDFS, you must provide the following inputs:
- Specify input/output locations
- Supply map and reduce functions via implementations of appropriate interfaces and/or abstract classes
Map and Reduce functions must operate on serializable keys and values, because the data moves between nodes.
For serialization, Hadoop uses the following two interfaces:
Writable interface (for values)
The Writable interface is used to serialize and deserialize values. Classes that implement the Writable interface include ByteWritable, among others. We can also create our own custom Writable class for use in MapReduce. To create a custom class, we have to implement the Writable interface and its following two methods:
void write (DataOutput out): This serializes the object.
void readFields (DataInput in): This reads the input stream and converts it to an object.
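A custom value type could look roughly like the sketch below. So that it compiles without Hadoop on the classpath, it declares a local `Writable` interface with the same two methods as `org.apache.hadoop.io.Writable`; in a real job you would import Hadoop's interface instead. The `PageStats` class, its fields, and the `roundTrip` helper are hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Local stand-in mirroring the two methods of org.apache.hadoop.io.Writable.
interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical value type: a view count plus the total seconds spent on a page.
class PageStats implements Writable {
    long views;
    double seconds;

    // A no-argument constructor lets the framework create an empty
    // instance first and then fill it through readFields().
    PageStats() {}

    PageStats(long views, double seconds) {
        this.views = views;
        this.seconds = seconds;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(views);
        out.writeDouble(seconds);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Fields must be read in exactly the order they were written.
        views = in.readLong();
        seconds = in.readDouble();
    }

    // Serializes to a byte array and back, as would happen between two nodes.
    static PageStats roundTrip(PageStats original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            PageStats copy = new PageStats();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        PageStats copy = roundTrip(new PageStats(42L, 3.5));
        System.out.println(copy.views + " views, " + copy.seconds + " seconds");
    }
}
```

The key design point is symmetry: `readFields` must consume the fields in the same order and with the same types that `write` produced them.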
WritableComparable interface (for key)
WritableComparable is used for keys. It extends the Writable interface and the Comparable interface, so that key objects can be compared and sorted. Implementations include ByteWritable, among others. To create a custom WritableComparable class, we have to implement the WritableComparable interface and its following three methods:
void write (DataOutput out): This serializes the object
void readFields (DataInput in): This reads the input stream and converts it to an object
int compareTo (Object obj): This compares the key with another key to determine the sort order
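A custom key could be sketched as follows. As an assumption made to keep the example self-contained, it declares a local `WritableComparable<T>` interface; Hadoop's real `org.apache.hadoop.io.WritableComparable<T>` extends both `Writable` and `Comparable<T>`, which gives the same three methods. The `YearMonthKey` class and the `sorted` helper are hypothetical.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Local stand-in for org.apache.hadoop.io.WritableComparable<T>.
// The two Writable methods are repeated here so the sketch compiles alone.
interface WritableComparable<T> extends Comparable<T> {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical composite key that sorts by year, then by month.
class YearMonthKey implements WritableComparable<YearMonthKey> {
    int year;
    int month;

    YearMonthKey() {}

    YearMonthKey(int year, int month) {
        this.year = year;
        this.month = month;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        year = in.readInt();
        month = in.readInt();
    }

    // Defines the order in which keys are sorted before reaching the Reducer.
    @Override
    public int compareTo(YearMonthKey other) {
        int byYear = Integer.compare(year, other.year);
        return byYear != 0 ? byYear : Integer.compare(month, other.month);
    }

    // equals and hashCode are kept consistent with compareTo, since
    // hash-based partitioning relies on equal keys having equal hash codes.
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof YearMonthKey)) {
            return false;
        }
        YearMonthKey other = (YearMonthKey) obj;
        return year == other.year && month == other.month;
    }

    @Override
    public int hashCode() {
        return 31 * year + month;
    }

    @Override
    public String toString() {
        return year + "-" + month;
    }

    // Returns a sorted copy, imitating the sort phase on a small scale.
    static List<YearMonthKey> sorted(List<YearMonthKey> keys) {
        List<YearMonthKey> copy = new ArrayList<>(keys);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        List<YearMonthKey> keys = Arrays.asList(
                new YearMonthKey(2024, 3), new YearMonthKey(2023, 12), new YearMonthKey(2024, 1));
        System.out.println(sorted(keys)); // prints [2023-12, 2024-1, 2024-3]
    }
}
```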
After you have combined the above inputs into a MapReduce job, you can submit it to the JobTracker.
JobTracker and TaskTracker Processes
To perform everything discussed above, MapReduce has the following two daemon processes:
JobTracker (Master process)
The primary functions of JobTracker are resource management, tracking resource availability, and task process cycle. JobTracker identifies the TaskTracker to perform certain tasks and monitors the progress and status of a task. JobTracker is a single point of failure for the MapReduce process.
TaskTracker (Slave process)
TaskTracker is the slave daemon process that performs the tasks assigned by JobTracker. It periodically sends heartbeat messages to JobTracker to report its free slots and the status of its tasks, and to check whether any task has to be performed.
MapReduce jobs are broken into multiple Mapper and Reducer processes and some intermediate tasks, so a job can produce hundreds or thousands of tasks, and some tasks or nodes can take a long time to complete. Hadoop monitors tasks and detects when one is running slower than expected. If the node has a history of performing tasks slowly, Hadoop starts the same task on another node as a backup. This is called speculative execution of tasks. Hadoop doesn’t try to fix or diagnose the node or process, since the process is not failing with an error but is merely slow, and slowness can occur because of hardware degradation, software misconfiguration, network congestion, and so on.
Once the slow task has been marked, JobTracker starts the task on a different node, takes the result of whichever attempt completes first, kills the other attempts, and makes a note of the situation. If a node is consistently lagging behind, JobTracker gives less preference to that node.
Speculative execution can be enabled or disabled; by default it is turned on, as it is a useful process. However, it has to monitor every task, which in some cases can affect performance and resources. Speculative execution is not advised for jobs where a task, especially a reducer, can receive millions of values because of skew in the data: that task will take longer than the others regardless, and starting another copy of it will not help.
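The "first attempt to finish wins" idea can be illustrated with standard Java concurrency. This is only an analogy, not how Hadoop implements it: the class `SpeculativeSketch`, its methods, and the delays are invented for the example, and both attempts are started at the same time here, whereas Hadoop launches the backup only after it detects that the original is slow.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Analogy for speculative execution using java.util.concurrent only.
class SpeculativeSketch {

    // One attempt of the same task; delayMillis stands in for how slow the node is.
    static Callable<String> attempt(String node, long delayMillis) {
        return () -> {
            Thread.sleep(delayMillis);
            return "result from " + node;
        };
    }

    // Runs both attempts and keeps the result of whichever completes first.
    static String runSpeculatively(long originalDelayMillis, long backupDelayMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<String>> attempts = Arrays.asList(
                    attempt("original node", originalDelayMillis),
                    attempt("backup node", backupDelayMillis));
            // invokeAny returns the first successful result and cancels the rest.
            return pool.invokeAny(attempts);
        } catch (Exception e) {
            throw new IllegalStateException("all attempts failed", e);
        } finally {
            // Interrupts any attempt that is still running (the "killed" task).
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // The original attempt is much slower, so the backup's result is used.
        System.out.println(runSpeculatively(2000, 50));
    }
}
```

The sketch also hints at why speculation does not help with skewed data: if both attempts need the same long time because the work itself is large, the backup only consumes an extra slot.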
- You cannot control the order in which the maps or reductions are run.
- For maximum parallelism, you need Maps and Reduces to not depend on data generated in the same MapReduce job (i.e. both should be stateless).
- Reduce operations do not take place until all Maps are complete.
- The general assumption is that the output of Reduce is smaller than the input to Map.
- Arguably lower performance, because a database with an index will generally be faster than a MapReduce job on unindexed data.
Happy Learning !!