Spark Dataset API for secondary sorting

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view

Spark Dataset API for secondary sorting

Daniel Zhang
Hi, Spark Users:

I have a question related to the way I use the spark Dataset API for my case.

If the "ds_old" dataset is having 100 records, with 10 unique $"col1", and for the following pseudo-code:
val ds_new = ds_old.repartition(5, $"col1").sortWithinPartitions($"col2").mapPartitions(new MergeFuc)

class MergeFun extends MapPartitionsFunction[InputCaseClass, OutputCaseClass] {
  override def call(input: util.Iterator[InputCaseClass]): util.Iterator[OutputCaseClass] = {}

I have some questions related to "partition" defined in the above API, and below is my understanding:

1) repartition(5, $"col1") means distributing all 100 records based on 10 unique col1 values to 5 partitions. There is no guarantee each of these 5 partitions will have how many/which unique col1 value, but in a well-balanced hash algorithm, each partition will have close to the average count (10/5 = 2) for a large unique count of values.  
2) sortWithPartitions($"col2) is one of the parts I want to clear out here. What is exactly the sortWithPartitions meaning here? I want the data sorted by "col2" within each unique value of "col1" here, but the Spark API uses the "partition" term so much in this case. I DON'T WANT the 100 records sorted within each of the 5 partitions, but within each unique of "col1". I believe this assumption is right, as we use "repartition" with "col1" first. Please confirm this.
3) mapPartitions(new MergeFuc) is another part I want to clear out. I originally assumed that my merge function will be called/invoked per unique col1 value (in this case we have 10 partitions). But after the test, I found out that indeed it is called ONCE per partition of the 5 partitions. So in this sense, the partition meaning in this API (mapPartitions) IS DIFFERENT as the partition meaning defined in "sortWithPartitions", correct? Or my understanding of "partition" in sortWithPartitions is also WRONG?

In summary, here are my questions:
1) We don't want to use "aggregation" API is due to that in my case, some unique value of "col1" COULD contain a big number of records, and sorting the data in a specified order per col1 helps our business case for the merge logic a lot.
2) We don't want to use "window" function, as the merge logic is indeed an aggregation logic. There will be only one record output as per grouping (col1). So even "window" function comes with sorting, but it doesn't fit in this case.
3) The unique value count of "col1" is unpredictable for spark, I understand that. But I wonder if there is an API that can be used to be called per grouping (per col1), instead of per partition (as defined as 5 partitions in this case).
4) If such API doesn't exist, and we have to use MapPartitionsFunction (The Iterator is much preferred here, as we don't need to worry OOM due to data skew), my following question is if Spark guarantees that the data comes within each partition is (col1, col2) order, in the API usage shown above? Or if Spark will delivery the data of each partition, sorted by "col2" for the first unique value of col1; then sorted by "col2" for the second unique value of col1, going forward, etc? 
Another challenge is that if our merge function can expect the data in this order, but have to generate the output per grouping of col1, in an Iterator format, does Spark have an existing example to refer?