beam-commits mailing list archives

From: echauc...@apache.org
Subject: [beam] 03/13: Remove CombineGlobally translation because it is less performant than the beam sdk one (key + combinePerKey.withHotkeyFanout)
Date: Fri, 05 Jul 2019 13:53:13 GMT
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit e80c908a1967e276e6f3cc1364b69cc1fa538034
Author: Etienne Chauchot <echauchot@apache.org>
AuthorDate: Wed Jul 3 16:57:55 2019 +0200

    Remove CombineGlobally translation because it is less performant than the beam sdk one
    (key + combinePerKey.withHotkeyFanout)
---
 .../batch/CombineGloballyTranslatorBatch.java      | 83 ----------------------
 .../translation/batch/PipelineTranslatorBatch.java |  1 -
 2 files changed, 84 deletions(-)
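
For context on the replacement path the commit message refers to: the SDK expands Combine.globally into a ParDo that keys every element with a single Void key, a Combine.perKey, and a step that drops the key again; withFanout on the global combine becomes withHotKeyFanout on the per-key combine. Below is a minimal runnable sketch of that shape, assuming beam-sdks-java-core plus the direct runner on the classpath; the Sum combine fn and the fanout value 4 are illustrative, not from this commit.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Combine;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.Sum;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.beam.sdk.transforms.WithKeys;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;

    public class GlobalCombineExpansionSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
        PCollection<Long> input = p.apply(Create.of(1L, 2L, 3L, 4L));
        PCollection<Long> sum =
            input
                // Key every element with the same Void key...
                .apply(WithKeys.<Void, Long>of((Void) null)
                    .withKeyType(TypeDescriptors.voids()))
                // ...combine per key, spreading the single hot key over
                // 4 intermediate keys before the final merge...
                .apply(Combine.<Void, Long, Long>perKey(Sum.ofLongs())
                    .withHotKeyFanout(4))
                // ...and drop the key again.
                .apply(Values.<Long>create());
        p.run().waitUntilFinish();
      }
    }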

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
deleted file mode 100644
index 30101dc..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.MapPartitionsFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.KeyValueGroupedDataset;
-import scala.Tuple2;
-
-/**
- * By default Combine.globally is translated as a composite transform that does a Pardo (to key the
- * input PCollection with Void keys and then a Combine.PerKey transform. The problem is that
- * Combine.PerKey uses spark GroupByKey which is not very performant due to shuffle. So we add a
- * special CombineGloballyTranslator that does not need GroupByKey
- */
-class CombineGloballyTranslatorBatch<InputT, AccumT, OutputT>
-    implements TransformTranslator<PTransform<PCollection<InputT>, PCollection<OutputT>>> {
-
-  @Override
-  public void translateTransform(
-      PTransform<PCollection<InputT>, PCollection<OutputT>> transform, TranslationContext context) {
-
-    Combine.Globally combineTransform = (Combine.Globally) transform;
-    @SuppressWarnings("unchecked")
-    final PCollection<InputT> input = (PCollection<InputT>) context.getInput();
-    @SuppressWarnings("unchecked")
-    final PCollection<OutputT> output = (PCollection<OutputT>) context.getOutput();
-    @SuppressWarnings("unchecked")
-    final Combine.CombineFn<InputT, AccumT, OutputT> combineFn =
-        (Combine.CombineFn<InputT, AccumT, OutputT>) combineTransform.getFn();
-    WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-    Dataset<WindowedValue<InputT>> inputDataset = context.getDataset(input);
-
-    // applying a groupByKey avoids for some reason that the spark structured streaming fmwk
-    // casts data to Row which makes it impossible to deserialize without
-    // the coder shipped into the data. For performance reasons
-    // (avoid memory consumption and having to deserialize), we do not ship coder + data.
-
-    KeyValueGroupedDataset<Integer, WindowedValue<InputT>> groupedDataset = inputDataset
-        .groupByKey((MapFunction<WindowedValue<InputT>, Integer>) value -> 1,
-            EncoderHelpers.genericEncoder());
-
-    Dataset<Tuple2<Integer, Iterable<WindowedValue<OutputT>>>> combinedDataset = groupedDataset
-        .agg(new AggregatorCombinerGlobally<>(combineFn, windowingStrategy).toColumn());
-
-    Dataset<Iterable<WindowedValue<OutputT>>> accumulatedDataset = combinedDataset.map(
-        (MapFunction<Tuple2<Integer, Iterable<WindowedValue<OutputT>>>, Iterable<WindowedValue<OutputT>>>) value -> value._2,
-        EncoderHelpers.genericEncoder());
-
-    Dataset<WindowedValue<OutputT>> outputDataset = accumulatedDataset.flatMap(
-        (FlatMapFunction<Iterable<WindowedValue<OutputT>>, WindowedValue<OutputT>>)
-            windowedValues -> windowedValues.iterator(), EncoderHelpers.windowedValueEncoder());
-    context.putDataset(output, outputDataset);
-  }
-}
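
For readers outside the Beam codebase, the deleted translator's core trick — grouping the whole Dataset under one synthetic key and reducing it with a typed Aggregator — looks roughly like this in plain Spark. SumAgg is a stand-in for AggregatorCombinerGlobally; all names and values here are illustrative, not taken from the runner.

    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.expressions.Aggregator;
    import scala.Tuple2;

    public class ConstantKeyCombineSketch {
      // Stand-in for AggregatorCombinerGlobally: a typed sum over java.lang.Long.
      static class SumAgg extends Aggregator<Long, Long, Long> {
        @Override public Long zero() { return 0L; }
        @Override public Long reduce(Long acc, Long in) { return acc + in; }
        @Override public Long merge(Long a, Long b) { return a + b; }
        @Override public Long finish(Long acc) { return acc; }
        @Override public Encoder<Long> bufferEncoder() { return Encoders.LONG(); }
        @Override public Encoder<Long> outputEncoder() { return Encoders.LONG(); }
      }

      public static void main(String[] args) {
        SparkSession spark =
            SparkSession.builder().master("local[2]").appName("sketch").getOrCreate();
        Dataset<Long> input = spark.range(1, 5);
        Dataset<Long> result = input
            // Constant key => a single group spanning the whole dataset.
            .groupByKey((MapFunction<Long, Integer>) x -> 1, Encoders.INT())
            // One reduction over that group, no per-record shuffle key needed.
            .agg(new SumAgg().toColumn())
            // Drop the synthetic key from the (key, result) pair.
            .map((MapFunction<Tuple2<Integer, Long>, Long>) t -> t._2, Encoders.LONG());
        result.show(); // one row: 10
        spark.stop();
      }
    }
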
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
index 7f7d962..83a9491 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
@@ -60,7 +60,6 @@ public class PipelineTranslatorBatch extends PipelineTranslator {
 
   static {
     TRANSFORM_TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch());
-    TRANSFORM_TRANSLATORS.put(Combine.Globally.class, new CombineGloballyTranslatorBatch());
     TRANSFORM_TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
 
     // TODO: Do we need to have a dedicated translator for {@code Reshuffle} if it's deprecated?
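
The registration map above is the whole dispatch mechanism: translators are keyed by transform class, and a transform with no entry is left to its default SDK expansion, which is the route Combine.Globally takes after this commit. A minimal sketch of that lookup pattern follows; the class keys and the lookup helper are illustrative, not the runner's actual API.

    import java.util.HashMap;
    import java.util.Map;

    public class TranslatorRegistrySketch {
      interface TransformTranslator {
        void translate(String transformName);
      }

      private static final Map<Class<?>, TransformTranslator> TRANSFORM_TRANSLATORS =
          new HashMap<>();

      static {
        // Illustrative registration; the real map is keyed by Beam transform
        // classes such as Combine.PerKey.class and GroupByKey.class.
        TRANSFORM_TRANSLATORS.put(String.class,
            name -> System.out.println("native translation for " + name));
      }

      // A missing entry means "no native translation": the runner then recurses
      // into the transform's SDK-provided composite parts instead.
      static TransformTranslator lookup(Class<?> transformClass) {
        return TRANSFORM_TRANSLATORS.get(transformClass);
      }

      public static void main(String[] args) {
        System.out.println(lookup(String.class) != null);  // true -> translate natively
        System.out.println(lookup(Integer.class) != null); // false -> fall back to expansion
      }
    }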

