beam-commits mailing list archives

From "Thomas Groh (JIRA)" <>
Subject [jira] [Assigned] (BEAM-1287) Give new DoFn the ability to output to a particular window
Date Wed, 14 Jun 2017 22:40:00 GMT


Thomas Groh reassigned BEAM-1287:

    Assignee: Thomas Groh

> Give new DoFn the ability to output to a particular window
> ----------------------------------------------------------
>                 Key: BEAM-1287
>                 URL:
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model, sdk-java-core, sdk-py
>            Reporter: Kenneth Knowles
>            Assignee: Thomas Groh
> The new {{DoFn}} design allows us to have specialized output receivers, such as a key-preserving
output (the default is non-key-preserving) or non-window-preserving (the default is window-preserving)
output. This JIRA is for the latter, with an emphasis on making the two as analogous as we can.
> {code}
> new DoFn<A, B>() {
>   @ProcessElement
>   public void processElement(ProcessContext c, OutputToWindow receiver) {
>     // value, timestamp, and window are placeholders for whatever is derived from c.element()
>     receiver.outputWithTimestamp(value, timestamp, window);
>   }
> }
> {code}
> After this change, window assignment need not be a primitive.
> Why is this OK? The primary motivation for keeping windows strongly separated is that
they yield parallelism if we don't impose any requirement that multiple windows for a single
key be co-located or linearized. We should be able to process a single key with millions of
non-merging windows in parallel without having to reify the windows (though this isn't _that_
bad). That is a major change/improvement over the vague assumption that keys are the atom
of parallelism.
> This change will not remove this property, as it pertains to input and state. The analogy
with keys:
>  - Stateful DoFn requires the ability to access key-and-window state. For some runners,
perhaps this does not require colocation. For runners that want to do this efficiently/locally,
it means some key-and-window colocation operation followed by only key-and-window preserving
transforms. So outputting to a new window breaks the invariant, just as a non-key-preserving
transform would. Until we had the new {{DoFn}} we couldn't know if non-window-preserving output
was used.
>  - Non-key-preserving output also breaks any idea that combined aggregates are actually
one per key, etc. So windows can work the same way.
>  - Timestamps are interesting. By analogy with keys, timestamps would be just part of
the value and able to change freely. This doesn't work so well because of lateness. To avoid
digging deeper into changing anything, this proposal just suggests that a timestamp is provided,
and whether it is allowed to be late is governed by the same rules as {{outputWithTimestamp}}.
>  - Not clear if this has uses for merging windows.
> This change is entirely backwards compatible, but given that it removes a primitive and
is rather little effort, it might bear earlier consideration. No work will begin until it
is brought to the dev list.
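
The claim in the description that window assignment need not be a primitive can be illustrated with a small, self-contained sketch. It does not use the Beam SDK: `Window`, `Emitted`, `OutputToWindow`, and `assignFixedWindow` are hypothetical names invented here, loosely modeled on the `receiver.outputWithTimestamp(value, timestamp, window)` call in the issue. Assuming such a receiver existed, fixed-size windowing could be written as an ordinary per-element function:

```java
import java.util.ArrayList;
import java.util.List;

public class OutputToWindowSketch {

    /** Hypothetical half-open window [start, end); not Beam's IntervalWindow. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /** One emitted element with its timestamp and the window it was sent to. */
    static final class Emitted<T> {
        final T value;
        final long timestamp;
        final Window window;

        Emitted(T value, long timestamp, Window window) {
            this.value = value;
            this.timestamp = timestamp;
            this.window = window;
        }
    }

    /** Hypothetical receiver that lets the caller choose the output window explicitly. */
    static final class OutputToWindow<T> {
        private final List<Emitted<T>> out = new ArrayList<>();

        void outputWithTimestamp(T value, long timestamp, Window window) {
            out.add(new Emitted<>(value, timestamp, window));
        }

        List<Emitted<T>> emitted() {
            return out;
        }
    }

    /** Fixed-size window assignment expressed as plain per-element logic. */
    static <T> void assignFixedWindow(T value, long timestamp, long size, OutputToWindow<T> receiver) {
        // floorDiv keeps negative timestamps aligned to the window grid as well.
        long start = Math.floorDiv(timestamp, size) * size;
        receiver.outputWithTimestamp(value, timestamp, new Window(start, start + size));
    }

    public static void main(String[] args) {
        OutputToWindow<String> receiver = new OutputToWindow<>();
        assignFixedWindow("a", 5L, 10L, receiver);
        assignFixedWindow("b", 17L, 10L, receiver);
        for (Emitted<String> e : receiver.emitted()) {
            System.out.println(e.value + " @" + e.timestamp + " in [" + e.window.start + ", " + e.window.end + ")");
        }
    }
}
```

The sketch deliberately ignores lateness; per the description, whether an output timestamp may be late would follow the same rules as {{outputWithTimestamp}}.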

This message was sent by Atlassian JIRA
