flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Fabian Hueske <fhue...@apache.org>
Subject Re: PartitionByHash and usage of KeySelector
Date Mon, 10 Nov 2014 11:05:47 GMT
Yes, if you'd split the data set manually (maybe using filter) into
multiple data sets, you could use Cross.
However, Cross is a binary operation, such that you'd need to use it as a
self-cross which would result in symmetric pairs as the join.

I'm not sure if I would do this in a single job, i.e., run all cross
operations concurrently.
It might be better to partition the data up-front and run multiple jobs for
each group.

Best, Fabian

2014-11-10 11:08 GMT+01:00 Stefano Bortoli <s.bortoli@gmail.com>:

> Thanks a lot Fabian. You clarified many points. Currently I am try to run
> the job relying on a global index built with SOLR. It worked on a dataset
> of about 1M record, but it failed with obscure exception on the one of
> 9.2M. If I cannot make it work, I will go back to the grouping approach.
>
> Just a question. If I create a dataset for each group of a dataset, then I
> could use the cross on each of the group. Right? However, I guess it would
> be smarter to have a reduceGroup capable of generating just the pairs that
> would need to be compared.
>
> thanks a lot again. keep on the great work! :-)
>
> saluti,
> Stefano
>
>
> 2014-11-10 10:50 GMT+01:00 Fabian Hueske <fhueske@apache.org>:
>
>> Hi Stefano,
>>
>> I'm not sure if we use the same terminology here. What you call
>> partitioning might be called grouping in Flinks API / documentation.
>>
>> Grouping builds groups of element that share the same key. This is a
>> deterministic operation.
>> Partitioning distributes elements over a set of machines / parallel
>> workers. If this is done using hash partitioning, Flink determines the
>> parallel worker for an element by hashing the element's partition key (
>> mod(hash(key), #workers) ). Consequently, all elements with the same
>> partition key will be shipped to the same worker, BUT also all other
>> elements for which mod(hash(key), #workers) is the same will be shipped to
>> the same worker. If you partition map over these partitions all of these
>> elements will be mixed. If the number of workers (or the hash function)
>> changes, partitions will look different. When grouping all elements of the
>> group will have the same key (and all elements with that key will be in the
>> group).
>>
>> Flink's cross operator builds a dataset wide cross product. It does not
>> respect groups (or partitions). If you want to build a cross product within
>> a group, you can do that with a groupReduce which requires to hold all
>> elements of the group in memory or manually spill them to disk in your UDF.
>> Alternatively, you can use a self join (join a data set with itself) which
>> will give you all pairs of the CP in individual function calls. However,
>> Flink is currently not treating self joins special, such that the
>> performance could be optimized. You'll also get symmetric pairs (a-b, b-a,
>> a-a, b-b, for two element a, b with the same join key).
>>
>> If it is possible to combine the marco-parameter keys and the
>> minor-blocking keys into a single key, you could specify a key-selector
>> function x() and either do
>> - dataSet.groupBy(x).reduceGroup( *read full group into memory, and apply
>> expensive function to each pair of elements* ); or
>> - dataSet.join(dataSet).where(x).equalTo(x).join( *check of symmetric
>> pair and apply expensive compare function* ).
>>
>> BTW. there was a similar use case a few days back on the mailing list.
>> Might be worth reading that thread [1].
>> Since there this is the second time that this issue came up, we might
>> consider to add better support for group-wise cross operations.
>>
>> Cheers, Fabian
>>
>> [1]
>> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/load-balancing-groups-td2287.html
>>
>>
>>
>

Mime
View raw message