kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ali Akhtar <ali.rac...@gmail.com>
Subject Re: Handling out of order messages without KTables
Date Thu, 06 Oct 2016 21:54:13 GMT
Thanks for the reply.

Its not possible to provide keys, unfortunately. (Producer is written by a
colleague, and said colleague just wants to provide whatever data the API
gives, and leave all processing of the data to me).

Perhaps I can use an intermediate kafka topic, and have producers post to
that topic w/ whatever data they receive. Then, another consumer can listen
to that topic, and use it as a KTable to process data in the order of 'last
one winning'.

However, the source of truth on the time of the messages, is embedded in
the message itself, its not the Kafka internal timestamp.

The message is a json string, which contains a 'timestamp' field,
containing a string timestamp, and that string timestamp is the source of
truth on when this message was generated.

So, is it possible to use a KTable which lets me parse the message and
return the time  which is contained inside the message, and use THAT time
for sorting the messages?

On Fri, Oct 7, 2016 at 2:33 AM, Matthias J. Sax <matthias@confluent.io>
wrote:

> -----BEGIN PGP SIGNED MESSAGE-----
> Hash: SHA512
>
> It is not global in this sense.
>
> Thus, you need to ensure that records updating the same product, go to
> the same instance. You can ensure this, by given all records of the
> same product the same key and "groupByKey" before processing the data.
>
> - -Matthias
>
> On 10/6/16 10:55 AM, Ali Akhtar wrote:
> > Thank you, State Store seems promising. But, is it distributed, or
> > limited to the particular instance of my application?
> >
> > I.e if there are 3 messages, setting product 1's price to $1, $3,
> > and $5, and all 3 of them go to a different instance of my
> > application, will they be able to correctly identify the latest
> > received message using State Store?
> >
> > On Thu, Oct 6, 2016 at 10:48 PM, Matthias J. Sax
> > <matthias@confluent.io> wrote:
> >
> > What do you mean by "message keys are random" -- do you
> > effectively have no keys and want all messages to be processed as
> > if they all have the same key?
> >
> > To access record TS in general, you need to use Processor API. The
> > given ProcessorContext object given by Processor#init() always
> > return the timestamp of the currently processed on #timestamp().
> >
> > Thus, you can attach a state store to your processor and compare
> > the timestamps of the current record with the timestamp of the one
> > in your store.
> >
> > -Matthias
> >
> > On 10/6/16 8:52 AM, Ali Akhtar wrote:
> >>>> Heya,
> >>>>
> >>>> I have some Kafka producers, which are listening to webhook
> >>>> events, and for each webhook event, they post its payload to
> >>>> a Kafka topic.
> >>>>
> >>>> Each payload contains a timestamp from the webhook source.
> >>>>
> >>>> This timestamp is the source of truth about which events
> >>>> happened first, which happened last, etc.
> >>>>
> >>>> I need to ensure that the last arrival of a particular type
> >>>> of message wins.
> >>>>
> >>>> E.g, if there are 5 messages, saying the price of a product
> >>>> with id 1, was set to $1, then $3, then something else, etc,
> >>>> before finally being set to $10, then I need to make sure
> >>>> that the final price for that product is $10.
> >>>>
> >>>> These messages can be out of order, and I need to determine
> >>>> the latest arrival based on the timestamp from the webhook
> >>>> source. (Its atm in a string format which can be parsed)
> >>>>
> >>>> Since KTable looks like it uses message keys to determine
> >>>> what happens - and in this case, the message keys are random,
> >>>> and the timestamp contained in the value of the message is
> >>>> what determines the order of the events - any pointers on
> >>>> what the best way to do this is?
> >>>>
> >>>> I'm using kafka streaming, latest version, Java.
> >>>>
> >>
> >
> -----BEGIN PGP SIGNATURE-----
> Comment: GPGTools - https://gpgtools.org
>
> iQIcBAEBCgAGBQJX9sM4AAoJECnhiMLycopPtDQQAIRZ5X/w4u9tdeBORgLlvvRJ
> VQpdov1/xhY1VDzNLbqxnW0HlBlPcWl0UJ2gHd9vWbHyGlm0D/amZbaAr+n54xSu
> NTZ9u4zLDD6bRtNpnoFX8m2lsxb4AAzpzbCQqCCeJRPQ9D5eJCkV8i+mcRo6CA8V
> JEkb+OZkEFwvlFFr5jUuLfrnFEI4pV+gRguFrmRdbXKAbqGgV8hMk/hS4aNAjiz0
> hZ3uHW3JAXhn+kgCqykHlsVCHA/yQUU4Gm/5mNuYvrsTYW1UObuBJ/O0SPSfPcs9
> XnJbV0T6xeNZLbCnGer+IykdLItdjs3slfAxINtyJkLXv7A6kOkE0Odb05gXYiy9
> b+/vLiEiVnXU/eO70V5kcs3NCCovEu/+vFcsEEVg/UnCXl0K96ywVm44ljXOe80O
> 4ESuagg6oNO50uLVrOydYGLlgVYjBL/LM/ld4DnWlt37g6r50FTclEkXZExlnWtz
> pWjOsn/bbsx3Nybcvc4blga/7I5C25yZ44yQCcGtEg5JFqn4+2cUQGy6mOmRUjMV
> wghdgHOc+A79sSmBx9BuOB4Lt15AYfvMP/NUGjESSKvIEoAbBavQKBFNYcbqMdfX
> UgYR7Icx5KmdB9ufB2dFMkaD163b1tVUT1nAzyyWcPrCjGQ9n7CDEQA4b9eUR5bH
> cydxl9H+2QRo1jh8UOlE
> =Mwrf
> -----END PGP SIGNATURE-----
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message