spark on yarn fail with IOException

3 messages

spark on yarn fail with IOException

dong
This post has NOT been accepted by the mailing list yet.
Hi,

The code works on a 12 GB input, but when running on 3 TB of data it keeps getting this exception and the job fails. I think it is executing the following code when the exception is thrown:

val richGraphSrc = getRichGraph(sc, inputGraph, 3000)
  .map { e => (e._1, (e._2, e._3, e._4)) }    // line 90
  .persist(StorageLevel.DISK_ONLY_2)

val richGraph2 = richGraphSrc
  .filter { case (src: Long, rest: (Long, Double, Double)) => bc.value.contains(rest._1) }  // line 98
  .persist(StorageLevel.MEMORY_ONLY_2)

richGraph2.join(richGraphSrc, 3000).values
  .map { (e2e: ((Long, Double, Double), (Long, Double, Double))) => // line 104
    ...
  }

14/01/10 09:01:47 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD[3] at map at SparkCosineSimilarity.scala:63), which has no missing parents
14/01/10 09:01:47 INFO scheduler.DAGScheduler: Submitting 100 missing tasks from Stage 0 (MappedRDD[3] at map at SparkCosineSimilarity.scala:63)
14/01/10 09:02:00 INFO scheduler.DAGScheduler: Stage 0 (reduce at SparkCosineSimilarity.scala:63) finished in 12.129 s
14/01/10 09:02:01 INFO scheduler.DAGScheduler: Submitting Stage 4 (FilteredRDD[8] at filter at SparkCosineSimilarity.scala:98), which has no missing parents
14/01/10 09:02:02 INFO scheduler.DAGScheduler: Submitting 5743 missing tasks from Stage 4 (FilteredRDD[8] at filter at SparkCosineSimilarity.scala:98)
14/01/10 09:02:02 INFO scheduler.DAGScheduler: Submitting Stage 5 (MappedRDD[7] at map at SparkCosineSimilarity.scala:90), which has no missing parents
14/01/10 09:02:02 INFO scheduler.DAGScheduler: Submitting 5743 missing tasks from Stage 5 (MappedRDD[7] at map at SparkCosineSimilarity.scala:90)

14/01/10 09:05:13 WARN cluster.ClusterTaskSetManager: Loss was due to java.io.IOException
java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:535)
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:631)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:689)
        at java.io.DataInputStream.read(DataInputStream.java:100)
        at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:211)
        at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:206)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:45)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:167)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:150)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
        at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
        at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
        at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
        at scala.collection.Iterator$class.foreach(Iterator.scala:772)
        at scala.collection.Iterator$$anon$19.foreach(Iterator.scala:399)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:102)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:75)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
        at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:32)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:71)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:159)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:100)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
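
One workaround often suggested for "java.io.IOException: Filesystem closed" (not confirmed as the fix for this particular job) is to disable Hadoop's per-JVM FileSystem cache. By default, `FileSystem.get` returns a shared cached instance, so one task calling `close()` (or a shutdown hook firing) invalidates the handle every other task in the executor is still reading from. A hypothetical core-site.xml fragment, or the equivalent setting on the Hadoop Configuration passed to the job:

```xml
<!-- Disable the shared FileSystem cache so each caller gets its own
     handle; one task closing its stream then cannot break the others. -->
<property>
  <name>fs.hdfs.impl.disable.cache</name>
  <value>true</value>
</property>
```

Note this trades the crash for extra connection overhead, since every `FileSystem.get` now opens a fresh client.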

Re: spark on yarn fail with IOException

dong
The command line used is:

./spark-class org.apache.spark.deploy.yarn.Client \
      --jar  SparkCosineSimilarity.jar \
      --class SparkCosineSimilarity \
      --args yarn-standalone \
      --args 3000 \
      --num-workers 120 \
      --master-memory 7g \
      --worker-memory 7g \
      --worker-cores 1

Re: spark on yarn fail with IOException

sam
In reply to this post by dong
I get a very similar stack trace and have no idea what could be causing it (see below). I've created a Stack Overflow question: http://stackoverflow.com/questions/24038908/spark-fails-on-big-jobs-with-java-io-ioexception-filesystem-closed

14/06/02 20:44:04 INFO client.AppClient$ClientActor: Executor updated: app-20140602203435-0020/6 is now FAILED (Command exited with code 137)
14/06/02 20:44:05 INFO cluster.SparkDeploySchedulerBackend: Executor app-20140602203435-0020/6 removed: Command exited with code 137
14/06/02 20:44:05 INFO cluster.SparkDeploySchedulerBackend: Executor 6 disconnected, so removing it
14/06/02 20:44:05 ERROR scheduler.TaskSchedulerImpl: Lost executor 6 on ip-172-31-23-17.ec2.internal: Unknown executor exit code (137) (died from signal 9?)
14/06/02 20:44:05 INFO scheduler.TaskSetManager: Re-queueing tasks for 6 from TaskSet 2.0
14/06/02 20:44:05 WARN scheduler.TaskSetManager: Lost TID 358 (task 2.0:66)
...
14/06/02 21:08:11 INFO cluster.SparkDeploySchedulerBackend: Executor 16 disconnected, so removing it
14/06/02 21:08:11 ERROR scheduler.TaskSchedulerImpl: Lost executor 16 on ip-172-31-28-73.ec2.internal: remote Akka client disassociated
14/06/02 21:08:11 INFO scheduler.TaskSetManager: Re-queueing tasks for 16 from TaskSet 5.5
14/06/02 21:08:11 INFO scheduler.DAGScheduler: Executor lost: 16 (epoch 24)
14/06/02 21:08:11 INFO storage.BlockManagerMasterActor: Trying to remove executor 16 from BlockManagerMaster.
14/06/02 21:08:11 INFO storage.BlockManagerMaster: Removed 16 successfully in removeExecutor
14/06/02 21:08:11 INFO scheduler.Stage: Stage 5 is now unavailable on executor 16 (207/234, false)
14/06/02 21:08:11 INFO client.AppClient$ClientActor: Executor updated: app-20140602203435-0020/16 is now FAILED (Command exited with code 137)
14/06/02 21:08:11 INFO cluster.SparkDeploySchedulerBackend: Executor app-20140602203435-0020/16 removed: Command exited with code 137
14/06/02 21:08:11 ERROR client.AppClient$ClientActor: Master removed our application: FAILED; stopping client
14/06/02 21:08:11 WARN cluster.SparkDeploySchedulerBackend: Disconnected from Spark cluster! Waiting for reconnection...
14/06/02 21:08:12 INFO scheduler.TaskSchedulerImpl: Ignoring update with state FAILED from TID 1364 because its task set is gone
...
14/06/02 21:08:12 WARN scheduler.TaskSetManager: Loss was due to java.io.IOException
java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:703)
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:779)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:840)
        at java.io.DataInputStream.read(DataInputStream.java:149)
        at org.apache.hadoop.io.compress.DecompressorStream.getCompressedData(DecompressorStream.java:159)
        at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:143)
        at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
        at java.io.InputStream.read(InputStream.java:101)
        at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
        at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
        at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:164)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:149)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:57)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
        at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
        at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

In a Spark Worker Log:

14/06/02 20:26:27 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@ip-172-31-22-58.ec2.internal:43224] -> [akka.tcp://spark@ip-172-31-23-17.ec2.internal:35581] disassociated! Shutting down.
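
A side note on the exit code in the logs above: "Command exited with code 137" together with "died from signal 9?" points at a SIGKILL, which on Linux (with no other explanation in the executor log) is frequently the kernel OOM killer terminating a worker that exceeded the memory available on the node. The code itself is just the shell convention for signal deaths:

```shell
# A process killed by a signal exits with 128 + the signal number,
# so exit code 137 corresponds to SIGKILL (signal 9):
echo $((128 + 9))    # prints 137
# On the affected worker node, the kernel log would confirm an OOM kill:
#   dmesg | grep -i "killed process"
```

If that is the cause here, the usual remedies are lowering per-executor memory pressure (fewer concurrent tasks, more partitions, or a smaller heap relative to the container/node limit) rather than raising the heap.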