flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhueske <...@git.apache.org>
Subject [GitHub] incubator-flink pull request: [FLINK-1060] Added methods to DataSe...
Date Mon, 01 Sep 2014 18:14:29 GMT
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/108#discussion_r16962375
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionedDataSet.java ---
    @@ -0,0 +1,249 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.operators;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.MapPartitionFunction;
    +import org.apache.flink.api.common.functions.util.FunctionUtils;
    +import org.apache.flink.api.common.operators.Operator;
    +import org.apache.flink.api.common.operators.UnaryOperatorInformation;
    +import org.apache.flink.api.common.operators.base.MapOperatorBase;
    +import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
    +import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
    +import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
    +import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.types.TypeInformation;
    +
    +public class PartitionedDataSet<IN> {
    +	
    +	private final DataSet<IN> dataSet;
    +	
    +	private final Keys<IN> pKeys;
    +	private final PartitionMethod pMethod;
    +	
    +	public PartitionedDataSet(DataSet<IN> input, PartitionMethod pMethod, Keys<IN>
pKeys) {
    +		this.dataSet = input;
    +		
    +		if(pMethod == PartitionMethod.HASH && pKeys == null) {
    +			throw new IllegalArgumentException("Hash Partitioning requires keys");
    +		} else if(pMethod == PartitionMethod.RANGE) {
    +			throw new UnsupportedOperationException("Range Partitioning not yet supported");
    +		}
    +		
    +		if(pKeys instanceof Keys.FieldPositionKeys<?> && !input.getType().isTupleType())
{
    +			throw new IllegalArgumentException("Hash Partitioning with key fields only possible
on Tuple DataSets");
    +		}
    +		
    +		this.pMethod = pMethod;
    +		this.pKeys = pKeys;
    +	}
    +	
    +	public PartitionedDataSet(DataSet<IN> input, PartitionMethod pMethod) {
    +		this(input, pMethod, null);
    +	}
    +	
    +	public DataSet<IN> getDataSet() {
    +		return this.dataSet;
    +	}
    +	
    +	
    +	/**
    +	 * Applies a Map transformation on a {@link DataSet}.<br/>
    +	 * The transformation calls a {@link org.apache.flink.api.java.functions.RichMapFunction}
for each element of the DataSet.
    +	 * Each MapFunction call returns exactly one element.
    +	 * 
    +	 * @param mapper The MapFunction that is called for each element of the DataSet.
    +	 * @return A MapOperator that represents the transformed DataSet.
    +	 * 
    +	 * @see org.apache.flink.api.java.functions.RichMapFunction
    +	 * @see MapOperator
    +	 * @see DataSet
    +	 */
    +	public <R> MapOperator<IN, R> map(MapFunction<IN, R> mapper) {
    +		if (mapper == null) {
    +			throw new NullPointerException("Map function must not be null.");
    +		}
    +		if (FunctionUtils.isLambdaFunction(mapper)) {
    +			throw new UnsupportedLambdaExpressionException();
    +		}
    +		return new MapOperator<IN, R>(this, mapper);
    +	}
    +
    +    /**
    +     * Applies a Map-style operation to the entire partition of the data.
    +	 * The function is called once per parallel partition of the data,
    +	 * and the entire partition is available through the given Iterator.
    +	 * The number of elements that each instance of the MapPartition function
    +	 * sees is non deterministic and depends on the degree of parallelism of the operation.
    +	 *
    +	 * This function is intended for operations that cannot transform individual elements,
    +	 * requires no grouping of elements. To transform individual elements,
    +	 * the use of {@code map()} and {@code flatMap()} is preferable.
    +	 *
    +	 * @param mapPartition The MapPartitionFunction that is called for the full DataSet.
    +     * @return A MapPartitionOperator that represents the transformed DataSet.
    --- End diff --
    
    If you're talking binary, I can fix the violations.
    Otherwise, I guess this is out of scope for this PR ;-)
    
    I will fix the spaces in my files.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and you would like it enabled, or if the feature is enabled but not
working, please contact infrastructure at infrastructure@apache.org or file
a JIRA ticket with INFRA.
---

Mime
View raw message