Honestly, I don't know how to do this in Scala. I tried something like this...

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
  new StateUpdater(myAcc)
)

StateUpdater is similar to what Zhang has provided but it's NOT compiling 'cause I need to return a 'Dataset'.

Here's the definition of mapGroupsWithState in Scala:
def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout)(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = {
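
For reference, here is a minimal Scala sketch of how this overload might be wired up; it has not been verified on a cluster. The function passed in returns one `U` per group, and `mapGroupsWithState` itself collects those results into a `Dataset[U]`, so the update function never has to return a `Dataset`. The shape of `StateUpdater`, the `LeadingCharCount` case class (named after the Java bean in Zhang's example), the `nextCount` helper, the `wire` method, and the key function are all assumptions made for illustration.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import org.apache.spark.util.LongAccumulator

// Hypothetical output type, named after the Java bean in Zhang's example.
case class LeadingCharCount(key: String, count: Long)

object StateUpdater {
  // Hypothetical pure helper: running count = previous state + events in this batch.
  def nextCount(previous: Option[Long], batchSize: Int): Long =
    previous.getOrElse(0L) + batchSize
}

// Hypothetical Scala counterpart of StateUpdateTask. It is Serializable because
// the update function is shipped to the executors together with the accumulator.
class StateUpdater(acc: LongAccumulator) extends Serializable {
  // Returns one LeadingCharCount per group, not a Dataset.
  def update(key: String, values: Iterator[String], state: GroupState[Long]): LeadingCharCount = {
    acc.add(1)
    val newState = StateUpdater.nextCount(state.getOption, values.size)
    state.update(newState)
    LeadingCharCount(key, newState)
  }
}

object StateUpdaterSketch {
  // `words` is assumed to be a streaming Dataset[String]; `myAcc` is assumed to
  // have been created on the driver, e.g. via spark.sparkContext.longAccumulator(...).
  def wire(spark: SparkSession, words: Dataset[String], myAcc: LongAccumulator): Dataset[LeadingCharCount] = {
    import spark.implicits._ // encoders for String, Long and the case class
    val updater = new StateUpdater(myAcc)
    words
      .groupByKey(word => word.take(1)) // assumed key: the leading character
      .mapGroupsWithState[Long, LeadingCharCount](
        GroupStateTimeout.ProcessingTimeTimeout())(updater.update _)
  }
}
```

Under these assumptions, `StateUpdaterSketch.wire(spark, words, myAcc)` yields a `Dataset[LeadingCharCount]` that can be handed to `writeStream` with `OutputMode.Update`, as in Zhang's snippet.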


On Mon, Jun 8, 2020 at 12:07 PM Srinivas V <srini.vyr@gmail.com> wrote:
Ya, I had asked this question before. No one responded. By the way, what’s your actual name “Something something” if you don’t mind me asking?

On Tue, Jun 9, 2020 at 12:27 AM Something Something <mailinglists19@gmail.com> wrote:
What is scary is this interface is marked as "experimental"

@Experimental
@InterfaceStability.Evolving
public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
    R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
}



On Mon, Jun 8, 2020 at 11:54 AM Something Something <mailinglists19@gmail.com> wrote:
Right, this is exactly how I have it right now. The problem is that in cluster mode 'myAcc' does NOT get distributed. Try it out in cluster mode & you will see what I mean.

I think the way Zhang is using it will work. Will try & report back.

On Mon, Jun 8, 2020 at 10:58 AM Srinivas V <srini.vyr@gmail.com> wrote:

You don’t need to have a separate class. I created one because it holds a lot of code and logic in my case.
To test quickly, you can use Zhang’s Scala code from this thread.
Pasting it below for quick reference:

```scala
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        println(event.progress.id + " is on progress")
        println(s"My accu is ${myAcc.value} on query progress")
      }
        ...
    })

    def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
      myAcc.add(1)
      println(s">>> key: $key => state: ${state}")
        ...
    }

    val wordCounts = words
      .groupByKey(v => ...)
      .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)

    val query = wordCounts.writeStream
      .outputMode(OutputMode.Update)
        ...
```

On Mon, Jun 8, 2020 at 11:14 AM Something Something <mailinglists19@gmail.com> wrote:
Great. I guess the trick is to use a separate class such as 'StateUpdateTask'. I will try that. My challenge is to convert this into Scala. Will try it out & report back. Thanks for the tips.

On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei <wezhang@outlook.com> wrote:
The following Java code works in my cluster environment:
```
    .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
                myAcc.add(1);
                <...>
                state.update(newState);
                return new LeadingCharCount(key, newState);
            },
            Encoders.LONG(),
            Encoders.bean(LeadingCharCount.class),
            GroupStateTimeout.ProcessingTimeTimeout())
```

Also works fine with my `StateUpdateTask`:
```
    .mapGroupsWithState(
            new StateUpdateTask(myAcc),
            Encoders.LONG(),
            Encoders.bean(LeadingCharCount.class),
            GroupStateTimeout.ProcessingTimeTimeout());

public class StateUpdateTask
        implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
    private LongAccumulator myAccInTask;

    public StateUpdateTask(LongAccumulator acc) {
        this.myAccInTask = acc;
    }

    @Override
    public LeadingCharCount call(String key, Iterator<String> values, GroupState<Long> state) throws Exception {
        myAccInTask.add(1);
        <...>
        state.update(newState);
        return new LeadingCharCount(key, newState);
    }
}
```

--
Cheers,
-z

On Tue, 2 Jun 2020 10:28:36 +0800
ZHANG Wei <wezhang@outlook.com> wrote:

> Yes, verified on the cluster with 5 executors.
>
> --
> Cheers,
> -z
>
> On Fri, 29 May 2020 11:16:12 -0700
> Something Something <mailinglists19@gmail.com> wrote:
>
> > Did you try this on the Cluster? Note: This works just fine under 'Local'
> > mode.
> >
> > On Thu, May 28, 2020 at 9:12 PM ZHANG Wei <wezhang@outlook.com> wrote:
> >
> > > I can't reproduce the issue with my simple code:
> > > ```scala
> > >     spark.streams.addListener(new StreamingQueryListener {
> > > >       override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
> > >         println(event.progress.id + " is on progress")
> > >         println(s"My accu is ${myAcc.value} on query progress")
> > >       }
> > >         ...
> > >     })
> > >
> > > >     def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
> > >       myAcc.add(1)
> > >       println(s">>> key: $key => state: ${state}")
> > >         ...
> > >     }
> > >
> > >     val wordCounts = words
> > >       .groupByKey(v => ...)
> > > >       .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
> > >
> > >     val query = wordCounts.writeStream
> > >       .outputMode(OutputMode.Update)
> > >         ...
> > > ```
> > >
> > > > I'm wondering whether any errors can be found in the driver logs. Micro-batch
> > > > exceptions won't terminate the running streaming job.
> > >
> > > For the following code, we have to make sure that `StateUpdateTask` is
> > > started:
> > > > >                 .mapGroupsWithState(
> > > > >                         new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > > >                                 appConfig, accumulators),
> > > > >                         Encoders.bean(ModelStateInfo.class),
> > > > >                         Encoders.bean(ModelUpdate.class),
> > > > >                         GroupStateTimeout.ProcessingTimeTimeout());
> > >
> > > --
> > > Cheers,
> > > -z
> > >
> > > On Thu, 28 May 2020 19:59:31 +0530
> > > Srinivas V <srini.vyr@gmail.com> wrote:
> > >
> > > > Giving the code below:
> > > > //accumulators is a class level variable in driver.
> > > >
> > > >  sparkSession.streams().addListener(new StreamingQueryListener() {
> > > >             @Override
> > > >             public void onQueryStarted(QueryStartedEvent queryStarted) {
> > > >                 logger.info("Query started: " + queryStarted.id());
> > > >             }
> > > >             @Override
> > > > >             public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
> > > > >                 logger.info("Query terminated: " + queryTerminated.id());
> > > > >             }
> > > > >             @Override
> > > > >             public void onQueryProgress(QueryProgressEvent queryProgress) {
> > > > >                 accumulators.eventsReceived(queryProgress.progress().numInputRows());
> > > > >                 long eventsReceived = 0;
> > > > >                 long eventsExpired = 0;
> > > > >                 long eventSentSuccess = 0;
> > > > >                 try {
> > > > >                     eventsReceived = accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
> > > > >                     eventsExpired = accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
> > > > >                     eventSentSuccess = accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
> > > > >                 } catch (MissingKeyException e) {
> > > > >                     logger.error("Accumulator key not found due to Exception {}", e.getMessage());
> > > > >                 }
> > > > >                 logger.info("Events Received:{}", eventsReceived);
> > > > >                 logger.info("Events State Expired:{}", eventsExpired);
> > > > >                 logger.info("Events Sent Success:{}", eventSentSuccess);
> > > > >                 logger.info("Query made progress - batchId: {} numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{} durationMs:{}",
> > > > >                         queryProgress.progress().batchId(),
> > > > >                         queryProgress.progress().numInputRows(),
> > > > >                         queryProgress.progress().inputRowsPerSecond(),
> > > > >                         queryProgress.progress().processedRowsPerSecond(),
> > > > >                         queryProgress.progress().durationMs());
> > > >
> > > >
> > > > On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <wezhang@outlook.com> wrote:
> > > >
> > > > > > May I ask how the accumulator is accessed in the method
> > > > > > `onQueryProgress()`?
> > > > > >
> > > > > > AFAICT, the accumulator is incremented correctly. Here is a way to verify
> > > > > > that on the cluster:
> > > > > ```
> > > > >     // Add the following while loop before invoking awaitTermination
> > > > >     while (true) {
> > > > >       println("My acc: " + myAcc.value)
> > > > >       Thread.sleep(5 * 1000)
> > > > >     }
> > > > >
> > > > >     //query.awaitTermination()
> > > > > ```
> > > > >
> > > > > > The updated accumulator value can then be found in the driver stdout.
> > > > >
> > > > > --
> > > > > Cheers,
> > > > > -z
> > > > >
> > > > > On Thu, 28 May 2020 17:12:48 +0530
> > > > > Srinivas V <srini.vyr@gmail.com> wrote:
> > > > >
> > > > > > > Yes, I am using stateful structured streaming, similar to what you do.
> > > > > > > This is in Java. I do it this way:
> > > > > > >     Dataset<ModelUpdate> productUpdates = watermarkedDS
> > > > > > >                 .groupByKey(
> > > > > > >                         (MapFunction<InputEventModel, String>) event -> event.getId(), Encoders.STRING())
> > > > > > >                 .mapGroupsWithState(
> > > > > > >                         new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > > > > >                                 appConfig, accumulators),
> > > > > > >                         Encoders.bean(ModelStateInfo.class),
> > > > > > >                         Encoders.bean(ModelUpdate.class),
> > > > > > >                         GroupStateTimeout.ProcessingTimeTimeout());
> > > > > >
> > > > > > StateUpdateTask contains the update method.
> > > > > >
> > > > > > > On Thu, May 28, 2020 at 4:41 AM Something Something <mailinglists19@gmail.com> wrote:
> > > > > >
> > > > > > > Yes, that's exactly how I am creating them.
> > > > > > >
> > > > > > > > Question... Are you using 'Stateful Structured Streaming' in which you've
> > > > > > > > something like this?
> > > > > > > >
> > > > > > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > > > > > >         updateAcrossEvents
> > > > > > > >       )
> > > > > > > >
> > > > > > > > And updating the Accumulator inside 'updateAcrossEvents'? We're
> > > > > > > > experiencing this only under 'Stateful Structured Streaming'. In other
> > > > > > > > streaming applications it works as expected.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini.vyr@gmail.com> wrote:
> > > > > > > >
> > > > > > > >> Yes, I am talking about application-specific accumulators. Actually, I am
> > > > > > > >> getting the values printed in my driver log as well as sent to Grafana. Not
> > > > > > > >> sure where and when I saw 0 before. My deploy mode is “client” on a YARN
> > > > > > > >> cluster (not my local Mac), where I submit from the master node. It should
> > > > > > > >> work the same for cluster mode as well.
> > > > > > > >> Create accumulators like this:
> > > > > > >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
> > > > > > >>
> > > > > > >>
> > > > > > > >>> On Tue, May 26, 2020 at 8:42 PM Something Something <mailinglists19@gmail.com> wrote:
> > > > > > > >>>
> > > > > > > >>> Hmm... how would they go to Grafana if they are not getting computed in
> > > > > > > >>> your code? I am talking about the application-specific accumulators. The
> > > > > > > >>> other standard counters such as 'event.progress.inputRowsPerSecond' are
> > > > > > > >>> getting populated correctly!
> > > > > > >>>
> > > > > > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini.vyr@gmail.com> wrote:
> > > > > > > >>>
> > > > > > > >>>> Hello,
> > > > > > > >>>> Even for me it comes as 0 when I print it in onQueryProgress. I use
> > > > > > > >>>> LongAccumulator as well. Yes, it prints on my local machine but not on
> > > > > > > >>>> the cluster. One consolation is that when I send metrics to Grafana,
> > > > > > > >>>> the values do show up there.
> > > > > > >>>>
> > > > > > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <mailinglists19@gmail.com> wrote:
> > > > > > > >>>>
> > > > > > > >>>>> No, this is not working even if I use LongAccumulator.
> > > > > > > >>>>>
> > > > > > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <wezhang@outlook.com> wrote:
> > > > > > >>>>>
> > > > > > > >>>>>> There is a restriction in the AccumulatorV2 API [1]: the OUT type should
> > > > > > > >>>>>> be atomic or thread safe. I'm wondering whether the implementation for
> > > > > > > >>>>>> `java.util.Map[T, Long]` meets it. Is there any chance to replace
> > > > > > > >>>>>> CollectionLongAccumulator with CollectionAccumulator [2] or LongAccumulator [3]
> > > > > > > >>>>>> and test whether the StreamingListener and the other code work?
> > > > > > >>>>>>
> > > > > > >>>>>> ---
> > > > > > >>>>>> Cheers,
> > > > > > >>>>>> -z
> > > > > > >>>>>> [1]
> > > > > > >>>>>>
> > > > >
> > > https://nam05.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.AccumulatorV2&amp;data=02%7C01%7C%7Cf802f480bbab46ae07b308d803fc661f%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637263729860033353&amp;sdata=NPpiZC%2Bnx9rec6G35QvMDV1D3FgvD%2FnIct6OJ06I728%3D&amp;reserved=0
> > > > > > >>>>>> [2]
> > > > > > >>>>>>
> > > > >
> > > https://nam05.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.CollectionAccumulator&amp;data=02%7C01%7C%7Cf802f480bbab46ae07b308d803fc661f%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637263729860038343&amp;sdata=wMskE72per9Js0V7UHJ0qi4UzCEEYh%2Fk53fuP2e92mA%3D&amp;reserved=0
> > > > > > >>>>>> [3]
> > > > > > >>>>>>
> > > > >
> > > https://nam05.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.LongAccumulator&amp;data=02%7C01%7C%7Cf802f480bbab46ae07b308d803fc661f%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637263729860038343&amp;sdata=INgHzc0rc6jj7UapB%2FRLfCiGNWEBSKWfgmuJ2dUZ3eM%3D&amp;reserved=0
> > > > > > >>>>>>
> > > > > > >>>>>> ________________________________________
> > > > > > >>>>>> From: Something Something <mailinglists19@gmail.com>
> > > > > > >>>>>> Sent: Saturday, May 16, 2020 0:38
> > > > > > >>>>>> To: spark-user
> > > > > > > >>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
> > > > > > > >>>>>>
> > > > > > > >>>>>> Can someone from the Spark development team tell me if this functionality
> > > > > > > >>>>>> is supported and tested? I've spent a lot of time on this but can't get it
> > > > > > > >>>>>> to work. Just to add more context, we have our own Accumulator class that
> > > > > > > >>>>>> extends AccumulatorV2. In this class we keep track of one or more
> > > > > > > >>>>>> accumulators. Here's the definition:
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>> class CollectionLongAccumulator[T]
> > > > > > >>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
> > > > > > >>>>>>
> > > > > > >>>>>> When the job begins we register an instance of this class:
> > > > > > >>>>>>
> > > > > > >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
> > > > > > >>>>>>
> > > > > > >>>>>> Is this working under Structured Streaming?
> > > > > > >>>>>>
> > > > > > > >>>>>> I will keep looking for alternate approaches but any help would be
> > > > > > > >>>>>> greatly appreciated. Thanks.
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <mailinglists19@gmail.com> wrote:
> > > > > > > >>>>>>
> > > > > > > >>>>>> In my structured streaming job I am updating Spark Accumulators in
> > > > > > > >>>>>> the updateAcrossEvents method but they are always 0 when I try to print
> > > > > > > >>>>>> them in my StreamingListener. Here's the code:
> > > > > > > >>>>>>
> > > > > > > >>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > > > > > >>>>>>         updateAcrossEvents
> > > > > > > >>>>>>       )
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > > >>>>>> The accumulators get incremented in 'updateAcrossEvents'. I have a
> > > > > > > >>>>>> StreamingListener which writes the values of the accumulators in its
> > > > > > > >>>>>> 'onQueryProgress' method, but in this method the Accumulators are ALWAYS
> > > > > > > >>>>>> ZERO!
> > > > > > > >>>>>>
> > > > > > > >>>>>> When I added log statements in updateAcrossEvents, I could see
> > > > > > > >>>>>> that these accumulators are getting incremented as expected.
> > > > > > > >>>>>>
> > > > > > > >>>>>> This only happens when I run in 'Cluster' mode. In Local mode it
> > > > > > > >>>>>> works fine, which implies that the Accumulators are not getting distributed
> > > > > > > >>>>>> correctly - or something like that!
> > > > > > > >>>>>>
> > > > > > > >>>>>> Note: I've seen quite a few answers on the Web that tell me to
> > > > > > > >>>>>> perform an "Action". That's not a solution here. This is a 'Stateful
> > > > > > > >>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
> > > > > > > >>>>>> SparkContext.
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > >
> > >
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>