helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject helix git commit: [HELIX-682] delete duplicated message and log error in HelixTaskExecutor on participant
Date Tue, 24 Apr 2018 22:40:58 GMT
Repository: helix
Updated Branches:
  refs/heads/master 5c49de3c8 -> 5f9fadc72


[HELIX-682] delete duplicated message and log error in HelixTaskExecutor on participant


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/5f9fadc7
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/5f9fadc7
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/5f9fadc7

Branch: refs/heads/master
Commit: 5f9fadc72bc1916f008792707db848ee51bbd997
Parents: 5c49de3
Author: Harry Zhang <zhan849@usc.edu>
Authored: Tue Apr 24 15:34:08 2018 -0700
Committer: Harry Zhang <zhan849@usc.edu>
Committed: Tue Apr 24 15:40:17 2018 -0700

----------------------------------------------------------------------
 .../messaging/handling/HelixTaskExecutor.java   |  9 +++
 .../java/org/apache/helix/MockAccessor.java     |  2 +-
 .../handling/TestHelixTaskExecutor.java         | 63 ++++++++++++++++++--
 3 files changed, 68 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/5f9fadc7/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 70a30ec..bb55604 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -864,6 +864,15 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
         }
         if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name()) || message.getMsgType()
             .equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
+          String messageTarget =
+              getMessageTarget(message.getResourceName(), message.getPartitionName());
+          if (stateTransitionHandlers.containsKey(messageTarget)) {
+            Message duplicatedMessage = stateTransitionHandlers.get(messageTarget)._message;
+            throw new HelixException(String.format(
+                "Duplicated state transition message: %s. Existing: %s -> %s; New (Discarded): %s -> %s",
+                message.getMsgId(), duplicatedMessage.getFromState(),
+                duplicatedMessage.getToState(), message.getFromState(), message.getToState()));
+          }
           stateTransitionHandlers
               .put(getMessageTarget(message.getResourceName(), message.getPartitionName()),
                   createHandler);

http://git-wip-us.apache.org/repos/asf/helix/blob/5f9fadc7/helix-core/src/test/java/org/apache/helix/MockAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/MockAccessor.java b/helix-core/src/test/java/org/apache/helix/MockAccessor.java
index 8bcfe19..96ab5dc 100644
--- a/helix-core/src/test/java/org/apache/helix/MockAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/MockAccessor.java
@@ -209,7 +209,7 @@ public class MockAccessor implements HelixDataAccessor {
   @Override
   public <T extends HelixProperty> boolean[] createChildren(List<PropertyKey> keys,
       List<T> children) {
-    throw new HelixException("Method not implemented!");
+    return setChildren(keys, children);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/5f9fadc7/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
index f0b50a3..ea2bb20 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
@@ -20,13 +20,20 @@ package org.apache.helix.messaging.handling;
  */
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import com.google.common.collect.ImmutableList;
 
-import org.apache.helix.*;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.mock.MockManager;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
@@ -34,8 +41,6 @@ import org.testng.Assert;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.ImmutableList;
-
 public class TestHelixTaskExecutor {
   public static class MockClusterManager extends MockManager {
 
@@ -87,8 +92,9 @@ public class TestHelixTaskExecutor {
       return "TestingMessageHandler";
     }
 
-    @Override public List<String> getMessageTypes() {
-      return ImmutableList.of("TestingMessageHandler");
+    @Override
+    public List<String> getMessageTypes() {
+      return Arrays.asList(new String[]{"TestingMessageHandler", Message.MessageType.STATE_TRANSITION.name()});
     }
 
     @Override
@@ -295,6 +301,53 @@ public class TestHelixTaskExecutor {
   }
 
   @Test()
+  public void testDuplicatedMessage() throws InterruptedException {
+    System.out.println("START TestHelixTaskExecutor.testDuplicatedMessage()");
+    HelixTaskExecutor executor = new HelixTaskExecutor();
+    HelixManager manager = new MockClusterManager();
+    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
+
+    TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
+    for (String type : factory.getMessageTypes()) {
+      executor.registerMessageHandlerFactory(type, factory);
+    }
+
+    NotificationContext changeContext = new NotificationContext(manager);
+    List<Message> msgList = new ArrayList<Message>();
+
+    int nMsgs = 10;
+    String instanceName = "someInstance";
+    for (int i = 0; i < nMsgs; i++) {
+      Message msg =
+          new Message(Message.MessageType.STATE_TRANSITION.name(), UUID.randomUUID().toString());
+      msg.setTgtSessionId(manager.getSessionId());
+      msg.setTgtName("Localhost_1123");
+      msg.setSrcName("127.101.1.23_2234");
+      msg.setPartitionName("Partition");
+      msg.setResourceName("Resource");
+      dataAccessor.setProperty(msg.getKey(keyBuilder, instanceName), msg);
+      msgList.add(msg);
+    }
+
+    System.out.println(dataAccessor.getChildNames(keyBuilder.messages(instanceName)));
+    AssertJUnit
+        .assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName)).size(), nMsgs);
+
+    changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
+    executor.onMessage(instanceName, msgList, changeContext);
+
+    Thread.sleep(1000);
+
+    // Will not be able to process state transition messages, but we shall verify that
+    // only 1 message is left over
+    AssertJUnit
+        .assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName)).size(), 1);
+
+    System.out.println("END TestHelixTaskExecutor.testDuplicatedMessage()");
+  }
+
+  @Test()
   public void testUnknownTypeMsgExecution() throws InterruptedException {
     HelixTaskExecutor executor = new HelixTaskExecutor();
     HelixManager manager = new MockClusterManager();


Mime
View raw message