storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From etha...@apache.org
Subject [2/3] storm git commit: STORM-3133: Extend metrics on Nimbus and LogViewer:
Date Fri, 10 Aug 2018 17:12:45 GMT
STORM-3133: Extend metrics on Nimbus and LogViewer:

STORM-3157: Added registration method for MetricSet

STORM-3133: Refactored and added metrics to LogViewer components

STORM-3133: Fixed up Unit test for LogViewer

STORM-3133: Refactored and added metrics to Nimbus components.

STORM-3133: Add nimbus scheduling metrics

 STORM-3133: Add metrics for disk usage of workers' logs and performance of LogCleaner routine

STORM-3133: Refactored code and added file partial read count metric for logviewer

STORM-3133: Add metrics for counting LogViewer's IOExceptions


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bf81b684
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bf81b684
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bf81b684

Branch: refs/heads/master
Commit: bf81b6840dba16b506187c53448db8e09a6a9f14
Parents: c9efe3b
Author: Zhengdai Hu <hu.zhengdai@gmail.com>
Authored: Fri Jul 20 13:41:51 2018 -0500
Committer: Zhengdai Hu <zhengdai.hu@oath.com>
Committed: Fri Aug 10 12:01:36 2018 -0500

----------------------------------------------------------------------
 .../org/apache/storm/scheduler/WorkerSlot.java  |   7 +
 .../org/apache/storm/daemon/nimbus/Nimbus.java  | 228 ++++++++++++-------
 .../daemon/supervisor/SupervisorUtils.java      |   9 +-
 .../storm/localizer/LocallyCachedBlob.java      |   3 +-
 .../storm/metric/StormMetricsRegistry.java      |  25 +-
 .../storm/nimbus/LeaderListenerCallback.java    |   7 +
 .../org/apache/storm/scheduler/Cluster.java     |  12 +-
 .../apache/storm/scheduler/ExecutorDetails.java |   9 +-
 .../storm/metric/StormMetricsRegistryTest.java  | 111 +++++++++
 .../storm/daemon/logviewer/LogviewerServer.java |   6 +-
 .../handler/LogviewerLogPageHandler.java        | 102 +++++----
 .../handler/LogviewerLogSearchHandler.java      | 159 ++++++++-----
 .../daemon/logviewer/utils/DeletionMeta.java    |  31 +++
 .../logviewer/utils/DirectoryCleaner.java       |  25 +-
 .../daemon/logviewer/utils/ExceptionMeters.java |  66 ++++++
 .../daemon/logviewer/utils/LogCleaner.java      |  48 ++--
 .../logviewer/utils/LogFileDownloader.java      |   8 +
 .../utils/LogviewerResponseBuilder.java         |  19 +-
 .../daemon/logviewer/utils/WorkerLogs.java      |  16 +-
 .../logviewer/webapp/LogviewerResource.java     |  78 ++++++-
 .../handler/LogviewerLogSearchHandlerTest.java  |   3 +-
 .../daemon/logviewer/utils/LogCleanerTest.java  |   9 +-
 22 files changed, 722 insertions(+), 259 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java b/storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
index 07064db..fa963d2 100644
--- a/storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
+++ b/storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
@@ -12,6 +12,9 @@
 
 package org.apache.storm.scheduler;
 
+import java.util.Arrays;
+import java.util.List;
+
 public class WorkerSlot {
     private final String nodeId;
     private final int port;
@@ -39,6 +42,10 @@ public class WorkerSlot {
         return getNodeId() + ":" + getPort();
     }
 
+    public List<Object> toList() {
+        return Arrays.asList(nodeId, (long) port);
+    }
+
     @Override
     public int hashCode() {
         return nodeId.hashCode() + 13 * ((Integer) port).hashCode();

http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index c401f60..a096217 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -18,9 +18,9 @@
 
 package org.apache.storm.daemon.nimbus;
 
-import com.codahale.metrics.ExponentiallyDecayingReservoir;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -37,6 +37,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -45,6 +46,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -53,6 +55,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import javax.security.auth.Subject;
+
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
 import org.apache.storm.DaemonConfig;
@@ -181,6 +184,8 @@ import org.apache.storm.security.auth.workertoken.WorkerTokenManager;
 import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.shade.com.google.common.base.Strings;
 import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
+import org.apache.storm.shade.com.google.common.collect.MapDifference;
+import org.apache.storm.shade.com.google.common.collect.Maps;
 import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
 import org.apache.storm.shade.org.apache.zookeeper.ZooDefs;
 import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
@@ -251,10 +256,18 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     private static final Meter getTopologyPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getTopologyPageInfo-calls");
     private static final Meter getSupervisorPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getSupervisorPageInfo-calls");
     private static final Meter getComponentPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getComponentPageInfo-calls");
-    private static final Histogram scheduleTopologyTimeMs = StormMetricsRegistry.registerHistogram("nimbus:time-scheduleTopology-ms",
-                                                                                                   new ExponentiallyDecayingReservoir());
     private static final Meter getOwnerResourceSummariesCalls = StormMetricsRegistry.registerMeter(
         "nimbus:num-getOwnerResourceSummaries-calls");
+    //Timer
+    private static final Timer fileUploadDuration = StormMetricsRegistry.registerTimer("nimbus:files-upload-duration-ms");
+    private static final Timer schedulingDuration = StormMetricsRegistry.registerTimer("nimbus:topology-scheduling-duration-ms");
+    //Scheduler histogram
+    private static final Histogram numAddedExecPerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-added-executors-per-scheduling");
+    private static final Histogram numAddedSlotPerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-added-slots-per-scheduling");
+    private static final Histogram numRemovedExecPerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-removed-executors-per-scheduling");
+    private static final Histogram numRemovedSlotPerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-removed-slots-per-scheduling");
+    private static final Histogram numNetExecIncreasePerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-net-executors-increase-per-scheduling");
+    private static final Histogram numNetSlotIncreasePerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-net-slots-increase-per-scheduling");
     // END Metrics
     private static final Meter shutdownCalls = StormMetricsRegistry.registerMeter("nimbus:num-shutdown-calls");
     private static final Meter processWorkerMetricsCalls = StormMetricsRegistry.registerMeter("nimbus:process-worker-metric-calls");
@@ -411,6 +424,10 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     private final StormTimer timer;
     private final IScheduler scheduler;
     private final IScheduler underlyingScheduler;
+    //Metrics related
+    private final AtomicReference<Long> schedulingStartTimeNs = new AtomicReference<>(null);
+    private final AtomicLong longestSchedulingTime = new AtomicLong();
+
     private final ILeaderElector leaderElector;
     private final AssignmentDistributionService assignmentsDistributer;
     private final AtomicReference<Map<String, String>> idToSchedStatus;
@@ -550,6 +567,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             });
     }
 
