Shuffle write explosion

Shuffle write explosion

周浥尘 (Yichen Zhou)
Hi All,

When running a Spark job, I have 100 MB+ of input but get more than 700 GB of shuffle write, which is really weird. The job eventually fails with an OOM error. Does anybody know why this happens?
[Attachment: Screen Shot 2018-11-05 at 15.20.35.png]
My code is like:
JavaPairRDD<Text, Text> inputRDD = sc.sequenceFile(inputPath, Text.class, Text.class); 
 inputRDD.repartition(partitionNum).mapToPair(...).saveAsNewAPIHadoopDataset(job.getConfiguration());
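
For what it's worth, a minimal diagnostic sketch (reusing sc and inputPath from above) that prints how many partitions the input is read into; repartition() shuffles every record regardless of this count:

JavaPairRDD<Text, Text> rdd = sc.sequenceFile(inputPath, Text.class, Text.class);
// Print the number of partitions the sequence file is split into before repartitioning.
System.out.println("input partitions: " + rdd.getNumPartitions());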

Environment:
CPU: 32 cores; Memory: 256 GB; Storage: 7.5 GB
CentOS 7.5
java version "1.8.0_162"
Spark 2.1.2

Any help is greatly appreciated.

Regards,
Yichen

RE: Shuffle write explosion

Taylor Cox

At first glance, I wonder whether your tables are partitioned; there may not be enough parallelism. You can also pass in the number of partitions and/or a custom partitioner to help Spark "guess" how to organize the shuffle, as in the sketch below.
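
A minimal sketch of that idea, reusing the inputRDD and partitionNum names from your code (the HashPartitioner choice here is just an example):

import org.apache.hadoop.io.Text;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;

// Redistribute the pairs across partitionNum partitions by key hash;
// a custom org.apache.spark.Partitioner can be passed here instead if
// the keys need a different distribution.
JavaPairRDD<Text, Text> partitioned =
    inputRDD.partitionBy(new HashPartitioner(partitionNum));

partitionBy shuffles once and records the partitioner on the result, so downstream key-based stages can reuse that layout instead of shuffling again.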

 

Have you seen any of these docs?

https://people.eecs.berkeley.edu/~kubitron/courses/cs262a-F13/projects/reports/project16_report.pdf

https://spark.apache.org/docs/latest/tuning.html

 

Taylor

Re: Shuffle write explosion

Dillon Dukek
What is your function in mapToPair doing?

-Dillon


Re: Shuffle write explosion

周浥尘 (Yichen Zhou)
Hi Dillon,

Thank you for your reply.
mapToPair uses a PairFunction to transform the records into a particular Parquet format. I have tried replacing the mapToPair() step with other operators such as count() or collect(), but that did not help, so I guess the shuffle-write explosion has nothing to do with mapToPair(). (The isolation test is sketched below.)
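
A minimal sketch of that isolation test, with the names from my first mail: run only the repartition plus an action, with no mapToPair, and compare the shuffle-write size in the Spark UI.

// Same input and partition count as before, but no mapToPair():
// if the huge shuffle write still appears, it comes from repartition()
// rather than from the PairFunction.
long n = inputRDD.repartition(partitionNum).count();
System.out.println("records after repartition: " + n);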

Best Regards,
Yichen
