helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h...@apache.org
Subject [helix] 03/13: Add management mode pipeline registry and switch logic (#1769)
Date Fri, 16 Jul 2021 21:03:10 GMT
This is an automated email from the ASF dual-hosted git repository.

hzlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 7190eb3ee3150a77c53ae768f89ee2b0473cc711
Author: Huizhi Lu <5187721+huizhilu@users.noreply.github.com>
AuthorDate: Wed Jun 9 21:01:30 2021 -0700

    Add management mode pipeline registry and switch logic (#1769)
    
    Management Mode Pipeline will help check the cluster status and determine whether the
default pipelines can be run.
    One use case is, it will help controller to decide when it can exit the cluster freeze
mode.
    
    This commit adds management mode pipeline and logic to switch from/to the default resource/task
pipelines.
---
 .../helix/controller/GenericHelixController.java   | 151 ++++++++++++++++-----
 .../ManagementControllerDataProvider.java}         |  28 +---
 .../apache/helix/controller/pipeline/Pipeline.java |   9 +-
 .../helix/controller/stages/ClusterEventType.java  |   1 +
 .../controller/stages/ManagementModeStage.java     |  48 +++++++
 .../controller/stages/ResourceValidationStage.java |  10 ++
 .../main/java/org/apache/helix/util/HelixUtil.java |  12 ++
 .../java/org/apache/helix/util/RebalanceUtil.java  |  22 +++
 8 files changed, 223 insertions(+), 58 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 11cd9ee..7da6ef0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -63,6 +63,7 @@ import org.apache.helix.api.listeners.TaskCurrentStateChangeListener;
 import org.apache.helix.common.ClusterEventBlockingQueue;
 import org.apache.helix.common.DedupEventProcessor;
 import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
+import org.apache.helix.controller.dataproviders.ManagementControllerDataProvider;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
 import org.apache.helix.controller.pipeline.AsyncWorkerType;
@@ -81,7 +82,7 @@ import org.apache.helix.controller.stages.CustomizedViewAggregationStage;
 import org.apache.helix.controller.stages.ExternalViewComputeStage;
 import org.apache.helix.controller.stages.IntermediateStateCalcStage;
 import org.apache.helix.controller.stages.MaintenanceRecoveryStage;
-import org.apache.helix.controller.stages.MessageGenerationPhase;
+import org.apache.helix.controller.stages.ManagementModeStage;
 import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.MessageThrottleStage;
 import org.apache.helix.controller.stages.PersistAssignmentStage;
@@ -141,6 +142,7 @@ public class GenericHelixController implements IdealStateChangeListener,
LiveIns
   private static final int ASYNC_TASKS_THREADPOOL_SIZE = 10;
   private final PipelineRegistry _registry;
   private final PipelineRegistry _taskRegistry;
+  private final PipelineRegistry _managementModeRegistry;
 
   final AtomicReference<Map<String, LiveInstance>> _lastSeenInstances;
   final AtomicReference<Map<String, LiveInstance>> _lastSeenSessions;
@@ -163,6 +165,11 @@ public class GenericHelixController implements IdealStateChangeListener,
LiveIns
   private final ClusterEventBlockingQueue _taskEventQueue;
   private final ClusterEventProcessor _taskEventThread;
 
+  // Controller will switch to run management mode pipeline when set to true.
+  private boolean _inManagementMode;
+  private final ClusterEventBlockingQueue _managementModeEventQueue;
+  private final ClusterEventProcessor _managementModeEventThread;
+
   private final Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> _asyncFIFOWorkerPool;
 
   private long _continuousRebalanceFailureCount = 0;
@@ -197,6 +204,7 @@ public class GenericHelixController implements IdealStateChangeListener,
LiveIns
    */
   private final ResourceControllerDataProvider _resourceControlDataProvider;
   private final WorkflowControllerDataProvider _workflowControlDataProvider;
+  private final ManagementControllerDataProvider _managementControllerDataProvider;
   private final ScheduledExecutorService _asyncTasksThreadPool;
 
   /**
@@ -285,13 +293,21 @@ public class GenericHelixController implements IdealStateChangeListener,
LiveIns
 
   public GenericHelixController(String clusterName) {
     this(createDefaultRegistry(Pipeline.Type.DEFAULT.name()),
-        createTaskRegistry(Pipeline.Type.TASK.name()), clusterName,
+        createTaskRegistry(Pipeline.Type.TASK.name()),
+        createManagementModeRegistry(Pipeline.Type.MANAGEMENT_MODE.name()), clusterName,
         Sets.newHashSet(Pipeline.Type.TASK, Pipeline.Type.DEFAULT));
   }
 
   public GenericHelixController(String clusterName, Set<Pipeline.Type> enabledPipelins)
{
     this(createDefaultRegistry(Pipeline.Type.DEFAULT.name()),
-        createTaskRegistry(Pipeline.Type.TASK.name()), clusterName, enabledPipelins);
+        createTaskRegistry(Pipeline.Type.TASK.name()),
+        createManagementModeRegistry(Pipeline.Type.MANAGEMENT_MODE.name()),
+        clusterName,
+        enabledPipelins);
+  }
+
+  public void setInManagementMode(boolean enabled) {
+    _inManagementMode = enabled;
   }
 
   class RebalanceTask extends TimerTask {
@@ -566,6 +582,8 @@ public class GenericHelixController implements IdealStateChangeListener,
LiveIns
       registry
           .register(ClusterEventType.OnDemandRebalance, dataRefresh, autoExitMaintenancePipeline,
               dataPreprocess, externalViewPipeline, rebalancePipeline);
+      registry.register(ClusterEventType.ControllerChange, dataRefresh, autoExitMaintenancePipeline,
+          dataPreprocess, externalViewPipeline, rebalancePipeline);
       // TODO: We now include rebalance pipeline in customized state change for correctness.
       // However, it is not efficient, and we should improve this by splitting the pipeline
or
       // controller roles to multiple hosts.
@@ -625,22 +643,49 @@ public class GenericHelixController implements IdealStateChangeListener,
LiveIns
           rebalancePipeline);
       registry.register(ClusterEventType.OnDemandRebalance, dataRefresh, dataPreprocess,
           rebalancePipeline);
+      registry.register(ClusterEventType.ControllerChange, dataRefresh, dataPreprocess,
+          rebalancePipeline);
+      return registry;
+    }
+  }
+
+  private static PipelineRegistry createManagementModeRegistry(String pipelineName) {
+    logger.info("Creating management mode registry");
+    synchronized (GenericHelixController.class) {
+      // cluster data cache refresh
+      Pipeline dataRefresh = new Pipeline(pipelineName);
+      dataRefresh.addStage(new ReadClusterDataStage());
+
+      // cluster management mode process
+      Pipeline managementMode = new Pipeline(pipelineName);
+      managementMode.addStage(new ManagementModeStage());
+
+      PipelineRegistry registry = new PipelineRegistry();
+      Arrays.asList(
+          ClusterEventType.ControllerChange,
+          ClusterEventType.LiveInstanceChange,
+          ClusterEventType.MessageChange,
+          ClusterEventType.OnDemandRebalance,
+          ClusterEventType.PeriodicalRebalance
+      ).forEach(type -> registry.register(type, dataRefresh, managementMode));
+
       return registry;
     }
   }
 
   // TODO: refactor the constructor as providing both registry but only enabling one looks
confusing
   public GenericHelixController(PipelineRegistry registry, PipelineRegistry taskRegistry)
{
-    this(registry, taskRegistry, null, Sets.newHashSet(
-        Pipeline.Type.TASK, Pipeline.Type.DEFAULT));
+    this(registry, taskRegistry, createManagementModeRegistry(Pipeline.Type.MANAGEMENT_MODE.name()),
+        null, Sets.newHashSet(Pipeline.Type.TASK, Pipeline.Type.DEFAULT));
   }
 
   private GenericHelixController(PipelineRegistry registry, PipelineRegistry taskRegistry,
-      final String clusterName, Set<Pipeline.Type> enabledPipelineTypes) {
-    _paused = false;
+      PipelineRegistry managementModeRegistry, final String clusterName,
+      Set<Pipeline.Type> enabledPipelineTypes) {
     _enabledPipelineTypes = enabledPipelineTypes;
     _registry = registry;
     _taskRegistry = taskRegistry;
+    _managementModeRegistry = managementModeRegistry;
     _lastSeenInstances = new AtomicReference<>();
     _lastSeenSessions = new AtomicReference<>();
     _lastSeenCustomizedStateTypesMapRef = new AtomicReference<>();
@@ -660,6 +705,7 @@ public class GenericHelixController implements IdealStateChangeListener,
LiveIns
     _onDemandRebalanceTimer =
         new Timer("GenericHelixController_" + _clusterName + "_onDemand_Timer", true);
 
+    // TODO: refactor to simplify below similar code of the 3 pipelines
     // initialize pipelines at the end so we have everything else prepared
     if (_enabledPipelineTypes.contains(Pipeline.Type.DEFAULT)) {
       logger.info("Initializing {} pipeline", Pipeline.Type.DEFAULT.name());
@@ -689,6 +735,16 @@ public class GenericHelixController implements IdealStateChangeListener,
LiveIns
       _taskEventThread = null;
     }
 
+    logger.info("Initializing {} pipeline", Pipeline.Type.MANAGEMENT_MODE.name());
+    _managementControllerDataProvider =
+        new ManagementControllerDataProvider(clusterName, Pipeline.Type.MANAGEMENT_MODE.name());
+    _managementModeEventQueue = new ClusterEventBlockingQueue();
+    _managementModeEventThread =
+        new ClusterEventProcessor(_managementControllerDataProvider, _managementModeEventQueue,
+            Pipeline.Type.MANAGEMENT_MODE.name() + "-" + clusterName);
+    initPipeline(_managementModeEventThread, _managementControllerDataProvider);
+    logger.info("Initialized {} pipeline", Pipeline.Type.MANAGEMENT_MODE.name());
+
     addController(this);
   }
 
@@ -776,12 +832,36 @@ public class GenericHelixController implements IdealStateChangeListener,
LiveIns
 
     _helixManager = manager;
 
-    // TODO If init controller with paused = true, it may not take effect immediately
-    // _paused is default false. If any events come before controllerChangeEvent, the controller
-    // will be excuting in un-paused mode. Which might not be the config in ZK.
-    if (_paused) {
-      logger.info("Cluster " + manager.getClusterName() + " is paused. Ignoring the event:"
+ event
-          .getEventType());
+    // Prepare ClusterEvent
+    // TODO (harry): this is a temporal workaround - after controller is separated we should
not
+    // have this instanceof clauses
+    List<Pipeline> pipelines;
+    boolean isTaskFrameworkPipeline = false;
+    Pipeline.Type pipelineType;
+
+    if (dataProvider instanceof ResourceControllerDataProvider) {
+      pipelines = _registry.getPipelinesForEvent(event.getEventType());
+      pipelineType = Pipeline.Type.DEFAULT;
+    } else if (dataProvider instanceof WorkflowControllerDataProvider) {
+      pipelines = _taskRegistry.getPipelinesForEvent(event.getEventType());
+      isTaskFrameworkPipeline = true;
+      pipelineType = Pipeline.Type.TASK;
+    } else if (dataProvider instanceof ManagementControllerDataProvider) {
+      pipelines = _managementModeRegistry.getPipelinesForEvent(event.getEventType());
+      pipelineType = Pipeline.Type.MANAGEMENT_MODE;
+    } else {
+      logger.warn(String
+          .format("No %s pipeline to run for event: %s::%s", dataProvider.getPipelineName(),
+              event.getEventType(), event.getEventId()));
+      return;
+    }
+
+    // Should not run management mode and default/task pipelines at the same time.
+    if ((_inManagementMode && !Pipeline.Type.MANAGEMENT_MODE.equals(pipelineType))
+        || (!_inManagementMode && Pipeline.Type.MANAGEMENT_MODE.equals(pipelineType)))
{
+      logger.info("Should not run management mode and default/task pipelines at the same
time. "
+              + "cluster={}, inManagementMode={}, pipelineType={}. Ignoring the event: {}",
+          manager.getClusterName(), _inManagementMode, pipelineType, event.getEventType());
       return;
     }
 
@@ -808,26 +888,6 @@ public class GenericHelixController implements IdealStateChangeListener,
LiveIns
 
     dataProvider.setClusterEventId(event.getEventId());
     event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(), _lastPipelineEndTimestamp);
-
-    // Prepare ClusterEvent
-    // TODO (harry): this is a temporal workaround - after controller is separated we should
not
-    // have this instanceof clauses
-    List<Pipeline> pipelines;
-    boolean isTaskFrameworkPipeline = false;
-
-    if (dataProvider instanceof ResourceControllerDataProvider) {
-      pipelines = _registry
-          .getPipelinesForEvent(event.getEventType());
-    } else if (dataProvider instanceof WorkflowControllerDataProvider) {
-      pipelines = _taskRegistry
-          .getPipelinesForEvent(event.getEventType());
-      isTaskFrameworkPipeline = true;
-    } else {
-      logger.warn(String
-          .format("No %s pipeline to run for event: %s::%s", dataProvider.getPipelineName(),
-              event.getEventType(), event.getEventId()));
-      return;
-    }
     event.addAttribute(AttributeName.ControllerDataProvider.name(), dataProvider);
 
     logger.info("START: Invoking {} controller pipeline for cluster: {}. Event type: {},
ID: {}. "
@@ -1202,6 +1262,10 @@ public class GenericHelixController implements IdealStateChangeListener,
LiveIns
     if (_workflowControlDataProvider != null) {
       _workflowControlDataProvider.notifyDataChange(type, path);
     }
+
+    if (_managementControllerDataProvider != null) {
+      _managementControllerDataProvider.notifyDataChange(type, path);
+    }
   }
 
   private void requestDataProvidersFullRefresh() {
@@ -1212,6 +1276,10 @@ public class GenericHelixController implements IdealStateChangeListener,
LiveIns
     if (_workflowControlDataProvider != null) {
       _workflowControlDataProvider.requireFullRefresh();
     }
+
+    if (_managementControllerDataProvider != null) {
+      _managementControllerDataProvider.requireFullRefresh();
+    }
   }
 
   private void pushToEventQueues(ClusterEventType eventType, NotificationContext changeContext,
@@ -1228,6 +1296,14 @@ public class GenericHelixController implements IdealStateChangeListener,
LiveIns
     for (Map.Entry<String, Object> attr : eventAttributes.entrySet()) {
       event.addAttribute(attr.getKey(), attr.getValue());
     }
+
+    // Management mode event will force management mode pipeline.
+    if (_inManagementMode) {
+      event.setEventId(uid + "_" + Pipeline.Type.MANAGEMENT_MODE.name());
+      enqueueEvent(_managementModeEventQueue, event);
+      return;
+    }
+
     enqueueEvent(_eventQueue, event);
     enqueueEvent(_taskEventQueue,
         event.clone(String.format("%s_%s", uid, Pipeline.Type.TASK.name())));
@@ -1269,7 +1345,10 @@ public class GenericHelixController implements IdealStateChangeListener,
LiveIns
       boolean prevInMaintenanceMode = _inMaintenanceMode;
       _paused = updateControllerState(pauseSignal, _paused);
       _inMaintenanceMode = updateControllerState(maintenanceSignal, _inMaintenanceMode);
-      triggerResumeEvent(changeContext, prevPaused, prevInMaintenanceMode);
+      // TODO: remove triggerResumeEvent when moving pause/maintenance to management pipeline
+      if (!triggerResumeEvent(changeContext, prevPaused, prevInMaintenanceMode)) {
+        pushToEventQueues(ClusterEventType.ControllerChange, changeContext, Collections.emptyMap());
+      }
 
       enableClusterStatusMonitor(true);
       _clusterStatusMonitor.setEnabled(!_paused);
@@ -1484,7 +1563,7 @@ public class GenericHelixController implements IdealStateChangeListener,
LiveIns
    * @param prevPaused the previous paused status.
    * @param prevInMaintenanceMode the previous in maintenance mode status.
    */
-  private void triggerResumeEvent(NotificationContext changeContext, boolean prevPaused,
+  private boolean triggerResumeEvent(NotificationContext changeContext, boolean prevPaused,
       boolean prevInMaintenanceMode) {
     /**
      * WARNING: the logic here is tricky.
@@ -1496,7 +1575,9 @@ public class GenericHelixController implements IdealStateChangeListener,
LiveIns
     if (!_paused && (prevPaused || (prevInMaintenanceMode && !_inMaintenanceMode)))
{
       pushToEventQueues(ClusterEventType.Resume, changeContext, Collections.EMPTY_MAP);
       logger.info("controller is now resumed from paused/maintenance state");
+      return true;
     }
+    return false;
   }
 
   // TODO: refactor this to use common/ClusterEventProcessor.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java
similarity index 62%
copy from helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
copy to helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java
index cd0ce60..d178ca5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java
@@ -1,4 +1,4 @@
-package org.apache.helix.controller.stages;
+package org.apache.helix.controller.dataproviders;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,25 +19,9 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
-public enum ClusterEventType {
-  IdealStateChange,
-  CurrentStateChange,
-  TaskCurrentStateChange,
-  CustomizedStateChange,
-  ConfigChange,
-  ClusterConfigChange,
-  ResourceConfigChange,
-  InstanceConfigChange,
-  CustomizeStateConfigChange,
-  LiveInstanceChange,
-  MessageChange,
-  ExternalViewChange,
-  CustomizedViewChange,
-  TargetExternalViewChange,
-  Resume,
-  PeriodicalRebalance,
-  OnDemandRebalance,
-  RetryRebalance,
-  StateVerifier,
-  Unknown
+public class ManagementControllerDataProvider extends BaseControllerDataProvider {
+  // TODO: implement this class to only refresh required event types
+  public ManagementControllerDataProvider(String clusterName, String name) {
+    super(clusterName, name);
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
index ecf42da..4196e23 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
@@ -35,7 +35,14 @@ public class Pipeline {
 
   public enum Type {
     DEFAULT,
-    TASK
+    TASK,
+
+    /**
+     * A pipeline used to manage the cluster when it is in admin management mode:
+     * cluster freeze mode, controller pause mode, etc. Used by Helix internally,
+     * not meant to be used for Helix external users.
+     */
+    MANAGEMENT_MODE
   }
 
   public Pipeline() {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
index cd0ce60..65f6bb4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
@@ -37,6 +37,7 @@ public enum ClusterEventType {
   Resume,
   PeriodicalRebalance,
   OnDemandRebalance,
+  ControllerChange,
   RetryRebalance,
   StateVerifier,
   Unknown
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
new file mode 100644
index 0000000..512224d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
@@ -0,0 +1,48 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.dataproviders.ManagementControllerDataProvider;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.util.HelixUtil;
+import org.apache.helix.util.RebalanceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks the cluster status whether the cluster is in management mode.
+ */
+public class ManagementModeStage extends AbstractBaseStage {
+  private static final Logger LOG = LoggerFactory.getLogger(ManagementModeStage.class);
+
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    // TODO: implement the stage
+    String clusterName = event.getClusterName();
+    ManagementControllerDataProvider cache =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
+    if (!HelixUtil.inManagementMode(cache)) {
+      LOG.info("Exiting management mode pipeline for cluster {}", clusterName);
+      RebalanceUtil.enableManagementMode(clusterName, false);
+      throw new StageException("Exiting management mode pipeline for cluster " + clusterName);
+    }
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
index 075c40f..a4d4783 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
@@ -28,6 +28,8 @@ import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.util.HelixUtil;
+import org.apache.helix.util.RebalanceUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,6 +43,14 @@ public class ResourceValidationStage extends AbstractBaseStage {
     if (cache == null) {
       throw new StageException("Missing attributes in event:" + event + ". Requires DataCache");
     }
+
+    // Check if cluster is still in management mode. Eg. there exists any frozen live instance.
+    if (HelixUtil.inManagementMode(cache)) {
+      // Trigger an immediate management mode pipeline.
+      RebalanceUtil.enableManagementMode(event.getClusterName(), true);
+      throw new StageException("Pipeline should not be run because cluster is in management
mode");
+    }
+
     Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
     if (resourceMap == null) {
       throw new StageException("Resources must be computed prior to validation!");
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index ce22c5f..3151716 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -38,6 +38,7 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyType;
 import org.apache.helix.controller.common.PartitionStateMap;
+import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.AbstractRebalancer;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
@@ -547,4 +548,15 @@ public final class HelixUtil {
     instanceConfig.setInstanceEnabled(true);
     return instanceConfig;
   }
+
+  /**
+   * Checks whether or not the cluster is in management mode.
+   *
+   * @param cache
+   * @return
+   */
+  public static boolean inManagementMode(BaseControllerDataProvider cache) {
+    // TODO: implement the logic. Parameters can also change
+    return true;
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
index 91ec406..db2b76f 100644
--- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
@@ -145,6 +145,28 @@ public class RebalanceUtil {
     return result;
   }
 
+  /**
+   * Enables/disables controller to run management mode pipeline.
+   *
+   * @param clusterName target cluster name
+   * @param enabled enable/disable controller to management mode pipeline
+   */
+  public static void enableManagementMode(String clusterName, boolean enabled) {
+    GenericHelixController leaderController =
+        GenericHelixController.getLeaderController(clusterName);
+    if (leaderController != null) {
+      LOG.info("Switching management mode pipeline for cluster={}, enabled={}", clusterName,
+          enabled);
+      leaderController.setInManagementMode(enabled);
+    } else {
+      LOG.error("Failed to switch management mode pipeline, enabled={}. "
+          + "Controller for cluster {} does not exist", clusterName, enabled);
+    }
+
+    // Triggers an event to immediately run the pipeline
+    scheduleOnDemandPipeline(clusterName, 0L);
+  }
+
   public static void scheduleOnDemandPipeline(String clusterName, long delay) {
     scheduleOnDemandPipeline(clusterName, delay, true);
   }

Mime
View raw message