How to...UNION ALL of two SELECTs over different data sources in parallel?

Jacek Laskowski
Hi,

I've been trying to find the answer to a question about UNION ALL and SELECTs at https://stackoverflow.com/q/47837955/1305344:

> If I have Spark SQL statement of the form SELECT [...] UNION ALL SELECT [...], will the two SELECT statements be executed in parallel? In my specific use case the two SELECTs are querying two different database tables. In contrast to what I would have expected, the Spark UI seems to suggest that the two SELECT statements are performed sequentially.

How can one tell whether the two separate SELECTs are executed in parallel? What tools are there to find out?

My answer was to use the explain operator, which shows...well...the physical plan, but I'm not sure how to read it to tell whether a query plan is going to be executed in parallel.
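One way to read the physical plan for this question, sketched under the assumption of Spark 2.x/3.x in local mode: stage boundaries come from Exchange operators (shuffles), so a plan without any Exchange runs as a single stage whose tasks can run in parallel. The helper `exchanges` is hypothetical, and adaptive query execution is switched off (an assumption) so that `executedPlan` is the static plan rather than an adaptive wrapper.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.Exchange

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("union-exchange-check")
  .config("spark.sql.adaptive.enabled", "false") // assumption: inspect the static plan
  .getOrCreate()

// Hypothetical helper: count Exchange operators (shuffles and broadcasts).
// Each shuffle Exchange is a stage boundary; a plan without any runs as one stage.
def exchanges(plan: SparkPlan): Int =
  plan.collect { case e: Exchange => e }.size

val q = spark.range(1).union(spark.range(2))
// For contrast, an aggregation over the same data needs a shuffle.
val agg = q.groupBy("id").count()

q.explain() // prints the physical plan: a Union over two Range subtrees

val unionExchanges = exchanges(q.queryExecution.executedPlan)
val aggExchanges = exchanges(agg.queryExecution.executedPlan)
println(s"union: $unionExchanges exchange(s), groupBy: $aggExchanges exchange(s)")
```

The union plan should show no Exchange at all, while the groupBy plan gets at least one, which is where a second stage would begin.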

I then looked at the underlying RDD lineage (using rdd.toDebugString), hoping it would give me the answer, but...I'm not so sure.

For a query like the following:

val q = spark.range(1).union(spark.range(2))

I thought that since both SELECTs are codegen'ed they could be executed in parallel, but when I switched to the RDD lineage I lost confidence, given there's just a single stage (!)

scala> q.rdd.toDebugString
res4: String =
(16) MapPartitionsRDD[17] at rdd at <console>:26 []
 |   MapPartitionsRDD[16] at rdd at <console>:26 []
 |   UnionRDD[15] at rdd at <console>:26 []
 |   MapPartitionsRDD[11] at rdd at <console>:26 []
 |   MapPartitionsRDD[10] at rdd at <console>:26 []
 |   ParallelCollectionRDD[9] at rdd at <console>:26 []
 |   MapPartitionsRDD[14] at rdd at <console>:26 []
 |   MapPartitionsRDD[13] at rdd at <console>:26 []
 |   ParallelCollectionRDD[12] at rdd at <console>:26 []
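The same question can be checked at the RDD level. A minimal sketch, assuming Spark 2.x/3.x: the scheduler cuts stages only at shuffle dependencies, so walking `dependencies` and finding no `ShuffleDependency` means the whole lineage is a single stage with one task per partition. `hasShuffle` is a hypothetical helper, and the explicit 8 partitions per side are an assumption chosen to reproduce the 16 partitions shown above.

```scala
import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("lineage-check")
  .config("spark.sql.adaptive.enabled", "false") // assumption: keep .rdd lazy and static
  .getOrCreate()

// Hypothetical helper: walk the lineage recursively looking for a shuffle edge.
// No ShuffleDependency anywhere means the lineage runs as one stage.
def hasShuffle(rdd: RDD[_]): Boolean =
  rdd.dependencies.exists {
    case _: ShuffleDependency[_, _, _] => true
    case dep                           => hasShuffle(dep.rdd)
  }

// 8 partitions per side (assumption) to mirror the (16) in the lineage above.
val q = spark.range(0, 1, 1, 8).union(spark.range(0, 2, 1, 8))
val rdd = q.rdd

println(rdd.toDebugString)
val unionShuffles = hasShuffle(rdd)
val unionPartitions = rdd.getNumPartitions // one task per partition in that single stage
val groupedShuffles = hasShuffle(q.groupBy("id").count().rdd)
```

So "just one single stage" is expected here: the UnionRDD only adds narrow dependencies, and parallelism comes from its 16 partitions becoming 16 tasks.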

What am I missing, and how can I be certain whether (and which) parts of a query are going to be executed in parallel?

Please help...

Regards,
Jacek Laskowski
----
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark

Re: How to...UNION ALL of two SELECTs over different data sources in parallel?

Silvio Fiorito

Hi Jacek,

Just replied to the SO thread as well, but…

Yes, your first statement is correct. The DataFrames in the union are read in the same stage, so in your example, where each DataFrame has 8 partitions, you get a single stage with 16 tasks reading the 2 DataFrames. There's no need to define each DataFrame in a separate thread. You can also verify this in the Stage UI by looking at the Event Timeline: you should see the tasks from both DataFrames executing in parallel, as expected.
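To confirm the 8 + 8 = 16 tasks in one stage without opening the UI, each task can report its stage and partition through `TaskContext`. This is a sketch assuming local mode, with `spark.range` inputs standing in for the two database tables; the names `df1`, `df2` and `taskIds` are illustrative only.

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("union-tasks").getOrCreate()

// Two DataFrames with 8 partitions each (stand-ins for the two tables).
val df1 = spark.range(0, 100, 1, 8).toDF()
val df2 = spark.range(100, 200, 1, 8).toDF()
val unioned = df1.union(df2)

// Each task emits one (stageId, partitionId) pair describing itself.
val taskIds: Array[(Int, Int)] = unioned.rdd.mapPartitions { _ =>
  val ctx = TaskContext.get()
  Iterator((ctx.stageId(), ctx.partitionId()))
}.collect()

val stages = taskIds.map(_._1).distinct
val tasks = taskIds.length
println(s"stages: ${stages.mkString(", ")}, tasks: $tasks")
```

A single distinct stage id with 16 partition ids is the programmatic counterpart of what the Stage UI shows.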

Here's the UI for the following example, in which each DF has only 1 partition (so we get a stage with 2 tasks):

spark.range(1, 100, 1, 1).write.save("/tmp/df1")
spark.range(101, 200, 1, 1).write.save("/tmp/df2")

spark.read.load("/tmp/df1").union(spark.read.load("/tmp/df2")).foreach { _ => }
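The Event Timeline check can be approximated in code by recording when each task starts and ends. A rough sketch, assuming local mode with 2 cores and in-memory `spark.range` inputs instead of the saved files; the one-second `Thread.sleep` is an artificial stand-in for a slow read, added only so the overlap is easy to see.

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession

// local[2]: two task slots, so the two single-partition inputs can run side by side.
val spark = SparkSession.builder().master("local[2]").appName("union-timeline").getOrCreate()

// In-memory stand-ins for /tmp/df1 and /tmp/df2, one partition each.
val df1 = spark.range(1, 100, 1, 1).toDF()
val df2 = spark.range(101, 200, 1, 1).toDF()

// Each task records (partition, start, end); the sleep is an artificial slow read.
val spans: Array[(Int, Long, Long)] = df1.union(df2).rdd.mapPartitions { rows =>
  val start = System.currentTimeMillis()
  rows.foreach(_ => ())
  Thread.sleep(1000)
  Iterator((TaskContext.getPartitionId(), start, System.currentTimeMillis()))
}.collect().sortBy(_._1)

// All spans overlap when the latest start comes before the earliest end.
val overlapped = spans.map(_._2).max < spans.map(_._3).min
println(s"task spans: ${spans.mkString(", ")}; overlapped = $overlapped")
```

With fewer task slots than tasks (e.g. `local[1]`), the spans would not overlap even though both inputs are still read in the same stage, so the available cores matter as much as the plan.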

From: Jacek Laskowski <[hidden email]>
Date: Saturday, December 16, 2017 at 6:40 AM
To: "user @spark" <[hidden email]>
Subject: How to...UNION ALL of two SELECTs over different data sources in parallel?

Re: How to...UNION ALL of two SELECTs over different data sources in parallel?

Jacek Laskowski
Thanks Silvio!

In the meantime, with help from Adam and a code review of WholeStageCodegenExec and CollapseCodegenStages, I found out that anything that's codegen'ed runs with as much parallelism as there are tasks in its stage. In this case, the union of two codegen'ed subtrees is indeed executed in parallel.
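The codegen structure can be inspected directly in the executed plan. A sketch, assuming Spark 2.x/3.x with adaptive execution disabled: CollapseCodegenStages wraps each codegen-capable subtree in a `WholeStageCodegenExec`, and since `UnionExec` is (as far as I know) not codegen'ed itself, the expected shape is one `UnionExec` over two separate codegen subtrees, both of which run as tasks of the same stage.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.{UnionExec, WholeStageCodegenExec}

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("codegen-subtrees")
  .config("spark.sql.adaptive.enabled", "false") // assumption: inspect the static plan
  .getOrCreate()

val q = spark.range(1).union(spark.range(2))
val plan = q.queryExecution.executedPlan

// Union operators and the codegen'ed subtrees produced by CollapseCodegenStages.
val unions = plan.collect { case u: UnionExec => u }
val codegenSubtrees = plan.collect { case w: WholeStageCodegenExec => w }

println(plan.treeString)
println(s"Union operators: ${unions.size}, codegen subtrees: ${codegenSubtrees.size}")
```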

Regards,
Jacek Laskowski

On Sat, Dec 16, 2017 at 7:12 PM, Silvio Fiorito <[hidden email]> wrote: