Spark streaming with Kafka


dwgw
Hi,
I am trying to stream a Kafka topic from the Spark shell, but I am getting the following error.
I am using *Spark 3.0.0 / Scala 2.12.10* (Java HotSpot(TM) 64-Bit Server VM, *Java 1.8.0_212*).

*[spark@hdp-dev ~]$ spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0*
Ivy Default Cache set to: /home/spark/.ivy2/cache
The jars for the packages stored in: /home/spark/.ivy2/jars
:: loading settings :: url = jar:file:/u01/hadoop/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226;1.0
        confs: [default]
        found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
        found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in central
        found org.apache.kafka#kafka-clients;2.4.1 in central
        found com.github.luben#zstd-jni;1.4.4-3 in central
        found org.lz4#lz4-java;1.7.1 in central
        found org.xerial.snappy#snappy-java;1.1.7.5 in central
        found org.slf4j#slf4j-api;1.7.30 in central
        found org.spark-project.spark#unused;1.0.0 in central
        found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 502ms :: artifacts dl 10ms
        :: modules in use:
        com.github.luben#zstd-jni;1.4.4-3 from central in [default]
        org.apache.commons#commons-pool2;2.6.2 from central in [default]
        org.apache.kafka#kafka-clients;2.4.1 from central in [default]
        org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 from central in [default]
        org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 from central in [default]
        org.lz4#lz4-java;1.7.1 from central in [default]
        org.slf4j#slf4j-api;1.7.30 from central in [default]
        org.spark-project.spark#unused;1.0.0 from central in [default]
        org.xerial.snappy#snappy-java;1.1.7.5 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   9   |   0   |   0   |   0   ||   9   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226
        confs: [default]
        0 artifacts copied, 9 already retrieved (0kB/13ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hdp-dev.infodetics.com:4040
Spark context available as 'sc' (master = yarn, app id = application_1593620640299_0015).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0
      /_/
         
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_212)
Type in expressions to have them evaluated.
Type :help for more information.


scala> val df = spark.
     | readStream.
     | format("kafka").
     | option("kafka.bootstrap.servers", "XXX").
     | option("subscribe", "XXX").
     | option("kafka.sasl.mechanisms", "XXX").
     | option("kafka.security.protocol", "XXX").
     | option("kafka.sasl.username","XXX").
     | option("kafka.sasl.password", "XXX").
     | option("startingOffsets", "earliest").
     | load
java.lang.AbstractMethodError: Method org/apache/spark/sql/kafka010/KafkaSourceProvider.inferSchema(Lorg/apache/spark/sql/util/CaseInsensitiveStringMap;)Lorg/apache/spark/sql/types/StructType; is abstract
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.inferSchema(KafkaSourceProvider.scala)
  at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:81)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:215)
  ... 57 elided
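Separately from the AbstractMethodError, the SASL options in the snippet above use librdkafka-style names (`sasl.mechanisms`, `sasl.username`, `sasl.password`). Spark hands every `kafka.`-prefixed option, minus the prefix, to the Java Kafka client, which as far as I know expects `sasl.mechanism` (singular) and takes credentials from `sasl.jaas.config`; unknown keys are only logged as warnings, so the mistake would otherwise surface later as an authentication failure. A minimal sketch, assuming SASL/PLAIN over SASL_SSL; the helper `kafkaSaslPlainOptions` and all argument values are hypothetical:

```scala
// Hypothetical helper (not part of Spark): reader options for SASL/PLAIN.
// Spark passes every "kafka."-prefixed option, minus the prefix, to the
// Java Kafka consumer; the other keys are Spark source options.
object KafkaSaslOptions {
  def kafkaSaslPlainOptions(
      bootstrapServers: String,
      topic: String,
      username: String,
      password: String,
      securityProtocol: String = "SASL_SSL" // assumption: TLS-protected SASL
  ): Map[String, String] = {
    // The Java client reads credentials from a JAAS entry rather than from
    // separate username/password keys. Quotes in credentials are not escaped here.
    val jaasConfig =
      "org.apache.kafka.common.security.plain.PlainLoginModule required " +
        s"""username="$username" password="$password";"""
    Map(
      "kafka.bootstrap.servers" -> bootstrapServers,
      "subscribe" -> topic,
      "kafka.security.protocol" -> securityProtocol,
      "kafka.sasl.mechanism" -> "PLAIN", // singular; librdkafka spells it "sasl.mechanisms"
      "kafka.sasl.jaas.config" -> jaasConfig,
      "startingOffsets" -> "earliest"
    )
  }
}
```

In spark-shell the resulting map could be passed in one call, e.g. `spark.readStream.format("kafka").options(KafkaSaslOptions.kafkaSaslPlainOptions(...)).load()`. For SCRAM mechanisms the login module and mechanism name would differ.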

Looking forward to a response.




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]


Re: Spark streaming with Kafka

Jungtaek Lim-2
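I can't reproduce this. Could you please make sure you're running spark-shell from the official Spark 3.0.0 distribution? Please try changing into that distribution's directory and invoking it with a relative path like "./spark-shell", so that a different spark-shell on your PATH is not picked up instead.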
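One way to act on this suggestion from inside spark-shell is to ask the JVM which JAR a class was actually loaded from. An `AbstractMethodError` is thrown when a class was compiled against a different definition of an interface or abstract class than the one present at runtime, so the Kafka connector and Spark's own SQL classes should come from matching installations. A minimal sketch using only standard JDK reflection; the helper name `jarOf` is hypothetical:

```scala
// Hypothetical helper: returns the location (normally a JAR path) a class was
// loaded from, or None if the class cannot be found or was loaded by the JDK's
// bootstrap class loader, whose classes have no CodeSource.
object ClassLocation {
  def jarOf(className: String): Option[String] = {
    val loader = Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    try {
      // initialize = false: locate the class without running its static initializers
      val cls = Class.forName(className, false, loader)
      Option(cls.getProtectionDomain.getCodeSource)
        .flatMap(cs => Option(cs.getLocation))
        .map(_.toString)
    } catch {
      case _: ClassNotFoundException | _: LinkageError => None
    }
  }
}
```

In the shell, comparing `ClassLocation.jarOf("org.apache.spark.sql.kafka010.KafkaSourceProvider")` with `ClassLocation.jarOf("org.apache.spark.sql.SparkSession")` would show whether the connector and Spark SQL come from the expected, version-consistent JARs.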

(In reply to dwgw's message of Thu, Jul 2, 2020 at 9:59 PM.)


Re: Spark streaming with Kafka

dwgw
Hi,

I was able to fix the issue. It was caused by the wrong version of a JAR file I had been using. I removed those JAR files, copied in the correct versions, and the error has gone away.
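Since the root cause was a JAR version mismatch, a quick sanity check is to compare the connector's Maven coordinate with the running Spark and Scala versions. A minimal sketch; the helper `connectorMatches` is hypothetical, and in spark-shell the two version strings could come from `org.apache.spark.SPARK_VERSION` and `scala.util.Properties.versionNumberString`:

```scala
// Hypothetical helper: checks that a connector coordinate such as
// "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0" matches the running
// Spark version and the Scala binary version (e.g. "2.12" from "2.12.10").
object ConnectorVersionCheck {
  def connectorMatches(coordinate: String, sparkVersion: String, scalaVersion: String): Boolean =
    coordinate.split(':') match {
      case Array(_, artifact, version) =>
        val scalaBinary = scalaVersion.split('.').take(2).mkString(".")
        artifact.endsWith("_" + scalaBinary) && version == sparkVersion
      case _ => false // not a group:artifact:version coordinate
    }
}
```

The version comparison is deliberately strict; vendor builds that append a suffix to the Spark version would need a looser check.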

Regards


