storm-commits mailing list archives

From bo...@apache.org
Subject [01/10] storm git commit: [STORM-1269] port backtype.storm.daemon.common to java
Date Thu, 10 Mar 2016 14:33:48 GMT
Repository: storm
Updated Branches:
  refs/heads/master 81fb727d8 -> 6390d18dd


http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
new file mode 100644
index 0000000..7680fbc
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -0,0 +1,605 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import com.codahale.metrics.MetricRegistry;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.Thrift;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
+import org.apache.storm.generated.*;
+import org.apache.storm.generated.StormBase;
+import org.apache.storm.metric.EventLoggerBolt;
+import org.apache.storm.metric.MetricsConsumerBolt;
+import org.apache.storm.metric.SystemBolt;
+import org.apache.storm.security.auth.IAuthorizer;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.testing.NonRichBoltTracker;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.IPredicate;
+import org.apache.storm.utils.ThriftTopologyUtils;
+import org.apache.storm.utils.Utils;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class StormCommon {
+    // A singleton instance allows us to mock delegated static methods in our
+    // tests by subclassing.
+    private static StormCommon _instance = new StormCommon();
+
+    /**
+     * Provide an instance of this class for delegates to use.  To mock out
+     * delegated methods, provide an instance of a subclass that overrides the
+     * implementation of the delegated method.
+     * @param common a StormCommon instance
+     * @return the previously set instance
+     */
+    public static StormCommon setInstance(StormCommon common) {
+        StormCommon oldInstance = _instance;
+        _instance = common;
+        return oldInstance;
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(StormCommon.class);
+
+    public static final String ACKER_COMPONENT_ID = Acker.ACKER_COMPONENT_ID;
+    public static final String ACKER_INIT_STREAM_ID = Acker.ACKER_INIT_STREAM_ID;
+    public static final String ACKER_ACK_STREAM_ID = Acker.ACKER_ACK_STREAM_ID;
+    public static final String ACKER_FAIL_STREAM_ID = Acker.ACKER_FAIL_STREAM_ID;
+
+    public static final String SYSTEM_STREAM_ID = "__system";
+
+    public static final String EVENTLOGGER_COMPONENT_ID = "__eventlogger";
+    public static final String EVENTLOGGER_STREAM_ID = "__eventlog";
+
+    public static void startMetricsReporter(PreparableReporter report, Map conf) {
+        report.prepare(new MetricRegistry(), conf);
+        report.start();
+        LOG.info("Started statistics report plugin...");
+    }
+
+    public static void startMetricsReporters(Map conf) {
+        List<PreparableReporter> reporters = MetricsUtils.getPreparableReporters(conf);
+        for (PreparableReporter reporter : reporters) {
+            startMetricsReporter(reporter, conf);
+        }
+    }
+
+    public static String getTopologyNameById(String topologyId) {
+        String topologyName = null;
+        try {
+            topologyName = topologyIdToName(topologyId);
+        } catch (InvalidTopologyException e) {
+            LOG.error("Invalid topologyId=" + topologyId);
+        }
+        return topologyName;
+    }
+
+    /**
+     * Convert a topologyId to its topologyName. A topologyId has the form
+     * topologyName-counter-timestamp.
+     *
+     * @param topologyId the topology ID to convert
+     * @return the topology name
+     */
+    public static String topologyIdToName(String topologyId) throws InvalidTopologyException {
+        String ret = null;
+        int index = topologyId.lastIndexOf('-');
+        if (index != -1 && index > 2) {
+            index = topologyId.lastIndexOf('-', index - 1);
+            if (index != -1 && index > 0)
+                ret = topologyId.substring(0, index);
+            else
+                throw new InvalidTopologyException(topologyId + " is not a valid topologyId");
+        } else
+            throw new InvalidTopologyException(topologyId + " is not a valid topologyId");
+        return ret;
+    }
+
+    public static String getStormId(IStormClusterState stormClusterState, final String topologyName) {
+        List<String> activeTopologys = stormClusterState.activeStorms();
+        IPredicate pred = new IPredicate<String>() {
+            @Override
+            public boolean test(String obj) {
+                return obj != null && topologyName.equals(getTopologyNameById(obj));
+            }
+        };
+        return Utils.findOne(pred, activeTopologys);
+    }
+
+    public static Map<String, StormBase> topologyBases(IStormClusterState stormClusterState) {
+        return _instance.topologyBasesImpl(stormClusterState);
+    }
+
+    protected Map<String, StormBase> topologyBasesImpl(IStormClusterState stormClusterState) {
+        List<String> activeTopologys = stormClusterState.activeStorms();
+        Map<String, StormBase> stormBases = new HashMap<String, StormBase>();
+        if (activeTopologys != null) {
+            for (String topologyId : activeTopologys) {
+                StormBase base = stormClusterState.stormBase(topologyId, null);
+                if (base != null) {
+                    stormBases.put(topologyId, base);
+                }
+            }
+        }
+        return stormBases;
+    }
+
+    public static void validateDistributedMode(Map conf) {
+        if (ConfigUtils.isLocalMode(conf)) {
+            throw new IllegalArgumentException("Cannot start server in local mode!");
+        }
+    }
+
+    private static void validateIds(StormTopology topology) throws InvalidTopologyException {
+        List<String> componentIds = new ArrayList<String>();
+
+        for (StormTopology._Fields field : Thrift.getTopologyFields()) {
+            if (ThriftTopologyUtils.isWorkerHook(field) == false) {
+                Object value = topology.getFieldValue(field);
+                if (value != null) {
+                    Map<String, Object> componentMap = (Map<String, Object>) value;
+                    componentIds.addAll(componentMap.keySet());
+
+                    for (String id : componentMap.keySet()) {
+                        if (Utils.isSystemId(id)) {
+                            throw new InvalidTopologyException(id + " is not a valid component id.");
+                        }
+                    }
+                    for (Object componentObj : componentMap.values()) {
+                        ComponentCommon common = getComponentCommon(componentObj);
+                        Set<String> streamIds = common.get_streams().keySet();
+                        for (String id : streamIds) {
+                            if (Utils.isSystemId(id)) {
+                                throw new InvalidTopologyException(id + " is not a valid stream id.");
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        List<String> offending = Utils.getRepeat(componentIds);
+        if (offending.isEmpty() == false) {
+            throw new InvalidTopologyException("Duplicate component ids: " + offending);
+        }
+    }
+
+    private static boolean isEmptyInputs(ComponentCommon common) {
+        if (common == null) {
+            return true;
+        } else if (common.get_inputs() == null) {
+            return true;
+        } else {
+            return common.get_inputs().isEmpty();
+        }
+    }
+
+    public static Map<String, Object> allComponents(StormTopology topology) {
+        Map<String, Object> components = new HashMap<String, Object>();
+        List<StormTopology._Fields> topologyFields = Arrays.asList(Thrift.getTopologyFields());
+        for (StormTopology._Fields field : topologyFields) {
+            if (ThriftTopologyUtils.isWorkerHook(field) == false) {
+                components.putAll(((Map) topology.getFieldValue(field)));
+            }
+        }
+        return components;
+    }
+
+    public static Map componentConf(Object component) {
+        Map<Object, Object> conf = new HashMap<Object, Object>();
+        ComponentCommon common = getComponentCommon(component);
+        if (common != null) {
+            String jconf = common.get_json_conf();
+            if (jconf != null) {
+                conf.putAll((Map<Object, Object>) JSONValue.parse(jconf));
+            }
+        }
+        return conf;
+    }
+
+    public static void validateBasic(StormTopology topology) throws InvalidTopologyException {
+        validateIds(topology);
+
+        List<StormTopology._Fields> spoutFields = Arrays.asList(Thrift.getSpoutFields());
+        for (StormTopology._Fields field : spoutFields) {
+            Map<String, Object> spoutComponents = (Map<String, Object>) topology.getFieldValue(field);
+            if (spoutComponents != null) {
+                for (Object obj : spoutComponents.values()) {
+                    ComponentCommon common = getComponentCommon(obj);
+                    if (isEmptyInputs(common) == false) {
+                        throw new InvalidTopologyException("May not declare inputs for a spout");
+                    }
+                }
+            }
+        }
+
+        Map<String, Object> componentMap = allComponents(topology);
+        for (Object componentObj : componentMap.values()) {
+            Map conf = componentConf(componentObj);
+            ComponentCommon common = getComponentCommon(componentObj);
+            if (common != null) {
+                int parallelismHintNum = Thrift.getParallelismHint(common);
+                Integer taskNum = Utils.parseInt(conf.get(Config.TOPOLOGY_TASKS));
+                if (taskNum != null && taskNum > 0 && parallelismHintNum <= 0) {
+                    throw new InvalidTopologyException("Number of executors must be greater than 0 when number of tasks is greater than 0");
+                }
+            }
+        }
+    }
+
+    private static Set<String> getStreamOutputFields(Map<String, StreamInfo> streams) {
+        Set<String> outputFields = new HashSet<String>();
+        if (streams != null) {
+            for (StreamInfo streamInfo : streams.values()) {
+                outputFields.addAll(streamInfo.get_output_fields());
+            }
+        }
+        return outputFields;
+    }
+
+    public static void validateStructure(StormTopology topology) throws InvalidTopologyException {
+        Map<String, Object> componentMap = allComponents(topology);
+        for (Map.Entry<String, Object> entry : componentMap.entrySet()) {
+            String componentId = entry.getKey();
+            ComponentCommon common = getComponentCommon(entry.getValue());
+            if (common != null) {
+                Map<GlobalStreamId, Grouping> inputs = common.get_inputs();
+                for (Map.Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) {
+                    String sourceStreamId = input.getKey().get_streamId();
+                    String sourceComponentId = input.getKey().get_componentId();
+                    if(componentMap.keySet().contains(sourceComponentId) == false) {
+                        throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from non-existent component [" + sourceComponentId + "]");
+                    }
+
+                    ComponentCommon sourceComponent = getComponentCommon(componentMap.get(sourceComponentId));
+                    if (sourceComponent == null || sourceComponent.get_streams().containsKey(sourceStreamId) == false) {
+                        throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from non-existent stream: " +
+                                "[" + sourceStreamId + "] of component [" + sourceComponentId + "]");
+                    }
+
+                    Grouping grouping = input.getValue();
+                    if (Thrift.groupingType(grouping) == Grouping._Fields.FIELDS) {
+                        List<String> fields = grouping.get_fields();
+                        Map<String, StreamInfo> streams = sourceComponent.get_streams();
+                        Set<String> sourceOutputFields = getStreamOutputFields(streams);
+                        if (sourceOutputFields.containsAll(fields) == false) {
+                            throw new InvalidTopologyException("Component: [" + componentId + "] subscribes from stream: [" + sourceStreamId + "] of component " +
+                                    "[" + sourceComponentId + "] with non-existent fields: " + fields);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    public static Map<GlobalStreamId, Grouping> ackerInputs(StormTopology topology) {
+        Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
+        Set<String> boltIds = topology.get_bolts().keySet();
+        Set<String> spoutIds = topology.get_spouts().keySet();
+
+        for(String id : spoutIds) {
+            inputs.put(Utils.getGlobalStreamId(id, ACKER_INIT_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+        }
+
+        for(String id : boltIds) {
+            inputs.put(Utils.getGlobalStreamId(id, ACKER_ACK_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+            inputs.put(Utils.getGlobalStreamId(id, ACKER_FAIL_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
+        }
+        return inputs;
+    }
+
+    public static String clusterId = null;
+    public static IBolt makeAckerBolt() {
+        return _instance.makeAckerBoltImpl();
+    }
+    public IBolt makeAckerBoltImpl() {
+        return new Acker();
+    }
+
+    public static void addAcker(Map conf, StormTopology topology) {
+        int ackerNum = Utils.parseInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), Utils.parseInt(conf.get(Config.TOPOLOGY_WORKERS)));
+        Map<GlobalStreamId, Grouping> inputs = ackerInputs(topology);
+
+        Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>();
+        outputStreams.put(ACKER_ACK_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id")));
+        outputStreams.put(ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id")));
+
+        Map<String, Object> ackerConf = new HashMap<String, Object>();
+        ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum);
+        ackerConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.parseInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
+
+        Bolt acker = Thrift.prepareSerializedBoltDetails(inputs, makeAckerBolt(), outputStreams, ackerNum, ackerConf);
+
+        for(Bolt bolt : topology.get_bolts().values()) {
+            ComponentCommon common = bolt.get_common();
+            common.put_to_streams(ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val")));
+            common.put_to_streams(ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
+        }
+
+        for (SpoutSpec spout : topology.get_spouts().values()) {
+            ComponentCommon common = spout.get_common();
+            Map spoutConf = componentConf(spout);
+            spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.parseInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
+            common.set_json_conf(JSONValue.toJSONString(spoutConf));
+            common.put_to_streams(ACKER_INIT_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
+            common.put_to_inputs(Utils.getGlobalStreamId(ACKER_COMPONENT_ID, ACKER_ACK_STREAM_ID), Thrift.prepareDirectGrouping());
+            common.put_to_inputs(Utils.getGlobalStreamId(ACKER_COMPONENT_ID, ACKER_FAIL_STREAM_ID), Thrift.prepareDirectGrouping());
+        }
+
+        topology.put_to_bolts(ACKER_COMPONENT_ID, acker);
+    }
+
+    public static ComponentCommon getComponentCommon(Object component) {
+        if (component == null) {
+            return null;
+        }
+
+        ComponentCommon common = null;
+        if (component instanceof StateSpoutSpec) {
+            common = ((StateSpoutSpec) component).get_common();
+        } else if (component instanceof SpoutSpec) {
+            common = ((SpoutSpec) component).get_common();
+        } else if (component instanceof Bolt) {
+            common = ((Bolt) component).get_common();
+        }
+        return common;
+    }
+
+    public static void addMetricStreams(StormTopology topology) {
+        for (Object component : allComponents(topology).values()) {
+            ComponentCommon common = getComponentCommon(component);
+            if (common != null) {
+                StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("task-info", "data-points"));
+                common.put_to_streams(Constants.METRICS_STREAM_ID, streamInfo);
+            }
+        }
+    }
+
+    public static void addSystemStreams(StormTopology topology) {
+        for (Object component : allComponents(topology).values()) {
+            ComponentCommon common = getComponentCommon(component);
+            if (common != null) {
+                StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("event"));
+                common.put_to_streams(SYSTEM_STREAM_ID, streamInfo);
+            }
+        }
+    }
+
+    public static List<String> eventLoggerBoltFields() {
+        List<String> fields = Arrays.asList(EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID, EventLoggerBolt.FIELD_TS,
+                EventLoggerBolt.FIELD_VALUES);
+        return fields;
+    }
+
+    public static Map<GlobalStreamId, Grouping> eventLoggerInputs(StormTopology topology) {
+        Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
+        Set<String> allIds = new HashSet<String>();
+        allIds.addAll(topology.get_bolts().keySet());
+        allIds.addAll(topology.get_spouts().keySet());
+
+        for(String id : allIds) {
+            inputs.put(Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("component-id")));
+        }
+        return inputs;
+    }
+
+    public static void addEventLogger(Map conf, StormTopology topology) {
+        Integer numExecutors = Utils.parseInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS), Utils.parseInt(conf.get(Config.TOPOLOGY_WORKERS)));
+        HashMap<String, Object> componentConf = new HashMap<String, Object>();
+        componentConf.put(Config.TOPOLOGY_TASKS, numExecutors);
+        componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Utils.parseInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
+        Bolt eventLoggerBolt = Thrift.prepareSerializedBoltDetails(eventLoggerInputs(topology), new EventLoggerBolt(), null, numExecutors, componentConf);
+
+        for(Object component : allComponents(topology).values()) {
+            ComponentCommon common = getComponentCommon(component);
+            if (common != null) {
+                common.put_to_streams(EVENTLOGGER_STREAM_ID, Thrift.outputFields(eventLoggerBoltFields()));
+            }
+        }
+        topology.put_to_bolts(EVENTLOGGER_COMPONENT_ID, eventLoggerBolt);
+    }
+
+    public static Map<String, Bolt> metricsConsumerBoltSpecs(Map conf, StormTopology topology) {
+        Map<String, Bolt> metricsConsumerBolts = new HashMap<String, Bolt>();
+
+        Set<String> componentIdsEmitMetrics = new HashSet<String>();
+        componentIdsEmitMetrics.addAll(allComponents(topology).keySet());
+        componentIdsEmitMetrics.add(Constants.SYSTEM_COMPONENT_ID);
+
+        Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
+        for (String componentId : componentIdsEmitMetrics) {
+            inputs.put(Utils.getGlobalStreamId(componentId, Constants.METRICS_STREAM_ID), Thrift.prepareShuffleGrouping());
+        }
+
+        List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) conf.get(Config.TOPOLOGY_METRICS_CONSUMER_REGISTER);
+        if (registerInfo != null) {
+            Map<String, Integer> classOccurrencesMap = new HashMap<String, Integer>();
+            for (Map<String, Object> info : registerInfo) {
+                String className = (String) info.get("class");
+                Object argument = info.get("argument");
+                Integer phintNum = Utils.parseInt(info.get("parallelism.hint"), 1);
+                Map<String, Object> metricsConsumerConf = new HashMap<String, Object>();
+                metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
+                Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs, new MetricsConsumerBolt(className, argument), null, phintNum, metricsConsumerConf);
+
+                String id = className;
+                if (classOccurrencesMap.containsKey(className)) {
+                    // e.g. ["a", "b", "a"] => ["a", "b", "a#2"]
+                    int occurrenceNum = classOccurrencesMap.get(className);
+                    occurrenceNum++;
+                    classOccurrencesMap.put(className, occurrenceNum);
+                    id = Constants.METRICS_COMPONENT_ID_PREFIX + className + "#" + occurrenceNum;
+                } else {
+                    classOccurrencesMap.put(className, 1);
+                }
+                metricsConsumerBolts.put(id, metricsConsumerBolt);
+            }
+        }
+        return metricsConsumerBolts;
+    }
+
+    public static void addMetricComponents(Map conf, StormTopology topology) {
+        Map<String, Bolt> metricsConsumerBolts = metricsConsumerBoltSpecs(conf, topology);
+        for (Map.Entry<String, Bolt> entry : metricsConsumerBolts.entrySet()) {
+            topology.put_to_bolts(entry.getKey(), entry.getValue());
+        }
+    }
+
+    public static void addSystemComponents(Map conf, StormTopology topology) {
+        Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>();
+        outputStreams.put(Constants.SYSTEM_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("rate_secs")));
+        outputStreams.put(Constants.METRICS_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("interval")));
+        outputStreams.put(Constants.CREDENTIALS_CHANGED_STREAM_ID, Thrift.outputFields(Arrays.asList("creds")));
+
+        Map<String, Object> boltConf = new HashMap<String, Object>();
+        boltConf.put(Config.TOPOLOGY_TASKS, 0);
+
+        Bolt systemBoltSpec = Thrift.prepareSerializedBoltDetails(null, new SystemBolt(), outputStreams, 0, boltConf);
+        topology.put_to_bolts(Constants.SYSTEM_COMPONENT_ID, systemBoltSpec);
+    }
+
+    public static StormTopology systemTopology(Map stormConf, StormTopology topology) throws InvalidTopologyException {
+        return _instance.systemTopologyImpl(stormConf, topology);
+    }
+
+    protected StormTopology systemTopologyImpl(Map stormConf, StormTopology topology) throws InvalidTopologyException {
+        validateBasic(topology);
+
+        StormTopology ret = topology.deepCopy();
+        addAcker(stormConf, ret);
+        addEventLogger(stormConf, ret);
+        addMetricComponents(stormConf, ret);
+        addSystemComponents(stormConf, ret);
+        addMetricStreams(ret);
+        addSystemStreams(ret);
+
+        validateStructure(ret);
+
+        return ret;
+    }
+
+    public static boolean hasAckers(Map stormConf) {
+        Integer ackerNum = Utils.parseInt(stormConf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
+        if (ackerNum == null || ackerNum > 0) {
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    public static boolean hasEventLoggers(Map stormConf) {
+        Integer eventLoggerNum = Utils.parseInt(stormConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS));
+        if (eventLoggerNum == null || eventLoggerNum > 0) {
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    public static int numStartExecutors(Object component) throws InvalidTopologyException {
+        ComponentCommon common = getComponentCommon(component);
+        if (common == null) {
+            throw new InvalidTopologyException("unknown component type " + component.getClass().getName());
+        }
+        int parallelismHintNum = Thrift.getParallelismHint(common);
+        return parallelismHintNum;
+    }
+
+    public static Map stormTaskInfo(StormTopology userTopology, Map stormConf) throws InvalidTopologyException {
+        return _instance.stormTaskInfoImpl(userTopology, stormConf);
+    }
+
+    /*
+     * Returns a map from taskId -> componentId.
+     */
+    protected Map stormTaskInfoImpl(StormTopology userTopology, Map stormConf) throws InvalidTopologyException {
+        Map<Integer, String> taskIdToComponentId = new HashMap<Integer, String>();
+
+        StormTopology systemTopology = systemTopology(stormConf, userTopology);
+        Map<String, Object> components = allComponents(systemTopology);
+        Map<String, Integer> componentIdToTaskNum = new TreeMap<String, Integer>();
+        for (Map.Entry<String, Object> entry : components.entrySet()) {
+            Map conf = componentConf(entry.getValue());
+            Integer taskNum = Utils.parseInt(conf.get(Config.TOPOLOGY_TASKS));
+            if (taskNum != null) {
+                componentIdToTaskNum.put(entry.getKey(), taskNum);
+            }
+        }
+
+        int taskId = 1;
+        for (Map.Entry<String, Integer> entry : componentIdToTaskNum.entrySet()) {
+            String componentId = entry.getKey();
+            Integer taskNum = entry.getValue();
+            while (taskNum > 0) {
+                taskIdToComponentId.put(taskId, componentId);
+                taskNum--;
+                taskId++;
+            }
+        }
+        return taskIdToComponentId;
+    }
+
+    public static List<Integer> executorIdToTasks(List<Long> executorId) {
+        List<Integer> taskIds = new ArrayList<Integer>();
+        int taskId = executorId.get(0).intValue();
+        while (taskId <= executorId.get(1).intValue()) {
+            taskIds.add(taskId);
+            taskId++;
+        }
+        return taskIds;
+    }
+
+    public static Map<Integer, NodeInfo> taskToNodeport(Map<List<Long>, NodeInfo> executorToNodeport) {
+        Map<Integer, NodeInfo> tasksToNodeport = new HashMap<Integer, NodeInfo>();
+        for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodeport.entrySet()) {
+            List<Integer> taskIds = executorIdToTasks(entry.getKey());
+            for (Integer taskId : taskIds) {
+                tasksToNodeport.put(taskId, entry.getValue());
+            }
+        }
+        return tasksToNodeport;
+    }
+
+    public static IAuthorizer mkAuthorizationHandler(String klassName, Map conf) {
+        return _instance.mkAuthorizationHandlerImpl(klassName, conf);
+    }
+
+    protected IAuthorizer mkAuthorizationHandlerImpl(String klassName, Map conf) {
+        IAuthorizer aznHandler = null;
+        try {
+            if (klassName != null) {
+                Class aznClass = Class.forName(klassName);
+                if (aznClass != null) {
+                    aznHandler = (IAuthorizer) aznClass.newInstance();
+                    if (aznHandler != null) {
+                        aznHandler.prepare(conf);
+                    }
+                    LOG.debug("authorization class name:{}, class:{}, handler:{}",klassName, aznClass, aznHandler);
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to make authorization handler, klassName:{}", klassName);
+        }
+
+        return aznHandler;
+    }
+}
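
A minimal usage sketch for the ported API (illustrative only, not part of the
commit; the userTopology value and the config entries are assumptions):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.storm.Config;
    import org.apache.storm.daemon.StormCommon;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.generated.StormTopology;

    public class StormCommonSketch {
        // Assumes the caller already built userTopology, e.g. with TopologyBuilder.
        static StormTopology withSystemComponents(StormTopology userTopology)
                throws InvalidTopologyException {
            Map<String, Object> conf = new HashMap<String, Object>();
            conf.put(Config.TOPOLOGY_WORKERS, 2);
            conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 2);
            conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 30);
            // Deep-copies the topology, then adds the acker, event logger,
            // metrics consumers, and system bolt together with their streams.
            return StormCommon.systemTopology(conf, userTopology);
        }

        public static void main(String[] args) throws Exception {
            // Topology IDs have the form topologyName-counter-timestamp:
            System.out.println(StormCommon.topologyIdToName("wordcount-1-1457620428"));
            // prints "wordcount"
        }
    }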

http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/jvm/org/apache/storm/utils/StormCommonInstaller.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/StormCommonInstaller.java b/storm-core/src/jvm/org/apache/storm/utils/StormCommonInstaller.java
new file mode 100644
index 0000000..c9a0add
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/utils/StormCommonInstaller.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.utils;
+
+import org.apache.storm.daemon.StormCommon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * For testing purposes only. Once testing.clj has been migrated, this class can be removed.
+ */
+public class StormCommonInstaller implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(StormCommonInstaller.class);
+    private StormCommon _oldInstance;
+    private StormCommon _curInstance;
+
+    public StormCommonInstaller(StormCommon instance) {
+        _oldInstance = StormCommon.setInstance(instance);
+        _curInstance = instance;
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (StormCommon.setInstance(_oldInstance) != _curInstance) {
+            throw new IllegalStateException(
+                    "Instances of this resource must be closed in reverse order of opening.");
+        }
+    }
+}
\ No newline at end of file
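
A hedged test-usage sketch (assumed code, not part of the commit): the
installer swaps in a stub for the delegated methods, and close() restores the
previous instance, which is why nested installers must be closed in reverse
order of opening.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.storm.cluster.IStormClusterState;
    import org.apache.storm.daemon.StormCommon;
    import org.apache.storm.generated.StormBase;
    import org.apache.storm.utils.StormCommonInstaller;

    public class InstallerSketch {
        public static void main(String[] args) throws Exception {
            StormCommon stub = new StormCommon() {
                @Override
                protected Map<String, StormBase> topologyBasesImpl(IStormClusterState state) {
                    return new HashMap<String, StormBase>(); // pretend nothing is active
                }
            };
            try (StormCommonInstaller installer = new StormCommonInstaller(stub)) {
                // Code here that calls StormCommon.topologyBases(...) hits the stub.
            } // close() reinstalls the previous StormCommon instance
        }
    }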

http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index bc12e8e..2de296e 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -2307,4 +2307,54 @@ public class Utils {
     public static long bitXor(Long a, Long b) {
         return a ^ b;
     }
+
+    public static Integer parseInt(Object o) {
+        if (o == null) {
+            return null;
+        }
+
+        if (o instanceof String) {
+            return Integer.parseInt(String.valueOf(o));
+        } else if (o instanceof Long) {
+            long value = (Long) o;
+            return (int) value;
+        } else if (o instanceof Integer) {
+            return (Integer) o;
+        } else {
+            throw new RuntimeException("Invalid value " + o.getClass().getName() + " " + o);
+        }
+    }
+
+    public static Integer parseInt(Object o, int defaultValue) {
+        if (o == null) {
+            return defaultValue;
+        }
+
+        if (o instanceof String) {
+            return Integer.parseInt(String.valueOf(o));
+        } else if (o instanceof Long) {
+            long value = (Long) o;
+            return (int) value;
+        } else if (o instanceof Integer) {
+            return (Integer) o;
+        } else {
+            return defaultValue;
+        }
+    }
+
+    public static List<String> getRepeat(List<String> list) {
+        List<String> rtn = new ArrayList<String>();
+        Set<String> idSet = new HashSet<String>();
+
+        for (String id : list) {
+            if (idSet.contains(id)) {
+                rtn.add(id);
+            } else {
+                idSet.add(id);
+            }
+        }
+
+        return rtn;
+    }
 }
+
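
Quick illustrations of the new helpers, with expected results inferred from
the code above:

    import java.util.Arrays;
    import org.apache.storm.utils.Utils;

    public class ParseIntSketch {
        public static void main(String[] args) {
            System.out.println(Utils.parseInt("42"));             // 42
            System.out.println(Utils.parseInt(42L));              // 42 (a Long is narrowed to int)
            System.out.println(Utils.parseInt(null));             // null
            System.out.println(Utils.parseInt(null, 7));          // 7 (default-value overload)
            System.out.println(Utils.parseInt(new Object(), 7));  // 7 (falls back instead of throwing)
            // Utils.parseInt(new Object()) with no default would throw RuntimeException.

            // getRepeat returns each element's second and later occurrences:
            System.out.println(Utils.getRepeat(Arrays.asList("a", "b", "a", "a"))); // [a, a]
        }
    }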

http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
index 697bdae..3814429 100644
--- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
@@ -24,9 +24,9 @@
   (:import [org.apache.storm.cluster StormClusterStateImpl])
   (:use [org.apache.storm.internal clojure])
   (:use [org.apache.storm testing config util])
-  (:use [org.apache.storm.daemon common])
   (:import [org.apache.storm Thrift])
-  (:import [org.apache.storm.utils Utils]))
+  (:import [org.apache.storm.utils Utils]) 
+  (:import [org.apache.storm.daemon StormCommon]))
 
 (deftest test-basic-topology
   (doseq [zmq-on? [true false]]
@@ -582,7 +582,7 @@
                                               }
                                              (:topology tracked))
             _ (advance-cluster-time cluster 11)
-            storm-id (get-storm-id state "test-errors")
+            storm-id (StormCommon/getStormId state "test-errors")
             errors-count (fn [] (count (.errors state storm-id "2")))]
 
         (is (nil? (clojurify-error (.lastError state storm-id "2"))))

http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj b/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
index 6a3d3ca..7fffd34 100644
--- a/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
+++ b/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
@@ -1,4 +1,3 @@
-
 ;; Licensed to the Apache Software Foundation (ASF) under one
 ;; or more contributor license agreements.  See the NOTICE file
 ;; distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 3670fd1..b63ac1f 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -25,7 +25,7 @@
            [org.apache.storm Thrift])
   (:import [org.apache.storm.testing.staticmocking MockedZookeeper])
   (:import [org.apache.storm.scheduler INimbus])
-  (:import [org.mockito Mockito])
+  (:import [org.mockito Mockito Matchers])
   (:import [org.mockito.exceptions.base MockitoAssertionError])
   (:import [org.apache.storm.nimbus ILeaderElector NimbusInfo])
   (:import [org.apache.storm.testing.staticmocking MockedCluster])
@@ -36,14 +36,14 @@
   (:import [java.util HashMap])
   (:import [java.io File])
   (:import [org.apache.storm.utils Time Utils Utils$UptimeComputer ConfigUtils IPredicate]
-           [org.apache.storm.utils.staticmocking ConfigUtilsInstaller UtilsInstaller])
+           [org.apache.storm.utils.staticmocking ConfigUtilsInstaller UtilsInstaller CommonInstaller])
   (:import [org.apache.storm.zookeeper Zookeeper])
-  (:import [org.apache.commons.io FileUtils]
-           [org.json.simple JSONValue])
+  (:import [org.apache.commons.io FileUtils])
+  (:import [org.json.simple JSONValue])
+  (:import [org.apache.storm.daemon StormCommon])
   (:import [org.apache.storm.cluster StormClusterStateImpl ClusterStateContext ClusterUtils])
   (:use [org.apache.storm testing MockAutoCred util config log converter])
-  (:use [org.apache.storm.daemon common])
-  (:require [conjure.core])
+  (:require [conjure.core] [org.apache.storm.daemon.worker :as worker])
 
   (:use [conjure core]))
 
@@ -55,23 +55,23 @@
          nil))
 
 (defn storm-component->task-info [cluster storm-name]
-  (let [storm-id (get-storm-id (:storm-cluster-state cluster) storm-name)
+  (let [storm-id (StormCommon/getStormId (:storm-cluster-state cluster) storm-name)
         nimbus (:nimbus cluster)]
     (-> (.getUserTopology nimbus storm-id)
-        (storm-task-info (from-json (.getTopologyConf nimbus storm-id)))
+        (#(StormCommon/stormTaskInfo % (from-json (.getTopologyConf nimbus storm-id))))
         (Utils/reverseMap)
         clojurify-structure)))
 
 (defn getCredentials [cluster storm-name]
-  (let [storm-id (get-storm-id (:storm-cluster-state cluster) storm-name)]
+  (let [storm-id (StormCommon/getStormId (:storm-cluster-state cluster) storm-name)]
     (clojurify-crdentials (.credentials (:storm-cluster-state cluster) storm-id nil))))
 
 (defn storm-component->executor-info [cluster storm-name]
-  (let [storm-id (get-storm-id (:storm-cluster-state cluster) storm-name)
+  (let [storm-id (StormCommon/getStormId (:storm-cluster-state cluster) storm-name)
         nimbus (:nimbus cluster)
         storm-conf (from-json (.getTopologyConf nimbus storm-id))
         topology (.getUserTopology nimbus storm-id)
-        task->component (storm-task-info topology storm-conf)
+        task->component (clojurify-structure (StormCommon/stormTaskInfo topology storm-conf))
         state (:storm-cluster-state cluster)
         get-component (comp task->component first)]
     (->> (clojurify-assignment (.assignmentInfo state storm-id nil))
@@ -83,13 +83,13 @@
          clojurify-structure)))
 
 (defn storm-num-workers [state storm-name]
-  (let [storm-id (get-storm-id state storm-name)
+  (let [storm-id (StormCommon/getStormId state storm-name)
         assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
     (count (clojurify-structure (Utils/reverseMap (:executor->node+port assignment))))
     ))
 
 (defn topology-nodes [state storm-name]
-  (let [storm-id (get-storm-id state storm-name)
+  (let [storm-id (StormCommon/getStormId state storm-name)
         assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
     (->> assignment
          :executor->node+port
@@ -99,7 +99,7 @@
          )))
 
 (defn topology-slots [state storm-name]
-  (let [storm-id (get-storm-id state storm-name)
+  (let [storm-id (StormCommon/getStormId state storm-name)
         assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
     (->> assignment
          :executor->node+port
@@ -110,7 +110,7 @@
 ;TODO: when translating this function, don't call map-val, but instead use an inline for loop.
 ; map-val is a temporary kluge for clojure.
 (defn topology-node-distribution [state storm-name]
-  (let [storm-id (get-storm-id state storm-name)
+  (let [storm-id (StormCommon/getStormId state storm-name)
         assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
     (->> assignment
          :executor->node+port
@@ -154,7 +154,8 @@
 (defn task-ids [cluster storm-id]
   (let [nimbus (:nimbus cluster)]
     (-> (.getUserTopology nimbus storm-id)
-        (storm-task-info (from-json (.getTopologyConf nimbus storm-id)))
+        (#(StormCommon/stormTaskInfo % (from-json (.getTopologyConf nimbus storm-id))))
+        clojurify-structure
         keys)))
 
 (defn topology-executors [cluster storm-id]
@@ -174,14 +175,17 @@
     (= (count combined) (count (set combined)))
     ))
 
+(defn executor->tasks [executor-id]
+  (clojurify-structure (StormCommon/executorIdToTasks executor-id)))
+
 (defnk check-consistency [cluster storm-name :assigned? true]
   (let [state (:storm-cluster-state cluster)
-        storm-id (get-storm-id state storm-name)
+        storm-id (StormCommon/getStormId state storm-name)
         task-ids (task-ids cluster storm-id)
         assignment (clojurify-assignment (.assignmentInfo state storm-id nil))
         executor->node+port (:executor->node+port assignment)
-        task->node+port (to-task->node+port executor->node+port)
-        assigned-task-ids (mapcat executor-id->tasks (keys executor->node+port))
+        task->node+port (worker/task->node_port executor->node+port)
+        assigned-task-ids (mapcat executor->tasks (keys executor->node+port))
         all-nodes (set (map first (vals executor->node+port)))]
     (when assigned?
       (is (= (sort task-ids) (sort assigned-task-ids)))
@@ -446,7 +450,7 @@
           _ (advance-cluster-time cluster 11)
           task-info (storm-component->task-info cluster "mystorm")
           executor-info (->> (storm-component->executor-info cluster "mystorm")
-                             (map-val #(map executor-id->tasks %)))]
+                             (map-val #(map executor->tasks %)))]
       (check-consistency cluster "mystorm")
       (is (= 5 (count (task-info "1"))))
       (check-distribution (executor-info "1") [2 2 1])
@@ -506,7 +510,7 @@
                          {}))
         (bind state (:storm-cluster-state cluster))
         (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20, LOGS-USERS ["alice", (System/getProperty "user.name")]} topology)
-        (bind storm-id (get-storm-id state "test"))
+        (bind storm-id (StormCommon/getStormId state "test"))
         (advance-cluster-time cluster 5)
         (is (not-nil? (clojurify-storm-base (.stormBase state storm-id nil))))
         (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id nil))))
@@ -517,7 +521,7 @@
         (advance-cluster-time cluster 35)
         ;; kill topology read on group
         (submit-local-topology (:nimbus cluster) "killgrouptest" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20, LOGS-GROUPS ["alice-group"]} topology)
-        (bind storm-id-killgroup (get-storm-id state "killgrouptest"))
+        (bind storm-id-killgroup (StormCommon/getStormId state "killgrouptest"))
         (advance-cluster-time cluster 5)
         (is (not-nil? (clojurify-storm-base (.stormBase state storm-id-killgroup nil))))
         (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id-killgroup nil))))
@@ -528,7 +532,7 @@
         (advance-cluster-time cluster 35)
         ;; kill topology can't read
         (submit-local-topology (:nimbus cluster) "killnoreadtest" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20} topology)
-        (bind storm-id-killnoread (get-storm-id state "killnoreadtest"))
+        (bind storm-id-killnoread (StormCommon/getStormId state "killnoreadtest"))
         (advance-cluster-time cluster 5)
         (is (not-nil? (clojurify-storm-base (.stormBase state storm-id-killnoread nil))))
         (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id-killnoread nil))))
@@ -541,19 +545,19 @@
         ;; active topology can read
         (submit-local-topology (:nimbus cluster) "2test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10, LOGS-USERS ["alice", (System/getProperty "user.name")]} topology)
         (advance-cluster-time cluster 11)
-        (bind storm-id2 (get-storm-id state "2test"))
+        (bind storm-id2 (StormCommon/getStormId state "2test"))
         (is (not-nil? (clojurify-storm-base (.stormBase state storm-id2 nil))))
         (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id2 nil))))
         ;; active topology can not read
         (submit-local-topology (:nimbus cluster) "testnoread" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10, LOGS-USERS ["alice"]} topology)
         (advance-cluster-time cluster 11)
-        (bind storm-id3 (get-storm-id state "testnoread"))
+        (bind storm-id3 (StormCommon/getStormId state "testnoread"))
         (is (not-nil? (clojurify-storm-base (.stormBase state storm-id3 nil))))
         (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id3 nil))))
         ;; active topology can read based on group
         (submit-local-topology (:nimbus cluster) "testreadgroup" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10, LOGS-GROUPS ["alice-group"]} topology)
         (advance-cluster-time cluster 11)
-        (bind storm-id4 (get-storm-id state "testreadgroup"))
+        (bind storm-id4 (StormCommon/getStormId state "testreadgroup"))
         (is (not-nil? (clojurify-storm-base (.stormBase state storm-id4 nil))))
         (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id4 nil))))
         ;; at this point have 1 running, 1 killed topo
@@ -602,7 +606,7 @@
                        {}))
       (bind state (:storm-cluster-state cluster))
       (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20} topology)
-      (bind storm-id (get-storm-id state "test"))
+      (bind storm-id (StormCommon/getStormId state "test"))
       (advance-cluster-time cluster 15)
       (is (not-nil? (clojurify-storm-base (.stormBase state storm-id nil))))
       (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id nil))))
@@ -627,7 +631,7 @@
       (advance-cluster-time cluster 11)
       (is (thrown? AlreadyAliveException (submit-local-topology (:nimbus cluster) "2test" {} topology)))
       (advance-cluster-time cluster 11)
-      (bind storm-id (get-storm-id state "2test"))
+      (bind storm-id (StormCommon/getStormId state "2test"))
       (is (not-nil? (clojurify-storm-base (.stormBase state storm-id nil))))
       (.killTopology (:nimbus cluster) "2test")
       (is (thrown? AlreadyAliveException (submit-local-topology (:nimbus cluster) "2test" {} topology)))
@@ -641,7 +645,7 @@
       (is (= 0 (count (.heartbeatStorms state))))
 
       (submit-local-topology (:nimbus cluster) "test3" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 5} topology)
-      (bind storm-id3 (get-storm-id state "test3"))
+      (bind storm-id3 (StormCommon/getStormId state "test3"))
       (advance-cluster-time cluster 11)
       (.removeStorm state storm-id3)
       (is (nil? (clojurify-storm-base (.stormBase state storm-id3 nil))))
@@ -655,7 +659,7 @@
       (wait-until-cluster-waiting cluster)
 
       (submit-local-topology (:nimbus cluster) "test3" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 5} topology)
-      (bind storm-id3 (get-storm-id state "test3"))
+      (bind storm-id3 (StormCommon/getStormId state "test3"))
 
       (advance-cluster-time cluster 11)
       (bind executor-id (first (topology-executors cluster storm-id3)))
@@ -672,7 +676,7 @@
       (submit-local-topology (:nimbus cluster) "test4" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 100} topology)
       (advance-cluster-time cluster 11)
       (.killTopologyWithOpts (:nimbus cluster) "test4" (doto (KillOptions.) (.set_wait_secs 10)))
-      (bind storm-id4 (get-storm-id state "test4"))
+      (bind storm-id4 (StormCommon/getStormId state "test4"))
       (advance-cluster-time cluster 9)
       (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id4 nil))))
       (advance-cluster-time cluster 2)
@@ -698,7 +702,7 @@
       (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 2} topology)
       (advance-cluster-time cluster 11)
       (check-consistency cluster "test")
-      (bind storm-id (get-storm-id state "test"))
+      (bind storm-id (StormCommon/getStormId state "test"))
       (bind [executor-id1 executor-id2]  (topology-executors cluster storm-id))
       (bind ass1 (executor-assignment cluster storm-id executor-id1))
       (bind ass2 (executor-assignment cluster storm-id executor-id2))
@@ -819,7 +823,7 @@
       (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 2} topology)
       (advance-cluster-time cluster 11)
       (check-consistency cluster "test")
-      (bind storm-id (get-storm-id state "test"))
+      (bind storm-id (StormCommon/getStormId state "test"))
       (bind [executor-id1 executor-id2]  (topology-executors cluster storm-id))
       (bind ass1 (executor-assignment cluster storm-id executor-id1))
       (bind ass2 (executor-assignment cluster storm-id executor-id2))
@@ -874,7 +878,7 @@
       (bind state (:storm-cluster-state cluster))
       (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 4} topology)  ; distribution should be 2, 2, 2, 3 ideally
       (advance-cluster-time cluster 11)
-      (bind storm-id (get-storm-id state "test"))
+      (bind storm-id (StormCommon/getStormId state "test"))
       (bind slot-executors (slot-assignments cluster storm-id))
       (check-executor-distribution slot-executors [9])
       (check-consistency cluster "test")
@@ -927,7 +931,7 @@
                              {TOPOLOGY-WORKERS 3
                               TOPOLOGY-MESSAGE-TIMEOUT-SECS 60} topology)
       (advance-cluster-time cluster 11)
-      (bind storm-id (get-storm-id state "test"))
+      (bind storm-id (StormCommon/getStormId state "test"))
       (add-supervisor cluster :ports 3)
       (add-supervisor cluster :ports 3)
 
@@ -975,7 +979,7 @@
                              {TOPOLOGY-WORKERS 3
                               TOPOLOGY-MESSAGE-TIMEOUT-SECS 30} topology)
       (advance-cluster-time cluster 11)
-      (bind storm-id (get-storm-id state "test"))
+      (bind storm-id (StormCommon/getStormId state "test"))
       (bind checker (fn [distribution]
                       (check-executor-distribution
                         (slot-assignments cluster storm-id)
@@ -1010,7 +1014,7 @@
       (check-consistency cluster "test")
 
       (bind executor-info (->> (storm-component->executor-info cluster "test")
-                               (map-val #(map executor-id->tasks %))))
+                               (map-val #(map executor->tasks %))))
       (check-distribution (executor-info "1") [2 2 2 2 1 1 1 1])
 
       )))
@@ -1157,8 +1161,8 @@
                          {}))
          (submit-local-topology nimbus "t1" {} topology)
          (submit-local-topology nimbus "t2" {} topology)
-         (bind storm-id1 (get-storm-id cluster-state "t1"))
-         (bind storm-id2 (get-storm-id cluster-state "t2"))
+         (bind storm-id1 (StormCommon/getStormId cluster-state "t1"))
+         (bind storm-id2 (StormCommon/getStormId cluster-state "t2"))
          (.shutdown nimbus)
          (let [blob-store (Utils/getNimbusBlobStore conf nil)]
            (nimbus/blob-rm-topology-keys storm-id1 blob-store cluster-state)
@@ -1346,20 +1350,25 @@
                       [1 2 3] expected-name expected-conf expected-operation))))))
 
         (testing "getTopology calls check-authorization! with the correct parameters."
-          (let [expected-operation "getTopology"]
-            (stubbing [nimbus/check-authorization! nil
+          (let [expected-operation "getTopology"
+                common-spy (->>
+                             (proxy [StormCommon] []
+                                    (systemTopologyImpl [conf topology] nil))
+                           Mockito/spy)]
+            (with-open [- (CommonInstaller. common-spy)]
+              (stubbing [nimbus/check-authorization! nil
                        nimbus/try-read-storm-conf expected-conf
-                       nimbus/try-read-storm-topology nil
-                       system-topology! nil]
-              (try
-                (.getTopology nimbus "fake-id")
-                (catch NotAliveException e)
-                (finally
-                  (verify-first-call-args-for-indices
-                    nimbus/check-authorization!
-                      [1 2 3] expected-name expected-conf expected-operation)
-                  (verify-first-call-args-for-indices
-                    system-topology! [0] expected-conf))))))
+                       nimbus/try-read-storm-topology nil]
+                (try
+                  (.getTopology nimbus "fake-id")
+                  (catch NotAliveException e)
+                  (finally
+                    (verify-first-call-args-for-indices
+                      nimbus/check-authorization!
+                        [1 2 3] expected-name expected-conf expected-operation)
+                    (. (Mockito/verify common-spy)
+                      (systemTopologyImpl (Matchers/eq expected-conf)
+                                          (Matchers/any)))))))))
 
         (testing "getUserTopology calls check-authorization with the correct parameters."
           (let [expected-operation "getUserTopology"]
@@ -1478,14 +1487,16 @@
                        (newInstanceImpl [_])
                        (makeUptimeComputer [] (proxy [Utils$UptimeComputer] []
                                                 (upTime [] 0))))
-          cluster-utils (Mockito/mock ClusterUtils)]
+          cluster-utils (Mockito/mock ClusterUtils)
+          fake-common (proxy [StormCommon] []
+                        (mkAuthorizationHandler [_] nil))]
       (with-open [_ (ConfigUtilsInstaller. fake-cu)
                   _ (UtilsInstaller. fake-utils)
+                  - (CommonInstaller. fake-common)
                   zk-le (MockedZookeeper. (proxy [Zookeeper] []
                           (zkLeaderElectorImpl [conf] nil)))
                   mocked-cluster (MockedCluster. cluster-utils)]
-        (stubbing [mk-authorization-handler nil
-                 nimbus/file-cache-map nil
+        (stubbing [nimbus/file-cache-map nil
                  nimbus/mk-blob-cache-map nil
                  nimbus/mk-bloblist-cache-map nil
                  nimbus/mk-scheduler nil]
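
For reference, the spy-and-install pattern used above, sketched in Java
(hypothetical test code; the Mockito 1.x Matchers class this test imports and
an expectedConf value are assumed):

    // Placed in package org.apache.storm.daemon so the protected impl method
    // is visible to the test.
    package org.apache.storm.daemon;

    import java.util.Map;
    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.utils.staticmocking.CommonInstaller;
    import org.mockito.Matchers;
    import org.mockito.Mockito;

    public class SystemTopologySpySketch {
        // A named subclass so Mockito can generate a spy subclass of it.
        static class StubCommon extends StormCommon {
            @Override
            protected StormTopology systemTopologyImpl(Map stormConf, StormTopology topology) {
                return null; // stubbed: topology augmentation is not under test
            }
        }

        static void verifySystemTopologyCalled(Map expectedConf) throws Exception {
            StormCommon commonSpy = Mockito.spy(new StubCommon());
            try (CommonInstaller installer = new CommonInstaller(commonSpy)) {
                // ... exercise code that ends up calling StormCommon.systemTopology(...) ...
                Mockito.verify(commonSpy)
                       .systemTopologyImpl(Matchers.eq(expectedConf), Matchers.any());
            }
        }
    }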

http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
index 27f5816..54441c3 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
@@ -32,6 +32,7 @@
   (:import [org.apache.storm.security.auth.authorizer SimpleWhitelistAuthorizer SimpleACLAuthorizer])
   (:import [org.apache.storm.security.auth AuthUtils ThriftServer ThriftClient ShellBasedGroupsMapping
             ReqContext SimpleTransportPlugin KerberosPrincipalToLocal ThriftConnectionType])
+  (:import [org.apache.storm.daemon StormCommon])
   (:use [org.apache.storm util config])
   (:use [org.apache.storm.daemon common])
   (:use [org.apache.storm testing])
@@ -58,7 +59,7 @@
   (let [forced-scheduler (.getForcedScheduler inimbus)]
     {:conf storm-conf
      :inimbus inimbus
-     :authorization-handler (mk-authorization-handler (storm-conf NIMBUS-AUTHORIZER) storm-conf)
+     :authorization-handler (StormCommon/mkAuthorizationHandler (storm-conf NIMBUS-AUTHORIZER) storm-conf)
      :submitted-count (atom 0)
      :storm-cluster-state nil
      :submit-lock (Object.)

http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
index cdd66e4..a74fc6e 100644
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@ -34,6 +34,7 @@
   (:import [org.apache.storm.cluster StormClusterStateImpl ClusterStateContext ClusterUtils]
            [org.apache.storm.utils.staticmocking ConfigUtilsInstaller UtilsInstaller])
   (:import [java.nio.file.attribute FileAttribute])
+  (:import [org.apache.storm.daemon StormCommon])
   (:use [org.apache.storm config testing util log converter])
   (:use [org.apache.storm.daemon common])
   (:require [org.apache.storm.daemon [worker :as worker] [supervisor :as supervisor]])
@@ -134,7 +135,7 @@
                       (advance-cluster-time cluster 2)
                       (heartbeat-workers cluster "sup1" [1 2 3])
                       (advance-cluster-time cluster 10)))
-      (bind storm-id (get-storm-id (:storm-cluster-state cluster) "test"))
+      (bind storm-id (StormCommon/getStormId (:storm-cluster-state cluster) "test"))
       (is (empty? (:shutdown changed)))
       (validate-launched-once (:launched changed) {"sup1" [1 2 3]} storm-id)
       (bind changed (capture-changed-workers
@@ -194,7 +195,7 @@
                       (heartbeat-workers cluster "sup1" [1 2])
                       (heartbeat-workers cluster "sup2" [1])
                       ))
-      (bind storm-id (get-storm-id (:storm-cluster-state cluster) "test"))
+      (bind storm-id (StormCommon/getStormId (:storm-cluster-state cluster) "test"))
       (is (empty? (:shutdown changed)))
       (validate-launched-once (:launched changed) {"sup1" [1 2] "sup2" [1]} storm-id)
       (bind changed (capture-changed-workers
@@ -219,7 +220,7 @@
                       (heartbeat-workers cluster "sup1" [3])
                       (heartbeat-workers cluster "sup2" [2])
                       ))
-      (bind storm-id2 (get-storm-id (:storm-cluster-state cluster) "test2"))
+      (bind storm-id2 (StormCommon/getStormId (:storm-cluster-state cluster) "test2"))
       (is (empty? (:shutdown changed)))
       (validate-launched-once (:launched changed) {"sup1" [3] "sup2" [2]} storm-id2)
       (bind changed (capture-changed-workers
@@ -831,8 +832,8 @@
                         ))
         (validate-launched-once (:launched changed)
           {"sup1" [1 2]}
-          (get-storm-id (:storm-cluster-state cluster) "topology1"))
+          (StormCommon/getStormId (:storm-cluster-state cluster) "topology1"))
         (validate-launched-once (:launched changed)
           {"sup1" [3 4]}
-          (get-storm-id (:storm-cluster-state cluster) "topology2"))
+          (StormCommon/getStormId (:storm-cluster-state cluster) "topology2"))
         )))

http://git-wip-us.apache.org/repos/asf/storm/blob/c7241a67/storm-core/test/jvm/org/apache/storm/utils/staticmocking/CommonInstaller.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/utils/staticmocking/CommonInstaller.java b/storm-core/test/jvm/org/apache/storm/utils/staticmocking/CommonInstaller.java
new file mode 100644
index 0000000..8794cd0
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/utils/staticmocking/CommonInstaller.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.utils.staticmocking;
+
+import org.apache.storm.daemon.StormCommon;
+
+public class CommonInstaller implements AutoCloseable {
+
+    private StormCommon _oldInstance;
+    private StormCommon _curInstance;
+
+    public CommonInstaller(StormCommon instance) {
+        _oldInstance = StormCommon.setInstance(instance);
+        _curInstance = instance;
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (StormCommon.setInstance(_oldInstance) != _curInstance) {
+            throw new IllegalStateException(
+                    "Instances of this resource must be closed in reverse order of opening.");
+        }
+    }
+}
\ No newline at end of file

