Spark Structured Streaming with PySpark throwing error in execution


Mich Talebzadeh
Hi,

I am trying to make PySpark (run from PyCharm) work with Structured Streaming.

spark-3.0.1-bin-hadoop3.2
kafka_2.12-1.1.0

Basic code

from __future__ import print_function
import sys

from src.config import config
from sparkutils import sparkstuff as s


class MDStreaming:
    def __init__(self, spark_session, spark_context):
        self.spark = spark_session
        self.sc = spark_context
        self.config = config

    def startStreaming(self):
        self.sc.setLogLevel("ERROR")
        try:
            # NB: the zookeeper.*, rebalance.* and auto.commit.* entries are legacy
            # consumer settings; the Structured Streaming Kafka source manages its
            # own offsets and ignores options it does not recognise.
            kafkaReaderWithHeaders = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
                .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
                .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
                .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
                .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
                .option("subscribe", config['MDVariables']['topic']) \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "earliest") \
                .load()
        except Exception as e:
            print(f"{e}, quitting")
            sys.exit(1)

        # Print the schema before starting the query; awaitTermination() blocks,
        # so anything placed after it would never run.
        kafkaReaderWithHeaders.printSchema()

        kafkaReaderWithHeaders.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers") \
            .writeStream \
            .format("console") \
            .option("truncate", "false") \
            .start() \
            .awaitTermination()


if __name__ == "__main__":
    appName = config['common']['appName']
    spark_session = s.spark_session(appName)
    spark_context = s.sparkcontext()
    mdstreaming = MDStreaming(spark_session, spark_context)
    mdstreaming.startStreaming()

I have placed the following jars in $SPARK_HOME/jars:

spark-sql-kafka-0-10_2.12-3.0.1.jar
kafka-clients-2.7.0.jar
spark-streaming-kafka-0-10_2.12-3.0.1.jar
spark-token-provider-kafka-0-10_2.12-3.0.1.jar

and have also set the following in $SPARK_HOME/conf/spark-defaults.conf:

spark.driver.extraClassPath        $SPARK_HOME/jars/*.jar
spark.executor.extraClassPath      $SPARK_HOME/jars/*.jar


The error is this:

2021-02-22 16:40:38,886 ERROR executor.Executor: Exception in task 3.0 in stage 0.0 (TID 3)
java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer$
at org.apache.spark.sql.kafka010.KafkaBatchPartitionReader.<init>(KafkaBatchPartitionReader.scala:52)
at org.apache.spark.sql.kafka010.KafkaBatchReaderFactory$.createReader(KafkaBatchPartitionReader.scala:40)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

pyspark.sql.utils.StreamingQueryException: Writing job aborted.
=== Streaming Query ===
Identifier: [id = 0706dcd1-01de-4d7f-a362-81257b45e38c, runId = d61d9807-6f6c-4de1-a60f-8ae31c8a3c36]
Current Committed Offsets: {}
Current Available Offsets: {KafkaV2[Subscribe[md]]: {"md":{"8":1905351,"2":1907338,"5":1905175,"4":1904978,"7":1907880,"1":1903797,"3":1906072,"6":1904936,"0":1903896}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
WriteToMicroBatchDataSource ConsoleWriter[numRows=20, truncate=false]
+- Project [cast(key#8 as string) AS key#24, cast(value#9 as string) AS value#25, headers#15]
   +- StreamingDataSourceV2Relation [key#8, value#9, topic#10, partition#11, offset#12L, timestamp#13, timestampType#14, headers#15], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@1cf1e26d, KafkaV2[Subscribe[md]]

Process finished with exit code 1

The thing is that the class is present in one of the jar files under $SPARK_HOME/jars:


find $SPARK_HOME/jars/  -name "*.jar" | xargs grep org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer


Binary file jars/spark-sql-kafka-0-10_2.12-3.0.1.jar matches


Appreciate any feedback.


Thanks


Mich

 



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 



Re: Spark Structured Streaming with PySpark throwing error in execution

mmuru
You should include commons-pool2-2.9.0.jar and remove spark-streaming-kafka-0-10_2.12-3.0.1.jar (that is the DStream connector and is not needed for Structured Streaming).
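
The "Could not initialize class" form of NoClassDefFoundError typically means the class itself was found but its static initialization failed, usually because one of its own dependencies (here, commons-pool2) is missing from the classpath. As a minimal sketch of an alternative (assuming the machine can reach Maven Central; the app name below is a placeholder), Spark can also resolve the connector and its transitive dependencies, commons-pool2 included, at session start via spark.jars.packages rather than by copying jars by hand:

from pyspark.sql import SparkSession

# Illustrative sketch only: spark.jars.packages asks Spark to resolve
# spark-sql-kafka-0-10 and its transitive dependencies (kafka-clients,
# commons-pool2, the token provider) from Maven, so nothing needs to be
# copied into $SPARK_HOME/jars by hand. The app name is a placeholder.
spark = (SparkSession.builder
         .appName("md_streaming")
         .config("spark.jars.packages",
                 "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1")
         .getOrCreate())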


Re: Spark Structured Streaming with PySpark throwing error in execution

Mich Talebzadeh
Many thanks Muru. That was a great help!

+------------------------------------+---------------------------------------------------------------------------------------------------------------------+-------+
|key                                 |value                                                                                                                |headers|
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+-------+
|b8f3bffd-42f6-4bb4-80fa-eafb6e1dd9a1|{"rowkey":"b8f3bffd-42f6-4bb4-80fa-eafb6e1dd9a1","ticker":"SBRY", "timeissued":"2021-02-20T19:10:18", "price":374.6} |null   |
|d38c7771-9d1b-4cf1-94cf-97c8d4b7fd5e|{"rowkey":"d38c7771-9d1b-4cf1-94cf-97c8d4b7fd5e","ticker":"ORCL", "timeissued":"2021-02-20T19:10:22", "price":19.24} |null   |
|1870f59a-2ef5-469d-a3e1-f756ab4de90c|{"rowkey":"1870f59a-2ef5-469d-a3e1-f756ab4de90c","ticker":"MRW", "timeissued":"2021-02-20T19:10:25", "price":263.05} |null   |
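
As a follow-on sketch (the schema is only my guess from the sample rows above; timeissued is left as a string), the JSON in the value column could be unpacked with from_json, reusing the kafkaReaderWithHeaders dataframe from the code above, rather than left as a raw string:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

# Assumed schema, read off the sample payload above; the field types are guesses.
md_schema = StructType([
    StructField("rowkey", StringType()),
    StructField("ticker", StringType()),
    StructField("timeissued", StringType()),
    StructField("price", DoubleType()),
])

# Parse the JSON value into columns; timeissued could then be cast to a timestamp.
parsed = kafkaReaderWithHeaders \
    .select(from_json(col("value").cast("string"), md_schema).alias("md")) \
    .select("md.*")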








Re: Spark Structured Streaming with PySpark throwing error in execution

Mich Talebzadeh
Hi all,

Following the upgrade to Spark 3.1.1, I see a couple of issues.

Spark Structured Streaming (SSS) does not seem to work with the newer spark-sql-kafka-0-10_2.12-3.1.1.jar. It throws:


java.lang.NoSuchMethodError: org.apache.spark.kafka010.KafkaTokenUtil$.needTokenUpdate(Ljava/util/Map;Lscala/Option;)Z


So I have to use the previous jar file, spark-sql-kafka-0-10_2.12-3.0.1.jar (a NoSuchMethodError like this generally points at mismatched jar versions, e.g. the 3.1.1 connector running against a 3.0.1 spark-token-provider-kafka jar).

However, we can set that aside for now.

The second point is that, with the following jars under $SPARK_HOME/jars:

  1. spark-sql-kafka-0-10_2.12-3.0.1.jar
  2. commons-pool2-2.9.0.jar
  3. kafka-clients-2.7.0.jar

the SSS job runs in local mode as a single JVM.

In YARN mode, however, it fails with the following error:

java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer

Other executors running on the same node fail with the same error as well.

I have ensured that those jar files are available on all three nodes of the (on-prem) cluster, but still no luck.
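
Another angle worth trying (a rough sketch only; the jar paths are placeholders) is to ship the jars with the application via spark.jars, so that YARN distributes them to every executor container itself instead of relying on what happens to be installed on each node:

from pyspark.sql import SparkSession

# Rough sketch only: spark.jars ships the listed jars with the application,
# so YARN copies them into each executor container. The paths are placeholders.
spark = (SparkSession.builder
         .appName("md_streaming")
         .master("yarn")
         .config("spark.jars", ",".join([
             "/opt/jars/spark-sql-kafka-0-10_2.12-3.0.1.jar",
             "/opt/jars/kafka-clients-2.7.0.jar",
             "/opt/jars/commons-pool2-2.9.0.jar",
         ]))
         .getOrCreate())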

Any ideas appreciated.

Thanks





