kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Handling out of order messages without KTables
Date Thu, 06 Oct 2016 22:58:17 GMT
-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

Yes, that should work.

On 10/6/16 3:54 PM, Ali Akhtar wrote:
> Thanks! That looks perfect.
> 
> Last question: is there any shortcut to having the JSON string
> messages automatically deserialized into their equivalent Java
> class via Jackson, or similar?
> 
> Perhaps I can write a Serde<V> impl which takes the java.lang.Class
> of the class to be mapped, and maps it via Jackson?
> 
> On Fri, Oct 7, 2016 at 3:39 AM, Matthias J. Sax
> <matthias@confluent.io> wrote:
> 
> Exactly. You need to set the key using KStream#selectKey() and 
> re-distribute data via #through().
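A minimal sketch of the key-derivation step, in plain Java with no Kafka or JSON dependencies. The field name `productId` and the regex-based extraction are assumptions for illustration (the thread never shows the payload schema, and a real application would parse with a JSON library such as Jackson). The idea is that the mapper passed to `KStream#selectKey()` would call a helper like this to turn each unkeyed record into one keyed by product id, before the re-keyed stream is written out with `#through()`.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ProductKeySelector {
    // Hypothetical field name; assumes the id is a JSON string, e.g. "productId":"1".
    private static final Pattern PRODUCT_ID =
            Pattern.compile("\"productId\"\\s*:\\s*\"([^\"]*)\"");

    // Returns the product id embedded in the JSON value, if present.
    // Regex extraction is only a stand-in for a real JSON parser.
    public static Optional<String> extractProductId(String jsonValue) {
        if (jsonValue == null) {
            return Optional.empty();
        }
        Matcher m = PRODUCT_ID.matcher(jsonValue);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        String value = "{\"productId\":\"1\",\"price\":\"3.00\"}";
        // The derived value would become the record's new key.
        System.out.println(extractProductId(value).orElse("<none>"));
    }
}
```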
> 
> About timestamps: you can provide a custom TimestampExtractor that
> returns the timestamp embedded in the JSON instead of the record
> timestamp (which is what DefaultTimestampExtractor returns).
> 
> See
> http://docs.confluent.io/3.0.1/streams/developer-guide.html#timestamp-extractor
> 
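A stdlib-only sketch of the parsing a custom extractor might do. It assumes the embedded `timestamp` field holds an ISO-8601 instant such as `2016-10-06T08:52:00Z`; the thread only says the value is a parseable string, so both the format and the regex are assumptions. A custom `TimestampExtractor` could call this helper on the record value (assuming a String value serde) from its `extract()` method, passing the record's own `timestamp()` as the fallback for records without a usable field. The exact `extract()` signature has changed between Kafka releases, so check it for the version in use.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EmbeddedTimestamp {
    // Assumes the JSON carries "timestamp":"<ISO-8601 instant>".
    private static final Pattern TIMESTAMP =
            Pattern.compile("\"timestamp\"\\s*:\\s*\"([^\"]*)\"");

    // Returns the embedded event time in epoch milliseconds, or fallbackMillis
    // (e.g. the Kafka record timestamp) if the field is missing or unparseable.
    public static long extractMillis(String jsonValue, long fallbackMillis) {
        if (jsonValue == null) {
            return fallbackMillis;
        }
        Matcher m = TIMESTAMP.matcher(jsonValue);
        if (!m.find()) {
            return fallbackMillis;
        }
        try {
            return Instant.parse(m.group(1)).toEpochMilli();
        } catch (DateTimeParseException e) {
            return fallbackMillis;
        }
    }

    public static void main(String[] args) {
        String value = "{\"productId\":\"1\",\"timestamp\":\"1970-01-01T00:00:01Z\"}";
        System.out.println(extractMillis(value, -1L)); // prints 1000
    }
}
```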
> 
> -Matthias
> 
> On 10/6/16 2:59 PM, Ali Akhtar wrote:
>>>> Sorry, to be clear:
>>>> 
>>>> - Producers post to topic A.
>>>> - Consumers of topic A receive the data, parse it to find the
>>>>   keys, and post the correct key + message to topic B.
>>>> - Topic B is treated as a KTable by a 2nd consumer layer, and
>>>>   it's this layer which does the writes to ensure 'last one
>>>>   wins' (assuming 'last one' can be determined using the
>>>>   timestamp in the JSON of the message).
>>>> 
>>>> On Fri, Oct 7, 2016 at 2:54 AM, Ali Akhtar
>>>> <ali.rac200@gmail.com> wrote:
>>>> 
>>>>> Thanks for the reply.
>>>>> 
>>>>> It's not possible to provide keys, unfortunately. (The producer
>>>>> is written by a colleague, and said colleague just wants to
>>>>> provide whatever data the API gives and leave all processing
>>>>> of the data to me.)
>>>>> 
>>>>> Perhaps I can use an intermediate Kafka topic, and have
>>>>> producers post to that topic with whatever data they receive.
>>>>> Then another consumer can listen to that topic and use it as a
>>>>> KTable to process data in the order of 'last one winning'.
>>>>> 
>>>>> However, the source of truth for the time of the messages is
>>>>> embedded in the message itself; it's not the Kafka internal
>>>>> timestamp.
>>>>> 
>>>>> The message is a JSON string which contains a 'timestamp'
>>>>> field holding a string timestamp, and that string timestamp
>>>>> is the source of truth on when this message was generated.
>>>>> 
>>>>> So, is it possible to use a KTable which lets me parse the
>>>>> message and return the time which is contained inside the
>>>>> message, and use THAT time for sorting the messages?
>>>>> 
>>>>> On Fri, Oct 7, 2016 at 2:33 AM, Matthias J. Sax 
>>>>> <matthias@confluent.io> wrote:
>>>>> 
>>>> It is not global in this sense.
>>>> 
>>>> Thus, you need to ensure that records updating the same
>>>> product go to the same instance. You can ensure this by
>>>> giving all records of the same product the same key and
>>>> calling "groupByKey" before processing the data.
>>>> 
>>>> -Matthias
>>>> 
>>>> On 10/6/16 10:55 AM, Ali Akhtar wrote:
>>>>>>>> Thank you, State Store seems promising. But is it
>>>>>>>> distributed, or limited to the particular instance of
>>>>>>>> my application?
>>>>>>>> 
>>>>>>>> I.e., if there are 3 messages setting product 1's
>>>>>>>> price to $1, $3, and $5, and each of them goes to a
>>>>>>>> different instance of my application, will they be
>>>>>>>> able to correctly identify the latest received
>>>>>>>> message using State Store?
>>>>>>>> 
>>>>>>>> On Thu, Oct 6, 2016 at 10:48 PM, Matthias J. Sax 
>>>>>>>> <matthias@confluent.io> wrote:
>>>>>>>> 
>>>>>>>> What do you mean by "message keys are random" -- do
>>>>>>>> you effectively have no keys and want all messages to
>>>>>>>> be processed as if they all have the same key?
>>>>>>>> 
>>>>>>>> To access the record timestamp in general, you need to use
>>>>>>>> the Processor API. The ProcessorContext object given by
>>>>>>>> Processor#init() always returns the timestamp of the
>>>>>>>> currently processed record via #timestamp().
>>>>>>>> 
>>>>>>>> Thus, you can attach a state store to your processor
>>>>>>>> and compare the timestamps of the current record with
>>>>>>>> the timestamp of the one in your store.
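The compare-against-the-store idea can be sketched without Kafka as well. Here a `HashMap` stands in for the key-value store attached to the processor, and `eventTimeMillis` stands in for the value `ProcessorContext#timestamp()` would return (with a custom extractor, the time embedded in the JSON). The class, method names, and tie-breaking rule are illustrative assumptions, not a Kafka API.

```java
import java.util.HashMap;
import java.util.Map;

public class LastWriterWins {
    // Latest accepted value for a key, with the event time it was generated at.
    static final class PriceUpdate {
        final String price;
        final long eventTimeMillis;

        PriceUpdate(String price, long eventTimeMillis) {
            this.price = price;
            this.eventTimeMillis = eventTimeMillis;
        }
    }

    // Stand-in for the state store attached to the processor.
    private final Map<String, PriceUpdate> store = new HashMap<>();

    // Applies an update only if it is not older than what is already stored,
    // so late-arriving (out-of-order) records cannot overwrite newer ones.
    // Returns true if the store was updated.
    public boolean process(String productId, String price, long eventTimeMillis) {
        PriceUpdate current = store.get(productId);
        if (current != null && eventTimeMillis < current.eventTimeMillis) {
            return false; // stale record: keep the newer value
        }
        store.put(productId, new PriceUpdate(price, eventTimeMillis));
        return true;
    }

    public String latestPrice(String productId) {
        PriceUpdate current = store.get(productId);
        return current == null ? null : current.price;
    }

    public static void main(String[] args) {
        LastWriterWins p = new LastWriterWins();
        // Updates for product 1 arriving out of order; event times are illustrative.
        p.process("1", "$1", 1000L);
        p.process("1", "$10", 5000L);
        p.process("1", "$3", 2000L); // arrives late, ignored
        System.out.println(p.latestPrice("1")); // prints $10
    }
}
```

With this rule the $10 update wins even though the $3 update arrives after it. Ties in event time go to the later arrival; flip the comparison to `<=` if the earlier arrival should win instead.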
>>>>>>>> 
>>>>>>>> -Matthias
>>>>>>>> 
>>>>>>>> On 10/6/16 8:52 AM, Ali Akhtar wrote:
>>>>>>>>>>> Heya,
>>>>>>>>>>> 
>>>>>>>>>>> I have some Kafka producers, which are
>>>>>>>>>>> listening to webhook events, and for each
>>>>>>>>>>> webhook event, they post its payload to a Kafka
>>>>>>>>>>> topic.
>>>>>>>>>>> 
>>>>>>>>>>> Each payload contains a timestamp from the
>>>>>>>>>>> webhook source.
>>>>>>>>>>> 
>>>>>>>>>>> This timestamp is the source of truth about
>>>>>>>>>>> which events happened first, which happened
>>>>>>>>>>> last, etc.
>>>>>>>>>>> 
>>>>>>>>>>> I need to ensure that the last arrival of a 
>>>>>>>>>>> particular type of message wins.
>>>>>>>>>>> 
>>>>>>>>>>> E.g., if there are 5 messages saying the price
>>>>>>>>>>> of a product with id 1 was set to $1, then $3,
>>>>>>>>>>> then something else, etc., before finally being
>>>>>>>>>>> set to $10, then I need to make sure that the
>>>>>>>>>>> final price for that product is $10.
>>>>>>>>>>> 
>>>>>>>>>>> These messages can be out of order, and I need
>>>>>>>>>>> to determine the latest arrival based on the
>>>>>>>>>>> timestamp from the webhook source. (It's currently
>>>>>>>>>>> in a string format which can be parsed.)
>>>>>>>>>>> 
>>>>>>>>>>> Since KTable looks like it uses message keys
>>>>>>>>>>> to determine what happens - and in this case,
>>>>>>>>>>> the message keys are random, and the timestamp
>>>>>>>>>>> contained in the value of the message is what
>>>>>>>>>>> determines the order of the events - any
>>>>>>>>>>> pointers on what the best way to do this is?
>>>>>>>>>>> 
>>>>>>>>>>> I'm using Kafka Streams, latest version,
>>>>>>>>>>> Java.
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>> 
>> 
> 
-----BEGIN PGP SIGNATURE-----
Comment: GPGTools - https://gpgtools.org

iQIcBAEBCgAGBQJX9tcJAAoJECnhiMLycopPIQMQAI/alRCktJdQ9+FQIC81xoCq
DnLNTszPIdC7RzA/zqN1lvE1WtkRL8IrqqW8VsqnuermppF1KFWxGL8/x/gwC6qY
iCzhL4YU6nRWx0E9VGtfp6A9/aFsRIT5KHXHSugjfqBpcw5Wl1skKt5dsh8Hl2vT
3g5oNeCkxl/8Yh/0yJPOlpT9ie7G+WfS+fEUyYKbd9Em2He6DMixRZ69MbFaq0Wk
F0sPJcbOge7JRcPNThCud62N1j4b6sAjRZkorxrNcoU9tkAC9FNyTpCQFF+AoooI
XMsU111tsrKIynwNHWwGhm1L6GivJm1NJosM68g3/M/PB2zb8Sc1erokk6C3DdeB
CJV34brqUfpG4C8YE2W2s0ROVN38qXm7/+RLI8+qKFrrUHBgPiP6msPa5kPPisLE
sAZx0lktRQRn6k3SdtnfjETWIzXYj8cdFO8ALPmSs/81o9db8GkmaQy0e670tAcf
i/S8f8wvDyS8G/74kILEYZ1kb0WW2+Yb68C70o7hlHR78yAbouiJuK2J1iCYPluI
4aBza16f0RKdwvGbIq4ZU50bwAvEov1hukjHkuQhHWdpiQErzZufRXjvkGUQwpHw
2LquB3iWsRjROt//bjAm6x63uPt/mu/eRV0LqyD/pSkvr7EM3/WuLEprYRTbkJrc
auKVSXbzM4ChDoG7xMqi
=JF24
-----END PGP SIGNATURE-----