storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [39/50] [abbrv] storm git commit: Merge remote-tracking branch 'apache/nimbus-ha-branch' into ha-merge
Date Mon, 24 Aug 2015 13:52:24 GMT
Merge remote-tracking branch 'apache/nimbus-ha-branch' into ha-merge

Conflicts:
	STORM-UI-REST-API.md
	conf/defaults.yaml
	storm-core/src/clj/backtype/storm/daemon/nimbus.clj
	storm-core/src/clj/backtype/storm/ui/core.clj
	storm-core/src/jvm/backtype/storm/Config.java
	storm-core/src/jvm/backtype/storm/generated/TopologySummary.java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d1afefde
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d1afefde
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d1afefde

Branch: refs/heads/master
Commit: d1afefde51e34dc993591ad79d3fe217bef86f87
Parents: c54cea1 765e4c2
Author: Parth Brahmbhatt <brahmbhatt.parth@gmail.com>
Authored: Tue Aug 11 22:31:02 2015 -0700
Committer: Parth Brahmbhatt <brahmbhatt.parth@gmail.com>
Committed: Tue Aug 11 22:31:02 2015 -0700

----------------------------------------------------------------------
 STORM-UI-REST-API.md                            |  40 +-
 conf/defaults.yaml                              |   7 +-
 .../nimbus_ha_leader_election_and_failover.png  | Bin 0 -> 154316 bytes
 .../images/nimbus_ha_topology_submission.png    | Bin 0 -> 134180 bytes
 docs/documentation/nimbus-ha-design.md          | 217 +++++
 .../ha/codedistributor/HDFSCodeDistributor.java | 101 +++
 pom.xml                                         |  16 +
 storm-core/pom.xml                              |  16 +
 storm-core/src/clj/backtype/storm/cluster.clj   |  57 +-
 .../backtype/storm/command/shell_submission.clj |   9 +-
 storm-core/src/clj/backtype/storm/config.clj    |  15 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    | 243 ++++--
 .../clj/backtype/storm/daemon/supervisor.clj    |  53 +-
 storm-core/src/clj/backtype/storm/thrift.clj    |  23 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |  72 +-
 storm-core/src/clj/backtype/storm/zookeeper.clj |  94 ++-
 storm-core/src/jvm/backtype/storm/Config.java   |  42 +-
 .../storm/codedistributor/ICodeDistributor.java |  56 ++
 .../LocalFileSystemCodeDistributor.java         | 106 +++
 .../storm/generated/ClusterSummary.java         | 292 ++++---
 .../backtype/storm/generated/NimbusSummary.java | 796 +++++++++++++++++++
 .../backtype/storm/generated/TopologyInfo.java  | 221 +++--
 .../storm/generated/TopologySummary.java        | 107 ++-
 .../backtype/storm/nimbus/ILeaderElector.java   |  60 ++
 .../jvm/backtype/storm/nimbus/NimbusInfo.java   |  93 +++
 .../jvm/backtype/storm/utils/NimbusClient.java  |  63 +-
 .../src/jvm/backtype/storm/utils/Utils.java     |   9 +
 storm-core/src/py/storm/ttypes.py               | 613 ++++++++------
 storm-core/src/storm.thrift                     |  12 +-
 storm-core/src/ui/public/index.html             |  21 +
 .../public/templates/index-page-template.html   |  58 +-
 .../templates/topology-page-template.html       |   6 +
 .../test/clj/backtype/storm/cluster_test.clj    |  23 +-
 .../test/clj/backtype/storm/nimbus_test.clj     | 210 +++--
 .../backtype/storm/security/auth/auth_test.clj  |   4 +-
 .../storm/security/auth/nimbus_auth_test.clj    |  14 +-
 .../test/clj/backtype/storm/supervisor_test.clj |   1 +
 .../test/clj/backtype/storm/utils_test.clj      |  12 -
 38 files changed, 3124 insertions(+), 658 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/STORM-UI-REST-API.md
