kafka-users mailing list archives

From Ali Akhtar <ali.rac...@gmail.com>
Subject Re: Out of order message processing with Kafka Streams
Date Tue, 21 Mar 2017 17:52:40 GMT
Hans,

Which class's javadocs should I look at? From my initial look at the
javadocs and my discussion with Michael, it doesn't seem possible.

On Tue, Mar 21, 2017 at 10:44 PM, Hans Jespersen <hans@confluent.io> wrote:

> Yes, and yes!
>
> -hans
>
>
>
> > On Mar 21, 2017, at 7:45 AM, Ali Akhtar <ali.rac200@gmail.com> wrote:
> >
> > That would require
> >
> > - Knowing the current window's id (or some other identifier) to
> > differentiate it from other windows
> >
> > - Being able to process individual messages in a window
> >
> > Are those 2 things possible w/ kafka streams? (java)
> >
> > On Tue, Mar 21, 2017 at 7:43 PM, Hans Jespersen <hans@confluent.io> wrote:
> >
> >> While it's not exactly the same as the window start/stop time, you can
> >> store (in the state store) the earliest and latest timestamps of any
> >> messages in each window and use that as a good approximation for the
> >> window boundary times.
> >>
> >> -hans
> >>
> >>> On Mar 20, 2017, at 1:00 PM, Ali Akhtar <ali.rac200@gmail.com> wrote:
> >>>
> >>> Yeah, windowing seems perfect, if only I could find out the current
> >>> window's start time (so I can log the current bucket's start & end times)
> >>> and process window messages individually rather than as aggregates.
> >>>
> >>> It doesn't seem like I can get this metadata from ProcessorContext though,
> >>> from looking over the javadocs.
> >>>
> >>>> On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll <michael@confluent.io> wrote:
> >>>>
> >>>> Ali,
> >>>>
> >>>> what you describe is (roughly!) how Kafka Streams implements the internal
> >>>> state stores to support windowing.
> >>>>
> >>>> Some users have been following a similar approach as you outlined, using
> >>>> the Processor API.
> >>>>
> >>>>
> >>>>
> >>>>> On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar <ali.rac200@gmail.com> wrote:
> >>>>>
> >>>>> It would be helpful to know the 'start' and 'end' of the current metadata,
> >>>>> so if an out-of-order message arrives late, and is being processed in
> >>>>> foreach(), you'd know which window / bucket it belongs to, and can handle
> >>>>> it accordingly.
> >>>>>
> >>>>> I'm guessing that's not possible at the moment.
> >>>>>
> >>>>> (My use case is: I receive a stream of messages. Messages need to be stored
> >>>>> and sorted into 'buckets', to indicate 'sessions'. Each time there's a gap
> >>>>> of 30 mins or more since the last message (under a key), a new 'session'
> >>>>> (bucket) should be started, and future messages should belong to that
> >>>>> 'session', until the next 30+ min gap.)
> >>>>>
> >>>>> On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll <michael@confluent.io> wrote:
> >>>>>
> >>>>>>> Can windows only be used for aggregations, or can they also be used for
> >>>>>>> foreach(), and such?
> >>>>>>
> >>>>>> As of today, you can use windows only in aggregations.
> >>>>>>
> >>>>>>> And is it possible to get metadata on the message, such as whether or
> >>>>>>> not it's late, its index/position within the other messages, etc.?
> >>>>>>
> >>>>>> If you use the Processor API of Kafka Streams, you can have access to an
> >>>>>> incoming record's topic, partition, offset, etc. via the so-called
> >>>>>> ProcessorContext (which is updated for every new incoming record):
> >>>>>>
> >>>>>> http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/Processor.html
> >>>>>> - You can get/store a reference to the ProcessorContext from
> >>>>>> `Processor#init()`.
> >>>>>>
> >>>>>> http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/ProcessorContext.html
> >>>>>> - The context can then be used within `Processor#process()` when you
> >>>>>> process a new record.  As I said, the context is updated behind the scenes
> >>>>>> to match the record that is currently being processed.
> >>>>>>
> >>>>>>
> >>>>>> Best,
> >>>>>> Michael
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar <ali.rac200@gmail.com> wrote:
> >>>>>>
> >>>>>>> Can windows only be used for aggregations, or can they also be used for
> >>>>>>> foreach(), and such?
> >>>>>>>
> >>>>>>> And is it possible to get metadata on the message, such as whether or not
> >>>>>>> it's late, its index/position within the other messages, etc.?
> >>>>>>>
> >>>>>>> On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <michael@confluent.io> wrote:
> >>>>>>>
> >>>>>>>> And since you asked for a pointer, Ali:
> >>>>>>>> http://docs.confluent.io/current/streams/concepts.html#windowing
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <michael@confluent.io> wrote:
> >>>>>>>>
> >>>>>>>>> Late-arriving and out-of-order data is only treated specially for windowed
> >>>>>>>>> aggregations.
> >>>>>>>>>
> >>>>>>>>> For stateless operations such as `KStream#foreach()` or `KStream#map()`,
> >>>>>>>>> records are processed in the order they arrive (per partition).
> >>>>>>>>>
> >>>>>>>>> -Michael
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <ali.rac200@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>>> later when message A arrives it will put that message back into
> >>>>>>>>>>> the right temporal context and publish an amended result for the proper
> >>>>>>>>>>> time/session window as if message B were consumed in the timestamp order
> >>>>>>>>>>> before message A.
> >>>>>>>>>>
> >>>>>>>>>> Does this apply to the aggregation Kafka stream methods then, and not to
> >>>>>>>>>> e.g. foreach?
> >>>>>>>>>>
> >>>>>>>>>> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <hans@confluent.io> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Yes, stream processing and CEP are subtly different things.
> >>>>>>>>>>>
> >>>>>>>>>>> Kafka Streams helps you write stateful apps and allows that state to be
> >>>>>>>>>>> preserved on disk (a local state store) as well as distributed for HA or
> >>>>>>>>>>> for parallel partitioned processing (via Kafka topic partitions and
> >>>>>>>>>>> consumer groups) as well as in memory (as a performance enhancement).
> >>>>>>>>>>>
> >>>>>>>>>>> However, a classical CEP engine with a pre-modeled state machine and
> >>>>>>>>>>> pattern matching rules is something different from stream processing.
> >>>>>>>>>>>
> >>>>>>>>>>> It is of course possible to build a CEP system on top of Kafka Streams and
> >>>>>>>>>>> get the best of both worlds.
> >>>>>>>>>>>
> >>>>>>>>>>> -hans
> >>>>>>>>>>>
> >>>>>>>>>>>> On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <sabarish.spk@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hans
> >>>>>>>>>>>>
> >>>>>>>>>>>> What you state would work for aggregations, but not for state machines and
> >>>>>>>>>>>> CEP.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards
> >>>>>>>>>>>> Sab
> >>>>>>>>>>>>
> >>>>>>>>>>>>> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <hans@confluent.io> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The only way to make sure A is consumed first would be to delay the
> >>>>>>>>>>>>> consumption of message B for at least 15 minutes, which would fly in the
> >>>>>>>>>>>>> face of the principles of a true streaming platform, so the short answer to
> >>>>>>>>>>>>> your question is "no" because that would be batch processing, not stream
> >>>>>>>>>>>>> processing.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> However, Kafka Streams does handle late-arriving data. So if you had some
> >>>>>>>>>>>>> analytics that computes results on a time window or a session window, then
> >>>>>>>>>>>>> Kafka Streams will compute on the stream in real time (processing message
> >>>>>>>>>>>>> B) and then later when message A arrives it will put that message back into
> >>>>>>>>>>>>> the right temporal context and publish an amended result for the proper
> >>>>>>>>>>>>> time/session window as if message B were consumed in the timestamp order
> >>>>>>>>>>>>> before message A. The end result of this flow is that you eventually get
> >>>>>>>>>>>>> the same results you would get in a batch processing system but with the
> >>>>>>>>>>>>> added benefit of getting intermediate results at much lower latency.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> -hans
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> /**
> >>>>>>>>>>>>> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> >>>>>>>>>>>>> * hans@confluent.io (650)924-2670
> >>>>>>>>>>>>> */
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <ali.rac200@gmail.com> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Is it possible to have Kafka Streams order messages correctly by their
> >>>>>>>>>>>>>> timestamps, even if they arrived out of order?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> E.g., say Message A with a timestamp of 5:00 PM and Message B with a
> >>>>>>>>>>>>>> timestamp of 5:15 PM, are sent.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Message B arrives sooner than Message A, due to network issues.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Is it possible to make sure that, across all consumers of Kafka Streams
> >>>>>>>>>>>>>> (even if they are across different servers, but have the same consumer
> >>>>>>>>>>>>>> group), Message A is consumed first, before Message B?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
>
>
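
The session use case described in the thread (a new bucket whenever there is a gap of 30 minutes or more under a key), combined with Hans's suggestion to store the earliest and latest timestamps per window, could be sketched roughly as follows. This is plain Java for illustration only, not Kafka Streams code: `SessionBuckets`, `Session`, and `GAP_MS` are hypothetical names, state is kept in an in-memory map rather than a state store, and timestamps are assumed to be epoch milliseconds.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka Streams: groups timestamps per key
// into sessions separated by gaps of 30 minutes or more.
final class SessionBuckets {
    static final long GAP_MS = 30 * 60 * 1000L;

    static final class Session {
        long earliest; // smallest timestamp seen in this session
        long latest;   // largest timestamp seen in this session
        int count;     // number of messages assigned to this session

        Session(long ts) { earliest = ts; latest = ts; count = 1; }

        // True if ts is less than GAP_MS away from the session's time range.
        boolean accepts(long ts) {
            return ts > earliest - GAP_MS && ts < latest + GAP_MS;
        }

        void add(long ts) {
            earliest = Math.min(earliest, ts);
            latest = Math.max(latest, ts);
            count++;
        }

        // Merge another session into this one (a late message bridged the gap).
        void absorb(Session other) {
            earliest = Math.min(earliest, other.earliest);
            latest = Math.max(latest, other.latest);
            count += other.count;
        }
    }

    private final Map<String, List<Session>> sessionsByKey = new HashMap<>();

    // Assigns the message to a session, in whatever order it arrives,
    // and returns the session it ended up in.
    Session add(String key, long ts) {
        List<Session> sessions = sessionsByKey.computeIfAbsent(key, k -> new ArrayList<>());
        Session target = null;
        for (Iterator<Session> it = sessions.iterator(); it.hasNext(); ) {
            Session s = it.next();
            if (!s.accepts(ts)) continue;
            if (target == null) {
                s.add(ts);
                target = s;
            } else {
                target.absorb(s); // ts connects two sessions, so merge them
                it.remove();
            }
        }
        if (target == null) {
            target = new Session(ts);
            sessions.add(target);
        }
        return target;
    }

    List<Session> sessions(String key) {
        return sessionsByKey.getOrDefault(key, new ArrayList<>());
    }
}
```

With the example from the original question, adding Message B (5:15 PM) first and Message A (5:00 PM) afterwards puts both in the same session, whose `earliest` and `latest` then approximate the bucket's boundaries. The list of sessions per key is not kept sorted in this sketch.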

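For fixed-size (tumbling) time windows, the window start and end asked about in the thread can be derived from a record's timestamp alone, assuming the windows are aligned to the epoch (as Kafka Streams time windows are). The sketch below is plain Java, not the Kafka Streams API, and `TumblingCounts`, `windowStart`, and `windowEnd` are hypothetical names. It also shows a late record amending the count of an earlier window, in the spirit of what Hans describes for windowed aggregations.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka Streams: counts records per
// epoch-aligned tumbling window and returns the updated count on each add.
final class TumblingCounts {
    private final long sizeMs;
    private final Map<Long, Integer> countsByWindowStart = new TreeMap<>();

    TumblingCounts(long sizeMs) {
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        this.sizeMs = sizeMs;
    }

    // Inclusive start of the window that contains ts.
    long windowStart(long ts) {
        return ts - Math.floorMod(ts, sizeMs);
    }

    // Exclusive end of the window that contains ts.
    long windowEnd(long ts) {
        return windowStart(ts) + sizeMs;
    }

    // Adds a record in arrival order; a late record simply updates
    // the count of the (older) window its timestamp falls into.
    int add(long ts) {
        return countsByWindowStart.merge(windowStart(ts), 1, Integer::sum);
    }
}
```

With one-hour windows, a record stamped at minute 65 that arrives after a record stamped at minute 130 still lands in the window starting at minute 60, and `add` returns that window's amended count.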