Structured Streaming Spark 3.0.1

10 messages
Structured Streaming Spark 3.0.1

gshen
Hi all:

I am having a strange issue incorporating `groupBy` statements into a
structured streaming job when writing to Kafka or Delta. Strangely, it
only works if I write to console or to memory...

I'm running Spark 3.0.1 with the following dependencies:

io.delta:delta-core_2.12:0.7.0
org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1
org.apache.hadoop:hadoop-azure:3.2.1

Here's an example of the PySpark job I've been testing with:

kafka = spark.readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers","kafka-broker:29092")\
        .option("subscribe","test")\
        .option("startingOffsets", "earliest")\
        .option("maxOffsetsPerTrigger", 5) \
        .load()

rawDF = kafka.selectExpr("CAST(value AS STRING)")

groupDF = rawDF.groupBy("value").agg(count(lit(1)).alias("count"))

kafka_stream_output = groupDF.selectExpr("to_json(struct(*)) AS value")

kafka_stream_output \
    .writeStream \
    .format("kafka") \
    .outputMode("update") \
    .option("kafka.bootstrap.servers", "kafka-broker:29092") \
    .option("topic", "sink") \
    .option("checkpointLocation", checkpoint_location) \
    .start()

If I don't have a groupBy/aggregation, it streams out to Kafka
perfectly fine; but when the aggregation is included, it writes a couple of
messages to the sink and then throws an opaque error:

Caused by: org.apache.spark.util.TaskCompletionListenerException: Self-suppression not permitted
        at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:145)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124)
        at org.apache.spark.scheduler.Task.run(Task.scala:143)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

org.apache.spark.scheduler.TaskSetManager: Task 1 in stage 1.0 failed 4 times; aborting job
"[org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec] Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@7337dcc is aborting."


org.apache.spark.SparkException: Writing job aborted.
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:413)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:361)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:322)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:329)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3625)
        at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2938)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
        at org.apache.spark.sql.Dataset.collect(Dataset.scala:2938)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:576)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:571)

I have tried with and without a watermark, and various output modes;
all seem to result in the same error.
Any help would be greatly appreciated!




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]


Re: Structured Streaming Spark 3.0.1

gshen
This SO post is pretty much the exact same issue:

https://stackoverflow.com/questions/59962680/spark-structured-streaming-error-while-sending-aggregated-result-to-kafka-topic

The user mentions it's an issue with
org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4





Re: Structured Streaming Spark 3.0.1

German Schiavon Matteo
Hi, 

I couldn't reproduce this error :/ I wonder if there is something else underlying that's causing it...

Input
➜  kafka_2.12-2.5.0 ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test1
{"name": "pedro", "age": 50}
>{"name": "pedro", "age": 50}
>{"name": "pedro", "age": 50}
>{"name": "pedro", "age": 50}

Output
➜  kafka_2.12-2.5.0 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sink
{"value":"{\"name\": \"pedro\", \"age\": 50}","count":1}
{"value":"{\"name\": \"pedro\", \"age\": 50}","count":2}
{"value":"{\"name\": \"pedro\", \"age\": 50}","count":3}
{"value":"{\"name\": \"pedro\", \"age\": 50}","count":4}


val rawDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test1")
  .load
  .selectExpr("CAST(value AS STRING)")

val groupDF = rawDF.groupBy("value").agg(count(lit(1)).alias("count"))
val kafka_stream_output = groupDF.selectExpr("to_json(struct(*)) AS value")

kafka_stream_output
  .writeStream
  .format("kafka")
  .outputMode("update")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "sink")
  .option("checkpointLocation", "/tmp/check")
  .start()

spark.streams.awaitAnyTermination()

On Wed, 20 Jan 2021 at 23:22, gshen <[hidden email]> wrote:


Re: Structured Streaming Spark 3.0.1

Jungtaek Lim-2
I quickly looked into the log attached to the SO post, and the problem doesn't seem to be related to Kafka. The error stack trace is from checkpointing to GCS, and the OutputStream implementation for GCS seems to be provided by Google.

Could you please share the full stack trace, or upload the log with secure texts redacted?

On Thu, Jan 21, 2021 at 2:38 PM German Schiavon <[hidden email]> wrote:


Re: Structured Streaming Spark 3.0.1

Gabor Somogyi
I've double-checked this and came to the same conclusion as Jungtaek.
I've added a comment to the Stack Overflow post to reach more people with the answer.

G


On Thu, Jan 21, 2021 at 6:53 AM Jungtaek Lim <[hidden email]> wrote:


Re: Structured Streaming Spark 3.0.1

Gabor Somogyi
If you know exactly which version of the Google connector is used, then its source can be checked to see what really happened.

The linked code is from master, but it just doesn't fit...

G


On Thu, Jan 21, 2021 at 9:18 AM Gabor Somogyi <[hidden email]> wrote:


Re: Structured Streaming Spark 3.0.1

gshen
In reply to this post by Jungtaek Lim-2
I am now testing streaming into a Delta table. Interestingly, I have
gotten it working in a community version of Databricks, which leads me
to think there might be something to do with my dependencies. I am
checkpointing to ADLS Gen2 and adding the following dependencies:

delta-core_2.12-0.7.0.jar
hadoop-azure-3.2.1.jar
hadoop-azure-datalake-3.2.1.jar
wildfly-openssl-java-1.1.3.Final.jar
spark-sql-kafka-0-10_2.12-3.0.1.jar
spark-streaming-kafka-0-10-assembly_2.12-3.0.1.jar
commons-pool2-2.8.0.jar
kafka-clients-0.10.2.2.jar
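Since the dependency set is a suspect, one way to rule out version skew among the hand-supplied jars is to let Spark resolve the connectors (and their transitive kafka-clients) itself. A hypothetical spark-submit sketch, with the script name and jar paths to be adapted:

```shell
# Let Spark pull delta-core and the Kafka connector (plus matching
# transitive dependencies) from Maven, supplying only the Azure jars by hand.
spark-submit \
  --packages io.delta:delta-core_2.12:0.7.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 \
  --jars hadoop-azure-3.2.1.jar,hadoop-azure-datalake-3.2.1.jar,wildfly-openssl-java-1.1.3.Final.jar \
  streaming_job.py
```

This is a config sketch, not a confirmed fix; it just removes manually pinned connector versions as a variable.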

Here's a more detailed stack trace:

{"LOG_LEVEL":"WARN", "LOG_MESSAGE":"2021-01-21 18:58:23,867
[org.apache.spark.scheduler.TaskSetManager]
 Lost task 0.0 in stage 1.0 (TID 3, 10.1.88.2, executor 1):
org.apache.spark.util.TaskCompletionListene
rException: Self-suppression not permitted
        at
org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:145)
        at
org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124)
        at org.apache.spark.scheduler.Task.run(Task.scala:143)
        at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
"}

{"LOG_LEVEL":"ERROR", "LOG_MESSAGE":"2021-01-21 18:58:26,283
[org.apache.spark.scheduler.TaskSetManager
] Task 0 in stage 1.0 failed 4 times; aborting job"}

{"LOG_LEVEL":"ERROR", "LOG_MESSAGE":"2021-01-21 18:58:26,373
[org.apache.spark.sql.execution.datasource
s.FileFormatWriter] Aborting job 6115425c-9740-4e47-b2a1-e646c131e763."}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
stage 1.0 failed 4 times,
most recent failure: Lost task 0.3 in stage 1.0 (TID 11, 10.1.28.7, executor
2): org.apache.spark.util.
TaskCompletionListenerException: Self-suppression not permitted
        at
org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:145)
        at
org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124)
        at org.apache.spark.scheduler.Task.run(Task.scala:143)
        at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195)
        at org.apache.spark.sql.delta.files.TransactionalWrite.$anonfun$writeFiles$1(TransactionalWrite.scala:162)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:134)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:116)
        at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:80)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:107)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:106)
        at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:80)
        at org.apache.spark.sql.delta.sources.DeltaSink.$anonfun$addBatch$1(DeltaSink.scala:99)
        at org.apache.spark.sql.delta.sources.DeltaSink.$anonfun$addBatch$1$adapted(DeltaSink.scala:54)
        at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
        at org.apache.spark.sql.delta.sources.DeltaSink.addBatch(DeltaSink.scala:54)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:572)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:570)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:570)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
Caused by: org.apache.spark.util.TaskCompletionListenerException: Self-suppression not permitted
        at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:145)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124)
        at org.apache.spark.scheduler.Task.run(Task.scala:143)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

{"LOG_LEVEL":"ERROR", "LOG_MESSAGE":"2021-01-21 18:58:26,389
[org.apache.spark.sql.execution.streaming.
MicroBatchExecution] Query [id = 94e815d4-294b-4d9c-bcd4-9c30c2557c0a, runId
= 4652cb0b-7f88-4a8b-bfc1-
aebb961249bb] terminated with error"}
org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:226)
        at org.apache.spark.sql.delta.files.TransactionalWrite.$anonfun$writeFiles$1(TransactionalWrite.scala:162)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:134)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:116)
        at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:80)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:107)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:106)
        at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:80)
        at org.apache.spark.sql.delta.sources.DeltaSink.$anonfun$addBatch$1(DeltaSink.scala:99)
        at org.apache.spark.sql.delta.sources.DeltaSink.$anonfun$addBatch$1$adapted(DeltaSink.scala:54)
        at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
        at org.apache.spark.sql.delta.sources.DeltaSink.addBatch(DeltaSink.scala:54)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:572)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:570)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:570)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 11, 10.1.28.7, executor 2): org.apache.spark.util.TaskCompletionListenerException: Self-suppression not permitted
        at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:145)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124)
        at org.apache.spark.scheduler.Task.run(Task.scala:143)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195)
        ... 37 more
Caused by: org.apache.spark.util.TaskCompletionListenerException: Self-suppression not permitted
        at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:145)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124)
        at org.apache.spark.scheduler.Task.run(Task.scala:143)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{"LOG_LEVEL":"WARN", "LOG_MESSAGE":"2021-01-21 18:58:26,462 [org.apache.spark.scheduler.TaskSetManager] Lost task 82.3 in stage 1.0 (TID 12, 10.1.88.2, executor 1): TaskKilled (Stage cancelled)"}
{"LOG_LEVEL":"WARN", "LOG_MESSAGE":"2021-01-21 18:58:26,559 [org.apache.spark.scheduler.TaskSetManager] Lost task 1.3 in stage 1.0 (TID 13, 10.1.28.7, executor 2): TaskKilled (Stage cancelled)"}
{"LOG_LEVEL":"ERROR", "LOG_MESSAGE":"2021-01-21 18:58:26,623 [spark-job] Something went wrong. Exception: Job aborted."}


--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]


Re: Structured Streaming Spark 3.0.1

Jungtaek Lim-2
Looks like it's a driver-side error log; the executor logs would likely contain many more warning/error entries, probably with stack traces.

I'd also suggest excluding external dependencies wherever possible while experimenting/investigating. If you suspect Apache Spark itself, I'd stick with writing to Kafka during the investigation rather than switching to Delta Lake, which adds an external dependency and makes the root cause harder to find.

Your dependencies look a bit odd. Could you please remove the manually added spark-sql-kafka jars and try "--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1" in spark-submit/spark-shell instead?
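As a hedged sketch of that launch (the master URL is taken from the SparkSession shown later in the thread; the script name is a placeholder), the jars would be replaced with a single --packages flag:

```shell
# Sketch only: let --packages resolve the connector and its transitive
# kafka-clients from Maven instead of hand-copied jars.
# "stream_job.py" is a placeholder for the PySpark script in this thread.
spark-submit \
  --master spark://spark-master:7077 \
  --packages io.delta:delta-core_2.12:0.7.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,org.apache.hadoop:hadoop-azure:3.2.1 \
  stream_job.py
```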


On Fri, Jan 22, 2021 at 5:03 AM gshen <[hidden email]> wrote:
I am now testing streaming into a Delta table. Interestingly, I have gotten it working in a Community Edition of Databricks, which leads me to think it might have something to do with my dependencies. I am checkpointing to ADLS Gen2 and have added the following dependencies:

delta-core_2.12-0.7.0.jar
hadoop-azure-3.2.1.jar
hadoop-azure-datalake-3.2.1.jar
wildfly-openssl-java-1.1.3.Final.jar
spark-sql-kafka-0-10_2.12-3.0.1.jar
spark-streaming-kafka-0-10-assembly_2.12-3.0.1.jar
commons-pool2-2.8.0.jar
kafka-clients-0.10.2.2.jar

Here's a more detailed stack trace:

{"LOG_LEVEL":"WARN", "LOG_MESSAGE":"2021-01-21 18:58:23,867 [org.apache.spark.scheduler.TaskSetManager] Lost task 0.0 in stage 1.0 (TID 3, 10.1.88.2, executor 1): org.apache.spark.util.TaskCompletionListenerException: Self-suppression not permitted
        at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:145)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124)
        at org.apache.spark.scheduler.Task.run(Task.scala:143)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
"}

{"LOG_LEVEL":"ERROR", "LOG_MESSAGE":"2021-01-21 18:58:26,283 [org.apache.spark.scheduler.TaskSetManager] Task 0 in stage 1.0 failed 4 times; aborting job"}

{"LOG_LEVEL":"ERROR", "LOG_MESSAGE":"2021-01-21 18:58:26,373 [org.apache.spark.sql.execution.datasources.FileFormatWriter] Aborting job 6115425c-9740-4e47-b2a1-e646c131e763."}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 11, 10.1.28.7, executor 2): org.apache.spark.util.TaskCompletionListenerException: Self-suppression not permitted
        at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:145)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124)
        at org.apache.spark.scheduler.Task.run(Task.scala:143)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195)
        at org.apache.spark.sql.delta.files.TransactionalWrite.$anonfun$writeFiles$1(TransactionalWrite.scala:162)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:134)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:116)
        at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:80)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:107)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:106)
        at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:80)
        at org.apache.spark.sql.delta.sources.DeltaSink.$anonfun$addBatch$1(DeltaSink.scala:99)
        at org.apache.spark.sql.delta.sources.DeltaSink.$anonfun$addBatch$1$adapted(DeltaSink.scala:54)
        at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
        at org.apache.spark.sql.delta.sources.DeltaSink.addBatch(DeltaSink.scala:54)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:572)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:570)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:570)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
Caused by: org.apache.spark.util.TaskCompletionListenerException: Self-suppression not permitted
        at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:145)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124)
        at org.apache.spark.scheduler.Task.run(Task.scala:143)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

{"LOG_LEVEL":"ERROR", "LOG_MESSAGE":"2021-01-21 18:58:26,389 [org.apache.spark.sql.execution.streaming.MicroBatchExecution] Query [id = 94e815d4-294b-4d9c-bcd4-9c30c2557c0a, runId = 4652cb0b-7f88-4a8b-bfc1-aebb961249bb] terminated with error"}
org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:226)
        at org.apache.spark.sql.delta.files.TransactionalWrite.$anonfun$writeFiles$1(TransactionalWrite.scala:162)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:134)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:116)
        at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:80)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:107)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:106)
        at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:80)
        at org.apache.spark.sql.delta.sources.DeltaSink.$anonfun$addBatch$1(DeltaSink.scala:99)
        at org.apache.spark.sql.delta.sources.DeltaSink.$anonfun$addBatch$1$adapted(DeltaSink.scala:54)
        at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
        at org.apache.spark.sql.delta.sources.DeltaSink.addBatch(DeltaSink.scala:54)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:572)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:570)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:570)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 11, 10.1.28.7, executor 2): org.apache.spark.util.TaskCompletionListenerException: Self-suppression not permitted
        at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:145)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124)
        at org.apache.spark.scheduler.Task.run(Task.scala:143)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195)
        ... 37 more
Caused by: org.apache.spark.util.TaskCompletionListenerException: Self-suppression not permitted
        at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:145)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124)
        at org.apache.spark.scheduler.Task.run(Task.scala:143)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{"LOG_LEVEL":"WARN", "LOG_MESSAGE":"2021-01-21 18:58:26,462 [org.apache.spark.scheduler.TaskSetManager] Lost task 82.3 in stage 1.0 (TID 12, 10.1.88.2, executor 1): TaskKilled (Stage cancelled)"}
{"LOG_LEVEL":"WARN", "LOG_MESSAGE":"2021-01-21 18:58:26,559 [org.apache.spark.scheduler.TaskSetManager] Lost task 1.3 in stage 1.0 (TID 13, 10.1.28.7, executor 2): TaskKilled (Stage cancelled)"}
{"LOG_LEVEL":"ERROR", "LOG_MESSAGE":"2021-01-21 18:58:26,623 [spark-job] Something went wrong. Exception: Job aborted.





Re: Structured Streaming Spark 3.0.1

gshen
Thanks for the tips!

I think I figured out what might be causing it. It's the checkpointing to
Microsoft Azure Data Lake Storage (ADLS).

When I use local checkpointing it works, but checkpointing to ADLS fails when there's a groupBy in the stream. Weirdly, it works when there is no groupBy clause in the stream.

It's able to create the checkpoint location and the base files on ADLS, but it can't write any commits. That's why I see the first batch of records before it crashes writing the first commit.

Here's what my checkpointing location looks like:

"abfss://" + container_name + "@" + storage_account_name + ".dfs.core.windows.net/" + file_name

and my SparkSession:

spark = pyspark.sql.SparkSession.builder \
    .appName("pyspark-stream-read") \
    .master("spark://spark-master:7077") \
    .config("spark.executor.memory", "512m") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,org.apache.hadoop:hadoop-azure:3.2.1") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.AzureLogStore") \
    .getOrCreate()
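For what it's worth, the concatenated checkpoint path above resolves to a URI of the following shape (the container, account, and file names here are hypothetical placeholders, not values from my setup):

```python
# Hypothetical placeholder values; only the resulting URI shape matters.
container_name = "checkpoints"
storage_account_name = "mystorageacct"
file_name = "kafka-agg-sink"

# Same concatenation as the checkpoint location shown earlier.
checkpoint_location = (
    "abfss://" + container_name + "@" + storage_account_name
    + ".dfs.core.windows.net/" + file_name
)
print(checkpoint_location)
# abfss://checkpoints@mystorageacct.dfs.core.windows.net/kafka-agg-sink
```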








Re: Structured Streaming Spark 3.0.1

Gabor Somogyi
The most interesting part is that you've added this: kafka-clients-0.10.2.2.jar. Spark 3.0.1 uses Kafka clients 2.4.1, and downgrading by such a big step doesn't help. Please remove that jar as well, together with the Spark-Kafka dependency.
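As a hedged sketch of that cleanup (the jar directory and script name below are placeholders, not paths from this thread): remove the hand-pinned Kafka jars and rely on --packages so the kafka-clients version Spark 3.0.1 was built against (2.4.1) is resolved transitively:

```shell
# Placeholder directory for the manually collected jars.
cd /path/to/extra-jars
rm -f kafka-clients-0.10.2.2.jar \
      spark-sql-kafka-0-10_2.12-3.0.1.jar \
      spark-streaming-kafka-0-10-assembly_2.12-3.0.1.jar

# Relaunch with the Maven coordinate only; the matching kafka-clients
# comes in as a transitive dependency.
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 stream_job.py
```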

G


On Thu, 21 Jan 2021, 22:45 gshen, <[hidden email]> wrote: