kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Handling out of order messages without KTables
Date Thu, 06 Oct 2016 22:58:17 GMT

Yes, that should work.
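For reference, a minimal Jackson-backed Serde along those lines might look like the sketch below. The class name `JsonSerde` is made up, error handling is reduced to rethrowing, and the interfaces are implemented in full (in the 0.10.x API, `configure()`/`close()` are not default methods, so lambdas won't do):

```java
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerde<T> implements Serde<T> {
    private final ObjectMapper mapper = new ObjectMapper();
    private final Class<T> type;

    // Takes the java.lang.Class of the POJO to map, as suggested above.
    public JsonSerde(Class<T> type) { this.type = type; }

    @Override public void configure(Map<String, ?> configs, boolean isKey) {}
    @Override public void close() {}

    @Override
    public Serializer<T> serializer() {
        return new Serializer<T>() {
            @Override public void configure(Map<String, ?> configs, boolean isKey) {}
            @Override public void close() {}
            @Override public byte[] serialize(String topic, T data) {
                try { return data == null ? null : mapper.writeValueAsBytes(data); }
                catch (Exception e) { throw new RuntimeException(e); }
            }
        };
    }

    @Override
    public Deserializer<T> deserializer() {
        return new Deserializer<T>() {
            @Override public void configure(Map<String, ?> configs, boolean isKey) {}
            @Override public void close() {}
            @Override public T deserialize(String topic, byte[] bytes) {
                try { return bytes == null ? null : mapper.readValue(bytes, type); }
                catch (Exception e) { throw new RuntimeException(e); }
            }
        };
    }
}
```

Usage would then be e.g. `new JsonSerde<>(PriceUpdate.class)` wherever a `Serde<PriceUpdate>` is expected.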

On 10/6/16 3:54 PM, Ali Akhtar wrote:
> Thanks! That looks perfect.
> Last q.. is there any shortcut to having the JSON string messages 
> automatically get deserialized to their equivalent Java class via
> Jackson, or such?
> Perhaps I can write a Serde<V> impl which takes the java.lang.Class
> of the class to be mapped, and maps it via Jackson?
> On Fri, Oct 7, 2016 at 3:39 AM, Matthias J. Sax
> <matthias@confluent.io> wrote:
> Exactly. You need to set the key using KStream#selectKey() and 
> re-distribute data via #through().
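A sketch of that re-keying step, using the 0.10.x `KStreamBuilder` DSL (the topic names `"events-raw"` and `"events-by-product"` and the `PriceUpdate` POJO with its `getProductId()` accessor are made up for illustration):

```java
// Assumes appropriate default serdes are set in the StreamsConfig.
KStreamBuilder builder = new KStreamBuilder();

KStream<String, PriceUpdate> keyed = builder
    .<String, PriceUpdate>stream("events-raw")
    // Derive the key from the payload...
    .selectKey((oldKey, value) -> value.getProductId())
    // ...and write through an intermediate topic so that all records for
    // the same product land in the same partition, and thus the same
    // application instance.
    .through("events-by-product");
```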
> About timestamps: you can provide a custom TimestampExtractor that 
> returns the JSON-embedded TS instead of the record TS (as the 
> DefaultTimestampExtractor does).
> See 
> http://docs.confluent.io/3.0.1/streams/developer-guide.html#timestamp-
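Such an extractor might be sketched as below. The `"timestamp"` field name and ISO-8601 format are assumptions about the payload, and the single-argument `extract()` signature shown is the 0.10.x one:

```java
import java.time.Instant;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class PayloadTimestampExtractor implements TimestampExtractor {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        try {
            // Parse the JSON value and use its embedded timestamp
            // instead of the Kafka record timestamp.
            JsonNode json = mapper.readTree((String) record.value());
            return Instant.parse(json.get("timestamp").asText()).toEpochMilli();
        } catch (Exception e) {
            throw new RuntimeException("cannot extract embedded timestamp", e);
        }
    }
}
```

It would be registered via the streams config, e.g. `props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, PayloadTimestampExtractor.class);`.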
> -Matthias
> On 10/6/16 2:59 PM, Ali Akhtar wrote:
>>>> Sorry, to be clear:
>>>> - Producers post to topic A
>>>> - Consumers of topic A receive the data, parse it to find the
>>>> keys, and post the correct key + message to Topic B
>>>> - Topic B is treated as a KTable by a 2nd consumer layer, and
>>>> it's this layer which does the writes to ensure 'last one
>>>> wins' (assuming 'last one' can be determined using the
>>>> timestamp in the JSON of the message)
>>>> On Fri, Oct 7, 2016 at 2:54 AM, Ali Akhtar
>>>> <ali.rac200@gmail.com> wrote:
>>>>> Thanks for the reply.
>>>>> It's not possible to provide keys, unfortunately. (The producer
>>>>> is written by a colleague, and said colleague just wants to
>>>>> provide whatever data the API gives, and leave all
>>>>> processing of the data to me).
>>>>> Perhaps I can use an intermediate kafka topic, and have
>>>>> producers post to that topic w/ whatever data they receive.
>>>>> Then, another consumer can listen to that topic, and use it
>>>>> as a KTable to process data in the order of 'last one
>>>>> winning'.
>>>>> However, the source of truth on the time of the messages
>>>>> is embedded in the message itself; it's not the Kafka
>>>>> internal timestamp.
>>>>> The message is a json string, which contains a 'timestamp' 
>>>>> field, containing a string timestamp, and that string
>>>>> timestamp is the source of truth on when this message was
>>>>> generated.
>>>>> So, is it possible to use a KTable which lets me parse the 
>>>>> message and return the time  which is contained inside the 
>>>>> message, and use THAT time for sorting the messages?
>>>>> On Fri, Oct 7, 2016 at 2:33 AM, Matthias J. Sax 
>>>>> <matthias@confluent.io> wrote:
>>>> It is not global in this sense.
>>>> Thus, you need to ensure that records updating the same
>>>> product go to the same instance. You can ensure this by
>>>> giving all records of the same product the same key and
>>>> applying "groupByKey" before processing the data.
>>>> -Matthias
>>>> On 10/6/16 10:55 AM, Ali Akhtar wrote:
>>>>>>>> Thank you, State Store seems promising. But, is it 
>>>>>>>> distributed, or limited to the particular instance of
>>>>>>>> my application?
>>>>>>>> I.e., if there are 3 messages, setting product 1's
>>>>>>>> price to $1, $3, and $5, and all 3 of them go to a
>>>>>>>> different instance of my application, will they be
>>>>>>>> able to correctly identify the latest received
>>>>>>>> message using State Store?
>>>>>>>> On Thu, Oct 6, 2016 at 10:48 PM, Matthias J. Sax 
>>>>>>>> <matthias@confluent.io> wrote:
>>>>>>>> What do you mean by "message keys are random" -- do
>>>>>>>> you effectively have no keys and want all messages to
>>>>>>>> be processed as if they all have the same key?
>>>>>>>> To access the record TS in general, you need to use the
>>>>>>>> Processor API. The ProcessorContext object given to
>>>>>>>> Processor#init() always returns the timestamp of the
>>>>>>>> currently processed record via #timestamp().
>>>>>>>> Thus, you can attach a state store to your processor
>>>>>>>> and compare the timestamps of the current record with
>>>>>>>> the timestamp of the one in your store.
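The compare-and-store step itself is simple; here is a plain-Java sketch of the last-one-wins rule, with a `HashMap` keyed by product id standing in for the Kafka Streams state store (class and method names, and the ISO-8601 timestamp format, are made up for illustration):

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

public class LastOneWins {
    // product id -> { embedded timestamp millis, price in cents };
    // a stand-in for the attached state store.
    private final Map<String, long[]> store = new HashMap<>();

    /** Applies an update only if its embedded timestamp is at least as new. */
    public void update(String productId, String isoTimestamp, long priceCents) {
        long ts = Instant.parse(isoTimestamp).toEpochMilli();
        long[] current = store.get(productId);
        if (current == null || ts >= current[0]) {
            store.put(productId, new long[] { ts, priceCents });
        }
        // Otherwise: a late arrival with an older embedded timestamp; ignore it.
    }

    public Long latestPrice(String productId) {
        long[] v = store.get(productId);
        return v == null ? null : v[1];
    }

    public static void main(String[] args) {
        LastOneWins s = new LastOneWins();
        // Out-of-order arrival: the $10.00 update carries the latest timestamp.
        s.update("1", "2016-10-06T10:00:00Z", 100);
        s.update("1", "2016-10-06T10:05:00Z", 1000); // latest event
        s.update("1", "2016-10-06T10:02:00Z", 300);  // late arrival, ignored
        System.out.println(s.latestPrice("1"));      // prints 1000
    }
}
```

Inside a real `Processor`, `update()` would run against a `KeyValueStore` obtained from the `ProcessorContext`, with the same comparison.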
>>>>>>>> -Matthias
>>>>>>>> On 10/6/16 8:52 AM, Ali Akhtar wrote:
>>>>>>>>>>> Heya,
>>>>>>>>>>> I have some Kafka producers, which are
>>>>>>>>>>> listening to webhook events, and for each
>>>>>>>>>>> webhook event, they post its payload to a Kafka
>>>>>>>>>>> topic.
>>>>>>>>>>> Each payload contains a timestamp from the
>>>>>>>>>>> webhook source.
>>>>>>>>>>> This timestamp is the source of truth about
>>>>>>>>>>> which events happened first, which happened
>>>>>>>>>>> last, etc.
>>>>>>>>>>> I need to ensure that the last arrival of a 
>>>>>>>>>>> particular type of message wins.
>>>>>>>>>>> E.g, if there are 5 messages, saying the price
>>>>>>>>>>> of a product with id 1, was set to $1, then $3,
>>>>>>>>>>> then something else, etc, before finally being
>>>>>>>>>>> set to $10, then I need to make sure that the
>>>>>>>>>>> final price for that product is $10.
>>>>>>>>>>> These messages can be out of order, and I need
>>>>>>>>>>> to determine the latest arrival based on the
>>>>>>>>>>> timestamp from the webhook source. (It's currently
>>>>>>>>>>> in a string format which can be parsed.)
>>>>>>>>>>> Since KTable looks like it uses message keys
>>>>>>>>>>> to determine what happens - and in this case,
>>>>>>>>>>> the message keys are random, and the timestamp
>>>>>>>>>>> contained in the value of the message is what
>>>>>>>>>>> determines the order of the events - any
>>>>>>>>>>> pointers on what the best way to do this is?
>>>>>>>>>>> I'm using Kafka Streams, latest version, with
>>>>>>>>>>> Java.

