kafka-users mailing list archives

From Vasily Sulatskov <Vasily.Sulats...@cfm.fr>
Subject Ordering of messages in the same kafka streams sub-topology with multiple sinks for the same topic
Date Tue, 03 Dec 2019 12:32:49 GMT
Hello,

I wonder whether Kafka Streams preserves the ordering of messages when they are processed
by the same sub-topology without repartitioning, and in the end there are multiple sinks for
the same topic.

I couldn't find the answer to this question in the docs/mailing list/stack overflow.

You can arrive at this situation with code like this:
 
val source = builder.stream[Key, Value]("input")
source
  .filter(...)
  .mapValues(...)
  .transform(...)
  .to("output")

source
  .filter(...)
  .mapValues(...)
  .transform(...)
  .to("output")

Basically, there are two different processing branches that process each input value slightly differently,
i.e. if one branch produces a message in response to an input message, the other branch will
produce a message as well. So keeping the ordering in this case means that all messages produced
for earlier source messages on one branch should precede messages produced by the other branch
for later source messages.
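The ordering requirement above can be stated as a simple predicate over the output partition. This is a hypothetical model, not Kafka Streams code: it assumes each output record can be tagged with the offset of the input record that caused it and with the branch that emitted it (`OrderingCheck`, `Out`, `isOrdered` and the branch labels are illustrative names only).

```scala
// Hypothetical model, not Kafka Streams code: each output record is tagged
// with the offset of the input record that caused it and the emitting branch.
object OrderingCheck {
  final case class Out(sourceOffset: Long, branch: String)

  // True if no output for a later input record precedes output for an
  // earlier one. The relative order of the two branches' records for the
  // same input record is left unconstrained.
  def isOrdered(output: Seq[Out]): Boolean =
    output.zip(output.drop(1)).forall { case (prev, next) =>
      prev.sourceOffset <= next.sourceOffset
    }

  def main(args: Array[String]): Unit = {
    // Both branches emit for each input; branch order may differ per input.
    val acceptable = Seq(Out(0L, "A"), Out(0L, "B"), Out(1L, "B"), Out(1L, "A"))
    // Branch A's records for offsets 0 and 1 land before branch B's for offset 0.
    val violating  = Seq(Out(0L, "A"), Out(1L, "A"), Out(0L, "B"), Out(1L, "B"))
    println(isOrdered(acceptable)) // true
    println(isOrdered(violating))  // false
  }
}
```

The `violating` sequence is the kind of interleaving the failure scenario worries about: one branch's batch reaches the partition ahead of the other branch's records for an earlier input.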

Here's my topology:

  Sub-topology: 2
    Source: KSTREAM-SOURCE-0000000019 (topics: [input])
      --> KSTREAM-MAPVALUES-0000000020
    Processor: KSTREAM-MAPVALUES-0000000020 (stores: [])
      --> KSTREAM-MAPVALUES-0000000022, KSTREAM-TRANSFORM-0000000021
      <-- KSTREAM-SOURCE-0000000019
    Processor: KSTREAM-MAPVALUES-0000000022 (stores: [])
      --> KSTREAM-TRANSFORM-0000000023
      <-- KSTREAM-MAPVALUES-0000000020
    Processor: KSTREAM-TRANSFORM-0000000021 (stores: [store1])
      --> KSTREAM-MAP-0000000027
      <-- KSTREAM-MAPVALUES-0000000020
    Processor: KSTREAM-TRANSFORM-0000000023 (stores: [store2])
      --> KSTREAM-MAP-0000000024
      <-- KSTREAM-MAPVALUES-0000000022
    Processor: KSTREAM-MAP-0000000024 (stores: [])
      --> KSTREAM-FILTER-0000000025
      <-- KSTREAM-TRANSFORM-0000000023
    Processor: KSTREAM-MAP-0000000027 (stores: [])
      --> KSTREAM-FILTER-0000000028
      <-- KSTREAM-TRANSFORM-0000000021
    Processor: KSTREAM-FILTER-0000000025 (stores: [])
      --> KSTREAM-SINK-0000000026
      <-- KSTREAM-MAP-0000000024
    Processor: KSTREAM-FILTER-0000000028 (stores: [])
      --> KSTREAM-SINK-0000000029
      <-- KSTREAM-MAP-0000000027
    Sink: KSTREAM-SINK-0000000026 (topic: output)
      <-- KSTREAM-FILTER-0000000025
    Sink: KSTREAM-SINK-0000000029 (topic: output)
      <-- KSTREAM-FILTER-0000000028

On one hand, I guess that all information coming from one partition will be processed by
one thread, so it can keep the order of the messages; on the other hand, I see two independent
sinks in the topology, each (I guess) with its own independent buffers etc. So in the end I am not sure what's
going to happen.
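The "one thread per partition" intuition can be illustrated with a toy model, which is not Kafka Streams code. It assumes the thread forwards each input record synchronously and depth-first through both branches before reading the next record, and it only records the order in which the two sinks would be called (`DepthFirstModel`, `Out`, `branches` and `process` are hypothetical names). It deliberately says nothing about how the sinks buffer or send those records afterwards, which is the uncertain part.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model only, not Kafka Streams code. Assumption: one thread owns the
// input partition and pushes each record through every branch, depth-first,
// before it reads the next input record.
object DepthFirstModel {
  // Hypothetical output record: input offset plus the branch that emitted it.
  final case class Out(sourceOffset: Long, branch: String)

  // Two hypothetical branches; as in the message, each one emits a record
  // for every input record.
  private val branches: Seq[Long => Option[Out]] = Seq(
    (offset: Long) => Some(Out(offset, "A")),
    (offset: Long) => Some(Out(offset, "B"))
  )

  // Returns the records in the order the two sinks would be invoked.
  def process(inputOffsets: Seq[Long]): List[Out] = {
    val sinkCalls = ArrayBuffer.empty[Out]
    for {
      offset <- inputOffsets
      branch <- branches
      out    <- branch(offset)
    } sinkCalls += out
    sinkCalls.toList
  }

  def main(args: Array[String]): Unit =
    // Prints the sink-call order for three input records.
    process(Seq(0L, 1L, 2L)).foreach(println)
}
```

For inputs 0, 1, 2 the model calls the sinks in the order (0, A), (0, B), (1, A), (1, B), (2, A), (2, B), which satisfies the ordering requirement; whether that call order survives buffering, batching and retries on the way to the output partition is exactly what the model leaves out.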

I would guess that it can work because the sinks probably have the same buffer size, but that's
not guaranteed. I can imagine the following failure scenario: a write by one sink succeeds
while the write by the other sink fails, so a batch of messages gets delivered to the output
partition out of order.

Can someone please clarify what happens in this case? Is there an ordering guarantee? Can
these streams be merged while preserving ordering?

I know that regular Source.merge() doesn't preserve ordering, but in this case there's no
repartitioning etc., and messages basically appear on the same "tick", so it feels
like there should be a way to do this. Can I keep ordering if I replace my transformers with
processors and manually connect them to the same sink?


--
Best regards,
Vasily Sulatskov

