beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Daniel Halperin (JIRA)" <>
Subject [jira] [Commented] (BEAM-696) Side-Inputs non-deterministic with merging main-input windows
Date Wed, 19 Oct 2016 17:27:58 GMT


Daniel Halperin commented on BEAM-696:

Let me try to restate what Kenn said above, and see if that explains why I disagree with this

The model has defined {{Combine.perKey}} as the following composite: {{GroupByKey}} | {{Map\[preserve
K, convert Iterable<V> into Combine(Iterable<V>)\]}}.

Now, we have also let users express the {{CombineFn}} in many parts, so that it is possible
to "pre-combine" some of the steps. As Amit has noted, this is an important optimization for
nearly all runners.

Amit has accurately identified that *it is not always safe to pre-combine*, specifically in
the case of merging windows in the main input. I agree!

Where we differ (and I think I'm merely echoing Kenn and Aljoscha) is in the resolution.

* Dataflow, Direct, and Flink runners recognize this case and choose not to "pre-combine",
providing the semantics of {{Combine.perKey}}. Yes this sacrifices an optimization opportunity,
but not really -- only in the case where it's not safe.
* Amit has proposed to allow runners to "pre-combine" in all cases, as the Spark runner currently
does, and instead suggest that a user only use {{Combine}} when it is safe to do so.

Personally, I prefer the former approach: keep the definition simple and clear and let runners
optimize only when it is correct to do so. Users should not have to change their transforms
dramatically based on the window.

The counterpoint is that a pipeline might be dramatically slower just because of a choice
of window, while making the user think about their choice of transform would make this explicit.
That is true. But I think that this would lead to users with incorrect pipelines that might
not notice.

So: I think this is a bug in the Spark Runner, and Spark runner should mimic the logic from
the Flink Runner. Don't use pre-combining if this would result in violating the simple composite
definition of {{Combine.perKey}}.

> Side-Inputs non-deterministic with merging main-input windows
> -------------------------------------------------------------
>                 Key: BEAM-696
>                 URL:
>             Project: Beam
>          Issue Type: Bug
>          Components: beam-model
>            Reporter: Ben Chambers
>            Assignee: Pei He
> Side-Inputs are non-deterministic for several reasons:
> 1. Because they depend on triggering of the side-input (this is acceptable because triggers
are by their nature non-deterministic).
> 2. They depend on the current state of the main-input window in order to lookup the side-input.
This means that with merging
> 3. Any runner optimizations that affect when the side-input is looked up may cause problems
with either or both of these.
> This issue focuses on #2 -- the non-determinism of side-inputs that execute within a
Merging WindowFn.
> Possible solution would be to defer running anything that looks up the side-input until
we need to extract an output, and using the main-window at that point. Specifically, if the
main-window is a MergingWindowFn, don't execute any kind of pre-combine, instead buffer all
the inputs and combine later.
> This could still run into some non-determinism if there are triggers controlling when
we extract output.

This message was sent by Atlassian JIRA

View raw message