kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Dmitry Minkovsky <dminkov...@gmail.com>
Subject Re: deduplication strategy for Kafka Streams DSL
Date Thu, 25 Jan 2018 14:54:51 GMT
You may not be surprised that after further investigation it turns out this
was related to some logic in my topology.

On Wed, Jan 24, 2018 at 5:43 PM, Dmitry Minkovsky <dminkovsky@gmail.com>
wrote:

> Hi Gouzhang,
>
> Here it is:
>
> topology.stream(MAILBOX_OPERATION_REQUESTS,
> Consumed.with(byteStringSerde, mailboxOperationRequestSerde))
>   .flatMap(entityTopologyProcessor::distributeMailboxOperation)
>   .groupByKey(Serialized.with(byteStringSerde,
> mailboxOperationRequestSerde))
>   .reduce((a, b) -> b, Materialized.with(byteStringSerde,
> mailboxOperationRequestSerde));
>
>
>
>
> On Wed, Jan 24, 2018 at 4:43 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
>
>> Dmitry,
>>
>> For your topology it is not expected to happen, could you elaborate a bit
>> more on your code snippet as well as the input data? Is there a good way
>> to
>> re-produce it?
>>
>>
>> Guozhang
>>
>>
>> On Wed, Jan 24, 2018 at 11:50 AM, Dmitry Minkovsky <dminkovsky@gmail.com>
>> wrote:
>>
>> > Oh I'm sorry—my situation is even simpler. I have a KStream -> group by
>> ->
>> > reduce. It emits duplicate key/value/timestamps (i.e. total duplicates).
>> >
>> > On Wed, Jan 24, 2018 at 2:42 PM, Dmitry Minkovsky <dminkovsky@gmail.com
>> >
>> > wrote:
>> >
>> > > Can someone explain what is causing this? I am experiencing this too.
>> My
>> > > `buffered.records.per.partition` and `cache.max.bytes.buffering` are
>> at
>> > > their default values, so quite substantial. I tried raising them but
>> it
>> > had
>> > > no effect.
>> > >
>> > > On Wed, Dec 13, 2017 at 7:00 AM, Artur Mrozowski <artmro@gmail.com>
>> > wrote:
>> > >
>> > >> Hi
>> > >> I run an app where I transform KTable to stream and then I groupBy
>> and
>> > >> aggregate and capture the results in KTable again. That generates
>> many
>> > >> duplicates.
>> > >>
>> > >> I have played with exactly once semantics that seems to reduce
>> > duplicates
>> > >> for records that should be unique. But I still get duplicates on keys
>> > that
>> > >> have two or more records.
>> > >>
>> > >> I could not reproduce it on small number of records so I disable
>> caching
>> > >> by
>> > >> setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0. Surely enough, I got
>> > loads
>> > >> of duplicates, even these previously eliminated by exactly once
>> > semantics.
>> > >> Now I have hard time to enable it again on Confluent 3.3.
>> > >>
>> > >> But, generally what it the best deduplication strategy for Kafka
>> > Streams?
>> > >>
>> > >> Artur
>> > >>
>> > >
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message