flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2997) Support range partition with user customized data distribution.
Date Mon, 21 Mar 2016 23:44:25 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15205375#comment-15205375 ]

ASF GitHub Bot commented on FLINK-2997:
---------------------------------------

Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56916757
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java ---
    @@ -45,35 +46,48 @@
     	private final PartitionMethod pMethod;
     	private final String partitionLocationName;
     	private final Partitioner<?> customPartitioner;
    -	
    -	
    +	private final DataDistribution distribution;
    +
    +
     	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys, String partitionLocationName) {
    -		this(input, pMethod, pKeys, null, null, partitionLocationName);
    +		this(input, pMethod, pKeys, null, null, null, partitionLocationName);
     	}
    -	
    +
    +	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys, DataDistribution distribution, String partitionLocationName) {
    +		this(input, pMethod, pKeys, null, null, distribution, partitionLocationName);
    +	}
    +
     	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, String partitionLocationName) {
    -		this(input, pMethod, null, null, null, partitionLocationName);
    +		this(input, pMethod, null, null, null, null, partitionLocationName);
     	}
     	
     	public PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<?> customPartitioner, String partitionLocationName) {
    -		this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, null, partitionLocationName);
    +		this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, null, null, partitionLocationName);
     	}
     	
    -	public <P> PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<P> customPartitioner, 
    +	public <P> PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<P> customPartitioner,
     			TypeInformation<P> partitionerTypeInfo, String partitionLocationName)
     	{
    -		this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, partitionerTypeInfo, partitionLocationName);
    +		this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, partitionerTypeInfo, null, partitionLocationName);
     	}
     	
    -	private <P> PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys, Partitioner<P> customPartitioner, 
    -			TypeInformation<P> partitionerTypeInfo, String partitionLocationName)
    +	private <P> PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys, Partitioner<P> customPartitioner,
    +			TypeInformation<P> partitionerTypeInfo, DataDistribution distribution, String partitionLocationName)
     	{
     		super(input, input.getType());
     		
     		Preconditions.checkNotNull(pMethod);
     		Preconditions.checkArgument(pKeys != null || pMethod == PartitionMethod.REBALANCE, "Partitioning requires keys");
     		Preconditions.checkArgument(pMethod != PartitionMethod.CUSTOM || customPartitioner != null, "Custom partitioning requires a partitioner.");
    -
    +		Preconditions.checkArgument(distribution == null || pMethod == PartitionMethod.RANGE, "Customized data distribution is only necessary for range partition.");
    +		
    +		if (distribution != null) {
    +			Preconditions.checkArgument(distribution.getNumberOfFields() == pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and range partitioner should be the same.");
    +			for (int i = 0; i < pKeys.getNumberOfKeyFields(); i++) {
    +				Preconditions.checkArgument(distribution.getKeyTypes()[i].equals(pKeys.getKeyFieldTypes()[i]), "The types of key from the distribution and range partitioner are not equal.");
    --- End diff --
    
    Code modified, and the new commit has been rebased into the previous one. (You must have stayed up late last night :) )


> Support range partition with user customized data distribution.
> ---------------------------------------------------------------
>
>                 Key: FLINK-2997
>                 URL: https://issues.apache.org/jira/browse/FLINK-2997
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Chengxiang Li
>
> This is follow-up work to FLINK-7. Sometimes users have better knowledge of the source data, and they can build a customized data distribution to do range partitioning more efficiently.
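As an illustration of the idea in the issue description, here is a minimal Java sketch, assuming a single int key and a user who already knows where the bucket boundaries should fall. UserRangePartitioner and its methods are hypothetical and are not Flink's DataDistribution API. The user supplies the sorted upper bound of every bucket except the last, and each key is sent to the first bucket whose bound is greater than or equal to it, found with Arrays.binarySearch.

```java
import java.util.Arrays;

// Hypothetical sketch: range partitioning driven by user-supplied bucket
// boundaries. Not Flink's API; single int key assumed for simplicity.
public class UserRangePartitioner {

    // Inclusive upper bounds of every bucket except the last, strictly increasing.
    private final int[] upperBounds;

    public UserRangePartitioner(int[] upperBounds) {
        int[] copy = upperBounds.clone();
        for (int i = 1; i < copy.length; i++) {
            if (copy[i - 1] >= copy[i]) {
                throw new IllegalArgumentException("Boundaries must be strictly increasing.");
            }
        }
        this.upperBounds = copy;
    }

    // One more bucket than there are bounds: the last bucket is unbounded above.
    public int numPartitions() {
        return upperBounds.length + 1;
    }

    // Index of the first bucket whose upper bound is >= key; keys above the
    // last bound fall into the final bucket.
    public int partition(int key) {
        int idx = Arrays.binarySearch(upperBounds, key);
        // On a miss, binarySearch returns -(insertionPoint) - 1.
        return idx >= 0 ? idx : -(idx + 1);
    }

    public static void main(String[] args) {
        // A user who knows most keys are small can split the low range finely.
        UserRangePartitioner p = new UserRangePartitioner(new int[] {10, 50, 100});
        for (int key : new int[] {3, 10, 11, 75, 1000}) {
            System.out.println(key + " -> partition " + p.partition(key));
        }
    }
}
```

With the boundaries {10, 50, 100} there are four partitions: keys 3 and 10 land in partition 0, 11 in partition 1, 75 in partition 2, and 1000 in partition 3.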



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
