kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ali Akhtar <ali.rac...@gmail.com>
Subject Re: Understanding out of order message processing w/ Streaming
Date Thu, 13 Oct 2016 15:48:48 GMT
I am probably being too ocd anyway. It will almost never happen that
messages from another vm in the same network on ec2 arrive out of order.
Right?

On 13 Oct 2016 8:47 pm, "Ali Akhtar" <ali.rac200@gmail.com> wrote:

> Makes sense. Thanks
>
> On 13 Oct 2016 12:42 pm, "Michael Noll" <michael@confluent.io> wrote:
>
>> > But if they arrive out of order, I have to detect / process that myself
>> in
>> > the processor logic.
>>
>> Yes -- if your processing logic depends on the specific ordering of
>> messages (which is the case for you), then you must manually implement
>> this
>> ordering-specific logic at the moment.
>>
>> Other use cases may not need to do that and "just work" even with
>> out-of-order data.  If, for example, you are counting objects or are
>> computing the sum of numbers, then you do not need to anything special.
>>
>>
>>
>>
>>
>> On Wed, Oct 12, 2016 at 10:22 PM, Ali Akhtar <ali.rac200@gmail.com>
>> wrote:
>>
>> > Thanks Matthias.
>> >
>> > So, if I'm understanding this right, Kafka will not discard which
>> messages
>> > which arrive out of order.
>> >
>> > What it will do is show messages in the order in which they arrive.
>> >
>> > But if they arrive out of order, I have to detect / process that myself
>> in
>> > the processor logic.
>> >
>> > Is that correct?
>> >
>> > Thanks.
>> >
>> > On Wed, Oct 12, 2016 at 11:37 PM, Matthias J. Sax <
>> matthias@confluent.io>
>> > wrote:
>> >
>> > > -----BEGIN PGP SIGNED MESSAGE-----
>> > > Hash: SHA512
>> > >
>> > > Last question first: A KTable is basically in finite window over the
>> > > whole stream providing a single result (that gets updated when new
>> > > data arrives). If you use windows, you cut the overall stream into
>> > > finite subsets and get a result per window. Thus, I guess you do not
>> > > need windows (if I understood you use case correctly).
>> > >
>> > > However, current state of Kafka Streams DSL, you will not be able to
>> > > use KTable (directly -- see suggestion to fix this below) because is
>> > > does (currently) not allow to access the timestamp of the current
>> > > record (thus, you can not know if a record is late or not). You will
>> > > need to use Processor API which allows you to access the current
>> > > records timestamp via the Context object given in init()
>> > >
>> > > Your reasoning about partitions and Streams instances is correct.
>> > > However, the following two are not
>> > >
>> > > > - Because I'm using a KTable, the timestamp of the messages is
>> > > > extracted, and I'm not shown the older bid because I've already
>> > > > processed the later bid. The older bid is ignored.
>> > >
>> > > and
>> > >
>> > > > - Because of this, the replica already knows which timestamps it
>> > > > has processed, and is able to ignore the older messages.
>> > >
>> > > Late arriving records are not dropped but processes regularly. Thus,
>> > > your KTable aggregate function will be called for the late arriving
>> > > record, too (but as described about, you have currently no way to know
>> > > it is a later record).
>> > >
>> > >
>> > > Last but not least, you last statement is a valid concern:
>> > >
>> > > > Also, what will happen if bid 2 arrived and got processed, and then
>> > > > the particular replica crashed, and was restarted. The restarted
>> > > > replica won't have any memory of which timestamps it has previously
>> > > > processed.
>> > > >
>> > > > So if bid 2 got processed, replica crashed and restarted, and then
>> > > > bid 1 arrived, what would happen in that case?
>> > >
>> > > In order to make this work, you would need to store the timestamp in
>> > > you store next to the actual data. Thus, you can compare the timestamp
>> > > of the latest result (safely stored in operator state) with the
>> > > timestamp of the current record.
>> > >
>> > > Does this makes sense?
>> > >
>> > > To fix you issue, you could add a .transformValue() before you KTable,
>> > > which allows you to access the timestamp of a record. If you add this
>> > > timestamp to you value and pass it to KTable afterwards, you can
>> > > access it and it gets also store reliably.
>> > >
>> > > <bid_id : bid_value> => transformValue => <bid_id : {bid_value,
>> > > timestamp} => aggregate
>> > >
>> > > Hope this helps.
>> > >
>> > > - -Matthias
>> > >
>> > >
>> > > On 10/11/16 9:12 PM, Ali Akhtar wrote:
>> > > > P.S, does my scenario require using windows, or can it be achieved
>> > > > using just KTable?
>> > > >
>> > > > On Wed, Oct 12, 2016 at 8:56 AM, Ali Akhtar <ali.rac200@gmail.com>
>> > > > wrote:
>> > > >
>> > > >> Heya,
>> > > >>
>> > > >> Say I'm building a live auction site, with different products.
>> > > >> Different users will bid on different products. And each time
>> > > >> they do, I want to update the product's price, so it should
>> > > >> always have the latest price in place.
>> > > >>
>> > > >> Example: Person 1 bids $3 on Product A, and Person 2 bids $5 on
>> > > >> the same product 100 ms later.
>> > > >>
>> > > >> The second bid arrives first and the price is updated to $5. Then
>> > > >> the first bid arrives. I want the price to not be updated in this
>> > > >> case, as this bid is older than the one I've already processed.
>> > > >>
>> > > >> Here's my understanding of how I can achieve this with Kafka
>> > > >> Streaming - is my understanding correct?
>> > > >>
>> > > >> - I have a topic for receiving bids. The topic has N partitions,
>> > > >> and I have N replicas of my application which hooks up w/ Kafka
>> > > >> Streaming, up and running.
>> > > >>
>> > > >> - I assume each replica of my app will listen to a different
>> > > >> partition of the topic.
>> > > >>
>> > > >> - A user makes a bid on product A.
>> > > >>
>> > > >> - This is pushed to the topic with the key bid_a
>> > > >>
>> > > >> - Another user makes a bid. This is also pushed with the same
key
>> > > >> (bid_a)
>> > > >>
>> > > >> - The 2nd bid arrives first, and gets processed. Then the first
>> > > >> (older) bid arrives.
>> > > >>
>> > > >> - Because I'm using a KTable, the timestamp of the messages is
>> > > >> extracted, and I'm not shown the older bid because I've already
>> > > >> processed the later bid. The older bid is ignored.
>> > > >>
>> > > >> - All bids on product A go to the same topic partition, and hence
>> > > >> the same replica of my app, because they all have the key bid_a.
>> > > >>
>> > > >> - Because of this, the replica already knows which timestamps
it
>> > > >> has processed, and is able to ignore the older messages.
>> > > >>
>> > > >> Is the above understandning correct?
>> > > >>
>> > > >> Also, what will happen if bid 2 arrived and got processed, and
>> > > >> then the particular replica crashed, and was restarted. The
>> > > >> restarted replica won't have any memory of which timestamps it
>> > > >> has previously processed.
>> > > >>
>> > > >> So if bid 2 got processed, replica crashed and restarted, and
>> > > >> then bid 1 arrived, what would happen in that case?
>> > > >>
>> > > >> Thanks.
>> > > >>
>> > > >
>> > > -----BEGIN PGP SIGNATURE-----
>> > > Comment: GPGTools - https://gpgtools.org
>> > >
>> > > iQIcBAEBCgAGBQJX/oLPAAoJECnhiMLycopP8akP/3Fo24Xeu1/0LuNdBuwTlJd7
>> > > 6r9WrSiSbpiVlWoA1dRjSrkjQoUOwgAD6vXji5Jb8BIT5tMi57KQVrTmXWz/feuy
>> > > 6qJIvfxj8vYdFLTcTOYZKWCEHQK1am2SGkFEeZKY0BbABNqwWzx6lWAJxKlxoBcn
>> > > AXi+IZn07fTvQeShahwg7pLL5xbbE4u6w7YBNqTuvlYNglKI2CUK1EE2jw5Gp2sy
>> > > sjnHCIXDCBhFYyxxdKWTsfHEV74wUI4ARvRChJondY/uRxc5u+INCNax79N2Syq9
>> > > S/ffQvaCS5PJ0nwcv2Gu7WDkrxVu+sP+nwSoxoE3bE1iYH91KLmdLlmBnJ9j+6g/
>> > > i7P7+kwf4a04KMZtGXCU2ZGQjnSlIsjTSFuEE8ASFeRkzGBhM1zDoMNHys6dQDSR
>> > > lgB8eIay2jknUeWR+NJLuerwJZTPYfnlPBZ1jYoaKKsnHDleS69sn0BstphZ/3k5
>> > > fsQz435/emecRZI6Vok9+9FvehPmJ0Jsz70sUlhJS7hvpJ+0D+aI0VbRAUxML7QX
>> > > 7IOw3gLGi8K+bCGxB80AidbSGvzcuEqyrW/9wPttgIuqFjfGcF80nyKsvvgySLnE
>> > > 0RlM0qm24fzCzxFlNZQEJrmJ9YsaNWCQ4qhzuwGhQC1bBEa10Jy5Dqjj1lwA/G+v
>> > > wLVWRn2J0n9mKSiOnHki
>> > > =oJIL
>> > > -----END PGP SIGNATURE-----
>> > >
>> >
>>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message