kafka-users mailing list archives

From Boris Sukhinin <b.sukhi...@gmail.com>
Subject Re: Event-time skew when processing messages from different partitions
Date Fri, 04 Dec 2020 13:48:23 GMT
Matthias, thanks a lot for your reply and for the blog post. It took some time for me to wrap
my head around the Kafka Streams "continuous refinement" approach and stop trying to mimic
what I had in Flink.

Regards,
Boris.

On 2020/11/16 19:28:58, "Matthias J. Sax" <mjsax@apache.org> wrote: 
> I guess you mean `map()` not `mapValues()` in your example, because a
> `mapValues()` would not result in data repartitioning.
> 
> > Should it be a real concern, especially when processing large amounts of
> > historical data?
> 
> It could be.
> 
> > Increasing the grace period doesn't seem like a valid option because it
> > would delay emitting the final aggregation result.
> 
> Well, this is the trade-off you face, and there is no way around it.
> Also, if you process historic data, latency is usually not the biggest
> concern. And for the "live" use case, when you process from the tail of
> the input topics, the out-of-order skew should be bounded.
> 
> > Apache Flink handles this by emitting the lowest watermark when merging partitions.
> 
> Yes, but Flink would also emit the final result with larger latency,
> because the watermark would arrive later if an upstream task lags. I
> guess it's fair to say that Flink would be more "adaptive" here, and if
> there is no lag it would close windows earlier than Kafka Streams. -- Thus, for
> Kafka Streams you might want to use a larger grace period if you
> process historical data and a shorter grace period for the "live" case.
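The min-watermark merge described above can be sketched in a few lines. This is a
simulation of the assumed semantics, not actual Flink code: when partitions merge,
the downstream watermark is the minimum of the upstream watermarks, so a single
lagging partition holds every open window (more safety, more latency).

```java
import java.util.Arrays;

// Sketch of a min-watermark merge (assumed semantics, not Flink's implementation).
// One lagging upstream partition keeps the merged watermark low, delaying window closure.
public class WatermarkMerge {
    static long merged(long... perPartitionWatermarks) {
        return Arrays.stream(perPartitionWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Task 0 has reached event time 100s; task 1 lags at 40s (all values in ms).
        long wm = merged(100_000, 40_000);
        System.out.println(wm); // 40000: a window ending at 60s is not closed yet
    }
}
```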
> 
> > Does Kafka Streams offer something to deal with this scenario?
> 
> Not at the moment. As said above, you could set different grace periods
> for different scenarios.
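The dropping behavior under discussion can also be simulated without Kafka. The
following is a minimal, self-contained sketch (hypothetical names, not the actual
Kafka Streams implementation): a record whose window has already passed
window end + grace relative to the observed stream time is discarded.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified simulation of windowed aggregation with a grace period
// (illustrative only; Kafka Streams' internal bookkeeping differs).
public class GraceDemo {
    static final long WINDOW_MS = 60_000, GRACE_MS = 10_000; // 60s window, 10s grace
    long streamTime = Long.MIN_VALUE;            // highest event time observed so far
    Map<Long, Long> counts = new HashMap<>();    // window start -> record count

    /** Returns false if the record arrives after its window end + grace. */
    boolean process(long eventTimeMs) {
        streamTime = Math.max(streamTime, eventTimeMs);
        long windowStart = (eventTimeMs / WINDOW_MS) * WINDOW_MS;
        long windowEnd = windowStart + WINDOW_MS;
        if (windowEnd + GRACE_MS <= streamTime) return false; // too late: dropped
        counts.merge(windowStart, 1L, Long::sum);
        return true;
    }

    public static void main(String[] args) {
        GraceDemo demo = new GraceDemo();
        System.out.println(demo.process(1_000));  // true: lands in window [0, 60s)
        System.out.println(demo.process(75_000)); // true: advances stream time to 75s
        System.out.println(demo.process(5_000));  // false: 60s end + 10s grace < 75s
    }
}
```

With a skew of 15s between tasks, the third record is already past the grace
period; widening the grace beyond the expected skew keeps it, at the cost of a
later final result.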
> 
> 
> Shameless plug: I recently discussed this topic in a podcast:
> https://developer.confluent.io/podcast/why-kafka-streams-does-not-use-watermarks-ft-matthias-j-sax
> 
> 
> Hope this helps.
> 
> -Matthias
> 
> 
> On 11/16/20 9:38 AM, Boris Sukhinin wrote:
> > Let me clarify the question a little bit.
> > 
> > Consider the following code:
> > stream
> >   .mapValues(...)
> >   .groupBy(...)
> >   .windowedBy(TimeWindows.of(Duration.ofSeconds(60)).grace(Duration.ofSeconds(10)))
> >   .aggregate(...)
> > 
> > I assume the mapValues() operation could be slow for some tasks for whatever
> > reason, and because of that, tasks process messages at a different pace. When
> > a shuffle happens at the aggregate() operator, task 0 could have processed
> > messages up to time T while task 1 is still at (T - skew), but messages from
> > both tasks end up interleaved in a single partition of the internal topic
> > (corresponding to the grouping key).
> > 
> > My concern is that when the skew is large enough (more than 10 seconds in my
> > example), messages from the lagging task 1 will be dropped.
> > 
> > Regards,
> > Boris
> > 
> > On 2020/11/16 10:33:10, Boris Sukhinin <b.sukhinin@gmail.com> wrote: 
> >> Hi All,
> >>
> >> Let's consider a topic with multiple partitions and messages written in
> >> event-time order without any particular partitioning scheme. A Kafka Streams
> >> application does some transformations on these messages, then groups by some
> >> key, and then aggregates messages by an event-time window with the given
> >> grace period.
> >>
> >> Each task could process incoming messages at a different speed (e.g.,
> >> because they run on servers with different performance characteristics).
> >> This means that after the groupBy shuffle, event-time ordering will not be
> >> preserved between messages in the same partition of the internal topic when
> >> they originate from different tasks. After a while, this event-time skew
> >> could become larger than the aggregation window size plus the grace period,
> >> which would lead to dropping messages originating from the lagging task.
> >>
> >> Should it be a real concern, especially when processing large amounts of
> >> historical data? Increasing the grace period doesn't seem like a valid
> >> option because it would delay emitting the final aggregation result. Apache
> >> Flink handles this by emitting the lowest watermark when merging partitions.
> >> Does Kafka Streams offer something to deal with this scenario?
> >>
> >> Regards,
> >> Boris
> >>
> 