+    //Not symmetric difference. Performing A.entrySet() - B.entrySet()
     private static <K, V> Map<K, V> mapDiff(Map<? extends K, ? extends V> first, Map<? extends K, ? extends V> second) {
         Map<K, V> ret = new HashMap<>();
         for (Entry<? extends K, ? extends V> entry : second.entrySet()) {
@@ -689,26 +707,20 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
      * @return {topology-id -> {executor [node port]}} mapping
      */
     private static Map<String, Map<List<Long>, List<Object>>> computeTopoToExecToNodePort(
-        Map<String, SchedulerAssignment> schedAssignments) {
+        Map<String, SchedulerAssignment> schedAssignments, List<String> assignedTopologyIds) {
         Map<String, Map<List<Long>, List<Object>>> ret = new HashMap<>();
         for (Entry<String, SchedulerAssignment> schedEntry : schedAssignments.entrySet()) {
             Map<List<Long>, List<Object>> execToNodePort = new HashMap<>();
             for (Entry<ExecutorDetails, WorkerSlot> execAndNodePort : schedEntry.getValue().getExecutorToSlot().entrySet()) {
                 ExecutorDetails exec = execAndNodePort.getKey();
                 WorkerSlot slot = execAndNodePort.getValue();
-
-                List<Long> listExec = new ArrayList<>(2);
-                listExec.add((long) exec.getStartTask());
-                listExec.add((long) exec.getEndTask());
-
-                List<Object> nodePort = new ArrayList<>(2);
-                nodePort.add(slot.getNodeId());
-                nodePort.add((long) slot.getPort());
-
-                execToNodePort.put(listExec, nodePort);
+                execToNodePort.put(exec.toList(), slot.toList());
             }
             ret.put(schedEntry.getKey(), execToNodePort);
         }
+        for (String id : assignedTopologyIds) {
+            ret.putIfAbsent(id, null);
+        }
         return ret;
     }
 
@@ -735,39 +747,95 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         return ret;
     }
 
-    private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(
-        Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
-        Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
-        // Print some useful information
-        if (existingAssignments != null && !existingAssignments.isEmpty()) {
-            for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) {
-                String topoId = entry.getKey();
-                Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
-                Assignment assignment = existingAssignments.get(topoId);
-                if (assignment == null) {
-                    continue;
+    private boolean auditAssignmentChanges(Map<String, Assignment> existingAssignments,
+                                           Map<String, Assignment> newAssignments) {
+        assert existingAssignments != null && newAssignments != null;
+        boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty();
+        long numRemovedExec = 0;
+        long numRemovedSlot = 0;
+        long numAddedExec   = 0;
+        long numAddedSlot   = 0;
+        if (existingAssignments.isEmpty()) {
+            for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
+                final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+                final long count = new HashSet<>(execToPort.values()).size();
+                LOG.info("Assigning {} to {} slots", entry.getKey(), count);
+                LOG.info("Assign executors: {}", execToPort.keySet());
+                numAddedSlot += count;
+                numAddedExec += execToPort.size();
+            }
+        } else if (newAssignments.isEmpty()) {
+            for (Entry<String, Assignment> entry : existingAssignments.entrySet()) {
+                final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+                final long count = new HashSet<>(execToPort.values()).size();
+                LOG.info("Removing {} from {} slots", entry.getKey(), count);
+                LOG.info("Remove executors: {}", execToPort.keySet());
+                numRemovedSlot += count;
+                numRemovedExec += execToPort.size();
+            }
+        } else {
+            MapDifference<String, Assignment> difference = Maps.difference(existingAssignments, newAssignments);
+            if (anyChanged = !difference.areEqual()) {
+                for (Entry<String, Assignment> entry : difference.entriesOnlyOnLeft().entrySet()) {
+                    final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+                    final long count = new HashSet<>(execToPort.values()).size();
+                    LOG.info("Removing {} from {} slots", entry.getKey(), count);
+                    LOG.info("Remove executors: {}", execToPort.keySet());
+                    numRemovedSlot += count;
+                    numRemovedExec += execToPort.size();
                 }
-                Map<List<Long>, NodeInfo> old = assignment.get_executor_node_port();
-                Map<List<Long>, List<Object>> reassigned = new HashMap<>();
-                for (Entry<List<Long>, List<Object>> execAndNodePort : execToNodePort.entrySet()) {
-                    NodeInfo oldAssigned = old.get(execAndNodePort.getKey());
-                    String node = (String) execAndNodePort.getValue().get(0);
-                    Long port = (Long) execAndNodePort.getValue().get(1);
-                    if (oldAssigned == null || !oldAssigned.get_node().equals(node)
-                        || !port.equals(oldAssigned.get_port_iterator().next())) {
-                        reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue());
-                    }
+                for (Entry<String, Assignment> entry : difference.entriesOnlyOnRight().entrySet()) {
+                    final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+                    final long count = new HashSet<>(execToPort.values()).size();
+                    LOG.info("Assigning {} to {} slots", entry.getKey(), count);
+                    LOG.info("Assign executors: {}", execToPort.keySet());
+                    numAddedSlot += count;
+                    numAddedExec += execToPort.size();
                 }
+                for (Entry<String, MapDifference.ValueDifference<Assignment>> entry : difference.entriesDiffering().entrySet()) {
+                    final Map<List<Long>, NodeInfo> execToSlot = entry.getValue().rightValue().get_executor_node_port();
+                    final Set<NodeInfo> slots = new HashSet<>(execToSlot.values());
+                    LOG.info("Reassigning {} to {} slots", entry.getKey(), slots.size());
+                    LOG.info("Reassign executors: {}", execToSlot.keySet());
+
+                    final Map<List<Long>, NodeInfo> oldExecToSlot = entry.getValue().leftValue().get_executor_node_port();
+
+                    long commonExecCount = 0;
+                    Set<NodeInfo> commonSlots = new HashSet<>(execToSlot.size());
+                    for (Entry<List<Long>, NodeInfo> execEntry : execToSlot.entrySet()) {
+                        if (execEntry.getValue().equals(oldExecToSlot.get(execEntry.getKey()))) {
+                            commonExecCount++;
+                            commonSlots.add(execEntry.getValue());
+                        }
+                    }
+                    long commonSlotCount = commonSlots.size();
 
-                if (!reassigned.isEmpty()) {
-                    int count = (new HashSet<>(execToNodePort.values())).size();
-                    Set<List<Long>> reExecs = reassigned.keySet();
-                    LOG.info("Reassigning {} to {} slots", topoId, count);
-                    LOG.info("Reassign executors: {}", reExecs);
+                    //Treat reassign as remove and add
+                    numRemovedSlot += new HashSet<>(oldExecToSlot.values()).size() - commonSlotCount;
+                    numRemovedExec += oldExecToSlot.size() - commonExecCount;
+                    numAddedSlot += slots.size() - commonSlotCount;
+                    numAddedExec += execToSlot.size() - commonExecCount;
                 }
             }
+            LOG.debug("{} assignments unchanged: {}", difference.entriesInCommon().size(), difference.entriesInCommon().keySet());
         }
-        return ret;
+        numAddedExecPerScheduling.update(numAddedExec);
+        numAddedSlotPerScheduling.update(numAddedSlot);
+        numRemovedExecPerScheduling.update(numRemovedExec);
+        numRemovedSlotPerScheduling.update(numRemovedSlot);
+        numNetExecIncreasePerScheduling.update(numAddedExec - numRemovedExec);
+        numNetSlotIncreasePerScheduling.update(numAddedSlot - numRemovedSlot);
+
+        if (anyChanged) {
+            LOG.info("Fragmentation after scheduling is: {} MB, {} PCore CPUs", fragmentedMemory(), fragmentedCpu());
+            nodeIdToResources.get().forEach((id, node) ->
+                LOG.info(
+                    "Node Id: {} Total Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used "
+                        + "CPU: {}, Available CPU: {}, fragmented: {}",
+                    id, node.getTotalMem(), node.getUsedMem(), node.getAvailableMem(),
+                    node.getTotalCpu(), node.getUsedCpu(), node.getAvailableCpu(), isFragmented(node)));
+        }
+        return anyChanged;
     }
 
     private static List<List<Long>> changedExecutors(Map<List<Long>, NodeInfo> map, Map<List<Long>,
@@ -780,7 +848,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             key.add(ni.get_node());
             key.add(ni.get_port_iterator().next());
             List<List<Long>> value = new ArrayList<>(entry.getValue());
-            value.sort((a, b) -> a.get(0).compareTo(b.get(0)));
+            value.sort(Comparator.comparing(a -> a.get(0)));
             slotAssigned.put(key, value);
         }
         HashMap<List<Object>, List<List<Long>>> tmpNewSlotAssigned = newExecToNodePort == null ? new HashMap<>() :
@@ -788,7 +856,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         HashMap<List<Object>, List<List<Long>>> newSlotAssigned = new HashMap<>();
         for (Entry<List<Object>, List<List<Long>>> entry : tmpNewSlotAssigned.entrySet()) {
             List<List<Long>> value = new ArrayList<>(entry.getValue());
-            value.sort((a, b) -> a.get(0).compareTo(b.get(0)));
+            value.sort(Comparator.comparing(a -> a.get(0)));
             newSlotAssigned.put(entry.getKey(), value);
         }
         Map<List<Object>, List<List<Long>>> diff = mapDiff(slotAssigned, newSlotAssigned);
@@ -1217,7 +1285,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             return allNodeHost;
         } else {
             // rebalance
-            Map<String, String> ret = new HashMap();
+            Map<String, String> ret = new HashMap<>();
             for (Map.Entry<List<Long>, NodeInfo> entry : newExecutorNodePort.entrySet()) {
                 NodeInfo newNodeInfo = entry.getValue();
                 NodeInfo oldNodeInfo = oldExecutorNodePort.get(entry.getKey());
@@ -1984,11 +2052,14 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         Cluster cluster = new Cluster(inimbus, supervisors, topoToSchedAssignment, topologies, conf);
         cluster.setStatusMap(idToSchedStatus.get());
 
-        long beforeSchedule = System.currentTimeMillis();
+        schedulingStartTimeNs.set(Time.nanoTime());
         scheduler.schedule(topologies, cluster);
-        long scheduleTimeElapsedMs = System.currentTimeMillis() - beforeSchedule;
-        LOG.debug("Scheduling took {} ms for {} topologies", scheduleTimeElapsedMs, topologies.getTopologies().size());
-        scheduleTopologyTimeMs.update(scheduleTimeElapsedMs);
+        //Get and set the start time before getting current time in order to avoid potential race with the longest-scheduling-time-ms gauge
+        final Long startTime = schedulingStartTimeNs.getAndSet(null);
+        long elapsed = Time.nanoTime() - startTime;
+        longestSchedulingTime.accumulateAndGet(elapsed, Math::max);
+        schedulingDuration.update(elapsed, TimeUnit.NANOSECONDS);
+        LOG.debug("Scheduling took {} ms for {} topologies", elapsed, topologies.getTopologies().size());
 
         //merge with existing statuses
         idToSchedStatus.set(Utils.merge(idToSchedStatus.get(), cluster.getStatusMap()));
@@ -2131,17 +2202,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             }
         }
         // make the new assignments for topologies
-        Map<String, SchedulerAssignment> newSchedulerAssignments = null;
         synchronized (schedLock) {
-            newSchedulerAssignments = computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);
+            Map<String, SchedulerAssignment> newSchedulerAssignments =
+                computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);
 
             Map<String, Map<List<Long>, List<Object>>> topologyToExecutorToNodePort =
-                computeNewTopoToExecToNodePort(newSchedulerAssignments, existingAssignments);
-            for (String id : assignedTopologyIds) {
-                if (!topologyToExecutorToNodePort.containsKey(id)) {
-                    topologyToExecutorToNodePort.put(id, null);
-                }
-            }
+                computeTopoToExecToNodePort(newSchedulerAssignments, assignedTopologyIds);
             Map<String, Map<WorkerSlot, WorkerResources>> newAssignedWorkerToResources =
                 computeTopoToNodePortToResources(newSchedulerAssignments);
             int nowSecs = Time.currentTimeSecs();
@@ -2154,14 +2220,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 if (execToNodePort == null) {
                     execToNodePort = new HashMap<>();
                 }
-                Assignment existingAssignment = existingAssignments.get(topoId);
                 Set<String> allNodes = new HashSet<>();
-                if (execToNodePort != null) {
-                    for (List<Object> nodePort : execToNodePort.values()) {
-                        allNodes.add((String) nodePort.get(0));
-                    }
+                for (List<Object> nodePort : execToNodePort.values()) {
+                    allNodes.add((String) nodePort.get(0));
                 }
                 Map<String, String> allNodeHost = new HashMap<>();
+                Assignment existingAssignment = existingAssignments.get(topoId);
                 if (existingAssignment != null) {
                     allNodeHost.putAll(existingAssignment.get_node_host());
                 }
@@ -2219,15 +2283,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 newAssignments.put(topoId, newAssignment);
             }
 
-            if (!newAssignments.equals(existingAssignments)) {
+            boolean assignmentChanged = auditAssignmentChanges(existingAssignments, newAssignments);
+            if (assignmentChanged) {
                 LOG.debug("RESETTING id->resources and id->worker-resources cache!");
-                LOG.info("Fragmentation after scheduling is: {} MB, {} PCore CPUs", fragmentedMemory(), fragmentedCpu());
-                nodeIdToResources.get().forEach((id, node) ->
-                                                    LOG.info(
-                                                        "Node Id: {} Total Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used "
-                                                        + "CPU: {}, Available CPU: {}, fragmented: {}",
-                                                        id, node.getTotalMem(), node.getUsedMem(), node.getAvailableMem(),
-                                                        node.getTotalCpu(), node.getUsedCpu(), node.getAvailableCpu(), isFragmented(node)));
                 idToResources.set(new HashMap<>());
                 idToWorkerResources.set(new HashMap<>());
             }
@@ -2826,21 +2884,27 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 .parallelStream()
                 .mapToDouble(SupervisorResources::getTotalCpu)
                 .sum());
-
+            StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
+                //We want to update longest scheduling time in real time in case scheduler get stuck
+                // Get current time before startTime to avoid potential race with scheduler's Timer
+                Long currTime = Time.nanoTime();
+                Long startTime = schedulingStartTimeNs.get();
+                return TimeUnit.NANOSECONDS.toMillis(startTime == null ?
+                        longestSchedulingTime.get() : Math.max(currTime - startTime, longestSchedulingTime.get()));
+            });
+            StormMetricsRegistry.registerMeter("nimbus:num-launched").mark();
             StormMetricsRegistry.startMetricsReporters(conf);
 
