[bug?] streaming window unexpected behaviour

[bug?] streaming window unexpected behaviour

amoc

I am seeing what I would call unexpected behaviour when using window on a stream.

I have 2 windowed streams with a 5s batch interval. One windowed stream is (5s,5s)=smallWindow and the other is (10s,5s)=bigWindow, written as (window duration, sliding interval).

What I’ve noticed is that the 1st RDD produced by bigWindow is incorrect: it covers 5s of data, not 10s. So instead of waiting 10s and producing 1 RDD covering 10s, Spark produced the 1st RDD of the 10s window with only 5s of data.

Why is this happening? To me it looks like a bug; Matei or TD, can you verify whether this is correct behaviour?

I have the following code:

val ssc = new StreamingContext(conf, Seconds(5))

val smallWindowStream = ssc.queueStream(smallWindowRddQueue)
val bigWindowStream = ssc.queueStream(bigWindowRddQueue)

// smallWindowReshapedStream and bigWindowReshapedStream are presumably derived
// from the two queue streams above; that step is not shown.
val smallWindow = smallWindowReshapedStream.window(Seconds(5), Seconds(5))
  .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))
val bigWindow = bigWindowReshapedStream.window(Seconds(10), Seconds(5))
  .reduceByKey((t1, t2) => (t1._1, t1._2, t1._3 + t2._3))

-Adrian

Re: [bug?] streaming window unexpected behaviour

Tathagata Das
Yes, I believe that is the current behavior. Essentially, the first few
RDDs will be partial windows (assuming the window duration is greater
than the sliding interval).

TD
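
To make the partial-window behaviour concrete, here is a minimal sketch in plain Scala (no Spark dependency) that models which stretch of stream time each emitted window covers. It assumes the stream starts at time 0 and that one window is emitted at every multiple of the sliding interval; `windowCoverage` is a hypothetical helper introduced for illustration, not part of the Spark API.

```scala
// Models the time range (in seconds) covered by each emitted window.
// Assumption: the stream starts at t = 0; a window is emitted every `slide`
// seconds and nominally looks back `window` seconds, but cannot reach before t = 0.
object WindowCoverage {
  // Returns (start, end) of the data covered by the first `count` emitted windows.
  def windowCoverage(window: Int, slide: Int, count: Int): Seq[(Int, Int)] =
    (1 to count).map { k =>
      val end = k * slide                   // emission time of the k-th window
      val start = math.max(0, end - window) // clipped at the start of the stream
      (start, end)
    }

  def main(args: Array[String]): Unit = {
    val small = windowCoverage(window = 5, slide = 5, count = 3)
    val big = windowCoverage(window = 10, slide = 5, count = 3)
    println(s"smallWindow: $small")
    println(s"bigWindow:   $big")
  }
}
```

Under these assumptions the first bigWindow entry spans only seconds 0 to 5, while every later entry spans a full 10 seconds; all smallWindow entries span 5 seconds.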


RE: [bug?] streaming window unexpected behaviour

amoc
Thanks TD!
Is it possible to add another window method that does not generate partial windows? Or is it possible to remove the first few partial windows? I'm thinking of using an accumulator to count how many windows there are.

-A

RE: [bug?] streaming window unexpected behaviour

amoc
Let me rephrase that: do you think it is possible to use an accumulator to skip the first few incomplete RDDs?

Re: [bug?] streaming window unexpected behaviour

Tathagata Das
You can probably do it in a simpler but sort of hacky way!

If your window size is W and your sliding interval is S, you can do some math to figure out how many of the first windows are actually partial windows. The first full window is probably number math.ceil(W/S), which makes the first math.ceil(W/S) - 1 windows partial. So in a windowDStream.foreachRDD() you can increment a global counter to count how many RDDs have been generated and ignore the first few RDDs.

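// Global is a driver-side singleton, e.g. object Global { var counter = 0 }.
// W and S are the window size and sliding interval in the same unit.
// toDouble avoids integer division truncating W / S before the ceil.
val firstFullWindow = math.ceil(W.toDouble / S).toInt
windowDStream.foreachRDD(rdd => {
    Global.counter += 1  // this body runs on the driver, once per window
    if (Global.counter >= firstFullWindow) {
        // do something awesome
    }
    // otherwise this is a partial window: ignore it
    // (a bare `return` is not a safe way to exit a Scala closure)
})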
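
The counting idea above can be checked without Spark. The following is a minimal sketch in plain Scala, assuming W and S are given in whole seconds, the stream starts at time 0, and one window is emitted per sliding interval. The names `firstFullWindow` and `skipPartial` are hypothetical and introduced only for this illustration.

```scala
object SkipPartialWindows {
  // 1-based index of the first emitted window that spans the full window duration.
  // toDouble keeps W / S from being truncated by integer division.
  def firstFullWindow(windowSecs: Int, slideSecs: Int): Int =
    math.ceil(windowSecs.toDouble / slideSecs).toInt

  // Drops the leading partial windows from a sequence of emitted windows.
  def skipPartial[A](emitted: Seq[A], windowSecs: Int, slideSecs: Int): Seq[A] =
    emitted.drop(firstFullWindow(windowSecs, slideSecs) - 1)

  def main(args: Array[String]): Unit = {
    // Stand-ins for the RDDs of a (10s, 5s) window, labelled by the time they cover.
    val emitted = Seq("0-5", "0-10", "5-15", "10-20")
    println(skipPartial(emitted, windowSecs = 10, slideSecs = 5))
  }
}
```

For a (10s, 5s) window this drops only the first entry; for a (5s, 5s) window nothing is dropped, since every window is already full.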

