beam-commits mailing list archives

From "Kenneth Knowles (JIRA)" <>
Subject [jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps
Date Fri, 23 Sep 2016 17:02:20 GMT


Kenneth Knowles commented on BEAM-644:

[~aljoscha], are you referring to two clusters of output arising from the same input element,
or two clusters of output from two separate input elements, with no data in between? Let me
try to write this up again; I am just rephrasing your question and Ben's answer to see if
I got it right.

In the case where the two clusters around {{t}} and {{t+100}} arise from the same input element
(for example, the same subscription being read by the {{SplittableDoFn}}), the {{futureOutputWatermark()}}
method of the {{ProcessContinuation}} manages the watermark in the same way that {{UnboundedSource}}
does today. This is #1 from [~bchambers], if I understand correctly. This method will be polled
regularly in the absence of data, so you just have to do the best you can not to move it forward
too fast, the same as today.

In the case where there are two input elements that result in initial clusters of output
that are respectively around {{t}} and {{t+100}} (if they are non-initial clusters, then I
believe it reduces to the above case), either the timestamps of those elements give
a close enough approximation that the input watermark is in the right place, or an explicit
shift can be added using this newly proposed primitive or some similar capability. This is
#2 from [~bchambers].

Do you have a different case in mind?

> Primitive to shift the watermark while assigning timestamps
> -----------------------------------------------------------
>                 Key: BEAM-644
>                 URL:
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model
>            Reporter: Kenneth Knowles
>            Assignee: Kenneth Knowles
> There is a general need, especially important in the presence of SplittableDoFn, to be
> able to assign new timestamps to elements without making them late or droppable.
>  - DoFn.withAllowedTimestampSkew is inadequate, because it simply allows one to produce
>    late data, but does not allow one to shift the watermark so the new data is on-time.
>  - For a SplittableDoFn, one may receive an element such as the name of a log file that
>    contains elements for the day preceding the log file. The timestamp on the filename must
>    currently be the beginning of the log. If such elements are constantly flowing, it may be
>    OK, but since we don't know that the element is coming, in the absence of data the
>    watermark may advance. We need a way to keep it far enough back even in the absence of
>    data holding it back.
> One idea is a new primitive ShiftWatermark / AdjustTimestamps with the following pieces:
>  - A constant duration (positive or negative) D by which to shift the watermark.
>  - A function from TimestampedElement<T> to new timestamp that is >= t + D
> So, for example, AdjustTimestamps(<-60 minutes>, f) would allow f to make timestamps
> up to 60 minutes earlier.
> With this primitive added, outputWithTimestamp and withAllowedTimestampSkew could be
> removed, simplifying DoFn.
> Alternatively, all of this functionality could be bolted on to DoFn.
> This ticket is not a proposal, but a record of the issue and ideas that were mentioned.
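
To make the ShiftWatermark / AdjustTimestamps idea from the ticket concrete, here is a minimal
sketch in plain Python. It is not a Beam API: the class body, the output_watermark() and apply()
methods, and the TimestampedElement dataclass are all hypothetical names chosen for illustration.
The only parts taken from the ticket are the constant shift D and the rule that the function must
return a timestamp >= t + D. The sketch assumes the output watermark is simply the input watermark
plus D.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class TimestampedElement(Generic[T]):
    """Hypothetical stand-in for an element paired with its event timestamp."""
    value: T
    timestamp: datetime


class AdjustTimestamps(Generic[T]):
    """Toy model of the primitive described in the ticket (not a Beam class)."""

    def __init__(self, shift: timedelta,
                 fn: Callable[[TimestampedElement[T]], datetime]):
        self.shift = shift  # the constant duration D, positive or negative
        self.fn = fn        # maps an element to its new timestamp

    def output_watermark(self, input_watermark: datetime) -> datetime:
        # Assumption: the output watermark is the input watermark shifted by D.
        return input_watermark + self.shift

    def apply(self, element: TimestampedElement[T]) -> TimestampedElement[T]:
        new_timestamp = self.fn(element)
        # Enforce the ticket's constraint: new timestamp >= t + D.
        if new_timestamp < element.timestamp + self.shift:
            raise ValueError("new timestamp is earlier than t + D")
        return TimestampedElement(element.value, new_timestamp)
```

Under these assumptions, an element that was on time at the input (timestamp >= input watermark)
stays on time at the output, because its new timestamp is >= t + D >= input watermark + D. That
is the difference from withAllowedTimestampSkew, which permits the earlier timestamp but leaves
the watermark where it was.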

This message was sent by Atlassian JIRA
