The previous article explored how Spark defines input partitions. This short article describes how partitions are defined when Spark needs to shuffle data.
Transformations That Require Data Shuffling
Some transformations require data to be shuffled. Examples of such transformations in Spark are reduceByKey, groupByKey, aggregateByKey, sortByKey, and join.
All of the above methods provide overloaded versions that let you specify the desired number of partitions. The question is: when we use the versions that do not specify the number of partitions, how many partitions will Spark define by default?
The Default Behavior
By default, Spark uses the HashPartitioner, which extends the Partitioner class. (A separate article demonstrates how you can use a custom Partitioner.)
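To make the hash-partitioning idea concrete, here is a minimal plain-Java sketch of how a hash partitioner maps a key to a partition. The class and method names here are illustrative, not Spark's API; the nonNegativeMod helper mirrors the utility Spark uses internally so that negative hash codes still map to a valid partition index.

```java
// Sketch of how a hash partitioner maps a key to a partition index.
public class HashPartitionerSketch {
    private final int numPartitions;

    public HashPartitionerSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Java's % can return a negative value for negative hash codes,
    // so shift the result into [0, mod).
    static int nonNegativeMod(int x, int mod) {
        int rawMod = x % mod;
        return rawMod + (rawMod < 0 ? mod : 0);
    }

    public int getPartition(Object key) {
        // Spark sends null keys to partition 0.
        return key == null ? 0 : nonNegativeMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        HashPartitionerSketch p = new HashPartitionerSketch(4);
        // Same key always lands in the same partition, in [0, 4).
        System.out.println(p.getPartition("spark"));
        System.out.println(p.getPartition("shuffle"));
    }
}
```

Because the partition is a pure function of the key's hashCode, all records with the same key end up in the same shuffle partition, which is exactly what key-based aggregations like reduceByKey need.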
By default, the Partitioner instance defines the number of partitions as follows:
1. First, Spark checks the value of the configuration parameter spark.default.parallelism. If it is set, the default number of partitions is equal to its value.
2. If spark.default.parallelism is not set, Spark uses the same number of partitions as the largest upstream RDD. Of the available choices, this is the one least likely to cause out-of-memory errors.
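The two-step decision above can be sketched as a small plain-Java function. This is a simplification of what Spark's defaultPartitioner logic does, with illustrative names; it is not Spark's actual code.

```java
import java.util.Arrays;
import java.util.Optional;

public class DefaultPartitionsSketch {
    // Simplified sketch of Spark's default-partition-count decision.
    // defaultParallelism is Optional.empty() when spark.default.parallelism
    // is not configured; upstreamPartitionCounts are the partition counts
    // of the RDDs feeding the shuffle.
    static int defaultNumPartitions(Optional<Integer> defaultParallelism,
                                    int... upstreamPartitionCounts) {
        if (defaultParallelism.isPresent()) {
            // Step 1: the configured value wins.
            return defaultParallelism.get();
        }
        // Step 2: fall back to the largest upstream partition count.
        return Arrays.stream(upstreamPartitionCounts).max().orElse(1);
    }

    public static void main(String[] args) {
        // spark.default.parallelism set to 8: it wins regardless of upstream sizes.
        System.out.println(defaultNumPartitions(Optional.of(8), 4, 16));   // 8
        // Not set: the largest upstream RDD (16 partitions) determines the count.
        System.out.println(defaultNumPartitions(Optional.empty(), 4, 16)); // 16
    }
}
```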
You can read more details in the source code of the Partitioner.scala file; the comments on lines 42 through 56 explain the behavior above.
A Simple Example
Consider the simple WordCount program below:
import java.util.Arrays;
import org.apache.spark.api.java.*;
import scala.Tuple2;

JavaSparkContext sc = new JavaSparkContext("local", "sparkwordcount");
JavaRDD<String> rdd = sc.textFile(inputPath);
JavaPairRDD<String, Integer> counts = rdd
    .flatMap(x -> Arrays.asList(x.split(" ")))
    .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
    .reduceByKey((x, y) -> x + y);
counts.saveAsTextFile(outputPath);
In the above example, the number of partitions produced by reduceByKey will be the same as the number of input partitions of the upstream RDD, which is produced by the sc.textFile(inputPath) invocation. The previous article describes how the partitions of that upstream RDD are created.
Of course, the actual execution only happens when an action method is invoked, which in our case is counts.saveAsTextFile(outputPath). But when Spark executes the job, it first needs to define the number of input partitions for sc.textFile(inputPath); this decision cannot be deferred until later. As we saw in the previous article, Spark already has the information it needs to define the number of input partitions at that point.
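To make the shuffle step concrete, the sketch below simulates in plain Java how the (word, 1) pairs from the WordCount example are routed to reduce-side partitions by hashing the key and then summed per key. This is a simulation for illustration only, not Spark's implementation; the partition count of 3 is an assumed upstream partition count.

```java
import java.util.*;

public class ShuffleSimulation {
    // Simulates the reduce side of reduceByKey: route each (word, 1) pair
    // to a shuffle partition by hashing the key, then sum counts per key
    // within each partition.
    static List<Map<String, Integer>> shuffle(String[] words, int numPartitions) {
        List<Map<String, Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) partitions.add(new HashMap<>());
        for (String word : words) {
            int raw = word.hashCode() % numPartitions;
            int p = raw + (raw < 0 ? numPartitions : 0); // non-negative mod
            partitions.get(p).merge(word, 1, Integer::sum);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // With spark.default.parallelism unset, reduceByKey reuses the
        // upstream partition count; we assume 3 here for illustration.
        List<Map<String, Integer>> parts = shuffle("to be or not to be".split(" "), 3);
        for (int i = 0; i < parts.size(); i++) {
            System.out.println("partition " + i + ": " + parts.get(i));
        }
    }
}
```

Note that every occurrence of a given word hashes to the same partition, so each partition can compute final counts for its keys without seeing any other partition's data.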
This brief article explained how Spark defines the default number of shuffle partitions for transformations that require shuffling data by key.
By Sameer Wadkar - Big Data Architect/Programmer, Author and Open Source Contributor