spark streaming actor receiver doesn't play well with kryoserializer

spark streaming actor receiver doesn't play well with kryoserializer

Alan Ngai
It looks like when you configure SparkConf to use the KryoSerializer in combination with an ActorReceiver, bad things happen.  I modified the ActorWordCount example program from

    val sparkConf = new SparkConf().setAppName("ActorWordCount")

to

    val sparkConf = new SparkConf()
      .setAppName("ActorWordCount")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer”)

and I get the stack trace below.  I figured it might be that Kryo doesn't know how to serialize/deserialize the actor, so I added a Kryo registrator.  I also added a default empty constructor to SampleActorReceiver just for kicks:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class SerializationRegistry extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[SampleActorReceiver[_]])
  }
}


import scala.reflect.ClassTag

import akka.actor.Actor
import org.apache.spark.streaming.receiver.ActorHelper

case class SampleActorReceiver[T: ClassTag](var urlOfPublisher: String)
extends Actor with ActorHelper {
  def this() = this("")
  ...
}

...
    val sparkConf = new SparkConf()
      .setAppName("ActorWordCount")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "org.apache.spark.examples.streaming.SerializationRegistry")


None of this worked; same stack trace.  Any idea what's going on?  Is this a known issue, and is there a workaround?

14/07/24 02:58:19 [ERROR] OneForOneStrategy: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
 akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
at akka.actor.ActorCell.create(ActorCell.scala:578)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
at akka.dispatch.Mailbox.run(Mailbox.scala:218)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.ConfigurationException: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
at akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
at org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
at akka.actor.Props.newActor(Props.scala:339)
at akka.actor.ActorCell.newActor(ActorCell.scala:534)
at akka.actor.ActorCell.create(ActorCell.scala:560)
... 9 more
Caused by: java.lang.IllegalArgumentException: constructor public akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with arguments [class java.lang.Class, class org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
at akka.util.Reflect$.instantiate(Reflect.scala:69)
at akka.actor.Props.cachedActorClass(Props.scala:203)
at akka.actor.Props.actorClass(Props.scala:327)
at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
... 20 more
Caused by: java.lang.IllegalArgumentException: wrong number of arguments
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
at akka.util.Reflect$.instantiate(Reflect.scala:65)
... 24 more

Re: spark streaming actor receiver doesn't play well with kryoserializer

Alan Ngai
bump.  any ideas?

Re: spark streaming actor receiver doesn't play well with kryoserializer

Tathagata Das
Is this error on the executor or on the driver? Can you provide a larger snippet of the logs, driver as well as, if possible, executor logs?

TD

Re: spark streaming actor receiver doesn't play well with kryoserializer

Alan Ngai
The stack trace was from running the ActorWordCount sample directly, without a Spark cluster, so I guess the logs would be from both?  I enabled more logging and got the trace below:

14/07/25 17:55:26 [INFO] SecurityManager: Changing view acls to: alan
 14/07/25 17:55:26 [INFO] SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(alan)
 14/07/25 17:55:26 [DEBUG] AkkaUtils: In createActorSystem, requireCookie is: off
 14/07/25 17:55:26 [INFO] Slf4jLogger: Slf4jLogger started
 14/07/25 17:55:27 [INFO] Remoting: Starting remoting
 14/07/25 17:55:27 [INFO] Remoting: Remoting started; listening on addresses :[akka.tcp://spark@leungshwingchun:52156]
 14/07/25 17:55:27 [INFO] Remoting: Remoting now listens on addresses: [akka.tcp://spark@leungshwingchun:52156]
 14/07/25 17:55:27 [INFO] SparkEnv: Registering MapOutputTracker
 14/07/25 17:55:27 [INFO] SparkEnv: Registering BlockManagerMaster
 14/07/25 17:55:27 [DEBUG] DiskBlockManager: Creating local directories at root dirs '/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/'
 14/07/25 17:55:27 [INFO] DiskBlockManager: Created local directory at /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-local-20140725175527-32f2
 14/07/25 17:55:27 [INFO] MemoryStore: MemoryStore started with capacity 297.0 MB.
 14/07/25 17:55:27 [INFO] ConnectionManager: Bound socket to port 52157 with id = ConnectionManagerId(leungshwingchun,52157)
 14/07/25 17:55:27 [INFO] BlockManagerMaster: Trying to register BlockManager
 14/07/25 17:55:27 [INFO] BlockManagerInfo: Registering block manager leungshwingchun:52157 with 297.0 MB RAM
 14/07/25 17:55:27 [INFO] BlockManagerMaster: Registered BlockManager
 14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
 14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
 14/07/25 17:55:27 [INFO] HttpBroadcast: Broadcast server started at http://192.168.1.233:52158
 14/07/25 17:55:27 [INFO] HttpFileServer: HTTP File server directory is /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-5254ba11-037b-4761-b92a-3b18d42762de
 14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
 14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
 14/07/25 17:55:27 [DEBUG] HttpFileServer: HTTP file server started at: http://192.168.1.233:52159
 14/07/25 17:55:27 [INFO] SparkUI: Started SparkUI at http://leungshwingchun:4040
 14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess with annotation @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=, value=[Rate of successful kerberos logins and latency (milliseconds)], always=false, type=DEFAULT, sampleName=Ops)
 14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure with annotation @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=, value=[Rate of failed kerberos logins and latency (milliseconds)], always=false, type=DEFAULT, sampleName=Ops)
 14/07/25 17:55:27 [DEBUG] MetricsSystemImpl: UgiMetrics, User and group related metrics
 2014-07-25 17:55:27.796 java[79107:1703] Unable to load realm info from SCDynamicStore
14/07/25 17:55:27 [DEBUG] KerberosName: Kerberos krb5 configuration not found, setting default realm to empty
 14/07/25 17:55:27 [DEBUG] Groups:  Creating new Groups object
 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Trying to load the custom-built native-hadoop library...
 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Failed to load native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: java.library.path=
 14/07/25 17:55:27 [WARN] NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Falling back to shell based
 14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping
 14/07/25 17:55:27 [DEBUG] Groups: Group mapping impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback; cacheTimeout=300000
 14/07/25 17:55:28 [INFO] SparkContext: Added JAR file:/Users/alan/dev/spark-dev/examples/target/scala-2.10/spark-examples-1.0.1-hadoop2.2.0.jar at http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar with timestamp 1406336128212
 14/07/25 17:55:28 [DEBUG] JobScheduler: Starting JobScheduler
 14/07/25 17:55:28 [INFO] ReceiverTracker: ReceiverTracker started
 14/07/25 17:55:28 [INFO] ForEachDStream: metadataCleanupDelay = -1
 14/07/25 17:55:28 [INFO] ShuffledDStream: metadataCleanupDelay = -1
 14/07/25 17:55:28 [INFO] MappedDStream: metadataCleanupDelay = -1
 14/07/25 17:55:28 [INFO] FlatMappedDStream: metadataCleanupDelay = -1
 14/07/25 17:55:28 [INFO] PluggableInputDStream: metadataCleanupDelay = -1
 14/07/25 17:55:28 [INFO] PluggableInputDStream: Slide time = 2000 ms
 14/07/25 17:55:28 [INFO] PluggableInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
 14/07/25 17:55:28 [INFO] PluggableInputDStream: Checkpoint interval = null
 14/07/25 17:55:28 [INFO] PluggableInputDStream: Remember duration = 2000 ms
 14/07/25 17:55:28 [INFO] PluggableInputDStream: Initialized and validated org.apache.spark.streaming.dstream.PluggableInputDStream@487c05c8
 14/07/25 17:55:28 [INFO] FlatMappedDStream: Slide time = 2000 ms
 14/07/25 17:55:28 [INFO] FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
 14/07/25 17:55:28 [INFO] FlatMappedDStream: Checkpoint interval = null
 14/07/25 17:55:28 [INFO] FlatMappedDStream: Remember duration = 2000 ms
 14/07/25 17:55:28 [INFO] FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@21ea2c7
 14/07/25 17:55:28 [INFO] MappedDStream: Slide time = 2000 ms
 14/07/25 17:55:28 [INFO] MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
 14/07/25 17:55:28 [INFO] MappedDStream: Checkpoint interval = null
 14/07/25 17:55:28 [INFO] MappedDStream: Remember duration = 2000 ms
 14/07/25 17:55:28 [INFO] MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@1bc31f72
 14/07/25 17:55:28 [INFO] ShuffledDStream: Slide time = 2000 ms
 14/07/25 17:55:28 [INFO] ShuffledDStream: Storage level = StorageLevel(false, false, false, false, 1)
 14/07/25 17:55:28 [INFO] ShuffledDStream: Checkpoint interval = null
 14/07/25 17:55:28 [INFO] ShuffledDStream: Remember duration = 2000 ms
 14/07/25 17:55:28 [INFO] ShuffledDStream: Initialized and validated org.apache.spark.streaming.dstream.ShuffledDStream@1e66fc2d
 14/07/25 17:55:28 [INFO] ForEachDStream: Slide time = 2000 ms
 14/07/25 17:55:28 [INFO] ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
 14/07/25 17:55:28 [INFO] ForEachDStream: Checkpoint interval = null
 14/07/25 17:55:28 [INFO] ForEachDStream: Remember duration = 2000 ms
 14/07/25 17:55:28 [INFO] ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@15b35a10
 14/07/25 17:55:28 [INFO] ReceiverTracker: Starting 1 receivers
 14/07/25 17:55:28 [INFO] SparkContext: Starting job: runJob at ReceiverTracker.scala:275
 14/07/25 17:55:28 [INFO] DAGScheduler: Got job 0 (runJob at ReceiverTracker.scala:275) with 1 output partitions (allowLocal=false)
 14/07/25 17:55:28 [INFO] DAGScheduler: Final stage: Stage 0(runJob at ReceiverTracker.scala:275)
 14/07/25 17:55:28 [INFO] DAGScheduler: Parents of final stage: List()
 14/07/25 17:55:28 [INFO] DAGScheduler: Missing parents: List()
 14/07/25 17:55:28 [DEBUG] DAGScheduler: submitStage(Stage 0)
 14/07/25 17:55:28 [DEBUG] DAGScheduler: missing: List()
 14/07/25 17:55:28 [INFO] DAGScheduler: Submitting Stage 0 (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253), which has no missing parents
 14/07/25 17:55:28 [DEBUG] DAGScheduler: submitMissingTasks(Stage 0)
 14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for JobGenerator at time 1406336130000
 14/07/25 17:55:28 [INFO] JobGenerator: Started JobGenerator at 1406336130000 ms
 14/07/25 17:55:28 [INFO] JobScheduler: Started JobScheduler
 14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator: org.apache.spark.examples.streaming.SerializationRegistry
 14/07/25 17:55:28 [INFO] DAGScheduler: Submitting 1 missing tasks from Stage 0 (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253)
 14/07/25 17:55:28 [DEBUG] DAGScheduler: New pending tasks: Set(ResultTask(0, 0))
 14/07/25 17:55:28 [INFO] TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
 14/07/25 17:55:28 [DEBUG] TaskSetManager: Epoch for TaskSet 0.0: 0
 14/07/25 17:55:28 [DEBUG] TaskSetManager: Valid locality levels for TaskSet 0.0: ANY
 14/07/25 17:55:28 [DEBUG] TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 0
 14/07/25 17:55:28 [INFO] TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
 14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator: org.apache.spark.examples.streaming.SerializationRegistry
 14/07/25 17:55:28 [INFO] TaskSetManager: Serialized task 0.0:0 as 1750 bytes in 8 ms
 14/07/25 17:55:28 [INFO] Executor: Running task ID 0
 14/07/25 17:55:28 [INFO] Executor: Fetching http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar with timestamp 1406336128212
 14/07/25 17:55:28 [INFO] Utils: Fetching http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar to /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/fetchFileTemp1222383218969222619.tmp
 14/07/25 17:55:28 [DEBUG] Utils: fetchFile not using security
 14/07/25 17:55:28 [DEBUG] Shell: Failed to detect a valid hadoop home directory
 java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:225)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:250)
at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:867)
at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:421)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:328)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:328)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
14/07/25 17:55:28 [DEBUG] Shell: setsid is not available on this machine. So not using it.
 14/07/25 17:55:28 [DEBUG] Shell: setsid exited with exit code 0
 14/07/25 17:55:28 [INFO] Executor: Adding file:/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-7cbbc9fb-1ed4-467e-b8ed-96824ab2e824/spark-examples-1.0.1-hadoop2.2.0.jar to class loader
 14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator: org.apache.spark.examples.streaming.SerializationRegistry
 14/07/25 17:55:28 [DEBUG] Executor: Task 0's epoch is 0
 14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Registered receiver 0
 14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for BlockGenerator at time 1406336129000
 14/07/25 17:55:28 [INFO] BlockGenerator: Started BlockGenerator
 14/07/25 17:55:28 [INFO] BlockGenerator: Started block pushing thread
 14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Starting receiver
 14/07/25 17:55:28 [INFO] ActorReceiver: Supervision tree for receivers initialized at:akka://spark/user/Supervisor0
 14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Called receiver onStart
 creating actor of worker now!!!!!!!!!!!!!!! akka.actor.ActorCell@26ec5d79, class akka.actor.ActorCell
14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream 0 from akka://spark
 14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream 0 from akka://spark
 14/07/25 17:55:28 [ERROR] OneForOneStrategy: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
 akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
at akka.actor.ActorCell.create(ActorCell.scala:578)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
at akka.dispatch.Mailbox.run(Mailbox.scala:218)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.ConfigurationException: configuration problem while creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
at akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
at org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
at akka.actor.Props.newActor(Props.scala:339)
at akka.actor.ActorCell.newActor(ActorCell.scala:534)
at akka.actor.ActorCell.create(ActorCell.scala:560)
... 9 more
Caused by: java.lang.IllegalArgumentException: constructor public akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with arguments [class java.lang.Class, class org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
at akka.util.Reflect$.instantiate(Reflect.scala:69)
at akka.actor.Props.cachedActorClass(Props.scala:203)
at akka.actor.Props.actorClass(Props.scala:327)
at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
... 20 more
Caused by: java.lang.IllegalArgumentException: wrong number of arguments
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
at akka.util.Reflect$.instantiate(Reflect.scala:65)
... 24 more
14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129000
 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129200
 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129400
 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129600
 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129800
 14/07/25 17:55:30 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336130000

Re: spark streaming actor receiver doesn't play well with kryoserializer

Prashant Sharma
This looks like a bug to me. It happens because we serialize the code that starts the receiver and send it across, and since the Akka library's classes have not been registered with Kryo, it does not work. I have not tried it myself, but including something like chill-akka (https://github.com/xitrum-framework/chill-akka) might help. I am not well versed in how Kryo works internally; maybe someone else can throw some light on this.
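
Something along these lines might be a starting point (untested sketch; ActorRefSerializer and its ExtendedActorSystem constructor are assumed from Twitter's chill-akka module, and how to obtain the actor system inside a Spark registrator is an open question):

import akka.actor.{ActorRef, ExtendedActorSystem}
import com.esotericsoftware.kryo.Kryo
import com.twitter.chill.akka.ActorRefSerializer
import org.apache.spark.serializer.KryoRegistrator

class AkkaAwareRegistry extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // chill-akka writes an ActorRef as its serialized actor path, which
    // needs a live ExtendedActorSystem to resolve on the receiving side.
    // Spark's KryoRegistrator API gives us no handle to one, so it would
    // have to come from somewhere in the application.
    val system: ExtendedActorSystem = ???
    kryo.register(classOf[ActorRef], new ActorRefSerializer(system))
  }
}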

Prashant Sharma




On Sat, Jul 26, 2014 at 6:26 AM, Alan Ngai <[hidden email]> wrote:
The stack trace was from running the Actor count sample directly, without a spark cluster, so I guess the logs would be from both?  I enabled more logging and got this stack trace

4/07/25 17:55:26 [INFO] SecurityManager: Changing view acls to: alan
 14/07/25 17:55:26 [INFO] SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(alan)
 14/07/25 17:55:26 [DEBUG] AkkaUtils: In createActorSystem, requireCookie is: off
 14/07/25 17:55:26 [INFO] Slf4jLogger: Slf4jLogger started
 14/07/25 17:55:27 [INFO] Remoting: Starting remoting
 14/07/25 17:55:27 [INFO] Remoting: Remoting started; listening on addresses :[akka.tcp://spark@leungshwingchun:52156]
 14/07/25 17:55:27 [INFO] Remoting: Remoting now listens on addresses: [akka.tcp://spark@leungshwingchun:52156]
 14/07/25 17:55:27 [INFO] SparkEnv: Registering MapOutputTracker
 14/07/25 17:55:27 [INFO] SparkEnv: Registering BlockManagerMaster
 14/07/25 17:55:27 [DEBUG] DiskBlockManager: Creating local directories at root dirs '/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/'
 14/07/25 17:55:27 [INFO] DiskBlockManager: Created local directory at /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-local-20140725175527-32f2
 14/07/25 17:55:27 [INFO] MemoryStore: MemoryStore started with capacity 297.0 MB.
 14/07/25 17:55:27 [INFO] ConnectionManager: Bound socket to port 52157 with id = ConnectionManagerId(leungshwingchun,52157)
 14/07/25 17:55:27 [INFO] BlockManagerMaster: Trying to register BlockManager
 14/07/25 17:55:27 [INFO] BlockManagerInfo: Registering block manager leungshwingchun:52157 with 297.0 MB RAM
 14/07/25 17:55:27 [INFO] BlockManagerMaster: Registered BlockManager
 14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
 14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
 14/07/25 17:55:27 [INFO] HttpBroadcast: Broadcast server started at http://192.168.1.233:52158
 14/07/25 17:55:27 [INFO] HttpFileServer: HTTP File server directory is /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-5254ba11-037b-4761-b92a-3b18d42762de
 14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
 14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
 14/07/25 17:55:27 [DEBUG] HttpFileServer: HTTP file server started at: http://192.168.1.233:52159
 14/07/25 17:55:27 [INFO] SparkUI: Started SparkUI at http://leungshwingchun:4040
 14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess with annotation @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=, value=[Rate of successful kerberos logins and latency (milliseconds)], always=false, type=DEFAULT, sampleName=Ops)
 14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure with annotation @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=, value=[Rate of failed kerberos logins and latency (milliseconds)], always=false, type=DEFAULT, sampleName=Ops)
 14/07/25 17:55:27 [DEBUG] MetricsSystemImpl: UgiMetrics, User and group related metrics
 2014-07-25 17:55:27.796 java[79107:1703] Unable to load realm info from SCDynamicStore
14/07/25 17:55:27 [DEBUG] KerberosName: Kerberos krb5 configuration not found, setting default realm to empty
 14/07/25 17:55:27 [DEBUG] Groups:  Creating new Groups object
 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Trying to load the custom-built native-hadoop library...
 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Failed to load native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: java.library.path=
 14/07/25 17:55:27 [WARN] NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Falling back to shell based
 14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping
 14/07/25 17:55:27 [DEBUG] Groups: Group mapping impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback; cacheTimeout=300000
 14/07/25 17:55:28 [INFO] SparkContext: Added JAR file:/Users/alan/dev/spark-dev/examples/target/scala-2.10/spark-examples-1.0.1-hadoop2.2.0.jar at http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar with timestamp 1406336128212
 14/07/25 17:55:28 [DEBUG] JobScheduler: Starting JobScheduler
 14/07/25 17:55:28 [INFO] ReceiverTracker: ReceiverTracker started
 14/07/25 17:55:28 [INFO] ForEachDStream: metadataCleanupDelay = -1
 14/07/25 17:55:28 [INFO] ShuffledDStream: metadataCleanupDelay = -1
 14/07/25 17:55:28 [INFO] MappedDStream: metadataCleanupDelay = -1
 14/07/25 17:55:28 [INFO] FlatMappedDStream: metadataCleanupDelay = -1
 14/07/25 17:55:28 [INFO] PluggableInputDStream: metadataCleanupDelay = -1
 14/07/25 17:55:28 [INFO] PluggableInputDStream: Slide time = 2000 ms
 14/07/25 17:55:28 [INFO] PluggableInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
 14/07/25 17:55:28 [INFO] PluggableInputDStream: Checkpoint interval = null
 14/07/25 17:55:28 [INFO] PluggableInputDStream: Remember duration = 2000 ms
 14/07/25 17:55:28 [INFO] PluggableInputDStream: Initialized and validated org.apache.spark.streaming.dstream.PluggableInputDStream@487c05c8
 14/07/25 17:55:28 [INFO] FlatMappedDStream: Slide time = 2000 ms
 14/07/25 17:55:28 [INFO] FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
 14/07/25 17:55:28 [INFO] FlatMappedDStream: Checkpoint interval = null
 14/07/25 17:55:28 [INFO] FlatMappedDStream: Remember duration = 2000 ms
 14/07/25 17:55:28 [INFO] FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@21ea2c7
 14/07/25 17:55:28 [INFO] MappedDStream: Slide time = 2000 ms
 14/07/25 17:55:28 [INFO] MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
 14/07/25 17:55:28 [INFO] MappedDStream: Checkpoint interval = null
 14/07/25 17:55:28 [INFO] MappedDStream: Remember duration = 2000 ms
 14/07/25 17:55:28 [INFO] MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@1bc31f72
 14/07/25 17:55:28 [INFO] ShuffledDStream: Slide time = 2000 ms
 14/07/25 17:55:28 [INFO] ShuffledDStream: Storage level = StorageLevel(false, false, false, false, 1)
 14/07/25 17:55:28 [INFO] ShuffledDStream: Checkpoint interval = null
 14/07/25 17:55:28 [INFO] ShuffledDStream: Remember duration = 2000 ms
 14/07/25 17:55:28 [INFO] ShuffledDStream: Initialized and validated org.apache.spark.streaming.dstream.ShuffledDStream@1e66fc2d
 14/07/25 17:55:28 [INFO] ForEachDStream: Slide time = 2000 ms
 14/07/25 17:55:28 [INFO] ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
 14/07/25 17:55:28 [INFO] ForEachDStream: Checkpoint interval = null
 14/07/25 17:55:28 [INFO] ForEachDStream: Remember duration = 2000 ms
 14/07/25 17:55:28 [INFO] ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@15b35a10
 14/07/25 17:55:28 [INFO] ReceiverTracker: Starting 1 receivers
 14/07/25 17:55:28 [INFO] SparkContext: Starting job: runJob at ReceiverTracker.scala:275
 14/07/25 17:55:28 [INFO] DAGScheduler: Got job 0 (runJob at ReceiverTracker.scala:275) with 1 output partitions (allowLocal=false)
 14/07/25 17:55:28 [INFO] DAGScheduler: Final stage: Stage 0(runJob at ReceiverTracker.scala:275)
 14/07/25 17:55:28 [INFO] DAGScheduler: Parents of final stage: List()
 14/07/25 17:55:28 [INFO] DAGScheduler: Missing parents: List()
 14/07/25 17:55:28 [DEBUG] DAGScheduler: submitStage(Stage 0)
 14/07/25 17:55:28 [DEBUG] DAGScheduler: missing: List()
 14/07/25 17:55:28 [INFO] DAGScheduler: Submitting Stage 0 (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253), which has no missing parents
 14/07/25 17:55:28 [DEBUG] DAGScheduler: submitMissingTasks(Stage 0)
 14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for JobGenerator at time 1406336130000
 14/07/25 17:55:28 [INFO] JobGenerator: Started JobGenerator at 1406336130000 ms
 14/07/25 17:55:28 [INFO] JobScheduler: Started JobScheduler
 14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator: org.apache.spark.examples.streaming.SerializationRegistry
 14/07/25 17:55:28 [INFO] DAGScheduler: Submitting 1 missing tasks from Stage 0 (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253)
 14/07/25 17:55:28 [DEBUG] DAGScheduler: New pending tasks: Set(ResultTask(0, 0))
 14/07/25 17:55:28 [INFO] TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
 14/07/25 17:55:28 [DEBUG] TaskSetManager: Epoch for TaskSet 0.0: 0
 14/07/25 17:55:28 [DEBUG] TaskSetManager: Valid locality levels for TaskSet 0.0: ANY
 14/07/25 17:55:28 [DEBUG] TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 0
 14/07/25 17:55:28 [INFO] TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
 14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator: org.apache.spark.examples.streaming.SerializationRegistry
 14/07/25 17:55:28 [INFO] TaskSetManager: Serialized task 0.0:0 as 1750 bytes in 8 ms
 14/07/25 17:55:28 [INFO] Executor: Running task ID 0
 14/07/25 17:55:28 [INFO] Executor: Fetching http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar with timestamp 1406336128212
 14/07/25 17:55:28 [INFO] Utils: Fetching http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar to /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/fetchFileTemp1222383218969222619.tmp
 14/07/25 17:55:28 [DEBUG] Utils: fetchFile not using security
 14/07/25 17:55:28 [DEBUG] Shell: Failed to detect a valid hadoop home directory
 java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:225)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:250)
at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:867)
at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:421)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:328)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:328)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
14/07/25 17:55:28 [DEBUG] Shell: setsid is not available on this machine. So not using it.
 14/07/25 17:55:28 [DEBUG] Shell: setsid exited with exit code 0
 14/07/25 17:55:28 [INFO] Executor: Adding file:/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-7cbbc9fb-1ed4-467e-b8ed-96824ab2e824/spark-examples-1.0.1-hadoop2.2.0.jar to class loader
 14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator: org.apache.spark.examples.streaming.SerializationRegistry
 14/07/25 17:55:28 [DEBUG] Executor: Task 0's epoch is 0
 14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Registered receiver 0
 14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for BlockGenerator at time 1406336129000
 14/07/25 17:55:28 [INFO] BlockGenerator: Started BlockGenerator
 14/07/25 17:55:28 [INFO] BlockGenerator: Started block pushing thread
 14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Starting receiver
 14/07/25 17:55:28 [INFO] ActorReceiver: Supervision tree for receivers initialized at:akka://spark/user/Supervisor0
 14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Called receiver onStart
 creating actor of worker now!!!!!!!!!!!!!!! akka.actor.ActorCell@26ec5d79, class akka.actor.ActorCell
14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream 0 from akka://spark
 14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream 0 from akka://spark
 akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
at akka.actor.ActorCell.create(ActorCell.scala:578)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
at akka.dispatch.Mailbox.run(Mailbox.scala:218)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
at akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
at org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
at akka.actor.Props.newActor(Props.scala:339)
at akka.actor.ActorCell.newActor(ActorCell.scala:534)
at akka.actor.ActorCell.create(ActorCell.scala:560)
... 9 more
Caused by: java.lang.IllegalArgumentException: constructor public akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with arguments [class java.lang.Class, class org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
at akka.util.Reflect$.instantiate(Reflect.scala:69)
at akka.actor.Props.cachedActorClass(Props.scala:203)
at akka.actor.Props.actorClass(Props.scala:327)
at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
... 20 more
Caused by: java.lang.IllegalArgumentException: wrong number of arguments
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
at akka.util.Reflect$.instantiate(Reflect.scala:65)
... 24 more
14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129000
 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129200
 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129400
 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129600
 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129800
 14/07/25 17:55:30 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336130000

On Jul 25, 2014, at 3:20 PM, Tathagata Das <[hidden email]> wrote:

Is this error on the executor or on the driver? Can you provide a larger snippet of the logs, driver as well as if possible executor logs. 

TD


On Thu, Jul 24, 2014 at 10:28 PM, Alan Ngai <[hidden email]> wrote:
bump.  any ideas?

On Jul 24, 2014, at 3:09 AM, Alan Ngai <[hidden email]> wrote:

it looks like when you configure sparkconfig to use the kryoserializer in combination of using an ActorReceiver, bad things happen.  I modified the ActorWordCount example program from 

    val sparkConf = new SparkConf().setAppName("ActorWordCount")

to

    val sparkConf = new SparkConf()
      .setAppName("ActorWordCount")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer”)

and I get the stack trace below.  I figured it might be that Kryo doesn’t know how to serialize/deserialize the actor so I added a registry.  I also added a default empty constructor to SampleActorReceiver just for kicks

class SerializationRegistry extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[SampleActorReceiver])
  }
}


case class SampleActorReceiver[T: ClassTag](var urlOfPublisher: String)
extends Actor with ActorHelper {
  def this() = this(“”)
  ...
}

...
    val sparkConf = new SparkConf()
      .setAppName("ActorWordCount")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "org.apache.spark.examples.streaming.SerializationRegistry")


None of this worked, same stack trace.  Any idea what’s going on?  Is this a known issue and is there a workaround?  

 akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
at akka.actor.ActorCell.create(ActorCell.scala:578)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
at akka.dispatch.Mailbox.run(Mailbox.scala:218)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
at akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
at org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
at akka.actor.Props.newActor(Props.scala:339)
at akka.actor.ActorCell.newActor(ActorCell.scala:534)
at akka.actor.ActorCell.create(ActorCell.scala:560)
... 9 more
Caused by: java.lang.IllegalArgumentException: constructor public akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with arguments [class java.lang.Class, class org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
at akka.util.Reflect$.instantiate(Reflect.scala:69)
at akka.actor.Props.cachedActorClass(Props.scala:203)
at akka.actor.Props.actorClass(Props.scala:327)
at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
... 20 more
Caused by: java.lang.IllegalArgumentException: wrong number of arguments
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
at akka.util.Reflect$.instantiate(Reflect.scala:65)
... 24 more






Re: spark streaming actor receiver doesn't play well with kryoserializer

Rohit Rai
In reply to this post by Alan Ngai
Alan/TD,

We are facing this problem in a project that is going to production.

Was there any progress on this? Can we confirm that this is a bug/limitation in the current streaming code, or is there anything wrong in user scope?

Regards,
Rohit

Founder & CEO, Tuplejump, Inc.
____________________________
The Data Engineering Platform


On Sat, Jul 26, 2014 at 6:26 AM, Alan Ngai <[hidden email]> wrote:
The stack trace was from running the ActorWordCount sample directly, without a Spark cluster, so I guess the logs would be from both?  I enabled more logging and got the output below.
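
(A minimal Scala sketch of one way to force this level of logging; whether the logging was actually enabled this way is an assumption, and editing conf/log4j.properties would achieve the same result:)

    // Force DEBUG on the root log4j logger before creating the SparkContext.
    import org.apache.log4j.{Level, Logger}
    Logger.getRootLogger.setLevel(Level.DEBUG)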

14/07/25 17:55:26 [INFO] SecurityManager: Changing view acls to: alan
 14/07/25 17:55:26 [INFO] SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(alan)
 14/07/25 17:55:26 [DEBUG] AkkaUtils: In createActorSystem, requireCookie is: off
 14/07/25 17:55:26 [INFO] Slf4jLogger: Slf4jLogger started
 14/07/25 17:55:27 [INFO] Remoting: Starting remoting
 14/07/25 17:55:27 [INFO] Remoting: Remoting started; listening on addresses :[akka.tcp://spark@leungshwingchun:52156]
 14/07/25 17:55:27 [INFO] Remoting: Remoting now listens on addresses: [akka.tcp://spark@leungshwingchun:52156]
 14/07/25 17:55:27 [INFO] SparkEnv: Registering MapOutputTracker
 14/07/25 17:55:27 [INFO] SparkEnv: Registering BlockManagerMaster
 14/07/25 17:55:27 [DEBUG] DiskBlockManager: Creating local directories at root dirs '/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/'
 14/07/25 17:55:27 [INFO] DiskBlockManager: Created local directory at /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-local-20140725175527-32f2
 14/07/25 17:55:27 [INFO] MemoryStore: MemoryStore started with capacity 297.0 MB.
 14/07/25 17:55:27 [INFO] ConnectionManager: Bound socket to port 52157 with id = ConnectionManagerId(leungshwingchun,52157)
 14/07/25 17:55:27 [INFO] BlockManagerMaster: Trying to register BlockManager
 14/07/25 17:55:27 [INFO] BlockManagerInfo: Registering block manager leungshwingchun:52157 with 297.0 MB RAM
 14/07/25 17:55:27 [INFO] BlockManagerMaster: Registered BlockManager
 14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
 14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
 14/07/25 17:55:27 [INFO] HttpBroadcast: Broadcast server started at http://192.168.1.233:52158
 14/07/25 17:55:27 [INFO] HttpFileServer: HTTP File server directory is /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-5254ba11-037b-4761-b92a-3b18d42762de
 14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
 14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
 14/07/25 17:55:27 [DEBUG] HttpFileServer: HTTP file server started at: http://192.168.1.233:52159
 14/07/25 17:55:27 [INFO] SparkUI: Started SparkUI at http://leungshwingchun:4040
 14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess with annotation @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=, value=[Rate of successful kerberos logins and latency (milliseconds)], always=false, type=DEFAULT, sampleName=Ops)
 14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure with annotation @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=, value=[Rate of failed kerberos logins and latency (milliseconds)], always=false, type=DEFAULT, sampleName=Ops)
 14/07/25 17:55:27 [DEBUG] MetricsSystemImpl: UgiMetrics, User and group related metrics
 2014-07-25 17:55:27.796 java[79107:1703] Unable to load realm info from SCDynamicStore
14/07/25 17:55:27 [DEBUG] KerberosName: Kerberos krb5 configuration not found, setting default realm to empty
 14/07/25 17:55:27 [DEBUG] Groups:  Creating new Groups object
 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Trying to load the custom-built native-hadoop library...
 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Failed to load native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: java.library.path=
 14/07/25 17:55:27 [WARN] NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Falling back to shell based
 14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping
 14/07/25 17:55:27 [DEBUG] Groups: Group mapping impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback; cacheTimeout=300000
 14/07/25 17:55:28 [INFO] SparkContext: Added JAR file:/Users/alan/dev/spark-dev/examples/target/scala-2.10/spark-examples-1.0.1-hadoop2.2.0.jar at http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar with timestamp 1406336128212
 14/07/25 17:55:28 [DEBUG] JobScheduler: Starting JobScheduler
 14/07/25 17:55:28 [INFO] ReceiverTracker: ReceiverTracker started
 14/07/25 17:55:28 [INFO] ForEachDStream: metadataCleanupDelay = -1
 14/07/25 17:55:28 [INFO] ShuffledDStream: metadataCleanupDelay = -1
 14/07/25 17:55:28 [INFO] MappedDStream: metadataCleanupDelay = -1
 14/07/25 17:55:28 [INFO] FlatMappedDStream: metadataCleanupDelay = -1
 14/07/25 17:55:28 [INFO] PluggableInputDStream: metadataCleanupDelay = -1
 14/07/25 17:55:28 [INFO] PluggableInputDStream: Slide time = 2000 ms
 14/07/25 17:55:28 [INFO] PluggableInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
 14/07/25 17:55:28 [INFO] PluggableInputDStream: Checkpoint interval = null
 14/07/25 17:55:28 [INFO] PluggableInputDStream: Remember duration = 2000 ms
 14/07/25 17:55:28 [INFO] PluggableInputDStream: Initialized and validated org.apache.spark.streaming.dstream.PluggableInputDStream@487c05c8
 14/07/25 17:55:28 [INFO] FlatMappedDStream: Slide time = 2000 ms
 14/07/25 17:55:28 [INFO] FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
 14/07/25 17:55:28 [INFO] FlatMappedDStream: Checkpoint interval = null
 14/07/25 17:55:28 [INFO] FlatMappedDStream: Remember duration = 2000 ms
 14/07/25 17:55:28 [INFO] FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@21ea2c7
 14/07/25 17:55:28 [INFO] MappedDStream: Slide time = 2000 ms
 14/07/25 17:55:28 [INFO] MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
 14/07/25 17:55:28 [INFO] MappedDStream: Checkpoint interval = null
 14/07/25 17:55:28 [INFO] MappedDStream: Remember duration = 2000 ms
 14/07/25 17:55:28 [INFO] MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@1bc31f72
 14/07/25 17:55:28 [INFO] ShuffledDStream: Slide time = 2000 ms
 14/07/25 17:55:28 [INFO] ShuffledDStream: Storage level = StorageLevel(false, false, false, false, 1)
 14/07/25 17:55:28 [INFO] ShuffledDStream: Checkpoint interval = null
 14/07/25 17:55:28 [INFO] ShuffledDStream: Remember duration = 2000 ms
 14/07/25 17:55:28 [INFO] ShuffledDStream: Initialized and validated org.apache.spark.streaming.dstream.ShuffledDStream@1e66fc2d
 14/07/25 17:55:28 [INFO] ForEachDStream: Slide time = 2000 ms
 14/07/25 17:55:28 [INFO] ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
 14/07/25 17:55:28 [INFO] ForEachDStream: Checkpoint interval = null
 14/07/25 17:55:28 [INFO] ForEachDStream: Remember duration = 2000 ms
 14/07/25 17:55:28 [INFO] ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@15b35a10
 14/07/25 17:55:28 [INFO] ReceiverTracker: Starting 1 receivers
 14/07/25 17:55:28 [INFO] SparkContext: Starting job: runJob at ReceiverTracker.scala:275
 14/07/25 17:55:28 [INFO] DAGScheduler: Got job 0 (runJob at ReceiverTracker.scala:275) with 1 output partitions (allowLocal=false)
 14/07/25 17:55:28 [INFO] DAGScheduler: Final stage: Stage 0(runJob at ReceiverTracker.scala:275)
 14/07/25 17:55:28 [INFO] DAGScheduler: Parents of final stage: List()
 14/07/25 17:55:28 [INFO] DAGScheduler: Missing parents: List()
 14/07/25 17:55:28 [DEBUG] DAGScheduler: submitStage(Stage 0)
 14/07/25 17:55:28 [DEBUG] DAGScheduler: missing: List()
 14/07/25 17:55:28 [INFO] DAGScheduler: Submitting Stage 0 (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253), which has no missing parents
 14/07/25 17:55:28 [DEBUG] DAGScheduler: submitMissingTasks(Stage 0)
 14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for JobGenerator at time 1406336130000
 14/07/25 17:55:28 [INFO] JobGenerator: Started JobGenerator at 1406336130000 ms
 14/07/25 17:55:28 [INFO] JobScheduler: Started JobScheduler
 14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator: org.apache.spark.examples.streaming.SerializationRegistry
 14/07/25 17:55:28 [INFO] DAGScheduler: Submitting 1 missing tasks from Stage 0 (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253)
 14/07/25 17:55:28 [DEBUG] DAGScheduler: New pending tasks: Set(ResultTask(0, 0))
 14/07/25 17:55:28 [INFO] TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
 14/07/25 17:55:28 [DEBUG] TaskSetManager: Epoch for TaskSet 0.0: 0
 14/07/25 17:55:28 [DEBUG] TaskSetManager: Valid locality levels for TaskSet 0.0: ANY
 14/07/25 17:55:28 [DEBUG] TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 0
 14/07/25 17:55:28 [INFO] TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
 14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator: org.apache.spark.examples.streaming.SerializationRegistry
 14/07/25 17:55:28 [INFO] TaskSetManager: Serialized task 0.0:0 as 1750 bytes in 8 ms
 14/07/25 17:55:28 [INFO] Executor: Running task ID 0
 14/07/25 17:55:28 [INFO] Executor: Fetching http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar with timestamp 1406336128212
 14/07/25 17:55:28 [INFO] Utils: Fetching http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar to /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/fetchFileTemp1222383218969222619.tmp
 14/07/25 17:55:28 [DEBUG] Utils: fetchFile not using security
 14/07/25 17:55:28 [DEBUG] Shell: Failed to detect a valid hadoop home directory
 java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:225)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:250)
at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:867)
at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:421)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:328)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:328)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
14/07/25 17:55:28 [DEBUG] Shell: setsid is not available on this machine. So not using it.
 14/07/25 17:55:28 [DEBUG] Shell: setsid exited with exit code 0
 14/07/25 17:55:28 [INFO] Executor: Adding file:/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-7cbbc9fb-1ed4-467e-b8ed-96824ab2e824/spark-examples-1.0.1-hadoop2.2.0.jar to class loader
 14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator: org.apache.spark.examples.streaming.SerializationRegistry
 14/07/25 17:55:28 [DEBUG] Executor: Task 0's epoch is 0
 14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Registered receiver 0
 14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for BlockGenerator at time 1406336129000
 14/07/25 17:55:28 [INFO] BlockGenerator: Started BlockGenerator
 14/07/25 17:55:28 [INFO] BlockGenerator: Started block pushing thread
 14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Starting receiver
 14/07/25 17:55:28 [INFO] ActorReceiver: Supervision tree for receivers initialized at:akka://spark/user/Supervisor0
 14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Called receiver onStart
 creating actor of worker now!!!!!!!!!!!!!!! akka.actor.ActorCell@26ec5d79, class akka.actor.ActorCell
14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream 0 from akka://spark
 14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream 0 from akka://spark
 akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
at akka.actor.ActorCell.create(ActorCell.scala:578)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
at akka.dispatch.Mailbox.run(Mailbox.scala:218)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
at akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
at org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
at org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
at akka.actor.Props.newActor(Props.scala:339)
at akka.actor.ActorCell.newActor(ActorCell.scala:534)
at akka.actor.ActorCell.create(ActorCell.scala:560)
... 9 more
Caused by: java.lang.IllegalArgumentException: constructor public akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with arguments [class java.lang.Class, class org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
at akka.util.Reflect$.instantiate(Reflect.scala:69)
at akka.actor.Props.cachedActorClass(Props.scala:203)
at akka.actor.Props.actorClass(Props.scala:327)
at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
... 20 more
Caused by: java.lang.IllegalArgumentException: wrong number of arguments
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
at akka.util.Reflect$.instantiate(Reflect.scala:65)
... 24 more
14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129000
 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129200
 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129400
 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129600
 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336129800
 14/07/25 17:55:30 [DEBUG] RecurringTimer: Callback for BlockGenerator called at time 1406336130000

On Jul 25, 2014, at 3:20 PM, Tathagata Das <[hidden email]> wrote:

Is this error on the executor or on the driver? Can you provide a larger snippet of the logs, driver as well as if possible executor logs. 

TD


On Thu, Jul 24, 2014 at 10:28 PM, Alan Ngai <[hidden email]> wrote:
bump.  any ideas?



Re: spark streaming actor receiver doesn't play well with kryoserializer

Tathagata Das
Another possible reason behind this may be that there are two versions of Akka present on the classpath, interfering with each other. This could happen in several scenarios:

1. Launching the Spark application with the Scala runner brings in the Akka bundled with the Scala distribution, which interferes with Spark's Akka.
2. Multiple Akka versions arriving through transitive dependencies.
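
For scenario 2, here is a quick way to check which jar Akka is actually loaded from, plus a build-level exclusion (a sketch; "com.example" %% "some-lib" is a hypothetical placeholder for whatever dependency drags in a second Akka):

    // Runtime check: print the jar that provides akka.actor.Actor.
    println(classOf[akka.actor.Actor].getProtectionDomain.getCodeSource.getLocation)

    // build.sbt fragment: exclude the transitive akka-actor so only
    // Spark's own Akka remains on the classpath.
    libraryDependencies += ("com.example" %% "some-lib" % "1.0")
      .exclude("com.typesafe.akka", "akka-actor")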

TD




Re: spark streaming actor receiver doesn't play well with kryoserializer

originalsosa
In reply to this post by Rohit Rai
I'm having the same issue even without KryoSerializer. I'm using CDH 5.5.2, which runs Spark 1.5.0 and Akka 2.2.3. Have you resolved the issue?