flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-758) Add count method to DataSet and implement CountOperator
Date Sun, 07 Sep 2014 13:52:29 GMT

    [ https://issues.apache.org/jira/browse/FLINK-758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14124904#comment-14124904 ]

ASF GitHub Bot commented on FLINK-758:
--------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/63#discussion_r17214869
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/CountOperator.java ---
    @@ -0,0 +1,125 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.operators;
    +
    +import org.apache.commons.lang3.Validate;
    +import org.apache.flink.api.common.operators.UnaryOperatorInformation;
    +import org.apache.flink.api.common.operators.base.MapOperatorBase;
    +import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.functions.GroupReduceFunction;
    +import org.apache.flink.api.java.functions.MapFunction;
    +import org.apache.flink.api.java.functions.ReduceFunction;
    +import org.apache.flink.api.java.typeutils.BasicTypeInfo;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.Iterator;
    +
    +/**
    + * A {@link DataSet} that is the result of a count transformation.
    + * <p/>
    + * The count will be executed as a map-reduce. The map operator maps every element of the input to a 1 and the all
    + * reduce sums the ones up to the total count.
    + *
    + * @param <IN> The type of the data set aggregated by the operator.
    + */
    +public class CountOperator<IN> extends SingleInputUdfOperator<IN, Long, CountOperator<IN>> {
    +
    +	private final Grouping<IN> grouping;
    +
    +	public CountOperator(DataSet<IN> input) {
    +		super(input, BasicTypeInfo.LONG_TYPE_INFO);
    +		grouping = null;
    +	}
    +
    +	public CountOperator(Grouping<IN> input) {
    +		super(Validate.notNull(input).getDataSet(), BasicTypeInfo.LONG_TYPE_INFO);
    +		this.grouping = input;
    +	}
    +
    +	@Override
    +	protected org.apache.flink.api.common.operators.SingleInputOperator<?, Long, ?> translateToDataFlow(
    +			org.apache.flink.api.common.operators.Operator<IN> input) {
    +		if (grouping == null) {
    +			// map to ones
    +			UnaryOperatorInformation<IN, Long> countMapOpInfo =
    +					new UnaryOperatorInformation<IN, Long>(getInputType(), BasicTypeInfo.LONG_TYPE_INFO);
    +			MapOperatorBase<IN, Long, MapFunction<IN, Long>> countMapOp =
    +					new MapOperatorBase<IN, Long, MapFunction<IN, Long>>(
    +							new CountingMapUdf(), countMapOpInfo, "Count: map to ones");
    +
    +			countMapOp.setInput(input);
    +			countMapOp.setDegreeOfParallelism(input.getDegreeOfParallelism());
    +
    +			// sum ones
    +			UnaryOperatorInformation<Long, Long> countReduceOpInfo =
    +					new UnaryOperatorInformation<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
    +			ReduceOperatorBase<Long, ReduceFunction<Long>> countReduceOp =
    +					new ReduceOperatorBase<Long, ReduceFunction<Long>>(
    +							new CountingReduceUdf(), countReduceOpInfo, "Count: sum ones");
    +
    +			countReduceOp.setInput(countMapOp);
    +			countReduceOp.setDegreeOfParallelism(1);
    +			countReduceOp.setInitialValue(countReduceOpInfo.getInputType().createSerializer(), 0L);
    +
    +			return countReduceOp;
    +		}
    +		else {
    +			return new ReduceGroupOperator<IN, Long>(grouping, new CountingGroupReduceUdf<IN>())
    --- End diff --
    
    Using a non-combinable GroupReduceFunction for counting is unnecessarily inefficient.
    We could instead extract the key fields with a Mapper, append a count of 1, and use a ReduceFunction here as well, as the non-grouped branch does.
    This requires handling a few cases because of the different key types, but it should be the way to go.
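
The idea in the comment above (map each record to its key plus a count of 1, then sum the counts with a reduce function) can be sketched without any Flink dependency. The following is only an illustration under assumptions: the class `CombinableGroupCount`, the method `countByKey`, and the modelling of parallel partitions as nested lists are hypothetical and not part of the pull request. It shows why a sum-style reduce can be pre-aggregated locally, which a non-combinable group reduce cannot do.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical stand-alone sketch; it does not use or mirror Flink's operator API.
public class CombinableGroupCount {

    /** Counts elements per key: map to (key, 1L), combine per partition, then merge. */
    public static <T, K> Map<K, Long> countByKey(
            List<List<T>> partitions, Function<T, K> keyExtractor) {
        // The reduce step is plain addition. Because it is associative and
        // commutative, the same function can act as local combiner and final reducer.
        BinaryOperator<Long> sum = Long::sum;

        // Local phase: each partition pre-aggregates its own (key, 1L) pairs.
        List<Map<K, Long>> partials = new ArrayList<>();
        for (List<T> partition : partitions) {
            Map<K, Long> local = new HashMap<>();
            for (T element : partition) {
                local.merge(keyExtractor.apply(element), 1L, sum);
            }
            partials.add(local);
        }

        // Final phase: at most one partial count per key and partition is merged.
        Map<K, Long> result = new HashMap<>();
        for (Map<K, Long> partial : partials) {
            partial.forEach((key, count) -> result.merge(key, count, sum));
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = List.of(
                List.of("a", "b", "a"),
                List.of("b", "a"));
        System.out.println(countByKey(partitions, word -> word));
    }
}
```

In this sketch the final phase only sees one partial count per key and partition instead of every record, which is the saving the comment is after.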


> Add count method to DataSet and implement CountOperator
> -------------------------------------------------------
>
>                 Key: FLINK-758
>                 URL: https://issues.apache.org/jira/browse/FLINK-758
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: GitHub Import
>              Labels: github-import
>             Fix For: pre-apache
>
>         Attachments: pull-request-758-7518001488867571817.patch
>
>
> At the request of @twalthr. This is the count operator I implemented some time ago to get to know the new Java API. It introduces `DataSet.count()`, which is executed as a map (to ones) and reduce (sum up the ones). I initially didn't open the PR because of the following problem: empty DataSets don't work, as the first map won't have any input to operate on.
> If more people think that we should include this operator, we can think about a possible solution to the problem.
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/pull/758
> Created by: [uce|https://github.com/uce]
> Labels: enhancement, java api, 
> Milestone: Release 0.6 (unplanned)
> Created at: Tue May 06 10:42:33 CEST 2014
> State: open
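
The empty-input problem described in the issue can be reproduced outside Flink with plain Java streams. This is a minimal sketch, not the operator's code: the class `EmptyCount` and both method names are hypothetical. A map-to-ones followed by an unseeded sum has nothing to return for empty input, while seeding the sum with 0 yields a count of 0. The `setInitialValue(..., 0L)` call in the diff above appears to serve the same purpose.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-alone sketch using java.util.stream, not Flink operators.
public class EmptyCount {

    /** Maps every element to 1L and sums without a seed: empty input yields no value. */
    public static <T> Optional<Long> countWithoutSeed(List<T> input) {
        return input.stream().map(element -> 1L).reduce(Long::sum);
    }

    /** Same map and sum, but seeded with 0L so that empty input yields a count of 0. */
    public static <T> long countWithSeed(List<T> input) {
        return input.stream().map(element -> 1L).reduce(0L, Long::sum);
    }

    public static void main(String[] args) {
        List<String> empty = List.of();
        System.out.println(countWithoutSeed(empty).isPresent());
        System.out.println(countWithSeed(empty));
        System.out.println(countWithSeed(List.of("a", "b", "c")));
    }
}
```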



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
