storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [1/3] storm git commit: STORM-2272: don't leak simulated time
Date Fri, 06 Jan 2017 16:58:52 GMT
Repository: storm
Updated Branches:
  refs/heads/master e95fd9408 -> b41e9a161


STORM-2272: don't leak simulated time


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4e0da8f8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4e0da8f8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4e0da8f8

Branch: refs/heads/master
Commit: 4e0da8f8d4cc0e9b593fdefe6003e24366ac60d0
Parents: 6b92d23
Author: Robert (Bobby) Evans <evans@yahoo-inc.com>
Authored: Wed Jan 4 14:40:09 2017 -0600
Committer: Robert (Bobby) Evans <evans@yahoo-inc.com>
Committed: Wed Jan 4 14:40:09 2017 -0600

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/LocalCluster.java  | 156 ++++++++++---------
 .../src/jvm/org/apache/storm/utils/Time.java    |   4 +
 2 files changed, 87 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4e0da8f8/storm-core/src/jvm/org/apache/storm/LocalCluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/LocalCluster.java b/storm-core/src/jvm/org/apache/storm/LocalCluster.java
index 3b18ac9..9b31209 100644
--- a/storm-core/src/jvm/org/apache/storm/LocalCluster.java
+++ b/storm-core/src/jvm/org/apache/storm/LocalCluster.java
@@ -356,83 +356,91 @@ public class LocalCluster implements ILocalCluster {
         } else {
             time = null;
         }
