storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [06/16] storm git commit: Using java serialization for all places where the contract with customer is to use java serialization.
Date Wed, 18 Mar 2015 19:39:13 GMT
Using java serialization for all places where the contract with customer is to use java serialization.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/43197660
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/43197660
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/43197660

Branch: refs/heads/master
Commit: 431976607857d5d29913ba055082a546a22be16e
Parents: 0571e22
Author: Parth Brahmbhatt <brahmbhatt.parth@gmail.com>
Authored: Tue Feb 17 10:57:25 2015 -0800
Committer: Parth Brahmbhatt <brahmbhatt.parth@gmail.com>
Committed: Tue Feb 17 10:57:25 2015 -0800

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/config.clj             |  2 +-
 storm-core/src/clj/backtype/storm/thrift.clj             | 10 +++++-----
 .../src/jvm/backtype/storm/topology/TopologyBuilder.java |  4 ++--
 storm-core/src/jvm/backtype/storm/utils/LocalState.java  |  4 ++--
 storm-core/src/jvm/backtype/storm/utils/Utils.java       | 11 ++++++++++-
 storm-core/src/jvm/storm/trident/Stream.java             |  2 +-
 storm-core/src/jvm/storm/trident/TridentTopology.java    |  4 ++--
 7 files changed, 23 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/43197660/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj
