Stalling during large iterative PySpark jobs


Jeremy Freeman
I'm reliably getting a bug in PySpark where jobs with many iterative calculations on cached data stall out.

The data is a folder of ~40 text files, each with ~2 million rows and 360 entries per row; total size is ~250 GB.
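
Schematically, the setup is just load, parse, and cache (a rough sketch, not the exact code; the master URL, path, and parsing are placeholders):

from pyspark import SparkContext

sc = SparkContext("spark://master:7077", "IterativeTest")

# ~40 text files, ~2 million rows each, 360 numeric values per row (~250 GB total)
data = (sc.textFile("hdfs:///path/to/data/folder")
          .map(lambda line: [float(x) for x in line.split()])
          .cache())
data.count()  # materialize the cache before iterating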

I'm testing with the KMeans example included with Spark (though I see the same error with my own iterative algorithms). The Scala version completes 50+ iterations fine. In PySpark, it successfully completes 9 iterations and then stalls. On the driver, I get this:

java.net.NoRouteToHostException: Cannot assign requested address
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:579)
        at java.net.Socket.connect(Socket.java:528)
        at java.net.Socket.<init>(Socket.java:425)
        at java.net.Socket.<init>(Socket.java:208)
        at org.apache.spark.api.python.PythonAccumulatorParam.addInPlace(PythonRDD.scala:328)
        at org.apache.spark.api.python.PythonAccumulatorParam.addInPlace(PythonRDD.scala:311)
        at org.apache.spark.Accumulable.$plus$plus$eq(Accumulators.scala:70)
        at org.apache.spark.Accumulators$$anonfun$add$2.apply(Accumulators.scala:253)
        at org.apache.spark.Accumulators$$anonfun$add$2.apply(Accumulators.scala:251)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
        at scala.collection.Iterator$class.foreach(Iterator.scala:772)
        at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
        at org.apache.spark.Accumulators$.add(Accumulators.scala:251)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:598)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:376)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
        at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)

The job continues after reporting this error, appearing to finish the remaining tasks, until it displays:

INFO ClusterScheduler: Remove TaskSet 18.0 from pool

And then just stalls.

The web UI shows only a subset of tasks completed; the current task's number matches the one the driver was reporting around the time the error appeared. I don't see any errors in stdout or stderr on the worker executing that task, only on the driver. Memory usage on all workers and on the driver is well below 50%.

Other observations:
- It's data-size dependent. If I load ~50 GB, it finishes 20 iterations before stalling. If I load ~10 GB, it finishes 35.
- It's not due to the multiple files; I see the same error on a single large file.
- I always get the error with 30 or 60 nodes, but I don't see it when using 20.
- For a given cluster/data size, it stalls at the same point on every run.

I was going to test all this on EC2, in case it's something specific to our setup (a private HPC cluster running Spark in standalone mode, with 16 cores and 100 GB used per node). But it'd be great if anyone had ideas in the meantime.

Thanks!

Re: Stalling during large iterative PySpark jobs

Matei Zaharia
Administrator
Hi Jeremy,

If you look at the stdout and stderr files on that worker, do you see any earlier errors? I wonder if one of the Python workers crashed earlier.

It would also be good to run “top” and see if more memory is used during the computation. I guess the cached RDD itself fits in less than 50% of the RAM as you said?
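
If it's hard to catch by watching top over a long run, a small sampler like this on each worker would log the trend over time (just a rough sketch; it assumes Linux with /proc/meminfo available, and the log path is arbitrary):

import time

def used_fraction():
    # Parse /proc/meminfo (values are in kB) and estimate the fraction of RAM in use.
    info = {}
    with open("/proc/meminfo") as f:
        for line in f:
            key, rest = line.split(":", 1)
            info[key] = int(rest.split()[0])
    free = info["MemFree"] + info.get("Buffers", 0) + info.get("Cached", 0)
    return 1.0 - float(free) / info["MemTotal"]

# Append a timestamped sample once a minute (Ctrl-C to stop).
with open("/tmp/mem_trace.log", "a") as log:
    while True:
        log.write("%s %.3f\n" % (time.strftime("%H:%M:%S"), used_fraction()))
        log.flush()
        time.sleep(60)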

Matei



Re: Stalling during large iterative PySpark jobs

Jeremy Freeman
Thanks for the thoughts, Matei! I poked at this some more. I ran top on each of the workers during the job (I'm testing with the example KMeans) and confirmed that the run dies while memory usage of the Java process is still around 30%. I do notice it going up, from around 20% after the first iteration to 30% by the time it dies, so it definitely stays under 50%. Also, memory is around 30% when running KMeans in Scala, and I never get the error there.

I can't find anything suspect in any of the worker logs (I'm looking at stdout and stderr in spark.local.dir). The only error is the one reported to the driver.

I still haven't tried reproducing it on EC2; I'll let you know if I can...

-- Jeremy

Re: Stalling during large iterative PySpark jobs

Matei Zaharia
Administrator
Jeremy, do you happen to have a small test case that reproduces it? Is it with the kmeans example that comes with PySpark?

Matei



Re: Stalling during large iterative PySpark jobs

Jeremy Freeman
Yup, I'm hitting it with the PySpark kmeans example included with the distribution (v0.8.1), so the code for reproducing it is simple. But note that I only get it with a fairly large number of nodes (in our setup, 30 or more). You should see it if you run KMeans with that many nodes on any fairly large data set with many iterations (e.g. 50 GB, 20 iterations of kmeans, k=3).
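
For reference, what I'm running is essentially the standard KMeans loop over a cached RDD, roughly like this (a sketch of the pattern, not the exact bundled script; the master URL and path are placeholders):

import numpy as np
from pyspark import SparkContext

sc = SparkContext("spark://master:7077", "KMeansRepro")

# Parse and cache the points (~50 GB of whitespace-delimited text).
points = (sc.textFile("hdfs:///path/to/data")
            .map(lambda line: np.array([float(x) for x in line.split()]))
            .cache())

def closest_point(p, centers):
    # Index of the nearest current center (squared Euclidean distance).
    return min(range(len(centers)), key=lambda i: np.sum((p - centers[i]) ** 2))

k = 3
centers = points.takeSample(False, k, 1)

for i in range(20):
    # Each iteration is a full map + reduceByKey over the cached data.
    closest = points.map(lambda p: (closest_point(p, centers), (p, 1)))
    stats = closest.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    for idx, mean in stats.map(lambda kv: (kv[0], kv[1][0] / kv[1][1])).collect():
        centers[idx] = mean
    print("finished iteration %d" % i)

On our data this loop stalls after a data-size-dependent number of iterations, always at the same point for a given cluster and data size.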

Happy to try anything on our end to help debug...