Using Spark Accumulators with Structured Streaming


Using Spark Accumulators with Structured Streaming

Something Something

In my structured streaming job, I am updating Spark Accumulators in the updateAcrossEvents method, but they are always 0 when I try to print them in my StreamingListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )
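
For reference, updateAcrossEvents is along these lines - a simplified sketch with illustrative types, and a plain LongAccumulator standing in for our custom accumulator (timeout handling omitted):

import org.apache.spark.sql.streaming.GroupState
import org.apache.spark.util.LongAccumulator

// Illustrative types only; the real job uses its own event/state classes.
case class Event(id: String, ts: Long)
case class SessionState(count: Long)

// Registered on the driver: spark.sparkContext.register(eventCount, "EventCount")
val eventCount = new LongAccumulator

def updateAcrossEvents(
    key: String,
    events: Iterator[Event],
    state: GroupState[SessionState]): SessionState = {
  var current = state.getOption.getOrElse(SessionState(0L))
  events.foreach { _ =>
    eventCount.add(1) // these increments do show up in the executor logs
    current = SessionState(current.count + 1)
  }
  state.update(current)
  current
}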

The accumulators get incremented in 'updateAcrossEvents'. I have a StreamingListener that writes the values of the accumulators in its 'onQueryProgress' method, but in that method the accumulators are ALWAYS ZERO!
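
The listener is along these lines (a simplified sketch; onQueryProgress is defined on StreamingQueryListener, so that is what it extends, and again a plain LongAccumulator stands in for our custom one):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.apache.spark.util.LongAccumulator

class MyListener(myAcc: LongAccumulator) extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // In cluster mode this always prints 0.
    println(s"MyAccumulator = ${myAcc.value}")
  }
}

// Registered with: spark.streams.addListener(new MyListener(eventCount))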

When I added log statements in updateAcrossEvents, I could see that these accumulators are getting incremented as expected.

This only happens when I run in 'cluster' mode. In local mode it works fine, which implies that the accumulators are not getting shipped back from the executors correctly - or something like that!

Note: I've seen quite a few answers on the web that tell me to perform an "action". That's not a solution here; this is a stateful Structured Streaming job. And yes, I am also registering them with the SparkContext.


 


Re: Using Spark Accumulators with Structured Streaming

Something Something
Can someone from the Spark development team tell me if this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. To add more context: we have our own accumulator class that extends AccumulatorV2. In this class we keep track of one or more counters. Here's the definition:

class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins, we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")
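
For completeness, the implementation is essentially the following (a representative sketch, not our exact code):

import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.util.AccumulatorV2

class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]] {

  private val counts = new ConcurrentHashMap[T, Long]()

  override def isZero: Boolean = counts.isEmpty

  override def copy(): AccumulatorV2[T, java.util.Map[T, Long]] = {
    val acc = new CollectionLongAccumulator[T]
    acc.counts.putAll(counts)
    acc
  }

  override def reset(): Unit = counts.clear()

  // Count one occurrence of the given key.
  override def add(v: T): Unit =
    counts.merge(v, 1L, (a, b) => a + b)

  // Fold another task's map into this one.
  override def merge(other: AccumulatorV2[T, java.util.Map[T, Long]]): Unit =
    other.value.forEach { (k, v) => counts.merge(k, v, (a, b) => a + b) }

  override def value: java.util.Map[T, Long] = counts
}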
Is this working under Structured Streaming?
I will keep looking for alternate approaches, but any help would be greatly appreciated. Thanks.





 


Re: Using Spark Accumulators with Structured Streaming

ZHANG Wei-2
In reply to this post by Something Something
There is a restriction in the AccumulatorV2 API [1]: the OUT type should be atomic or thread safe. I'm wondering whether the implementation for `java.util.Map[T, Long]` meets that. Is there any chance to replace CollectionLongAccumulator with CollectionAccumulator [2] or LongAccumulator [3] and test whether the StreamingListener and the rest of the code work?
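
For instance, an untested sketch with the built-in LongAccumulator, whose add and merge operations are thread safe:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

// Built-in accumulator; registered and named in one call.
val eventCount = spark.sparkContext.longAccumulator("EventCount")

// On the executors, inside the state update function:
//   eventCount.add(1)
// On the driver, inside onQueryProgress:
//   println(s"EventCount = ${eventCount.value}")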

---
Cheers,
-z
[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator



Re: Using Spark Accumulators with Structured Streaming

Something Something
In reply to this post by ZHANG Wei-2
No, this does not work even if I use LongAccumulator.
