beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [beam] 01/02: [BEAM-8577] Initialize FileSystems during Coder deserialization in Reshuffle reduce phase.
Date Mon, 06 Jan 2020 18:37:04 GMT
This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 2dc923b0aeb627d0db5d7b2a41f56865659fd629
Author: David Moravek <david.moravek@firma.seznam.cz>
AuthorDate: Thu Nov 7 15:39:14 2019 +0100

    [BEAM-8577] Initialize FileSystems during Coder deserialization in Reshuffle reduce phase.
---
 .../translation/types/CoderTypeSerializer.java     | 19 ++++++++++
 .../translation/types/CoderTypeSerializer.java     | 19 ++++++++++
 .../flink/FlinkBatchTransformTranslators.java      | 18 +++++++---
 .../functions/FlinkIdentityFunction.java           | 42 ++++++++++++++++++++++
 .../translation/types/CoderTypeInformation.java    | 26 ++++++++++++--
 5 files changed, 118 insertions(+), 6 deletions(-)

diff --git a/runners/flink/1.7/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/1.7/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
index e29f97e..807faf5 100644
--- a/runners/flink/1.7/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
+++ b/runners/flink/1.7/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
@@ -20,6 +20,8 @@ package org.apache.beam.runners.flink.translation.types;
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
 import org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper;
 import org.apache.beam.sdk.coders.Coder;
