kafka-users mailing list archives

From Ali Akhtar <ali.rac...@gmail.com>
Subject Re: Understanding out of order message processing w/ Streaming
Date Thu, 13 Oct 2016 15:47:09 GMT
Makes sense. Thanks

On 13 Oct 2016 12:42 pm, "Michael Noll" <michael@confluent.io> wrote:

> > But if they arrive out of order, I have to detect / process that myself
> > in the processor logic.
>
> Yes -- if your processing logic depends on the specific ordering of
> messages (which is the case for you), then you must manually implement this
> ordering-specific logic at the moment.
>
> Other use cases may not need to do that and will "just work" even with
> out-of-order data.  If, for example, you are counting objects or
> computing the sum of numbers, then you do not need to do anything
> special.
>
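Michael's point about aggregations that "just work" can be sketched in plain Java (an illustrative stand-in, not Kafka Streams code): a sum is commutative and associative, so the final aggregate is the same no matter which bid arrives first.

```java
import java.util.List;

public class OrderInsensitive {
    // Summing bids in whichever order they arrive yields the same
    // total -- this is why sums and counts tolerate out-of-order data.
    static int total(List<Integer> bidsInArrivalOrder) {
        int sum = 0;
        for (int bid : bidsInArrivalOrder) {
            sum += bid;
        }
        return sum;
    }
}
```

A last-write-wins price, by contrast, is order-sensitive, which is why the rest of the thread needs the timestamp tricks below.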
>
>
>
>
> On Wed, Oct 12, 2016 at 10:22 PM, Ali Akhtar <ali.rac200@gmail.com> wrote:
>
> > Thanks Matthias.
> >
> > So, if I'm understanding this right, Kafka will not discard messages
> > which arrive out of order.
> >
> > What it will do is show messages in the order in which they arrive.
> >
> > But if they arrive out of order, I have to detect / process that myself
> > in the processor logic.
> >
> > Is that correct?
> >
> > Thanks.
> >
> > On Wed, Oct 12, 2016 at 11:37 PM, Matthias J. Sax <matthias@confluent.io>
> > wrote:
> >
> > > Last question first: A KTable is basically an infinite window over
> > > the whole stream, providing a single result (that gets updated when
> > > new data arrives). If you use windows, you cut the overall stream
> > > into finite subsets and get a result per window. Thus, I guess you
> > > do not need windows (if I understood your use case correctly).
> > >
> > > However, in the current state of the Kafka Streams DSL, you will
> > > not be able to use KTable directly (see the suggestion to fix this
> > > below), because it does (currently) not allow access to the
> > > timestamp of the current record (thus, you cannot know whether a
> > > record is late or not). You will need to use the Processor API,
> > > which lets you access the current record's timestamp via the
> > > ProcessorContext object given in init().
> > >
> > > Your reasoning about partitions and Streams instances is correct.
> > > However, the following two statements are not:
> > >
> > > > - Because I'm using a KTable, the timestamp of the messages is
> > > > extracted, and I'm not shown the older bid because I've already
> > > > processed the later bid. The older bid is ignored.
> > >
> > > and
> > >
> > > > - Because of this, the replica already knows which timestamps it
> > > > has processed, and is able to ignore the older messages.
> > >
> > > Late-arriving records are not dropped but processed regularly. Thus,
> > > your KTable aggregate function will be called for the late-arriving
> > > record, too (but as described above, you currently have no way to
> > > know it is a late record).
> > >
> > >
> > > Last but not least, your last statement is a valid concern:
> > >
> > > > Also, what will happen if bid 2 arrived and got processed, and then
> > > > the particular replica crashed, and was restarted. The restarted
> > > > replica won't have any memory of which timestamps it has previously
> > > > processed.
> > > >
> > > > So if bid 2 got processed, replica crashed and restarted, and then
> > > > bid 1 arrived, what would happen in that case?
> > >
> > > In order to make this work, you would need to store the timestamp in
> > > your store next to the actual data. Thus, you can compare the
> > > timestamp of the latest result (safely stored in operator state)
> > > with the timestamp of the current record.
> > >
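A minimal sketch of that timestamp guard, in plain Java with no Kafka dependencies (in Streams the map below would be a persistent state store, which is what makes the guard survive a restart; all names here are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class LatestBidStore {
    // The value plus the event timestamp it was derived from --
    // storing the timestamp next to the data is the key idea.
    static final class TimestampedPrice {
        final long price;
        final long timestamp;
        TimestampedPrice(long price, long timestamp) {
            this.price = price;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, TimestampedPrice> store = new HashMap<>();

    // Apply a bid only if its timestamp is newer than the stored one;
    // otherwise it is a late record and is ignored.
    public void maybeUpdate(String key, long price, long timestamp) {
        TimestampedPrice current = store.get(key);
        if (current == null || timestamp > current.timestamp) {
            store.put(key, new TimestampedPrice(price, timestamp));
        }
    }

    public long currentPrice(String key) {
        return store.get(key).price;
    }
}
```

With this guard, the auction scenario works even when the $5 bid (later timestamp) arrives before the $3 bid.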
> > > Does this make sense?
> > >
> > > To fix your issue, you could add a .transformValues() before your
> > > KTable, which allows you to access the timestamp of a record. If you
> > > add this timestamp to your value and pass it to the KTable
> > > afterwards, you can access it and it is also stored reliably.
> > >
> > > <bid_id : bid_value> => transformValues => <bid_id : {bid_value,
> > > timestamp}> => aggregate
> > >
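The enrich-then-aggregate dataflow above can be sketched in plain Java (an illustrative stand-in for the Streams pipeline: the Bid wrapper plays the role of the {bid_value, timestamp} value produced by the transform step, and latest() is the "newer timestamp wins" reducer the aggregation would apply; all names are assumptions):

```java
import java.util.List;

public class EnrichThenAggregate {
    // Value enriched with its record timestamp, as the transform
    // step would produce.
    static final class Bid {
        final long value;
        final long timestamp;
        Bid(long value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Reducer: keep whichever bid carries the newer event timestamp.
    static Bid latest(Bid aggregate, Bid incoming) {
        return incoming.timestamp > aggregate.timestamp ? incoming : aggregate;
    }

    // Fold the arrivals (in arrival order) down to the final price.
    static long finalPrice(List<Bid> arrivals) {
        Bid agg = arrivals.get(0);
        for (Bid b : arrivals.subList(1, arrivals.size())) {
            agg = latest(agg, b);
        }
        return agg.value;
    }
}
```

Because the reducer compares event timestamps rather than arrival order, both arrival orders of the two bids converge on the same price.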
> > > Hope this helps.
> > >
> > > - -Matthias
> > >
> > >
> > > On 10/11/16 9:12 PM, Ali Akhtar wrote:
> > > > P.S., does my scenario require using windows, or can it be achieved
> > > > using just KTable?
> > > >
> > > > On Wed, Oct 12, 2016 at 8:56 AM, Ali Akhtar <ali.rac200@gmail.com>
> > > > wrote:
> > > >
> > > >> Heya,
> > > >>
> > > >> Say I'm building a live auction site, with different products.
> > > >> Different users will bid on different products. And each time
> > > >> they do, I want to update the product's price, so it should
> > > >> always have the latest price in place.
> > > >>
> > > >> Example: Person 1 bids $3 on Product A, and Person 2 bids $5 on
> > > >> the same product 100 ms later.
> > > >>
> > > >> The second bid arrives first and the price is updated to $5. Then
> > > >> the first bid arrives. I want the price to not be updated in this
> > > >> case, as this bid is older than the one I've already processed.
> > > >>
> > > >> Here's my understanding of how I can achieve this with Kafka
> > > >> Streaming - is my understanding correct?
> > > >>
> > > >> - I have a topic for receiving bids. The topic has N partitions,
> > > >> and I have N replicas of my application which hooks up w/ Kafka
> > > >> Streaming, up and running.
> > > >>
> > > >> - I assume each replica of my app will listen to a different
> > > >> partition of the topic.
> > > >>
> > > >> - A user makes a bid on product A.
> > > >>
> > > >> - This is pushed to the topic with the key bid_a
> > > >>
> > > >> - Another user makes a bid. This is also pushed with the same key
> > > >> (bid_a)
> > > >>
> > > >> - The 2nd bid arrives first, and gets processed. Then the first
> > > >> (older) bid arrives.
> > > >>
> > > >> - Because I'm using a KTable, the timestamp of the messages is
> > > >> extracted, and I'm not shown the older bid because I've already
> > > >> processed the later bid. The older bid is ignored.
> > > >>
> > > >> - All bids on product A go to the same topic partition, and hence
> > > >> the same replica of my app, because they all have the key bid_a.
> > > >>
> > > >> - Because of this, the replica already knows which timestamps it
> > > >> has processed, and is able to ignore the older messages.
> > > >>
> > > >> Is the above understanding correct?
> > > >>
> > > >> Also, what will happen if bid 2 arrived and got processed, and
> > > >> then the particular replica crashed, and was restarted. The
> > > >> restarted replica won't have any memory of which timestamps it
> > > >> has previously processed.
> > > >>
> > > >> So if bid 2 got processed, replica crashed and restarted, and
> > > >> then bid 1 arrived, what would happen in that case?
> > > >>
> > > >> Thanks.
> > > >>
> > > >
> > >
> >
>
