kafka-users mailing list archives

From Ofir Sharony <ofir.shar...@myheritage.com>
Subject Re: Kafka Streams - ordering grouped messages
Date Sun, 05 Mar 2017 10:24:34 GMT
Thanks guys,

I would like to continue where we stopped (late arriving records):

As I understand, the best practice to handle late arriving records is
enabling Kafka log compaction, thus keeping only the latest record of a
certain key.
As log compaction starts to do its magic only on non-active segments, I'm
trying to understand what's the best approach in case I want to send my
data downstream in real time.
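
To make sure I describe the semantics correctly, here is a minimal sketch in
plain Java of what I mean. It does not touch Kafka at all: the class name
CompactionSketch, the compact() helper and the sample keys are made up for
illustration, and the "log" is just an in-memory list. It only shows that a
reader of a not-yet-compacted log sees every record for a key, while a fully
compacted log would retain just the latest one.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionSketch {

    /**
     * Returns the latest value per key, i.e. what a fully compacted log
     * would retain. Each record is a two-element array: {key, value}.
     */
    static Map<String, String> compact(List<String[]> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] record : log) {
            // A later record with the same key supersedes the earlier one;
            // remove first so the key moves to its newest position.
            latest.remove(record[0]);
            latest.put(record[0], record[1]);
        }
        return latest;
    }

    public static void main(String[] args) {
        List<String[]> log = new ArrayList<>();
        log.add(new String[] {"user-1", "v1"});
        log.add(new String[] {"user-2", "v1"});
        log.add(new String[] {"user-1", "v2"}); // newer record for user-1

        // A consumer reading before compaction still sees all records.
        System.out.println("uncompacted records: " + log.size());
        // After compaction only the newest record per key would remain.
        System.out.println("compacted view: " + compact(log));
    }
}
```

So until the cleaner has processed a segment, a downstream consumer can still
receive both records for user-1, which is the duplication I am asking about.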

Would you advise designing my downstream apps to handle these key
duplications, or is there a way to remove them in real time or close to it
(let's say within 1 minute)?

*Ofir Sharony*
BackEnd Tech Lead

Mobile: +972-54-7560277 | ofir.sharony@myheritage.com | www.myheritage.com
MyHeritage Ltd., 3 Ariel Sharon St., Or Yehuda 60250, Israel

<http://www.myheritage.com/>

<https://www.facebook.com/myheritage>
<https://twitter.com/myheritage>         <http://blog.myheritage.com/>
    <https://www.youtube.com/user/MyHeritageLtd>


On Wed, Mar 1, 2017 at 7:44 PM, Matthias J. Sax <matthias@confluent.io>
wrote:

> Just wanted to add that there is always the potential for late
> arriving records, and thus, ordering by timestamp will never be perfect...
>
> You should rather try to design your application in a way such that it
> can handle out-of-order data gracefully and try to avoid the necessity
> of ordering records by timestamp.
>
>
> -Matthias
>
> On 3/1/17 7:31 AM, Damian Guy wrote:
> > You could implement your own sorting algorithm using the low level
> > processor api, i.e., you have a processor that keeps a sorted list of
> > records and then, periodically, perhaps on punctuate, it emits the sorted
> > messages downstream. You could do something like:
> >
> >     builder.stream("topic").transform(new TransformerSupplier() {
> >
> >         @Override
> >         public Transformer get() {
> >             return new TheTransformer();
> >         }
> >     }).groupByKey().reduce(..);
> >
> > Where TheTransformer might look something like:
> >
> > private static class TheTransformer<K, V, R> implements Transformer<K, V, R> {
> >     private ProcessorContext context;
> >     private TreeMap<K, V> sorted = new TreeMap<>();
> >
> >     @Override
> >     public void init(final ProcessorContext context) {
> >         this.context = context;
> >         context.schedule(1000); // punctuate every 1 second of streams-time
> >     }
> >
> >     @Override
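> >     public R transform(final K key, final V value) {
> >         // do stuff; the TreeMap keeps entries ordered by key
> >         sorted.put(key, value);
> >         return null; // emit nothing here; records are forwarded in punctuate()
> >     }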
> >
> >     @Override
> >     public R punctuate(final long timestamp) {
> >         for (final Map.Entry<K, V> kvEntry : sorted.entrySet()) {
> >             context.forward(kvEntry.getKey(), kvEntry.getValue());
> >         }
> >         sorted.clear();
> >         return null;
> >     }
> >
> >     @Override
> >     public void close() {
> >
> >     }
> > }
> >
> >
> >
> >
> >
> > On Wed, 1 Mar 2017 at 13:04 Ofir Sharony <ofir.sharony@myheritage.com>
> > wrote:
> >
> >> Is there any way to sort grouped records before sending them to the
> >> reducer?
> >>
> >>
> >> On Wed, Mar 1, 2017 at 3:03 PM, Damian Guy <damian.guy@gmail.com> wrote:
> >>
> >>> Hi,
> >>>
> >>> The TimestampExtractor won't affect the order the records arrive in. It
> >>> just provides a way for developers to use a timestamp other than the
> >>> default.
> >>>
> >>> Thanks,
> >>> Damian
> >>>
> >>> On Wed, 1 Mar 2017 at 12:34 Ofir Sharony <ofir.sharony@myheritage.com>
> >>> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> I have the following code on a stream:
> >>>>
> >>>> .selectKey(...)
> >>>> .groupByKey(...)
> >>>> .reduce(...)
> >>>>
> >>>> The records arrived at the Reducer function in the same order they were
> >>>> consumed from Kafka.
> >>>> I have implemented a TimestampExtractor, extracting the wanted timestamp
> >>>> from each record, but unfortunately this didn't have any effect on the
> >>>> order in which the messages were received in the Reducer.
> >>>>
> >>>> Any thoughts on that?
> >>>> Thanks,
> >>>>
> >>>
> >>
> >
>
>
