kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Kafka Streams topology does not replay correctly
Date Wed, 17 Jan 2018 02:39:06 GMT
You have more flexibility of course and thus can get better results. But
your code must be able to buffer multiple records from the KTable and
KStream input and also store the corresponding timestamps to perform the
join correctly. It's not trivial, but it's also not rocket science.
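
To make the buffering idea concrete, here is a minimal sketch in plain
Java that is not tied to the Kafka Streams API. `TimestampOrderedJoin`,
`Rec`, and `join` are hypothetical names made up for illustration, and the
tie-break (table updates before stream records on equal timestamps) is an
assumption. It buffers records from both inputs together with their
timestamps and then applies them in timestamp order, so the join result no
longer depends on the order in which the records arrived.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical illustration only: none of these names are Kafka Streams APIs.
public class TimestampOrderedJoin {

    // One buffered input record; fromTable marks KTable-side updates.
    static final class Rec {
        final boolean fromTable;
        final String key;
        final String value;
        final long timestamp;

        Rec(boolean fromTable, String key, String value, long timestamp) {
            this.fromTable = fromTable;
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Buffers all records, then replays them in timestamp order.
    // Assumption: on equal timestamps, table updates are applied first.
    static List<String> join(List<Rec> arrivals) {
        PriorityQueue<Rec> buffer = new PriorityQueue<>(
            Comparator.comparingLong((Rec r) -> r.timestamp)
                      .thenComparingInt((Rec r) -> r.fromTable ? 0 : 1));
        buffer.addAll(arrivals);

        Map<String, String> table = new HashMap<>();  // stands in for the state store
        List<String> out = new ArrayList<>();
        while (!buffer.isEmpty()) {
            Rec r = buffer.poll();
            if (r.fromTable) {
                table.put(r.key, r.value);            // table update
            } else if (table.containsKey(r.key)) {
                // inner join: emit only if the key is already in the table
                out.add(r.key + ":" + r.value + "+" + table.get(r.key));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Rec userV1 = new Rec(true, "u1", "created", 10L);
        Rec confirm = new Rec(false, "u1", "confirm", 20L);
        Rec userV2 = new Rec(true, "u1", "confirmed", 21L);
        Rec update = new Rec(false, "u1", "update", 30L);

        // Same records, two different arrival orders.
        System.out.println(join(Arrays.asList(userV1, confirm, userV2, update)));
        System.out.println(join(Arrays.asList(update, confirm, userV2, userV1)));
    }
}
```

Both calls in `main` print the same list even though the second one feeds
the records in scrambled order. Inside a real `transform()`, the buffer
would have to live in a state store and be drained incrementally rather
than all at once; that part is not shown here.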

If you need stronger guarantees, it's the best way to go for now, though,
until we have addressed those issues. That is planned for the 1.2.0 release.

-Matthias


On 1/16/18 5:34 PM, Dmitry Minkovsky wrote:
> Right now I am thinking of re-writing anything that has these problematic
> KStream/KTable joins as KStream#transform() wherein the state store is
> manually used. Does that make sense as an option for me?
> 
> -Dmitry
> 
> On Tue, Jan 16, 2018 at 6:08 PM, Dmitry Minkovsky <dminkovsky@gmail.com>
> wrote:
> 
>> Earlier today I posted this question to SO
>> <https://stackoverflow.com/questions/48287840/kafka-streams-topology-does-not-replay-correctly>
>> :
>>
>>> I have a topology that looks like this:
>>
>>     // The USERS topic backs the users table and is also the sink of all three streams.
>>     KTable<ByteString, User> users = topology.table(USERS,
>>         Consumed.with(byteStringSerde, userSerde), Materialized.as(USERS));
>>
>>     // Join requests create new users.
>>     topology.stream(JOIN_REQUESTS, Consumed.with(byteStringSerde, joinRequestSerde))
>>         .mapValues(entityTopologyProcessor::userNew)
>>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>>
>>     // Settings confirm requests are joined against the current user.
>>     topology.stream(SETTINGS_CONFIRM_REQUESTS,
>>             Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
>>         .join(users, entityTopologyProcessor::userSettingsConfirm,
>>             Joined.with(byteStringSerde, settingsConfirmRequestSerde, userSerde))
>>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>>
>>     // Settings update requests are joined against the current user.
>>     topology.stream(SETTINGS_UPDATE_REQUESTS,
>>             Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
>>         .join(users, entityTopologyProcessor::userSettingsUpdate,
>>             Joined.with(byteStringSerde, settingsUpdateRequestSerde, userSerde))
>>         .to(USERS, Produced.with(byteStringSerde, userSerde));
>>
>>> At runtime this topology works fine. Users are created with join
>> requests. They confirm their settings with settings confirm requests. They
>> update their settings with settings update requests.
>>>
>>> However, reprocessing this topology does not produce the original
>> results. Specifically, the settings update joiner does not see the user
>> that resulted from the settings confirm joiner, even though in terms of
>> timestamps, many seconds elapse from the time the user is created, to the
>> time the user is confirmed to the time the user updates their settings.
>>>
>>> I'm at a loss. I've tried turning off caching/logging on the user table.
>> No idea what to do to make this reprocess properly.
>>
>> ----
>>
>> The response by Matthias, also on SO:
>>
>>> A KStream-KTable join is not 100% deterministic (and might never become
>> 100% deterministic). We are aware of the problem and are discussing
>> solutions to at least mitigate the issue.
>>>
>>> One problem is that when a consumer fetches from the brokers, we cannot
>> easily control for which topics and/or partitions the broker returns data.
>> Depending on the order in which we receive data from the broker, the
>> result might differ slightly.
>>>
>>> One related issue: https://issues.apache.org/jira/browse/KAFKA-3514
>>>
>>> This blog post might help, too:
>> https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/
>>
>> ----
>>
>> I don't really know what to do with this response. I have been aware of
>> some "slight" discrepancy that might occur in edge cases with
>> KStream-KTable joins for some time now, but what I'm seeing is not a slight
>> discrepancy but very different results.
>>
>> I looked at the JIRA Matthias linked
>> <https://issues.apache.org/jira/browse/KAFKA-3514>. However, my data has
>> no late arriving records. I don't know about the empty buffers. I have read
>> the blog post he linked several times already.
>>
>> Can someone please suggest how I may obviate this problem? For example:
>>
>>    - Would it make sense for me to try launching the topology with fewer
>>    threads during the reprocess?
>>    - Would it make sense for me to launch the topology with fewer input tasks?
>>    - Would it make sense to increase the size of the stream buffer?
>>
>> I am at a total loss at this point. I cannot believe that there is nothing
>> I can do to replay this data and perform the migration I am trying to
>> perform, in order to release a next version of my application. Am I totally
>> screwed?
>>
>>
>> Thank you,
>> Dmitry
>>
>>
>>
>>
> 

