beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [1/2] beam git commit: [BEAM-2165] Update Spark to support serializing/deserializing custom user types configured via Jackson modules
Date Fri, 05 May 2017 21:28:30 GMT
Repository: beam
Updated Branches:
  refs/heads/master 1ecb111d3 -> 319bb277f


[BEAM-2165] Update Spark to support serializing/deserializing custom user types configured
via Jackson modules


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ae70e50c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ae70e50c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ae70e50c

Branch: refs/heads/master
Commit: ae70e50c7ed639e5607dbfc9e9534b9a5eff10bc
Parents: 1ecb111
Author: Luke Cwik <lcwik@google.com>
Authored: Wed May 3 17:15:52 2017 -0700
Committer: Davor Bonaci <davor@google.com>
Committed: Fri May 5 14:28:14 2017 -0700

----------------------------------------------------------------------
 .../spark/translation/SparkRuntimeContext.java  |  17 ++-
 .../translation/SparkRuntimeContextTest.java    | 122 +++++++++++++++++++
 2 files changed, 137 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ae70e50c/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 3db1ab5..970a356 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.spark.translation;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.io.Serializable;
@@ -26,6 +27,8 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
 
 /**
 * The SparkRuntimeContext allows us to define useful features on the client side before our
@@ -39,9 +42,19 @@ public class SparkRuntimeContext implements Serializable {
     this.serializedPipelineOptions = serializePipelineOptions(options);
   }
 
+  /**
+   * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing
+   * for user specified configuration injection into the ObjectMapper. This supports user custom
+   * types on {@link PipelineOptions}.
+   */
+  private static ObjectMapper createMapper() {
+    return new ObjectMapper().registerModules(
+        ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+  }
+
   private String serializePipelineOptions(PipelineOptions pipelineOptions) {
     try {
-      return new ObjectMapper().writeValueAsString(pipelineOptions);
+      return createMapper().writeValueAsString(pipelineOptions);
     } catch (JsonProcessingException e) {
       throw new IllegalStateException("Failed to serialize the pipeline options.", e);
     }
@@ -49,7 +62,7 @@ public class SparkRuntimeContext implements Serializable {
 
   private static PipelineOptions deserializePipelineOptions(String serializedPipelineOptions) {
     try {
-      return new ObjectMapper().readValue(serializedPipelineOptions, PipelineOptions.class);
+      return createMapper().readValue(serializedPipelineOptions, PipelineOptions.class);
     } catch (IOException e) {
       throw new IllegalStateException("Failed to deserialize the pipeline options.", e);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/ae70e50c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java
new file mode 100644
index 0000000..372d46f
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkRuntimeContextTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation;
+
+import static org.junit.Assert.assertEquals;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.CrashingRunner;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link SparkRuntimeContext}.
+ */
+@RunWith(JUnit4.class)
+public class SparkRuntimeContextTest {
+  /** PipelineOptions used to test auto registration of Jackson modules. */
+  public interface JacksonIncompatibleOptions extends PipelineOptions {
+    JacksonIncompatible getJacksonIncompatible();
+    void setJacksonIncompatible(JacksonIncompatible value);
+  }
+
+  /** A Jackson {@link Module} to test auto-registration of modules. */
+  @AutoService(Module.class)
+  public static class RegisteredTestModule extends SimpleModule {
+    public RegisteredTestModule() {
+      super("RegisteredTestModule");
+      setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class);
+    }
+  }
+
+  /** A class which Jackson does not know how to serialize/deserialize. */
+  public static class JacksonIncompatible {
+    private final String value;
+    public JacksonIncompatible(String value) {
+      this.value = value;
+    }
+  }
+
+  /** A Jackson mixin used to add annotations to other classes. */
+  @JsonDeserialize(using = JacksonIncompatibleDeserializer.class)
+  @JsonSerialize(using = JacksonIncompatibleSerializer.class)
+  public static final class JacksonIncompatibleMixin {}
+
+  /** A Jackson deserializer for {@link JacksonIncompatible}. */
+  public static class JacksonIncompatibleDeserializer extends
+      JsonDeserializer<JacksonIncompatible> {
+
+    @Override
+    public JacksonIncompatible deserialize(JsonParser jsonParser,
+        DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
+      return new JacksonIncompatible(jsonParser.readValueAs(String.class));
+    }
+  }
+
+  /** A Jackson serializer for {@link JacksonIncompatible}. */
+  public static class JacksonIncompatibleSerializer extends JsonSerializer<JacksonIncompatible> {
+
+    @Override
+    public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator,
+        SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
+      jsonGenerator.writeString(jacksonIncompatible.value);
+    }
+  }
+
+  @Test
+  public void testSerializingPipelineOptionsWithCustomUserType() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.fromArgs("--jacksonIncompatible=\"testValue\"")
+        .as(JacksonIncompatibleOptions.class);
+    options.setRunner(CrashingRunner.class);
+    Pipeline p = Pipeline.create(options);
+    SparkRuntimeContext context = new SparkRuntimeContext(p);
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (ObjectOutputStream outputStream = new ObjectOutputStream(baos)) {
+      outputStream.writeObject(context);
+    }
+    try (ObjectInputStream inputStream =
+        new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
+      SparkRuntimeContext copy = (SparkRuntimeContext) inputStream.readObject();
+      assertEquals("testValue",
+          copy.getPipelineOptions().as(JacksonIncompatibleOptions.class)
+              .getJacksonIncompatible().value);
+    }
+  }
+}


Mime
View raw message