Structured Streaming metric for count of delayed/late data


Structured Streaming metric for count of delayed/late data

GOEL Rajat

Hi All,

 

I have a query, if someone can please help: is there any metric or mechanism for printing the count of input records dropped due to watermarking (the late-data count) during a window-based aggregation in Structured Streaming? I am using Spark 3.0.

 

Thanks & Regards,

Rajat
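
For reference, a minimal Scala sketch of the kind of job in question: a window-based aggregation with a watermark, where input rows arriving later than the watermark are silently dropped. The socket source, host/port, and schema are illustrative, not taken from the original post.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("late-data-demo").getOrCreate()
import spark.implicits._

// Illustrative source: a socket stream of "2020-08-20 21:14:00,word" lines.
val events = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .as[String]
  .map { line =>
    val Array(ts, word) = line.split(",")
    (java.sql.Timestamp.valueOf(ts), word)
  }
  .toDF("eventTime", "word")

// Rows more than 10 minutes behind the max event time seen so far are
// dropped by the watermark; as of Spark 3.0 nothing reports how many.
val counts = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window($"eventTime", "5 minutes"), $"word")
  .count()

val query = counts.writeStream
  .outputMode("update")
  .format("console")
  .start()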



Re: Structured Streaming metric for count of delayed/late data

liyuanjian
The metric was added in https://issues.apache.org/jira/browse/SPARK-24634, but the target version is 3.1.
Maybe you can backport it for testing, since it's not a big change.

Best,
Yuanjian
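
As a sketch of how that metric could be consumed once available: SPARK-24634 exposes a per-state-operator count of rows dropped by the watermark in the streaming query progress. The field name numRowsDroppedByWatermark below is the one used in the 3.1 line and should be treated as an assumption for any backport.

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // Each stateful operator reports how many rows it dropped because
    // they arrived behind the watermark (SPARK-24634, targeted at 3.1).
    event.progress.stateOperators.foreach { op =>
      println(s"batch ${event.progress.batchId}: " +
        s"rows dropped by watermark = ${op.numRowsDroppedByWatermark}")
    }
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})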



Re: Structured Streaming metric for count of delayed/late data

Jungtaek Lim-2
One more thing: unfortunately, for streaming aggregation the number is not accurate relative to the input rows, because Spark performs a local (partial) aggregation first and counts dropped inputs against the "pre-locally-aggregated" rows. For example, if many late input rows for the same key and window are collapsed into one partial row per partition before reaching the state store, only that one row is counted. You may want to treat the number as an indicator that inputs are being dropped, rather than as an exact count.



Re: Structured Streaming metric for count of delayed/late data

GOEL Rajat

Thanks for pointing me to the Spark ticket and its limitations. Will try these changes.

Is there any workaround for this inaccurate-count limitation, maybe by adding some additional streaming operation to the SS job without impacting performance too much?

 

Regards,

Rajat

 



Re: Structured Streaming metric for count of delayed/late data

Jungtaek Lim-2
I proposed another approach that provides an accurate count, though the number doesn't always mean the rows were dropped (see https://github.com/apache/spark/pull/24936 for details).

Btw, the limitation only applies to streaming aggregation, so you can implement the aggregation yourself via (flat)MapGroupsWithState. Note that the local aggregation is an "optimization", so you may need to account for the performance impact of giving it up.
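
A rough sketch of that suggestion, reusing the events stream and Spark session from the earlier example: the per-word count is kept in GroupState instead of relying on the built-in streaming aggregation, so there is no local pre-aggregation step and a dropped-by-watermark count refers to actual input rows. Windowing is omitted to keep the sketch short, and all names are illustrative.

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import java.sql.Timestamp

case class Event(eventTime: Timestamp, word: String)
case class WordCount(word: String, count: Long)

// Hand-rolled aggregation: a running count per word, held in GroupState.
def updateCounts(
    word: String,
    batch: Iterator[Event],
    state: GroupState[Long]): Iterator[WordCount] = {
  val newCount = state.getOption.getOrElse(0L) + batch.size
  state.update(newCount)
  Iterator(WordCount(word, newCount))
}

val manualCounts = events.as[Event]
  .withWatermark("eventTime", "10 minutes")
  .groupByKey(_.word)
  .flatMapGroupsWithState(
    OutputMode.Update,
    GroupStateTimeout.NoTimeout)(updateCounts)

Note that giving up the partial aggregation shifts more rows across the shuffle, which is the performance trade-off mentioned above.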



Re: Structured Streaming metric for count of delayed/late data

GOEL Rajat

Thanks for the pointers. I will try these changes.

 
