kafka-users mailing list archives

From "John Roesler" <vvcep...@apache.org>
Subject Re: Ordering of messages in the same kafka streams sub-topology with multiple sinks for the same topic
Date Tue, 03 Dec 2019 15:59:59 GMT
Hi Vasily,

In this case, with the constraints you’re describing, the first branch would probably output
first, but I wouldn’t depend on it. Any small change in your program could break this,
and any change in Streams could also alter the exact execution order.

The right way to think about these programs is as “data flows”. You’re taking a stream
of data and defining two separate branches into smaller streams, and then later on merging
those back into one stream. In general, there would be no defined ordering, just like if you
imagine doing the same thing with literal water streams. 

If you want a guarantee about the relative ordering, you’d have to use a specific operator
that does what you want. If nothing else comes to mind, a custom transformer or processor
that receives records from both branches and buffers records from the second, so that it can
emit the record from the first branch first, would do the trick.
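
To make the buffering idea concrete, here is a minimal sketch in plain Scala, with no Kafka Streams dependency. It only models the logic such a custom processor might hold internally. The names Tagged, inputId, branch and BranchOrderingBuffer are hypothetical and not part of any Kafka API, and the sketch assumes that each branch produces exactly one record per input record and that every record can be tagged with the input it came from.

```scala
import scala.collection.mutable

// Hypothetical wrapper (not a Kafka Streams type): `inputId` identifies the
// source record, `branch` is 1 or 2 depending on which branch produced `value`.
final case class Tagged[V](inputId: Long, branch: Int, value: V)

// Sketch of the buffering logic only. Assumes each branch produces exactly
// one record per input record.
final class BranchOrderingBuffer[V](emit: Tagged[V] => Unit) {
  // Second-branch records that arrived before their first-branch counterpart.
  private val heldSecond = mutable.Map.empty[Long, Tagged[V]]
  // Inputs whose first-branch record was emitted while the second is still outstanding.
  private val firstEmitted = mutable.Set.empty[Long]

  def process(record: Tagged[V]): Unit =
    if (record.branch == 1) {
      // First-branch records are always emitted immediately.
      emit(record)
      heldSecond.remove(record.inputId) match {
        case Some(second) => emit(second) // release the held second-branch record
        case None         => firstEmitted += record.inputId
      }
    } else {
      // Second-branch records pass through only after their first-branch counterpart.
      if (firstEmitted.remove(record.inputId)) emit(record)
      else heldSecond(record.inputId) = record
    }
}

object BranchOrderingDemo {
  def main(args: Array[String]): Unit = {
    val buffer = new BranchOrderingBuffer[String](r => println(r))
    buffer.process(Tagged(1L, 2, "from second branch")) // held back, nothing printed yet
    buffer.process(Tagged(1L, 1, "from first branch"))  // prints the first-branch record, then the held one
  }
}
```

Note that this only orders the two records derived from the same input relative to each other. It says nothing about ordering across different inputs, and a real processor would also need to keep the held records in a state store so they survive restarts.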


On Tue, Dec 3, 2019, at 06:32, Vasily Sulatskov wrote:
> Hello,
> I wonder if ordering of the messages is preserved by kafka streams when 
> the messages are processed by the same sub-topology without 
> redistribution and in the end there are multiple sinks for the same 
> topic. 
> I couldn't find the answer to this question in the docs/mailing 
> list/stack overflow.
> You can arrive at this situation with code like this:
> val source = builder.stream[Key, Value]("input")
> source
>   .filter(...)
>   .mapValues(...)
>   .transform(...)
>   .to("output")
> source
>   .filter(...)
>   .mapValues(...)
>   .transform(...)
>   .to("output")
> Basically it's two different processing branches that process each 
> input value slightly differently. I.e. if one branch produces a 
> message in response to an input message, the other branch will produce 
> a message as well. So keeping the ordering in this case means that all 
> messages produced for earlier source messages on one branch should 
> precede messages produced by the other branch for later source messages.
> Here's my topology:
>   Sub-topology: 2
>     Source: KSTREAM-SOURCE-0000000019 (topics: [input])
>       --> KSTREAM-MAPVALUES-0000000020
>     Processor: KSTREAM-MAPVALUES-0000000020 (stores: [])
>       --> KSTREAM-MAPVALUES-0000000022, KSTREAM-TRANSFORM-0000000021
>       <-- KSTREAM-SOURCE-0000000019
>     Processor: KSTREAM-MAPVALUES-0000000022 (stores: [])
>       --> KSTREAM-TRANSFORM-0000000023
>       <-- KSTREAM-MAPVALUES-0000000020
>     Processor: KSTREAM-TRANSFORM-0000000021 (stores: [store1])
>       --> KSTREAM-MAP-0000000027
>       <-- KSTREAM-MAPVALUES-0000000020
>     Processor: KSTREAM-TRANSFORM-0000000023 (stores: [store2])
>       --> KSTREAM-MAP-0000000024
>       <-- KSTREAM-MAPVALUES-0000000022
>     Processor: KSTREAM-MAP-0000000024 (stores: [])
>       --> KSTREAM-FILTER-0000000025
>       <-- KSTREAM-TRANSFORM-0000000023
>     Processor: KSTREAM-MAP-0000000027 (stores: [])
>       --> KSTREAM-FILTER-0000000028
>       <-- KSTREAM-TRANSFORM-0000000021
>     Processor: KSTREAM-FILTER-0000000025 (stores: [])
>       --> KSTREAM-SINK-0000000026
>       <-- KSTREAM-MAP-0000000024
>     Processor: KSTREAM-FILTER-0000000028 (stores: [])
>       --> KSTREAM-SINK-0000000029
>       <-- KSTREAM-MAP-0000000027
>     Sink: KSTREAM-SINK-0000000026 (topic: output)
>       <-- KSTREAM-FILTER-0000000025
>     Sink: KSTREAM-SINK-0000000029 (topic: output)
>       <-- KSTREAM-FILTER-0000000028
> On one hand I guess that all information coming from one partition 
> will be processed by one thread, so it can keep the order of the 
> messages, but on the other hand I see two independent sinks in the 
> topology, with independent buffers etc. I guess. So in the end I am not 
> sure what's going to happen.
> I would guess that it can work because sinks probably have the same 
> buffer size, but it's not guaranteed. I can imagine the following failure 
> scenario: a write by one sink can succeed while the write by the other 
> sink fails, so a batch of messages gets delivered to the output 
> partition out of order. 
> Can someone please clarify what happens in this case? Is there an 
> ordering guarantee? Can these streams be merged while preserving 
> ordering?
> I know that regular Source.merge() doesn't preserve ordering, but in 
> this case I know that there's no repartitioning etc, and messages 
> basically appear on the same "tick", so it feels like there should be a 
> way to do this. Can I keep ordering if I replace my transformers with 
> processors and manually connect them to the same sink?
> --
> Best regards,
> Vasily Sulatskov