----------------------------------------------------------------------
diff --cc STORM-UI-REST-API.md
index 2836105,baaca84..35ba6ed
--- a/STORM-UI-REST-API.md
+++ b/STORM-UI-REST-API.md
@@@ -231,7 -263,13 +263,8 @@@ Response fields
  |bolts.errorLapsedSecs| Integer |Number of seconds elapsed since that last error happened in a bolt|
  |bolts.errorWorkerLogLink| String | Link to the worker log that reported the exception |
  |bolts.emitted| Long |Number of tuples emitted|
 -|antiForgeryToken| String | CSRF token|
+ |replicationCount| Integer |Number of nimbus hosts on which this topology code is replicated|
  
 -Caution: users need to unescape the antiForgeryToken value before using this token to make POST calls(simple-json escapes forward slashes)
 -[ISSUE-8](https://code.google.com/p/json-simple/issues/detail?id=8)
 -
 -
  Examples:
  
  ```no-highlight
@@@ -375,7 -413,9 +408,8 @@@ Sample response
          "storm.zookeeper.retry.intervalceiling.millis": 30000,
          "supervisor.enable": true,
          "storm.messaging.netty.server_worker_threads": 1
-     }
+     },
 -    "antiForgeryToken": "lAFTN\/5iSedRLwJeUNqkJ8hgYubRl2OxjXGoDf9A4Bt1nZY3rvJW0\/P4zqu9yAk\/LvDhlmn7gigw\/z8C",
+     "replicationCount": 1
  }
  ```
  

http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/conf/defaults.yaml
----------------------------------------------------------------------
diff --cc conf/defaults.yaml
index c3fa372,49584f2..dd69eb6
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@@ -47,10 -47,12 +47,11 @@@ storm.auth.simple-white-list.users: [
  storm.auth.simple-acl.users: []
  storm.auth.simple-acl.users.commands: []
  storm.auth.simple-acl.admins: []
 -storm.meta.serialization.delegate: "backtype.storm.serialization.DefaultSerializationDelegate"
 +storm.meta.serialization.delegate: "backtype.storm.serialization.GzipThriftSerializationDelegate"
+ storm.codedistributor.class: "backtype.storm.codedistributor.LocalFileSystemCodeDistributor"
 -storm.meta.serialization.delegate: "backtype.storm.serialization.ThriftSerializationDelegate"
  
  ### nimbus.* configs are for the master
- nimbus.host: "localhost"
+ nimbus.seeds : ["localhost:6627"]
  nimbus.thrift.port: 6627
  nimbus.thrift.threads: 64
  nimbus.thrift.max_buffer_size: 1048576

http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/cluster.clj
index 63e385f,333feec..f75648a
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@@ -284,11 -311,9 +311,12 @@@
                        (condp = subtree
                           ASSIGNMENTS-ROOT (if (empty? args)
                                               (issue-callback! assignments-callback)
 -                                             (issue-map-callback! assignment-info-callback (first args)))
 +                                             (do
 +                                               (issue-map-callback! assignment-info-callback (first args))
 +                                               (issue-map-callback! assignment-version-callback (first args))
 +                                               (issue-map-callback! assignment-info-with-version-callback (first args))))
                           SUPERVISORS-ROOT (issue-callback! supervisors-callback)
+                          CODE-DISTRIBUTOR-ROOT (issue-callback! code-distributor-callback)
                           STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
                          CREDENTIALS-ROOT (issue-map-callback! credentials-callback (first args))
                           ;; this should never happen

http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index c88e36b,8a2c0fb..35154d3
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@@ -327,9 -341,32 +345,32 @@@
     (FileUtils/cleanDirectory (File. stormroot))
     (setup-jar conf tmp-jar-location stormroot)
     (FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology))
 -   (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/javaSerialize storm-conf))
 +   (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/toCompressedJsonConf storm-conf))
+    (if (:code-distributor nimbus) (.upload (:code-distributor nimbus) stormroot storm-id))
     ))
  
