flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2576) Add outer joins to API and Optimizer
Date Fri, 02 Oct 2015 12:05:26 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14941060#comment-14941060 ]

ASF GitHub Bot commented on FLINK-2576:
---------------------------------------

Github user jkovacs commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1138#discussion_r41014523
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java ---
    @@ -0,0 +1,314 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.operators.base;
    +
    +import org.apache.commons.collections.ResettableIterator;
    +import org.apache.commons.collections.iterators.ListIteratorWrapper;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.functions.FlatJoinFunction;
    +import org.apache.flink.api.common.functions.RuntimeContext;
    +import org.apache.flink.api.common.functions.util.CopyingListCollector;
    +import org.apache.flink.api.common.functions.util.FunctionUtils;
    +import org.apache.flink.api.common.operators.BinaryOperatorInformation;
    +import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator;
    +import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
    +import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
    +import org.apache.flink.api.common.operators.util.UserCodeWrapper;
    +import org.apache.flink.api.common.typeinfo.AtomicType;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.CompositeType;
    +import org.apache.flink.api.common.typeutils.GenericPairComparator;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.List;
    +
    +public class OuterJoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1,
IN2, OUT>> extends AbstractJoinOperatorBase<IN1, IN2, OUT, FT> {
    +
    +	/** The supported outer join variants. */
    +	public enum OuterJoinType { LEFT, RIGHT, FULL }
    +
    +	/** The outer join variant executed by this operator; changeable via {@link #setOuterJoinType}. */
    +	private OuterJoinType outerJoinType;
    +
    +	/**
    +	 * Creates an outer join operator from an already wrapped user function.
    +	 *
    +	 * @param udf wrapper around the user's {@link FlatJoinFunction}
    +	 * @param operatorInfo type information of both inputs and of the output
    +	 * @param keyPositions1 key positions of the first input
    +	 * @param keyPositions2 key positions of the second input
    +	 * @param name the operator name
    +	 * @param outerJoinType the outer join variant ({@code LEFT}, {@code RIGHT} or {@code FULL})
    +	 */
    +	public OuterJoinOperatorBase(UserCodeWrapper<FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo,
    +			int[] keyPositions1, int[] keyPositions2, String name, OuterJoinType outerJoinType) {
    +		super(udf, operatorInfo, keyPositions1, keyPositions2, name);
    +		this.outerJoinType = outerJoinType;
    +	}
    +
    +	/**
    +	 * Creates an outer join operator from a user function instance.
    +	 *
    +	 * <p>The instance is wrapped in a {@link UserCodeObjectWrapper} and handed to the
    +	 * wrapper-based constructor.
    +	 *
    +	 * @param udf the user's join function instance
    +	 * @param operatorInfo type information of both inputs and of the output
    +	 * @param keyPositions1 key positions of the first input
    +	 * @param keyPositions2 key positions of the second input
    +	 * @param name the operator name
    +	 * @param outerJoinType the outer join variant ({@code LEFT}, {@code RIGHT} or {@code FULL})
    +	 */
    +	public OuterJoinOperatorBase(FT udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo,
    +			int[] keyPositions1, int[] keyPositions2, String name, OuterJoinType outerJoinType) {
    +		this(new UserCodeObjectWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2, name, outerJoinType);
    +	}
    +
    +	/**
    +	 * Creates an outer join operator from a user function class.
    +	 *
    +	 * <p>The class is wrapped in a {@link UserCodeClassWrapper} and handed to the
    +	 * wrapper-based constructor.
    +	 *
    +	 * @param udf the class of the user's join function
    +	 * @param operatorInfo type information of both inputs and of the output
    +	 * @param keyPositions1 key positions of the first input
    +	 * @param keyPositions2 key positions of the second input
    +	 * @param name the operator name
    +	 * @param outerJoinType the outer join variant ({@code LEFT}, {@code RIGHT} or {@code FULL})
    +	 */
    +	public OuterJoinOperatorBase(Class<? extends FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo,
    +			int[] keyPositions1, int[] keyPositions2, String name, OuterJoinType outerJoinType) {
    +		this(new UserCodeClassWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2, name, outerJoinType);
    +	}
    +
    +	/**
    +	 * Replaces the outer join variant executed by this operator.
    +	 *
    +	 * @param outerJoinType the new variant ({@code LEFT}, {@code RIGHT} or {@code FULL})
    +	 */
    +	public void setOuterJoinType(OuterJoinType outerJoinType) {
    +		this.outerJoinType = outerJoinType;
    +	}
    +
    +	/**
    +	 * Returns the outer join variant executed by this operator.
    +	 *
    +	 * @return the configured variant
    +	 */
    +	public OuterJoinType getOuterJoinType() {
    +		return this.outerJoinType;
    +	}
    +
    +	/**
    +	 * Executes the outer join on in-memory collections.
    +	 *
    +	 * <p>Comparators are built from the configured key columns, and both inputs are handed to an
    +	 * {@code OuterJoinListIterator}. That iterator sorts both input lists in place. For every pair
    +	 * the iterator yields, the user's {@link FlatJoinFunction} is invoked with copies of the
    +	 * elements. A side the iterator reports as {@code null} is passed through as {@code null}.
    +	 * The user function is opened before the loop and is always closed afterwards, even if the
    +	 * join throws.
    +	 *
    +	 * @param leftInput elements of the first input (sorted in place)
    +	 * @param rightInput elements of the second input (sorted in place)
    +	 * @param runtimeContext runtime context installed on the user function
    +	 * @param executionConfig configuration used to create serializers and comparators
    +	 * @return the records emitted by the join function
    +	 * @throws Exception if opening, invoking or closing the user function fails, or iteration fails
    +	 */
    +	@Override
    +	protected List<OUT> executeOnCollections(List<IN1> leftInput, List<IN2> rightInput, RuntimeContext runtimeContext, ExecutionConfig executionConfig) throws Exception {
    +		TypeInformation<IN1> leftInformation = getOperatorInfo().getFirstInputType();
    +		TypeInformation<IN2> rightInformation = getOperatorInfo().getSecondInputType();
    +		TypeInformation<OUT> outInformation = getOperatorInfo().getOutputType();
    +
    +		TypeComparator<IN1> leftComparator = buildComparatorFor(0, executionConfig, leftInformation);
    +		TypeComparator<IN2> rightComparator = buildComparatorFor(1, executionConfig, rightInformation);
    +
    +		TypeSerializer<IN1> leftSerializer = leftInformation.createSerializer(executionConfig);
    +		TypeSerializer<IN2> rightSerializer = rightInformation.createSerializer(executionConfig);
    +
    +		// NOTE: constructing the iterator sorts leftInput and rightInput in place.
    +		OuterJoinListIterator<IN1, IN2> outerJoinIterator =
    +				new OuterJoinListIterator<>(leftInput, leftSerializer, leftComparator,
    +						rightInput, rightSerializer, rightComparator, outerJoinType);
    +
    +		// --------------------------------------------------------------------
    +		// Run UDF
    +		// --------------------------------------------------------------------
    +		FlatJoinFunction<IN1, IN2, OUT> function = userFunction.getUserCodeObject();
    +
    +		FunctionUtils.setFunctionRuntimeContext(function, runtimeContext);
    +		FunctionUtils.openFunction(function, this.parameters);
    +
    +		List<OUT> result = new ArrayList<>();
    +		Collector<OUT> collector = new CopyingListCollector<>(result, outInformation.createSerializer(executionConfig));
    +
    +		try {
    +			while (outerJoinIterator.next()) {
    +				IN1 left = outerJoinIterator.getLeft();
    +				IN2 right = outerJoinIterator.getRight();
    +				// hand the UDF copies so any mutation it performs does not touch the input lists
    +				function.join(left == null ? null : leftSerializer.copy(left),
    +						right == null ? null : rightSerializer.copy(right), collector);
    +			}
    +		} finally {
    +			// every openFunction() must be paired with closeFunction() so the UDF can release resources
    +			FunctionUtils.closeFunction(function);
    +		}
    +
    +		return result;
    +	}
    +
    +	/**
    +	 * Builds a comparator over the join keys of one input.
    +	 *
    +	 * <p>Atomic types get a whole-value comparator. Composite types get a comparator on the key
    +	 * columns returned by {@code getKeyColumns(input)}, with every sort flag set to {@code true}.
    +	 *
    +	 * @param input index of the input whose keys are used ({@code 0} = first, {@code 1} = second)
    +	 * @param executionConfig configuration passed to the comparator factory
    +	 * @param typeInformation type information of the input's elements
    +	 * @return a comparator for the input
    +	 * @throws RuntimeException if the type is neither an {@link AtomicType} nor a {@link CompositeType}
    +	 */
    +	@SuppressWarnings("unchecked")
    +	private <T> TypeComparator<T> buildComparatorFor(int input, ExecutionConfig executionConfig, TypeInformation<T> typeInformation) {
    +		if (typeInformation instanceof AtomicType) {
    +			return ((AtomicType<T>) typeInformation).createComparator(true, executionConfig);
    +		}
    +
    +		if (typeInformation instanceof CompositeType) {
    +			int[] keyColumns = getKeyColumns(input);
    +			boolean[] ascending = new boolean[keyColumns.length];
    +			Arrays.fill(ascending, true);
    +			return ((CompositeType<T>) typeInformation).createComparator(keyColumns, ascending, 0, executionConfig);
    +		}
    +
    +		String typeName = typeInformation.getClass().getCanonicalName();
    +		throw new RuntimeException("Type information for input of type " + typeName
    +				+ " is not supported. Could not generate a comparator.");
    +	}
    +
    +	private static class OuterJoinListIterator<IN1, IN2> {
    +
    +
    +		private static enum MatchStatus {
    +			NONE_REMAINED, FIRST_REMAINED, SECOND_REMAINED, FIRST_EMPTY, SECOND_EMPTY
    +		}
    +
    +		private OuterJoinType outerJoinType;
    +
    +		private ListKeyGroupedIterator<IN1> leftGroupedIterator;
    +		private ListKeyGroupedIterator<IN2> rightGroupedIterator;
    +		private Iterable<IN1> currLeftSubset;
    +		private ResettableIterator currLeftIterator;
    +		private Iterable<IN2> currRightSubset;
    +		private ResettableIterator currRightIterator;
    +
    +		private MatchStatus matchStatus;
    +		private GenericPairComparator<IN1, IN2> pairComparator;
    +
    +		private IN1 leftReturn;
    +		private IN2 rightReturn;
    +
    +		public OuterJoinListIterator(List<IN1> leftInput, TypeSerializer<IN1> leftSerializer,
final TypeComparator<IN1> leftComparator,
    +				List<IN2> rightInput, TypeSerializer<IN2> rightSerializer, final TypeComparator<IN2>
rightComparator,
    +				OuterJoinType outerJoinType) {
    +			this.outerJoinType = outerJoinType;
    +			pairComparator = new GenericPairComparator<>(leftComparator, rightComparator);
    +			leftGroupedIterator = new ListKeyGroupedIterator<>(leftInput, leftSerializer,
leftComparator);
    +			rightGroupedIterator = new ListKeyGroupedIterator<>(rightInput, rightSerializer,
rightComparator);
    +			// ----------------------------------------------------------------
    +			// Sort
    +			// ----------------------------------------------------------------
    +			Collections.sort(leftInput, new Comparator<IN1>() {
    +				@Override
    +				public int compare(IN1 o1, IN1 o2) {
    +					return leftComparator.compare(o1, o2);
    +				}
    +			});
    +
    +			Collections.sort(rightInput, new Comparator<IN2>() {
    +				@Override
    +				public int compare(IN2 o1, IN2 o2) {
    +					return rightComparator.compare(o1, o2);
    +				}
    +			});
    +
    +		}
    +
    +		@SuppressWarnings("unchecked")
    +		private boolean next() throws IOException {
    +			boolean hasMoreElements;
    +			if ((currLeftIterator == null || !currLeftIterator.hasNext()) && (currRightIterator
== null || !currRightIterator.hasNext())) {
    +				hasMoreElements = nextGroups(outerJoinType);
    +				if (hasMoreElements) {
    +					if (outerJoinType != OuterJoinType.LEFT) {
    +						currLeftIterator = new ListIteratorWrapper(currLeftSubset.iterator());
    +					}
    +					leftReturn = (IN1) currLeftIterator.next();
    +					if (outerJoinType != OuterJoinType.RIGHT) {
    +						currRightIterator = new ListIteratorWrapper(currRightSubset.iterator());
    +					}
    +					rightReturn = (IN2) currRightIterator.next();
    +					return true;
    +				} else {
    +					//no more elements
    +					return false;
    +				}
    +			} else if (currLeftIterator.hasNext() && !currRightIterator.hasNext()) {
    --- End diff --
    
    Technically true, but from the control flow I believe that scenario is impossible: either both iterators are assigned non-null values at the same time, or both remain null and the method returns `false` (no more elements). @r-pogalz, can you confirm this, or can we make it more explicit?


> Add outer joins to API and Optimizer
> ------------------------------------
>
>                 Key: FLINK-2576
>                 URL: https://issues.apache.org/jira/browse/FLINK-2576
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Java API, Optimizer, Scala API
>            Reporter: Ricky Pogalz
>            Priority: Minor
>             Fix For: pre-apache
>
>
> Add left/right/full outer join methods to the DataSet APIs (Java, Scala) and to the optimizer of Flink.
> Initially, the execution strategy should be a sort-merge outer join (FLINK-2105), but it can later be extended to hash joins for left/right outer joins.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message