Iterative rdd union + reduceByKey operations on small dataset leads to "No space left on device" error on account of lot of shuffle spill.

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Iterative rdd union + reduceByKey operations on small dataset leads to "No space left on device" error on account of lot of shuffle spill.

dineshdharme
I am trying to do few (union + reduceByKey) operations on a hiearchical
dataset in a iterative fashion in rdd. The first few loops run fine but on
the subsequent loops, the operations ends up using the whole scratch space
provided to it.

I have set the spark scratch directory, i.e. SPARK_LOCAL_DIRS , to be one
having 100 GB space.
The heirarchical dataset, whose size is (< 400kB), remains constant
throughout the iterations.
I have tried the worker cleanup flag but it has no effect i.e.
"spark.worker.cleanup.enabled=true"

 

Error :
Caused by: java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:326)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
at
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply$mcVJ$sp(IndexShuffleBlockResolver.scala:151)
at
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
at
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:246)
at
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply$mcV$sp(IndexShuffleBlockResolver.scala:149)
at
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
at
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at
org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:153)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
 

What I am trying to do (High Level):

I have a dataset of 5 different csv ( Parent, Child1, Child2, Child21,
Child22 ) which are related in a hierarchical fashion as shown below.

Parent-> Child1 -> Child2  -> Child21

Parent-> Child1 -> Child2  -> Child22

Each element in the tree has 14 columns (elementid, parentelement_id, cat1,
cat2, num1, num2,....., num10)

I am trying to aggregate the values of one column of Child21 into Child1
(i.e. 2 levels up). I am doing the same for another column value of Child22
into Child1. Then I am merging these aggregated values at the same Child1
level.

This is present in the code at location :

spark.rddexample.dummyrdd.tree.child1.events.Function1
 

Code which replicates the issue:

1] https://github.com/dineshdharme/SparkRddShuffleIssue

 

Steps to reproduce the issue :

1] Clone the above repository.

2] Put the csvs in the "issue-data" folder in the above repository at a
hadoop location "hdfs:///tree/dummy/data/"

3] Set the spark scratch directory (SPARK_LOCAL_DIRS) to a folder which has
large space. (> 100 GB)

4] Run "sbt assembly"

5] Run the following command at the project location

/path/to/spark-2.3.0-bin-hadoop2.7/bin/spark-submit \
--class spark.rddexample.dummyrdd.FunctionExecutor \
--master local[2] \
--deploy-mode client \
--executor-memory 2G \
--driver-memory 2G \
target/scala-2.11/rdd-shuffle-assembly-0.1.0.jar \
20 \
hdfs:///tree/dummy/data/ \
hdfs:///tree/dummy/results/  



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: Iterative rdd union + reduceByKey operations on small dataset leads to "No space left on device" error on account of lot of shuffle spill.

Vadim Semenov-2
`spark.worker.cleanup.enabled=true` doesn't work for YARN.
On Fri, Jul 27, 2018 at 8:52 AM dineshdharme <[hidden email]> wrote:

>
> I am trying to do few (union + reduceByKey) operations on a hiearchical
> dataset in a iterative fashion in rdd. The first few loops run fine but on
> the subsequent loops, the operations ends up using the whole scratch space
> provided to it.
>
> I have set the spark scratch directory, i.e. SPARK_LOCAL_DIRS , to be one
> having 100 GB space.
> The heirarchical dataset, whose size is (< 400kB), remains constant
> throughout the iterations.
> I have tried the worker cleanup flag but it has no effect i.e.
> "spark.worker.cleanup.enabled=true"
>
>
>
> Error :
> Caused by: java.io.IOException: No space left on device
> at java.io.FileOutputStream.writeBytes(Native Method)
> at java.io.FileOutputStream.write(FileOutputStream.java:326)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply$mcVJ$sp(IndexShuffleBlockResolver.scala:151)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:246)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply$mcV$sp(IndexShuffleBlockResolver.scala:149)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:153)
> at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:109)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
>
> What I am trying to do (High Level):
>
> I have a dataset of 5 different csv ( Parent, Child1, Child2, Child21,
> Child22 ) which are related in a hierarchical fashion as shown below.
>
> Parent-> Child1 -> Child2  -> Child21
>
> Parent-> Child1 -> Child2  -> Child22
>
> Each element in the tree has 14 columns (elementid, parentelement_id, cat1,
> cat2, num1, num2,....., num10)
>
> I am trying to aggregate the values of one column of Child21 into Child1
> (i.e. 2 levels up). I am doing the same for another column value of Child22
> into Child1. Then I am merging these aggregated values at the same Child1
> level.
>
> This is present in the code at location :
>
> spark.rddexample.dummyrdd.tree.child1.events.Function1
>
>
> Code which replicates the issue:
>
> 1] https://github.com/dineshdharme/SparkRddShuffleIssue
>
>
>
> Steps to reproduce the issue :
>
> 1] Clone the above repository.
>
> 2] Put the csvs in the "issue-data" folder in the above repository at a
> hadoop location "hdfs:///tree/dummy/data/"
>
> 3] Set the spark scratch directory (SPARK_LOCAL_DIRS) to a folder which has
> large space. (> 100 GB)
>
> 4] Run "sbt assembly"
>
> 5] Run the following command at the project location
>
> /path/to/spark-2.3.0-bin-hadoop2.7/bin/spark-submit \
> --class spark.rddexample.dummyrdd.FunctionExecutor \
> --master local[2] \
> --deploy-mode client \
> --executor-memory 2G \
> --driver-memory 2G \
> target/scala-2.11/rdd-shuffle-assembly-0.1.0.jar \
> 20 \
> hdfs:///tree/dummy/data/ \
> hdfs:///tree/dummy/results/
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: [hidden email]
>


