kafka-users mailing list archives

From Dmitry Minkovsky <dminkov...@gmail.com>
Subject Re: Kafka Streams topology does not replay correctly
Date Wed, 17 Jan 2018 03:14:18 GMT
I meant “Thanks, yes I will try replacing...”

On Tue, Jan 16, 2018 at 22:12, Dmitry Minkovsky <dminkovsky@gmail.com> wrote:

> Thanks, yes try replacing the KStream-KTable joins with
> KStream#transform()s and a store. Not sure what you mean about needing to
> buffer multiple records. The KStream has incoming events, and #transform() will
> let me mount the store and use it how I please. Within an application
> instance, any other KStream#transform()s using the same store will see the
> same data in real time.
>
> Now suppose I have three topics, each with events like this, each on their
> own KStream:
>
> T1 join
> T2 settings-confirm
> T3 settings-update
>
> Will the topology call the join transform before the settings-confirm
> transform before the settings-update transform?
>
>
>
> On Tue, Jan 16, 2018 at 21:39, Matthias J. Sax <matthias@confluent.io> wrote:
>
>> You have more flexibility of course and thus can get better results. But
>> your code must be able to buffer multiple records from the KTable and
>> KStream input and also store the corresponding timestamps to perform the
>> join correctly. It's not trivial, but it's also not rocket science.
>>
>> If you need stronger guarantees, it's the best approach to follow at the moment,
>> until we have addressed those issues. Planned for 1.2.0 release.
>>
>> -Matthias
>>
>>
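The buffering described above can be illustrated with a minimal plain-Java sketch. It is not Kafka Streams code and not necessarily what Matthias has in mind: `BufferedJoinSketch`, `Rec`, `add`, and `drain` are hypothetical names, and the `upTo` bound assumes the caller somehow knows that no older record can still arrive, which is the hard part in practice. Records from both inputs are buffered with their timestamps and applied oldest first, so a table update that arrives late but carries an earlier timestamp is still visible to the stream record that follows it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

final class BufferedJoinSketch {
    /** A buffered input record; fromTable marks the table (USERS-like) side. */
    static final class Rec {
        final long timestamp;
        final boolean fromTable;
        final String key;
        final String value;

        Rec(long timestamp, boolean fromTable, String key, String value) {
            this.timestamp = timestamp;
            this.fromTable = fromTable;
            this.key = key;
            this.value = value;
        }
    }

    // Buffer ordered by timestamp; on equal timestamps, table updates go first.
    private final PriorityQueue<Rec> buffer = new PriorityQueue<>((a, b) -> {
        int byTime = Long.compare(a.timestamp, b.timestamp);
        return byTime != 0 ? byTime : Boolean.compare(b.fromTable, a.fromTable);
    });

    // Stand-in for the state store holding the latest value per key.
    private final Map<String, String> table = new HashMap<>();

    /** Buffers a record from either input, in whatever order it arrives. */
    void add(Rec r) {
        buffer.add(r);
    }

    /** Processes every buffered record with timestamp <= upTo, oldest first. */
    List<String> drain(long upTo) {
        List<String> out = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp <= upTo) {
            Rec r = buffer.poll();
            if (r.fromTable) {
                table.put(r.key, r.value);                  // table update
            } else if (table.containsKey(r.key)) {
                out.add(r.value + "+" + table.get(r.key));  // inner join hit
            }
            // A stream record without a table entry is dropped (inner join).
        }
        return out;
    }

    public static void main(String[] args) {
        BufferedJoinSketch join = new BufferedJoinSketch();
        join.add(new Rec(20L, false, "u1", "settings-confirm")); // arrives first
        join.add(new Rec(10L, true, "u1", "user-created"));      // arrives later, but is older
        System.out.println(join.drain(30L)); // [settings-confirm+user-created]
    }
}
```

In a real topology the map would presumably be a state store attached to the transformer, and choosing when it is safe to drain is a design decision that this sketch leaves to the caller.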
>> On 1/16/18 5:34 PM, Dmitry Minkovsky wrote:
>> > Right now I am thinking of rewriting anything that has these problematic
>> > KStream/KTable joins as KStream#transform() wherein the state store is
>> > manually used. Does that make sense as an option for me?
>> >
>> > -Dmitry
>> >
>> > On Tue, Jan 16, 2018 at 6:08 PM, Dmitry Minkovsky <dminkovsky@gmail.com> wrote:
>> >
>> >> Earlier today I posted this question to SO
>> >> <https://stackoverflow.com/questions/48287840/kafka-streams-topology-does-not-replay-correctly>:
>> >>
>> >>> I have a topology that looks like this:
>> >>
>> >>     KTable<ByteString, User> users = topology.table(USERS,
>> >>         Consumed.with(byteStringSerde, userSerde), Materialized.as(USERS));
>> >>
>> >>     topology.stream(JOIN_REQUESTS,
>> >>             Consumed.with(byteStringSerde, joinRequestSerde))
>> >>         .mapValues(entityTopologyProcessor::userNew)
>> >>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>> >>
>> >>     topology.stream(SETTINGS_CONFIRM_REQUESTS,
>> >>             Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
>> >>         .join(users, entityTopologyProcessor::userSettingsConfirm,
>> >>             Joined.with(byteStringSerde, settingsConfirmRequestSerde, userSerde))
>> >>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>> >>
>> >>     topology.stream(SETTINGS_UPDATE_REQUESTS,
>> >>             Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
>> >>         .join(users, entityTopologyProcessor::userSettingsUpdate,
>> >>             Joined.with(byteStringSerde, settingsUpdateRequestSerde, userSerde))
>> >>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>> >>
>> >>> At runtime this topology works fine. Users are created with join
>> >>> requests. They confirm their settings with settings confirm requests. They
>> >>> update their settings with settings update requests.
>> >>>
>> >>> However, reprocessing this topology does not produce the original
>> >>> results. Specifically, the settings update joiner does not see the user
>> >>> that resulted from the settings confirm joiner, even though in terms of
>> >>> timestamps, many seconds elapse from the time the user is created, to the
>> >>> time the user is confirmed, to the time the user updates their settings.
>> >>>
>> >>> I'm at a loss. I've tried turning off caching/logging on the user table.
>> >>> No idea what to do to make this reprocess properly.
>> >>
>> >> ----
>> >>
>> >> The response by Matthias, also on SO:
>> >>
>> >>> A KStream-KTable join is not 100% deterministic (and might never become
>> >>> 100% deterministic). We are aware of the problem and are discussing
>> >>> solutions to at least mitigate the issue.
>> >>>
>> >>> One problem is that when a consumer fetches from the brokers, we cannot
>> >>> easily control for which topics and/or partitions the broker returns data.
>> >>> Depending on the order in which we receive data from the broker, the
>> >>> result might differ slightly.
>> >>>
>> >>> One related issue: https://issues.apache.org/jira/browse/KAFKA-3514
>> >>>
>> >>> This blog post might help, too:
>> >>> https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/
>> >>
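The order dependence described in the quoted answer can be shown with a small plain-Java simulation. This is only a sketch under stated assumptions: it does not use Kafka Streams, and `ArrivalOrderJoinDemo`, `Rec`, and `join` are hypothetical names. It models an inner stream-table join that handles records strictly in the order they arrive, so the same two records yield different output depending on which one is delivered first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ArrivalOrderJoinDemo {
    /** A record from either input; fromTable marks the table (USERS-like) side. */
    static final class Rec {
        final boolean fromTable;
        final String key;
        final String value;

        Rec(boolean fromTable, String key, String value) {
            this.fromTable = fromTable;
            this.key = key;
            this.value = value;
        }
    }

    /** Inner stream-table join that handles records strictly in arrival order. */
    static List<String> join(List<Rec> arrivals) {
        Map<String, String> table = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Rec r : arrivals) {
            if (r.fromTable) {
                table.put(r.key, r.value);                  // table update
            } else if (table.containsKey(r.key)) {
                out.add(r.value + "+" + table.get(r.key));  // join hit
            }
            // A stream record that finds no table entry is dropped.
        }
        return out;
    }

    public static void main(String[] args) {
        Rec user = new Rec(true, "u1", "user-created");
        Rec confirm = new Rec(false, "u1", "settings-confirm");
        // Table update delivered first: the join sees the user.
        System.out.println(join(Arrays.asList(user, confirm))); // [settings-confirm+user-created]
        // Stream record delivered first: the join finds nothing and emits nothing.
        System.out.println(join(Arrays.asList(confirm, user))); // []
    }
}
```

In a topology where join results are written back to the table's own topic, a single dropped join also means every later update for that key starts from stale or missing state, which may be why the difference on replay looks large rather than slight.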
>> >> ----
>> >>
>> >> I don't really know what to do with this response. I have been aware of
>> >> some "slight" discrepancy that might occur in edge cases with
>> >> KStream-KTable joins for some time now, but what I'm seeing is not a slight
>> >> discrepancy but very different results.
>> >>
>> >> I looked at the JIRA Matthias linked
>> >> <https://issues.apache.org/jira/browse/KAFKA-3514>. However, my data has
>> >> no late-arriving records. I don't know about the empty buffers. I have read
>> >> the blog post he linked several times already.
>> >>
>> >> Can someone please suggest how I may obviate this problem? For example:
>> >>
>> >>    - Would it make sense for me to try launching the topology with fewer
>> >>    threads during the reprocess?
>> >>    - Would it make sense to launch the topology with fewer input tasks?
>> >>    - Would it make sense to increase the size of the stream buffer?
>> >>
>> >> I am at a total loss at this point. I cannot believe that there is nothing
>> >> I can do to replay this data and perform the migration I am trying to
>> >> perform, in order to release a next version of my application. Am I totally
>> >> screwed?
>> >>
>> >>
>> >> Thank you,
>> >> Dmitry
>> >>
>> >>
>> >>
>> >>
>> >
>>
>>
