spark-user mailing list archives

From eiise <>
Subject [Structured Streaming] Event-Time ordering of two Kafka topics with different message volumes
Date Fri, 24 Apr 2020 11:25:39 GMT
We are currently struggling to bring order into a merge of two streaming
data sets from different Kafka sources (keyed, so we can assume an order per
partition/key) with 4 partitions each, using the Java API.
On both topics we receive messages that contain varying field subsets of our
internal schema. Currently we use just one dataset with a source reading from
both topics. The data then runs through a flatMapGroupsWithState operation.
Our state lifecycle is bound to a start / end trigger from topic A. Within
this lifecycle the state gets updated with data from topic A AND topic B.
This leads to various problems: 
- With varying loads and small start/stop intervals, one microbatch from
topic A may contain both the start and the stop trigger of a key. In that
case all messages from topic B with an event time between these triggers get
- The message volume on topic B is orders of magnitude higher than the load
on topic A, so the event times of the messages processed from the two topics
may drift out of sync. This is especially likely when bulk data reprocessing
is necessary (this use case does occur: data from HDFS is dumped into our
preprocessing pipeline, which leaves the two source topics with even more
potential to run out of sync).

To cope with this problem we are looking at three different approaches:
- Messages are ingested from MQTT and distributed to their topics. To
introduce order, at least in processing time, we could write both message
types A and B to one additional keyed topic. As we are required to also keep
the original topics for other pipelines, this would mitigate our issue at
the expense of massive data duplication.
- Merge data from both topics inside Spark: this would mean either
throttling the read from one topic until the other topic has caught up to
the minimum timestamp over all partitions (if that is possible at all), or
just buffering all messages per GroupState until the timestamps are in sync
and then ordering / flushing the lot.
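
For the buffering variant, the idea could be sketched roughly as follows.
This is plain Java with no Spark dependency, and MergeBuffer, Event, add and
flush are hypothetical names, not an existing API. It assumes that per key
the events of each topic arrive in event-time order (as stated above), that
an object like this would be what is kept in the GroupState, and that events
are only released up to the newest timestamp seen on the slower topic.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical per-key buffer; in Spark this would be the value held in GroupState. */
class MergeBuffer {

    /** Minimal event: source topic ("A" or "B"), event time in ms, opaque payload. */
    static final class Event {
        final String topic;
        final long eventTime;
        final String payload;

        Event(String topic, long eventTime, String payload) {
            this.topic = topic;
            this.eventTime = eventTime;
            this.payload = payload;
        }
    }

    // Buffered events, smallest event time first.
    private final PriorityQueue<Event> pending =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.eventTime));

    // Newest event time seen so far on each topic for this key.
    private long maxSeenA = Long.MIN_VALUE;
    private long maxSeenB = Long.MIN_VALUE;

    /** Buffers one event and advances the high-water mark of its topic. */
    void add(Event e) {
        pending.add(e);
        if ("A".equals(e.topic)) {
            maxSeenA = Math.max(maxSeenA, e.eventTime);
        } else {
            maxSeenB = Math.max(maxSeenB, e.eventTime);
        }
    }

    /**
     * Releases, in event-time order, every buffered event that is not newer
     * than the slower topic. Anything newer stays buffered, because the
     * slower topic may still deliver events that belong before it.
     */
    List<Event> flush() {
        long safe = Math.min(maxSeenA, maxSeenB);
        List<Event> out = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek().eventTime <= safe) {
            out.add(pending.poll());
        }
        return out;
    }
}
```

With this, a microbatch that delivers both the start and the stop trigger
from topic A would release nothing until topic B has produced events for the
same key. The obvious open points are the size of the buffer during bulk
reprocessing and keys for which one topic never delivers anything, which
would need some kind of timeout to flush.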

As of now we are pretty stumped on how to implement the latter options and
would greatly appreciate any opinions on feasibility of each approach, if it
is possible at all or if there is some kind of silver bullet to solve our
problem that we just don't see. 
If you need any more information I will try my best to provide it.

