java.lang.StackOverflowError when calling count()

Guanhua Yan
Dear Sparkers:

I am using PySpark 0.9.0 to implement an iterative algorithm, and I am getting the errors shown at the end of this email. They appear to be due to a Java StackOverflowError. The same error has been reproduced on a Mac desktop and a Linux workstation, both running the same version of Spark.

The same line of code works correctly for quite a few iterations before failing. At the failing line, rdd__new.count() could be 0. (In some previous rounds this was also 0 without any problem.)
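
For reference, here is a simplified sketch of the pattern (illustrative only, not the exact code). Each round derives the new RDD from the previous one, so the lineage grows by a few stages per round:

    rdd__new = sc.parallelize([(i, 0) for i in range(100)])
    for round in range(10000):
        # each round's transformations are appended onto the previous lineage
        rdd__new = rdd__new.reduceByKey(lambda a, b: a + b) \
                           .map(lambda kv: (kv[0], kv[1] + 1))
        print "round", round, rdd__new.count()  # eventually fails with StackOverflowError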

Any thoughts on this?

Thank you very much,
- Guanhua


========================================
CODE:    print "round", round, rdd__new.count() 
========================================
  File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", line 542, in count
14/05/12 16:20:28 INFO TaskSetManager: Loss was due to java.lang.StackOverflowError [duplicate 1]
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
14/05/12 16:20:28 ERROR TaskSetManager: Task 8419.0:0 failed 1 times; aborting job
  File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", line 533, in sum
14/05/12 16:20:28 INFO TaskSchedulerImpl: Ignoring update with state FAILED from TID 1774 because its task set is gone
    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
  File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", line 499, in reduce
    vals = self.mapPartitions(func).collect()
  File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", line 463, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
  File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o4317.collect.
: org.apache.spark.SparkException: Job aborted: Task 8419.0:1 failed 1 times (most recent failure: Exception failure: java.lang.StackOverflowError)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

======================================
The stack overflow error is shown as follows:
======================================

14/05/12 16:20:28 ERROR Executor: Exception in task ID 1774
java.lang.StackOverflowError
at java.util.zip.Inflater.inflate(Inflater.java:259)
at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:116)
at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323)
at java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2818)
at java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1452)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1511)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
The frames above repeat many times after this …
======================================

Re: java.lang.StackOverflowError when calling count()

Xiangrui Meng
You have a long lineage that causes the StackOverflowError. Try
rdd.checkpoint() followed by rdd.count() every 20-30 iterations;
checkpoint() cuts the lineage. -Xiangrui
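
A minimal sketch of that pattern (the checkpoint directory, step(), and the every-25 cadence are illustrative placeholders; note that sc.setCheckpointDir() must be called before any checkpoint):

    sc.setCheckpointDir("/tmp/spark-checkpoints")  # required once, up front

    rdd = initial_rdd                    # whatever RDD the algorithm starts from
    for i in range(num_iterations):
        rdd = step(rdd)                  # one iteration's worth of transformations
        if i % 25 == 0:
            rdd.cache()                  # optional: lets the checkpoint write reuse
                                         # the partitions computed by count()
            rdd.checkpoint()             # marks the RDD for checkpointing (lazy)
            rdd.count()                  # the action materializes it and truncates
                                         # the lineage at this point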


Re: java.lang.StackOverflowError when calling count()

Mayur Rustagi
We are running into the same issue. After 700 or so files the stack overflows; cache, persist, and checkpointing don't help.
Basically, checkpointing only saves the RDD when it is materialized, and it only materializes at the end, at which point it runs out of stack.

Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi
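
A sketch of the pitfall being described (hypothetical; input_files and parse are placeholders): checkpoint() only marks the RDD, so if no action runs until the very end, the entire lineage is still built up before anything is saved:

    sc.setCheckpointDir("/tmp/spark-checkpoints")

    rdd = sc.textFile(input_files[0]).map(parse)
    for path in input_files[1:]:
        rdd = rdd.union(sc.textFile(path).map(parse))  # lineage grows per file
        rdd.checkpoint()   # lazy: only marks the RDD; nothing is written yet
    result = rdd.count()   # materialization happens only here, after the full
                           # lineage has been built -- and the stack overflows first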



Re: java.lang.StackOverflowError when calling count()

Xiangrui Meng
After checkpoint(), call count() directly to materialize it. -Xiangrui


Re: java.lang.StackOverflowError when calling count()

Mayur Rustagi
count() causes the overall performance to drop drastically. In fact, beyond 50 files it starts to hang if I force materialization.
Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi
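
One mitigation worth trying (a sketch, assuming the recomputation rather than the checkpoint write dominates the cost): writing a checkpoint recomputes the RDD from its lineage unless the data is persisted, so caching before the action lets the count and the checkpoint write share one materialization:

    rdd.cache()        # keep the materialized partitions around
    rdd.checkpoint()   # mark for checkpointing
    rdd.count()        # one job materializes and caches the data; the checkpoint
                       # write then reads the cached copy instead of recomputing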



Re: java.lang.StackOverflowError when calling count()

Nick Chammas
In reply to this post by Xiangrui Meng
Would cache() + count() every N iterations work just as well as checkpoint() + count() to get around this issue?

We're basically trying to get Spark to avoid working on too lengthy a lineage at once, right?

Nick
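
For reference, a sketch of how the two calls differ (per their documented semantics, not benchmarked on this workload): cache() materializes partitions but keeps the full lineage, so serialized tasks still carry the whole dependency chain, whereas checkpoint() writes the data to stable storage and replaces the lineage with the checkpoint data:

    rdd.cache()
    rdd.count()        # partitions are now cached, but rdd's lineage is unchanged

    rdd.checkpoint()   # requires sc.setCheckpointDir(...) beforehand
    rdd.count()        # afterwards rdd's dependencies point at the checkpoint
                       # data rather than the long chain of parent RDDs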


>> > at
>> > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> > at
>> > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> > at
>> > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> > at
>> > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> > at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>> > at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
>> >  The above replicated many times after this …
>> > ======================================
>
>

Reply | Threaded
Open this post in threaded view
|

Re: java.lang.StackOverflowError when calling count()

Guanhua Yan
In reply to this post by Xiangrui Meng
Thanks, Xiangrui. After some debugging, it turned out that the problem
resulted from a bug in my own code. But it's good to know that a long
lineage can also lead to this problem. I will also try checkpointing to
see whether performance can be improved.

Best regards,
- Guanhua

On 5/13/14 12:10 AM, "Xiangrui Meng" <[hidden email]> wrote:

>You have a long lineage that causes the StackOverflowError. Try
>rdd.checkpoint() and rdd.count() every 20-30 iterations;
>checkpoint() can cut the lineage. -Xiangrui
>
>On Mon, May 12, 2014 at 3:42 PM, Guanhua Yan <[hidden email]> wrote:
>> [original message and stack trace trimmed]


Reply | Threaded
Open this post in threaded view
|

Re: java.lang.StackOverflowError when calling count()

lalit1303
In reply to this post by Nick Chammas
If we do cache() + count() after, say, every 50 iterations, the whole process becomes very slow.
I have tried checkpoint(), cache() + count(), and saveAsObjectFile().
Nothing works.
Materializing RDDs leads to a drastic decrease in performance, and if we don't materialize, we face a StackOverflowError.


On Wed, May 14, 2014 at 10:25 AM, Nick Chammas [via Apache Spark User List] <[hidden email]> wrote:
Would cache() + count() every N iterations work just as well as checkpoint() + count() to get around this issue?

We're basically trying to get Spark to avoid working on too lengthy a lineage at once, right?

Nick


On Tue, May 13, 2014 at 12:04 PM, Xiangrui Meng <[hidden email]> wrote:
After checkpoint(), call count() directly to materialize it. -Xiangrui

On Tue, May 13, 2014 at 4:20 AM, Mayur Rustagi <[hidden email]> wrote:
> We are running into the same issue. After 700 or so files the stack overflows;
> cache, persist & checkpointing don't help.
> Basically, checkpointing only saves the RDD when it is materialized, and it only
> materializes at the end, by which point it runs out of stack.
>
> Regards
> Mayur
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi

>
>
>
> On Tue, May 13, 2014 at 11:40 AM, Xiangrui Meng <[hidden email]> wrote:
>>
>> You have a long lineage that causes the StackOverflowError. Try
>> rdd.checkpoint() and rdd.count() every 20-30 iterations;
>> checkpoint() can cut the lineage. -Xiangrui
>>
>> On Mon, May 12, 2014 at 3:42 PM, Guanhua Yan <[hidden email]> wrote:
>> > [original message and stack trace trimmed]







--
Thanks & Regards,
Lalit Yadav
+91-9901007692
lalit@sigmoidanalytics.com
Reply | Threaded
Open this post in threaded view
|

Re: java.lang.StackOverflowError when calling count()

Tathagata Das
Just to add some more clarity to the discussion: there is a difference between caching to memory and checkpointing when considered from the lineage point of view.

When an RDD is checkpointed, the data of the RDD is saved to HDFS (or any Hadoop-API-compatible fault-tolerant storage) and the lineage of the RDD is truncated. This is okay because, in case of a worker failure, the RDD data can be read back from the fault-tolerant storage.

When an RDD is cached, the data of the RDD is cached in memory, but the lineage is not truncated. This is because if the in-memory data is lost, the lineage is required to recompute the data.

So to deal with StackOverflowErrors due to a long lineage, caching alone is not going to be useful. You have to checkpoint the RDD, and as far as I can tell, the correct way to do this is the following:
1. Mark the RDD of every Nth iteration for both caching and checkpointing.
2. Before generating the (N+1)th iteration's RDD, force the materialization of this RDD by calling rdd.count(). This will persist the RDD in memory as well as save it to HDFS and truncate the lineage. If you just mark every Nth iteration's RDD for checkpointing but only force the materialization after ALL the iterations (rather than after every Nth iteration as suggested), that will still lead to StackOverflowErrors.

Yes, this checkpointing and materialization will definitely decrease performance, but that is a limitation of the current implementation.
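To make the pattern concrete, here is a minimal Java sketch of steps 1 and 2 above. It assumes a checkpoint directory can be set; CheckpointEveryN, computeNextIteration, and the input path are invented names for illustration, not anything from this thread:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class CheckpointEveryN {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "checkpoint-demo");
        // A checkpoint directory must be set before checkpoint() is called.
        sc.setCheckpointDir("/tmp/spark-checkpoints");

        JavaRDD<String> rdd = sc.textFile(args[0]);
        final int N = 20; // checkpoint every N iterations, per the advice above
        for (int i = 1; i <= 1000; i++) {
            rdd = computeNextIteration(rdd); // the per-iteration transformation
            if (i % N == 0) {
                rdd.cache();      // step 1: mark for caching
                rdd.checkpoint(); // step 1: mark for checkpointing
                rdd.count();      // step 2: materialize now, truncating the lineage
            }
        }
        sc.stop();
    }

    // Placeholder for whatever transformation the iterative algorithm applies.
    private static JavaRDD<String> computeNextIteration(JavaRDD<String> in) {
        return in.map(new Function<String, String>() {
            public String call(String s) { return s; }
        });
    }
}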

If you are brave enough, you can try the following. Instead of relying on checkpointing to HDFS to truncate the lineage:
1. Persist every Nth RDD with replication (see the different StorageLevels); this replicates the in-memory RDD between workers within Spark. Let's call this RDD R.
2. Force it to materialize in memory.
3. Create a modified RDD R' which has the same data as RDD R but does not have the lineage. This is done by creating a new BlockRDD from the IDs of the blocks holding the in-memory R (I can elaborate on that if you want).

This avoids writing to HDFS (the replication stays in Spark memory), truncates the lineage (via the new BlockRDDs), and avoids the StackOverflowError.

Hope this helps.

TD



On Wed, May 14, 2014 at 3:33 AM, lalit1303 <[hidden email]> wrote:
If we do cache() + count() after say every 50 iterations. The whole process
becomes very slow.
I have tried checkpoint() , cache() + count(), saveAsObjectFiles().
Nothing works.
Materializing RDD's lead to drastic decrease in performance & if we don't
materialize, we face stackoverflowerror.




Reply | Threaded
Open this post in threaded view
|

Re: java.lang.StackOverflowError when calling count()

lalit1303
Hi,
Thanks, TD, for your reply. I am still not able to resolve the problem for my use case.
I have, let's say, 1000 different RDDs; I am applying a transformation function to each RDD, and I want the outputs of all the RDDs combined into a single output RDD. For this, I am doing the following:

<Loop Start>
tempRDD = jaRDD.rdd().repartition(1).mapPartitions(....).toJavaRDD();  //creating new rdd in every loop 
outRDD = outRDD.union(tempRDD); //keep joining RDD's to get the output into a single RDD

//after every 10 iteration, in order to truncate the lineage
cachRDD = outRDD.cache();
cachRDD.checkpoint();
System.out.println(cachRDD.collect().size());
outRDD = new JavaRDD<String>(cachRDD.rdd(),cachRDD.classTag());
<Loop Ends>

//finally after whole computation
outRDD.saveAsTextFile(..)

The above is slow overall; it runs successfully with fewer iterations, i.e. ~100, but when the number of iterations is increased to ~1000, the whole job takes more than 30 minutes and ultimately breaks down with an OutOfMemory error. The total size of the data is around 1.4 MB. As of now, I am running the job in Spark standalone mode with 2 cores and 2.9 GB memory.

I also observed that when the collect() operation is performed, the number of tasks keeps increasing as the loop proceeds: the first collect() has 22 total tasks, then ~40 total tasks, ... up to ~300 tasks for a single collect().
Does this mean that all the operations are performed repeatedly, and the RDD lineage is not broken?


Can you please elaborate on the point from your last post, i.e., how to perform: "Create a modified RDD R' which has the same data as RDD R but does not have the lineage. This is done by creating a new BlockRDD using the IDs of blocks of data representing the in-memory R"?
Lalit Yadav
lalit@sigmoidanalytics.com
Reply | Threaded
Open this post in threaded view
|

Re: java.lang.StackOverflowError when calling count()

Tathagata Das
Responses inline.

On Wed, Jul 23, 2014 at 4:13 AM, lalit1303 <[hidden email]> wrote:

> Hi,
> Thanks TD for your reply. I am still not able to resolve the problem for my
> use case.
> I have let's say 1000 different RDD's, and I am applying a transformation
> function on each RDD and I want the output of all rdd's combined to a single
> output RDD. For, this I am doing the following:
>
> *<Loop Start>*
> tempRDD = jaRDD.rdd().repartition(1).mapPartitions(....).toJavaRDD();
> *//creating new rdd in every loop*
> outRDD = outRDD.union(tempRDD); *//keep joining RDD's to get the output into
> a single RDD*
>
> *//after every 10 iteration, in order to truncate the lineage*
> cachRDD = outRDD.cache();
> cachRDD.checkpoint();
> System.out.println(cachRDD.collect().size());
> outRDD = new JavaRDD<String>(cachRDD.rdd(),cachRDD.classTag());
> *<Loop Ends>*
>
> *//finally after whole computation*
> outRDD.saveAsTextFile(..)
>
> The above operations is overall slow, runs successfully when performed less
> iterations i.e. ~100. But, when the num of iterations in increased to ~1000,
> The whole job is taking more than *30 mins* and ultimately break down giving
> OutOfMemory error. The total size of data is around 1.4 MB. As of now, I am
> running the job on spark standalone mode with 2 cores and 2.9 GB memory.

I think this is happening because of how you are caching the output RDDs
that are being generated repeatedly. In every iteration, it builds a new
union RDD which contains the data of the previous union RDD plus some new
data. Since each of these union RDDs is cached, the underlying data is
being cached repeatedly. So the cached data grows like this:
Iteration 1: union RDD: X MB    |  Total size cached: X MB
Iteration 2: union RDD: 2X MB   |  Total size cached: 3X MB
Iteration 3: union RDD: 3X MB   |  Total size cached: 6X MB
Iteration 4: union RDD: 4X MB   |  Total size cached: 10X MB
...

If you do the math, after N iterations the total cached is
X(1 + 2 + ... + N) = X * N(N+1)/2 MB, a quadratic increase in the size of
the data being processed and cached with respect to the number of
iterations. This leads to increases in both run time and memory usage.


> I also observed that when collect() operation is performed, the number of
> tasks keeps on increasing as the loop proceeds, like on first collect() 22
> total task, then ~40 total tasks ... ~300 task for single collect.
> Does this means that all the operations are repeatedly performed, and RDD
> lineage is not broken??
>
Same reason as above. Each union RDD is built by appending the new set of
partitions (~22 partitions) to the partitions of the previous union RDD. So
the Nth union RDD has N * 22 partitions, hence that many tasks.
You could change this by also repartitioning when you want to
cache + checkpoint the union RDD; that is, repartition(100) the union RDD,
then cache(), checkpoint(), and count() the result (see the sketch below).

And do you really need all the data to be collected at the driver? If you
are doing the cachRDD.collect() just to force the checkpoint, use
cachRDD.count() instead.
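Putting those two suggestions together, here is a sketch of a helper you could call every 10th iteration instead of the cache()/collect() block (truncateLineage is an invented name, and 100 partitions is an arbitrary choice):

import org.apache.spark.api.java.JavaRDD;

// Bound the partition count with repartition(), then cache + checkpoint +
// count to truncate the lineage without shipping any data to the driver.
static JavaRDD<String> truncateLineage(JavaRDD<String> outRDD) {
    JavaRDD<String> compacted = outRDD.repartition(100); // partitions stop growing by ~22 per iteration
    compacted.cache();
    compacted.checkpoint(); // requires sc.setCheckpointDir(...) to have been set
    compacted.count();      // forces materialization; the lineage is cut here
    return compacted;
}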

>
> Can you please elaborate on the point from your last post i.e. how to
> perform: "*Create a modified RDD R` which has the same data as RDD R but
> does not have the lineage. This is done by creating a new BlockRDD using the
> ids of blocks of data representing the in-memory R*"
>
Please refer to the lines in the function:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala#L74
What those lines do is save the data of the associated RDD to HDFS files,
and then create a new CheckpointRDD from the same files. Then the
dependency of the associated RDD is changed to use the new RDD. This
truncates the lineage, because the associated RDD's parent is now the new
RDD, which has a very short lineage (links to the checkpoint files), and
the previous dependencies (parent RDDs) are forgotten.

This implementation can be modified by forcing the data of the associated
RDD to be cached with StorageLevel.MEMORY_AND_DISK_2. Then, instead of a
CheckpointRDD, you can create a new BlockRDD (using the IDs of the blocks
that are used to cache the RDD), which is then set as the new dependency.
This is definitely a behind-the-public-API implementation, that is, one
that relies on Spark internals and may break in future versions.

Reply | Threaded
Open this post in threaded view
|

Re: java.lang.StackOverflowError when calling count()

randylu
In reply to this post by Tathagata Das
Hi, TD. I also fell into the trap of long lineage, and your suggestions do work well. But I don't understand why a long lineage can cause a stack overflow, and where it takes effect?
Reply | Threaded
Open this post in threaded view
|

Re: java.lang.StackOverflowError when calling count()

Tathagata Das
The long lineage causes a long/deep Java object tree (a DAG of RDD objects), which needs to be serialized as part of task creation. When serializing, the whole object DAG needs to be traversed, leading to the StackOverflowError.
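For intuition, here is a minimal, Spark-free Java sketch of the same failure mode (the Node class and the link count are invented for illustration). Default Java serialization recurses once per object reference, so a deep enough chain of references overflows the stack, just like a long chain of RDD dependencies:

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class DeepLineageDemo {
    // Each Node points to its parent, like an RDD pointing to its dependency.
    static class Node implements Serializable {
        final Node parent;
        Node(Node parent) { this.parent = parent; }
    }

    public static void main(String[] args) throws Exception {
        Node chain = null;
        for (int i = 0; i < 1000000; i++) {
            chain = new Node(chain); // one link per "iteration" of the job
        }
        ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
        out.writeObject(chain); // traverses the whole chain recursively -> StackOverflowError
    }
}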

TD


On Mon, Aug 11, 2014 at 7:14 PM, randylu <[hidden email]> wrote:
Hi, TD. I also fell into the trap of long lineage, and your suggestions do
work well. But I don't understand why a long lineage can cause a stack
overflow, and where it takes effect?





Reply | Threaded
Open this post in threaded view
|

Re: java.lang.StackOverflowError when calling count()

randylu
Hi, TD. Thanks very much! I got it.
Reply | Threaded
Open this post in threaded view
|

Re: java.lang.StackOverflowError when calling count()

alfa
This post has NOT been accepted by the mailing list yet.
In reply to this post by Tathagata Das
Hello,

Can you please elaborate on the 3rd point you mention? (i.e., "3. Create a modified RDD R' which has the same data as RDD R but does not have the lineage. This is done by creating a new BlockRDD using the IDs of blocks of data representing the in-memory R (can elaborate on that if you want).")

Thanks.
Reply | Threaded
Open this post in threaded view
|

Re: java.lang.StackOverflowError when calling count()

Anuj
This post has NOT been accepted by the mailing list yet.
In reply to this post by Guanhua Yan
We were getting the same problem too. The funny thing is that our code worked with a larger data set and failed for a reduced data set. Anyway, we are thinking of passing stack-size override params to the JVM; maybe that can help you.

Please give it a try and let me know.

--conf spark.executor.extraJavaOptions=-Xss8m --conf spark.driver.extraJavaOptions=-Xss8m

An 8m stack size is probably overkill, so you can start with 4m.
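For example, with spark-submit the flags could be passed like this (the class and jar names are placeholders):

spark-submit \
  --class com.example.MyIterativeJob \
  --conf spark.driver.extraJavaOptions=-Xss4m \
  --conf spark.executor.extraJavaOptions=-Xss4m \
  my-iterative-job.jar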