flink-user mailing list archives

From Fabian Hueske <fhue...@apache.org>
Subject Re: PartitionByHash and usage of KeySelector
Date Mon, 10 Nov 2014 09:50:28 GMT
Hi Stefano,

I'm not sure if we use the same terminology here. What you call
partitioning might be called grouping in Flink's API / documentation.

Grouping builds groups of elements that share the same key. This is a
deterministic operation.
Partitioning distributes elements over a set of machines / parallel
workers. If this is done using hash partitioning, Flink determines the
parallel worker for an element by hashing the element's partition key
( mod(hash(key), #workers) ). Consequently, all elements with the same
partition key will be shipped to the same worker, BUT so will all other
elements for which mod(hash(key), #workers) is the same. If you
mapPartition over these partitions, all of these elements will be mixed.
If the number of workers (or the hash function) changes, partitions will
look different. When grouping, all elements of a group will have the same
key (and all elements with that key will be in the same group).

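To make the distinction concrete, here is a minimal plain-Java sketch (not Flink API) of the mod(hash(key), #workers) scheme described above; the class and method names are illustrative assumptions. Note how keys 1, 5, and 9 all land on the same worker even though they are different keys:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class HashPartitionDemo {
    // Assign an element's key to one of numWorkers parallel workers,
    // mirroring the mod(hash(key), #workers) scheme sketched above.
    static int workerFor(Object key, int numWorkers) {
        return Math.floorMod(key.hashCode(), numWorkers);
    }

    public static void main(String[] args) {
        int workers = 4;
        Map<Integer, List<Integer>> partitions = new TreeMap<>();
        for (int key : new int[]{1, 2, 5, 6, 9}) {
            partitions.computeIfAbsent(workerFor(key, workers),
                    w -> new ArrayList<>()).add(key);
        }
        // Keys 1, 5, 9 share worker 1; keys 2, 6 share worker 2:
        // same partition does NOT imply same key.
        System.out.println(partitions);
    }
}
```

With 4 workers, any two keys whose hashes are congruent mod 4 end up in the same partition, which is why a mapPartition over a hash-partitioned data set sees a mix of keys.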
Flink's cross operator builds a dataset wide cross product. It does not
respect groups (or partitions). If you want to build a cross product within
a group, you can do that with a groupReduce which requires to hold all
elements of the group in memory or manually spill them to disk in your UDF.
Alternatively, you can use a self join (join a data set with itself), which
will give you all pairs of the cross product in individual function calls.
However, Flink currently does not treat self joins specially, so their
performance is not optimized. You'll also get symmetric pairs (a-b, b-a,
a-a, b-b, for two elements a, b with the same join key).
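The symmetric-pairs effect can be illustrated with a naive plain-Java self join (an illustrative sketch, not Flink API; names are assumptions). For two elements a, b sharing the same join key, all four ordered pairs are emitted:

```java
import java.util.ArrayList;
import java.util.List;

public class SelfJoinPairs {
    // Naive self join of a group of elements that all share one join key,
    // mimicking the pairs a join of a data set with itself would emit.
    static List<String> selfJoin(String[] elements) {
        List<String> pairs = new ArrayList<>();
        for (String left : elements) {
            for (String right : elements) {
                pairs.add(left + "-" + right);
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        // Both elements share the same join key, so all four ordered
        // pairs appear, including the identity pairs a-a and b-b.
        System.out.println(selfJoin(new String[]{"a", "b"}));
        // prints [a-a, a-b, b-a, b-b]
    }
}
```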

If it is possible to combine the macro-parameter keys and the
minor-blocking keys into a single key, you could specify a key-selector
function x() and either do
- dataSet.groupBy(x).reduceGroup( *read full group into memory, and apply
expensive function to each pair of elements* ); or
- dataSet.join(dataSet).where(x).equalTo(x).with( *check for symmetric pairs
and apply expensive compare function* ).
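For the "check for symmetric pairs" step in the join variant, one common trick (an assumption on my side, not something Flink does for you) is to process a pair only when the left element is strictly smaller than the right under some total order, so the expensive compare function runs once per distinct unordered pair. A plain-Java sketch:

```java
public class PairFilter {
    // Keep a joined pair only when left < right by a total order.
    // This drops the symmetric duplicate (b-a when a-b was kept) and
    // the identity pairs (a-a), so each distinct unordered pair is
    // processed exactly once. Illustrative helper, not Flink API.
    static boolean keepPair(String left, String right) {
        return left.compareTo(right) < 0;
    }

    public static void main(String[] args) {
        System.out.println(keepPair("a", "b")); // true: processed once
        System.out.println(keepPair("b", "a")); // false: symmetric duplicate
        System.out.println(keepPair("a", "a")); // false: identity pair
    }
}
```

Inside a Flink join function, this check would simply guard the call to the expensive comparison.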

BTW. there was a similar use case a few days back on the mailing list.
Might be worth reading that thread [1].
Since this is the second time this issue has come up, we might
consider adding better support for group-wise cross operations.

Cheers, Fabian

