Structured Streaming: mapGroupsWithState UDT serialization does not work


bryan.jeffrey@gmail.com
Hello.  

I'm running Scala 2.11 with Spark 2.3.0. I've encountered a problem with mapGroupsWithState and was wondering if anyone had insight. We use Joda-Time in a number of data structures, so we've written a custom serializer (a UDT) for Joda-Time. This works well in most Dataset/DataFrame structured streaming operations. However, when running mapGroupsWithState we observed that incorrect dates were being returned from state.

I created a bug to help track related information: https://issues.apache.org/jira/browse/SPARK-30986

Simple example:
1. Input A has a date D.
2. Input A updates state in mapGroupsWithState; the date present in state is D.
3. Input A is added again. Input A has the correct date D, but the existing state now holds an invalid date.

Here is a simple repro:

Joda Time UDT:

// Note: UDTRegistration is private[spark], so this class and the register call
// below must live under an org.apache.spark package (hence private[sql]).
import org.apache.spark.sql.types.{DataType, LongType, UDTRegistration, UserDefinedType}
import org.joda.time.{DateTime, DateTimeZone}

private[sql] class JodaTimeUDT extends UserDefinedType[DateTime] {
  override def sqlType: DataType = LongType
  override def serialize(obj: DateTime): Long = obj.getMillis
  override def deserialize(datum: Any): DateTime = datum match {
    case value: Long => new DateTime(value, DateTimeZone.UTC)
  }
  override def userClass: Class[DateTime] = classOf[DateTime]
  private[spark] override def asNullable: JodaTimeUDT = this
}

object JodaTimeUDTRegister {
  def register: Unit = {
    UDTRegistration.register(classOf[DateTime].getName, classOf[JodaTimeUDT].getName)
  }
}

Test Leveraging Joda UDT:

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}
import org.joda.time.{DateTime, DateTimeZone}
import org.junit.runner.RunWith
import org.scalamock.scalatest.MockFactory
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import org.scalatest.junit.JUnitRunner

case class FooWithDate(date: DateTime, s: String, i: Int)

@RunWith(classOf[JUnitRunner])
class TestJodaTimeUdt extends FlatSpec with Matchers with MockFactory with BeforeAndAfterAll {
  val application = this.getClass.getName
  var session: SparkSession = _

  override def beforeAll(): Unit = {
    System.setProperty("hadoop.home.dir", getClass.getResource("/").getPath)
    val sparkConf = new SparkConf()
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.testing", "true")
      .set("spark.memory.fraction", "1")
      .set("spark.ui.enabled", "false")
      .set("spark.streaming.gracefulStopTimeout", "1000")
      .setAppName(application).setMaster("local[*]")

    session = SparkSession.builder().config(sparkConf).getOrCreate()
    session.sparkContext.setCheckpointDir("/")
    JodaTimeUDTRegister.register
  }

  override def afterAll(): Unit = {
    session.stop()
  }

  it should "work correctly for a streaming input with stateful transformation" in {
    val date = new DateTime(2020, 1, 2, 3, 4, 5, 6, DateTimeZone.UTC)
    val sqlContext = session.sqlContext
    import sqlContext.implicits._

    val input = List(FooWithDate(date, "Foo", 1), FooWithDate(date, "Foo", 3), FooWithDate(date, "Foo", 3))
    val streamInput: MemoryStream[FooWithDate] = new MemoryStream[FooWithDate](42, session.sqlContext)
    streamInput.addData(input)
    val ds: Dataset[FooWithDate] = streamInput.toDS()

    val mapGroupsWithStateFunction: (Int, Iterator[FooWithDate], GroupState[FooWithDate]) => FooWithDate =
      TestJodaTimeUdt.updateFooState
    val result: Dataset[FooWithDate] = ds
      .groupByKey(x => x.i)
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(mapGroupsWithStateFunction)
    val writeTo = "random_table_name"

    result.writeStream
      .outputMode(OutputMode.Update)
      .format("memory")
      .queryName(writeTo)
      .trigger(Trigger.Once())
      .start()
      .awaitTermination()
    val combinedResults: Array[FooWithDate] = session.sql(s"select * from $writeTo").as[FooWithDate].collect()
    val expected = Array(FooWithDate(date, "Foo", 1), FooWithDate(date, "FooFoo", 6))
    combinedResults should contain theSameElementsAs expected
  }
}

object TestJodaTimeUdt {
  def updateFooState(id: Int, inputs: Iterator[FooWithDate], state: GroupState[FooWithDate]): FooWithDate = {
    if (state.hasTimedOut) {
      // Capture the final state before removing it; after remove(), getOption is None.
      val finalState = state.getOption.get
      state.remove()
      finalState
    } else {
      val inputsSeq: Seq[FooWithDate] = inputs.toSeq
      val startingState = state.getOption.getOrElse(inputsSeq.head)
      val toProcess = if (state.getOption.isDefined) inputsSeq else inputsSeq.tail
      val updatedFoo = toProcess.foldLeft(startingState)(concatFoo)

      state.update(updatedFoo)
      state.setTimeoutDuration("1 minute")
      updatedFoo
    }
  }

  def concatFoo(a: FooWithDate, b: FooWithDate): FooWithDate = FooWithDate(b.date, a.s + b.s, a.i + b.i)
}

The test output shows the invalid date:

org.scalatest.exceptions.TestFailedException:
Array(FooWithDate(2021-02-02T19:26:23.374Z,Foo,1), FooWithDate(2021-02-02T19:26:23.374Z,FooFoo,6)) did not contain the same elements as
Array(FooWithDate(2020-01-02T03:04:05.006Z,Foo,1), FooWithDate(2020-01-02T03:04:05.006Z,FooFoo,6))

Is this something folks have encountered before?

Thank you,

Bryan Jeffrey






Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

Tathagata Das
You are deserializing by explicitly specifying the UTC timezone, but when serializing you are not specifying it. Maybe that is the reason?

Also, if you can encode it using just a long, then I recommend saving the value as a long and eliminating some of the serialization overhead. Spark will probably optimize better if it sees a long rather than an opaque UDT.
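For illustration, a minimal sketch of this long-based encoding; FooWithMillis and its helpers are hypothetical names, not from the original thread:

// Sketch: store the timestamp as plain epoch millis so Spark sees an
// ordinary Long column, and convert to a Joda DateTime only at the edges.
import org.joda.time.{DateTime, DateTimeZone}

case class FooWithMillis(dateMillis: Long, s: String, i: Int) {
  def date: DateTime = new DateTime(dateMillis, DateTimeZone.UTC)
}

object FooWithMillis {
  // Overloaded constructor helper for converting at the boundary.
  def apply(date: DateTime, s: String, i: Int): FooWithMillis =
    FooWithMillis(date.getMillis, s, i)
}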

TD

Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

bryan.jeffrey@gmail.com
Tathagata,

The difference is more than hours off. In this instance it's off by four years; in other instances it's off by tens of years (and other smaller durations).

We've considered moving to storing dates as longs, but this makes the code much less readable and harder to maintain. The UDT serialization bug also causes issues outside of stateful streaming, such as when executing a simple group by.
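For reference, a minimal non-streaming sketch of the kind of group by where we see the same bad dates; this is an illustration rather than a verified standalone test, reusing FooWithDate and the registered UDT from the repro plus an active SparkSession named `session`:

// Hypothetical batch sketch: a plain groupByKey/reduceGroups over rows
// carrying the UDT-typed column; per the report, the collected dates can
// come back corrupted even without stateful streaming.
import org.joda.time.{DateTime, DateTimeZone}

val spark = session // stable identifier so the implicits can be imported
import spark.implicits._

val date = new DateTime(2020, 1, 2, 3, 4, 5, 6, DateTimeZone.UTC)
val batch = List(FooWithDate(date, "Foo", 1), FooWithDate(date, "Foo", 3)).toDS()
val reduced = batch
  .groupByKey(_.s)
  .reduceGroups((a, b) => FooWithDate(b.date, a.s + b.s, a.i + b.i))
  .map(_._2)
reduced.collect().foreach(println)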

Regards,

Bryan Jeffrey


Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

Tathagata Das
Sounds like something to do with the serialization/deserialization, and not related to mapGroupsWithState.

The docs say that:
1. this is deprecated and therefore should not be used;
2. you have to use the annotation `SQLUserDefinedType` on the class definition. You don't seem to have done that; maybe that's the reason?

I would debug by printing the values in the serialize/deserialize methods, and then passing the data through the groupBy that is known to fail.
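As a sketch of that debugging step, here is the UDT from the original message instrumented with println calls; the logging is added for illustration and is not part of the thread:

// Log every value crossing the serialize/deserialize boundary, then run the
// failing query and compare what goes in against what comes out.
import org.apache.spark.sql.types.{DataType, LongType, UserDefinedType}
import org.joda.time.{DateTime, DateTimeZone}

private[sql] class JodaTimeUDT extends UserDefinedType[DateTime] {
  override def sqlType: DataType = LongType
  override def serialize(obj: DateTime): Long = {
    val millis = obj.getMillis
    println(s"serialize:   $obj -> $millis")
    millis
  }
  override def deserialize(datum: Any): DateTime = datum match {
    case value: Long =>
      val dt = new DateTime(value, DateTimeZone.UTC)
      println(s"deserialize: $value -> $dt")
      dt
  }
  override def userClass: Class[DateTime] = classOf[DateTime]
  private[spark] override def asNullable: JodaTimeUDT = this
}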

TD

Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

bryan.jeffrey@gmail.com
Perfect. I'll give this a shot and report back.


Fwd: Structured Streaming: mapGroupsWithState UDT serialization does not work

bryan.jeffrey@gmail.com
In reply to this post by Tathagata Das

Hi Tathagata.

I tried making changes as you suggested:

@SQLUserDefinedType(udt = classOf[JodaTimeUDT])
class JodaTimeUDT extends UserDefinedType[DateTime] {
  // Note: TimestampType is internally a Long of microseconds since the epoch,
  // while getMillis returns milliseconds.
  override def sqlType: DataType = TimestampType

  override def serialize(obj: DateTime): Long = {
    obj.getMillis
  }

  override def deserialize(datum: Any): DateTime = {
    datum match {
      case value: Long => new DateTime(value, DateTimeZone.UTC)
    }
  }

  override def userClass: Class[DateTime] = classOf[DateTime]

  private[spark] override def asNullable: JodaTimeUDT = this
}

object JodaTimeUDTRegister {
  def register: Unit = {
    UDTRegistration.register(classOf[DateTime].getName, classOf[JodaTimeUDT].getName)
  }
}

This did not resolve the problem.  The results remain the same:

org.scalatest.exceptions.TestFailedException: Array(FooWithDate(2021-02-02T19:26:23.374Z,Foo,1), FooWithDate(2021-02-02T19:26:23.374Z,FooFoo,6)) did not contain the same elements as Array(FooWithDate(2020-01-02T03:04:05.006Z,Foo,1), FooWithDate(2020-01-02T03:04:05.006Z,FooFoo,6))

I included a couple of other test cases to validate that the UDT works fine:

"the joda time serializer" should "serialize and deserialize as expected" in {
val input = new DateTime(2020,1,2,3,4,5,6, DateTimeZone.UTC)
val serializer = new JodaTimeUDT()
val serialized = serializer.serialize(input)
val deserialized = serializer.deserialize(serialized)

deserialized should be(input)
}

it should "correctly implement dataframe serialization & deserialization in data frames" in {
val date = new DateTime(2020,1,2,3,4,5,6, DateTimeZone.UTC)
val datePlusOne = new DateTime(2020,1,2,3,5,5,6, DateTimeZone.UTC)
val input = List(FooWithDate(date, "Foo", 1), FooWithDate(date, "Foo", 3))
val sqlContext = session.sqlContext
import sqlContext.implicits._
val ds = input.toDF().as[FooWithDate]
val result = ds.map(x => FooWithDate(DateUtils.addInterval(x.date, Minutes(1)), x.s, x.i + 1)).collect()
val expected = List(FooWithDate(datePlusOne, "Foo", 2), FooWithDate(datePlusOne, "Foo", 4))

result should contain theSameElementsAs expected
}

Any other thoughts?

Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

Jungtaek Lim-2
I've investigated a bit, and it looks like it's not an issue with mapGroupsWithState but with how UDTs are handled in UnsafeRow. The code misses a spot where UDTs need handling, and that gap lets Spark's internals corrupt the value. (So if I'm not mistaken, it's a correctness issue.)

I've filed an issue (sorry, I missed that you'd already filed one) and submitted a patch: https://issues.apache.org/jira/browse/SPARK-30993

It would be nice if you could try out my patch and see whether it fixes your issue (I've already copied your code and made it pass, but I would like to double-check). Thanks for reporting!

Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

Jungtaek Lim-2
Forgot to mention: it only occurs when the SQL type of the UDT has a fixed length. If the UDT represents a complex type like an array, struct, or even a string, it doesn't trigger the issue. So it's an edge case, and the chance of encountering it may not be that large, which is why it is only popping up now even though the relevant code has lived for a very long time.
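For context, a rough sketch of the mechanism as described above; this is an assumption for illustration, not the actual patch. The idea is that code classifying catalyst fields as fixed-length must unwrap a UDT to its underlying sqlType first, roughly:

// Illustrative only: isFixedLengthBacked is a hypothetical name, and this
// pattern match must live under an org.apache.spark package to see the
// package-private UserDefinedType API.
import org.apache.spark.sql.types._

def isFixedLengthBacked(dt: DataType): Boolean = dt match {
  case udt: UserDefinedType[_] => isFixedLengthBacked(udt.sqlType) // unwrap the UDT first
  case LongType | IntegerType | DoubleType | FloatType |
       ShortType | ByteType | BooleanType => true
  case _ => false // strings, arrays, structs, etc. are variable-length
}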

Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

bryan.jeffrey@gmail.com
Jungtaek,

Thank you for taking a look at this issue. I would be happy to test your patch, but I am unsure how to do so. Can you help me understand how I can:
1. Obtain an updated version of the JAR in question, either via a nightly build or by building it myself?
2. Substitute this JAR, ideally by changing a dependency in our code to point to a local build (via Maven)?
3. Substitute the JAR on the Spark cluster, via changes to our Maven dependency selection or, less ideally, by replacing the JAR on the cluster?

I would appreciate any pointers in this area, as I'd like to come up to speed on the right way to validate changes and perhaps contribute myself.

Regards,

Bryan Jeffrey

On Sat, Feb 29, 2020 at 9:52 AM Jungtaek Lim <[hidden email]> wrote:
Forgot to mention - it only occurs the SQL type of UDT is having fixed length. If the UDT is used to represent complex type like array, struct, or even string, it doesn't trigger the issue. So that's like an edge-case and the chance of encountering this issue may not be that huge, and that's why this issue pops up now whereas the relevant code lives very long time.

On Sat, Feb 29, 2020 at 11:44 PM Jungtaek Lim <[hidden email]> wrote:
I've investigated a bit, and looks like it's not an issue of mapGroupsWithState, but an issue of how UDT is handled in UnsafeRow. It seems to miss handling UDT and the missing spot makes the internal code of Spark corrupt the value. (So if I'm not mistaken, it's a correctness issue.)

I've filed an issue (sorry, I missed that you'd already filed one) and submitted a patch: https://issues.apache.org/jira/browse/SPARK-30993

It would be nice if you could try out my patch and see whether it fixes your issue (I've already copied your code and made it pass, but I would like to double-check). Thanks for reporting!

On Sat, Feb 29, 2020 at 11:26 AM Bryan Jeffrey <[hidden email]> wrote:

Hi Tathagata.

I tried making changes as you suggested:

@SQLUserDefinedType(udt = classOf[JodaTimeUDT])
class JodaTimeUDT extends UserDefinedType[DateTime] {
  override def sqlType: DataType = TimestampType

  override def serialize(obj: DateTime): Long = {
    obj.getMillis
  }

  def deserialize(datum: Any): DateTime = {
    datum match {
      case value: Long => new DateTime(value, DateTimeZone.UTC)
    }
  }

  override def userClass: Class[DateTime] = classOf[DateTime]

  private[spark] override def asNullable: JodaTimeUDT = this
}

object JodaTimeUDTRegister {
  def register: Unit = {
    UDTRegistration.register(classOf[DateTime].getName, classOf[JodaTimeUDT].getName)
  }
}

This did not resolve the problem.  The results remain the same:

org.scalatest.exceptions.TestFailedException:
Array(FooWithDate(2021-02-02T19:26:23.374Z,Foo,1), FooWithDate(2021-02-02T19:26:23.374Z,FooFoo,6)) did not contain the same elements as
Array(FooWithDate(2020-01-02T03:04:05.006Z,Foo,1), FooWithDate(2020-01-02T03:04:05.006Z,FooFoo,6))

I included a couple of other test cases to validate that the UDT works fine:

"the joda time serializer" should "serialize and deserialize as expected" in {
val input = new DateTime(2020,1,2,3,4,5,6, DateTimeZone.UTC)
val serializer = new JodaTimeUDT()
val serialized = serializer.serialize(input)
val deserialized = serializer.deserialize(serialized)

deserialized should be(input)
}

it should "correctly implement dataframe serialization & deserialization in data frames" in {
val date = new DateTime(2020,1,2,3,4,5,6, DateTimeZone.UTC)
val datePlusOne = new DateTime(2020,1,2,3,5,5,6, DateTimeZone.UTC)
val input = List(FooWithDate(date, "Foo", 1), FooWithDate(date, "Foo", 3))
val sqlContext = session.sqlContext
import sqlContext.implicits._
val ds = input.toDF().as[FooWithDate]
val result = ds.map(x => FooWithDate(DateUtils.addInterval(x.date, Minutes(1)), x.s, x.i + 1)).collect()
val expected = List(FooWithDate(datePlusOne, "Foo", 2), FooWithDate(datePlusOne, "Foo", 4))

result should contain theSameElementsAs expected
}

Any other thoughts?

On Fri, Feb 28, 2020 at 6:23 PM Tathagata Das <[hidden email]> wrote:
Sounds like it's something to do with the serialization/deserialization, and not related to mapGroupsWithState.


The docs say that:
1. this is deprecated and therefore should not be used
2. you have to use the annotation `SQLUserDefinedType` on the class definition. You don't seem to have done that; maybe that's the reason?

I would debug by printing the values in the serialize/deserialize methods, and then passing the data through the groupBy that is known to fail.
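
A minimal sketch of that instrumentation (an editorial illustration along the lines suggested, mirroring the original UDT's declarations):

import org.apache.spark.sql.types.{DataType, LongType, UserDefinedType}
import org.joda.time.{DateTime, DateTimeZone}

private[sql] class LoggingJodaTimeUDT extends UserDefinedType[DateTime] {
  override def sqlType: DataType = LongType
  override def serialize(obj: DateTime): Long = {
    val millis = obj.getMillis
    println(s"serialize:   $obj -> $millis")   // value going into the row
    millis
  }
  override def deserialize(datum: Any): DateTime = datum match {
    case value: Long =>
      val dt = new DateTime(value, DateTimeZone.UTC)
      println(s"deserialize: $value -> $dt")   // value coming back out
      dt
  }
  override def userClass: Class[DateTime] = classOf[DateTime]
  private[spark] override def asNullable: LoggingJodaTimeUDT = this
}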

TD

On Fri, Feb 28, 2020 at 2:45 PM Bryan Jeffrey <[hidden email]> wrote:
Tathagata,

The difference is more than a timezone-sized offset of a few hours. In this instance it's off by four years; in other instances it's off by tens of years (and by other, smaller durations).

We've considered moving to storing the values as longs, but that makes the code much less readable and harder to maintain. The UDT serialization bug also causes issues outside of stateful streaming, such as when executing a simple group-by.

Regards,

Bryan Jeffrey


From: Tathagata Das <[hidden email]>
Sent: Friday, February 28, 2020 4:56:07 PM
To: Bryan Jeffrey <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: Structured Streaming: mapGroupsWithState UDT serialization does not work
 
You are deserializing with an explicitly specified UTC timezone, but when serializing you are not specifying one. Maybe that is the reason?
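
A quick check of that hypothesis (an illustrative editorial sketch, not from the thread): Joda epoch millis are timezone-agnostic, so a zone mismatch between serialize and deserialize would change only the displayed zone, not the instant itself.

import org.joda.time.{DateTime, DateTimeZone}

val local = new DateTime(2020, 1, 2, 3, 4, 5, 6)                // JVM default zone
val roundTrip = new DateTime(local.getMillis, DateTimeZone.UTC) // forced to UTC
assert(roundTrip.getMillis == local.getMillis)                  // same instant either way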

Also, if you can encode it using just a long, then I recommend saving the value as a long and eliminating some of the serialization overhead. Spark will probably optimize things better if it sees a long rather than an opaque UDT.
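
A minimal sketch of that long-based encoding (an editorial illustration with hypothetical names, not code from the thread): keep a plain Long in the case class and convert to/from Joda DateTime only at the edges, so Catalyst sees a primitive it can optimize.

import org.joda.time.{DateTime, DateTimeZone}

// Hypothetical variant of FooWithDate that stores epoch millis directly
case class FooWithMillis(dateMillis: Long, s: String, i: Int) {
  def date: DateTime = new DateTime(dateMillis, DateTimeZone.UTC)
}

object FooWithMillis {
  def fromFoo(f: FooWithDate): FooWithMillis =
    FooWithMillis(f.date.getMillis, f.s, f.i)
}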

TD
