beam-commits mailing list archives

From "Aljoscha Krettek (JIRA)" <>
Subject [jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps
Date Mon, 26 Sep 2016 12:52:20 GMT


Aljoscha Krettek commented on BEAM-644:

[~kenn] I was referring to "two clusters of elements from two separate input elements" but
that's somewhat beside the point because I was thinking about how a Kafka source would be
implemented as a combination of {{DoFn}} plus {{SplittableDoFn}}. There you need to manage
the watermark at the {{SplittableDoFn}}, which would be responsible for reading from topics.

I think we might be talking about different things here. As I said, the proposed changes are
very good in how they simplify the API of {{DoFn}} and also clean up stuff around allowed
time skew.

What I was thinking about is a general problem with watermarks. I thought that the proposal
here was meant to fix that, but I don't think we can. What I was trying to get at essentially
boils down to this: if we want our watermark to be 100% correct, then we can never advance
it, because we never know what timestamps future elements will have (in the general case,
where any data with any timestamp can arrive at any point in processing time). I was just
pondering that, and I'm afraid it derailed the discussion a bit.
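
To make the point above concrete, here is a minimal sketch in plain Java (no Beam dependency). The class {{HeuristicWatermark}}, the "max timestamp seen minus a fixed lag" rule, and the sample timestamps are all assumptions made up for illustration; they are not how any Beam runner or source computes its watermark. The sketch only shows that once a watermark is allowed to advance, an element with an earlier timestamp can still arrive afterwards.

```java
import java.util.ArrayList;
import java.util.List;

class WatermarkSketch {
    /** Hypothetical heuristic: watermark = max timestamp seen so far minus a fixed lag. */
    static final class HeuristicWatermark {
        private final long lagMillis;
        private long maxSeen = Long.MIN_VALUE;

        HeuristicWatermark(long lagMillis) {
            this.lagMillis = lagMillis;
        }

        /** Current watermark, or Long.MIN_VALUE if nothing has been observed yet. */
        long current() {
            return maxSeen == Long.MIN_VALUE ? Long.MIN_VALUE : maxSeen - lagMillis;
        }

        /** Records an element and reports whether it arrived behind the watermark. */
        boolean observe(long timestampMillis) {
            boolean late = timestampMillis < current();
            maxSeen = Math.max(maxSeen, timestampMillis);
            return late;
        }
    }

    /** Returns the timestamps that were already behind the watermark on arrival. */
    static List<Long> lateElements(long lagMillis, long[] arrivalOrder) {
        HeuristicWatermark watermark = new HeuristicWatermark(lagMillis);
        List<Long> late = new ArrayList<>();
        for (long ts : arrivalOrder) {
            if (watermark.observe(ts)) {
                late.add(ts);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // After 200 arrives the watermark is 190, so the later arrival of 150 is late.
        System.out.println(lateElements(10, new long[] {100, 95, 200, 150}));
    }
}
```

Any finite lag can be defeated by choosing an input that is delayed a little longer, which is why a watermark that ever advances is an estimate and not a guarantee in the general case.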

> Primitive to shift the watermark while assigning timestamps
> -----------------------------------------------------------
>                 Key: BEAM-644
>                 URL:
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model
>            Reporter: Kenneth Knowles
>            Assignee: Kenneth Knowles
> There is a general need, especially important in the presence of SplittableDoFn, to be
able to assign new timestamps to elements without making them late or droppable.
>  - DoFn.withAllowedTimestampSkew is inadequate, because it simply allows one to produce
late data, but does not allow one to shift the watermark so the new data is on-time.
>  - For a SplittableDoFn, one may receive an element such as the name of a log file that
contains elements for the day preceding the log file. The timestamp on the filename must currently
be the beginning of the log. If such elements are constantly flowing, it may be OK, but since
we don't know that element is coming, in the absence of data, the watermark may advance.
We need a way to keep it far enough back even in the absence of data holding it back.
> One idea is a new primitive ShiftWatermark / AdjustTimestamps with the following pieces:
>  - A constant duration (positive or negative) D by which to shift the watermark.
>  - A function from TimestampedElement<T> to a new timestamp that is >= t + D, where t is the element's original timestamp
> So, for example, AdjustTimestamps(<-60 minutes>, f) would allow f to make timestamps
up to 60 minutes earlier.
> With this primitive added, outputWithTimestamp and withAllowedTimestampSkew could be
removed, simplifying DoFn.
> Alternatively, all of this functionality could be bolted on to DoFn.
> This ticket is not a proposal, but a record of the issue and ideas that were mentioned.
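
The ShiftWatermark / AdjustTimestamps idea described in the issue could be sketched roughly as follows. This is plain Java with no Beam dependency, and every name in it ({{AdjustTimestampsSketch}}, the stand-in {{TimestampedElement}} class, {{apply}}, {{shiftWatermark}}) is hypothetical; no such primitive exists in Beam as far as this ticket says. It also assumes that "shifting the watermark by D" means output watermark = input watermark + D, and that a function returning a timestamp below t + D should be rejected.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Function;

class AdjustTimestampsSketch {
    /** Hypothetical stand-in for the TimestampedElement<T> mentioned in the issue. */
    static final class TimestampedElement<T> {
        final T value;
        final Instant timestamp;

        TimestampedElement(T value, Instant timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Hypothetical primitive: a constant shift D plus a timestamp function f. */
    static final class AdjustTimestamps<T> {
        private final Duration shift;
        private final Function<TimestampedElement<T>, Instant> timestampFn;

        AdjustTimestamps(Duration shift, Function<TimestampedElement<T>, Instant> timestampFn) {
            this.shift = shift;
            this.timestampFn = timestampFn;
        }

        /** Assumption: the output watermark is the input watermark moved by D. */
        Instant shiftWatermark(Instant inputWatermark) {
            return inputWatermark.plus(shift);
        }

        /** Re-timestamps one element, enforcing newTimestamp >= t + D. */
        TimestampedElement<T> apply(TimestampedElement<T> element) {
            Instant lowerBound = element.timestamp.plus(shift);
            Instant newTimestamp = timestampFn.apply(element);
            if (newTimestamp.isBefore(lowerBound)) {
                throw new IllegalArgumentException(
                    "new timestamp " + newTimestamp + " is earlier than t + D = " + lowerBound);
            }
            return new TimestampedElement<>(element.value, newTimestamp);
        }
    }

    public static void main(String[] args) {
        // AdjustTimestamps(<-60 minutes>, f): f may move timestamps up to 60 minutes earlier.
        AdjustTimestamps<String> adjust = new AdjustTimestamps<String>(
            Duration.ofMinutes(-60), e -> e.timestamp.minus(Duration.ofMinutes(30)));
        Instant t = Instant.parse("2016-09-26T12:00:00Z");
        TimestampedElement<String> out = adjust.apply(new TimestampedElement<>("log-line", t));
        System.out.println(out.timestamp);
        System.out.println(adjust.shiftWatermark(t));
    }
}
```

Because every output timestamp is at least t + D and the watermark is held back by the same D, an element that was on time on input stays on time on output under these assumptions, which is the property withAllowedTimestampSkew does not provide.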

This message was sent by Atlassian JIRA
