kafka-users mailing list archives

From Ali Akhtar <ali.rac...@gmail.com>
Subject Re: Understanding out of order message processing w/ Streaming
Date Wed, 12 Oct 2016 20:22:36 GMT
Thanks Matthias.

So, if I'm understanding this right, Kafka will not discard messages
which arrive out of order.

What it will do is show messages in the order in which they arrive.

But if they arrive out of order, I have to detect / process that myself in
the processor logic.

Is that correct?

Thanks.

On Wed, Oct 12, 2016 at 11:37 PM, Matthias J. Sax <matthias@confluent.io>
wrote:

> Last question first: A KTable is basically an infinite window over the
> whole stream providing a single result (that gets updated when new
> data arrives). If you use windows, you cut the overall stream into
> finite subsets and get a result per window. Thus, I guess you do not
> need windows (if I understood your use case correctly).
>
> However, in the current state of the Kafka Streams DSL, you will not
> be able to use KTable (directly -- see suggestion to fix this below)
> because it does (currently) not allow you to access the timestamp of
> the current record (thus, you cannot know if a record is late or not).
> You will need to use the Processor API, which allows you to access the
> current record's timestamp via the Context object given in init().
>
> Your reasoning about partitions and Streams instances is correct.
> However, the following two assumptions are not:
>
> > - Because I'm using a KTable, the timestamp of the messages is
> > extracted, and I'm not shown the older bid because I've already
> > processed the later bid. The older bid is ignored.
>
> and
>
> > - Because of this, the replica already knows which timestamps it
> > has processed, and is able to ignore the older messages.
>
> Late arriving records are not dropped but processed regularly. Thus,
> your KTable aggregate function will be called for the late arriving
> record, too (but as described above, you currently have no way to know
> it is a late record).
>
>
> Last but not least, your last statement is a valid concern:
>
> > Also, what will happen if bid 2 arrived and got processed, and then
> > the particular replica crashed, and was restarted. The restarted
> > replica won't have any memory of which timestamps it has previously
> > processed.
> >
> > So if bid 2 got processed, replica crashed and restarted, and then
> > bid 1 arrived, what would happen in that case?
>
> In order to make this work, you would need to store the timestamp in
> your store next to the actual data. Thus, you can compare the timestamp
> of the latest result (safely stored in operator state) with the
> timestamp of the current record.
>
> Does this make sense?
>
> To fix your issue, you could add a .transformValues() before your
> KTable, which allows you to access the timestamp of a record. If you
> add this timestamp to your value and pass it to the KTable afterwards,
> you can access it and it also gets stored reliably.
>
> <bid_id : bid_value> => transformValues => <bid_id : {bid_value,
> timestamp}> => aggregate
>
> Hope this helps.
>
> -Matthias
>
>
> On 10/11/16 9:12 PM, Ali Akhtar wrote:
> > P.S, does my scenario require using windows, or can it be achieved
> > using just KTable?
> >
> > On Wed, Oct 12, 2016 at 8:56 AM, Ali Akhtar <ali.rac200@gmail.com>
> > wrote:
> >
> >> Heya,
> >>
> >> Say I'm building a live auction site, with different products.
> >> Different users will bid on different products. And each time
> >> they do, I want to update the product's price, so it should
> >> always have the latest price in place.
> >>
> >> Example: Person 1 bids $3 on Product A, and Person 2 bids $5 on
> >> the same product 100 ms later.
> >>
> >> The second bid arrives first and the price is updated to $5. Then
> >> the first bid arrives. I want the price to not be updated in this
> >> case, as this bid is older than the one I've already processed.
> >>
> >> Here's my understanding of how I can achieve this with Kafka
> >> Streaming - is my understanding correct?
> >>
> >> - I have a topic for receiving bids. The topic has N partitions,
> >> and I have N replicas of my application which hooks up w/ Kafka
> >> Streaming, up and running.
> >>
> >> - I assume each replica of my app will listen to a different
> >> partition of the topic.
> >>
> >> - A user makes a bid on product A.
> >>
> >> - This is pushed to the topic with the key bid_a
> >>
> >> - Another user makes a bid. This is also pushed with the same key
> >> (bid_a)
> >>
> >> - The 2nd bid arrives first, and gets processed. Then the first
> >> (older) bid arrives.
> >>
> >> - Because I'm using a KTable, the timestamp of the messages is
> >> extracted, and I'm not shown the older bid because I've already
> >> processed the later bid. The older bid is ignored.
> >>
> >> - All bids on product A go to the same topic partition, and hence
> >> the same replica of my app, because they all have the key bid_a.
> >>
> >> - Because of this, the replica already knows which timestamps it
> >> has processed, and is able to ignore the older messages.
> >>
> >> Is the above understanding correct?
> >>
> >> Also, what will happen if bid 2 arrived and got processed, and
> >> then the particular replica crashed, and was restarted. The
> >> restarted replica won't have any memory of which timestamps it
> >> has previously processed.
> >>
> >> So if bid 2 got processed, replica crashed and restarted, and
> >> then bid 1 arrived, what would happen in that case?
> >>
> >> Thanks.
> >>
> >
>

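The approach Matthias describes (keep the record timestamp next to the
value in the store, then compare it with the timestamp of each incoming
record) can be sketched in plain Java. This is only an illustration under
stated assumptions: it does not use the Kafka Streams API, and
LatestBidTable, TimestampedBid, apply() and get() are hypothetical names.
The Map stands in for a fault-tolerant state store, and a record whose
timestamp equals the stored one is assumed to overwrite it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a per-key "latest bid" table that ignores late records.
// The Map is a stand-in for a fault-tolerant state store (assumption).
class LatestBidTable {

    // Value kept per key: the bid amount plus the timestamp of the record
    // that produced it, so the timestamp survives together with the data.
    static final class TimestampedBid {
        final double amount;
        final long timestampMs;

        TimestampedBid(double amount, long timestampMs) {
            this.amount = amount;
            this.timestampMs = timestampMs;
        }
    }

    private final Map<String, TimestampedBid> store;

    // Fresh instance with an empty store.
    LatestBidTable() {
        this(new HashMap<>());
    }

    // Instance started from previously saved state (e.g. after a restart).
    LatestBidTable(Map<String, TimestampedBid> restoredStore) {
        this.store = restoredStore;
    }

    // Applies a bid. Returns true if the stored value was updated, false if
    // the record was older than the stored one and was therefore ignored.
    boolean apply(String key, double amount, long timestampMs) {
        TimestampedBid current = store.get(key);
        if (current != null && timestampMs < current.timestampMs) {
            return false; // late record: keep the newer value
        }
        store.put(key, new TimestampedBid(amount, timestampMs));
        return true;
    }

    // Returns the stored bid for the key, or null if none has been seen.
    TimestampedBid get(String key) {
        return store.get(key);
    }
}
```

With this sketch, applying bid 2 (timestamp 1100, $5) and then the late
bid 1 (timestamp 1000, $3) under the key bid_a leaves the price at $5.
Because the timestamp lives in the store and not only in the memory of
the running instance, an instance constructed from the restored map makes
the same decision after a restart.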