kafka-users mailing list archives

From Xiyuan Hu <xiyuan.h...@gmail.com>
Subject Re: kafka stream ktable with suppress operator
Date Mon, 04 Nov 2019 15:42:00 GMT
Hi Matthias,

Could you help with the issue above? Or do you have any suggestions?

Thanks a lot!

On Thu, Oct 31, 2019 at 4:00 PM Xiyuan Hu <xiyuan.huhu@gmail.com> wrote:
>
> Hi Matthias,
>
> Some additional information: after I restarted the app, it went into
> endless rebalancing. The join rate looks like the attachment below; it
> basically rebalances every 5 minutes. I checked the logs on each node
> and found the warning below:
>
> On node A:
> 2019/10/31 10:13:46 | 2019-10-31 10:13:46,543 WARN
> [kafka-coordinator-heartbeat-thread | XXX]
> o.a.k.c.c.i.AbstractCoordinator [Consumer
> clientId=XXX-StreamThread-1-consumer, groupId=XXX] This member will
> leave the group because consumer poll timeout has expired. This means
> the time between subsequent calls to poll() was longer than the
> configured max.poll.interval.ms, which typically implies that the poll
> loop is spending too much time processing messages. You can address
> this either by increasing max.poll.interval.ms or by reducing the
> maximum size of batches returned in poll() with max.poll.records.
> 2019/10/31 10:13:46 | 2019-10-31 10:13:46,544 INFO
> [kafka-coordinator-heartbeat-thread | XXX]
> o.a.k.c.c.i.AbstractCoordinator [Consumer
> clientId=XXX-StreamThread-1-consumer, groupId=XXX] Member
> XXX-StreamThread-1-consumer-7307ab88-9724-4af8-99b8-5d1c3ef5294f
> sending LeaveGroup request to coordinator xx:9092 (id: 2147483644
> rack: null)
> 2019/10/31 10:13:52 | 2019-10-31 10:13:52,766 INFO
> [XXX-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore Opening store
> KSTREAM-REDUCE-STATE-STORE-0000000003.1572480000000 in regular mode
> 2019/10/31 10:13:52 | 2019-10-31 10:13:52,767 INFO
> [XXX-StreamThread-1] o.a.k.s.p.i.StoreChangelogReader stream-thread
> [XXX-StreamThread-1] Restoring task 1_3's state store
> KTABLE-SUPPRESS-STATE-STORE-0000000009 from beginning of the changelog
> XXX-KTABLE-SUPPRESS-STATE-STORE-0000000009-changelog-3
> 2019/10/31 10:13:52 | 2019-10-31 10:13:52,794 INFO
> [XXX-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [Consumer
> clientId=XXX-StreamThread-1-consumer, groupId=XXX] Revoking previously
> assigned partitions
> [XXX-KSTREAM-REDUCE-STATE-STORE-0000000003-repartition-3]
> 2019/10/31 10:13:52 | 2019-10-31 10:13:52,794 INFO
> [XXX-StreamThread-1] o.a.k.s.p.internals.StreamThread stream-thread
> [XXX-StreamThread-1] State transition from PARTITIONS_ASSIGNED to
> PARTITIONS_REVOKED
>
> Meanwhile, the other nodes log: 2019/10/31 10:13:47 Attempt to heartbeat
> failed since group is rebalancing.
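
The warning quoted above names two consumer settings, max.poll.interval.ms and max.poll.records. A minimal sketch of passing them to a Kafka Streams application as plain properties might look like the following. The class name, method name, and example values (10 minutes, 100 records) are illustrative assumptions, not values recommended anywhere in this thread; suitable numbers depend on how long processing and state restoration actually take.

```java
import java.util.Properties;

// Hypothetical helper; only the two config keys come from the warning itself.
final class PollTuning {

    // Returns a copy of `base` with the two poll-related consumer settings added.
    static Properties withPollSettings(Properties base, int maxPollIntervalMs, int maxPollRecords) {
        Properties props = new Properties();
        props.putAll(base);
        // Upper bound on the time between two poll() calls before the member leaves the group.
        props.setProperty("max.poll.interval.ms", Integer.toString(maxPollIntervalMs));
        // Upper bound on the number of records returned by a single poll().
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "XXX");        // placeholder, as in the logs
        base.setProperty("bootstrap.servers", "xx:9092"); // placeholder, as in the logs

        // Assumed example values: 10 minutes between polls, at most 100 records per poll.
        Properties tuned = withPollSettings(base, 600_000, 100);
        tuned.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object would be handed to the KafkaStreams constructor together with the topology. Whether raising the interval actually stops the loop described here is not established in this thread.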
>
> If my understanding is correct, the warning above caused the group
> to rebalance? My questions are:
> 1) Why did it only happen after a restart?
> 2) Even if it rebalanced, why does it keep rebalancing in an endless
> loop? I can't understand the behavior here.
> 3) You mentioned reduce() is using RocksDB by default, but I also
> noticed the offset has been set to zero for the reduce changelog topic.
> Is that wrong?
>
> Thanks a lot for the help!
>
> On Thu, Oct 31, 2019 at 8:11 AM Xiyuan Hu <xiyuan.huhu@gmail.com> wrote:
> >
> > Hi Matthias,
> >
> > When I redeploy the application with the same application id, it
> > causes a rebalance loop: partition revoked -> rebalance -> offset
> > reset to zero -> partition assigned -> partition revoked.
> >
> > The app was running well before the redeployment, but once redeployed,
> > it will keep rebalancing for hours and I have to switch to a new
> > application id to stop that.
> >
> > You mentioned that reduce() uses RocksDB stores by default
> > while suppress() is in memory. Is that the reason reduce() has
> > both -repartition and -changelog topics while suppress() only has
> > a -changelog topic?
> >
> > And could this be related to the shutdown hook? If I don't provide a
> > shutdown hook and redeploy, will it cause the issue above?
> >
> > Thanks!
> >
> > On Thu, Oct 31, 2019 at 5:55 AM Matthias J. Sax <matthias@confluent.io> wrote:
> > >
> > > > Just a follow up: currently, suppress() only supports in-memory stores
> > > > (note that `suppress()` has its own store).
> > >
> > > > For the actual `reduce()` store, you can pick between RocksDB and
> > > in-memory (default is RocksDB). Hence, if you restart an application on
> > > the same host, it should not be necessary to reload the state from the
> > > changelog topic if you use RocksDB.
> > >
> > > However, the suppress buffer must be recreated from the
> > > suppress-changelog topics on restart atm.
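
Whether the RocksDB store can be reused after a restart depends on the local state directory surviving that restart. A minimal sketch of pointing Kafka Streams at a directory on durable storage through the state.dir setting is shown here; the path /var/lib/kafka-streams and the helper names are hypothetical. As noted above, this would only help the `reduce()` store, since the `suppress()` buffer is rebuilt from its changelog either way.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper; "state.dir" is the Kafka Streams setting for local state storage.
final class StateDirConfig {

    // Returns a copy of `base` with state.dir pointing at the given directory.
    static Properties withStateDir(Properties base, Path stateDir) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty("state.dir", stateDir.toAbsolutePath().toString());
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "XXX"); // placeholder, as in the logs

        // Assumed path: any directory that survives a redeploy (not emptyDir, not a wiped temp dir).
        Properties props = withStateDir(base, Paths.get("/var/lib/kafka-streams"));
        System.out.println("state.dir=" + props.getProperty("state.dir"));
    }
}
```

If the state directory is left at its default under the system temp directory, or sits on ephemeral storage, a redeploy may wipe it and force a full restore from the changelog.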
> > >
> > > > Originally, `suppress()` was intended to support persistent stores as well,
> > > > but that has not been implemented yet. We hope to close this gap in the future.
> > >
> > > > >>> I haven't figured out the reason, but after a restart the app keeps
> > > > >>> resetting the changelog topic offset to ZERO and triggering a rebalance.
> > >
> > > > Resetting to zero would happen if the full state needs to be recovered.
> > > However, this should not result in a rebalance. Can you elaborate on the
> > > rebalancing issue you described?
> > >
> > >
> > >
> > > -Matthias
> > >
> > > On 10/28/19 5:54 PM, Alex Brekken wrote:
> > > > > I assume you're using RocksDB as your state stores... The bytes out you're
> > > > > seeing on the changelog topics is probably because they are restoring your
> > > > > state stores.  If your state stores are in-memory, then on every
> > > > > application startup they're going to be restored from the changelog
> > > > > topics.  If your state stores are persistent (saved to disk), then a
> > > > > restore can still happen if you've lost your filesystem.  (maybe you're
> > > > > doing a state store cleanup on startup/shutdown, or have ephemeral storage
> > > > > such as emptyDir in k8s, for example)  So *I think* what you're seeing is
> > > > > normal, though if you want to dig deeper there are RocksDB metrics that can
> > > > > be exposed and will show restore-related info.  Additionally, there is a
> > > > > StateRestoreListener interface that you can implement if you'd like to log
> > > > > some of the state store restoration details.
> > > >
> > > > Alex
> > > >
> > > > > On Mon, Oct 28, 2019 at 4:41 PM Xiyuan Hu <xiyuan.huhu@gmail.com> wrote:
> > > >
> > > >> Hi,
> > > > >> I'm using 2.3.1 now and having the same issue. During the restart, I
> > > > >> noticed a lot of log lines like the ones below:
> > > >> Seeking to EARLIEST offset of partition
> > > >> XX-KSTREAM-REDUCE-STATE-STORE-0000000014-changelog-41
> > > >> Seeking to EARLIEST offset of partition
> > > >> XX-KTABLE-SUPPRESS-STATE-STORE-0000000020-changelog-41
> > > >>
> > > > >> After restarting, the bytes out of the changelog topic is as high as
> > > > >> 800-900MB/s, while normally it has zero bytes out. Is this expected?
> > > > >> I haven't figured out the reason, but after a restart the app keeps
> > > > >> resetting the changelog topic offset to ZERO and triggering a rebalance.
> > > > >> It seems to be an endless loop?
> > > > >> Rebalance -> reset to ZERO -> rebalance
> > > >>
> > > >> Is there any config I should set?
> > > >>
> > > >> Thanks!
> > > >>
> > > >> On Sun, Oct 27, 2019 at 11:25 PM Matthias J. Sax <matthias@confluent.io>
> > > >> wrote:
> > > >>>
> > > > >>> What version are you using? We fixed a couple of bugs in `suppress()` -- I
> > > > >>> would recommend using the latest 2.3.1 bug-fix release.
> > > >>>
> > > >>>
> > > > >>> -Matthias
> > > >>>
> > > >>> On 10/25/19 9:12 AM, Tao Wang wrote:
> > > > >>>> When using the suppress operator with a windowed KTable, it looks like
> > > > >>>> restarting the Kafka Streams app causes the aggregated messages from the
> > > > >>>> SUPPRESS-STATE-STORE to be published again.
> > > >>>>
> > > > >>>> Here is the pseudocode. Is there anything I am missing, or anything that
> > > > >>>> can be done to avoid this?
> > > >>>>
> > > >>>>
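> > > > >>>> KTable<Windowed<String>, String> test = <KStreamObject>
> > > > >>>>     .groupByKey()
> > > > >>>>     // 30-second tumbling windows with a 3-second grace period
> > > > >>>>     .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(3)))
> > > > >>>>     .aggregate(
> > > > >>>>         ....
> > > > >>>>         ,
> > > > >>>>         Materialized.<String, String, WindowStore<Bytes, byte[]>>as("aggregated-stream21-store")
> > > > >>>>             .withRetention(Duration.ofMinutes(5))
> > > > >>>>             // withKeySerde/withValueSerde keep the store name and retention set above;
> > > > >>>>             // the static Materialized.with(...) would return a fresh, unnamed instance
> > > > >>>>             .withKeySerde(Serdes.String())
> > > > >>>>             .withValueSerde(Serdes.String())
> > > > >>>>     )
> > > > >>>>     // emit only the final result per window, once the window closes
> > > > >>>>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
> > > > >>>>
> > > > >>>> // to() returns void, so the write is a separate statement
> > > > >>>> test.toStream().to("<topic_out>");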
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>>
> > > > >>>> So when restarting the stream app, <topic_out> will have
> > > > >>>> duplicated messages from a while back. Is this expected behavior?
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Tao Wang
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> ________________________________
> > > >>>>
> > > >>>> This message may contain confidential information and is intended
for
> > > >> specific recipients unless explicitly noted otherwise. If you have
reason
> > > >> to believe you are not an intended recipient of this message, please
delete
> > > >> it and notify the sender. This message may not represent the opinion
of
> > > >> Intercontinental Exchange, Inc. (ICE), its subsidiaries or affiliates,
and
> > > >> does not constitute a contract or guarantee. Unencrypted electronic
mail is
> > > >> not secure and the recipient of this message is expected to provide
> > > >> safeguards from viruses and pursue alternate means of communication
where
> > > >> privacy or a binding message is desired.
> > > >>>>
> > > >>>
> > > >>
> > > >
> > >
