Hadoop MapReduce is a software framework for easily writing applications that process vast amounts of data (multi-terabyte data sets) in parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner. Like HDFS, Hadoop MapReduce runs on commodity hardware and assumes that nodes can fail at any time, yet it still completes the job when they do. MapReduce processes a large volume of data in parallel by dividing a task into independent sub-tasks. Like HDFS, MapReduce also has a master-slave architecture.
A MapReduce job usually splits the input dataset into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a filesystem.
The input, the output, and even the intermediate output of a MapReduce job are all <key, value> pairs. Keys and values must be serializable because the data moves from one node to another during processing. Instead of the Java serialization package, Hadoop defines its own interfaces, which the key and value classes implement so that they can be serialized efficiently.
1. Steps in MapReduce Applications
In big data, you want to break a large data set into many smaller pieces and process them in parallel with the same algorithm. With HDFS, the files are already divided into bite-sized pieces (blocks). MapReduce helps you process all of these pieces.
MapReduce jobs are complex and involve multiple steps; some steps are performed by Hadoop with default behavior and can be overridden if needed. The following are the mandatory steps performed in MapReduce in sequence:
1.1. Mapper
In MapReduce, the Mapper logic should be independent of the data in other blocks, and it should exploit as much of the algorithm's parallelism as possible. The Driver program sets the Mapper's input: the InputFormat type and the file(s) the Mapper has to process. The Mapper emits <key, value> pairs. These pairs are not saved in HDFS; instead, they are written to an intermediate file on the node's local file system, which is then read, shuffled, and sorted.
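The independence requirement can be illustrated with a minimal plain-Java sketch. It does not use Hadoop's `Mapper` API; it assumes a word-count job (the text does not name one), and the `map` method and `WordCountMapperSketch` class are hypothetical. Because `map` reads nothing but its own block, the blocks can be processed in any order or in parallel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java stand-in for a word-count Mapper; no Hadoop classes are used.
class WordCountMapperSketch {
    // Hypothetical map function: it reads only its own block of text and emits
    // one <word, 1> pair per word, so no block depends on any other block.
    static List<Map.Entry<String, Integer>> map(String block) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : block.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                pairs.add(Map.entry(word, 1));
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<String> blocks = List.of("hello world hello moon", "goodbye world goodnight moon");
        // Because map() depends only on its input block, the blocks can be mapped
        // in parallel; a parallel stream stands in for separate nodes here.
        List<List<Map.Entry<String, Integer>>> outputs = blocks.parallelStream()
                .map(WordCountMapperSketch::map)
                .collect(Collectors.toList());
        // prints [[hello=1, world=1, hello=1, moon=1], [goodbye=1, world=1, goodnight=1, moon=1]]
        System.out.println(outputs);
    }
}
```

In a real job the framework, not the application, decides where each map task runs; the parallel stream only mimics that.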
1.2. Shuffle and sorting (Combine)
Shuffle and sort are the intermediate steps between the Mapper and the Reducer. Hadoop handles them by default, and they can be overridden if required. The shuffle process aggregates all the Mapper output by grouping it by key and appending each value to that key's list of values. So, the shuffle output takes the form <key, List<values>>.
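The grouping itself can be sketched in a few lines of plain Java. This is only an illustration of the resulting data shape, not Hadoop's implementation (which works over intermediate files spread across many nodes); the `ShuffleSketch` class and its `shuffle` method are hypothetical. A `TreeMap` keeps the keys sorted, mimicking the sort that accompanies the shuffle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ShuffleSketch {
    // Groups mapper output pairs by key into <key, List<values>>.
    // The TreeMap keeps keys in sorted order, like the sort step.
    static <K extends Comparable<K>, V> TreeMap<K, List<V>> shuffle(List<Map.Entry<K, V>> mapperOutput) {
        TreeMap<K, List<V>> grouped = new TreeMap<>();
        for (Map.Entry<K, V> pair : mapperOutput) {
            // Create the key's list on first sight, then append the value.
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> mapperOutput = List.of(
                Map.entry("hello", 1), Map.entry("world", 1),
                Map.entry("hello", 1), Map.entry("moon", 1));
        System.out.println(shuffle(mapperOutput)); // prints {hello=[1, 1], moon=[1], world=[1]}
    }
}
```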
1.3. Reducer
The Reducer is the aggregator. After shuffling and sorting, the data arrives at the Reducer as <key, List<values>> pairs, and the Reducer processes each key's list of values. Different keys can be sent to different Reducers, but all the values for a given key go to the same Reducer. The values emitted by the Reducers are consolidated into the final output of the MapReduce job, which is saved in HDFS.
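How keys are spread across several Reducers can be sketched in plain Java. The `partition` method below uses the same formula as Hadoop's default `HashPartitioner` (non-negative hash code modulo the number of reducers), applied here to a Java `String` rather than a Hadoop key type; the `reduce` method assumes a word-count job that sums each key's values. Both methods and the `ReducerSketch` class are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ReducerSketch {
    // Same formula as Hadoop's default HashPartitioner: a given key always maps
    // to the same reducer index, in the range [0, numReducers).
    static int partition(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    // Word-count style reduce: collapse one key's list of values into a sum.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Shuffled input in the <key, List<values>> form.
        Map<String, List<Integer>> shuffled = new TreeMap<>(Map.of(
                "hello", List.of(1, 1), "moon", List.of(1, 1), "world", List.of(1, 1),
                "goodbye", List.of(1), "goodnight", List.of(1)));
        int numReducers = 2;
        List<Map<String, Integer>> reducerOutputs = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            reducerOutputs.add(new TreeMap<>());
        }
        // Route each key to its reducer, then reduce that key's values there.
        for (Map.Entry<String, List<Integer>> e : shuffled.entrySet()) {
            int r = partition(e.getKey(), numReducers);
            reducerOutputs.get(r).put(e.getKey(), reduce(e.getKey(), e.getValue()));
        }
        // Each reducer writes its own part; together they form the final output.
        for (int i = 0; i < numReducers; i++) {
            System.out.println("reducer " + i + ": " + reducerOutputs.get(i));
        }
    }
}
```

Which reducer receives which word depends on the hash codes, so the split between the two outputs is not fixed by the sketch; only the per-key totals are.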
Let’s see all the actions in a sequence diagram. This will help in visualizing the whole process much better.
Let’s see an example of the above steps. Consider one file split across two nodes, where the contents of the two parts are:
- file1 content : “hello world hello moon”
- file2 content : “goodbye world goodnight moon”
Now the result of MapReduce after each step will be as follows:
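Assuming the job is a word count (the classic MapReduce example; the job is not named above), the following plain-Java sketch simulates the three steps on file1 and file2 in a single process. It is not Hadoop code: the `WordCountSimulation` class and its `map`, `shuffle`, and `reduce` methods are hypothetical stand-ins for the Mapper, the framework's shuffle and sort, and the Reducer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class WordCountSimulation {
    // Map step: emit <word, 1> for every word in one split.
    static List<Map.Entry<String, Integer>> map(String split) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : split.split("\\s+")) {
            pairs.add(Map.entry(word, 1));
        }
        return pairs;
    }

    // Shuffle and sort step: group all values by key, keeping keys sorted.
    static TreeMap<String, List<Integer>> shuffle(List<List<Map.Entry<String, Integer>>> mapOutputs) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (List<Map.Entry<String, Integer>> output : mapOutputs) {
            for (Map.Entry<String, Integer> pair : output) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
            }
        }
        return grouped;
    }

    // Reduce step: sum each key's list of values.
    static TreeMap<String, Integer> reduce(TreeMap<String, List<Integer>> grouped) {
        TreeMap<String, Integer> result = new TreeMap<>();
        grouped.forEach((word, ones) -> result.put(word, ones.stream().mapToInt(Integer::intValue).sum()));
        return result;
    }

    public static void main(String[] args) {
        List<String> splits = List.of("hello world hello moon", "goodbye world goodnight moon");
        List<List<Map.Entry<String, Integer>>> mapOutputs = new ArrayList<>();
        for (String split : splits) {
            List<Map.Entry<String, Integer>> out = map(split);
            System.out.println("map:     " + out);
            mapOutputs.add(out);
        }
        TreeMap<String, List<Integer>> grouped = shuffle(mapOutputs);
        System.out.println("shuffle: " + grouped);
        System.out.println("reduce:  " + reduce(grouped));
    }
}
```

Under this word-count assumption, the shuffle step groups the pairs into {goodbye=[1], goodnight=[1], hello=[1, 1], moon=[1, 1], world=[1, 1]}, and the reduce step produces {goodbye=1, goodnight=1, hello=2, moon=2, world=2}.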
Applications without a reducer are known as map-only jobs, which can be useful when there’s no need to combine the result sets from the map tasks.
2. Writing MapReduce Applications in Java
To use MapReduce on big data stored in HDFS, you must provide the following inputs:
- Specify input/output locations
- Supply map and reduce functions via implementations of appropriate interfaces and/or abstract classes
Map and Reduce functions must operate on serializable keys and values because these move from one node to another.
For serialization, Hadoop uses the following two interfaces:
2.1. Writable Interface (for Values)
The Writable interface is used to serialize and deserialize values. Hadoop provides many classes that implement the Writable interface, such as ByteWritable. We can also create our own custom Writable class for use in MapReduce. To do so, we implement the Writable interface and its two methods:
- void write(DataOutput out): serializes the object.
- void readFields(DataInput in): reads the serialized fields from the input stream back into the object.
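A custom value type might look like the sketch below. So that it compiles without Hadoop on the classpath, it declares a local `Writable` interface with the same two method signatures as `org.apache.hadoop.io.Writable`; in a real job you would implement Hadoop's interface instead. The `PageVisitWritable` type and its fields are hypothetical. The key point is that `readFields` must read the fields in exactly the order `write` wrote them.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Local stand-in with the same methods as org.apache.hadoop.io.Writable,
// declared here only so the sketch compiles without Hadoop.
interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical custom value: a page path and its visit count.
class PageVisitWritable implements Writable {
    private String page;
    private long visits;

    // Hadoop creates Writables reflectively, so a no-argument constructor is
    // needed; the fields are then filled in by readFields().
    PageVisitWritable() {}

    PageVisitWritable(String page, long visits) {
        this.page = page;
        this.visits = visits;
    }

    String getPage() { return page; }
    long getVisits() { return visits; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(page);      // fields are written in a fixed order...
        out.writeLong(visits);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        page = in.readUTF();     // ...and must be read back in the same order
        visits = in.readLong();
    }
}

class WritableRoundTrip {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new PageVisitWritable("/index.html", 42).write(new DataOutputStream(bytes));

        PageVisitWritable copy = new PageVisitWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy.getPage() + " " + copy.getVisits()); // prints "/index.html 42"
    }
}
```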
2.2. WritableComparable Interface (for Keys)
WritableComparable is used for keys. It extends both the Writable interface and the Comparable interface, so a key can be serialized and can also be compared with other keys, which the framework needs in order to sort them.
Hadoop's built-in implementations include ByteWritable. To create a custom WritableComparable class, we implement the WritableComparable interface and the following three methods:
- void write(DataOutput out): serializes the object.
- void readFields(DataInput in): reads the serialized fields from the input stream back into the object.
- int compareTo(T obj): compares this key with another key of the same type T; used to sort the keys.
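A custom key can be sketched the same way. Again, local stand-ins mirror `org.apache.hadoop.io.Writable` and `WritableComparable<T>` (which extends `Writable` and `Comparable<T>`) so the code compiles without Hadoop; the `YearTempKey` composite key is hypothetical. Its `compareTo` defines the sort order the framework would apply, and `equals`/`hashCode` are kept consistent with it because the default `HashPartitioner` routes keys by `hashCode()`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Local stand-ins mirroring Hadoop's Writable and WritableComparable<T>.
interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

interface WritableComparable<T> extends Writable, Comparable<T> {}

// Hypothetical composite key: a year and a temperature reading.
class YearTempKey implements WritableComparable<YearTempKey> {
    private int year;
    private int temperature;

    YearTempKey() {} // empty instance, filled in later by readFields()

    YearTempKey(int year, int temperature) {
        this.year = year;
        this.temperature = temperature;
    }

    int getYear() { return year; }
    int getTemperature() { return temperature; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(temperature);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        year = in.readInt();
        temperature = in.readInt();
    }

    // Sort order for the keys: by year, then by temperature, both ascending.
    @Override
    public int compareTo(YearTempKey other) {
        int byYear = Integer.compare(year, other.year);
        return byYear != 0 ? byYear : Integer.compare(temperature, other.temperature);
    }

    // Consistent with compareTo(); hashCode() decides the reducer by default.
    @Override
    public boolean equals(Object o) {
        return o instanceof YearTempKey && compareTo((YearTempKey) o) == 0;
    }

    @Override
    public int hashCode() {
        return 31 * year + temperature;
    }

    @Override
    public String toString() {
        return "(" + year + ", " + temperature + ")";
    }
}

class KeySortDemo {
    public static void main(String[] args) throws IOException {
        List<YearTempKey> keys = new ArrayList<>(List.of(
                new YearTempKey(1950, 22), new YearTempKey(1949, 111),
                new YearTempKey(1950, 0), new YearTempKey(1949, 78)));
        Collections.sort(keys);
        System.out.println(keys); // prints [(1949, 78), (1949, 111), (1950, 0), (1950, 22)]

        // Round trip through the Writable methods.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        keys.get(0).write(new DataOutputStream(bytes));
        YearTempKey copy = new YearTempKey();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy.equals(keys.get(0))); // prints true
    }
}
```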
Once these inputs are combined into a MapReduce job, you can submit the job to the JobTracker.
3. JobTracker and TaskTracker Processes
To perform everything discussed above, classic MapReduce (Hadoop 1.x) runs the following two daemon processes:
3.1. JobTracker (Master Process)
The primary functions of the JobTracker are resource management, tracking resource availability, and managing the task life cycle. The JobTracker chooses which TaskTracker performs each task and monitors the progress and status of the tasks. The JobTracker is a single point of failure for the MapReduce process.
3.2. TaskTracker (Slave Process)
TaskTracker is the slave daemon process that performs the tasks assigned by the JobTracker. It periodically sends heartbeat messages to the JobTracker to report its free slots and the status of its tasks, and to check whether there is any new task for it to perform.
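The heartbeat exchange can be pictured with a toy model. This is not Hadoop code: the `ToyJobTracker` class and its `onHeartbeat` method are hypothetical, and the model ignores details such as separate map and reduce slots, data locality, and progress tracking. It only shows the idea that a TaskTracker reports its free slots and the JobTracker replies with at most that many pending tasks.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model: a JobTracker hands out pending tasks in reply to heartbeats,
// giving each TaskTracker at most as many tasks as it has free slots.
class ToyJobTracker {
    private final Deque<String> pendingTasks = new ArrayDeque<>();

    ToyJobTracker(List<String> tasks) {
        pendingTasks.addAll(tasks);
    }

    // Called when a TaskTracker's heartbeat reports `freeSlots` idle slots;
    // returns the task IDs assigned to that tracker in the reply.
    List<String> onHeartbeat(int freeSlots) {
        List<String> assigned = new ArrayList<>();
        for (int i = 0; i < freeSlots && !pendingTasks.isEmpty(); i++) {
            assigned.add(pendingTasks.poll());
        }
        return assigned;
    }

    int pendingCount() {
        return pendingTasks.size();
    }
}

class HeartbeatDemo {
    public static void main(String[] args) {
        ToyJobTracker jobTracker = new ToyJobTracker(List.of("map-0", "map-1", "map-2", "reduce-0"));
        System.out.println("tracker A gets " + jobTracker.onHeartbeat(2)); // [map-0, map-1]
        System.out.println("tracker B gets " + jobTracker.onHeartbeat(1)); // [map-2]
        System.out.println("tracker A gets " + jobTracker.onHeartbeat(2)); // [reduce-0]
        System.out.println("still pending: " + jobTracker.pendingCount()); // 0
    }
}
```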
4. Speculative Execution
A MapReduce job is broken into many Mapper and Reducer tasks, plus some intermediate tasks, so a single job can produce hundreds or thousands of tasks, and some of them may take much longer to complete than others. Hadoop monitors task progress and detects when a task is running slower than expected; it then starts a duplicate of the same task on another node as a backup. This is called the speculative execution of tasks.
Hadoop doesn’t try to diagnose or fix the slow node or process: the task is not failing, only running slowly, and slowness can have many causes, such as hardware degradation, software misconfiguration, or network congestion.
Once a slow task has been identified, the JobTracker launches the same task on a different node, uses the result of whichever attempt completes first, kills the other attempts, and records what happened. If a node consistently lags behind, the JobTracker gives it less preference when assigning tasks.
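The "first attempt to finish wins" rule has a close analogue in the standard Java library: `ExecutorService.invokeAny` returns the result of a task that completed successfully and cancels the tasks that have not completed. The sketch below uses it only as an analogy (it is not how Hadoop's scheduler is implemented); the `attempt` helper and the node names are hypothetical, and the sleep times merely simulate a slow node and a fast backup node.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Analogy only: run the same task twice and keep whichever attempt finishes first.
class SpeculativeSketch {
    // Hypothetical task attempt: same work, but the delay simulates node speed.
    static Callable<String> attempt(String node, long delayMillis) {
        return () -> {
            Thread.sleep(delayMillis); // interrupted if the attempt is cancelled
            return "result from " + node;
        };
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // invokeAny returns one successful result and cancels unfinished attempts,
            // much like keeping the first finished task and killing the other.
            String winner = pool.invokeAny(List.of(
                    attempt("slow-node", 5000),
                    attempt("backup-node", 50)));
            System.out.println(winner); // expected: result from backup-node
        } finally {
            pool.shutdownNow();
        }
    }
}
```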
Speculative execution can be enabled or disabled; it is turned on by default because it is usually useful. However, it has to monitor every task and runs duplicate attempts, so in some cases it can hurt performance and consume extra resources. Speculative execution is not advised for jobs with skewed data, where one task (especially a reducer) receives millions of values for a specific key: that task takes longer than the others because of its input, not its node, so starting another attempt will not help.
5. MapReduce Limitations
- We cannot control the order in which the maps or reductions are run.
- For maximum parallelism, Maps and Reduces should not depend on data generated in the same MapReduce job (i.e., both should be stateless).
- Reduce operations do not take place until all Maps are complete.
- The general assumption is that the output of Reduce is smaller than the input to Map.
- Arguably lower performance, because a database with an index will usually be faster than a MapReduce job on unindexed data.
Happy Learning !!