Watermarking without aggregation with Structured Streaming

peay-2
Hello,

I am trying to use watermarking without aggregation, to filter out records that are just too late, instead of appending them to the output. My understanding is that aggregation is required for `withWatermark` to have any effect. Is that correct?

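I am looking for something along the lines of:

```
# F.getCurrentWatermark() is hypothetical (the API being asked for);
# keep only records that are not older than the current watermark.
df.withWatermark("ts", ...).filter(F.col("ts") >= F.getCurrentWatermark())
```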

Is there any way to get the watermark value to achieve that?
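To make the request concrete, here is a simplified pure-Python model of the value a function like the hypothetical `getCurrentWatermark()` would return. It is an assumption for illustration, not Spark's implementation: the watermark used for each micro-batch is taken to be the maximum event time seen in earlier batches minus the allowed delay, it is unset (`None`) before any data arrives, and it never moves backwards. The helper name `watermarks_per_batch` is hypothetical.

```python
from datetime import datetime, timedelta


def watermarks_per_batch(batches, delay):
    """Return the watermark in effect while each micro-batch is processed.

    Simplified model (assumption): the watermark is unset (None) until data
    arrives; after each batch it becomes max(event time seen so far) - delay,
    and it never moves backwards.
    """
    result = []
    current = None   # watermark that the next batch will be processed with
    max_seen = None  # largest event time observed so far
    for batch in batches:
        result.append(current)  # a batch is processed with the previous watermark
        for ts in batch:
            if max_seen is None or ts > max_seen:
                max_seen = ts
        if max_seen is not None:
            candidate = max_seen - delay
            if current is None or candidate > current:
                current = candidate
    return result


batches = [
    [datetime(2018, 9, 22, 12, 10), datetime(2018, 9, 22, 12, 5)],
    [datetime(2018, 9, 22, 12, 20)],
    [datetime(2018, 9, 22, 12, 2)],  # processed once the watermark has reached 12:10
]
print(watermarks_per_batch(batches, timedelta(minutes=10)))
```

With a value like this in hand, the desired filter would simply keep records whose `ts` is not older than the watermark of the batch they arrive in.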

Thanks!

Re: Watermarking without aggregation with Structured Streaming

chandan prakash
Interesting question.
I do not think watermarking is currently supported without an aggregation/groupBy operation.

Reason:
The watermark in Structured Streaming is used to limit the size of the state needed to keep intermediate information in memory.
State only comes into the picture with stateful processing.
Also, in the code, it seems that filtering out records on the basis of the watermark happens only in stateful operators (statefulOperators.scala).
I have not tried running any code, though, and would like to know if someone can shed more light on this.
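The point about state can be illustrated with a toy tumbling-window count in pure Python. It is a simplified model under stated assumptions (integer event times, watermark = maximum earlier event time minus the delay, and a window treated as closed once its end is at or before the watermark), not Spark's actual operator; `windowed_counts` is a hypothetical name. It shows both effects at once: closed windows are evicted so state stays small, and records belonging to an already-evicted window are dropped as too late.

```python
from collections import defaultdict


def windowed_counts(batches, window, delay):
    """Toy tumbling-window count whose state is bounded by a watermark.

    Simplified model (assumption, not Spark's code): event times are integers;
    the watermark used for a batch is max(earlier event times) - delay; a record
    whose window ends at or before the watermark is dropped as too late; after
    each batch, windows ending at or before the new watermark are finalized and
    evicted from state.
    """
    state = defaultdict(int)  # window start -> running count
    finalized, dropped, state_sizes = {}, [], []
    watermark, max_seen = None, None
    for batch in batches:
        for ts in batch:
            start = ts - ts % window
            if watermark is not None and start + window <= watermark:
                dropped.append(ts)  # its window was already finalized and evicted
                continue
            state[start] += 1
            max_seen = ts if max_seen is None else max(max_seen, ts)
        if max_seen is not None:
            watermark = max_seen - delay
        # Evict closed windows: this is what keeps the state bounded.
        for start in [s for s in state if s + window <= watermark]:
            finalized[start] = state.pop(start)
        state_sizes.append(len(state))
    return finalized, dropped, state_sizes


finalized, dropped, sizes = windowed_counts([[1, 4, 12], [27], [3, 25]], window=10, delay=5)
print(finalized, dropped, sizes)  # prints: {0: 2, 10: 1} [3] [2, 1, 1]
```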

Regards,
Chandan


(In reply to peay, Sat, Sep 22, 2018 at 7:43 PM)

--
Chandan Prakash


Re: Watermarking without aggregation with Structured Streaming

Jungtaek Lim
The purpose of the watermark is to put a limit on which records are handled, because otherwise the state would grow indefinitely. In other cases (non-stateful operations), it is perfectly normal to handle all records, even very late ones.

By the way, there have been some comments about this: while Spark currently delegates filtering out late records to stateful operations, some of us (including me) think filtering them out in an earlier phase (at the source, or just after it) makes more sense. It just hasn't been acted on yet, but I think the idea is still valid.

https://github.com/apache/spark/pull/21617#issuecomment-400119049

If we move the phase in which late records are filtered out, what you would like to do may become the default behavior. This also means the output may change for queries that use only non-stateful operations, so it is not a trivial change and may require consensus through something like the SPIP process.
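A toy comparison of the two behaviors discussed here, in plain Python and under the same simplified watermark model (an assumption, not Spark's code): with no stateful operator every record is emitted however late, whereas a hypothetical filter placed just after the source would drop late records and so change the output of a purely stateless query. The function name and the `filter_early` flag are illustrative only.

```python
def stateless_output(batches, delay, filter_early=False):
    """Emit records from a stateless (pass-through) query over micro-batches.

    filter_early=False: current behavior, every record is emitted however late.
    filter_early=True: hypothetical early filtering right after the source,
    dropping records older than the watermark (max earlier event time - delay).
    """
    out, watermark, max_seen = [], None, None
    for batch in batches:
        for ts in batch:
            if filter_early and watermark is not None and ts < watermark:
                continue  # too late: dropped before reaching the sink
            out.append(ts)
        if batch:
            batch_max = max(batch)
            max_seen = batch_max if max_seen is None else max(max_seen, batch_max)
            watermark = max_seen - delay
    return out


batches = [[10, 20], [3, 21]]
print(stateless_output(batches, delay=5))                     # prints: [10, 20, 3, 21]
print(stateless_output(batches, delay=5, filter_early=True))  # prints: [10, 20, 21]
```

The difference in the two outputs is exactly the compatibility concern raised above: moving the filter changes results for queries that never used state.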

Thanks,
Jungtaek Lim (HeartSaVioR)

(In reply to chandan prakash, Sun, Sep 30, 2018 at 5:19 PM)


Re: Watermarking without aggregation with Structured Streaming

peay-2
Thanks for the pointers. I guess the only workaround right now would be to apply a "dummy" aggregation (e.g., grouping by the timestamp itself) just to make the stateful processing logic kick in and apply the filtering?
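The "dummy aggregation" workaround can be sketched the same way. This pure-Python model assumes append-mode semantics in simplified form: each distinct event time is its own group, a record at or before the current watermark is dropped, and a group is emitted only once the watermark reaches it. It is an illustration, not Spark's implementation, and `group_by_event_time` is a hypothetical name. It highlights two side effects of the workaround: output is held back by roughly the watermark delay, and records sharing a timestamp are merged into a count.

```python
def group_by_event_time(batches, delay):
    """Toy append-mode aggregation grouped by the event time itself.

    Simplified model (assumption): watermark = max(accepted event times) - delay.
    Returns, for each micro-batch, the (event_time, count) rows emitted.
    """
    pending = {}  # aggregation state: event time -> count
    emitted, watermark, max_seen = [], None, None
    for batch in batches:
        for ts in batch:
            if watermark is not None and ts <= watermark:
                continue  # late record dropped by the stateful operator
            pending[ts] = pending.get(ts, 0) + 1
            max_seen = ts if max_seen is None else max(max_seen, ts)
        if max_seen is not None:
            watermark = max_seen - delay
        # A group is final (and leaves state) once the watermark has passed it.
        ready = [(ts, pending.pop(ts)) for ts in sorted(pending) if ts <= watermark]
        emitted.append(ready)
    return emitted


print(group_by_event_time([[10, 10, 12], [20], [11, 30]], delay=5))
# prints: [[], [(10, 2), (12, 1)], [(20, 1)]]
```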

For my purposes, an alternative to pushing the filtering into the source would be to expose the watermark timestamp through a function, so that it can be used in a regular filter clause. Based on my experiments, the watermark is computed and updated even when no stateful computations occur. I am not sure how easy that would be to contribute, though; maybe someone can suggest a starting point?

Thanks,

(In reply to Jungtaek Lim, Sunday, 30 September 2018 10:41)


Re: Watermarking without aggregation with Structured Streaming

sanjay_awat
Try this


peay-2 wrote
> For my purposes, an alternative solution to pushing it out to the source
> would be to make the watermark timestamp available through a function so
> that it can be used in a regular filter clause. Based on my experiments,
> the timestamp is computed and updated even when no stateful computations
> occur. I am not sure how easy that would be to contribute though, maybe
> someone can suggest a starting point?





--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]


Re: Watermarking without aggregation with Structured Streaming

Sanjay Awatramani
In reply to this post by peay-2
Try this and see if it works (Scala):

```
println(query.lastProgress.eventTime.get("watermark"))
```
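For the Python side of the thread, here is a hedged sketch of using that progress value from the driver. It assumes the progress report is a dict shaped like the progress JSON (which is what PySpark's `query.lastProgress` has returned in at least Spark 2.x and 3.x; it is `None` before the first trigger completes) and that `eventTime.watermark` is a UTC timestamp string such as `"2018-09-30T09:05:40.000Z"`. The helpers `watermark_from_progress` and `drop_late` are hypothetical, and the sketch is plain Python so it runs without Spark.

```python
from datetime import datetime, timezone


def watermark_from_progress(progress):
    """Extract the watermark from a progress dict, or None if unavailable.

    Assumption: `progress` looks like the streaming progress JSON; "eventTime"
    only carries "watermark" when withWatermark is used.
    """
    if not progress:
        return None
    value = progress.get("eventTime", {}).get("watermark")
    if value is None:
        return None
    # Assumed format: UTC ISO-8601 with milliseconds, e.g. "2018-09-30T09:05:40.000Z"
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)


def drop_late(records, watermark, ts_key="ts"):
    """Keep records at or after the watermark (all of them if it is unknown)."""
    if watermark is None:
        return list(records)
    return [r for r in records if r[ts_key] >= watermark]


progress = {"eventTime": {"watermark": "2018-09-30T09:05:40.000Z"}}
wm = watermark_from_progress(progress)
records = [
    {"ts": datetime(2018, 9, 30, 9, 0, tzinfo=timezone.utc)},  # older than the watermark
    {"ts": datetime(2018, 9, 30, 9, 10, tzinfo=timezone.utc)},
]
print(drop_late(records, wm))
```

Because the value only becomes known on the driver after a trigger completes, it lags behind the watermark Spark uses internally, so filtering with it (for example inside a `foreachBatch` function, available from Spark 2.4) is only approximate.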

Regards,
Sanjay

(In reply to peay, 2018/09/30 09:05:40)


Re: Watermarking without aggregation with Structured Streaming

sanjay_awat
In reply to this post by peay-2
Hello peay-2,

Were you able to find a solution to your problem? Were you able to make the
watermark timestamp available through a function?

Regards,
Sanjay

