helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] 09/10: TASK2.0: Add performance metrics to JobMonitor
Date Thu, 28 Mar 2019 19:31:53 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 8ba4c9c19981fffe3c958c6851e2b8b8bf90bfbb
Author: Hunter Lee <hulee@linkedin.com>
AuthorDate: Thu Mar 28 12:30:09 2019 -0700

    TASK2.0: Add performance metrics to JobMonitor
    
        We want to add more metrics to the Task Framework so that users can understand what is
        going on in case of a slowdown, or get a general sense of how fast the workload is moving.
        Changelist:
        1. Add SubmissionToProcessDelay
        2. Add SubmissionToScheduleDelay
        3. Add ControllerInducedDelay (for testing)
        4. Add JobLatencyGauge
        5. Change regular metrics to Dynamic metrics in JobMonitor
        6. Add an integration test: TestTaskPerformanceMetrics
---
 .../monitoring/mbeans/ClusterStatusMonitor.java    |  35 ++--
 .../apache/helix/monitoring/mbeans/JobMonitor.java | 220 +++++++++++++--------
 .../apache/helix/task/AbstractTaskDispatcher.java  | 159 ++++++++++++++-
 .../java/org/apache/helix/task/JobContext.java     |  37 +++-
 .../java/org/apache/helix/task/JobDispatcher.java  |  14 +-
 .../org/apache/helix/task/WorkflowContext.java     |   2 +-
 .../mbeans/TestTaskPerformanceMetrics.java         | 151 ++++++++++++++
 7 files changed, 508 insertions(+), 110 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 4616f9e..1d778ee 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -641,10 +641,22 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean
{
     jobType = preProcessJobMonitor(jobType);
     JobMonitor jobMonitor = _perTypeJobMonitorMap.get(jobType);
     if (jobMonitor != null) {
-      jobMonitor.updateJobCounters(to, latency);
+      jobMonitor.updateJobMetricsWithLatency(to, latency);
     }
   }
 
