beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [15/50] incubator-beam git commit: [BEAM-809] Create a KryoRegistrator for the SparkRunner.
Date Fri, 28 Oct 2016 14:47:49 GMT
[BEAM-809] Create a KryoRegistrator for the SparkRunner.

Use Class#getName() instead of canonicalName().


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/13b83858
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/13b83858
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/13b83858

Branch: refs/heads/apex-runner
Commit: 13b83858746356068a6d618e04da6839e837d28c
Parents: 53fe3ee
Author: Sela <ansela@paypal.com>
Authored: Mon Oct 24 22:35:39 2016 +0300
Committer: Sela <ansela@paypal.com>
Committed: Wed Oct 26 18:53:28 2016 +0300

----------------------------------------------------------------------
 runners/spark/pom.xml                           | 23 ++++++++++
 .../coders/BeamSparkRunnerRegistrator.java      | 46 ++++++++++++++++++++
 .../spark/translation/SparkContextFactory.java  |  5 ++-
 3 files changed, 73 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/13b83858/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index ccec3c6..458205a 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -147,6 +147,29 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
+      <groupId>com.esotericsoftware.kryo</groupId>
+      <artifactId>kryo</artifactId>
+      <version>2.21</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>de.javakaffee</groupId>
+      <artifactId>kryo-serializers</artifactId>
+      <version>0.39</version>
+      <exclusions>
+        <!-- Use Spark's Kryo -->
+        <exclusion>
+          <groupId>com.esotericsoftware</groupId>
+          <artifactId>kryo</artifactId>
+        </exclusion>
+        <!-- We only really need the serializer implementations -->
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
       <version>1.3.9</version>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/13b83858/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
new file mode 100644
index 0000000..0e62781
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.coders;
+
+import com.esotericsoftware.kryo.Kryo;
+import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableMultimapSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableSetSerializer;
+import de.javakaffee.kryoserializers.guava.ReverseListSerializer;
+import org.apache.spark.serializer.KryoRegistrator;
+
+
+/**
+ * Custom {@link com.esotericsoftware.kryo.Serializer}s for Beam's Spark runner needs.
+ */
+public class BeamSparkRunnerRegistrator implements KryoRegistrator {
+
+  @Override
+  public void registerClasses(Kryo kryo) {
+    UnmodifiableCollectionsSerializer.registerSerializers(kryo);
+    // Guava
+    ImmutableListSerializer.registerSerializers(kryo);
+    ImmutableSetSerializer.registerSerializers(kryo);
+    ImmutableMapSerializer.registerSerializers(kryo);
+    ImmutableMultimapSerializer.registerSerializers(kryo);
+    ReverseListSerializer.registerSerializers(kryo);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/13b83858/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index 4877f6e..ee2104a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.spark.translation;
 
 import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.BeamSparkRunnerRegistrator;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.serializer.KryoSerializer;
@@ -85,7 +86,9 @@ public final class SparkContextFactory {
         conf.setMaster(options.getSparkMaster());
       }
       conf.setAppName(options.getAppName());
-      conf.set("spark.serializer", KryoSerializer.class.getCanonicalName());
+      // register immutable collections serializers because the SDK uses them.
+      conf.set("spark.kryo.registrator", BeamSparkRunnerRegistrator.class.getName());
+      conf.set("spark.serializer", KryoSerializer.class.getName());
       return new JavaSparkContext(conf);
     }
   }


Mime
View raw message