storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From etha...@apache.org
Subject [1/2] storm git commit: STORM-3099: Extend metrics on supervisor, workers and DRPC
Date Wed, 08 Aug 2018 15:11:01 GMT
Repository: storm
Updated Branches:
  refs/heads/master 73382d9ec -> c9efe3be9


STORM-3099: Extend metrics on supervisor, workers and DRPC

STORM-3157: Added Timer registration method

STORM-3130: Added object wrapper with timer in both decorator and inheritance patterns.

STORM-3099: Refactored code and add blob download metrics

STORM-3099: Add duration metrics for responses to Http Requests and refactored code.

STORM-3099: Refactored and commented code for supervisor components.

STORM-3099: Add internal exceptions count for supervisor

STORM-3099: Add exceptions count for worker cleanup, kill and force kill


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/422f2255
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/422f2255
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/422f2255

Branch: refs/heads/master
Commit: 422f2255732c7c01e905327cbc81fc3827b045fe
Parents: 73382d9
Author: Zhengdai Hu <hu.zhengdai@gmail.com>
Authored: Fri Jul 20 13:41:51 2018 -0500
Committer: Zhengdai Hu <zhengdai.hu@oath.com>
Committed: Wed Aug 8 10:03:19 2018 -0500

----------------------------------------------------------------------
 .../supervisor/ClientSupervisorUtils.java       | 13 +++-
 .../jvm/org/apache/storm/utils/ShellUtils.java  | 14 +++-
 .../daemon/nimbus/TimedWritableByteChannel.java | 51 ++++++++++++++
 .../storm/daemon/supervisor/Container.java      | 55 ++++++++++++---
 .../apache/storm/daemon/supervisor/Slot.java    | 42 +++++++++--
 .../storm/daemon/supervisor/Supervisor.java     |  4 ++
 .../supervisor/TimerDecoratedAssignment.java    | 34 +++++++++
 .../apache/storm/localizer/AsyncLocalizer.java  | 74 +++++++++++++-------
 .../storm/localizer/LocallyCachedBlob.java      | 16 +++++
 .../storm/localizer/PortAndAssignment.java      | 70 +++++-------------
 .../storm/localizer/PortAndAssignmentImpl.java  | 73 +++++++++++++++++++
 .../storm/localizer/TimePortAndAssignment.java  | 51 ++++++++++++++
 .../storm/metric/StormMetricsRegistry.java      | 10 +++
 .../org/apache/storm/metric/timed/Timed.java    | 36 ++++++++++
 .../storm/metric/timed/TimedResource.java       | 33 +++++++++
 .../storm/metric/timed/TimerDecorated.java      | 38 ++++++++++
 .../storm/localizer/AsyncLocalizerTest.java     | 22 +++---
 .../LocalizedResourceRetentionSetTest.java      |  6 +-
 .../apache/storm/daemon/drpc/DRPCServer.java    |  5 +-
 .../storm/daemon/drpc/webapp/DRPCResource.java  | 19 ++---
 20 files changed, 541 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/422f2255/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
index bf6cadb..c8192d3 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.storm.daemon.supervisor;
 
