storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [1/9] storm git commit: STORM-1263: port backtype.storm.command.kill-topology to java (And add in better java CLI)
Date Tue, 16 Feb 2016 18:33:37 GMT
Repository: storm
Updated Branches:
  refs/heads/master a759db38d -> 4a9278630


STORM-1263: port backtype.storm.command.kill-topology to java (And add in better java CLI)


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b05aeb0e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b05aeb0e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b05aeb0e

Branch: refs/heads/master
Commit: b05aeb0eaadde8c919428bb2dbbffaa414b8470d
Parents: 265ff91
Author: Robert (Bobby) Evans <evans@yahoo-inc.com>
Authored: Fri Feb 12 12:38:48 2016 -0600
Committer: Robert (Bobby) Evans <evans@yahoo-inc.com>
Committed: Fri Feb 12 12:38:48 2016 -0600

----------------------------------------------------------------------
 bin/storm.cmd                                   |  14 +-
 bin/storm.py                                    |   2 +-
 pom.xml                                         |   6 +
 storm-core/pom.xml                              |   9 +
 .../org/apache/storm/command/kill_topology.clj  |  29 ---
 .../src/jvm/org/apache/storm/command/CLI.java   | 229 +++++++++++++++++++
 .../org/apache/storm/command/KillTopology.java  |  51 +++++
 .../org/apache/storm/utils/NimbusClient.java    |  19 +-
 8 files changed, 321 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b05aeb0e/bin/storm.cmd
----------------------------------------------------------------------
diff --git a/bin/storm.cmd b/bin/storm.cmd
index 6f4e934..8b3fa92 100644
--- a/bin/storm.cmd
+++ b/bin/storm.cmd
@@ -145,7 +145,7 @@
 
 :drpc
   set CLASS=org.apache.storm.daemon.drpc
