kafka-users mailing list archives

From: Dmitry Minkovsky <dminkov...@gmail.com>
Subject: Re: deduplication strategy for Kafka Streams DSL
Date: Wed, 24 Jan 2018 22:43:32 GMT
Hi Guozhang,

Here it is:

topology.stream(MAILBOX_OPERATION_REQUESTS,
        Consumed.with(byteStringSerde, mailboxOperationRequestSerde))
  .flatMap(entityTopologyProcessor::distributeMailboxOperation)
  .groupByKey(Serialized.with(byteStringSerde, mailboxOperationRequestSerde))
  .reduce((a, b) -> b, Materialized.with(byteStringSerde, mailboxOperationRequestSerde));
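
A note on the snippet above: `reduce((a, b) -> b)` materializes a KTable, and every input record produces an update to that table, so how many of those updates reach downstream consumers depends on the record cache, as the quoted messages below discuss. A minimal config sketch, assuming the 1.0-era StreamsConfig keys (the application id and bootstrap servers are placeholders):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class CachingConfigSketch {
    static Properties props() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "mailbox-operations");  // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        // Keep the record cache enabled (10 MB is the default) so successive
        // updates to the same key are coalesced before being forwarded.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
        // A longer commit interval gives the cache more time to coalesce.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000L);
        return props;
    }
}

Caching only changes how often per-key updates are emitted, not what the final table contains.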




On Wed, Jan 24, 2018 at 4:43 PM, Guozhang Wang <wangguoz@gmail.com> wrote:

> Dmitry,
>
> For your topology this is not expected to happen. Could you elaborate a
> bit more on your code snippet, as well as the input data? Is there a good
> way to reproduce it?
>
>
> Guozhang
>
>
> On Wed, Jan 24, 2018 at 11:50 AM, Dmitry Minkovsky <dminkovsky@gmail.com>
> wrote:
>
> > Oh, I'm sorry: my situation is even simpler. I have a KStream -> group by
> > -> reduce. It emits duplicate key/value/timestamps (i.e. total duplicates).
> >
> > On Wed, Jan 24, 2018 at 2:42 PM, Dmitry Minkovsky <dminkovsky@gmail.com>
> > wrote:
> >
> > > Can someone explain what is causing this? I am experiencing this too.
> > > My `buffered.records.per.partition` and `cache.max.bytes.buffering` are
> > > at their default values, so quite substantial. I tried raising them but
> > > it had no effect.
> > >
> > > On Wed, Dec 13, 2017 at 7:00 AM, Artur Mrozowski <artmro@gmail.com>
> > > wrote:
> > >
> > >> Hi,
> > >> I run an app where I transform a KTable to a stream, then groupBy and
> > >> aggregate, and capture the results in a KTable again. That generates
> > >> many duplicates.
> > >>
> > >> I have played with exactly-once semantics, which seems to reduce
> > >> duplicates for records that should be unique. But I still get
> > >> duplicates on keys that have two or more records.
> > >>
> > >> I could not reproduce it on a small number of records, so I disabled
> > >> caching by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0. Sure enough,
> > >> I got loads of duplicates, even those previously eliminated by
> > >> exactly-once semantics. Now I am having a hard time enabling it again
> > >> on Confluent 3.3.
> > >>
> > >> But generally, what is the best deduplication strategy for Kafka
> > >> Streams?
> > >>
> > >> Artur
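
On the exactly-once point above: Confluent 3.3 ships Kafka 0.11.0, where exactly-once processing is switched on with a single setting. A sketch, assuming 0.11.0+ brokers, added to properties like those shown earlier:

// Resolves to the string "exactly_once"; requires broker version 0.11.0 or newer.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);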
> > >>
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
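
On Artur's closing question, one commonly used strategy (not spelled out in this thread; the class and store name below are illustrative) is a stateful transformer that remembers the keys it has already forwarded in a local state store and drops repeats. A sketch against the 0.11/1.0-era Transformer API, which still declares punctuate():

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Forwards a record only the first time its key is seen. The store named
// "dedup-store" must be registered on the topology and connected to this
// transformer (StreamsBuilder#addStateStore, then KStream#transform).
public class DedupTransformer<K, V> implements Transformer<K, V, KeyValue<K, V>> {

    private KeyValueStore<K, V> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        store = (KeyValueStore<K, V>) context.getStateStore("dedup-store");
    }

    @Override
    public KeyValue<K, V> transform(K key, V value) {
        if (store.get(key) != null) {
            return null;              // already seen: drop the duplicate
        }
        store.put(key, value);        // first sighting: remember and forward
        return KeyValue.pair(key, value);
    }

    @Override
    public KeyValue<K, V> punctuate(long timestamp) {
        return null;                  // unused; still part of the interface in this era
    }

    @Override
    public void close() {}
}

Usage would look roughly like stream.transform(DedupTransformer::new, "dedup-store") after registering the store. As written the store grows without bound; a windowed store, or a scheduled eviction of old keys, keeps it finite.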
