From commits-return-6674-apmail-helix-commits-archive=helix.apache.org@helix.apache.org Tue Oct 3 21:44:02 2017 Return-Path: X-Original-To: apmail-helix-commits-archive@minotaur.apache.org Delivered-To: apmail-helix-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8412D10E1E for ; Tue, 3 Oct 2017 21:44:02 +0000 (UTC) Received: (qmail 64538 invoked by uid 500); 3 Oct 2017 21:44:02 -0000 Delivered-To: apmail-helix-commits-archive@helix.apache.org Received: (qmail 64486 invoked by uid 500); 3 Oct 2017 21:44:02 -0000 Mailing-List: contact commits-help@helix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@helix.apache.org Delivered-To: mailing list commits@helix.apache.org Received: (qmail 64473 invoked by uid 99); 3 Oct 2017 21:44:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Oct 2017 21:44:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B83B6F5A90; Tue, 3 Oct 2017 21:44:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jxue@apache.org To: commits@helix.apache.org Date: Tue, 03 Oct 2017 21:44:02 -0000 Message-Id: <5af40ffdd58c4cd29d8f0d1ef2a4442a@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/5] helix git commit: [HELIX-669] State Transition Cancellation Client side change Part II http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java index 2f78007..5304a45 100644 --- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java +++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java @@ -29,12 +29,15 @@ import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.Mocks; import org.apache.helix.NotificationContext; +import org.apache.helix.messaging.DefaultMessagingService; import org.apache.helix.model.Message; import org.apache.helix.model.Message.MessageState; import org.testng.Assert; import org.testng.AssertJUnit; import org.testng.annotations.Test; +import com.google.common.collect.ImmutableList; + public class TestHelixTaskExecutor { public static class MockClusterManager extends Mocks.MockManager { @Override @@ -85,6 +88,10 @@ public class TestHelixTaskExecutor { return "TestingMessageHandler"; } + @Override public List getMessageTypes() { + return ImmutableList.of("TestingMessageHandler"); + } + @Override public void reset() { // TODO Auto-generated method stub @@ -99,6 +106,12 @@ public class TestHelixTaskExecutor { return "TestingMessageHandler2"; } + @Override + public List getMessageTypes() { + // TODO Auto-generated method stub + return ImmutableList.of("TestingMessageHandler2"); + } + } class CancellableHandlerFactory implements MessageHandlerFactory { @@ -165,6 +178,10 @@ public class TestHelixTaskExecutor { return "Cancellable"; } + @Override public List getMessageTypes() { + return ImmutableList.of("Cancellable"); + } + @Override public void reset() { // TODO Auto-generated method stub @@ -186,26 +203,36 @@ public class TestHelixTaskExecutor { super(message, context); } - @Override public HelixTaskResult handleMessage() throws InterruptedException { + @Override + public HelixTaskResult handleMessage() throws InterruptedException { HelixTaskResult result = new HelixTaskResult(); _processedMsgIds.put(_message.getMsgId(), _message.getMsgId()); result.setSuccess(true); return result; } - @Override public void onError(Exception e, ErrorCode code, ErrorType type) { + @Override + public void onError(Exception e, ErrorCode code, ErrorType type) { } } - @Override public MessageHandler createHandler(Message message, NotificationContext context) { + @Override + public MessageHandler createHandler(Message message, NotificationContext context) { return new TestStateTransitionMessageHandler(message, context); } - @Override public String getMessageType() { + @Override + public String getMessageType() { return _msgType; } - @Override public void reset() { + @Override + public List getMessageTypes() { + return ImmutableList.of(_msgType); + } + + @Override + public void reset() { } } @@ -217,17 +244,21 @@ public class TestHelixTaskExecutor { HelixManager manager = new MockClusterManager(); TestMessageHandlerFactory factory = new TestMessageHandlerFactory(); - executor.registerMessageHandlerFactory(factory.getMessageType(), factory); + for (String type : factory.getMessageTypes()) { + executor.registerMessageHandlerFactory(type, factory); + } TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2(); - executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2); + for (String type : factory2.getMessageTypes()) { + executor.registerMessageHandlerFactory(type, factory2); + } NotificationContext changeContext = new NotificationContext(manager); List msgList = new ArrayList(); int nMsgs1 = 5; for (int i = 0; i < nMsgs1; i++) { - Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString()); + Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString()); msg.setTgtSessionId(manager.getSessionId()); msg.setTgtName("Localhost_1123"); msg.setSrcName("127.101.1.23_2234"); @@ -237,7 +268,7 @@ public class TestHelixTaskExecutor { int nMsgs2 = 6; for (int i = 0; i < nMsgs2; i++) { - Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString()); + Message msg = new Message(factory2.getMessageTypes().get(0), UUID.randomUUID().toString()); msg.setTgtSessionId(manager.getSessionId()); msg.setTgtName("Localhost_1123"); msg.setSrcName("127.101.1.23_2234"); @@ -269,7 +300,9 @@ public class TestHelixTaskExecutor { HelixManager manager = new MockClusterManager(); TestMessageHandlerFactory factory = new TestMessageHandlerFactory(); - executor.registerMessageHandlerFactory(factory.getMessageType(), factory); + for (String type : factory.getMessageTypes()) { + executor.registerMessageHandlerFactory(type, factory); + } TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2(); @@ -278,7 +311,7 @@ public class TestHelixTaskExecutor { int nMsgs1 = 5; for (int i = 0; i < nMsgs1; i++) { - Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString()); + Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString()); msg.setTgtSessionId(manager.getSessionId()); msg.setTgtName("Localhost_1123"); msg.setSrcName("127.101.1.23_2234"); @@ -287,7 +320,7 @@ public class TestHelixTaskExecutor { int nMsgs2 = 4; for (int i = 0; i < nMsgs2; i++) { - Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString()); + Message msg = new Message(factory2.getMessageTypes().get(0), UUID.randomUUID().toString()); msg.setTgtSessionId(manager.getSessionId()); msg.setTgtName("Localhost_1123"); msg.setSrcName("127.101.1.23_2234"); @@ -303,7 +336,7 @@ public class TestHelixTaskExecutor { AssertJUnit.assertTrue(factory2._handlersCreated == 0); for (Message message : msgList) { - if (message.getMsgType().equalsIgnoreCase(factory.getMessageType())) { + if (factory.getMessageTypes().contains(message.getMsgType())) { AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(message.getId())); } } @@ -315,17 +348,21 @@ public class TestHelixTaskExecutor { HelixManager manager = new MockClusterManager(); TestMessageHandlerFactory factory = new TestMessageHandlerFactory(); - executor.registerMessageHandlerFactory(factory.getMessageType(), factory); + for (String type : factory.getMessageTypes()) { + executor.registerMessageHandlerFactory(type, factory); + } TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2(); - executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2); + for (String type : factory2.getMessageTypes()) { + executor.registerMessageHandlerFactory(type, factory2); + } NotificationContext changeContext = new NotificationContext(manager); List msgList = new ArrayList(); int nMsgs1 = 5; for (int i = 0; i < nMsgs1; i++) { - Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString()); + Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString()); msg.setTgtSessionId("*"); msg.setTgtName(""); msgList.add(msg); @@ -333,7 +370,7 @@ public class TestHelixTaskExecutor { int nMsgs2 = 4; for (int i = 0; i < nMsgs2; i++) { - Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString()); + Message msg = new Message(factory2.getMessageTypes().get(0), UUID.randomUUID().toString()); msg.setTgtSessionId("some other session id"); msg.setTgtName(""); msgList.add(msg); @@ -348,7 +385,7 @@ public class TestHelixTaskExecutor { AssertJUnit.assertTrue(factory2._handlersCreated == 0); for (Message message : msgList) { - if (message.getMsgType().equalsIgnoreCase(factory.getMessageType())) { + if (factory.getMessageTypes().contains(message.getMsgType())) { AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(message.getId())); } } @@ -361,21 +398,23 @@ public class TestHelixTaskExecutor { HelixManager manager = new MockClusterManager(); TestMessageHandlerFactory factory = new TestMessageHandlerFactory(); - executor.registerMessageHandlerFactory(factory.getMessageType(), factory); + for (String type : factory.getMessageTypes()) { + executor.registerMessageHandlerFactory(type, factory); + } NotificationContext changeContext = new NotificationContext(manager); List msgList = new ArrayList(); int nMsgs1 = 5; for (int i = 0; i < nMsgs1; i++) { - Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString()); + Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString()); msg.setTgtSessionId(manager.getSessionId()); msg.setTgtName("Localhost_1123"); msg.setSrcName("127.101.1.23_2234"); msg.setCorrelationId(UUID.randomUUID().toString()); msgList.add(msg); } - Message exceptionMsg = new Message(factory.getMessageType(), UUID.randomUUID().toString()); + Message exceptionMsg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString()); exceptionMsg.setTgtSessionId(manager.getSessionId()); exceptionMsg.setMsgSubType("EXCEPTION"); exceptionMsg.setTgtName("Localhost_1123"); @@ -400,14 +439,15 @@ public class TestHelixTaskExecutor { HelixManager manager = new MockClusterManager(); CancellableHandlerFactory factory = new CancellableHandlerFactory(); - executor.registerMessageHandlerFactory(factory.getMessageType(), factory); - + for (String type : factory.getMessageTypes()) { + executor.registerMessageHandlerFactory(type, factory); + } NotificationContext changeContext = new NotificationContext(manager); List msgList = new ArrayList(); int nMsgs1 = 0; for (int i = 0; i < nMsgs1; i++) { - Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString()); + Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString()); msg.setTgtSessionId("*"); msg.setTgtName("Localhost_1123"); msg.setSrcName("127.101.1.23_2234"); @@ -417,7 +457,7 @@ public class TestHelixTaskExecutor { List msgListToCancel = new ArrayList(); int nMsgs2 = 4; for (int i = 0; i < nMsgs2; i++) { - Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString()); + Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString()); msg.setTgtSessionId("*"); msgList.add(msg); msg.setTgtName("Localhost_1123"); @@ -439,7 +479,7 @@ public class TestHelixTaskExecutor { AssertJUnit.assertTrue(factory._processingMsgIds.size() == nMsgs1 + nMsgs2); for (Message message : msgList) { - if (message.getMsgType().equalsIgnoreCase(factory.getMessageType())) { + if (factory.getMessageTypes().contains(message.getMsgType())) { AssertJUnit.assertTrue(factory._processingMsgIds.containsKey(message.getId())); } } @@ -452,18 +492,23 @@ public class TestHelixTaskExecutor { HelixManager manager = new MockClusterManager(); TestMessageHandlerFactory factory = new TestMessageHandlerFactory(); - executor.registerMessageHandlerFactory(factory.getMessageType(), factory); - + for (String type : factory.getMessageTypes()) { + executor.registerMessageHandlerFactory(type, factory); + } TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2(); - executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2); - + for (String type : factory2.getMessageTypes()) { + executor.registerMessageHandlerFactory(type, factory2); + } CancellableHandlerFactory factory3 = new CancellableHandlerFactory(); - executor.registerMessageHandlerFactory(factory3.getMessageType(), factory3); + for (String type : factory3.getMessageTypes()) { + executor.registerMessageHandlerFactory(type, factory3); + } + int nMsg1 = 10, nMsg2 = 10, nMsg3 = 10; List msgList = new ArrayList(); for (int i = 0; i < nMsg1; i++) { - Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString()); + Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString()); msg.setTgtSessionId("*"); msg.setTgtName("Localhost_1123"); msg.setSrcName("127.101.1.23_2234"); @@ -471,7 +516,7 @@ public class TestHelixTaskExecutor { } for (int i = 0; i < nMsg2; i++) { - Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString()); + Message msg = new Message(factory2.getMessageTypes().get(0), UUID.randomUUID().toString()); msg.setTgtSessionId("*"); msgList.add(msg); msg.setTgtName("Localhost_1123"); @@ -480,7 +525,7 @@ public class TestHelixTaskExecutor { } for (int i = 0; i < nMsg3; i++) { - Message msg = new Message(factory3.getMessageType(), UUID.randomUUID().toString()); + Message msg = new Message(factory3.getMessageTypes().get(0), UUID.randomUUID().toString()); msg.setTgtSessionId("*"); msgList.add(msg); msg.setTgtName("Localhost_1123"); @@ -509,15 +554,16 @@ public class TestHelixTaskExecutor { HelixManager manager = new MockClusterManager(); CancellableHandlerFactory factory = new CancellableHandlerFactory(); - executor.registerMessageHandlerFactory(factory.getMessageType(), factory); - + for (String type : factory.getMessageTypes()) { + executor.registerMessageHandlerFactory(type, factory); + } NotificationContext changeContext = new NotificationContext(manager); List msgList = new ArrayList(); int nMsgs2 = 4; // Test the case in which retry = 0 for (int i = 0; i < nMsgs2; i++) { - Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString()); + Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString()); msg.setTgtSessionId("*"); msg.setTgtName("Localhost_1123"); msg.setSrcName("127.101.1.23_2234"); @@ -532,7 +578,7 @@ public class TestHelixTaskExecutor { AssertJUnit.assertEquals(factory._timedOutMsgIds.size(), 2); // AssertJUnit.assertFalse(msgList.get(0).getRecord().getSimpleFields().containsKey("TimeOut")); for (int i = 0; i < nMsgs2 - 2; i++) { - if (msgList.get(i).getMsgType().equalsIgnoreCase(factory.getMessageType())) { + if (factory.getMessageTypes().contains(msgList.get(i).getMsgType())) { AssertJUnit.assertTrue(msgList.get(i).getRecord().getSimpleFields() .containsKey("Cancelcount")); AssertJUnit.assertTrue(factory._timedOutMsgIds.containsKey(msgList.get(i).getId())); @@ -550,8 +596,9 @@ public class TestHelixTaskExecutor { HelixManager manager = new MockClusterManager(); CancellableHandlerFactory factory = new CancellableHandlerFactory(); - executor.registerMessageHandlerFactory(factory.getMessageType(), factory); - + for (String type : factory.getMessageTypes()) { + executor.registerMessageHandlerFactory(type, factory); + } NotificationContext changeContext = new NotificationContext(manager); List msgList = new ArrayList(); @@ -561,7 +608,7 @@ public class TestHelixTaskExecutor { // Test the case that the message are executed for the second time int nMsgs2 = 4; for (int i = 0; i < nMsgs2; i++) { - Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString()); + Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString()); msg.setTgtSessionId("*"); msg.setTgtName("Localhost_1123"); msg.setSrcName("127.101.1.23_2234"); @@ -615,6 +662,5 @@ public class TestHelixTaskExecutor { Thread.sleep(3000); AssertJUnit.assertEquals(cancelFactory._processedMsgIds.size(), 0); AssertJUnit.assertEquals(stateTransitionFactory._processedMsgIds.size(), 0); - } } http://git-wip-us.apache.org/repos/asf/helix/blob/b9de8362/helix-core/src/test/java/org/apache/helix/mock/participant/MockDelayMSStateModel.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockDelayMSStateModel.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockDelayMSStateModel.java index f1848a5..a4fcec6 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockDelayMSStateModel.java +++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockDelayMSStateModel.java @@ -1,5 +1,10 @@ package org.apache.helix.mock.participant; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import org.apache.helix.NotificationContext; import org.apache.helix.model.Message; import org.apache.helix.participant.statemachine.StateModel; @@ -31,15 +36,15 @@ import org.apache.log4j.Logger; }) public class MockDelayMSStateModel extends StateModel { private static Logger LOG = Logger.getLogger(MockDelayMSStateModel.class); - private long _delay; public MockDelayMSStateModel(long delay) { _delay = delay; + _cancelled = false; } @Transition(to = "SLAVE", from = "OFFLINE") - public void onBecomeSLAVEFromOffline(Message message, NotificationContext context) { + public void onBecomeSlaveFromOffline(Message message, NotificationContext context) { if (_delay > 0) { try { Thread.sleep(_delay); @@ -50,10 +55,17 @@ public class MockDelayMSStateModel extends StateModel { LOG.info("Become SLAVE from OFFLINE"); } - @Transition(to = "ONLINE", from = "SLAVE") - public void onBecomeMasterFromSlave(Message message, NotificationContext context) { - LOG.info("Become ONLINE from SLAVE"); + @Transition(to = "MASTER", from = "SLAVE") + public void onBecomeMasterFromSlave(Message message, NotificationContext context) + throws InterruptedException { + if (_delay < 0) { + Thread.sleep(Math.abs(_delay)); + } + LOG.error("Become MASTER from SLAVE"); } - + @Transition(to = "OFFLINE", from = "SLAVE") + public void onBecomeOfflineFromSlave(Message message, NotificationContext context) { + LOG.info("Become OFFLINE from SLAVE"); + } }