-  "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.config_value drpc.childopts > %CMD_TEMP_FILE%
+  "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.ConfigValue drpc.childopts > %CMD_TEMP_FILE%
   FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do (
      FOR /F "tokens=1,* delims= " %%a in ("%%i") do (
 	  if %%a == VALUE: (
@@ -160,7 +160,7 @@
   goto :eof
 
 :kill
-  set CLASS=org.apache.storm.command.kill_topology
+  set CLASS=org.apache.storm.command.KillTopology
   set STORM_OPTS=%STORM_CLIENT_OPTS% %STORM_OPTS%
   goto :eof
 
@@ -171,7 +171,7 @@
 
 :logviewer
   set CLASS=org.apache.storm.daemon.logviewer
-   "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.config_value logviewer.childopts > %CMD_TEMP_FILE%
+   "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.ConfigValue logviewer.childopts > %CMD_TEMP_FILE%
   FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do (
      FOR /F "tokens=1,* delims= " %%a in ("%%i") do (
 	  if %%a == VALUE: (
@@ -183,7 +183,7 @@
 
 :nimbus
   set CLASS=org.apache.storm.daemon.nimbus
-  "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.config_value nimbus.childopts > %CMD_TEMP_FILE%
+  "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.ConfigValue nimbus.childopts > %CMD_TEMP_FILE%
   FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do (
      FOR /F "tokens=1,* delims= " %%a in ("%%i") do (
 	  if %%a == VALUE: (
@@ -199,7 +199,7 @@
   goto :eof
 
 :remoteconfvalue
-  set CLASS=org.apache.storm.command.config_value
+  set CLASS=org.apache.storm.command.ConfigValue
   set STORM_OPTS=%STORM_CLIENT_OPTS% %STORM_OPTS%
   goto :eof
 
@@ -215,7 +215,7 @@
   
 :supervisor
   set CLASS=org.apache.storm.daemon.supervisor
-  "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.config_value supervisor.childopts > %CMD_TEMP_FILE%
+  "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.ConfigValue supervisor.childopts > %CMD_TEMP_FILE%
   FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do (
      FOR /F "tokens=1,* delims= " %%a in ("%%i") do (
 	  if %%a == VALUE: (
@@ -228,7 +228,7 @@
 :ui
   set CLASS=org.apache.storm.ui.core
   set CLASSPATH=%CLASSPATH%;%STORM_HOME%
-  "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.config_value ui.childopts > %CMD_TEMP_FILE%
+  "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.ConfigValue ui.childopts > %CMD_TEMP_FILE%
   FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do (
      FOR /F "tokens=1,* delims= " %%a in ("%%i") do (
 	  if %%a == VALUE: (

http://git-wip-us.apache.org/repos/asf/storm/blob/b05aeb0e/bin/storm.py
----------------------------------------------------------------------
diff --git a/bin/storm.py b/bin/storm.py
index f2aca95..48160cc 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -278,7 +278,7 @@ def kill(*args):
         print_usage(command="kill")
         sys.exit(2)
     exec_storm_class(
-        "org.apache.storm.command.kill_topology",
+        "org.apache.storm.command.KillTopology",
         args=args,
         jvmtype="-client",
         extrajars=[USER_CONF_DIR, STORM_BIN_DIR])

http://git-wip-us.apache.org/repos/asf/storm/blob/b05aeb0e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 783018f..61a1ed9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -199,6 +199,7 @@
         <commons-exec.version>1.1</commons-exec.version>
         <commons-fileupload.version>1.2.1</commons-fileupload.version>
         <commons-codec.version>1.6</commons-codec.version>
+        <commons-cli.version>1.3.1</commons-cli.version>
         <clj-time.version>0.8.0</clj-time.version>
         <curator.version>2.9.0</curator.version>
         <json-simple.version>1.1</json-simple.version>
@@ -492,6 +493,11 @@
                 <version>${kryo.version}</version>
             </dependency>
             <dependency>
+                <groupId>commons-cli</groupId>
+                <artifactId>commons-cli</artifactId>
+                <version>${commons-cli.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>commons-io</groupId>
                 <artifactId>commons-io</artifactId>
                 <version>${commons-io.version}</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/b05aeb0e/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 247d097..624e340 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -149,6 +149,10 @@
         
         <!--java-->
         <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+        </dependency>
+        <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
             <scope>compile</scope>
@@ -505,6 +509,7 @@
                             <include>org.apache.commons:commons-exec</include>
                             <include>org.apache.commons:commons-compress</include>
                             <include>org.apache.hadoop:hadoop-auth</include>
+                            <include>commons-cli:commons-cli</include>
                             <include>commons-io:commons-io</include>
                             <include>commons-codec:commons-codec</include>
                             <include>commons-fileupload:commons-fileupload</include>
@@ -643,6 +648,10 @@
                           <shadedPattern>org.apache.storm.shade.com.metamx.http.client</shadedPattern>
                         </relocation>
                         <relocation>
+                          <pattern>org.apache.commons.cli</pattern>
+                          <shadedPattern>org.apache.storm.shade.org.apache.commons.cli</shadedPattern>
+                        </relocation>
+                        <relocation>
                           <pattern>org.apache.commons.io</pattern>
                           <shadedPattern>org.apache.storm.shade.org.apache.commons.io</shadedPattern>
                         </relocation>

http://git-wip-us.apache.org/repos/asf/storm/blob/b05aeb0e/storm-core/src/clj/org/apache/storm/command/kill_topology.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/kill_topology.clj b/storm-core/src/clj/org/apache/storm/command/kill_topology.clj
deleted file mode 100644
index 84e0a64..0000000
--- a/storm-core/src/clj/org/apache/storm/command/kill_topology.clj
+++ /dev/null
@@ -1,29 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.kill-topology
-  (:use [clojure.tools.cli :only [cli]])
-  (:use [org.apache.storm thrift config log])
-  (:import [org.apache.storm.generated KillOptions])
-  (:gen-class))
-
-(defn -main [& args]
-  (let [[{wait :wait} [name] _] (cli args ["-w" "--wait" :default nil :parse-fn #(Integer/parseInt %)])
-        opts (KillOptions.)]
-    (if wait (.set_wait_secs opts wait))
-    (with-configured-nimbus-connection nimbus
-      (.killTopologyWithOpts nimbus name opts)
-      (log-message "Killed topology: " name)
-      )))

http://git-wip-us.apache.org/repos/asf/storm/blob/b05aeb0e/storm-core/src/jvm/org/apache/storm/command/CLI.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/CLI.java b/storm-core/src/jvm/org/apache/storm/command/CLI.java
new file mode 100644
index 0000000..9813a3e
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/command/CLI.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.command;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+
+import org.apache.commons.cli.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CLI {
+    private static final Logger LOG = LoggerFactory.getLogger(CLI.class);
+    private static class Opt {
+        final String s;
+        final String l;
+        final Object defaultValue;
+        final Parse parse;
+        final Assoc assoc;
+        public Opt(String s, String l, Object defaultValue, Parse parse, Assoc assoc) {
+            this.s = s;
+            this.l = l;
+            this.defaultValue = defaultValue;
+            this.parse = parse == null ? AS_STRING : parse;
+            this.assoc = assoc == null ? LAST_WINS : assoc;
+        }
+
+        public Object process(Object current, String value) {
+            return assoc.assoc(current, parse.parse(value));
+        }
+    }
+
+    private static class Arg {
+        final String name;
+        final Parse parse;
+        final Assoc assoc;
+        public Arg(String name, Parse parse, Assoc assoc) {
+            this.name = name;
+            this.parse = parse == null ? AS_STRING : parse;
+            this.assoc = assoc == null ? INTO_LIST : assoc;
+        }
+
+        public Object process(Object current, String value) {
+            return assoc.assoc(current, parse.parse(value));
+        }
+    }
+
+    public interface Parse {
+        /**
+         * Parse a String to the type you want it to be.
+         * @param value the String to parse
+         * @return the parsed value
+         */
+        public Object parse(String value);
+    }
+
+    public static final Parse AS_INT = new Parse() {
+        @Override
+        public Object parse(String value) {
+            return Integer.valueOf(value);
+        }
+    };
+
+    public static final Parse AS_STRING = new Parse() {
+        @Override
+        public Object parse(String value) {
+            return value;
+        }
+    };
+
+    public interface Assoc {
+        /**
+         * Associate a value into something else
+         * @param current what to put value into, will be null if no values have been added yet.
+         * @param value what to add
+         * @return the result of combining the two
+         */
+        public Object assoc(Object current, Object value);
+    }
+
+    public static final Assoc LAST_WINS = new Assoc() {
+        @Override
+        public Object assoc(Object current, Object value) {
+            return value;
+        }
+    };
+
+    public static final Assoc FIRST_WINS = new Assoc() {
+        @Override
+        public Object assoc(Object current, Object value) {
+            return current == null ? value : current;
+        }
+    };
+
+    public static final Assoc INTO_LIST = new Assoc() {
+        @Override
+        public Object assoc(Object current, Object value) {
+            if (current == null) {
+                current = new ArrayList<Object>();
+            }
+            ((List<Object>)current).add(value);
+            return current;
+        }
+    };
+
+    public static class CLIBuilder {
+        private final ArrayList<Opt> opts = new ArrayList<>();
+        private final ArrayList<Arg> args = new ArrayList<>();
+
+        public CLIBuilder opt(String s, String l, Object defaultValue) {
+            return opt(s, l, defaultValue, null, null);
+        }
+ 
+        public CLIBuilder opt(String s, String l, Object defaultValue, Parse parse) {
+            return opt(s, l, defaultValue, parse, null);
+        }
+
+        public CLIBuilder opt(String s, String l, Object defaultValue, Parse parse, Assoc assoc) {
+            opts.add(new Opt(s, l, defaultValue, parse, assoc));
+            return this;
+        }
+
+        public CLIBuilder arg(String name) {
+            return arg(name, null, null);
+        }
+
+        public CLIBuilder arg(String name, Assoc assoc) {
+            return arg(name, null, assoc);
+        }
+ 
+        public CLIBuilder arg(String name, Parse parse) {
+            return arg(name, parse, null);
+        }
+
+        public CLIBuilder arg(String name, Parse parse, Assoc assoc) {
+            args.add(new Arg(name, parse, assoc));
+            return this;
+        }
+
+        public Map<String, Object> parse(String[] rawArgs) throws Exception {
+            Options options = new Options();
+            for (Opt opt: opts) {
+                options.addOption(Option.builder(opt.s).longOpt(opt.l).hasArg().build());
+            }
+            DefaultParser parser = new DefaultParser();
+            CommandLine cl = parser.parse(options, rawArgs);
+            HashMap<String, Object> ret = new HashMap<>();
+            for (Opt opt: opts) {
+                Object current = null;
+                for (String val: cl.getOptionValues(opt.s)) {
+                    current = opt.process(current, val);
+                }
+                if (current == null) {
+                    current = opt.defaultValue;
+                }
+                ret.put(opt.s, current);
+            }
+            List<String> stringArgs = cl.getArgList();
+            if (args.size() > stringArgs.size()) {
+                throw new RuntimeException("Wrong number of arguments at least "+args.size()+" expected, but only " + stringArgs.size() + " found");
+            }
+
+            int argIndex = 0;
+            int stringArgIndex = 0;
+            if (args.size() > 0) {
+                while (argIndex < args.size()) {
+                    Arg arg = args.get(argIndex);
+                    boolean isLastArg = (argIndex == (args.size() - 1));
+                    Object current = null;
+                    int maxStringIndex = isLastArg ? stringArgs.size() : (stringArgIndex + 1);
+                    for (;stringArgIndex < maxStringIndex; stringArgIndex++) {
+                        current = arg.process(current, stringArgs.get(stringArgIndex));
+                    }
+                    ret.put(arg.name, current);
+                    argIndex++;
+                }
+            } else {
+                ret.put("ARGS", stringArgs);
+            }
+            return ret;
+        }
+    }
+
+    public static CLIBuilder opt(String s, String l, Object defaultValue) {
+        return new CLIBuilder().opt(s, l, defaultValue);
+    }
+ 
+    public static CLIBuilder opt(String s, String l, Object defaultValue, Parse parse) {
+        return new CLIBuilder().opt(s, l, defaultValue, parse);
+    }
+
+    public static CLIBuilder opt(String s, String l, Object defaultValue, Parse parse, Assoc assoc) {
+        return new CLIBuilder().opt(s, l, defaultValue, parse, assoc);
+    }
+
+    public CLIBuilder arg(String name) {
+        return new CLIBuilder().arg(name);
+    }
+ 
+    public CLIBuilder arg(String name, Assoc assoc) {
+        return new CLIBuilder().arg(name, assoc);
+    }
+
+    public CLIBuilder arg(String name, Parse parse) {
+        return new CLIBuilder().arg(name, parse);
+    }
+
+    public CLIBuilder arg(String name, Parse parse, Assoc assoc) {
+        return new CLIBuilder().arg(name, parse, assoc);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b05aeb0e/storm-core/src/jvm/org/apache/storm/command/KillTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/KillTopology.java b/storm-core/src/jvm/org/apache/storm/command/KillTopology.java
new file mode 100644
index 0000000..8f4d323
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/command/KillTopology.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.command;
+
+import java.util.Map;
+
+import org.apache.storm.generated.KillOptions;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.utils.NimbusClient;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KillTopology {
+    private static final Logger LOG = LoggerFactory.getLogger(KillTopology.class);
+
+    public static void main(String [] args) throws Exception {
+        Map<String, Object> cl = CLI.opt("w", "wait", null, CLI.AS_INT)
+                                    .arg("TOPO", CLI.FIRST_WINS)
+                                    .parse(args);
+        final String name = (String)cl.get("TOPO");
+        Integer wait = (Integer)cl.get("w");
+
+        final KillOptions opts = new KillOptions();
+        if (wait != null) {
+            opts.set_wait_secs(wait);
+        }
+        NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
+          @Override
+          public void run(Nimbus.Client nimbus) throws Exception {
+            nimbus.killTopologyWithOpts(name, opts);
+            LOG.info("Killed topology: {}", name);
+          }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b05aeb0e/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java b/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java
index f5bad6e..4c76b29 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java
@@ -17,11 +17,11 @@
  */
 package org.apache.storm.utils;
 
-
 import org.apache.storm.Config;
 import org.apache.storm.generated.ClusterSummary;
 import org.apache.storm.generated.Nimbus;
 import org.apache.storm.generated.NimbusSummary;
+import org.apache.storm.security.auth.ReqContext;
 import org.apache.storm.security.auth.ThriftClient;
 import org.apache.storm.security.auth.ThriftConnectionType;
 import com.google.common.collect.Lists;
@@ -29,6 +29,7 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.security.Principal;
 import java.util.List;
 import java.util.Map;
 
@@ -36,6 +37,22 @@ public class NimbusClient extends ThriftClient implements AutoCloseable {
     private Nimbus.Client _client;
     private static final Logger LOG = LoggerFactory.getLogger(NimbusClient.class);
 
+    public interface WithNimbus {
+        public void run(Nimbus.Client client) throws Exception;
+    }
+
+    public static void withConfiguredClient(WithNimbus cb) throws Exception {
+        withConfiguredClient(cb, ConfigUtils.readStormConfig());
+    }
+
+    public static void withConfiguredClient(WithNimbus cb, Map conf) throws Exception {
+        ReqContext context = ReqContext.context();
+        Principal principal = context.principal();
+        String user = principal == null ? null : principal.getName();
+        try (NimbusClient client = getConfiguredClientAs(conf, user);) {
+            cb.run(client.getClient());
+        }
+    }
 
     public static NimbusClient getConfiguredClient(Map conf) {
         return getConfiguredClientAs(conf, null);


Mime
View raw message