spark-user mailing list archives

From "Bowden, Chris" <>
Subject Re: Writing record once after the watermarking interval in Spark Structured Streaming
Date Fri, 30 Mar 2018 01:17:29 GMT
The watermark is just a user-provided indicator to Spark that it is OK to drop internal state
after some period of time. The watermark "interval" does not directly dictate whether hot rows
are sent to a sink; think of a hot row as data younger than the watermark. The watermark does,
however, prevent cold rows (i.e., rows older than the watermark) from being fully processed and
sent to the sink. There is no notion of requesting that all data be queued and released only
after the watermark has advanced past the time-based groups in that queue.
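
As an illustration of that behavior, here is a minimal sketch (column names, the 1-minute
window, and the console sink are my assumptions, not from the original post): in Append output
mode, a windowed aggregate emits each window's row once, only after the watermark passes the
end of the window, whereas Update mode re-emits hot rows as they change.

```scala
import org.apache.spark.sql.functions._

// `events` is assumed to be a streaming DataFrame with an event-time
// column "time" and columns "channel" and "bytes".
val counts = events
  .withWatermark("time", "120 seconds")   // state may be dropped 120s behind max event time
  .groupBy(window($"time", "1 minute"), $"channel")
  .agg((sum("bytes") / 1000000) as "byte_count")

// Append mode: a window's row is sent to the sink only once, after the
// watermark advances past the window's end.
val query = counts.writeStream
  .outputMode("append")
  .format("console")
  .start()
```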

If you want to ensure only one row per time-based group is sent to the sink, you could get
fancy with timeouts and flatMapGroupsWithState. Keep in mind that even in this scenario, the
same row may be sent more than once if a micro-batch is reprocessed (this is why it is
important for sinks to be idempotent: delivery is at-least-once, made effectively exactly-once
by the sink).
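
A hedged sketch of that flatMapGroupsWithState approach: buffer each time-based group in
state and emit a single row when the event-time timeout fires. The case classes, column
names, and 1-minute window are illustrative assumptions.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(windowStart: Timestamp, channel: String, bytes: Long)
case class Totals(windowStart: Timestamp, channel: String, byteCount: Double)

def emitOnce(
    key: (Timestamp, String),
    rows: Iterator[Event],
    state: GroupState[Long]): Iterator[Totals] = {
  if (state.hasTimedOut) {
    // The watermark has passed the timeout: emit exactly one row, drop state.
    val total = state.get
    state.remove()
    Iterator.single(Totals(key._1, key._2, total / 1e6))
  } else {
    // Accumulate bytes for this (window, channel) group.
    state.update(state.getOption.getOrElse(0L) + rows.map(_.bytes).sum)
    // Fire once the watermark moves past the window end (start + 1 minute here).
    state.setTimeoutTimestamp(key._1.getTime + 60000L)
    Iterator.empty
  }
}

val out = events.as[Event]
  .withWatermark("windowStart", "120 seconds")
  .groupByKey(e => (e.windowStart, e.channel))
  .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.EventTimeTimeout)(emitOnce)
```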

In general, I would assume you care about this fine-grained control because your sink is not
idempotent.
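
For reference, a sketch of an idempotent ForeachWriter that upserts by a natural key
(window plus channel), so a redelivered row overwrites the same record instead of
duplicating it. The backing store API here is hypothetical.

```scala
import org.apache.spark.sql.{ForeachWriter, Row}

class UpsertWriter extends ForeachWriter[Row] {
  def open(partitionId: Long, epochId: Long): Boolean = true  // e.g., open a connection

  def process(row: Row): Unit = {
    // Natural key: the same (window, channel) always maps to the same record,
    // so at-least-once delivery becomes effectively exactly-once.
    val key = s"${row.getAs[Any]("window")}-${row.getAs[String]("channel")}"
    // store.upsert(key, row)  // hypothetical store call
  }

  def close(errorOrNull: Throwable): Unit = ()  // e.g., close the connection
}
```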


From: karthikjay <>
Sent: Thursday, March 29, 2018 5:10:09 PM
Subject: Writing record once after the watermarking interval in Spark Structured Streaming

I have the following query:

    val ds = dataFrame
      .filter(! $"requri".endsWith(".m3u8"))
      .filter(! $"bserver".contains("trimmer"))
      .withWatermark("time", "120 seconds")
      .groupBy(window($"time", "1 minute"), $"channel")  // grouping implied by the sample output below
      .agg(sum("bytes") / 1000000 as "byte_count")

How do I implement a ForeachWriter so that its process method is triggered
only once for every watermark interval? I.e., for the aforementioned
example, I would get the following:

10.00-10.01 Channel-1 100(bytes)
10.00-10.01 Channel-2 120(bytes)
10.01-10.02 Channel-1 110(bytes)


