flink-user mailing list archives

From Fabian Hueske <fhue...@apache.org>
Subject Re: PartitionByHash and usage of KeySelector
Date Mon, 10 Nov 2014 15:24:17 GMT
Hi Stefano,

Right now, there is no such thing as a RichKeySelector.

However, KeySelector functions are serialized and passed to the execution
engine. That means you can configure your KeySelector via its constructor
at program construction time, and the "same" object is passed to the engine
at runtime.
The Configuration object is a legacy feature from the time when user
functions were not serializable; back then, new function objects were
created on the workers and configured via the Configuration object at
runtime.
Another alternative is to use a RichMapFunction instead of a KeySelector
and convert a type A into a Tuple2<Key, A>. In fact, this is what happens
internally when a key selector function is used.
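A minimal plain-Java sketch of this alternative follows. The `RichMapFunction` and `Tuple2` classes below are simplified stand-ins that only mirror the shape of Flink's classes, and `KeyAttachingMapper` with its `prefix.len` parameter is invented for illustration; the point is that `open()` runs before any `map()` call and can read configuration:

```java
import java.util.*;

public class RichKeyMapperSketch {
    // Stand-in mirroring the shape of Flink's RichMapFunction:
    // open() runs once per parallel instance before any map() call.
    static abstract class RichMapFunction<IN, OUT> {
        public void open(Map<String, String> parameters) throws Exception {}
        public abstract OUT map(IN value) throws Exception;
    }

    // Stand-in for Flink's Tuple2 (org.apache.flink.api.java.tuple.Tuple2).
    static class Tuple2<A, B> {
        final A f0; final B f1;
        Tuple2(A f0, B f1) { this.f0 = f0; this.f1 = f1; }
    }

    // Hypothetical mapper: open() loads a key prefix length from the
    // configuration (in a real job this could read a config file), and
    // map() converts a record of "Type A" (here: String) into
    // Tuple2<Key, A> so downstream operators can group on field f0.
    static class KeyAttachingMapper
            extends RichMapFunction<String, Tuple2<String, String>> {
        private int prefixLen;
        public void open(Map<String, String> parameters) {
            prefixLen = Integer.parseInt(
                parameters.getOrDefault("prefix.len", "1"));
        }
        public Tuple2<String, String> map(String value) {
            return new Tuple2<>(
                value.substring(0, Math.min(prefixLen, value.length())), value);
        }
    }

    public static void main(String[] args) throws Exception {
        KeyAttachingMapper mapper = new KeyAttachingMapper();
        mapper.open(Collections.singletonMap("prefix.len", "2"));
        Tuple2<String, String> keyed = mapper.map("stefano");
        System.out.println(keyed.f0 + " -> " + keyed.f1);  // prints "st -> stefano"
    }
}
```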

Best, Fabian

2014-11-10 14:36 GMT+01:00 Stefano Bortoli <s.bortoli@gmail.com>:

> Hi Fabian,
> is it possible to create a RichKeySelector? I would need to read some
> configuration files to process the record and build the 'key' using a
> custom function. There is no interface/abstract class to implement/extend
> and I wonder whether this is the right way to do it. Meaning, maybe there
> is a reason I don't get for not having a rich key selector. I thank you a
> lot in advance for your time!
> saluti,
> Stefano
> 2014-11-10 12:05 GMT+01:00 Fabian Hueske <fhueske@apache.org>:
>> Yes, if you'd split the data set manually (maybe using filter) into
>> multiple data sets, you could use Cross.
>> However, Cross is a binary operation, so you'd need to use it as a
>> self-cross, which would result in symmetric pairs, just like the join.
>> I'm not sure if I would do this in a single job, i.e., run all cross
>> operations concurrently.
>> It might be better to partition the data up-front and run a separate job
>> for each group.
>> Best, Fabian
>> 2014-11-10 11:08 GMT+01:00 Stefano Bortoli <s.bortoli@gmail.com>:
>>> Thanks a lot Fabian. You clarified many points. Currently I am trying to
>>> run the job relying on a global index built with SOLR. It worked on a
>>> dataset of about 1M records, but it failed with an obscure exception on
>>> the one of 9.2M. If I cannot make it work, I will go back to the grouping
>>> approach.
>>> Just a question. If I create a dataset for each group of a dataset, then
>>> I could use the cross on each of the groups. Right? However, I guess it
>>> would be smarter to have a reduceGroup capable of generating just the pairs
>>> that would need to be compared.
>>> thanks a lot again. keep on the great work! :-)
>>> saluti,
>>> Stefano
>>> 2014-11-10 10:50 GMT+01:00 Fabian Hueske <fhueske@apache.org>:
>>>> Hi Stefano,
>>>> I'm not sure if we use the same terminology here. What you call
>>>> partitioning might be called grouping in Flink's API / documentation.
>>>> Grouping builds groups of elements that share the same key. This is a
>>>> deterministic operation.
>>>> Partitioning distributes elements over a set of machines / parallel
>>>> workers. If this is done using hash partitioning, Flink determines the
>>>> parallel worker for an element by hashing the element's partition key
>>>> ( mod(hash(key), #workers) ). Consequently, all elements with the same
>>>> partition key will be shipped to the same worker, BUT so will all other
>>>> elements for which mod(hash(key), #workers) is the same. If you run a
>>>> mapPartition over these partitions, all of these elements will be mixed.
>>>> If the number of workers (or the hash function) changes, partitions will
>>>> look different. When grouping, all elements of a group will have the same
>>>> key (and all elements with that key will be in the group).
>>>> Flink's cross operator builds a dataset-wide cross product. It does not
>>>> respect groups (or partitions). If you want to build a cross product
>>>> within a group, you can do that with a groupReduce, which requires
>>>> holding all elements of the group in memory or manually spilling them to
>>>> disk in your UDF. Alternatively, you can use a self join (joining a data
>>>> set with itself), which will give you all pairs of the cross product in
>>>> individual function calls. However, Flink currently does not treat self
>>>> joins specially, so the performance could be improved. You'll also get
>>>> symmetric pairs (a-b, b-a, a-a, b-b, for two elements a, b with the same
>>>> join key).
>>>> If it is possible to combine the macro-parameter keys and the
>>>> minor-blocking keys into a single key, you could specify a key-selector
>>>> function x() and either do
>>>> - dataSet.groupBy(x).reduceGroup( *read full group into memory, and
>>>> apply expensive function to each pair of elements* ); or
>>>> - dataSet.join(dataSet).where(x).equalTo(x).with( *check for symmetric
>>>> pairs and apply expensive compare function* ).
>>>> BTW, there was a similar use case a few days back on the mailing list.
>>>> Might be worth reading that thread [1].
>>>> Since this is the second time that this issue came up, we might
>>>> consider adding better support for group-wise cross operations.
>>>> Cheers, Fabian
>>>> [1]
>>>> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/load-balancing-groups-td2287.html
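The mod(hash(key), #workers) scheme described in the thread can be sketched in plain Java. The keys and worker count below are arbitrary examples; the point is that distinct keys can land on the same worker, which is why partitioning is not the same as grouping:

```java
import java.util.*;

public class HashPartitionSketch {
    // Assign an element to a parallel worker as described in the thread:
    // mod(hash(key), #workers). floorMod keeps the result non-negative
    // even when hashCode() is negative.
    static int workerFor(String key, int numWorkers) {
        return Math.floorMod(key.hashCode(), numWorkers);
    }

    public static void main(String[] args) {
        int numWorkers = 2;
        // Distinct keys can still share a worker: partitioning only
        // co-locates equal keys, it does not give one key per partition.
        for (String key : Arrays.asList("a", "b", "c")) {
            System.out.println(key + " -> worker " + workerFor(key, numWorkers));
        }
        // prints:
        // a -> worker 1
        // b -> worker 0
        // c -> worker 1
    }
}
```

With two workers, "a" and "c" collide on worker 1 even though they are different keys, so a mapPartition over that partition sees both groups mixed together.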
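The groupBy(x).reduceGroup option discussed in the thread can also be sketched in plain Java: given the elements of one group (as a reduceGroup would see them), emit each unordered pair exactly once, avoiding the symmetric and reflexive pairs (b-a, a-a) that the self-join variant produces. The group contents are made-up sample data:

```java
import java.util.*;

public class GroupCrossSketch {
    public static void main(String[] args) {
        // Elements of one group (same key), as a reduceGroup would see them.
        List<String> group = Arrays.asList("a", "b", "c");

        // Build each unordered pair exactly once, i.e. skip the symmetric
        // (b-a) and reflexive (a-a) pairs a self-join would also emit.
        List<String> pairs = new ArrayList<>();
        for (int i = 0; i < group.size(); i++) {
            for (int j = i + 1; j < group.size(); j++) {
                // *apply the expensive compare function here*
                pairs.add(group.get(i) + "-" + group.get(j));
            }
        }
        System.out.println(pairs);  // prints [a-b, a-c, b-c]
    }
}
```

This matches Stefano's idea of "generating just the pairs that would need to be compared": n elements yield n*(n-1)/2 pairs instead of the n*n a full self-cross produces.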
