RE: another updateStateByKey question - updated w possible Spark bug


RE: another updateStateByKey question - updated w possible Spark bug

amoc

I’ve encountered this issue again and am able to reproduce it about 10% of the time.

 

1. Here is the input:

RDD[ (a, 1262325660000, 1), (a, 1262325660000, 2) ]  

RDD[ (a, 1262325660000, 1), (a, 1262325660000, 3) ]

RDD[ (a, 1262325660000, 3) ]

RDD[ (a, 1262325660000, 4) ]

RDD[ (a, 1262325660000, 2) ]

RDD[ (a, 1262325660000, 5), (a, 1262325660000, 5) ]

 

2. Here are the actual results (printed DStream; each line is a new RDD, with the RDD id as the last number on the line):

(((a,1262325660000),StateClass(3,2,ArrayBuffer(1.0, 2.0))),6)

(((a,1262325660000),StateClass(7,4,ArrayBuffer(1.0, 3.0))),13)

(((a,1262325660000),StateClass(10,5,ArrayBuffer(3.0))),20)

(((a,1262325660000),StateClass(10,5,ArrayBuffer())),26)   <-empty elements Seq[V]

(((a,1262325660000),StateClass(14,6,ArrayBuffer(4.0))),33)

(((a,1262325660000),StateClass(16,7,ArrayBuffer(2.0))),40)

(((a,1262325660000),StateClass(26,9,ArrayBuffer(5.0, 5.0))),47)

(((a,1262325660000),StateClass(26,9,ArrayBuffer())),53)  <-empty elements Seq[V]

(((a,1262325660000),StateClass(26,9,ArrayBuffer())),59)  <-empty elements Seq[V]

 

3. Here are the expected results (all tuples from #2 except those with an empty Seq[V]):

(((a,1262325660000),StateClass(3,2,ArrayBuffer(1.0, 2.0))),6)

(((a,1262325660000),StateClass(7,4,ArrayBuffer(1.0, 3.0))),13)

(((a,1262325660000),StateClass(10,5,ArrayBuffer(3.0))),20)

(((a,1262325660000),StateClass(14,6,ArrayBuffer(4.0))),33)

(((a,1262325660000),StateClass(16,7,ArrayBuffer(2.0))),40)

(((a,1262325660000),StateClass(26,9,ArrayBuffer(5.0, 5.0))),47)

 

4. Here is the code:

    case class StateClass(sum: Integer, count: Integer, elements: Seq[Double])

    val updateSumFunc = (values: Seq[(String, Long, Int)], state: Option[StateClass]) => {
//      if (values.isEmpty) {
//        // if the RDD has no values for this key (carried over from a previous RDD),
//        // the tuple will not be shown in this RDD with values of 0
//        None
//      } else {
      val previousState = state.getOrElse(StateClass(0, 0, Seq()))
      val currentCount  = values.size + previousState.count
      // sum this batch's Int payloads on top of the previous sum
      val currentSum    = values.map(_._3).sum + previousState.sum
      // keep this batch's values in the state so they are visible in the RDD
      val elements      = values.map(_._3.toDouble)
      Some(StateClass(currentSum, currentCount, elements))
//      }
    }
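
To make the arithmetic concrete, here is a worked trace of the first invocation, using the input above:

    // Worked example: first batch, no prior state
    //   values = Seq(("a", 1262325660000L, 1), ("a", 1262325660000L, 2))
    //   currentSum = 1 + 2 = 3, currentCount = 2, elements = Seq(1.0, 2.0)
    // which matches the first printed line: StateClass(3,2,ArrayBuffer(1.0, 2.0))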

 

val partialResultSums = inputStream
  .map((x: (String, Long, Int)) => (x._1, (x._1, x._2, x._3)))  // re-key by the String id
  .updateStateByKey[StateClass](updateSumFunc)                  // update state
  .transform(rdd => rdd.map(t => (t, rdd.id)))                  // tag each tuple with its RDD id

partialResultSums.print()

 

Now, this is how I generate the RDDs; I suspect the delay between them is why the issue surfaces:

 

    rddQueue += ssc.sparkContext.makeRDD(smallWindow1)  // smallWindow1 = List[(String, Long, Int)]( (a, 1262325660000, 1), (a, 1262325660000, 2) )
    Thread.sleep(1100)
    rddQueue += ssc.sparkContext.makeRDD(smallWindow2)  // smallWindow2 = List[(String, Long, Int)]( (a, 1262325660000, 1), (a, 1262325660000, 3) )
    Thread.sleep(1100)
    rddQueue += ssc.sparkContext.makeRDD(smallWindow3)  // smallWindow3 = List[(String, Long, Int)]( (a, 1262325660000, 3) )
    Thread.sleep(1100)
    rddQueue += ssc.sparkContext.makeRDD(smallWindow4)  // smallWindow4 = List[(String, Long, Int)]( (a, 1262325660000, 4) )
    Thread.sleep(1100)
    rddQueue += ssc.sparkContext.makeRDD(smallWindow5)  // smallWindow5 = List[(String, Long, Int)]( (a, 1262325660000, 2) )
    Thread.sleep(1100)
    rddQueue += ssc.sparkContext.makeRDD(smallWindow6)  // smallWindow6 = List[(String, Long, Int)]( (a, 1262325660000, 5), (a, 1262325660000, 5) )
    Thread.sleep(3100)
    //ssc.awaitTermination()
    ssc.stop()
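
For completeness, here is roughly how rddQueue and inputStream are wired up; the exact declarations were not included above, so this is a sketch of the assumed setup:

    // Assumed setup (not shown in the post): inputStream presumably comes
    // from rddQueue via queueStream
    import scala.collection.mutable
    import org.apache.spark.rdd.RDD

    val rddQueue = new mutable.Queue[RDD[(String, Long, Int)]]()
    val inputStream = ssc.queueStream(rddQueue)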

 

In my use case, when I detect an empty Seq[V] in the updateStateByKey function I return None so I can filter those tuples out. However, Spark calling the update function with an empty Seq[V] when it should not messes up my logic.

I wonder how to work around this bug/feature of Spark.
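
One workaround I am considering (a sketch, untested): updateStateByKey invokes the update function on every batch for every key that already has state, even when no new values arrived for that key, so the state can carry a flag recording whether the current batch actually contributed values, and the output can be filtered on that flag downstream. The field name hasNewValues is made up for illustration:

    // Sketch: track whether this batch delivered values, filter on it later
    case class StateClass(sum: Integer, count: Integer,
                          elements: Seq[Double], hasNewValues: Boolean)

    val updateSumFunc = (values: Seq[(String, Long, Int)], state: Option[StateClass]) => {
      val prev = state.getOrElse(StateClass(0, 0, Seq(), hasNewValues = false))
      Some(StateClass(values.map(_._3).sum + prev.sum,
                      values.size + prev.count,
                      values.map(_._3.toDouble),
                      values.nonEmpty))  // false when Spark passes an empty Seq[V]
    }

    val partialResultSums = inputStream
      .map((x: (String, Long, Int)) => (x._1, (x._1, x._2, x._3)))
      .updateStateByKey[StateClass](updateSumFunc)
      .filter(_._2.hasNewValues)  // drop the state-only updates

This would keep the running sum and count alive across quiet batches while hiding the empty-Seq updates from the output.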

 

Thanks

-Adrian

From: Tathagata Das [mailto:[hidden email]]
Sent: May-02-14 3:10 PM
To: [hidden email]
Cc: [hidden email]
Subject: Re: another updateStateByKey question

 

Could be a bug. Can you share code and data that I can use to reproduce this?

TD

On May 2, 2014 9:49 AM, "Adrian Mocanu" <[hidden email]> wrote:

Has anyone else noticed that sometimes the same tuple triggers the update state function twice?

I have 2 tuples with the same key in one RDD of the DStream: RDD[ (a,1), (a,2) ]

When the update function is called the first time, Seq[V] has the data 1, 2, which is correct: StateClass(3,2, ArrayBuffer(1, 2))

Then right away (I can see this in my output) the function is called again for the same key, but this time the Seq is empty: StateClass(3,2, ArrayBuffer( ))

 

In the update function I also save Seq[V] into the state so I can see it in the RDD, along with a count and sum of the values:

StateClass(sum, count, Seq[V])

 

Why is the update function called with an empty Seq[V] for the same key when all values for that key have already been handled in a previous update?

 

-Adrian

 


RE: another updateStateByKey question - updated w possible Spark bug

amoc

Forgot to mention that my batch interval is 1 second:

    val ssc = new StreamingContext(conf, Seconds(1))

hence the Thread.sleep(1100) calls between batches.
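
Also, updateStateByKey requires checkpointing to be enabled on the context, so the setup includes a line like the following (the exact directory shown is illustrative):

    // required by updateStateByKey; the checkpoint path is an assumption
    ssc.checkpoint("/tmp/spark-checkpoint")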

 
