# Using existing distribution for join when subset of keys

## Using existing distribution for join when subset of keys

Hey all,

I have one large table, A, and two medium-sized tables, B and C, that I'm trying to join efficiently. The result is multiplicative on A join B, so I'd like to avoid shuffling that intermediate result. For this example, assume each table has three columns: x, y, z. All of the below was tested on Spark 2.4.5 locally.

I'd like to perform the following join:

```scala
A.join(B, Seq("x", "y")).join(C, Seq("x", "z"))
```

This outputs the following physical plan:

```
== Physical Plan ==
*(6) Project [x#32, z#34, y#33, z#74, y#53]
+- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
   :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(x#32, z#34, 200)
   :     +- *(3) Project [x#32, y#33, z#34, z#74]
   :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
   :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST], false, 0
   :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
   :           :     +- LocalTableScan [x#32, y#33, z#34]
   :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST], false, 0
   :              +- Exchange hashpartitioning(x#72, y#73, 200)
   :                 +- LocalTableScan [x#72, y#73, z#74]
   +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(x#52, z#54, 200)
         +- LocalTableScan [x#52, y#53, z#54]
```

I may be misremembering, but I thought that in the past you could pre-partition each table by "x" and it would satisfy the requirements of the join, since both sides are already clustered by that key using the same hash function (assuming numPartitions lines up, obviously). However, it seems that Spark will insert another exchange:

```scala
A.repartition($"x")
  .join(B.repartition($"x"), Seq("x", "y"))
  .join(C.repartition($"x"), Seq("x", "z"))
```

```
*(6) Project [x#32, z#34, y#33, z#74, y#53]
+- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
   :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(x#32, z#34, 200)
   :     +- *(3) Project [x#32, y#33, z#34, z#74]
   :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
   :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST], false, 0
   :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
   :           :     +- Exchange hashpartitioning(x#32, 200)
   :           :        +- LocalTableScan [x#32, y#33, z#34]
   :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST], false, 0
   :              +- Exchange hashpartitioning(x#72, y#73, 200)
   :                 +- Exchange hashpartitioning(x#72, 200)
   :                    +- LocalTableScan [x#72, y#73, z#74]
   +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(x#52, z#54, 200)
         +- ReusedExchange [x#52, y#53, z#54], Exchange hashpartitioning(x#32, 200)
```

Note that using this "strategy" with groupBy("x", "y") works fine, though I assume that is because it doesn't have to consider the other side of a join.

Did this used to work, or am I simply confusing it with groupBy? Either way, any thoughts on how I can avoid shuffling the bulk of the join result?

Thanks,
Pat
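The co-location argument in the question can be sketched without Spark at all. The snippet below is a minimal stand-in (plain `hashCode` instead of Spark's Murmur3-based hash, illustrative data): because the target partition is a function of x alone, any two rows that could match on (x, y) necessarily land in the same partition on both sides. Whether the planner actually exploits that is a separate question, which is what the extra exchange above shows.

```scala
// Stand-in for a hash partitioner keyed on x alone: the target partition
// depends only on x, regardless of the other join keys.
def partitionFor(x: Int, numPartitions: Int): Int =
  ((x.hashCode % numPartitions) + numPartitions) % numPartitions

val numPartitions = 8
val left  = Seq((1, "a", 10), (1, "b", 11), (2, "a", 12))
val right = Seq((1, "a", 20), (2, "a", 21))

// Every pair of rows that can match on (x, y) shares the same x, so with
// both sides partitioned on x alone they are already co-located, and a
// join on (x, y) could in principle reuse that distribution.
for {
  (lx, ly, _) <- left
  (rx, ry, _) <- right
  if lx == rx && ly == ry
} assert(partitionFor(lx, numPartitions) == partitionFor(rx, numPartitions))

println("co-location holds for all matching pairs")
```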

## Re: Using existing distribution for join when subset of keys

You can use bucketBy to avoid shuffling in your scenario. This test suite has some examples: https://github.com/apache/spark/blob/45cf5e99503b00a6bd83ea94d6d92761db1a00ab/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala#L343

Thanks,
Terry

On Sun, May 31, 2020 at 7:43 AM Patrick Woody <[hidden email]> wrote:

> Hey all, I have one large table, A, and two medium sized tables, B & C,
> that I'm trying to complete a join on efficiently. [...]
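A minimal sketch of what Terry's suggestion could look like, assuming A, B, and C are the DataFrames from the question; the table names and bucket count of 200 are illustrative, and all three tables must be bucketed on the same column with the same bucket count for the planner to be able to reuse the distribution. Unlike `repartition()`, bucketing metadata is persisted with the table, so it survives across reads:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("bucketed-join-sketch")
  .getOrCreate()

// Persist each table bucketed (and sorted) by x with matching bucket counts.
A.write.bucketBy(200, "x").sortBy("x").saveAsTable("a_bucketed")
B.write.bucketBy(200, "x").sortBy("x").saveAsTable("b_bucketed")
C.write.bucketBy(200, "x").sortBy("x").saveAsTable("c_bucketed")

val bucketedA = spark.table("a_bucketed")
val bucketedB = spark.table("b_bucketed")
val bucketedC = spark.table("c_bucketed")

// With matching bucket specs on x, the planner may satisfy the joins'
// distribution requirements from the bucketing instead of inserting a
// full exchange of the large intermediate result; inspect the plan with
// .explain() to confirm in your environment.
val joined = bucketedA
  .join(bucketedB, Seq("x", "y"))
  .join(bucketedC, Seq("x", "z"))
```

This is environment-dependent (it needs a catalog that records bucketing, e.g. a Hive metastore or Spark's built-in catalog), so check the resulting physical plan rather than taking the exchange elimination for granted.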