helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject [1/7] [HELIX-376] Remove HelixConnection/HelixManager duplicate code
Date Mon, 04 Aug 2014 20:19:18 GMT
Repository: helix
Updated Branches:
  refs/heads/master 8a3705714 -> 593918880


http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java
index 54f81cd..c441381 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java
@@ -106,19 +106,6 @@ public class TestZKLiveInstanceData extends ZkTestBase {
 
   @BeforeClass()
   public void beforeClass() throws Exception {
-    ZkClient zkClient = null;
-    try {
-      zkClient = new ZkClient(_zkaddr);
-      zkClient.setZkSerializer(new ZNRecordSerializer());
-      if (zkClient.exists("/" + clusterName)) {
-        zkClient.deleteRecursive("/" + clusterName);
-      }
-    } finally {
-      if (zkClient != null) {
-        zkClient.close();
-      }
-    }
-
     ClusterSetup.processCommandLineArgs(getArgs("-zkSvr", _zkaddr, "-addCluster", clusterName));
     ClusterSetup.processCommandLineArgs(getArgs("-zkSvr", _zkaddr, "-addNode", clusterName,
         "localhost:54321"));

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
index 0a770d0..0a325d6 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
@@ -27,12 +27,9 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.AccessOption;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.model.ConfigScope;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.builder.ConfigScopeBuilder;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
@@ -59,16 +56,9 @@ public class TestZkClusterManager extends ZkTestBase {
   public void testController() throws Exception {
     final String clusterName = TestUtil.getTestName();
 
-    System.out.println("START " + clusterName + ".testController() at "
-        + new Date(System.currentTimeMillis()));
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    // basic test
-    if (_zkclient.exists("/" + clusterName)) {
-      _zkclient.deleteRecursive("/" + clusterName);
-    }
-
-    ZKHelixManager controller =
-        new ZKHelixManager(clusterName, null, InstanceType.CONTROLLER, _zkaddr);
+    MockController controller = new MockController(_zkaddr, clusterName, "controller");
 
     try {
       controller.connect();
@@ -110,19 +100,17 @@ public class TestZkClusterManager extends ZkTestBase {
     controller.getMessagingService();
     controller.getClusterManagmentTool();
 
-    controller.handleNewSession();
+    controller.getConn().handleNewSession();
     controller.disconnect();
     AssertJUnit.assertFalse(controller.isConnected());
 
-    System.out.println("END " + clusterName + ".testController() at "
-        + new Date(System.currentTimeMillis()));
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 
   @Test
   public void testLiveInstanceInfoProvider() throws Exception {
     final String clusterName = TestUtil.getTestName();
-    System.out.println("START " + clusterName + ".testLiveInstanceInfoProvider() at "
-        + new Date(System.currentTimeMillis()));
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
     class provider implements LiveInstanceInfoProvider {
       boolean _flag = false;
 
@@ -197,8 +185,7 @@ public class TestZkClusterManager extends ZkTestBase {
 
     // //////////////////////////////////
 
-    MockParticipantManager manager2 =
-        new MockParticipantManager(_zkaddr, clusterName, "localhost_3");
+    MockParticipant manager2 = new MockParticipant(_zkaddr, clusterName, "localhost_3");
 
     manager2.setLiveInstanceInfoProvider(new provider(true));
 
@@ -224,20 +211,13 @@ public class TestZkClusterManager extends ZkTestBase {
     Assert.assertFalse(liveInstance.getLiveInstance().equals("value"));
     Assert.assertFalse(sessionId.equals(liveInstance.getTypedSessionId().stringify()));
 
-    System.out.println("END " + clusterName + ".testLiveInstanceInfoProvider() at "
-        + new Date(System.currentTimeMillis()));
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 
   @Test()
   public void testAdministrator() throws Exception {
     final String clusterName = TestUtil.getTestName();
-    System.out.println("START " + clusterName + ".testAdministrator() at "
-        + new Date(System.currentTimeMillis()));
-
-    // basic test
-    if (_zkclient.exists("/" + clusterName)) {
-      _zkclient.deleteRecursive("/" + clusterName);
-    }
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
     ZKHelixManager admin =
         new ZKHelixManager(clusterName, null, InstanceType.ADMINISTRATOR, _zkaddr);
@@ -265,8 +245,7 @@ public class TestZkClusterManager extends ZkTestBase {
     admin.disconnect();
     AssertJUnit.assertFalse(admin.isConnected());
 
-    System.out.println("END " + clusterName + ".testAdministrator() at "
-        + new Date(System.currentTimeMillis()));
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 
   private void setupInstances(String clusterName, int[] instances) {

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
index 6a5f002..89eb323 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
@@ -31,8 +31,6 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.TestHelper;
 import org.apache.helix.TestHelper.Verifier;
 import org.apache.helix.ZkTestHelper;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.testutil.ZkTestBase;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -153,8 +151,8 @@ public class TestZkFlapping extends ZkTestBase {
         "MasterSlave", false);
 
     final String instanceName = "localhost_12918";
-    MockParticipantManager participant =
-        new MockParticipantManager(_zkaddr, clusterName, instanceName);
+    MockParticipant participant =
+        new MockParticipant(_zkaddr, clusterName, instanceName);
     participant.syncStart();
 
     final ZkClient client = participant.getZkClient();
@@ -232,8 +230,8 @@ public class TestZkFlapping extends ZkTestBase {
         1, // replicas
         "MasterSlave", false);
 
-    ClusterControllerManager controller =
-        new ClusterControllerManager(_zkaddr, clusterName, "controller");
+    MockController controller =
+        new MockController(_zkaddr, clusterName, "controller");
     controller.syncStart();
 
     final ZkClient client = controller.getZkClient();

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java
index 96952d0..a34dc66 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java
@@ -21,7 +21,7 @@ package org.apache.helix.manager.zk;
 
 import java.util.Date;
 
-import org.apache.helix.HelixAutoController;
+import org.apache.helix.HelixMultiClusterController;
 import org.apache.helix.HelixConnection;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
@@ -59,7 +59,7 @@ public class TestZkHelixAutoController extends ZkTestBase {
 
     // start auto-controller
     ClusterId clusterId = ClusterId.from(clusterName);
-    final HelixAutoController[] controllers = new HelixAutoController[n];
+    final HelixMultiClusterController[] controllers = new HelixMultiClusterController[n];
     for (int i = 0; i < n; i++) {
       int port = 12918 + i;
       ControllerId controllerId = ControllerId.from("localhost_" + port);

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkManagerFlappingDetection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkManagerFlappingDetection.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkManagerFlappingDetection.java
index 86aa6e3..63bf1c5 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkManagerFlappingDetection.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkManagerFlappingDetection.java
@@ -21,8 +21,6 @@ package org.apache.helix.manager.zk;
 
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkTestHelper;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.testutil.ZkTestBase;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -44,7 +42,7 @@ public class TestZkManagerFlappingDetection extends ZkTestBase {
         "MasterSlave", true); // do rebalance
 
     String instanceName = "localhost_" + (12918 + 0);
-    MockParticipantManager manager = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+    MockParticipant manager = new MockParticipant(_zkaddr, clusterName, instanceName);
 
     manager.connect();
     ZkClient zkClient = manager.getZkClient();
@@ -139,7 +137,7 @@ public class TestZkManagerFlappingDetection extends ZkTestBase {
     // flapping time window to 5 sec
     System.setProperty("helixmanager.flappingTimeWindow", "5000");
     System.setProperty("helixmanager.maxDisconnectThreshold", "3");
-    ClusterControllerManager manager2 = new ClusterControllerManager(_zkaddr, clusterName, null);
+    MockController manager2 = new MockController(_zkaddr, clusterName, null);
     manager2.connect();
     Thread.sleep(100);
     ZkClient zkClient = manager2.getZkClient();

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java
index a9c028c..c1972eb 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java
@@ -20,7 +20,6 @@ package org.apache.helix.manager.zk;
  */
 
 import org.apache.helix.integration.ZkStandAloneCMTestBase;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.testng.Assert;
 
 public class TestZkStateChangeListener extends ZkStandAloneCMTestBase {
@@ -36,7 +35,7 @@ public class TestZkStateChangeListener extends ZkStandAloneCMTestBase {
     // 11 disconnects in 5 sec
     for (int i = 0; i < 11; i++) {
       Thread.sleep(200);
-      _controller.handleStateChanged(KeeperState.Disconnected);
+      // _controller.handleStateChanged(KeeperState.Disconnected);
       if (i < 10) {
         Assert.assertTrue(_controller.isConnected());
       } else {
@@ -51,7 +50,7 @@ public class TestZkStateChangeListener extends ZkStandAloneCMTestBase {
     // ZkStateChangeListener listener2 = new ZkStateChangeListener(_participants[0], 5000, 0);
     for (int i = 0; i < 2; i++) {
       Thread.sleep(200);
-      _participants[0].handleStateChanged(KeeperState.Disconnected);
+      // _participants[0].handleStateChanged(KeeperState.Disconnected);
       if (i < 1) {
         Assert.assertTrue(_participants[0].isConnected());
       } else {
@@ -66,22 +65,22 @@ public class TestZkStateChangeListener extends ZkStandAloneCMTestBase {
     // ZkStateChangeListener listener3 = new ZkStateChangeListener(_participants[1], 5000, 5);
     for (int i = 0; i < 3; i++) {
       Thread.sleep(200);
-      _participants[1].handleStateChanged(KeeperState.Disconnected);
+      // _participants[1].handleStateChanged(KeeperState.Disconnected);
       Assert.assertTrue(_participants[1].isConnected());
     }
     Thread.sleep(5000);
     // Old entries should be cleaned up
     for (int i = 0; i < 3; i++) {
       Thread.sleep(200);
-      _participants[1].handleStateChanged(KeeperState.Disconnected);
+      // _participants[1].handleStateChanged(KeeperState.Disconnected);
       Assert.assertTrue(_participants[1].isConnected());
     }
     for (int i = 0; i < 2; i++) {
       Thread.sleep(200);
-      _participants[1].handleStateChanged(KeeperState.Disconnected);
+      // _participants[1].handleStateChanged(KeeperState.Disconnected);
       Assert.assertTrue(_participants[1].isConnected());
     }
-    _participants[1].handleStateChanged(KeeperState.Disconnected);
+    // _participants[1].handleStateChanged(KeeperState.Disconnected);
     Assert.assertFalse(_participants[1].isConnected());
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/zk/ZkConnTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/ZkConnTestHelper.java b/helix-core/src/test/java/org/apache/helix/manager/zk/ZkConnTestHelper.java
new file mode 100644
index 0000000..c2c4d42
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/ZkConnTestHelper.java
@@ -0,0 +1,34 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Helper class for testing ZkHelixConnection
+ */
+public class ZkConnTestHelper extends ZkHelixConnection {
+
+  public ZkConnTestHelper(String zkAddr) {
+    super(zkAddr);
+  }
+
+  public ZkClient getZkClient() {
+    return _zkclient;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
index 80a46fa..a1c55d5 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
@@ -27,9 +27,10 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.integration.ZkStandAloneCMTestBase;
 import org.apache.helix.messaging.DefaultMessagingService;
-import org.apache.helix.model.ConfigScope;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.builder.ConfigScopeBuilder;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -80,12 +81,14 @@ public class TestConfigThreadpoolSize extends ZkStandAloneCMTestBase {
     HelixManager manager = _participants[0];
 
     ConfigAccessor accessor = manager.getConfigAccessor();
-    ConfigScope scope =
-        new ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(instanceName)
-            .build();
+    HelixConfigScope scope =
+        new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT)
+            .forCluster(manager.getClusterName()).forParticipant(instanceName).build();
     accessor.set(scope, "TestMsg." + HelixTaskExecutor.MAX_THREADS, "" + 12);
 
-    scope = new ConfigScopeBuilder().forCluster(manager.getClusterName()).build();
+    scope =
+        new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(
+            manager.getClusterName()).build();
     accessor.set(scope, "TestMsg." + HelixTaskExecutor.MAX_THREADS, "" + 8);
 
     for (int i = 0; i < NODE_NR; i++) {

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
index 97a56be..694af9c 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
@@ -25,9 +25,10 @@ import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.integration.ZkStandAloneCMTestBase;
 import org.apache.helix.messaging.DefaultMessagingService;
-import org.apache.helix.model.ConfigScope;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.builder.ConfigScopeBuilder;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -37,8 +38,9 @@ public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase {
   public void TestThreadPoolSizeConfig() {
     HelixManager manager = _participants[0];
     ConfigAccessor accessor = manager.getConfigAccessor();
-    ConfigScope scope =
-        new ConfigScopeBuilder().forCluster(manager.getClusterName()).forResource("NextDB").build();
+    HelixConfigScope scope =
+        new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE)
+            .forCluster(manager.getClusterName()).forResource("NextDB").build();
     accessor.set(scope, HelixTaskExecutor.MAX_THREADS, "" + 12);
 
     _setupTool.addResourceToCluster(CLUSTER_NAME, "NextDB", 64, STATE_MODEL);

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/mock/controller/MockController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/controller/MockController.java b/helix-core/src/test/java/org/apache/helix/mock/controller/MockController.java
deleted file mode 100644
index 4ddaac4..0000000
--- a/helix-core/src/test/java/org/apache/helix/mock/controller/MockController.java
+++ /dev/null
@@ -1,140 +0,0 @@
-package org.apache.helix.mock.controller;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.TreeMap;
-
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.MessageId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.SessionId;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState.IdealStateProperty;
-import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageState;
-import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.util.HelixUtil;
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.ObjectMapper;
-
-public class MockController {
-  private final ZkClient client;
-  private final String srcName;
-  private final String clusterName;
-
-  public MockController(String src, String zkServer, String cluster) {
-    srcName = src;
-    clusterName = cluster;
-    client = new ZkClient(zkServer);
-    client.setZkSerializer(new ZNRecordSerializer());
-  }
-
-  void sendMessage(MessageId msgId, String instanceName, String fromState, String toState,
-      String partitionKey, int partitionId) throws InterruptedException, JsonGenerationException,
-      JsonMappingException, IOException {
-    Message message = new Message(MessageType.STATE_TRANSITION, msgId);
-    message.setMessageId(msgId);
-    message.setSrcName(srcName);
-    message.setTgtName(instanceName);
-    message.setMsgState(MessageState.NEW);
-    message.setFromState(State.from(fromState));
-    message.setToState(State.from(toState));
-    // message.setPartitionId(partitionId);
-    message.setPartitionId(PartitionId.from(partitionKey));
-
-    String path = HelixUtil.getMessagePath(clusterName, instanceName) + "/" + message.getId();
-    ObjectMapper mapper = new ObjectMapper();
-    StringWriter sw = new StringWriter();
-    mapper.writeValueUsingView(sw, message, Message.class);
-    System.out.println(sw.toString());
-    client.delete(path);
-
-    Thread.sleep(10000);
-    ZNRecord record = client.readData(HelixUtil.getLiveInstancePath(clusterName, instanceName));
-    message.setTgtSessionId(SessionId.from(record.getSimpleField(
-        LiveInstanceProperty.SESSION_ID.toString()).toString()));
-    client.createPersistent(path, message);
-  }
-
-  public void createExternalView(List<String> instanceNames, int partitions, int replicas,
-      String dbName, long randomSeed) {
-    ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(client));
-    Builder keyBuilder = accessor.keyBuilder();
-
-    ExternalView externalView =
-        new ExternalView(computeRoutingTable(instanceNames, partitions, replicas, dbName,
-            randomSeed));
-
-    accessor.setProperty(keyBuilder.externalView(dbName), externalView);
-  }
-
-  public ZNRecord computeRoutingTable(List<String> instanceNames, int partitions, int replicas,
-      String dbName, long randomSeed) {
-    assert (instanceNames.size() > replicas);
-    Collections.sort(instanceNames);
-
-    ZNRecord result = new ZNRecord(dbName);
-
-    Map<String, Object> externalView = new TreeMap<String, Object>();
-
-    List<Integer> partitionList = new ArrayList<Integer>(partitions);
-    for (int i = 0; i < partitions; i++) {
-      partitionList.add(new Integer(i));
-    }
-    Random rand = new Random(randomSeed);
-    // Shuffle the partition list
-    Collections.shuffle(partitionList, rand);
-
-    for (int i = 0; i < partitionList.size(); i++) {
-      int partitionId = partitionList.get(i);
-      Map<String, String> partitionAssignment = new TreeMap<String, String>();
-      int masterNode = i % instanceNames.size();
-      // the first in the list is the node that contains the master
-      partitionAssignment.put(instanceNames.get(masterNode), "MASTER");
-
-      // for the jth replica, we put it on (masterNode + j) % nodes-th
-      // node
-      for (int j = 1; j <= replicas; j++) {
-        partitionAssignment
-            .put(instanceNames.get((masterNode + j) % instanceNames.size()), "SLAVE");
-      }
-      String partitionName = dbName + ".partition-" + partitionId;
-      result.setMapField(partitionName, partitionAssignment);
-    }
-    result.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), "" + partitions);
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/mock/controller/MockControllerProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/controller/MockControllerProcess.java b/helix-core/src/test/java/org/apache/helix/mock/controller/MockControllerProcess.java
deleted file mode 100644
index 193abd3..0000000
--- a/helix-core/src/test/java/org/apache/helix/mock/controller/MockControllerProcess.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package org.apache.helix.mock.controller;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.helix.api.id.MessageId;
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.map.JsonMappingException;
-
-public class MockControllerProcess {
-
-  /**
-   * @param args
-   * @throws IOException
-   * @throws JsonMappingException
-   * @throws JsonGenerationException
-   * @throws InterruptedException
-   */
-  public static void main(String[] args) throws JsonGenerationException, JsonMappingException,
-      InterruptedException, IOException {
-
-    MockController storageController =
-        new MockController("cm-instance-0", "localhost:2181", "storage-cluster");
-    MockController relayController =
-        new MockController("cm-instance-0", "localhost:2181", "relay-cluster");
-
-    ArrayList<String> instanceNames = new ArrayList<String>();
-    instanceNames.add("relay0");
-    instanceNames.add("relay1");
-    instanceNames.add("relay2");
-    instanceNames.add("relay3");
-    instanceNames.add("relay4");
-
-    relayController.createExternalView(instanceNames, 10, 2, "EspressoDB", 0);
-
-    // Messages to initiate offline->slave->master->slave transitions
-
-    storageController.sendMessage(MessageId.from("TestMessageId1"), "localhost_8900", "Offline",
-        "Slave", "EspressoDB.partition-0", 0);
-    Thread.sleep(10000);
-    storageController.sendMessage(MessageId.from("TestMessageId2"), "localhost_8900", "Slave",
-        "Master", "EspressoDB.partition-0", 0);
-    Thread.sleep(10000);
-    storageController.sendMessage(MessageId.from("TestMessageId3"), "localhost_8900", "Master",
-        "Slave", "EspressoDB.partition-0", 0);
-    Thread.sleep(10000);
-
-    // Change the external view to trigger the consumer to listen from
-    // another relay
-    relayController.createExternalView(instanceNames, 10, 2, "EspressoDB", 10);
-
-    storageController.sendMessage(MessageId.from("TestMessageId4"), "localhost_8900", "Slave",
-        "Offline", "EspressoDB.partition-0", 0);
-    Thread.sleep(10000);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java b/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java
index 4d5dd95..6f78427 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java
@@ -28,13 +28,11 @@ import java.util.TreeMap;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.api.State;
 import org.apache.helix.api.id.MessageId;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
-import org.apache.helix.model.Message.Attributes;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.testutil.HelixTestUtil;
 import org.apache.helix.testutil.TestUtil;

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
index c31b641..2b8d5e5 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@ -29,12 +29,11 @@ import javax.management.MalformedObjectNameException;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.TestHelper;
-import org.apache.helix.integration.manager.ClusterDistributedController;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockMultiClusterController;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver;
 import org.apache.helix.testutil.ZkTestBase;
-import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
 import org.apache.log4j.Logger;
@@ -46,8 +45,8 @@ import org.testng.annotations.Test;
 public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
   private static final Logger LOG = Logger.getLogger(TestClusterStatusMonitorLifecycle.class);
 
-  MockParticipantManager[] _participants;
-  ClusterDistributedController[] _controllers;
+  MockParticipant[] _participants;
+  MockMultiClusterController[] _controllers;
   String _controllerClusterName;
   String _clusterNamePrefix;
   String _firstClusterName;
@@ -92,10 +91,10 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
         "LeaderStandby", true); // do rebalance
 
     // start distributed cluster controllers
-    _controllers = new ClusterDistributedController[n + n];
+    _controllers = new MockMultiClusterController[n + n];
     for (int i = 0; i < n; i++) {
       _controllers[i] =
-          new ClusterDistributedController(_zkaddr, _controllerClusterName, "controller_" + i);
+          new MockMultiClusterController(_zkaddr, _controllerClusterName, "controller_" + i);
       _controllers[i].syncStart();
     }
 
@@ -106,11 +105,11 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
     Assert.assertTrue(result, "Controller cluster NOT in ideal state");
 
     // start first cluster
-    _participants = new MockParticipantManager[n];
+    _participants = new MockParticipant[n];
     _firstClusterName = _clusterNamePrefix + "0_0";
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost0_" + (12918 + i);
-      _participants[i] = new MockParticipantManager(_zkaddr, _firstClusterName, instanceName);
+      _participants[i] = new MockParticipant(_zkaddr, _firstClusterName, instanceName);
       _participants[i].syncStart();
     }
 
@@ -127,7 +126,7 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
     _setupTool.rebalanceStorageCluster(_controllerClusterName, _clusterNamePrefix + "0", 6);
     for (int i = n; i < 2 * n; i++) {
       _controllers[i] =
-          new ClusterDistributedController(_zkaddr, _controllerClusterName, "controller_" + i);
+          new MockMultiClusterController(_zkaddr, _controllerClusterName, "controller_" + i);
       _controllers[i].syncStart();
     }
 
@@ -206,8 +205,8 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
     String firstControllerName =
         accessor.getProperty(accessor.keyBuilder().controllerLeader()).getId();
 
-    ClusterDistributedController firstController = null;
-    for (ClusterDistributedController controller : _controllers) {
+    MockMultiClusterController firstController = null;
+    for (MockMultiClusterController controller : _controllers) {
       if (controller.getInstanceName().equals(firstControllerName)) {
         firstController = controller;
       }
@@ -222,7 +221,7 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
     Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 12);
 
     String instanceName = "localhost0_" + (12918 + 0);
-    _participants[0] = new MockParticipantManager(_zkaddr, _firstClusterName, instanceName);
+    _participants[0] = new MockParticipant(_zkaddr, _firstClusterName, instanceName);
     _participants[0].syncStart();
 
     // 1 participant comes back

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java
index 17e1837..b8a97cc 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java
@@ -31,8 +31,8 @@ import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
 import org.apache.helix.TestHelper;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.testutil.ZkTestBase;
 import org.apache.helix.tools.ClusterSetup;
@@ -71,14 +71,14 @@ public class TestDropResourceMetricsReset extends ZkTestBase {
 
     // Start participants and controller
     ClusterSetup setupTool = new ClusterSetup(_zkclient);
-    MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
+    MockParticipant[] participants = new MockParticipant[NUM_PARTICIPANTS];
     for (int i = 0; i < NUM_PARTICIPANTS; i++) {
       participants[i] =
-          new MockParticipantManager(_zkaddr, clusterName, "localhost_" + (12918 + i));
+          new MockParticipant(_zkaddr, clusterName, "localhost_" + (12918 + i));
       participants[i].syncStart();
     }
-    ClusterControllerManager controller =
-        new ClusterControllerManager(_zkaddr, clusterName, "controller_0");
+    MockController controller =
+        new MockController(_zkaddr, clusterName, "controller_0");
     controller.syncStart();
 
     // Verify that the bean was created
@@ -95,7 +95,7 @@ public class TestDropResourceMetricsReset extends ZkTestBase {
     // Clean up
     listener.disconnect();
     controller.syncStop();
-    for (MockParticipantManager participant : participants) {
+    for (MockParticipant participant : participants) {
       participant.syncStop();
     }
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java
index 5497138..c382f55 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java
@@ -27,8 +27,8 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.helix.TestHelper;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.testutil.ZkTestBase;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -50,13 +50,13 @@ public class TestResetClusterMetrics extends ZkTestBase {
         "OnlineOffline", RebalanceMode.FULL_AUTO, true);
 
     // Add a participant
-    MockParticipantManager participant =
-        new MockParticipantManager(_zkaddr, clusterName, "localhost_12918");
+    MockParticipant participant =
+        new MockParticipant(_zkaddr, clusterName, "localhost_12918");
     participant.syncStart();
 
     // Add a controller
-    ClusterControllerManager controller =
-        new ClusterControllerManager(_zkaddr, clusterName, "controller_0");
+    MockController controller =
+        new MockController(_zkaddr, clusterName, "controller_0");
     controller.syncStart();
 
     // Make sure everything gets assigned

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
index 11cdd34..efa30da 100644
--- a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
+++ b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
@@ -23,7 +23,6 @@ import java.util.UUID;
 
 import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.ConfigAccessor;
-import org.apache.helix.ConfigChangeListener;
 import org.apache.helix.ControllerChangeListener;
 import org.apache.helix.CurrentStateChangeListener;
 import org.apache.helix.ExternalViewChangeListener;
@@ -93,12 +92,6 @@ public class MockZKHelixManager implements HelixManager {
   }
 
   @Override
-  public void addConfigChangeListener(ConfigChangeListener listener) throws Exception {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
   public void addMessageListener(MessageListener listener, String instanceName) throws Exception {
     // TODO Auto-generated method stub
 

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/store/zk/TestZkManagerWithAutoFallbackStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/store/zk/TestZkManagerWithAutoFallbackStore.java b/helix-core/src/test/java/org/apache/helix/store/zk/TestZkManagerWithAutoFallbackStore.java
index 3e5e068..69f676e 100644
--- a/helix-core/src/test/java/org/apache/helix/store/zk/TestZkManagerWithAutoFallbackStore.java
+++ b/helix-core/src/test/java/org/apache/helix/store/zk/TestZkManagerWithAutoFallbackStore.java
@@ -26,7 +26,7 @@ import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.MockParticipant;
 import org.apache.helix.testutil.ZkTestBase;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -52,11 +52,11 @@ public class TestZkManagerWithAutoFallbackStore extends ZkTestBase {
         "MasterSlave", false); // do rebalance
 
     // start participants
-    MockParticipantManager[] participants = new MockParticipantManager[n];
+    MockParticipant[] participants = new MockParticipant[n];
     for (int i = 0; i < 1; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+      participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
       participants[i].syncStart();
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/testutil/TestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/testutil/TestUtil.java b/helix-core/src/test/java/org/apache/helix/testutil/TestUtil.java
index bc62fca..6ff2b90 100644
--- a/helix-core/src/test/java/org/apache/helix/testutil/TestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/testutil/TestUtil.java
@@ -27,7 +27,7 @@ import org.apache.log4j.Logger;
 public class TestUtil {
   static Logger logger = Logger.getLogger(TestUtil.class);
 
-  public static boolean isTestNGAnnotated(Class clazz, String methodName) {
+  public static boolean isTestNGAnnotated(Class<? extends Object> clazz, String methodName) {
     final String annotationsPkgName = "org.testng.annotations";
 
     // Check if the class itself is annotated.
@@ -61,7 +61,7 @@ public class TestUtil {
 
       // The first 2 elements in the stack are getStackTrace and this method itself, so ignore them.
       for (int i = 2; i < stackTrace.length; i++) {
-        Class clazz = Class.forName(stackTrace[i].getClassName());
+        Class<? extends Object> clazz = Class.forName(stackTrace[i].getClassName());
         if (isTestNGAnnotated(clazz, stackTrace[i].getMethodName())) {
           String testName = String.format("%s_%s", clazz.getSimpleName(), stackTrace[i].getMethodName());
           logger.debug("Detected " + testName + " as the test name");

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java b/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java
index ec43664..e648a14 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java
@@ -23,8 +23,8 @@ import java.util.Arrays;
 
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.TestHelper;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.testutil.ZkTestBase;
@@ -41,8 +41,8 @@ public class TestClusterStateVerifier extends ZkTestBase {
       "resource0", "resource1"
   };
   private HelixAdmin _admin;
-  private MockParticipantManager[] _participants;
-  private ClusterControllerManager _controller;
+  private MockParticipant[] _participants;
+  private MockController _controller;
   private String _clusterName;
 
   @BeforeMethod
@@ -62,13 +62,13 @@ public class TestClusterStateVerifier extends ZkTestBase {
         RebalanceMode.SEMI_AUTO.toString());
 
     // Configure and start the participants
-    _participants = new MockParticipantManager[RESOURCES.length];
+    _participants = new MockParticipant[RESOURCES.length];
     for (int i = 0; i < _participants.length; i++) {
       String host = "localhost";
       int port = 12918 + i;
       String id = host + '_' + port;
       _setupTool.addInstanceToCluster(_clusterName, id);
-      _participants[i] = new MockParticipantManager(_zkaddr, _clusterName, id);
+      _participants[i] = new MockParticipant(_zkaddr, _clusterName, id);
       _participants[i].syncStart();
     }
 
@@ -82,7 +82,7 @@ public class TestClusterStateVerifier extends ZkTestBase {
     }
 
     // Start the controller
-    _controller = new ClusterControllerManager(_zkaddr, _clusterName, "controller_0");
+    _controller = new MockController(_zkaddr, _clusterName, "controller_0");
     _controller.syncStart();
     Thread.sleep(1000);
   }
@@ -91,7 +91,7 @@ public class TestClusterStateVerifier extends ZkTestBase {
   public void afterMethod() {
     // Cleanup
     _controller.syncStop();
-    for (MockParticipantManager participant : _participants) {
+    for (MockParticipant participant : _participants) {
       participant.syncStop();
     }
     _admin.dropCluster(_clusterName);

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
index 6d27dcb..9e95b51 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
@@ -31,8 +31,8 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.integration.manager.ClusterDistributedController;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockMultiClusterController;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZKUtil;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -239,8 +239,8 @@ public class TestHelixAdminCli extends ZkTestBase {
     final int n = 6;
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
-    MockParticipantManager[] participants = new MockParticipantManager[n];
-    ClusterDistributedController[] controllers = new ClusterDistributedController[2];
+    MockParticipant[] participants = new MockParticipant[n];
+    MockMultiClusterController[] controllers = new MockMultiClusterController[2];
     setupCluster(clusterName, grandClusterName, n, participants, controllers);
 
     // activate clusters
@@ -310,7 +310,7 @@ public class TestHelixAdminCli extends ZkTestBase {
     Assert.assertTrue(verifyResult);
 
     // clean up
-    for (ClusterDistributedController controller : controllers) {
+    for (MockMultiClusterController controller : controllers) {
       controller.syncStop();
     }
     for (int i = 0; i < participants.length; i++) {
@@ -330,8 +330,8 @@ public class TestHelixAdminCli extends ZkTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipantManager[] participants = new MockParticipantManager[n];
-    ClusterDistributedController[] controllers = new ClusterDistributedController[2];
+    MockParticipant[] participants = new MockParticipant[n];
+    MockMultiClusterController[] controllers = new MockMultiClusterController[2];
     setupCluster(clusterName, grandClusterName, n, participants, controllers);
     String command =
         "-zkSvr " + _zkaddr + " -activateCluster " + clusterName + " " + grandClusterName + " true";
@@ -387,7 +387,7 @@ public class TestHelixAdminCli extends ZkTestBase {
   }
 
   private void setupCluster(String clusterName, String grandClusterName, final int n,
-      MockParticipantManager[] participants, ClusterDistributedController[] controllers)
+      MockParticipant[] participants, MockMultiClusterController[] controllers)
       throws Exception, InterruptedException {
     // add cluster
     String command = "-zkSvr " + _zkaddr + " -addCluster " + clusterName;
@@ -419,14 +419,14 @@ public class TestHelixAdminCli extends ZkTestBase {
     // start mock nodes
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_123" + i;
-      participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+      participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
       participants[i].syncStart();
     }
 
     // start controller nodes
     for (int i = 0; i < 2; i++) {
       controllers[i] =
-          new ClusterDistributedController(_zkaddr, grandClusterName, "controller_900" + i);
+          new MockMultiClusterController(_zkaddr, grandClusterName, "controller_900" + i);
       controllers[i].syncStart();
     }
 
@@ -443,8 +443,8 @@ public class TestHelixAdminCli extends ZkTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipantManager[] participants = new MockParticipantManager[n];
-    ClusterDistributedController[] controllers = new ClusterDistributedController[2];
+    MockParticipant[] participants = new MockParticipant[n];
+    MockMultiClusterController[] controllers = new MockMultiClusterController[2];
     setupCluster(clusterName, grandClusterName, n, participants, controllers);
     String command =
         "-zkSvr " + _zkaddr + " -activateCluster " + clusterName + " " + grandClusterName + " true";
@@ -522,8 +522,8 @@ public class TestHelixAdminCli extends ZkTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipantManager[] participants = new MockParticipantManager[n];
-    ClusterDistributedController[] controllers = new ClusterDistributedController[2];
+    MockParticipant[] participants = new MockParticipant[n];
+    MockMultiClusterController[] controllers = new MockMultiClusterController[2];
     setupCluster(clusterName, grandClusterName, n, participants, controllers);
     String command =
         "-zkSvr " + _zkaddr + " -activateCluster " + clusterName + " " + grandClusterName + " true";
@@ -538,10 +538,10 @@ public class TestHelixAdminCli extends ZkTestBase {
     command = "-zkSvr localhost:2183 -expandCluster " + clusterName;
     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
 
-    MockParticipantManager[] newParticipants = new MockParticipantManager[4];
+    MockParticipant[] newParticipants = new MockParticipant[4];
     for (int i = 3; i <= 6; i++) {
       String instanceName = "localhost_123" + i + "1";
-      newParticipants[i - 3] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+      newParticipants[i - 3] = new MockParticipant(_zkaddr, clusterName, instanceName);
       newParticipants[i - 3].syncStart();
     }
 
@@ -579,8 +579,8 @@ public class TestHelixAdminCli extends ZkTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipantManager[] participants = new MockParticipantManager[n];
-    ClusterDistributedController[] controllers = new ClusterDistributedController[2];
+    MockParticipant[] participants = new MockParticipant[n];
+    MockMultiClusterController[] controllers = new MockMultiClusterController[2];
     setupCluster(clusterName, grandClusterName, n, participants, controllers);
     String command =
         "-zkSvr " + _zkaddr + " -activateCluster " + clusterName + " " + grandClusterName + " true";

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
index 4064e10..9d56cdf 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
@@ -49,7 +49,7 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
-import org.apache.helix.manager.zk.HelixConnectionAdaptor;
+import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.manager.zk.ZkHelixConnection;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.provisioning.ApplicationSpec;
@@ -190,7 +190,7 @@ public class AppMasterLauncher {
               readFromHDFS(fs, taskConfig.name, yamlUri, applicationSpec,
                   appAttemptID.getApplicationId());
           Workflow workflow = Workflow.parse(is);
-          TaskDriver taskDriver = new TaskDriver(new HelixConnectionAdaptor(controller));
+          TaskDriver taskDriver = new TaskDriver(new ZKHelixManager(controller));
           taskDriver.start(workflow);
         }
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/JobRunnerMain.java
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/JobRunnerMain.java b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/JobRunnerMain.java
index e588ea8..59a2a92 100644
--- a/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/JobRunnerMain.java
+++ b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/JobRunnerMain.java
@@ -29,6 +29,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixRole;
 import org.apache.helix.InstanceType;
@@ -38,7 +39,7 @@ import org.apache.helix.api.accessor.ClusterAccessor;
 import org.apache.helix.api.config.ContainerConfig;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.Id;
-import org.apache.helix.manager.zk.HelixConnectionAdaptor;
+import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.provisioning.ApplicationSpec;
 import org.apache.helix.provisioning.ApplicationSpecFactory;
@@ -82,7 +83,7 @@ public class JobRunnerMain {
     final HelixConnection connection = launcher.pollForConnection();
     final ClusterId clusterId = ClusterId.from(appSpec.getAppName());
     // TODO: this is a hack -- TaskDriver should accept a connection instead of a manager
-    HelixManager manager = new HelixConnectionAdaptor(new HelixRole() {
+    HelixManager manager = new ZKHelixManager(new HelixRole() {
       @Override
       public HelixConnection getConnection() {
         return connection;
@@ -107,6 +108,11 @@ public class JobRunnerMain {
       public ClusterMessagingService getMessagingService() {
         return null;
       }
+
+      @Override
+      public HelixDataAccessor getAccessor() {
+        return null;
+      }
     });
 
     // Get all submitted jobs

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskService.java
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskService.java b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskService.java
index 7c50e53..d77a71e 100644
--- a/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskService.java
+++ b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskService.java
@@ -27,7 +27,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.manager.zk.HelixConnectionAdaptor;
+import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.participant.AbstractParticipantService;
 import org.apache.helix.provisioning.ServiceConfig;
 import org.apache.helix.provisioning.participant.StatelessParticipantService;
@@ -56,7 +56,7 @@ public class MyTaskService extends StatelessParticipantService {
     LOG.info("Initialized service with config " + serviceConfig);
 
     // Register for callbacks for tasks
-    HelixManager manager = new HelixConnectionAdaptor(getParticipant());
+    HelixManager manager = new ZKHelixManager(getParticipant());
     Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
     taskFactoryReg.put("RunTask", new TaskFactory() {
       @Override


Mime
View raw message