flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3665) Range partitioning lacks support to define sort orders
Date Wed, 20 Apr 2016 13:39:25 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249899#comment-15249899 ]

ASF GitHub Bot commented on FLINK-3665:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r60407974
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java ---
    @@ -546,43 +549,264 @@ public void testRangePartitionInIteration() throws Exception {
     		result.collect(); // should fail
     	}
     
    +
    +
    +	@Test
    +	public void testRangePartitionerOnSequenceDataWithOrders() throws Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 10000)
    +				.map(new MapFunction<Long, Tuple2<Long, Long>>() {
    +			@Override
    +			public Tuple2<Long, Long> map(Long value) throws Exception {
    +				return new Tuple2<>(value / 5000, value % 5000);
    +			}
    +		});
    +
    +		final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
    +																			   new LongComparator(false));
    +
    +		MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
    +
    +		final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0, 1)
    +				.withOrders(Order.ASCENDING, Order.DESCENDING)
    +				.mapPartition(minMaxSelector)
    +				.collect();
    +
    +		Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
    +
    +		Tuple2<Long, Long> previousMax = null;
    +		for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) {
    --- End diff --
    
    Yes, looks correct (except for the `<=` instead of `<`). Thanks for clarifying!


> Range partitioning lacks support to define sort orders
> ------------------------------------------------------
>
>                 Key: FLINK-3665
>                 URL: https://issues.apache.org/jira/browse/FLINK-3665
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataSet API
>    Affects Versions: 1.0.0
>            Reporter: Fabian Hueske
>             Fix For: 1.1.0
>
>
> {{DataSet.partitionByRange()}} does not allow specifying the sort order of fields. This is fine if range partitioning is used to reduce skewed partitioning.
> However, it is not sufficient if range partitioning is used to sort a data set in parallel.

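To illustrate why the order matters for parallel sorting, here is a minimal sketch in plain Java. It is not Flink code: `RangeSortSketch`, `partitionAndSort`, and the fixed-width range assignment are hypothetical stand-ins for illustration only. Ranges are always assigned in ascending key order, each range is sorted locally, and the ranges are concatenated in partition order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration, not part of Flink.
class RangeSortSketch {

    // Splits values in [0, max] into `parts` ranges in ASCENDING key order,
    // sorts each range locally with `order`, and concatenates ranges 0..parts-1.
    static List<Integer> partitionAndSort(List<Integer> data, int parts, int max,
                                          Comparator<Integer> order) {
        List<List<Integer>> buckets = new ArrayList<>();
        for (int i = 0; i < parts; i++) {
            buckets.add(new ArrayList<>());
        }
        for (int v : data) {
            // Fixed-width ranges; real range partitioning would sample the data instead.
            buckets.get(v * parts / (max + 1)).add(v);
        }
        List<Integer> out = new ArrayList<>();
        for (List<Integer> bucket : buckets) {
            bucket.sort(order);
            out.addAll(bucket);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(5, 1, 8, 3, 6, 0, 7, 2, 4, 9);

        // Ascending local sort matches the ascending ranges: globally sorted.
        System.out.println(partitionAndSort(data, 2, 9, Comparator.<Integer>naturalOrder()));
        // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

        // Descending local sort on ascending ranges: NOT globally sorted.
        System.out.println(partitionAndSort(data, 2, 9, Comparator.<Integer>reverseOrder()));
        // prints [4, 3, 2, 1, 0, 9, 8, 7, 6, 5]
    }
}
```

The second result is sorted only within each range, which is the gap the issue describes: the ranges themselves would need to follow the requested order.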
> Since {{DataSet.partitionByRange()}} is a {{@Public}} API and cannot be easily changed, I propose adding a method {{withOrders(Order... orders)}} to {{PartitionOperator}}. The method should throw an exception if the partitioning method of {{PartitionOperator}} is not range partitioning.
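The proposed behaviour could be sketched roughly as follows. This is a self-contained stand-in, not the actual Flink implementation: `PartitionOperatorSketch`, `PartitionMethod`, and the local `Order` enum are hypothetical names, and both the ascending default and the length check are assumptions that the issue text does not specify.

```java
import java.util.Arrays;

// Hypothetical stand-in types; not Flink's PartitionOperator.
class WithOrdersSketch {

    enum Order { ASCENDING, DESCENDING }             // stand-in for Flink's Order enum
    enum PartitionMethod { HASH, RANGE, REBALANCE }  // assumed set of methods

    static final class PartitionOperatorSketch {
        private final PartitionMethod method;
        private final int[] keyFields;
        private Order[] orders;

        PartitionOperatorSketch(PartitionMethod method, int... keyFields) {
            this.method = method;
            this.keyFields = keyFields.clone();
            // Assumption: every key field defaults to ascending order.
            this.orders = new Order[keyFields.length];
            Arrays.fill(this.orders, Order.ASCENDING);
        }

        // Sets one order per key field; rejected unless range partitioning is used.
        PartitionOperatorSketch withOrders(Order... orders) {
            if (method != PartitionMethod.RANGE) {
                throw new IllegalStateException(
                    "withOrders() requires range partitioning, but method is " + method);
            }
            // Assumption: the number of orders must match the number of key fields.
            if (orders.length != keyFields.length) {
                throw new IllegalArgumentException(
                    "Expected " + keyFields.length + " orders, got " + orders.length);
            }
            this.orders = orders.clone();
            return this;
        }

        Order[] getOrders() {
            return orders.clone();
        }
    }

    public static void main(String[] args) {
        PartitionOperatorSketch ranged =
            new PartitionOperatorSketch(PartitionMethod.RANGE, 0, 1)
                .withOrders(Order.ASCENDING, Order.DESCENDING);
        System.out.println(Arrays.toString(ranged.getOrders()));

        try {
            new PartitionOperatorSketch(PartitionMethod.HASH, 0).withOrders(Order.ASCENDING);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Returning `this` keeps the call chainable, matching the `partitionByRange(0, 1).withOrders(...)` usage in the test under review.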



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