+ (defn- wait-for-desired-code-replication [nimbus conf storm-id]
+   (let [min-replication-count (conf TOPOLOGY-MIN-REPLICATION-COUNT)
+         max-replication-wait-time (conf TOPOLOGY-MAX-REPLICATION-WAIT-TIME-SEC)
+         total-wait-time (atom 0)
+         current-replication-count (atom (if (:code-distributor nimbus) (.getReplicationCount (:code-distributor nimbus) storm-id) 0))]
+   (if (:code-distributor nimbus)
+     (while (and (> min-replication-count @current-replication-count)
+              (or (= -1 max-replication-wait-time)
+                (< @total-wait-time max-replication-wait-time)))
+         (sleep-secs 1)
+         (log-debug "waiting for desired replication to be achieved.
+           min-replication-count = " min-replication-count  " max-replication-wait-time = " max-replication-wait-time
+           "current-replication-count = " @current-replication-count " total-wait-time " @total-wait-time)
+         (swap! total-wait-time inc)
+         (reset! current-replication-count  (.getReplicationCount (:code-distributor nimbus) storm-id))))
+   (if (< min-replication-count @current-replication-count)
+     (log-message "desired replication count "  min-replication-count " achieved,
+       current-replication-count" @current-replication-count)
+     (log-message "desired replication count of "  min-replication-count " not achieved but we have hit the max wait time
+       so moving on with replication count = " @current-replication-count)
+     )))
+ 
  (defn- read-storm-topology [conf storm-id]
    (let [stormroot (master-stormdist-root conf storm-id)]
      (Utils/deserialize
@@@ -1258,12 -1323,18 +1330,20 @@@
                                                                  (:uptime-secs info)
                                                                  (count ports)
                                                                  (count (:used-ports info))
 -                                                                id )
 +                                                                id) ]
 +                                            (when-let [version (:version info)] (.set_version sup-sum version))
 +                                            sup-sum
                                              ))
-               nimbus-uptime ((:uptime nimbus))
                bases (topology-bases storm-cluster-state)
+               nimbuses (.nimbuses storm-cluster-state)
+ 
+               ;;update the isLeader field for each nimbus summary
+               _ (let [leader (.getLeader (:leader-elector nimbus))
+                       leader-host (.getHost leader)
+                       leader-port (.getPort leader)]
+                   (doseq [nimbus-summary nimbuses]
+                     (.set_isLeader nimbus-summary (and (= leader-host (.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary))))))
+ 
                topology-summaries (dofor [[id base] bases :when base]
  	                                  (let [assignment (.assignment-info storm-cluster-state
id nil)
                                                  topo-summ (TopologySummary. id

http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index be8f682,4fc219e..5f819bd
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@@ -37,7 -37,11 +37,8 @@@
  
  (defmulti download-storm-code cluster-mode)
  (defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor))))
+ (defmulti mk-code-distributor cluster-mode)
  
 -;; used as part of a map from port to this
 -(defrecord LocalAssignment [storm-id executors])
 -
  (defprotocol SupervisorDaemon
    (get-id [this])
    (get-conf [this])

http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/ui/core.clj
index b0e266c,02c3d90..950b88d
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@@ -25,8 -25,10 +25,9 @@@
    (:use [backtype.storm.ui helpers])
    (:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID
                                                ACKER-FAIL-STREAM-ID system-id? mk-authorization-handler]]])
 -  (:use [ring.middleware.anti-forgery])
    (:use [clojure.string :only [blank? lower-case trim]])
