flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
Date Sun, 19 Jun 2016 12:14:05 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15338503#comment-15338503 ]

ASF GitHub Bot commented on FLINK-3477:

Github user ggevay commented on a diff in the pull request:

    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
    @@ -282,27 +283,57 @@ class GroupedDataSet[T: ClassTag](
    -   * Creates a new [[DataSet]] by merging the elements of each group (elements with the same key)
    -   * using an associative reduce function.
    -   */
    +    * Creates a new [[DataSet]] by merging the elements of each group (elements with the same key)
    +    * using an associative reduce function.
    +    */
       def reduce(fun: (T, T) => T): DataSet[T] = {
    +    reduce(getCallLocationName(), fun, CombineHint.OPTIMIZER_CHOOSES)
    +  }
    +  /**
    +   * Special [[reduce]] operation for explicitly telling the system what strategy to use for the
    +   * combine phase.
    +   * If null is given as the strategy, then the optimizer will pick the strategy.
    +   */
    +  def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T] = {
    --- End diff --
    Unfortunately, this doesn't seem to work either: japicmp complains [1] that the return type of `DataSet.reduce` has changed (to the newly created Scala class `ReduceOperator`), which breaks binary compatibility [2]. I can't think of anything better than going back to adding overloads that take the `CombineHint` as an additional parameter.
    [1] http://compalg.inf.elte.hu/~ggevay/japicmp.diff
    [2] https://docs.oracle.com/javase/specs/jls/se7/html/jls-13.html#jls-13.4.15
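
The overload approach mentioned above can be illustrated with a minimal Java sketch. It is not the code from the pull request: `GroupedData` is a hypothetical class, and the local `CombineHint` enum is only a stand-in that mirrors the constant names appearing in the diff. The point it tries to show is that the existing `reduce` keeps its name, parameter types, and return type, while the hint is accepted by an additional method.

```java
import java.util.function.BinaryOperator;

// Hypothetical stand-in for Flink's CombineHint; only names seen in the diff and issue are mirrored.
enum CombineHint { OPTIMIZER_CHOOSES, SORT, HASH }

// Hypothetical stand-in for a grouped data set; it only records the chosen hint.
class GroupedData<T> {
    private CombineHint hint = CombineHint.OPTIMIZER_CHOOSES;

    // Existing method: name, parameter types, and return type are unchanged,
    // so callers compiled against the old class still link against it.
    GroupedData<T> reduce(BinaryOperator<T> fun) {
        return reduce(fun, CombineHint.OPTIMIZER_CHOOSES);
    }

    // New overload: a method is added instead of altering the existing one.
    // A null strategy falls back to letting the optimizer choose.
    // The reduce function itself is not applied in this sketch.
    GroupedData<T> reduce(BinaryOperator<T> fun, CombineHint strategy) {
        GroupedData<T> result = new GroupedData<>();
        result.hint = (strategy == null) ? CombineHint.OPTIMIZER_CHOOSES : strategy;
        return result;
    }

    CombineHint hint() {
        return hint;
    }
}

class OverloadDemo {
    public static void main(String[] args) {
        GroupedData<Integer> grouped = new GroupedData<>();
        System.out.println(grouped.reduce(Integer::sum).hint());
        System.out.println(grouped.reduce(Integer::sum, CombineHint.HASH).hint());
    }
}
```

Adding a method leaves the descriptor of the old one untouched, whereas changing a method's result type is treated by the JLS section cited in [2] like deleting the old method and adding a new one.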

> Add hash-based combine strategy for ReduceFunction
> --------------------------------------------------
>                 Key: FLINK-3477
>                 URL: https://issues.apache.org/jira/browse/FLINK-3477
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Local Runtime
>            Reporter: Fabian Hueske
>            Assignee: Gabor Gevay
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a single value. A Reduce function is incrementally applied to compute a final aggregated value. This allows holding the preaggregated value in a hash table and updating it with each function call.
> The hash-based strategy requires a special implementation of an in-memory hash table. The hash table should support in-place updates of elements (if the new value has the same size as the old value) but also appending updates that invalidate the old value (if the binary length of the new value differs). The hash table needs to be able to evict and emit all elements if it runs out of memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the execution strategy.
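
The hash-based combine idea described in the issue can be sketched in plain Java as follows. This is an illustration under simplifying assumptions, not Flink's implementation: `HashCombineSketch`, `combine`, `keyOf`, and `maxEntries` are hypothetical names, a `LinkedHashMap` of objects stands in for the managed-memory hash table, and an entry-count limit stands in for running out of memory. The in-place versus appending update distinction for serialized records is not modeled, and records are assumed to be non-null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

class HashCombineSketch {

    // Pre-aggregates records per key in a bounded hash table.
    // When the table already holds maxEntries keys and a record with a new key
    // arrives, all partial aggregates are emitted and the table is cleared.
    static <T, K> List<T> combine(Iterable<T> input,
                                  Function<T, K> keyOf,
                                  BinaryOperator<T> reduce,
                                  int maxEntries) {
        Map<K, T> table = new LinkedHashMap<>();
        List<T> emitted = new ArrayList<>();
        for (T record : input) {
            K key = keyOf.apply(record);
            if (table.containsKey(key)) {
                // Update the stored partial aggregate with the new record.
                table.put(key, reduce.apply(table.get(key), record));
            } else {
                if (table.size() >= maxEntries) {
                    // "Out of memory": evict and emit everything, then start over.
                    emitted.addAll(table.values());
                    table.clear();
                }
                table.put(key, record);
            }
        }
        // Emit whatever is still held at the end of the input.
        emitted.addAll(table.values());
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
        // Group by parity and sum; the table is large enough to hold both keys.
        System.out.println(combine(input, n -> n % 2, Integer::sum, 2));
        // With room for a single key, every key change forces an eviction.
        System.out.println(combine(input, n -> n % 2, Integer::sum, 1));
    }
}
```

Because eviction can emit several partial aggregates for the same key, the output of such a combiner still has to be merged by a final reduce step.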

This message was sent by Atlassian JIRA