--
Sent from my iPhone

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: Iterative rdd union + reduceByKey operations on small dataset leads to "No space left on device" error on account of lot of shuffle spill.

dineshdharme
Yeah, you are right. I ran the experiments locally not on YARN.

On Fri, Jul 27, 2018 at 11:54 PM, Vadim Semenov <[hidden email]> wrote:
`spark.worker.cleanup.enabled=true` doesn't work for YARN.
On Fri, Jul 27, 2018 at 8:52 AM dineshdharme <[hidden email]> wrote:
>
> I am trying to do few (union + reduceByKey) operations on a hiearchical
> dataset in a iterative fashion in rdd. The first few loops run fine but on
> the subsequent loops, the operations ends up using the whole scratch space
> provided to it.
>
> I have set the spark scratch directory, i.e. SPARK_LOCAL_DIRS , to be one
> having 100 GB space.
> The heirarchical dataset, whose size is (< 400kB), remains constant
> throughout the iterations.
> I have tried the worker cleanup flag but it has no effect i.e.
> "spark.worker.cleanup.enabled=true"
>
>
>
> Error :
> Caused by: java.io.IOException: No space left on device
> at java.io.FileOutputStream.writeBytes(Native Method)
> at java.io.FileOutputStream.write(FileOutputStream.java:326)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply$mcVJ$sp(IndexShuffleBlockResolver.scala:151)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:246)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply$mcV$sp(IndexShuffleBlockResolver.scala:149)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:153)
> at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:109)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
>
> What I am trying to do (High Level):
>
> I have a dataset of 5 different csv ( Parent, Child1, Child2, Child21,
> Child22 ) which are related in a hierarchical fashion as shown below.
>
> Parent-> Child1 -> Child2  -> Child21
>
> Parent-> Child1 -> Child2  -> Child22
>
> Each element in the tree has 14 columns (elementid, parentelement_id, cat1,
> cat2, num1, num2,....., num10)
>
> I am trying to aggregate the values of one column of Child21 into Child1
> (i.e. 2 levels up). I am doing the same for another column value of Child22
> into Child1. Then I am merging these aggregated values at the same Child1
> level.
>
> This is present in the code at location :
>
> spark.rddexample.dummyrdd.tree.child1.events.Function1
>
>
> Code which replicates the issue:
>
> 1] https://github.com/dineshdharme/SparkRddShuffleIssue
>
>
>
> Steps to reproduce the issue :
>
> 1] Clone the above repository.
>
> 2] Put the csvs in the "issue-data" folder in the above repository at a
> hadoop location "hdfs:///tree/dummy/data/"
>
> 3] Set the spark scratch directory (SPARK_LOCAL_DIRS) to a folder which has
> large space. (> 100 GB)
>
> 4] Run "sbt assembly"
>
> 5] Run the following command at the project location
>
> /path/to/spark-2.3.0-bin-hadoop2.7/bin/spark-submit \
> --class spark.rddexample.dummyrdd.FunctionExecutor \
> --master local[2] \
> --deploy-mode client \
> --executor-memory 2G \
> --driver-memory 2G \
> target/scala-2.11/rdd-shuffle-assembly-0.1.0.jar \
> 20 \
> hdfs:///tree/dummy/data/ \
> hdfs:///tree/dummy/results/
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: [hidden email]
>


--
Sent from my iPhone