@@ -40,9 +42,26 @@ public class CoderTypeSerializer<T> extends TypeSerializer<T> {
 
   private final Coder<T> coder;
 
+  /**
+   * {@link SerializablePipelineOptions} deserialization will cause {@link
+   * org.apache.beam.sdk.io.FileSystems} registration needed for {@link
+   * org.apache.beam.sdk.transforms.Reshuffle} translation.
+   */
+  @SuppressWarnings("unused")
+  @Nullable
+  private final SerializablePipelineOptions pipelineOptions;
+
   public CoderTypeSerializer(Coder<T> coder) {
     Preconditions.checkNotNull(coder);
     this.coder = coder;
+    this.pipelineOptions = null;
+  }
+
+  public CoderTypeSerializer(
+      Coder<T> coder, @Nullable SerializablePipelineOptions pipelineOptions) {
+    Preconditions.checkNotNull(coder);
+    this.coder = coder;
+    this.pipelineOptions = pipelineOptions;
   }
 
   @Override
diff --git a/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
index 2ff1cda..276e49c 100644
--- a/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
+++ b/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
@@ -19,6 +19,8 @@ package org.apache.beam.runners.flink.translation.types;
 
 import java.io.EOFException;
 import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
 import org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper;
 import org.apache.beam.sdk.coders.Coder;
@@ -41,9 +43,26 @@ public class CoderTypeSerializer<T> extends TypeSerializer<T> {
 
   private final Coder<T> coder;
 
+  /**
+   * {@link SerializablePipelineOptions} deserialization will cause {@link
+   * org.apache.beam.sdk.io.FileSystems} registration needed for {@link
+   * org.apache.beam.sdk.transforms.Reshuffle} translation.
+   */
+  @SuppressWarnings("unused")
+  @Nullable
+  private final SerializablePipelineOptions pipelineOptions;
+
   public CoderTypeSerializer(Coder<T> coder) {
     Preconditions.checkNotNull(coder);
     this.coder = coder;
+    this.pipelineOptions = null;
+  }
+
+  public CoderTypeSerializer(
+      Coder<T> coder, @Nullable SerializablePipelineOptions pipelineOptions) {
+    Preconditions.checkNotNull(coder);
+    this.coder = coder;
+    this.pipelineOptions = pipelineOptions;
   }
 
   @Override
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index 229eca5..27c9fba 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -34,6 +34,7 @@ import org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.core.construction.ReadTranslation;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
 import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkIdentityFunction;
 import org.apache.beam.runners.flink.translation.functions.FlinkMergingNonShuffleReduceFunction;
 import org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction;
 import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
@@ -84,6 +85,7 @@ import org.apache.flink.api.java.operators.FlatMapOperator;
 import org.apache.flink.api.java.operators.GroupCombineOperator;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.Grouping;
+import org.apache.flink.api.java.operators.MapOperator;
 import org.apache.flink.api.java.operators.MapPartitionOperator;
 import org.apache.flink.api.java.operators.SingleInputUdfOperator;
 
@@ -306,11 +308,19 @@ class FlinkBatchTransformTranslators {
     @Override
     public void translateNode(
         Reshuffle<K, InputT> transform, FlinkBatchTranslationContext context) {
-
-      DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+      final DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
-
-      context.setOutputDataSet(context.getOutput(transform), inputDataSet.rebalance());
+      @SuppressWarnings("unchecked")
+      final CoderTypeInformation<WindowedValue<KV<K, InputT>>> outputType =
+          ((CoderTypeInformation) inputDataSet.getType())
+              .withPipelineOptions(context.getPipelineOptions());
+      final DataSet<WindowedValue<KV<K, InputT>>> retypedDataSet =
+          new MapOperator<>(
+              inputDataSet,
+              outputType,
+              FlinkIdentityFunction.of(),
+              getCurrentTransformName(context));
+      context.setOutputDataSet(context.getOutput(transform), retypedDataSet.rebalance());
     }
   }
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkIdentityFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkIdentityFunction.java
new file mode 100644
index 0000000..be3db7c
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkIdentityFunction.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+/**
+ * A map function that outputs the input element without any change.
+ *
+ * @param <T> Input element type.
+ */
+public class FlinkIdentityFunction<T> implements MapFunction<T, T> {
+
+  private static FlinkIdentityFunction<?> INSTANCE = new FlinkIdentityFunction<>();
+
+  @SuppressWarnings("unchecked")
+  public static <T> FlinkIdentityFunction<T> of() {
+    return (FlinkIdentityFunction) INSTANCE;
+  }
+
+  private FlinkIdentityFunction() {}
+
+  @Override
+  public T map(T value) {
+    return value;
+  }
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
index c03bef9..5e76923 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
@@ -19,7 +19,10 @@ package org.apache.beam.runners.flink.translation.types;
 
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
 
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -33,10 +36,18 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 public class CoderTypeInformation<T> extends TypeInformation<T> implements AtomicType<T> {
 
   private final Coder<T> coder;
+  @Nullable private final SerializablePipelineOptions pipelineOptions;
 
   public CoderTypeInformation(Coder<T> coder) {
     checkNotNull(coder);
     this.coder = coder;
+    this.pipelineOptions = null;
+  }
+
+  private CoderTypeInformation(Coder<T> coder, PipelineOptions pipelineOptions) {
+    checkNotNull(coder);
+    this.coder = coder;
+    this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions);
   }
 
   public Coder<T> getCoder() {
@@ -70,9 +81,8 @@ public class CoderTypeInformation<T> extends TypeInformation<T> implements Atomi
   }
 
   @Override
-  @SuppressWarnings("unchecked")
   public TypeSerializer<T> createSerializer(ExecutionConfig config) {
-    return new CoderTypeSerializer<>(coder);
+    return new CoderTypeSerializer<>(coder, pipelineOptions);
   }
 
   @Override
@@ -80,6 +90,18 @@ public class CoderTypeInformation<T> extends TypeInformation<T> implements Atomi
     return 2;
   }
 
+  /**
+   * Creates a new {@link CoderTypeInformation} with {@link PipelineOptions}, that can be used for
+   * {@link org.apache.beam.sdk.io.FileSystems} registration.
+   *
+   * @see <a href="https://issues.apache.org/jira/browse/BEAM-8577">Jira issue.</a>
+   * @param pipelineOptions Options of current pipeline.
+   * @return New type information.
+   */
+  public CoderTypeInformation<T> withPipelineOptions(PipelineOptions pipelineOptions) {
+    return new CoderTypeInformation<>(getCoder(), pipelineOptions);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {


Mime
View raw message