spark-issues mailing list archives

From "Jose Torres (JIRA)" <>
Subject [jira] [Commented] (SPARK-23703) Collapse sequential watermarks
Date Thu, 03 May 2018 15:20:00 GMT


Jose Torres commented on SPARK-23703:

I'm no longer entirely convinced that this (and the parent JIRA) is the right approach. We might not
want to support these scenarios at all.

The question here is what we should do with the query:

df.withWatermark("a", …)
  .withWatermark("b", …)

What we do right now is definitely wrong. We (in MicroBatchExecution) calculate separate watermarks
for "a" and "b", take their minimum, and pass that as the watermark value to the aggregate.
But the aggregate only sees "b" as a watermarked column, because only "b" has EventTimeWatermark.delayKey
set in its attribute metadata at the aggregate node: EventTimeWatermark("b").output erases
the watermark metadata for "a".

So we need to somehow resolve this mismatch.
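To make the mismatch concrete, here is a toy model (plain Python, not Spark's actual code; all names are illustrative) of how the global watermark ends up being the minimum over both columns while the aggregate only knows about the topmost one:

```python
# Toy model of the mismatch described above (not Spark's implementation).
# MicroBatchExecution tracks one watermark per EventTimeWatermark node and
# takes the minimum, but only the topmost node's column keeps the delay
# metadata that the aggregate later inspects.

def global_watermark(max_event_times, delays):
    """Min over (max event time seen - delay) for each watermarked column."""
    return min(max_event_times[col] - delays[col] for col in delays)

# Two sequential withWatermark calls, here on columns "a" and "b",
# with hypothetical delays and observed max event times:
delays = {"a": 10, "b": 5}
max_seen = {"a": 100, "b": 50}

wm = global_watermark(max_seen, delays)  # min(100 - 10, 50 - 5) = 45

# But only the topmost node's column ("b") carries delayKey metadata,
# so the aggregate treats just "b" as watermarked, even though the
# watermark value it receives was also influenced by "a".
columns_seen_by_aggregate = {"b"}
```

The inconsistency is that `wm` is computed from both columns while the aggregate's metadata mentions only one of them.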

> Collapse sequential watermarks 
> -------------------------------
>                 Key: SPARK-23703
>                 URL:
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Jose Torres
>            Priority: Major
> When there are two sequential EventTimeWatermark nodes in a query plan, the topmost one
> overrides the column tracking metadata from its children, but leaves the nodes themselves
> untouched. When there is no intervening stateful operation to consume the watermark, we should
> remove the lower node entirely.
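A minimal sketch of the proposed collapse, in plain Python rather than Spark's Catalyst API (class and function names here are hypothetical): when an EventTimeWatermark node's direct child is another EventTimeWatermark, drop the lower node and keep the topmost one.

```python
# Hypothetical sketch of "collapse sequential watermarks" on a toy plan
# representation; this is not Spark's Catalyst rule API.

class EventTimeWatermark:
    def __init__(self, column, delay, child):
        self.column = column  # watermarked column name
        self.delay = delay    # allowed lateness
        self.child = child    # child plan node (or a leaf marker)

def collapse_watermarks(plan):
    """Collapse EventTimeWatermark(EventTimeWatermark(x)) into one node.

    The topmost node already overrides its child's metadata, so the
    lower node can be removed entirely when nothing consumes it."""
    if isinstance(plan, EventTimeWatermark) and isinstance(plan.child, EventTimeWatermark):
        return collapse_watermarks(
            EventTimeWatermark(plan.column, plan.delay, plan.child.child)
        )
    return plan

# df.withWatermark("a", ...).withWatermark("b", ...) puts "b" on top:
plan = EventTimeWatermark("b", 5, EventTimeWatermark("a", 10, "source"))
collapsed = collapse_watermarks(plan)
# collapsed is a single EventTimeWatermark on "b" directly over "source"
```

The recursion handles runs of more than two stacked watermark nodes; only the topmost survives.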

This message was sent by Atlassian JIRA

