Using Spark Accumulators with Structured Streaming


Using Spark Accumulators with Structured Streaming

Eric Beabes

In my Structured Streaming job I am updating Spark accumulators in the updateAcrossEvents method, but they are always 0 when I try to print them in my StreamingListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )

The accumulators get incremented in 'updateAcrossEvents'. I have a StreamingListener that writes the accumulator values in its 'onQueryProgress' method, but in that method the accumulators are ALWAYS ZERO!

When I added log statements in updateAcrossEvents, I could see that these accumulators are getting incremented as expected.

This only happens when I run in cluster mode. In local mode it works fine, which implies that the accumulators are not getting distributed correctly - or something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an "Action". That's not a solution here. This is a 'Stateful Structured Streaming' job. Yes, I am also 'registering' them in SparkContext.
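
For context, here is a minimal, self-contained Scala sketch of the pattern being described (the accumulator name, the function body, and the socket source are illustrative assumptions, not the poster's actual code): an accumulator registered on the driver, incremented inside the state-update function on the executors, and read back in a StreamingQueryListener after each micro-batch.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, StreamingQueryListener}
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

object AccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("accumulator-sketch").getOrCreate()
    import spark.implicits._

    // Driver-side accumulator; giving it a name also registers it.
    val eventCount = spark.sparkContext.longAccumulator("eventCount")

    // Illustrative state-update function: the accumulator is captured in the
    // closure and incremented on the executors as each group is processed.
    def updateAcrossEvents(key: String, events: Iterator[String], state: GroupState[Long]): (String, Long) = {
      val n = events.size
      val total = state.getOption.getOrElse(0L) + n
      state.update(total)
      eventCount.add(n)
      (key, total)
    }

    // Driver-side listener: reads the merged accumulator value after each micro-batch.
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = ()
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
      override def onQueryProgress(event: QueryProgressEvent): Unit =
        println(s"batch ${event.progress.batchId}: eventCount = ${eventCount.value}")
    })

    val lines = spark.readStream
      .format("socket").option("host", "localhost").option("port", "9999")
      .load().as[String]

    val counts = lines
      .groupByKey(v => v)
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)

    counts.writeStream.outputMode("update").format("console").start().awaitTermination()
  }
}
```

LongAccumulator is one of the built-in, thread-safe implementations, and its value is merged back to the driver as tasks complete.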


 


Re: Using Spark Accumulators with Structured Streaming

Eric Beabes
Can someone from the Spark development team tell me if this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. To add more context: we have our own accumulator class that extends AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition:

class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")

Is this working under Structured Streaming?
I will keep looking for alternate approaches but any help would be greatly appreciated. Thanks.
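
For readers who hit this thread later, here is one plausible shape for such a class - this is a reconstruction, not the poster's actual implementation. The AccumulatorV2 contract needs isZero/copy/reset/add/merge/value, and a ConcurrentHashMap is used here so the OUT value is reasonably safe to read concurrently:

```scala
import java.util.{Map => JMap}
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.util.AccumulatorV2

// Sketch: counts occurrences of each key T across tasks and merges the
// per-task maps back on the driver.
class CollectionLongAccumulator[T] extends AccumulatorV2[T, JMap[T, Long]] {
  private val map = new ConcurrentHashMap[T, Long]()

  override def isZero: Boolean = map.isEmpty

  override def copy(): AccumulatorV2[T, JMap[T, Long]] = {
    val acc = new CollectionLongAccumulator[T]
    acc.map.putAll(map)
    acc
  }

  override def reset(): Unit = map.clear()

  // Called on the executors for each tracked event.
  override def add(v: T): Unit =
    map.put(v, map.getOrDefault(v, 0L) + 1L)

  // Called on the driver to fold in a completed task's partial counts.
  override def merge(other: AccumulatorV2[T, JMap[T, Long]]): Unit = {
    val it = other.value.entrySet().iterator()
    while (it.hasNext) {
      val e = it.next()
      map.put(e.getKey, map.getOrDefault(e.getKey, 0L) + e.getValue)
    }
  }

  override def value: JMap[T, Long] = map
}
```

Usage is then just the register call above, add(key) wherever an event is tracked on the executors, and value on the driver.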



On Thu, May 14, 2020 at 2:36 PM Something Something <[hidden email]> wrote:

In my structured streaming job I am updating Spark Accumulators in the updateAcrossEvents method but they are always 0 when I try to print them in my StreamingListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )

The accumulators get incremented in 'updateAcrossEvents'. I've a StreamingListener which writes values of the accumulators in 'onQueryProgress' method but in this method the Accumulators are ALWAYS ZERO!

When I added log statements in the updateAcrossEvents, I could see that these accumulators are getting incremented as expected. 

This only happens when I run in the 'Cluster' mode. In Local mode it works fine which implies that the Accumulators are not getting distributed correctly - or something like that! 

Note: I've seen quite a few answers on the Web that tell me to perform an "Action". That's not a solution here. This is a 'Stateful Structured Streaming' job. Yes, I am also 'registering' them in SparkContext.


 


Re: Using Spark Accumulators with Structured Streaming

ZHANG Wei-2
In reply to this post by Eric Beabes
There is a restriction in the AccumulatorV2 API [1]: the OUT type should be atomic or thread-safe. I'm wondering whether the implementation for `java.util.Map[T, Long]` meets that. Is there any chance to replace CollectionLongAccumulator with CollectionAccumulator [2] or LongAccumulator [3] and test whether the StreamingListener and the rest of the code work?

---
Cheers,
-z
[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
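
For what it's worth, the swap being suggested is small - a rough sketch, assuming `spark` is the active SparkSession (the variable names are illustrative):

```scala
// Built-in accumulators are created and registered in one call on the driver:
val longAcc = spark.sparkContext.longAccumulator("MyLongAccumulator")
val collAcc = spark.sparkContext.collectionAccumulator[String]("MyCollectionAccumulator")

// Inside the state-update function (runs on the executors):
//   longAcc.add(1)
//   collAcc.add(eventKey)

// Inside StreamingQueryListener.onQueryProgress (runs on the driver):
//   println(s"count = ${longAcc.value}, keys = ${collAcc.value}")
```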

________________________________________
From: Something Something <[hidden email]>
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from Spark Development team tell me if this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. Just to add more context, we've our own Accumulator class that extends from AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition:


class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")

Is this working under Structured Streaming?

I will keep looking for alternate approaches but any help would be greatly appreciated. Thanks.



On Thu, May 14, 2020 at 2:36 PM Something Something <[hidden email]<mailto:[hidden email]>> wrote:

In my structured streaming job I am updating Spark Accumulators in the updateAcrossEvents method but they are always 0 when I try to print them in my StreamingListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )


The accumulators get incremented in 'updateAcrossEvents'. I've a StreamingListener which writes values of the accumulators in 'onQueryProgress' method but in this method the Accumulators are ALWAYS ZERO!

When I added log statements in the updateAcrossEvents, I could see that these accumulators are getting incremented as expected.

This only happens when I run in the 'Cluster' mode. In Local mode it works fine which implies that the Accumulators are not getting distributed correctly - or something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an "Action". That's not a solution here. This is a 'Stateful Structured Streaming' job. Yes, I am also 'registering' them in SparkContext.





Re: Using Spark Accumulators with Structured Streaming

Eric Beabes
In reply to this post by ZHANG Wei-2
No, this is not working even if I use LongAccumulator.

On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <[hidden email]> wrote:
There is a restriction in AccumulatorV2 API [1], the OUT type should be atomic or thread safe. I'm wondering if the implementation for `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3] and test if the StreamingListener and other codes are able to work?

---
Cheers,
-z
[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator

________________________________________
From: Something Something <[hidden email]>
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from Spark Development team tell me if this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. Just to add more context, we've our own Accumulator class that extends from AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition:


class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")

Is this working under Structured Streaming?

I will keep looking for alternate approaches but any help would be greatly appreciated. Thanks.



On Thu, May 14, 2020 at 2:36 PM Something Something <[hidden email]<mailto:[hidden email]>> wrote:

In my structured streaming job I am updating Spark Accumulators in the updateAcrossEvents method but they are always 0 when I try to print them in my StreamingListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )


The accumulators get incremented in 'updateAcrossEvents'. I've a StreamingListener which writes values of the accumulators in 'onQueryProgress' method but in this method the Accumulators are ALWAYS ZERO!

When I added log statements in the updateAcrossEvents, I could see that these accumulators are getting incremented as expected.

This only happens when I run in the 'Cluster' mode. In Local mode it works fine which implies that the Accumulators are not getting distributed correctly - or something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an "Action". That's not a solution here. This is a 'Stateful Structured Streaming' job. Yes, I am also 'registering' them in SparkContext.




Re: Using Spark Accumulators with Structured Streaming

Srinivas V
Hello,
Even for me it comes as 0 when I print it in onQueryProgress. I use LongAccumulator as well. Yes, it prints correctly on my local setup but not on the cluster.
But one consolation is that when I send metrics to Grafana, the values do show up there.

On Tue, May 26, 2020 at 3:10 AM Something Something <[hidden email]> wrote:
No this is not working even if I use LongAccumulator. 

On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <[hidden email]> wrote:
There is a restriction in AccumulatorV2 API [1], the OUT type should be atomic or thread safe. I'm wondering if the implementation for `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3] and test if the StreamingListener and other codes are able to work?

---
Cheers,
-z
[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator

________________________________________
From: Something Something <[hidden email]>
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from Spark Development team tell me if this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. Just to add more context, we've our own Accumulator class that extends from AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition:


class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")

Is this working under Structured Streaming?

I will keep looking for alternate approaches but any help would be greatly appreciated. Thanks.



On Thu, May 14, 2020 at 2:36 PM Something Something <[hidden email]<mailto:[hidden email]>> wrote:

In my structured streaming job I am updating Spark Accumulators in the updateAcrossEvents method but they are always 0 when I try to print them in my StreamingListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )


The accumulators get incremented in 'updateAcrossEvents'. I've a StreamingListener which writes values of the accumulators in 'onQueryProgress' method but in this method the Accumulators are ALWAYS ZERO!

When I added log statements in the updateAcrossEvents, I could see that these accumulators are getting incremented as expected.

This only happens when I run in the 'Cluster' mode. In Local mode it works fine which implies that the Accumulators are not getting distributed correctly - or something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an "Action". That's not a solution here. This is a 'Stateful Structured Streaming' job. Yes, I am also 'registering' them in SparkContext.




Re: Using Spark Accumulators with Structured Streaming

Eric Beabes
Hmm... how would they get to Grafana if they are not getting computed in your code? I am talking about the application-specific accumulators. The other standard counters, such as 'event.progress.inputRowsPerSecond', are getting populated correctly!

On Mon, May 25, 2020 at 8:39 PM Srinivas V <[hidden email]> wrote:
Hello, 
Even for me it comes as 0 when I print in OnQueryProgress. I use LongAccumulator as well. Yes, it prints on my local but not on cluster.
But one consolation is that when I send metrics to Graphana, the values are coming there. 

On Tue, May 26, 2020 at 3:10 AM Something Something <[hidden email]> wrote:
No this is not working even if I use LongAccumulator. 

On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <[hidden email]> wrote:
There is a restriction in AccumulatorV2 API [1], the OUT type should be atomic or thread safe. I'm wondering if the implementation for `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3] and test if the StreamingListener and other codes are able to work?

---
Cheers,
-z
[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator

________________________________________
From: Something Something <[hidden email]>
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from Spark Development team tell me if this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. Just to add more context, we've our own Accumulator class that extends from AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition:


class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")

Is this working under Structured Streaming?

I will keep looking for alternate approaches but any help would be greatly appreciated. Thanks.



On Thu, May 14, 2020 at 2:36 PM Something Something <[hidden email]<mailto:[hidden email]>> wrote:

In my structured streaming job I am updating Spark Accumulators in the updateAcrossEvents method but they are always 0 when I try to print them in my StreamingListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )


The accumulators get incremented in 'updateAcrossEvents'. I've a StreamingListener which writes values of the accumulators in 'onQueryProgress' method but in this method the Accumulators are ALWAYS ZERO!

When I added log statements in the updateAcrossEvents, I could see that these accumulators are getting incremented as expected.

This only happens when I run in the 'Cluster' mode. In Local mode it works fine which implies that the Accumulators are not getting distributed correctly - or something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an "Action". That's not a solution here. This is a 'Stateful Structured Streaming' job. Yes, I am also 'registering' them in SparkContext.




Re: Using Spark Accumulators with Structured Streaming

Srinivas V
Yes, I am talking about application-specific accumulators. Actually I am getting the values printed in my driver log as well as sent to Grafana. Not sure where and when I saw 0 before. My deploy mode is “client” on a YARN cluster (not my local Mac), where I submit from the master node. It should work the same for cluster mode as well.
I create accumulators like this:
AccumulatorV2 accumulator = sparkContext.longAccumulator(name);


On Tue, May 26, 2020 at 8:42 PM Something Something <[hidden email]> wrote:
Hmm... how would they go to Graphana if they are not getting computed in your code? I am talking about the Application Specific Accumulators. The other standard counters such as 'event.progress.inputRowsPerSecond' are getting populated correctly!

On Mon, May 25, 2020 at 8:39 PM Srinivas V <[hidden email]> wrote:
Hello, 
Even for me it comes as 0 when I print in OnQueryProgress. I use LongAccumulator as well. Yes, it prints on my local but not on cluster.
But one consolation is that when I send metrics to Graphana, the values are coming there. 

On Tue, May 26, 2020 at 3:10 AM Something Something <[hidden email]> wrote:
No this is not working even if I use LongAccumulator. 

On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <[hidden email]> wrote:
There is a restriction in AccumulatorV2 API [1], the OUT type should be atomic or thread safe. I'm wondering if the implementation for `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3] and test if the StreamingListener and other codes are able to work?

---
Cheers,
-z
[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator

________________________________________
From: Something Something <[hidden email]>
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from Spark Development team tell me if this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. Just to add more context, we've our own Accumulator class that extends from AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition:


class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")

Is this working under Structured Streaming?

I will keep looking for alternate approaches but any help would be greatly appreciated. Thanks.



On Thu, May 14, 2020 at 2:36 PM Something Something <[hidden email]<mailto:[hidden email]>> wrote:

In my structured streaming job I am updating Spark Accumulators in the updateAcrossEvents method but they are always 0 when I try to print them in my StreamingListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )


The accumulators get incremented in 'updateAcrossEvents'. I've a StreamingListener which writes values of the accumulators in 'onQueryProgress' method but in this method the Accumulators are ALWAYS ZERO!

When I added log statements in the updateAcrossEvents, I could see that these accumulators are getting incremented as expected.

This only happens when I run in the 'Cluster' mode. In Local mode it works fine which implies that the Accumulators are not getting distributed correctly - or something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an "Action". That's not a solution here. This is a 'Stateful Structured Streaming' job. Yes, I am also 'registering' them in SparkContext.




Re: Using Spark Accumulators with Structured Streaming

Eric Beabes
Yes, that's exactly how I am creating them. 

Question... Are you using 'Stateful Structured Streaming', in which you have something like this?

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )
And are you updating the accumulator inside 'updateAcrossEvents'? We're experiencing this only under stateful Structured Streaming. In other streaming applications it works as expected.


On Wed, May 27, 2020 at 9:01 AM Srinivas V <[hidden email]> wrote:
Yes, I am talking about Application specific Accumulators. Actually I am getting the values printed in my driver log as well as sent to Grafana. Not sure where and when I saw 0 before. My deploy mode is “client” on a yarn cluster(not local Mac) where I submit from master node. It should work the same for cluster mode as well.
Create accumulators like this:
AccumulatorV2 accumulator = sparkContext.longAccumulator(name);


On Tue, May 26, 2020 at 8:42 PM Something Something <[hidden email]> wrote:
Hmm... how would they go to Graphana if they are not getting computed in your code? I am talking about the Application Specific Accumulators. The other standard counters such as 'event.progress.inputRowsPerSecond' are getting populated correctly!

On Mon, May 25, 2020 at 8:39 PM Srinivas V <[hidden email]> wrote:
Hello, 
Even for me it comes as 0 when I print in OnQueryProgress. I use LongAccumulator as well. Yes, it prints on my local but not on cluster.
But one consolation is that when I send metrics to Graphana, the values are coming there. 

On Tue, May 26, 2020 at 3:10 AM Something Something <[hidden email]> wrote:
No this is not working even if I use LongAccumulator. 

On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <[hidden email]> wrote:
There is a restriction in AccumulatorV2 API [1], the OUT type should be atomic or thread safe. I'm wondering if the implementation for `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3] and test if the StreamingListener and other codes are able to work?

---
Cheers,
-z
[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator

________________________________________
From: Something Something <[hidden email]>
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from Spark Development team tell me if this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. Just to add more context, we've our own Accumulator class that extends from AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition:


class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")

Is this working under Structured Streaming?

I will keep looking for alternate approaches but any help would be greatly appreciated. Thanks.



On Thu, May 14, 2020 at 2:36 PM Something Something <[hidden email]<mailto:[hidden email]>> wrote:

In my structured streaming job I am updating Spark Accumulators in the updateAcrossEvents method but they are always 0 when I try to print them in my StreamingListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )


The accumulators get incremented in 'updateAcrossEvents'. I've a StreamingListener which writes values of the accumulators in 'onQueryProgress' method but in this method the Accumulators are ALWAYS ZERO!

When I added log statements in the updateAcrossEvents, I could see that these accumulators are getting incremented as expected.

This only happens when I run in the 'Cluster' mode. In Local mode it works fine which implies that the Accumulators are not getting distributed correctly - or something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an "Action". That's not a solution here. This is a 'Stateful Structured Streaming' job. Yes, I am also 'registering' them in SparkContext.




Re: Using Spark Accumulators with Structured Streaming

Srinivas V
Yes, I am using stateful Structured Streaming, similar to what you do, but in Java.
I do it this way:
    Dataset<ModelUpdate> productUpdates = watermarkedDS
                .groupByKey(
                        (MapFunction<InputEventModel, String>) event -> event.getId(), Encoders.STRING())
                .mapGroupsWithState(
                        new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT), appConfig, accumulators),
                        Encoders.bean(ModelStateInfo.class),
                        Encoders.bean(ModelUpdate.class),
                        GroupStateTimeout.ProcessingTimeTimeout());

StateUpdateTask contains the update method.
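
For other readers: a rough guess at the shape of such a class, written here in Scala against the Java-friendly API, with the bean types replaced by String/java.lang.Long so the sketch stands alone. The Java-friendly mapGroupsWithState overload takes an implementation of MapGroupsWithStateFunction, whose single method is `call`:

```scala
import java.lang.{Long => JLong}
import java.util.{Iterator => JIterator}
import org.apache.spark.api.java.function.MapGroupsWithStateFunction
import org.apache.spark.sql.streaming.GroupState
import org.apache.spark.util.LongAccumulator

// Rough sketch of a state-update task: counts events per key, keeps the count
// as the group state, and bumps a driver-registered accumulator.
class StateUpdateTaskSketch(stateTimeoutMs: Long, eventsReceived: LongAccumulator)
  extends MapGroupsWithStateFunction[String, String, JLong, String] {

  override def call(key: String, events: JIterator[String], state: GroupState[JLong]): String = {
    var seen: Long = if (state.exists) state.get else 0L
    while (events.hasNext) { events.next(); seen += 1 }
    state.update(seen)
    state.setTimeoutDuration(stateTimeoutMs)   // valid with ProcessingTimeTimeout
    eventsReceived.add(1)                      // executor-side increment, merged back to the driver
    s"$key -> $seen"
  }
}
```

Wiring it up is then the groupByKey/mapGroupsWithState call shown above, passing Encoders for the state and output types.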

On Thu, May 28, 2020 at 4:41 AM Something Something <[hidden email]> wrote:
Yes, that's exactly how I am creating them. 

Question... Are you using 'Stateful Structured Streaming' in which you've something like this?

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )
And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing this only under 'Stateful Structured Streaming'. In other streaming applications it works as expected.


On Wed, May 27, 2020 at 9:01 AM Srinivas V <[hidden email]> wrote:
Yes, I am talking about Application specific Accumulators. Actually I am getting the values printed in my driver log as well as sent to Grafana. Not sure where and when I saw 0 before. My deploy mode is “client” on a yarn cluster(not local Mac) where I submit from master node. It should work the same for cluster mode as well.
Create accumulators like this:
AccumulatorV2 accumulator = sparkContext.longAccumulator(name);


On Tue, May 26, 2020 at 8:42 PM Something Something <[hidden email]> wrote:
Hmm... how would they go to Graphana if they are not getting computed in your code? I am talking about the Application Specific Accumulators. The other standard counters such as 'event.progress.inputRowsPerSecond' are getting populated correctly!

On Mon, May 25, 2020 at 8:39 PM Srinivas V <[hidden email]> wrote:
Hello, 
Even for me it comes as 0 when I print in OnQueryProgress. I use LongAccumulator as well. Yes, it prints on my local but not on cluster.
But one consolation is that when I send metrics to Graphana, the values are coming there. 

On Tue, May 26, 2020 at 3:10 AM Something Something <[hidden email]> wrote:
No this is not working even if I use LongAccumulator. 

On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <[hidden email]> wrote:
There is a restriction in AccumulatorV2 API [1], the OUT type should be atomic or thread safe. I'm wondering if the implementation for `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3] and test if the StreamingListener and other codes are able to work?

---
Cheers,
-z
[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator

________________________________________
From: Something Something <[hidden email]>
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from Spark Development team tell me if this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. Just to add more context, we've our own Accumulator class that extends from AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition:


class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")

Is this working under Structured Streaming?

I will keep looking for alternate approaches but any help would be greatly appreciated. Thanks.



On Thu, May 14, 2020 at 2:36 PM Something Something <[hidden email]<mailto:[hidden email]>> wrote:

In my structured streaming job I am updating Spark Accumulators in the updateAcrossEvents method but they are always 0 when I try to print them in my StreamingListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )


The accumulators get incremented in 'updateAcrossEvents'. I've a StreamingListener which writes values of the accumulators in 'onQueryProgress' method but in this method the Accumulators are ALWAYS ZERO!

When I added log statements in the updateAcrossEvents, I could see that these accumulators are getting incremented as expected.

This only happens when I run in the 'Cluster' mode. In Local mode it works fine which implies that the Accumulators are not getting distributed correctly - or something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an "Action". That's not a solution here. This is a 'Stateful Structured Streaming' job. Yes, I am also 'registering' them in SparkContext.




Re: Using Spark Accumulators with Structured Streaming

ZHANG Wei
May I ask how the accumulator is accessed in the `onQueryProgress()` method?

AFAICT, the accumulator is incremented correctly. There is a way to verify that
on the cluster, like this:
```
    // Add the following while loop before invoking awaitTermination
    while (true) {
      println("My acc: " + myAcc.value)
      Thread.sleep(5 * 1000)
    }

    //query.awaitTermination()
```

And the updated accumulator value can be found in the driver stdout.

--
Cheers,
-z

On Thu, 28 May 2020 17:12:48 +0530
Srinivas V <[hidden email]> wrote:

> yes, I am using stateful structured streaming. Yes similar to what you do.
> This is in Java
> I do it this way:
>     Dataset<ModelUpdate> productUpdates = watermarkedDS
>                 .groupByKey(
>                         (MapFunction<InputEventModel, String>) event ->
> event.getId(), Encoders.STRING())
>                 .mapGroupsWithState(
>                         new
> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> appConfig, accumulators),
>                         Encoders.bean(ModelStateInfo.class),
>                         Encoders.bean(ModelUpdate.class),
>                         GroupStateTimeout.ProcessingTimeTimeout());
>
> StateUpdateTask contains the update method.
>
> On Thu, May 28, 2020 at 4:41 AM Something Something <
> [hidden email]> wrote:
>
> > Yes, that's exactly how I am creating them.
> >
> > Question... Are you using 'Stateful Structured Streaming' in which you've
> > something like this?
> >
> > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> >         updateAcrossEvents
> >       )
> >
> > And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing this only under 'Stateful Structured Streaming'. In other streaming applications it works as expected.
> >
> >
> >
> > On Wed, May 27, 2020 at 9:01 AM Srinivas V <[hidden email]> wrote:
> >
> >> Yes, I am talking about Application specific Accumulators. Actually I am
> >> getting the values printed in my driver log as well as sent to Grafana. Not
> >> sure where and when I saw 0 before. My deploy mode is “client” on a yarn
> >> cluster(not local Mac) where I submit from master node. It should work the
> >> same for cluster mode as well.
> >> Create accumulators like this:
> >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
> >>
> >>
> >> On Tue, May 26, 2020 at 8:42 PM Something Something <
> >> [hidden email]> wrote:
> >>
> >>> Hmm... how would they go to Graphana if they are not getting computed in
> >>> your code? I am talking about the Application Specific Accumulators. The
> >>> other standard counters such as 'event.progress.inputRowsPerSecond' are
> >>> getting populated correctly!
> >>>
> >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <[hidden email]> wrote:
> >>>
> >>>> Hello,
> >>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
> >>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
> >>>> But one consolation is that when I send metrics to Graphana, the values
> >>>> are coming there.
> >>>>
> >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
> >>>> [hidden email]> wrote:
> >>>>
> >>>>> No this is not working even if I use LongAccumulator.
> >>>>>
> >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <[hidden email]> wrote:
> >>>>>
> >>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type should
> >>>>>> be atomic or thread safe. I'm wondering if the implementation for
> >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
> >>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3]
> >>>>>> and test if the StreamingListener and other codes are able to work?
> >>>>>>
> >>>>>> ---
> >>>>>> Cheers,
> >>>>>> -z
> >>>>>> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
> >>>>>> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
> >>>>>> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
> >>>>>>
> >>>>>> ________________________________________
> >>>>>> From: Something Something <[hidden email]>
> >>>>>> Sent: Saturday, May 16, 2020 0:38
> >>>>>> To: spark-user
> >>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
> >>>>>>
> >>>>>> Can someone from Spark Development team tell me if this functionality
> >>>>>> is supported and tested? I've spent a lot of time on this but can't get it
> >>>>>> to work. Just to add more context, we've our own Accumulator class that
> >>>>>> extends from AccumulatorV2. In this class we keep track of one or more
> >>>>>> accumulators. Here's the definition:
> >>>>>>
> >>>>>>
> >>>>>> class CollectionLongAccumulator[T]
> >>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
> >>>>>>
> >>>>>> When the job begins we register an instance of this class:
> >>>>>>
> >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
> >>>>>>
> >>>>>> Is this working under Structured Streaming?
> >>>>>>
> >>>>>> I will keep looking for alternate approaches but any help would be
> >>>>>> greatly appreciated. Thanks.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
> >>>>>> [hidden email]<mailto:[hidden email]>> wrote:
> >>>>>>
> >>>>>> In my structured streaming job I am updating Spark Accumulators in
> >>>>>> the updateAcrossEvents method but they are always 0 when I try to print
> >>>>>> them in my StreamingListener. Here's the code:
> >>>>>>
> >>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> >>>>>>         updateAcrossEvents
> >>>>>>       )
> >>>>>>
> >>>>>>
> >>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
> >>>>>> StreamingListener which writes values of the accumulators in
> >>>>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
> >>>>>> ZERO!
> >>>>>>
> >>>>>> When I added log statements in the updateAcrossEvents, I could see
> >>>>>> that these accumulators are getting incremented as expected.
> >>>>>>
> >>>>>> This only happens when I run in the 'Cluster' mode. In Local mode it
> >>>>>> works fine which implies that the Accumulators are not getting distributed
> >>>>>> correctly - or something like that!
> >>>>>>
> >>>>>> Note: I've seen quite a few answers on the Web that tell me to
> >>>>>> perform an "Action". That's not a solution here. This is a 'Stateful
> >>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
> >>>>>> SparkContext.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>



Re: Using Spark Accumulators with Structured Streaming

Srinivas V
Giving the code below:
// accumulators is a class-level variable in the driver.

 sparkSession.streams().addListener(new StreamingQueryListener() {
            @Override
            public void onQueryStarted(QueryStartedEvent queryStarted) {
                logger.info("Query started: " + queryStarted.id());
            }
            @Override
            public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
                logger.info("Query terminated: " + queryTerminated.id());
            }
            @Override
            public void onQueryProgress(QueryProgressEvent queryProgress) {
                accumulators.eventsReceived(queryProgress.progress().numInputRows());
                long eventsReceived = 0;
                long eventsExpired = 0;
                long eventSentSuccess = 0;
                try {
                    eventsReceived = accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
                    eventsExpired = accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
                    eventSentSuccess = accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
                } catch (MissingKeyException e) {
                    logger.error("Accumulator key not found due to Exception {}", e.getMessage());
                }
                logger.info("Events Received:{}", eventsReceived);
                logger.info("Events State Expired:{}", eventsExpired);
                logger.info("Events Sent Success:{}", eventSentSuccess);
                logger.info("Query made progress - batchId: {} numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{} durationMs:{}" ,
                        queryProgress.progress().batchId(), queryProgress.progress().numInputRows(), queryProgress.progress().inputRowsPerSecond(),
                        queryProgress.progress().processedRowsPerSecond(), queryProgress.progress().durationMs());
             

On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <[hidden email]> wrote:
May I get how the accumulator is accessed in the method `onQueryProgress()`?

AFAICT, the accumulator is incremented well. There is a way to verify that
in cluster like this:
```
    // Add the following while loop before invoking awaitTermination
    while (true) {
      println("My acc: " + myAcc.value)
      Thread.sleep(5 * 1000)
    }

    //query.awaitTermination()
```

And the accumulator value updated can be found from driver stdout.

--
Cheers,
-z

On Thu, 28 May 2020 17:12:48 +0530
Srinivas V <[hidden email]> wrote:

> yes, I am using stateful structured streaming. Yes similar to what you do.
> This is in Java
> I do it this way:
>     Dataset<ModelUpdate> productUpdates = watermarkedDS
>                 .groupByKey(
>                         (MapFunction<InputEventModel, String>) event ->
> event.getId(), Encoders.STRING())
>                 .mapGroupsWithState(
>                         new
> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> appConfig, accumulators),
>                         Encoders.bean(ModelStateInfo.class),
>                         Encoders.bean(ModelUpdate.class),
>                         GroupStateTimeout.ProcessingTimeTimeout());
>
> StateUpdateTask contains the update method.
>
> On Thu, May 28, 2020 at 4:41 AM Something Something <
> [hidden email]> wrote:
>
> > Yes, that's exactly how I am creating them.
> >
> > Question... Are you using 'Stateful Structured Streaming' in which you've
> > something like this?
> >
> > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> >         updateAcrossEvents
> >       )
> >
> > And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing this only under 'Stateful Structured Streaming'. In other streaming applications it works as expected.
> >
> >
> >
> > On Wed, May 27, 2020 at 9:01 AM Srinivas V <[hidden email]> wrote:
> >
> >> Yes, I am talking about Application specific Accumulators. Actually I am
> >> getting the values printed in my driver log as well as sent to Grafana. Not
> >> sure where and when I saw 0 before. My deploy mode is “client” on a yarn
> >> cluster(not local Mac) where I submit from master node. It should work the
> >> same for cluster mode as well.
> >> Create accumulators like this:
> >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
> >>
> >>
> >> On Tue, May 26, 2020 at 8:42 PM Something Something <
> >> [hidden email]> wrote:
> >>
> >>> Hmm... how would they go to Graphana if they are not getting computed in
> >>> your code? I am talking about the Application Specific Accumulators. The
> >>> other standard counters such as 'event.progress.inputRowsPerSecond' are
> >>> getting populated correctly!
> >>>
> >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <[hidden email]> wrote:
> >>>
> >>>> Hello,
> >>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
> >>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
> >>>> But one consolation is that when I send metrics to Graphana, the values
> >>>> are coming there.
> >>>>
> >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
> >>>> [hidden email]> wrote:
> >>>>
> >>>>> No this is not working even if I use LongAccumulator.
> >>>>>
> >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <[hidden email]> wrote:
> >>>>>
> >>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type should
> >>>>>> be atomic or thread safe. I'm wondering if the implementation for
> >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
> >>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3]
> >>>>>> and test if the StreamingListener and other codes are able to work?
> >>>>>>
> >>>>>> ---
> >>>>>> Cheers,
> >>>>>> -z
> >>>>>> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
> >>>>>> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
> >>>>>> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
> >>>>>>
> >>>>>> ________________________________________
> >>>>>> From: Something Something <[hidden email]>
> >>>>>> Sent: Saturday, May 16, 2020 0:38
> >>>>>> To: spark-user
> >>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
> >>>>>>
> >>>>>> Can someone from Spark Development team tell me if this functionality
> >>>>>> is supported and tested? I've spent a lot of time on this but can't get it
> >>>>>> to work. Just to add more context, we've our own Accumulator class that
> >>>>>> extends from AccumulatorV2. In this class we keep track of one or more
> >>>>>> accumulators. Here's the definition:
> >>>>>>
> >>>>>>
> >>>>>> class CollectionLongAccumulator[T]
> >>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
> >>>>>>
> >>>>>> When the job begins we register an instance of this class:
> >>>>>>
> >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
> >>>>>>
> >>>>>> Is this working under Structured Streaming?
> >>>>>>
> >>>>>> I will keep looking for alternate approaches but any help would be
> >>>>>> greatly appreciated. Thanks.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
> >>>>>> [hidden email]<mailto:[hidden email]>> wrote:
> >>>>>>
> >>>>>> In my structured streaming job I am updating Spark Accumulators in
> >>>>>> the updateAcrossEvents method but they are always 0 when I try to print
> >>>>>> them in my StreamingListener. Here's the code:
> >>>>>>
> >>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> >>>>>>         updateAcrossEvents
> >>>>>>       )
> >>>>>>
> >>>>>>
> >>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
> >>>>>> StreamingListener which writes values of the accumulators in
> >>>>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
> >>>>>> ZERO!
> >>>>>>
> >>>>>> When I added log statements in the updateAcrossEvents, I could see
> >>>>>> that these accumulators are getting incremented as expected.
> >>>>>>
> >>>>>> This only happens when I run in the 'Cluster' mode. In Local mode it
> >>>>>> works fine which implies that the Accumulators are not getting distributed
> >>>>>> correctly - or something like that!
> >>>>>>
> >>>>>> Note: I've seen quite a few answers on the Web that tell me to
> >>>>>> perform an "Action". That's not a solution here. This is a 'Stateful
> >>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
> >>>>>> SparkContext.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>

Re: Using Spark Accumulators with Structured Streaming

Eric Beabes
In reply to this post by Srinivas V
I am assuming StateUpdateTask is your application-specific class. Does it have an 'updateState' method or something? I googled but couldn't find any documentation about doing it this way. Can you please point me to some documentation? Thanks.

On Thu, May 28, 2020 at 4:43 AM Srinivas V <[hidden email]> wrote:
yes, I am using stateful structured streaming. Yes similar to what you do. This is in Java
I do it this way:
    Dataset<ModelUpdate> productUpdates = watermarkedDS
                .groupByKey(
                        (MapFunction<InputEventModel, String>) event -> event.getId(), Encoders.STRING())
                .mapGroupsWithState(
                        new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT), appConfig, accumulators),
                        Encoders.bean(ModelStateInfo.class),
                        Encoders.bean(ModelUpdate.class),
                        GroupStateTimeout.ProcessingTimeTimeout());

StateUpdateTask contains the update method.

On Thu, May 28, 2020 at 4:41 AM Something Something <[hidden email]> wrote:
Yes, that's exactly how I am creating them. 

Question... Are you using 'Stateful Structured Streaming' in which you've something like this?

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )
And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing this only under 'Stateful Structured Streaming'. In other streaming applications it works as expected.


On Wed, May 27, 2020 at 9:01 AM Srinivas V <[hidden email]> wrote:
Yes, I am talking about Application specific Accumulators. Actually I am getting the values printed in my driver log as well as sent to Grafana. Not sure where and when I saw 0 before. My deploy mode is “client” on a yarn cluster(not local Mac) where I submit from master node. It should work the same for cluster mode as well.
Create accumulators like this:
AccumulatorV2 accumulator = sparkContext.longAccumulator(name);


On Tue, May 26, 2020 at 8:42 PM Something Something <[hidden email]> wrote:
Hmm... how would they go to Graphana if they are not getting computed in your code? I am talking about the Application Specific Accumulators. The other standard counters such as 'event.progress.inputRowsPerSecond' are getting populated correctly!

On Mon, May 25, 2020 at 8:39 PM Srinivas V <[hidden email]> wrote:
Hello, 
Even for me it comes as 0 when I print in OnQueryProgress. I use LongAccumulator as well. Yes, it prints on my local but not on cluster.
But one consolation is that when I send metrics to Graphana, the values are coming there. 

On Tue, May 26, 2020 at 3:10 AM Something Something <[hidden email]> wrote:
No this is not working even if I use LongAccumulator. 

On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <[hidden email]> wrote:
There is a restriction in AccumulatorV2 API [1], the OUT type should be atomic or thread safe. I'm wondering if the implementation for `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3] and test if the StreamingListener and other codes are able to work?

---
Cheers,
-z
[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator

________________________________________
From: Something Something <[hidden email]>
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from Spark Development team tell me if this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. Just to add more context, we've our own Accumulator class that extends from AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition:


class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")

Is this working under Structured Streaming?

I will keep looking for alternate approaches but any help would be greatly appreciated. Thanks.



On Thu, May 14, 2020 at 2:36 PM Something Something <[hidden email]<mailto:[hidden email]>> wrote:

In my structured streaming job I am updating Spark Accumulators in the updateAcrossEvents method but they are always 0 when I try to print them in my StreamingListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )


The accumulators get incremented in 'updateAcrossEvents'. I've a StreamingListener which writes values of the accumulators in 'onQueryProgress' method but in this method the Accumulators are ALWAYS ZERO!

When I added log statements in the updateAcrossEvents, I could see that these accumulators are getting incremented as expected.

This only happens when I run in the 'Cluster' mode. In Local mode it works fine which implies that the Accumulators are not getting distributed correctly - or something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an "Action". That's not a solution here. This is a 'Stateful Structured Streaming' job. Yes, I am also 'registering' them in SparkContext.




Re: Using Spark Accumulators with Structured Streaming

ZHANG Wei
In reply to this post by Srinivas V
I can't reproduce the issue with my simple code:
```scala
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        println(event.progress.id + " is on progress")
        println(s"My accu is ${myAcc.value} on query progress")
      }
        ...
    })

    def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
      myAcc.add(1)
      println(s">>> key: $key => state: ${state}")
        ...
    }

    val wordCounts = words
      .groupByKey(v => ...)
      .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)

    val query = wordCounts.writeStream
      .outputMode(OutputMode.Update)
        ...
```

I'm wondering whether any errors can be found in the driver logs. Micro-batch
exceptions won't terminate the running streaming job.

For the following code, we have to make sure that `StateUpdateTask` is started:
>                 .mapGroupsWithState(
>                         new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> appConfig, accumulators),
>                         Encoders.bean(ModelStateInfo.class),
>                         Encoders.bean(ModelUpdate.class),
>                         GroupStateTimeout.ProcessingTimeTimeout());

--
Cheers,
-z

On Thu, 28 May 2020 19:59:31 +0530
Srinivas V <[hidden email]> wrote:

> Giving the code below:
> //accumulators is a class level variable in driver.
>
>  sparkSession.streams().addListener(new StreamingQueryListener() {
>             @Override
>             public void onQueryStarted(QueryStartedEvent queryStarted) {
>                 logger.info("Query started: " + queryStarted.id());
>             }
>             @Override
>             public void onQueryTerminated(QueryTerminatedEvent
> queryTerminated) {
>                 logger.info("Query terminated: " + queryTerminated.id());
>             }
>             @Override
>             public void onQueryProgress(QueryProgressEvent queryProgress) {
>
> accumulators.eventsReceived(queryProgress.progress().numInputRows());
>                 long eventsReceived = 0;
>                 long eventsExpired = 0;
>                 long eventSentSuccess = 0;
>                 try {
>                     eventsReceived =
> accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
>                     eventsExpired =
> accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
>                     eventSentSuccess =
> accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
>                 } catch (MissingKeyException e) {
>                     logger.error("Accumulator key not found due to
> Exception {}", e.getMessage());
>                 }
>                 logger.info("Events Received:{}", eventsReceived);
>                 logger.info("Events State Expired:{}", eventsExpired);
>                 logger.info("Events Sent Success:{}", eventSentSuccess);
>                 logger.info("Query made progress - batchId: {}
> numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{}
> durationMs:{}" ,
>                         queryProgress.progress().batchId(),
> queryProgress.progress().numInputRows(),
> queryProgress.progress().inputRowsPerSecond(),
>                         queryProgress.progress().processedRowsPerSecond(),
> queryProgress.progress().durationMs());
>
>
> On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <[hidden email]> wrote:
>
> > May I get how the accumulator is accessed in the method
> > `onQueryProgress()`?
> >
> > AFAICT, the accumulator is incremented well. There is a way to verify that
> > in cluster like this:
> > ```
> >     // Add the following while loop before invoking awaitTermination
> >     while (true) {
> >       println("My acc: " + myAcc.value)
> >       Thread.sleep(5 * 1000)
> >     }
> >
> >     //query.awaitTermination()
> > ```
> >
> > And the accumulator value updated can be found from driver stdout.
> >
> > --
> > Cheers,
> > -z
> >
> > On Thu, 28 May 2020 17:12:48 +0530
> > Srinivas V <[hidden email]> wrote:
> >
> > > yes, I am using stateful structured streaming. Yes similar to what you
> > do.
> > > This is in Java
> > > I do it this way:
> > >     Dataset<ModelUpdate> productUpdates = watermarkedDS
> > >                 .groupByKey(
> > >                         (MapFunction<InputEventModel, String>) event ->
> > > event.getId(), Encoders.STRING())
> > >                 .mapGroupsWithState(
> > >                         new
> > >
> > StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > appConfig, accumulators),
> > >                         Encoders.bean(ModelStateInfo.class),
> > >                         Encoders.bean(ModelUpdate.class),
> > >                         GroupStateTimeout.ProcessingTimeTimeout());
> > >
> > > StateUpdateTask contains the update method.
> > >
> > > On Thu, May 28, 2020 at 4:41 AM Something Something <
> > > [hidden email]> wrote:
> > >
> > > > Yes, that's exactly how I am creating them.
> > > >
> > > > Question... Are you using 'Stateful Structured Streaming' in which
> > you've
> > > > something like this?
> > > >
> > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > >         updateAcrossEvents
> > > >       )
> > > >
> > > > And updating the Accumulator inside 'updateAcrossEvents'? We're
> > experiencing this only under 'Stateful Structured Streaming'. In other
> > streaming applications it works as expected.
> > > >
> > > >
> > > >
> > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <[hidden email]>
> > wrote:
> > > >
> > > >> Yes, I am talking about Application specific Accumulators. Actually I
> > am
> > > >> getting the values printed in my driver log as well as sent to
> > Grafana. Not
> > > >> sure where and when I saw 0 before. My deploy mode is “client” on a
> > yarn
> > > >> cluster(not local Mac) where I submit from master node. It should
> > work the
> > > >> same for cluster mode as well.
> > > >> Create accumulators like this:
> > > >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
> > > >>
> > > >>
> > > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
> > > >> [hidden email]> wrote:
> > > >>
> > > >>> Hmm... how would they go to Graphana if they are not getting
> > computed in
> > > >>> your code? I am talking about the Application Specific Accumulators.
> > The
> > > >>> other standard counters such as 'event.progress.inputRowsPerSecond'
> > are
> > > >>> getting populated correctly!
> > > >>>
> > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <[hidden email]>
> > wrote:
> > > >>>
> > > >>>> Hello,
> > > >>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
> > > >>>> LongAccumulator as well. Yes, it prints on my local but not on
> > cluster.
> > > >>>> But one consolation is that when I send metrics to Graphana, the
> > values
> > > >>>> are coming there.
> > > >>>>
> > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
> > > >>>> [hidden email]> wrote:
> > > >>>>
> > > >>>>> No this is not working even if I use LongAccumulator.
> > > >>>>>
> > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <[hidden email]>
> > wrote:
> > > >>>>>
> > > >>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
> > should
> > > >>>>>> be atomic or thread safe. I'm wondering if the implementation for
> > > >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance
> > to replace
> > > >>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or
> > LongAccumulator[3]
> > > >>>>>> and test if the StreamingListener and other codes are able to
> > work?
> > > >>>>>>
> > > >>>>>> ---
> > > >>>>>> Cheers,
> > > >>>>>> -z
> > > >>>>>> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
> > > >>>>>> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
> > > >>>>>> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
> > > >>>>>>
> > > >>>>>> ________________________________________
> > > >>>>>> From: Something Something <[hidden email]>
> > > >>>>>> Sent: Saturday, May 16, 2020 0:38
> > > >>>>>> To: spark-user
> > > >>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
> > > >>>>>>
> > > >>>>>> Can someone from Spark Development team tell me if this
> > functionality
> > > >>>>>> is supported and tested? I've spent a lot of time on this but
> > can't get it
> > > >>>>>> to work. Just to add more context, we've our own Accumulator
> > class that
> > > >>>>>> extends from AccumulatorV2. In this class we keep track of one or
> > more
> > > >>>>>> accumulators. Here's the definition:
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> class CollectionLongAccumulator[T]
> > > >>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
> > > >>>>>>
> > > >>>>>> When the job begins we register an instance of this class:
> > > >>>>>>
> > > >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
> > > >>>>>>
> > > >>>>>> Is this working under Structured Streaming?
> > > >>>>>>
> > > >>>>>> I will keep looking for alternate approaches but any help would be
> > > >>>>>> greatly appreciated. Thanks.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
> > > >>>>>> [hidden email]<mailto:[hidden email]>> wrote:
> > > >>>>>>
> > > >>>>>> In my structured streaming job I am updating Spark Accumulators in
> > > >>>>>> the updateAcrossEvents method but they are always 0 when I try to
> > print
> > > >>>>>> them in my StreamingListener. Here's the code:
> > > >>>>>>
> > > >>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > >>>>>>         updateAcrossEvents
> > > >>>>>>       )
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
> > > >>>>>> StreamingListener which writes values of the accumulators in
> > > >>>>>> 'onQueryProgress' method but in this method the Accumulators are
> > ALWAYS
> > > >>>>>> ZERO!
> > > >>>>>>
> > > >>>>>> When I added log statements in the updateAcrossEvents, I could see
> > > >>>>>> that these accumulators are getting incremented as expected.
> > > >>>>>>
> > > >>>>>> This only happens when I run in the 'Cluster' mode. In Local mode
> > it
> > > >>>>>> works fine which implies that the Accumulators are not getting
> > distributed
> > > >>>>>> correctly - or something like that!
> > > >>>>>>
> > > >>>>>> Note: I've seen quite a few answers on the Web that tell me to
> > > >>>>>> perform an "Action". That's not a solution here. This is a
> > 'Stateful
> > > >>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
> > > >>>>>> SparkContext.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> >

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: Using Spark Accumulators with Structured Streaming

Srinivas V
In reply to this post by Eric Beabes
Yes, it is an application-specific class. This is how Java Spark functions work.

public class StateUpdateTask implements MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {

    @Override
    public ModelUpdate call(String productId, Iterator<InputEventModel> eventsIterator, GroupState<ModelStateInfo> state) {
    }
}

On Thu, May 28, 2020 at 10:59 PM Something Something <[hidden email]> wrote:
I am assuming StateUpdateTask is your application specific class. Does it have 'updateState' method or something? I googled but couldn't find any documentation about doing it this way. Can you please direct me to some documentation. Thanks.

On Thu, May 28, 2020 at 4:43 AM Srinivas V <[hidden email]> wrote:
yes, I am using stateful structured streaming. Yes similar to what you do. This is in Java
I do it this way:
    Dataset<ModelUpdate> productUpdates = watermarkedDS
                .groupByKey(
                        (MapFunction<InputEventModel, String>) event -> event.getId(), Encoders.STRING())
                .mapGroupsWithState(
                        new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT), appConfig, accumulators),
                        Encoders.bean(ModelStateInfo.class),
                        Encoders.bean(ModelUpdate.class),
                        GroupStateTimeout.ProcessingTimeTimeout());

StateUpdateTask contains the update method.

On Thu, May 28, 2020 at 4:41 AM Something Something <[hidden email]> wrote:
Yes, that's exactly how I am creating them. 

Question... Are you using 'Stateful Structured Streaming' in which you've something like this?

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )
And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing this only under 'Stateful Structured Streaming'. In other streaming applications it works as expected.


On Wed, May 27, 2020 at 9:01 AM Srinivas V <[hidden email]> wrote:
Yes, I am talking about Application specific Accumulators. Actually I am getting the values printed in my driver log as well as sent to Grafana. Not sure where and when I saw 0 before. My deploy mode is “client” on a yarn cluster(not local Mac) where I submit from master node. It should work the same for cluster mode as well.
Create accumulators like this:
AccumulatorV2 accumulator = sparkContext.longAccumulator(name);


On Tue, May 26, 2020 at 8:42 PM Something Something <[hidden email]> wrote:
Hmm... how would they go to Graphana if they are not getting computed in your code? I am talking about the Application Specific Accumulators. The other standard counters such as 'event.progress.inputRowsPerSecond' are getting populated correctly!

On Mon, May 25, 2020 at 8:39 PM Srinivas V <[hidden email]> wrote:
Hello, 
Even for me it comes as 0 when I print in OnQueryProgress. I use LongAccumulator as well. Yes, it prints on my local but not on cluster.
But one consolation is that when I send metrics to Graphana, the values are coming there. 

On Tue, May 26, 2020 at 3:10 AM Something Something <[hidden email]> wrote:
No this is not working even if I use LongAccumulator. 

On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <[hidden email]> wrote:
There is a restriction in AccumulatorV2 API [1], the OUT type should be atomic or thread safe. I'm wondering if the implementation for `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3] and test if the StreamingListener and other codes are able to work?

---
Cheers,
-z
[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
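For reference, a map-valued accumulator built on a thread-safe map, along the lines suggested above, might look roughly like this (a sketch with a made-up class name, not the CollectionLongAccumulator from this thread):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.spark.util.AccumulatorV2;

// Sketch: counts occurrences per key; the OUT type is backed by a ConcurrentHashMap
// so that concurrent updates on an executor stay thread safe.
public class CountByKeyAccumulator extends AccumulatorV2<String, Map<String, Long>> {

    private final ConcurrentHashMap<String, Long> counts = new ConcurrentHashMap<>();

    @Override
    public boolean isZero() { return counts.isEmpty(); }

    @Override
    public AccumulatorV2<String, Map<String, Long>> copy() {
        CountByKeyAccumulator copy = new CountByKeyAccumulator();
        copy.counts.putAll(counts);
        return copy;
    }

    @Override
    public void reset() { counts.clear(); }

    @Override
    public void add(String key) { counts.merge(key, 1L, Long::sum); }  // atomic per-key increment

    @Override
    public void merge(AccumulatorV2<String, Map<String, Long>> other) {
        other.value().forEach((k, v) -> counts.merge(k, v, Long::sum));
    }

    @Override
    public Map<String, Long> value() { return counts; }
}
```

It would still have to be registered on the driver before the query starts, e.g. `spark.sparkContext().register(acc, "CountByKey");`.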

________________________________________
From: Something Something <[hidden email]>
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from Spark Development team tell me if this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. Just to add more context, we've our own Accumulator class that extends from AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition:


class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")

Is this working under Structured Streaming?

I will keep looking for alternate approaches but any help would be greatly appreciated. Thanks.



On Thu, May 14, 2020 at 2:36 PM Something Something <[hidden email]<mailto:[hidden email]>> wrote:

In my structured streaming job I am updating Spark Accumulators in the updateAcrossEvents method but they are always 0 when I try to print them in my StreamingListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )


The accumulators get incremented in 'updateAcrossEvents'. I've a StreamingListener which writes values of the accumulators in 'onQueryProgress' method but in this method the Accumulators are ALWAYS ZERO!

When I added log statements in the updateAcrossEvents, I could see that these accumulators are getting incremented as expected.

This only happens when I run in the 'Cluster' mode. In Local mode it works fine which implies that the Accumulators are not getting distributed correctly - or something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an "Action". That's not a solution here. This is a 'Stateful Structured Streaming' job. Yes, I am also 'registering' them in SparkContext.



Reply | Threaded
Open this post in threaded view
|

Re: Using Spark Accumulators with Structured Streaming

Eric Beabes
In reply to this post by ZHANG Wei
Did you try this on the Cluster? Note: This works just fine under 'Local' mode. 

On Thu, May 28, 2020 at 9:12 PM ZHANG Wei <[hidden email]> wrote:
I can't reproduce the issue with my simple code:
```scala
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        println(event.progress.id + " is on progress")
        println(s"My accu is ${myAcc.value} on query progress")
      }
        ...
    })

    def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
      myAcc.add(1)
      println(s">>> key: $key => state: ${state}")
        ...
    }

    val wordCounts = words
      .groupByKey(v => ...)
      .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)

    val query = wordCounts.writeStream
      .outputMode(OutputMode.Update)
        ...
```

I'm wondering if there were any errors can be found from driver logs? The micro-batch
exceptions won't terminate the streaming job running.

For the following code, we have to make sure that `StateUpdateTask` is started:
>                 .mapGroupsWithState(
>                         new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> appConfig, accumulators),
>                         Encoders.bean(ModelStateInfo.class),
>                         Encoders.bean(ModelUpdate.class),
>                         GroupStateTimeout.ProcessingTimeTimeout());

--
Cheers,
-z

On Thu, 28 May 2020 19:59:31 +0530
Srinivas V <[hidden email]> wrote:

> Giving the code below:
> //accumulators is a class level variable in driver.
>
>  sparkSession.streams().addListener(new StreamingQueryListener() {
>             @Override
>             public void onQueryStarted(QueryStartedEvent queryStarted) {
>                 logger.info("Query started: " + queryStarted.id());
>             }
>             @Override
>             public void onQueryTerminated(QueryTerminatedEvent
> queryTerminated) {
>                 logger.info("Query terminated: " + queryTerminated.id());
>             }
>             @Override
>             public void onQueryProgress(QueryProgressEvent queryProgress) {
>
> accumulators.eventsReceived(queryProgress.progress().numInputRows());
>                 long eventsReceived = 0;
>                 long eventsExpired = 0;
>                 long eventSentSuccess = 0;
>                 try {
>                     eventsReceived =
> accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
>                     eventsExpired =
> accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
>                     eventSentSuccess =
> accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
>                 } catch (MissingKeyException e) {
>                     logger.error("Accumulator key not found due to
> Exception {}", e.getMessage());
>                 }
>                 logger.info("Events Received:{}", eventsReceived);
>                 logger.info("Events State Expired:{}", eventsExpired);
>                 logger.info("Events Sent Success:{}", eventSentSuccess);
>                 logger.info("Query made progress - batchId: {}
> numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{}
> durationMs:{}" ,
>                         queryProgress.progress().batchId(),
> queryProgress.progress().numInputRows(),
> queryProgress.progress().inputRowsPerSecond(),
>                         queryProgress.progress().processedRowsPerSecond(),
> queryProgress.progress().durationMs());
>
>
> On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <[hidden email]> wrote:
>
> > May I get how the accumulator is accessed in the method
> > `onQueryProgress()`?
> >
> > AFAICT, the accumulator is incremented well. There is a way to verify that
> > in cluster like this:
> > ```
> >     // Add the following while loop before invoking awaitTermination
> >     while (true) {
> >       println("My acc: " + myAcc.value)
> >       Thread.sleep(5 * 1000)
> >     }
> >
> >     //query.awaitTermination()
> > ```
> >
> > And the accumulator value updated can be found from driver stdout.
> >
> > --
> > Cheers,
> > -z
> >
> > On Thu, 28 May 2020 17:12:48 +0530
> > Srinivas V <[hidden email]> wrote:
> >
> > > yes, I am using stateful structured streaming. Yes similar to what you
> > do.
> > > This is in Java
> > > I do it this way:
> > >     Dataset<ModelUpdate> productUpdates = watermarkedDS
> > >                 .groupByKey(
> > >                         (MapFunction<InputEventModel, String>) event ->
> > > event.getId(), Encoders.STRING())
> > >                 .mapGroupsWithState(
> > >                         new
> > >
> > StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > appConfig, accumulators),
> > >                         Encoders.bean(ModelStateInfo.class),
> > >                         Encoders.bean(ModelUpdate.class),
> > >                         GroupStateTimeout.ProcessingTimeTimeout());
> > >
> > > StateUpdateTask contains the update method.
> > >
> > > On Thu, May 28, 2020 at 4:41 AM Something Something <
> > > [hidden email]> wrote:
> > >
> > > > Yes, that's exactly how I am creating them.
> > > >
> > > > Question... Are you using 'Stateful Structured Streaming' in which
> > you've
> > > > something like this?
> > > >
> > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > >         updateAcrossEvents
> > > >       )
> > > >
> > > > And updating the Accumulator inside 'updateAcrossEvents'? We're
> > experiencing this only under 'Stateful Structured Streaming'. In other
> > streaming applications it works as expected.
> > > >
> > > >
> > > >
> > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <[hidden email]>
> > wrote:
> > > >
> > > >> Yes, I am talking about Application specific Accumulators. Actually I
> > am
> > > >> getting the values printed in my driver log as well as sent to
> > Grafana. Not
> > > >> sure where and when I saw 0 before. My deploy mode is “client” on a
> > yarn
> > > >> cluster(not local Mac) where I submit from master node. It should
> > work the
> > > >> same for cluster mode as well.
> > > >> Create accumulators like this:
> > > >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
> > > >>
> > > >>
> > > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
> > > >> [hidden email]> wrote:
> > > >>
> > > >>> Hmm... how would they go to Graphana if they are not getting
> > computed in
> > > >>> your code? I am talking about the Application Specific Accumulators.
> > The
> > > >>> other standard counters such as 'event.progress.inputRowsPerSecond'
> > are
> > > >>> getting populated correctly!
> > > >>>
> > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <[hidden email]>
> > wrote:
> > > >>>
> > > >>>> Hello,
> > > >>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
> > > >>>> LongAccumulator as well. Yes, it prints on my local but not on
> > cluster.
> > > >>>> But one consolation is that when I send metrics to Graphana, the
> > values
> > > >>>> are coming there.
> > > >>>>
> > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
> > > >>>> [hidden email]> wrote:
> > > >>>>
> > > >>>>> No this is not working even if I use LongAccumulator.
> > > >>>>>
> > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <[hidden email]>
> > wrote:
> > > >>>>>
> > > >>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
> > should
> > > >>>>>> be atomic or thread safe. I'm wondering if the implementation for
> > > >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance
> > to replace
> > > >>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or
> > LongAccumulator[3]
> > > >>>>>> and test if the StreamingListener and other codes are able to
> > work?
> > > >>>>>>
> > > >>>>>> ---
> > > >>>>>> Cheers,
> > > >>>>>> -z
> > > >>>>>> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
> > > >>>>>> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
> > > >>>>>> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
> > > >>>>>>
> > > >>>>>> ________________________________________
> > > >>>>>> From: Something Something <[hidden email]>
> > > >>>>>> Sent: Saturday, May 16, 2020 0:38
> > > >>>>>> To: spark-user
> > > >>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
> > > >>>>>>
> > > >>>>>> Can someone from Spark Development team tell me if this
> > functionality
> > > >>>>>> is supported and tested? I've spent a lot of time on this but
> > can't get it
> > > >>>>>> to work. Just to add more context, we've our own Accumulator
> > class that
> > > >>>>>> extends from AccumulatorV2. In this class we keep track of one or
> > more
> > > >>>>>> accumulators. Here's the definition:
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> class CollectionLongAccumulator[T]
> > > >>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
> > > >>>>>>
> > > >>>>>> When the job begins we register an instance of this class:
> > > >>>>>>
> > > >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
> > > >>>>>>
> > > >>>>>> Is this working under Structured Streaming?
> > > >>>>>>
> > > >>>>>> I will keep looking for alternate approaches but any help would be
> > > >>>>>> greatly appreciated. Thanks.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
> > > >>>>>> [hidden email]<mailto:[hidden email]>> wrote:
> > > >>>>>>
> > > >>>>>> In my structured streaming job I am updating Spark Accumulators in
> > > >>>>>> the updateAcrossEvents method but they are always 0 when I try to
> > print
> > > >>>>>> them in my StreamingListener. Here's the code:
> > > >>>>>>
> > > >>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > >>>>>>         updateAcrossEvents
> > > >>>>>>       )
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
> > > >>>>>> StreamingListener which writes values of the accumulators in
> > > >>>>>> 'onQueryProgress' method but in this method the Accumulators are
> > ALWAYS
> > > >>>>>> ZERO!
> > > >>>>>>
> > > >>>>>> When I added log statements in the updateAcrossEvents, I could see
> > > >>>>>> that these accumulators are getting incremented as expected.
> > > >>>>>>
> > > >>>>>> This only happens when I run in the 'Cluster' mode. In Local mode
> > it
> > > >>>>>> works fine which implies that the Accumulators are not getting
> > distributed
> > > >>>>>> correctly - or something like that!
> > > >>>>>>
> > > >>>>>> Note: I've seen quite a few answers on the Web that tell me to
> > > >>>>>> perform an "Action". That's not a solution here. This is a
> > 'Stateful
> > > >>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
> > > >>>>>> SparkContext.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> >
Reply | Threaded
Open this post in threaded view
|

Re: Using Spark Accumulators with Structured Streaming

Eric Beabes
In reply to this post by Srinivas V
Thanks! I will take a look at the link. Just one question: you seem to be passing 'accumulators' in the constructor, but where do you use it in the StateUpdateTask class? I am still missing that connection. Sorry if my question is dumb; I must be missing something. Thanks for your help so far. It's been useful.


On Fri, May 29, 2020 at 6:51 AM Srinivas V <[hidden email]> wrote:
Yes it is application specific class. This is how java Spark Functions work.

public class StateUpdateTask implements MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {

    @Override
    public ModelUpdate call(String productId, Iterator<InputEventModel> eventsIterator, GroupState<ModelStateInfo> state) {
    }
}

On Thu, May 28, 2020 at 10:59 PM Something Something <[hidden email]> wrote:
I am assuming StateUpdateTask is your application specific class. Does it have 'updateState' method or something? I googled but couldn't find any documentation about doing it this way. Can you please direct me to some documentation. Thanks.

On Thu, May 28, 2020 at 4:43 AM Srinivas V <[hidden email]> wrote:
yes, I am using stateful structured streaming. Yes similar to what you do. This is in Java
I do it this way:
    Dataset<ModelUpdate> productUpdates = watermarkedDS
                .groupByKey(
                        (MapFunction<InputEventModel, String>) event -> event.getId(), Encoders.STRING())
                .mapGroupsWithState(
                        new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT), appConfig, accumulators),
                        Encoders.bean(ModelStateInfo.class),
                        Encoders.bean(ModelUpdate.class),
                        GroupStateTimeout.ProcessingTimeTimeout());

StateUpdateTask contains the update method.

On Thu, May 28, 2020 at 4:41 AM Something Something <[hidden email]> wrote:
Yes, that's exactly how I am creating them. 

Question... Are you using 'Stateful Structured Streaming' in which you've something like this?

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )
And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing this only under 'Stateful Structured Streaming'. In other streaming applications it works as expected.


On Wed, May 27, 2020 at 9:01 AM Srinivas V <[hidden email]> wrote:
Yes, I am talking about Application specific Accumulators. Actually I am getting the values printed in my driver log as well as sent to Grafana. Not sure where and when I saw 0 before. My deploy mode is “client” on a yarn cluster(not local Mac) where I submit from master node. It should work the same for cluster mode as well.
Create accumulators like this:
AccumulatorV2 accumulator = sparkContext.longAccumulator(name);


On Tue, May 26, 2020 at 8:42 PM Something Something <[hidden email]> wrote:
Hmm... how would they go to Graphana if they are not getting computed in your code? I am talking about the Application Specific Accumulators. The other standard counters such as 'event.progress.inputRowsPerSecond' are getting populated correctly!

On Mon, May 25, 2020 at 8:39 PM Srinivas V <[hidden email]> wrote:
Hello, 
Even for me it comes as 0 when I print in OnQueryProgress. I use LongAccumulator as well. Yes, it prints on my local but not on cluster.
But one consolation is that when I send metrics to Graphana, the values are coming there. 

On Tue, May 26, 2020 at 3:10 AM Something Something <[hidden email]> wrote:
No this is not working even if I use LongAccumulator. 

On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <[hidden email]> wrote:
There is a restriction in AccumulatorV2 API [1], the OUT type should be atomic or thread safe. I'm wondering if the implementation for `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3] and test if the StreamingListener and other codes are able to work?

---
Cheers,
-z
[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator

________________________________________
From: Something Something <[hidden email]>
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from Spark Development team tell me if this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. Just to add more context, we've our own Accumulator class that extends from AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition:


class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")

Is this working under Structured Streaming?

I will keep looking for alternate approaches but any help would be greatly appreciated. Thanks.



On Thu, May 14, 2020 at 2:36 PM Something Something <[hidden email]<mailto:[hidden email]>> wrote:

In my structured streaming job I am updating Spark Accumulators in the updateAcrossEvents method but they are always 0 when I try to print them in my StreamingListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )


The accumulators get incremented in 'updateAcrossEvents'. I've a StreamingListener which writes values of the accumulators in 'onQueryProgress' method but in this method the Accumulators are ALWAYS ZERO!

When I added log statements in the updateAcrossEvents, I could see that these accumulators are getting incremented as expected.

This only happens when I run in the 'Cluster' mode. In Local mode it works fine which implies that the Accumulators are not getting distributed correctly - or something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an "Action". That's not a solution here. This is a 'Stateful Structured Streaming' job. Yes, I am also 'registering' them in SparkContext.



Reply | Threaded
Open this post in threaded view
|

Re: Using Spark Accumulators with Structured Streaming

Srinivas V

Yes, the accumulators are updated in the call method of StateUpdateTask, for example when state times out or when the data is pushed to the next Kafka topic.
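A minimal sketch of that shape (made-up field and accumulator names; a single LongAccumulator stands in for the 'accumulators' wrapper used here, and InputEventModel/ModelStateInfo/ModelUpdate are the application classes from this thread):

```java
import java.util.Iterator;
import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
import org.apache.spark.sql.streaming.GroupState;
import org.apache.spark.util.LongAccumulator;

public class StateUpdateTask implements
        MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {

    private final long stateTimeoutMs;
    private final LongAccumulator eventsExpired;   // created and registered on the driver

    public StateUpdateTask(long stateTimeoutMs, LongAccumulator eventsExpired) {
        this.stateTimeoutMs = stateTimeoutMs;
        this.eventsExpired = eventsExpired;        // serialized with the function to the executors
    }

    @Override
    public ModelUpdate call(String productId, Iterator<InputEventModel> events,
                            GroupState<ModelStateInfo> state) throws Exception {
        if (state.hasTimedOut()) {
            eventsExpired.add(1);                  // incremented on the executor, read back on the driver
            state.remove();
            return new ModelUpdate();              // placeholder result for an expired key
        }
        // ... fold 'events' into the state and build the real ModelUpdate here ...
        state.setTimeoutDuration(stateTimeoutMs);
        return new ModelUpdate();                  // placeholder
    }
}
```

The accumulator itself would be created up front with something like `sparkContext.longAccumulator("eventsExpired")` and handed to this constructor before the query starts.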

On Fri, May 29, 2020 at 11:55 PM Something Something <[hidden email]> wrote:
Thanks! I will take a look at the link. Just one question, you seem to be passing 'accumulators' in the constructor but where do you use it in the StateUpdateTask class? I am still missing that connection. Sorry, if my question is dumb. I must be missing something. Thanks for your help so far. It's been useful.


On Fri, May 29, 2020 at 6:51 AM Srinivas V <[hidden email]> wrote:
Yes it is application specific class. This is how java Spark Functions work.

public class StateUpdateTask implements MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {

    @Override
    public ModelUpdate call(String productId, Iterator<InputEventModel> eventsIterator, GroupState<ModelStateInfo> state) {
    }
}

On Thu, May 28, 2020 at 10:59 PM Something Something <[hidden email]> wrote:
I am assuming StateUpdateTask is your application specific class. Does it have 'updateState' method or something? I googled but couldn't find any documentation about doing it this way. Can you please direct me to some documentation. Thanks.

On Thu, May 28, 2020 at 4:43 AM Srinivas V <[hidden email]> wrote:
yes, I am using stateful structured streaming. Yes similar to what you do. This is in Java
I do it this way:
    Dataset<ModelUpdate> productUpdates = watermarkedDS
                .groupByKey(
                        (MapFunction<InputEventModel, String>) event -> event.getId(), Encoders.STRING())
                .mapGroupsWithState(
                        new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT), appConfig, accumulators),
                        Encoders.bean(ModelStateInfo.class),
                        Encoders.bean(ModelUpdate.class),
                        GroupStateTimeout.ProcessingTimeTimeout());

StateUpdateTask contains the update method.

On Thu, May 28, 2020 at 4:41 AM Something Something <[hidden email]> wrote:
Yes, that's exactly how I am creating them. 

Question... Are you using 'Stateful Structured Streaming' in which you've something like this?

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )
And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing this only under 'Stateful Structured Streaming'. In other streaming applications it works as expected.


On Wed, May 27, 2020 at 9:01 AM Srinivas V <[hidden email]> wrote:
Yes, I am talking about Application specific Accumulators. Actually I am getting the values printed in my driver log as well as sent to Grafana. Not sure where and when I saw 0 before. My deploy mode is “client” on a yarn cluster(not local Mac) where I submit from master node. It should work the same for cluster mode as well.
Create accumulators like this:
AccumulatorV2 accumulator = sparkContext.longAccumulator(name);


On Tue, May 26, 2020 at 8:42 PM Something Something <[hidden email]> wrote:
Hmm... how would they go to Graphana if they are not getting computed in your code? I am talking about the Application Specific Accumulators. The other standard counters such as 'event.progress.inputRowsPerSecond' are getting populated correctly!

On Mon, May 25, 2020 at 8:39 PM Srinivas V <[hidden email]> wrote:
Hello, 
Even for me it comes as 0 when I print in OnQueryProgress. I use LongAccumulator as well. Yes, it prints on my local but not on cluster.
But one consolation is that when I send metrics to Graphana, the values are coming there. 

On Tue, May 26, 2020 at 3:10 AM Something Something <[hidden email]> wrote:
No this is not working even if I use LongAccumulator. 

On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <[hidden email]> wrote:
There is a restriction in AccumulatorV2 API [1], the OUT type should be atomic or thread safe. I'm wondering if the implementation for `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3] and test if the StreamingListener and other codes are able to work?

---
Cheers,
-z
[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator

________________________________________
From: Something Something <[hidden email]>
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from Spark Development team tell me if this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. Just to add more context, we've our own Accumulator class that extends from AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition:


class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")

Is this working under Structured Streaming?

I will keep looking for alternate approaches but any help would be greatly appreciated. Thanks.



On Thu, May 14, 2020 at 2:36 PM Something Something <[hidden email]<mailto:[hidden email]>> wrote:

In my structured streaming job I am updating Spark Accumulators in the updateAcrossEvents method but they are always 0 when I try to print them in my StreamingListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )


The accumulators get incremented in 'updateAcrossEvents'. I've a StreamingListener which writes values of the accumulators in 'onQueryProgress' method but in this method the Accumulators are ALWAYS ZERO!

When I added log statements in the updateAcrossEvents, I could see that these accumulators are getting incremented as expected.

This only happens when I run in the 'Cluster' mode. In Local mode it works fine which implies that the Accumulators are not getting distributed correctly - or something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an "Action". That's not a solution here. This is a 'Stateful Structured Streaming' job. Yes, I am also 'registering' them in SparkContext.



Reply | Threaded
Open this post in threaded view
|

Re: Using Spark Accumulators with Structured Streaming

Eric Beabes
I mean... I don't see any reference to 'accumulator' in your class definition. How can you access it in the class if it's not part of the class definition:

public class StateUpdateTask implements MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {  --> I was expecting to see 'accumulator' here in the definition.

    @Override
    public ModelUpdate call(String productId, Iterator<InputEventModel> eventsIterator, GroupState<ModelStateInfo> state) {
    }
}

On Fri, May 29, 2020 at 1:08 PM Srinivas V <[hidden email]> wrote:

Yes, accumulators are updated in the call method of StateUpdateTask. Like when state times out or when the data is pushed to next Kafka topic etc.  

On Fri, May 29, 2020 at 11:55 PM Something Something <[hidden email]> wrote:
Thanks! I will take a look at the link. Just one question, you seem to be passing 'accumulators' in the constructor but where do you use it in the StateUpdateTask class? I am still missing that connection. Sorry, if my question is dumb. I must be missing something. Thanks for your help so far. It's been useful.


On Fri, May 29, 2020 at 6:51 AM Srinivas V <[hidden email]> wrote:
Yes it is application specific class. This is how java Spark Functions work.

public class StateUpdateTask implements MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {

    @Override
    public ModelUpdate call(String productId, Iterator<InputEventModel> eventsIterator, GroupState<ModelStateInfo> state) {
    }
}

On Thu, May 28, 2020 at 10:59 PM Something Something <[hidden email]> wrote:
I am assuming StateUpdateTask is your application specific class. Does it have 'updateState' method or something? I googled but couldn't find any documentation about doing it this way. Can you please direct me to some documentation. Thanks.

On Thu, May 28, 2020 at 4:43 AM Srinivas V <[hidden email]> wrote:
yes, I am using stateful structured streaming. Yes similar to what you do. This is in Java
I do it this way:
    Dataset<ModelUpdate> productUpdates = watermarkedDS
                .groupByKey(
                        (MapFunction<InputEventModel, String>) event -> event.getId(), Encoders.STRING())
                .mapGroupsWithState(
                        new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT), appConfig, accumulators),
                        Encoders.bean(ModelStateInfo.class),
                        Encoders.bean(ModelUpdate.class),
                        GroupStateTimeout.ProcessingTimeTimeout());

StateUpdateTask contains the update method.

On Thu, May 28, 2020 at 4:41 AM Something Something <[hidden email]> wrote:
Yes, that's exactly how I am creating them. 

Question... Are you using 'Stateful Structured Streaming' in which you've something like this?

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )
And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing this only under 'Stateful Structured Streaming'. In other streaming applications it works as expected.


On Wed, May 27, 2020 at 9:01 AM Srinivas V <[hidden email]> wrote:
Yes, I am talking about Application specific Accumulators. Actually I am getting the values printed in my driver log as well as sent to Grafana. Not sure where and when I saw 0 before. My deploy mode is “client” on a yarn cluster(not local Mac) where I submit from master node. It should work the same for cluster mode as well.
Create accumulators like this:
AccumulatorV2 accumulator = sparkContext.longAccumulator(name);


On Tue, May 26, 2020 at 8:42 PM Something Something <[hidden email]> wrote:
Hmm... how would they go to Graphana if they are not getting computed in your code? I am talking about the Application Specific Accumulators. The other standard counters such as 'event.progress.inputRowsPerSecond' are getting populated correctly!

On Mon, May 25, 2020 at 8:39 PM Srinivas V <[hidden email]> wrote:
Hello, 
Even for me it comes as 0 when I print in OnQueryProgress. I use LongAccumulator as well. Yes, it prints on my local but not on cluster.
But one consolation is that when I send metrics to Graphana, the values are coming there. 

On Tue, May 26, 2020 at 3:10 AM Something Something <[hidden email]> wrote:
No this is not working even if I use LongAccumulator. 

On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <[hidden email]> wrote:
There is a restriction in AccumulatorV2 API [1], the OUT type should be atomic or thread safe. I'm wondering if the implementation for `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3] and test if the StreamingListener and other codes are able to work?

---
Cheers,
-z
[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator

________________________________________
From: Something Something <[hidden email]>
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from Spark Development team tell me if this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. Just to add more context, we've our own Accumulator class that extends from AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition:


class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")

Is this working under Structured Streaming?

I will keep looking for alternate approaches but any help would be greatly appreciated. Thanks.



On Thu, May 14, 2020 at 2:36 PM Something Something <[hidden email]<mailto:[hidden email]>> wrote:

In my structured streaming job I am updating Spark Accumulators in the updateAcrossEvents method but they are always 0 when I try to print them in my StreamingListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )


The accumulators get incremented in 'updateAcrossEvents'. I've a StreamingListener which writes values of the accumulators in 'onQueryProgress' method but in this method the Accumulators are ALWAYS ZERO!

When I added log statements in the updateAcrossEvents, I could see that these accumulators are getting incremented as expected.

This only happens when I run in the 'Cluster' mode. In Local mode it works fine which implies that the Accumulators are not getting distributed correctly - or something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an "Action". That's not a solution here. This is a 'Stateful Structured Streaming' job. Yes, I am also 'registering' them in SparkContext.



12