spark-user mailing list archives

Subject How to prevent and track data loss/drops due to watermark during Structured Streaming aggregation
Date Sat, 18 Jan 2020 08:57:27 GMT
We have a scenario where we group raw records by correlation id every 3 minutes
and append the grouped result to an HDFS store. Below is an example of our query:

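    import org.apache.spark.sql.functions.{col, collect_list, window}
    import spark.implicits._  // `spark` is the active SparkSession; enables $"..."

    val df = spark.readStream.format("SomeDataSource")
      .load()  // readStream needs load() to produce a streaming DataFrame
      // CurrentTime is the time the record is read, not an event time from the data
      .selectExpr("current_timestamp() as CurrentTime", "*")
      .withWatermark("CurrentTime", "2 minute")
      .groupBy(window($"CurrentTime", "3 minute"), $"CorrelationId")
      .agg(collect_list(col("data")) as "Records")
      .repartition(100, $"CorrelationId")
      .select($"CorrelationId", $"Records")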

We want to include delayed data even if there is a processing delay in the
pipeline, and we have an SLA of 5 minutes: once any record is read into
Spark, we want to see the grouped output flushed to HDFS within 5 minutes.

So, let's say that during the shuffle stage (groupBy) or the write stage we
have a delay of 5 to 10 minutes. Will we lose data because of the 2-minute
watermark? (Sometimes it is OK to break the SLA, but we cannot afford data
loss.) If so, how can we prevent the data loss, or track the amount of data
being dropped in this case?
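
A toy model of the late-data rule may help frame the question. The sketch below is plain Scala with no Spark dependency, and every name in it (WatermarkDropModel, countDropped, windowEnd) is hypothetical. It assumes that a row is discarded by the windowed aggregation when the end of its window is at or below the current watermark, and that the watermark for a batch is derived only from timestamps seen in earlier batches. It is a simplified model under those assumptions, not Spark's implementation.

```scala
// Toy model of watermark-based dropping; not Spark code.
object WatermarkDropModel {
  // End (in seconds) of the tumbling window containing tsSec (tsSec >= 0).
  def windowEnd(tsSec: Long, windowSec: Long): Long =
    (tsSec / windowSec + 1) * windowSec

  // eventTimesPerBatch: for each micro-batch, the timestamp (seconds)
  // carried by each row. Returns how many rows the model would drop.
  def countDropped(eventTimesPerBatch: Seq[Seq[Long]],
                   windowSec: Long,
                   delaySec: Long): Int = {
    var watermark = 0L            // assumed starting watermark
    var maxSeen = Long.MinValue   // largest timestamp seen so far
    var dropped = 0
    for (batch <- eventTimesPerBatch) {
      // A row is late if its whole window is already behind the watermark.
      dropped += batch.count(ts => windowEnd(ts, windowSec) <= watermark)
      if (batch.nonEmpty) maxSeen = math.max(maxSeen, batch.max)
      // The watermark used by the next batch; it never moves backwards.
      if (maxSeen != Long.MinValue)
        watermark = math.max(watermark, maxSeen - delaySec)
    }
    dropped
  }
}
```

In this model, timestamps that never go backwards from one batch to the next (as when they are stamped at read time) produce no drops, however large the gap between batches; only a row whose timestamp is far behind what earlier batches carried is discarded. Whether the real query behaves this way depends on how current_timestamp() is evaluated per micro-batch, which should be verified against the Spark version in use.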

Note that extending the watermark to a longer delay won't work in our append
scenario, because aggregated data is not output to the write stage until the
watermark passes the end of the window.
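
To make that timing concrete, here is a minimal sketch of the arithmetic in plain Scala (no Spark dependency). It assumes the usual append-mode rule that a tumbling window can be emitted once the watermark, taken as the maximum timestamp seen minus the watermark delay, reaches the window end. The object and method names are hypothetical, and this is a simplified model rather than Spark's implementation.

```scala
// Toy model of append-mode emission timing; names are illustrative only.
object AppendTiming {
  // End (in seconds) of the tumbling window containing eventTimeSec (>= 0).
  def windowEnd(eventTimeSec: Long, windowSec: Long): Long =
    (eventTimeSec / windowSec + 1) * windowSec

  // Smallest "max timestamp seen" at which watermark = max - delay
  // reaches the window end, i.e. the earliest the window can be emitted.
  def earliestEmit(eventTimeSec: Long, windowSec: Long, delaySec: Long): Long =
    windowEnd(eventTimeSec, windowSec) + delaySec

  // Worst-case wait for a record that lands exactly at the window start.
  def worstCaseWait(windowSec: Long, delaySec: Long): Long =
    windowSec + delaySec
}
```

With the 3-minute window and 2-minute watermark from the query, AppendTiming.worstCaseWait(180, 120) gives 300 seconds, which lines up with the 5-minute SLA. Raising the watermark delay to 10 minutes gives worstCaseWait(180, 600) = 780 seconds, or 13 minutes, before the grouped output can reach the write stage. The model ignores the extra time needed for a later micro-batch to actually run and emit the window.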


