kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Handling out of order messages without KTables
Date Thu, 06 Oct 2016 17:48:48 GMT

What do you mean by "message keys are random" -- do you effectively
have no keys and want all messages to be processed as if they all have
the same key?

To access the record timestamp in general, you need to use the
Processor API. The ProcessorContext object passed to Processor#init()
always returns the timestamp of the currently processed record via
#timestamp().

Thus, you can attach a state store to your processor and compare the
timestamp of the current record with the timestamp of the one in your
store.
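
A minimal sketch of that comparison, in plain Java without any Kafka
dependencies. The class and method names are hypothetical; the HashMap
only stands in for a state store attached to the processor, and the
timestamp argument stands in for what ProcessorContext#timestamp()
would return. Keying by a product id and dropping records with an
equal timestamp are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka Streams. The HashMap stands in for
// a state store attached to the processor; in a real Processor the timestamp
// argument would come from ProcessorContext#timestamp().
final class LatestTimestampWins {

    // A value together with the timestamp of the record that produced it.
    static final class Stamped {
        final String value;
        final long timestamp;

        Stamped(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, Stamped> store = new HashMap<>();

    // Returns true if the record is newer than the stored one and replaced it.
    boolean process(String key, String value, long timestamp) {
        Stamped current = store.get(key);
        if (current != null && current.timestamp >= timestamp) {
            // Out-of-order (or same-timestamp) record: keep the stored value.
            return false;
        }
        store.put(key, new Stamped(value, timestamp));
        return true;
    }

    // Latest value seen so far for the key, or null if there is none.
    String latest(String key) {
        Stamped current = store.get(key);
        return current == null ? null : current.value;
    }
}
```

Feeding it ("product-1", "$1", 100), then ("product-1", "$10", 300),
then the late record ("product-1", "$3", 200) leaves "$10" as the
stored value, because the last record carries an older timestamp and
is dropped.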

-Matthias

On 10/6/16 8:52 AM, Ali Akhtar wrote:
> Heya,
> 
> I have some Kafka producers, which are listening to webhook events,
> and for each webhook event, they post its payload to a Kafka
> topic.
> 
> Each payload contains a timestamp from the webhook source.
> 
> This timestamp is the source of truth about which events happened
> first, which happened last, etc.
> 
> I need to ensure that the last arrival of a particular type of
> message wins.
> 
> E.g, if there are 5 messages, saying the price of a product with id
> 1, was set to $1, then $3, then something else, etc, before finally
> being set to $10, then I need to make sure that the final price for
> that product is $10.
> 
> These messages can be out of order, and I need to determine the
> latest arrival based on the timestamp from the webhook source. (It's
> currently in a string format which can be parsed.)
> 
> Since KTable looks like it uses message keys to determine what
> happens - and in this case, the message keys are random, and the
> timestamp contained in the value of the message is what determines
> the order of the events - any pointers on what the best way to do
> this is?
> 
> I'm using kafka streaming, latest version, Java.
> 