storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [1/4] storm git commit: STORM-1248: port backtype.storm.messaging.loader to java
Date Fri, 12 Feb 2016 20:06:49 GMT
Repository: storm
Updated Branches:
  refs/heads/master 9ddd29ff2 -> 12ceb0975


STORM-1248: port backtype.storm.messaging.loader to java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/36aa7b07
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/36aa7b07
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/36aa7b07

Branch: refs/heads/master
Commit: 36aa7b07344fe6b0caf46b3592d1754891ff9597
Parents: 3339dae
Author: Abhishek Agarwal <abhishek.agarwal@inmobi.com>
Authored: Fri Feb 12 00:33:56 2016 +0530
Committer: Abhishek Agarwal <abhishek.agarwal@inmobi.com>
Committed: Fri Feb 12 00:33:56 2016 +0530

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/worker.clj  | 13 ++++----
 .../clj/org/apache/storm/messaging/loader.clj   | 34 --------------------
 .../clj/org/apache/storm/messaging/local.clj    | 23 -------------
 storm-core/src/clj/org/apache/storm/testing.clj |  8 +++--
 4 files changed, 11 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/36aa7b07/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 48934f6..0a2a6d6 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -21,14 +21,13 @@
   (:require [org.apache.storm.daemon [executor :as executor]])
   (:require [org.apache.storm [disruptor :as disruptor] [cluster :as cluster]])
   (:require [clojure.set :as set])
-  (:require [org.apache.storm.messaging.loader :as msg-loader])
   (:import [java.util.concurrent Executors]
            [org.apache.storm.hooks IWorkerHook BaseWorkerHook])
   (:import [java.util ArrayList HashMap])
   (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue])
   (:import [org.apache.storm.grouping LoadMapping])
   (:import [org.apache.storm.messaging TransportFactory])
-  (:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status])
+  (:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status DeserializingConnectionCallback])
   (:import [org.apache.storm.daemon Shutdownable])
   (:import [org.apache.storm.serialization KryoTupleSerializer])
   (:import [org.apache.storm.generated StormTopology])
@@ -461,11 +460,11 @@
             )))))
 
 (defn register-callbacks [worker]
-  (log-message "Registering IConnectionCallbacks for " (:assignment-id worker) ":" (:port worker))
-  (msg-loader/register-callback (:transfer-local-fn worker)
-                                (:receiver worker)
-                                (:storm-conf worker)
-                                (worker-context worker)))
+  (let [transfer-local-fn (:transfer-local-fn worker) ^IConnection socket (:receiver worker)]
+    (log-message "Registering IConnectionCallbacks for " (:assignment-id worker) ":" (:port worker))
+    (.registerRecv socket (DeserializingConnectionCallback. (:storm-conf worker)
+                                                            (worker-context worker)
+                                                            transfer-local-fn))))
 
 (defn- close-resources [worker]
   (let [dr (:default-shared-resources worker)]

http://git-wip-us.apache.org/repos/asf/storm/blob/36aa7b07/storm-core/src/clj/org/apache/storm/messaging/loader.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/messaging/loader.clj b/storm-core/src/clj/org/apache/storm/messaging/loader.clj
deleted file mode 100644
index b190ab0..0000000
--- a/storm-core/src/clj/org/apache/storm/messaging/loader.clj
+++ /dev/null
@@ -1,34 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.messaging.loader
-  (:import [org.apache.storm.messaging IConnection DeserializingConnectionCallback])
-  (:require [org.apache.storm.messaging [local :as local]]))
-
-(defn mk-local-context []
-  (local/mk-context))
-
-(defn- mk-connection-callback
-  "make an IConnectionCallback"
-  [transfer-local-fn storm-conf worker-context]
-  (DeserializingConnectionCallback. storm-conf
-                                    worker-context
-                                    (fn [batch]
-                                      (transfer-local-fn batch))))
-
-(defn register-callback
-  "register the local-transfer-fn with the server"
-  [transfer-local-fn ^IConnection socket storm-conf worker-context]
-  (.registerRecv socket (mk-connection-callback transfer-local-fn storm-conf worker-context)))

http://git-wip-us.apache.org/repos/asf/storm/blob/36aa7b07/storm-core/src/clj/org/apache/storm/messaging/local.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/messaging/local.clj b/storm-core/src/clj/org/apache/storm/messaging/local.clj
deleted file mode 100644
index 32fbb34..0000000
--- a/storm-core/src/clj/org/apache/storm/messaging/local.clj
+++ /dev/null
@@ -1,23 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.messaging.local
-  (:import [org.apache.storm.messaging IContext])
-  (:import [org.apache.storm.messaging.local Context]))
-
-(defn mk-context [] 
-  (let [context  (Context.)]
-    (.prepare ^IContext context nil)
-    context))

http://git-wip-us.apache.org/repos/asf/storm/blob/36aa7b07/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index cc78659..12828d6 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -44,9 +44,9 @@
   (:import [org.apache.storm.transactional.partitioned PartitionedTransactionalSpoutExecutor])
   (:import [org.apache.storm.tuple Tuple])
   (:import [org.apache.storm.generated StormTopology])
-  (:import [org.apache.storm.task TopologyContext])
+  (:import [org.apache.storm.task TopologyContext]
+           (org.apache.storm.messaging IContext))
   (:require [org.apache.storm [zookeeper :as zk]])
-  (:require [org.apache.storm.messaging.loader :as msg-loader])
   (:require [org.apache.storm.daemon.acker :as acker])
   (:use [org.apache.storm cluster util thrift config log local-state]))
 
@@ -117,7 +117,9 @@
 
 (defn mk-shared-context [conf]
   (if-not (conf STORM-LOCAL-MODE-ZMQ)
-    (msg-loader/mk-local-context)))
+    (let [context  (org.apache.storm.messaging.local.Context.)]
+      (.prepare ^IContext context nil)
+      context)))
 
 (defn start-nimbus-daemon [conf nimbus]
   (let [server (ThriftServer. conf (Nimbus$Processor. nimbus)


Mime
View raw message