-   (:import [backtype.storm.utils Utils])
+   (:import [backtype.storm.utils Utils]
+            [backtype.storm.generated NimbusSummary])
    (:import [backtype.storm.generated ExecutorSpecificStats
              ExecutorStats ExecutorSummary TopologyInfo SpoutStats BoltStats
              ErrorInfo ClusterSummary SupervisorSummary TopologySummary
@@@ -293,18 -287,14 +286,21 @@@
                (bolt-comp-summs id))]
      (sort-by #(-> ^ExecutorSummary % .get_executor_info .get_task_start) ret)))
  
 -(defn worker-log-link [host port topology-id]
 +(defn worker-log-link [host port topology-id secure?]
    (let [fname (logs-filename topology-id port)]
 -    (url-format (str "http://%s:%s/log?file=%s")
 -          host (*STORM-CONF* LOGVIEWER-PORT) fname)))
 +    (if (and secure? (*STORM-CONF* LOGVIEWER-HTTPS-PORT))
 +      (url-format "https://%s:%s/log?file=%s"
 +                  host
 +                  (*STORM-CONF* LOGVIEWER-HTTPS-PORT)
 +                  fname)
 +      (url-format "http://%s:%s/log?file=%s"
 +                  host
 +                  (*STORM-CONF* LOGVIEWER-PORT)
 +                  fname))))
  
