kafka-users mailing list archives

From Ali Akhtar <ali.rac...@gmail.com>
Subject Re: Handling out of order messages without KTables
Date Thu, 06 Oct 2016 22:54:27 GMT
Thanks! That looks perfect.

Last question: is there any shortcut for having the JSON string messages
automatically deserialized into their equivalent Java classes via Jackson or
similar?

Perhaps I can write a Serde<V> implementation that takes the java.lang.Class
of the class to be mapped, and maps it via Jackson?

On Fri, Oct 7, 2016 at 3:39 AM, Matthias J. Sax <matthias@confluent.io>
wrote:

> -----BEGIN PGP SIGNED MESSAGE-----
> Hash: SHA512
>
> Exactly. You need to set the key using KStream#selectKey() and
> re-distribute data via #through().
>
> About timestamps: you can provide a custom TimestampExtractor that
> returns the timestamp embedded in the JSON instead of the record
> timestamp (as DefaultTimestampExtractor does).
>
> See
> http://docs.confluent.io/3.0.1/streams/developer-guide.html#timestamp-extractor
>
>
> -Matthias
>
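The thread only says the embedded 'timestamp' is a string that can be parsed. The sketch below assumes it is an ISO-8601 instant (e.g. 2016-10-06T22:54:27Z) and pulls it out with a regex as a stand-in for a real JSON parser such as Jackson. The class and method names are hypothetical. Inside Kafka Streams, this logic would live in the extract method of your own TimestampExtractor implementation, whose exact signature depends on the Kafka version you run.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class EmbeddedTimestamp {
    // Matches "timestamp": "<value>" in a flat JSON object.
    // Regex is a simplification; a real JSON parser is more robust.
    private static final Pattern TS_FIELD =
            Pattern.compile("\"timestamp\"\\s*:\\s*\"([^\"]+)\"");

    private EmbeddedTimestamp() {}

    /**
     * Returns the event time embedded in the payload as epoch milliseconds,
     * or fallbackMs if the field is missing or cannot be parsed.
     */
    static long extractEventTimeMs(String json, long fallbackMs) {
        if (json == null) {
            return fallbackMs;
        }
        Matcher m = TS_FIELD.matcher(json);
        if (!m.find()) {
            return fallbackMs;
        }
        try {
            // Assumption: the string is an ISO-8601 instant such as 2016-10-06T22:54:27Z.
            return Instant.parse(m.group(1)).toEpochMilli();
        } catch (DateTimeParseException e) {
            return fallbackMs;
        }
    }
}
```

The fallback exists because a missing or malformed field still has to yield some timestamp; what to fall back to (for instance the record's own timestamp) is a design choice the thread does not settle.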
> On 10/6/16 2:59 PM, Ali Akhtar wrote:
> > Sorry, to be clear:
> >
> > - Producers post to topic A.
> > - Consumers of topic A receive the data, parse it to find the keys,
> >   and post the correct key + message to topic B.
> > - Topic B is treated as a KTable by a second consumer layer, and it's
> >   this layer that does the writes to ensure 'last one wins' (assuming
> >   'last one' can be determined using the timestamp in the message's JSON).
> >
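Ali's first layer (consume topic A, find the key, write to topic B) is what Matthias's KStream#selectKey() + #through() suggestion does inside Kafka Streams. The JDK-only sketch below shows only the key-derivation logic such a selectKey mapper might run. The `productId` field name is hypothetical, and silently dropping records that cannot be keyed is an arbitrary choice made for the sketch.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ProductRekeyer {
    // Hypothetical payload field holding the product id, e.g. {"productId": 1, ...}.
    private static final Pattern PRODUCT_ID =
            Pattern.compile("\"productId\"\\s*:\\s*(\\d+)");

    private ProductRekeyer() {}

    /** Derives the new record key from the value; null if no product id is present. */
    static String productKey(String json) {
        if (json == null) {
            return null;
        }
        Matcher m = PRODUCT_ID.matcher(json);
        return m.find() ? m.group(1) : null;
    }

    /** Re-keys raw payloads (their original random keys are ignored) by product id. */
    static List<Map.Entry<String, String>> rekey(List<String> payloads) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (String payload : payloads) {
            String key = productKey(payload);
            if (key != null) { // drop records we cannot key
                out.add(new SimpleImmutableEntry<>(key, payload));
            }
        }
        return out;
    }
}
```

In the Streams DSL, productKey would be the body of the selectKey mapper, and #through() would write the re-keyed records to the intermediate topic so that every update for one product lands in the same partition.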
> > On Fri, Oct 7, 2016 at 2:54 AM, Ali Akhtar <ali.rac200@gmail.com>
> > wrote:
> >
> >> Thanks for the reply.
> >>
> >> It's not possible to provide keys, unfortunately. (The producer is
> >> written by a colleague, who just wants to pass along whatever data
> >> the API gives and leave all processing of the data to me.)
> >>
> >> Perhaps I can use an intermediate Kafka topic and have producers
> >> post to that topic with whatever data they receive. Then another
> >> consumer can listen to that topic and use it as a KTable to
> >> process data in 'last one wins' order.
> >>
> >> However, the source of truth for the time of each message is
> >> embedded in the message itself; it's not the Kafka-internal
> >> timestamp.
> >>
> >> The message is a JSON string containing a 'timestamp' field whose
> >> string value is the source of truth for when the message was
> >> generated.
> >>
> >> So, is it possible to use a KTable that lets me parse the
> >> message, return the time contained inside it, and use THAT
> >> time for ordering the messages?
> >>
> >> On Fri, Oct 7, 2016 at 2:33 AM, Matthias J. Sax
> >> <matthias@confluent.io> wrote:
> >>
> > It is not global in this sense: a state store is local to the
> > application instance that hosts it.
> >
> > Thus, you need to ensure that records updating the same product go
> > to the same instance. You can ensure this by giving all records of
> > the same product the same key and calling "groupByKey" before
> > processing the data.
> >
> > -Matthias
> >
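Matthias's point is that a store only sees the records routed to its own instance, so all updates for one product must share a key. The toy sketch below illustrates why a shared key is enough: the target partition is a deterministic function of the key, and Kafka Streams processes each input partition in exactly one task. It uses String.hashCode() purely for illustration; Kafka's default partitioner instead applies a murmur2 hash to the serialized key bytes, so the actual partition numbers will differ.

```java
final class KeyRouting {
    private KeyRouting() {}

    /**
     * Simplified stand-in for key-based partitioning: the same key always maps
     * to the same partition. NOTE: not Kafka's algorithm (which uses murmur2
     * over the serialized key), only an illustration of determinism.
     */
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

With this routing, the three price updates for product 1 from the question above would all reach the same partition, and therefore the same store, regardless of which producer sent them.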
> > On 10/6/16 10:55 AM, Ali Akhtar wrote:
> >>>>> Thank you, the state store seems promising. But is it
> >>>>> distributed, or limited to the particular instance of my
> >>>>> application?
> >>>>>
> >>>>> I.e., if there are 3 messages setting product 1's price to
> >>>>> $1, $3, and $5, and each of them goes to a different
> >>>>> instance of my application, will they be able to correctly
> >>>>> identify the latest received message using the state store?
> >>>>>
> >>>>> On Thu, Oct 6, 2016 at 10:48 PM, Matthias J. Sax
> >>>>> <matthias@confluent.io> wrote:
> >>>>>
> >>>>> What do you mean by "message keys are random" -- do you
> >>>>> effectively have no keys and want all messages to be
> >>>>> processed as if they all have the same key?
> >>>>>
> >>>>> To access the record TS in general, you need to use the
> >>>>> Processor API. The ProcessorContext object passed to
> >>>>> Processor#init() always returns the timestamp of the
> >>>>> currently processed record via #timestamp().
> >>>>>
> >>>>> Thus, you can attach a state store to your processor and
> >>>>> compare the timestamp of the current record with the
> >>>>> timestamp of the one in your store.
> >>>>>
> >>>>> -Matthias
> >>>>>
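Matthias's suggestion (attach a state store and compare the current record's timestamp with the stored one) boils down to a last-write-wins check per key. The sketch below uses a HashMap as a hypothetical stand-in for the processor's key-value store and takes the event time as a plain parameter. In a real Processor, the store would come from ProcessorContext#getStateStore(), and the time from context.timestamp() (with a custom TimestampExtractor) or from parsing the payload. The class name is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class LatestByEventTime {
    /** A stored value together with the event time it was written with. */
    static final class Stamped {
        final String value;
        final long eventTimeMs;

        Stamped(String value, long eventTimeMs) {
            this.value = value;
            this.eventTimeMs = eventTimeMs;
        }
    }

    // Stand-in for the key-value state store attached to the processor.
    private final Map<String, Stamped> store = new HashMap<>();

    /**
     * Applies an update for a key. Returns true if it created or replaced the
     * stored value, false if it was older than what the store already holds.
     * Ties go to the later arrival.
     */
    boolean process(String key, String value, long eventTimeMs) {
        Stamped current = store.get(key);
        if (current != null && eventTimeMs < current.eventTimeMs) {
            return false; // late-arriving, older update: ignore it
        }
        store.put(key, new Stamped(value, eventTimeMs));
        return true;
    }

    /** Latest value for the key by event time, or null if none was seen. */
    String latest(String key) {
        Stamped s = store.get(key);
        return s == null ? null : s.value;
    }
}
```

Replaying the question's scenario, updates "$1" (t=1s) and "$10" (t=5s) for product 1 followed by a late "$3" (t=2s) leave "$10" in the store, because the late update carries an older event time.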
> >>>>> On 10/6/16 8:52 AM, Ali Akhtar wrote:
> >>>>>>>> Heya,
> >>>>>>>>
> >>>>>>>> I have some Kafka producers, which are listening to
> >>>>>>>> webhook events, and for each webhook event, they post
> >>>>>>>> its payload to a Kafka topic.
> >>>>>>>>
> >>>>>>>> Each payload contains a timestamp from the webhook
> >>>>>>>> source.
> >>>>>>>>
> >>>>>>>> This timestamp is the source of truth about which
> >>>>>>>> events happened first, which happened last, etc.
> >>>>>>>>
> >>>>>>>> I need to ensure that the last arrival of a
> >>>>>>>> particular type of message wins.
> >>>>>>>>
> >>>>>>>> E.g., if there are 5 messages saying the price of the
> >>>>>>>> product with id 1 was set to $1, then $3, then
> >>>>>>>> something else, etc., before finally being set to $10,
> >>>>>>>> then I need to make sure that the final price for
> >>>>>>>> that product is $10.
> >>>>>>>>
> >>>>>>>> These messages can arrive out of order, and I need to
> >>>>>>>> determine the latest one based on the timestamp from
> >>>>>>>> the webhook source. (It's currently a string that can
> >>>>>>>> be parsed.)
> >>>>>>>>
> >>>>>>>> Since KTable seems to use message keys to determine
> >>>>>>>> what happens, and in this case the message keys are
> >>>>>>>> random while the timestamp contained in the message
> >>>>>>>> value is what determines the order of the events, are
> >>>>>>>> there any pointers on the best way to do this?
> >>>>>>>>
> >>>>>>>> I'm using Kafka Streams (latest version) with Java.
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>
> >>
> >>
> >
> -----BEGIN PGP SIGNATURE-----
> Comment: GPGTools - https://gpgtools.org
>
> iQIcBAEBCgAGBQJX9tKmAAoJECnhiMLycopPQ50P/2wibksuel/cAu0EY1Des9sg
> AQPxobWD2UITOnzuKeRWZdSOYOLTyvKSy15uFcDmWDDnNH3bQcXO2ElND/6KW+17
> wz6YS/btBx/LOhvYRKv8gDz5vAT6EYFevF5royBuiSCGhTZBO829m3b3uSqyAjoT
> 81lp9nQAjlswveS7hlmD91abmuM4/bbs2f0+CAIFCPiszigkaSR2WOLHzsJs3zfl
> n67jsQ/HPqMnksp2slLlvzfReYFkk+RPsNImAkXAp65tcp0XM9n/RKbrh5LpFcGY
> oBRx3C5sJT7xi6jr2EJau9OgsrBD+h+CSH5ipgtbZ4f8X5u2NkfU9TL2Lb+zol/e
> x2U2fD6dTlCh7w+dLFSqXnMUixKKTYI9xP/gS7ASeRH/f/lw0R0srNTUB5JVtkTY
> M5KkAcvRkdmrlnl2uOqLZlhwc8wbUwVL6SSgU+qPuk8nHIri5WiMiitrE0a6hx3F
> HT4GWtDJwCdOkNOcA8xZvWp3VHoXHf56Qm/U3Rq2RBE+aWINEIKRrxyjqjB4YvrO
> 8xWU9YYutxzMNwgrniThWnll//dE87W6nwAOb5By72OIa36yspBzpLkQsqa9orbP
> HxebhQCTdU95fCVhF5/IpNhqNLMHhRwb0UtWkEbiZm8GIom8qeN5TPQitjMujOwu
> DZfSLSO+fQSrSRQBMJ4z
> =R9KG
> -----END PGP SIGNATURE-----
>
