helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h...@apache.org
Subject [helix] 08/13: Add message generation logic for management pipeline (#1803)
Date Fri, 16 Jul 2021 21:03:15 GMT
This is an automated email from the ASF dual-hosted git repository.

hzlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 02c7ac0cdd3e02697fcc126b059f2c7d94e5803b
Author: Huizhi Lu <5187721+huizhilu@users.noreply.github.com>
AuthorDate: Mon Jun 28 22:53:46 2021 -0700

    Add message generation logic for management pipeline (#1803)
    
    In cluster freeze mode, controller sends freeze/unfreeze and pending ST cancellation messages
to participants.
    In this commit, the existing message generation stage is leveraged to create the ST cancellation
messages.
    Best possible out is built by copying the state map from current state for generating
cancellation messages.
    For freeze/unfreeze messages, the logic is added in ManagementMessageGenerationPhase.
    The existing MessageDispatchStage is also used for dispatching all the messages.
---
 .../helix/api/status/ClusterManagementMode.java    |  24 ++++
 .../helix/controller/GenericHelixController.java   |  11 +-
 .../helix/controller/stages/AttributeName.java     |   3 +
 .../stages/ManagementMessageDispatchStage.java     |  62 +++++++++
 .../stages/ManagementMessageGenerationPhase.java   | 119 ++++++++++++++++++
 .../controller/stages/ManagementModeStage.java     |  22 ++--
 .../helix/controller/stages/MessageOutput.java     |  11 +-
 .../java/org/apache/helix/model/LiveInstance.java  |   3 +-
 .../java/org/apache/helix/util/RebalanceUtil.java  |  27 ++++
 .../stages/TestManagementMessageGeneration.java    | 140 +++++++++++++++++++++
 .../controller/stages/TestManagementModeStage.java |   2 +
 11 files changed, 410 insertions(+), 14 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementMode.java
b/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementMode.java
index cbd2019..86817d1 100644
--- a/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementMode.java
+++ b/helix-core/src/main/java/org/apache/helix/api/status/ClusterManagementMode.java
@@ -19,6 +19,8 @@ package org.apache.helix.api.status;
  * under the License.
  */
 
+import org.apache.helix.model.LiveInstance;
+
 /**
  * Represents the management mode of the cluster:
  * 1. what type of mode it targets to be;
@@ -73,4 +75,26 @@ public class ClusterManagementMode {
     public Type getMode() {
         return mode;
     }
+
+    public boolean isFullyInNormalMode() {
+        return Type.NORMAL.equals(mode) && Status.COMPLETED.equals(status);
+    }
+
+    /**
+     * Gets the desired live instance status for this management mode.
+     *
+     * @return The desired {@link org.apache.helix.model.LiveInstance.LiveInstanceStatus}.
+     * If participants status is not expected to change for the management mode, null is
returned.
+     */
+    public LiveInstance.LiveInstanceStatus getDesiredParticipantStatus() {
+        switch (mode) {
+            case CLUSTER_PAUSE:
+                return LiveInstance.LiveInstanceStatus.PAUSED;
+            case NORMAL:
+                return LiveInstance.LiveInstanceStatus.NORMAL;
+            default:
+                // Other modes don't need to change participant status
+                return null;
+        }
+    }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 00bdfcc..95416d4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -82,6 +82,8 @@ import org.apache.helix.controller.stages.CustomizedViewAggregationStage;
 import org.apache.helix.controller.stages.ExternalViewComputeStage;
 import org.apache.helix.controller.stages.IntermediateStateCalcStage;
 import org.apache.helix.controller.stages.MaintenanceRecoveryStage;
+import org.apache.helix.controller.stages.ManagementMessageDispatchStage;
+import org.apache.helix.controller.stages.ManagementMessageGenerationPhase;
 import org.apache.helix.controller.stages.ManagementModeStage;
 import org.apache.helix.controller.stages.MessageGenerationPhase;
 import org.apache.helix.controller.stages.MessageSelectionStage;
@@ -648,9 +650,16 @@ public class GenericHelixController implements IdealStateChangeListener,
LiveIns
       Pipeline dataRefresh = new Pipeline(pipelineName);
       dataRefresh.addStage(new ReadClusterDataStage());
 
+      // data pre-process pipeline
+      Pipeline dataPreprocess = new Pipeline(pipelineName);
+      dataPreprocess.addStage(new ResourceComputationStage());
+      dataPreprocess.addStage(new CurrentStateComputationStage());
+
       // cluster management mode process
       Pipeline managementMode = new Pipeline(pipelineName);
       managementMode.addStage(new ManagementModeStage());
+      managementMode.addStage(new ManagementMessageGenerationPhase());
+      managementMode.addStage(new ManagementMessageDispatchStage());
 
       PipelineRegistry registry = new PipelineRegistry();
       Arrays.asList(
@@ -659,7 +668,7 @@ public class GenericHelixController implements IdealStateChangeListener,
LiveIns
           ClusterEventType.MessageChange,
           ClusterEventType.OnDemandRebalance,
           ClusterEventType.PeriodicalRebalance
-      ).forEach(type -> registry.register(type, dataRefresh, managementMode));
+      ).forEach(type -> registry.register(type, dataRefresh, dataPreprocess, managementMode));
 
       return registry;
     }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index d9ca40b..8771bff 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -45,6 +45,9 @@ public enum AttributeName {
   /** This is the cluster manager's session id when event is received. */
   EVENT_SESSION,
 
+  /** Represents cluster's status, used in management mode pipeline. */
+  CLUSTER_STATUS,
+
   // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause
race conditions.
   TO_BE_PURGED_WORKFLOWS,
   // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause
race conditions.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementMessageDispatchStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementMessageDispatchStage.java
new file mode 100644
index 0000000..ca7aa31
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementMessageDispatchStage.java
@@ -0,0 +1,62 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.controller.LogUtil;
+import org.apache.helix.controller.dataproviders.ManagementControllerDataProvider;
+import org.apache.helix.model.Message;
+import org.apache.helix.util.RebalanceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Dispatches participant status change and pending state transition cancellation messages
+ * for the management pipeline.
+ */
+public class ManagementMessageDispatchStage extends MessageDispatchStage {
+  private static final Logger LOG = LoggerFactory.getLogger(ManagementMessageDispatchStage.class);
+
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    MessageOutput messageOutput =
+        event.getAttribute(AttributeName.MESSAGES_ALL.name());
+    processEvent(event, messageOutput);
+
+    // Send participant status change messages.
+    HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
+    List<Message> messagesSent =
+        super.sendMessages(manager.getHelixDataAccessor(), messageOutput.getStatusChangeMessages());
+    ManagementControllerDataProvider cache =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
+    cache.cacheMessages(messagesSent);
+
+    // Can exit management mode pipeline after fully in normal mode
+    ClusterManagementMode managementMode = event.getAttribute(AttributeName.CLUSTER_STATUS.name());
+    if (managementMode.isFullyInNormalMode()) {
+      LogUtil.logInfo(LOG, _eventId,
+          "Exiting management mode pipeline for cluster " + event.getClusterName());
+      RebalanceUtil.enableManagementMode(event.getClusterName(), false);
+    }
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementMessageGenerationPhase.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementMessageGenerationPhase.java
new file mode 100644
index 0000000..4479d50
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementMessageGenerationPhase.java
@@ -0,0 +1,119 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.controller.LogUtil;
+import org.apache.helix.controller.dataproviders.ManagementControllerDataProvider;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.LiveInstance.LiveInstanceStatus;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.PauseSignal;
+import org.apache.helix.util.MessageUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates participant status change (freeze/unfreeze) and pending state transition cancellation
+ * messages for management mode pipeline.
+ */
+public class ManagementMessageGenerationPhase extends MessageGenerationPhase {
+  private static final Logger LOG = LoggerFactory.getLogger(ManagementMessageGenerationPhase.class);
+
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    _eventId = event.getEventId();
+    String clusterName = event.getClusterName();
+    HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
+    ClusterManagementMode managementMode = event.getAttribute(AttributeName.CLUSTER_STATUS.name());
+    ManagementControllerDataProvider cache =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
+    if (manager == null || managementMode == null || cache == null) {
+      throw new StageException("Missing attributes in event: " + event
+          + ". Requires HelixManager|ClusterStatus|DataCache");
+    }
+
+    PauseSignal pauseSignal = cache.getPauseSignal();
+    if (cache.getClusterConfig().isStateTransitionCancelEnabled()
+        && pauseSignal != null && pauseSignal.getCancelPendingST()) {
+      // Generate ST cancellation messages.
+      LogUtil.logInfo(LOG, _eventId,
+          "Generating ST cancellation messages for cluster " + clusterName);
+      super.process(event);
+    }
+
+    MessageOutput messageOutput =
+        event.getAttributeWithDefault(AttributeName.MESSAGES_ALL.name(), new MessageOutput());
+    // Is participant status change still in progress? Create messages
+    if (!ClusterManagementMode.Status.COMPLETED.equals(managementMode.getStatus())) {
+      LogUtil.logInfo(LOG, _eventId, "Generating messages as cluster " + clusterName
+          + " is still in progress to change participant status");
+      List<Message> messages = generateStatusChangeMessages(managementMode,
+          cache.getEnabledLiveInstances(), cache.getLiveInstances(),
+          cache.getAllInstancesMessages(), manager.getInstanceName(), manager.getSessionId());
+      messageOutput.addStatusChangeMessages(messages);
+    }
+
+    event.addAttribute(AttributeName.MESSAGES_ALL.name(), messageOutput);
+  }
+
+  private List<Message> generateStatusChangeMessages(ClusterManagementMode managementMode,
+      Set<String> enabledLiveInstances, Map<String, LiveInstance> liveInstanceMap,
+      Map<String, Collection<Message>> allInstanceMessages, String managerInstance,
+      String managerSessionId) {
+    List<Message> messagesGenerated = new ArrayList<>();
+
+    LiveInstanceStatus desiredStatus = managementMode.getDesiredParticipantStatus();
+
+    // Check status and pending status change messages for all enabled live instances.
+    // Send freeze/unfreeze messages if necessary
+    for (String instanceName : enabledLiveInstances) {
+      LiveInstance liveInstance = liveInstanceMap.get(instanceName);
+      Collection<Message> pendingMessages = allInstanceMessages.get(instanceName);
+      String sessionId = liveInstance.getEphemeralOwner();
+      LiveInstanceStatus currentStatus = liveInstance.getStatus();
+
+      if (needStatusChangeMessage(pendingMessages, currentStatus, desiredStatus)) {
+        Message statusChangeMessage = MessageUtil.createStatusChangeMessage(currentStatus,
+            desiredStatus, managerInstance, managerSessionId, instanceName, sessionId);
+        messagesGenerated.add(statusChangeMessage);
+      }
+    }
+
+    return messagesGenerated;
+  }
+
+  private boolean needStatusChangeMessage(Collection<Message> messages,
+      LiveInstanceStatus currentStatus, LiveInstanceStatus desiredStatus) {
+    // 1. current status is not equal to desired status
+    // 2. participant change status message is not sent
+    return currentStatus != desiredStatus && messages.stream().noneMatch(
+        message -> message.isParticipantStatusChangeType() && desiredStatus.name()
+            .equals(message.getToState()));
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
index 94ff1d3..b639685 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
@@ -40,7 +40,9 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.LiveInstance.LiveInstanceStatus;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.Partition;
 import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.Resource;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.util.RebalanceUtil;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -66,6 +68,14 @@ public class ManagementModeStage extends AbstractBaseStage {
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
     ManagementControllerDataProvider cache =
         event.getAttribute(AttributeName.ControllerDataProvider.name());
+    CurrentStateOutput currentStateOutput =
+        event.getAttribute(AttributeName.CURRENT_STATE.name());
+    final Map<String, Resource> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
+
+    final BestPossibleStateOutput bestPossibleStateOutput =
+        RebalanceUtil.buildBestPossibleState(resourceMap.keySet(), currentStateOutput);
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
 
     ClusterManagementMode managementMode =
         checkClusterFreezeStatus(cache.getEnabledLiveInstances(), cache.getLiveInstances(),
@@ -75,17 +85,7 @@ public class ManagementModeStage extends AbstractBaseStage {
     recordManagementModeHistory(managementMode, cache.getPauseSignal(), manager.getInstanceName(),
         accessor);
 
-    // TODO: move to the last stage of management pipeline
-    checkInManagementMode(clusterName, cache);
-  }
-
-  private void checkInManagementMode(String clusterName, ManagementControllerDataProvider
cache) {
-    // Should exit management mode
-    if (!HelixUtil.inManagementMode(cache.getPauseSignal(), cache.getLiveInstances(),
-        cache.getEnabledLiveInstances(), cache.getAllInstancesMessages())) {
-      LogUtil.logInfo(LOG, _eventId, "Exiting management mode pipeline for cluster " + clusterName);
-      RebalanceUtil.enableManagementMode(clusterName, false);
-    }
+    event.addAttribute(AttributeName.CLUSTER_STATUS.name(), managementMode);
   }
 
   // Checks cluster freeze, controller pause mode and status.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java
index ad7e6c8..1f47ee4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageOutput.java
@@ -30,10 +30,11 @@ import org.apache.helix.model.Partition;
 
 public class MessageOutput {
   private final Map<String, Map<Partition, List<Message>>> _messagesMap;
+  private final List<Message> _statusChangeMessages;
 
   public MessageOutput() {
     _messagesMap = new HashMap<>();
-
+    _statusChangeMessages = new ArrayList<>();
   }
 
   public void addMessage(String resourceName, Partition partition, Message message) {
@@ -56,6 +57,10 @@ public class MessageOutput {
     _messagesMap.get(resourceName).put(partition, messages);
   }
 
+  public void addStatusChangeMessages(List<Message> messages) {
+    _statusChangeMessages.addAll(messages);
+  }
+
   public List<Message> getMessages(String resourceName, Partition resource) {
     Map<Partition, List<Message>> map = _messagesMap.get(resourceName);
     if (map != null && map.get(resource) != null) {
@@ -68,6 +73,10 @@ public class MessageOutput {
     return _messagesMap.getOrDefault(resourceName, Collections.emptyMap());
   }
 
+  public List<Message> getStatusChangeMessages() {
+    return _statusChangeMessages;
+  }
+
   @Override
   public String toString() {
     return _messagesMap.toString();
diff --git a/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
index 7a9671d..fed2940 100644
--- a/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
+++ b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
@@ -50,7 +50,8 @@ public class LiveInstance extends HelixProperty {
    * Saved values for the {@link LiveInstanceProperty#STATUS} field
    */
   public enum LiveInstanceStatus {
-    PAUSED
+    PAUSED,
+    NORMAL
   }
 
   /**
diff --git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
index f74b98f..bc4054a 100644
--- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
@@ -20,6 +20,7 @@ package org.apache.helix.util;
  */
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -30,8 +31,11 @@ import org.apache.helix.HelixException;
 import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
 import org.apache.helix.model.StateModelDefinition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -192,6 +196,29 @@ public class RebalanceUtil {
   }
 
   /**
+   * Build best possible state out by copying the state map from current state output.
+   * It'll be used for generating pending ST cancellation messages.
+   *
+   * @param resourceNames collection of resource names
+   * @param currentStateOutput Current state output {@link CurrentStateOutput}
+   * @return {@link BestPossibleStateOutput}
+   */
+  public static BestPossibleStateOutput buildBestPossibleState(Collection<String> resourceNames,
+      CurrentStateOutput currentStateOutput) {
+    BestPossibleStateOutput output = new BestPossibleStateOutput();
+
+    for (String resource : resourceNames) {
+      Map<Partition, Map<String, String>> currentStateMap =
+          currentStateOutput.getCurrentStateMap(resource);
+      if (currentStateMap != null) {
+        output.setState(resource, currentStateMap);
+      }
+    }
+
+    return output;
+  }
+
+  /**
    * runStage allows the run of individual stages. It can be used to mock a part of the Controller
    * pipeline run.
    *
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementMessageGeneration.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementMessageGeneration.java
new file mode 100644
index 0000000..c16b503
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementMessageGeneration.java
@@ -0,0 +1,140 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.controller.dataproviders.ManagementControllerDataProvider;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.util.RebalanceUtil;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestManagementMessageGeneration extends ManagementMessageGenerationPhase {
+  private static final String TEST_CLUSTER = "testCluster";
+  private static final String TEST_RESOURCE = "resource0";
+  private static final String TEST_INSTANCE = "instance0";
+  private static final String TEST_PARTITION = "partition0";
+
+  @Test
+  public void testCancelPendingSTMessage() throws Exception {
+    List<Message> messages = generateMessages("ONLINE", "ONLINE", "OFFLINE", true);
+
+    Assert.assertEquals(messages.size(), 1, "Should create cancellation message");
+
+    Message msg = messages.get(0);
+    Assert.assertEquals(msg.getMsgType(), Message.MessageType.STATE_TRANSITION_CANCELLATION.name());
+    Assert.assertEquals(msg.getFromState(), "ONLINE");
+    Assert.assertEquals(msg.getToState(), "OFFLINE");
+
+    messages = generateMessages("ONLINE", "ONLINE", "OFFLINE", false);
+    Assert.assertEquals(messages.size(), 0);
+  }
+
+  private List<Message> generateMessages(String currentState, String fromState, String
toState,
+      boolean cancelPendingST) throws Exception {
+    ClusterEvent event = new ClusterEvent(TEST_CLUSTER, ClusterEventType.Unknown);
+
+    // Set current state to event
+    CurrentStateOutput currentStateOutput = mock(CurrentStateOutput.class);
+    Partition partition = mock(Partition.class);
+    when(partition.getPartitionName()).thenReturn(TEST_PARTITION);
+    when(currentStateOutput.getCurrentState(TEST_RESOURCE, partition, TEST_INSTANCE))
+        .thenReturn(currentState);
+    when(currentStateOutput.getCurrentStateMap(TEST_RESOURCE))
+        .thenReturn(ImmutableMap.of(partition, ImmutableMap.of(TEST_INSTANCE, currentState)));
+
+    // Pending message for error partition reset
+    Message pendingMessage = mock(Message.class);
+    when(pendingMessage.getFromState()).thenReturn(fromState);
+    when(pendingMessage.getToState()).thenReturn(toState);
+    when(currentStateOutput.getPendingMessage(TEST_RESOURCE, partition, TEST_INSTANCE))
+        .thenReturn(pendingMessage);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    // Set helix manager to event
+    event.addAttribute(AttributeName.helixmanager.name(), mock(HelixManager.class));
+
+    StateModelDefinition stateModelDefinition = new StateModelDefinition.Builder("TestStateModel")
+        .addState("ONLINE", 1).addState("OFFLINE")
+        .addState("DROPPED").addState("ERROR")
+        .initialState("OFFLINE")
+        .addTransition("ERROR", "OFFLINE", 1).addTransition("ONLINE", "OFFLINE", 2)
+        .addTransition("OFFLINE", "DROPPED", 3).addTransition("OFFLINE", "ONLINE", 4)
+        .build();
+
+    // Set controller data provider to event
+    ManagementControllerDataProvider cache = mock(ManagementControllerDataProvider.class);
+    when(cache.getStateModelDef(TaskConstants.STATE_MODEL_NAME)).thenReturn(stateModelDefinition);
+    Map<String, LiveInstance> liveInstances = mock(Map.class);
+    LiveInstance mockLiveInstance = mock(LiveInstance.class);
+    when(mockLiveInstance.getInstanceName()).thenReturn(TEST_INSTANCE);
+    when(mockLiveInstance.getEphemeralOwner()).thenReturn("TEST");
+    when(liveInstances.values()).thenReturn(Collections.singletonList(mockLiveInstance));
+    when(cache.getLiveInstances()).thenReturn(liveInstances);
+    ClusterConfig clusterConfig = mock(ClusterConfig.class);
+    when(cache.getClusterConfig()).thenReturn(clusterConfig);
+    when(clusterConfig.isStateTransitionCancelEnabled()).thenReturn(cancelPendingST);
+    PauseSignal pauseSignal = mock(PauseSignal.class);
+    when(pauseSignal.getCancelPendingST()).thenReturn(true);
+    when(cache.getPauseSignal()).thenReturn(pauseSignal);
+    event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
+
+    // Set event attribute: resources to rebalance
+    Map<String, Resource> resourceMap = new HashMap<>();
+    Resource resource = mock(Resource.class);
+    when(resource.getResourceName()).thenReturn(TEST_RESOURCE);
+    List<Partition> partitions = Collections.singletonList(partition);
+    when(resource.getPartitions()).thenReturn(partitions);
+    when(resource.getStateModelDefRef()).thenReturn(TaskConstants.STATE_MODEL_NAME);
+    resourceMap.put(TEST_RESOURCE, resource);
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
+
+    // set up resource state map
+    BestPossibleStateOutput bestPossibleStateOutput =
+        RebalanceUtil.buildBestPossibleState(resourceMap.keySet(), currentStateOutput);
+
+    // Process the event
+    ClusterManagementMode mode = new ClusterManagementMode(ClusterManagementMode.Type.CLUSTER_PAUSE,
+        ClusterManagementMode.Status.IN_PROGRESS);
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+    event.addAttribute(AttributeName.CLUSTER_STATUS.name(), mode);
+    process(event);
+    MessageOutput output = event.getAttribute(AttributeName.MESSAGES_ALL.name());
+
+    return output.getMessages(TEST_RESOURCE, partition);
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java
index 28ca524..2737bf7 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java
@@ -83,6 +83,8 @@ public class TestManagementModeStage extends ZkTestBase {
 
     Pipeline dataRefresh = new Pipeline();
     dataRefresh.addStage(new ReadClusterDataStage());
+    dataRefresh.addStage(new ResourceComputationStage());
+    dataRefresh.addStage(new CurrentStateComputationStage());
     runPipeline(event, dataRefresh, false);
     ManagementModeStage managementModeStage = new ManagementModeStage();
     managementModeStage.process(event);

Mime
View raw message