Using existing distribution for join when subset of keys


Patrick Woody
Hey all,

I have one large table, A, and two medium-sized tables, B and C, that I'm trying to join efficiently. The result is multiplicative on A join B, so I'd like to avoid shuffling that intermediate result. For this example, assume each table has three columns: x, y, z. Everything below is being tested on Spark 2.4.5 locally.
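
For concreteness, here is a minimal local setup consistent with the plans below (the LocalTableScan nodes suggest in-memory DataFrames; the sizes and values are hypothetical and only the column names matter):

import spark.implicits._
val A = (0 until 100000).map(i => (i % 5, i % 7, i % 11)).toDF("x", "y", "z")  // large
val B = (0 until 1000).map(i => (i % 5, i % 7, i % 13)).toDF("x", "y", "z")    // medium
val C = (0 until 1000).map(i => (i % 5, i % 17, i % 11)).toDF("x", "y", "z")   // medium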

I'd like to perform the following join:
A.join(B, Seq("x", "y")).join(C, Seq("x", "z"))
This outputs the following physical plan:
== Physical Plan ==
*(6) Project [x#32, z#34, y#33, z#74, y#53]
+- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
   :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(x#32, z#34, 200)
   :     +- *(3) Project [x#32, y#33, z#34, z#74]
   :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
   :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST], false, 0
   :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
   :           :     +- LocalTableScan [x#32, y#33, z#34]
   :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST], false, 0
   :              +- Exchange hashpartitioning(x#72, y#73, 200)
   :                 +- LocalTableScan [x#72, y#73, z#74]
   +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(x#52, z#54, 200)
         +- LocalTableScan [x#52, y#53, z#54]


I may be misremembering, but I thought that in the past you could pre-partition each table by "x" and that would satisfy the join's requirements, since both sides are already clustered by that key using the same hash function (this assumes numPartitions lines up, obviously). However, it seems like it will insert another exchange:

A.repartition($"x").join(B.repartition($"x"), Seq("x", "y")).join(C.repartition($"x"), Seq("x", "z"))
*(6) Project [x#32, z#34, y#33, z#74, y#53]
+- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
   :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(x#32, z#34, 200)
   :     +- *(3) Project [x#32, y#33, z#34, z#74]
   :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
   :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST], false, 0
   :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
   :           :     +- Exchange hashpartitioning(x#32, 200)
   :           :        +- LocalTableScan [x#32, y#33, z#34]
   :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST], false, 0
   :              +- Exchange hashpartitioning(x#72, y#73, 200)
   :                 +- Exchange hashpartitioning(x#72, 200)
   :                    +- LocalTableScan [x#72, y#73, z#74]
   +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(x#52, z#54, 200)
         +- ReusedExchange [x#52, y#53, z#54], Exchange hashpartitioning(x#32, 200)

Note that using this "strategy" with groupBy("x", "y") works fine, though I assume that is because there is no other side of a join to consider.
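
As a minimal sketch of that groupBy case (hypothetical DataFrame; assumes spark.implicits._ is in scope and the default 200 shuffle partitions):

val df = (0 until 1000).map(i => (i % 5, i % 7, i % 11)).toDF("x", "y", "z")
// An aggregation only needs its input clustered by the grouping keys, and
// hashpartitioning(x) already co-locates every row sharing the same x, so the
// partitioning introduced by repartition($"x") can be reused as-is.
df.repartition($"x").groupBy("x", "y").count().explain()
// Expected: a single Exchange hashpartitioning(x, 200) coming from the repartition,
// with no additional exchange inserted for the aggregate itself.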

Did this work in the past, or am I simply confusing it with groupBy? Either way, any thoughts on how I can avoid shuffling the bulk of the join result?

Thanks,
Pat




Re: Using existing distribution for join when subset of keys

imback82

Thanks,
Terry





Re: Using existing distribution for join when subset of keys

Patrick Woody
Hey Terry,

Thanks for the response! I'm not sure it ends up working, though; the bucketing still seems to require an exchange before the join. Both tables below were saved bucketed by "x":
*(5) Project [x#29, y#30, z#31, z#37]
+- *(5) SortMergeJoin [x#29, y#30], [x#35, y#36], Inner
   :- *(2) Sort [x#29 ASC NULLS FIRST, y#30 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(x#29, y#30, 200)
   :     +- *(1) Project [x#29, y#30, z#31]
   :        +- *(1) Filter (isnotnull(x#29) && isnotnull(y#30))
   :           +- *(1) FileScan parquet default.ax[x#29,y#30,z#31] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/pwoody/tm/spark-2.4.5-bin-hadoop2.7/spark-warehouse/ax], PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema: struct<x:int,y:int,z:int>, SelectedBucketsCount: 200 out of 200
   +- *(4) Sort [x#35 ASC NULLS FIRST, y#36 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(x#35, y#36, 200)
         +- *(3) Project [x#35, y#36, z#37]
            +- *(3) Filter (isnotnull(x#35) && isnotnull(y#36))
               +- *(3) FileScan parquet default.bx[x#35,y#36,z#37] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/pwoody/tm/spark-2.4.5-bin-hadoop2.7/spark-warehouse/bx], PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema: struct<x:int,y:int,z:int>, SelectedBucketsCount: 200 out of 200
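
For reference, a sketch of how the two bucketed tables above could have been produced; the table names ax/bx and the 200 buckets are taken from the FileScan lines, and the bucketing covers only "x" as described:

A.write.format("parquet").bucketBy(200, "x").saveAsTable("ax")  // A bucketed by x only
B.write.format("parquet").bucketBy(200, "x").saveAsTable("bx")  // B bucketed by x only
spark.table("ax").join(spark.table("bx"), Seq("x", "y")).explain()
// Both FileScans report 200 buckets on x, but the join needs clustering on ("x", "y"),
// hence the Exchange hashpartitioning(x, y, 200) on each side of the plan above.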

Best,
Pat







Re: Using existing distribution for join when subset of keys

imback82
Is the following what you are trying to do?

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")  // force a sort-merge join instead of a broadcast join
val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("x", "y")
val df2 = (0 until 100).map(i => (i % 7, i % 11)).toDF("x", "y")
// Bucket both tables by the full set of join keys, with matching bucket counts.
df1.write.format("parquet").bucketBy(8, "x", "y").saveAsTable("t1")
df2.write.format("parquet").bucketBy(8, "x", "y").saveAsTable("t2")
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val joined = t1.join(t2, Seq("x", "y"))
joined.explain

I see no exchange:

== Physical Plan ==
*(3) Project [x#342, y#343]
+- *(3) SortMergeJoin [x#342, y#343], [x#346, y#347], Inner
   :- *(1) Sort [x#342 ASC NULLS FIRST, y#343 ASC NULLS FIRST], false, 0
   :  +- *(1) Project [x#342, y#343]
   :     +- *(1) Filter (isnotnull(x#342) && isnotnull(y#343))
   :        +- *(1) FileScan parquet default.t1[x#342,y#343] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/], PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema: struct<x:int,y:int>, SelectedBucketsCount: 8 out of 8
   +- *(2) Sort [x#346 ASC NULLS FIRST, y#347 ASC NULLS FIRST], false, 0
      +- *(2) Project [x#346, y#347]
         +- *(2) Filter (isnotnull(x#346) && isnotnull(y#347))
            +- *(2) FileScan parquet default.t2[x#346,y#347] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/], PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema: struct<x:int,y:int>, SelectedBucketsCount: 8 out of 8
