beam-commits mailing list archives

From "Kenneth Knowles (JIRA)" <>
Subject [jira] [Commented] (BEAM-1396) GABWVOBDoFn expects grouped values to be ordered by their timestamp but there is no such guarantee
Date Mon, 06 Feb 2017 04:25:41 GMT


Kenneth Knowles commented on BEAM-1396:

Glad you brought this to me without spending too much time on it. This code is actually meant
to be used in batch mode by any runner; it takes advantage of the batch style to be much more
efficient than {{GABWViaActiveWindowSets}}.

For the landscape of GABW:

1. {{GroupAlsoByWindowViaWindowSetDoFn}} / {{GroupAlsoByWindowViaWindowSetNewDoFn}} are the
"works no matter what" implementation.
2. {{GroupAlsoByWindowsViaOutputBufferDoFn}} is the "works if you don't need to deal with
the watermark, just move it from 0 to infinity" implementation, but then we made it weird and
added an incidental requirement, that the input is sorted by timestamp, which we should remove.
3. Any runner-specific hackery that is harder to describe and not generally useful (feel free
to write {{WorksOnlyForTheSparkRunnerGABW}}) :-)
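To make item 2 concrete, here is a minimal sketch (not Beam's code) of what "just move the watermark from 0 to infinity" buys in batch mode, assuming fixed windows, no merging and no custom triggers. All values for one key are buffered per window and each window is emitted exactly once at the end, so input order is irrelevant. {{OutputBufferGabwSketch}}, its {{TimestampedValue}} stand-in and {{groupByFixedWindows}} are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of a batch-only GABW for a single key:
// fixed windows, no merging, no triggers. Not Beam's GroupAlsoByWindowsViaOutputBufferDoFn.
final class OutputBufferGabwSketch {

  /** Hypothetical stand-in for a WindowedValue: a value plus an event-time timestamp in millis. */
  static final class TimestampedValue {
    final String value;
    final long timestamp;

    TimestampedValue(String value, long timestamp) {
      this.value = value;
      this.timestamp = timestamp;
    }
  }

  /**
   * Buffers every value of one key into its fixed window, then emits all windows at once.
   * In batch the whole key is available, so the watermark conceptually jumps from the
   * beginning of time straight to the end of time and no window can fire early.
   */
  static Map<Long, List<String>> groupByFixedWindows(
      Iterable<TimestampedValue> values, long windowSizeMillis) {
    // Keyed by window start; TreeMap keeps the emitted windows in time order.
    Map<Long, List<String>> buffers = new TreeMap<>();
    for (TimestampedValue tv : values) {
      long windowStart = Math.floorDiv(tv.timestamp, windowSizeMillis) * windowSizeMillis;
      buffers.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(tv.value);
    }
    // Watermark at "infinity": every buffered window is complete and is emitted as-is.
    return buffers;
  }
}
```

Shuffling the input only changes the order of values inside a window's list, never which windows are emitted or when.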

For this particular issue, the fix I will make is to remove the for loop over chunks of 1000,
which is the only reason sorting mattered. Essentially, this GABW implementation runs "like"
a fully batch-centric version over 1000 elements at a time. The chunking was added to make
batch act somewhat like streaming (multiple outputs per key in a GBK) to catch bugs, among
other things. But now we have a streaming direct runner, so we should just focus this on
simplicity, correctness, performance, and usefulness to all the runners.

> GABWVOBDoFn expects grouped values to be ordered by their timestamp but there is no such guarantee
> --------------------------------------------------------------------------------------------------
>                 Key: BEAM-1396
>                 URL:
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>            Reporter: Amit Sela
>            Assignee: Kenneth Knowles
> GABWVOBDoFn relies on the grouped values being ordered by their timestamp, but nothing
> in the SDK guarantees this: 
> If such a chunk of timestamped values is processed out of order, I assume we'd end
> up with an {{IllegalStateException}} thrown here:
> I suggest we go ahead and add sorting before processing the bundle in chunks. This might
> prove expensive in extreme cases where a very large bundle with very few keys is processed,
> but it seems that timestamp order is necessary.
> As for runners that provide an ordering guarantee, since GABW is optional I don't see an issue
> here, though [] suggested we add a "shouldSort" flag.
> Also, it's probably worth creating a test for this, though it would prove difficult since
> we would have to preset the order, which is the problem to begin with :-)
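The alternative proposed in the issue, sorting before chunking with a flag for runners that already deliver values in timestamp order, could look roughly like this. It is a hypothetical sketch: {{SortThenChunkSketch}}, {{chunks}} and the {{shouldSort}} parameter (named after the suggestion in the issue) are not Beam API. Materializing and sorting one key's values costs O(n log n) time and O(n) extra memory per key, which is the expense the issue anticipates for large bundles with very few keys.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of "sort, then process in chunks"; not Beam's implementation.
final class SortThenChunkSketch {

  /** Hypothetical stand-in for a timestamped value. */
  static final class TimestampedValue {
    final String value;
    final long timestamp;

    TimestampedValue(String value, long timestamp) {
      this.value = value;
      this.timestamp = timestamp;
    }
  }

  /**
   * Splits one key's values into chunks of chunkSize. When shouldSort is true, the values are
   * first copied and sorted by timestamp; when false, the caller asserts the runner already
   * delivers them in timestamp order and the copy is left as-is.
   */
  static List<List<TimestampedValue>> chunks(
      Iterable<TimestampedValue> values, int chunkSize, boolean shouldSort) {
    List<TimestampedValue> all = new ArrayList<>();
    for (TimestampedValue v : values) {
      all.add(v); // materialize the whole key: O(n) extra memory
    }
    if (shouldSort) {
      all.sort(Comparator.comparingLong((TimestampedValue v) -> v.timestamp)); // O(n log n)
    }
    List<List<TimestampedValue>> result = new ArrayList<>();
    for (int start = 0; start < all.size(); start += chunkSize) {
      result.add(new ArrayList<>(all.subList(start, Math.min(start + chunkSize, all.size()))));
    }
    return result;
  }
}
```

With sorting enabled, the first timestamp of each chunk is non-decreasing, so a monotonic watermark advanced per chunk never moves backwards.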

This message was sent by Atlassian JIRA