index d09b31b..00396c7 100644
--- a/storm-core/src/clj/backtype/storm/config.clj
+++ b/storm-core/src/clj/backtype/storm/config.clj
@@ -212,7 +212,7 @@
   (let [stormroot (supervisor-stormdist-root conf storm-id)
         conf-path (supervisor-stormconf-path stormroot)
         topology-path (supervisor-stormcode-path stormroot)]
-    (merge conf (Utils/deserialize (FileUtils/readFileToByteArray (File. conf-path)) java.util.Map))
+    (merge conf (Utils/javaDeserialize (FileUtils/readFileToByteArray (File. conf-path)) java.util.Map))
     ))
 
 (defn read-supervisor-topology

http://git-wip-us.apache.org/repos/asf/storm/blob/43197660/storm-core/src/clj/backtype/storm/thrift.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/thrift.clj b/storm-core/src/clj/backtype/storm/thrift.clj
index 5bc1150..0ff6dd0 100644
--- a/storm-core/src/clj/backtype/storm/thrift.clj
+++ b/storm-core/src/clj/backtype/storm/thrift.clj
@@ -122,7 +122,7 @@
 
 (defnk mk-spout-spec*
   [spout outputs :p nil :conf nil]
-  (SpoutSpec. (ComponentObject/serialized_java (Utils/serialize spout))
+  (SpoutSpec. (ComponentObject/serialized_java (Utils/javaSerialize spout))
               (mk-plain-component-common {} outputs p :conf conf)))
 
 (defn mk-shuffle-grouping
@@ -157,11 +157,11 @@
   [^ComponentObject obj]
   (when (not= (.getSetField obj) ComponentObject$_Fields/SERIALIZED_JAVA)
     (throw (RuntimeException. "Cannot deserialize non-java-serialized object")))
-  (Utils/deserialize (.get_serialized_java obj) Serializable))
+  (Utils/javaDeserialize (.get_serialized_java obj) Serializable))
 
 (defn serialize-component-object
   [obj]
-  (ComponentObject/serialized_java (Utils/serialize obj)))
+  (ComponentObject/serialized_java (Utils/javaSerialize obj)))
 
 (defn- mk-grouping
   [grouping-spec]
@@ -172,7 +172,7 @@
         grouping-spec
 
         (instance? CustomStreamGrouping grouping-spec)
-        (Grouping/custom_serialized (Utils/serialize grouping-spec))
+        (Grouping/custom_serialized (Utils/javaSerialize grouping-spec))
 
         (instance? JavaObject grouping-spec)
         (Grouping/custom_object grouping-spec)
@@ -212,7 +212,7 @@
 (defnk mk-bolt-spec*
   [inputs bolt outputs :p nil :conf nil]
   (let [common (mk-plain-component-common (mk-inputs inputs) outputs p :conf conf)]
-    (Bolt. (ComponentObject/serialized_java (Utils/serialize bolt))
+    (Bolt. (ComponentObject/serialized_java (Utils/javaSerialize bolt))
            common)))
 
 (defnk mk-spout-spec

http://git-wip-us.apache.org/repos/asf/storm/blob/43197660/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java b/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
index 0a47626..9d8f271 100644
--- a/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
+++ b/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
@@ -104,12 +104,12 @@ public class TopologyBuilder {
         for(String boltId: _bolts.keySet()) {
             IRichBolt bolt = _bolts.get(boltId);
             ComponentCommon common = getComponentCommon(boltId, bolt);
-            boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.serialize(bolt)), common));
+            boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
         }
         for(String spoutId: _spouts.keySet()) {
             IRichSpout spout = _spouts.get(spoutId);
             ComponentCommon common = getComponentCommon(spoutId, spout);
-            spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.serialize(spout)), common));
+            spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
             
         }
         return new StormTopology(spoutSpecs,

http://git-wip-us.apache.org/repos/asf/storm/blob/43197660/storm-core/src/jvm/backtype/storm/utils/LocalState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/LocalState.java b/storm-core/src/jvm/backtype/storm/utils/LocalState.java
index 561988c..65f2152 100644
--- a/storm-core/src/jvm/backtype/storm/utils/LocalState.java
+++ b/storm-core/src/jvm/backtype/storm/utils/LocalState.java
@@ -64,7 +64,7 @@ public class LocalState {
             if (serialized.length == 0) {
                 LOG.warn("LocalState file '{}' contained no data, resetting state", latestPath);
             } else {
-                result = Utils.deserialize(serialized, Map.class);
+                result = Utils.javaDeserialize(serialized, Map.class);
             }
         }
         return result;
@@ -99,7 +99,7 @@ public class LocalState {
     }
     
     private void persist(Map<Object, Object> val, boolean cleanup) throws IOException {
-        byte[] toWrite = Utils.serialize(val);
+        byte[] toWrite = Utils.javaSerialize(val);
         String newPath = _vs.createVersion();
         File file = new File(newPath);
         FileUtils.writeByteArrayToFile(file, toWrite);

http://git-wip-us.apache.org/repos/asf/storm/blob/43197660/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index fd56539..ba1a2ab 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -52,6 +52,7 @@ public class Utils {
     public static final String DEFAULT_STREAM_ID = "default";
 
     private static SerializationDelegate serializationDelegate;
+    private static final DefaultSerializationDelegate javaSerializationDelegate = new DefaultSerializationDelegate();
 
     static {
         Map conf = readStormConfig();
@@ -75,6 +76,14 @@ public class Utils {
         return serializationDelegate.deserialize(serialized, clazz);
     }
 
+    public static byte[] javaSerialize(Object obj) {
+        return javaSerializationDelegate.serialize(obj);
+    }
+
+    public static <T> T javaDeserialize(byte[] serialized, Class<T> clazz) {
+        return javaSerializationDelegate.deserialize(serialized, clazz);
+    }
+
     public static <T> String join(Iterable<T> coll, String sep) {
         Iterator<T> it = coll.iterator();
         String ret = "";
@@ -211,7 +220,7 @@ public class Utils {
 
     public static Object getSetComponentObject(ComponentObject obj) {
         if(obj.getSetField()==ComponentObject._Fields.SERIALIZED_JAVA) {
-            return Utils.deserialize(obj.get_serialized_java(), Serializable.class);
+            return Utils.javaDeserialize(obj.get_serialized_java(), Serializable.class);
         } else if(obj.getSetField()==ComponentObject._Fields.JAVA_OBJECT) {
             return obj.get_java_object();
         } else {

http://git-wip-us.apache.org/repos/asf/storm/blob/43197660/storm-core/src/jvm/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/Stream.java b/storm-core/src/jvm/storm/trident/Stream.java
index c308745..f4c515e 100644
--- a/storm-core/src/jvm/storm/trident/Stream.java
+++ b/storm-core/src/jvm/storm/trident/Stream.java
@@ -94,7 +94,7 @@ public class Stream implements IAggregatableStream {
     }
     
     public Stream partition(CustomStreamGrouping partitioner) {
-        return partition(Grouping.custom_serialized(Utils.serialize(partitioner)));
+        return partition(Grouping.custom_serialized(Utils.javaSerialize(partitioner)));
     }
     
     public Stream shuffle() {

http://git-wip-us.apache.org/repos/asf/storm/blob/43197660/storm-core/src/jvm/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/TridentTopology.java b/storm-core/src/jvm/storm/trident/TridentTopology.java
index 25506c8..3ed2386 100644
--- a/storm-core/src/jvm/storm/trident/TridentTopology.java
+++ b/storm-core/src/jvm/storm/trident/TridentTopology.java
@@ -666,7 +666,7 @@ public class TridentTopology {
     private static boolean isIdentityPartition(PartitionNode n) {
         Grouping g = n.thriftGrouping;
         if(g.is_set_custom_serialized()) {
-            CustomStreamGrouping csg = (CustomStreamGrouping) Utils.deserialize(g.get_custom_serialized(), Serializable.class);
+            CustomStreamGrouping csg = (CustomStreamGrouping) Utils.javaDeserialize(g.get_custom_serialized(), Serializable.class);
             return csg instanceof IdentityGrouping;
         }
         return false;
@@ -725,7 +725,7 @@ public class TridentTopology {
     
     private static PartitionNode makeIdentityPartition(Node basis) {
         return new PartitionNode(basis.streamId, basis.name, basis.allOutputFields,
-            Grouping.custom_serialized(Utils.serialize(new IdentityGrouping())));
+            Grouping.custom_serialized(Utils.javaSerialize(new IdentityGrouping())));
     }
     
     


Mime
View raw message