PySpark 2.2.0, Kafka 0.10 DataFrames

PySpark 2.2.0, Kafka 0.10 DataFrames

salemi
Hi All,

We are trying to use the DataFrame approach with Kafka 0.10 and PySpark 2.2.0.
We followed the instructions on the wiki
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html.
We coded something similar to the code below using Python:
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()

But we are getting the exception below. Does PySpark 2.2.0 support
DataFrames with Kafka 0.10? If yes, what could be the root cause of the
exception below?

Thank you,
Ali

Exception:
py4j.protocol.Py4JJavaError: An error occurred while calling o31.load.
: java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:549)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:86)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:86)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:195)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
        at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$21$$anonfun$apply$12.apply(DataSource.scala:533)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$21$$anonfun$apply$12.apply(DataSource.scala:533)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$21.apply(DataSource.scala:533)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$21.apply(DataSource.scala:533)
        at scala.util.Try.orElse(Try.scala:84)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:533)
        ... 18 more
 



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]


Re: PySpark 2.2.0, Kafka 0.10 DataFrames

Holden Karau
What command did you use to launch your Spark application? The deployment section of the documentation (https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#deploying) suggests using spark-submit with the `--packages` flag to include the required Kafka package, e.g.:

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 ...
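
For anyone scripting the launch, here is a minimal sketch of how the `--packages` value in the command above is put together. The helper names (`kafka_sql_package`, `spark_submit_command`) and the `app.py` file name are made up for illustration; the defaults assume Spark 2.2.0 built against Scala 2.11, as in this thread, and should be adjusted to match your cluster.

```python
import shlex


def kafka_sql_package(spark_version="2.2.0", scala_version="2.11"):
    """Build the Maven coordinate groupId:artifactId_scalaVersion:sparkVersion.

    The defaults are assumptions taken from this thread (Spark 2.2.0, Scala 2.11).
    """
    return "org.apache.spark:spark-sql-kafka-0-10_{}:{}".format(
        scala_version, spark_version
    )


def spark_submit_command(app, spark_home=".", spark_version="2.2.0",
                         scala_version="2.11"):
    """Return the spark-submit argument list for a (hypothetical) application file."""
    return [
        spark_home + "/bin/spark-submit",
        "--packages", kafka_sql_package(spark_version, scala_version),
        app,
    ]


if __name__ == "__main__":
    # Print a shell-safe version of the command instead of running it.
    cmd = spark_submit_command("app.py")
    print(" ".join(shlex.quote(part) for part in cmd))
```

The Scala suffix and the version in the coordinate should match the Spark build you are actually running; a mismatch there is a separate source of class-loading errors.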



On Mon, Nov 20, 2017 at 3:07 PM, salemi <[hidden email]> wrote:
> [original post trimmed]


Re: PySpark 2.2.0, Kafka 0.10 DataFrames

Shixiong(Ryan) Zhu
In reply to this post by salemi
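You are using the Spark Streaming Kafka package (spark-streaming-kafka-0-10), which is the DStream integration and does not provide the "kafka" data source for DataFrames. The correct package name is "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0"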

On Mon, Nov 20, 2017 at 4:15 PM, salemi <[hidden email]> wrote:
Yes, we are using --packages

$SPARK_HOME/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.2.0 --py-files shell.py
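
To make the package mix-up easier to catch, here is a rough sketch of a check on the `--packages` value, followed by a batch read like the one in the original post. The function names (`provides_kafka_source`, `read_topic`) are hypothetical. Passing the coordinate through the `spark.jars.packages` configuration is an assumption that only applies when the script starts the JVM itself (for example, run with plain `python`); under spark-submit, pass `--packages` on the command line instead.

```python
SQL_KAFKA_ARTIFACT = "spark-sql-kafka-0-10"


def provides_kafka_source(packages):
    """Return True if a comma-separated --packages value lists spark-sql-kafka-0-10."""
    for coordinate in packages.split(","):
        parts = coordinate.strip().split(":")
        # A coordinate is groupId:artifactId:version, and the artifactId
        # carries a _<scala version> suffix that is stripped before comparing.
        if len(parts) == 3 and parts[1].rsplit("_", 1)[0] == SQL_KAFKA_ARTIFACT:
            return True
    return False


def read_topic(packages, servers, topic):
    """Batch-read a Kafka topic into a DataFrame (sketch, needs a reachable broker)."""
    if not provides_kafka_source(packages):
        raise ValueError("no spark-sql-kafka-0-10 package in: " + packages)
    # Imported lazily so the check above can be used without Spark installed.
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("kafka-dataframe-sketch")
        # Assumption: no JVM is running yet, so this setting is still honoured.
        .config("spark.jars.packages", packages)
        .getOrCreate()
    )
    return (
        spark.read.format("kafka")
        .option("kafka.bootstrap.servers", servers)
        .option("subscribe", topic)
        .load()
    )
```

With the coordinate from the command above, `provides_kafka_source` returns False, which matches the "Failed to find data source: kafka" error; with the spark-sql-kafka-0-10 coordinate it returns True.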


