Renaming a DataFrame column makes Spark lose partitioning information

Antoine Wendlinger
Hi,

When renaming a DataFrame column, it looks like Spark is forgetting the partition information:

    Seq((1, 2))
      .toDF("a", "b")
      .repartition($"b")
      .withColumnRenamed("b", "c")
      .repartition($"c")
      .explain()


This gives the following plan:

    == Physical Plan ==
    Exchange hashpartitioning(c#40, 10)
    +- *(1) Project [a#36, b#37 AS c#40]
       +- Exchange hashpartitioning(b#37, 10)
          +- LocalTableScan [a#36, b#37]


As you can see, two shuffles are planned, but the second one is unnecessary: the underlying data layout is unchanged by the rename, only the column name differs.
Is there a reason for this behavior that I'm not aware of? Is there a way to work around it (other than not renaming my columns)?

I'm using Spark 2.4.3.


Thanks for your help,

Antoine
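
One way to see the same thing without reading explain() output is to inspect the planned output partitioning directly. This is only a sketch (it relies on the queryExecution developer API and assumes a spark-shell session where spark.implicits._ is already in scope):

    val df = Seq((1, 2))
      .toDF("a", "b")
      .repartition($"b")
      .withColumnRenamed("b", "c")

    // On 2.4.x this prints a hash partitioning over the *old* attribute b:
    // the physical data layout is unchanged, but the partitioning expression
    // still refers to b, so a later repartition($"c") plans a second shuffle.
    println(df.queryExecution.executedPlan.outputPartitioning)
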
Re: Renaming a DataFrame column makes Spark lose partitioning information

imback82
This is fixed in Spark 3.0 by https://github.com/apache/spark/pull/26943:

scala> :paste
// Entering paste mode (ctrl-D to finish)

Seq((1, 2))
      .toDF("a", "b")
      .repartition($"b")
      .withColumnRenamed("b", "c")
      .repartition($"c")
      .explain()

// Exiting paste mode, now interpreting.

== Physical Plan ==
*(1) Project [a#7, b#8 AS c#11]
+- Exchange hashpartitioning(b#8, 200), false, [id=#12]
   +- LocalTableScan [a#7, b#8]

Thanks,
Terry
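
For anyone who has to stay on 2.4.x, a possible workaround (a minimal sketch, assuming the rename can simply be moved ahead of the shuffle) is to rename the column before repartitioning, so the partitioning expression already uses the final name and only a single exchange is planned:

    Seq((1, 2))
      .toDF("a", "b")
      .withColumnRenamed("b", "c") // rename first ...
      .repartition($"c")           // ... then shuffle by the new name
      .explain()                   // the plan contains a single Exchange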

Re: Renaming a DataFrame column makes Spark lose partitioning information

Antoine Wendlinger
Well, that's great! Thank you very much :)


Antoine
