closure and ExceptionInInitializerError


closure and ExceptionInInitializerError

Hao Ren
Hi,

While playing with Spark, I ran into an ExceptionInInitializerError, and I am confused about how Spark handles closures.

For example, I have two Scala files in my project:

// Entry.scala
object Entry {
  def main(args: Array[String]) = {
    anonymous.run()
  }
}

// anonymous.scala
object anonymous {
  // sparkContext is a SparkContext initialized elsewhere in the project
  val rdd1 = sparkContext.parallelize(List(1, 2, 3, 4))
  val a = 1

  def run() {
    val rdd2 = rdd1.map(_ + a)
    println(rdd2.count)
  }
}


The code above throws an ExceptionInInitializerError:
java.lang.ExceptionInInitializerError
        at job.analysis.anonymous$.<init>(anonymous.scala:37)
        at job.analysis.anonymous$.<clinit>(anonymous.scala)
...
java.lang.NoClassDefFoundError: Could not initialize class job.analysis.anonymous$


But if I move "val a = 1" into the run() function, it works. So I think the problem is object initialization.

The object "anonymous" here is only created in the main process, not on the worker process, thus, when the workers want to access to "val a" which is defined as an attribute of "anonymous". They just can not find it the initialized object. Correct me if I am wrong.

I have also tested this a bit more:

// snippet 1
def run() = {
    class t extends Serializable {
      val a = 2
    }
    val tt = new t()
    val rdd2 = rdd1.map(_ + tt.a)
    println(rdd2.count)
} // this one works

// snippet 2
def run() = {
    object t extends Serializable {
      val a = 2
    }
    val rdd2 = rdd1.map(_ + t.a)
    println(rdd2.count)
} // I encountered the same ExceptionInInitializerError


This seems strange to me, because in snippet 1 the instance tt is created in the main process just as in snippet 2; the only difference is that in snippet 2, t is a standalone object.
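
If my reading is right, copying the field into a local value before building the closure should sidestep the problem even with a standalone object; an untested sketch:

// snippet 3 (untested sketch): read t.a on the driver into a plain local val,
// so the map closure captures only an Int and never references the object t.
def run() = {
    object t extends Serializable {
      val a = 2
    }
    val localA = t.a
    val rdd2 = rdd1.map(_ + localA)
    println(rdd2.count)
}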

Could someone explain why this occurs? Maybe it's a basic question, but it really confuses me. Am I missing anything here?

Hao

Re: closure and ExceptionInInitializerError

Bao
Redocpot, I tried your two snippets with spark-shell and both work fine. I only see a problem if the closure is not serializable.

scala> val rdd1 = sc.parallelize(List(1, 2, 3, 4))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:12

scala> val a = 1  
a: Int = 1

scala> val rdd2 = rdd1.map(_ + a)
rdd2: org.apache.spark.rdd.RDD[Int] = MappedRDD[5] at map at <console>:16

scala> rdd2.count
13/12/30 03:50:59 INFO SparkContext: Starting job: count at <console>:19
13/12/30 03:50:59 INFO DAGScheduler: Got job 4 (count at <console>:19) with 2 output partitions (allowLocal=false)
13/12/30 03:50:59 INFO DAGScheduler: Final stage: Stage 4 (count at <console>:19)
13/12/30 03:50:59 INFO DAGScheduler: Parents of final stage: List()
13/12/30 03:50:59 INFO DAGScheduler: Missing parents: List()
13/12/30 03:50:59 INFO DAGScheduler: Submitting Stage 4 (MappedRDD[5] at map at <console>:16), which has no missing parents
13/12/30 03:50:59 INFO DAGScheduler: Submitting 2 missing tasks from Stage 4 (MappedRDD[5] at map at <console>:16)
13/12/30 03:50:59 INFO ClusterScheduler: Adding task set 4.0 with 2 tasks
13/12/30 03:50:59 INFO ClusterTaskSetManager: Starting task 4.0:0 as TID 8 on executor 0: worker1 (PROCESS_LOCAL)
13/12/30 03:50:59 INFO ClusterTaskSetManager: Serialized task 4.0:0 as 1839 bytes in 1 ms
13/12/30 03:50:59 INFO ClusterTaskSetManager: Starting task 4.0:1 as TID 9 on executor 1: worker2 (PROCESS_LOCAL)
13/12/30 03:50:59 INFO ClusterTaskSetManager: Serialized task 4.0:1 as 1839 bytes in 1 ms
13/12/30 03:51:00 INFO ClusterTaskSetManager: Finished TID 8 in 152 ms on worker1 (progress: 1/2)
13/12/30 03:51:00 INFO DAGScheduler: Completed ResultTask(4, 0)
13/12/30 03:51:00 INFO ClusterTaskSetManager: Finished TID 9 in 171 ms on worker2 (progress: 2/2)
13/12/30 03:51:00 INFO ClusterScheduler: Remove TaskSet 4.0 from pool
13/12/30 03:51:00 INFO DAGScheduler: Completed ResultTask(4, 1)
13/12/30 03:51:00 INFO DAGScheduler: Stage 4 (count at <console>:19) finished in 0.131 s
13/12/30 03:51:00 INFO SparkContext: Job finished: count at <console>:19, took 0.212351498 s
res5: Long = 4
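
For comparison, the kind of closure that does fail for me is one that pulls in a non-serializable object, along these lines (names are made up):

// Config does not extend Serializable, and the closure captures the whole
// conf instance through the field access, so the task cannot be serialized.
class Config(val offset: Int)

val conf = new Config(1)
val bad = rdd1.map(_ + conf.offset)
bad.count   // fails with a NotSerializableException for Config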

Re: closure and ExceptionInInitializerError

Hao Ren
Thank you for your reply, Bao.

Actually, I am running the code on EC2 with SBT, with 1 master and 3 slaves.

To reproduce the issue, I created a directory with one Scala source file and a "lib" directory containing the Spark assembly jar.

$ sbt package run

will run the project.
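
For reference, the build definition is minimal; something like this (reconstructed from the jar name, so take it as a guess):

// build.sbt -- a guess at the minimal setup; the Spark assembly jar
// sits in lib/ and is picked up as an unmanaged dependency.
name := "test"

version := "0.1"

scalaVersion := "2.9.3"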

The Scala file is shown below:


// job.scala
import org.apache.spark.SparkContext
import scala.io.Source

object anonymous extends App {
  val nameNodeURL = Source.fromFile("/root/spark-ec2/masters").mkString.trim
  val sparkPort = 7077
  val sc = new SparkContext("spark://" + nameNodeURL + ":" + sparkPort, "Test",
    System.getenv("SPARK_HOME"), Seq("target/scala-2.9.3/test_2.9.3-0.1.jar"))
  val rdd1 = sc.parallelize(List(1, 2, 3, 4))
  val a = 1

  def run() {
    val rdd2 = rdd1.map(_ + a)
    println(rdd2.count)
  }

  run
}


This code works, which may explain why it works in the shell.
But if I change it into an object with a main method, instead of using "extends App", like this:


// job.scala
import org.apache.spark.SparkContext
import scala.io.Source

object anonymous {
  val nameNodeURL = Source.fromFile("/root/spark-ec2/masters").mkString.trim
  val sparkPort = 7077
  val sc = new SparkContext("spark://" + nameNodeURL + ":" + sparkPort, "Test",
    System.getenv("SPARK_HOME"), Seq("target/scala-2.9.3/test_2.9.3-0.1.jar"))
  val rdd1 = sc.parallelize(List(1, 2, 3, 4))
  val a = 1

  def run() {
    val rdd2 = rdd1.map(_ + a)
    println(rdd2.count)
  }

  def main(args: Array[String]){
    run
  }
}


Two exceptions are thrown:

13/12/30 10:08:17 INFO cluster.ClusterTaskSetManager: Loss was due to java.lang.ExceptionInInitializerError
java.lang.ExceptionInInitializerError
        at anonymous$$anonfun$1.apply$mcII$sp(job.scala:13)
        at anonymous$$anonfun$1.apply(job.scala:13)
        at anonymous$$anonfun$1.apply(job.scala:13)
        at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:681)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:677)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:758)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:758)
        at org.apache.spark.scheduler.ResultTask.run(ResultTask.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)
13/12/30 10:08:17 INFO cluster.ClusterTaskSetManager: Starting task 0.0:0 as TID 2 on executor 0: ip-10-202-35-76.ec2.internal (PROCESS_LOCAL)
13/12/30 10:08:17 INFO cluster.ClusterTaskSetManager: Serialized task 0.0:0 as 1351 bytes in 1 ms
13/12/30 10:08:17 INFO cluster.ClusterTaskSetManager: Lost TID 2 (task 0.0:0)
13/12/30 10:08:17 INFO cluster.ClusterTaskSetManager: Loss was due to java.lang.NoClassDefFoundError
java.lang.NoClassDefFoundError: Could not initialize class anonymous$
        at anonymous$$anonfun$1.apply$mcII$sp(job.scala:13)
        at anonymous$$anonfun$1.apply(job.scala:13)
        at anonymous$$anonfun$1.apply(job.scala:13)
        at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:681)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:677)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:758)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:758)
        at org.apache.spark.scheduler.ResultTask.run(ResultTask.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)

The two snippets are doing the same thing, right?
So what's the difference? Any thoughts?

Thank you.

Hao.