This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 8ba4c9c19981fffe3c958c6851e2b8b8bf90bfbb
Author: Hunter Lee <hulee@linkedin.com>
AuthorDate: Thu Mar 28 12:30:09 2019 -0700
TASK2.0: Add performance metrics to JobMonitor
We want to add more metrics to the Task Framework so that users can understand what's
going on in case of a slowdown, or get a general sense of how fast the workload is moving.
Changelist:
1. Add SubmissionToProcessDelay
2. Add SubmissionToScheduleDelay
3. Add ControllerInducedDelay (for testing)
4. Add JobLatencyGauge
5. Change regular metrics to Dynamic metrics in JobMonitor
6. Add an integration test: TestTaskPerformanceMetrics
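For reviewers, here is a minimal sketch of how the three new delays are computed, distilled from the diff below. All timestamps are epoch millis; the sketch class and method names are illustrative and not part of the patch.

    // Illustrative summary of the new delay metrics (not part of the patch).
    // creationTime is jobConfig.getStat().getCreationTime() in all three cases.
    public final class TaskDelaySketch {
      // Delay until the controller first processes the job (i.e., creates its JobContext).
      static long submissionToProcessDelay(long creationTime, long firstProcessedTime) {
        return firstProcessedTime - creationTime;
      }

      // Delay until the very first task of the job is scheduled on a participant.
      static long submissionToScheduleDelay(long creationTime, long firstScheduledTime) {
        return firstScheduledTime - creationTime;
      }

      // Testing only: overhead attributable to the controller for a single-task job
      // whose intended runtime ("Latency" in its TaskConfig) is taskDuration.
      static long controllerInducedDelay(long creationTime, long completionTime, long taskDuration) {
        return completionTime - creationTime - taskDuration;
      }
    }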
---
.../monitoring/mbeans/ClusterStatusMonitor.java | 35 ++--
.../apache/helix/monitoring/mbeans/JobMonitor.java | 220 +++++++++++++--------
.../apache/helix/task/AbstractTaskDispatcher.java | 159 ++++++++++++++-
.../java/org/apache/helix/task/JobContext.java | 37 +++-
.../java/org/apache/helix/task/JobDispatcher.java | 14 +-
.../org/apache/helix/task/WorkflowContext.java | 2 +-
.../mbeans/TestTaskPerformanceMetrics.java | 151 ++++++++++++++
7 files changed, 508 insertions(+), 110 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 4616f9e..1d778ee 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -641,10 +641,22 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
jobType = preProcessJobMonitor(jobType);
JobMonitor jobMonitor = _perTypeJobMonitorMap.get(jobType);
if (jobMonitor != null) {
- jobMonitor.updateJobCounters(to, latency);
+ jobMonitor.updateJobMetricsWithLatency(to, latency);
}
}
+ /**
+ * TODO: Separate Workflow/Job Monitors from ClusterStatusMonitor because ClusterStatusMonitor is
+ * getting too big.
+ * Returns the appropriate JobMonitor for the given type. If it does not exist, create one and
+ * return it.
+ * @param jobType the job type whose JobMonitor is requested
+ * @return the JobMonitor for the given job type, or null if its registration failed
+ */
+ public JobMonitor getJobMonitor(String jobType) {
+ return _perTypeJobMonitorMap.get(preProcessJobMonitor(jobType));
+ }
+
private void updateJobGauges(String jobType, TaskState current) {
// When WorkflowRebalancer is called for the first time, the job config may not be ready yet.
// Thus only check it for the gauge.
@@ -662,13 +674,17 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
synchronized (_perTypeJobMonitorMap) {
if (!_perTypeJobMonitorMap.containsKey(jobType)) {
- JobMonitor monitor = new JobMonitor(_clusterName, jobType);
+ String jobMonitorBeanName = getJobBeanName(jobType);
+ JobMonitor monitor = null;
try {
- registerJob(monitor);
- } catch (MalformedObjectNameException e) {
+ monitor = new JobMonitor(_clusterName, jobType, getObjectName(jobMonitorBeanName));
+ monitor.register(); // Necessary for dynamic metrics
+ } catch (Exception e) {
LOG.error("Failed to register job type : " + jobType, e);
}
- _perTypeJobMonitorMap.put(jobType, monitor);
+ if (monitor != null) {
+ _perTypeJobMonitorMap.put(jobType, monitor);
+ }
}
}
return jobType;
@@ -789,17 +805,12 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
}
- private void registerJob(JobMonitor jobMonitor) throws MalformedObjectNameException {
- String jobBeanName = getJobBeanName(jobMonitor.getJobType());
- register(jobMonitor, getObjectName(jobBeanName));
- }
-
- private void unregisterAllJobs() throws MalformedObjectNameException {
+ private void unregisterAllJobs() {
synchronized (_perTypeJobMonitorMap) {
Iterator<Map.Entry<String, JobMonitor>> jobIter = _perTypeJobMonitorMap.entrySet().iterator();
while (jobIter.hasNext()) {
Map.Entry<String, JobMonitor> jobEntry = jobIter.next();
- unregister(getObjectName(getJobBeanName(jobEntry.getKey())));
+ jobEntry.getValue().unregister();
jobIter.remove();
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
index 39108cf..6589e96 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
@@ -19,82 +19,82 @@ package org.apache.helix.monitoring.mbeans;
* under the License.
*/
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.management.JMException;
+import javax.management.ObjectName;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
import org.apache.helix.task.TaskState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-public class JobMonitor implements JobMonitorMBean {
+public class JobMonitor extends DynamicMBeanProvider {
private static final String JOB_KEY = "Job";
private static final Logger LOG = LoggerFactory.getLogger(JobMonitor.class);
private static final long DEFAULT_RESET_INTERVAL_MS = 60 * 60 * 1000; // 1 hour
+ // For registering dynamic metrics
+ private final ObjectName _initObjectName;
+
private String _clusterName;
private String _jobType;
-
- private long _successfullJobCount;
- private long _failedJobCount;
- private long _abortedJobCount;
- private long _existingJobGauge;
- private long _queuedJobGauge;
- private long _runningJobGauge;
- private long _maximumJobLatencyGauge;
- private long _jobLatencyCount;
private long _lastResetTime;
- public JobMonitor(String clusterName, String jobType) {
+ // Counters
+ private SimpleDynamicMetric<Long> _successfulJobCount;
+ private SimpleDynamicMetric<Long> _failedJobCount;
+ private SimpleDynamicMetric<Long> _abortedJobCount;
+
+ // Gauges
+ private SimpleDynamicMetric<Long> _existingJobGauge;
+ private SimpleDynamicMetric<Long> _queuedJobGauge;
+ private SimpleDynamicMetric<Long> _runningJobGauge;
+ @Deprecated // To be removed (replaced by jobLatencyGauge Histogram)
+ private SimpleDynamicMetric<Long> _maximumJobLatencyGauge;
+ @Deprecated // To be removed (replaced by jobLatencyGauge Histogram)
+ private SimpleDynamicMetric<Long> _jobLatencyCount;
+
+ // Histogram
+ private HistogramDynamicMetric _jobLatencyGauge;
+ private HistogramDynamicMetric _submissionToProcessDelayGauge;
+ private HistogramDynamicMetric _submissionToScheduleDelayGauge;
+ private HistogramDynamicMetric _controllerInducedDelayGauge;
+
+ public JobMonitor(String clusterName, String jobType, ObjectName objectName) {
_clusterName = clusterName;
_jobType = jobType;
- _successfullJobCount = 0L;
- _failedJobCount = 0L;
- _abortedJobCount = 0L;
- _existingJobGauge = 0L;
- _queuedJobGauge = 0L;
- _runningJobGauge = 0L;
+ _initObjectName = objectName;
_lastResetTime = System.currentTimeMillis();
- _jobLatencyCount = 0L;
- _maximumJobLatencyGauge = 0L;
- }
-
- @Override
- public long getSuccessfulJobCount() {
- return _successfullJobCount;
- }
-
- @Override
- public long getFailedJobCount() {
- return _failedJobCount;
- }
-
- @Override
- public long getAbortedJobCount() {
- return _abortedJobCount;
- }
-
- @Override
- public long getExistingJobGauge() {
- return _existingJobGauge;
- }
-
- @Override
- public long getQueuedJobGauge() {
- return _queuedJobGauge;
- }
-
- @Override
- public long getRunningJobGauge() {
- return _runningJobGauge;
- }
- @Override
- public long getMaximumJobLatencyGauge() {
- return _maximumJobLatencyGauge;
- }
-
- @Override
- public long getJobLatencyCount() {
- return _jobLatencyCount;
+ // Instantiate simple dynamic metrics
+ _successfulJobCount = new SimpleDynamicMetric("SuccessfulJobCount", 0L);
+ _failedJobCount = new SimpleDynamicMetric("FailedJobCount", 0L);
+ _abortedJobCount = new SimpleDynamicMetric("AbortedJobCount", 0L);
+ _existingJobGauge = new SimpleDynamicMetric("ExistingJobGauge", 0L);
+ _queuedJobGauge = new SimpleDynamicMetric("QueuedJobGauge", 0L);
+ _runningJobGauge = new SimpleDynamicMetric("RunningJobGauge", 0L);
+ _maximumJobLatencyGauge = new SimpleDynamicMetric("MaximumJobLatencyGauge", 0L);
+ _jobLatencyCount = new SimpleDynamicMetric("JobLatencyCount", 0L);
+
+ // Instantiate histogram dynamic metrics
+ _jobLatencyGauge = new HistogramDynamicMetric("JobLatencyGauge", new Histogram(
+ new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+ _submissionToProcessDelayGauge =
+ new HistogramDynamicMetric("SubmissionToProcessDelayGauge", new Histogram(
+ new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+ _submissionToScheduleDelayGauge =
+ new HistogramDynamicMetric("SubmissionToScheduleDelayGauge", new Histogram(
+ new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+ _controllerInducedDelayGauge =
+ new HistogramDynamicMetric("ControllerInducedDelayGauge", new Histogram(
+ new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
}
@Override
@@ -110,38 +110,37 @@ public class JobMonitor implements JobMonitorMBean {
* Update job counters with transition state
* @param to The state the job is transitioning to; null when the job has been cleaned up by ZK
*/
-
- public void updateJobCounters(TaskState to) {
- updateJobCounters(to, 0);
+ public void updateJobMetricsWithLatency(TaskState to) {
+ updateJobMetricsWithLatency(to, 0);
}
- public void updateJobCounters(TaskState to, long latency) {
+ public void updateJobMetricsWithLatency(TaskState to, long latency) {
// TODO maybe use separate TIMED_OUT counter later
if (to.equals(TaskState.FAILED) || to.equals(TaskState.TIMED_OUT)) {
- _failedJobCount++;
+ incrementSimpleDynamicMetric(_failedJobCount);
} else if (to.equals(TaskState.COMPLETED)) {
- _successfullJobCount++;
-
+ incrementSimpleDynamicMetric(_successfulJobCount);
// Only count succeeded jobs
- _maximumJobLatencyGauge = Math.max(_maximumJobLatencyGauge, latency);
- _jobLatencyCount += latency > 0 ? latency : 0;
+ _maximumJobLatencyGauge.updateValue(Math.max(_maximumJobLatencyGauge.getValue(), latency));
+ if (latency > 0) {
+ incrementSimpleDynamicMetric(_jobLatencyCount, latency);
+ _jobLatencyGauge.updateValue(latency);
+ }
} else if (to.equals(TaskState.ABORTED)) {
- _abortedJobCount++;
+ incrementSimpleDynamicMetric(_abortedJobCount);
}
-
-
}
/**
* Reset job gauges
*/
public void resetJobGauge() {
- _queuedJobGauge = 0L;
- _existingJobGauge = 0L;
- _runningJobGauge = 0L;
+ _queuedJobGauge.updateValue(0L);
+ _existingJobGauge.updateValue(0L);
+ _runningJobGauge.updateValue(0L);
if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS < System.currentTimeMillis()) {
_lastResetTime = System.currentTimeMillis();
- _maximumJobLatencyGauge = 0L;
+ _maximumJobLatencyGauge.updateValue(0L);
}
}
@@ -150,11 +149,76 @@ public class JobMonitor implements JobMonitorMBean {
* @param to The current state of job
*/
public void updateJobGauge(TaskState to) {
- _existingJobGauge++;
+ incrementSimpleDynamicMetric(_existingJobGauge);
if (to == null || to.equals(TaskState.NOT_STARTED)) {
- _queuedJobGauge++;
+ incrementSimpleDynamicMetric(_queuedJobGauge);
} else if (to.equals(TaskState.IN_PROGRESS)) {
- _runningJobGauge++;
+ incrementSimpleDynamicMetric(_runningJobGauge);
}
}
+
+ /**
+ * Update SubmissionToProcessDelay to its corresponding HistogramDynamicMetric.
+ * @param delay the submission-to-process delay in milliseconds
+ */
+ public void updateSubmissionToProcessDelayGauge(long delay) {
+ _submissionToProcessDelayGauge.updateValue(delay);
+ }
+
+ /**
+ * Update SubmissionToScheduleDelay to its corresponding HistogramDynamicMetric.
+ * @param delay the submission-to-schedule delay in milliseconds
+ */
+ public void updateSubmissionToScheduleDelayGauge(long delay) {
+ _submissionToScheduleDelayGauge.updateValue(delay);
+ }
+
+ /**
+ * Update ControllerInducedDelay to its corresponding HistogramDynamicMetric.
+ * @param delay the controller-induced delay in milliseconds
+ */
+ public void updateControllerInducedDelayGauge(long delay) {
+ _controllerInducedDelayGauge.updateValue(delay);
+ }
+
+ /**
+ * This method registers the dynamic metrics.
+ * @return this JobMonitor instance
+ * @throws JMException
+ */
+ @Override
+ public DynamicMBeanProvider register() throws JMException {
+ List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
+ attributeList.add(_successfulJobCount);
+ attributeList.add(_failedJobCount);
+ attributeList.add(_abortedJobCount);
+ attributeList.add(_existingJobGauge);
+ attributeList.add(_queuedJobGauge);
+ attributeList.add(_runningJobGauge);
+ attributeList.add(_maximumJobLatencyGauge);
+ attributeList.add(_jobLatencyCount);
+ attributeList.add(_jobLatencyGauge);
+ attributeList.add(_submissionToProcessDelayGauge);
+ attributeList.add(_submissionToScheduleDelayGauge);
+ attributeList.add(_controllerInducedDelayGauge);
+ doRegister(attributeList, _initObjectName);
+ return this;
+ }
+
+ /**
+ * NOTE: This method is neither thread-safe nor atomic.
+ * Increment the value of a given SimpleDynamicMetric by 1.
+ */
+ private void incrementSimpleDynamicMetric(SimpleDynamicMetric<Long> metric) {
+ metric.updateValue(metric.getValue() + 1);
+ }
+
+ /**
+ * NOTE: This method is neither thread-safe nor atomic.
+ * Increment the value of a given SimpleDynamicMetric by the given value.
+ */
+ private void incrementSimpleDynamicMetric(SimpleDynamicMetric<Long> metric, long value) {
+ metric.updateValue(metric.getValue() + value);
+ }
+
}
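As a quick orientation before the dispatcher changes below, here is a minimal sketch of the reworked JobMonitor registration flow. The ObjectName string, cluster name, job type, and sketch class name are made up for illustration; the real ObjectName comes from ClusterStatusMonitor via getObjectName(getJobBeanName(jobType)), as shown in the first diff above.

    import javax.management.ObjectName;
    import org.apache.helix.monitoring.mbeans.JobMonitor;
    import org.apache.helix.task.TaskState;

    public class JobMonitorRegistrationSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical ObjectName; ClusterStatusMonitor derives the real one.
        ObjectName objectName = new ObjectName("ClusterStatus:cluster=myCluster,jobType=DEFAULT");
        JobMonitor monitor = new JobMonitor("myCluster", "DEFAULT", objectName);
        monitor.register(); // required for the dynamic metrics to show up as MBean attributes

        // Updates then flow in from the task pipeline, e.g. a job completing with 1200 ms latency:
        monitor.updateJobMetricsWithLatency(TaskState.COMPLETED, 1200L);
        monitor.updateSubmissionToProcessDelayGauge(35L); // delay values are in milliseconds
      }
    }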
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index a6345f6..4de8112 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -1,5 +1,24 @@
package org.apache.helix.task;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.ArrayList;
@@ -11,10 +30,13 @@ import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.Callable;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixManager;
import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.CurrentStateOutput;
@@ -23,12 +45,14 @@ import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.monitoring.mbeans.JobMonitor;
import org.apache.helix.task.assigner.AssignableInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractTaskDispatcher {
private static final Logger LOG = LoggerFactory.getLogger(AbstractTaskDispatcher.class);
+ private static final String TASK_LATENCY_TAG = "Latency";
// For connection management
protected HelixManager _manager;
@@ -470,12 +494,13 @@ public abstract class AbstractTaskDispatcher {
// This is the actual assigning part
protected void handleAdditionalTaskAssignment(
Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments, Set<String> excludedInstances,
- String jobResource, CurrentStateOutput currStateOutput, JobContext jobCtx, JobConfig jobCfg,
- WorkflowConfig workflowConfig, WorkflowContext workflowCtx,
- WorkflowControllerDataProvider cache, ResourceAssignment prevTaskToInstanceStateAssignment,
+ String jobResource, CurrentStateOutput currStateOutput, JobContext jobCtx,
+ final JobConfig jobCfg, final WorkflowConfig workflowConfig, WorkflowContext workflowCtx,
+ final WorkflowControllerDataProvider cache,
+ ResourceAssignment prevTaskToInstanceStateAssignment,
Map<String, Set<Integer>> assignedPartitions, Map<Integer, PartitionAssignment> paMap,
Set<Integer> skippedPartitions, TaskAssignmentCalculator taskAssignmentCal,
- Set<Integer> allPartitions, long currentTime, Collection<String> liveInstances) {
+ Set<Integer> allPartitions, final long currentTime, Collection<String> liveInstances) {
// See if there was LiveInstance change and cache LiveInstances from this iteration of pipeline
boolean existsLiveInstanceOrCurrentStateChange =
@@ -602,7 +627,14 @@ public abstract class AbstractTaskDispatcher {
excludeSet.add(pId);
jobCtx.setAssignedParticipant(pId, instance);
jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
- jobCtx.setPartitionStartTime(pId, System.currentTimeMillis());
+ final long currentTimestamp = System.currentTimeMillis();
+ jobCtx.setPartitionStartTime(pId, currentTimestamp);
+ if (jobCtx.getExecutionStartTime() == WorkflowContext.NOT_STARTED) {
+ // This means this is the very first task scheduled for this job
+ jobCtx.setExecutionStartTime(currentTimestamp);
+ reportSubmissionToScheduleDelay(cache, _clusterStatusMonitor, workflowConfig, jobCfg,
+ currentTimestamp);
+ }
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
TaskPartitionState.RUNNING, instance));
@@ -792,12 +824,13 @@ public abstract class AbstractTaskDispatcher {
}
}
- protected void markJobComplete(String jobName, JobContext jobContext,
- WorkflowConfig workflowConfig, WorkflowContext workflowContext,
- Map<String, JobConfig> jobConfigMap, WorkflowControllerDataProvider dataProvider) {
+ protected void markJobComplete(final String jobName, final JobContext jobContext,
+ final WorkflowConfig workflowConfig, WorkflowContext workflowContext,
+ final Map<String, JobConfig> jobConfigMap,
+ final WorkflowControllerDataProvider dataProvider) {
finishJobInRuntimeJobDag(dataProvider.getTaskDataCache(), workflowConfig.getWorkflowId(),
jobName);
- long currentTime = System.currentTimeMillis();
+ final long currentTime = System.currentTimeMillis();
workflowContext.setJobState(jobName, TaskState.COMPLETED);
jobContext.setFinishTime(currentTime);
if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap, dataProvider)) {
@@ -805,6 +838,13 @@ public abstract class AbstractTaskDispatcher {
updateWorkflowMonitor(workflowContext, workflowConfig);
}
scheduleJobCleanUp(jobConfigMap.get(jobName), workflowConfig, currentTime);
+
+ // Job has completed successfully so report ControllerInducedDelay
+ JobConfig jobConfig = jobConfigMap.get(jobName);
+ if (jobConfig != null) {
+ reportControllerInducedDelay(dataProvider, _clusterStatusMonitor, workflowConfig, jobConfig,
+ currentTime);
+ }
}
protected void markJobFailed(String jobName, JobContext jobContext, WorkflowConfig workflowConfig,
@@ -1175,4 +1215,105 @@ public abstract class AbstractTaskDispatcher {
jobName));
}
}
+
+ /**
+ * TODO: Move this logic to Task Framework metrics class for refactoring.
+ * Computes and passes on submissionToProcessDelay to the dynamic metric.
+ * @param dataProvider provides the async task thread pool used for reporting
+ * @param clusterStatusMonitor used to look up the per-type JobMonitor
+ * @param workflowConfig the config of the workflow the job belongs to
+ * @param jobConfig the config of the job being reported on
+ * @param currentTimestamp the time (epoch millis) of the event being measured
+ */
+ protected static void reportSubmissionToProcessDelay(BaseControllerDataProvider dataProvider,
+ final ClusterStatusMonitor clusterStatusMonitor, final WorkflowConfig workflowConfig,
+ final JobConfig jobConfig, final long currentTimestamp) {
+ AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), new Callable<Object>() {
+ @Override
+ public Object call() {
+ // Asynchronously update the appropriate JobMonitor
+ JobMonitor jobMonitor = clusterStatusMonitor
+ .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig));
+ if (jobMonitor == null) {
+ return null;
+ }
+
+ // Compute SubmissionToProcessDelay
+ long submissionToProcessDelay = currentTimestamp - jobConfig.getStat().getCreationTime();
+ jobMonitor.updateSubmissionToProcessDelayGauge(submissionToProcessDelay);
+ return null;
+ }
+ });
+ }
+
+ /**
+ * TODO: Move this logic to Task Framework metrics class for refactoring.
+ * Computes and passes on submissionToScheduleDelay to the dynamic metric.
+ * @param dataProvider provides the async task thread pool used for reporting
+ * @param clusterStatusMonitor used to look up the per-type JobMonitor
+ * @param workflowConfig the config of the workflow the job belongs to
+ * @param jobConfig the config of the job being reported on
+ * @param currentTimestamp the time (epoch millis) of the event being measured
+ */
+ private static void reportSubmissionToScheduleDelay(BaseControllerDataProvider dataProvider,
+ final ClusterStatusMonitor clusterStatusMonitor, final WorkflowConfig workflowConfig,
+ final JobConfig jobConfig, final long currentTimestamp) {
+ AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), new Callable<Object>() {
+ @Override
+ public Object call() {
+ // Asynchronously update the appropriate JobMonitor
+ JobMonitor jobMonitor = clusterStatusMonitor
+ .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig));
+ if (jobMonitor == null) {
+ return null;
+ }
+
+ // Compute SubmissionToScheduleDelay
+ long submissionToStartDelay = currentTimestamp - jobConfig.getStat().getCreationTime();
+ jobMonitor.updateSubmissionToScheduleDelayGauge(submissionToStartDelay);
+ return null;
+ }
+ });
+ }
+
+ /**
+ * TODO: Move this logic to Task Framework metrics class for refactoring.
+ * Computes and passes on controllerInducedDelay to the dynamic metric.
+ * @param dataProvider provides the async task thread pool used for reporting
+ * @param clusterStatusMonitor used to look up the per-type JobMonitor
+ * @param workflowConfig the config of the workflow the job belongs to
+ * @param jobConfig the config of the job being reported on
+ * @param currentTimestamp the time (epoch millis) of the event being measured
+ */
+ private static void reportControllerInducedDelay(BaseControllerDataProvider dataProvider,
+ final ClusterStatusMonitor clusterStatusMonitor, final WorkflowConfig workflowConfig,
+ final JobConfig jobConfig, final long currentTimestamp) {
+ AbstractBaseStage.asyncExecute(dataProvider.getAsyncTasksThreadPool(), new Callable<Object>() {
+ @Override
+ public Object call() {
+ // Asynchronously update the appropriate JobMonitor
+ JobMonitor jobMonitor = clusterStatusMonitor
+ .getJobMonitor(TaskAssignmentCalculator.getQuotaType(workflowConfig, jobConfig));
+ if (jobMonitor == null) {
+ return null;
+ }
+
+ // Compute ControllerInducedDelay only if the workload is a test load
+ // NOTE: this metric cannot be computed for general user-submitted workloads because
+ // the actual runtimes of the tasks vary, and there could exist multiple tasks per job
+ // NOTE: a test workload will have the "latency" field in the mapField of the
+ // JobConfig (taskConfig)
+ String firstTask = jobConfig.getTaskConfigMap().keySet().iterator().next();
+ if (jobConfig.getTaskConfig(firstTask).getConfigMap().containsKey(TASK_LATENCY_TAG)) {
+ long taskDuration =
+ Long.valueOf(jobConfig.getTaskConfig(firstTask).getConfigMap().get(TASK_LATENCY_TAG));
+ long controllerInducedDelay =
+ currentTimestamp - jobConfig.getStat().getCreationTime() - taskDuration;
+ jobMonitor.updateControllerInducedDelayGauge(controllerInducedDelay);
+ }
+ return null;
+ }
+ });
+ }
}
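To make the ControllerInducedDelay arithmetic in reportControllerInducedDelay above concrete, here is a worked example using made-up timestamps and the same 100 ms task latency the new integration test uses; the class name is illustrative.

    public class ControllerInducedDelayExample {
      public static void main(String[] args) {
        long creationTime = 1_000L;   // JobConfig creation time (epoch millis)
        long completionTime = 1_350L; // time at which markJobComplete() ran
        long taskDuration = 100L;     // "Latency" value carried in the test TaskConfig

        // Everything beyond the task's own runtime is attributed to the controller:
        long controllerInducedDelay = completionTime - creationTime - taskDuration;
        System.out.println(controllerInducedDelay + " ms"); // prints "250 ms"
      }
    }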
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobContext.java b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
index 2d878fb..c84f660 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
@@ -38,7 +38,7 @@ import com.google.common.collect.Sets;
*/
public class JobContext extends HelixProperty {
private enum ContextProperties {
- START_TIME,
+ START_TIME, // Time at which this JobContext was created
STATE,
NUM_ATTEMPTS,
FINISH_TIME,
@@ -47,7 +47,8 @@ public class JobContext extends HelixProperty {
ASSIGNED_PARTICIPANT,
NEXT_RETRY_TIME,
INFO,
- NAME
+ NAME,
+ EXECUTION_START_TIME, // Time at which the first task of this job got scheduled
}
public JobContext(ZNRecord record) {
@@ -61,7 +62,7 @@ public class JobContext extends HelixProperty {
public long getStartTime() {
String tStr = _record.getSimpleField(ContextProperties.START_TIME.toString());
if (tStr == null) {
- return WorkflowContext.UNSTARTED;
+ return WorkflowContext.NOT_STARTED;
}
return Long.parseLong(tStr);
}
@@ -141,11 +142,11 @@ public class JobContext extends HelixProperty {
public long getPartitionStartTime(int p) {
Map<String, String> map = getMapField(p);
if (map == null) {
- return WorkflowContext.UNSTARTED;
+ return WorkflowContext.NOT_STARTED;
}
String tStr = map.get(ContextProperties.START_TIME.toString());
if (tStr == null) {
- return WorkflowContext.UNSTARTED;
+ return WorkflowContext.NOT_STARTED;
}
return Long.parseLong(tStr);
}
@@ -273,8 +274,30 @@ public class JobContext extends HelixProperty {
}
/**
+ * Only set the execution start time when it hasn't already been set.
+ * NOTE: This method is not thread-safe. However, it is okay because even if this does get
+ * written twice due to a race condition, the timestamps will be close enough to give a fairly
+ * good estimate of the execution start time. We do not want to affect task status update
+ * performance, and ultimately, this execution start time is an estimate in and of itself anyway.
+ * @param t the execution start time (epoch millis)
+ */
+ public void setExecutionStartTime(long t) {
+ String tStr = _record.getSimpleField(ContextProperties.EXECUTION_START_TIME.toString());
+ if (tStr == null) {
+ _record.setSimpleField(ContextProperties.EXECUTION_START_TIME.toString(), String.valueOf(t));
+ }
+ }
+
+ public long getExecutionStartTime() {
+ String tStr = _record.getSimpleField(ContextProperties.EXECUTION_START_TIME.toString());
+ if (tStr == null) {
+ return WorkflowContext.NOT_STARTED;
+ }
+ return Long.parseLong(tStr);
+ }
+
+ /**
* Get MapField for the given partition.
- *
* @param p
* @return mapField for the partition, NULL if the partition has not been scheduled yet.
*/
@@ -286,7 +309,7 @@ public class JobContext extends HelixProperty {
String pStr = String.valueOf(p);
Map<String, String> map = _record.getMapField(pStr);
if (map == null && createIfNotPresent) {
- map = new TreeMap<String, String>();
+ map = new TreeMap<>();
_record.setMapField(pStr, map);
}
return map;
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 6530f3a..56e04ac 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -29,8 +29,10 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.Callable;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
@@ -54,7 +56,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
public ResourceAssignment processJobStatusUpdateAndAssignment(String jobName,
CurrentStateOutput currStateOutput, WorkflowContext workflowCtx) {
// Fetch job configuration
- JobConfig jobCfg = _dataProvider.getJobConfig(jobName);
+ final JobConfig jobCfg = _dataProvider.getJobConfig(jobName);
if (jobCfg == null) {
LOG.error("Job configuration is NULL for " + jobName);
return buildEmptyAssignment(jobName, currStateOutput);
@@ -62,7 +64,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
String workflowResource = jobCfg.getWorkflow();
// Fetch workflow configuration and context
- WorkflowConfig workflowCfg = _dataProvider.getWorkflowConfig(workflowResource);
+ final WorkflowConfig workflowCfg = _dataProvider.getWorkflowConfig(workflowResource);
if (workflowCfg == null) {
LOG.error("Workflow configuration is NULL for " + jobName);
return buildEmptyAssignment(jobName, currStateOutput);
@@ -113,13 +115,19 @@ public class JobDispatcher extends AbstractTaskDispatcher {
JobContext jobCtx = _dataProvider.getJobContext(jobName);
if (jobCtx == null) {
jobCtx = new JobContext(new ZNRecord(TaskUtil.TASK_CONTEXT_KW));
- jobCtx.setStartTime(System.currentTimeMillis());
+ final long currentTimestamp = System.currentTimeMillis();
+ jobCtx.setStartTime(currentTimestamp);
jobCtx.setName(jobName);
// This job's JobContext has not been created yet. Since we are creating a new JobContext
// here, we must also create its UserContentStore
TaskUtil.createUserContent(_manager.getHelixPropertyStore(), jobName,
new ZNRecord(TaskUtil.USER_CONTENT_NODE));
workflowCtx.setJobState(jobName, TaskState.IN_PROGRESS);
+
+ // Since this job has been processed for the first time, we report SubmissionToProcessDelay
+ // here asynchronously
+ reportSubmissionToProcessDelay(_dataProvider, _clusterStatusMonitor, workflowCfg, jobCfg,
+ currentTimestamp);
}
if (!TaskState.TIMED_OUT.equals(workflowCtx.getJobState(jobName))) {
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
index 0e91648..c915e3b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
@@ -50,7 +50,7 @@ public class WorkflowContext extends HelixProperty {
NAME
}
- public static final int UNSTARTED = -1;
+ public static final int NOT_STARTED = -1;
public static final int UNFINISHED = -1;
public WorkflowContext(ZNRecord record) {
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTaskPerformanceMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTaskPerformanceMetrics.java
new file mode 100644
index 0000000..2792488
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTaskPerformanceMetrics.java
@@ -0,0 +1,151 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableMap;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.Query;
+import javax.management.QueryExp;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskSynchronizedTestBase;
+import org.apache.helix.task.Workflow;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Tests that the performance profiling metrics exposed via the JobMonitor MBean are computed correctly.
+ */
+public class TestTaskPerformanceMetrics extends TaskSynchronizedTestBase {
+ private static final long TASK_LATENCY = 100L;
+ // Configurable values for test setup
+ private static final MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer();
+ private Map<String, Object> _beanValueMap = new HashMap<>();
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ setSingleTestEnvironment();
+ super.beforeClass();
+ }
+
+ /**
+ * Test that the following metrics are dynamically emitted:
+ * SubmissionToScheduleDelay
+ * ControllerInducedDelay
+ * The test schedules a workflow with 30 jobs, each with one task with TASK_LATENCY.
+ * AllowOverlapJobAssignment is false, so these jobs will be run in series, one at a time.
+ * With this setup, we can assume that the mean value of the metrics above will increase every
+ * time we poll at some interval greater than TASK_LATENCY.
+ * @throws Exception
+ */
+ @Test
+ public void testTaskPerformanceMetrics() throws Exception {
+ // Create a workflow
+ JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
+ TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder();
+ List<TaskConfig> taskConfigs = new ArrayList<>();
+ TaskConfig taskConfig = taskConfigBuilder.setTaskId("1").setCommand("Reindex").build();
+ taskConfig.getConfigMap().put("Latency", Long.toString(TASK_LATENCY));
+ taskConfigs.add(taskConfig);
+ jobConfigBuilder.addTaskConfigs(taskConfigs)
+ .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, Long.toString(TASK_LATENCY)));
+ Workflow.Builder workflowBuilder = new Workflow.Builder("wf");
+ for (int i = 0; i < 30; i++) {
+ workflowBuilder.addJob("job_" + i, jobConfigBuilder);
+ }
+ Workflow workflow = workflowBuilder.build();
+
+ // Start the controller and start the workflow
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME);
+ _controller.syncStart();
+ _driver.start(workflow);
+
+ // Confirm that the metrics are computed dynamically here and keep increasing because jobs
+ // are processed one by one
+ double oldSubmissionToScheduleDelay = -1L;
+ double oldControllerInducedDelay = -1L;
+ for (int i = 0; i < 5; i++) {
+ // The dynamic metrics should generally be updated within 2 seconds or it would be too slow
+ Thread.sleep(2000L);
+
+ extractMetrics();
+
+ // For SubmissionToProcessDelay, the value will stay constant because the Controller will
+ // create JobContext right away most of the time
+ Assert.assertTrue(_beanValueMap.containsKey("SubmissionToProcessDelayGauge.Mean"));
+ Assert.assertTrue(_beanValueMap.containsKey("SubmissionToScheduleDelayGauge.Mean"));
+ Assert.assertTrue(_beanValueMap.containsKey("ControllerInducedDelayGauge.Mean"));
+
+ // Get the new values
+ double submissionToProcessDelay =
+ (double) _beanValueMap.get("SubmissionToProcessDelayGauge.Mean");
+ double newSubmissionToScheduleDelay =
+ (double) _beanValueMap.get("SubmissionToScheduleDelayGauge.Mean");
+ double newControllerInducedDelay =
+ (double) _beanValueMap.get("ControllerInducedDelayGauge.Mean");
+
+ Assert.assertTrue(submissionToProcessDelay > 0);
+ Assert.assertTrue(oldSubmissionToScheduleDelay < newSubmissionToScheduleDelay);
+ Assert.assertTrue(oldControllerInducedDelay < newControllerInducedDelay);
+
+ oldSubmissionToScheduleDelay = newSubmissionToScheduleDelay;
+ oldControllerInducedDelay = newControllerInducedDelay;
+ }
+ }
+
+ /**
+ * Queries all MBeans from the MBean server, looks only at the relevant JobMonitor MBeans, and
+ * reads their metric values.
+ */
+ private void extractMetrics() {
+ try {
+ QueryExp exp = Query.match(Query.attr("SensorName"), Query.value("*"));
+ Set<ObjectInstance> mbeans = new HashSet<>(
+ ManagementFactory.getPlatformMBeanServer().queryMBeans(new ObjectName(""), exp));
+ for (ObjectInstance instance : mbeans) {
+ ObjectName beanName = instance.getObjectName();
+ if (instance.getClassName().contains("JobMonitor")) {
+ MBeanInfo info = _server.getMBeanInfo(beanName);
+ MBeanAttributeInfo[] infos = info.getAttributes();
+ for (MBeanAttributeInfo infoItem : infos) {
+ Object val = _server.getAttribute(beanName, infoItem.getName());
+ _beanValueMap.put(infoItem.getName(), val);
+ }
+ }
+ }
+ } catch (Exception e) {
+ // Metric extraction is best-effort; ignore failures here and let the assertions catch stale values
+ }
+ }
+}