kafka-users mailing list archives

From Ali Akhtar <ali.rac...@gmail.com>
Subject Re: Handling out of order messages without KTables
Date Thu, 06 Oct 2016 21:59:14 GMT
Sorry, to be clear:

- Producers post to topic A
- Consumers of topic A receive the data, parse it to find the keys, and
post the correct key + message to Topic B
- Topic B is treated as a KTable by a 2nd consumer layer, and it's this layer
which does the writes to ensure 'last one wins' (assuming 'last one' can be
determined using the timestamp in the JSON of the message)
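A sketch of how that two-step pipeline might look in the Streams DSL (topic names, serde configuration, and the extractProductId() helper are placeholders; the class names are from the current API, while 0.10-era releases spell the builder KStreamBuilder):

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class RekeyPipeline {

    // Hypothetical helper: pull the product id out of the JSON payload.
    static String extractProductId(String json) {
        throw new UnsupportedOperationException("parse the JSON here");
    }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Step 1: consume raw payloads from topic A, derive the key from
        // the payload, and write the re-keyed records to topic B.
        KStream<String, String> raw = builder.stream("topic-a");
        raw.selectKey((ignoredKey, json) -> extractProductId(json))
           .to("topic-b");

        // Step 2: read topic B back as a KTable, so each key holds only
        // its most recently appended record.
        KTable<String, String> latest = builder.table("topic-b");
        // ... materialize or stream 'latest' to wherever the writes happen ...
    }
}
```

Note that a plain KTable keeps the record with the highest offset per key, not the highest embedded timestamp, so the timestamp comparison discussed in this thread still has to happen somewhere.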

On Fri, Oct 7, 2016 at 2:54 AM, Ali Akhtar <ali.rac200@gmail.com> wrote:

> Thanks for the reply.
>
> It's not possible to provide keys, unfortunately. (Producer is written by a
> colleague, and said colleague just wants to provide whatever data the API
> gives, and leave all processing of the data to me).
>
> Perhaps I can use an intermediate kafka topic, and have producers post to
> that topic w/ whatever data they receive. Then, another consumer can listen
> to that topic, and use it as a KTable to process data in the order of 'last
> one winning'.
>
> However, the source of truth on the time of the messages is embedded in
> the message itself; it's not the Kafka internal timestamp.
>
> The message is a JSON string, which contains a 'timestamp' field,
> containing a string timestamp, and that string timestamp is the source of
> truth on when this message was generated.
>
> So, is it possible to use a KTable which lets me parse the message and
> return the time which is contained inside the message, and use THAT time
> for sorting the messages?
>
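A stdlib-only sketch of pulling that embedded time out of the payload (the field name, the ISO-8601 format, and the naive string scan are assumptions; real code would use a proper JSON library, and in Streams this is the kind of logic you would plug into a custom TimestampExtractor):

```java
import java.time.Instant;

public class PayloadTime {

    // Naive scan for a "timestamp":"..." field in a flat JSON string
    // (assumes the field appears once and holds an ISO-8601 instant).
    static long extractMillis(String json) {
        String marker = "\"timestamp\":\"";
        int start = json.indexOf(marker) + marker.length();
        int end = json.indexOf('"', start);
        return Instant.parse(json.substring(start, end)).toEpochMilli();
    }

    public static void main(String[] args) {
        String payload =
            "{\"id\":\"1\",\"price\":\"$10\",\"timestamp\":\"2016-10-06T21:59:14Z\"}";
        System.out.println(extractMillis(payload)); // prints 1475791154000
    }
}
```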
> On Fri, Oct 7, 2016 at 2:33 AM, Matthias J. Sax <matthias@confluent.io>
> wrote:
>
>>
>> It is not global in this sense.
>>
>> Thus, you need to ensure that records updating the same product go to
>> the same instance. You can ensure this by giving all records of the
>> same product the same key and using "groupByKey" before processing the data.
>>
>> -Matthias
>>
>> On 10/6/16 10:55 AM, Ali Akhtar wrote:
>> > Thank you, State Store seems promising. But is it distributed, or
>> > limited to the particular instance of my application?
>> >
>> > I.e. if there are 3 messages, setting product 1's price to $1, $3,
>> > and $5, and all 3 of them go to a different instance of my
>> > application, will they be able to correctly identify the latest
>> > received message using State Store?
>> >
>> > On Thu, Oct 6, 2016 at 10:48 PM, Matthias J. Sax
>> > <matthias@confluent.io> wrote:
>> >
>> > What do you mean by "message keys are random" -- do you
>> > effectively have no keys and want all messages to be processed as
>> > if they all have the same key?
>> >
>> > To access the record timestamp in general, you need to use the
>> > Processor API. The ProcessorContext object given by Processor#init()
>> > always returns the timestamp of the currently processed record via
>> > #timestamp().
>> >
>> > Thus, you can attach a state store to your processor and compare
>> > the timestamp of the current record with the timestamp of the one
>> > in your store.
>> >
>> > -Matthias
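The compare-and-update step described above can be sketched in plain Java (a HashMap stands in for the persistent KeyValueStore a real Processor would attach, and the timestamp argument would come from #timestamp() -- or from the payload, per the rest of the thread):

```java
import java.util.HashMap;
import java.util.Map;

public class LastOneWins {

    // (timestamp, value) pair kept per key.
    static final class Entry {
        final long ts;
        final String value;
        Entry(long ts, String value) { this.ts = ts; this.value = value; }
    }

    private final Map<String, Entry> store = new HashMap<>();

    // Keep the record only if it is at least as new as what we already
    // hold for the key -- stale, out-of-order arrivals are dropped.
    public void process(String key, String value, long ts) {
        Entry current = store.get(key);
        if (current == null || ts >= current.ts) {
            store.put(key, new Entry(ts, value));
        }
    }

    public String latest(String key) {
        Entry e = store.get(key);
        return e == null ? null : e.value;
    }

    public static void main(String[] args) {
        LastOneWins state = new LastOneWins();
        // Price updates for product "1" arriving out of order:
        state.process("1", "$3", 200L);
        state.process("1", "$10", 500L); // newest by timestamp
        state.process("1", "$1", 100L);  // stale, ignored
        System.out.println(state.latest("1")); // prints $10
    }
}
```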
>> >
>> > On 10/6/16 8:52 AM, Ali Akhtar wrote:
>> >>>> Heya,
>> >>>>
>> >>>> I have some Kafka producers, which are listening to webhook
>> >>>> events, and for each webhook event, they post its payload to
>> >>>> a Kafka topic.
>> >>>>
>> >>>> Each payload contains a timestamp from the webhook source.
>> >>>>
>> >>>> This timestamp is the source of truth about which events
>> >>>> happened first, which happened last, etc.
>> >>>>
>> >>>> I need to ensure that the last arrival of a particular type
>> >>>> of message wins.
>> >>>>
>> >>>> E.g., if there are 5 messages saying the price of a product
>> >>>> with id 1 was set to $1, then $3, then something else, etc.,
>> >>>> before finally being set to $10, then I need to make sure
>> >>>> that the final price for that product is $10.
>> >>>>
>> >>>> These messages can be out of order, and I need to determine
>> >>>> the latest arrival based on the timestamp from the webhook
>> >>>> source. (It's atm in a string format which can be parsed.)
>> >>>>
>> >>>> Since KTable looks like it uses message keys to determine
>> >>>> what happens - and in this case, the message keys are random,
>> >>>> and the timestamp contained in the value of the message is
>> >>>> what determines the order of the events - any pointers on
>> >>>> what the best way to do this is?
>> >>>>
>> >>>> I'm using Kafka Streams, latest version, Java.
>> >>>>
>> >>
>> >
>>
>
>