+  /**
+   * TODO: Separate Workflow/Job Monitors from ClusterStatusMonitor because ClusterStatusMonitor is
+   * getting too big.
+   * Returns the appropriate JobMonitor for the given type. If it does not exist, creates one and
+   * returns it. May return null if no monitor could be registered for the type.
+   * @param jobType
+   * @return
+   */
+  public JobMonitor getJobMonitor(String jobType) {
+    return _perTypeJobMonitorMap.get(preProcessJobMonitor(jobType));
+  }
+
   private void updateJobGauges(String jobType, TaskState current) {
     // When first time for WorkflowRebalancer call, jobconfig may not ready.
     // Thus only check it for gauge.
@@ -662,13 +674,17 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean
{
 
     synchronized (_perTypeJobMonitorMap) {
       if (!_perTypeJobMonitorMap.containsKey(jobType)) {
-        JobMonitor monitor = new JobMonitor(_clusterName, jobType);
+        String jobMonitorBeanName = getJobBeanName(jobType);
+        JobMonitor monitor = null;
         try {
-          registerJob(monitor);
-        } catch (MalformedObjectNameException e) {
+          monitor = new JobMonitor(_clusterName, jobType, getObjectName(jobMonitorBeanName));
+          monitor.register(); // Necessary for dynamic metrics
+        } catch (Exception e) {
           LOG.error("Failed to register job type : " + jobType, e);
         }
-        _perTypeJobMonitorMap.put(jobType, monitor);
+        if (monitor != null) {
+          _perTypeJobMonitorMap.put(jobType, monitor);
+        }
       }
     }
     return jobType;
@@ -789,17 +805,12 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean
{
     }
   }
 
-  private void registerJob(JobMonitor jobMonitor) throws MalformedObjectNameException {
-    String jobBeanName = getJobBeanName(jobMonitor.getJobType());
-    register(jobMonitor, getObjectName(jobBeanName));
-  }
-
-  private void unregisterAllJobs() throws MalformedObjectNameException {
+  private void unregisterAllJobs() {
     synchronized (_perTypeJobMonitorMap) {
       Iterator<Map.Entry<String, JobMonitor>> jobIter = _perTypeJobMonitorMap.entrySet().iterator();
       while (jobIter.hasNext()) {
         Map.Entry<String, JobMonitor> jobEntry = jobIter.next();
-        unregister(getObjectName(getJobBeanName(jobEntry.getKey())));
+        jobEntry.getValue().unregister();
         jobIter.remove();
       }
     }
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
index 39108cf..6589e96 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
@@ -19,82 +19,82 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.management.JMException;
+import javax.management.ObjectName;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
 import org.apache.helix.task.TaskState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-public class JobMonitor implements JobMonitorMBean {
+public class JobMonitor extends DynamicMBeanProvider {
 
   private static final String JOB_KEY = "Job";
   private static final Logger LOG = LoggerFactory.getLogger(JobMonitor.class);
   private static final long DEFAULT_RESET_INTERVAL_MS = 60 * 60 * 1000; // 1 hour
 
+  // For registering dynamic metrics
+  private final ObjectName _initObjectName;
+
   private String _clusterName;
   private String _jobType;
-
-  private long _successfullJobCount;
-  private long _failedJobCount;
-  private long _abortedJobCount;
-  private long _existingJobGauge;
-  private long _queuedJobGauge;
-  private long _runningJobGauge;
-  private long _maximumJobLatencyGauge;
-  private long _jobLatencyCount;
   private long _lastResetTime;
 
-  public JobMonitor(String clusterName, String jobType) {
+  // Counters
+  private SimpleDynamicMetric<Long> _successfulJobCount;
+  private SimpleDynamicMetric<Long> _failedJobCount;
+  private SimpleDynamicMetric<Long> _abortedJobCount;
+
+  // Gauges
+  private SimpleDynamicMetric<Long> _existingJobGauge;
+  private SimpleDynamicMetric<Long> _queuedJobGauge;
+  private SimpleDynamicMetric<Long> _runningJobGauge;
+  @Deprecated // To be removed (replaced by jobLatencyGauge Histogram)
+  private SimpleDynamicMetric<Long> _maximumJobLatencyGauge;
+  @Deprecated // To be removed (replaced by jobLatencyGauge Histogram)
+  private SimpleDynamicMetric<Long> _jobLatencyCount;
+
+  // Histogram
+  private HistogramDynamicMetric _jobLatencyGauge;
+  private HistogramDynamicMetric _submissionToProcessDelayGauge;
+  private HistogramDynamicMetric _submissionToScheduleDelayGauge;
+  private HistogramDynamicMetric _controllerInducedDelayGauge;
+
+  public JobMonitor(String clusterName, String jobType, ObjectName objectName) {
     _clusterName = clusterName;
     _jobType = jobType;
-    _successfullJobCount = 0L;
-    _failedJobCount = 0L;
-    _abortedJobCount = 0L;
-    _existingJobGauge = 0L;
-    _queuedJobGauge = 0L;
-    _runningJobGauge = 0L;
+    _initObjectName = objectName;
     _lastResetTime = System.currentTimeMillis();
-    _jobLatencyCount = 0L;
-    _maximumJobLatencyGauge = 0L;
-  }
-
-  @Override
-  public long getSuccessfulJobCount() {
-    return _successfullJobCount;
-  }
-
-  @Override
-  public long getFailedJobCount() {
-    return _failedJobCount;
-  }
-
-  @Override
-  public long getAbortedJobCount() {
-    return _abortedJobCount;
-  }
-
-  @Override
-  public long getExistingJobGauge() {
-    return _existingJobGauge;
-  }
-
-  @Override
-  public long getQueuedJobGauge() {
-    return _queuedJobGauge;
-  }
-
-  @Override
-  public long getRunningJobGauge() {
-    return _runningJobGauge;
-  }
 
-  @Override
-  public long getMaximumJobLatencyGauge() {
-    return _maximumJobLatencyGauge;
-  }
-
-  @Override
-  public long getJobLatencyCount() {
-    return _jobLatencyCount;
+    // Instantiate simple dynamic metrics
+    _successfulJobCount = new SimpleDynamicMetric("SuccessfulJobCount", 0L);
+    _failedJobCount = new SimpleDynamicMetric("FailedJobCount", 0L);
+    _abortedJobCount = new SimpleDynamicMetric("AbortedJobCount", 0L);
+    _existingJobGauge = new SimpleDynamicMetric("ExistingJobGauge", 0L);
+    _queuedJobGauge = new SimpleDynamicMetric("QueuedJobGauge", 0L);
+    _runningJobGauge = new SimpleDynamicMetric("RunningJobGauge", 0L);
+    _maximumJobLatencyGauge = new SimpleDynamicMetric("MaximumJobLatencyGauge", 0L);
+    _jobLatencyCount = new SimpleDynamicMetric("JobLatencyCount", 0L);
+
+    // Instantiate histogram dynamic metrics
+    _jobLatencyGauge = new HistogramDynamicMetric("JobLatencyGauge", new Histogram(
+        new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _submissionToProcessDelayGauge =
+        new HistogramDynamicMetric("SubmissionToProcessDelayGauge", new Histogram(
+            new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _submissionToScheduleDelayGauge =
+        new HistogramDynamicMetric("SubmissionToScheduleDelayGauge", new Histogram(
+            new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _controllerInducedDelayGauge =
+        new HistogramDynamicMetric("ControllerInducedDelayGauge", new Histogram(
+            new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
   }
 
   @Override
@@ -110,38 +110,37 @@ public class JobMonitor implements JobMonitorMBean {
    * Update job counters with transition state
    * @param to The to state of job, cleaned by ZK when it is null
    */
-
-  public void updateJobCounters(TaskState to) {
-    updateJobCounters(to, 0);
+  public void updateJobMetricsWithLatency(TaskState to) {
+    updateJobMetricsWithLatency(to, 0);
   }
 
-  public void updateJobCounters(TaskState to, long latency) {
+  public void updateJobMetricsWithLatency(TaskState to, long latency) {
     // TODO maybe use separate TIMED_OUT counter later
     if (to.equals(TaskState.FAILED) || to.equals(TaskState.TIMED_OUT)) {
-      _failedJobCount++;
+      incrementSimpleDynamicMetric(_failedJobCount);
     } else if (to.equals(TaskState.COMPLETED)) {
-      _successfullJobCount++;
-
+      incrementSimpleDynamicMetric(_successfulJobCount);
       // Only count succeeded jobs
-      _maximumJobLatencyGauge = Math.max(_maximumJobLatencyGauge, latency);
-      _jobLatencyCount += latency > 0 ? latency : 0;
+      _maximumJobLatencyGauge.updateValue(Math.max(_maximumJobLatencyGauge.getValue(), latency));
+      if (latency > 0) {
+        incrementSimpleDynamicMetric(_jobLatencyCount, latency);
+        _jobLatencyGauge.updateValue(latency);
+      }
     } else if (to.equals(TaskState.ABORTED)) {
-      _abortedJobCount++;
+      incrementSimpleDynamicMetric(_abortedJobCount);
     }
-
-
   }
 
   /**
    * Reset job gauges
    */
   public void resetJobGauge() {
-    _queuedJobGauge = 0L;
-    _existingJobGauge = 0L;
-    _runningJobGauge = 0L;
+    _queuedJobGauge.updateValue(0L);
+    _existingJobGauge.updateValue(0L);
+    _runningJobGauge.updateValue(0L);
     if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS < System.currentTimeMillis()) {
       _lastResetTime = System.currentTimeMillis();
-      _maximumJobLatencyGauge = 0L;
+      _maximumJobLatencyGauge.updateValue(0L);
     }
   }
 
@@ -150,11 +149,76 @@ public class JobMonitor implements JobMonitorMBean {
    * @param to The current state of job
    */
   public void updateJobGauge(TaskState to) {
-    _existingJobGauge++;
+    incrementSimpleDynamicMetric(_existingJobGauge);
     if (to == null || to.equals(TaskState.NOT_STARTED)) {
-      _queuedJobGauge++;
+      incrementSimpleDynamicMetric(_queuedJobGauge);
     } else if (to.equals(TaskState.IN_PROGRESS)) {
-      _runningJobGauge++;
+      incrementSimpleDynamicMetric(_runningJobGauge);
     }
   }
+
+  /**
+   * Update SubmissionToProcessDelay to its corresponding HistogramDynamicMetric.
+   * @param delay
+   */
+  public void updateSubmissionToProcessDelayGauge(long delay) {
+    _submissionToProcessDelayGauge.updateValue(delay);
+  }
+
+  /**
+   * Update SubmissionToScheduleDelay to its corresponding HistogramDynamicMetric.
+   * @param delay
+   */
+  public void updateSubmissionToScheduleDelayGauge(long delay) {
+    _submissionToScheduleDelayGauge.updateValue(delay);
+  }
+
+  /**
+   * Update ControllerInducedDelay to its corresponding HistogramDynamicMetric.
+   * @param delay
+   */
+  public void updateControllerInducedDelayGauge(long delay) {
+    _controllerInducedDelayGauge.updateValue(delay);
+  }
+
+  /**
+   * This method registers the dynamic metrics.
+   * @return
+   * @throws JMException
+   */
+  @Override
+  public DynamicMBeanProvider register() throws JMException {
+    List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
+    attributeList.add(_successfulJobCount);
+    attributeList.add(_failedJobCount);
+    attributeList.add(_abortedJobCount);
+    attributeList.add(_existingJobGauge);
+    attributeList.add(_queuedJobGauge);
+    attributeList.add(_runningJobGauge);
+    attributeList.add(_maximumJobLatencyGauge);
+    attributeList.add(_jobLatencyCount);
+    attributeList.add(_jobLatencyGauge);
+    attributeList.add(_submissionToProcessDelayGauge);
+    attributeList.add(_submissionToScheduleDelayGauge);
+    attributeList.add(_controllerInducedDelayGauge);
+    doRegister(attributeList, _initObjectName);
+    return this;
+  }
+
+  /**
+   * NOTE: This method is neither thread-safe nor atomic.
+   * Increment the value of a given SimpleDynamicMetric by 1.
+   */
+  private void incrementSimpleDynamicMetric(SimpleDynamicMetric<Long> metric) {
+    metric.updateValue(metric.getValue() + 1);
+  }
+
+  /**
+   * NOTE: This method is neither thread-safe nor atomic.
+   * Increment the value of a given SimpleDynamicMetric by the given value.
+   */
+  private void incrementSimpleDynamicMetric(SimpleDynamicMetric<Long> metric, long
value) {
+    metric.updateValue(metric.getValue() + value);
+  }
+
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index a6345f6..4de8112 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -1,5 +1,24 @@
 package org.apache.helix.task;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import java.util.ArrayList;
@@ -11,10 +30,13 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.Callable;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixManager;
 import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
 import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.controller.stages.CurrentStateOutput;
@@ -23,12 +45,14 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.monitoring.mbeans.JobMonitor;
 import org.apache.helix.task.assigner.AssignableInstance;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public abstract class AbstractTaskDispatcher {
   private static final Logger LOG = LoggerFactory.getLogger(AbstractTaskDispatcher.class);
+  private static final String TASK_LATENCY_TAG = "Latency";
 
   // For connection management
   protected HelixManager _manager;
@@ -470,12 +494,13 @@ public abstract class AbstractTaskDispatcher {
   // This is the actual assigning part
   protected void handleAdditionalTaskAssignment(
       Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments, Set<String>
excludedInstances,
-      String jobResource, CurrentStateOutput currStateOutput, JobContext jobCtx, JobConfig
jobCfg,
-      WorkflowConfig workflowConfig, WorkflowContext workflowCtx,
-      WorkflowControllerDataProvider cache, ResourceAssignment prevTaskToInstanceStateAssignment,
+      String jobResource, CurrentStateOutput currStateOutput, JobContext jobCtx,
+      final JobConfig jobCfg, final WorkflowConfig workflowConfig, WorkflowContext workflowCtx,
+      final WorkflowControllerDataProvider cache,
+      ResourceAssignment prevTaskToInstanceStateAssignment,
       Map<String, Set<Integer>> assignedPartitions, Map<Integer, PartitionAssignment>
paMap,
       Set<Integer> skippedPartitions, TaskAssignmentCalculator taskAssignmentCal,
-      Set<Integer> allPartitions, long currentTime, Collection<String> liveInstances)
{
+      Set<Integer> allPartitions, final long currentTime, Collection<String>
liveInstances) {
 
     // See if there was LiveInstance change and cache LiveInstances from this iteration of
pipeline
     boolean existsLiveInstanceOrCurrentStateChange =
@@ -602,7 +627,14 @@ public abstract class AbstractTaskDispatcher {
           excludeSet.add(pId);
           jobCtx.setAssignedParticipant(pId, instance);
           jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
-          jobCtx.setPartitionStartTime(pId, System.currentTimeMillis());
+          final long currentTimestamp = System.currentTimeMillis();
+          jobCtx.setPartitionStartTime(pId, currentTimestamp);
+          if (jobCtx.getExecutionStartTime() == WorkflowContext.NOT_STARTED) {
+            // This means this is the very first task scheduled for this job
+            jobCtx.setExecutionStartTime(currentTimestamp);
+            reportSubmissionToScheduleDelay(cache, _clusterStatusMonitor, workflowConfig,
jobCfg,
+                currentTimestamp);
+          }
           if (LOG.isDebugEnabled()) {
             LOG.debug(String.format("Setting task partition %s state to %s on instance %s.",
pName,
                 TaskPartitionState.RUNNING, instance));
@@ -792,12 +824,13 @@ public abstract class AbstractTaskDispatcher {
     }
   }
 
-  protected void markJobComplete(String jobName, JobContext jobContext,
-      WorkflowConfig workflowConfig, WorkflowContext workflowContext,
-      Map<String, JobConfig> jobConfigMap, WorkflowControllerDataProvider dataProvider)
{
+  protected void markJobComplete(final String jobName, final JobContext jobContext,
+      final WorkflowConfig workflowConfig, WorkflowContext workflowContext,
+      final Map<String, JobConfig> jobConfigMap,
+      final WorkflowControllerDataProvider dataProvider) {
     finishJobInRuntimeJobDag(dataProvider.getTaskDataCache(), workflowConfig.getWorkflowId(),
         jobName);
-    long currentTime = System.currentTimeMillis();
+    final long currentTime = System.currentTimeMillis();
     workflowContext.setJobState(jobName, TaskState.COMPLETED);
     jobContext.setFinishTime(currentTime);
     if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap, dataProvider))
{
@@ -805,6 +838,13 @@ public abstract class AbstractTaskDispatcher {
       updateWorkflowMonitor(workflowContext, workflowConfig);
     }
     scheduleJobCleanUp(jobConfigMap.get(jobName), workflowConfig, currentTime);
+
+    // Job has completed successfully so report ControllerInducedDelay
+    JobConfig jobConfig = jobConfigMap.get(jobName);
+    if (jobConfig != null) {
+      reportControllerInducedDelay(dataProvider, _clusterStatusMonitor, workflowConfig, jobConfig,
+          currentTime);
+    }
   }
 
   protected void markJobFailed(String jobName, JobContext jobContext, WorkflowConfig workflowConfig,
@@ -1175,4 +1215,105 @@ public abstract class AbstractTaskDispatcher {
               jobName));
     }
   }
+
+  /**
+   * TODO: Move this logic to Task Framework metrics class for refactoring.
+   * Computes and passes on submissionToProcessDelay to the dynamic metric.
+   * @param dataProvider
+   * @param clusterStatusMonitor
+   * @param workflowConfig
+   * @param jobConfig
+   * @param currentTimestamp
+   */
+  protected static void reportSubmissionToProcessDelay(BaseControllerDataProvider dataProvider,
+      final ClusterStatusMonitor clusterStatusMonitor, final WorkflowConfig workflowConfig,
+      final JobConfig jobConfig, final long currentTimestamp) {
+    AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), new Callable<Object>()
{
+      @Override
+      public Object call() {
+        // Asynchronously update the appropriate JobMonitor
+        JobMonitor jobMonitor = clusterStatusMonitor
+            .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig));
+        if (jobMonitor == null) {
+          return null;
+        }
+
+        // Compute SubmissionToProcessDelay
+        long submissionToProcessDelay = currentTimestamp - jobConfig.getStat().getCreationTime();
+        jobMonitor.updateSubmissionToProcessDelayGauge(submissionToProcessDelay);
+        return null;
+      }
+    });
+  }
+
+  /**
+   * TODO: Move this logic to Task Framework metrics class for refactoring.
+   * Computes and passes on submissionToScheduleDelay to the dynamic metric.
+   * @param dataProvider
+   * @param clusterStatusMonitor
+   * @param workflowConfig
+   * @param jobConfig
+   * @param currentTimestamp
+   */
+  private static void reportSubmissionToScheduleDelay(BaseControllerDataProvider dataProvider,
+      final ClusterStatusMonitor clusterStatusMonitor, final WorkflowConfig workflowConfig,
+      final JobConfig jobConfig, final long currentTimestamp) {
+    AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), new Callable<Object>()
{
+      @Override
+      public Object call() {
+        // Asynchronously update the appropriate JobMonitor
+        JobMonitor jobMonitor = clusterStatusMonitor
+            .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig));
+        if (jobMonitor == null) {
+          return null;
+        }
+
+        // Compute SubmissionToScheduleDelay
+        long submissionToStartDelay = currentTimestamp - jobConfig.getStat().getCreationTime();
+        jobMonitor.updateSubmissionToScheduleDelayGauge(submissionToStartDelay);
+        return null;
+      }
+    });
+  }
+
+  /**
+   * TODO: Move this logic to Task Framework metrics class for refactoring.
+   * Computes and passes on controllerInducedDelay to the dynamic metric.
+   * @param dataProvider
+   * @param clusterStatusMonitor
+   * @param workflowConfig
+   * @param jobConfig
+   * @param currentTimestamp
+   */
+  private static void reportControllerInducedDelay(BaseControllerDataProvider dataProvider,
+      final ClusterStatusMonitor clusterStatusMonitor, final WorkflowConfig workflowConfig,
+      final JobConfig jobConfig, final long currentTimestamp) {
+    AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), new Callable<Object>()
{
+      @Override
+      public Object call() {
+        // Asynchronously update the appropriate JobMonitor
+        JobMonitor jobMonitor = clusterStatusMonitor
+            .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig));
+        if (jobMonitor == null) {
+          return null;
+        }
+
+        // Compute ControllerInducedDelay only if the workload is a test load
+        // NOTE: this metric cannot be computed for general user-submitted workloads because
+        // the actual runtime of the tasks varies, and there could exist multiple tasks per
+        // job
+        // NOTE: a test workload will have the "Latency" field in the mapField of the
+        // JobConfig (taskConfig)
+        String firstTask = jobConfig.getTaskConfigMap().keySet().iterator().next();
+        if (jobConfig.getTaskConfig(firstTask).getConfigMap().containsKey(TASK_LATENCY_TAG))
{
+          long taskDuration =
+              Long.valueOf(jobConfig.getTaskConfig(firstTask).getConfigMap().get(TASK_LATENCY_TAG));
+          long controllerInducedDelay =
+              currentTimestamp - jobConfig.getStat().getCreationTime() - taskDuration;
+          jobMonitor.updateControllerInducedDelayGauge(controllerInducedDelay);
+        }
+        return null;
+      }
+    });
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobContext.java b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
index 2d878fb..c84f660 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
@@ -38,7 +38,7 @@ import com.google.common.collect.Sets;
  */
 public class JobContext extends HelixProperty {
   private enum ContextProperties {
-    START_TIME,
+    START_TIME, // Time at which this JobContext was created
     STATE,
     NUM_ATTEMPTS,
     FINISH_TIME,
@@ -47,7 +47,8 @@ public class JobContext extends HelixProperty {
     ASSIGNED_PARTICIPANT,
     NEXT_RETRY_TIME,
     INFO,
-    NAME
+    NAME,
+    EXECUTION_START_TIME, // Time at which the first task of this job got scheduled
   }
 
   public JobContext(ZNRecord record) {
@@ -61,7 +62,7 @@ public class JobContext extends HelixProperty {
   public long getStartTime() {
     String tStr = _record.getSimpleField(ContextProperties.START_TIME.toString());
     if (tStr == null) {
-      return WorkflowContext.UNSTARTED;
+      return WorkflowContext.NOT_STARTED;
     }
     return Long.parseLong(tStr);
   }
@@ -141,11 +142,11 @@ public class JobContext extends HelixProperty {
   public long getPartitionStartTime(int p) {
     Map<String, String> map = getMapField(p);
     if (map == null) {
-      return WorkflowContext.UNSTARTED;
+      return WorkflowContext.NOT_STARTED;
     }
     String tStr = map.get(ContextProperties.START_TIME.toString());
     if (tStr == null) {
-      return WorkflowContext.UNSTARTED;
+      return WorkflowContext.NOT_STARTED;
     }
     return Long.parseLong(tStr);
   }
@@ -273,8 +274,30 @@ public class JobContext extends HelixProperty {
   }
 
   /**
+   * Only set the execution start time when it hasn't already been set.
+   * NOTE: This method is not thread-safe. However, it is okay because even if this does get written
+   * twice due to a race condition, that means the timestamps will be close enough to get a fairly
+   * good estimate for the execution start time. We do not want to affect the task status update
+   * performance and ultimately, this execution start time is an estimate in and of itself anyway.
+   * @param t
+   */
+  public void setExecutionStartTime(long t) {
+    String tStr = _record.getSimpleField(ContextProperties.EXECUTION_START_TIME.toString());
+    if (tStr == null) {
+      _record.setSimpleField(ContextProperties.EXECUTION_START_TIME.toString(), String.valueOf(t));
+    }
+  }
+
+  public long getExecutionStartTime() {
+    String tStr = _record.getSimpleField(ContextProperties.EXECUTION_START_TIME.toString());
+    if (tStr == null) {
+      return WorkflowContext.NOT_STARTED;
+    }
+    return Long.parseLong(tStr);
+  }
+
+  /**
    * Get MapField for the given partition.
-   *
    * @param p
    * @return mapField for the partition, NULL if the partition has not scheduled yet.
    */
@@ -286,7 +309,7 @@ public class JobContext extends HelixProperty {
     String pStr = String.valueOf(p);
     Map<String, String> map = _record.getMapField(pStr);
     if (map == null && createIfNotPresent) {
-      map = new TreeMap<String, String>();
+      map = new TreeMap<>();
       _record.setMapField(pStr, map);
     }
     return map;
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 6530f3a..56e04ac 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -29,8 +29,10 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.Callable;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
@@ -54,7 +56,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
   public ResourceAssignment processJobStatusUpdateAndAssignment(String jobName,
       CurrentStateOutput currStateOutput, WorkflowContext workflowCtx) {
     // Fetch job configuration
-    JobConfig jobCfg = _dataProvider.getJobConfig(jobName);
+    final JobConfig jobCfg = _dataProvider.getJobConfig(jobName);
     if (jobCfg == null) {
       LOG.error("Job configuration is NULL for " + jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
@@ -62,7 +64,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     String workflowResource = jobCfg.getWorkflow();
 
     // Fetch workflow configuration and context
-    WorkflowConfig workflowCfg = _dataProvider.getWorkflowConfig(workflowResource);
+    final WorkflowConfig workflowCfg = _dataProvider.getWorkflowConfig(workflowResource);
     if (workflowCfg == null) {
       LOG.error("Workflow configuration is NULL for " + jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
@@ -113,13 +115,19 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     JobContext jobCtx = _dataProvider.getJobContext(jobName);
     if (jobCtx == null) {
       jobCtx = new JobContext(new ZNRecord(TaskUtil.TASK_CONTEXT_KW));
-      jobCtx.setStartTime(System.currentTimeMillis());
+      final long currentTimestamp = System.currentTimeMillis();
+      jobCtx.setStartTime(currentTimestamp);
       jobCtx.setName(jobName);
       // This job's JobContext has not been created yet. Since we are creating a new JobContext
       // here, we must also create its UserContentStore
       TaskUtil.createUserContent(_manager.getHelixPropertyStore(), jobName,
           new ZNRecord(TaskUtil.USER_CONTENT_NODE));
       workflowCtx.setJobState(jobName, TaskState.IN_PROGRESS);
+
+      // Since this job has been processed for the first time, we report SubmissionToProcessDelay
+      // here asynchronously
+      reportSubmissionToProcessDelay(_dataProvider, _clusterStatusMonitor, workflowCfg, jobCfg,
+          currentTimestamp);
     }
 
     if (!TaskState.TIMED_OUT.equals(workflowCtx.getJobState(jobName))) {
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
index 0e91648..c915e3b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
@@ -50,7 +50,7 @@ public class WorkflowContext extends HelixProperty {
     NAME
     }
 
-  public static final int UNSTARTED = -1;
+  public static final int NOT_STARTED = -1;
   public static final int UNFINISHED = -1;
 
   public WorkflowContext(ZNRecord record) {
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTaskPerformanceMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTaskPerformanceMetrics.java
new file mode 100644
index 0000000..2792488
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTaskPerformanceMetrics.java
@@ -0,0 +1,151 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableMap;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.Query;
+import javax.management.QueryExp;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskSynchronizedTestBase;
+import org.apache.helix.task.Workflow;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Tests that performance profiling metrics via JobMonitorMBean are computed correctly.
+ */
+public class TestTaskPerformanceMetrics extends TaskSynchronizedTestBase {
+  private static final long TASK_LATENCY = 100L;
+  // Configurable values for test setup
+  private static final MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer();
+  private Map<String, Object> _beanValueMap = new HashMap<>();
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    setSingleTestEnvironment();
+    super.beforeClass();
+  }
+
+  /**
+   * Test the following metrics are dynamically emitted:
+   * SubmissionToStartDelay
+   * ControllerInducedDelay
+   * The test schedules a workflow with 30 jobs, each with one task with TASK_LATENCY.
+   * AllowOverlapJobAssignment is false, so these jobs will be run in series, one at a time.
+   * With this setup, we can assume that the mean value of the metrics above will increase every
+   * time we poll at some interval greater than TASK_LATENCY.
+   * @throws Exception
+   */
+  @Test
+  public void testTaskPerformanceMetrics() throws Exception {
+    // Create a workflow
+    JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
+    TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder();
+    List<TaskConfig> taskConfigs = new ArrayList<>();
+    TaskConfig taskConfig = taskConfigBuilder.setTaskId("1").setCommand("Reindex").build();
+    taskConfig.getConfigMap().put("Latency", Long.toString(TASK_LATENCY));
+    taskConfigs.add(taskConfig);
+    jobConfigBuilder.addTaskConfigs(taskConfigs)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, Long.toString(TASK_LATENCY)));
+    Workflow.Builder workflowBuilder = new Workflow.Builder("wf");
+    for (int i = 0; i < 30; i++) {
+      workflowBuilder.addJob("job_" + i, jobConfigBuilder);
+    }
+    Workflow workflow = workflowBuilder.build();
+
+    // Start the controller and start the workflow
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME);
+    _controller.syncStart();
+    _driver.start(workflow);
+
+    // Confirm that there are metrics computed dynamically here and keeps increasing because jobs
+    // are processed one by one
+    double oldSubmissionToStartDelay = -1L;
+    double oldControllerInducedDelay = -1L;
+    for (int i = 0; i < 5; i++) {
+      // The dynamic metrics should generally be updated within 2 seconds or it would be too slow
+      Thread.sleep(2000L);
+
+      extractMetrics();
+
+      // For SubmissionToProcessDelay, the value will stay constant because the Controller will
+      // create JobContext right away most of the time
+      Assert.assertTrue(_beanValueMap.containsKey("SubmissionToProcessDelayGauge.Mean"));
+      Assert.assertTrue(_beanValueMap.containsKey("SubmissionToScheduleDelayGauge.Mean"));
+      Assert.assertTrue(_beanValueMap.containsKey("ControllerInducedDelayGauge.Mean"));
+
+      // Get the new values
+      double submissionToProcessDelay =
+          (double) _beanValueMap.get("SubmissionToProcessDelayGauge.Mean");
+      double newSubmissionToScheduleDelay =
+          (double) _beanValueMap.get("SubmissionToScheduleDelayGauge.Mean");
+      double newControllerInducedDelay =
+          (double) _beanValueMap.get("ControllerInducedDelayGauge.Mean");
+
+      Assert.assertTrue(submissionToProcessDelay > 0);
+      Assert.assertTrue(oldSubmissionToStartDelay < newSubmissionToScheduleDelay);
+      Assert.assertTrue(oldControllerInducedDelay < newControllerInducedDelay);
+
+      oldSubmissionToStartDelay = newSubmissionToScheduleDelay;
+      oldControllerInducedDelay = newControllerInducedDelay;
+    }
+  }
+
+  /**
+   * Queries for all MBeans from the MBean Server and only looks at the relevant MBean and gets its
+   * metric numbers.
+   */
+  private void extractMetrics() {
+    try {
+      QueryExp exp = Query.match(Query.attr("SensorName"), Query.value("*"));
+      Set<ObjectInstance> mbeans = new HashSet<>(
+          ManagementFactory.getPlatformMBeanServer().queryMBeans(new ObjectName(""), exp));
+      for (ObjectInstance instance : mbeans) {
+        ObjectName beanName = instance.getObjectName();
+        if (instance.getClassName().contains("JobMonitor")) {
+          MBeanInfo info = _server.getMBeanInfo(beanName);
+          MBeanAttributeInfo[] infos = info.getAttributes();
+          for (MBeanAttributeInfo infoItem : infos) {
+            Object val = _server.getAttribute(beanName, infoItem.getName());
+            _beanValueMap.put(infoItem.getName(), val);
+          }
+        }
+      }
+    } catch (Exception e) {
+      // update failed
+    }
+  }
+}


Mime
View raw message