kafka-users mailing list archives

From Dmitry Minkovsky <dminkov...@gmail.com>
Subject Re: Kafka Streams topology does not replay correctly
Date Wed, 17 Jan 2018 13:25:01 GMT
> That depends on what data the consumer fetches, and this part is hard to
> predict. For this reason, you need to buffer multiple records in a
> store, in case data (from different topics) does not arrive in the order
> you need it, and later do the processing in the correct order once you
> have all the data you need. Does this make sense?

I understand. Thanks for the explanation. That’s what I concluded when I
was wondering why you were talking about buffers. All this time I thought the
StreamThread did this to some extent. Fundamental misconception on my part.
So the “best effort” synchronization doesn’t apply at all across
independent streams? Does it only apply in the case of joins? Does it apply in
the case of merge?

Thank you,
Dmitry


On Wed, Jan 17, 2018 at 2:39, Matthias J. Sax <matthias@confluent.io> wrote:

> >>> The KStream has incoming events, and #transform() will
> >>> let me mount the store and use it how I please. Within an application
> >>> instance, any other KStream#transform()s using the same store will see
> >>> the same data in real time.
>
> That sounds basically correct. But you don't know the order (between
> different topics) in which you will receive the data.
>
> >>> Will the topology call the join transform before the settings-confirm
> >>> transform before the settings-update transform?
>
> That depends on what data the consumer fetches, and this part is hard to
> predict. For this reason, you need to buffer multiple records in a
> store, in case data (from different topics) does not arrive in the order
> you need it, and later do the processing in the correct order once you
> have all the data you need. Does this make sense?
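A minimal plain-Java sketch of that buffering idea follows. It makes several assumptions. The `Rec` and `OrderedBuffer` types are hypothetical and are not Kafka Streams classes. Each input topic gets its own buffer, and each topic is assumed to deliver its own records in timestamp order. A record is released only while every topic has at least one buffered record, so the smallest timestamp across all buffer heads is known to be safe to process. In a real application the buffers would live in a state store so that they survive restarts.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical record type: source topic, key, and event timestamp. */
final class Rec {
    final String topic;
    final String key;
    final long ts;

    Rec(String topic, String key, long ts) {
        this.topic = topic;
        this.key = key;
        this.ts = ts;
    }

    @Override
    public String toString() {
        return topic + "@" + ts;
    }
}

/**
 * Buffers records per input topic and releases them in timestamp order, but only while
 * every topic has at least one buffered record: if a topic is empty, a record that has
 * not been fetched yet might carry a smaller timestamp than anything already buffered.
 */
final class OrderedBuffer {
    private final Map<String, Deque<Rec>> buffers = new LinkedHashMap<>();

    OrderedBuffer(List<String> topics) {
        for (String topic : topics) {
            buffers.put(topic, new ArrayDeque<>());
        }
    }

    /** Adds a fetched record; assumes each topic delivers its own records in timestamp order. */
    void add(Rec r) {
        buffers.get(r.topic).addLast(r);
    }

    /** Returns every record that can safely be processed now, oldest first. */
    List<Rec> drainReady() {
        List<Rec> ready = new ArrayList<>();
        while (true) {
            Deque<Rec> oldest = null;
            for (Deque<Rec> queue : buffers.values()) {
                if (queue.isEmpty()) {
                    return ready; // wait for the next fetch before going any further
                }
                if (oldest == null || queue.peekFirst().ts < oldest.peekFirst().ts) {
                    oldest = queue;
                }
            }
            if (oldest == null) {
                return ready; // no input topics configured
            }
            ready.add(oldest.pollFirst());
        }
    }
}
```

Consider the three-topic example from this thread: a join on T1 at time 1, a settings-confirm on T2 at time 2, and a settings-update on T3 at time 3. Nothing is released until T2 has data. After that, only T1@1 is released, because T1 could still deliver something older than T2@2. The cost of this strictness is that processing stalls whenever one input is idle. As described in this thread, Kafka Streams does not wait like this; it processes whatever it has fetched.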
>
> This is the underlying problem for KStream-KTable joins, too. It might
> happen that we get 100 KTable records that we all process before we
> receive 100 KStream records. For the correct result it might be required
> to get 50 KTable and 50 KStream records in the first poll call and the
> rest in the second. But we don't know, and just process whatever we get.
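Matthias's point can be reproduced with a toy model that assumes a single key and an inner join. The `NaiveJoin` class and its `Event` type are hypothetical and written in plain Java, not the Kafka Streams API. Table records overwrite a map entry as they are processed. Each stream record is joined against whatever the map holds at that moment. Feeding the same three records in different arrival orders yields different output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a KStream-KTable join that processes records strictly in arrival order. */
final class NaiveJoin {
    /** An input event: either a table update or a stream record. */
    static final class Event {
        final boolean isTable;
        final String key;
        final String value;

        Event(boolean isTable, String key, String value) {
            this.isTable = isTable;
            this.key = key;
            this.value = value;
        }
    }

    /** Joins each stream record with the table's current value for its key. */
    static List<String> run(List<Event> arrivalOrder) {
        Map<String, String> table = new HashMap<>();
        List<String> output = new ArrayList<>();
        for (Event e : arrivalOrder) {
            if (e.isTable) {
                table.put(e.key, e.value); // a table update overwrites the previous value
            } else {
                String match = table.get(e.key);
                if (match != null) { // inner join: no table entry, no output
                    output.add(e.value + "+" + match);
                }
            }
        }
        return output;
    }
}
```

The test uses three records: table update `v1` at time 1, stream record `s1` at time 2, and table update `v2` at time 3.

- The time-ordered run emits `s1+v1`.
- A run that processes both table records first emits `s1+v2`.
- A run that sees the stream record first emits nothing.

This is one plausible way a replay, where every topic is already full, can diverge from the original live run.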
>
>
> -Matthias
>
>
> On 1/16/18 7:14 PM, Dmitry Minkovsky wrote:
> > I meant “Thanks, yes I will try replacing...”
> >
> > On Tue, Jan 16, 2018 at 22:12, Dmitry Minkovsky <dminkovsky@gmail.com> wrote:
> >
> >> Thanks, yes try replacing the KStream-KTable joins with
> >> KStream#transform()s and a store. Not sure what you mean by saying I'd
> >> need to buffer multiple records. The KStream has incoming events, and
> >> #transform() will let me mount the store and use it how I please. Within
> >> an application instance, any other KStream#transform()s using the same
> >> store will see the same data in real time.
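Dmitry's reasoning about a shared store can be modeled in plain Java. In Kafka Streams, the store would be registered with `StreamsBuilder#addStateStore` and connected to each `KStream#transform()` call by store name. In the sketch below, a `HashMap` stands in for that `KeyValueStore`. The `User` fields, the handler names, and the rule that an update requires a prior confirm are all hypothetical.

The sketch shows two things:

- **What holds:** a write by one handler is visible to the next handler immediately, with no round trip through the `USERS` topic.
- **What Matthias questions:** if records from different topics arrive in an unexpected order, a handler sees a state it cannot act on yet.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Plain-Java model of several transform() handlers sharing one store. The HashMap stands
 * in for a KeyValueStore; the User fields and handler names are hypothetical.
 */
final class SharedStoreHandlers {
    static final class User {
        final boolean confirmed;
        final String settings;

        User(boolean confirmed, String settings) {
            this.confirmed = confirmed;
            this.settings = settings;
        }
    }

    private final Map<String, User> store = new HashMap<>();

    /** join request: creates the user if it does not exist yet. */
    void onJoin(String userId) {
        store.putIfAbsent(userId, new User(false, "default"));
    }

    /** settings-confirm: the write is visible to the next handler immediately. */
    boolean onConfirm(String userId) {
        User u = store.get(userId);
        if (u == null) {
            return false; // arrived before the join: a real implementation would buffer it
        }
        store.put(userId, new User(true, u.settings));
        return true;
    }

    /** settings-update: in this toy model, only applied to confirmed users. */
    boolean onUpdate(String userId, String newSettings) {
        User u = store.get(userId);
        if (u == null || !u.confirmed) {
            return false; // arrived before the confirm: a real implementation would buffer it
        }
        store.put(userId, new User(true, newSettings));
        return true;
    }

    User get(String userId) {
        return store.get(userId);
    }
}
```

Processing join, confirm, and update in that order applies all three. Processing an update before its confirm is rejected, which is exactly the case that would need buffering.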
> >>
> >> Now suppose I have three topics, each with events like this, each on
> >> their own KStream:
> >>
> >> T1 join
> >> T2 settings-confirm
> >> T3 settings-update
> >>
> >> Will the topology call the join transform before the settings-confirm
> >> transform before the settings-update transform?
> >>
> >>
> >>
> >> On Tue, Jan 16, 2018 at 21:39, Matthias J. Sax <matthias@confluent.io> wrote:
> >>
> >>> You have more flexibility, of course, and thus can get better results.
> >>> But your code must be able to buffer multiple records from the KTable
> >>> and KStream input and also store the corresponding timestamps to
> >>> perform the join correctly. It's not trivial, but also not rocket science.
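Below is a toy version of what Matthias describes, under these assumptions:

- There is a single table input and a single stream input.
- Each input delivers its own records in timestamp order.
- The `TimestampedJoin` class and its methods are hypothetical, written in plain Java rather than the Kafka Streams API.

Table updates are kept as timestamped versions per key, and stream records are buffered with their timestamps. Once the table side has reached a stream record's timestamp, that record is joined against the table version valid at its own timestamp. Feeding it the same records in different arrival orders gives the same result each time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy timestamp-aware stream-table join that buffers both sides with their timestamps. */
final class TimestampedJoin {
    /** A buffered stream-side record. */
    private static final class Pending {
        final String key;
        final String value;
        final long ts;

        Pending(String key, String value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    // per key: table value versions indexed by timestamp (never pruned in this sketch)
    private final Map<String, TreeMap<Long, String>> tableHistory = new HashMap<>();
    private final List<Pending> pending = new ArrayList<>();
    private final List<String> output = new ArrayList<>();
    private long tableTime = Long.MIN_VALUE; // highest table timestamp seen so far

    void onTableRecord(String key, String value, long ts) {
        tableHistory.computeIfAbsent(key, k -> new TreeMap<>()).put(ts, value);
        tableTime = Math.max(tableTime, ts);
        flush();
    }

    void onStreamRecord(String key, String value, long ts) {
        pending.add(new Pending(key, value, ts));
        flush();
    }

    /** Joins every buffered stream record whose timestamp the table side has reached. */
    private void flush() {
        pending.sort((a, b) -> Long.compare(a.ts, b.ts));
        Iterator<Pending> it = pending.iterator();
        while (it.hasNext()) {
            Pending p = it.next();
            if (p.ts > tableTime) {
                break; // the table side might still deliver an update older than this record
            }
            TreeMap<Long, String> versions = tableHistory.get(p.key);
            Map.Entry<Long, String> asOf = versions == null ? null : versions.floorEntry(p.ts);
            if (asOf != null) { // inner join: no version at that time, no output
                output.add(p.value + "+" + asOf.getValue());
            }
            it.remove();
        }
    }

    List<String> output() {
        return output;
    }
}
```

The price is that stream records wait until the table side catches up, so a quiet table input would hold them indefinitely. A real implementation would also need some timeout, for example a punctuation scheduled via `ProcessorContext#schedule`, and would have to prune old versions.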
> >>>
> >>> If you need stronger guarantees, it's the best approach to follow for
> >>> now, until we have addressed those issues (planned for the 1.2.0 release).
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 1/16/18 5:34 PM, Dmitry Minkovsky wrote:
> >>>> Right now I am thinking of rewriting anything that has these problematic
> >>>> KStream/KTable joins as a KStream#transform() wherein the state store is
> >>>> used manually. Does that make sense as an option for me?
> >>>>
> >>>> -Dmitry
> >>>>
> >>>> On Tue, Jan 16, 2018 at 6:08 PM, Dmitry Minkovsky <dminkovsky@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Earlier today I posted this question to SO
> >>>>> <https://stackoverflow.com/questions/48287840/kafka-streams-topology-does-not-replay-correctly>:
> >>>>>
> >>>>>> I have a topology that looks like this:
> >>>>>
> >>>>>     // users: KTable materialized from the USERS topic
> >>>>>     KTable<ByteString, User> users = topology.table(USERS,
> >>>>>             Consumed.with(byteStringSerde, userSerde), Materialized.as(USERS));
> >>>>>
> >>>>>     // join requests create new users; to() returns void, so this chain
> >>>>>     // is not assigned to a KStream variable
> >>>>>     topology.stream(JOIN_REQUESTS, Consumed.with(byteStringSerde, joinRequestSerde))
> >>>>>         .mapValues(entityTopologyProcessor::userNew)
> >>>>>         .to(USERS, Produced.with(byteStringSerde, userSerde));
> >>>>>
> >>>>>     // settings-confirm requests are joined with the current user and written
> >>>>>     // back to USERS; the users KTable only sees the result once it is read back
> >>>>>     topology.stream(SETTINGS_CONFIRM_REQUESTS,
> >>>>>             Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
> >>>>>         .join(users, entityTopologyProcessor::userSettingsConfirm,
> >>>>>             Joined.with(byteStringSerde, settingsConfirmRequestSerde, userSerde))
> >>>>>         .to(USERS, Produced.with(byteStringSerde, userSerde));
> >>>>>
> >>>>>     // settings-update requests are joined with the current user and written back to USERS
> >>>>>     topology.stream(SETTINGS_UPDATE_REQUESTS,
> >>>>>             Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
> >>>>>         .join(users, entityTopologyProcessor::userSettingsUpdate,
> >>>>>             Joined.with(byteStringSerde, settingsUpdateRequestSerde, userSerde))
> >>>>>         .to(USERS, Produced.with(byteStringSerde, userSerde));
> >>>>>
> >>>>>> At runtime this topology works fine. Users are created with join
> >>>>>> requests. They confirm their settings with settings confirm requests.
> >>>>>> They update their settings with settings update requests.
> >>>>>>
> >>>>>> However, reprocessing this topology does not produce the original
> >>>>>> results. Specifically, the settings update joiner does not see the user
> >>>>>> that resulted from the settings confirm joiner, even though, in terms of
> >>>>>> timestamps, many seconds elapse between the time the user is created,
> >>>>>> the time the user is confirmed, and the time the user updates their
> >>>>>> settings.
> >>>>>>
> >>>>>> I'm at a loss. I've tried turning off caching/logging on the user
> >>>>>> table. No idea what to do to make this reprocess properly.
> >>>>>
> >>>>> ----
> >>>>>
> >>>>> The response by Matthias, also on SO:
> >>>>>
> >>>>>> A KStream-KTable join is not 100% deterministic (and might never become
> >>>>>> 100% deterministic). We are aware of the problem and are discussing
> >>>>>> solutions to at least mitigate the issue.
> >>>>>>
> >>>>>> One problem is that, if a Consumer fetches from the brokers, we cannot
> >>>>>> easily control for which topics and/or partitions the broker returns
> >>>>>> data. And depending on the order in which we receive data from the
> >>>>>> broker, the result might differ slightly.
> >>>>>>
> >>>>>> One related issue: https://issues.apache.org/jira/browse/KAFKA-3514
> >>>>>>
> >>>>>> This blog post might help, too:
> >>>>>> https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/
> >>>>>
> >>>>> ----
> >>>>>
> >>>>> I don't really know what to do with this response. I have been aware
> >>>>> for some time now of some "slight" discrepancy that might occur in edge
> >>>>> cases with KStream-KTable joins, but what I'm seeing is not a slight
> >>>>> discrepancy but very different results.
> >>>>>
> >>>>> I looked at the JIRA Matthias linked
> >>>>> <https://issues.apache.org/jira/browse/KAFKA-3514>. However, my data
> >>>>> has no late-arriving records. I don't know about the empty buffers. I
> >>>>> have read the blog post he linked several times already.
> >>>>>
> >>>>> Can someone please suggest how I may obviate this problem? For example:
> >>>>>
> >>>>>    - Would it make sense for me to try launching the topology with
> >>>>>      fewer threads during the reprocess?
> >>>>>    - Would it make sense to launch the topology with fewer input tasks?
> >>>>>    - Would it make sense to increase the size of the stream buffer?
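For reference, the first and third questions correspond to the Kafka Streams configs `num.stream.threads` and `buffered.records.per.partition`. The number of tasks is not a config; it follows from the number of partitions of the input topics. The sketch below only shows where those two settings would go. The application id, broker address, and numeric values are placeholders, and nothing in this thread indicates that either setting makes the replay deterministic.

```java
import java.util.Properties;

/** Builds Streams properties for a replay run; every value here is a placeholder. */
final class ReplayConfig {
    static Properties replayProps() {
        Properties props = new Properties();
        props.put("application.id", "my-app-replay");     // hypothetical application id
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        // a single StreamThread processes all tasks assigned to this instance
        props.put("num.stream.threads", "1");
        // maximum number of records buffered per input partition (default 1000)
        props.put("buffered.records.per.partition", "10000");
        return props;
    }
}
```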
> >>>>>
> >>>>> I am at a total loss at this point. I cannot believe that there is
> >>>>> nothing I can do to replay this data and perform the migration I am
> >>>>> trying to perform in order to release the next version of my
> >>>>> application. Am I totally screwed?
> >>>>>
> >>>>>
> >>>>> Thank you,
> >>>>> Dmitry
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >
>
>