+ (defn nimbus-log-link [host port]
+   (url-format "http://%s:%s/log?file=nimbus.log" host (*STORM-CONF* LOGVIEWER-PORT) port))
+ 
  (defn compute-executor-capacity
    [^ExecutorSummary e]
    (let [stats (.get_stats e)
@@@ -705,8 -711,8 +718,8 @@@
          "acked" (get-in stats [:acked k])
          "failed" (get-in stats [:failed k])})))
  
 -(defn topology-page [id window include-sys? user]
 +(defn topology-page [id window include-sys? user secure?]
-   (with-nimbus nimbus
+   (thrift/with-configured-nimbus-connection nimbus
      (let [window (if window window ":all-time")
            window-hint (window-hint window)
            summ (->> (doto
@@@ -738,10 -745,12 +752,11 @@@
          "windowHint" window-hint
          "msgTimeout" msg-timeout
          "topologyStats" (topology-stats id window (total-aggregate-stats spout-summs bolt-summs include-sys?))
 -        "spouts" (spout-comp id spout-comp-summs (.get_errors summ) window include-sys?)
 -        "bolts" (bolt-comp id bolt-comp-summs (.get_errors summ) window include-sys?)
 +        "spouts" (spout-comp id spout-comp-summs (.get_errors summ) window include-sys? secure?)
 +        "bolts" (bolt-comp id bolt-comp-summs (.get_errors summ) window include-sys? secure?)
          "configuration" topology-conf
-         "visualizationTable" (stream-boxes visualizer-data)}))))
+         "visualizationTable" (stream-boxes visualizer-data)
 -        "antiForgeryToken" *anti-forgery-token*
+         "replicationCount" replication-count}))))
  
  (defn spout-output-stats
    [stream-summary window]
@@@ -885,11 -894,11 +900,11 @@@
       "inputStats" (bolt-input-stats stream-summary window)
       "outputStats" (bolt-output-stats stream-summary window)
       "executorStats" (bolt-executor-stats
 -                       (.get_id topology-info) executors window include-sys?)}))
 +                       (.get_id topology-info) executors window include-sys? secure?)}))
  
  (defn component-page
 -  [topology-id component window include-sys? user]
 +  [topology-id component window include-sys? user secure?]
-   (with-nimbus nimbus
+   (thrift/with-configured-nimbus-connection nimbus
      (let [window (if window window ":all-time")
            summ (.getTopologyInfo ^Nimbus$Client nimbus topology-id)
            topology (.getTopology ^Nimbus$Client nimbus topology-id)
@@@ -935,7 -944,7 +950,7 @@@
       {:status status
        :headers (merge {"Cache-Control" "no-cache, no-store"
                         "Access-Control-Allow-Origin" "*"
--                       "Access-Control-Allow-Headers" "Content-Type, Access-Control-Allow-Headers, Access-Controler-Allow-Origin, X-Requested-By, X-Csrf-Token, Authorization, X-Requested-With"}
++                       "Access-Control-Allow-Headers" "Content-Type, Access-Control-Allow-Headers, Access-Controler-Allow-Origin, X-Requested-By, Authorization, X-Requested-With"}
                        (if (not-nil? callback) {"Content-Type" "application/javascript;charset=utf-8"}
                            {"Content-Type" "application/json;charset=utf-8"}))
        :body (if (not-nil? callback)
@@@ -965,15 -977,15 +983,15 @@@
    (GET "/api/v1/topology/:id/visualization" [:as {:keys [cookies servlet-request]} id & m]
          (assert-authorized-user servlet-request "getTopology" (topology-config id))
          (json-response (mk-visualization-data id (:window m) (check-include-sys? (:sys m))) (:callback m)))
 -  (GET "/api/v1/topology/:id/component/:component" [:as {:keys [cookies servlet-request]} id component & m]
 +  (GET "/api/v1/topology/:id/component/:component" [:as {:keys [cookies servlet-request scheme]} id component & m]
         (let [user (.getUserName http-creds-handler servlet-request)]
           (assert-authorized-user servlet-request "getTopology" (topology-config id))
 -         (json-response (component-page id component (:window m) (check-include-sys? (:sys m)) user) (:callback m))))
 -  (GET "/api/v1/token" [ & m]
 -       (json-response (format "{\"antiForgeryToken\": \"%s\"}" *anti-forgery-token*) (:callback m) :serialize-fn identity))
 +         (json-response
 +          (component-page id component (:window m) (check-include-sys? (:sys m)) user (= scheme :https))
 +          (:callback m))))
    (POST "/api/v1/topology/:id/activate" [:as {:keys [cookies servlet-request]} id & m]
+     (thrift/with-configured-nimbus-connection nimbus
      (assert-authorized-user servlet-request "activate" (topology-config id))
-     (with-nimbus nimbus
        (let [tplg (->> (doto
                          (GetInfoOptions.)
                          (.set_num_err_choice NumErrorsChoice/NONE))

http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/clj/backtype/storm/zookeeper.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/Config.java
index 4628bd4,bd145d5..3cba37c
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@@ -1454,12 -1293,35 +1454,42 @@@ public class Config extends HashMap<Str
      public static final Object TOPOLOGY_ISOLATED_MACHINES_SCHEMA = Number.class;
  
      /**
 +     * Configure timeout milliseconds used for disruptor queue wait strategy. Can be used to tradeoff latency
 +     * vs. CPU usage
 +     */
 +    public static final String TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS="topology.disruptor.wait.timeout.millis";
 +    public static final Object TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS_SCHEMA = ConfigValidation.NotNullPosIntegerValidator;
 +
++    /**
+      * Which implementation of {@link backtype.storm.codedistributor.ICodeDistributor} should be used by storm for code
+      * distribution.
+      */
+     public static final String STORM_CODE_DISTRIBUTOR_CLASS = "storm.codedistributor.class";
+     public static final Object STORM_CODE_DISTRIBUTOR_CLASS_SCHEMA = String.class;
+ 
+     /**
+      * Minimum number of nimbus hosts where the code must be replicated before leader nimbus
+      * is allowed to perform topology activation tasks like setting up heartbeats/assignments
+      * and marking the topology as active. default is 0.
+      */
+     public static final String TOPOLOGY_MIN_REPLICATION_COUNT = "topology.min.replication.count";
+     public static final Object TOPOLOGY_MIN_REPLICATION_COUNT_SCHEMA = Number.class;
+ 
+     /**
+      * Maximum wait time for the nimbus host replication to achieve the nimbus.min.replication.count.
+      * Once this time is elapsed nimbus will go ahead and perform topology activation tasks even
+      * if required nimbus.min.replication.count is not achieved. The default is 0 seconds, a value of
+      * -1 indicates to wait for ever.
+      */
 -    public static final String TOPOLOGY_MAX_REPLICATION_WAIT_TIME_SEC = "nimbus.max.replication.wait.time.sec";
++    public static final String TOPOLOGY_MAX_REPLICATION_WAIT_TIME_SEC = "topology.max.replication.wait.time.sec";
+     public static final Object TOPOLOGY_MAX_REPLICATION_WAIT_TIME_SEC_SCHEMA = Number.class;
+ 
+     /**
+      * How often nimbus's background thread to sync code for missing topologies should run.
+      */
+     public static final String NIMBUS_CODE_SYNC_FREQ_SECS = "nimbus.code.sync.freq.secs";
+     public static final Object NIMBUS_CODE_SYNC_FREQ_SECS_SCHEMA = ConfigValidation.IntegerValidator;
+ 
      public static void setClasspath(Map conf, String cp) {
          conf.put(Config.TOPOLOGY_CLASSPATH, cp);
      }

http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/jvm/backtype/storm/generated/TopologySummary.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/storm.thrift
----------------------------------------------------------------------
diff --cc storm-core/src/storm.thrift
index a4b0b2a,839f6da..a585924
--- a/storm-core/src/storm.thrift
+++ b/storm-core/src/storm.thrift
@@@ -153,13 -154,20 +154,21 @@@ struct SupervisorSummary 
    3: required i32 num_workers;
    4: required i32 num_used_workers;
    5: required string supervisor_id;
 +  6: optional string version = "VERSION_NOT_PROVIDED";
  }
  
+ struct NimbusSummary {
+   1: required string host;
+   2: required i32 port;
+   3: required i32 uptime_secs;
+   4: required bool isLeader;
+   5: required string version;
+ }
+ 
  struct ClusterSummary {
    1: required list<SupervisorSummary> supervisors;
-   2: required i32 nimbus_uptime_secs;
    3: required list<TopologySummary> topologies;
+   4: required list<NimbusSummary> nimbuses;
  }
  
  struct ErrorInfo {

http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/ui/public/index.html
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/ui/public/templates/index-page-template.html
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/src/ui/public/templates/topology-page-template.html
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/test/clj/backtype/storm/cluster_test.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/test/clj/backtype/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/nimbus_test.clj
index 00fb6d6,057dd30..cbd88c4
--- a/storm-core/test/clj/backtype/storm/nimbus_test.clj
+++ b/storm-core/test/clj/backtype/storm/nimbus_test.clj
@@@ -1180,32 -1277,3 +1277,33 @@@
          (is (thrown-cause? InvalidTopologyException
            (submit-local-topology-with-opts nimbus "test" bad-config topology
                                             (SubmitOptions.))))))))
 +
 +(deftest test-stateless-with-scheduled-topology-to-be-killed
 +  ; tests regression of STORM-856
 +  (with-inprocess-zookeeper zk-port
 +    (with-local-tmp [nimbus-dir]
 +      (letlocals
 +        (bind conf (merge (read-storm-config)
 +                     {STORM-ZOOKEEPER-SERVERS ["localhost"]
 +                      STORM-CLUSTER-MODE "local"
 +                      STORM-ZOOKEEPER-PORT zk-port
 +                      STORM-LOCAL-DIR nimbus-dir}))
 +        (bind cluster-state (cluster/mk-storm-cluster-state conf))
 +        (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
++        (sleep-secs 1)
 +        (bind topology (thrift/mk-topology
 +                         {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
 +                         {}))
 +        (submit-local-topology nimbus "t1" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 30} topology)
 +        ; make transition for topology t1 to be killed -> nimbus applies this event to cluster state
 +        (.killTopology nimbus "t1")
 +        ; shutdown nimbus immediately to achieve nimbus doesn't handle event right now
 +        (.shutdown nimbus)
 +
 +        ; in startup of nimbus it reads cluster state and take proper actions
 +        ; in this case nimbus registers topology transition event to scheduler again
 +        ; before applying STORM-856 nimbus was killed with NPE
 +        (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
 +        (.shutdown nimbus)
 +        (.disconnect cluster-state)
 +        ))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/test/clj/backtype/storm/security/auth/nimbus_auth_test.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/d1afefde/storm-core/test/clj/backtype/storm/supervisor_test.clj
----------------------------------------------------------------------


Mime
View raw message