Using Spark Accumulators with Structured Streaming


Re: Using Spark Accumulators with Structured Streaming

Srinivas V
It’s passed in via the constructor.
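In other words, something along these lines (a Scala sketch with simplified type parameters and illustrative names, not the actual StateUpdateTask):

```scala
import org.apache.spark.api.java.function.MapGroupsWithStateFunction
import org.apache.spark.sql.streaming.GroupState
import org.apache.spark.util.LongAccumulator

// The accumulator is not part of the MapGroupsWithStateFunction signature;
// it is handed in through the constructor on the driver, kept as a field,
// and used inside call() on the executors.
class StateUpdateTask(eventsSeen: LongAccumulator)
    extends MapGroupsWithStateFunction[String, String, Long, Long] {

  override def call(key: String,
                    events: java.util.Iterator[String],
                    state: GroupState[Long]): Long = {
    eventsSeen.add(1) // e.g. bump a counter here, or when state times out
    val newState = state.getOption.getOrElse(0L) + 1L
    state.update(newState)
    newState
  }
}
```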

On Sat, May 30, 2020 at 4:15 AM Something Something <[hidden email]> wrote:
I mean... I don't see any reference to 'accumulator' in your class definition. How can you access it in the class if it's not part of the class definition:

public class StateUpdateTask implements MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {   // <-- I was expecting to see 'accumulator' here in the definition.

    @Override
    public ModelUpdate call(String productId, Iterator<InputEventModel> eventsIterator, GroupState<ModelStateInfo> state) {
    }
}

On Fri, May 29, 2020 at 1:08 PM Srinivas V <[hidden email]> wrote:

Yes, the accumulators are updated in the call method of StateUpdateTask, for example when state times out or when the data is pushed to the next Kafka topic.

On Fri, May 29, 2020 at 11:55 PM Something Something <[hidden email]> wrote:
Thanks! I will take a look at the link. Just one question: you seem to be passing 'accumulators' in the constructor, but where do you use it in the StateUpdateTask class? I am still missing that connection. Sorry if my question is dumb; I must be missing something. Thanks for your help so far, it's been useful.


On Fri, May 29, 2020 at 6:51 AM Srinivas V <[hidden email]> wrote:
Yes, it is an application-specific class. This is how Java Spark functions work.

public class StateUpdateTask implements MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {

    @Override
    public ModelUpdate call(String productId, Iterator<InputEventModel> eventsIterator, GroupState<ModelStateInfo> state) {
    }
}

On Thu, May 28, 2020 at 10:59 PM Something Something <[hidden email]> wrote:
I am assuming StateUpdateTask is your application-specific class. Does it have an 'updateState' method or something? I googled but couldn't find any documentation about doing it this way. Can you please direct me to some documentation? Thanks.

On Thu, May 28, 2020 at 4:43 AM Srinivas V <[hidden email]> wrote:
Yes, I am using stateful Structured Streaming, similar to what you do. This is in Java.
I do it this way:
    Dataset<ModelUpdate> productUpdates = watermarkedDS
                .groupByKey(
                        (MapFunction<InputEventModel, String>) event -> event.getId(), Encoders.STRING())
                .mapGroupsWithState(
                        new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT), appConfig, accumulators),
                        Encoders.bean(ModelStateInfo.class),
                        Encoders.bean(ModelUpdate.class),
                        GroupStateTimeout.ProcessingTimeTimeout());

StateUpdateTask contains the update method.

On Thu, May 28, 2020 at 4:41 AM Something Something <[hidden email]> wrote:
Yes, that's exactly how I am creating them. 

Question... Are you using 'Stateful Structured Streaming', in which you have something like this?

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )
And are you updating the accumulator inside 'updateAcrossEvents'? We're experiencing this only under 'Stateful Structured Streaming'; in other streaming applications it works as expected.
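For reference, the shape being described is roughly this (a Scala sketch; updateAcrossEvents, myAcc, and the event type are illustrative placeholders, not the actual job code):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.GroupState

val spark = SparkSession.builder().appName("accumulator-sketch").getOrCreate()

// Created and registered on the driver, then captured by the update function's closure.
val myAcc = spark.sparkContext.longAccumulator("events-processed")

// The function passed to mapGroupsWithState; it runs on the executors and its
// accumulator updates are merged back to the driver after each task.
def updateAcrossEvents(key: String, events: Iterator[String], state: GroupState[Long]): Long = {
  events.foreach(_ => myAcc.add(1))
  val count = state.getOption.getOrElse(0L) + 1L
  state.update(count)
  count
}
```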


On Wed, May 27, 2020 at 9:01 AM Srinivas V <[hidden email]> wrote:
Yes, I am talking about application-specific accumulators. Actually, I am getting the values printed in my driver log as well as sent to Grafana. Not sure where and when I saw 0 before. My deploy mode is “client” on a YARN cluster (not a local Mac), where I submit from the master node. It should work the same for cluster mode as well.
Create accumulators like this:
AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
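Given a SparkSession named `spark`, the Scala equivalent would be along these lines (a sketch; the name string is only the label shown for the accumulator):

```scala
import org.apache.spark.util.LongAccumulator

// longAccumulator(name) both creates the accumulator and registers it with the driver's SparkContext.
val accumulator: LongAccumulator = spark.sparkContext.longAccumulator("events-received")
```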


On Tue, May 26, 2020 at 8:42 PM Something Something <[hidden email]> wrote:
Hmm... how would they get to Grafana if they are not getting computed in your code? I am talking about the application-specific accumulators. The other standard counters, such as 'event.progress.inputRowsPerSecond', are getting populated correctly!

On Mon, May 25, 2020 at 8:39 PM Srinivas V <[hidden email]> wrote:
Hello, 
Even for me it comes out as 0 when I print it in onQueryProgress. I use LongAccumulator as well. Yes, it prints locally but not on the cluster.
But one consolation is that when I send metrics to Grafana, the values do come through there.

On Tue, May 26, 2020 at 3:10 AM Something Something <[hidden email]> wrote:
No, this is not working even if I use LongAccumulator.

On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <[hidden email]> wrote:
There is a restriction in the AccumulatorV2 API [1]: the OUT type should be atomic or thread-safe. I'm wondering whether an implementation backed by `java.util.Map[T, Long]` can meet that. Is there any chance to replace CollectionLongAccumulator with CollectionAccumulator [2] or LongAccumulator [3] and test whether the StreamingListener and the rest of the code then work?
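To make that concrete, one possible shape for a thread-safe variant is sketched below, backing the per-key counts with a ConcurrentHashMap; this is only an illustration of the idea, not the original CollectionLongAccumulator:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{BiConsumer, BiFunction}

import org.apache.spark.util.AccumulatorV2

// Sketch: per-key counts kept in a ConcurrentHashMap so the OUT type
// (a mutable java.util.Map) can be updated safely from task threads.
class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, java.lang.Long]] {

  private val counts = new ConcurrentHashMap[T, java.lang.Long]()

  private val sum = new BiFunction[java.lang.Long, java.lang.Long, java.lang.Long] {
    override def apply(a: java.lang.Long, b: java.lang.Long): java.lang.Long =
      java.lang.Long.valueOf(a.longValue() + b.longValue())
  }

  override def isZero: Boolean = counts.isEmpty

  override def copy(): CollectionLongAccumulator[T] = {
    val acc = new CollectionLongAccumulator[T]
    acc.counts.putAll(counts)
    acc
  }

  override def reset(): Unit = counts.clear()

  // Atomic per-key increment.
  override def add(v: T): Unit = counts.merge(v, java.lang.Long.valueOf(1L), sum)

  override def merge(other: AccumulatorV2[T, java.util.Map[T, java.lang.Long]]): Unit =
    other.value.forEach(new BiConsumer[T, java.lang.Long] {
      override def accept(k: T, n: java.lang.Long): Unit = counts.merge(k, n, sum)
    })

  override def value: java.util.Map[T, java.lang.Long] = counts
}
```

It would still be registered on the driver as in the snippet quoted below, e.g. `spark.sparkContext.register(new CollectionLongAccumulator[String], "MyAccumulator")`.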

---
Cheers,
-z
[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator

________________________________________
From: Something Something <[hidden email]>
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from the Spark development team tell me if this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. Just to add more context: we have our own accumulator class that extends AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition:


class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")

Is this working under Structured Streaming?

I will keep looking for alternate approaches but any help would be greatly appreciated. Thanks.



On Thu, May 14, 2020 at 2:36 PM Something Something <[hidden email]> wrote:

In my structured streaming job I am updating Spark Accumulators in the updateAcrossEvents method but they are always 0 when I try to print them in my StreamingListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )


The accumulators get incremented in 'updateAcrossEvents'. I have a StreamingListener that writes the values of the accumulators in its 'onQueryProgress' method, but in that method the accumulators are ALWAYS ZERO!

When I added log statements in the updateAcrossEvents, I could see that these accumulators are getting incremented as expected.

This only happens when I run in 'cluster' mode. In local mode it works fine, which implies that the accumulators are not getting distributed correctly, or something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an "Action". That's not a solution here. This is a 'Stateful Structured Streaming' job. Yes, I am also 'registering' them in SparkContext.




Re: Using Spark Accumulators with Structured Streaming

ZHANG Wei
In reply to this post by Eric Beabes
Yes, verified on the cluster with 5 executors.

--
Cheers,
-z

On Fri, 29 May 2020 11:16:12 -0700
Something Something <[hidden email]> wrote:

> Did you try this on the Cluster? Note: This works just fine under 'Local'
> mode.
>
> On Thu, May 28, 2020 at 9:12 PM ZHANG Wei <[hidden email]> wrote:
>
> > I can't reproduce the issue with my simple code:
> > ```scala
> >     spark.streams.addListener(new StreamingQueryListener {
> >       override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
> >         println(event.progress.id + " is on progress")
> >         println(s"My accu is ${myAcc.value} on query progress")
> >       }
> >         ...
> >     })
> >
> >     def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
> >       myAcc.add(1)
> >       println(s">>> key: $key => state: ${state}")
> >         ...
> >     }
> >
> >     val wordCounts = words
> >       .groupByKey(v => ...)
> >       .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
> >
> >     val query = wordCounts.writeStream
> >       .outputMode(OutputMode.Update)
> >         ...
> > ```
> >
> > I'm wondering if there are any errors to be found in the driver logs? The
> > micro-batch exceptions won't terminate the running streaming job.
> >
> > For the following code, we have to make sure that `StateUpdateTask` is started:
> > >                 .mapGroupsWithState(
> > >                         new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT), appConfig, accumulators),
> > >                         Encoders.bean(ModelStateInfo.class),
> > >                         Encoders.bean(ModelUpdate.class),
> > >                         GroupStateTimeout.ProcessingTimeTimeout());
> >
> > --
> > Cheers,
> > -z
> >
> > On Thu, 28 May 2020 19:59:31 +0530
> > Srinivas V <[hidden email]> wrote:
> >
> > > Giving the code below:
> > > //accumulators is a class level variable in driver.
> > >
> > >  sparkSession.streams().addListener(new StreamingQueryListener() {
> > >             @Override
> > >             public void onQueryStarted(QueryStartedEvent queryStarted) {
> > >                 logger.info("Query started: " + queryStarted.id());
> > >             }
> > >             @Override
> > >             public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
> > >                 logger.info("Query terminated: " + queryTerminated.id());
> > >             }
> > >             @Override
> > >             public void onQueryProgress(QueryProgressEvent queryProgress) {
> > >                 accumulators.eventsReceived(queryProgress.progress().numInputRows());
> > >                 long eventsReceived = 0;
> > >                 long eventsExpired = 0;
> > >                 long eventSentSuccess = 0;
> > >                 try {
> > >                     eventsReceived = accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
> > >                     eventsExpired = accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
> > >                     eventSentSuccess = accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
> > >                 } catch (MissingKeyException e) {
> > >                     logger.error("Accumulator key not found due to Exception {}", e.getMessage());
> > >                 }
> > >                 logger.info("Events Received:{}", eventsReceived);
> > >                 logger.info("Events State Expired:{}", eventsExpired);
> > >                 logger.info("Events Sent Success:{}", eventSentSuccess);
> > >                 logger.info("Query made progress - batchId: {} numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{} durationMs:{}",
> > >                         queryProgress.progress().batchId(),
> > >                         queryProgress.progress().numInputRows(),
> > >                         queryProgress.progress().inputRowsPerSecond(),
> > >                         queryProgress.progress().processedRowsPerSecond(),
> > >                         queryProgress.progress().durationMs());
> > >
> > >
> > > On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <[hidden email]> wrote:
> > >
> > > > May I get how the accumulator is accessed in the method `onQueryProgress()`?
> > > >
> > > > AFAICT, the accumulator is incremented correctly. There is a way to verify
> > > > that in cluster mode like this:
> > > > ```
> > > >     // Add the following while loop before invoking awaitTermination
> > > >     while (true) {
> > > >       println("My acc: " + myAcc.value)
> > > >       Thread.sleep(5 * 1000)
> > > >     }
> > > >
> > > >     //query.awaitTermination()
> > > > ```
> > > >
> > > > And the accumulator value updated can be found from driver stdout.
> > > >
> > > > --
> > > > Cheers,
> > > > -z
> > > >



Re: Using Spark Accumulators with Structured Streaming

ZHANG Wei
The following Java code works in my cluster environment:
```
    .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
                myAcc.add(1);
                <...>
                state.update(newState);
                return new LeadingCharCount(key, newState);
            },
            Encoders.LONG(),
            Encoders.bean(LeadingCharCount.class),
            GroupStateTimeout.ProcessingTimeTimeout())
```

Also works fine with my `StateUpdateTask`:
```
    .mapGroupsWithState(
            new StateUpdateTask(myAcc),
            Encoders.LONG(),
            Encoders.bean(LeadingCharCount.class),
            GroupStateTimeout.ProcessingTimeTimeout());

public class StateUpdateTask
            implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
        private LongAccumulator myAccInTask;

        public StateUpdateTask(LongAccumulator acc) {
            this.myAccInTask = acc;
        }

        @Override
        public LeadingCharCount call(String key, Iterator<String> values, GroupState<Long> state) throws Exception {
            myAccInTask.add(1);
            <...>
            state.update(newState);
            return new LeadingCharCount(key, newState);
        }
}
```

--
Cheers,
-z



Re: Using Spark Accumulators with Structured Streaming

Eric Beabes
Great. I guess the trick is to use a separate class such as 'StateUpdateTask'. I will try that. My challenge is to convert this into Scala. Will try it out & revert. Thanks for the tips.
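If it helps, here is an untested Scala sketch of the same pattern (accumulator created on the driver, incremented inside the state-update function, read in the query listener; the socket source and all names are just placeholders, not the actual job):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, StreamingQueryListener}
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

object AccumulatorStreamingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("acc-sketch").getOrCreate()
    import spark.implicits._

    val myAcc = spark.sparkContext.longAccumulator("keys-updated")

    // Driver-side listener: reads the merged accumulator value after each micro-batch.
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(e: QueryStartedEvent): Unit = ()
      override def onQueryProgress(e: QueryProgressEvent): Unit =
        println(s"My acc is ${myAcc.value} on query progress")
      override def onQueryTerminated(e: QueryTerminatedEvent): Unit = ()
    })

    // Executor-side state update function, playing the role of StateUpdateTask.call().
    def updateAcrossEvents(key: String, values: Iterator[String], state: GroupState[Long]): Long = {
      myAcc.add(1)
      val newState = state.getOption.getOrElse(0L) + values.size
      state.update(newState)
      newState
    }

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
      .as[String]

    val counts = lines
      .groupByKey(line => line.take(1))
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(updateAcrossEvents)

    counts.writeStream
      .outputMode(OutputMode.Update)
      .format("console")
      .start()
      .awaitTermination()
  }
}
```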


Re: Using Spark Accumulators with Structured Streaming

Srinivas V

You don’t need to have a separate class. I created one because there is a lot of code and logic in my case.
For you to quickly test, you can use Zhang’s Scala code in this chain.
Pasting it below for your quick reference:

```scala
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        println(event.progress.id + " is on progress")
        println(s"My accu is ${myAcc.value} on query progress")
      }
        ...
    })

    def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
      myAcc.add(1)
      println(s">>> key: $key => state: ${state}")
        ...
    }

    val wordCounts = words
      .groupByKey(v => ...)
      .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)

    val query = wordCounts.writeStream
      .outputMode(OutputMode.Update)
        ...
```

> > > > > ```
> > > > >
> > > > > And the accumulator value updated can be found from driver stdout.
> > > > >
> > > > > --
> > > > > Cheers,
> > > > > -z
> > > > >
> > > > > On Thu, 28 May 2020 17:12:48 +0530
> > > > > Srinivas V <[hidden email]> wrote:
> > > > >
> > > > > > yes, I am using stateful structured streaming. Yes similar to what
> > > you
> > > > > do.
> > > > > > This is in Java
> > > > > > I do it this way:
> > > > > >     Dataset<ModelUpdate> productUpdates = watermarkedDS
> > > > > >                 .groupByKey(
> > > > > >                         (MapFunction<InputEventModel, String>) event
> > > ->
> > > > > > event.getId(), Encoders.STRING())
> > > > > >                 .mapGroupsWithState(
> > > > > >                         new
> > > > > >
> > > > >
> > > StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > > > > appConfig, accumulators),
> > > > > >                         Encoders.bean(ModelStateInfo.class),
> > > > > >                         Encoders.bean(ModelUpdate.class),
> > > > > >                         GroupStateTimeout.ProcessingTimeTimeout());
> > > > > >
> > > > > > StateUpdateTask contains the update method.
> > > > > >
> > > > > > On Thu, May 28, 2020 at 4:41 AM Something Something <
> > > > > > [hidden email]> wrote:
> > > > > >
> > > > > > > Yes, that's exactly how I am creating them.
> > > > > > >
> > > > > > > Question... Are you using 'Stateful Structured Streaming' in which
> > > > > you've
> > > > > > > something like this?
> > > > > > >
> > > > > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > > > > >         updateAcrossEvents
> > > > > > >       )
> > > > > > >
> > > > > > > And updating the Accumulator inside 'updateAcrossEvents'? We're
> > > > > experiencing this only under 'Stateful Structured Streaming'. In other
> > > > > streaming applications it works as expected.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <[hidden email]>
> > > > > wrote:
> > > > > > >
> > > > > > >> Yes, I am talking about Application specific Accumulators.
> > > Actually I
> > > > > am
> > > > > > >> getting the values printed in my driver log as well as sent to
> > > > > Grafana. Not
> > > > > > >> sure where and when I saw 0 before. My deploy mode is “client” on
> > > a
> > > > > yarn
> > > > > > >> cluster(not local Mac) where I submit from master node. It should
> > > > > work the
> > > > > > >> same for cluster mode as well.
> > > > > > >> Create accumulators like this:
> > > > > > >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
> > > > > > >>
> > > > > > >>
> > > > > > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
> > > > > > >> [hidden email]> wrote:
> > > > > > >>
> > > > > > >>> Hmm... how would they go to Graphana if they are not getting
> > > > > computed in
> > > > > > >>> your code? I am talking about the Application Specific
> > > Accumulators.
> > > > > The
> > > > > > >>> other standard counters such as
> > > 'event.progress.inputRowsPerSecond'
> > > > > are
> > > > > > >>> getting populated correctly!
> > > > > > >>>
> > > > > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <[hidden email]>
> > > > > wrote:
> > > > > > >>>
> > > > > > >>>> Hello,
> > > > > > >>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
> > > > > > >>>> LongAccumulator as well. Yes, it prints on my local but not on
> > > > > cluster.
> > > > > > >>>> But one consolation is that when I send metrics to Graphana, the
> > > > > values
> > > > > > >>>> are coming there.
> > > > > > >>>>
> > > > > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
> > > > > > >>>> [hidden email]> wrote:
> > > > > > >>>>
> > > > > > >>>>> No this is not working even if I use LongAccumulator.
> > > > > > >>>>>
> > > > > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <[hidden email]
> > > >
> > > > > wrote:
> > > > > > >>>>>
> > > > > > >>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
> > > > > should
> > > > > > >>>>>> be atomic or thread safe. I'm wondering if the implementation
> > > for
> > > > > > >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any
> > > chance
> > > > > to replace
> > > > > > >>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or
> > > > > LongAccumulator[3]
> > > > > > >>>>>> and test if the StreamingListener and other codes are able to
> > > > > work?
> > > > > > >>>>>>
> > > > > > >>>>>> ---
> > > > > > >>>>>> Cheers,
> > > > > > >>>>>> -z
> > > > > > >>>>>> [1]
> > > > > > >>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
> > > > > > >>>>>> [2]
> > > > > > >>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
> > > > > > >>>>>> [3]
> > > > > > >>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
> > > > > > >>>>>>
> > > > > > >>>>>> ________________________________________
> > > > > > >>>>>> From: Something Something <[hidden email]>
> > > > > > >>>>>> Sent: Saturday, May 16, 2020 0:38
> > > > > > >>>>>> To: spark-user
> > > > > > >>>>>> Subject: Re: Using Spark Accumulators with Structured
> > > Streaming
> > > > > > >>>>>>
> > > > > > >>>>>> Can someone from Spark Development team tell me if this
> > > > > functionality
> > > > > > >>>>>> is supported and tested? I've spent a lot of time on this but
> > > > > can't get it
> > > > > > >>>>>> to work. Just to add more context, we've our own Accumulator
> > > > > class that
> > > > > > >>>>>> extends from AccumulatorV2. In this class we keep track of
> > > one or
> > > > > more
> > > > > > >>>>>> accumulators. Here's the definition:
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>> class CollectionLongAccumulator[T]
> > > > > > >>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
> > > > > > >>>>>>
> > > > > > >>>>>> When the job begins we register an instance of this class:
> > > > > > >>>>>>
> > > > > > >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
> > > > > > >>>>>>
> > > > > > >>>>>> Is this working under Structured Streaming?
> > > > > > >>>>>>
> > > > > > >>>>>> I will keep looking for alternate approaches but any help
> > > would be
> > > > > > >>>>>> greatly appreciated. Thanks.
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
> > > > > > >>>>>> [hidden email]<mailto:[hidden email]>>
> > > wrote:
> > > > > > >>>>>>
> > > > > > >>>>>> In my structured streaming job I am updating Spark
> > > Accumulators in
> > > > > > >>>>>> the updateAcrossEvents method but they are always 0 when I
> > > try to
> > > > > print
> > > > > > >>>>>> them in my StreamingListener. Here's the code:
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > > > > >>>>>>         updateAcrossEvents
> > > > > > >>>>>>       )
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>> The accumulators get incremented in 'updateAcrossEvents'.
> > > I've a
> > > > > > >>>>>> StreamingListener which writes values of the accumulators in
> > > > > > >>>>>> 'onQueryProgress' method but in this method the Accumulators
> > > are
> > > > > ALWAYS
> > > > > > >>>>>> ZERO!
> > > > > > >>>>>>
> > > > > > >>>>>> When I added log statements in the updateAcrossEvents, I
> > > could see
> > > > > > >>>>>> that these accumulators are getting incremented as expected.
> > > > > > >>>>>>
> > > > > > >>>>>> This only happens when I run in the 'Cluster' mode. In Local
> > > mode
> > > > > it
> > > > > > >>>>>> works fine which implies that the Accumulators are not getting
> > > > > distributed
> > > > > > >>>>>> correctly - or something like that!
> > > > > > >>>>>>
> > > > > > >>>>>> Note: I've seen quite a few answers on the Web that tell me to
> > > > > > >>>>>> perform an "Action". That's not a solution here. This is a
> > > > > 'Stateful
> > > > > > >>>>>> Structured Streaming' job. Yes, I am also 'registering' them
> > > in
> > > > > > >>>>>> SparkContext.
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > >
> > >
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: [hidden email]
>
Reply | Threaded
Open this post in threaded view
|

Re: Using Spark Accumulators with Structured Streaming

Eric Beabes
Right, this is exactly how I have it right now. The problem is that in cluster mode 'myAcc' does NOT get distributed. Try it out in cluster mode & you will see what I mean.

I think the way Zhang is using it will work. Will try & revert.
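Whichever way the accumulator reaches the stateful function, the copy the listener reads is the driver-side one (executors only send their per-task deltas back to the driver). A minimal sketch of that driver-side wiring, assuming an existing SparkSession named `spark` and the same accumulator name as in Zhang's snippet:

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// The accumulator object lives on the driver; tasks ship their deltas back
// on completion, so this driver-side instance is what the listener should read.
val myAcc = spark.sparkContext.longAccumulator("myAcc")

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"My accu is ${myAcc.value} after batch ${event.progress.batchId}")
})
```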

Reply | Threaded
Open this post in threaded view
|

Re: Using Spark Accumulators with Structured Streaming

Eric Beabes
What is scary is that this interface is marked as "experimental":

@Experimental
@InterfaceStability.Evolving
public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
    R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
}



Reply | Threaded
Open this post in threaded view
|

Re: Using Spark Accumulators with Structured Streaming

Srinivas V
Ya, I had asked this question before. No one responded. By the way, what’s your actual name, “Something Something”, if you don’t mind me asking?

On Tue, Jun 9, 2020 at 12:27 AM Something Something <[hidden email]> wrote:
What is scary is this interface is marked as "experimental"

@Experimental
@InterfaceStability.Evolving
public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
}



On Mon, Jun 8, 2020 at 11:54 AM Something Something <[hidden email]> wrote:
Right, this is exactly how I've it right now. Problem is in the cluster mode 'myAcc' does NOT get distributed. Try it out in the cluster mode & you will see what I mean. 

I think how Zhang is using will work. Will try & revert.

On Mon, Jun 8, 2020 at 10:58 AM Srinivas V <[hidden email]> wrote:

You don’t need to have a separate class. I created that as it has lot of code and logic in my case.
For you to quickly test you can use Zhang’s Scala code in this chain.
Pasting it below for your quick reference:

```scala
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        println(event.progress.id + " is on progress")
        println(s"My accu is ${myAcc.value} on query progress")
      }
        ...
    })

    def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
      myAcc.add(1)
      println(s">>> key: $key => state: ${state}")
        ...
    }

    val wordCounts = words
      .groupByKey(v => ...)
      .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)

    val query = wordCounts.writeStream
      .outputMode(OutputMode.Update)
  

On Mon, Jun 8, 2020 at 11:14 AM Something Something <[hidden email]> wrote:
Great. I guess the trick is to use a separate class such as 'StateUpdateTask'. I will try that. My challenge is to convert this into Scala. Will try it out & revert. Thanks for the tips.

On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei <[hidden email]> wrote:
The following Java codes can work in my cluster environment:
```
    .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
                myAcc.add(1);
                <...>
                state.update(newState);
                return new LeadingCharCount(key, newState);
            },
            Encoders.LONG(),
            Encoders.bean(LeadingCharCount.class),
            GroupStateTimeout.ProcessingTimeTimeout())
```

Also works fine with my `StateUpdateTask`:
```
    .mapGroupsWithState(
            new StateUpdateTask(myAcc),
            Encoders.LONG(),
            Encoders.bean(LeadingCharCount.class),
            GroupStateTimeout.ProcessingTimeTimeout());

public class StateUpdateTask
            implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
        private LongAccumulator myAccInTask;

        public StateUpdateTask(LongAccumulator acc) {
            this.myAccInTask = acc;
        }

        @Override
        public LeadingCharCount call(String key, Iterator<String> values, GroupState<Long> state) throws Exception {
            myAccInTask.add(1);
            <...>
            state.update(newState);
            return new LeadingCharCount(key, newState);
        }
}
```

--
Cheers,
-z

On Tue, 2 Jun 2020 10:28:36 +0800
ZHANG Wei <[hidden email]> wrote:

> Yes, verified on the cluster with 5 executors.
>
> --
> Cheers,
> -z
>
> On Fri, 29 May 2020 11:16:12 -0700
> Something Something <[hidden email]> wrote:
>
> > Did you try this on the Cluster? Note: This works just fine under 'Local'
> > mode.
> >
> > On Thu, May 28, 2020 at 9:12 PM ZHANG Wei <[hidden email]> wrote:
> >
> > > I can't reproduce the issue with my simple code:
> > > ```scala
> > >     spark.streams.addListener(new StreamingQueryListener {
> > >       override def onQueryProgress(event:
> > > StreamingQueryListener.QueryProgressEvent): Unit = {
> > >         println(event.progress.id + " is on progress")
> > >         println(s"My accu is ${myAcc.value} on query progress")
> > >       }
> > >         ...
> > >     })
> > >
> > >     def mappingFunc(key: Long, values: Iterator[String], state:
> > > GroupState[Long]): ... = {
> > >       myAcc.add(1)
> > >       println(s">>> key: $key => state: ${state}")
> > >         ...
> > >     }
> > >
> > >     val wordCounts = words
> > >       .groupByKey(v => ...)
> > >       .mapGroupsWithState(timeoutConf =
> > > GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
> > >
> > >     val query = wordCounts.writeStream
> > >       .outputMode(OutputMode.Update)
> > >         ...
> > > ```
> > >
> > > I'm wondering if there were any errors can be found from driver logs? The
> > > micro-batch
> > > exceptions won't terminate the streaming job running.
> > >
> > > For the following code, we have to make sure that `StateUpdateTask` is
> > > started:
> > > >                 .mapGroupsWithState(
> > > >                         new
> > > StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > > appConfig, accumulators),
> > > >                         Encoders.bean(ModelStateInfo.class),
> > > >                         Encoders.bean(ModelUpdate.class),
> > > >                         GroupStateTimeout.ProcessingTimeTimeout());
> > >
> > > --
> > > Cheers,
> > > -z
> > >
> > > On Thu, 28 May 2020 19:59:31 +0530
> > > Srinivas V <[hidden email]> wrote:
> > >
> > > > Giving the code below:
> > > > //accumulators is a class level variable in driver.
> > > >
> > > >  sparkSession.streams().addListener(new StreamingQueryListener() {
> > > >             @Override
> > > >             public void onQueryStarted(QueryStartedEvent queryStarted) {
> > > >                 logger.info("Query started: " + queryStarted.id());
> > > >             }
> > > >             @Override
> > > >             public void onQueryTerminated(QueryTerminatedEvent
> > > > queryTerminated) {
> > > >                 logger.info("Query terminated: " +
> > > queryTerminated.id());
> > > >             }
> > > >             @Override
> > > >             public void onQueryProgress(QueryProgressEvent
> > > queryProgress) {
> > > >
> > > > accumulators.eventsReceived(queryProgress.progress().numInputRows());
> > > >                 long eventsReceived = 0;
> > > >                 long eventsExpired = 0;
> > > >                 long eventSentSuccess = 0;
> > > >                 try {
> > > >                     eventsReceived =
> > > > accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
> > > >                     eventsExpired =
> > > > accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
> > > >                     eventSentSuccess =
> > > > accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
> > > >                 } catch (MissingKeyException e) {
> > > >                     logger.error("Accumulator key not found due to
> > > > Exception {}", e.getMessage());
> > > >                 }
> > > >                 logger.info("Events Received:{}", eventsReceived);
> > > >                 logger.info("Events State Expired:{}", eventsExpired);
> > > >                 logger.info("Events Sent Success:{}", eventSentSuccess);
> > > >                 logger.info("Query made progress - batchId: {}
> > > > numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{}
> > > > durationMs:{}" ,
> > > >                         queryProgress.progress().batchId(),
> > > > queryProgress.progress().numInputRows(),
> > > > queryProgress.progress().inputRowsPerSecond(),
> > > >
> > >  queryProgress.progress().processedRowsPerSecond(),
> > > > queryProgress.progress().durationMs());
> > > >
> > > >
> > > > On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <[hidden email]> wrote:
> > > >
> > > > > May I get how the accumulator is accessed in the method
> > > > > `onQueryProgress()`?
> > > > >
> > > > > AFAICT, the accumulator is incremented well. There is a way to verify
> > > that
> > > > > in cluster like this:
> > > > > ```
> > > > >     // Add the following while loop before invoking awaitTermination
> > > > >     while (true) {
> > > > >       println("My acc: " + myAcc.value)
> > > > >       Thread.sleep(5 * 1000)
> > > > >     }
> > > > >
> > > > >     //query.awaitTermination()
> > > > > ```
> > > > >
> > > > > And the accumulator value updated can be found from driver stdout.
> > > > >
> > > > > --
> > > > > Cheers,
> > > > > -z
> > > > >
> > > > > On Thu, 28 May 2020 17:12:48 +0530
> > > > > Srinivas V <[hidden email]> wrote:
> > > > >
> > > > > > yes, I am using stateful structured streaming. Yes similar to what
> > > you
> > > > > do.
> > > > > > This is in Java
> > > > > > I do it this way:
> > > > > >     Dataset<ModelUpdate> productUpdates = watermarkedDS
> > > > > >                 .groupByKey(
> > > > > >                         (MapFunction<InputEventModel, String>) event
> > > ->
> > > > > > event.getId(), Encoders.STRING())
> > > > > >                 .mapGroupsWithState(
> > > > > >                         new
> > > > > >
> > > > >
> > > StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > > > > appConfig, accumulators),
> > > > > >                         Encoders.bean(ModelStateInfo.class),
> > > > > >                         Encoders.bean(ModelUpdate.class),
> > > > > >                         GroupStateTimeout.ProcessingTimeTimeout());
> > > > > >
> > > > > > StateUpdateTask contains the update method.
> > > > > >
> > > > > > On Thu, May 28, 2020 at 4:41 AM Something Something <
> > > > > > [hidden email]> wrote:
> > > > > >
> > > > > > > Yes, that's exactly how I am creating them.
> > > > > > >
> > > > > > > Question... Are you using 'Stateful Structured Streaming' in which
> > > > > you've
> > > > > > > something like this?
> > > > > > >
> > > > > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > > > > >         updateAcrossEvents
> > > > > > >       )
> > > > > > >
> > > > > > > And updating the Accumulator inside 'updateAcrossEvents'? We're
> > > > > experiencing this only under 'Stateful Structured Streaming'. In other
> > > > > streaming applications it works as expected.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <[hidden email]>
> > > > > wrote:
> > > > > > >
> > > > > > >> Yes, I am talking about Application specific Accumulators.
> > > Actually I
> > > > > am
> > > > > > >> getting the values printed in my driver log as well as sent to
> > > > > Grafana. Not
> > > > > > >> sure where and when I saw 0 before. My deploy mode is “client” on
> > > a
> > > > > yarn
> > > > > > >> cluster(not local Mac) where I submit from master node. It should
> > > > > work the
> > > > > > >> same for cluster mode as well.
> > > > > > >> Create accumulators like this:
> > > > > > >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
> > > > > > >>
> > > > > > >>
> > > > > > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
> > > > > > >> [hidden email]> wrote:
> > > > > > >>
> > > > > > >>> Hmm... how would they go to Graphana if they are not getting
> > > > > computed in
> > > > > > >>> your code? I am talking about the Application Specific
> > > Accumulators.
> > > > > The
> > > > > > >>> other standard counters such as
> > > 'event.progress.inputRowsPerSecond'
> > > > > are
> > > > > > >>> getting populated correctly!
> > > > > > >>>
> > > > > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <[hidden email]>
> > > > > wrote:
> > > > > > >>>
> > > > > > >>>> Hello,
> > > > > > >>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
> > > > > > >>>> LongAccumulator as well. Yes, it prints on my local but not on
> > > > > cluster.
> > > > > > >>>> But one consolation is that when I send metrics to Graphana, the
> > > > > values
> > > > > > >>>> are coming there.
> > > > > > >>>>
> > > > > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
> > > > > > >>>> [hidden email]> wrote:
> > > > > > >>>>
> > > > > > >>>>> No this is not working even if I use LongAccumulator.
> > > > > > >>>>>
> > > > > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <[hidden email]
> > > >
> > > > > wrote:
> > > > > > >>>>>
> > > > > > >>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
> > > > > should
> > > > > > >>>>>> be atomic or thread safe. I'm wondering if the implementation
> > > for
> > > > > > >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any
> > > chance
> > > > > to replace
> > > > > > >>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or
> > > > > LongAccumulator[3]
> > > > > > >>>>>> and test if the StreamingListener and other codes are able to
> > > > > work?
> > > > > > >>>>>>
> > > > > > >>>>>> ---
> > > > > > >>>>>> Cheers,
> > > > > > >>>>>> -z
> > > > > > >>>>>> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
> > > > > > >>>>>> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
> > > > > > >>>>>> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
> > > > > > >>>>>>
> > > > > > >>>>>> ________________________________________
> > > > > > >>>>>> From: Something Something <[hidden email]>
> > > > > > >>>>>> Sent: Saturday, May 16, 2020 0:38
> > > > > > >>>>>> To: spark-user
> > > > > > >>>>>> Subject: Re: Using Spark Accumulators with Structured
> > > Streaming
> > > > > > >>>>>>
> > > > > > >>>>>> Can someone from Spark Development team tell me if this
> > > > > functionality
> > > > > > >>>>>> is supported and tested? I've spent a lot of time on this but
> > > > > can't get it
> > > > > > >>>>>> to work. Just to add more context, we've our own Accumulator
> > > > > class that
> > > > > > >>>>>> extends from AccumulatorV2. In this class we keep track of
> > > one or
> > > > > more
> > > > > > >>>>>> accumulators. Here's the definition:
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>> class CollectionLongAccumulator[T]
> > > > > > >>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
> > > > > > >>>>>>
> > > > > > >>>>>> When the job begins we register an instance of this class:
> > > > > > >>>>>>
> > > > > > >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
> > > > > > >>>>>>
> > > > > > >>>>>> Is this working under Structured Streaming?
> > > > > > >>>>>>
> > > > > > >>>>>> I will keep looking for alternate approaches but any help
> > > would be
> > > > > > >>>>>> greatly appreciated. Thanks.
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
> > > > > > >>>>>> [hidden email]<mailto:[hidden email]>>
> > > wrote:
> > > > > > >>>>>>
> > > > > > >>>>>> In my structured streaming job I am updating Spark
> > > Accumulators in
> > > > > > >>>>>> the updateAcrossEvents method but they are always 0 when I
> > > try to
> > > > > print
> > > > > > >>>>>> them in my StreamingListener. Here's the code:
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > > > > >>>>>>         updateAcrossEvents
> > > > > > >>>>>>       )
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>> The accumulators get incremented in 'updateAcrossEvents'.
> > > I've a
> > > > > > >>>>>> StreamingListener which writes values of the accumulators in
> > > > > > >>>>>> 'onQueryProgress' method but in this method the Accumulators
> > > are
> > > > > ALWAYS
> > > > > > >>>>>> ZERO!
> > > > > > >>>>>>
> > > > > > >>>>>> When I added log statements in the updateAcrossEvents, I
> > > could see
> > > > > > >>>>>> that these accumulators are getting incremented as expected.
> > > > > > >>>>>>
> > > > > > >>>>>> This only happens when I run in the 'Cluster' mode. In Local
> > > mode
> > > > > it
> > > > > > >>>>>> works fine which implies that the Accumulators are not getting
> > > > > distributed
> > > > > > >>>>>> correctly - or something like that!
> > > > > > >>>>>>
> > > > > > >>>>>> Note: I've seen quite a few answers on the Web that tell me to
> > > > > > >>>>>> perform an "Action". That's not a solution here. This is a
> > > > > 'Stateful
> > > > > > >>>>>> Structured Streaming' job. Yes, I am also 'registering' them
> > > in
> > > > > > >>>>>> SparkContext.
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > >
> > >
>
Reply | Threaded
Open this post in threaded view
|

Re: Using Spark Accumulators with Structured Streaming

Eric Beabes
Honestly, I don't know how to do this in Scala. I tried something like this...

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
    new StateUpdater(myAcc)
)

StateUpdater is similar to what Zhang has provided, but it is NOT compiling because I need to return a 'Dataset'.

Here's the definition of mapGroupsWithState in Scala:
def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout)(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = {
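
One way to get this to compile (a minimal sketch, not the original code): give the accumulator-holding class an ordinary method matching the shape this overload expects, and pass that method, since this Scala overload takes a plain function `(K, Iterator[V], GroupState[S]) => U` rather than a `MapGroupsWithStateFunction` object. Everything below is assumed for illustration: the names `StateUpdater` and `updateAcrossEvents`, a SparkSession `spark`, an input `Dataset[String]` named `events`, and a `LongAccumulator` `myAcc` already created on the driver.

```scala
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import org.apache.spark.util.LongAccumulator

// Holds the accumulator; only needs a method of shape (K, Iterator[V], GroupState[S]) => U.
class StateUpdater(acc: LongAccumulator) extends Serializable {
  def updateAcrossEvents(key: String, values: Iterator[String], state: GroupState[Long]): Long = {
    acc.add(1)                              // bump the driver-registered accumulator
    val newState = state.getOption.getOrElse(0L) + values.size
    state.update(newState)                  // keep per-key state
    newState                                // U = Long; mapGroupsWithState itself returns the Dataset[U]
  }
}

import spark.implicits._                    // encoders for String and Long

val updater = new StateUpdater(myAcc)
val counts = events
  .groupByKey(identity)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(updater.updateAcrossEvents _)
```

The state logic above is only a placeholder; the point is the method shape and how the accumulator instance travels into the stateful function, the same way it does through the StateUpdateTask constructor in the Java version.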


On Mon, Jun 8, 2020 at 12:07 PM Srinivas V <[hidden email]> wrote:
Ya, I had asked this question before. No one responded. By the way, what’s your actual name “Something something” if you don’t mind me asking?

On Tue, Jun 9, 2020 at 12:27 AM Something Something <[hidden email]> wrote:
What is scary is that this interface is marked as "experimental":

@Experimental
@InterfaceStability.Evolving
public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
}



On Mon, Jun 8, 2020 at 11:54 AM Something Something <[hidden email]> wrote:
Right, this is exactly how I have it right now. The problem is that in cluster mode 'myAcc' does NOT get distributed. Try it out in cluster mode and you will see what I mean.

I think the way Zhang is using it will work. Will try and revert.

On Mon, Jun 8, 2020 at 10:58 AM Srinivas V <[hidden email]> wrote:

You don't need to have a separate class. I created one because it has a lot of code and logic in my case.
For you to quickly test, you can use Zhang's Scala code in this chain.
Pasting it below for your quick reference:

```scala
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        println(event.progress.id + " is on progress")
        println(s"My accu is ${myAcc.value} on query progress")
      }
        ...
    })

    def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
      myAcc.add(1)
      println(s">>> key: $key => state: ${state}")
        ...
    }

    val wordCounts = words
      .groupByKey(v => ...)
      .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)

    val query = wordCounts.writeStream
      .outputMode(OutputMode.Update)
      ...
```

On Mon, Jun 8, 2020 at 11:14 AM Something Something <[hidden email]> wrote:
Great. I guess the trick is to use a separate class such as 'StateUpdateTask'. I will try that. My challenge is to convert this into Scala. Will try it out & revert. Thanks for the tips.

On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei <[hidden email]> wrote:
The following Java code works in my cluster environment:
```
    .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
                myAcc.add(1);
                <...>
                state.update(newState);
                return new LeadingCharCount(key, newState);
            },
            Encoders.LONG(),
            Encoders.bean(LeadingCharCount.class),
            GroupStateTimeout.ProcessingTimeTimeout())
```

Also works fine with my `StateUpdateTask`:
```
    .mapGroupsWithState(
            new StateUpdateTask(myAcc),
            Encoders.LONG(),
            Encoders.bean(LeadingCharCount.class),
            GroupStateTimeout.ProcessingTimeTimeout());

public class StateUpdateTask
            implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
        private LongAccumulator myAccInTask;

        public StateUpdateTask(LongAccumulator acc) {
            this.myAccInTask = acc;
        }

        @Override
        public LeadingCharCount call(String key, Iterator<String> values, GroupState<Long> state) throws Exception {
            myAccInTask.add(1);
            <...>
            state.update(newState);
            return new LeadingCharCount(key, newState);
        }
}
```
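
For completeness, the `myAcc` referenced in these snippets (and in the Scala sketch pasted earlier in the thread) is assumed to be a LongAccumulator created on the driver from the SparkContext before the query starts; accumulators created through `longAccumulator(name)` are registered automatically. A one-line sketch of that assumed setup, in Scala for consistency with the earlier examples (the Java call, `sparkContext.longAccumulator(name)`, was shown earlier in the thread):

```scala
// Assumed driver-side setup, not from the original post:
val myAcc: org.apache.spark.util.LongAccumulator =
  spark.sparkContext.longAccumulator("MyAcc")
```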

--
Cheers,
-z

On Tue, 2 Jun 2020 10:28:36 +0800
ZHANG Wei <[hidden email]> wrote:

> Yes, verified on the cluster with 5 executors.
>
> --
> Cheers,
> -z
>
> On Fri, 29 May 2020 11:16:12 -0700
> Something Something <[hidden email]> wrote:
>
> > Did you try this on the Cluster? Note: This works just fine under 'Local'
> > mode.
> >
> > On Thu, May 28, 2020 at 9:12 PM ZHANG Wei <[hidden email]> wrote:
> >
> > > I can't reproduce the issue with my simple code:
> > > ```scala
> > >     spark.streams.addListener(new StreamingQueryListener {
> > >       override def onQueryProgress(event:
> > > StreamingQueryListener.QueryProgressEvent): Unit = {
> > >         println(event.progress.id + " is on progress")
> > >         println(s"My accu is ${myAcc.value} on query progress")
> > >       }
> > >         ...
> > >     })
> > >
> > >     def mappingFunc(key: Long, values: Iterator[String], state:
> > > GroupState[Long]): ... = {
> > >       myAcc.add(1)
> > >       println(s">>> key: $key => state: ${state}")
> > >         ...
> > >     }
> > >
> > >     val wordCounts = words
> > >       .groupByKey(v => ...)
> > >       .mapGroupsWithState(timeoutConf =
> > > GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
> > >
> > >     val query = wordCounts.writeStream
> > >       .outputMode(OutputMode.Update)
> > >         ...
> > > ```
> > >
> > > I'm wondering if there are any errors to be found in the driver logs? Micro-batch
> > > exceptions won't terminate the running streaming job.
> > >
> > > For the following code, we have to make sure that `StateUpdateTask` is
> > > started:
> > > >                 .mapGroupsWithState(
> > > >                         new
> > > StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > > appConfig, accumulators),
> > > >                         Encoders.bean(ModelStateInfo.class),
> > > >                         Encoders.bean(ModelUpdate.class),
> > > >                         GroupStateTimeout.ProcessingTimeTimeout());
> > >
> > > --
> > > Cheers,
> > > -z
> > >
>
12