Re: debugging NotSerializableException while using Kryo


Re: debugging NotSerializableException while using Kryo

Ameet Kini
Thanks Imran. 

I tried setting "spark.closure.serializer" to "org.apache.spark.serializer.KryoSerializer" and now end up seeing NullPointerException when the executor starts up. This is a snippet of the executor's log. Notice how "registered TileIdWritable" and "registered ArgWritable" is called, so I see that my KryoRegistrator is being called. However, it's not clear why there's a follow-on NPE. My spark log level is set to DEBUG in log4j.properties (log4j.rootCategory=DEBUG) so not sure if there s 
some other way to get the executor to be more verbose as to the cause of the NPE. 

When I take out the spark.closure.serializer setting (i.e., go back to the default Java serialization), the executors start up fine and execute other RDD actions, but of course not the lookup action (my original problem). With spark.closure.serializer set to Kryo, the executor dies with an NPE during startup.
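As an aside, one way to get more executor-side detail without turning everything to DEBUG might be to raise only the relevant loggers; this is standard log4j 1.x syntax, and the logger names below are assumptions based on the class names that appear in the log:

# Hypothetical log4j.properties tweak: keep the root quieter and raise
# only the executor and serializer packages to DEBUG.
log4j.rootCategory=INFO, console
log4j.logger.org.apache.spark.executor=DEBUG
log4j.logger.org.apache.spark.serializer=DEBUG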


13/12/23 11:00:36 INFO StandaloneExecutorBackend: Connecting to driver: akka.tcp://[redacted]:48147/user/StandaloneScheduler
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Successfully registered with driver
13/12/23 11:00:36 INFO Slf4jLogger: Slf4jLogger started
13/12/23 11:00:36 INFO Remoting: Starting remoting
13/12/23 11:00:36 INFO Remoting: Remoting started; listening on addresses :[akka.tcp:/[redacted]:56483]
13/12/23 11:00:36 INFO Remoting: Remoting now listens on addresses: [akka.tcp://[redacted]:56483]
13/12/23 11:00:36 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://[redacted]:48147/user/BlockManagerMaster
13/12/23 11:00:36 INFO MemoryStore: MemoryStore started with capacity 323.9 MB.
13/12/23 11:00:36 DEBUG DiskStore: Creating local directories at root dirs '/tmp'
13/12/23 11:00:36 INFO DiskStore: Created local directory at /tmp/spark-local-20131223110036-4335
13/12/23 11:00:36 INFO ConnectionManager: Bound socket to port 41617 with id = ConnectionManagerId([redacted],41617)
13/12/23 11:00:36 INFO BlockManagerMaster: Trying to register BlockManager
13/12/23 11:00:36 INFO BlockManagerMaster: Registered BlockManager
13/12/23 11:00:36 INFO SparkEnv: Connecting to MapOutputTracker: akka.tcp:/[redacted]:48147/user/MapOutputTracker
13/12/23 11:00:36 INFO HttpFileServer: HTTP File server directory is /tmp/spark-e71e0a2b-a247-4bb8-b06d-19c12467b65a
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 0
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 1
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 2
13/12/23 11:00:36 INFO StandaloneExecutorBackend: Got assigned task 3
13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: geotrellis.spark.KryoRegistrator
13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: geotrellis.spark.KryoRegistrator
13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: geotrellis.spark.KryoRegistrator
13/12/23 11:00:36 DEBUG KryoSerializer: Running user registrator: geotrellis.spark.KryoRegistrator
registered TileIdWritable
registered TileIdWritable
registered TileIdWritable
registered TileIdWritable
registered ArgWritable
registered ArgWritable
registered ArgWritable
registered ArgWritable
13/12/23 11:00:37 INFO Executor: Running task ID 2
13/12/23 11:00:37 INFO Executor: Running task ID 1
13/12/23 11:00:37 INFO Executor: Running task ID 3
13/12/23 11:00:37 INFO Executor: Running task ID 0
13/12/23 11:00:37 INFO Executor: Fetching http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar with timestamp 1387814434436
13/12/23 11:00:37 INFO Utils: Fetching http://10.194.70.21:51166/jars/geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to /tmp/fetchFileTemp2456419097284083628.tmp
13/12/23 11:00:37 INFO Executor: Adding file[redacted]/spark/work/app-20131223110034-0000/0/./geotrellis-spark_2.10-0.9.0-SNAPSHOT.jar to class loader
13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread Thread[pool-7-thread-4,5,main]
java.lang.NullPointerException
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
        at scala.Option.flatMap(Option.scala:170)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)
13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread Thread[pool-7-thread-2,5,main]
java.lang.NullPointerException
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
       at scala.Option.flatMap(Option.scala:170)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)
13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread Thread[pool-7-thread-1,5,main]
java.lang.NullPointerException
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
        at scala.Option.flatMap(Option.scala:170)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)
13/12/23 11:00:37 ERROR Executor: Uncaught exception in thread Thread[pool-7-thread-3,5,main]
java.lang.NullPointerException
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:195)
        at scala.Option.flatMap(Option.scala:170)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)
13/12/23 11:00:37 DEBUG DiskStore: Shutdown hook called

Thanks,
Ameet


On Fri, Dec 20, 2013 at 3:24 PM, Imran Rashid <[hidden email]> wrote:
There is a separate setting for serializing closures, "spark.closure.serializer" (listed here: http://spark.incubator.apache.org/docs/latest/configuration.html).

That setting is used to serialize whatever is used by the functions on an RDD, e.g., map, filter, and lookup. Those closures include referenced variables, like your TileIdWritable.

So you need to either change that setting to use Kryo, or make your object Java-serializable.
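Concretely, the two options might look like this (a sketch only; the Serializable route assumes TileIdWritable holds nothing that Java serialization would reject):

// Option 1: make the key type Java-serializable, so the default
// closure serializer can handle closures that capture it.
class TileIdWritable extends Serializable {
  // ...
}

// Option 2: switch the closure serializer to Kryo before creating
// the SparkContext (same property name as discussed above).
System.setProperty("spark.closure.serializer",
  "org.apache.spark.serializer.KryoSerializer")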



On Fri, Dec 20, 2013 at 2:18 PM, Ameet Kini <[hidden email]> wrote:

I'm getting the below NotSerializableException despite using Kryo to serialize that class (TileIdWritable). 

The offending line: awtestRdd.lookup(TileIdWritable(200))

Initially I thought Kryo was not being registered properly, so I tried running operations over awtestRdd that force a shuffle (e.g., groupByKey), and those seemed to work fine. So it seems to be specific to the "TileIdWritable(200)" argument to lookup(). Is there anything unique about companion objects and Kryo serialization? I even replaced "TileIdWritable(200)" with "new TileIdWritable" but still see the exception.


class TileIdWritable { 
 //
}

object TileIdWritable {
 def apply(value: Long) = new TileIdWritable
}


My Kryo registrator:
class KryoRegistrator extends SparkKryoRegistrator {
    override def registerClasses(kryo: Kryo) {
      println("Called KryoRegistrator")  // I see this printed during shuffle operations
      val r = kryo.register(classOf[TileIdWritable])
      val s = kryo.register(classOf[ArgWritable])
    }
}

Then, just before creating the SparkContext, I have these two lines:
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "geotrellis.spark.KryoRegistrator")




The exception itself:
Exception in thread "main" org.apache.spark.SparkException: Job failed: Task not serializable: java.io.NotSerializableException: geotrellis.spark.formats.TileIdWritable
    - field (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4", name: "key$1", type: "class java.lang.Object")
    - object (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4", <function1>)
    - field (class "org.apache.spark.SparkContext$$anonfun$runJob$4", name: "func$1", type: "interface scala.Function1")
    - root object (class "org.apache.spark.SparkContext$$anonfun$runJob$4", <function2>)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:763)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:761)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:761)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
    at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)

Regards,
Ameet


 




Re: debugging NotSerializableException while using Kryo

Jie Deng
Maybe try making your class implement Serializable...





Re: debugging NotSerializableException while using Kryo

Ameet Kini
Implementing Serializable would make the exception go away, but it would be a less preferable solution. My application is network-intensive, and serialization cost is significant. In other words, these objects are ideal candidates for Kryo.
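To illustrate what Kryo buys here: a dedicated serializer for a type like this writes just the payload. The following is a hypothetical sketch, assuming TileIdWritable wraps a single Long exposed as a 'value' accessor (the companion's apply(value: Long) suggests as much):

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}

// Hypothetical: assumes TileIdWritable exposes a 'value: Long' field.
class TileIdWritableSerializer extends Serializer[TileIdWritable] {
  override def write(kryo: Kryo, output: Output, t: TileIdWritable): Unit =
    output.writeLong(t.value)  // 8 bytes of payload, no Java object graph
  override def read(kryo: Kryo, input: Input, clazz: Class[TileIdWritable]): TileIdWritable =
    TileIdWritable(input.readLong())
}

It would be registered inside the existing registrator via kryo.register(classOf[TileIdWritable], new TileIdWritableSerializer).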










Re: debugging NotSerializableException while using Kryo

Michael (Bach) Bui
In reply to this post by Ameet Kini
What Spark version are you using? By looking at the code at Executor.scala line 195, you will at least know what caused the NPE.
We can start from there.








Re: debugging NotSerializableException while using Kryo

Dmitriy Lyubimov
In reply to this post by Ameet Kini
The problem really is that in certain cases, task results -- and front-end-passed parameters -- are passed through closures. For closures, only the Java serializer is properly supported (as far as I know).

There has been a limited number of fixes so that data parameters passed between the front end and the backend can use other-than-Java serialization (e.g., parallelize() and collect() no longer use closures to pass in or grab data objects); however, a certain number of methods still use closures to pass in a data object.

As far as I know, the methods doing correct front/back-end parameter serialization are:

collect()
take() (maybe)
parallelize()
reduce()

Everything else (fold(), etc.) that communicates data between the front end and the backend still wraps data into closures. For a thing like fold(), in fact, you'd have to use a type that has both Java and Kryo support at the same time, because it will always use both the closure and the object serializers while executing.

This is, in my opinion, inconsistent with the assumption that the same data type should be supported uniformly regardless of where it serializes, but that's the state of things as it stands.
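Given that collect() moves results without closures, a possible way to sidestep the lookup() case is to keep the non-serializable key object out of the closure entirely. A sketch, assuming TileIdWritable has value-based equals (which lookup() already relies on):

// Capture only the primitive key; rebuild the wrapper on the executors,
// so the closure serializer never has to handle TileIdWritable itself.
val key: Long = 200L
val values = awtestRdd
  .filter { case (id, _) => id == TileIdWritable(key) }
  .values
  .collect()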








Re: debugging NotSerializableException while using Kryo

Ameet Kini
In reply to this post by Michael (Bach) Bui
Hi Michael,

I re-ran this on another machine that is on Spark's master branch (0.9.0-SNAPSHOT from Dec 14, right after the Scala 2.10 branch was merged back into master) and recreated the NPE; it appears toward the end of this message. I can't tell by looking at the relevant code what may have caused the exception, because line 262 is part of the catch block of a pretty big try/catch block.


Executor.scala Line 262:  val metrics = attemptedTask.flatMap(t => t.metrics)

The surrounding lines are:
case t: Throwable => {
  val serviceTime = (System.currentTimeMillis() - taskStart).toInt
  val metrics = attemptedTask.flatMap(t => t.metrics)   // <-- line 262
  for (m <- metrics) {
    m.executorRunTime = serviceTime
    m.jvmGCTime = gcTime - startGCTime
  }
  val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
  execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

  // TODO: Should we exit the whole executor here? On the one hand, the failed task may
  // have left some weird state around depending on when the exception was thrown, but on
  // the other hand, maybe we could detect that when future tasks fail and exit then.
  logError("Exception in task ID " + taskId, t)
  //System.exit(1)
}




13/12/24 10:02:33 ERROR Executor: Uncaught exception in thread Thread[Executor task launch worker-4,5,main]
java.lang.NullPointerException
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1$$anonfun$8.apply(Executor.scala:262)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1$$anonfun$8.apply(Executor.scala:262)
    at scala.Option.flatMap(Option.scala:170)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:262)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
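For what it's worth, Option.flatMap only NPEs here if its function body dereferences null, so one shape that would produce exactly this trace is the Option wrapping a null task reference. A standalone illustration in plain Scala (a guess at the mechanism, not Spark source):

// flatMap applies its function to the wrapped value, so a null inside
// Some(...) blows up when the function dereferences it.
val attemptedTask: Option[String] = Some(null)
attemptedTask.flatMap(t => Option(t.length))   // throws NullPointerException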

Thanks,
Ameet



(the same NullPointerException stack trace repeats for threads pool-7-thread-2, pool-7-thread-1, and pool-7-thread-3)
13/12/23 11:00:37 DEBUG DiskStore: Shutdown hook called

Thanks,
Ameet


On Fri, Dec 20, 2013 at 3:24 PM, Imran Rashid <[hidden email]> wrote:
There is a separate setting for serializing closures, "spark.closure.serializer" (listed here: http://spark.incubator.apache.org/docs/latest/configuration.html).

That setting is used to serialize whatever is referenced by the functions you pass to an RDD, e.g., map, filter, and lookup. Those closures include referenced variables, like your TileIdWritable.

So you need to either change that setting to use Kryo, or make your object serializable to Java.



On Fri, Dec 20, 2013 at 2:18 PM, Ameet Kini <[hidden email]> wrote:

I'm getting the below NotSerializableException despite using Kryo to serialize that class (TileIdWritable). 

The offending line: awtestRdd.lookup(TileIdWritable(200))

Initially I thought Kryo was not being registered properly, so I tried running operations over awtestRdd that force a shuffle (e.g., groupByKey), and those seemed to work fine. So it seems specific to the "TileIdWritable(200)" argument to lookup(). Is there anything unique about companion objects and Kryo serialization? I even replaced "TileIdWritable(200)" with "new TileIdWritable" but still see the exception.


class TileIdWritable { 
 //
}

object TileIdWritable {
 def apply(value: Long) = new TileIdWritable
}


My Kryo registrator:
class KryoRegistrator extends SparkKryoRegistrator {
    override def registerClasses(kryo: Kryo) {
      println("Called KryoRegistrator")  // I see this printed during shuffle operations
      val r = kryo.register(classOf[TileIdWritable])
      val s = kryo.register(classOf[ArgWritable])
    }
}

Then just before creating a Spark Context, I have these two lines:
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "geotrellis.spark.KryoRegistrator")




The exception itself:
Exception in thread "main" org.apache.spark.SparkException: Job failed: Task not serializable: java.io.NotSerializableException: geotrellis.spark.formats.TileIdWritable
    - field (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4", name: "key$1", type: "class java.lang.Object")
    - object (class "org.apache.spark.rdd.PairRDDFunctions$$anonfun$4", <function1>)
    - field (class "org.apache.spark.SparkContext$$anonfun$runJob$4", name: "func$1", type: "interface scala.Function1")
    - root object (class "org.apache.spark.SparkContext$$anonfun$runJob$4", <function2>)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:763)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:761)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:761)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
    at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)

Regards,
Ameet


 






Re: debugging NotSerializableException while using Kryo

Ameet Kini
In reply to this post by Dmitriy Lyubimov

If Java serialization is the only serializer that properly works for closures, then I shouldn't be setting "spark.closure.serializer" to "org.apache.spark.serializer.KryoSerializer", and my only hope for getting lookup (and other such methods that still use the closure serializer) to work is to either a) use only Java serialization for my objects, or b) have my objects implement Serializable but enable Kryo as well for object serialization. For option b, I'd set "spark.serializer" to "org.apache.spark.serializer.KryoSerializer" but leave "spark.closure.serializer" at its default value (Java).

If the above is true, then it seems that, as things stand today, the best practice is for objects that use Kryo to also implement Serializable or Externalizable so that closures work properly.
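
As a minimal sketch of option b, assuming the system-property style of configuration used earlier in this thread (everything here other than the two property names is illustrative), placed just before creating the SparkContext:

// The object is Java-serializable so it can ride inside closures
// (e.g., the key passed to lookup()).
class TileIdWritable extends Serializable {
  // fields elided
}

// Kryo handles RDD data; spark.closure.serializer is left at its default
// (Java), so closures -- and anything they capture -- go through Java
// serialization.
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "geotrellis.spark.KryoRegistrator")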

Thanks,
Ameet




On Mon, Dec 23, 2013 at 5:18 PM, Dmitriy Lyubimov <[hidden email]> wrote:
The problem really is that in certain cases task results -- and front-end-passed parameters -- are passed through closures. For closures, only the Java serializer is properly supported (AFAIK).

There have been a limited number of fixes for data-parameter communication between the front end and the backend using other-than-Java serialization (e.g., parallelize() and collect() no longer use closures to pass in or grab data objects); however, a number of methods still use closures to pass in a data object.

AFAIK, the methods doing correct front/backend parameter serialization are:

collect()
take() (maybe)
parallelize()
reduce()

Everything else (fold(), etc.) that communicates data between the front end and the backend still wraps data into closures. For something like fold(), in fact, you'd have to use a type that has both Java and Kryo support at the same time, because it will always use both the closure and object serializers while executing.

This, IMO, is of course inconsistent with the assumption that the same data type should be supported uniformly regardless of where it is serialized, but that's the state of things as they stand.
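
To make the fold() point concrete, here is a hedged toy illustration (not from this thread): the zero value is captured in a closure and goes through spark.closure.serializer (Java by default), while the RDD elements and per-partition results go through spark.serializer. A type used with fold() therefore has to survive both paths; Long below happens to, which is exactly the property a Kryo-only type lacks.

import org.apache.spark.SparkContext

object FoldSketch extends App {
  val sc = new SparkContext("local", "fold-sketch")
  val rdd = sc.parallelize(Seq(1L, 2L, 3L))
  // 0L travels inside the fold closure (Java serialization); the elements
  // and partial sums travel via the object serializer.
  val sum = rdd.fold(0L)(_ + _)
  println(sum) // 6
}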




Re: debugging NotSerializableException while using Kryo

Eugen Cepoi
In Scala, case classes are serializable by default; your TileIdWritable should be a case class. I usually enable Kryo serialization for objects and keep the default serializer for closures, and this works pretty well.
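
A sketch of that suggestion (the Long field is an assumption about what TileIdWritable carries):

// Case classes extend Serializable automatically, so the key survives Java
// closure serialization; registering the class with Kryo still covers data
// serialization. The companion apply() comes for free, so
// awtestRdd.lookup(TileIdWritable(200)) keeps working.
case class TileIdWritable(value: Long)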

Eugen



Re: debugging NotSerializableException while using Kryo

Dmitriy Lyubimov
In reply to this post by Ameet Kini



On Tue, Dec 24, 2013 at 7:29 AM, Ameet Kini <[hidden email]> wrote:

If Java serialization is the only one that properly works for closures, then I shouldn't be setting "spark.closure.serializer" to "org.apache.spark.serializer.KryoSerializer",

My understanding is that it's not that Kryo necessarily wouldn't work for closures; it's that closure serialization is used not only for user-defined closures but also for a lot of closures internal to Spark. For those, of course, there's no way to enable Kryo support.

Data objects (and their serializer), on the other hand, are not defined anywhere but in the user's code. Overriding the object serializer is therefore in general a safe thing to do, while overriding the closure serializer is not.
 
and my only hope for getting lookup (and other such methods that still use closure serializers) to work is to either a) use only Java serialization for my objects, or b) have my objects implement Serializable but enable Kryo as well for object serialization.

or c) for Kryo-only objects, avoid the parts of the Spark API that currently use closures to communicate data objects between the front end and the backend (as I do). The most annoying of those is fold().

In fact, d) you can always wrap a Kryo object into a byte array on the front end, pass the Java-serializable byte array through a closure, and unwrap it on the backend. This technique is extremely ugly, though, with methods like fold(), which force you to use the same object type in the front-end and backend operations (and it may well not be the type you want for the purposes of elegant folding). I had this problem extensively with third-party types for which I have no desire to add Java serialization (mostly Hadoop Writables of various kinds), so the obvious desirable solution is to add Kryo support for them without having to modify the original class. That mostly works for me.
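
A rough sketch of option d, using the Kryo API directly (makeKryo() and the reuse of this thread's registrator are assumptions, not an established helper):

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import java.io.ByteArrayOutputStream

object KryoWrap {
  // Build a Kryo instance; reusing the registrator from this thread is an
  // assumption about how registration is shared between driver and tasks.
  def makeKryo(): Kryo = {
    val kryo = new Kryo()
    new geotrellis.spark.KryoRegistrator().registerClasses(kryo)
    kryo
  }

  // Front end: Kryo-serialize the object into a plain byte array, which is
  // Java-serializable and so can be captured by a closure.
  def wrap(obj: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new Output(bos)
    makeKryo().writeClassAndObject(out, obj)
    out.close()
    bos.toByteArray
  }

  // Backend: rebuild the object inside the task.
  def unwrap[T](bytes: Array[Byte]): T =
    makeKryo().readClassAndObject(new Input(bytes)).asInstanceOf[T]
}

// Front end:     val keyBytes = KryoWrap.wrap(TileIdWritable(200))
// Inside a task: val key = KryoWrap.unwrap[TileIdWritable](keyBytes)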
 
For option b, I'd be setting "spark.serializer" to "org.apache.spark.serializer.KryoSerializer" but leave "spark.closure.serializer" to its default value (Java).
Yes. Like I said, changing the closure serializer is undesirable unless you can guarantee proper serialization support for all of Spark's internal closures as well as your own.
 

If the above is true, then seems like as it stands today, the best practice is for objects that use Kryo to also either implement Serializable or Externalizable for closures to work properly.
 
Again, this is only true for a small portion of the Spark API. Most of the Spark API doesn't have this problem, so you may well get away with either avoiding the problematic methods or pre-serializing objects into byte arrays when you use them.

