storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [01/27] storm git commit: port org.apache.storm.cluster.cluster.clj
Date Wed, 24 Feb 2016 16:17:11 GMT
Repository: storm
Updated Branches:
  refs/heads/master 11232b539 -> 71d615b7c


port org.apache.storm.cluster.cluster.clj


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/682d31c8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/682d31c8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/682d31c8

Branch: refs/heads/master
Commit: 682d31c8a5f53a59f47b4df4bd35ff828e7e0aa5
Parents: 66d7a39
Author: xiaojian.fxj <xiaojian.fxj@alibaba-inc.com>
Authored: Mon Feb 1 15:17:49 2016 +0800
Committer: xiaojian.fxj <xiaojian.fxj@alibaba-inc.com>
Committed: Mon Feb 1 15:17:49 2016 +0800

----------------------------------------------------------------------
 .../jvm/org/apache/storm/cluster/Cluster.java   | 204 ++++++
 .../org/apache/storm/cluster/ClusterState.java  |  11 +-
 .../storm/cluster/DistributedClusterState.java  | 269 ++++++++
 .../apache/storm/cluster/StormClusterState.java | 129 ++++
 .../storm/cluster/StormZkClusterState.java      | 662 +++++++++++++++++++
 5 files changed, 1271 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/682d31c8/storm-core/src/jvm/org/apache/storm/cluster/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/Cluster.java b/storm-core/src/jvm/org/apache/storm/cluster/Cluster.java
