Hi,
I am trying to get PySpark working with Structured Streaming from PyCharm, using:
spark-3.0.1-bin-hadoop3.2
kafka_2.12-1.1.0
Basic code
from __future__ import print_function
import sys

from src.config import config, hive_url
from sparkutils import sparkstuff as s


class MDStreaming:
    def __init__(self, spark_session, spark_context):
        self.spark = spark_session
        self.sc = spark_context      # keep a handle on the SparkContext; it is used as self.sc below
        self.config = config

    def startStreaming(self):
        self.sc.setLogLevel("ERROR")
        try:
            kafkaReaderWithHeaders = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
                .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("subscribe", config['MDVariables']['topic']) \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "earliest") \
                .load()
        except Exception as e:
            print(f"{e}, quitting")
            sys.exit(1)

        # print the schema before starting the query; awaitTermination() blocks,
        # so anything placed after it would never run
        kafkaReaderWithHeaders.printSchema()

        kafkaReaderWithHeaders.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers") \
            .writeStream \
            .format("console") \
            .option("truncate", "false") \
            .start() \
            .awaitTermination()


if __name__ == "__main__":
    appName = config['common']['appName']
    spark_session = s.spark_session(appName)
    spark_context = s.sparkcontext()
    mdstreaming = MDStreaming(spark_session, spark_context)
    mdstreaming.startStreaming()
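The sparkutils.sparkstuff helper module is not shown; for context, a minimal stand-in (hypothetical, just enough to satisfy the calls above) could look like this:

# sparkstuff.py -- hypothetical stand-in for the helper module imported above
from pyspark.sql import SparkSession

def spark_session(app_name):
    # Build (or reuse) a SparkSession with the given application name
    return SparkSession.builder.appName(app_name).getOrCreate()

def sparkcontext():
    # Return the SparkContext belonging to the active session
    return SparkSession.builder.getOrCreate().sparkContext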
I have put the following jars in $SPARK_HOME/jars:
spark-sql-kafka-0-10_2.12-3.0.1.jar
kafka-clients-2.7.0.jar
spark-streaming-kafka-0-10_2.12-3.0.1.jar
spark-token-provider-kafka-0-10_2.12-3.0.1.jar
and also in $SPARK_HOME/conf/spark-defaults.conf
spark.driver.extraClassPath $SPARK_HOME/jars/*.jar
spark.executor.extraClassPath $SPARK_HOME/jars/*.jar
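As a quick sanity check (a sketch, not from the original setup, assuming a SparkSession called spark is already running), one can confirm from PySpark that these settings were actually picked up:

# Print the classpath settings the running session actually sees; if these come back
# as "not set", spark-defaults.conf was not read (e.g. wrong SPARK_HOME/SPARK_CONF_DIR).
conf = spark.sparkContext.getConf()
print(conf.get("spark.driver.extraClassPath", "not set"))
print(conf.get("spark.executor.extraClassPath", "not set"))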
The error is this:
2021-02-22 16:40:38,886 ERROR executor.Executor: Exception in task 3.0 in stage 0.0 (TID 3)
java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer$
	at org.apache.spark.sql.kafka010.KafkaBatchPartitionReader.<init>(KafkaBatchPartitionReader.scala:52)
	at org.apache.spark.sql.kafka010.KafkaBatchReaderFactory$.createReader(KafkaBatchPartitionReader.scala:40)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
pyspark.sql.utils.StreamingQueryException: Writing job aborted.
=== Streaming Query ===
Identifier: [id = 0706dcd1-01de-4d7f-a362-81257b45e38c, runId = d61d9807-6f6c-4de1-a60f-8ae31c8a3c36]
Current Committed Offsets: {}
Current Available Offsets: {KafkaV2[Subscribe[md]]: {"md":{"8":1905351,"2":1907338,"5":1905175,"4":1904978,"7":1907880,"1":1903797,"3":1906072,"6":1904936,"0":1903896}}}
Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
WriteToMicroBatchDataSource ConsoleWriter[numRows=20, truncate=false]
+- Project [cast(key#8 as string) AS key#24, cast(value#9 as string) AS value#25, headers#15]
   +- StreamingDataSourceV2Relation [key#8, value#9, topic#10, partition#11, offset#12L, timestamp#13, timestampType#14, headers#15], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@1cf1e26d, KafkaV2[Subscribe[md]]
Process finished with exit code 1
The thing is that the class is in the following jar under $SPARK_HOME/jars:
find $SPARK_HOME/jars/ -name "*.jar" | xargs grep org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
Binary file jars/spark-sql-kafka-0-10_2.12-3.0.1.jar matches
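An alternative to grepping binary jars, if it helps (a small sketch, not part of the original post, assuming SPARK_HOME is exported), is to list the jar entries directly and print which jars actually contain the compiled class:

import glob
import os
import zipfile

# Look for the compiled class (and its Scala companion object, KafkaDataConsumer$)
# inside every jar under $SPARK_HOME/jars.
entry_prefix = "org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer"

for jar in sorted(glob.glob(os.path.join(os.environ["SPARK_HOME"], "jars", "*.jar"))):
    with zipfile.ZipFile(jar) as zf:
        if any(name.startswith(entry_prefix) for name in zf.namelist()):
            print(jar)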
Appreciate any feedback.
Thanks
Mich
Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction
of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from such
loss, damage or destruction.
You should include commons-pool2-2.9.0.jar and remove spark-streaming-kafka-0-10_2.12-3.0.1.jar (unnecessary jar).
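That would explain the error: "Could not initialize class" usually means the class itself was found (hence the grep match above) but its static initialisation failed, most likely because a class it depends on, here Apache Commons Pool 2 which the Kafka consumer pooling uses, is missing from the classpath. If copying jars by hand becomes fiddly, another option (a minimal sketch, assuming the machine can reach Maven Central and that these coordinates match Spark 3.0.1 / Scala 2.12) is to let Spark resolve the connector and its transitive dependencies:

from pyspark.sql import SparkSession

# Sketch only: spark.jars.packages makes Spark pull spark-sql-kafka and its
# transitive dependencies (kafka-clients, spark-token-provider, commons-pool2)
# from Maven, so nothing has to be copied into $SPARK_HOME/jars by hand.
# It must be set before the SparkSession (and hence the JVM) is created.
spark = (SparkSession.builder
         .appName("MDStreaming")
         .config("spark.jars.packages",
                 "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1")
         .getOrCreate())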
Many thanks Muru. That was a great help!
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+-------+
|key                                 |value                                                                                                                |headers|
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+-------+
|b8f3bffd-42f6-4bb4-80fa-eafb6e1dd9a1|{"rowkey":"b8f3bffd-42f6-4bb4-80fa-eafb6e1dd9a1","ticker":"SBRY", "timeissued":"2021-02-20T19:10:18", "price":374.6} |null   |
|d38c7771-9d1b-4cf1-94cf-97c8d4b7fd5e|{"rowkey":"d38c7771-9d1b-4cf1-94cf-97c8d4b7fd5e","ticker":"ORCL", "timeissued":"2021-02-20T19:10:22", "price":19.24} |null   |
|1870f59a-2ef5-469d-a3e1-f756ab4de90c|{"rowkey":"1870f59a-2ef5-469d-a3e1-f756ab4de90c","ticker":"MRW", "timeissued":"2021-02-20T19:10:25", "price":263.05} |null   |
+------------------------------------+---------------------------------------------------------------------------------------------------------------------+-------+
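As a possible next step (a sketch, assuming the JSON layout visible in the value column above), the payload can be parsed into proper columns instead of being carried around as a string:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, FloatType

# Schema read off the console output above (rowkey, ticker, timeissued, price)
value_schema = StructType([
    StructField("rowkey", StringType()),
    StructField("ticker", StringType()),
    StructField("timeissued", StringType()),
    StructField("price", FloatType()),
])

parsed = (kafkaReaderWithHeaders
          .select(from_json(col("value").cast("string"), value_schema).alias("v"), "headers")
          .select("v.*", "headers"))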