Data source v2 streaming sinks does not support Update mode


Data source v2 streaming sinks does not support Update mode

Eric Beabes
I'm trying to port my Spark 2.4-based (Structured) Streaming application to Spark 3.0. I compiled it using the dependency given below:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
    <version>3.1.0</version>
</dependency>

Every time I run it under Spark 3.0, I get this message: Data source v2 streaming sinks does not support Update mode

I am using 'mapGroupsWithState', and per the Structured Streaming programming guide (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes) the only output mode it supports is "Update".

My sink is a Kafka topic, so I am using this:

.writeStream
.format("kafka")

What am I missing?
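
A minimal, self-contained sketch of the setup described above (the broker addresses, topic names, and the state function are placeholders, not the actual application values):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

object UpdateModeSketch {
  // Placeholder state function: keeps a running count per key.
  def updateCount(key: String, rows: Iterator[String], state: GroupState[Long]): String = {
    val count = state.getOption.getOrElse(0L) + rows.size
    state.update(count)
    s"$key,$count"
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("update-mode-sketch").getOrCreate()
    import spark.implicits._

    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "inputTopic")                    // placeholder topic
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      .groupByKey(identity)
      .mapGroupsWithState(GroupStateTimeout.NoTimeout())(updateCount)
      .writeStream
      .format("kafka")                                      // Kafka sink
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "outputTopic")                       // placeholder topic
      .option("checkpointLocation", "/tmp/checkpoint")      // placeholder path
      .outputMode("update")                                 // the only mode mapGroupsWithState supports
      .start()
      .awaitTermination()
  }
}

The .outputMode("update") line is the part the error message refers to.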



Re: Data source v2 streaming sinks does not support Update mode

Jacek Laskowski
Hi,

Can you post the whole message? I'm trying to find what might be causing it. A small reproducible example would be of help too. Thank you.

Re: Data source v2 streaming sinks does not support Update mode

Eric Beabes
org.apache.spark.sql.streaming.StreamingQueryException: Data source v2 streaming sinks does not support Update mode.
=== Streaming Query ===
Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId = 62410f05-db59-4026-83fc-439a79b01c29]
Current Committed Offsets: {}
Current Available Offsets: {}
Current State: INITIALIZING
Thread State: RUNNABLE
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: java.lang.IllegalArgumentException: Data source v2 streaming sinks does not support Update mode.
  at org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
  ... 1 more


Please see the attached image for more information.


Attachment: Screen Shot 2021-01-12 at 7.32.55 PM.png (550K)

Re: Data source v2 streaming sinks does not support Update mode

Jungtaek Lim
Which exact Spark version did you use? Did you make sure the version of Spark and the version of the spark-sql-kafka artifact are the same? (I ask because you said you're using Spark 3.0, but the spark-sql-kafka dependency points to 3.1.0.)
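
As a sketch of how to confirm this at runtime (the object and app names here are arbitrary), you can print the Spark version the job actually runs on and compare it against the artifact version in the pom:

import org.apache.spark.sql.SparkSession

object VersionCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("version-check").getOrCreate()
    // Version of the Spark runtime the job is executing on, to compare
    // against the spark-sql-kafka-0-10 artifact version in the pom.
    println(s"Spark runtime version: ${spark.version}")
    spark.stop()
  }
}

If the pom defines a spark.version property, pointing spark-sql-kafka-0-10 at ${spark.version} keeps the two aligned instead of hard-coding 3.1.0.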


Re: Data source v2 streaming sinks does not support Update mode

Eric Beabes
I tried both. I first tried 3.0.0; that didn't work, so I tried 3.1.0.


Re: Data source v2 streaming sinks does not support Update mode

Jungtaek Lim
Would you mind providing a simple reproducer? It would be nice if you could create a repository on GitHub and push the code, including the build script.

Thanks in advance!


Re: Data source v2 streaming sinks does not support Update mode

Gabor Somogyi
Just reached this thread. +1 on creating a simple reproducer app, and I suggest filing a JIRA with the full driver and executor logs attached.
Ping me on the JIRA and I'll pick this up right away...

Thanks!

G



Re: Data source v2 streaming sinks does not support Update mode

Eric Beabes
Ok. I will work on creating a reproducible app. Thanks.


Re: Data source v2 streaming sinks does not support Update mode

Eric Beabes
Here's a very simple reproducer app. I've attached 3 files: Spark3Test.scala, QueryListener.scala, and pom.xml. I'm copying the contents into the email as well:

package com.myorg

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

import scala.util.{Failure, Success, Try}

object Spark3Test {

  val isLocal = false

  implicit val stringEncoder: Encoder[String] = Encoders.STRING
  implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]

  val START_DATE_INDEX = 21
  val END_DATE_INDEX = 40

  def main(args: Array[String]) {

    val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", isLocal)
    spark.sparkContext.setLogLevel("WARN")

    readKafkaStream(spark)
      .groupByKey(row => {
        row.substring(START_DATE_INDEX, END_DATE_INDEX)
      })
      // mapGroupsWithState supports only the Update output mode
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )
      .filter(row => !row.inProgress)
      .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
      .writeStream
      .format("kafka")
      .option(
        s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
        "10.29.42.141:9092"
        // "localhost:9092"
      )
      .option("topic", "spark3test")
      .option("checkpointLocation", "/tmp/checkpoint_5")
      .outputMode("update") // Update mode on the Kafka sink is the combination that raises the reported error
      .start()
    manageStreamingQueries(spark)
  }

  def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {

    val stream = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "10.29.42.141:9092")
      .option("subscribe", "inputTopic")
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("kafkaConsumer.pollTimeoutMs", "120000")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String](Encoders.STRING)
    stream
  }

  def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: GroupState[MyState]): MyState = {
    if (!oldState.exists) {
      println(key)
      val state = MyState(key)
      oldState.update(state)
      oldState.setTimeoutDuration("1 minutes")
      oldState.get
    } else {
      if (oldState.hasTimedOut) {
        oldState.get.inProgress = false
        val state = oldState.get
        println("State timed out for key: " + state.dateTime)
        oldState.remove()
        state
      } else {
        val state = oldState.get
        state.count = state.count + 1
        oldState.update(state)
        oldState.setTimeoutDuration("1 minutes")
        oldState.get
      }
    }
  }

  def initializeSparkSession(applicationName: String, isLocal: Boolean): SparkSession = {
    UserGroupInformation.setLoginUser(
      UserGroupInformation.createRemoteUser("hduser")
    )

    val builder = SparkSession
      .builder()
      .appName(applicationName)

    if (isLocal) {
      builder.config("spark.master", "local[2]")
    }

    builder.getOrCreate()
  }

  def manageStreamingQueries(spark: SparkSession): Unit = {

    val sparkQueryListener = new QueryListener()
    spark.streams.addListener(sparkQueryListener)

    val shutdownMarker: String = "/tmp/stop_job"

    val timeoutInMilliSeconds = 60000
    while (!spark.streams.active.isEmpty) {
      Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
        case Success(result) =>
          if (result) {
            println("A streaming query was terminated successfully")
            spark.streams.resetTerminated()
          }
        case Failure(e) =>
          println("Query failed with message: " + e.getMessage)
          e.printStackTrace()
          spark.streams.resetTerminated()
      }

      if (checkMarker(shutdownMarker)) {
        spark.streams.active.foreach(query => {
          println(s"Stopping streaming query: ${query.id}")
          query.stop()
        })
        spark.stop()
        removeMarker(shutdownMarker)
      }
    }
    assert(spark.streams.active.isEmpty)
    spark.streams.removeListener(sparkQueryListener)
  }

  def checkMarker(markerFile: String): Boolean = {
    val fs = FileSystem.get(new Configuration())
    fs.exists(new Path(markerFile))
  }

  def removeMarker(markerFile: String): Unit = {
    val fs = FileSystem.get(new Configuration())
    fs.delete(new Path(markerFile), true)
  }

}

case class MyState(var dateTime: String, var inProgress: Boolean = true, var count: Int = 1)







package com.myorg

import org.apache.spark.sql.streaming.StreamingQueryListener

class QueryListener extends StreamingQueryListener {

  def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

  def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    if (event.progress.numInputRows != 0) {
      println(
        s"InputRows: ${event.progress.numInputRows}"
      )
    }
  }

  def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    println(s"Query with id ${event.id} terminated")
  }
}






<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.myorg</groupId>
  <artifactId>spark-3-conversion</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>spark-3-conversion</name>
  <url>http://maven.apache.org</url>

  <properties>
    <spark.version>3.0.0</spark.version>
    <scala.binary.version>2.12</scala.binary.version>
    <scala.version>2.12.10</scala.version>
    <scoverage.plugin.version>1.4.0-RC1</scoverage.plugin.version>
    <skipTests>true</skipTests>
    <maven.compiler.source>1.5</maven.compiler.source>
    <maven.compiler.target>1.5</maven.compiler.target>
    <encoding>UTF-8</encoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.binary.version}</artifactId>
      <version>3.0.7</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
          <!-- Run shade goal on package phase -->
          <execution>
            <phase>install</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <!-- Scala Compiler -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <version>1.7</version>
        <executions>
          <!-- Add src/main/scala to eclipse build path -->
          <execution>
            <id>add-source</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>add-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/main/scala</source>
              </sources>
            </configuration>
          </execution>
          <!-- Add src/test/scala to eclipse build path -->
          <execution>
            <id>add-test-source</id>
            <phase>generate-test-sources</phase>
            <goals>
              <goal>add-test-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/test/scala</source>
              </sources>
            </configuration>
          </execution>
        </executions>
      </plugin>

      <!-- we disable surefire and enable scalatest so that maven can run our tests -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.7</version>
        <configuration>
          <skipTests>true</skipTests>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <version>1.0</version>
        <configuration>
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
          <junitxml>.</junitxml>
          <filereports>WDF TestSuite.txt</filereports>
        </configuration>
        <executions>
          <execution>
            <id>test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.scalastyle</groupId>
        <artifactId>scalastyle-maven-plugin</artifactId>
        <version>1.0.0</version>
        <configuration>
          <verbose>false</verbose>
          <failOnViolation>true</failOnViolation>
          <includeTestSourceDirectory>true</includeTestSourceDirectory>
          <failOnWarning>false</failOnWarning>
          <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
          <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
          <configLocation>lib/scalastyle_config.xml</configLocation>
          <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
          <outputEncoding>UTF-8</outputEncoding>
        </configuration>
        <executions>
          <execution>
            <goals>
              <goal>check</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.sonarsource.scanner.maven</groupId>
        <artifactId>sonar-maven-plugin</artifactId>
        <version>3.6.0.1398</version>
      </plugin>

    </plugins>
  </build>
</project>


Attachments: Spark3Test.scala (6K), QueryListener.scala (788 bytes), pom.xml (9K)

Re: Data source v2 streaming sinks does not support Update mode

Gabor Somogyi
Thank you. As we've asked, could you please create a JIRA and commit the code to GitHub?
It would speed things up a lot.

G



Re: Data source v2 streaming sinks does not support Update mode

Eric Beabes
Sorry, can you please tell me where to create the JIRA? Also, is there a specific GitHub repository I need to commit the code into, or just our own? Please let me know. Thanks.

On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi <[hidden email]> wrote:
Thanks you, as we've asked could you please create a jira and commit the code into github?
It would speed things up a lot.

G


On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes <[hidden email]> wrote:
Here's a very simple reproducer app. I've attached 3 files: SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the email as well:

package com.myorg

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

import scala.util.{Failure, Success, Try}

object Spark3Test {

val isLocal = false

implicit val stringEncoder: Encoder[String] = Encoders.STRING
implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]

val START_DATE_INDEX = 21
val END_DATE_INDEX = 40

def main(args: Array[String]) {

val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", isLocal)
spark.sparkContext.setLogLevel("WARN")

readKafkaStream(spark)
.groupByKey(row => {
row.substring(START_DATE_INDEX, END_DATE_INDEX)
})
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
updateAcrossEvents
)
.filter(row => !row.inProgress)
.map(row => "key: " + row.dateTime + " " + "count: " + row.count)
.writeStream
.format("kafka")
.option(
s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
"10.29.42.141:9092"
// "localhost:9092"
)
.option("topic", "spark3test")
.option("checkpointLocation", "/tmp/checkpoint_5")
.outputMode("update")
.start()
manageStreamingQueries(spark)
}

def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {

val stream = sparkSession.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "10.29.42.141:9092")
.option("subscribe", "inputTopic")
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.option("kafkaConsumer.pollTimeoutMs", "120000")
.load()
.selectExpr("CAST(value AS STRING)")
.as[String](Encoders.STRING)
stream
}

def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: GroupState[MyState]): MyState = {
if (!oldState.exists) {
println(key)
val state = MyState(key)
oldState.update(state)
oldState.setTimeoutDuration("1 minutes")
oldState.get
} else {
if (oldState.hasTimedOut) {
oldState.get.inProgress = false
val state = oldState.get
println("State timed out for key: " + state.dateTime)
oldState.remove()
state
} else {
val state = oldState.get
state.count = state.count + 1
oldState.update(state)
oldState.setTimeoutDuration("1 minutes")
oldState.get
}
}
}

def initializeSparkSession(applicationName: String, isLocal: Boolean): SparkSession = {
UserGroupInformation.setLoginUser(
UserGroupInformation.createRemoteUser("hduser")
)

val builder = SparkSession
.builder()
.appName(applicationName)

if (isLocal) {
builder.config("spark.master", "local[2]")
}

builder.getOrCreate()
}

def manageStreamingQueries(spark: SparkSession): Unit = {

val sparkQueryListener = new QueryListener()
spark.streams.addListener(sparkQueryListener)

val shutdownMarker: String = "/tmp/stop_job"

val timeoutInMilliSeconds = 60000
while (!spark.streams.active.isEmpty) {
Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
case Success(result) =>
if (result) {
println("A streaming query was terminated successfully")
spark.streams.resetTerminated()
}
case Failure(e) =>
println("Query failed with message: " + e.getMessage)
e.printStackTrace()
spark.streams.resetTerminated()
}

if (checkMarker(shutdownMarker)) {
spark.streams.active.foreach(query => {
println(s"Stopping streaming query: ${query.id}")
query.stop()
})
spark.stop()
removeMarker(shutdownMarker)
}
}
assert(spark.streams.active.isEmpty)
spark.streams.removeListener(sparkQueryListener)
}

def checkMarker(markerFile: String): Boolean = {
val fs = FileSystem.get(new Configuration())
fs.exists(new Path(markerFile))
}

def removeMarker(markerFile: String): Unit = {
val fs = FileSystem.get(new Configuration())
fs.delete(new Path(markerFile), true)
}

}

case class MyState(var dateTime: String, var inProgress: Boolean = true, var count: Int = 1)







package com.myorg

import org.apache.spark.sql.streaming.StreamingQueryListener

class QueryListener extends StreamingQueryListener {

def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
if (event.progress.numInputRows != 0) {
println(
s"InputRows: ${event.progress.numInputRows}"
)
}
}

def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
println(s"Query with id ${event.id} terminated")
}
}






<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.myorg</groupId>
<artifactId>spark-3-conversion</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>spark-3-conversion</name>
<url>http://maven.apache.org</url>

<properties>
<spark.version>3.0.0</spark.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.10</scala.version>
<scoverage.plugin.version>1.4.0-RC1</scoverage.plugin.version>
<skipTests>true</skipTests>
<maven.compiler.source>1.5</maven.compiler.source>
<maven.compiler.target>1.5</maven.compiler.target>
<encoding>UTF-8</encoding>
</properties>


<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<version>3.0.7</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>install</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>

<!-- Scala Compiler -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.7</version>
<executions>
<!-- Add src/main/scala to eclipse build path -->
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/scala</source>
</sources>
</configuration>
</execution>
<!-- Add src/test/scala to eclipse build path -->
<execution>
<id>add-test-source</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>

<!-- we disable surefire and enable scalatest so that maven can run our tests -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.7</version>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<version>1.0</version>
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>WDF TestSuite.txt</filereports>
</configuration>
<executions>
<execution>
<id>test</id>
<goals>
<goal>test</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
<version>1.0.0</version>
<configuration>
<verbose>false</verbose>
<failOnViolation>true</failOnViolation>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<failOnWarning>false</failOnWarning>
<sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
<testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
<configLocation>lib/scalastyle_config.xml</configLocation>
<outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
<outputEncoding>UTF-8</outputEncoding>
</configuration>
<executions>
<execution>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.sonarsource.scanner.maven</groupId>
<artifactId>sonar-maven-plugin</artifactId>
<version>3.6.0.1398</version>
</plugin>

</plugins>
</build>
</project>


Reply | Threaded
Open this post in threaded view
|

Re: Data source v2 streaming sinks does not support Update mode

German Schiavon Matteo
Hi,

This is the jira, and regarding the repo, I believe you can just commit it to your personal repo and that should be it.

Regards

Reply | Threaded
Open this post in threaded view
|

Re: Data source v2 streaming sinks does not support Update mode

Jungtaek Lim-2
Please also include some test data. I quickly looked through the code, and it may require a specific format for the records.
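For reference, the reproducer keys each record on row.substring(START_DATE_INDEX, END_DATE_INDEX) with indices 21 and 40, so a test record needs at least 40 characters, with a 19-character timestamp (e.g. "yyyy-MM-dd HH:mm:ss") starting at index 21. Below is only a rough sketch of a generator for such records; the 21-character prefix and the trailing payload are made-up placeholders, not the actual format:

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object SampleRecords {

  // "yyyy-MM-dd HH:mm:ss" formats to exactly 19 characters,
  // which matches END_DATE_INDEX - START_DATE_INDEX = 40 - 21.
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Builds one record: a 21-character placeholder prefix, the timestamp
  // used as the grouping key, and an arbitrary payload.
  def sample(id: Int, ts: LocalDateTime): String = {
    val prefix = f"record-$id%05d-padding-" // 21 characters
    require(prefix.length == 21, s"prefix must be 21 chars, got ${prefix.length}")
    prefix + fmt.format(ts) + " some-payload"
  }

  def main(args: Array[String]): Unit =
    (1 to 3).foreach(i => println(sample(i, LocalDateTime.now())))
}

Records like these can then be pushed to the input topic with the Kafka console producer to drive the reproducer.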

Reply | Threaded
Open this post in threaded view
|

Re: Data source v2 streaming sinks does not support Update mode

Jungtaek Lim-2
I see no issue running this code in local dev (I changed the scope of the Spark artifacts to "compile", of course).

Could you please make sure you're not using "3.0.0-preview"? In 3.0.0-preview, Update mode was restricted (as the error message says), and that restriction was reverted before Spark 3.0.0 was released. Clearing the Spark artifacts in your .m2 cache may work.
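One quick way to see which build the job actually runs against, sketched here under the assumption that it is submitted with spark-submit like the reproducer (the object name and app name are made up):

import org.apache.spark.sql.SparkSession

object VersionCheck {
  def main(args: Array[String]): Unit = {
    // No master set here on purpose: submit it with spark-submit so it
    // picks up whatever Spark the cluster provides.
    val spark = SparkSession.builder().appName("version-check").getOrCreate()

    // If this prints something like 3.0.0-preview2, a preview build is
    // still on the classpath (cluster install or cached artifact).
    println(s"Runtime Spark version: ${spark.version}")

    // Also show where the SparkSession class was loaded from, which helps
    // spot a stale jar leaking in from the local .m2 cache or the fat jar.
    val codeSource = Option(classOf[SparkSession].getProtectionDomain.getCodeSource)
    println(s"SparkSession loaded from: ${codeSource.map(_.getLocation).getOrElse("unknown")}")

    spark.stop()
  }
}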

Reply | Threaded
Open this post in threaded view
|

Re: Data source v2 streaming sinks does not support Update mode

Eric Beabes
>> "Could you please make sure you're not using "3.0.0-preview". 

This could be the reason. I will check with our Hadoop cluster administrator. It's quite possible that they installed the "Preview" mode. Yes, the code works in the Local dev environment.


On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes <[hidden email]> wrote:
Here's a very simple reproducer app. I've attached 3 files: SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the email as well:

package com.myorg

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

import scala.util.{Failure, Success, Try}

object Spark3Test {

val isLocal = false

implicit val stringEncoder: Encoder[String] = Encoders.STRING
implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]

val START_DATE_INDEX = 21
val END_DATE_INDEX = 40

def main(args: Array[String]) {

val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", isLocal)
spark.sparkContext.setLogLevel("WARN")

readKafkaStream(spark)
.groupByKey(row => {
row.substring(START_DATE_INDEX, END_DATE_INDEX)
})
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
updateAcrossEvents
)
.filter(row => !row.inProgress)
.map(row => "key: " + row.dateTime + " " + "count: " + row.count)
.writeStream
.format("kafka")
.option(
s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
"10.29.42.141:9092"
// "localhost:9092"
)
.option("topic", "spark3test")
.option("checkpointLocation", "/tmp/checkpoint_5")
.outputMode("update")
.start()
manageStreamingQueries(spark)
}

def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {

val stream = sparkSession.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "10.29.42.141:9092")
.option("subscribe", "inputTopic")
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.option("kafkaConsumer.pollTimeoutMs", "120000")
.load()
.selectExpr("CAST(value AS STRING)")
.as[String](Encoders.STRING)
stream
}

def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: GroupState[MyState]): MyState = {
if (!oldState.exists) {
println(key)
val state = MyState(key)
oldState.update(state)
oldState.setTimeoutDuration("1 minutes")
oldState.get
} else {
if (oldState.hasTimedOut) {
oldState.get.inProgress = false
val state = oldState.get
println("State timed out for key: " + state.dateTime)
oldState.remove()
state
} else {
val state = oldState.get
state.count = state.count + 1
oldState.update(state)
oldState.setTimeoutDuration("1 minutes")
oldState.get
}
}
}

def initializeSparkSession(applicationName: String, isLocal: Boolean): SparkSession = {
UserGroupInformation.setLoginUser(
UserGroupInformation.createRemoteUser("hduser")
)

val builder = SparkSession
.builder()
.appName(applicationName)

if (isLocal) {
builder.config("spark.master", "local[2]")
}

builder.getOrCreate()
}

def manageStreamingQueries(spark: SparkSession): Unit = {

val sparkQueryListener = new QueryListener()
spark.streams.addListener(sparkQueryListener)

val shutdownMarker: String = "/tmp/stop_job"

val timeoutInMilliSeconds = 60000
while (!spark.streams.active.isEmpty) {
Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
case Success(result) =>
if (result) {
println("A streaming query was terminated successfully")
spark.streams.resetTerminated()
}
case Failure(e) =>
println("Query failed with message: " + e.getMessage)
e.printStackTrace()
spark.streams.resetTerminated()
}

if (checkMarker(shutdownMarker)) {
spark.streams.active.foreach(query => {
println(s"Stopping streaming query: ${query.id}")
query.stop()
})
spark.stop()
removeMarker(shutdownMarker)
}
}
assert(spark.streams.active.isEmpty)
spark.streams.removeListener(sparkQueryListener)
}

def checkMarker(markerFile: String): Boolean = {
val fs = FileSystem.get(new Configuration())
fs.exists(new Path(markerFile))
}

def removeMarker(markerFile: String): Unit = {
val fs = FileSystem.get(new Configuration())
fs.delete(new Path(markerFile), true)
}

}

case class MyState(var dateTime: String, var inProgress: Boolean = true, var count: Int = 1)







package com.myorg

import org.apache.spark.sql.streaming.StreamingQueryListener

class QueryListener extends StreamingQueryListener {

def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
if (event.progress.numInputRows != 0) {
println(
s"InputRows: ${event.progress.numInputRows}"
)
}
}

def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
println(s"Query with id ${event.id} terminated")
}
}






<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.myorg</groupId>
<artifactId>spark-3-conversion</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>spark-3-conversion</name>
<url>http://maven.apache.org</url>

<properties>
<spark.version>3.0.0</spark.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.10</scala.version>
<scoverage.plugin.version>1.4.0-RC1</scoverage.plugin.version>
<skipTests>true</skipTests>
<maven.compiler.source>1.5</maven.compiler.source>
<maven.compiler.target>1.5</maven.compiler.target>
<encoding>UTF-8</encoding>
</properties>


<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<version>3.0.7</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>install</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>

<!-- Scala Compiler -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.7</version>
<executions>
<!-- Add src/main/scala to eclipse build path -->
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/scala</source>
</sources>
</configuration>
</execution>
<!-- Add src/test/scala to eclipse build path -->
<execution>
<id>add-test-source</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>

<!-- we disable surefire and enable scalatest so that maven can run our tests -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.7</version>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<version>1.0</version>
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>WDF TestSuite.txt</filereports>
</configuration>
<executions>
<execution>
<id>test</id>
<goals>
<goal>test</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
<version>1.0.0</version>
<configuration>
<verbose>false</verbose>
<failOnViolation>true</failOnViolation>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<failOnWarning>false</failOnWarning>
<sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
<testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
<configLocation>lib/scalastyle_config.xml</configLocation>
<outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
<outputEncoding>UTF-8</outputEncoding>
</configuration>
<executions>
<execution>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.sonarsource.scanner.maven</groupId>
<artifactId>sonar-maven-plugin</artifactId>
<version>3.6.0.1398</version>
</plugin>

</plugins>
</build>
</project>


On Wed, Jan 13, 2021 at 11:26 PM Eric Beabes <[hidden email]> wrote:
Ok. I will work on creating a reproducible app. Thanks.

On Wed, Jan 13, 2021 at 3:57 PM Gabor Somogyi <[hidden email]> wrote:
Just reached this thread. +1 on creating a simple reproducer app, and I suggest creating a JIRA with the full driver and executor logs attached.
Ping me on the jira and I'll pick this up right away...

Thanks!

G


On Wed, Jan 13, 2021 at 8:54 AM Jungtaek Lim <[hidden email]> wrote:
Would you mind if I ask for a simple reproducer? Would be nice if you could create a repository in Github and push the code including the build script.

Thanks in advance!

On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes <[hidden email]> wrote:
I tried both. First tried 3.0.0. That didn't work so I tried 3.1.0.

On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim <[hidden email]> wrote:
Which exact Spark version did you use? Did you make sure the version for Spark and the version for spark-sql-kafka artifact are the same? (I asked this because you've said you've used Spark 3.0 but spark-sql-kafka dependency pointed to 3.1.0.) 

On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes <[hidden email]> wrote:
org.apache.spark.sql.streaming.StreamingQueryException: Data source v2 streaming sinks does not support Update mode. === Streaming Query === Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId = 62410f05-db59-4026-83fc-439a79b01c29] Current Committed Offsets: {} Current Available Offsets: {} Current State: INITIALIZING Thread State: RUNNABLE at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244) Caused by: java.lang.IllegalArgumentException: Data source v2 streaming sinks does not support Update mode. at org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320) ... 1 more


Please see the attached image for more information.


On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski <[hidden email]> wrote:
Hi,

Can you post the whole message? I'm trying to find what might be causing it. A small reproducible example would be of help too. Thank you.

On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes <[hidden email]> wrote:
Trying to port my Spark 2.4 based (Structured) streaming application to Spark 3.0. I compiled it using the dependency given below:

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
<version>3.1.0</version>
</dependency>

Every time I run it under Spark 3.0, I get this message: Data source v2 streaming sinks does not support Update mode

I am using 'mapGroupsWithState' so as per this link (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes), the only supported Output mode is "Update". 

My Sink is a Kafka topic so I am using this:

.writeStream
.format("kafka")

What am I missing?



Reply | Threaded
Open this post in threaded view
|

Re: Data source v2 streaming sinks does not support Update mode

Eric Beabes
Confirmed. The cluster admin said his team installed the latest version from Cloudera, which comes with Spark 3.0.0-preview2. They are going to try to upgrade it to the community edition of Spark 3.1.0.

Thanks Jungtaek for the tip. Greatly appreciate it.
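
Once the upgrade lands, a minimal sketch along these lines (the object name is illustrative, not something from this thread) can confirm from the driver which Spark runtime the cluster actually provides, so a lingering 3.0.0-preview2 install is caught before the streaming query is even built:

package com.myorg

import org.apache.spark.sql.SparkSession

// Illustrative check: print the runtime Spark version on the driver.
// A value containing "preview" (e.g. "3.0.0-preview2") means the cluster
// still runs the preview build that rejects Update mode for DSv2 sinks.
object SparkVersionCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("spark-version-check").getOrCreate()
    println(s"spark.version = ${spark.version}")
    println(s"SPARK_VERSION = ${org.apache.spark.SPARK_VERSION}")
    spark.stop()
  }
}

Submitted with spark-submit against the cluster, both lines should print the same version that spark.version in the pom is expected to match.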

On Tue, Jan 19, 2021 at 8:45 AM Eric Beabes <[hidden email]> wrote:
>> "Could you please make sure you're not using "3.0.0-preview". 

This could be the reason. I will check with our Hadoop cluster administrator. It's quite possible that they installed the "Preview" mode. Yes, the code works in the Local dev environment.


On Tue, Jan 19, 2021 at 5:29 AM Jungtaek Lim <[hidden email]> wrote:
I see no issue from running this code in local dev. (changed the scope of Spark artifacts to "compile" of course)

Could you please make sure you're not using "3.0.0-preview"? In 3.0.0-preview update mode was restricted (as the error message says) and it was reverted before releasing Spark 3.0.0. Clearing Spark artifacts in your .m2 cache may work.

On Tue, Jan 19, 2021 at 8:50 AM Jungtaek Lim <[hidden email]> wrote:
And also include some test data as well. I quickly looked through the code and the code may require a specific format of the record.

On Tue, Jan 19, 2021 at 12:10 AM German Schiavon <[hidden email]> wrote:
Hi,

This is the jira and regarding the repo, I believe just commit it to your personal repo and that should be it.

Regards

On Mon, 18 Jan 2021 at 15:46, Eric Beabes <[hidden email]> wrote:
Sorry. Can you please tell me where to create the JIRA? Also is there any specific Github repository I need to commit code into - OR - just in our own? Please let me know. Thanks.

On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi <[hidden email]> wrote:
Thank you. As we've asked, could you please create a JIRA and commit the code to GitHub?
It would speed things up a lot.

G


On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes <[hidden email]> wrote:
Here's a very simple reproducer app. I've attached 3 files: SparkTest.scala, QueryListener.scala & pom.xml.


On Wed, Jan 13, 2021 at 11:26 PM Eric Beabes <[hidden email]> wrote:
Ok. I will work on creating a reproducible app. Thanks.

On Wed, Jan 13, 2021 at 3:57 PM Gabor Somogyi <[hidden email]> wrote:
Just reached this thread. +1 on creating a simple reproducer app, and I suggest creating a JIRA with the full driver and executor logs attached.
Ping me on the jira and I'll pick this up right away...

Thanks!

G


On Wed, Jan 13, 2021 at 8:54 AM Jungtaek Lim <[hidden email]> wrote:
Would you mind if I ask for a simple reproducer? Would be nice if you could create a repository in Github and push the code including the build script.

Thanks in advance!

On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes <[hidden email]> wrote:
I tried both. First tried 3.0.0. That didn't work so I tried 3.1.0.

On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim <[hidden email]> wrote:
Which exact Spark version did you use? Did you make sure the version for Spark and the version for spark-sql-kafka artifact are the same? (I asked this because you've said you've used Spark 3.0 but spark-sql-kafka dependency pointed to 3.1.0.) 

On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes <[hidden email]> wrote:
org.apache.spark.sql.streaming.StreamingQueryException: Data source v2 streaming sinks does not support Update mode. === Streaming Query === Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId = 62410f05-db59-4026-83fc-439a79b01c29] Current Committed Offsets: {} Current Available Offsets: {} Current State: INITIALIZING Thread State: RUNNABLE at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244) Caused by: java.lang.IllegalArgumentException: Data source v2 streaming sinks does not support Update mode. at org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320) ... 1 more


Please see the attached image for more information.


On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski <[hidden email]> wrote:
Hi,

Can you post the whole message? I'm trying to find what might be causing it. A small reproducible example would be of help too. Thank you.

On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes <[hidden email]> wrote:
Trying to port my Spark 2.4 based (Structured) streaming application to Spark 3.0. I compiled it using the dependency given below:

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
<version>3.1.0</version>
</dependency>

Every time I run it under Spark 3.0, I get this message: Data source v2 streaming sinks does not support Update mode

I am using 'mapGroupsWithState' so as per this link (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes), the only supported Output mode is "Update". 

My Sink is a Kafka topic so I am using this:

.writeStream
.format("kafka")

What am I missing?



Reply | Threaded
Open this post in threaded view
|

Re: Data source v2 streaming sinks does not support Update mode

Gabor Somogyi
Thanks for double-checking the version. Please report back on whether it works with 3.1 or not.
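
One small guard along these lines (illustrative, not from this thread, and assuming it is called on the driver before the query is started) would make such a version mismatch fail fast with a clear message instead of surfacing later as the StreamingQueryException quoted earlier:

package com.myorg

// Illustrative guard: call SparkVersionGuard.requireNonPreviewSpark() at the
// top of main(), before the streaming query is constructed, so a preview
// runtime aborts immediately rather than inside StreamExecution.
object SparkVersionGuard {
  def requireNonPreviewSpark(): Unit = {
    val version = org.apache.spark.SPARK_VERSION
    require(
      !version.contains("preview"),
      s"Spark runtime $version is a preview build; Update mode with the Kafka sink needs a final 3.x release"
    )
  }
}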

G


On Tue, 19 Jan 2021, 07:41 Eric Beabes, <[hidden email]> wrote:
Confirmed. The cluster admin said his team installed the latest version from Cloudera, which comes with Spark 3.0.0-preview2. They are going to try to upgrade it to the community edition of Spark 3.1.0.

Thanks Jungtaek for the tip. Greatly appreciate it.

On Tue, Jan 19, 2021 at 8:45 AM Eric Beabes <[hidden email]> wrote:
>> "Could you please make sure you're not using "3.0.0-preview". 

This could be the reason. I will check with our Hadoop cluster administrator. It's quite possible that they installed the "Preview" mode. Yes, the code works in the Local dev environment.


On Tue, Jan 19, 2021 at 5:29 AM Jungtaek Lim <[hidden email]> wrote:
I see no issue from running this code in local dev. (changed the scope of Spark artifacts to "compile" of course)

Could you please make sure you're not using "3.0.0-preview"? In 3.0.0-preview update mode was restricted (as the error message says) and it was reverted before releasing Spark 3.0.0. Clearing Spark artifacts in your .m2 cache may work.

On Tue, Jan 19, 2021 at 8:50 AM Jungtaek Lim <[hidden email]> wrote:
And also include some test data as well. I quickly looked through the code and the code may require a specific format of the record.

On Tue, Jan 19, 2021 at 12:10 AM German Schiavon <[hidden email]> wrote:
Hi,

This is the jira and regarding the repo, I believe just commit it to your personal repo and that should be it.

Regards

On Mon, 18 Jan 2021 at 15:46, Eric Beabes <[hidden email]> wrote:
Sorry. Can you please tell me where to create the JIRA? Also is there any specific Github repository I need to commit code into - OR - just in our own? Please let me know. Thanks.

On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi <[hidden email]> wrote:
Thank you. As we've asked, could you please create a JIRA and commit the code to GitHub?
It would speed things up a lot.

G


On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes <[hidden email]> wrote:
Here's a very simple reproducer app. I've attached 3 files: SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the email as well:

package com.myorg

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

import scala.util.{Failure, Success, Try}

object Spark3Test {

val isLocal = false

implicit val stringEncoder: Encoder[String] = Encoders.STRING
implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]

val START_DATE_INDEX = 21
val END_DATE_INDEX = 40

def main(args: Array[String]) {

val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", isLocal)
spark.sparkContext.setLogLevel("WARN")

readKafkaStream(spark)
.groupByKey(row => {
row.substring(START_DATE_INDEX, END_DATE_INDEX)
})
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
updateAcrossEvents
)
.filter(row => !row.inProgress)
.map(row => "key: " + row.dateTime + " " + "count: " + row.count)
.writeStream
.format("kafka")
.option(
s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
"10.29.42.141:9092"
// "localhost:9092"
)
.option("topic", "spark3test")
.option("checkpointLocation", "/tmp/checkpoint_5")
.outputMode("update")
.start()
manageStreamingQueries(spark)
}

def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {

val stream = sparkSession.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "10.29.42.141:9092")
.option("subscribe", "inputTopic")
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.option("kafkaConsumer.pollTimeoutMs", "120000")
.load()
.selectExpr("CAST(value AS STRING)")
.as[String](Encoders.STRING)
stream
}

def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: GroupState[MyState]): MyState = {
if (!oldState.exists) {
println(key)
val state = MyState(key)
oldState.update(state)
oldState.setTimeoutDuration("1 minutes")
oldState.get
} else {
if (oldState.hasTimedOut) {
oldState.get.inProgress = false
val state = oldState.get
println("State timed out for key: " + state.dateTime)
oldState.remove()
state
} else {
val state = oldState.get
state.count = state.count + 1
oldState.update(state)
oldState.setTimeoutDuration("1 minutes")
oldState.get
}
}
}

def initializeSparkSession(applicationName: String, isLocal: Boolean): SparkSession = {
UserGroupInformation.setLoginUser(
UserGroupInformation.createRemoteUser("hduser")
)

val builder = SparkSession
.builder()
.appName(applicationName)

if (isLocal) {
builder.config("spark.master", "local[2]")
}

builder.getOrCreate()
}

def manageStreamingQueries(spark: SparkSession): Unit = {

val sparkQueryListener = new QueryListener()
spark.streams.addListener(sparkQueryListener)

val shutdownMarker: String = "/tmp/stop_job"

val timeoutInMilliSeconds = 60000
while (!spark.streams.active.isEmpty) {
Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
case Success(result) =>
if (result) {
println("A streaming query was terminated successfully")
spark.streams.resetTerminated()
}
case Failure(e) =>
println("Query failed with message: " + e.getMessage)
e.printStackTrace()
spark.streams.resetTerminated()
}

if (checkMarker(shutdownMarker)) {
spark.streams.active.foreach(query => {
println(s"Stopping streaming query: ${query.id}")
query.stop()
})
spark.stop()
removeMarker(shutdownMarker)
}
}
assert(spark.streams.active.isEmpty)
spark.streams.removeListener(sparkQueryListener)
}

def checkMarker(markerFile: String): Boolean = {
val fs = FileSystem.get(new Configuration())
fs.exists(new Path(markerFile))
}

def removeMarker(markerFile: String): Unit = {
val fs = FileSystem.get(new Configuration())
fs.delete(new Path(markerFile), true)
}

}

case class MyState(var dateTime: String, var inProgress: Boolean = true, var count: Int = 1)







package com.myorg

import org.apache.spark.sql.streaming.StreamingQueryListener

class QueryListener extends StreamingQueryListener {

def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
if (event.progress.numInputRows != 0) {
println(
s"InputRows: ${event.progress.numInputRows}"
)
}
}

def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
println(s"Query with id ${event.id} terminated")
}
}






<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.myorg</groupId>
<artifactId>spark-3-conversion</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>spark-3-conversion</name>
<url>http://maven.apache.org</url>

<properties>
<spark.version>3.0.0</spark.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.10</scala.version>
<scoverage.plugin.version>1.4.0-RC1</scoverage.plugin.version>
<skipTests>true</skipTests>
<maven.compiler.source>1.5</maven.compiler.source>
<maven.compiler.target>1.5</maven.compiler.target>
<encoding>UTF-8</encoding>
</properties>


<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<version>3.0.7</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>install</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>

<!-- Scala Compiler -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.7</version>
<executions>
<!-- Add src/main/scala to eclipse build path -->
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/scala</source>
</sources>
</configuration>
</execution>
<!-- Add src/test/scala to eclipse build path -->
<execution>
<id>add-test-source</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>

<!-- we disable surefire and enable scalatest so that maven can run our tests -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.7</version>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>