Repository: helix
Updated Branches:
refs/heads/master 8a3705714 -> 593918880
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java
index 54f81cd..c441381 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java
@@ -106,19 +106,6 @@ public class TestZKLiveInstanceData extends ZkTestBase {
@BeforeClass()
public void beforeClass() throws Exception {
- ZkClient zkClient = null;
- try {
- zkClient = new ZkClient(_zkaddr);
- zkClient.setZkSerializer(new ZNRecordSerializer());
- if (zkClient.exists("/" + clusterName)) {
- zkClient.deleteRecursive("/" + clusterName);
- }
- } finally {
- if (zkClient != null) {
- zkClient.close();
- }
- }
-
ClusterSetup.processCommandLineArgs(getArgs("-zkSvr", _zkaddr, "-addCluster", clusterName));
ClusterSetup.processCommandLineArgs(getArgs("-zkSvr", _zkaddr, "-addNode", clusterName,
"localhost:54321"));
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
index 0a770d0..0a325d6 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
@@ -27,12 +27,9 @@ import java.util.List;
import java.util.Map;
import org.apache.helix.AccessOption;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.model.ConfigScope;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.builder.ConfigScopeBuilder;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
@@ -59,16 +56,9 @@ public class TestZkClusterManager extends ZkTestBase {
public void testController() throws Exception {
final String clusterName = TestUtil.getTestName();
- System.out.println("START " + clusterName + ".testController() at "
- + new Date(System.currentTimeMillis()));
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- // basic test
- if (_zkclient.exists("/" + clusterName)) {
- _zkclient.deleteRecursive("/" + clusterName);
- }
-
- ZKHelixManager controller =
- new ZKHelixManager(clusterName, null, InstanceType.CONTROLLER, _zkaddr);
+ MockController controller = new MockController(_zkaddr, clusterName, "controller");
try {
controller.connect();
@@ -110,19 +100,17 @@ public class TestZkClusterManager extends ZkTestBase {
controller.getMessagingService();
controller.getClusterManagmentTool();
- controller.handleNewSession();
+ controller.getConn().handleNewSession();
controller.disconnect();
AssertJUnit.assertFalse(controller.isConnected());
- System.out.println("END " + clusterName + ".testController() at "
- + new Date(System.currentTimeMillis()));
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
@Test
public void testLiveInstanceInfoProvider() throws Exception {
final String clusterName = TestUtil.getTestName();
- System.out.println("START " + clusterName + ".testLiveInstanceInfoProvider() at "
- + new Date(System.currentTimeMillis()));
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
class provider implements LiveInstanceInfoProvider {
boolean _flag = false;
@@ -197,8 +185,7 @@ public class TestZkClusterManager extends ZkTestBase {
// //////////////////////////////////
- MockParticipantManager manager2 =
- new MockParticipantManager(_zkaddr, clusterName, "localhost_3");
+ MockParticipant manager2 = new MockParticipant(_zkaddr, clusterName, "localhost_3");
manager2.setLiveInstanceInfoProvider(new provider(true));
@@ -224,20 +211,13 @@ public class TestZkClusterManager extends ZkTestBase {
Assert.assertFalse(liveInstance.getLiveInstance().equals("value"));
Assert.assertFalse(sessionId.equals(liveInstance.getTypedSessionId().stringify()));
- System.out.println("END " + clusterName + ".testLiveInstanceInfoProvider() at "
- + new Date(System.currentTimeMillis()));
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
@Test()
public void testAdministrator() throws Exception {
final String clusterName = TestUtil.getTestName();
- System.out.println("START " + clusterName + ".testAdministrator() at "
- + new Date(System.currentTimeMillis()));
-
- // basic test
- if (_zkclient.exists("/" + clusterName)) {
- _zkclient.deleteRecursive("/" + clusterName);
- }
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
ZKHelixManager admin =
new ZKHelixManager(clusterName, null, InstanceType.ADMINISTRATOR, _zkaddr);
@@ -265,8 +245,7 @@ public class TestZkClusterManager extends ZkTestBase {
admin.disconnect();
AssertJUnit.assertFalse(admin.isConnected());
- System.out.println("END " + clusterName + ".testAdministrator() at "
- + new Date(System.currentTimeMillis()));
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
private void setupInstances(String clusterName, int[] instances) {
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
index 6a5f002..89eb323 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
@@ -31,8 +31,6 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.TestHelper;
import org.apache.helix.TestHelper.Verifier;
import org.apache.helix.ZkTestHelper;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.testutil.ZkTestBase;
import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -153,8 +151,8 @@ public class TestZkFlapping extends ZkTestBase {
"MasterSlave", false);
final String instanceName = "localhost_12918";
- MockParticipantManager participant =
- new MockParticipantManager(_zkaddr, clusterName, instanceName);
+ MockParticipant participant =
+ new MockParticipant(_zkaddr, clusterName, instanceName);
participant.syncStart();
final ZkClient client = participant.getZkClient();
@@ -232,8 +230,8 @@ public class TestZkFlapping extends ZkTestBase {
1, // replicas
"MasterSlave", false);
- ClusterControllerManager controller =
- new ClusterControllerManager(_zkaddr, clusterName, "controller");
+ MockController controller =
+ new MockController(_zkaddr, clusterName, "controller");
controller.syncStart();
final ZkClient client = controller.getZkClient();
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java
index 96952d0..a34dc66 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java
@@ -21,7 +21,7 @@ package org.apache.helix.manager.zk;
import java.util.Date;
-import org.apache.helix.HelixAutoController;
+import org.apache.helix.HelixMultiClusterController;
import org.apache.helix.HelixConnection;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
@@ -59,7 +59,7 @@ public class TestZkHelixAutoController extends ZkTestBase {
// start auto-controller
ClusterId clusterId = ClusterId.from(clusterName);
- final HelixAutoController[] controllers = new HelixAutoController[n];
+ final HelixMultiClusterController[] controllers = new HelixMultiClusterController[n];
for (int i = 0; i < n; i++) {
int port = 12918 + i;
ControllerId controllerId = ControllerId.from("localhost_" + port);
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkManagerFlappingDetection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkManagerFlappingDetection.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkManagerFlappingDetection.java
index 86aa6e3..63bf1c5 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkManagerFlappingDetection.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkManagerFlappingDetection.java
@@ -21,8 +21,6 @@ package org.apache.helix.manager.zk;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkTestHelper;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.testutil.ZkTestBase;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -44,7 +42,7 @@ public class TestZkManagerFlappingDetection extends ZkTestBase {
"MasterSlave", true); // do rebalance
String instanceName = "localhost_" + (12918 + 0);
- MockParticipantManager manager = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+ MockParticipant manager = new MockParticipant(_zkaddr, clusterName, instanceName);
manager.connect();
ZkClient zkClient = manager.getZkClient();
@@ -139,7 +137,7 @@ public class TestZkManagerFlappingDetection extends ZkTestBase {
// flapping time window to 5 sec
System.setProperty("helixmanager.flappingTimeWindow", "5000");
System.setProperty("helixmanager.maxDisconnectThreshold", "3");
- ClusterControllerManager manager2 = new ClusterControllerManager(_zkaddr, clusterName, null);
+ MockController manager2 = new MockController(_zkaddr, clusterName, null);
manager2.connect();
Thread.sleep(100);
ZkClient zkClient = manager2.getZkClient();
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java
index a9c028c..c1972eb 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java
@@ -20,7 +20,6 @@ package org.apache.helix.manager.zk;
*/
import org.apache.helix.integration.ZkStandAloneCMTestBase;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.testng.Assert;
public class TestZkStateChangeListener extends ZkStandAloneCMTestBase {
@@ -36,7 +35,7 @@ public class TestZkStateChangeListener extends ZkStandAloneCMTestBase {
// 11 disconnects in 5 sec
for (int i = 0; i < 11; i++) {
Thread.sleep(200);
- _controller.handleStateChanged(KeeperState.Disconnected);
+ // _controller.handleStateChanged(KeeperState.Disconnected);
if (i < 10) {
Assert.assertTrue(_controller.isConnected());
} else {
@@ -51,7 +50,7 @@ public class TestZkStateChangeListener extends ZkStandAloneCMTestBase {
// ZkStateChangeListener listener2 = new ZkStateChangeListener(_participants[0], 5000, 0);
for (int i = 0; i < 2; i++) {
Thread.sleep(200);
- _participants[0].handleStateChanged(KeeperState.Disconnected);
+ // _participants[0].handleStateChanged(KeeperState.Disconnected);
if (i < 1) {
Assert.assertTrue(_participants[0].isConnected());
} else {
@@ -66,22 +65,22 @@ public class TestZkStateChangeListener extends ZkStandAloneCMTestBase {
// ZkStateChangeListener listener3 = new ZkStateChangeListener(_participants[1], 5000, 5);
for (int i = 0; i < 3; i++) {
Thread.sleep(200);
- _participants[1].handleStateChanged(KeeperState.Disconnected);
+ // _participants[1].handleStateChanged(KeeperState.Disconnected);
Assert.assertTrue(_participants[1].isConnected());
}
Thread.sleep(5000);
// Old entries should be cleaned up
for (int i = 0; i < 3; i++) {
Thread.sleep(200);
- _participants[1].handleStateChanged(KeeperState.Disconnected);
+ // _participants[1].handleStateChanged(KeeperState.Disconnected);
Assert.assertTrue(_participants[1].isConnected());
}
for (int i = 0; i < 2; i++) {
Thread.sleep(200);
- _participants[1].handleStateChanged(KeeperState.Disconnected);
+ // _participants[1].handleStateChanged(KeeperState.Disconnected);
Assert.assertTrue(_participants[1].isConnected());
}
- _participants[1].handleStateChanged(KeeperState.Disconnected);
+ // _participants[1].handleStateChanged(KeeperState.Disconnected);
Assert.assertFalse(_participants[1].isConnected());
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/zk/ZkConnTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/ZkConnTestHelper.java b/helix-core/src/test/java/org/apache/helix/manager/zk/ZkConnTestHelper.java
new file mode 100644
index 0000000..c2c4d42
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/ZkConnTestHelper.java
@@ -0,0 +1,34 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Helper class for testing ZkHelixConnection
+ */
+public class ZkConnTestHelper extends ZkHelixConnection {
+
+ public ZkConnTestHelper(String zkAddr) {
+ super(zkAddr);
+ }
+
+ public ZkClient getZkClient() {
+ return _zkclient;
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
index 80a46fa..a1c55d5 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
@@ -27,9 +27,10 @@ import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.integration.ZkStandAloneCMTestBase;
import org.apache.helix.messaging.DefaultMessagingService;
-import org.apache.helix.model.ConfigScope;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.Message;
-import org.apache.helix.model.builder.ConfigScopeBuilder;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -80,12 +81,14 @@ public class TestConfigThreadpoolSize extends ZkStandAloneCMTestBase {
HelixManager manager = _participants[0];
ConfigAccessor accessor = manager.getConfigAccessor();
- ConfigScope scope =
- new ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(instanceName)
- .build();
+ HelixConfigScope scope =
+ new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT)
+ .forCluster(manager.getClusterName()).forParticipant(instanceName).build();
accessor.set(scope, "TestMsg." + HelixTaskExecutor.MAX_THREADS, "" + 12);
- scope = new ConfigScopeBuilder().forCluster(manager.getClusterName()).build();
+ scope =
+ new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(
+ manager.getClusterName()).build();
accessor.set(scope, "TestMsg." + HelixTaskExecutor.MAX_THREADS, "" + 8);
for (int i = 0; i < NODE_NR; i++) {
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
index 97a56be..694af9c 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
@@ -25,9 +25,10 @@ import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.integration.ZkStandAloneCMTestBase;
import org.apache.helix.messaging.DefaultMessagingService;
-import org.apache.helix.model.ConfigScope;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.builder.ConfigScopeBuilder;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -37,8 +38,9 @@ public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase {
public void TestThreadPoolSizeConfig() {
HelixManager manager = _participants[0];
ConfigAccessor accessor = manager.getConfigAccessor();
- ConfigScope scope =
- new ConfigScopeBuilder().forCluster(manager.getClusterName()).forResource("NextDB").build();
+ HelixConfigScope scope =
+ new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE)
+ .forCluster(manager.getClusterName()).forResource("NextDB").build();
accessor.set(scope, HelixTaskExecutor.MAX_THREADS, "" + 12);
_setupTool.addResourceToCluster(CLUSTER_NAME, "NextDB", 64, STATE_MODEL);
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/mock/controller/MockController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/controller/MockController.java b/helix-core/src/test/java/org/apache/helix/mock/controller/MockController.java
deleted file mode 100644
index 4ddaac4..0000000
--- a/helix-core/src/test/java/org/apache/helix/mock/controller/MockController.java
+++ /dev/null
@@ -1,140 +0,0 @@
-package org.apache.helix.mock.controller;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.TreeMap;
-
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.api.State;
-import org.apache.helix.api.id.MessageId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.SessionId;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState.IdealStateProperty;
-import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageState;
-import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.util.HelixUtil;
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.ObjectMapper;
-
-public class MockController {
- private final ZkClient client;
- private final String srcName;
- private final String clusterName;
-
- public MockController(String src, String zkServer, String cluster) {
- srcName = src;
- clusterName = cluster;
- client = new ZkClient(zkServer);
- client.setZkSerializer(new ZNRecordSerializer());
- }
-
- void sendMessage(MessageId msgId, String instanceName, String fromState, String toState,
- String partitionKey, int partitionId) throws InterruptedException, JsonGenerationException,
- JsonMappingException, IOException {
- Message message = new Message(MessageType.STATE_TRANSITION, msgId);
- message.setMessageId(msgId);
- message.setSrcName(srcName);
- message.setTgtName(instanceName);
- message.setMsgState(MessageState.NEW);
- message.setFromState(State.from(fromState));
- message.setToState(State.from(toState));
- // message.setPartitionId(partitionId);
- message.setPartitionId(PartitionId.from(partitionKey));
-
- String path = HelixUtil.getMessagePath(clusterName, instanceName) + "/" + message.getId();
- ObjectMapper mapper = new ObjectMapper();
- StringWriter sw = new StringWriter();
- mapper.writeValueUsingView(sw, message, Message.class);
- System.out.println(sw.toString());
- client.delete(path);
-
- Thread.sleep(10000);
- ZNRecord record = client.readData(HelixUtil.getLiveInstancePath(clusterName, instanceName));
- message.setTgtSessionId(SessionId.from(record.getSimpleField(
- LiveInstanceProperty.SESSION_ID.toString()).toString()));
- client.createPersistent(path, message);
- }
-
- public void createExternalView(List<String> instanceNames, int partitions, int replicas,
- String dbName, long randomSeed) {
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(client));
- Builder keyBuilder = accessor.keyBuilder();
-
- ExternalView externalView =
- new ExternalView(computeRoutingTable(instanceNames, partitions, replicas, dbName,
- randomSeed));
-
- accessor.setProperty(keyBuilder.externalView(dbName), externalView);
- }
-
- public ZNRecord computeRoutingTable(List<String> instanceNames, int partitions, int replicas,
- String dbName, long randomSeed) {
- assert (instanceNames.size() > replicas);
- Collections.sort(instanceNames);
-
- ZNRecord result = new ZNRecord(dbName);
-
- Map<String, Object> externalView = new TreeMap<String, Object>();
-
- List<Integer> partitionList = new ArrayList<Integer>(partitions);
- for (int i = 0; i < partitions; i++) {
- partitionList.add(new Integer(i));
- }
- Random rand = new Random(randomSeed);
- // Shuffle the partition list
- Collections.shuffle(partitionList, rand);
-
- for (int i = 0; i < partitionList.size(); i++) {
- int partitionId = partitionList.get(i);
- Map<String, String> partitionAssignment = new TreeMap<String, String>();
- int masterNode = i % instanceNames.size();
- // the first in the list is the node that contains the master
- partitionAssignment.put(instanceNames.get(masterNode), "MASTER");
-
- // for the jth replica, we put it on (masterNode + j) % nodes-th
- // node
- for (int j = 1; j <= replicas; j++) {
- partitionAssignment
- .put(instanceNames.get((masterNode + j) % instanceNames.size()), "SLAVE");
- }
- String partitionName = dbName + ".partition-" + partitionId;
- result.setMapField(partitionName, partitionAssignment);
- }
- result.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), "" + partitions);
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/mock/controller/MockControllerProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/controller/MockControllerProcess.java b/helix-core/src/test/java/org/apache/helix/mock/controller/MockControllerProcess.java
deleted file mode 100644
index 193abd3..0000000
--- a/helix-core/src/test/java/org/apache/helix/mock/controller/MockControllerProcess.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package org.apache.helix.mock.controller;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.helix.api.id.MessageId;
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.map.JsonMappingException;
-
-public class MockControllerProcess {
-
- /**
- * @param args
- * @throws IOException
- * @throws JsonMappingException
- * @throws JsonGenerationException
- * @throws InterruptedException
- */
- public static void main(String[] args) throws JsonGenerationException, JsonMappingException,
- InterruptedException, IOException {
-
- MockController storageController =
- new MockController("cm-instance-0", "localhost:2181", "storage-cluster");
- MockController relayController =
- new MockController("cm-instance-0", "localhost:2181", "relay-cluster");
-
- ArrayList<String> instanceNames = new ArrayList<String>();
- instanceNames.add("relay0");
- instanceNames.add("relay1");
- instanceNames.add("relay2");
- instanceNames.add("relay3");
- instanceNames.add("relay4");
-
- relayController.createExternalView(instanceNames, 10, 2, "EspressoDB", 0);
-
- // Messages to initiate offline->slave->master->slave transitions
-
- storageController.sendMessage(MessageId.from("TestMessageId1"), "localhost_8900", "Offline",
- "Slave", "EspressoDB.partition-0", 0);
- Thread.sleep(10000);
- storageController.sendMessage(MessageId.from("TestMessageId2"), "localhost_8900", "Slave",
- "Master", "EspressoDB.partition-0", 0);
- Thread.sleep(10000);
- storageController.sendMessage(MessageId.from("TestMessageId3"), "localhost_8900", "Master",
- "Slave", "EspressoDB.partition-0", 0);
- Thread.sleep(10000);
-
- // Change the external view to trigger the consumer to listen from
- // another relay
- relayController.createExternalView(instanceNames, 10, 2, "EspressoDB", 10);
-
- storageController.sendMessage(MessageId.from("TestMessageId4"), "localhost_8900", "Slave",
- "Offline", "EspressoDB.partition-0", 0);
- Thread.sleep(10000);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java b/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java
index 4d5dd95..6f78427 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestConstraint.java
@@ -28,13 +28,11 @@ import java.util.TreeMap;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
-import org.apache.helix.api.State;
import org.apache.helix.api.id.MessageId;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
-import org.apache.helix.model.Message.Attributes;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.testutil.HelixTestUtil;
import org.apache.helix.testutil.TestUtil;
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
index c31b641..2b8d5e5 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@ -29,12 +29,11 @@ import javax.management.MalformedObjectNameException;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.TestHelper;
-import org.apache.helix.integration.manager.ClusterDistributedController;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockMultiClusterController;
import org.apache.helix.model.IdealState;
import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver;
import org.apache.helix.testutil.ZkTestBase;
-import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
import org.apache.log4j.Logger;
@@ -46,8 +45,8 @@ import org.testng.annotations.Test;
public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
private static final Logger LOG = Logger.getLogger(TestClusterStatusMonitorLifecycle.class);
- MockParticipantManager[] _participants;
- ClusterDistributedController[] _controllers;
+ MockParticipant[] _participants;
+ MockMultiClusterController[] _controllers;
String _controllerClusterName;
String _clusterNamePrefix;
String _firstClusterName;
@@ -92,10 +91,10 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
"LeaderStandby", true); // do rebalance
// start distributed cluster controllers
- _controllers = new ClusterDistributedController[n + n];
+ _controllers = new MockMultiClusterController[n + n];
for (int i = 0; i < n; i++) {
_controllers[i] =
- new ClusterDistributedController(_zkaddr, _controllerClusterName, "controller_" + i);
+ new MockMultiClusterController(_zkaddr, _controllerClusterName, "controller_" + i);
_controllers[i].syncStart();
}
@@ -106,11 +105,11 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
Assert.assertTrue(result, "Controller cluster NOT in ideal state");
// start first cluster
- _participants = new MockParticipantManager[n];
+ _participants = new MockParticipant[n];
_firstClusterName = _clusterNamePrefix + "0_0";
for (int i = 0; i < n; i++) {
String instanceName = "localhost0_" + (12918 + i);
- _participants[i] = new MockParticipantManager(_zkaddr, _firstClusterName, instanceName);
+ _participants[i] = new MockParticipant(_zkaddr, _firstClusterName, instanceName);
_participants[i].syncStart();
}
@@ -127,7 +126,7 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
_setupTool.rebalanceStorageCluster(_controllerClusterName, _clusterNamePrefix + "0", 6);
for (int i = n; i < 2 * n; i++) {
_controllers[i] =
- new ClusterDistributedController(_zkaddr, _controllerClusterName, "controller_" + i);
+ new MockMultiClusterController(_zkaddr, _controllerClusterName, "controller_" + i);
_controllers[i].syncStart();
}
@@ -206,8 +205,8 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
String firstControllerName =
accessor.getProperty(accessor.keyBuilder().controllerLeader()).getId();
- ClusterDistributedController firstController = null;
- for (ClusterDistributedController controller : _controllers) {
+ MockMultiClusterController firstController = null;
+ for (MockMultiClusterController controller : _controllers) {
if (controller.getInstanceName().equals(firstControllerName)) {
firstController = controller;
}
@@ -222,7 +221,7 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 12);
String instanceName = "localhost0_" + (12918 + 0);
- _participants[0] = new MockParticipantManager(_zkaddr, _firstClusterName, instanceName);
+ _participants[0] = new MockParticipant(_zkaddr, _firstClusterName, instanceName);
_participants[0].syncStart();
// 1 participant comes back
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java
index 17e1837..b8a97cc 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java
@@ -31,8 +31,8 @@ import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.helix.TestHelper;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.testutil.ZkTestBase;
import org.apache.helix.tools.ClusterSetup;
@@ -71,14 +71,14 @@ public class TestDropResourceMetricsReset extends ZkTestBase {
// Start participants and controller
ClusterSetup setupTool = new ClusterSetup(_zkclient);
- MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
+ MockParticipant[] participants = new MockParticipant[NUM_PARTICIPANTS];
for (int i = 0; i < NUM_PARTICIPANTS; i++) {
participants[i] =
- new MockParticipantManager(_zkaddr, clusterName, "localhost_" + (12918 + i));
+ new MockParticipant(_zkaddr, clusterName, "localhost_" + (12918 + i));
participants[i].syncStart();
}
- ClusterControllerManager controller =
- new ClusterControllerManager(_zkaddr, clusterName, "controller_0");
+ MockController controller =
+ new MockController(_zkaddr, clusterName, "controller_0");
controller.syncStart();
// Verify that the bean was created
@@ -95,7 +95,7 @@ public class TestDropResourceMetricsReset extends ZkTestBase {
// Clean up
listener.disconnect();
controller.syncStop();
- for (MockParticipantManager participant : participants) {
+ for (MockParticipant participant : participants) {
participant.syncStop();
}
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java
index 5497138..c382f55 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java
@@ -27,8 +27,8 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.helix.TestHelper;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.testutil.ZkTestBase;
import org.apache.helix.tools.ClusterStateVerifier;
@@ -50,13 +50,13 @@ public class TestResetClusterMetrics extends ZkTestBase {
"OnlineOffline", RebalanceMode.FULL_AUTO, true);
// Add a participant
- MockParticipantManager participant =
- new MockParticipantManager(_zkaddr, clusterName, "localhost_12918");
+ MockParticipant participant =
+ new MockParticipant(_zkaddr, clusterName, "localhost_12918");
participant.syncStart();
// Add a controller
- ClusterControllerManager controller =
- new ClusterControllerManager(_zkaddr, clusterName, "controller_0");
+ MockController controller =
+ new MockController(_zkaddr, clusterName, "controller_0");
controller.syncStart();
// Make sure everything gets assigned
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
index 11cdd34..efa30da 100644
--- a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
+++ b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
@@ -23,7 +23,6 @@ import java.util.UUID;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.ConfigAccessor;
-import org.apache.helix.ConfigChangeListener;
import org.apache.helix.ControllerChangeListener;
import org.apache.helix.CurrentStateChangeListener;
import org.apache.helix.ExternalViewChangeListener;
@@ -93,12 +92,6 @@ public class MockZKHelixManager implements HelixManager {
}
@Override
- public void addConfigChangeListener(ConfigChangeListener listener) throws Exception {
- // TODO Auto-generated method stub
-
- }
-
- @Override
public void addMessageListener(MessageListener listener, String instanceName) throws Exception {
// TODO Auto-generated method stub
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/store/zk/TestZkManagerWithAutoFallbackStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/store/zk/TestZkManagerWithAutoFallbackStore.java b/helix-core/src/test/java/org/apache/helix/store/zk/TestZkManagerWithAutoFallbackStore.java
index 3e5e068..69f676e 100644
--- a/helix-core/src/test/java/org/apache/helix/store/zk/TestZkManagerWithAutoFallbackStore.java
+++ b/helix-core/src/test/java/org/apache/helix/store/zk/TestZkManagerWithAutoFallbackStore.java
@@ -26,7 +26,7 @@ import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.MockParticipant;
import org.apache.helix.testutil.ZkTestBase;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -52,11 +52,11 @@ public class TestZkManagerWithAutoFallbackStore extends ZkTestBase {
"MasterSlave", false); // do rebalance
// start participants
- MockParticipantManager[] participants = new MockParticipantManager[n];
+ MockParticipant[] participants = new MockParticipant[n];
for (int i = 0; i < 1; i++) {
String instanceName = "localhost_" + (12918 + i);
- participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+ participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
participants[i].syncStart();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/testutil/TestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/testutil/TestUtil.java b/helix-core/src/test/java/org/apache/helix/testutil/TestUtil.java
index bc62fca..6ff2b90 100644
--- a/helix-core/src/test/java/org/apache/helix/testutil/TestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/testutil/TestUtil.java
@@ -27,7 +27,7 @@ import org.apache.log4j.Logger;
public class TestUtil {
static Logger logger = Logger.getLogger(TestUtil.class);
- public static boolean isTestNGAnnotated(Class clazz, String methodName) {
+ public static boolean isTestNGAnnotated(Class<? extends Object> clazz, String methodName) {
final String annotationsPkgName = "org.testng.annotations";
// Check if the class itself is annotated.
@@ -61,7 +61,7 @@ public class TestUtil {
// The first 2 elements in the stack are getStackTrace and this method itself, so ignore them.
for (int i = 2; i < stackTrace.length; i++) {
- Class clazz = Class.forName(stackTrace[i].getClassName());
+ Class<? extends Object> clazz = Class.forName(stackTrace[i].getClassName());
if (isTestNGAnnotated(clazz, stackTrace[i].getMethodName())) {
String testName = String.format("%s_%s", clazz.getSimpleName(), stackTrace[i].getMethodName());
logger.debug("Detected " + testName + " as the test name");
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java b/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java
index ec43664..e648a14 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java
@@ -23,8 +23,8 @@ import java.util.Arrays;
import org.apache.helix.HelixAdmin;
import org.apache.helix.TestHelper;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.testutil.ZkTestBase;
@@ -41,8 +41,8 @@ public class TestClusterStateVerifier extends ZkTestBase {
"resource0", "resource1"
};
private HelixAdmin _admin;
- private MockParticipantManager[] _participants;
- private ClusterControllerManager _controller;
+ private MockParticipant[] _participants;
+ private MockController _controller;
private String _clusterName;
@BeforeMethod
@@ -62,13 +62,13 @@ public class TestClusterStateVerifier extends ZkTestBase {
RebalanceMode.SEMI_AUTO.toString());
// Configure and start the participants
- _participants = new MockParticipantManager[RESOURCES.length];
+ _participants = new MockParticipant[RESOURCES.length];
for (int i = 0; i < _participants.length; i++) {
String host = "localhost";
int port = 12918 + i;
String id = host + '_' + port;
_setupTool.addInstanceToCluster(_clusterName, id);
- _participants[i] = new MockParticipantManager(_zkaddr, _clusterName, id);
+ _participants[i] = new MockParticipant(_zkaddr, _clusterName, id);
_participants[i].syncStart();
}
@@ -82,7 +82,7 @@ public class TestClusterStateVerifier extends ZkTestBase {
}
// Start the controller
- _controller = new ClusterControllerManager(_zkaddr, _clusterName, "controller_0");
+ _controller = new MockController(_zkaddr, _clusterName, "controller_0");
_controller.syncStart();
Thread.sleep(1000);
}
@@ -91,7 +91,7 @@ public class TestClusterStateVerifier extends ZkTestBase {
public void afterMethod() {
// Cleanup
_controller.syncStop();
- for (MockParticipantManager participant : _participants) {
+ for (MockParticipant participant : _participants) {
participant.syncStop();
}
_admin.dropCluster(_clusterName);
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
index 6d27dcb..9e95b51 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
@@ -31,8 +31,8 @@ import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.integration.manager.ClusterDistributedController;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockMultiClusterController;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZKUtil;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -239,8 +239,8 @@ public class TestHelixAdminCli extends ZkTestBase {
final int n = 6;
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipantManager[] participants = new MockParticipantManager[n];
- ClusterDistributedController[] controllers = new ClusterDistributedController[2];
+ MockParticipant[] participants = new MockParticipant[n];
+ MockMultiClusterController[] controllers = new MockMultiClusterController[2];
setupCluster(clusterName, grandClusterName, n, participants, controllers);
// activate clusters
@@ -310,7 +310,7 @@ public class TestHelixAdminCli extends ZkTestBase {
Assert.assertTrue(verifyResult);
// clean up
- for (ClusterDistributedController controller : controllers) {
+ for (MockMultiClusterController controller : controllers) {
controller.syncStop();
}
for (int i = 0; i < participants.length; i++) {
@@ -330,8 +330,8 @@ public class TestHelixAdminCli extends ZkTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipantManager[] participants = new MockParticipantManager[n];
- ClusterDistributedController[] controllers = new ClusterDistributedController[2];
+ MockParticipant[] participants = new MockParticipant[n];
+ MockMultiClusterController[] controllers = new MockMultiClusterController[2];
setupCluster(clusterName, grandClusterName, n, participants, controllers);
String command =
"-zkSvr " + _zkaddr + " -activateCluster " + clusterName + " " + grandClusterName + " true";
@@ -387,7 +387,7 @@ public class TestHelixAdminCli extends ZkTestBase {
}
private void setupCluster(String clusterName, String grandClusterName, final int n,
- MockParticipantManager[] participants, ClusterDistributedController[] controllers)
+ MockParticipant[] participants, MockMultiClusterController[] controllers)
throws Exception, InterruptedException {
// add cluster
String command = "-zkSvr " + _zkaddr + " -addCluster " + clusterName;
@@ -419,14 +419,14 @@ public class TestHelixAdminCli extends ZkTestBase {
// start mock nodes
for (int i = 0; i < n; i++) {
String instanceName = "localhost_123" + i;
- participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+ participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
participants[i].syncStart();
}
// start controller nodes
for (int i = 0; i < 2; i++) {
controllers[i] =
- new ClusterDistributedController(_zkaddr, grandClusterName, "controller_900" + i);
+ new MockMultiClusterController(_zkaddr, grandClusterName, "controller_900" + i);
controllers[i].syncStart();
}
@@ -443,8 +443,8 @@ public class TestHelixAdminCli extends ZkTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipantManager[] participants = new MockParticipantManager[n];
- ClusterDistributedController[] controllers = new ClusterDistributedController[2];
+ MockParticipant[] participants = new MockParticipant[n];
+ MockMultiClusterController[] controllers = new MockMultiClusterController[2];
setupCluster(clusterName, grandClusterName, n, participants, controllers);
String command =
"-zkSvr " + _zkaddr + " -activateCluster " + clusterName + " " + grandClusterName + " true";
@@ -522,8 +522,8 @@ public class TestHelixAdminCli extends ZkTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipantManager[] participants = new MockParticipantManager[n];
- ClusterDistributedController[] controllers = new ClusterDistributedController[2];
+ MockParticipant[] participants = new MockParticipant[n];
+ MockMultiClusterController[] controllers = new MockMultiClusterController[2];
setupCluster(clusterName, grandClusterName, n, participants, controllers);
String command =
"-zkSvr " + _zkaddr + " -activateCluster " + clusterName + " " + grandClusterName + " true";
@@ -538,10 +538,10 @@ public class TestHelixAdminCli extends ZkTestBase {
command = "-zkSvr localhost:2183 -expandCluster " + clusterName;
ClusterSetup.processCommandLineArgs(command.split("\\s+"));
- MockParticipantManager[] newParticipants = new MockParticipantManager[4];
+ MockParticipant[] newParticipants = new MockParticipant[4];
for (int i = 3; i <= 6; i++) {
String instanceName = "localhost_123" + i + "1";
- newParticipants[i - 3] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+ newParticipants[i - 3] = new MockParticipant(_zkaddr, clusterName, instanceName);
newParticipants[i - 3].syncStart();
}
@@ -579,8 +579,8 @@ public class TestHelixAdminCli extends ZkTestBase {
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- MockParticipantManager[] participants = new MockParticipantManager[n];
- ClusterDistributedController[] controllers = new ClusterDistributedController[2];
+ MockParticipant[] participants = new MockParticipant[n];
+ MockMultiClusterController[] controllers = new MockMultiClusterController[2];
setupCluster(clusterName, grandClusterName, n, participants, controllers);
String command =
"-zkSvr " + _zkaddr + " -activateCluster " + clusterName + " " + grandClusterName + " true";
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
index 4064e10..9d56cdf 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
@@ -49,7 +49,7 @@ import org.apache.helix.api.id.ResourceId;
import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
-import org.apache.helix.manager.zk.HelixConnectionAdaptor;
+import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.manager.zk.ZkHelixConnection;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.provisioning.ApplicationSpec;
@@ -190,7 +190,7 @@ public class AppMasterLauncher {
readFromHDFS(fs, taskConfig.name, yamlUri, applicationSpec,
appAttemptID.getApplicationId());
Workflow workflow = Workflow.parse(is);
- TaskDriver taskDriver = new TaskDriver(new HelixConnectionAdaptor(controller));
+ TaskDriver taskDriver = new TaskDriver(new ZKHelixManager(controller));
taskDriver.start(workflow);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/JobRunnerMain.java
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/JobRunnerMain.java b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/JobRunnerMain.java
index e588ea8..59a2a92 100644
--- a/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/JobRunnerMain.java
+++ b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/JobRunnerMain.java
@@ -29,6 +29,7 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixRole;
import org.apache.helix.InstanceType;
@@ -38,7 +39,7 @@ import org.apache.helix.api.accessor.ClusterAccessor;
import org.apache.helix.api.config.ContainerConfig;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.Id;
-import org.apache.helix.manager.zk.HelixConnectionAdaptor;
+import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.provisioning.ApplicationSpec;
import org.apache.helix.provisioning.ApplicationSpecFactory;
@@ -82,7 +83,7 @@ public class JobRunnerMain {
final HelixConnection connection = launcher.pollForConnection();
final ClusterId clusterId = ClusterId.from(appSpec.getAppName());
// TODO: this is a hack -- TaskDriver should accept a connection instead of a manager
- HelixManager manager = new HelixConnectionAdaptor(new HelixRole() {
+ HelixManager manager = new ZKHelixManager(new HelixRole() {
@Override
public HelixConnection getConnection() {
return connection;
@@ -107,6 +108,11 @@ public class JobRunnerMain {
public ClusterMessagingService getMessagingService() {
return null;
}
+
+ @Override
+ public HelixDataAccessor getAccessor() {
+ return null;
+ }
});
// Get all submitted jobs
http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskService.java
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskService.java b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskService.java
index 7c50e53..d77a71e 100644
--- a/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskService.java
+++ b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/MyTaskService.java
@@ -27,7 +27,7 @@ import org.apache.helix.HelixManager;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.manager.zk.HelixConnectionAdaptor;
+import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.participant.AbstractParticipantService;
import org.apache.helix.provisioning.ServiceConfig;
import org.apache.helix.provisioning.participant.StatelessParticipantService;
@@ -56,7 +56,7 @@ public class MyTaskService extends StatelessParticipantService {
LOG.info("Initialized service with config " + serviceConfig);
// Register for callbacks for tasks
- HelixManager manager = new HelixConnectionAdaptor(getParticipant());
+ HelixManager manager = new ZKHelixManager(getParticipant());
Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
taskFactoryReg.put("RunTask", new TaskFactory() {
@Override
|