spark-user mailing list archives

From Srinivas V <srini....@gmail.com>
Subject Re: Using Spark Accumulators with Structured Streaming
Date Mon, 08 Jun 2020 19:06:59 GMT
Ya, I had asked this question before. No one responded. By the way, what’s
your actual name “Something something” if you don’t mind me asking?

On Tue, Jun 9, 2020 at 12:27 AM Something Something <
mailinglists19@gmail.com> wrote:

> What is scary is that this interface is marked as "experimental":
>
> @Experimental
> @InterfaceStability.Evolving
> public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
>   R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
> }
>
>
>
>
> On Mon, Jun 8, 2020 at 11:54 AM Something Something <
> mailinglists19@gmail.com> wrote:
>
>> Right, this is exactly how I have it right now. The problem is that in cluster
>> mode 'myAcc' does NOT get distributed. Try it out in cluster mode & you
>> will see what I mean.
>>
>> I think the way Zhang is using it will work. Will try & revert.
>>
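A minimal sketch of one way to narrow that down, assuming `myAcc` is a driver-side LongAccumulator like the one discussed in this thread (the setup code below is an illustration, not taken from the thread):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical setup; 'myAcc' stands in for the accumulator under discussion.
val spark = SparkSession.builder().appName("acc-check").getOrCreate()
val myAcc = spark.sparkContext.longAccumulator("myAcc")   // named, so it shows up in the Spark UI

// Quick driver-side sanity check: is the handle registered, and under what name?
println(s"registered=${myAcc.isRegistered}, name=${myAcc.name}")
// Comparing myAcc.value on the driver with the per-stage values in the Spark UI after a few
// micro-batches helps separate "never incremented on executors" from "read too early on the driver".
```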
>> On Mon, Jun 8, 2020 at 10:58 AM Srinivas V <srini.vyr@gmail.com> wrote:
>>
>>>
>>> You don’t need to have a separate class. I created one because it has a lot of
>>> code and logic in my case.
>>> For you to quickly test, you can use Zhang’s Scala code from this chain.
>>> Pasting it below for your quick reference:
>>>
>>> ```scala
>>>     spark.streams.addListener(new StreamingQueryListener {
>>>       override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
>>>         println(event.progress.id + " is on progress")
>>>         println(s"My accu is ${myAcc.value} on query progress")
>>>       }
>>>         ...
>>>     })
>>>
>>>     def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
>>>       myAcc.add(1)
>>>       println(s">>> key: $key => state: ${state}")
>>>         ...
>>>     }
>>>
>>>     val wordCounts = words
>>>       .groupByKey(v => ...)
>>>       .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
>>>
>>>     val query = wordCounts.writeStream
>>>       .outputMode(OutputMode.Update)
>>>         ...
>>> ```
>>>
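For completeness, a minimal sketch of the pieces the pasted snippet leaves out, namely how `myAcc` might be created and how the query is actually started; `spark`, `wordCounts`, and the console sink are assumptions here:

```scala
import org.apache.spark.sql.streaming.OutputMode

// Assumed surroundings for Zhang's snippet: 'spark' is the active SparkSession and
// 'wordCounts' is the Dataset built above; the console sink is only for a quick test.
val myAcc = spark.sparkContext.longAccumulator("myAcc")   // created and registered on the driver

val query = wordCounts.writeStream
  .outputMode(OutputMode.Update)
  .format("console")
  .start()

query.awaitTermination()
```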
>>> On Mon, Jun 8, 2020 at 11:14 AM Something Something <
>>> mailinglists19@gmail.com> wrote:
>>>
>>>> Great. I guess the trick is to use a separate class such as
>>>> 'StateUpdateTask'. I will try that. My challenge is to convert this into
>>>> Scala. Will try it out & revert. Thanks for the tips.
>>>>
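A rough Scala counterpart of the `StateUpdateTask` idea (Zhang's Java version appears further down in this thread); this is only a sketch, and `LeadingCharCount` plus the counting logic are assumptions:

```scala
import java.util.{Iterator => JIterator}

import org.apache.spark.api.java.function.MapGroupsWithStateFunction
import org.apache.spark.sql.streaming.GroupState
import org.apache.spark.util.LongAccumulator

// Assumed result type, standing in for the bean used in the Java example.
case class LeadingCharCount(key: String, count: Long)

// The accumulator is created/registered on the driver and passed in; add() runs on the executors.
class StateUpdateTask(myAccInTask: LongAccumulator)
    extends MapGroupsWithStateFunction[String, String, Long, LeadingCharCount] {

  override def call(key: String,
                    values: JIterator[String],
                    state: GroupState[Long]): LeadingCharCount = {
    myAccInTask.add(1)
    var newState = state.getOption.getOrElse(0L)
    while (values.hasNext) { values.next(); newState += 1 }   // placeholder per-group logic
    state.update(newState)
    LeadingCharCount(key, newState)
  }
}
```

With a case class result, the Java-friendly `mapGroupsWithState` overload would presumably take `Encoders.scalaLong` and `Encoders.product[LeadingCharCount]` in place of the `Encoders.bean(...)` calls shown in the Java version.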
>>>> On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei <wezhang@outlook.com> wrote:
>>>>
>>>>> The following Java code can work in my cluster environment:
>>>>> ```
>>>>>     .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
>>>>>                 myAcc.add(1);
>>>>>                 <...>
>>>>>                 state.update(newState);
>>>>>                 return new LeadingCharCount(key, newState);
>>>>>             },
>>>>>             Encoders.LONG(),
>>>>>             Encoders.bean(LeadingCharCount.class),
>>>>>             GroupStateTimeout.ProcessingTimeTimeout())
>>>>> ```
>>>>>
>>>>> Also works fine with my `StateUpdateTask`:
>>>>> ```
>>>>>     .mapGroupsWithState(
>>>>>             new StateUpdateTask(myAcc),
>>>>>             Encoders.LONG(),
>>>>>             Encoders.bean(LeadingCharCount.class),
>>>>>             GroupStateTimeout.ProcessingTimeTimeout());
>>>>>
>>>>> public class StateUpdateTask implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
>>>>>         private LongAccumulator myAccInTask;
>>>>>
>>>>>         public StateUpdateTask(LongAccumulator acc) {
>>>>>             this.myAccInTask = acc;
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public LeadingCharCount call(String key, Iterator<String> values, GroupState<Long> state) throws Exception {
>>>>>             myAccInTask.add(1);
>>>>>             <...>
>>>>>             state.update(newState);
>>>>>             return new LeadingCharCount(key, newState);
>>>>>         }
>>>>> }
>>>>> ```
>>>>>
>>>>> --
>>>>> Cheers,
>>>>> -z
>>>>>
>>>>> On Tue, 2 Jun 2020 10:28:36 +0800
>>>>> ZHANG Wei <wezhang@outlook.com> wrote:
>>>>>
>>>>> > Yes, verified on the cluster with 5 executors.
>>>>> >
>>>>> > --
>>>>> > Cheers,
>>>>> > -z
>>>>> >
>>>>> > On Fri, 29 May 2020 11:16:12 -0700
>>>>> > Something Something <mailinglists19@gmail.com> wrote:
>>>>> >
>>>>> > > Did you try this on the Cluster? Note: This works just fine under
>>>>> > > 'Local' mode.
>>>>> > >
>>>>> > > On Thu, May 28, 2020 at 9:12 PM ZHANG Wei <wezhang@outlook.com>
>>>>> wrote:
>>>>> > >
>>>>> > > > I can't reproduce the issue with my simple code:
>>>>> > > > ```scala
>>>>> > > >     spark.streams.addListener(new StreamingQueryListener {
>>>>> > > >       override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
>>>>> > > >         println(event.progress.id + " is on progress")
>>>>> > > >         println(s"My accu is ${myAcc.value} on query progress")
>>>>> > > >       }
>>>>> > > >         ...
>>>>> > > >     })
>>>>> > > >
>>>>> > > >     def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
>>>>> > > >       myAcc.add(1)
>>>>> > > >       println(s">>> key: $key => state: ${state}")
>>>>> > > >         ...
>>>>> > > >     }
>>>>> > > >
>>>>> > > >     val wordCounts = words
>>>>> > > >       .groupByKey(v => ...)
>>>>> > > >       .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
>>>>> > > >
>>>>> > > >     val query = wordCounts.writeStream
>>>>> > > >       .outputMode(OutputMode.Update)
>>>>> > > >         ...
>>>>> > > > ```
>>>>> > > >
>>>>> > > > I'm wondering if there are any errors that can be found in the driver logs? The
>>>>> > > > micro-batch exceptions won't terminate the running streaming job.
>>>>> > > >
>>>>> > > > For the following code, we have to make sure that `StateUpdateTask` is
>>>>> > > > started:
>>>>> > > > >                 .mapGroupsWithState(
>>>>> > > > >                         new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>>>> > > > >                                 appConfig, accumulators),
>>>>> > > > >                         Encoders.bean(ModelStateInfo.class),
>>>>> > > > >                         Encoders.bean(ModelUpdate.class),
>>>>> > > > >                         GroupStateTimeout.ProcessingTimeTimeout());
>>>>> > > >
>>>>> > > > --
>>>>> > > > Cheers,
>>>>> > > > -z
>>>>> > > >
>>>>> > > > On Thu, 28 May 2020 19:59:31 +0530
>>>>> > > > Srinivas V <srini.vyr@gmail.com> wrote:
>>>>> > > >
>>>>> > > > > Giving the code below:
>>>>> > > > > // accumulators is a class-level variable in the driver.
>>>>> > > > >
>>>>> > > > >  sparkSession.streams().addListener(new StreamingQueryListener() {
>>>>> > > > >             @Override
>>>>> > > > >             public void onQueryStarted(QueryStartedEvent queryStarted) {
>>>>> > > > >                 logger.info("Query started: " + queryStarted.id());
>>>>> > > > >             }
>>>>> > > > >             @Override
>>>>> > > > >             public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
>>>>> > > > >                 logger.info("Query terminated: " + queryTerminated.id());
>>>>> > > > >             }
>>>>> > > > >             @Override
>>>>> > > > >             public void onQueryProgress(QueryProgressEvent queryProgress) {
>>>>> > > > >                 accumulators.eventsReceived(queryProgress.progress().numInputRows());
>>>>> > > > >                 long eventsReceived = 0;
>>>>> > > > >                 long eventsExpired = 0;
>>>>> > > > >                 long eventSentSuccess = 0;
>>>>> > > > >                 try {
>>>>> > > > >                     eventsReceived = accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
>>>>> > > > >                     eventsExpired = accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
>>>>> > > > >                     eventSentSuccess = accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
>>>>> > > > >                 } catch (MissingKeyException e) {
>>>>> > > > >                     logger.error("Accumulator key not found due to Exception {}", e.getMessage());
>>>>> > > > >                 }
>>>>> > > > >                 logger.info("Events Received:{}", eventsReceived);
>>>>> > > > >                 logger.info("Events State Expired:{}", eventsExpired);
>>>>> > > > >                 logger.info("Events Sent Success:{}", eventSentSuccess);
>>>>> > > > >                 logger.info("Query made progress - batchId: {} numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{} durationMs:{}",
>>>>> > > > >                         queryProgress.progress().batchId(),
>>>>> > > > >                         queryProgress.progress().numInputRows(),
>>>>> > > > >                         queryProgress.progress().inputRowsPerSecond(),
>>>>> > > > >                         queryProgress.progress().processedRowsPerSecond(),
>>>>> > > > >                         queryProgress.progress().durationMs());
>>>>> > > > >             }
>>>>> > > > >         });
>>>>> > > > >
>>>>> > > > >
>>>>> > > > > On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <wezhang@outlook.com>
>>>>> wrote:
>>>>> > > > >
>>>>> > > > > > May I ask how the accumulator is accessed in the method
>>>>> > > > > > `onQueryProgress()`?
>>>>> > > > > >
>>>>> > > > > > AFAICT, the accumulator is incremented well. There is a way to verify that
>>>>> > > > > > in cluster like this:
>>>>> > > > > > ```
>>>>> > > > > >     // Add the following while loop before invoking awaitTermination
>>>>> > > > > >     while (true) {
>>>>> > > > > >       println("My acc: " + myAcc.value)
>>>>> > > > > >       Thread.sleep(5 * 1000)
>>>>> > > > > >     }
>>>>> > > > > >
>>>>> > > > > >     //query.awaitTermination()
>>>>> > > > > > ```
>>>>> > > > > >
>>>>> > > > > > And the updated accumulator value can be found in the driver stdout.
>>>>> > > > > >
>>>>> > > > > > --
>>>>> > > > > > Cheers,
>>>>> > > > > > -z
>>>>> > > > > >
>>>>> > > > > > On Thu, 28 May 2020 17:12:48 +0530
>>>>> > > > > > Srinivas V <srini.vyr@gmail.com> wrote:
>>>>> > > > > >
>>>>> > > > > > > Yes, I am using stateful structured streaming. Yes, similar to what you
>>>>> > > > > > > do. This is in Java. I do it this way:
>>>>> > > > > > >     Dataset<ModelUpdate> productUpdates = watermarkedDS
>>>>> > > > > > >                 .groupByKey(
>>>>> > > > > > >                         (MapFunction<InputEventModel, String>) event -> event.getId(), Encoders.STRING())
>>>>> > > > > > >                 .mapGroupsWithState(
>>>>> > > > > > >                         new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>>>> > > > > > >                                 appConfig, accumulators),
>>>>> > > > > > >                         Encoders.bean(ModelStateInfo.class),
>>>>> > > > > > >                         Encoders.bean(ModelUpdate.class),
>>>>> > > > > > >                         GroupStateTimeout.ProcessingTimeTimeout());
>>>>> > > > > > >
>>>>> > > > > > > StateUpdateTask contains the update method.
>>>>> > > > > > >
>>>>> > > > > > > On Thu, May 28, 2020 at 4:41 AM Something Something <
>>>>> > > > > > > mailinglists19@gmail.com> wrote:
>>>>> > > > > > >
>>>>> > > > > > > > Yes, that's exactly how I am creating them.
>>>>> > > > > > > >
>>>>> > > > > > > > Question... Are you using 'Stateful Structured Streaming' in which you've
>>>>> > > > > > > > something like this?
>>>>> > > > > > > >
>>>>> > > > > > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>> > > > > > > >         updateAcrossEvents
>>>>> > > > > > > >       )
>>>>> > > > > > > >
>>>>> > > > > > > > And updating the Accumulator inside 'updateAcrossEvents'? We're
>>>>> > > > > > > > experiencing this only under 'Stateful Structured Streaming'. In other
>>>>> > > > > > > > streaming applications it works as expected.
>>>>> > > > > > > >
>>>>> > > > > > > >
>>>>> > > > > > > >
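The thread never shows `updateAcrossEvents` itself; as a reference point, here is a minimal sketch of the shape such a function needs for the curried Scala `mapGroupsWithState` API, with the event type, state type, and accumulator wiring all assumed:

```scala
import org.apache.spark.sql.streaming.GroupState
import org.apache.spark.util.LongAccumulator

// 'InputEvent' is a stand-in for the application's event type.
case class InputEvent(id: String, payload: String)

// Curried method: partially applying the accumulator yields the
// (K, Iterator[V], GroupState[S]) => U function that mapGroupsWithState expects.
def updateAcrossEvents(acc: LongAccumulator)
                      (key: String,
                       events: Iterator[InputEvent],
                       state: GroupState[Long]): Long = {
  acc.add(1)                                            // incremented on the executors
  val newState = state.getOption.getOrElse(0L) + events.size
  state.update(newState)
  newState
}
```

Usage would then look something like `.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents(myAcc))`, matching the snippet quoted above.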
>>>>> > > > > > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini.vyr@gmail.com>
>>>>> > > > > > > > wrote:
>>>>> > > > > > > >
>>>>> > > > > > > >> Yes, I am talking about Application specific Accumulators. Actually I am
>>>>> > > > > > > >> getting the values printed in my driver log as well as sent to Grafana. Not
>>>>> > > > > > > >> sure where and when I saw 0 before. My deploy mode is “client” on a yarn
>>>>> > > > > > > >> cluster (not local Mac) where I submit from the master node. It should
>>>>> > > > > > > >> work the same for cluster mode as well.
>>>>> > > > > > > >> Create accumulators like this:
>>>>> > > > > > > >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>>>> > > > > > > >>
>>>>> > > > > > > >>
>>>>> > > > > > > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>>>> > > > > > > >> mailinglists19@gmail.com> wrote:
>>>>> > > > > > > >>
>>>>> > > > > > > >>> Hmm... how would they go to Grafana if they are not getting computed in
>>>>> > > > > > > >>> your code? I am talking about the Application Specific Accumulators. The
>>>>> > > > > > > >>> other standard counters such as 'event.progress.inputRowsPerSecond' are
>>>>> > > > > > > >>> getting populated correctly!
>>>>> > > > > > > >>>
>>>>> > > > > > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini.vyr@gmail.com>
>>>>> > > > > > > >>> wrote:
>>>>> > > > > > > >>>
>>>>> > > > > > > >>>> Hello,
>>>>> > > > > > > >>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>>>>> > > > > > > >>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>>>>> > > > > > > >>>> But one consolation is that when I send metrics to Grafana, the values
>>>>> > > > > > > >>>> are coming there.
>>>>> > > > > > > >>>>
>>>>> > > > > > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>>> > > > > > > >>>> mailinglists19@gmail.com> wrote:
>>>>> > > > > > > >>>>
>>>>> > > > > > > >>>>> No this is not working even if I use LongAccumulator.
>>>>> > > > > > > >>>>>
>>>>> > > > > > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <wezhang@outlook.com>
>>>>> > > > > > > >>>>> wrote:
>>>>> > > > > > > >>>>>
>>>>> > > > > > > >>>>>> There is a restriction in the AccumulatorV2 API [1]: the OUT type should
>>>>> > > > > > > >>>>>> be atomic or thread safe. I'm wondering if the implementation for
>>>>> > > > > > > >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
>>>>> > > > > > > >>>>>> CollectionLongAccumulator by CollectionAccumulator [2] or LongAccumulator [3]
>>>>> > > > > > > >>>>>> and test if the StreamingListener and the other code are able to work?
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> ---
>>>>> > > > > > > >>>>>> Cheers,
>>>>> > > > > > > >>>>>> -z
>>>>> > > > > > > >>>>>> [1]
>>>>> > > > > > > >>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>> > > > > > > >>>>>> [2]
>>>>> > > > > > > >>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>> > > > > > > >>>>>> [3]
>>>>> > > > > > > >>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>> > > > > > > >>>>>>
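A minimal sketch of the replacement test being suggested here; `spark` and the accumulator names are assumptions, and both built-in accumulators are created and registered in a single call:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("acc-replacement-test").getOrCreate()

// LongAccumulator: a thread-safe counter/sum.
val longAcc = spark.sparkContext.longAccumulator("eventsReceived")

// CollectionAccumulator: its value is a java.util.List of the added elements.
val listAcc = spark.sparkContext.collectionAccumulator[String]("eventKeys")

// Inside the state-update function one would call longAcc.add(1) / listAcc.add(key),
// and read longAcc.value / listAcc.value on the driver, e.g. in onQueryProgress.
```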
>>>>> > > > > > > >>>>>> ________________________________________
>>>>> > > > > > > >>>>>> From: Something Something <mailinglists19@gmail.com>
>>>>> > > > > > > >>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>> > > > > > > >>>>>> To: spark-user
>>>>> > > > > > > >>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> Can someone from the Spark Development team tell me if this functionality
>>>>> > > > > > > >>>>>> is supported and tested? I've spent a lot of time on this but can't get it
>>>>> > > > > > > >>>>>> to work. Just to add more context, we've our own Accumulator class that
>>>>> > > > > > > >>>>>> extends from AccumulatorV2. In this class we keep track of one or more
>>>>> > > > > > > >>>>>> accumulators. Here's the definition:
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> class CollectionLongAccumulator[T]
>>>>> > > > > > > >>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> When the job begins we register an instance of this class:
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> Is this working under Structured Streaming?
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> I will keep looking for alternate approaches but any help would be
>>>>> > > > > > > >>>>>> greatly appreciated. Thanks.
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>>
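For readers landing on the same question, a minimal self-contained sketch of what a `CollectionLongAccumulator[T]` along these lines might look like; the body below is written against the AccumulatorV2 contract as an illustration, not the original implementation, and it uses a ConcurrentHashMap to keep the OUT type thread safe as ZHANG Wei's note above requires:

```scala
import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.util.AccumulatorV2

// Counts occurrences per key; hypothetical body, not the implementation from this thread.
class CollectionLongAccumulator[T] extends AccumulatorV2[T, java.util.Map[T, Long]] {
  private val counts = new ConcurrentHashMap[T, Long]()

  override def isZero: Boolean = counts.isEmpty

  override def copy(): AccumulatorV2[T, java.util.Map[T, Long]] = {
    val newAcc = new CollectionLongAccumulator[T]
    newAcc.counts.putAll(counts)
    newAcc
  }

  override def reset(): Unit = counts.clear()

  override def add(v: T): Unit = synchronized {
    counts.put(v, counts.getOrDefault(v, 0L) + 1L)
  }

  override def merge(other: AccumulatorV2[T, java.util.Map[T, Long]]): Unit = synchronized {
    val it = other.value.entrySet().iterator()
    while (it.hasNext) {
      val e = it.next()
      counts.put(e.getKey, counts.getOrDefault(e.getKey, 0L) + e.getValue)
    }
  }

  override def value: java.util.Map[T, Long] = counts
}
```

Registering an instance would then follow the `spark.sparkContext.register(myAccumulator, "MyAccumulator")` line quoted above.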
>>>>> > > > > > > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>>>> > > > > > > >>>>>> mailinglists19@gmail.com> wrote:
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> In my structured streaming job I am updating Spark Accumulators in
>>>>> > > > > > > >>>>>> the updateAcrossEvents method but they are always 0 when I try to print
>>>>> > > > > > > >>>>>> them in my StreamingListener. Here's the code:
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>> > > > > > > >>>>>>         updateAcrossEvents
>>>>> > > > > > > >>>>>>       )
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>>>>> > > > > > > >>>>>> StreamingListener which writes values of the accumulators in
>>>>> > > > > > > >>>>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
>>>>> > > > > > > >>>>>> ZERO!
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> When I added log statements in the updateAcrossEvents, I could see
>>>>> > > > > > > >>>>>> that these accumulators are getting incremented as expected.
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> This only happens when I run in the 'Cluster' mode. In Local mode it
>>>>> > > > > > > >>>>>> works fine which implies that the Accumulators are not getting distributed
>>>>> > > > > > > >>>>>> correctly - or something like that!
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> Note: I've seen quite a few answers on the Web that tell me to
>>>>> > > > > > > >>>>>> perform an "Action". That's not a solution here. This is a 'Stateful
>>>>> > > > > > > >>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
>>>>> > > > > > > >>>>>> SparkContext.
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>>
>>>>> > > > > >
>>>>> > > >
>>>>> >
>>>>>
>>>>
