flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1060) Support explicit shuffling of DataSets
Date Mon, 01 Sep 2014 18:15:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14117611#comment-14117611 ]

ASF GitHub Bot commented on FLINK-1060:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/108#discussion_r16962375
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionedDataSet.java
---
    @@ -0,0 +1,249 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.operators;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.MapPartitionFunction;
    +import org.apache.flink.api.common.functions.util.FunctionUtils;
    +import org.apache.flink.api.common.operators.Operator;
    +import org.apache.flink.api.common.operators.UnaryOperatorInformation;
    +import org.apache.flink.api.common.operators.base.MapOperatorBase;
    +import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
    +import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
    +import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
    +import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.types.TypeInformation;
    +
    +public class PartitionedDataSet<IN> {
    +	
    +	private final DataSet<IN> dataSet;
    +	
    +	private final Keys<IN> pKeys;
    +	private final PartitionMethod pMethod;
    +	
    +	public PartitionedDataSet(DataSet<IN> input, PartitionMethod pMethod, Keys<IN> pKeys) {
    +		this.dataSet = input;
    +		
    +		if(pMethod == PartitionMethod.HASH && pKeys == null) {
    +			throw new IllegalArgumentException("Hash Partitioning requires keys");
    +		} else if(pMethod == PartitionMethod.RANGE) {
    +			throw new UnsupportedOperationException("Range Partitioning not yet supported");
    +		}
    +		
    +		if(pKeys instanceof Keys.FieldPositionKeys<?> && !input.getType().isTupleType()) {
    +			throw new IllegalArgumentException("Hash Partitioning with key fields only possible on Tuple DataSets");
    +		}
    +		
    +		this.pMethod = pMethod;
    +		this.pKeys = pKeys;
    +	}
    +	
    +	public PartitionedDataSet(DataSet<IN> input, PartitionMethod pMethod) {
    +		this(input, pMethod, null);
    +	}
    +	
    +	public DataSet<IN> getDataSet() {
    +		return this.dataSet;
    +	}
    +	
    +	
    +	/**
    +	 * Applies a Map transformation on a {@link DataSet}.<br/>
    +	 * The transformation calls a {@link org.apache.flink.api.java.functions.RichMapFunction} for each element of the DataSet.
    +	 * Each MapFunction call returns exactly one element.
    +	 * 
    +	 * @param mapper The MapFunction that is called for each element of the DataSet.
    +	 * @return A MapOperator that represents the transformed DataSet.
    +	 * 
    +	 * @see org.apache.flink.api.java.functions.RichMapFunction
    +	 * @see MapOperator
    +	 * @see DataSet
    +	 */
    +	public <R> MapOperator<IN, R> map(MapFunction<IN, R> mapper) {
    +		if (mapper == null) {
    +			throw new NullPointerException("Map function must not be null.");
    +		}
    +		if (FunctionUtils.isLambdaFunction(mapper)) {
    +			throw new UnsupportedLambdaExpressionException();
    +		}
    +		return new MapOperator<IN, R>(this, mapper);
    +	}
    +
    +    /**
    +     * Applies a Map-style operation to the entire partition of the data.
    +	 * The function is called once per parallel partition of the data,
    +	 * and the entire partition is available through the given Iterator.
    +	 * The number of elements that each instance of the MapPartition function
    +	 * sees is non deterministic and depends on the degree of parallelism of the operation.
    +	 *
    +	 * This function is intended for operations that cannot transform individual elements,
    +	 * requires no grouping of elements. To transform individual elements,
    +	 * the use of {@code map()} and {@code flatMap()} is preferable.
    +	 *
    +	 * @param mapPartition The MapPartitionFunction that is called for the full DataSet.
    +     * @return A MapPartitionOperator that represents the transformed DataSet.
    --- End diff --
    
    If you're talking binary, I can fix the violations.
    Otherwise, I guess this is out of scope of this PR ;-)
    
    Will fix the spaces in my files.
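
For readers skimming the diff above: the constructor's key-validation rules can be distilled into a small standalone sketch. This is a simplified illustration with made-up class names (`PartitionConfig`), not the actual Flink class from the PR:

```java
// Simplified sketch of the validation logic in PartitionedDataSet's constructor.
// PartitionConfig and its fields are hypothetical names for illustration only.
enum PartitionMethod { HASH, RANGE, REBALANCE }

class PartitionConfig<K> {
    final PartitionMethod method;
    final K keys; // null means "no keys given"

    PartitionConfig(PartitionMethod method, K keys) {
        // Hash partitioning is meaningless without keys to hash on.
        if (method == PartitionMethod.HASH && keys == null) {
            throw new IllegalArgumentException("Hash Partitioning requires keys");
        }
        // Range partitioning is rejected until it is implemented.
        if (method == PartitionMethod.RANGE) {
            throw new UnsupportedOperationException("Range Partitioning not yet supported");
        }
        this.method = method;
        this.keys = keys;
    }
}
```

The point of validating in the constructor is that an invalid partitioning request fails at plan-construction time, before any job is submitted.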


> Support explicit shuffling of DataSets
> --------------------------------------
>
>                 Key: FLINK-1060
>                 URL: https://issues.apache.org/jira/browse/FLINK-1060
>             Project: Flink
>          Issue Type: Improvement
>          Components: Java API, Optimizer
>            Reporter: Fabian Hueske
>            Assignee: Fabian Hueske
>            Priority: Minor
>
> Right now, Flink only shuffles data if it is required by some operation such as Reduce, Join, or CoGroup. There is no way to explicitly shuffle a data set.
> However, in some situations explicit shuffling would be very helpful including:
> - rebalancing before compute-intensive Map operations
> - balancing, random or hash partitioning before PartitionMap operations (see FLINK-1053)
> - better integration of support for HadoopJobs (see FLINK-838)
> With this issue, I propose to add the following methods to {{DataSet}}
> - {{DataSet.partitionHashBy(int...)}} and {{DataSet.partitionHashBy(KeySelector)}} to perform an explicit hash partitioning
> - {{DataSet.partitionRandomly()}} to shuffle data completely at random
> - {{DataSet.partitionRoundRobin()}} to shuffle data in a round-robin fashion, which generates a very even distribution with possible bias due to prior distributions
> {{DataSet.partitionRoundRobin()}} might not be necessary if we think that random shuffling balances well enough.
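
The difference between the proposed strategies can be illustrated in plain Java, independent of Flink. The sketch below (hypothetical helper names, not Flink API) shows why round-robin gives partition sizes that differ by at most one element, while hash partitioning co-locates equal keys but may skew:

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionSketch {

    // Round-robin: element i goes to partition i mod n.
    // Partition sizes differ by at most one, regardless of the data.
    static <T> List<List<T>> roundRobin(List<T> data, int numPartitions) {
        List<List<T>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) parts.add(new ArrayList<>());
        for (int i = 0; i < data.size(); i++) {
            parts.get(i % numPartitions).add(data.get(i));
        }
        return parts;
    }

    // Hash partitioning: partition chosen from the element's hash code.
    // Equal elements always land in the same partition, but the
    // distribution may skew if hash values cluster.
    static <T> List<List<T>> hashPartition(List<T> data, int numPartitions) {
        List<List<T>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) parts.add(new ArrayList<>());
        for (T t : data) {
            int p = (t.hashCode() & Integer.MAX_VALUE) % numPartitions;
            parts.get(p).add(t);
        }
        return parts;
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(1, 2, 3, 4, 5, 6, 7);
        System.out.println("round-robin: " + roundRobin(data, 3));
        System.out.println("hash:        " + hashPartition(data, 3));
    }
}
```

This is why round-robin is attractive before compute-intensive Map operations (perfect balance), while hash partitioning is what Reduce/Join-style operations need (key co-location).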



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