new file mode 100644
index 0000000..2d6f306
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/cluster/Cluster.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.ClusterWorkerHeartbeat;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Static utilities for Storm's ZooKeeper-backed cluster state: the znode
+ * layout (root names and absolute subtree paths), per-entity path builders,
+ * topology ACL construction, and small deserialization/conversion helpers.
+ * Ported from org.apache.storm.cluster.cluster.clj.
+ */
+public class Cluster {
+
+    // ZooKeeper path separator. NOTE(review): "SEPERATOR" is a misspelling of
+    // "SEPARATOR", but the field is public, so renaming would break callers.
+    public static final String ZK_SEPERATOR = "/";
+
+    // Names of the top-level znodes kept under the configured storm root.
+    public static final String ASSIGNMENTS_ROOT = "assignments";
+    public static final String CODE_ROOT = "code";
+    public static final String STORMS_ROOT = "storms";
+    public static final String SUPERVISORS_ROOT = "supervisors";
+    public static final String WORKERBEATS_ROOT = "workerbeats";
+    public static final String BACKPRESSURE_ROOT = "backpressure";
+    public static final String ERRORS_ROOT = "errors";
+    public static final String BLOBSTORE_ROOT = "blobstore";
+    public static final String BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_ROOT = "blobstoremaxkeysequencenumber";
+    public static final String NIMBUSES_ROOT = "nimbuses";
+    public static final String CREDENTIALS_ROOT = "credentials";
+    public static final String LOGCONFIG_ROOT = "logconfigs";
+    public static final String PROFILERCONFIG_ROOT = "profilerconfigs";
+
+    // Absolute ("/"-prefixed) forms of the roots above; initialized in the
+    // static block so each is exactly ZK_SEPERATOR + its root name.
+    public static final String ASSIGNMENTS_SUBTREE;
+    public static final String STORMS_SUBTREE;
+    public static final String SUPERVISORS_SUBTREE;
+    public static final String WORKERBEATS_SUBTREE;
+    public static final String BACKPRESSURE_SUBTREE;
+    public static final String ERRORS_SUBTREE;
+    public static final String BLOBSTORE_SUBTREE;
+    public static final String BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE;
+    public static final String NIMBUSES_SUBTREE;
+    public static final String CREDENTIALS_SUBTREE;
+    public static final String LOGCONFIG_SUBTREE;
+    public static final String PROFILERCONFIG_SUBTREE;
+
+    static {
+        ASSIGNMENTS_SUBTREE = ZK_SEPERATOR + ASSIGNMENTS_ROOT;
+        STORMS_SUBTREE = ZK_SEPERATOR + STORMS_ROOT;
+        SUPERVISORS_SUBTREE = ZK_SEPERATOR + SUPERVISORS_ROOT;
+        WORKERBEATS_SUBTREE = ZK_SEPERATOR + WORKERBEATS_ROOT;
+        BACKPRESSURE_SUBTREE = ZK_SEPERATOR + BACKPRESSURE_ROOT;
+        ERRORS_SUBTREE = ZK_SEPERATOR + ERRORS_ROOT;
+        BLOBSTORE_SUBTREE = ZK_SEPERATOR + BLOBSTORE_ROOT;
+        BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE = ZK_SEPERATOR + BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_ROOT;
+        NIMBUSES_SUBTREE = ZK_SEPERATOR + NIMBUSES_ROOT;
+        CREDENTIALS_SUBTREE = ZK_SEPERATOR + CREDENTIALS_ROOT;
+        LOGCONFIG_SUBTREE = ZK_SEPERATOR + LOGCONFIG_ROOT;
+        PROFILERCONFIG_SUBTREE = ZK_SEPERATOR + PROFILERCONFIG_ROOT;
+    }
+
+    /**
+     * Builds the ACL list for a topology's znodes. When server-side ZK
+     * authentication is configured, the list grants full access to the
+     * creator plus read access to a digest id derived from the topology's
+     * STORM_ZOOKEEPER_AUTH_PAYLOAD; otherwise the list is empty.
+     *
+     * @param topoConf topology configuration map
+     * @return the ACLs to apply (possibly empty, never null)
+     * @throws NoSuchAlgorithmException if the digest algorithm is unavailable
+     */
+    public static List<ACL> mkTopoOnlyAcls(Map topoConf) throws NoSuchAlgorithmException {
+        List<ACL> aclList = new ArrayList<>();
+        String payload = (String)topoConf.get(Config.STORM_ZOOKEEPER_AUTH_PAYLOAD);
+        if (Utils.isZkAuthenticationConfiguredStormServer(topoConf)){
+            ACL acl1 = ZooDefs.Ids.CREATOR_ALL_ACL.get(0);
+            aclList.add(acl1);
+            ACL acl2 = new ACL(ZooDefs.Perms.READ, new Id("digest", DigestAuthenticationProvider.generateDigest(payload)));
+            aclList.add(acl2);
+        }
+        return aclList;
+    }
+
+    // ---- Path builders: each returns the absolute znode path for one
+    // ---- entity by appending the id/key to the matching subtree.
+
+    public static String supervisorPath(String id) {
+        return SUPERVISORS_SUBTREE + ZK_SEPERATOR + id;
+    }
+
+    public static String assignmentPath(String id) {
+        return ASSIGNMENTS_SUBTREE + ZK_SEPERATOR + id;
+    }
+
+    public static String blobstorePath(String key) {
+        return BLOBSTORE_SUBTREE + ZK_SEPERATOR + key;
+    }
+
+    public static String blobstoreMaxKeySequenceNumberPath(String key) {
+        return BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE + ZK_SEPERATOR + key;
+    }
+
+    public static String nimbusPath(String id) {
+        return NIMBUSES_SUBTREE + ZK_SEPERATOR + id;
+    }
+
+    public static String stormPath(String id) {
+        return STORMS_SUBTREE + ZK_SEPERATOR + id;
+    }
+
+    public static String workerbeatStormRoot(String stormId) {
+        return WORKERBEATS_SUBTREE + ZK_SEPERATOR + stormId;
+    }
+
+    // Worker-level paths are keyed by "<node>-<port>" under the storm root.
+    public static String workerbeatPath(String stormId, String node, Long port) {
+        return workerbeatStormRoot(stormId) + ZK_SEPERATOR + node + "-" + port;
+    }
+
+    public static String backpressureStormRoot(String stormId) {
+        return BACKPRESSURE_SUBTREE + ZK_SEPERATOR + stormId;
+    }
+
+    public static String backpressurePath(String stormId, String node, Long port) {
+        return backpressureStormRoot(stormId) + ZK_SEPERATOR + node + "-" + port;
+    }
+
+    public static String errorStormRoot(String stormId) {
+        return ERRORS_SUBTREE + ZK_SEPERATOR + stormId;
+    }
+
+    // Component ids may contain characters illegal in znode names, so they
+    // are URL-encoded before being used as a path segment.
+    public static String errorPath(String stormId, String componentId) throws UnsupportedEncodingException {
+        return errorStormRoot(stormId) + ZK_SEPERATOR + URLEncoder.encode(componentId, "UTF-8");
+    }
+
+    public static String lastErrorPath(String stormId, String componentId) throws UnsupportedEncodingException {
+        return errorPath(stormId, componentId) + "-last-error";
+    }
+
+    public static String credentialsPath(String stormId) {
+        return CREDENTIALS_SUBTREE + ZK_SEPERATOR + stormId;
+    }
+
+    public static String logConfigPath(String stormId) {
+        return LOGCONFIG_SUBTREE + ZK_SEPERATOR + stormId;
+    }
+
+    public static String profilerConfigPath(String stormId) {
+        return PROFILERCONFIG_SUBTREE + ZK_SEPERATOR + stormId;
+    }
+
+    // Per-request profiler path: "<host>_<port>_<requestType>" under the
+    // topology's profiler-config node.
+    public static String profilerConfigPath(String stormId, String host, Long port, ProfileAction requestType) {
+        return profilerConfigPath(stormId) + ZK_SEPERATOR + host + "_" + port + "_" + requestType;
+    }
+
+    /**
+     * Deserializes {@code serialized} into an instance of {@code clazz},
+     * or returns null when the input is null (e.g. a missing znode).
+     */
+    public static <T> T maybeDeserialize(byte[] serialized, Class<T> clazz){
+        if (serialized != null){
+            return Utils.deserialize(serialized, clazz);
+        }
+        return null;
+    }
+
+    //Ensures that we only return heartbeats for executors assigned to this worker
+    public static Map<ExecutorInfo, ClusterWorkerHeartbeat> convertExecutorBeats(List<ExecutorInfo> executors, ClusterWorkerHeartbeat workerHeartbeat){
+        Map<ExecutorInfo, ClusterWorkerHeartbeat> executorWhb = new HashMap<>();
+        Map<ExecutorInfo, ExecutorStats> executorStatsMap = workerHeartbeat.get_executor_stats();
+        for (ExecutorInfo executor : executors){
+            if(executorStatsMap.containsKey(executor)){
+                executorWhb.put(executor, workerHeartbeat);
+            }
+        }
+        return executorWhb;
+    }
+    
+    // Inverts a map into value -> list of keys that mapped to that value.
+    // TODO: remove once the equivalent ported Utils helper is available.
+    public static <K, V> HashMap<V, List<K>> reverseMap(Map<K, V> map) {
+        HashMap<V, List<K>> rtn = new HashMap<V, List<K>>();
+        if (map == null) {
+            return rtn;
+        }
+        for (Map.Entry<K, V> entry : map.entrySet()) {
+            K key = entry.getKey();
+            V val = entry.getValue();
+            List<K> list = rtn.get(val);
+            if (list == null) {
+                list = new ArrayList<K>();
+                rtn.put(entry.getValue(), list);
+            }
+            list.add(key);
+        }
+        return rtn;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/682d31c8/storm-core/src/jvm/org/apache/storm/cluster/ClusterState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ClusterState.java b/storm-core/src/jvm/org/apache/storm/cluster/ClusterState.java
index fdac92c..51e42ff 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/ClusterState.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ClusterState.java
@@ -20,6 +20,9 @@ package org.apache.storm.cluster;
 import clojure.lang.APersistentMap;
 import clojure.lang.IFn;
 import java.util.List;
+
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.storm.callback.Callback;
 import org.apache.zookeeper.data.ACL;
 
 /**
@@ -47,7 +50,7 @@ public interface ClusterState {
      * @return is an id that can be passed to unregister(...) to unregister the
      * callback.
      */
-    String register(IFn callback);
+    String register(Callback callback);
 
     /**
      * Unregisters a callback function that was registered with register(...).
@@ -73,7 +76,7 @@ public interface ClusterState {
      * @param acls The acls to apply to the path. May be null.
      * @return path
      */
-    String mkdirs(String path, List<ACL> acls);
+    void mkdirs(String path, List<ACL> acls);
 
     /**
      * Deletes the node at a given path, and any child nodes that may exist.
@@ -99,7 +102,7 @@ public interface ClusterState {
      * register method. Very useful for catching updates to nodes.
      * @return The integer version of this node.
      */
-    Integer get_version(String path, boolean watch);
+    Integer get_version(String path, boolean watch) throws Exception;
 
     /**
      * Check if a node exists and optionally set a watch on the path.
@@ -197,7 +200,7 @@ public interface ClusterState {
      * @param listener A ClusterStateListener to handle changing cluster state
      * events.
      */
-    void add_listener(ClusterStateListener listener);
+    void add_listener(final ConnectionStateListener listener);
 
     /**
      * Force consistency on a path. Any writes committed on the path before

http://git-wip-us.apache.org/repos/asf/storm/blob/682d31c8/storm-core/src/jvm/org/apache/storm/cluster/DistributedClusterState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/DistributedClusterState.java b/storm-core/src/jvm/org/apache/storm/cluster/DistributedClusterState.java
new file mode 100644
index 0000000..3e0beb1
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/cluster/DistributedClusterState.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import clojure.lang.APersistentMap;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.*;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.storm.Config;
+import org.apache.storm.callback.Callback;
+import org.apache.storm.callback.WatcherCallBack;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.zookeeper.Zookeeper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * ClusterState implementation backed by ZooKeeper via Curator.
+ *
+ * Maintains a writer client and (for nimbus only) a separate reader client;
+ * on non-nimbus daemons the reader is the same instance as the writer.
+ * Registered {@link Callback}s are invoked from the ZK watcher for every
+ * non-None event type while the state is active.
+ */
+public class DistributedClusterState implements ClusterState {
+
+    private static Logger LOG = LoggerFactory.getLogger(DistributedClusterState.class);
+
+    // Watch callbacks keyed by the UUID handed out from register().
+    private ConcurrentHashMap<String, Callback> callbacks = new ConcurrentHashMap<String, Callback>();
+    private CuratorFramework zkWriter;
+    private CuratorFramework zkReader;
+    // Cleared by close(); watcher callbacks are suppressed once false.
+    private AtomicBoolean active;
+
+    private boolean isNimbus;
+    private Map authConf;
+    private Map<Object, Object> conf;
+
+    /**
+     * Creates the root znode (with the given ACLs) using a throwaway
+     * un-rooted client, then opens the long-lived writer client and, for
+     * nimbus, a second reader client. Both clients share the same watcher
+     * logic: log the event and fan it out to all registered callbacks.
+     */
+    public DistributedClusterState(Map<Object, Object> conf, Map authConf, List<ACL> acls, ClusterStateContext context) throws Exception {
+        this.conf = conf;
+        this.authConf = authConf;
+        if (context.getDaemonType().equals(DaemonType.NIMBUS))
+            this.isNimbus = true;
+
+        // just mkdir STORM_ZOOKEEPER_ROOT dir
+        CuratorFramework zkTemp = mkZk();
+        String rootPath = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT));
+        Zookeeper.mkdirs(zkTemp, rootPath, acls);
+        zkTemp.close();
+
+        active = new AtomicBoolean(true);
+        zkWriter = mkZk(new WatcherCallBack() {
+            @Override
+            public void execute(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path) {
+                if (active.get()) {
+                    if (!(state.equals(Watcher.Event.KeeperState.SyncConnected))) {
+                        LOG.warn("Received event {} : {}: {} with disconnected Zookeeper.", state, type, path);
+                    } else {
+                        LOG.info("Received event {} : {} : {}", state, type, path);
+                    }
+
+                    // Fan node events (but not connection-only "None" events)
+                    // out to every registered callback.
+                    if (!type.equals(Watcher.Event.EventType.None)) {
+                        for (Map.Entry<String, Callback> e : callbacks.entrySet()) {
+                            Callback fn = e.getValue();
+                            fn.execute(type, path);
+                        }
+                    }
+                }
+            }
+        });
+        // NOTE(review): this anonymous watcher duplicates the writer's watcher
+        // verbatim; a shared instance would avoid the copy. Left as-is here.
+        if (isNimbus) {
+            zkReader = mkZk(new WatcherCallBack() {
+                @Override
+                public void execute(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path) {
+                    if (active.get()) {
+                        if (!(state.equals(Watcher.Event.KeeperState.SyncConnected))) {
+                            LOG.warn("Received event {} : {}: {} with disconnected Zookeeper.", state, type, path);
+                        } else {
+                            LOG.info("Received event {} : {} : {}", state, type, path);
+                        }
+
+                        if (!type.equals(Watcher.Event.EventType.None)) {
+                            for (Map.Entry<String, Callback> e : callbacks.entrySet()) {
+                                Callback fn = e.getValue();
+                                fn.execute(type, path);
+                            }
+                        }
+                    }
+                }
+            });
+        } else {
+            zkReader = zkWriter;
+        }
+
+    }
+
+    // Client with no chroot; used only to create the storm root itself.
+    @SuppressWarnings("unchecked")
+    private CuratorFramework mkZk() throws IOException {
+        return Zookeeper.mkClient(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT), "", authConf);
+    }
+
+    // Client chrooted at STORM_ZOOKEEPER_ROOT with the given watcher attached.
+    @SuppressWarnings("unchecked")
+    private CuratorFramework mkZk(WatcherCallBack watcher) throws NumberFormatException, IOException {
+        return Zookeeper.mkClient(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT),
+                String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT)), watcher, authConf);
+    }
+
+    // Intentionally a no-op in this backend — presumably only meaningful for
+    // other ClusterState implementations. TODO(review): confirm.
+    @Override
+    public void delete_node_blobstore(String path, String nimbusHostPortInfo) {
+
+    }
+
+    /** Registers a watch callback; returns the id to pass to unregister(). */
+    @Override
+    public String register(Callback callback) {
+        String id = UUID.randomUUID().toString();
+        this.callbacks.put(id, callback);
+        return id;
+    }
+
+    @Override
+    public void unregister(String id) {
+        this.callbacks.remove(id);
+    }
+
+    // Creates an EPHEMERAL_SEQUENTIAL node and returns its actual path.
+    @Override
+    public String create_sequential(String path, byte[] data, List<ACL> acls) {
+        return Zookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL_SEQUENTIAL, acls);
+    }
+
+    @Override
+    public void mkdirs(String path, List<ACL> acls) {
+        Zookeeper.mkdirs(zkWriter, path, acls);
+    }
+
+    @Override
+    public void delete_node(String path) {
+        Zookeeper.deleteNode(zkWriter, path);
+    }
+
+    /**
+     * Writes an ephemeral node, creating parents as needed. If the node
+     * exists its data is overwritten; a NodeExists race during setData is
+     * handled by falling back to createNode.
+     */
+    @Override
+    public void set_ephemeral_node(String path, byte[] data, List<ACL> acls) {
+        Zookeeper.mkdirs(zkWriter, parentPath(path), acls);
+        if (Zookeeper.exists(zkWriter, path, false)) {
+            try {
+                Zookeeper.setData(zkWriter, path, data);
+            } catch (RuntimeException e) {
+                if (Utils.exceptionCauseIsInstanceOf(KeeperException.NodeExistsException.class, e)) {
+                    Zookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL, acls);
+                } else {
+                    throw e;
+                }
+            }
+
+        } else {
+            Zookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL, acls);
+        }
+    }
+
+    @Override
+    public Integer get_version(String path, boolean watch) throws Exception {
+        Integer ret = Zookeeper.getVersion(zkReader, path, watch);
+        return ret;
+    }
+
+    // NOTE(review): uses zkWriter rather than zkReader, unlike the other
+    // read-side methods — verify whether this is intentional.
+    @Override
+    public boolean node_exists(String path, boolean watch) {
+        return Zookeeper.existsNode(zkWriter, path, watch);
+    }
+
+    @Override
+    public List<String> get_children(String path, boolean watch) {
+        return Zookeeper.getChildren(zkReader, path, watch);
+    }
+
+    // Deactivates watcher fan-out, then closes the client(s); the reader is
+    // a distinct instance only on nimbus.
+    @Override
+    public void close() {
+        this.active.set(false);
+        zkWriter.close();
+        if (isNimbus) {
+            zkReader.close();
+        }
+    }
+
+    // Overwrites existing data, or creates a PERSISTENT node (and parents).
+    @Override
+    public void set_data(String path, byte[] data, List<ACL> acls) {
+        if (Zookeeper.exists(zkWriter, path, false)) {
+            Zookeeper.setData(zkWriter, path, data);
+        } else {
+            Zookeeper.mkdirs(zkWriter, parentPath(path), acls);
+            Zookeeper.createNode(zkWriter, path, data, CreateMode.PERSISTENT, acls);
+        }
+    }
+
+    @Override
+    public byte[] get_data(String path, boolean watch) {
+        byte[] ret = null;
+
+        ret = Zookeeper.getData(zkReader, path, watch);
+
+        return ret;
+    }
+
+    @Override
+    public APersistentMap get_data_with_version(String path, boolean watch) {
+        return Zookeeper.getDataWithVersion(zkReader, path, watch);
+    }
+
+    // Worker-heartbeat variants simply delegate to the generic node ops in
+    // this ZK-backed implementation.
+    @Override
+    public void set_worker_hb(String path, byte[] data, List<ACL> acls) {
+        set_data(path, data, acls);
+    }
+
+    @Override
+    public byte[] get_worker_hb(String path, boolean watch) {
+        return Zookeeper.getData(zkReader, path, watch);
+    }
+
+    @Override
+    public List<String> get_worker_hb_children(String path, boolean watch) {
+        return get_children(path, watch);
+    }
+
+    @Override
+    public void delete_worker_hb(String path) {
+        delete_node(path);
+    }
+
+    // Forwards Curator connection-state changes to the supplied listener.
+    @Override
+    public void add_listener(final ConnectionStateListener listener) {
+        Zookeeper.addListener(zkReader, new ConnectionStateListener() {
+            @Override
+            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
+                listener.stateChanged(curatorFramework, connectionState);
+            }
+        });
+    }
+
+    @Override
+    public void sync_path(String path) {
+        Zookeeper.syncPath(zkWriter, path);
+    }
+
+    // Returns the parent of a znode path ("" for a root-level path).
+    // TODO: remove when the ported Util.clj helper is available.
+    public static String parentPath(String path) {
+        List<String> toks = Zookeeper.tokenizePath(path);
+        int size = toks.size();
+        if (size > 0) {
+            toks.remove(size - 1);
+        }
+        return Zookeeper.toksToPath(toks);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/682d31c8/storm-core/src/jvm/org/apache/storm/cluster/StormClusterState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterState.java b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterState.java
new file mode 100644
index 0000000..b3c0f90
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterState.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import clojure.lang.APersistentMap;
+import org.apache.storm.callback.Callback;
+import org.apache.storm.generated.*;
+import org.apache.storm.nimbus.NimbusInfo;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Higher-level, Storm-domain view over a {@link ClusterState} backend:
+ * assignments, topology bases, heartbeats, backpressure, errors, blobstore
+ * keys, credentials and profiler requests. Methods taking a {@link Callback}
+ * register it to be invoked when the watched data changes.
+ *
+ * NOTE(review): several method names carry misspellings ("Requets",
+ * "Heatbeats"); renaming would break implementers, so they are left as-is.
+ */
+public interface StormClusterState {
+    // ---- Assignments ----
+    public List<String> assignments(Callback callback);
+
+    public Assignment assignmentInfo(String stormId, Callback callback);
+
+    public APersistentMap assignmentInfoWithVersion(String stormId, Callback callback);
+
+    public Integer assignmentVersion(String stormId, Callback callback) throws Exception;
+
+    // returns key information under /storm/blobstore/key
+    public List<String> blobstoreInfo(String blobKey);
+
+    // returns list of nimbus summaries stored under /stormroot/nimbuses/<nimbus-ids> -> <data>
+    public List nimbuses();
+
+    // adds the NimbusSummary to /stormroot/nimbuses/nimbus-id
+    public void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
+
+    // ---- Topologies ----
+    public List<String> activeStorms();
+
+    public StormBase stormBase(String stormId, Callback callback);
+
+    // ---- Heartbeats and profiler requests ----
+    public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
+
+    public List<ProfileRequest> getWorkerProfileRequets(String stormId, NodeInfo nodeInfo, boolean isThrift);
+
+    public List<ProfileRequest> getTopologyProfileRequets(String stormId, boolean isThrift);
+
+    public void setWorkerProfileRequests(String stormId, ProfileRequest profileRequest);
+
+    public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest);
+
+    public Map<ExecutorInfo, ClusterWorkerHeartbeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
+
+    public List<String> supervisors(Callback callback);
+
+    public SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist
+
+    public void setupHeatbeats(String stormId);
+
+    public void teardownHeatbeats(String stormId);
+
+    public void teardownTopologyErrors(String stormId);
+
+    public List<String> heartbeatStorms();
+
+    public List<String> errorTopologies();
+
+    // ---- Log and worker configuration ----
+    public void setTopologyLogConfig(String stormId, LogConfig logConfig);
+
+    public LogConfig topologyLogConfig(String stormId, Callback cb);
+
+    public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info);
+
+    public void removeWorkerHeartbeat(String stormId, String node, Long port);
+
+    public void supervisorHeartbeat(String supervisorId, SupervisorInfo info);
+
+    // ---- Backpressure ----
+    public void workerBackpressure(String stormId, String node, Long port, boolean on);
+
+    public boolean topologyBackpressure(String stormId, Callback callback);
+
+    public void setupBackpressure(String stormId);
+
+    public void removeWorkerBackpressure(String stormId, String node, Long port);
+
+    // ---- Topology lifecycle ----
+    public void activateStorm(String stormId, StormBase stormBase);
+
+    public void updateStorm(String stormId, StormBase newElems);
+
+    public void removeStormBase(String stormId);
+
+    public void setAssignment(String stormId, Assignment info);
+
+    // sets up information related to key consisting of nimbus
+    // host:port and version info of the blob
+    public void setupBlobstore(String key, NimbusInfo nimbusInfo, String versionInfo);
+
+    public List<String> activeKeys();
+
+    public List<String> blobstore(Callback callback);
+
+    public void removeStorm(String stormId);
+
+    public void removeBlobstoreKey(String blobKey);
+
+    public void removeKeyVersion(String blobKey);
+
+    // ---- Errors and credentials ----
+    public void reportError(String stormId, String componentId, String node, Long port, String error);
+
+    public List<ErrorInfo> errors(String stormId, String componentId);
+
+    public ErrorInfo lastError(String stormId, String componentId);
+
+    public void setCredentials(String stormId, Credentials creds, Map topoConf) throws NoSuchAlgorithmException;
+
+    public Credentials credentials(String stormId, Callback callback);
+
+    public void disconnect();
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/682d31c8/storm-core/src/jvm/org/apache/storm/cluster/StormZkClusterState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/StormZkClusterState.java b/storm-core/src/jvm/org/apache/storm/cluster/StormZkClusterState.java
new file mode 100644
index 0000000..93d29b2
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/cluster/StormZkClusterState.java
@@ -0,0 +1,662 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import clojure.lang.APersistentMap;
+import clojure.lang.PersistentArrayMap;
+import clojure.lang.RT;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.*;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.storm.callback.Callback;
+import org.apache.storm.generated.*;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.zookeeper.Zookeeper;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.security.NoSuchAlgorithmException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class StormZkClusterState implements StormClusterState {
+
+    private static Logger LOG = LoggerFactory.getLogger(StormZkClusterState.class);
+
+    // underlying (possibly shared) ZK-backed state store
+    private ClusterState clusterState;
+
+    // one-shot callbacks keyed by storm-id, consumed when the matching watch fires
+    private ConcurrentHashMap<String, Callback> assignmentInfoCallback;
+    private ConcurrentHashMap<String, Callback> assignmentInfoWithVersionCallback;
+    private ConcurrentHashMap<String, Callback> assignmentVersionCallback;
+    private AtomicReference<Callback> supervisorsCallback;
+    // we want to register a topo directory getChildren callback for all workers of this dir
+    private ConcurrentHashMap<String, Callback> backPressureCallback;
+    private AtomicReference<Callback> assignmentsCallback;
+    private ConcurrentHashMap<String, Callback> stormBaseCallback;
+    private AtomicReference<Callback> blobstoreCallback;
+    private ConcurrentHashMap<String, Callback> credentialsCallback;
+    private ConcurrentHashMap<String, Callback> logConfigCallback;
+
+    // ACLs applied to nodes created by this instance
+    private List<ACL> acls;
+    // registration id of the watch dispatcher, used to unregister on disconnect
+    private String stateId;
+    // true when this instance created (and therefore must close) clusterState
+    private boolean solo;
+
+    /**
+     * Wraps an existing {@link ClusterState} (shared connection) or, given a
+     * config map, builds a dedicated {@link DistributedClusterState} ("solo"
+     * mode, closed again by {@link #disconnect()}). Registers one dispatcher
+     * that routes ZK watch events to the per-subtree callback holders, and
+     * ensures all required root paths exist.
+     *
+     * @param clusterState either a ClusterState instance or a config Map
+     * @param acls ACLs applied to every node created by this instance
+     * @param context auth/DAEMON context forwarded to the backing state
+     */
+    public StormZkClusterState(Object clusterState, List<ACL> acls, ClusterStateContext context) throws Exception {
+
+        if (clusterState instanceof ClusterState) {
+            solo = false;
+            this.clusterState = (ClusterState) clusterState;
+        } else {
+
+            solo = true;
+            this.clusterState = new DistributedClusterState((Map) clusterState, (Map) clusterState, acls, context);
+        }
+        // fixed: the acls field was never assigned, so every later write used
+        // null ACLs instead of the ones passed in
+        this.acls = acls;
+
+        assignmentInfoCallback = new ConcurrentHashMap<>();
+        assignmentInfoWithVersionCallback = new ConcurrentHashMap<>();
+        assignmentVersionCallback = new ConcurrentHashMap<>();
+        supervisorsCallback = new AtomicReference<>();
+        backPressureCallback = new ConcurrentHashMap<>();
+        assignmentsCallback = new AtomicReference<>();
+        stormBaseCallback = new ConcurrentHashMap<>();
+        credentialsCallback = new ConcurrentHashMap<>();
+        logConfigCallback = new ConcurrentHashMap<>();
+        blobstoreCallback = new AtomicReference<>();
+
+        stateId = this.clusterState.register(new Callback() {
+
+            public <T> Object execute(T... args) {
+                if (args == null) {
+                    LOG.warn("Input args is null");
+                    return null;
+                } else if (args.length < 2) {
+                    LOG.warn("Input args is invalid, args length:" + args.length);
+                    return null;
+                }
+                // args[1] is the full ZK path the watch fired for
+                String path = (String) args[1];
+
+                List<String> toks = Zookeeper.tokenizePath(path);
+                int size = toks.size();
+                if (size >= 1) {
+                    String root = toks.get(0);
+                    if (root.equals(Cluster.ASSIGNMENTS_ROOT)) {
+                        if (size == 1) {
+                            // set null and get the old value
+                            issueCallback(assignmentsCallback);
+                        } else {
+                            issueMapCallback(assignmentInfoCallback, toks.get(1));
+                            issueMapCallback(assignmentVersionCallback, toks.get(1));
+                            issueMapCallback(assignmentInfoWithVersionCallback, toks.get(1));
+                        }
+
+                    } else if (root.equals(Cluster.SUPERVISORS_ROOT)) {
+                        issueCallback(supervisorsCallback);
+                    } else if (root.equals(Cluster.BLOBSTORE_ROOT)) {
+                        issueCallback(blobstoreCallback);
+                    } else if (root.equals(Cluster.STORMS_ROOT) && size > 1) {
+                        issueMapCallback(stormBaseCallback, toks.get(1));
+                    } else if (root.equals(Cluster.CREDENTIALS_ROOT) && size > 1) {
+                        issueMapCallback(credentialsCallback, toks.get(1));
+                    } else if (root.equals(Cluster.LOGCONFIG_ROOT) && size > 1) {
+                        issueMapCallback(logConfigCallback, toks.get(1));
+                    } else if (root.equals(Cluster.BACKPRESSURE_ROOT) && size > 1) {
+                        // fixed: backpressure events must fire the backpressure
+                        // callbacks, not the log-config ones (copy-paste bug)
+                        issueMapCallback(backPressureCallback, toks.get(1));
+                    } else {
+                        LOG.error("{} Unknown callback for subtree {}", new RuntimeException("Unknown callback for this path"), path);
+                        Runtime.getRuntime().exit(30);
+                    }
+
+                }
+
+                return null;
+            }
+
+        });
+
+        String[] pathlist = { Cluster.ASSIGNMENTS_SUBTREE, Cluster.STORMS_SUBTREE, Cluster.SUPERVISORS_SUBTREE, Cluster.WORKERBEATS_SUBTREE,
+                Cluster.ERRORS_SUBTREE, Cluster.BLOBSTORE_SUBTREE, Cluster.NIMBUSES_SUBTREE, Cluster.LOGCONFIG_SUBTREE };
+        for (String path : pathlist) {
+            this.clusterState.mkdirs(path, acls);
+        }
+
+    }
+
+    /**
+     * Fires and clears a one-shot root callback. ZK watches are one-time
+     * triggers that can fire for paths nobody registered interest in, so a
+     * null (never-registered or already-consumed) callback is skipped
+     * instead of raising an NPE inside the watcher thread.
+     */
+    protected void issueCallback(AtomicReference<Callback> cb) {
+        Callback callback = cb.getAndSet(null);
+        if (callback != null) {
+            callback.execute();
+        }
+    }
+
+    /**
+     * Fires and removes the one-shot callback registered for {@code key},
+     * ignoring the event when no callback is registered for that key.
+     */
+    protected void issueMapCallback(ConcurrentHashMap<String, Callback> callbackConcurrentHashMap, String key) {
+        Callback callback = callbackConcurrentHashMap.remove(key);
+        if (callback != null) {
+            callback.execute();
+        }
+    }
+
+    /** Lists topology ids with assignments; a non-null callback re-arms the children watch. */
+    @Override
+    public List<String> assignments(Callback callback) {
+        if (callback != null) {
+            assignmentsCallback.set(callback);
+        }
+        return clusterState.get_children(Cluster.ASSIGNMENTS_SUBTREE, callback != null);
+    }
+
+    /** Reads the Assignment for a topology, or null if absent; optional change callback. */
+    @Override
+    public Assignment assignmentInfo(String stormId, Callback callback) {
+        if (callback != null) {
+            assignmentInfoCallback.put(stormId, callback);
+        }
+        byte[] serialized = clusterState.get_data(Cluster.assignmentPath(stormId), callback != null);
+        return Cluster.maybeDeserialize(serialized, Assignment.class);
+    }
+
+    /** Returns a Clojure map {:data Assignment, :version Integer} for interop with ported callers. */
+    @Override
+    public APersistentMap assignmentInfoWithVersion(String stormId, Callback callback) {
+        if (callback != null) {
+            assignmentInfoWithVersionCallback.put(stormId, callback);
+        }
+        APersistentMap aPersistentMap = clusterState.get_data_with_version(Cluster.assignmentPath(stormId), callback != null);
+        Assignment assignment = Cluster.maybeDeserialize((byte[]) aPersistentMap.get("data"), Assignment.class);
+        Integer version = (Integer) aPersistentMap.get("version");
+        APersistentMap map = new PersistentArrayMap(new Object[] { RT.keyword(null, "data"), assignment, RT.keyword(null, "version"), version });
+        return map;
+    }
+
+    /** Returns the ZK version of a topology's assignment node; optional change callback. */
+    @Override
+    public Integer assignmentVersion(String stormId, Callback callback) throws Exception {
+        if (callback != null) {
+            assignmentVersionCallback.put(stormId, callback);
+        }
+        return clusterState.get_version(Cluster.assignmentPath(stormId), callback != null);
+    }
+
+    // blobstore state
+    /** Syncs the blob's path and returns its child nodes (nimbus host:port_version entries). */
+    @Override
+    public List<String> blobstoreInfo(String blobKey) {
+        String path = Cluster.blobstorePath(blobKey);
+        clusterState.sync_path(path);
+        return clusterState.get_children(path, false);
+    }
+
+    /** Reads every nimbus summary currently registered under the nimbuses subtree. */
+    @Override
+    public List nimbuses() {
+        List<NimbusSummary> nimbusSummaries = new ArrayList<>();
+        List<String> nimbusIds = clusterState.get_children(Cluster.NIMBUSES_SUBTREE, false);
+        for (String nimbusId : nimbusIds) {
+            byte[] serialized = clusterState.get_data(Cluster.nimbusPath(nimbusId), false);
+            NimbusSummary nimbusSummary = Cluster.maybeDeserialize(serialized, NimbusSummary.class);
+            nimbusSummaries.add(nimbusSummary);
+        }
+        return nimbusSummaries;
+    }
+
+    /**
+     * Registers this nimbus as an ephemeral node, and re-creates the node on
+     * every RECONNECTED transition so the entry survives session changes.
+     */
+    @Override
+    public void addNimbusHost(final String nimbusId, final NimbusSummary nimbusSummary) {
+        // explicit delete for ephmeral node to ensure this session creates the entry.
+        clusterState.delete_node(Cluster.nimbusPath(nimbusId));
+        clusterState.add_listener(new ConnectionStateListener() {
+            @Override
+            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
+                LOG.info("Connection state listener invoked, zookeeper connection state has changed to {}", connectionState);
+                if (connectionState.equals(ConnectionState.RECONNECTED)) {
+                    LOG.info("Connection state has changed to reconnected so setting nimbuses entry one more time");
+                    clusterState.set_ephemeral_node(Cluster.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls);
+                }
+
+            }
+        });
+
+        clusterState.set_ephemeral_node(Cluster.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls);
+    }
+
+    /** Lists ids of topologies with a StormBase node. */
+    @Override
+    public List<String> activeStorms() {
+        return clusterState.get_children(Cluster.STORMS_SUBTREE, false);
+    }
+
+    /** Reads a topology's StormBase, or null if absent; optional change callback. */
+    @Override
+    public StormBase stormBase(String stormId, Callback callback) {
+        if (callback != null) {
+            stormBaseCallback.put(stormId, callback);
+        }
+        return Cluster.maybeDeserialize(clusterState.get_data(Cluster.stormPath(stormId), callback != null), StormBase.class);
+    }
+
+    /** Reads one worker's heartbeat, or null when no heartbeat exists. */
+    @Override
+    public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port) {
+        byte[] bytes = clusterState.get_worker_hb(Cluster.workerbeatPath(stormId, node, port), false);
+        if (bytes != null) {
+            return Cluster.maybeDeserialize(bytes, ClusterWorkerHeartbeat.class);
+        }
+        return null;
+    }
+
+    /** Filters the topology's profile requests down to those targeting {@code nodeInfo}. */
+    @Override
+    public List<ProfileRequest> getWorkerProfileRequets(String stormId, NodeInfo nodeInfo, boolean isThrift)  {
+        List<ProfileRequest> requests = new ArrayList<>();
+        List<ProfileRequest> profileRequests = getTopologyProfileRequets(stormId, isThrift);
+        for (ProfileRequest profileRequest : profileRequests) {
+            NodeInfo nodeInfo1 = profileRequest.get_nodeInfo();
+            if (nodeInfo1.equals(nodeInfo))
+                requests.add(profileRequest);
+        }
+        return requests;
+    }
+
+    /** Reads all pending profiler requests for a topology (empty list when the path is absent). */
+    // NOTE(review): the isThrift flag is currently unused here — confirm against callers.
+    @Override
+    public List<ProfileRequest> getTopologyProfileRequets(String stormId, boolean isThrift) {
+        List<ProfileRequest> profileRequests = new ArrayList<>();
+        String path = Cluster.profilerConfigPath(stormId);
+        if (clusterState.node_exists(path, false)) {
+            List<String> strs = clusterState.get_children(path, false);
+            for (String str : strs) {
+                String childPath = path + Cluster.ZK_SEPERATOR + str;
+                byte[] raw = clusterState.get_data(childPath, false);
+                ProfileRequest request = Cluster.maybeDeserialize(raw, ProfileRequest.class);
+                if (request != null)
+                    profileRequests.add(request);
+            }
+        }
+        return profileRequests;
+    }
+
+    /** Stores a profiler request keyed by host, port and action. */
+    @Override
+    public void setWorkerProfileRequests(String stormId, ProfileRequest profileRequest) {
+        ProfileAction profileAction = profileRequest.get_action();
+        String host = profileRequest.get_nodeInfo().get_node();
+        Long port = profileRequest.get_nodeInfo().get_port_iterator().next();
+        String path = Cluster.profilerConfigPath(stormId, host, port, profileAction);
+        clusterState.set_data(path, Utils.serialize(profileRequest), acls);
+    }
+
+    /** Removes the stored profiler request matching host, port and action. */
+    @Override
+    public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest) {
+        ProfileAction profileAction = profileRequest.get_action();
+        String host = profileRequest.get_nodeInfo().get_node();
+        Long port = profileRequest.get_nodeInfo().get_port_iterator().next();
+        String path = Cluster.profilerConfigPath(stormId, host, port, profileAction);
+        clusterState.delete_node(path);
+    }
+
+    /**
+     * Maps each executor to its worker's heartbeat by grouping executors per
+     * node/port, fetching one heartbeat per worker, and expanding it back to
+     * executor granularity via Cluster.convertExecutorBeats.
+     */
+    @Override
+    public Map<ExecutorInfo, ClusterWorkerHeartbeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) {
+        Map<ExecutorInfo, ClusterWorkerHeartbeat> executorWhbs = new HashMap<>();
+
+        Map<NodeInfo, List<List<Long>>> nodePortExecutors = Cluster.reverseMap(executorNodePort);
+        for (Map.Entry<NodeInfo, List<List<Long>>> entry : nodePortExecutors.entrySet()) {
+
+            String node = entry.getKey().get_node();
+            Long port = entry.getKey().get_port_iterator().next();
+            ClusterWorkerHeartbeat whb = getWorkerHeartbeat(stormId, node, port);
+            List<ExecutorInfo> executorInfoList = new ArrayList<>();
+            for (List<Long> list : entry.getValue()) {
+                // executor is encoded as [start-task, end-task]
+                executorInfoList.add(new ExecutorInfo(list.get(0).intValue(), list.get(list.size() - 1).intValue()));
+            }
+            executorWhbs.putAll(Cluster.convertExecutorBeats(executorInfoList, whb));
+        }
+        return executorWhbs;
+    }
+
+    /** Lists live supervisor ids; a non-null callback re-arms the children watch. */
+    @Override
+    public List<String> supervisors(Callback callback) {
+        if (callback != null) {
+            supervisorsCallback.set(callback);
+        }
+        return clusterState.get_children(Cluster.SUPERVISORS_SUBTREE, callback != null);
+    }
+
+    /** Reads one supervisor's info, or null if its node is gone. */
+    @Override
+    public SupervisorInfo supervisorInfo(String supervisorId) {
+        String path = Cluster.supervisorPath(supervisorId);
+        return Cluster.maybeDeserialize(clusterState.get_data(path, false), SupervisorInfo.class);
+    }
+
+    /** Creates the workerbeats root for a topology. (Name typo kept for interface compatibility.) */
+    @Override
+    public void setupHeatbeats(String stormId) {
+        clusterState.mkdirs(Cluster.workerbeatStormRoot(stormId), acls);
+    }
+
+    /** Deletes a topology's heartbeat subtree; ZK errors are logged and swallowed. */
+    @Override
+    public void teardownHeatbeats(String stormId) {
+        try {
+            clusterState.delete_worker_hb(Cluster.workerbeatStormRoot(stormId));
+        } catch (Exception e) {
+            if (Zookeeper.exceptionCause(KeeperException.class, e)) {
+                // do nothing
+                LOG.warn("Could not teardown heartbeats for {}.", stormId);
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    /** Deletes a topology's error subtree; ZK errors are logged and swallowed. */
+    @Override
+    public void teardownTopologyErrors(String stormId) {
+        try {
+            clusterState.delete_node(Cluster.errorStormRoot(stormId));
+        } catch (Exception e) {
+            if (Zookeeper.exceptionCause(KeeperException.class, e)) {
+                // do nothing
+                LOG.warn("Could not teardown errors for {}.", stormId);
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    /** Lists topology ids that still have heartbeat state. */
+    @Override
+    public List<String> heartbeatStorms() {
+        return clusterState.get_worker_hb_children(Cluster.WORKERBEATS_SUBTREE, false);
+    }
+
+    /** Lists topology ids that have recorded errors. */
+    @Override
+    public List<String> errorTopologies() {
+        return clusterState.get_children(Cluster.ERRORS_SUBTREE, false);
+    }
+
+    /** Writes the topology's log config. */
+    @Override
+    public void setTopologyLogConfig(String stormId, LogConfig logConfig) {
+        clusterState.set_data(Cluster.logConfigPath(stormId), Utils.serialize(logConfig), acls);
+    }
+
+    /** Reads the topology's log config, or null if absent; non-null cb arms a watch. */
+    // NOTE(review): unlike the other readers, cb is not stored in logConfigCallback here — confirm.
+    @Override
+    public LogConfig topologyLogConfig(String stormId, Callback cb) {
+        String path = Cluster.logConfigPath(stormId);
+        return Cluster.maybeDeserialize(clusterState.get_data(path, cb != null), LogConfig.class);
+    }
+
+    /** Writes one worker's heartbeat; a null heartbeat is silently ignored. */
+    @Override
+    public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info) {
+        if (info != null) {
+            String path = Cluster.workerbeatPath(stormId, node, port);
+            clusterState.set_worker_hb(path, Utils.serialize(info), acls);
+        }
+    }
+
+    /** Deletes one worker's heartbeat node. */
+    @Override
+    public void removeWorkerHeartbeat(String stormId, String node, Long port) {
+        String path = Cluster.workerbeatPath(stormId, node, port);
+        clusterState.delete_worker_hb(path);
+    }
+
+    /** Publishes this supervisor's heartbeat as an ephemeral node. */
+    @Override
+    public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) {
+        String path = Cluster.supervisorPath(supervisorId);
+        clusterState.set_ephemeral_node(path, Utils.serialize(info), acls);
+    }
+
+    /**
+     * Reconciles the per-worker backpressure znode with the requested flag so
+     * that the (ephemeral) node exists exactly when {@code on} is true.
+     * Existing+on and missing+off are both no-ops.
+     */
+    @Override
+    public void workerBackpressure(String stormId, String node, Long port, boolean on) {
+        String znode = Cluster.backpressurePath(stormId, node, port);
+        boolean present = clusterState.node_exists(znode, false);
+        if (present && !on) {
+            clusterState.delete_node(znode);
+        } else if (!present && on) {
+            clusterState.set_ephemeral_node(znode, null, acls);
+        }
+    }
+
+    // a non-empty backpressure/storm-id dir means some worker throttles, so the topology has backpressure on.
+    @Override
+    public boolean topologyBackpressure(String stormId, Callback callback) {
+        if (callback != null) {
+            backPressureCallback.put(stormId, callback);
+        }
+        String path = Cluster.backpressureStormRoot(stormId);
+        List<String> childrens = clusterState.get_children(path, callback != null);
+        return childrens.size() > 0;
+
+    }
+
+    /** Creates the backpressure root for a topology. */
+    @Override
+    public void setupBackpressure(String stormId) {
+        clusterState.mkdirs(Cluster.backpressureStormRoot(stormId), acls);
+    }
+
+    /** Deletes the backpressure node for one worker. */
+    @Override
+    public void removeWorkerBackpressure(String stormId, String node, Long port) {
+        clusterState.delete_node(Cluster.backpressurePath(stormId, node, port));
+    }
+
+    /** Writes the StormBase for a newly activated topology. */
+    @Override
+    public void activateStorm(String stormId, StormBase stormBase) {
+        String path = Cluster.stormPath(stormId);
+        clusterState.set_data(path, Utils.serialize(stormBase), acls);
+    }
+
+    /**
+     * Merges {@code newElems} into the stored StormBase for the topology and
+     * writes the result back. Executor counts, debug options, launch time,
+     * owner, action options and status missing from the update are carried
+     * over from the stored base; debug sampling percentages are summed.
+     */
+    @Override
+    public void updateStorm(String stormId, StormBase newElems) {
+
+        StormBase stormBase = stormBase(stormId, null);
+        if (stormBase.get_component_executors() != null) {
+            Map<String, Integer> componentExecutors = newElems.get_component_executors();
+            if (componentExecutors == null) {
+                componentExecutors = new HashMap<>();
+            }
+            // keep any executor counts the update does not override
+            for (Map.Entry<String, Integer> entry : stormBase.get_component_executors().entrySet()) {
+                if (!componentExecutors.containsKey(entry.getKey())) {
+                    componentExecutors.put(entry.getKey(), entry.getValue());
+                }
+            }
+            if (componentExecutors.size() > 0)
+                newElems.set_component_executors(componentExecutors);
+        }
+
+        Map<String, DebugOptions> mergedComponentDebug = new HashMap<>();
+        Map<String, DebugOptions> oldComponentDebug = stormBase.get_component_debug();
+        if (oldComponentDebug == null)
+            oldComponentDebug = new HashMap<>();
+        Map<String, DebugOptions> newComponentDebug = newElems.get_component_debug();
+        if (newComponentDebug == null)
+            newComponentDebug = new HashMap<>();
+        // fixed: Map.keySet() returns a view that does not support addAll()
+        // (UnsupportedOperationException) and mutating it would corrupt the
+        // source map anyway — union the keys into a fresh set instead
+        Set<String> debugOptionsKeys = new HashSet<>(oldComponentDebug.keySet());
+        debugOptionsKeys.addAll(newComponentDebug.keySet());
+        for (String key : debugOptionsKeys) {
+            boolean enable = false;
+            double samplingpct = 0;
+            if (oldComponentDebug.containsKey(key)) {
+                enable = oldComponentDebug.get(key).is_enable();
+                samplingpct = oldComponentDebug.get(key).get_samplingpct();
+            }
+            // the update wins on enable; sampling percentages accumulate
+            if (newComponentDebug.containsKey(key)) {
+                enable = newComponentDebug.get(key).is_enable();
+                samplingpct += newComponentDebug.get(key).get_samplingpct();
+            }
+            DebugOptions debugOptions = new DebugOptions();
+            debugOptions.set_enable(enable);
+            debugOptions.set_samplingpct(samplingpct);
+            mergedComponentDebug.put(key, debugOptions);
+        }
+        if (mergedComponentDebug.size() > 0) {
+            newElems.set_component_debug(mergedComponentDebug);
+        }
+        // only merge some parameters which are optional
+        if (newElems.get_launch_time_secs() == 0) {
+            newElems.set_launch_time_secs(stormBase.get_launch_time_secs());
+        }
+        if (StringUtils.isBlank(newElems.get_owner())) {
+            newElems.set_owner(stormBase.get_owner());
+        }
+        if (newElems.get_topology_action_options() == null) {
+            newElems.set_topology_action_options(stormBase.get_topology_action_options());
+        }
+        if (newElems.get_status() == null) {
+            newElems.set_status(stormBase.get_status());
+        }
+        clusterState.set_data(Cluster.stormPath(stormId), Utils.serialize(newElems), acls);
+    }
+
+    /** Deletes the StormBase node for a topology. */
+    @Override
+    public void removeStormBase(String stormId) {
+        clusterState.delete_node(Cluster.stormPath(stormId));
+    }
+
+    /** Writes the executor assignment for a topology. */
+    @Override
+    public void setAssignment(String stormId, Assignment info) {
+        clusterState.set_data(Cluster.assignmentPath(stormId), Utils.serialize(info), acls);
+    }
+
+    /**
+     * Registers this nimbus as a holder of the blob: replaces any stale entry
+     * for the same host:port, then creates an ephemeral host:port_version child.
+     */
+    @Override
+    public void setupBlobstore(String key, NimbusInfo nimbusInfo, String versionInfo) {
+        String path = Cluster.blobstorePath(key) + Cluster.ZK_SEPERATOR + nimbusInfo.toHostPortString() + "_" + versionInfo;
+        LOG.info("set-path: {}", path);
+        clusterState.mkdirs(Cluster.blobstorePath(key), acls);
+        clusterState.delete_node_blobstore(Cluster.blobstorePath(key), nimbusInfo.toHostPortString());
+        clusterState.set_ephemeral_node(path, null, acls);
+    }
+
+    /** Lists all blob keys in the blobstore subtree. */
+    @Override
+    public List<String> activeKeys() {
+        return clusterState.get_children(Cluster.BLOBSTORE_SUBTREE, false);
+    }
+
+    // blobstore state
+    /** Syncs then lists blob keys; a non-null callback re-arms the children watch. */
+    @Override
+    public List<String> blobstore(Callback callback) {
+        if (callback != null) {
+            blobstoreCallback.set(callback);
+        }
+        clusterState.sync_path(Cluster.BLOBSTORE_SUBTREE);
+        return clusterState.get_children(Cluster.BLOBSTORE_SUBTREE, callback != null);
+
+    }
+
+    /** Removes every per-topology node (assignment, credentials, log/profiler config, base). */
+    @Override
+    public void removeStorm(String stormId) {
+        clusterState.delete_node(Cluster.assignmentPath(stormId));
+        clusterState.delete_node(Cluster.credentialsPath(stormId));
+        clusterState.delete_node(Cluster.logConfigPath(stormId));
+        clusterState.delete_node(Cluster.profilerConfigPath(stormId));
+        removeStormBase(stormId);
+    }
+
+    /** Deletes the blobstore node for the given key. */
+    @Override
+    public void removeBlobstoreKey(String blobKey) {
+        LOG.debug("remove key {}", blobKey);
+        clusterState.delete_node(Cluster.blobstorePath(blobKey));
+    }
+
+    /** Deletes the stored max key sequence number for the given blob key. */
+    @Override
+    public void removeKeyVersion(String blobKey) {
+        clusterState.delete_node(Cluster.blobstoreMaxKeySequenceNumberPath(blobKey));
+    }
+
+    /**
+     * Appends an error for a component as a sequential child node, updates the
+     * last-error node, and trims the oldest entries to bound the history.
+     * NOTE(review): the loop leaves at most 9 entries after each report —
+     * confirm the intended cap is 10.
+     */
+    @Override
+    public void reportError(String stormId, String componentId, String node, Long port, String error) {
+
+        try {
+            String path = Cluster.errorPath(stormId, componentId);
+            String lastErrorPath = Cluster.lastErrorPath(stormId, componentId);
+            ErrorInfo errorInfo = new ErrorInfo(error, Time.currentTimeSecs());
+            errorInfo.set_host(node);
+            errorInfo.set_port(port.intValue());
+            byte[] serData = Utils.serialize(errorInfo);
+            clusterState.mkdirs(path, acls);
+            clusterState.create_sequential(path + Cluster.ZK_SEPERATOR + "e", serData, acls);
+            clusterState.set_data(lastErrorPath, serData, acls);
+            List<String> childrens = clusterState.get_children(path, false);
+
+            // sequential node names sort oldest-first
+            Collections.sort(childrens);
+
+            while (childrens.size() >= 10) {
+                clusterState.delete_node(path + Cluster.ZK_SEPERATOR + childrens.remove(0));
+            }
+        } catch (UnsupportedEncodingException e) {
+            throw Utils.wrapInRuntime(e);
+        }
+    }
+
+    /** Returns all recorded errors for a component, sorted oldest-first by error time. */
+    @Override
+    public List<ErrorInfo> errors(String stormId, String componentId) {
+        List<ErrorInfo> errorInfos = new ArrayList<>();
+        try {
+            String path = Cluster.errorPath(stormId, componentId);
+            if (clusterState.node_exists(path, false)) {
+                List<String> childrens = clusterState.get_children(path, false);
+                for (String child : childrens) {
+                    String childPath = path + Cluster.ZK_SEPERATOR + child;
+                    ErrorInfo errorInfo = Cluster.maybeDeserialize(clusterState.get_data(childPath, false), ErrorInfo.class);
+                    if (errorInfo != null)
+                        errorInfos.add(errorInfo);
+                }
+            }
+            Collections.sort(errorInfos, new Comparator<ErrorInfo>() {
+                public int compare(ErrorInfo arg0, ErrorInfo arg1) {
+                    return Integer.compare(arg0.get_error_time_secs(), arg1.get_error_time_secs());
+                }
+            });
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
+        }
+
+        return errorInfos;
+    }
+
+    /** Returns the most recently reported error for a component, or null if none. */
+    @Override
+    public ErrorInfo lastError(String stormId, String componentId) {
+        try {
+            String path = Cluster.lastErrorPath(stormId, componentId);
+            if (clusterState.node_exists(path, false)) {
+                ErrorInfo errorInfo = Cluster.maybeDeserialize(clusterState.get_data(path, false), ErrorInfo.class);
+                return errorInfo;
+            }
+        } catch (UnsupportedEncodingException e) {
+            throw Utils.wrapInRuntime(e);
+        }
+        return null;
+    }
+
+    /** Stores topology credentials under topology-only ACLs derived from {@code topoConf}. */
+    @Override
+    public void setCredentials(String stormId, Credentials creds, Map topoConf) throws NoSuchAlgorithmException {
+        List<ACL> aclList = Cluster.mkTopoOnlyAcls(topoConf);
+        String path = Cluster.credentialsPath(stormId);
+        clusterState.set_data(path, Utils.serialize(creds), aclList);
+
+    }
+
+    /** Reads topology credentials, or null if absent; optional change callback. */
+    @Override
+    public Credentials credentials(String stormId, Callback callback) {
+        if (callback != null) {
+            credentialsCallback.put(stormId, callback);
+        }
+        String path = Cluster.credentialsPath(stormId);
+        return Cluster.maybeDeserialize(clusterState.get_data(path, callback != null), Credentials.class);
+
+    }
+
+    /** Unregisters the watch dispatcher; closes the connection only if this instance created it. */
+    @Override
+    public void disconnect() {
+        clusterState.unregister(stateId);
+        if (solo)
+            clusterState.close();
+    }
+}


Mime
View raw message