MapReduce has an intrinsic architectural limitation which impacts throughput. All of the newer frameworks try to address this limitation in some form or another.
MapReduce Limitations in Hadoop
Some of the characteristics of MapReduce which impose serious throughput constraints in practice are:
Starting an MR job carries an overhead of anywhere from tens of seconds to a few minutes.
An MR job can chain Mappers after a Reducer to perform post-processing on the Reducer output. However, the Reducer in an MR job cannot chain to further Reducers (plural here refers to multiple types of Reducers, not the number of reducer tasks). Hadoop supports chaining of MR jobs, which addresses this limitation. Unfortunately, as we will discuss soon, it introduces new costs.
A MapReduce job writes to distributed storage (usually HDFS, sometimes HBase). An MR job also cannot begin until its input data is fully ready. For example, if you know you are expecting 100 files and only 50 are available so far, you cannot start the job on the existing 50 files while the remaining 50 arrive. This means that in a chained-MR-jobs scenario each downstream job must wait until its upstream job completes successfully and commits its data to distributed storage.
Example Applications where MR Jobs need to be chained
Several types of applications need chained MR jobs:
Graph processing applications - Imagine you are receiving a massive set of discrete events where each event knows only its parent event identifier. To obtain connected sub-graphs from this massive event set you would create a MapReduce program which connects pairs of parent-child events in iteration 1 by joining the event identifier with the parent event identifier. Each subsequent iteration joins the intermediate sub-graphs formed in the prior iteration. For example, iteration 1 would produce sub-graphs like 1-2, 2-3, 3-4 and iteration 2 would merge them into 1-2-3, 2-3-4 and so on.
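To make the iterative shape of this concrete, here is a minimal local Python simulation (not Hadoop code) of the pattern, using label propagation - a standard connected-components formulation for MapReduce - rather than the path-merging join described above. Each call to cc_iteration stands in for one full MapReduce job in the chain:

```python
def cc_iteration(edges, labels):
    # One "MapReduce job": map emits each node's current label across every
    # edge; reduce keeps the minimum label seen per node.
    new = dict(labels)
    for u, v in edges:
        m = min(labels[u], labels[v])
        new[u] = min(new[u], m)
        new[v] = min(new[v], m)
    return new

def connected_components(nodes, edges):
    # The chain of jobs: iterate until no label changes (a fixed point).
    labels = {n: n for n in nodes}
    jobs = 0
    while True:
        new = cc_iteration(edges, labels)
        jobs += 1
        if new == labels:
            return labels, jobs
        labels = new

# Events 2, 3, 4 each point at their parent; 1 is the root of the chain.
labels, jobs = connected_components([1, 2, 3, 4], [(1, 2), (2, 3), (3, 4)])
```

Even this tiny 4-node chain takes 4 passes to converge; on a real cluster each of those passes is a separate MR job with its own startup and HDFS commit cost.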
Machine Learning algorithm implementations - Another common example of chained MapReduce jobs is Machine Learning using Bayesian techniques. Most of these techniques are iterative: you start with an initial guess for the solution and in each iteration you use optimization techniques to converge toward the true solution. Mahout is a Machine Learning library which implements such numerical solutions to Data Science problems. Sometimes it can take a few hundred or a few thousand iterations to converge to an acceptable (usable from a business perspective) solution. In Mahout each iteration is a separate MapReduce program which uses the results of the previous iteration to converge to a better solution. As mentioned earlier, MapReduce programs incur a significant startup overhead (tens of seconds to 2-3 minutes). Imagine having to run 1000 iterations: in the worst case you have spent 2000 minutes just waiting for jobs to start. This is clearly not acceptable, so most Mahout implementations only use a few tens of iterations. That has an adverse impact on the quality of the solution, which is often directly proportional to the number of iterations; in the ML domain, more iterations produce better results.
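A back-of-envelope sketch in Python makes the overhead arithmetic explicit. This is a local toy stand-in, not Mahout code: Newton's method for the square root plays the role of an iterative solver, and the 120-second figure is an assumed per-job startup cost taken from the 2-minute estimate above:

```python
STARTUP_SECONDS = 120  # assumed per-job startup overhead (the ~2-minute figure above)

def iterate_until_converged(x0, step, tol=1e-9, max_iters=1000):
    # Each call to `step` stands in for one MapReduce job in the chain.
    x, iters = x0, 0
    while iters < max_iters:
        nxt = step(x)
        iters += 1
        if abs(nxt - x) < tol:
            return nxt, iters
        x = nxt
    return x, iters

# Newton's method for sqrt(2), a toy stand-in for an iterative ML solver.
root, iters = iterate_until_converged(1.0, lambda x: 0.5 * (x + 2.0 / x))

# Startup overhead alone for a 1000-iteration solver: 2000 minutes of waiting.
worst_case_minutes = 1000 * STARTUP_SECONDS / 60
```

The solver itself converges in a handful of steps here; the point is that at cluster scale the fixed per-job startup cost, not the computation, dominates once iteration counts climb into the hundreds or thousands.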
Using Hive and Pig - Yet another common (and extremely popular) example of chained MapReduce jobs is Hive and Pig. When you submit a query to Hive or a data flow to Pig, its execution engine comes up with an execution plan. For complex queries and data flows this often involves running chained MapReduce jobs, where the results of an upstream job in the chain feed downstream jobs until the final result is computed.
One of the biggest limitations of the original MapReduce framework, in my opinion, is that it does not allow a Reducer to be chained to another Reducer. Simply having this capability would have removed the need to chain MapReduce jobs.
Chained Reducers would have helped
Having the ability to chain Reducer classes would have addressed the above challenges to an extent. Imagine if the OutputFormat could optionally serve a downstream Reducer instance instead of persistent storage like HDFS or HBase, or if MultipleOutputs could serve the Mapper/Reducer output to multiple downstream Reducer classes instead of persistent storage.
Some of the benefits this feature would provide are:
Suddenly you don't need chained MapReduce jobs. The output of each Reducer can now serve various downstream Reducer instances, which means none of the additional startup costs incurred when starting the subsequent MR job in a chain.
The output of the Reducer can be served immediately to the next set of Reducers. Of course a downstream Reducer cannot begin until its shuffle-sort is complete, but each type of Reducer can begin as soon as its own shuffle-sort completes. There is a disk I/O cost for writing to local disk, as well as the network I/O of the shuffle, but it is lower than writing to distributed storage like HDFS before the next round of MR can start.
The upstream Reducer serves as the Mapper for the downstream Reducer instances. This is a huge saving. In the current chained-MR-job scenario each MR job in the chain has a Mapper as well as a Reducer, and often the Mapper simply reads from HDFS and passes key-value pairs to the Reducer. That cost alone is substantial, and it is avoided because the chaining skips the Mapper entirely, serving the upstream Reducer output to the downstream Reducer directly.
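As a sketch of what such Reducer-to-Reducer chaining might look like, here is a minimal Python simulation; shuffle_sort stands in for Hadoop's shuffle-sort, and both reducers are hypothetical examples (word counting, then grouping words by their count):

```python
from itertools import groupby

def shuffle_sort(pairs):
    # Stand-in for Hadoop's shuffle-sort: sort by key, group values per key.
    pairs = sorted(pairs, key=lambda kv: kv[0])
    return [(k, [v for _, v in grp]) for k, grp in groupby(pairs, key=lambda kv: kv[0])]

def run_reducer(reducer, pairs):
    # The reducer's output feeds straight into the next shuffle-sort --
    # no identity Mapper and no HDFS round trip in between.
    return [out for k, vals in shuffle_sort(pairs) for out in reducer(k, vals)]

def count_reducer(word, ones):             # hypothetical reducer 1: word counts
    yield (sum(ones), word)

def group_by_count_reducer(count, words):  # hypothetical chained reducer 2
    yield (count, sorted(words))

mapped = [(w, 1) for w in "a b a c b a".split()]
stage1 = run_reducer(count_reducer, mapped)            # first Reducer
stage2 = run_reducer(group_by_count_reducer, stage1)   # chained Reducer
```

In the real chained-jobs world, the hand-off between stage1 and stage2 would be an HDFS write, a new job launch, and an identity Mapper re-reading the data.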
Useful as this feature would be, enhancing MapReduce to support it would require a significant overhaul of the underlying framework, which is not practical. Consequently, new frameworks have evolved to address this limitation.
Scratching the Itch - How various frameworks address the challenge
Several solutions have evolved to handle these limitations:
Bulk Synchronous Parallel (BSP) frameworks (ex. HAMA and Giraph) – These frameworks are most suitable for graph processing as well as iterative Machine Learning scenarios. The idea is to perform local processing in a distributed set of peer processes which exchange messages with one another. After each peer process performs its own processing and message passing for a given iteration, it informs the master node that it is ready to start the next iteration, but it does not start until the master allows it to begin. Each peer process synchronizes at a common barrier point in this fashion. When the master has received the request from each peer process and ensured that all messages are correctly exchanged between peers, it signals each peer process to start the next iteration. This series of iterations continues until the peers or the master signal that the required result has been achieved; deciding when the result is "good enough" is left to the programmer.
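The superstep-and-barrier cycle can be sketched in Python, with threading.Barrier standing in for the master's synchronization. This is a toy single-machine illustration, not HAMA/Giraph code: peers in a ring pass their current maximum to a neighbor each superstep until every peer knows the global maximum.

```python
import threading

def bsp_global_max(values):
    # Each peer starts with one value; in each superstep it sends its current
    # max to the next peer in the ring, then waits at the barrier (the "sync").
    n = len(values)
    inbox = [[] for _ in range(n)]
    current = list(values)
    barrier = threading.Barrier(n)

    def peer(i):
        for _ in range(n - 1):             # n-1 supersteps suffice in a ring
            inbox[(i + 1) % n].append(current[i])
            barrier.wait()                 # all messages delivered for this superstep
            current[i] = max([current[i]] + inbox[i])
            inbox[i] = []
            barrier.wait()                 # all peers done reading before next sends

    threads = [threading.Thread(target=peer, args=(i,)) for i in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return current

result = bsp_global_max([3, 9, 1, 5])
```

No peer races ahead: the two barriers per superstep are exactly the "common barrier point" described above, and no data touches disk between supersteps.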
Apache Tachyon – This technology utilizes off-heap memory as a proxy for disk. From the MapReduce job's perspective, data is assumed to be persisted to disk (HDFS) once it is written to this memory; the underlying Tachyon framework transparently writes the data from off-heap memory to disk in separate threads. Tachyon also maintains lineage information, which comes in handy if the write to disk fails, in which case Tachyon can restart the affected jobs. The key point is that downstream MapReduce jobs can start as soon as the data is available in off-heap memory. If the write to disk for a job fails, that job and all downstream jobs which have started will restart, but the assumption is that this will be a rare occurrence, and a chained MapReduce job set can complete significantly faster because data is mostly written to and read from memory. This method has significant value for processes like Hive and Pig which utilize a small number of chained MapReduce jobs.
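The write-behind idea can be sketched as follows. This is a toy Python illustration of the principle, not the Tachyon API: downstream work proceeds against the in-memory copy while a background thread makes the data durable.

```python
import concurrent.futures
import os
import tempfile

def write_through_memory(data, path, pool):
    # Kick off the durable write in the background; hand the in-memory copy
    # to downstream consumers immediately.
    def persist():
        with open(path, "w") as f:
            f.write(data)
    return data, pool.submit(persist)

pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
path = os.path.join(tempfile.mkdtemp(), "part-00000")

data, pending_write = write_through_memory("upstream job output", path, pool)
downstream_result = data.upper()   # downstream "job" starts right away
pending_write.result()             # on failure, lineage-based restart would kick in here
```

The downstream consumer never waited on the disk write; only a failure of pending_write would force the restart path described above.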
Apache Spark/Tez – Spark and Tez solve the same problem as Tachyon but in an expanded way. Both utilize heap memory in concert with disk, and both support a DAG (Directed Acyclic Graph) of processes. This is a formal way of saying that they support the chaining of Reducer types discussed earlier, which addresses the original MapReduce framework's inability to chain a Reducer to another set of Reducers. Spark/Tez expand on the original MapReduce framework in the following ways:
a. They support a DAG of processes instead of only a single Reducer. This simplifies handling of chained MapReduce jobs for processes like Hive or Pig.
b. They utilize heap memory to reduce I/O between the various operations (actions, in Spark parlance).
c. They utilize off-heap memory. This is experimental and can be obtained by running Spark on Tachyon. It is extremely valuable for Machine Learning algorithms where intermediate datasets can be large; off-heap memory allows large datasets to be partitioned and maintained in memory.
d. They support in-process iterations on the same datasets maintained in memory (with transparent spill-over to disk). This is the same benefit provided by frameworks like HAMA. If each iteration requires a different partition of the data (ex. graph processing) it will involve a shuffle phase and consequently network I/O. However, the most expensive I/O (disk I/O) can be avoided if the working data set of each process is small enough to be held in memory (or off-heap memory if used with Tachyon).
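A minimal Python sketch of the DAG idea (a toy scheduler, not the Spark/Tez API): one upstream stage fans out to two downstream stages, and the cached in-memory intermediate result is computed only once instead of being re-read from HDFS by each consumer.

```python
def run_dag(stages, deps, inputs):
    # stages: name -> function taking a list of upstream results
    # deps:   name -> upstream stage names ([] means the stage reads `inputs`)
    cache = {}  # intermediate results held in memory, like cached datasets

    def result(name):
        if name not in cache:
            ups = deps[name]
            args = [result(u) for u in ups] if ups else [inputs]
            cache[name] = stages[name](args)
        return cache[name]

    return {name: result(name) for name in stages}

# Hypothetical three-stage DAG: "tokenize" feeds both "count" and "distinct".
stages = {
    "tokenize": lambda a: [w for line in a[0] for w in line.split()],
    "count":    lambda a: len(a[0]),
    "distinct": lambda a: sorted(set(a[0])),
}
deps = {"tokenize": [], "count": ["tokenize"], "distinct": ["tokenize"]}
out = run_dag(stages, deps, ["a b", "b c a"])
```

With plain chained MR jobs, "count" and "distinct" would each be a separate job re-reading the tokenized output from HDFS; here the shared intermediate lives in memory and is reused.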
In summary, MapReduce does not allow chaining of Reducers, which necessitates chained MapReduce jobs for any reasonably complex business application. This leads to excess I/O for iterative applications and excessive waiting time between jobs, since a downstream job cannot start until its upstream job(s) have completed and committed their data to HDFS. Several frameworks have evolved to address this limitation.
Sameer Wadkar - Big Data guy, publisher and open source contributor