flink-issues mailing list archives

From "Stephan Ewen (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (FLINK-65) Support custom and efficient (in-memory) pre-aggregations (without Combiner)
Date Fri, 29 Aug 2014 17:13:54 GMT

     [ https://issues.apache.org/jira/browse/FLINK-65?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen resolved FLINK-65.

       Resolution: Fixed
    Fix Version/s:     (was: pre-apache)
         Assignee: Stephan Ewen

Solved through the mapPartition() function, added in d4de9774b3237bb1850024b1208640bc50f7adab.
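
For reference, a minimal sketch of the pre-aggregation pattern this enables, written
against the Java DataSet API; the word-count input and all names are illustrative,
not taken from the issue's actual jobs:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class PreAggregationExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> words = env.fromElements("a", "b", "a", "c", "a", "b");

        // mapPartition() sees a whole partition at once, so the UDF can
        // pre-aggregate in memory and emit one record per distinct key
        // instead of one record per input element.
        DataSet<Tuple2<String, Integer>> preAggregated = words.mapPartition(
            new MapPartitionFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void mapPartition(Iterable<String> values,
                                         Collector<Tuple2<String, Integer>> out) {
                    Map<String, Integer> counts = new HashMap<String, Integer>();
                    for (String word : values) {
                        Integer c = counts.get(word);
                        counts.put(word, c == null ? 1 : c + 1);
                    }
                    for (Map.Entry<String, Integer> e : counts.entrySet()) {
                        out.collect(new Tuple2<String, Integer>(e.getKey(), e.getValue()));
                    }
                }
            });

        // Only the (already compacted) records are shuffled for the final reduce.
        preAggregated.groupBy(0).sum(1).print();
    }
}
```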

> Support custom and efficient (in-memory) pre-aggregations (without Combiner)
> ----------------------------------------------------------------------------
>                 Key: FLINK-65
>                 URL: https://issues.apache.org/jira/browse/FLINK-65
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: GitHub Import
>            Assignee: Stephan Ewen
>              Labels: github-import
>             Fix For: 0.7-incubating
> I use and evaluate Stratosphere in the course of my thesis and want to give feedback
> on what I found missing or what could be improved. This is written from a user
> perspective.
> ### Requirement
> We have partitioned input that we want to group and aggregate (by a grouping key)
> into a single record or multiple records. The input is either streamed from HDFS or
> pipelined from other contracts. Our input is not (necessarily) partitioned by the
> grouping key. The input we want to aggregate is the input of any tuple-at-a-time
> contract such as Map, Cross or Match. We can express this requirement in SQL:
> “SELECT key, aggregate(value) FROM records GROUP BY key”. Common cases range from a
> key cardinality of 1 or 2 up to medium or high cardinalities.
> ### Problem
> Currently only the Combine+Reduce/CoGroup strategy is supported: forward all data to
> a Combiner, repartition its output by the grouping key and do the final aggregation
> in Reduce. **For N input records this always involves emitting N intermediate records
> to the Combiner.** Even though the values are just forwarded to the local Combiner,
> there is a lot of copying, (de)serialization and probably some networking overhead
> involved (jobmanager status). Also, the user has to manually write the code to
> serialize the input to a record and include the Combiner. If we want to join and
> aggregate, this can mean that a UDF does nothing but forward the joined tuples to a
> Combiner/Reducer.
> ### Goal
> Enable efficient in-memory aggregation (for the cases where efficiency matters;
> otherwise the Combiner/Reducer approach works), reduce the (de)serialization
> overhead, and reduce the “stupid” code the user has to write (e.g. forwarding all
> tuples to a combiner).
> ### Use cases
> Some use cases that would benefit from efficient pre-aggregation:
> * Group and aggregate with low grouping-key cardinality: assume the cardinality is 1
> and we want a simple aggregation like a sum. It is obviously much more efficient (and
> easier to code) to do the pre-aggregation (the sum) in the UDF and send a single
> record to the reducer.
> * Wordcount: “SELECT word, sum(occurrence) FROM word-occurrences”. Here occurrence is
> a column with the constant value 1. Very simple to aggregate in memory.
> * Machine learning, accuracy computation: “SELECT correctly-classified,
> sum(correctly-classified) GROUP BY correctly-classified”. E.g. if we trained a model
> (say, a numeric weight vector for logistic regression), the accuracy is defined as
> #correctly-classified/#total. I do this in one of my jobs and currently have to emit
> N records with the constant value true or false to the reducer (see the sketch after
> this list).
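> A sketch of what such a per-partition pre-aggregation can look like for the
> cardinality-1 accuracy case, written with the mapPartition() operator that
> eventually resolved this issue; the Boolean input and all names are illustrative:
>
> ```java
> import org.apache.flink.api.common.functions.MapPartitionFunction;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.util.Collector;
>
> // Each partition keeps two counters and emits a single (correct, total) pair;
> // a final reduce sums the pairs and divides to obtain the accuracy.
> public class AccuracyPreAgg
>         implements MapPartitionFunction<Boolean, Tuple2<Long, Long>> {
>
>     @Override
>     public void mapPartition(Iterable<Boolean> correctlyClassified,
>                              Collector<Tuple2<Long, Long>> out) {
>         long correct = 0;
>         long total = 0;
>         for (Boolean isCorrect : correctlyClassified) {
>             if (isCorrect) {
>                 correct++;
>             }
>             total++;
>         }
>         // One record per partition instead of one per input record.
>         out.collect(new Tuple2<Long, Long>(correct, total));
>     }
> }
> ```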
> ### Possible solutions
> 1. Give close() the option to write output in all tuple-at-a-time contracts (the
> Hadoop way).
> 2. Give the UDF knowledge about whether the current element is the last one
> (hasNext). Almost the same as 1.
> 3. Add an iterator option to the tuple-at-a-time contracts (Map, Cross, Match). The
> contracts can be configured to pass an iterator over all records that are
> pipelined/streamed to the UDF, e.g. via
> CrossContract.builder(MyCross.class).asIterator(true)....build(). I assume this is
> easy to implement, because the code that calls the UDF probably looks like “while
> (it.hasNext()) { udf(it.next()); }”; I am not sure if this is true. The user would
> then implement a separate stub, e.g. MatchIteratorStub instead of MatchStub (see the
> sketch after this list).
> 4. Keep it as it is (the Combiner remains the only way to pre-aggregate).
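> To make solution 3 concrete, a hypothetical sketch of such an iterator-style stub.
> None of this is existing API: the stub name follows the proposal, and the import
> paths for PactRecord and Collector are assumptions about the Stratosphere packages
> of the time.
>
> ```java
> import java.util.Iterator;
>
> import eu.stratosphere.pact.common.stubs.Collector;
> import eu.stratosphere.pact.common.type.PactRecord;
>
> // Hypothetical stub (not existing API): instead of one call per record, the
> // runtime hands the whole pipelined stream to the UDF once, so the UDF can
> // pre-aggregate in memory before emitting its (few) result records.
> public abstract class MapIteratorStub {
>     public abstract void map(Iterator<PactRecord> records,
>                              Collector<PactRecord> out);
> }
> ```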
> ### Discussion
> * The current way, using a Combiner, is very explicit and gives the system more
> knowledge about what happens. In an ideal world, the optimizer would choose how to do
> the pre-aggregation, and we would just define the aggregation function in the
> combiner. Currently, however, we have to hardcode the serialization code that
> forwards everything to the Combiner, and the system would have to understand and
> modify the UDF to get rid of the serialization and do a direct in-memory
> pre-aggregation.
> * Solutions 1-3 do more or less the same. We can write our own
> pre-aggregation/combiner code: if the cardinality is 1, this is just a counter; if
> the cardinality is medium, we can use a hash table. After the UDF has processed all
> records, it can send one or more pre-aggregated records to the Reducer. This is less
> explicit, but more powerful and enables high efficiency. The system still knows that
> we are doing a grouping, because we still have to use Reduce.
> * To make it easy for Hadoop users to switch over, solution 1 or 2 would be fine.
> Solution 3 is basically the same, but looks a bit different.
> Looking forward to hearing your opinions. I hope I haven't missed something and the
> feature already exists ;)
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/issues/65
> Created by: [andrehacker|https://github.com/andrehacker]
> Labels: 
> Created at: Wed Aug 21 16:50:05 CEST 2013
> State: open
