beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From echauc...@apache.org
Subject [beam] 30/37: Create a Tuple2Coder to encode scala tuple2
Date Thu, 24 Oct 2019 10:15:39 GMT
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 21accab89a4333b32003121269ab31b436e0dd2c
Author: Etienne Chauchot <echauchot@apache.org>
AuthorDate: Mon Sep 30 11:25:04 2019 +0200

    Create a Tuple2Coder to encode scala tuple2
---
 .../translation/helpers/Tuple2Coder.java           | 62 ++++++++++++++++++++++
 1 file changed, 62 insertions(+)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java
new file mode 100644
index 0000000..1743a01
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java
@@ -0,0 +1,62 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import scala.Tuple2;
+
+/**
+ * Beam coder to encode/decode Tuple2 scala types.
+ * @param <T1> first field type parameter
+ * @param <T2> second field type parameter
+ */
+public class Tuple2Coder<T1, T2> extends StructuredCoder<Tuple2<T1, T2>> {
+  private final Coder<T1> firstFieldCoder;
+  private final Coder<T2> secondFieldCoder;
+
+  public static <K, V> Tuple2Coder<K, V> of(Coder<K> firstFieldCoder, Coder<V> secondFieldCoder) {
+    return new Tuple2Coder<>(firstFieldCoder, secondFieldCoder);
+  }
+
+  private Tuple2Coder(Coder<T1> firstFieldCoder, Coder<T2> secondFieldCoder) {
+    this.firstFieldCoder = firstFieldCoder;
+    this.secondFieldCoder = secondFieldCoder;
+  }
+
+
+  @Override public void encode(Tuple2<T1, T2> value, OutputStream outStream)
+      throws IOException {
+    firstFieldCoder.encode(value._1(), outStream);
+    secondFieldCoder.encode(value._2(), outStream);
+  }
+
+  @Override public Tuple2<T1, T2> decode(InputStream inStream) throws IOException {
+    T1 firstField = firstFieldCoder.decode(inStream);
+    T2 secondField = secondFieldCoder.decode(inStream);
+    return Tuple2.apply(firstField, secondField);
+  }
+
+  @Override public List<? extends Coder<?>> getCoderArguments() {
+    return Arrays.asList(firstFieldCoder, secondFieldCoder);
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    verifyDeterministic(this, "First field coder must be deterministic", firstFieldCoder);
+    verifyDeterministic(this, "Second field coder must be deterministic", secondFieldCoder);
+  }
+
+  /** Returns the coder for first field. */
+  public Coder<T1> getFirstFieldCoder() {
+    return firstFieldCoder;
+  }
+
+  /** Returns the coder for second field. */
+  public Coder<T2> getSecondFieldCoder() {
+    return secondFieldCoder;
+  }
+}


Mime
View raw message