kafka-users mailing list archives

From Alex Brekken <brek...@gmail.com>
Subject Re: Is there a way to prevent duplicate messages to downstream
Date Wed, 11 Dec 2019 01:56:42 GMT
I've never used that dedup transformer before, but what you've got looks
right. (Though if there's a way to hash your message value, or somehow
derive a GUID from it, that might be preferable.)  As you probably noticed,
its state is windowed - so if your use-case depends on being able to remove
duplicate events over a large (or infinite) time span, then this solution
probably isn't a good fit.
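
For the hashing idea, here's a minimal sketch - assuming your value has a
stable string form - of an id extractor that digests the value instead of
using it raw (the valueId name is just for illustration):

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Derives a compact, deterministic id from the message value's string form.
static String valueId(String value) {
    try {
        MessageDigest md = MessageDigest.getInstance("SHA-256");
        return Base64.getEncoder()
                .encodeToString(md.digest(value.getBytes(StandardCharsets.UTF_8)));
    } catch (NoSuchAlgorithmException e) {
        throw new IllegalStateException(e); // every JVM ships SHA-256
    }
}

// Then plug it in as the transformer's id extractor:
// new DeduplicationTransformer<>(windowSize.toMillis(),
//         (key, value) -> valueId(value.toString()))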

Alex

On Tue, Dec 10, 2019 at 8:40 AM Sachin Mittal <sjmittal@gmail.com> wrote:

> Hi Alex,
> Thanks for the quick response.
> What I have is around 8 streams branched from a single stream, which down
> the line get joined back into 1.
> Now each branched stream can have duplicates, and when joining all this
> data I just end up with endless tuples of data.
>
> So I was thinking: if I can remove all the duplicates right at the start,
> then I will have manageable data to do the joins.
>
> So I checked the code and wanted to know how this can be inserted into the
> existing pipeline. Basically, my current code is something like this:
> Properties props = new Properties();
> ....
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream<K, V> input = builder.stream("input-topic");
> input.... //my pipeline starts
> .....
> final Topology topology = builder.build(props);
> final KafkaStreams streams = new KafkaStreams(topology, props);
> ......
> streams.start();
>
> This will change to:
> Properties props = new Properties();
> ....
> final StoreBuilder<WindowStore<K, V>> dedupStoreBuilder = .....
> .....
> final KStream<K, V> input = builder.stream("input-topic");
> final KStream<K, V> deduplicated = input.transform(
>     () -> new DeduplicationTransformer<>(windowSize.toMillis(), (key, value) -> value));
> deduplicated.... //my existing pipeline
> ..... //rest same as before
> streams.start();
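>
> For completeness, I assume I also need to register the store with the
> builder and pass its name to transform(), as in the Confluent example
> (store name here assumed from that example):
>
> builder.addStateStore(dedupStoreBuilder);
> final KStream<K, V> deduplicated = input.transform(
>     () -> new DeduplicationTransformer<>(windowSize.toMillis(), (key, value) -> value),
>     "eventId-store");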
>
> Let me know if I got this right.
>
> Thanks
> Sachin
>
>
> On Tue, Dec 10, 2019 at 6:59 PM Alex Brekken <brekkal@gmail.com> wrote:
>
> > Hi Sachin, is your goal to prevent any records with a duplicate key from
> > ever getting sent downstream?  The KTable you have in your example will of
> > course have the most recent record for a given key, but it will still emit
> > updates.  So if key "A" arrives a second time (with no change to the
> > value), it will still be emitted. (Depending on how rapidly you get
> > duplicate events, some might get removed by internal caching, but you will
> > still likely get at least 1 of those duplicates sent further downstream
> > through the topology.)  Take a look at this example from Confluent to see
> > if it would work for your case:
> >
> > https://github.com/confluentinc/kafka-streams-examples/blob/5.3.1-post/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java
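> >
> > (A side note on the internal caching I mentioned: how much coalescing you
> > get depends on the record cache settings - the values here are just for
> > illustration:)
> >
> > props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
> > props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);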
> >
> >
> > Also, what is the reason for wanting to eliminate duplicates?  Do you have
> > downstream aggregators that you don't want incorrectly counting duplicated
> > events?
> >
> > Alex
> >
> > On Tue, Dec 10, 2019 at 7:05 AM Sachin Mittal <sjmittal@gmail.com> wrote:
> >
> > > Hi,
> > > I am using streams and I get messages like: (K, V)
> > > (A, a), (B, b), (C, c), (A, a), (C, c), (A, a) .....
> > > I wanted to define a topology which would filter out duplicate messages
> > > from upstream.
> > >
> > > I want to know if this is possible.
> > > The code I have written to do this is something like this:
> > >
> > > source.groupBy((k, v) -> new Key(k, v))
> > >       .reduce((av, nv) -> nv)
> > >       .toStream()
> > >
> > > So basically I create a new key which is a combination of the existing
> > > (k, v). Then I group by it and reduce it to a table to just store the
> > > final value. Finally I convert that to a stream to be used downstream.
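> > >
> > > (As a compilable version of the snippet above - assuming a hypothetical
> > > keySerde for the Key class, since the default serdes won't know it:)
> > >
> > > source.groupBy((k, v) -> new Key(k, v), Grouped.with(keySerde, valueSerde))
> > >       .reduce((oldValue, newValue) -> newValue)
> > >       .toStream();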
> > >
> > > My question is: would this logic work?
> > > That is, if I get another message (A, a), it will basically replace the
> > > existing (A, a) in the table, and no new message would get appended to
> > > the resulting stream.
> > >
> > > Is my understanding correct?
> > >
> > > If not then is there any other way to achieve this?
> > >
> > > Thanks
> > > Sachin
> > >
> >
>
