kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Damian Guy <damian....@gmail.com>
Subject Re: Kafka Streams - ordering grouped messages
Date Mon, 06 Mar 2017 10:06:35 GMT
Hi Ofir,

My advice it to handle the duplicates. As you said compaction only runs on
the non-active segments. There could be duplicates in the active segment.
Further, even after compaction has run there could still be duplicates.
You can attempt to minimize the occurrence of duplicates by adjusting the
segment size of the topic(s) in question. If you have a smaller segment
size then compaction will get a chance to run more frequently, however this
also means you'll have more files.

Thanks,
Damian

On Sun, 5 Mar 2017 at 10:24 Ofir Sharony <ofir.sharony@myheritage.com>
wrote:

> Thanks guys,
>
> I would like to continue where we stopped (late arriving records):
>
> As I understand, the best practice to handle late arriving records is
> enabling Kafka log compaction, thus keeping only the latest record of a
> certain key.
> As log compaction starts to do its magic only on non-active segments, I'm
> trying to understand what's the best approach in case I want to send my
> data downstream in real time.
>
> Would you advise to plan my downstream apps to handle these key
> duplications, or there's any way to remove them in real time or close to it
> (let's say up to 1 minute)?
>
> *Ofir Sharony*
> BackEnd Tech Lead
>
> Mobile: +972-54-7560277 <+972%2054-756-0277> | ofir.sharony@myheritage.com
> | www.myheritage.com
> MyHeritage Ltd., 3 Ariel Sharon St., Or Yehuda 60250, Israel
>
> <http://www.myheritage.com/>
>
> <https://www.facebook.com/myheritage>
> <https://twitter.com/myheritage>         <http://blog.myheritage.com/>
>     <https://www.youtube.com/user/MyHeritageLtd>
>
>
> On Wed, Mar 1, 2017 at 7:44 PM, Matthias J. Sax <matthias@confluent.io>
> wrote:
>
> > Just wanted to add, that there is always the potential about late
> > arriving records, and thus, ordering by timestamp will never be
> perfect...
> >
> > You should rather try to design you application in a way such that it
> > can handle out-of-order data gracefully and try to avoid the necessity
> > of ordering records by timestamp.
> >
> >
> > -Matthias
> >
> > On 3/1/17 7:31 AM, Damian Guy wrote:
> > > You could implement your own based sorting algorithm using the low
> level
> > > processor api, i.e, you have a processor that keeps a sorted list of
> > > records and then, periodically, perhaps on punctuate, it emits the
> sorted
> > > messages downstream. You could do something like:
> > >
> > >     builder.stream("topic").transform(new TransformerSupplier() {
> > >
> > >         @Override
> > >         public Transformer get() {
> > >             return new TheTransformer();
> > >     }
> > > }).groupByKey().reduce(..);
> > >
> > > Where the TheTransformer might look something like:
> > >
> > > private static class TheTransformer<K, V, R> implements Transformer<K,
> > V, R> {
> > >     private ProcessorContext context;
> > >     private TreeMap<K, V> sorted = new TreeMap<>();
> > >
> > >     @Override
> > >     public void init(final ProcessorContext context) {
> > >         this.context = context;
> > >         context.schedule(1000); // punctuate every 1 second of
> > streams-time
> > >     }
> > >
> > >     @Override
> > >     public R transform(final K key, final V value) {
> > >         // do stuff
> > >         sorted.put(key, value);
> > >     }
> > >
> > >     @Override
> > >     public R punctuate(final long timestamp) {
> > >         for (final Map.Entry<K, V> kvEntry : sorted.entrySet()) {
> > >             context.forward(kvEntry.getKey(), kvEntry.getValue());
> > >         }
> > >         sorted.clear();
> > >         return null;
> > >     }
> > >
> > >     @Override
> > >     public void close() {
> > >
> > >     }
> > > }
> > >
> > >
> > >
> > >
> > >
> > > On Wed, 1 Mar 2017 at 13:04 Ofir Sharony <ofir.sharony@myheritage.com>
> > > wrote:
> > >
> > >> Is there any way to sort grouped records before sending them to the
> > >> reducer?
> > >>
> > >> *Ofir Sharony*
> > >> BackEnd Tech Lead
> > >>
> > >> Mobile: +972-54-7560277 <+972%2054-756-0277> <+972%2054-756-0277>
|
> > ofir.sharony@myheritage.com
> > >> | www.myheritage.com
> > >> MyHeritage Ltd., 3 Ariel Sharon St., Or Yehuda 60250, Israel
> > >>
> > >> <http://www.myheritage.com/>
> > >>
> > >> <https://www.facebook.com/myheritage>
> > >> <https://twitter.com/myheritage>         <http://blog.myheritage.com/
> >
> > >>     <https://www.youtube.com/user/MyHeritageLtd>
> > >>
> > >>
> > >> On Wed, Mar 1, 2017 at 3:03 PM, Damian Guy <damian.guy@gmail.com>
> > wrote:
> > >>
> > >>> Hi,
> > >>>
> > >>> The TimestampExtractor won't effect the order the records arrive in.
> It
> > >>> just provides a way for developers to use a timestamp other than the
> > >>> default.
> > >>>
> > >>> Thanks,
> > >>> Damian
> > >>>
> > >>> On Wed, 1 Mar 2017 at 12:34 Ofir Sharony <
> ofir.sharony@myheritage.com>
> > >>> wrote:
> > >>>
> > >>>> Hi,
> > >>>>
> > >>>> I have the following code on a stream:
> > >>>>
> > >>>> .selectKey(...)
> > >>>> .groupByKey(...)
> > >>>> .reduce(...)
> > >>>>
> > >>>> The records arrived to the Reducer function in the same order they
> > were
> > >>>> consumed from Kafka
> > >>>> I have implemented a TimestampExtractor, extracting the wanted
> > >> timestamp
> > >>>> from each record, but unfortunately this didn't have any effect
on
> the
> > >>>> order the messages were received in the Reducer.
> > >>>>
> > >>>> Any thoughts on that?
> > >>>> Thanks,
> > >>>>
> > >>>> *Ofir Sharony*
> > >>>> BackEnd Tech Lead
> > >>>>
> > >>>> Mobile: +972-54-7560277 <+972%2054-756-0277> <+972%2054-756-0277>
> <+972%2054-756-0277>
> > >> <+972%2054-756-0277> |
> > >>>> ofir.sharony@myheritage.com
> > >>>> | www.myheritage.com
> > >>>> MyHeritage Ltd., 3 Ariel Sharon St., Or Yehuda 60250, Israel
> > >>>>
> > >>>> <http://www.myheritage.com/>
> > >>>>
> > >>>> <https://www.facebook.com/myheritage>
> > >>>> <https://twitter.com/myheritage>         <
> http://blog.myheritage.com/
> > >
> > >>>>     <https://www.youtube.com/user/MyHeritageLtd>
> > >>>>
> > >>>
> > >>
> > >
> >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message