+import com.codahale.metrics.Meter;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -28,11 +29,15 @@ import org.apache.storm.Config;
 import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ShellUtils;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ClientSupervisorUtils {
+    //Worker launched through external commands, hence we count their exceptions toward shell exceptions
+    public static final Meter numWorkerLaunchExceptions = ShellUtils.numShellExceptions;
+
     private static final Logger LOG = LoggerFactory.getLogger(ClientSupervisorUtils.class);
 
     static boolean doRequiredTopoFilesExist(Map<String, Object> conf, String stormId) throws IOException {
@@ -125,7 +130,13 @@ public class ClientSupervisorUtils {
         if (environment != null) {
             procEnv.putAll(environment);
         }
-        final Process process = builder.start();
+        final Process process;
+        try {
+            process = builder.start();
+        } catch (IOException e) {
+            numWorkerLaunchExceptions.mark();
+            throw e;
+        }
         if (logPrefix != null || exitCodeCallback != null) {
             Utils.asyncLoop(new Callable<Long>() {
                 public Long call() {

http://git-wip-us.apache.org/repos/asf/storm/blob/422f2255/storm-client/src/jvm/org/apache/storm/utils/ShellUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/ShellUtils.java b/storm-client/src/jvm/org/apache/storm/utils/ShellUtils.java
index 2216e1a..f4df473 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/ShellUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/ShellUtils.java
@@ -12,6 +12,7 @@
 
 package org.apache.storm.utils;
 
+import com.codahale.metrics.Meter;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
@@ -36,8 +37,12 @@ abstract public class ShellUtils {
     public static final boolean FREEBSD = (osType == OSType.OS_TYPE_FREEBSD);
     public static final boolean LINUX = (osType == OSType.OS_TYPE_LINUX);
     public static final boolean OTHER = (osType == OSType.OS_TYPE_OTHER);
+
+    //Meter declared here can be registered by any daemon, and is currently used by Supervisor
+    public static final Meter numShellExceptions = new Meter();
+
     /**
-     * Token separator regex used to parse Shell tool outputs
+     * Token separator regex used to parse Shell tool outputs.
      */
     public static final String TOKEN_SEPARATOR_REGEX
         = WINDOWS ? "[|\n\r]" : "[ \t\n\r\f]";
@@ -186,7 +191,12 @@ abstract public class ShellUtils {
             return;
         }
         exitCode = 0; // reset for next run
-        runCommand();
+        try {
+            runCommand();
+        } catch (IOException e) {
+            numShellExceptions.mark();
+            throw e;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/422f2255/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TimedWritableByteChannel.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TimedWritableByteChannel.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TimedWritableByteChannel.java
new file mode 100644
index 0000000..e179477
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TimedWritableByteChannel.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
+ * See the NOTICE file distributed with this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.storm.daemon.nimbus;
+
+import com.codahale.metrics.Timer;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.storm.metric.timed.TimedResource;
+
+public class TimedWritableByteChannel extends TimedResource<WritableByteChannel> implements WritableByteChannel {
+
+    public TimedWritableByteChannel(WritableByteChannel measured, Timer timer) {
+        super(measured, timer);
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+        return getMeasured().write(src);
+    }
+
+    @Override
+    public boolean isOpen() {
+        return getMeasured().isOpen();
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            super.close();
+        } catch (Exception e) {
+            //WritableByteChannel is a Channel which implements Closeable.
+            // Hence although declared AutoCloseable super#close here should only throws IOException
+            //We rethrow to conform the signature
+            throw (IOException) e;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/422f2255/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
index 452237f..907c717 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
@@ -18,6 +18,8 @@
 
 package org.apache.storm.daemon.supervisor;
 
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
@@ -68,6 +70,13 @@ public abstract class Container implements Killable {
     private static final ConcurrentHashMap<Integer, TopoAndMemory> _reservedMemory =
         new ConcurrentHashMap<>();
 
+    private static final Meter numCleanupExceptions = StormMetricsRegistry.registerMeter("supervisor:num-cleanup-exceptions");
+    private static final Meter numKillExceptions = StormMetricsRegistry.registerMeter("supervisor:num-kill-exceptions");
+    private static final Meter numForceKillExceptions = StormMetricsRegistry.registerMeter("supervisor:num-force-kill-exceptions");
+    private static final Meter numForceKill = StormMetricsRegistry.registerMeter("supervisor:num-workers-force-kill");
+    private static final Timer shutdownDuration = StormMetricsRegistry.registerTimer("supervisor:worker-shutdown-duration-ns");
+    private static final Timer cleanupDuration = StormMetricsRegistry.registerTimer("supervisor:worker-per-call-clean-up-duration-ns");
+
     static {
         StormMetricsRegistry.registerGauge(
             "supervisor:current-used-memory-mb",
@@ -106,6 +115,8 @@ public abstract class Container implements Killable {
     protected String _workerId;
     protected ContainerType _type;
     private long lastMetricProcessTime = 0L;
+    private Timer.Context shutdownTimer = null;
+
     /**
      * Create a new Container.
      *
@@ -204,20 +215,34 @@ public abstract class Container implements Killable {
     @Override
     public void kill() throws IOException {
         LOG.info("Killing {}:{}", _supervisorId, _workerId);
-        Set<Long> pids = getAllPids();
+        if (shutdownTimer == null) {
+            shutdownTimer = shutdownDuration.time();
+        }
+        try {
+            Set<Long> pids = getAllPids();
 
-        for (Long pid : pids) {
-            kill(pid);
+            for (Long pid : pids) {
+                kill(pid);
+            }
+        } catch (IOException e) {
+            numKillExceptions.mark();
+            throw e;
         }
     }
 
     @Override
     public void forceKill() throws IOException {
         LOG.info("Force Killing {}:{}", _supervisorId, _workerId);
-        Set<Long> pids = getAllPids();
+        numForceKill.mark();
+        try {
+            Set<Long> pids = getAllPids();
 
-        for (Long pid : pids) {
-            forceKill(pid);
+            for (Long pid : pids) {
+                forceKill(pid);
+            }
+        } catch (IOException e) {
+            numForceKillExceptions.mark();
+            throw e;
         }
     }
 
@@ -318,14 +343,26 @@ public abstract class Container implements Killable {
                 break;
             }
         }
+
+        if (allDead && shutdownTimer != null) {
+            shutdownTimer.stop();
+            shutdownTimer = null;
+        }
+
         return allDead;
     }
 
     @Override
     public void cleanUp() throws IOException {
-        _usedMemory.remove(_port);
-        _reservedMemory.remove(_port);
-        cleanUpForRestart();
+        try (Timer.Context t = cleanupDuration.time()) {
+            _usedMemory.remove(_port);
+            _reservedMemory.remove(_port);
+            cleanUpForRestart();
+        } catch (IOException e) {
+            //This may or may not be reported depending on when process exits
+            numCleanupExceptions.mark();
+            throw e;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/422f2255/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index 22c00bf..4e87c77 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -13,6 +13,7 @@
 package org.apache.storm.daemon.supervisor;
 
 import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -70,8 +71,9 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
 
     private static final Map<KillReason, Meter> numWorkersKilledFor = EnumUtil.toEnumMap(KillReason.class,
         killReason -> StormMetricsRegistry.registerMeter("supervisor:num-workers-killed-" + killReason.toString()));
-    private static final Meter numForceKill =
-        StormMetricsRegistry.registerMeter("supervisor:num-workers-force-kill");
+    private static final Timer workerLaunchDuration = StormMetricsRegistry.registerTimer(
+        "supervisor:worker-launch-duration");
+
     private static final long ONE_SEC_IN_NANO = TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS);
     private final AtomicReference<LocalAssignment> newAssignment = new AtomicReference<>();
     private final AtomicReference<Set<TopoProfileAction>> profiling = new AtomicReference<>(new HashSet<>());
@@ -143,7 +145,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
         }
 
         setNewAssignment(newAssignment);
-        this.dynamicState = new DynamicState(currentAssignment, container, newAssignment);
+        //if the current assignment is already running, new assignment will never be promoted to currAssignment,
+        // because Timer is not being compared in #equals or #equivalent, meaning newAssignment always equals to currAssignment.
+        // Therefore the timer in newAssignment won't be invoked
+        this.dynamicState = new DynamicState(currentAssignment, container, this.newAssignment.get());
         if (MachineState.RUNNING == dynamicState.state) {
             //We are running so we should recover the blobs.
             staticState.localizer.recoverRunningTopology(currentAssignment, port, this);
@@ -556,7 +561,6 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
         }
 
         LOG.warn("SLOT {} force kill and wait...", staticState.port);
-        numForceKill.mark();
         dynamicState.container.forceKill();
         Time.sleep(staticState.killSleepMs);
         return dynamicState;
@@ -588,7 +592,6 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
         if ((Time.currentTimeMillis() - dynamicState.startTime) > 120_000) {
             throw new RuntimeException("Not all processes in " + dynamicState.container + " exited after 120 seconds");
         }
-        numForceKill.mark();
         dynamicState.container.forceKill();
         Time.sleep(staticState.killSleepMs);
         return dynamicState;
@@ -808,7 +811,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
      * @param newAssignment the new assignment for this slot to run, null to run nothing
      */
     public void setNewAssignment(LocalAssignment newAssignment) {
-        this.newAssignment.set(newAssignment);
+        this.newAssignment.set(newAssignment == null ? null : new TimerDecoratedAssignment(newAssignment, workerLaunchDuration));
     }
 
     @Override
@@ -967,6 +970,11 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
         KILL_BLOB_UPDATE,
         WAITING_FOR_BLOB_LOCALIZATION,
         WAITING_FOR_BLOB_UPDATE;
+
+        @Override
+        public String toString() {
+            return EnumUtil.toMetricName(this);
+        }
     }
 
     static class StaticState {
@@ -1008,6 +1016,13 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
     }
 
     static class DynamicState {
+        private static final Map<MachineState, Meter> transitionIntoState = EnumUtil.toEnumMap(MachineState.class,
+            machineState -> StormMetricsRegistry.registerMeter("supervisor:num-worker-transitions-into-" + machineState.toString()));
+        //This also tracks how many times worker transitioning out of a state
+        private static final Map<MachineState, Timer> timeSpentInState = EnumUtil.toEnumMap(MachineState.class,
+            machineState -> StormMetricsRegistry.registerTimer("supervisor:time-worker-spent-in-state-" + machineState.toString() + "-ms")
+        );
+
         public final MachineState state;
 
         /**
@@ -1064,6 +1079,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
             } else {
                 state = MachineState.RUNNING;
             }
+            transitionIntoState.get(state).mark();
 
             this.startTime = Time.currentTimeMillis();
             this.newAssignment = newAssignment;
@@ -1147,8 +1163,20 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
          */
         public DynamicState withState(final MachineState state) {
             long newStartTime = Time.currentTimeMillis();
+            //We may (though unlikely) lose metering here if state transition is too frequent (less than a millisecond)
+            timeSpentInState.get(this.state).update(newStartTime - startTime, TimeUnit.MILLISECONDS);
+            transitionIntoState.get(state).mark();
+
+            LocalAssignment assignment = this.currentAssignment;
+            if (MachineState.RUNNING != this.state && MachineState.RUNNING == state
+                && this.currentAssignment instanceof TimerDecoratedAssignment) {
+                ((TimerDecoratedAssignment) assignment).stopTiming();
+                //Timer is discarded after the initial launch of an assignment
+                assignment = new LocalAssignment(this.currentAssignment);
+            }
+
             return new DynamicState(state, this.newAssignment,
-                                    this.container, this.currentAssignment,
+                                    this.container, assignment,
                                     this.pendingLocalization, newStartTime,
                                     this.pendingDownload, this.profileActions,
                                     this.pendingStopProfileActions, this.changingBlobs,

http://git-wip-us.apache.org/repos/asf/storm/blob/422f2255/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index 8081290..a2cf66f 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -69,6 +69,7 @@ import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ServerConfigUtils;
+import org.apache.storm.utils.ShellUtils;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.VersionInfo;
@@ -312,6 +313,9 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
             Utils.addShutdownHookWithForceKillIn1Sec(this::close);
 
             StormMetricsRegistry.registerGauge("supervisor:num-slots-used-gauge", () -> SupervisorUtils.supervisorWorkerIds(conf).size());
+            //This will only get updated once
+            StormMetricsRegistry.registerMeter("supervisor:num-launched").mark();
+            StormMetricsRegistry.registerMeter("supervisor:num-shell-exceptions", ShellUtils.numShellExceptions);
             StormMetricsRegistry.startMetricsReporters(conf);
 
             // blocking call under the hood, must invoke after launch cause some services must be initialized

http://git-wip-us.apache.org/repos/asf/storm/blob/422f2255/storm-server/src/main/java/org/apache/storm/daemon/supervisor/TimerDecoratedAssignment.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/TimerDecoratedAssignment.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/TimerDecoratedAssignment.java
new file mode 100644
index 0000000..ad2e6bb
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/TimerDecoratedAssignment.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
+ * See the NOTICE file distributed with this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor;
+
+import com.codahale.metrics.Timer;
+
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.metric.timed.TimerDecorated;
+
+public class TimerDecoratedAssignment extends LocalAssignment implements TimerDecorated {
+    private final Timer.Context timing;
+
+    public TimerDecoratedAssignment(LocalAssignment other, Timer timer) {
+        super(other);
+        timing = timer.time();
+    }
+
+    @Override
+    public long stopTiming() {
+        return stopTiming(timing);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/422f2255/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 9c2596d..9082e9c 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -18,6 +18,8 @@
 
 package org.apache.storm.localizer;
 
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.DirectoryStream;
@@ -49,6 +51,7 @@ import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.storm.thrift.transport.TTransportException;
@@ -70,6 +73,14 @@ public class AsyncLocalizer implements AutoCloseable {
     private static final CompletableFuture<Void> ALL_DONE_FUTURE = new CompletableFuture<>();
     private static final int ATTEMPTS_INTERVAL_TIME = 100;
 
+    private static final Timer singleBlobLocalizationDuration = StormMetricsRegistry.registerTimer(
+        "supervisor:single-blob-localization-duration");
+    private static final Timer blobCacheUpdateDuration = StormMetricsRegistry.registerTimer("supervisor:blob-cache-update-duration");
+    private static final Timer blobLocalizationDuration = StormMetricsRegistry.registerTimer("supervisor:blob-localization-duration");
+
+    private static final Meter numBlobUpdateVersionChanged = StormMetricsRegistry.registerMeter(
+            "supervisor:num-blob-update-version-changed");
+
     static {
         ALL_DONE_FUTURE.complete(null);
     }
@@ -193,7 +204,7 @@ public class AsyncLocalizer implements AutoCloseable {
      */
     public CompletableFuture<Void> requestDownloadTopologyBlobs(final LocalAssignment assignment, final int port,
                                                                 final BlobChangingCallback cb) throws IOException {
-        final PortAndAssignment pna = new PortAndAssignment(port, assignment);
+        final PortAndAssignment pna = new TimePortAndAssignment(new PortAndAssignmentImpl(port, assignment), blobLocalizationDuration);
         final String topologyId = pna.getToplogyId();
 
         CompletableFuture<Void> baseBlobs = requestDownloadBaseTopologyBlobs(pna, cb);
@@ -207,6 +218,8 @@ public class AsyncLocalizer implements AutoCloseable {
                                                           addReferencesToBlobs(pna, cb);
                                                       } catch (Exception e) {
                                                           throw new RuntimeException(e);
+                                                      } finally {
+                                                          pna.complete();
                                                       }
                                                   }
                                                   LOG.debug("Reserved blobs {} {}", topologyId, ret);
@@ -215,8 +228,7 @@ public class AsyncLocalizer implements AutoCloseable {
     }
 
     @VisibleForTesting
-    CompletableFuture<Void> requestDownloadBaseTopologyBlobs(PortAndAssignment pna, BlobChangingCallback cb)
-        throws IOException {
+    CompletableFuture<Void> requestDownloadBaseTopologyBlobs(PortAndAssignment pna, BlobChangingCallback cb) {
         final String topologyId = pna.getToplogyId();
 
         final LocallyCachedBlob topoJar = getTopoJar(topologyId);
@@ -251,11 +263,18 @@ public class AsyncLocalizer implements AutoCloseable {
                                 long localVersion = blob.getLocalVersion();
                                 long remoteVersion = blob.getRemoteVersion(blobStore);
                                 if (localVersion != remoteVersion || !blob.isFullyDownloaded()) {
+                                    if (blob.isFullyDownloaded()) {
+                                        //Avoid case of different blob version
+                                        // when blob is not downloaded (first time download)
+                                        numBlobUpdateVersionChanged.mark();
+                                    }
+                                    Timer.Context t = singleBlobLocalizationDuration.time();
                                     try {
                                         long newVersion = blob.fetchUnzipToTemp(blobStore);
                                         blob.informAllOfChangeAndWaitForConsensus();
                                         blob.commitNewVersion(newVersion);
                                         blob.informAllChangeComplete();
+                                        t.stop();
                                     } finally {
                                         blob.cleanupOrphanedData();
                                     }
@@ -285,29 +304,31 @@ public class AsyncLocalizer implements AutoCloseable {
      */
     @VisibleForTesting
     void updateBlobs() {
-        List<CompletableFuture<?>> futures = new ArrayList<>();
-        futures.add(downloadOrUpdate(topologyBlobs.values()));
-        if (symlinksDisabled) {
-            LOG.warn("symlinks are disabled so blobs cannot be downloaded.");
-        } else {
-            for (ConcurrentMap<String, LocalizedResource> map : userArchives.values()) {
-                futures.add(downloadOrUpdate(map.values()));
-            }
+        try (Timer.Context t = blobCacheUpdateDuration.time()) {
+            List<CompletableFuture<?>> futures = new ArrayList<>();
+            futures.add(downloadOrUpdate(topologyBlobs.values()));
+            if (symlinksDisabled) {
+                LOG.warn("symlinks are disabled so blobs cannot be downloaded.");
+            } else {
+                for (ConcurrentMap<String, LocalizedResource> map : userArchives.values()) {
+                    futures.add(downloadOrUpdate(map.values()));
+                }
 
-            for (ConcurrentMap<String, LocalizedResource> map : userFiles.values()) {
-                futures.add(downloadOrUpdate(map.values()));
+                for (ConcurrentMap<String, LocalizedResource> map : userFiles.values()) {
+                    futures.add(downloadOrUpdate(map.values()));
+                }
             }
-        }
-        for (CompletableFuture<?> f : futures) {
-            try {
-                f.get();
-            } catch (Exception e) {
-                if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
-                    LOG.error("Network error while updating blobs, will retry again later", e);
-                } else if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
-                    LOG.error("Nimbus unavailable to update blobs, will retry again later", e);
-                } else {
-                    LOG.error("Could not update blob, will retry again later", e);
+            for (CompletableFuture<?> f : futures) {
+                try {
+                    f.get();
+                } catch (Exception e) {
+                    if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
+                        LOG.error("Network error while updating blobs, will retry again later", e);
+                    } else if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
+                        LOG.error("Nimbus unavailable to update blobs, will retry again later", e);
+                    } else {
+                        LOG.error("Could not update blob, will retry again later", e);
+                    }
                 }
             }
         }
@@ -378,7 +399,7 @@ public class AsyncLocalizer implements AutoCloseable {
      */
     public void recoverRunningTopology(final LocalAssignment currentAssignment, final int port,
                                        final BlobChangingCallback cb) throws IOException {
-        final PortAndAssignment pna = new PortAndAssignment(port, currentAssignment);
+        final PortAndAssignment pna = new PortAndAssignmentImpl(port, currentAssignment);
         final String topologyId = pna.getToplogyId();
 
         LocallyCachedBlob topoJar = getTopoJar(topologyId);
@@ -409,7 +430,7 @@ public class AsyncLocalizer implements AutoCloseable {
      * @throws IOException on any error
      */
     public void releaseSlotFor(LocalAssignment assignment, int port) throws IOException {
-        PortAndAssignment pna = new PortAndAssignment(port, assignment);
+        PortAndAssignment pna = new PortAndAssignmentImpl(port, assignment);
         final String topologyId = assignment.get_topology_id();
         LOG.debug("Releasing slot for {} {}", topologyId, port);
 
@@ -687,6 +708,7 @@ public class AsyncLocalizer implements AutoCloseable {
                     }
                 }
 
+                pna.complete();
                 return null;
             } catch (Exception e) {
                 LOG.warn("Caught Exception While Downloading (rethrowing)... ", e);

http://git-wip-us.apache.org/repos/asf/storm/blob/422f2255/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
index abffd07..952d8d9 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
@@ -12,6 +12,9 @@
 
 package org.apache.storm.localizer;
 
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Timer;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -29,6 +32,7 @@ import org.apache.storm.blobstore.ClientBlobStore;
 import org.apache.storm.blobstore.InputStreamWithMeta;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +52,9 @@ public abstract class LocallyCachedBlob {
     private long lastUsed = Time.currentTimeMillis();
     private CompletableFuture<Void> doneUpdating = null;
 
+    private static final Histogram fetchingRate = StormMetricsRegistry.registerHistogram(
+            "supervisor:blob-fetching-rate-MB/s", new ExponentiallyDecayingReservoir());
+
     /**
      * Create a new LocallyCachedBlob.
      *
@@ -87,18 +94,27 @@ public abstract class LocallyCachedBlob {
             Path downloadPath = pathSupplier.apply(newVersion);
             LOG.debug("Downloading {} to {}", key, downloadPath);
 
+            long duration;
             long totalRead = 0;
             try (OutputStream out = outStreamSupplier.apply(downloadPath.toFile())) {
+                long startTime = Time.nanoTime();
+
                 byte[] buffer = new byte[4096];
                 int read;
                 while ((read = in.read(buffer)) >= 0) {
                     out.write(buffer, 0, read);
                     totalRead += read;
                 }
+
+                duration = Time.nanoTime() - startTime;
             }
+
             long expectedSize = in.getFileLength();
             if (totalRead != expectedSize) {
                 throw new IOException("We expected to download " + expectedSize + " bytes but found we got " + totalRead);
+            } else {
+                double downloadRate = ((double) totalRead * 1e3) / duration;
+                fetchingRate.update(Math.round(downloadRate));
             }
             return new DownloadMeta(downloadPath, newVersion);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/422f2255/storm-server/src/main/java/org/apache/storm/localizer/PortAndAssignment.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/PortAndAssignment.java b/storm-server/src/main/java/org/apache/storm/localizer/PortAndAssignment.java
index 0979e03..df8785f 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/PortAndAssignment.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/PortAndAssignment.java
@@ -1,69 +1,31 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
+ * See the NOTICE file distributed with this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
  */
 
 package org.apache.storm.localizer;
 
 import org.apache.storm.generated.LocalAssignment;
 
-/**
- * A Port and a LocalAssignment used to reference count resources.
- */
-class PortAndAssignment {
-    private final int port;
-    private final LocalAssignment assignment;
-
-    public PortAndAssignment(int port, LocalAssignment assignment) {
-        this.port = port;
-        this.assignment = assignment;
-    }
-
-    @Override
-    public boolean equals(Object other) {
-        if (!(other instanceof PortAndAssignment)) {
-            return false;
-        }
-        PortAndAssignment pna = (PortAndAssignment) other;
-        return pna.port == port && assignment.equals(pna.assignment);
-    }
+public interface PortAndAssignment {
+    String getToplogyId();
 
-    public String getToplogyId() {
-        return assignment.get_topology_id();
-    }
-
-    public String getOwner() {
-        return assignment.get_owner();
-    }
+    String getOwner();
 
-    @Override
-    public int hashCode() {
-        return (17 * port) + assignment.hashCode();
-    }
+    int getPort();
 
-    @Override
-    public String toString() {
-        return "{" + assignment.get_topology_id() + " on " + port + "}";
-    }
+    LocalAssignment getAssignment();
 
-    /**
-     * Return the port associated with this.
-     */
-    public int getPort() {
-        return port;
-    }
+    default void complete() {
 
-    /**
-     * return the assigment for this.
-     */
-    public LocalAssignment getAssignment() {
-        return assignment;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/422f2255/storm-server/src/main/java/org/apache/storm/localizer/PortAndAssignmentImpl.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/PortAndAssignmentImpl.java b/storm-server/src/main/java/org/apache/storm/localizer/PortAndAssignmentImpl.java
new file mode 100644
index 0000000..690f1f4
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/localizer/PortAndAssignmentImpl.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.localizer;
+
+import org.apache.storm.generated.LocalAssignment;
+
+/**
+ * A Port and a LocalAssignment used to reference count resources.
+ */
+class PortAndAssignmentImpl implements PortAndAssignment {
+    private final int port;
+    private final LocalAssignment assignment;
+
+    public PortAndAssignmentImpl(int port, LocalAssignment assignment) {
+        this.port = port;
+        this.assignment = assignment;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (!(other instanceof PortAndAssignmentImpl)) {
+            return false;
+        }
+        PortAndAssignmentImpl pna = (PortAndAssignmentImpl) other;
+        return pna.port == port && assignment.equals(pna.assignment);
+    }
+
+    @Override
+    public String getToplogyId() {
+        return assignment.get_topology_id();
+    }
+
+    @Override
+    public String getOwner() {
+        return assignment.get_owner();
+    }
+
+    @Override
+    public int hashCode() {
+        return (17 * port) + assignment.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "{" + assignment.get_topology_id() + " on " + port + "}";
+    }
+
+    /**
+     * Return the port associated with this.
+     */
+    @Override
+    public int getPort() {
+        return port;
+    }
+
+    /**
+     * return the assigment for this.
+     */
+    @Override
+    public LocalAssignment getAssignment() {
+        return assignment;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/422f2255/storm-server/src/main/java/org/apache/storm/localizer/TimePortAndAssignment.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/TimePortAndAssignment.java b/storm-server/src/main/java/org/apache/storm/localizer/TimePortAndAssignment.java
new file mode 100644
index 0000000..e1fee5c
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/localizer/TimePortAndAssignment.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
+ * See the NOTICE file distributed with this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.storm.localizer;
+
+import com.codahale.metrics.Timer;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.metric.timed.Timed;
+
+public class TimePortAndAssignment extends Timed<PortAndAssignment> implements PortAndAssignment {
+
+    public TimePortAndAssignment(PortAndAssignment measured, Timer timer) {
+        super(measured, timer);
+    }
+
+    @Override
+    public String getToplogyId() {
+        return getMeasured().getToplogyId();
+    }
+
+    @Override
+    public String getOwner() {
+        return getMeasured().getOwner();
+    }
+
+    @Override
+    public int getPort() {
+        return getMeasured().getPort();
+    }
+
+    @Override
+    public LocalAssignment getAssignment() {
+        return getMeasured().getAssignment();
+    }
+
+    @Override
+    public void complete() {
+        stopTiming();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/422f2255/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
index 679c6d3..602f53e 100644
--- a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
+++ b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
@@ -19,6 +19,8 @@ import com.codahale.metrics.Meter;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Reservoir;
+import com.codahale.metrics.Timer;
+
 import java.util.Map;
 
 import org.apache.storm.daemon.metrics.MetricsUtils;
@@ -48,6 +50,14 @@ public class StormMetricsRegistry extends MetricRegistry {
         return REGISTRY.register(name, new Meter());
     }
 
+    public static void registerMeter(String name, Meter meter) {
+        REGISTRY.register(name, meter);
+    }
+
+    public static Timer registerTimer(String name) {
+        return REGISTRY.register(name, new Timer());
+    }
+
     /**
      * Start metrics reporters for the registry singleton.
      *

http://git-wip-us.apache.org/repos/asf/storm/blob/422f2255/storm-server/src/main/java/org/apache/storm/metric/timed/Timed.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metric/timed/Timed.java b/storm-server/src/main/java/org/apache/storm/metric/timed/Timed.java
new file mode 100644
index 0000000..f5e8a96
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metric/timed/Timed.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
+ * See the NOTICE file distributed with this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.storm.metric.timed;
+
+import com.codahale.metrics.Timer;
+
+public class Timed<T> implements TimerDecorated {
+    private final T measured;
+    private final Timer.Context timing;
+
+    public Timed(T measured, Timer timer) {
+        this.measured = measured;
+        timing = timer.time();
+    }
+
+    public T getMeasured() {
+        return measured;
+    }
+
+    @Override
+    public long stopTiming() {
+        return stopTiming(timing);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/422f2255/storm-server/src/main/java/org/apache/storm/metric/timed/TimedResource.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metric/timed/TimedResource.java b/storm-server/src/main/java/org/apache/storm/metric/timed/TimedResource.java
new file mode 100644
index 0000000..ceca758
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metric/timed/TimedResource.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
+ * See the NOTICE file distributed with this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.storm.metric.timed;
+
+import com.codahale.metrics.Timer;
+
+public class TimedResource<T extends AutoCloseable> extends Timed<T> {
+
+    public TimedResource(T measured, Timer timer) {
+        super(measured, timer);
+    }
+
+    @Override
+    public void close() throws Exception {
+        try {
+            super.close();
+        } finally {
+            getMeasured().close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/422f2255/storm-server/src/main/java/org/apache/storm/metric/timed/TimerDecorated.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metric/timed/TimerDecorated.java b/storm-server/src/main/java/org/apache/storm/metric/timed/TimerDecorated.java
new file mode 100644
index 0000000..309519c
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metric/timed/TimerDecorated.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
+ * See the NOTICE file distributed with this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.storm.metric.timed;
+
+import com.codahale.metrics.Timer;
+
+public interface TimerDecorated extends AutoCloseable {
+
+    long stopTiming();
+
+    /**
+     * Stop the timer for measured object. (Copied from {@link Timer.Context#stop()})
+     * Call to this method will not reset the start time.
+     * Multiple calls result in multiple updates.
+     *
+     * @return Time a object is in use, or under measurement, in nanoseconds.
+     */
+    default long stopTiming(final Timer.Context timing) {
+        return timing.stop();
+    }
+
+    @Override
+    default void close() throws Exception {
+        stopTiming();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/422f2255/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index a919dd3..0349e67 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -141,7 +141,7 @@ public class AsyncLocalizerTest {
         try {
             when(mockedRU.newInstanceImpl(ClientBlobStore.class)).thenReturn(blobStore);
 
-            PortAndAssignment pna = new PortAndAssignment(port, la);
+            PortAndAssignment pna = new PortAndAssignmentImpl(port, la);
             Future<Void> f = bl.requestDownloadBaseTopologyBlobs(pna, null);
             f.get(20, TimeUnit.SECONDS);
 
@@ -345,7 +345,7 @@ public class AsyncLocalizerTest {
         arrUser1Keys.add(new LocalResource(archive1, true, false));
         LocalAssignment topo1 = new LocalAssignment("topo1", Collections.emptyList());
         topo1.set_owner(user1);
-        localizer.addReferences(arrUser1Keys, new PortAndAssignment(1, topo1), null);
+        localizer.addReferences(arrUser1Keys, new PortAndAssignmentImpl(1, topo1), null);
 
         ConcurrentMap<String, LocalizedResource> lrsrcFiles = localizer.getUserFiles().get(user1);
         ConcurrentMap<String, LocalizedResource> lrsrcArchives = localizer.getUserArchives().get(user1);
@@ -444,7 +444,7 @@ public class AsyncLocalizerTest {
             assertTrue("failed to create user dir", user1Dir.mkdirs());
             LocalAssignment topo1Assignment = new LocalAssignment(topo1, Collections.emptyList());
             topo1Assignment.set_owner(user1);
-            PortAndAssignment topo1Pna = new PortAndAssignment(1, topo1Assignment);
+            PortAndAssignment topo1Pna = new PortAndAssignmentImpl(1, topo1Assignment);
             LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, true, false), topo1Pna, null);
             Time.advanceTime(10);
             long timeAfter = Time.currentTimeMillis();
@@ -529,7 +529,7 @@ public class AsyncLocalizerTest {
             Time.advanceTime(10);
             LocalAssignment topo1Assignment = new LocalAssignment(topo1, Collections.emptyList());
             topo1Assignment.set_owner(user1);
-            PortAndAssignment topo1Pna = new PortAndAssignment(1, topo1Assignment);
+            PortAndAssignment topo1Pna = new PortAndAssignmentImpl(1, topo1Assignment);
             LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false, false), topo1Pna, null);
             long timeAfter = Time.currentTimeMillis();
             Time.advanceTime(10);
@@ -609,7 +609,7 @@ public class AsyncLocalizerTest {
 
             LocalAssignment topo1Assignment = new LocalAssignment(topo1, Collections.emptyList());
             topo1Assignment.set_owner(user1);
-            PortAndAssignment topo1Pna = new PortAndAssignment(1, topo1Assignment);
+            PortAndAssignment topo1Pna = new PortAndAssignmentImpl(1, topo1Assignment);
             List<LocalizedResource> lrsrcs = localizer.getBlobs(keys, topo1Pna, null);
             LocalizedResource lrsrc = lrsrcs.get(0);
             LocalizedResource lrsrc2 = lrsrcs.get(1);
@@ -707,7 +707,7 @@ public class AsyncLocalizerTest {
 
         LocalAssignment topo1Assignment = new LocalAssignment(topo1, Collections.emptyList());
         topo1Assignment.set_owner(user1);
-        PortAndAssignment topo1Pna = new PortAndAssignment(1, topo1Assignment);
+        PortAndAssignment topo1Pna = new PortAndAssignmentImpl(1, topo1Assignment);
         // This should throw AuthorizationException because auth failed
         localizer.getBlob(new LocalResource(key1, false, false), topo1Pna, null);
     }
@@ -758,17 +758,17 @@ public class AsyncLocalizerTest {
 
         LocalAssignment topo1Assignment = new LocalAssignment(topo1, Collections.emptyList());
         topo1Assignment.set_owner(user1);
-        PortAndAssignment topo1Pna = new PortAndAssignment(1, topo1Assignment);
+        PortAndAssignment topo1Pna = new PortAndAssignmentImpl(1, topo1Assignment);
         LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false, false), topo1Pna, null);
 
         LocalAssignment topo2Assignment = new LocalAssignment(topo2, Collections.emptyList());
         topo2Assignment.set_owner(user2);
-        PortAndAssignment topo2Pna = new PortAndAssignment(2, topo2Assignment);
+        PortAndAssignment topo2Pna = new PortAndAssignmentImpl(2, topo2Assignment);
         LocalizedResource lrsrc2 = localizer.getBlob(new LocalResource(key2, false, false), topo2Pna, null);
 
         LocalAssignment topo3Assignment = new LocalAssignment(topo3, Collections.emptyList());
         topo3Assignment.set_owner(user3);
-        PortAndAssignment topo3Pna = new PortAndAssignment(3, topo3Assignment);
+        PortAndAssignment topo3Pna = new PortAndAssignmentImpl(3, topo3Assignment);
         LocalizedResource lrsrc3 = localizer.getBlob(new LocalResource(key3, false, false), topo3Pna, null);
 
         // make sure we support different user reading same blob
@@ -846,7 +846,7 @@ public class AsyncLocalizerTest {
         assertTrue("failed to create user dir", user1Dir.mkdirs());
         LocalAssignment topo1Assignment = new LocalAssignment(topo1, Collections.emptyList());
         topo1Assignment.set_owner(user1);
-        PortAndAssignment topo1Pna = new PortAndAssignment(1, topo1Assignment);
+        PortAndAssignment topo1Pna = new PortAndAssignmentImpl(1, topo1Assignment);
         LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false, false), topo1Pna, null);
 
         String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
@@ -868,7 +868,7 @@ public class AsyncLocalizerTest {
 
         LocalAssignment topo2Assignment = new LocalAssignment(topo2, Collections.emptyList());
         topo2Assignment.set_owner(user1);
-        PortAndAssignment topo2Pna = new PortAndAssignment(1, topo2Assignment);
+        PortAndAssignment topo2Pna = new PortAndAssignmentImpl(1, topo2Assignment);
         localizer.getBlob(new LocalResource(key1, false, false), topo2Pna, null);
         assertTrue("blob version file not created", versionFile.exists());
         assertEquals("blob version not correct", 2, LocalizedResource.localVersionOfBlob(keyVersionFile));

http://git-wip-us.apache.org/repos/asf/storm/blob/422f2255/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java b/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
index a6e8bfd..f41c3df 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
@@ -36,8 +36,8 @@ public class LocalizedResourceRetentionSetTest {
 
     @Test
     public void testAddResources() throws Exception {
-        PortAndAssignment pna1 = new PortAndAssignment(1, new LocalAssignment("topo1", Collections.emptyList()));
-        PortAndAssignment pna2 = new PortAndAssignment(1, new LocalAssignment("topo2", Collections.emptyList()));
+        PortAndAssignment pna1 = new PortAndAssignmentImpl(1, new LocalAssignment("topo1", Collections.emptyList()));
+        PortAndAssignment pna2 = new PortAndAssignmentImpl(1, new LocalAssignment("topo2", Collections.emptyList()));
         String user = "user";
         Map<String, Object> conf = new HashMap<>();
         IAdvancedFSOps ops = mock(IAdvancedFSOps.class);
@@ -74,7 +74,7 @@ public class LocalizedResourceRetentionSetTest {
     public void testCleanup() throws Exception {
         ClientBlobStore mockBlobstore = mock(ClientBlobStore.class);
         when(mockBlobstore.getBlobMeta(any())).thenReturn(new ReadableBlobMeta(new SettableBlobMeta(), 1));
-        PortAndAssignment pna1 = new PortAndAssignment(1, new LocalAssignment("topo1", Collections.emptyList()));
+        PortAndAssignment pna1 = new PortAndAssignmentImpl(1, new LocalAssignment("topo1", Collections.emptyList()));
         String user = "user";
         Map<String, Object> conf = new HashMap<>();
         IAdvancedFSOps ops = mock(IAdvancedFSOps.class);

http://git-wip-us.apache.org/repos/asf/storm/blob/422f2255/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java
index 559993d..d3edd5e 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java
@@ -145,8 +145,7 @@ public class DRPCServer implements AutoCloseable {
     @VisibleForTesting
     void start() throws Exception {
         LOG.info("Starting Distributed RPC servers...");
-        new Thread(() -> invokeServer.serve()).start();
-        
+        new Thread(invokeServer::serve).start();
         if (httpServer != null) {
             httpServer.start();
         }
@@ -223,7 +222,7 @@ public class DRPCServer implements AutoCloseable {
         Utils.setupDefaultUncaughtExceptionHandler();
         Map<String, Object> conf = Utils.readStormConfig();
         try (DRPCServer server = new DRPCServer(conf)) {
-            Utils.addShutdownHookWithForceKillIn1Sec(() -> server.close());
+            Utils.addShutdownHookWithForceKillIn1Sec(server::close);
             StormMetricsRegistry.startMetricsReporters(conf);
             server.start();
             server.awaitTermination();

http://git-wip-us.apache.org/repos/asf/storm/blob/422f2255/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCResource.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCResource.java b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCResource.java
index 271c99f..54efd19 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCResource.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCResource.java
@@ -19,6 +19,7 @@
 package org.apache.storm.daemon.drpc.webapp;
 
 import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.GET;
@@ -29,11 +30,11 @@ import javax.ws.rs.core.Context;
 
 import org.apache.storm.daemon.drpc.DRPC;
 import org.apache.storm.metric.StormMetricsRegistry;
-import org.apache.storm.thrift.TException;
 
 @Path("/drpc/")
 public class DRPCResource {
     private static final Meter meterHttpRequests = StormMetricsRegistry.registerMeter("drpc:num-execute-http-requests");
+    private static final Timer responseDuration = StormMetricsRegistry.registerTimer("drpc:HTTP-request-response-duration");
     private final DRPC drpc;
 
     public DRPCResource(DRPC drpc) {
@@ -43,24 +44,24 @@ public class DRPCResource {
     //TODO put in some better exception mapping...
     //TODO move populateContext to a filter...
     @POST
-    @Path("/{func}") 
-    public String post(@PathParam("func") String func, String args, @Context HttpServletRequest request) throws TException {
+    @Path("/{func}")
+    public String post(@PathParam("func") String func, String args, @Context HttpServletRequest request) throws Exception {
         meterHttpRequests.mark();
-        return drpc.executeBlocking(func, args);
+        return responseDuration.time(() -> drpc.executeBlocking(func, args));
     }
     
     @GET
     @Path("/{func}/{args}") 
     public String get(@PathParam("func") String func, @PathParam("args") String args,
-                      @Context HttpServletRequest request) throws TException {
+                      @Context HttpServletRequest request) throws Exception {
         meterHttpRequests.mark();
-        return drpc.executeBlocking(func, args);
+        return responseDuration.time(() -> drpc.executeBlocking(func, args));
     }
     
     @GET
-    @Path("/{func}") 
-    public String get(@PathParam("func") String func, @Context HttpServletRequest request) throws TException {
+    @Path("/{func}")
+    public String get(@PathParam("func") String func, @Context HttpServletRequest request) throws Exception {
         meterHttpRequests.mark();
-        return drpc.executeBlocking(func, "");
+        return responseDuration.time(() -> drpc.executeBlocking(func, ""));
     }
 }


Mime
View raw message