helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject helix git commit: Add test to validate no duplicated state transition message sent when p2p message is enabled.
Date Tue, 26 Jun 2018 22:47:35 GMT
Repository: helix
Updated Branches:
  refs/heads/master 8dc19afb9 -> 266b8bb1a


Add test to validate no duplicated state transition message sent when p2p message is enabled.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/266b8bb1
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/266b8bb1
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/266b8bb1

Branch: refs/heads/master
Commit: 266b8bb1ae30c23c4310886c111f826e225205f4
Parents: 8dc19af
Author: Lei Xia <lxia@linkedin.com>
Authored: Fri Apr 27 09:50:22 2018 -0700
Committer: Junkai Xue <jxue@jxue-mn2.linkedin.biz>
Committed: Tue Jun 26 15:46:59 2018 -0700

----------------------------------------------------------------------
 .../common/caches/InstanceMessagesCache.java    |   6 +-
 .../TestP2PMessagesAvoidDuplicatedMessage.java  | 301 +++++++++++++++++++
 2 files changed, 302 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/266b8bb1/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
index 69094d3..1929776 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
@@ -171,11 +171,7 @@ public class InstanceMessagesCache {
 
           for (Message relayMsg : message.getRelayMessages().values()) {
             relayMsg.setRelayTime(transitionCompleteTime);
-            if (!relayMsg.isExpired()) {
-              relayMessages.add(relayMsg);
-            } else {
-              LOG.info("Relay message " + relayMsg.getId() + " already expired, ignore it!");
-            }
+            relayMessages.add(relayMsg);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/266b8bb1/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
new file mode 100644
index 0000000..fb84117
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
@@ -0,0 +1,301 @@
+package org.apache.helix.messaging.p2pMessage;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.controller.common.PartitionStateMap;
+import org.apache.helix.controller.common.ResourcesStateMap;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BaseStageTest;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.IntermediateStateOutput;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
+import org.apache.helix.controller.stages.MessageSelectionStage;
+import org.apache.helix.controller.stages.MessageSelectionStageOutput;
+import org.apache.helix.controller.stages.MessageThrottleStage;
+import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest {
+  String _db = "testDB";
+  int _numPartition = 1;
+  int _numReplica = 3;
+
+  Partition _partition = new Partition(_db + "_0");
+
+  ClusterDataCache _dataCache;
+  Pipeline _fullPipeline;
+  Pipeline _messagePipeline;
+
+  ResourcesStateMap _bestpossibleState;
+
+  private void preSetup() throws Exception {
+    setupIdealState(3, new String[] { _db }, _numPartition, _numReplica,
+        IdealState.RebalanceMode.SEMI_AUTO, BuiltInStateModelDefinitions.MasterSlave.name());
+    setupStateModel();
+    setupInstances(3);
+    setupLiveInstances(3);
+
+    ClusterConfig clusterConfig = new ClusterConfig(_clusterName);
+    clusterConfig.enableP2PMessage(true);
+    setClusterConfig(clusterConfig);
+
+    Map<String, Resource> resourceMap = getResourceMap(new String[] { _db }, _numPartition,
+        BuiltInStateModelDefinitions.MasterSlave.name(), clusterConfig, null);
+
+    _dataCache = new ClusterDataCache();
+    _dataCache.setAsyncTasksThreadPool(Executors.newSingleThreadExecutor());
+
+    event.addAttribute(AttributeName.ClusterDataCache.name(), _dataCache);
+    event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), new CurrentStateOutput());
+    event.addAttribute(AttributeName.helixmanager.name(), manager);
+
+    _fullPipeline = new Pipeline("FullPipeline");
+    _fullPipeline.addStage(new ReadClusterDataStage());
+    _fullPipeline.addStage(new BestPossibleStateCalcStage());
+    _fullPipeline.addStage(new IntermediateStateCalcStage());
+    _fullPipeline.addStage(new MessageGenerationPhase());
+    _fullPipeline.addStage(new MessageSelectionStage());
+    _fullPipeline.addStage(new MessageThrottleStage());
+
+    _messagePipeline = new Pipeline("MessagePipeline");
+    _messagePipeline.addStage(new MessageGenerationPhase());
+    _messagePipeline.addStage(new MessageSelectionStage());
+    _messagePipeline.addStage(new MessageThrottleStage());
+
+
+    _fullPipeline.handle(event);
+    _bestpossibleState =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+  }
+
+
+  @Test
+  public void testP2PAvoidDuplicatedMessage() throws Exception {
+    preSetup();
+
+    // Scenario 1:
+    // Disable old master ((initialMaster) instance,
+    // Validate: a M->S message should be sent to initialMaster with a P2P message attached
for secondMaster.
+    String initialMaster = getTopStateInstance(_bestpossibleState.getInstanceStateMap(_db,
_partition),
+        MasterSlaveSMD.States.MASTER.name());
+    Assert.assertNotNull(initialMaster);
+
+    // disable existing master instance
+    admin.enableInstance(_clusterName, initialMaster, false);
+    _dataCache = event.getAttribute(AttributeName.ClusterDataCache.name());
+    _dataCache.notifyDataChange(HelixConstants.ChangeType.INSTANCE_CONFIG);
+
+    CurrentStateOutput currentStateOutput =
+        populateCurrentStateFromBestPossible(_bestpossibleState);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    _fullPipeline.handle(event);
+
+    _bestpossibleState = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
+
+    MessageSelectionStageOutput messageOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    List<Message> messages = messageOutput.getMessages(_db, _partition);
+
+    Assert.assertEquals(messages.size(), 1);
+    Message toSlaveMessage = messages.get(0);
+    Assert.assertEquals(toSlaveMessage.getTgtName(), initialMaster);
+    Assert.assertEquals(toSlaveMessage.getFromState(), MasterSlaveSMD.States.MASTER.name());
+    Assert.assertEquals(toSlaveMessage.getToState(), MasterSlaveSMD.States.SLAVE.name());
+
+    // verify p2p message are attached to the M->S message sent to the old master instance
+    Assert.assertEquals(toSlaveMessage.getRelayMessages().entrySet().size(), 1);
+    String secondMaster =
+        getTopStateInstance(_bestpossibleState.getInstanceStateMap(_db, _partition), MasterSlaveSMD.States.MASTER.name());
+
+    Message relayMessage = toSlaveMessage.getRelayMessage(secondMaster);
+    Assert.assertNotNull(relayMessage);
+    Assert.assertEquals(relayMessage.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name());
+    Assert.assertEquals(relayMessage.getTgtName(), secondMaster);
+    Assert.assertEquals(relayMessage.getRelaySrcHost(), initialMaster);
+    Assert.assertEquals(relayMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
+    Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name());
+
+    // Scenario 2:
+    // Old master (initialMaster) completes the M->S transition,
+    // but has not forward p2p message to new master (secondMaster) yet.
+    // Validate: Controller should not send S->M message to new master.
+
+    currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE");
+    currentStateOutput.setPendingState(_db, _partition, initialMaster, toSlaveMessage);
+
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    _fullPipeline.handle(event);
+
+    messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    messages = messageOutput.getMessages(_db, _partition);
+    Assert.assertEquals(messages.size(), 0);
+
+
+    // Scenario 3:
+    // Old master (initialMaster) completes the M->S transition,
+    // and has already forwarded p2p message to new master (secondMaster)
+    // The original S->M message sent to old master has been removed.
+    // Validate: Controller should send S->O to old master, but not S->M message to
new master.
+    currentStateOutput =
+        populateCurrentStateFromBestPossible(_bestpossibleState);
+    currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE");
+    currentStateOutput.setPendingState(_db, _partition, secondMaster, relayMessage);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    _fullPipeline.handle(event);
+
+    messageOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    messages = messageOutput.getMessages(_db, _partition);
+    Assert.assertEquals(messages.size(), 1);
+
+    Message toOfflineMessage = messages.get(0);
+    Assert.assertEquals(toOfflineMessage.getTgtName(), initialMaster);
+    Assert.assertEquals(toOfflineMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
+    Assert.assertEquals(toOfflineMessage.getToState(), MasterSlaveSMD.States.OFFLINE.name());
+
+
+    // Scenario 4:
+    // The old master (initialMaster) finish state transition, but has not forward p2p message
yet.
+    // Then the preference list has changed, so now the new master (thirdMaster) is different
from previously calculated new master (secondMaster)
+    // Validate: controller should not send S->M to thirdMaster.
+    currentStateOutput.setCurrentState(_db, _partition, initialMaster, "OFFLINE");
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    String thirdMaster =
+        getTopStateInstance(_bestpossibleState.getInstanceStateMap(_db, _partition),
+            MasterSlaveSMD.States.SLAVE.name());
+
+    Map<String, String> instanceStateMap = _bestpossibleState.getInstanceStateMap(_db,
_partition);
+    instanceStateMap.put(secondMaster, "SLAVE");
+    instanceStateMap.put(thirdMaster, "MASTER");
+    _bestpossibleState.setState(_db, _partition, instanceStateMap);
+
+    event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState);
+
+    _messagePipeline.handle(event);
+
+    messageOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    messages = messageOutput.getMessages(_db, _partition);
+    Assert.assertEquals(messages.size(), 0);
+
+
+    // Scenario 5:
+    // The initial master has forwarded the p2p message to secondMaster and deleted original
M->S message on initialMaster,
+    // But the S->M state-transition has not completed yet in secondMaster.
+    // Validate: Controller should not send S->M to thirdMaster.
+    currentStateOutput.setPendingState(_db, _partition, secondMaster, relayMessage);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState);
+
+    _messagePipeline.handle(event);
+
+    messageOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    messages = messageOutput.getMessages(_db, _partition);
+    Assert.assertEquals(messages.size(), 0);
+
+
+    // Scenario 5:
+    // The thirdMaster completed the state transition and deleted the p2p message.
+    // Validate: Controller should M->S message to secondMaster.
+    currentStateOutput =
+        populateCurrentStateFromBestPossible(_bestpossibleState);
+    currentStateOutput.setCurrentState(_db, _partition, secondMaster, "MASTER");
+    currentStateOutput.setCurrentState(_db, _partition, thirdMaster, "SLAVE");
+
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    _messagePipeline.handle(event);
+
+    messageOutput =
+        event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+    messages = messageOutput.getMessages(_db, _partition);
+    Assert.assertEquals(messages.size(), 1);
+
+    toSlaveMessage = messages.get(0);
+    Assert.assertEquals(toSlaveMessage.getTgtName(), secondMaster);
+    Assert.assertEquals(toSlaveMessage.getFromState(), MasterSlaveSMD.States.MASTER.name());
+    Assert.assertEquals(toSlaveMessage.getToState(), MasterSlaveSMD.States.SLAVE.name());
+
+    // verify p2p message are attached to the M->S message sent to the secondMaster
+    Assert.assertEquals(toSlaveMessage.getRelayMessages().entrySet().size(), 1);
+
+    relayMessage = toSlaveMessage.getRelayMessage(thirdMaster);
+    Assert.assertNotNull(relayMessage);
+    Assert.assertEquals(relayMessage.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name());
+    Assert.assertEquals(relayMessage.getTgtName(), thirdMaster);
+    Assert.assertEquals(relayMessage.getRelaySrcHost(), secondMaster);
+    Assert.assertEquals(relayMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name());
+    Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name());
+  }
+
+
+  private String getTopStateInstance(Map<String, String> instanceStateMap, String topState)
{
+    String masterInstance = null;
+    for (Map.Entry<String, String> e : instanceStateMap.entrySet()) {
+      if (topState.equals(e.getValue())) {
+        masterInstance = e.getKey();
+      }
+    }
+
+    return masterInstance;
+  }
+
+  private CurrentStateOutput populateCurrentStateFromBestPossible(
+      ResourcesStateMap bestPossibleStateOutput) {
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    for (String resource : bestPossibleStateOutput.getResourceStatesMap().keySet()) {
+      PartitionStateMap partitionStateMap = bestPossibleStateOutput.getPartitionStateMap(resource);
+      for (Partition p : partitionStateMap.partitionSet()) {
+        Map<String, String> stateMap = partitionStateMap.getPartitionMap(p);
+
+        for (Map.Entry<String, String> e : stateMap.entrySet()) {
+          currentStateOutput.setCurrentState(resource, p, e.getKey(), e.getValue());
+        }
+      }
+    }
+    return currentStateOutput;
+  }
+}


Mime
View raw message