Spark Kafka streaming - ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver


Spark Kafka streaming - ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver

gaurav.dasgupta
Hi,

I have written my own custom Spark Streaming code that connects to a Kafka server and fetches data. I have tested the code in local mode and it works fine, but when I run the same code in YARN mode, I get a ClassNotFoundException for the KafkaReceiver class. I am providing the Spark Kafka jar on the classpath and have ensured that the path is correct on all the nodes in my cluster.

I am using the Spark 0.9.1 Hadoop pre-built distribution, deployed on all the nodes of the YARN cluster (10 nodes).
I am using the following command to run my code in YARN mode:

SPARK_YARN_MODE=true SPARK_JAR=assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar SPARK_YARN_APP_JAR=/usr/local/SparkStreamExample.jar java -cp /usr/local/SparkStreamExample.jar:assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar:external/kafka/target/spark-streaming-kafka_2.10-0.9.1.jar:/usr/local/kafka/kafka_2.10-0.8.1.1/libs/*:/usr/lib/hbase/lib/*:/etc/hadoop/conf/:/etc/hbase/conf/ SparkStreamExample yarn-client 10.10.5.32 myFirstGroup testTopic NewTestTable 1

Below is the error message I am getting:

14/06/05 04:29:12 INFO cluster.YarnClientClusterScheduler: Adding task set 2.0 with 1 tasks
14/06/05 04:29:12 INFO scheduler.TaskSetManager: Starting task 2.0:0 as TID 70 on executor 2: manny6.musigma.com (PROCESS_LOCAL)
14/06/05 04:29:12 INFO scheduler.TaskSetManager: Serialized task 2.0:0 as 2971 bytes in 2 ms
14/06/05 04:29:12 WARN scheduler.TaskSetManager: Lost TID 70 (task 2.0:0)
14/06/05 04:29:12 WARN scheduler.TaskSetManager: Loss was due to java.lang.ClassNotFoundException
java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver
    at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:247)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1574)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1495)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1731)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1666)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1322)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:479)
    at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:72)
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
    at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:145)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1791)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)


What might be the problem? Can someone help me solve this issue?

Regards,
Gaurav

Re: Spark Kafka streaming - ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver

Tobias Pfeiffer
Gaurav,

I am not sure that the "*" expands to what you expect it to. Normally, bash expands "*" into a space-separated string, not a colon-separated one. Try specifying all the jars manually, maybe?
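
For what it's worth, one quick way to check what the JVM actually received is to print the classpath it sees at startup, for example with a throwaway class like the sketch below (the class name is just an example):

// Throwaway diagnostic: print every entry of the classpath the JVM received,
// so you can see whether the "*" was expanded the way you intended.
public class PrintClasspath {
    public static void main(String[] args) {
        String classpath = System.getProperty("java.class.path");
        for (String entry : classpath.split(java.io.File.pathSeparator)) {
            System.out.println(entry);
        }
    }
}

Running it with the same -cp value as your real command shows exactly which jars the driver JVM can see; note that it says nothing about what the YARN executors see.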

Tobias


Re: Spark Kafka streaming - ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver

gaurav.dasgupta
Thanks, Tobias, for replying.

The problem was that I had to provide the paths of the dependency jars to the StreamingContext within the code. Providing all the jar paths resolved my problem. Refer to the code snippet below:

// Pass the application jar and every dependency jar to the StreamingContext
// constructor so Spark ships them to the YARN executors.
JavaStreamingContext ssc = new JavaStreamingContext(args[0],
        "SparkStreamExample", new Duration(1000),
        System.getenv("SPARK_HOME"),
        new String[] {
            JavaStreamingContext.jarOfClass(SparkStreamExample.class)[0],
            "/usr/local/spark-0.9.1-bin-hadoop2/external/kafka/target/spark-streaming-kafka_2.10-0.9.1.jar",
            "/usr/lib/hbase/lib/zookeeper.jar",
            "/usr/local/kafka/kafka_2.10-0.8.1.1/libs/zkclient-0.3.jar",
            "/usr/local/kafka/kafka_2.10-0.8.1.1/libs/kafka_2.10-0.8.1.1.jar",
            "/usr/local/scala/lib/scala-library.jar",
            "/usr/local/shark-0.9.1-bin-hadoop2/lib_managed/jars/com.yammer.metrics/metrics-core/metrics-core-2.1.2.jar",
            "/usr/local/hbase.jar"});


The question is: isn't there any other way of doing this? The above approach doesn't seem good to me. For example, what if I run the application on some other cluster where the dependency paths are different? It is also not feasible to parameterize these jar paths as user arguments.
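
One option I can think of is to move the jar locations out of the code into a small properties file deployed next to the application, so that only the file changes per cluster. A rough sketch, where the file name "spark-deps.properties" and its "jars" key are just placeholder names I made up:

// Sketch: load dependency jar paths from an external properties file instead
// of hard-coding them. The file name and the "jars" key are illustrative only.
import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class DependencyJars {
    // Expects a line like: jars=/path/a.jar,/path/b.jar
    public static String[] load(String propertiesPath) throws Exception {
        Properties props = new Properties();
        props.load(new FileInputStream(propertiesPath));
        List<String> jars = new ArrayList<String>();
        for (String path : props.getProperty("jars", "").split(",")) {
            if (!path.trim().isEmpty()) {
                jars.add(path.trim());
            }
        }
        return jars.toArray(new String[0]);
    }
}

The returned array could then replace the hard-coded new String[] {...} in the JavaStreamingContext constructor above (with JavaStreamingContext.jarOfClass(SparkStreamExample.class)[0] prepended), so switching clusters would only mean editing the properties file.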

Any advice will be appreciated.

Regards,
Gaurav
