I’m benchmarking our hardware and Spark deployment by repartitioning large datasets. We recently corrected a misconfigured bonded (aggregate) network link that was causing fatal network timeouts in long-running jobs. Now that it’s fixed, we still observe poor performance when simply repartitioning a randomly generated dataset.
My primary question is: What is the low-level behavior of the “Exchange hashpartitioning(…)” operation, with respect to moving data to and from disk/hosts?
I used AWS EMR to set up a similar Spark cluster and ran the exact same job under nearly identical conditions. Our cluster completes the first stage nearly 2x faster than the AWS cluster, but takes over 2x as long to complete the second stage (explicit repartitioning appears to always require two stages). I want to better understand the Exchange operation in order to explain this performance discrepancy, and hopefully correlate it with our cluster’s resource limitations (e.g. disk I/O or IOPS capability).
I’ve already benchmarked our system resources and compared them to AWS, so I can make some assumptions. I have investigated the Exchange (and some related) source code, but it’s not clear to me what actually occurs with respect to I/O. It seems to me that the first stage is basically a scan, and is very fast because it’s only really limited by sequential disk I/O speed. The second stage does not appear to stress any resource on the cluster, yet it can take 10x as long to complete as the first stage.
Finally, the only hint that something might be “wrong” is a proportionally high “Shuffle Read Blocked Time” for each task during the second stage (90% of task duration).
If I’m not mistaken, my assessment thus far can be applied to shuffles in general, since they often require repartitioning.
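To make the two stages concrete, here is a toy model of a hash-partitioned shuffle in plain Python (not Spark code). It assumes the usual shuffle pattern: each first-stage task buckets its rows by a hash of the partitioning columns, and each second-stage task then collects its bucket from every first-stage task. The function names are hypothetical, and Python's built-in `hash` is only a stand-in for whatever hash function Spark applies internally.

```python
from collections import defaultdict

def partition_id(key, num_partitions):
    # Stand-in hash: the target partition depends only on the key columns.
    return hash(key) % num_partitions

def map_side_buckets(rows, num_partitions):
    # Stage 1 (per task): scan the input split and group rows by target partition.
    buckets = defaultdict(list)
    for row in rows:
        buckets[partition_id(row, num_partitions)].append(row)
    return buckets

def reduce_side_fetch(all_map_outputs, reduce_id):
    # Stage 2 (per task): pull this partition's bucket from every map output.
    fetched = []
    for buckets in all_map_outputs:
        fetched.extend(buckets.get(reduce_id, []))
    return fetched

# Two hypothetical input splits of (a, b, c, d) records.
split_1 = [(1, 2, 3, 4), (5, 6, 7, 8), (9, 10, 11, 12)]
split_2 = [(1, 2, 3, 4), (13, 14, 15, 16)]
num_partitions = 4

map_outputs = [map_side_buckets(s, num_partitions) for s in (split_1, split_2)]
partitions = [reduce_side_fetch(map_outputs, r) for r in range(num_partitions)]
```

In this model every second-stage task touches the output of every first-stage task, which is why the second stage looks like many small reads rather than one sequential scan.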
Current configuration (the AWS EMR comparison used a significantly reduced set of executors):
5 hosts, 5x 10 TB disks each
54 executors, 5 vcores and 23 GB each
50+ billion records of the form Record(a: Long, b: Long, c: Long, d: Long), where ‘a’ through ‘d’ are randomly generated values. 1.5+ TB total size, Parquet format
The job is as simple as `spark.read.parquet("input.dat").repartition(N, "a", "b", "c", "d").write.parquet("output.dat")`, where N is roughly (input_data_size / 128 MB).
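As a rough sketch of the arithmetic behind N, the following plain-Python snippet works through the numbers. It makes several assumptions: binary units (1.5 TiB, 128 MiB), one first-stage task per 128 MiB of input, and shuffle data about the same size as the Parquet input, which may not hold given Parquet compression. The helper names are hypothetical.

```python
MIB = 1024 ** 2
TIB = 1024 ** 4

def estimate_partitions(input_bytes, target_partition_bytes=128 * MIB):
    # Ceiling division: number of partitions of at most the target size.
    return -(-input_bytes // target_partition_bytes)

def shuffle_block_stats(shuffle_bytes, num_map_tasks, num_reduce_partitions):
    # Each reduce partition reads one block from each map task's output.
    blocks = num_map_tasks * num_reduce_partitions
    return blocks, shuffle_bytes / blocks

input_bytes = int(1.5 * TIB)            # assumed: 1.5 TB interpreted as TiB
n = estimate_partitions(input_bytes)    # N for repartition(N, ...)
m = n                                   # assumed: one map task per 128 MiB split
blocks, avg_block_bytes = shuffle_block_stats(input_bytes, m, n)

print(f"N = {n}, shuffle blocks = {blocks}, avg block = {avg_block_bytes / 1024:.1f} KiB")
```

Under these assumptions N is about 12 thousand, and the second stage reads on the order of 150 million blocks of roughly 10 KiB each, so it may be worth comparing the clusters on small random reads (IOPS) rather than sequential throughput.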