beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [1/2] beam git commit: [BEAM-2165] Update Flink to support serializing/deserializing custom user types configured via Jackson modules
Date Thu, 04 May 2017 14:29:36 GMT
Repository: beam
Updated Branches:
  refs/heads/master 749b33f0b -> 3c5891b31


[BEAM-2165] Update Flink to support serializing/deserializing custom user types configured via Jackson modules


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f53e5d43
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f53e5d43
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f53e5d43

Branch: refs/heads/master
Commit: f53e5d43d58c79ab9f3d04e112e6f05ad9dfe42f
Parents: 749b33f
Author: Luke Cwik <lcwik@google.com>
Authored: Wed May 3 17:12:20 2017 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Thu May 4 07:28:43 2017 -0700

----------------------------------------------------------------------
 runners/flink/pom.xml                           |  5 ++
 .../utils/SerializedPipelineOptions.java        | 16 +++-
 .../beam/runners/flink/PipelineOptionsTest.java | 87 ++++++++++++++++++++
 3 files changed, 106 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f53e5d43/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index eb2b005..4122454 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -257,6 +257,11 @@
 
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f53e5d43/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
index 2256bb1..f717fd7 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.flink.translation.utils;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -27,6 +28,7 @@ import java.io.Serializable;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
 
 /**
  * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
@@ -42,7 +44,7 @@ public class SerializedPipelineOptions implements Serializable {
     checkNotNull(options, "PipelineOptions must not be null.");
 
     try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-      new ObjectMapper().writeValue(baos, options);
+      createMapper().writeValue(baos, options);
       this.serializedOptions = baos.toByteArray();
     } catch (Exception e) {
       throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
@@ -53,7 +55,7 @@ public class SerializedPipelineOptions implements Serializable {
   public PipelineOptions getPipelineOptions() {
     if (pipelineOptions == null) {
       try {
-        pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
+        pipelineOptions = createMapper().readValue(serializedOptions, PipelineOptions.class);
 
         IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
         FileSystems.setDefaultConfigInWorkers(pipelineOptions);
@@ -64,4 +66,14 @@ public class SerializedPipelineOptions implements Serializable {
 
     return pipelineOptions;
   }
+
+  /**
+   * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing
+   * for user specified configuration injection into the ObjectMapper. This supports user custom
+   * types on {@link PipelineOptions}.
+   */
+  private static ObjectMapper createMapper() {
+    return new ObjectMapper().registerModules(
+        ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f53e5d43/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 23740a1..7519dbf 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -23,6 +23,23 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.Collections;
 import java.util.HashMap;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
@@ -200,4 +217,74 @@ public class PipelineOptionsTest {
           c.getPipelineOptions().as(MyOptions.class).getTestOption());
     }
   }
+
+  /** PipelineOptions used to test auto registration of Jackson modules. */
+  public interface JacksonIncompatibleOptions extends PipelineOptions {
+    JacksonIncompatible getJacksonIncompatible();
+    void setJacksonIncompatible(JacksonIncompatible value);
+  }
+
+  /** A Jackson {@link Module} to test auto-registration of modules. */
+  @AutoService(Module.class)
+  public static class RegisteredTestModule extends SimpleModule {
+    public RegisteredTestModule() {
+      super("RegisteredTestModule");
+      setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class);
+    }
+  }
+
+  /** A class which Jackson does not know how to serialize/deserialize. */
+  public static class JacksonIncompatible {
+    private final String value;
+    public JacksonIncompatible(String value) {
+      this.value = value;
+    }
+  }
+
+  /** A Jackson mixin used to add annotations to other classes. */
+  @JsonDeserialize(using = JacksonIncompatibleDeserializer.class)
+  @JsonSerialize(using = JacksonIncompatibleSerializer.class)
+  public static final class JacksonIncompatibleMixin {}
+
+  /** A Jackson deserializer for {@link JacksonIncompatible}. */
+  public static class JacksonIncompatibleDeserializer extends
+      JsonDeserializer<JacksonIncompatible> {
+
+    @Override
+    public JacksonIncompatible deserialize(JsonParser jsonParser,
+        DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
+      return new JacksonIncompatible(jsonParser.readValueAs(String.class));
+    }
+  }
+
+  /** A Jackson serializer for {@link JacksonIncompatible}. */
+  public static class JacksonIncompatibleSerializer extends JsonSerializer<JacksonIncompatible> {
+
+    @Override
+    public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator,
+        SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
+      jsonGenerator.writeString(jacksonIncompatible.value);
+    }
+  }
+
+  @Test
+  public void testSerializingPipelineOptionsWithCustomUserType() throws Exception {
+    String expectedValue = "testValue";
+    PipelineOptions options = PipelineOptionsFactory
+        .fromArgs("--jacksonIncompatible=\"" + expectedValue + "\"")
+        .as(JacksonIncompatibleOptions.class);
+    SerializedPipelineOptions context = new SerializedPipelineOptions(options);
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (ObjectOutputStream outputStream = new ObjectOutputStream(baos)) {
+      outputStream.writeObject(context);
+    }
+    try (ObjectInputStream inputStream =
+        new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
+      SerializedPipelineOptions copy = (SerializedPipelineOptions) inputStream.readObject();
+      assertEquals(expectedValue,
+          copy.getPipelineOptions().as(JacksonIncompatibleOptions.class)
+              .getJacksonIncompatible().value);
+    }
+  }
 }


Mime
View raw message