storm-commits mailing list archives

From kabh...@apache.org
Subject [08/12] storm git commit: STORM-2018: Just the merge
Date Wed, 02 Nov 2016 23:48:42 GMT
http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 5e5700c..f7cb49e 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -17,12 +17,13 @@
 (ns org.apache.storm.testing
   (:require [org.apache.storm.daemon
              [nimbus :as nimbus]
-             [supervisor :as supervisor]
+             [local-supervisor :as local-supervisor]
              [common :as common]
              [worker :as worker]
              [executor :as executor]])
-  (:require [org.apache.storm [process-simulator :as psim]])
-  (:import [org.apache.commons.io FileUtils])
+  (:import [org.apache.commons.io FileUtils]
+           [org.apache.storm ProcessSimulator]
+           [org.apache.storm.daemon.supervisor Supervisor StandaloneSupervisor SupervisorUtils])
   (:import [java.io File])
   (:import [java.util HashMap ArrayList])
   (:import [java.util.concurrent.atomic AtomicInteger])
@@ -31,6 +32,7 @@
   (:import [org.apache.storm.tuple Fields Tuple TupleImpl])
   (:import [org.apache.storm.task TopologyContext])
   (:import [org.apache.storm.generated GlobalStreamId Bolt KillOptions])
+  (:import [org.apache.storm.messaging IContext])
   (:import [org.apache.storm.testing FeederSpout FixedTupleSpout FixedTuple
             TupleCaptureBolt SpoutTracker BoltTracker NonRichBoltTracker
             TestWordSpout MemoryTransactionalSpout])
@@ -108,8 +110,10 @@
                                conf
                                {STORM-LOCAL-DIR tmp-dir
                                 SUPERVISOR-SLOTS-PORTS port-ids})
-        id-fn (if id (fn [] id) supervisor/generate-supervisor-id)
-        daemon (with-var-roots [supervisor/generate-supervisor-id id-fn] (supervisor/mk-supervisor supervisor-conf (:shared-context cluster-map) (supervisor/standalone-supervisor)))]
+        id (or id (Utils/uuid))
+        isupervisor (proxy [StandaloneSupervisor] []
+                        (generateSupervisorId [] id))
+        daemon (local-supervisor/mk-local-supervisor supervisor-conf (:shared-context cluster-map) isupervisor)]
     (swap! (:supervisors cluster-map) conj daemon)
     (swap! (:tmp-dirs cluster-map) conj tmp-dir)
     daemon))
@@ -135,18 +139,19 @@
   (let [zk-tmp (local-temp-path)
         [zk-port zk-handle] (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
                               (zk/mk-inprocess-zookeeper zk-tmp))
+        nimbus-tmp (local-temp-path)
         daemon-conf (merge (read-storm-config)
                            {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
                             ZMQ-LINGER-MILLIS 0
                             TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false
                             TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50
                             STORM-CLUSTER-MODE "local"
-                            BLOBSTORE-SUPERUSER (System/getProperty "user.name")}
+                            BLOBSTORE-SUPERUSER (System/getProperty "user.name")
+                            BLOBSTORE-DIR nimbus-tmp}
                            (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
                              {STORM-ZOOKEEPER-PORT zk-port
                               STORM-ZOOKEEPER-SERVERS ["localhost"]})
                            daemon-conf)
-        nimbus-tmp (local-temp-path)
         port-counter (mk-counter supervisor-slot-port-min)
         nimbus (nimbus/service-handler
                 (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
@@ -172,17 +177,17 @@
     cluster-map))
 
 (defn get-supervisor [cluster-map supervisor-id]
-  (let [finder-fn #(= (.get-id %) supervisor-id)]
+  (let [finder-fn #(= (.getId %) supervisor-id)]
     (find-first finder-fn @(:supervisors cluster-map))))
 
 (defn kill-supervisor [cluster-map supervisor-id]
-  (let [finder-fn #(= (.get-id %) supervisor-id)
+  (let [finder-fn #(= (.getId %) supervisor-id)
         supervisors @(:supervisors cluster-map)
         sup (find-first finder-fn
                         supervisors)]
     ;; tmp-dir will be taken care of by shutdown
     (reset! (:supervisors cluster-map) (remove-first finder-fn supervisors))
-    (.shutdown sup)))
+    (.close sup)))
 
 (defn kill-local-storm-cluster [cluster-map]
   (.shutdown (:nimbus cluster-map))
@@ -196,10 +201,10 @@
   (.close (:state cluster-map))
   (.disconnect (:storm-cluster-state cluster-map))
   (doseq [s @(:supervisors cluster-map)]
-    (.shutdown-all-workers s)
+    (.shutdownAllWorkers s)
     ;; race condition here? will it launch the workers again?
-    (supervisor/kill-supervisor s))
-  (psim/kill-all-processes)
+    (.close s))
+  (ProcessSimulator/killAllProcesses)
   (if (not-nil? (:zookeeper cluster-map))
     (do
       (log-message "Shutting down in process zookeeper")
@@ -235,19 +240,23 @@
     (while-timeout timeout-ms (not (apredicate))
       (Time/sleep 100))))
 
+(defn is-supervisor-waiting [^Supervisor supervisor]
+  (.isWaiting supervisor))
+
 (defn wait-until-cluster-waiting
   "Wait until the cluster is idle. Should be used with time simulation."
   ([cluster-map] (wait-until-cluster-waiting cluster-map TEST-TIMEOUT-MS))
   ([cluster-map timeout-ms]
   ;; wait until all workers, supervisors, and nimbus is waiting
   (let [supervisors @(:supervisors cluster-map)
-        workers (filter (partial satisfies? common/DaemonCommon) (psim/all-processes))
+        workers (filter (partial satisfies? common/DaemonCommon) (clojurify-structure (ProcessSimulator/getAllProcessHandles)))
         daemons (concat
                   [(:nimbus cluster-map)]
-                  supervisors
                   ; because a worker may already be dead
                   workers)]
-    (while-timeout timeout-ms (not (every? (memfn waiting?) daemons))
+    (while-timeout timeout-ms (or (not (every? (memfn waiting?) daemons))
+                                (not (every? is-supervisor-waiting supervisors)))
                    (Thread/sleep (rand-int 20))
                    ;;      (doseq [d daemons]
                    ;;        (if-not ((memfn waiting?) d)
@@ -278,7 +287,8 @@
                f# (future (while @keep-waiting?# (simulate-wait ~cluster-sym)))]
            (kill-local-storm-cluster ~cluster-sym)
            (reset! keep-waiting?# false)
-            @f#)))))
+            @f#
+           (.setSubject (ReqContext/context) nil))))))
 
 (defmacro with-simulated-time-local-cluster
   [& args]
@@ -360,35 +370,6 @@
         worker->port (ls-approved-workers supervisor-state)]
     (worker->port worker-id)))
 
-(defn mk-capture-shutdown-fn
-  [capture-atom]
-  (let [existing-fn supervisor/shutdown-worker]
-    (fn [supervisor worker-id]
-      (let [conf (:conf supervisor)
-            supervisor-id (:supervisor-id supervisor)
-            port (find-worker-port conf worker-id)
-            existing (get @capture-atom [supervisor-id port] 0)]
-        (swap! capture-atom assoc [supervisor-id port] (inc existing))
-        (existing-fn supervisor worker-id)))))
-
-(defmacro capture-changed-workers
-  [& body]
-  `(let [launch-captured# (atom {})
-         shutdown-captured# (atom {})]
-     (with-var-roots [supervisor/launch-worker (mk-capture-launch-fn launch-captured#)
-                      supervisor/shutdown-worker (mk-capture-shutdown-fn shutdown-captured#)]
-                     ~@body
-                     {:launched @launch-captured#
-                      :shutdown @shutdown-captured#})))
-
-(defmacro capture-launched-workers
-  [& body]
-  `(:launched (capture-changed-workers ~@body)))
-
-(defmacro capture-shutdown-workers
-  [& body]
-  `(:shutdown (capture-changed-workers ~@body)))
-
 (defnk aggregated-stat
   [cluster-map storm-name stat-key :component-ids nil]
   (let [state (:storm-cluster-state cluster-map)
@@ -659,6 +640,7 @@
                      ;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
                      ;; (println "Processed: " (global-amt track-id "processed"))
                      ;; (println "Transferred: " (global-amt track-id "transferred"))
+                    (advance-time-secs! 1)
                     (Thread/sleep (rand-int 200)))
       (reset! (:last-spout-emit tracked-topology) target))))
 

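For reference, the proxy in the hunk above replaces the old rebinding of supervisor/generate-supervisor-id: the test now subclasses the Java StandaloneSupervisor so it hands back a fixed id. A minimal Java sketch of the same idea (the class name and surrounding harness are hypothetical; the overridden method comes from the diff):

    import org.apache.storm.daemon.supervisor.StandaloneSupervisor;

    // Hypothetical test helper mirroring the Clojure proxy in add-supervisor:
    // pin the supervisor id instead of generating a fresh UUID.
    public class FixedIdSupervisor extends StandaloneSupervisor {
        private final String id;

        public FixedIdSupervisor(String id) {
            this.id = id;
        }

        @Override
        public String generateSupervisorId() {
            return id;
        }
    }
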
http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java b/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java
new file mode 100644
index 0000000..202df12
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm;
+import org.apache.storm.daemon.Shutdownable;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * In local mode, {@code ProcessSimulator} keeps track of {@link Shutdownable}
+ * handles in place of the real worker processes that would exist in cluster mode.
+ */
+public class ProcessSimulator {
+    private static Logger LOG = LoggerFactory.getLogger(ProcessSimulator.class);
+    private static Object lock = new Object();
+    protected static ConcurrentHashMap<String, Shutdownable> processMap = new ConcurrentHashMap<String, Shutdownable>();
+
+    /**
+     * Register a simulated process' handle.
+     * 
+     * @param pid the simulated process id
+     * @param shutdownable the handle to shut down when the process is killed
+     */
+    public static void registerProcess(String pid, Shutdownable shutdownable) {
+        processMap.put(pid, shutdownable);
+    }
+
+    /**
+     * Get all registered process handles.
+     * 
+     * @return the handles of all currently registered simulated processes
+     */
+    public static Collection<Shutdownable> getAllProcessHandles() {
+        return processMap.values();
+    }
+
+    /**
+     * Kill a simulated process, invoking its shutdown handle.
+     * 
+     * @param pid the simulated process id
+     */
+    public static void killProcess(String pid) {
+        synchronized (lock) {
+            LOG.info("Begin killing process " + pid);
+            Shutdownable shutdownHandle = processMap.get(pid);
+            if (shutdownHandle != null) {
+                shutdownHandle.shutdown();
+            }
+            processMap.remove(pid);
+            LOG.info("Successfully killed process " + pid);
+        }
+    }
+
+    /**
+     * Kill all processes
+     */
+    public static void killAllProcesses() {
+        Set<String> pids = processMap.keySet();
+        for (String pid : pids) {
+            killProcess(pid);
+        }
+    }
+}

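ProcessSimulator above is the Java port of the retired process-simulator Clojure namespace. A short usage sketch, assuming only what the class itself declares (the anonymous worker handle is illustrative):

    import org.apache.storm.ProcessSimulator;
    import org.apache.storm.daemon.Shutdownable;
    import org.apache.storm.utils.Utils;

    public class ProcessSimulatorExample {
        public static void main(String[] args) {
            // Stand-in for a local-mode worker; Shutdownable is the only contract.
            Shutdownable worker = new Shutdownable() {
                @Override
                public void shutdown() {
                    System.out.println("worker shut down");
                }
            };

            String pid = Utils.uuid(); // simulated process id
            ProcessSimulator.registerProcess(pid, worker);

            // ... exercise the cluster under test ...

            ProcessSimulator.killProcess(pid); // invokes worker.shutdown()
        }
    }
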
http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/StormTimer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/StormTimer.java b/storm-core/src/jvm/org/apache/storm/StormTimer.java
new file mode 100644
index 0000000..4be62b4
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/StormTimer.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm;
+
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.Random;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The timer defined in this file is very similar to java.util.Timer, except
+ * it integrates with Storm's time simulation capabilities. This lets us test
+ * code that does asynchronous work on the timer thread.
+ */
+public class StormTimer implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(StormTimer.class);
+
+    public static class QueueEntry {
+        public final Long endTimeMs;
+        public final Runnable func;
+        public final String id;
+
+        public QueueEntry(Long endTimeMs, Runnable func, String id) {
+            this.endTimeMs = endTimeMs;
+            this.func = func;
+            this.id = id;
+        }
+    }
+
+    public static class StormTimerTask extends Thread {
+
+        // initialCapacity is set to 11 since it is the default initial capacity of PriorityBlockingQueue
+        private PriorityBlockingQueue<QueueEntry> queue = new PriorityBlockingQueue<QueueEntry>(11, new Comparator<QueueEntry>() {
+            @Override
+            public int compare(QueueEntry o1, QueueEntry o2) {
+                // Compare as longs: truncating to int and subtracting can overflow
+                // for millisecond timestamps.
+                return Long.compare(o1.endTimeMs, o2.endTimeMs);
+            }
+        });
+
+        // boolean to indicate whether timer is active
+        private AtomicBoolean active = new AtomicBoolean(false);
+
+        // function to call when timer is killed
+        private Thread.UncaughtExceptionHandler onKill;
+
+        //random number generator
+        private Random random = new Random();
+
+        @Override
+        public void run() {
+            while (this.active.get()) {
+                QueueEntry queueEntry = null;
+                try {
+                    queueEntry = this.queue.peek();
+                    if ((queueEntry != null) && (Time.currentTimeMillis() >= queueEntry.endTimeMs)) {
+                        // It is imperative to not run the function
+                        // inside the timer lock. Otherwise, it is
+                        // possible to deadlock if the fn deals with
+                        // other locks, like the submit lock.
+                        this.queue.remove(queueEntry);
+                        queueEntry.func.run();
+                    } else if (queueEntry != null) {
+                        // If any events are scheduled, sleep until
+                        // the next event fires. If any recurring events
+                        // are scheduled then we will always go
+                        // through this branch, sleeping only the
+                        // exact necessary amount of time. We put
+                        // an upper bound, e.g. 1000 millis, on the
+                        // sleeping time, to keep the response time
+                        // for detecting newly scheduled events
+                        // under 1 second.
+                        Time.sleep(Math.min(1000, (queueEntry.endTimeMs - Time.currentTimeMillis())));
+                    } else {
+                        // Otherwise poll to see if any new event
+                        // was scheduled. This is, in essence, the
+                        // response time for detecting a newly
+                        // scheduled event when there are no
+                        // scheduled events.
+                        Time.sleep(1000);
+                    }
+                } catch (Throwable e) {
+                    if (!(Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e))) {
+                        this.onKill.uncaughtException(this, e);
+                        this.setActive(false);
+                    }
+                }
+            }
+        }
+
+        public void setOnKillFunc(Thread.UncaughtExceptionHandler onKill) {
+            this.onKill = onKill;
+        }
+
+        public void setActive(boolean flag) {
+            this.active.set(flag);
+        }
+
+        public boolean isActive() {
+            return this.active.get();
+        }
+
+        public void add(QueueEntry queueEntry) {
+            this.queue.add(queueEntry);
+        }
+    }
+
+    //task to run
+    private StormTimerTask task = new StormTimerTask();
+
+    /**
+     * Creates a StormTimer and starts its backing StormTimerTask thread.
+     * @param name name of the timer thread; defaults to "timer" if null
+     * @param onKill handler to call when the timer dies unexpectedly
+     */
+    public StormTimer(String name, Thread.UncaughtExceptionHandler onKill) {
+        if (onKill == null) {
+            throw new RuntimeException("onKill func is null!");
+        }
+        if (name == null) {
+            this.task.setName("timer");
+        } else {
+            this.task.setName(name);
+        }
+        this.task.setOnKillFunc(onKill);
+        this.task.setActive(true);
+
+        this.task.setDaemon(true);
+        this.task.setPriority(Thread.MAX_PRIORITY);
+        this.task.start();
+    }
+
+    /**
+     * Schedule a function to be executed in the timer
+     * @param delaySecs the number of seconds to delay before running the function
+     * @param func the function to run
+     * @param checkActive whether to check if the timer is active
+     * @param jitterMs maximum random jitter, in milliseconds, added to the scheduled time
+     */
+    public void schedule(int delaySecs, Runnable func, boolean checkActive, int jitterMs) {
+        if (func == null) {
+            throw new RuntimeException("function to schedule is null!");
+        }
+        if (checkActive) {
+            checkActive();
+        }
+        String id = Utils.uuid();
+        long endTimeMs = Time.currentTimeMillis() + Time.secsToMillisLong(delaySecs);
+        if (jitterMs > 0) {
+            endTimeMs = this.task.random.nextInt(jitterMs) + endTimeMs;
+        }
+        task.add(new QueueEntry(endTimeMs, func, id));
+    }
+
+    public void schedule(int delaySecs, Runnable func) {
+        schedule(delaySecs, func, true, 0);
+    }
+
+    /**
+     * Schedule a function to run recurrently
+     * @param delaySecs the number of seconds to delay before running the function
+     * @param recurSecs the time between each invocation
+     * @param func the function to run
+     */
+    public void scheduleRecurring(int delaySecs, final int recurSecs, final Runnable func) {
+        schedule(delaySecs, new Runnable() {
+            @Override
+            public void run() {
+                func.run();
+                // This avoids a race condition with cancel-timer.
+                schedule(recurSecs, this, false, 0);
+            }
+        });
+    }
+
+    /**
+     * Schedule a function to run recurrently with jitter.
+     * @param delaySecs the number of seconds to delay before running the function
+     * @param recurSecs the time between each invocation
+     * @param jitterMs maximum random jitter, in milliseconds, added to each run
+     * @param func the function to run
+     */
+    public void scheduleRecurringWithJitter(int delaySecs, final int recurSecs, final int jitterMs, final Runnable func) {
+        schedule(delaySecs, new Runnable() {
+            @Override
+            public void run() {
+                func.run();
+                // This avoids a race condition with cancel-timer.
+                schedule(recurSecs, this, false, jitterMs);
+            }
+        });
+    }
+
+    /**
+     * Check that the timer is active; throws IllegalStateException otherwise.
+     */
+    private void checkActive() {
+        if (!this.task.isActive()) {
+            throw new IllegalStateException("Timer is not active");
+        }
+    }
+
+    /**
+     * Cancel the timer: deactivate it, then interrupt and join the timer thread.
+     */
+    @Override
+    public void close() throws Exception {
+        checkActive();
+        this.task.setActive(false);
+        this.task.interrupt();
+        this.task.join();
+    }
+
+    /**
+     * Whether the timer thread is blocked waiting. Used with time simulation.
+     */
+    public boolean isTimerWaiting() {
+        return Time.isThreadWaiting(task);
+    }
+}

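A quick sketch of driving the timer above; the constructor and schedule signatures are as defined in this file, while the handler and task bodies are illustrative:

    import org.apache.storm.StormTimer;

    public class StormTimerExample {
        public static void main(String[] args) throws Exception {
            StormTimer timer = new StormTimer("example-timer", (thread, error) -> {
                // onKill: invoked if a scheduled task throws unexpectedly
                System.err.println("timer died: " + error);
            });

            // Run once, 5 seconds from now.
            timer.schedule(5, () -> System.out.println("one-shot"));

            // Run every 10 seconds with up to 500 ms of jitter per run.
            timer.scheduleRecurringWithJitter(0, 10, 500, () -> System.out.println("tick"));

            Thread.sleep(30_000);
            timer.close(); // deactivates, interrupts, and joins the timer thread
        }
    }
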
http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/callback/DefaultWatcherCallBack.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/callback/DefaultWatcherCallBack.java b/storm-core/src/jvm/org/apache/storm/callback/DefaultWatcherCallBack.java
new file mode 100644
index 0000000..043dd0c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/callback/DefaultWatcherCallBack.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.callback;
+
+import org.apache.storm.zookeeper.ZkEventTypes;
+import org.apache.storm.zookeeper.ZkKeeperStates;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultWatcherCallBack implements WatcherCallBack {
+
+    private static Logger LOG = LoggerFactory.getLogger(DefaultWatcherCallBack.class);
+
+    @Override
+    public void execute(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path) {
+        LOG.debug("Zookeeper state update: {}, {}, {}", ZkKeeperStates.getStateName(state), ZkEventTypes.getTypeName(type), path);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/callback/WatcherCallBack.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/callback/WatcherCallBack.java b/storm-core/src/jvm/org/apache/storm/callback/WatcherCallBack.java
new file mode 100644
index 0000000..41a50ec
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/callback/WatcherCallBack.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.callback;
+
+import org.apache.zookeeper.Watcher;
+
+public interface WatcherCallBack {
+    public void execute(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/callback/ZKStateChangedCallback.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/callback/ZKStateChangedCallback.java b/storm-core/src/jvm/org/apache/storm/callback/ZKStateChangedCallback.java
new file mode 100644
index 0000000..75b0e99
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/callback/ZKStateChangedCallback.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.callback;
+
+import org.apache.zookeeper.Watcher;
+
+public interface ZKStateChangedCallback {
+    public void changed(Watcher.Event.EventType type, String path);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/cluster/ClusterStateFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ClusterStateFactory.java b/storm-core/src/jvm/org/apache/storm/cluster/ClusterStateFactory.java
index 6474d82..fcf5db0 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/ClusterStateFactory.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ClusterStateFactory.java
@@ -21,7 +21,9 @@ import clojure.lang.APersistentMap;
 import java.util.List;
 import org.apache.zookeeper.data.ACL;
 
-public interface ClusterStateFactory {
+import org.apache.storm.cluster.StateStorageFactory;
+
+public interface ClusterStateFactory extends StateStorageFactory {
     
     ClusterState mkState(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java b/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
new file mode 100644
index 0000000..96c177b
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.ClusterWorkerHeartbeat;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ClusterUtils {
+
+    public static final String ZK_SEPERATOR = "/";
+
+    public static final String ASSIGNMENTS_ROOT = "assignments";
+    public static final String CODE_ROOT = "code";
+    public static final String STORMS_ROOT = "storms";
+    public static final String SUPERVISORS_ROOT = "supervisors";
+    public static final String WORKERBEATS_ROOT = "workerbeats";
+    public static final String BACKPRESSURE_ROOT = "backpressure";
+    public static final String ERRORS_ROOT = "errors";
+    public static final String BLOBSTORE_ROOT = "blobstore";
+    public static final String BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_ROOT = "blobstoremaxkeysequencenumber";
+    public static final String NIMBUSES_ROOT = "nimbuses";
+    public static final String CREDENTIALS_ROOT = "credentials";
+    public static final String LOGCONFIG_ROOT = "logconfigs";
+    public static final String PROFILERCONFIG_ROOT = "profilerconfigs";
+
+    public static final String ASSIGNMENTS_SUBTREE = ZK_SEPERATOR + ASSIGNMENTS_ROOT;
+    public static final String STORMS_SUBTREE = ZK_SEPERATOR + STORMS_ROOT;
+    public static final String SUPERVISORS_SUBTREE = ZK_SEPERATOR + SUPERVISORS_ROOT;
+    public static final String WORKERBEATS_SUBTREE = ZK_SEPERATOR + WORKERBEATS_ROOT;
+    public static final String BACKPRESSURE_SUBTREE = ZK_SEPERATOR + BACKPRESSURE_ROOT;
+    public static final String ERRORS_SUBTREE = ZK_SEPERATOR + ERRORS_ROOT;
+    public static final String BLOBSTORE_SUBTREE = ZK_SEPERATOR + BLOBSTORE_ROOT;
+    public static final String BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE = ZK_SEPERATOR + BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_ROOT;
+    public static final String NIMBUSES_SUBTREE = ZK_SEPERATOR + NIMBUSES_ROOT;
+    public static final String CREDENTIALS_SUBTREE = ZK_SEPERATOR + CREDENTIALS_ROOT;
+    public static final String LOGCONFIG_SUBTREE = ZK_SEPERATOR + LOGCONFIG_ROOT;
+    public static final String PROFILERCONFIG_SUBTREE = ZK_SEPERATOR + PROFILERCONFIG_ROOT;
+
+    // A singleton instance allows us to mock delegated static methods in our
+    // tests by subclassing.
+    private static final ClusterUtils INSTANCE = new ClusterUtils();
+    private static ClusterUtils _instance = INSTANCE;
+
+    /**
+     * Provide an instance of this class for delegates to use. To mock out
+     * delegated methods, provide an instance of a subclass that overrides
+     * the implementation of the delegated method.
+     *
+     * @param u a ClusterUtils instance
+     */
+    public static void setInstance(ClusterUtils u) {
+        _instance = u;
+    }
+
+    /**
+     * Resets the singleton instance to the default. This is helpful to reset the class to its original functionality when mocking is no longer desired.
+     */
+    public static void resetInstance() {
+        _instance = INSTANCE;
+    }
+
+    public static List<ACL> mkTopoOnlyAcls(Map topoConf) throws NoSuchAlgorithmException {
+        List<ACL> aclList = null;
+        String payload = (String) topoConf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
+        if (Utils.isZkAuthenticationConfiguredTopology(topoConf)) {
+            aclList = new ArrayList<>();
+            ACL acl1 = ZooDefs.Ids.CREATOR_ALL_ACL.get(0);
+            aclList.add(acl1);
+            ACL acl2 = new ACL(ZooDefs.Perms.READ, new Id("digest", DigestAuthenticationProvider.generateDigest(payload)));
+            aclList.add(acl2);
+        }
+        return aclList;
+    }
+
+    public static String supervisorPath(String id) {
+        return SUPERVISORS_SUBTREE + ZK_SEPERATOR + id;
+    }
+
+    public static String assignmentPath(String id) {
+        return ASSIGNMENTS_SUBTREE + ZK_SEPERATOR + id;
+    }
+
+    public static String blobstorePath(String key) {
+        return BLOBSTORE_SUBTREE + ZK_SEPERATOR + key;
+    }
+
+    public static String blobstoreMaxKeySequenceNumberPath(String key) {
+        return BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE + ZK_SEPERATOR + key;
+    }
+
+    public static String nimbusPath(String id) {
+        return NIMBUSES_SUBTREE + ZK_SEPERATOR + id;
+    }
+
+    public static String stormPath(String id) {
+        return STORMS_SUBTREE + ZK_SEPERATOR + id;
+    }
+
+    public static String workerbeatStormRoot(String stormId) {
+        return WORKERBEATS_SUBTREE + ZK_SEPERATOR + stormId;
+    }
+
+    public static String workerbeatPath(String stormId, String node, Long port) {
+        return workerbeatStormRoot(stormId) + ZK_SEPERATOR + node + "-" + port;
+    }
+
+    public static String backpressureStormRoot(String stormId) {
+        return BACKPRESSURE_SUBTREE + ZK_SEPERATOR + stormId;
+    }
+
+    public static String backpressurePath(String stormId, String node, Long port) {
+        return backpressureStormRoot(stormId) + ZK_SEPERATOR + node + "-" + port;
+    }
+
+    public static String errorStormRoot(String stormId) {
+        return ERRORS_SUBTREE + ZK_SEPERATOR + stormId;
+    }
+
+    public static String errorPath(String stormId, String componentId) {
+        try {
+            return errorStormRoot(stormId) + ZK_SEPERATOR + URLEncoder.encode(componentId, "UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            throw Utils.wrapInRuntime(e);
+        }
+    }
+
+    public static String lastErrorPath(String stormId, String componentId) {
+        return errorPath(stormId, componentId) + "-last-error";
+    }
+
+    public static String credentialsPath(String stormId) {
+        return CREDENTIALS_SUBTREE + ZK_SEPERATOR + stormId;
+    }
+
+    public static String logConfigPath(String stormId) {
+        return LOGCONFIG_SUBTREE + ZK_SEPERATOR + stormId;
+    }
+
+    public static String profilerConfigPath(String stormId) {
+        return PROFILERCONFIG_SUBTREE + ZK_SEPERATOR + stormId;
+    }
+
+    public static String profilerConfigPath(String stormId, String host, Long port, ProfileAction requestType) {
+        return profilerConfigPath(stormId) + ZK_SEPERATOR + host + "_" + port + "_" + requestType;
+    }
+
+    public static <T> T maybeDeserialize(byte[] serialized, Class<T> clazz) {
+        if (serialized != null) {
+            return Utils.deserialize(serialized, clazz);
+        }
+        return null;
+    }
+
+    /**
+     * Ensures that we only return heartbeats for executors assigned to this worker.
+     * @param executors the executors assigned to the worker
+     * @param workerHeartbeat the heartbeat reported by the worker
+     * @return a map from each assigned executor to its heartbeat
+     */
+    public static Map<ExecutorInfo, ExecutorBeat> convertExecutorBeats(List<ExecutorInfo> executors, ClusterWorkerHeartbeat workerHeartbeat) {
+        Map<ExecutorInfo, ExecutorBeat> executorWhb = new HashMap<>();
+        Map<ExecutorInfo, ExecutorStats> executorStatsMap = workerHeartbeat.get_executor_stats();
+        for (ExecutorInfo executor : executors) {
+            if (executorStatsMap.containsKey(executor)) {
+                int time = workerHeartbeat.get_time_secs();
+                int uptime = workerHeartbeat.get_uptime_secs();
+                ExecutorStats executorStats = executorStatsMap.get(executor);
+                ExecutorBeat executorBeat = new ExecutorBeat(time, uptime, executorStats);
+                executorWhb.put(executor, executorBeat);
+            }
+        }
+        return executorWhb;
+    }
+
+    public IStormClusterState mkStormClusterStateImpl(Object stateStorage, List<ACL> acls, ClusterStateContext context) throws Exception {
+        if (stateStorage instanceof IStateStorage) {
+            return new StormClusterStateImpl((IStateStorage) stateStorage, acls, context, false);
+        } else {
+            IStateStorage storage = _instance.mkStateStorageImpl((Map) stateStorage, (Map) stateStorage, acls, context);
+            return new StormClusterStateImpl(storage, acls, context, true);
+        }
+    }
+
+    public IStateStorage mkStateStorageImpl(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception {
+        String className;
+        if (config.get(Config.STORM_CLUSTER_STATE_STORE) != null) {
+            className = (String) config.get(Config.STORM_CLUSTER_STATE_STORE);
+        } else {
+            className = "org.apache.storm.cluster.ZKStateStorageFactory";
+        }
+        Class clazz = Class.forName(className);
+        StateStorageFactory storageFactory = (StateStorageFactory) clazz.newInstance();
+        return storageFactory.mkStore(config, auth_conf, acls, context);
+    }
+
+    public static IStateStorage mkStateStorage(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception {
+        return _instance.mkStateStorageImpl(config, auth_conf, acls, context);
+    }
+
+    public static IStormClusterState mkStormClusterState(Object stateStorage, List<ACL> acls, ClusterStateContext context) throws Exception {
+        return _instance.mkStormClusterStateImpl(stateStorage, acls, context);
+    }
+
+    public static String stringifyError(Throwable error) {
+        StringWriter result = new StringWriter();
+        PrintWriter printWriter = new PrintWriter(result);
+        error.printStackTrace(printWriter);
+        return result.toString();
+    }
+}

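The path helpers above are plain string composition over the subtree constants, and the singleton indirection exists solely so tests can swap in a subclass. A brief sketch (the expected outputs in the comments follow from the code above):

    import org.apache.storm.cluster.ClusterUtils;

    public class ClusterUtilsExample {
        public static void main(String[] args) {
            System.out.println(ClusterUtils.supervisorPath("sup-1"));
            // -> /supervisors/sup-1
            System.out.println(ClusterUtils.workerbeatPath("topo-1", "node-a", 6700L));
            // -> /workerbeats/topo-1/node-a-6700

            // Tests may override the delegated instance methods by subclassing,
            // e.g. to return an in-memory IStateStorage from mkStateStorageImpl.
            ClusterUtils.setInstance(new ClusterUtils() { /* overrides here */ });
            ClusterUtils.resetInstance();
        }
    }
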
http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/cluster/ExecutorBeat.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ExecutorBeat.java b/storm-core/src/jvm/org/apache/storm/cluster/ExecutorBeat.java
new file mode 100644
index 0000000..b32615e
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ExecutorBeat.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import org.apache.storm.generated.ExecutorStats;
+
+public class ExecutorBeat {
+    private final int timeSecs;
+    private final int uptime;
+    private final ExecutorStats stats;
+
+    public ExecutorBeat(int timeSecs, int uptime, ExecutorStats stats) {
+        this.timeSecs = timeSecs;
+        this.uptime = uptime;
+        this.stats = stats;
+    }
+
+    public int getTimeSecs() {
+        return timeSecs;
+    }
+
+    public int getUptime() {
+        return uptime;
+    }
+
+    public ExecutorStats getStats() {
+        return stats;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java
new file mode 100644
index 0000000..aa731ff
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.storm.callback.ZKStateChangedCallback;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * IStateStorage provides the API for the pluggable state store used by the
+ * Storm daemons. Data is stored in path/value format, and the store supports
+ * listing sub-paths at a given path.
+ * All data should be available across all nodes with eventual consistency.
+ *
+ * IMPORTANT NOTE:
+ * Heartbeats have different API calls used to interact with them. The root
+ * path (/) may or may not be the same as the root path for the other API calls.
+ *
+ * For example, performing these two calls:
+ *     set_data("/path", data, acls);
+ *     void set_worker_hb("/path", heartbeat, acls);
+ * may or may not cause a collision in "/path".
+ * Never use the same paths with the *_hb* methods as you do with the others.
+ */
+public interface IStateStorage {
+    
+    /**
+     * Registers a callback function that gets called when CuratorEvents happen.
+     * @param callback a ZKStateChangedCallback invoked with the ZooKeeper
+     * event type and the path on which the event occurred
+     * @return an id that can be passed to unregister(...) to unregister the
+     * callback.
+     */
+    String register(ZKStateChangedCallback callback);
+
+    /**
+     * Unregisters a callback function that was registered with register(...).
+     * @param id is the String id that was returned from register(...).
+     */
+    void unregister(String id);
+
+    /**
+     * Path will be appended with a monotonically increasing integer, a new node
+     * will be created there, and data will be put at that node.
+     * @param path The path that the monotonically increasing integer suffix will
+     * be added to.
+     * @param data The data that will be written at the suffixed path's node.
+     * @param acls The acls to apply to the path. May be null.
+     * @return The path with the integer suffix appended.
+     */
+    String create_sequential(String path, byte[] data, List<ACL> acls);
+
+    /**
+     * Creates nodes for path and all its parents. Path elements are separated by
+     * a "/", as in *nix filesystem notation. Equivalent to mkdir -p in *nix.
+     * @param path The path to create, along with all its parents.
+     * @param acls The acls to apply to the path. May be null.
+     */
+    void mkdirs(String path, List<ACL> acls);
+
+    /**
+     * Deletes the node at a given path, and any child nodes that may exist.
+     * @param path The path to delete
+     */
+    void delete_node(String path);
+
+    /**
+     * Creates an ephemeral node at path. Ephemeral nodes are destroyed
+     * by the store when the client disconnects.
+     * @param path The path where a node will be created.
+     * @param data The data to be written at the node.
+     * @param acls The acls to apply to the path. May be null.
+     */
+    void set_ephemeral_node(String path, byte[] data, List<ACL> acls);
+
+    /**
+     * Gets the 'version' of the node at a path. Optionally sets a watch
+     * on that node. The version should increase whenever a write happens.
+     * @param path The path to get the version of.
+     * @param watch Whether or not to set a watch on the path. Watched paths
+     * emit events which are consumed by functions registered with the
+     * register method. Very useful for catching updates to nodes.
+     * @return The integer version of this node.
+     */
+    Integer get_version(String path, boolean watch) throws Exception;
+
+    /**
+     * Check if a node exists and optionally set a watch on the path.
+     * @param path The path to check for the existence of a node.
+     * @param watch Whether or not to set a watch on the path. Watched paths
+     * emit events which are consumed by functions registered with the
+     * register method. Very useful for catching updates to nodes.
+     * @return Whether or not a node exists at path.
+     */
+    boolean node_exists(String path, boolean watch);
+
+    /**
+     * Get a list of paths of all the child nodes which exist immediately
+     * under path.
+     * @param path The path to look under
+     * @param watch Whether or not to set a watch on the path. Watched paths
+     * emit events which are consumed by functions registered with the
+     * register method. Very useful for catching updates to nodes.
+     * @return list of string paths under path.
+     */
+    List<String> get_children(String path, boolean watch);
+
+    /**
+     * Close the connection to the data store.
+     */
+    void close();
+
+    /**
+     * Set the value of the node at path to data.
+     * @param path The path whose node we want to set.
+     * @param data The data to put in the node.
+     * @param acls The acls to apply to the path. May be null.
+     */
+    void set_data(String path, byte[] data, List<ACL> acls);
+
+    /**
+     * Get the data from the node at path
+     * @param path The path to look under
+     * @param watch Whether or not to set a watch on the path. Watched paths
+     * emit events which are consumed by functions registered with the
+     * register method. Very useful for catching updates to nodes.
+     * @return The data at the node.
+     */
+    byte[] get_data(String path, boolean watch);
+
+    /**
+     * Get the data at the node along with its version. Data is returned
+     * as a VersionedData object containing the bytes and their version.
+     * @param path The path to look under
+     * @param watch Whether or not to set a watch on the path. Watched paths
+     * emit events which are consumed by functions registered with the
+     * register method. Very useful for catching updates to nodes.
+     * @return the data with a version
+     */
+    VersionedData<byte[]> get_data_with_version(String path, boolean watch);
+
+    /**
+     * Write a worker heartbeat at the path.
+     * @param path The path whose node we want to set.
+     * @param data The data to put in the node.
+     * @param acls The acls to apply to the path. May be null.
+     */
+    void set_worker_hb(String path, byte[] data, List<ACL> acls);
+
+    /**
+     * Get the heartbeat from the node at path
+     * @param path The path to look under
+     * @param watch Whether or not to set a watch on the path. Watched paths
+     * emit events which are consumed by functions registered with the
+     * register method. Very useful for catching updates to nodes.
+     * @return The heartbeat at the node.
+     */
+    byte[] get_worker_hb(String path, boolean watch);
+
+    /**
+     * Get a list of paths of all the child nodes which exist immediately
+     * under path. This is similar to get_children, but must be used for
+     * worker heartbeat nodes (see the note above on heartbeat paths).
+     * @param path The path to look under
+     * @param watch Whether or not to set a watch on the path. Watched paths
+     * emit events which are consumed by functions registered with the
+     * register method. Very useful for catching updates to nodes.
+     * @return list of string paths under path.
+     */
+    List<String> get_worker_hb_children(String path, boolean watch);
+
+    /**
+     * Deletes the heartbeat at a given path, and any child nodes that may exist.
+     * @param path The path to delete.
+     */
+    void delete_worker_hb(String path);
+
+    /**
+     * Add a StateStorageListener to the connection.
+     * @param listener A StateStorageListener to handle changing cluster state
+     * events.
+     */
+    void add_listener(final ConnectionStateListener listener);
+
+    /**
+     * Force consistency on a path. Any writes committed on the path before
+     * this call will be completely propagated when it returns.
+     * @param path The path to synchronize.
+     */
+    void sync_path(String path);
+
+    /**
+     * Deletes the znodes under /storm/blobstore/key_name
+     * whose names start with the corresponding nimbusHostPortInfo.
+     * @param path /storm/blobstore/key_name
+     * @param nimbusHostPortInfo Contains the host port information of
+     * a nimbus node.
+     */
+    void delete_node_blobstore(String path, String nimbusHostPortInfo);
+}

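A sketch of the watch-and-callback flow the interface describes, assuming an already-constructed IStateStorage (for instance from ClusterUtils.mkStateStorage); the path is illustrative:

    import org.apache.storm.callback.ZKStateChangedCallback;
    import org.apache.storm.cluster.IStateStorage;
    import org.apache.zookeeper.Watcher;

    public class StateStorageExample {
        static void watchPath(IStateStorage store) {
            // Register a callback; the returned id is used to unregister later.
            String callbackId = store.register(new ZKStateChangedCallback() {
                @Override
                public void changed(Watcher.Event.EventType type, String path) {
                    System.out.println("event " + type + " at " + path);
                }
            });

            // watch=true arms a one-shot watch in the usual ZooKeeper manner:
            // the callback fires on the next change, and the read must be
            // repeated to re-arm it.
            byte[] data = store.get_data("/assignments/topo-1", true);

            store.unregister(callbackId);
        }
    }
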
http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
new file mode 100644
index 0000000..a6f07ed
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import org.apache.storm.generated.*;
+import org.apache.storm.nimbus.NimbusInfo;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.List;
+import java.util.Map;
+
+public interface IStormClusterState {
+    public List<String> assignments(Runnable callback);
+
+    public Assignment assignmentInfo(String stormId, Runnable callback);
+
+    public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);
+
+    public Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
+
+    public List<String> blobstoreInfo(String blobKey);
+
+    public List<NimbusSummary> nimbuses();
+
+    public void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
+
+    public List<String> activeStorms();
+
+    public StormBase stormBase(String stormId, Runnable callback);
+
+    public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
+
+    public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo);
+
+    public List<ProfileRequest> getTopologyProfileRequests(String stormId);
+
+    public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest);
+
+    public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest);
+
+    public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
+
+    public List<String> supervisors(Runnable callback);
+
+    public SupervisorInfo supervisorInfo(String supervisorId); // returns null if it doesn't exist
+
+    public void setupHeatbeats(String stormId);
+
+    public void teardownHeartbeats(String stormId);
+
+    public void teardownTopologyErrors(String stormId);
+
+    public List<String> heartbeatStorms();
+
+    public List<String> errorTopologies();
+
+    public List<String> backpressureTopologies();
+
+    public void setTopologyLogConfig(String stormId, LogConfig logConfig);
+
+    public LogConfig topologyLogConfig(String stormId, Runnable cb);
+
+    public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info);
+
+    public void removeWorkerHeartbeat(String stormId, String node, Long port);
+
+    public void supervisorHeartbeat(String supervisorId, SupervisorInfo info);
+
+    public void workerBackpressure(String stormId, String node, Long port, boolean on);
+
+    public boolean topologyBackpressure(String stormId, Runnable callback);
+
+    public void setupBackpressure(String stormId);
+
+    public void removeBackpressure(String stormId);
+
+    public void removeWorkerBackpressure(String stormId, String node, Long port);
+
+    public void activateStorm(String stormId, StormBase stormBase);
+
+    public void updateStorm(String stormId, StormBase newElems);
+
+    public void removeStormBase(String stormId);
+
+    public void setAssignment(String stormId, Assignment info);
+
+    public void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer versionInfo);
+
+    public List<String> activeKeys();
+
+    public List<String> blobstore(Runnable callback);
+
+    public void removeStorm(String stormId);
+
+    public void removeBlobstoreKey(String blobKey);
+
+    public void removeKeyVersion(String blobKey);
+
+    public void reportError(String stormId, String componentId, String node, Long port, Throwable error);
+
+    public List<ErrorInfo> errors(String stormId, String componentId);
+
+    public ErrorInfo lastError(String stormId, String componentId);
+
+    public void setCredentials(String stormId, Credentials creds, Map topoConf) throws NoSuchAlgorithmException;
+
+    public Credentials credentials(String stormId, Runnable callback);
+
+    public void disconnect();
+
+}

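A sketch of the read-with-callback pattern this interface supports; the ids are illustrative and the callbacks follow the Runnable-based signatures above:

    import org.apache.storm.cluster.IStormClusterState;
    import org.apache.storm.generated.Assignment;
    import org.apache.storm.generated.SupervisorInfo;

    public class ClusterStateExample {
        static void readState(IStormClusterState state) {
            // Passing a callback arms a watch that runs when the data changes.
            Assignment assignment = state.assignmentInfo("topo-1", () ->
                    System.out.println("assignment for topo-1 changed"));

            // Reads without a callback are plain lookups.
            SupervisorInfo info = state.supervisorInfo("sup-1"); // null if absent
            System.out.println(assignment + " / " + info);
        }
    }
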
http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/StateStorageFactory.java b/storm-core/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
new file mode 100644
index 0000000..0929750
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.zookeeper.data.ACL;
+
+public interface StateStorageFactory {
+
+    IStateStorage mkStore(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
new file mode 100644
index 0000000..972d778
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -0,0 +1,720 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.*;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.storm.callback.ZKStateChangedCallback;
+import org.apache.storm.generated.*;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.zookeeper.Zookeeper;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class StormClusterStateImpl implements IStormClusterState {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StormClusterStateImpl.class);
+
+    private IStateStorage stateStorage;
+
+    private ConcurrentHashMap<String, Runnable> assignmentInfoCallback;
+    private ConcurrentHashMap<String, Runnable> assignmentInfoWithVersionCallback;
+    private ConcurrentHashMap<String, Runnable> assignmentVersionCallback;
+    private AtomicReference<Runnable> supervisorsCallback;
+    // we want to register a topology directory getChildren callback for all workers of this dir
+    private ConcurrentHashMap<String, Runnable> backPressureCallback;
+    private AtomicReference<Runnable> assignmentsCallback;
+    private ConcurrentHashMap<String, Runnable> stormBaseCallback;
+    private AtomicReference<Runnable> blobstoreCallback;
+    private ConcurrentHashMap<String, Runnable> credentialsCallback;
+    private ConcurrentHashMap<String, Runnable> logConfigCallback;
+
+    private List<ACL> acls;
+    private String stateId;
+    private boolean solo;
+
+    public StormClusterStateImpl(IStateStorage stateStorage, List<ACL> acls, ClusterStateContext context, boolean solo) throws Exception {
+
+        this.stateStorage = stateStorage;
+        this.solo = solo;
+        this.acls = acls;
+
+        assignmentInfoCallback = new ConcurrentHashMap<>();
+        assignmentInfoWithVersionCallback = new ConcurrentHashMap<>();
+        assignmentVersionCallback = new ConcurrentHashMap<>();
+        supervisorsCallback = new AtomicReference<>();
+        backPressureCallback = new ConcurrentHashMap<>();
+        assignmentsCallback = new AtomicReference<>();
+        stormBaseCallback = new ConcurrentHashMap<>();
+        credentialsCallback = new ConcurrentHashMap<>();
+        logConfigCallback = new ConcurrentHashMap<>();
+        blobstoreCallback = new AtomicReference<>();
+
+        stateId = this.stateStorage.register(new ZKStateChangedCallback() {
+
+            public void changed(Watcher.Event.EventType type, String path) {
+                List<String> toks = Zookeeper.tokenizePath(path);
+                int size = toks.size();
+                if (size >= 1) {
+                    String root = toks.get(0);
+                    if (root.equals(ClusterUtils.ASSIGNMENTS_ROOT)) {
+                        if (size == 1) {
+                            // set null and get the old value
+                            issueCallback(assignmentsCallback);
+                        } else {
+                            issueMapCallback(assignmentInfoCallback, toks.get(1));
+                            issueMapCallback(assignmentVersionCallback, toks.get(1));
+                            issueMapCallback(assignmentInfoWithVersionCallback, toks.get(1));
+                        }
+
+                    } else if (root.equals(ClusterUtils.SUPERVISORS_ROOT)) {
+                        issueCallback(supervisorsCallback);
+                    } else if (root.equals(ClusterUtils.BLOBSTORE_ROOT)) {
+                        issueCallback(blobstoreCallback);
+                    } else if (root.equals(ClusterUtils.STORMS_ROOT) && size > 1) {
+                        issueMapCallback(stormBaseCallback, toks.get(1));
+                    } else if (root.equals(ClusterUtils.CREDENTIALS_ROOT) && size > 1) {
+                        issueMapCallback(credentialsCallback, toks.get(1));
+                    } else if (root.equals(ClusterUtils.LOGCONFIG_ROOT) && size > 1) {
+                        issueMapCallback(logConfigCallback, toks.get(1));
+                    } else if (root.equals(ClusterUtils.BACKPRESSURE_ROOT) && size > 1) {
+                        issueMapCallback(backPressureCallback, toks.get(1));
+                    } else {
+                        LOG.error("Unknown callback for subtree {}", path, new RuntimeException("Unknown callback for this path"));
+                        Runtime.getRuntime().exit(30);
+                    }
+
+                }
+            }
+
+        });
+
+        String[] pathlist = { ClusterUtils.ASSIGNMENTS_SUBTREE, 
+                              ClusterUtils.STORMS_SUBTREE, 
+                              ClusterUtils.SUPERVISORS_SUBTREE, 
+                              ClusterUtils.WORKERBEATS_SUBTREE,
+                              ClusterUtils.ERRORS_SUBTREE, 
+                              ClusterUtils.BLOBSTORE_SUBTREE, 
+                              ClusterUtils.NIMBUSES_SUBTREE, 
+                              ClusterUtils.LOGCONFIG_SUBTREE,
+                              ClusterUtils.BACKPRESSURE_SUBTREE };
+        for (String path : pathlist) {
+            this.stateStorage.mkdirs(path, acls);
+        }
+
+    }
+
+    protected void issueCallback(AtomicReference<Runnable> cb) {
+        Runnable callback = cb.getAndSet(null);
+        if (callback != null)
+            callback.run();
+    }
+
+    protected void issueMapCallback(ConcurrentHashMap<String, Runnable> callbackConcurrentHashMap, String key) {
+        Runnable callback = callbackConcurrentHashMap.remove(key);
+        if (callback != null)
+            callback.run();
+    }
+
+    @Override
+    public List<String> assignments(Runnable callback) {
+        if (callback != null) {
+            assignmentsCallback.set(callback);
+        }
+        return stateStorage.get_children(ClusterUtils.ASSIGNMENTS_SUBTREE, callback != null);
+    }
+
+    @Override
+    public Assignment assignmentInfo(String stormId, Runnable callback) {
+        if (callback != null) {
+            assignmentInfoCallback.put(stormId, callback);
+        }
+        byte[] serialized = stateStorage.get_data(ClusterUtils.assignmentPath(stormId), callback != null);
+        return ClusterUtils.maybeDeserialize(serialized, Assignment.class);
+    }
+
+    @Override
+    public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback) {
+        if (callback != null) {
+            assignmentInfoWithVersionCallback.put(stormId, callback);
+        }
+        Assignment assignment = null;
+        Integer version = 0;
+        VersionedData<byte[]> dataWithVersion = stateStorage.get_data_with_version(ClusterUtils.assignmentPath(stormId), callback != null);
+        if (dataWithVersion != null) {
+            assignment = ClusterUtils.maybeDeserialize(dataWithVersion.getData(), Assignment.class);
+            version = dataWithVersion.getVersion();
+        }
+        return new VersionedData<Assignment>(version, assignment);
+    }
+
+    @Override
+    public Integer assignmentVersion(String stormId, Runnable callback) throws Exception {
+        if (callback != null) {
+            assignmentVersionCallback.put(stormId, callback);
+        }
+        return stateStorage.get_version(ClusterUtils.assignmentPath(stormId), callback != null);
+    }
+
+    // blobstore state
+    @Override
+    public List<String> blobstoreInfo(String blobKey) {
+        String path = ClusterUtils.blobstorePath(blobKey);
+        stateStorage.sync_path(path);
+        return stateStorage.get_children(path, false);
+    }
+
+    @Override
+    public List nimbuses() {
+        List<NimbusSummary> nimbusSummaries = new ArrayList<>();
+        List<String> nimbusIds = stateStorage.get_children(ClusterUtils.NIMBUSES_SUBTREE, false);
+        for (String nimbusId : nimbusIds) {
+            byte[] serialized = stateStorage.get_data(ClusterUtils.nimbusPath(nimbusId), false);
+            NimbusSummary nimbusSummary = ClusterUtils.maybeDeserialize(serialized, NimbusSummary.class);
+            nimbusSummaries.add(nimbusSummary);
+        }
+        return nimbusSummaries;
+    }
+
+    @Override
+    public void addNimbusHost(final String nimbusId, final NimbusSummary nimbusSummary) {
+        // explicit delete for the ephemeral node to ensure this session creates the entry.
+        stateStorage.delete_node(ClusterUtils.nimbusPath(nimbusId));
+        stateStorage.add_listener(new ConnectionStateListener() {
+            @Override
+            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
+                LOG.info("Connection state listener invoked, zookeeper connection state has changed to {}", connectionState);
+                if (connectionState.equals(ConnectionState.RECONNECTED)) {
+                    LOG.info("Connection state has changed to reconnected so setting nimbuses entry one more time");
+                    // explicit delete for the ephemeral node to ensure this session creates the entry.
+                    stateStorage.delete_node(ClusterUtils.nimbusPath(nimbusId));
+                    stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls);
+                }
+
+            }
+        });
+
+        stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls);
+    }
+
+    @Override
+    public List<String> activeStorms() {
+        return stateStorage.get_children(ClusterUtils.STORMS_SUBTREE, false);
+    }
+
+    @Override
+    public StormBase stormBase(String stormId, Runnable callback) {
+        if (callback != null) {
+            stormBaseCallback.put(stormId, callback);
+        }
+        return ClusterUtils.maybeDeserialize(stateStorage.get_data(ClusterUtils.stormPath(stormId), callback != null), StormBase.class);
+    }
+
+    @Override
+    public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port) {
+        byte[] bytes = stateStorage.get_worker_hb(ClusterUtils.workerbeatPath(stormId, node, port), false);
+        return ClusterUtils.maybeDeserialize(bytes, ClusterWorkerHeartbeat.class);
+
+    }
+
+    @Override
+    public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo) {
+        List<ProfileRequest> requests = new ArrayList<>();
+        List<ProfileRequest> profileRequests = getTopologyProfileRequests(stormId);
+        for (ProfileRequest profileRequest : profileRequests) {
+            NodeInfo nodeInfo1 = profileRequest.get_nodeInfo();
+            if (nodeInfo1.equals(nodeInfo))
+                requests.add(profileRequest);
+        }
+        return requests;
+    }
+
+    @Override
+    public List<ProfileRequest> getTopologyProfileRequests(String stormId) {
+        List<ProfileRequest> profileRequests = new ArrayList<>();
+        String path = ClusterUtils.profilerConfigPath(stormId);
+        if (stateStorage.node_exists(path, false)) {
+            List<String> strs = stateStorage.get_children(path, false);
+            for (String str : strs) {
+                String childPath = path + ClusterUtils.ZK_SEPERATOR + str;
+                byte[] raw = stateStorage.get_data(childPath, false);
+                ProfileRequest request = ClusterUtils.maybeDeserialize(raw, ProfileRequest.class);
+                if (request != null)
+                    profileRequests.add(request);
+            }
+        }
+        return profileRequests;
+    }
+
+    @Override
+    public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest) {
+        ProfileAction profileAction = profileRequest.get_action();
+        String host = profileRequest.get_nodeInfo().get_node();
+        Long port = profileRequest.get_nodeInfo().get_port_iterator().next();
+        String path = ClusterUtils.profilerConfigPath(stormId, host, port, profileAction);
+        stateStorage.set_data(path, Utils.serialize(profileRequest), acls);
+    }
+
+    @Override
+    public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest) {
+        ProfileAction profileAction = profileRequest.get_action();
+        String host = profileRequest.get_nodeInfo().get_node();
+        Long port = profileRequest.get_nodeInfo().get_port_iterator().next();
+        String path = ClusterUtils.profilerConfigPath(stormId, host, port, profileAction);
+        stateStorage.delete_node(path);
+    }
+
+    /**
+     * Takes executor->node+port explicitly so that a long-dead worker with a skewed clock cannot override all the
+     * timestamps. By only checking heartbeats with an assigned node+port, and only reading executors from those
+     * heartbeats that are actually assigned, we avoid that situation.
+     *
+     * @param stormId the topology id
+     * @param executorNodePort the executor-to-node+port assignment
+     * @return a map from executor info to its most recent heartbeat
+     */
+    @Override
+    public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) {
+        Map<ExecutorInfo, ExecutorBeat> executorWhbs = new HashMap<>();
+
+        Map<NodeInfo, List<List<Long>>> nodePortExecutors = Utils.reverseMap(executorNodePort);
+
+        for (Map.Entry<NodeInfo, List<List<Long>>> entry : nodePortExecutors.entrySet()) {
+
+            String node = entry.getKey().get_node();
+            Long port = entry.getKey().get_port_iterator().next();
+            ClusterWorkerHeartbeat whb = getWorkerHeartbeat(stormId, node, port);
+            List<ExecutorInfo> executorInfoList = new ArrayList<>();
+            for (List<Long> list : entry.getValue()) {
+                executorInfoList.add(new ExecutorInfo(list.get(0).intValue(), list.get(list.size() - 1).intValue()));
+            }
+            if (whb != null)
+                executorWhbs.putAll(ClusterUtils.convertExecutorBeats(executorInfoList, whb));
+        }
+        return executorWhbs;
+    }
+
+    @Override
+    public List<String> supervisors(Runnable callback) {
+        if (callback != null) {
+            supervisorsCallback.set(callback);
+        }
+        return stateStorage.get_children(ClusterUtils.SUPERVISORS_SUBTREE, callback != null);
+    }
+
+    @Override
+    public SupervisorInfo supervisorInfo(String supervisorId) {
+        String path = ClusterUtils.supervisorPath(supervisorId);
+        return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, false), SupervisorInfo.class);
+    }
+
+    @Override
+    public void setupHeatbeats(String stormId) {
+        stateStorage.mkdirs(ClusterUtils.workerbeatStormRoot(stormId), acls);
+    }
+
+    @Override
+    public void teardownHeartbeats(String stormId) {
+        try {
+            stateStorage.delete_worker_hb(ClusterUtils.workerbeatStormRoot(stormId));
+        } catch (Exception e) {
+            if (Utils.exceptionCauseIsInstanceOf(KeeperException.class, e)) {
+                // do nothing
+                LOG.warn("Could not teardown heartbeats for {}.", stormId);
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    @Override
+    public void teardownTopologyErrors(String stormId) {
+        try {
+            stateStorage.delete_node(ClusterUtils.errorStormRoot(stormId));
+        } catch (Exception e) {
+            if (Utils.exceptionCauseIsInstanceOf(KeeperException.class, e)) {
+                // do nothing
+                LOG.warn("Could not teardown errors for {}.", stormId);
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    @Override
+    public List<String> heartbeatStorms() {
+        return stateStorage.get_worker_hb_children(ClusterUtils.WORKERBEATS_SUBTREE, false);
+    }
+
+    @Override
+    public List<String> errorTopologies() {
+        return stateStorage.get_children(ClusterUtils.ERRORS_SUBTREE, false);
+    }
+
+    @Override
+    public List<String> backpressureTopologies() {
+        return stateStorage.get_children(ClusterUtils.BACKPRESSURE_SUBTREE, false);
+    }
+
+    @Override
+    public void setTopologyLogConfig(String stormId, LogConfig logConfig) {
+        stateStorage.set_data(ClusterUtils.logConfigPath(stormId), Utils.serialize(logConfig), acls);
+    }
+
+    @Override
+    public LogConfig topologyLogConfig(String stormId, Runnable cb) {
+        if (cb != null){
+            logConfigCallback.put(stormId, cb);
+        }
+        String path = ClusterUtils.logConfigPath(stormId);
+        return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, cb != null), LogConfig.class);
+    }
+
+    @Override
+    public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info) {
+        if (info != null) {
+            String path = ClusterUtils.workerbeatPath(stormId, node, port);
+            stateStorage.set_worker_hb(path, Utils.serialize(info), acls);
+        }
+    }
+
+    @Override
+    public void removeWorkerHeartbeat(String stormId, String node, Long port) {
+        String path = ClusterUtils.workerbeatPath(stormId, node, port);
+        stateStorage.delete_worker_hb(path);
+    }
+
+    @Override
+    public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) {
+        String path = ClusterUtils.supervisorPath(supervisorId);
+        stateStorage.set_ephemeral_node(path, Utils.serialize(info), acls);
+    }
+
+    /**
+     * If the znode exists and backpressure should be off, delete it; if it exists and should be on, do nothing;
+     * if it does not exist and should be on, create it; if it does not exist and should be off, do nothing.
+     *
+     * @param stormId the topology id
+     * @param node the worker node
+     * @param port the worker port
+     * @param on whether backpressure is on for this worker
+     */
+    @Override
+    public void workerBackpressure(String stormId, String node, Long port, boolean on) {
+        String path = ClusterUtils.backpressurePath(stormId, node, port);
+        boolean existed = stateStorage.node_exists(path, false);
+        if (existed) {
+            if (!on) {
+                stateStorage.delete_node(path);
+            }
+        } else {
+            if (on) {
+                stateStorage.set_ephemeral_node(path, null, acls);
+            }
+        }
+    }
+
+    /**
+     * Check whether a topology is in throttle-on status: if the backpressure/storm-id directory is non-empty,
+     * the topology is throttle-on; otherwise it is throttle-off.
+     *
+     * @param stormId the topology id
+     * @param callback an optional callback to register for changes
+     * @return true if backpressure is currently on for the topology
+     */
+    @Override
+    public boolean topologyBackpressure(String stormId, Runnable callback) {
+        if (callback != null) {
+            backPressureCallback.put(stormId, callback);
+        }
+        String path = ClusterUtils.backpressureStormRoot(stormId);
+        List<String> children;
+        if (stateStorage.node_exists(path, false)) {
+            children = stateStorage.get_children(path, callback != null);
+        } else {
+            children = new ArrayList<>();
+        }
+        return children.size() > 0;
+
+    }
+
+    @Override
+    public void setupBackpressure(String stormId) {
+        stateStorage.mkdirs(ClusterUtils.backpressureStormRoot(stormId), acls);
+    }
+
+    @Override
+    public void removeBackpressure(String stormId) {
+        try {
+            stateStorage.delete_node(ClusterUtils.backpressureStormRoot(stormId));
+        } catch (Exception e) {
+            if (Utils.exceptionCauseIsInstanceOf(KeeperException.class, e)) {
+                // do nothing
+                LOG.warn("Could not teardown backpressure node for {}.", stormId);
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    @Override
+    public void removeWorkerBackpressure(String stormId, String node, Long port) {
+        String path = ClusterUtils.backpressurePath(stormId, node, port);
+        boolean existed = stateStorage.node_exists(path, false);
+        if (existed) {
+            stateStorage.delete_node(path);
+        }
+    }
+
+    @Override
+    public void activateStorm(String stormId, StormBase stormBase) {
+        String path = ClusterUtils.stormPath(stormId);
+        stateStorage.set_data(path, Utils.serialize(stormBase), acls);
+    }
+
+    /**
+     * Merges newElems into the stored StormBase. The maps and sets passed in may be Clojure's
+     * APersistentMap/APersistentSet, which do not support mutation, so mutable copies are made before merging.
+     *
+     * @param stormId the topology id
+     * @param newElems the fields to update; missing fields are filled in from the existing StormBase
+     */
+    @Override
+    public void updateStorm(String stormId, StormBase newElems) {
+
+        StormBase stormBase = stormBase(stormId, null);
+        if (stormBase.get_component_executors() != null) {
+
+            Map<String, Integer> newComponentExecutors = new HashMap<>();
+            Map<String, Integer> componentExecutors = newElems.get_component_executors();
+            // componentExecutors may be an APersistentMap, which does not support "put"
+            for (Map.Entry<String, Integer> entry : componentExecutors.entrySet()) {
+                newComponentExecutors.put(entry.getKey(), entry.getValue());
+            }
+            for (Map.Entry<String, Integer> entry : stormBase.get_component_executors().entrySet()) {
+                if (!componentExecutors.containsKey(entry.getKey())) {
+                    newComponentExecutors.put(entry.getKey(), entry.getValue());
+                }
+            }
+            if (newComponentExecutors.size() > 0)
+                newElems.set_component_executors(newComponentExecutors);
+        }
+
+        Map<String, DebugOptions> componentDebug = new HashMap<>();
+        Map<String, DebugOptions> oldComponentDebug = stormBase.get_component_debug();
+
+        Map<String, DebugOptions> newComponentDebug = newElems.get_component_debug();
+        // oldComponentDebug.keySet() / newComponentDebug.keySet() may be APersistentSet, which does not support addAll
+        Set<String> debugOptionsKeys = new HashSet<>();
+        debugOptionsKeys.addAll(oldComponentDebug.keySet());
+        debugOptionsKeys.addAll(newComponentDebug.keySet());
+        for (String key : debugOptionsKeys) {
+            boolean enable = false;
+            double samplingpct = 0;
+            if (oldComponentDebug.containsKey(key)) {
+                enable = oldComponentDebug.get(key).is_enable();
+                samplingpct = oldComponentDebug.get(key).get_samplingpct();
+            }
+            if (newComponentDebug.containsKey(key)) {
+                enable = newComponentDebug.get(key).is_enable();
+                samplingpct += newComponentDebug.get(key).get_samplingpct();
+            }
+            DebugOptions debugOptions = new DebugOptions();
+            debugOptions.set_enable(enable);
+            debugOptions.set_samplingpct(samplingpct);
+            componentDebug.put(key, debugOptions);
+        }
+        if (componentDebug.size() > 0) {
+            newElems.set_component_debug(componentDebug);
+        }
+
+        if (StringUtils.isBlank(newElems.get_name())) {
+            newElems.set_name(stormBase.get_name());
+        }
+        if (newElems.get_status() == null) {
+            newElems.set_status(stormBase.get_status());
+        }
+        if (newElems.get_num_workers() == 0) {
+            newElems.set_num_workers(stormBase.get_num_workers());
+        }
+        if (newElems.get_launch_time_secs() == 0) {
+            newElems.set_launch_time_secs(stormBase.get_launch_time_secs());
+        }
+        if (StringUtils.isBlank(newElems.get_owner())) {
+            newElems.set_owner(stormBase.get_owner());
+        }
+        if (newElems.get_topology_action_options() == null) {
+            newElems.set_topology_action_options(stormBase.get_topology_action_options());
+        }
+        stateStorage.set_data(ClusterUtils.stormPath(stormId), Utils.serialize(newElems), acls);
+    }
+
+    @Override
+    public void removeStormBase(String stormId) {
+        stateStorage.delete_node(ClusterUtils.stormPath(stormId));
+    }
+
+    @Override
+    public void setAssignment(String stormId, Assignment info) {
+        stateStorage.set_data(ClusterUtils.assignmentPath(stormId), Utils.serialize(info), acls);
+    }
+
+    @Override
+    public void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer versionInfo) {
+        String path = ClusterUtils.blobstorePath(key) + ClusterUtils.ZK_SEPERATOR + nimbusInfo.toHostPortString() + "-" + versionInfo;
+        LOG.info("set-path: {}", path);
+        stateStorage.mkdirs(ClusterUtils.blobstorePath(key), acls);
+        stateStorage.delete_node_blobstore(ClusterUtils.blobstorePath(key), nimbusInfo.toHostPortString());
+        stateStorage.set_ephemeral_node(path, null, acls);
+    }
+
+    @Override
+    public List<String> activeKeys() {
+        return stateStorage.get_children(ClusterUtils.BLOBSTORE_SUBTREE, false);
+    }
+
+    // blobstore state
+    @Override
+    public List<String> blobstore(Runnable callback) {
+        if (callback != null) {
+            blobstoreCallback.set(callback);
+        }
+        stateStorage.sync_path(ClusterUtils.BLOBSTORE_SUBTREE);
+        return stateStorage.get_children(ClusterUtils.BLOBSTORE_SUBTREE, callback != null);
+
+    }
+
+    @Override
+    public void removeStorm(String stormId) {
+        stateStorage.delete_node(ClusterUtils.assignmentPath(stormId));
+        stateStorage.delete_node(ClusterUtils.credentialsPath(stormId));
+        stateStorage.delete_node(ClusterUtils.logConfigPath(stormId));
+        stateStorage.delete_node(ClusterUtils.profilerConfigPath(stormId));
+        removeStormBase(stormId);
+    }
+
+    @Override
+    public void removeBlobstoreKey(String blobKey) {
+        LOG.debug("remove key {}", blobKey);
+        stateStorage.delete_node(ClusterUtils.blobstorePath(blobKey));
+    }
+
+    @Override
+    public void removeKeyVersion(String blobKey) {
+        stateStorage.delete_node(ClusterUtils.blobstoreMaxKeySequenceNumberPath(blobKey));
+    }
+
+    @Override
+    public void reportError(String stormId, String componentId, String node, Long port, Throwable error) {
+
+        String path = ClusterUtils.errorPath(stormId, componentId);
+        String lastErrorPath = ClusterUtils.lastErrorPath(stormId, componentId);
+        ErrorInfo errorInfo = new ErrorInfo(ClusterUtils.stringifyError(error), Time.currentTimeSecs());
+        errorInfo.set_host(node);
+        errorInfo.set_port(port.intValue());
+        byte[] serData = Utils.serialize(errorInfo);
+        stateStorage.mkdirs(path, acls);
+        stateStorage.create_sequential(path + ClusterUtils.ZK_SEPERATOR + "e", serData, acls);
+        stateStorage.set_data(lastErrorPath, serData, acls);
+        List<String> children = stateStorage.get_children(path, false);
+
+        // sort by the sequence number embedded in the node name so the oldest errors come first
+        Collections.sort(children, new Comparator<String>() {
+            public int compare(String arg0, String arg1) {
+                return Long.compare(Long.parseLong(arg0.substring(1)), Long.parseLong(arg1.substring(1)));
+            }
+        });
+
+        // retain only the 10 most recent errors
+        while (children.size() > 10) {
+            stateStorage.delete_node(path + ClusterUtils.ZK_SEPERATOR + children.remove(0));
+        }
+    }
+
+    @Override
+    public List<ErrorInfo> errors(String stormId, String componentId) {
+        List<ErrorInfo> errorInfos = new ArrayList<>();
+        String path = ClusterUtils.errorPath(stormId, componentId);
+        if (stateStorage.node_exists(path, false)) {
+            List<String> children = stateStorage.get_children(path, false);
+            for (String child : children) {
+                String childPath = path + ClusterUtils.ZK_SEPERATOR + child;
+                ErrorInfo errorInfo = ClusterUtils.maybeDeserialize(stateStorage.get_data(childPath, false), ErrorInfo.class);
+                if (errorInfo != null)
+                    errorInfos.add(errorInfo);
+            }
+        }
+        Collections.sort(errorInfos, new Comparator<ErrorInfo>() {
+            public int compare(ErrorInfo arg0, ErrorInfo arg1) {
+                return Integer.compare(arg1.get_error_time_secs(), arg0.get_error_time_secs());
+            }
+        });
+
+        return errorInfos;
+    }
+
+    @Override
+    public ErrorInfo lastError(String stormId, String componentId) {
+
+        String path = ClusterUtils.lastErrorPath(stormId, componentId);
+        if (stateStorage.node_exists(path, false)) {
+            ErrorInfo errorInfo = ClusterUtils.maybeDeserialize(stateStorage.get_data(path, false), ErrorInfo.class);
+            return errorInfo;
+        }
+
+        return null;
+    }
+
+    @Override
+    public void setCredentials(String stormId, Credentials creds, Map topoConf) throws NoSuchAlgorithmException {
+        List<ACL> aclList = ClusterUtils.mkTopoOnlyAcls(topoConf);
+        String path = ClusterUtils.credentialsPath(stormId);
+        stateStorage.set_data(path, Utils.serialize(creds), aclList);
+
+    }
+
+    @Override
+    public Credentials credentials(String stormId, Runnable callback) {
+        if (callback != null) {
+            credentialsCallback.put(stormId, callback);
+        }
+        String path = ClusterUtils.credentialsPath(stormId);
+        return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, callback != null), Credentials.class);
+
+    }
+
+    @Override
+    public void disconnect() {
+        stateStorage.unregister(stateId);
+        if (solo)
+            stateStorage.close();
+    }
+}
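
Note how reportError above writes each error as a sequential child node, refreshes the last-error snapshot, and prunes history to the 10 newest entries, while errors returns the retained history newest-first. A short usage sketch against this implementation (the node name "node1" and port 6700 are example values, and the IStormClusterState instance is assumed to be constructed elsewhere):

    import org.apache.storm.cluster.IStormClusterState;
    import org.apache.storm.generated.ErrorInfo;

    public class ErrorReportingExample {
        // Reports one error, then prints the retained history (at most 10 entries, newest first).
        static void reportAndDump(IStormClusterState state, String stormId, String componentId) {
            state.reportError(stormId, componentId, "node1", 6700L, new RuntimeException("boom"));
            for (ErrorInfo e : state.errors(stormId, componentId)) {
                System.out.println(e.get_error_time_secs() + ": " + e.get_error());
            }
            ErrorInfo last = state.lastError(stormId, componentId);
            System.out.println("last: " + (last == null ? "none" : last.get_error()));
        }
    }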

http://git-wip-us.apache.org/repos/asf/storm/blob/c8210b87/storm-core/src/jvm/org/apache/storm/cluster/VersionedData.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/VersionedData.java b/storm-core/src/jvm/org/apache/storm/cluster/VersionedData.java
new file mode 100644
index 0000000..3de2a88
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/cluster/VersionedData.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+public class VersionedData<D> {
+    private final int version;
+    private final D data;
+    
+    public VersionedData(int version, D data) {
+        this.version = version;
+        this.data = data;
+    }
+    
+    public int getVersion() {
+        return version;
+    }
+    
+    public D getData() {
+        return data;
+    }
+}
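
VersionedData pairs a ZooKeeper node version with its deserialized payload, which lets callers detect change by comparing versions instead of payloads. A brief sketch against assignmentInfoWithVersion from the interface above (the helper class and the lastSeenVersion bookkeeping are hypothetical):

    import org.apache.storm.cluster.IStormClusterState;
    import org.apache.storm.cluster.VersionedData;
    import org.apache.storm.generated.Assignment;

    public class AssignmentVersionCheck {
        // True when the stored assignment's version differs from the one last observed.
        static boolean hasChanged(IStormClusterState state, String stormId, int lastSeenVersion) {
            VersionedData<Assignment> current = state.assignmentInfoWithVersion(stormId, null);
            return current.getVersion() != lastSeenVersion;
        }
    }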

