beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Kenneth Knowles (JIRA)" <>
Subject [jira] [Commented] (BEAM-1612) Support real Bundle in Flink runner
Date Wed, 03 May 2017 16:54:04 GMT


Kenneth Knowles commented on BEAM-1612:

I would have thought that Flink would require isomorphic computational patterns for good performance.
At the crudest level, this is the amortization pattern "write, write, write, write, write,
flush" (where "flush" is some arbitrarily complex thing like moving temp resources into place,
etc). So you cannot consider records processed (either via ACK or via considering the elements
included in a checkpoint/savepoint) until the flush occurs.

For my own education, maybe you or [~StephanEwen] can elaborate on the natural way to express
this pattern in Flink, or perhaps why it is not necessary? You may have already explained,
but I could use a refresh, and I'm sure the Beam community will benefit as well.

> Support real Bundle in Flink runner
> -----------------------------------
>                 Key: BEAM-1612
>                 URL:
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Jingsong Lee
> The Bundle is very important in the beam model. Users can use the bundle to flush buffer,
can reuse many heavyweight resources in a bundle. Most IO plugins use the bundle to flush.

> Moreover, FlinkRunner can also use Bundle to reduce access to the FlinkState, such as
first placed in JavaHeap, flush into RocksDbState when invoke finishBundle , this can reduce
the number of serialization.
> But now FlinkRunner calls the finishBundle every processElement. We need support real
> I think we can have the following implementations:
> 1.Invoke finishBundle and next startBundle in {{snapshot}} of Flink. But sometimes this
"Bundle" maybe too big. This depends on the user's checkpoint configuration.
> 2.Manually control the size of the bundle. The half-bundle will be flushed to a full-bundle
by count or eventTime or processTime or {{snapshot}}. We do not need to wait, just call the
startBundle and finishBundle at the right time.

This message was sent by Atlassian JIRA

View raw message