kafka-users mailing list archives

From Damian Guy <damian....@gmail.com>
Subject Re: Kafka Streams - ordering grouped messages
Date Wed, 01 Mar 2017 15:31:27 GMT
You could implement your own sorting using the low-level Processor API,
i.e., a processor that keeps a sorted buffer of records and then
periodically, perhaps on punctuate, emits the sorted records downstream.
You could do something like:

    builder.stream("topic").transform(new TransformerSupplier() {

        @Override
        public Transformer get() {
            // return a new Transformer instance on every call
            return new TheTransformer();
        }
    }).groupByKey().reduce(...);

where TheTransformer might look something like:

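// Imports needed in the enclosing file (Kafka Streams 0.10.2-era API):
import java.util.Map;
import java.util.TreeMap;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

private static class TheTransformer<K, V, R> implements Transformer<K, V, R> {
    private ProcessorContext context;
    // Buffer ordered by the keys' natural ordering (K must be Comparable).
    // Note: put() replaces any value already buffered for the same key.
    private final TreeMap<K, V> sorted = new TreeMap<>();

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
        context.schedule(1000); // punctuate every 1 second of streams-time
    }

    @Override
    public R transform(final K key, final V value) {
        // do stuff
        sorted.put(key, value);
        return null; // emit nothing now; records are forwarded in punctuate()
    }

    @Override
    public R punctuate(final long timestamp) {
        // forward the buffered records in sorted order, then reset the buffer
        for (final Map.Entry<K, V> kvEntry : sorted.entrySet()) {
            context.forward(kvEntry.getKey(), kvEntry.getValue());
        }
        sorted.clear();
        return null;
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}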
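One thing to watch: the TreeMap in TheTransformer is keyed by the record key, so it orders records by key and keeps only the last value seen for each key within a punctuate interval. If the goal is to hand records for the same key to the reducer in timestamp order, the buffer probably needs to keep every record and sort on the timestamp instead. Here is a minimal, Kafka-free sketch of such a buffer, assuming a timestamp is available for each record (for example from context.timestamp() inside transform(), which should return the timestamp produced by your TimestampExtractor). SortingBuffer and Rec are hypothetical names for illustration, not part of the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper (not a Kafka API): buffers records and releases them
// in timestamp order, keeping every record even when keys repeat.
class SortingBuffer<K, V> {

    // Hypothetical record holder used only by this sketch.
    static final class Rec<K, V> {
        final K key;
        final V value;
        final long timestamp;
        final long seq; // arrival order, used only to break timestamp ties

        Rec(K key, V value, long timestamp, long seq) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.seq = seq;
        }
    }

    // Min-heap ordered by timestamp, then by arrival order for equal timestamps.
    private final PriorityQueue<Rec<K, V>> queue = new PriorityQueue<>(
            Comparator.<Rec<K, V>>comparingLong(r -> r.timestamp)
                      .thenComparingLong(r -> r.seq));
    private long nextSeq = 0;

    // Buffer one record; nothing is emitted yet.
    void add(K key, V value, long timestamp) {
        queue.add(new Rec<>(key, value, timestamp, nextSeq++));
    }

    // Drain everything buffered so far, oldest timestamp first.
    List<Rec<K, V>> flush() {
        List<Rec<K, V>> out = new ArrayList<>(queue.size());
        while (!queue.isEmpty()) {
            out.add(queue.poll());
        }
        return out;
    }

    public static void main(String[] args) {
        SortingBuffer<String, String> buf = new SortingBuffer<>();
        buf.add("user-1", "b", 200L);
        buf.add("user-1", "a", 100L); // same key, earlier timestamp
        buf.add("user-2", "c", 150L);
        for (Rec<String, String> r : buf.flush()) {
            System.out.println(r.timestamp + " " + r.key + " " + r.value);
        }
        // prints: "100 user-1 a", then "150 user-2 c", then "200 user-1 b"
    }
}
```

In TheTransformer, transform() could call buffer.add(key, value, context.timestamp()) and return null, and punctuate() could forward each flushed record with context.forward(r.key, r.value). Ordering is only guaranteed within one punctuate interval: a record that arrives after its batch was flushed is still emitted late. The buffer also lives only in memory, so a production version would probably need a state store to survive restarts.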

On Wed, 1 Mar 2017 at 13:04 Ofir Sharony <ofir.sharony@myheritage.com> wrote:

> Is there any way to sort grouped records before sending them to the
> reducer?
>
> *Ofir Sharony*
> BackEnd Tech Lead
>
> Mobile: +972-54-7560277 | ofir.sharony@myheritage.com | www.myheritage.com
> MyHeritage Ltd., 3 Ariel Sharon St., Or Yehuda 60250, Israel
>
> <http://www.myheritage.com/>
>
> <https://www.facebook.com/myheritage>
> <https://twitter.com/myheritage>         <http://blog.myheritage.com/>
>     <https://www.youtube.com/user/MyHeritageLtd>
>
>
> On Wed, Mar 1, 2017 at 3:03 PM, Damian Guy <damian.guy@gmail.com> wrote:
>
> > Hi,
> >
> > The TimestampExtractor won't affect the order in which records arrive.
> > It just provides a way for developers to use a timestamp other than the
> > default.
> >
> > Thanks,
> > Damian
> >
> > On Wed, 1 Mar 2017 at 12:34 Ofir Sharony <ofir.sharony@myheritage.com> wrote:
> >
> > > Hi,
> > >
> > > I have the following code on a stream:
> > >
> > > .selectKey(...)
> > > .groupByKey(...)
> > > .reduce(...)
> > >
> > > The records arrive at the Reducer function in the same order they were
> > > consumed from Kafka.
> > > I have implemented a TimestampExtractor that extracts the wanted
> > > timestamp from each record, but unfortunately this had no effect on
> > > the order in which the messages were received in the Reducer.
> > >
> > > Any thoughts on that?
> > > Thanks,
> > >
> > > *Ofir Sharony*
> > >
> >
>