-        this.trackId = builder.trackId;
-        if (trackId != null) {
-            ConcurrentHashMap<String, AtomicInteger> metrics = new ConcurrentHashMap<>();
-            metrics.put("spout-emitted", new AtomicInteger(0));
-            metrics.put("transferred", new AtomicInteger(0));
-            metrics.put("processed", new AtomicInteger(0));
-            this.commonInstaller = new StormCommonInstaller(new TrackedStormCommon(this.trackId));
-            LOG.warn("Adding tracked metrics for ID {}", this.trackId);
-            RegisteredGlobalState.setState(this.trackId, metrics);
-            LocalExecutor.setTrackId(this.trackId);
-        } else {
-            this.commonInstaller = null;
-        }
-        
-        this.tmpDirs = new ArrayList<>();
-        this.supervisors = new ArrayList<>();
-        TmpPath nimbusTmp = new TmpPath();
-        this.tmpDirs.add(nimbusTmp);
-        Map<String, Object> conf = ConfigUtils.readStormConfig();
-        conf.put(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS, true);
-        conf.put(Config.ZMQ_LINGER_MILLIS, 0);
-        conf.put(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, false);
-        conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 50);
-        conf.put(Config.STORM_CLUSTER_MODE, "local");
-        conf.put(Config.BLOBSTORE_SUPERUSER, System.getProperty("user.name"));
-        conf.put(Config.BLOBSTORE_DIR, nimbusTmp.getPath());
-        
-        InProcessZookeeper zookeeper = null;
-        if (!builder.daemonConf.containsKey(Config.STORM_ZOOKEEPER_SERVERS)) {
-            zookeeper = new InProcessZookeeper();
-            conf.put(Config.STORM_ZOOKEEPER_PORT, zookeeper.getPort());
-            conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
-        }
-        this.zookeeper = zookeeper;
-        conf.putAll(builder.daemonConf);
-        this.daemonConf = new HashMap<>(conf);
-        
-        this.portCounter = new AtomicInteger(builder.supervisorSlotPortMin);
-        ClusterStateContext cs = new ClusterStateContext();
-        this.state = ClusterUtils.mkStateStorage(this.daemonConf, null, null, cs);
-        if (builder.clusterState == null) {
-            clusterState = ClusterUtils.mkStormClusterState(this.daemonConf, null, cs);
-        } else {
-            this.clusterState = builder.clusterState;
-        }
-        //Set it for nimbus only
-        conf.put(Config.STORM_LOCAL_DIR, nimbusTmp.getPath());
-        Nimbus nimbus = new Nimbus(conf, builder.inimbus == null ? new StandaloneINimbus() : builder.inimbus, 
+        boolean success = false;
+        try {
+            this.trackId = builder.trackId;
+            if (trackId != null) {
+                ConcurrentHashMap<String, AtomicInteger> metrics = new ConcurrentHashMap<>();
+                metrics.put("spout-emitted", new AtomicInteger(0));
+                metrics.put("transferred", new AtomicInteger(0));
+                metrics.put("processed", new AtomicInteger(0));
+                this.commonInstaller = new StormCommonInstaller(new TrackedStormCommon(this.trackId));
+                LOG.warn("Adding tracked metrics for ID {}", this.trackId);
+                RegisteredGlobalState.setState(this.trackId, metrics);
+                LocalExecutor.setTrackId(this.trackId);
+            } else {
+                this.commonInstaller = null;
+            }
+        
+            this.tmpDirs = new ArrayList<>();
+            this.supervisors = new ArrayList<>();
+            TmpPath nimbusTmp = new TmpPath();
+            this.tmpDirs.add(nimbusTmp);
+            Map<String, Object> conf = ConfigUtils.readStormConfig();
+            conf.put(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS, true);
+            conf.put(Config.ZMQ_LINGER_MILLIS, 0);
+            conf.put(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, false);
+            conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 50);
+            conf.put(Config.STORM_CLUSTER_MODE, "local");
+            conf.put(Config.BLOBSTORE_SUPERUSER, System.getProperty("user.name"));
+            conf.put(Config.BLOBSTORE_DIR, nimbusTmp.getPath());
+        
+            InProcessZookeeper zookeeper = null;
+            if (!builder.daemonConf.containsKey(Config.STORM_ZOOKEEPER_SERVERS)) {
+                zookeeper = new InProcessZookeeper();
+                conf.put(Config.STORM_ZOOKEEPER_PORT, zookeeper.getPort());
+                conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
+            }
+            this.zookeeper = zookeeper;
+            conf.putAll(builder.daemonConf);
+            this.daemonConf = new HashMap<>(conf);
+        
+            this.portCounter = new AtomicInteger(builder.supervisorSlotPortMin);
+            ClusterStateContext cs = new ClusterStateContext();
+            this.state = ClusterUtils.mkStateStorage(this.daemonConf, null, null, cs);
+            if (builder.clusterState == null) {
+                clusterState = ClusterUtils.mkStormClusterState(this.daemonConf, null, cs);
+            } else {
+                this.clusterState = builder.clusterState;
+            }
+            //Set it for nimbus only
+            conf.put(Config.STORM_LOCAL_DIR, nimbusTmp.getPath());
+            Nimbus nimbus = new Nimbus(conf, builder.inimbus == null ? new StandaloneINimbus() : builder.inimbus, 
                 this.getClusterState(), null, builder.store, builder.leaderElector, builder.groupMapper);
-        if (builder.nimbusWrapper != null) {
-            nimbus = builder.nimbusWrapper.apply(nimbus);
-        }
-        this.nimbus = nimbus;
-        this.nimbus.launchServer();
-        IContext context = null;
-        if (!Utils.getBoolean(this.daemonConf.get(Config.STORM_LOCAL_MODE_ZMQ), false)) {
-            context = new Context();
-            context.prepare(this.daemonConf);
-        }
-        this.sharedContext = context;
-        this.thriftServer = builder.nimbusDaemon ? startNimbusDaemon(this.daemonConf, this.nimbus) : null;
+            if (builder.nimbusWrapper != null) {
+                nimbus = builder.nimbusWrapper.apply(nimbus);
+            }
+            this.nimbus = nimbus;
+            this.nimbus.launchServer();
+            IContext context = null;
+            if (!Utils.getBoolean(this.daemonConf.get(Config.STORM_LOCAL_MODE_ZMQ), false)) {
+                context = new Context();
+                context.prepare(this.daemonConf);
+            }
+            this.sharedContext = context;
+            this.thriftServer = builder.nimbusDaemon ? startNimbusDaemon(this.daemonConf, this.nimbus) : null;
         
-        for (int i = 0; i < builder.supervisors; i++) {
-            addSupervisor(builder.portsPerSupervisor, null, null);
-        }
+            for (int i = 0; i < builder.supervisors; i++) {
+                addSupervisor(builder.portsPerSupervisor, null, null);
+            }
         
-        //Wait for a leader to be elected (or topology submission can be rejected)
-        try {
-            long timeoutAfter = System.currentTimeMillis() + 10_000;
-            while (!hasLeader()) {
-                if (timeoutAfter > System.currentTimeMillis()) {
-                    throw new IllegalStateException("Timed out waiting for nimbus to become the leader");
+            //Wait for a leader to be elected (or topology submission can be rejected)
+            try {
+                long timeoutAfter = System.currentTimeMillis() + 10_000;
+                while (!hasLeader()) {
+                    if (timeoutAfter > System.currentTimeMillis()) {
+                        throw new IllegalStateException("Timed out waiting for nimbus to become the leader");
+                    }
+                    Thread.sleep(1);
                 }
-                Thread.sleep(1);
+            } catch (Exception e) {
+                //Ignore any exceptions we might be doing a test for authentication 
+            }
+            success = true;
+        } finally {
+            if (!success) {
+                close();
             }
-        } catch (Exception e) {
-            //Ignore any exceptions we might be doing a test for authentication 
         }
     }
     
@@ -605,7 +613,9 @@ public class LocalCluster implements ILocalCluster {
 
     @Override
     public synchronized void close() throws Exception {
-        nimbus.shutdown();
+        if (nimbus != null) {
+            nimbus.shutdown();
+        }
         if (thriftServer != null) {
             LOG.info("shutting down thrift server");
             try {

http://git-wip-us.apache.org/repos/asf/storm/blob/4e0da8f8/storm-core/src/jvm/org/apache/storm/utils/Time.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Time.java b/storm-core/src/jvm/org/apache/storm/utils/Time.java
index b6c48c1..c5c6b6a 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Time.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Time.java
@@ -47,6 +47,7 @@ public class Time {
                 if (ms != null) {
                     Time.autoAdvanceOnSleep.set(ms.longValue());
                 }
+                LOG.warn("AutoCloseable Simulated Time Starting...");
             }
         }
         
@@ -56,6 +57,7 @@ public class Time {
                 Time.simulating.set(false);    
                 Time.autoAdvanceOnSleep.set(0);
                 Time.threadSleepTimes = null;
+                LOG.warn("AutoCloseable Simulated Time Ending...");
             }
         }
     }
@@ -66,6 +68,7 @@ public class Time {
             Time.simulating.set(true);
             Time.simulatedCurrTimeMs = new AtomicLong(0);
             Time.threadSleepTimes = new ConcurrentHashMap<>();
+            LOG.warn("Simulated Time Starting...");
         }
     }
     
@@ -75,6 +78,7 @@ public class Time {
             Time.simulating.set(false);    
             Time.autoAdvanceOnSleep.set(0);
             Time.threadSleepTimes = null;
+            LOG.warn("Simulated Time Ending...");
         }
     }
     


Mime
View raw message