-            if (clusterConsumerExceutors != null) {
-                timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
-                                        () -> {
-                                            try {
-                                                if (isLeader()) {
-                                                    sendClusterMetricsToExecutors();
-                                                }
-                                            } catch (Exception e) {
-                                                throw new RuntimeException(e);
+            timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
+                                    () -> {
+                                        try {
+                                            if (isLeader()) {
+                                                sendClusterMetricsToExecutors();
                                             }
-                                        });
-            }
+                                        } catch (Exception e) {
+                                            throw new RuntimeException(e);
+                                        }
+                                    });
         } catch (Exception e) {
             if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
                 throw e;
@@ -3689,7 +3753,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             beginFileUploadCalls.mark();
             checkAuthorization(null, null, "fileUpload");
             String fileloc = getInbox() + "/stormjar-" + Utils.uuid() + ".jar";
-            uploaders.put(fileloc, Channels.newChannel(new FileOutputStream(fileloc)));
+            uploaders.put(fileloc, new TimedWritableByteChannel(Channels.newChannel(new FileOutputStream(fileloc)), fileUploadDuration));
             LOG.info("Uploading file from client to {}", fileloc);
             return fileloc;
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index 90d68dc..4619aeb 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -114,20 +114,17 @@ public class SupervisorUtils {
      * @param conf
      * @return
      *
-     * @throws Exception
      */
-    public static Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map<String, Object> conf) throws Exception {
+    public static Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map<String, Object> conf) {
         return _instance.readWorkerHeartbeatsImpl(conf);
     }
 
     /**
-     * get worker heartbeat by workerId
+     * get worker heartbeat by workerId.
      *
      * @param conf
      * @param workerId
      * @return
-     *
-     * @throws IOException
      */
     private static LSWorkerHeartbeat readWorkerHeartbeat(Map<String, Object> conf, String workerId) {
         return _instance.readWorkerHeartbeatImpl(conf, workerId);
@@ -137,7 +134,7 @@ public class SupervisorUtils {
         return _instance.isWorkerHbTimedOutImpl(now, whb, conf);
     }
 
-    public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map<String, Object> conf) throws Exception {
+    public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map<String, Object> conf) {
         Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>();
 
         Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);

http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
index 952d8d9..f12713b 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
@@ -52,8 +52,7 @@ public abstract class LocallyCachedBlob {
     private long lastUsed = Time.currentTimeMillis();
     private CompletableFuture<Void> doneUpdating = null;
 
-    private static final Histogram fetchingRate = StormMetricsRegistry.registerHistogram(
-            "supervisor:blob-fetching-rate-MB/s", new ExponentiallyDecayingReservoir());
+    private static final Histogram fetchingRate = StormMetricsRegistry.registerHistogram("supervisor:blob-fetching-rate-MB/s");
 
     /**
      * Create a new LocallyCachedBlob.

http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
index 602f53e..ea8867e 100644
--- a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
+++ b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
@@ -18,18 +18,21 @@ import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.MetricSet;
 import com.codahale.metrics.Reservoir;
 import com.codahale.metrics.Timer;
 
 import java.util.Map;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.daemon.metrics.MetricsUtils;
 import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class StormMetricsRegistry extends MetricRegistry {
-    private static final StormMetricsRegistry REGISTRY = new StormMetricsRegistry();
+    @VisibleForTesting
+    static final StormMetricsRegistry REGISTRY = new StormMetricsRegistry();
     private static final Logger LOG = LoggerFactory.getLogger(StormMetricsRegistry.class);
 
     private StormMetricsRegistry() {/*Singleton pattern*/}
@@ -54,6 +57,25 @@ public class StormMetricsRegistry extends MetricRegistry {
         REGISTRY.register(name, meter);
     }
 
+    public static void registerMetricSet(MetricSet metrics) {
+        REGISTRY.registerAll(metrics);
+    }
+
+    public static void unregisterMetricSet(MetricSet metrics) {
+        unregisterMetricSet(null, metrics);
+    }
+
+    public static void unregisterMetricSet(String prefix, MetricSet metrics) {
+        for (Map.Entry<String, Metric> entry : metrics.getMetrics().entrySet()) {
+            final String name = name(prefix, entry.getKey());
+            if (entry.getValue() instanceof MetricSet) {
+                unregisterMetricSet(name, (MetricSet) entry.getValue());
+            } else {
+                REGISTRY.remove(name);
+            }
+        }
+    }
+
     public static Timer registerTimer(String name) {
         return REGISTRY.register(name, new Timer());
     }
@@ -84,6 +106,7 @@ public class StormMetricsRegistry extends MetricRegistry {
      */
     @Override
     public <T extends Metric> T register(final String name, T metric) throws IllegalArgumentException {
+        assert !(metric instanceof MetricSet);
         try {
             return super.register(name, metric);
         } catch (IllegalArgumentException e) {

http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
index e54509e..3783fdb 100644
--- a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
@@ -19,6 +19,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import javax.security.auth.Subject;
+
+import com.codahale.metrics.Meter;
 import org.apache.commons.io.IOUtils;
 import org.apache.storm.Config;
 import org.apache.storm.blobstore.BlobStore;
@@ -29,6 +31,7 @@ import org.apache.storm.daemon.nimbus.TopoCache;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.security.auth.ReqContext;
 import org.apache.storm.shade.com.google.common.base.Joiner;
 import org.apache.storm.shade.com.google.common.collect.Sets;
@@ -45,6 +48,8 @@ import org.slf4j.LoggerFactory;
  * A callback function when nimbus gains leadership.
  */
 public class LeaderListenerCallback {
+    private static final Meter numGainedLeader = StormMetricsRegistry.registerMeter("nimbus:num-gained-leadership");
+    private static final Meter numLostLeader = StormMetricsRegistry.registerMeter("nimbus:num-lost-leadership");
     private static final Logger LOG = LoggerFactory.getLogger(LeaderListenerCallback.class);
     private static final String STORM_JAR_SUFFIX = "-stormjar.jar";
     private static final String STORM_CODE_SUFFIX = "-stormcode.ser";
@@ -82,6 +87,7 @@ public class LeaderListenerCallback {
      * Invoke when gains leadership.
      */
     public void leaderCallBack() {
+        numGainedLeader.mark();
         //set up nimbus-info to zk
         setUpNimbusInfo(acls);
         //sync zk assignments/id-info to local
@@ -131,6 +137,7 @@ public class LeaderListenerCallback {
      * Invoke when lost leadership.
      */
     public void notLeaderCallback() {
+        numLostLeader.mark();
         tc.clear();
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 3f48669..d014236 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -145,11 +145,7 @@ public class Cluster implements ISchedulingState {
             String nodeId = entry.getKey();
             SupervisorDetails supervisor = entry.getValue();
             String host = supervisor.getHost();
-            List<String> ids = hostToId.get(host);
-            if (ids == null) {
-                ids = new ArrayList<>();
-                hostToId.put(host, ids);
-            }
+            List<String> ids = hostToId.computeIfAbsent(host, k -> new ArrayList<>());
             ids.add(nodeId);
         }
         this.conf = conf;
@@ -173,11 +169,7 @@ public class Cluster implements ISchedulingState {
             for (Map.Entry<String, String> entry : resolvedSuperVisors.entrySet()) {
                 String hostName = entry.getKey();
                 String rack = entry.getValue();
-                List<String> nodesForRack = this.networkTopography.get(rack);
-                if (nodesForRack == null) {
-                    nodesForRack = new ArrayList<>();
-                    this.networkTopography.put(rack, nodesForRack);
-                }
+                List<String> nodesForRack = this.networkTopography.computeIfAbsent(rack, k -> new ArrayList<>());
                 nodesForRack.add(hostName);
             }
         } else {

http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java
index 855cc96..18de717 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java
@@ -18,6 +18,9 @@
 
 package org.apache.storm.scheduler;
 
+import java.util.Arrays;
+import java.util.List;
+
 public class ExecutorDetails {
     public final int startTask;
     public final int endTask;
@@ -35,9 +38,13 @@ public class ExecutorDetails {
         return endTask;
     }
 
+    public List<Long> toList() {
+        return Arrays.asList((long) startTask, (long) endTask);
+    }
+
     @Override
     public boolean equals(Object other) {
-        if (other == null || !(other instanceof ExecutorDetails)) {
+        if (!(other instanceof ExecutorDetails)) {
             return false;
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-server/src/test/java/org/apache/storm/metric/StormMetricsRegistryTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/metric/StormMetricsRegistryTest.java b/storm-server/src/test/java/org/apache/storm/metric/StormMetricsRegistryTest.java
new file mode 100644
index 0000000..5d9b3e4
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/metric/StormMetricsRegistryTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.metric;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+import com.codahale.metrics.Timer;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.codahale.metrics.MetricRegistry.name;
+import static org.junit.jupiter.api.Assertions.*;
+
+class StormMetricsRegistryTest {
+    private static final Logger LOG = LoggerFactory.getLogger(StormMetricsRegistryTest.class);
+
+    private static final String OUTER_METER = "outerMeter";
+    private static final String INNER_SET = "innerSet";
+    private static final String OUTER_TIMER = "outerTimer";
+    private static final String INNER_METER = "innerMeter";
+    private static final String INNER_TIMER = "innerTimer";
+    private static final MetricSet OUTER = newMetricSetInstance();
+
+    @Test
+    void registerMetricSet() {
+        Meter existingInnerMeter = StormMetricsRegistry.registerMeter(name(INNER_SET, INNER_METER));
+
+        LOG.info("register outer set");
+        StormMetricsRegistry.registerMetricSet(OUTER);
+        assertSame(OUTER.getMetrics().get(OUTER_TIMER), StormMetricsRegistry.REGISTRY.getMetrics().get(OUTER_TIMER));
+        assertSame(OUTER.getMetrics().get(OUTER_METER), StormMetricsRegistry.REGISTRY.getMetrics().get(OUTER_METER));
+        assertSame(((MetricSet) OUTER.getMetrics().get(INNER_SET)).getMetrics().get(INNER_TIMER),
+            StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_TIMER)));
+
+        assertNotSame(((MetricSet) OUTER.getMetrics().get(INNER_SET)).getMetrics().get(INNER_METER),
+            StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_METER)));
+        assertSame(existingInnerMeter, StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_METER)));
+
+        //Ensure idempotency
+        LOG.info("twice register outer set");
+        MetricSet newOuter = newMetricSetInstance();
+        StormMetricsRegistry.registerMetricSet(newOuter);
+        assertSame(OUTER.getMetrics().get(OUTER_TIMER), StormMetricsRegistry.REGISTRY.getMetrics().get(OUTER_TIMER));
+        assertSame(OUTER.getMetrics().get(OUTER_METER), StormMetricsRegistry.REGISTRY.getMetrics().get(OUTER_METER));
+        assertSame(((MetricSet) OUTER.getMetrics().get(INNER_SET)).getMetrics().get(INNER_TIMER),
+            StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_TIMER)));
+        assertSame(existingInnerMeter, StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_METER)));
+
+        LOG.info("name collision");
+        assertThrows(IllegalArgumentException.class, () -> StormMetricsRegistry.registerGauge(name(INNER_SET, INNER_METER), () -> 0));
+    }
+
+    @Test
+    void unregisterMetricSet() {
+        StormMetricsRegistry.registerMetricSet(OUTER);
+        StormMetricsRegistry.unregisterMetricSet(OUTER);
+        assertTrue(StormMetricsRegistry.REGISTRY.getMetrics().isEmpty());
+
+    }
+
+    private static MetricSet newMetricSetInstance() {
+        return new MetricSet() {
+            private final MetricSet inner = new MetricSet() {
+                private final Map<String, Metric> map = new HashMap<>();
+
+                {
+                    map.put(INNER_METER, new Meter());
+                    map.put(INNER_TIMER, new Timer());
+                }
+
+                @Override
+                public Map<String, Metric> getMetrics() {
+                    return map;
+                }
+            };
+            private final Map<String, Metric> outerMap = new HashMap<>();
+
+            {
+                outerMap.put(OUTER_METER, new Meter());
+                outerMap.put(INNER_SET, inner);
+                outerMap.put(OUTER_TIMER, new Timer());
+            }
+
+            @Override
+            public Map<String, Metric> getMetrics() {
+                return outerMap;
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
index 07ac14b..07b971c 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
@@ -21,6 +21,8 @@ package org.apache.storm.daemon.logviewer;
 import static org.apache.storm.DaemonConfig.UI_HEADER_BUFFER_BYTES;
 
 import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
 import com.google.common.annotations.VisibleForTesting;
 
 import java.io.File;
@@ -31,6 +33,7 @@ import java.util.Map;
 
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.daemon.logviewer.utils.DirectoryCleaner;
+import org.apache.storm.daemon.logviewer.utils.ExceptionMeters;
 import org.apache.storm.daemon.logviewer.utils.LogCleaner;
 import org.apache.storm.daemon.logviewer.utils.WorkerLogs;
 import org.apache.storm.daemon.logviewer.webapp.LogviewerApplication;
@@ -126,6 +129,7 @@ public class LogviewerServer implements AutoCloseable {
     void start() throws Exception {
         LOG.info("Starting Logviewer...");
         if (httpServer != null) {
+            StormMetricsRegistry.registerMetricSet(ExceptionMeters::getMetrics);
             httpServer.start();
         }
     }
@@ -165,7 +169,7 @@ public class LogviewerServer implements AutoCloseable {
 
         try (LogviewerServer server = new LogviewerServer(conf);
              LogCleaner logCleaner = new LogCleaner(conf, workerLogs, directoryCleaner, logRootDir)) {
-            Utils.addShutdownHookWithForceKillIn1Sec(() -> server.close());
+            Utils.addShutdownHookWithForceKillIn1Sec(server::close);
             logCleaner.start();
             StormMetricsRegistry.startMetricsReporters(conf);
             server.start();

http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
index 32e79eb..089d965 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
@@ -37,11 +37,13 @@ import static java.util.stream.Collectors.toCollection;
 import static java.util.stream.Collectors.toList;
 import static org.apache.commons.lang.StringEscapeUtils.escapeHtml;
 
+import com.codahale.metrics.Meter;
 import j2html.tags.DomContent;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UncheckedIOException;
@@ -61,6 +63,7 @@ import javax.ws.rs.core.Response;
 import org.apache.commons.lang.StringUtils;
 import org.apache.storm.daemon.logviewer.LogviewerConstant;
 import org.apache.storm.daemon.logviewer.utils.DirectoryCleaner;
+import org.apache.storm.daemon.logviewer.utils.ExceptionMeters;
 import org.apache.storm.daemon.logviewer.utils.LogviewerResponseBuilder;
 import org.apache.storm.daemon.logviewer.utils.ResourceAuthorizer;
 import org.apache.storm.daemon.logviewer.utils.WorkerLogs;
@@ -68,11 +71,13 @@ import org.apache.storm.daemon.ui.InvalidRequestException;
 import org.apache.storm.daemon.ui.UIHelpers;
 import org.apache.storm.daemon.utils.StreamUtil;
 import org.apache.storm.daemon.utils.UrlBuilder;
+import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ServerUtils;
 import org.jooq.lambda.Unchecked;
 
 public class LogviewerLogPageHandler {
+    private static final Meter numPageRead = StormMetricsRegistry.registerMeter("logviewer:num-page-read");
     private final String logRoot;
     private final String daemonLogRoot;
     private final WorkerLogs workerLogs;
@@ -152,7 +157,7 @@ public class LogviewerLogPageHandler {
         List<String> files;
         if (fileResults != null) {
             files = fileResults.stream()
-                    .map(file -> WorkerLogs.getTopologyPortWorkerLog(file))
+                    .map(WorkerLogs::getTopologyPortWorkerLog)
                     .sorted().collect(toList());
         } else {
             files = new ArrayList<>();
@@ -162,11 +167,12 @@ public class LogviewerLogPageHandler {
     }
 
     /**
-     * Provides a worker log file to view.
+     * Provides a worker log file to view, starting from the specified position
+     * or default starting position of the most recent page.
      *
      * @param fileName file to view
-     * @param start start offset, can be null
-     * @param length length to read in this page, can be null
+     * @param start start offset, or null if the most recent page is desired
+     * @param length length to read in this page, or null if default page length is desired
      * @param grep search string if request is a result of the search, can be null
      * @param user username
      * @return HTML view page of worker log
@@ -179,7 +185,6 @@ public class LogviewerLogPageHandler {
 
             File file = new File(rootDir, fileName).getCanonicalFile();
             String path = file.getCanonicalPath();
-            boolean isZipFile = path.endsWith(".gz");
             File topoDir = file.getParentFile().getParentFile();
 
             if (file.exists() && new File(rootDir).getCanonicalFile().equals(topoDir.getParentFile())) {
@@ -193,24 +198,21 @@ public class LogviewerLogPageHandler {
                     throw e.getCause();
                 }
 
-                List<String> filesStrWithoutFileParam = logFiles.stream().map(WorkerLogs::getTopologyPortWorkerLog)
-                        .filter(fileStr -> !StringUtils.equals(fileName, fileStr)).collect(toList());
-
-                List<String> reorderedFilesStr = new ArrayList<>();
-                reorderedFilesStr.addAll(filesStrWithoutFileParam);
+                List<String> reorderedFilesStr = logFiles.stream()
+                        .map(WorkerLogs::getTopologyPortWorkerLog)
+                        .filter(fileStr -> !StringUtils.equals(fileName, fileStr))
+                        .collect(toList());
                 reorderedFilesStr.add(fileName);
 
                 length = length != null ? Math.min(10485760, length) : LogviewerConstant.DEFAULT_BYTES_PER_PAGE;
-
-                String logString;
-                if (isTxtFile(fileName)) {
-                    logString = escapeHtml(start != null ? pageFile(path, start, length) : pageFile(path, length));
-                } else {
-                    logString = escapeHtml("This is a binary file and cannot display! You may download the full file.");
+                final boolean isZipFile = path.endsWith(".gz");
+                long fileLength = getFileLength(file, isZipFile);
+                if (start == null) {
+                    start = Long.valueOf(fileLength - length).intValue();
                 }
 
-                long fileLength = isZipFile ? ServerUtils.zipFileSize(file) : file.length();
-                start = start != null ? start : Long.valueOf(fileLength - length).intValue();
+                String logString = isTxtFile(fileName) ? escapeHtml(pageFile(path, isZipFile, fileLength, start, length)) :
+                    escapeHtml("This is a binary file and cannot display! You may download the full file.");
 
                 List<DomContent> bodyContents = new ArrayList<>();
                 if (StringUtils.isNotEmpty(grep)) {
@@ -254,8 +256,8 @@ public class LogviewerLogPageHandler {
      * Provides a daemon log file to view.
      *
      * @param fileName file to view
-     * @param start start offset, can be null
-     * @param length length to read in this page, can be null
+     * @param start start offset, or null if the most recent page is desired
+     * @param length length to read in this page, or null if default page length is desired
      * @param grep search string if request is a result of the search, can be null
      * @param user username
      * @return HTML view page of daemon log
@@ -265,7 +267,6 @@ public class LogviewerLogPageHandler {
         String rootDir = daemonLogRoot;
         File file = new File(rootDir, fileName).getCanonicalFile();
         String path = file.getCanonicalPath();
-        boolean isZipFile = path.endsWith(".gz");
 
         if (file.exists() && new File(rootDir).getCanonicalFile().equals(file.getParentFile())) {
             // all types of files included
@@ -273,24 +274,21 @@ public class LogviewerLogPageHandler {
                     .filter(File::isFile)
                     .collect(toList());
 
-            List<String> filesStrWithoutFileParam = logFiles.stream()
-                    .map(File::getName).filter(fName -> !StringUtils.equals(fileName, fName)).collect(toList());
-
-            List<String> reorderedFilesStr = new ArrayList<>();
-            reorderedFilesStr.addAll(filesStrWithoutFileParam);
+            List<String> reorderedFilesStr = logFiles.stream()
+                    .map(File::getName)
+                    .filter(fName -> !StringUtils.equals(fileName, fName))
+                    .collect(toList());
             reorderedFilesStr.add(fileName);
 
             length = length != null ? Math.min(10485760, length) : LogviewerConstant.DEFAULT_BYTES_PER_PAGE;
-
-            String logString;
-            if (isTxtFile(fileName)) {
-                logString = escapeHtml(start != null ? pageFile(path, start, length) : pageFile(path, length));
-            } else {
-                logString = escapeHtml("This is a binary file and cannot display! You may download the full file.");
+            final boolean isZipFile = path.endsWith(".gz");
+            long fileLength = getFileLength(file, isZipFile);
+            if (start == null) {
+                start = Long.valueOf(fileLength - length).intValue();
             }
 
-            long fileLength = isZipFile ? ServerUtils.zipFileSize(file) : file.length();
-            start = start != null ? start : Long.valueOf(fileLength - length).intValue();
+            String logString = isTxtFile(fileName) ? escapeHtml(pageFile(path, isZipFile, fileLength, start, length)) :
+                    escapeHtml("This is a binary file and cannot display! You may download the full file.");
 
             List<DomContent> bodyContents = new ArrayList<>();
             if (StringUtils.isNotEmpty(grep)) {
@@ -323,6 +321,18 @@ public class LogviewerLogPageHandler {
         }
     }
 
+    private long getFileLength(File file, boolean isZipFile) throws IOException {
+        try {
+            return isZipFile ? ServerUtils.zipFileSize(file) : file.length();
+        } catch (FileNotFoundException e) {
+            ExceptionMeters.NUM_FILE_OPEN_EXCEPTIONS.mark();
+            throw e;
+        } catch (IOException e) {
+            ExceptionMeters.NUM_FILE_READ_EXCEPTIONS.mark();
+            throw e;
+        }
+    }
+
     private DomContent logTemplate(List<DomContent> bodyContents, String fileName, String user) {
         List<DomContent> finalBodyContents = new ArrayList<>();
 
@@ -426,17 +436,8 @@ public class LogviewerLogPageHandler {
         return a(text).withHref(url).withClass("btn btn-default " + (enabled ? "enabled" : "disabled"));
     }
 
-    private String pageFile(String path, Integer tail) throws IOException, InvalidRequestException {
-        boolean isZipFile = path.endsWith(".gz");
-        long fileLength = isZipFile ? ServerUtils.zipFileSize(new File(path)) : new File(path).length();
-        long skip = fileLength - tail;
-        return pageFile(path, Long.valueOf(skip).intValue(), tail);
-    }
-
-    private String pageFile(String path, Integer start, Integer length) throws IOException, InvalidRequestException {
-        boolean isZipFile = path.endsWith(".gz");
-        long fileLength = isZipFile ? ServerUtils.zipFileSize(new File(path)) : new File(path).length();
-
+    private String pageFile(String path, boolean isZipFile, long fileLength, Integer start, Integer readLength)
+        throws IOException, InvalidRequestException {
         try (InputStream input = isZipFile ? new GZIPInputStream(new FileInputStream(path)) : new FileInputStream(path);
              ByteArrayOutputStream output = new ByteArrayOutputStream()) {
             if (start >= fileLength) {
@@ -447,8 +448,8 @@ public class LogviewerLogPageHandler {
             }
 
             byte[] buffer = new byte[1024];
-            while (output.size() < length) {
-                int size = input.read(buffer, 0, Math.min(1024, length - output.size()));
+            while (output.size() < readLength) {
+                int size = input.read(buffer, 0, Math.min(1024, readLength - output.size()));
                 if (size > 0) {
                     output.write(buffer, 0, size);
                 } else {
@@ -456,7 +457,14 @@ public class LogviewerLogPageHandler {
                 }
             }
 
+            numPageRead.mark();
             return output.toString();
+        } catch (FileNotFoundException e) {
+            ExceptionMeters.NUM_FILE_OPEN_EXCEPTIONS.mark();
+            throw e;
+        } catch (IOException e) {
+            ExceptionMeters.NUM_FILE_READ_EXCEPTIONS.mark();
+            throw e;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java
index a26396c..bcde077 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java
@@ -25,6 +25,10 @@ import static org.apache.storm.daemon.utils.ListFunctionalSupport.last;
 import static org.apache.storm.daemon.utils.ListFunctionalSupport.rest;
 import static org.apache.storm.daemon.utils.PathUtil.truncatePathToLastElements;
 
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
@@ -46,6 +50,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
+import java.util.stream.Stream;
 import java.util.zip.GZIPInputStream;
 
 import javax.ws.rs.core.Response;
@@ -56,12 +61,14 @@ import org.apache.storm.DaemonConfig;
 import org.apache.storm.daemon.common.JsonResponseBuilder;
 import org.apache.storm.daemon.logviewer.LogviewerConstant;
 import org.apache.storm.daemon.logviewer.utils.DirectoryCleaner;
+import org.apache.storm.daemon.logviewer.utils.ExceptionMeters;
 import org.apache.storm.daemon.logviewer.utils.LogviewerResponseBuilder;
 import org.apache.storm.daemon.logviewer.utils.ResourceAuthorizer;
 import org.apache.storm.daemon.logviewer.utils.WorkerLogs;
 import org.apache.storm.daemon.ui.InvalidRequestException;
 import org.apache.storm.daemon.utils.StreamUtil;
 import org.apache.storm.daemon.utils.UrlBuilder;
+import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.Utils;
@@ -71,6 +78,9 @@ import org.slf4j.LoggerFactory;
 
 public class LogviewerLogSearchHandler {
     private static final Logger LOG = LoggerFactory.getLogger(LogviewerLogSearchHandler.class);
+    private static final Meter numDeepSearchNoResult = StormMetricsRegistry.registerMeter("logviewer:num-deep-search-no-result");
+    private static final Histogram numFileScanned = StormMetricsRegistry.registerHistogram("logviewer:num-files-scanned-per-deep-search");
+    private static final Meter numSearchRequestNoResult = StormMetricsRegistry.registerMeter("logviewer:num-search-request-no-result");
 
     public static final int GREP_MAX_SEARCH_SIZE = 1024;
     public static final int GREP_BUF_SIZE = 2048;
@@ -124,6 +134,8 @@ public class LogviewerLogSearchHandler {
     public Response searchLogFile(String fileName, String user, boolean isDaemon, String search,
                                   String numMatchesStr, String offsetStr, String callback, String origin)
             throws IOException, InvalidRequestException {
+        boolean noResult = true;
+
         String rootDir = isDaemon ? daemonLogRoot : logRoot;
         File file = new File(rootDir, fileName).getCanonicalFile();
         Response response;
@@ -136,7 +148,9 @@ public class LogviewerLogSearchHandler {
                     if (StringUtils.isNotEmpty(search) && search.getBytes("UTF-8").length <= GREP_MAX_SEARCH_SIZE) {
                         Map<String, Object> entity = new HashMap<>();
                         entity.put("isDaemon", isDaemon ? "yes" : "no");
-                        entity.putAll(substringSearch(file, search, isDaemon, numMatchesInt, offsetInt));
+                        Map<String, Object> res = substringSearch(file, search, isDaemon, numMatchesInt, offsetInt);
+                        entity.putAll(res);
+                        noResult = ((List) res.get("matches")).isEmpty();
 
                         response = LogviewerResponseBuilder.buildSuccessJsonResponse(entity, callback, origin);
                     } else {
@@ -159,16 +173,20 @@ public class LogviewerLogSearchHandler {
             response = new JsonResponseBuilder().setData(entity).setCallback(callback).setStatus(404).build();
         }
 
+        if (noResult) {
+            numSearchRequestNoResult.mark();
+        }
         return response;
     }
 
     /**
-     * Deep search across worker log files in a topology.
+     * Advanced search across worker log files in a topology.
      *
      * @param topologyId topology ID
      * @param user username
      * @param search search string
-     * @param numMatchesStr the count of maximum matches
+     * @param numMatchesStr the count of maximum matches. Note that this number is with respect to
+     *                      each port, not to each log or each search request
      * @param portStr worker port, null or '*' if the request wants to search from all worker logs
      * @param fileOffsetStr index (offset) of the log files
      * @param offsetStr start offset for log file
@@ -180,6 +198,9 @@ public class LogviewerLogSearchHandler {
     public Response deepSearchLogsForTopology(String topologyId, String user, String search,
                                               String numMatchesStr, String portStr, String fileOffsetStr, String offsetStr,
                                               Boolean searchArchived, String callback, String origin) {
+        int numMatchedFiles = 0;
+        int numScannedFiles = 0;
+
         String rootDir = logRoot;
         Object returnValue;
         File topologyDir = new File(rootDir, topologyId);
@@ -200,24 +221,24 @@ public class LogviewerLogSearchHandler {
 
             if (StringUtils.isEmpty(portStr) || portStr.equals("*")) {
                 // check for all ports
-                List<List<File>> filteredLogs = portDirs.stream()
-                        .map(portDir -> logsForPort(user, portDir))
-                        .filter(logs -> logs != null && !logs.isEmpty())
-                        .collect(toList());
+                Stream<List<File>> portsOfLogs = portDirs.stream()
+                    .map(portDir -> logsForPort(user, portDir))
+                    .filter(logs -> logs != null && !logs.isEmpty());
 
-                if (BooleanUtils.isTrue(searchArchived)) {
-                    returnValue = filteredLogs.stream()
-                            .map(fl -> findNMatches(fl, numMatches, 0, 0, search))
-                            .collect(toList());
-                } else {
-                    returnValue = filteredLogs.stream()
-                            .map(fl -> Collections.singletonList(first(fl)))
-                            .map(fl -> findNMatches(fl, numMatches, 0, 0, search))
-                            .collect(toList());
+                if (BooleanUtils.isNotTrue(searchArchived)) {
+                    portsOfLogs = portsOfLogs.map(fl -> Collections.singletonList(first(fl)));
                 }
+
+                final List<Matched> matchedList = portsOfLogs
+                        .map(logs -> findNMatches(logs, numMatches, 0, 0, search))
+                        .collect(toList());
+                numMatchedFiles = matchedList.stream().mapToInt(match -> match.getMatches().size()).sum();
+                numScannedFiles = matchedList.stream().mapToInt(match -> match.openedFiles).sum();
+                returnValue = matchedList;
             } else {
                 int port = Integer.parseInt(portStr);
                 // check just the one port
+                @SuppressWarnings("unchecked")
                 List<Integer> slotsPorts = (List<Integer>) stormConf.getOrDefault(DaemonConfig.SUPERVISOR_SLOTS_PORTS,
                         new ArrayList<>());
                 boolean containsPort = slotsPorts.stream()
@@ -232,17 +253,22 @@ public class LogviewerLogSearchHandler {
                         returnValue = new ArrayList<>();
                     } else {
                         List<File> filteredLogs = logsForPort(user, portDir);
-                        if (BooleanUtils.isTrue(searchArchived)) {
-                            returnValue = findNMatches(filteredLogs, numMatches, fileOffset, offset, search);
-                        } else {
-                            returnValue = findNMatches(Collections.singletonList(first(filteredLogs)),
-                                    numMatches, 0, offset, search);
+                        if (BooleanUtils.isNotTrue(searchArchived)) {
+                            filteredLogs = Collections.singletonList(first(filteredLogs));
+                            fileOffset = 0;
                         }
+                        returnValue = findNMatches(filteredLogs, numMatches, fileOffset, offset, search);
+                        numMatchedFiles = ((Matched) returnValue).getMatches().size();
+                        numScannedFiles = ((Matched) returnValue).openedFiles;
                     }
                 }
             }
         }
 
+        if (numMatchedFiles == 0) {
+            numDeepSearchNoResult.mark();
+        }
+        numFileScanned.update(numScannedFiles);
         return LogviewerResponseBuilder.buildSuccessJsonResponse(returnValue, callback, origin);
     }
 
@@ -271,26 +297,21 @@ public class LogviewerLogSearchHandler {
 
     private Map<String,Object> substringSearch(File file, String searchString, boolean isDaemon, Integer numMatches,
                                                Integer startByteOffset) throws InvalidRequestException {
-        try {
-            if (StringUtils.isEmpty(searchString)) {
-                throw new IllegalArgumentException("Precondition fails: search string should not be empty.");
-            }
-            if (searchString.getBytes(StandardCharsets.UTF_8).length > GREP_MAX_SEARCH_SIZE) {
-                throw new IllegalArgumentException("Precondition fails: the length of search string should be less than "
-                    + GREP_MAX_SEARCH_SIZE);
-            }
+        if (StringUtils.isEmpty(searchString)) {
+            throw new IllegalArgumentException("Precondition fails: search string should not be empty.");
+        }
+        if (searchString.getBytes(StandardCharsets.UTF_8).length > GREP_MAX_SEARCH_SIZE) {
+            throw new IllegalArgumentException("Precondition fails: the length of search string should be less than "
+                + GREP_MAX_SEARCH_SIZE);
+        }
 
-            boolean isZipFile = file.getName().endsWith(".gz");
-            try (InputStream fis = Files.newInputStream(file.toPath());
-                InputStream gzippedInputStream = isZipFile ? new GZIPInputStream(fis) : fis;
-                BufferedInputStream stream = new BufferedInputStream(gzippedInputStream)) {
+        boolean isZipFile = file.getName().endsWith(".gz");
+        try (InputStream fis = Files.newInputStream(file.toPath())) {
+            try (InputStream gzippedInputStream = isZipFile ? new GZIPInputStream(fis) : fis;
+                 BufferedInputStream stream = new BufferedInputStream(gzippedInputStream)) {
 
-                int fileLength;
-                if (isZipFile) {
-                    fileLength = (int) ServerUtils.zipFileSize(file);
-                } else {
-                    fileLength = (int) file.length();
-                }
+                //It's more likely to be a file read exception here, so we don't differentiate
+                int fileLength = isZipFile ? (int) ServerUtils.zipFileSize(file) : (int) file.length();
 
                 ByteBuffer buf = ByteBuffer.allocate(GREP_BUF_SIZE);
                 final byte[] bufArray = buf.array();
@@ -311,7 +332,7 @@ public class LogviewerLogSearchHandler {
                 Arrays.fill(bufArray, (byte) 0);
 
                 int totalBytesRead = 0;
-                int bytesRead = stream.read(bufArray, 0, Math.min((int) fileLength, GREP_BUF_SIZE));
+                int bytesRead = stream.read(bufArray, 0, Math.min(fileLength, GREP_BUF_SIZE));
                 buf.limit(bytesRead);
                 totalBytesRead += bytesRead;
 
@@ -335,7 +356,7 @@ public class LogviewerLogSearchHandler {
                         // buffer on the previous read.
                         final int newBufOffset = Math.min(buf.limit(), GREP_MAX_SEARCH_SIZE) - searchBytes.length;
 
-                        totalBytesRead = rotateGrepBuffer(buf, stream, totalBytesRead, file, fileLength);
+                        totalBytesRead = rotateGrepBuffer(buf, stream, totalBytesRead, fileLength);
                         if (totalBytesRead < 0) {
                             throw new InvalidRequestException("Cannot search past the end of the file");
                         }
@@ -358,8 +379,14 @@ public class LogviewerLogSearchHandler {
                     }
                 }
                 return ret;
+            } catch (UnknownHostException | UnsupportedEncodingException e) {
+                throw new RuntimeException(e);
+            } catch (IOException e) {
+                ExceptionMeters.NUM_FILE_READ_EXCEPTIONS.mark();
+                throw new RuntimeException(e);
             }
         } catch (IOException e) {
+            ExceptionMeters.NUM_FILE_OPEN_EXCEPTIONS.mark();
             throw new RuntimeException(e);
         }
     }
@@ -388,32 +415,46 @@ public class LogviewerLogSearchHandler {
         }
     }
 
+    /**
+     * Find the first N matches of target string in files.
+     * @param logs all candidate log files to search
+     * @param numMatches number of matches expected
+     * @param fileOffset number of log files to skip initially
+     * @param startByteOffset number of byte to be ignored in each log file
+     * @param targetStr searched string
+     * @return all matched results
+     */
     @VisibleForTesting
-    Matched findNMatches(List<File> logs, int numMatches, int fileOffset, int offset, String search) {
+    Matched findNMatches(List<File> logs, int numMatches, int fileOffset, int startByteOffset, String targetStr) {
         logs = drop(logs, fileOffset);
+        LOG.debug("{} files to scan", logs.size());
 
         List<Map<String, Object>> matches = new ArrayList<>();
         int matchCount = 0;
+        int scannedFiles = 0;
 
         while (true) {
             if (logs.isEmpty()) {
+                //fileOffset = one past last scanned file
                 break;
             }
 
             File firstLog = logs.get(0);
-            Map<String, Object> theseMatches;
+            Map<String, Object> matchInLog;
             try {
                 LOG.debug("Looking through {}", firstLog);
-                theseMatches = substringSearch(firstLog, search, numMatches - matchCount, offset);
+                matchInLog = substringSearch(firstLog, targetStr, numMatches - matchCount, startByteOffset);
+                scannedFiles++;
             } catch (InvalidRequestException e) {
                 LOG.error("Can't search past end of file.", e);
-                theseMatches = new HashMap<>();
+                matchInLog = new HashMap<>();
             }
 
             String fileName = WorkerLogs.getTopologyPortWorkerLog(firstLog);
 
+            //This section simply put the formatted log filename and corresponding port in the matching.
             final List<Map<String, Object>> newMatches = new ArrayList<>(matches);
-            Map<String, Object> currentFileMatch = new HashMap<>(theseMatches);
+            Map<String, Object> currentFileMatch = new HashMap<>(matchInLog);
             currentFileMatch.put("fileName", fileName);
             Path firstLogAbsPath;
             try {
@@ -424,27 +465,27 @@ public class LogviewerLogSearchHandler {
             currentFileMatch.put("port", truncatePathToLastElements(firstLogAbsPath, 2).getName(0).toString());
             newMatches.add(currentFileMatch);
 
-            int newCount = matchCount + ((List<?>)theseMatches.get("matches")).size();
-
-            //theseMatches is never empty! As guaranteed by the #get().size() method above
+            int newCount = matchCount + ((List<?>)matchInLog.get("matches")).size();
             if (newCount == matchCount) {
                 // matches and matchCount is not changed
                 logs = rest(logs);
-                offset = 0;
+                startByteOffset = 0;
                 fileOffset = fileOffset + 1;
             } else if (newCount >= numMatches) {
                 matches = newMatches;
+                //fileOffset = the index of last scanned file
                 break;
             } else {
                 matches = newMatches;
                 logs = rest(logs);
-                offset = 0;
+                startByteOffset = 0;
                 fileOffset = fileOffset + 1;
                 matchCount = newCount;
             }
         }
 
-        return new Matched(fileOffset, search, matches);
+        LOG.debug("scanned {} files", scannedFiles);
+        return new Matched(fileOffset, targetStr, matches, scannedFiles);
     }
 
 
@@ -502,8 +543,7 @@ public class LogviewerLogSearchHandler {
         return new SubstringSearchResult(matches, newByteOffset, newBeforeBytes);
     }
 
-    private int rotateGrepBuffer(ByteBuffer buf, BufferedInputStream stream, int totalBytesRead, File file,
-                                 int fileLength) throws IOException {
+    private int rotateGrepBuffer(ByteBuffer buf, BufferedInputStream stream, int totalBytesRead, int fileLength) throws IOException {
         byte[] bufArray = buf.array();
 
         // Copy the 2nd half of the buffer to the first half.
@@ -513,7 +553,7 @@ public class LogviewerLogSearchHandler {
         Arrays.fill(bufArray, GREP_MAX_SEARCH_SIZE, bufArray.length, (byte) 0);
 
         // Fill the 2nd half with new bytes from the stream.
-        int bytesRead = stream.read(bufArray, GREP_MAX_SEARCH_SIZE, Math.min((int) fileLength, GREP_MAX_SEARCH_SIZE));
+        int bytesRead = stream.read(bufArray, GREP_MAX_SEARCH_SIZE, Math.min(fileLength, GREP_MAX_SEARCH_SIZE));
         buf.limit(GREP_MAX_SEARCH_SIZE + bytesRead);
         return totalBytesRead + bytesRead;
     }
@@ -693,18 +733,21 @@ public class LogviewerLogSearchHandler {
         private int fileOffset;
         private String searchString;
         private List<Map<String, Object>> matches;
+        @JsonIgnore
+        private final int openedFiles;
 
         /**
          * Constructor.
-         *
-         * @param fileOffset offset (index) of the files
+         *  @param fileOffset offset (index) of the files
          * @param searchString search string
          * @param matches map representing matched search result
+         * @param openedFiles number of files scanned, used for metrics only
          */
-        public Matched(int fileOffset, String searchString, List<Map<String, Object>> matches) {
+        public Matched(int fileOffset, String searchString, List<Map<String, Object>> matches, int openedFiles) {
             this.fileOffset = fileOffset;
             this.searchString = searchString;
             this.matches = matches;
+            this.openedFiles = openedFiles;
         }
 
         public int getFileOffset() {

http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DeletionMeta.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DeletionMeta.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DeletionMeta.java
new file mode 100644
index 0000000..9e0afd9
--- /dev/null
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DeletionMeta.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer.utils;
+
+class DeletionMeta {
+    static final DeletionMeta EMPTY = new DeletionMeta(0, 0);
+
+    final long deletedSize;
+    final int deletedFiles;
+
+    DeletionMeta(long deletedSize, int deletedFiles) {
+        this.deletedSize = deletedSize;
+        this.deletedFiles = deletedFiles;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java
index 310bc8e..293b2be 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java
@@ -60,7 +60,12 @@ public class DirectoryCleaner {
      * @return DirectoryStream
      */
     public DirectoryStream<Path> getStreamForDirectory(File dir) throws IOException {
-        return Files.newDirectoryStream(dir.toPath());
+        try {
+            return Files.newDirectoryStream(dir.toPath());
+        } catch (IOException e) {
+            ExceptionMeters.NUM_FILE_OPEN_EXCEPTIONS.mark();
+            throw e;
+        }
     }
 
     /**
@@ -74,11 +79,9 @@ public class DirectoryCleaner {
      * @param activeDirs only for global deletion, we want to skip the active logs in activeDirs
      * @return number of files deleted
      */
-    public int deleteOldestWhileTooLarge(List<File> dirs,
-                        long quota, boolean forPerDir, Set<String> activeDirs) throws IOException {
+    public DeletionMeta deleteOldestWhileTooLarge(List<File> dirs,
+                                                  long quota, boolean forPerDir, Set<String> activeDirs) throws IOException {
         long totalSize = 0;
-        int deletedFiles = 0;
-
         for (File dir : dirs) {
             try (DirectoryStream<Path> stream = getStreamForDirectory(dir)) {
                 for (Path path : stream) {
@@ -87,13 +90,14 @@ public class DirectoryCleaner {
                 }
             }
         }
-
         LOG.debug("totalSize: {} quota: {}", totalSize, quota);
         long toDeleteSize = totalSize - quota;
         if (toDeleteSize <= 0) {
-            return deletedFiles;
+            return DeletionMeta.EMPTY;
         }
 
+        int deletedFiles = 0;
+        long deletedSize = 0;
         // the oldest pq_size files in this directory will be placed in PQ, with the newest at the root
         PriorityQueue<File> pq = new PriorityQueue<>(PQ_SIZE, (f1, f2) -> f1.lastModified() > f2.lastModified() ? -1 : 1);
         int round = 0;
@@ -134,6 +138,7 @@ public class DirectoryCleaner {
                         Utils.forceDelete(file.getPath());
                         LOG.info("Delete file: {}, size: {}, lastModified: {}", canonicalPath, fileSize, lastModified);
                         toDeleteSize -= fileSize;
+                        deletedSize += fileSize;
                         deletedFiles++;
                     } catch (IOException e) {
                         excluded.add(file);
@@ -157,7 +162,7 @@ public class DirectoryCleaner {
                     forPerDir ? "this directory" : "root directory", toDeleteSize * 1e-6);
             }
         }
-        return deletedFiles;
+        return new DeletionMeta(deletedSize, deletedFiles);
     }
 
     private boolean isFileEligibleToSkipDelete(boolean forPerDir, Set<String> activeDirs, File dir, File file) throws IOException {
@@ -186,7 +191,11 @@ public class DirectoryCleaner {
                     break;
                 }
             }
+        } catch (IOException e) {
+            ExceptionMeters.NUM_FILE_OPEN_EXCEPTIONS.mark();
+            throw e;
         }
         return files;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/ExceptionMeters.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/ExceptionMeters.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/ExceptionMeters.java
new file mode 100644
index 0000000..81aa222
--- /dev/null
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/ExceptionMeters.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
+ * See the NOTICE file distributed with this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer.utils;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public enum ExceptionMeters {
+    //Operation level IO Exceptions
+    NUM_FILE_OPEN_EXCEPTIONS("logviewer:num-file-open-exceptions"),
+    NUM_FILE_READ_EXCEPTIONS("logviewer:num-file-read-exceptions"),
+    NUM_FILE_REMOVAL_EXCEPTIONS("logviewer:num-file-removal-exceptions"),
+    NUM_FILE_DOWNLOAD_EXCEPTIONS("logviewer:num-file-download-exceptions"),
+    NUM_SET_PERMISSION_EXCEPTIONS("logviewer:num-set-permission-exceptions"),
+
+    //Routine level
+    NUM_CLEANUP_EXCEPTIONS("logviewer:num-other-cleanup-exceptions"),
+    NUM_READ_LOG_EXCEPTIONS("logviewer:num-read-log-exceptions"),
+    NUM_READ_DAEMON_LOG_EXCEPTIONS("logviewer:num-read-daemon-log-exceptions"),
+    NUM_LIST_LOG_EXCEPTIONS("logviewer:num-search-log-exceptions"),
+    NUM_LIST_DUMP_EXCEPTIONS("logviewer:num-list-dump-files-exceptions"),
+    NUM_DOWNLOAD_DUMP_EXCEPTIONS("logviewer:num-download-dump-exceptions"),
+    NUM_DOWNLOAD_LOG_EXCEPTIONS("logviewer:num-download-log-exceptions"),
+    NUM_DOWNLOAD_DAEMON_LOG_EXCEPTIONS("logviewer:num-download-daemon-log-exceptions"),
+    NUM_SEARCH_EXCEPTIONS("logviewer:num-search-exceptions");
+
+    private static final Map<String, Metric> metrics = new HashMap<>();
+
+    static {
+        for (ExceptionMeters e : ExceptionMeters.values()) {
+            metrics.put(e.name, e.meter);
+        }
+    }
+
+    private final String name;
+    private final Meter meter;
+
+    public static Map<String, Metric> getMetrics() {
+        return metrics;
+    }
+
+    ExceptionMeters(String name) {
+        this.name = name;
+        meter = new Meter();
+    }
+
+    public void mark() {
+        this.meter.mark();
+    }
+}


Mime
View raw message