Spark Streaming windowing driven by absolute time?


Spark Streaming windowing driven by absolute time?

Aries Kong
hi all,

It seems that windowing in Spark Streaming is driven by absolute time
rather than by the timestamp of the data. Can anybody kindly explain
why? What can I do if I need windowing driven by the data timestamp?

Thanks!


Aries.Kong

Re: Spark Streaming windowing driven by absolute time?

dachuan

I don't have a conclusive answer, but I would like to discuss this.

If one node's CPU is slower than another's, windowing in absolute time won't cause any trouble, because the data are well partitioned.


Re: Spark Streaming windowing driven by absolute time?

Mayur Rustagi
Also, can we batch by other things, like number of files, as well as by time?




Re: Spark Streaming windowing driven by absolute time?

dachuan
How about existing solutions, for example, the MapReduce Online model?



Re: Spark Streaming windowing driven by absolute time?

Tathagata Das
The reason we chose to define windows based on time is the underlying system design of Spark Streaming. Spark Streaming essentially divides the received data into batches of a fixed time interval and then runs a Spark job on each batch. So the system naturally maintains a mapping of <time interval> to <RDD containing data received in that interval>. It is therefore conceptually simple to define windows based on this batch interval and combine RDDs accordingly. For example, if you are running 1-second batches and have generated RDD1 for data in the 0-1 second interval, RDD2 for the 1-2 second interval, and so on, then defining a window of 4 seconds just means combining 4 RDDs together. This keeps the system simple while providing windowing functionality that works for many use cases.
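
To make this concrete, here is a minimal sketch of plain time-based windowing (the socket source, port, and durations are illustrative assumptions, not anything from your setup):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

val conf = new SparkConf().setAppName("TimeWindowSketch")
val ssc  = new StreamingContext(conf, Seconds(1))     // 1-second batches

// Hypothetical text source; any input DStream behaves the same way.
val lines = ssc.socketTextStream("localhost", 9999)

// A 4-second window over 1-second batches just combines the last 4 batch RDDs.
val windowedCounts = lines
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(4), Seconds(1))

windowedCounts.print()
ssc.start()
ssc.awaitTermination()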

Defining windows based on the data timestamp is actually a non-trivial problem to solve if you want to get all the semantics right. For example, what happens if some records arrive late or out of order and the corresponding window has already been closed? We may look at solving such problems in the future.

In the meantime, here is a way to define windows based on timestamps. Let's say you want a window of 5 minutes, and your data can be at most 1 minute late (i.e., out of order) based on its timestamp. Then you can define a DStream window (based on arrival time) of 10 minutes (more than the required 5 + 1 = 6 minutes) and then filter out only the timestamp-based window that you want.

|-------------------------------- 10 minute DStream.window --------------------------------------|          
          |_____________ filtered 5 minute window based on timestamp__|

In terms of code, it will probably look something like this (in Scala):

yourInputDStream.window(Minutes(10)).transform(windowedRDD => {
     val timeStampBasedWindow = ....              // define the window over the timestamps that you want to process
     val filteredRDD = windowedRDD.filter( .... ) // filter and retain only the records that fall in the timestamp-based window
     ....                                         // do your processing on that filtered dataset
})
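
For example, the blanks might be filled in like this, assuming the stream carries (timestampMillis, payload) pairs (the record layout and the window arithmetic are illustrative, not a built-in API):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Minutes, Time}

// Assumes yourInputDStream is a DStream[(Long, String)], i.e.
// (timestampMillis, payload); that layout is an assumption.
yourInputDStream
  .window(Minutes(10))
  .transform((windowedRDD: RDD[(Long, String)], batchTime: Time) => {
    // Timestamp-based window: the 5 minutes ending 1 minute before the
    // current batch time, so records up to 1 minute late have arrived.
    val windowEnd   = batchTime.milliseconds - Minutes(1).milliseconds
    val windowStart = windowEnd - Minutes(5).milliseconds
    windowedRDD.filter { case (ts, _) => ts >= windowStart && ts < windowEnd }
  })
  .foreachRDD { rdd =>
    // do your processing on the timestamp-filtered records
  }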

To answer what Mayur asked: there is no built-in way to batch by a number of files. However, for files, you can write your own batching logic to define the batches of files and convert each batch to an RDD (using the standard sparkContext.hadoopFile, etc.). Then you can use StreamingContext.queueStream to push those manually generated RDDs into a DStream.
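
As a rough sketch of that approach (the file-listing helper and the group size of 10 files per batch are illustrative assumptions):

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("FileBatchSketch")
val ssc  = new StreamingContext(conf, Seconds(1))

// queueStream dequeues one manually built RDD per batch interval.
val rddQueue    = new mutable.Queue[RDD[String]]()
val fileBatches = ssc.queueStream(rddQueue)
fileBatches.count().print()
ssc.start()

// On the driver side, group files 10 at a time and push each group
// into the queue as one RDD.
val files: Seq[String] = listNewFiles()   // hypothetical helper you would write
files.grouped(10).foreach { group =>
  rddQueue.synchronized {
    rddQueue += ssc.sparkContext.union(group.map(path => ssc.sparkContext.textFile(path)))
  }
}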

Hope this helps!



Re: Spark Streaming windowing driven by absolute time?

Aries Kong
A larger window does not work in my case; I will try to change the
algorithm to avoid the sliding window.

Thanks very much!
