Repository: helix
Updated Branches:
refs/heads/master dd8226b83 -> 8ba16a8e2
[HELIX-495] Make TestPreferenceListAsQueue non-flaky
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8ba16a8e
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8ba16a8e
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8ba16a8e
Branch: refs/heads/master
Commit: 8ba16a8e26103ebddc04ab57cbac81b5226d6788
Parents: dd8226b
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Mon Aug 4 14:43:05 2014 -0700
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Mon Aug 4 14:43:05 2014 -0700
----------------------------------------------------------------------
.../integration/TestPreferenceListAsQueue.java | 171 +++++++++++--------
1 file changed, 97 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/8ba16a8e/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
b/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
index 06a2b56..f9c2d47 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java
@@ -21,6 +21,9 @@ package org.apache.helix.integration;
import java.util.Arrays;
import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.DataUpdater;
import org.apache.helix.AccessOption;
@@ -58,13 +61,17 @@ public class TestPreferenceListAsQueue extends ZkTestBase {
private static final int PARALLELISM = 1;
private List<String> _instanceList;
+ private Queue<List<String>> _prefListHistory;
private String _clusterName;
private String _stateModel;
private HelixAdmin _admin;
+ private CountDownLatch _onlineLatch;
+ private CountDownLatch _offlineLatch;
@BeforeMethod
public void beforeMethod() {
_instanceList = Lists.newLinkedList();
+ _prefListHistory = Lists.newLinkedList();
_admin = _setupTool.getClusterManagementTool();
// Create cluster
@@ -162,6 +169,9 @@ public class TestPreferenceListAsQueue extends ZkTestBase {
HelixManagerFactory.getZKHelixManager(_clusterName, null, InstanceType.CONTROLLER,
_zkaddr);
controller.connect();
+ // Disable controller immediately
+ _admin.enableCluster(_clusterName, false);
+
// This resource only has 1 partition
String partitionName = RESOURCE_NAME + "_" + 0;
@@ -187,26 +197,47 @@ public class TestPreferenceListAsQueue extends ZkTestBase {
Assert.assertTrue(preferenceListIsCorrect(_admin, _clusterName, RESOURCE_NAME, partitionName,
Arrays.asList("localhost_1", "localhost_2")));
- // Now wait for the first instance to be done
- Thread.sleep(WAIT_TIME);
- Assert.assertTrue(preferenceListIsCorrect(_admin, _clusterName, RESOURCE_NAME, partitionName,
- Arrays.asList("localhost_2", "")));
+ // Prepare for synchronization
+ _onlineLatch = new CountDownLatch(2);
+ _offlineLatch = new CountDownLatch(2);
+ _prefListHistory.clear();
+
+ // Now reenable the controller
+ _admin.enableCluster(_clusterName, true);
+
+ // Now wait for both instances to be done
+ boolean countReached = _onlineLatch.await(10000, TimeUnit.MILLISECONDS);
+ Assert.assertTrue(countReached);
+ List<String> top = _prefListHistory.poll();
+ Assert.assertTrue(top.equals(Arrays.asList("localhost_1", ""))
+ || top.equals(Arrays.asList("localhost_2", "")));
+ Assert.assertEquals(_prefListHistory.poll(), Arrays.asList("", ""));
- // Add the first instance again; it should not exist
+ // Wait for everything to be fully offline
+ countReached = _offlineLatch.await(10000, TimeUnit.MILLISECONDS);
+ Assert.assertTrue(countReached);
+
+ // Add back the instances in the opposite order
+ _admin.enableCluster(_clusterName, false);
+ addInstanceToPreferences(participants[0].getHelixDataAccessor(),
+ participants[1].getInstanceName(), RESOURCE_NAME, Arrays.asList(partitionName));
addInstanceToPreferences(participants[0].getHelixDataAccessor(),
participants[0].getInstanceName(), RESOURCE_NAME, Arrays.asList(partitionName));
Assert.assertTrue(preferenceListIsCorrect(_admin, _clusterName, RESOURCE_NAME, partitionName,
Arrays.asList("localhost_2", "localhost_1")));
- // Now wait for the second instance to be done
- Thread.sleep(WAIT_TIME);
- Assert.assertTrue(preferenceListIsCorrect(_admin, _clusterName, RESOURCE_NAME, partitionName,
- Arrays.asList("localhost_1", "")));
-
- // Now wait for the first instance to be done again
- Thread.sleep(WAIT_TIME);
- Assert.assertTrue(preferenceListIsCorrect(_admin, _clusterName, RESOURCE_NAME, partitionName,
- Arrays.asList("", "")));
+ // Reset the latch
+ _onlineLatch = new CountDownLatch(2);
+ _prefListHistory.clear();
+ _admin.enableCluster(_clusterName, true);
+
+ // Now wait both to be done again
+ countReached = _onlineLatch.await(10000, TimeUnit.MILLISECONDS);
+ Assert.assertTrue(countReached);
+ top = _prefListHistory.poll();
+ Assert.assertTrue(top.equals(Arrays.asList("localhost_1", ""))
+ || top.equals(Arrays.asList("localhost_2", "")));
+ Assert.assertEquals(_prefListHistory.poll(), Arrays.asList("", ""));
Assert.assertEquals(_instanceList.size(), 0);
// Cleanup
@@ -276,37 +307,32 @@ public class TestPreferenceListAsQueue extends ZkTestBase {
* @param resourceName
* @param partitionName
*/
- private static void removeInstanceFromPreferences(HelixDataAccessor accessor,
- final String instanceName, final String resourceName, final String partitionName) {
- // Updater for ideal state
+ private void removeInstanceFromPreferences(HelixDataAccessor accessor, final String instanceName,
+ final String resourceName, final String partitionName) {
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
String idealStatePath = keyBuilder.idealStates(resourceName).getPath();
- DataUpdater<ZNRecord> idealStateUpdater = new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord currentData) {
- List<String> preferenceList = currentData.getListField(partitionName);
- int numReplicas =
- Integer.valueOf(currentData.getSimpleField(IdealStateProperty.REPLICAS.toString()));
- currentData.setListField(partitionName,
- removeInstanceFromPreferenceList(preferenceList, instanceName, numReplicas));
- return currentData;
- }
- };
-
- // Updater for instance config
- String instanceConfigPath = keyBuilder.instanceConfig(instanceName).getPath();
- DataUpdater<ZNRecord> instanceConfigUpdater = new DataUpdater<ZNRecord>()
{
- @Override
- public ZNRecord update(ZNRecord currentData) {
- // currentData.setBooleanField(InstanceConfigProperty.HELIX_ENABLED.toString(), false);
- return currentData;
- }
- };
- List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList();
- updaters.add(idealStateUpdater);
- updaters.add(instanceConfigUpdater);
- accessor.updateChildren(Arrays.asList(idealStatePath, instanceConfigPath), updaters,
- AccessOption.PERSISTENT);
+ synchronized (_prefListHistory) {
+ // Updater for ideal state
+ final List<String> prefList = Lists.newLinkedList();
+ DataUpdater<ZNRecord> idealStateUpdater = new DataUpdater<ZNRecord>() {
+ @Override
+ public ZNRecord update(ZNRecord currentData) {
+ List<String> preferenceList = currentData.getListField(partitionName);
+ int numReplicas =
+ Integer.valueOf(currentData.getSimpleField(IdealStateProperty.REPLICAS.toString()));
+ List<String> newPrefList =
+ removeInstanceFromPreferenceList(preferenceList, instanceName, numReplicas);
+ currentData.setListField(partitionName, newPrefList);
+ prefList.clear();
+ prefList.addAll(newPrefList);
+ return currentData;
+ }
+ };
+ List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList();
+ updaters.add(idealStateUpdater);
+ accessor.updateChildren(Arrays.asList(idealStatePath), updaters, AccessOption.PERSISTENT);
+ _prefListHistory.add(prefList);
+ }
}
/**
@@ -317,41 +343,36 @@ public class TestPreferenceListAsQueue extends ZkTestBase {
* @param resourceName
* @param partitions
*/
- private static void addInstanceToPreferences(HelixDataAccessor accessor,
- final String instanceName, final String resourceName, final List<String> partitions)
{
- // Updater for ideal state
+ private void addInstanceToPreferences(HelixDataAccessor accessor, final String instanceName,
+ final String resourceName, final List<String> partitions) {
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
String idealStatePath = keyBuilder.idealStates(resourceName).getPath();
- DataUpdater<ZNRecord> idealStateUpdater = new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord currentData) {
- for (String partitionName : partitions) {
- List<String> preferenceList = currentData.getListField(partitionName);
- int numReplicas =
- Integer.valueOf(currentData.getSimpleField(IdealStateProperty.REPLICAS.toString()));
- currentData.setListField(partitionName,
- addInstanceToPreferenceList(preferenceList, instanceName, numReplicas));
+ synchronized (_prefListHistory) {
+ // Updater for ideal state
+ final List<String> prefList = Lists.newLinkedList();
+ DataUpdater<ZNRecord> idealStateUpdater = new DataUpdater<ZNRecord>() {
+ @Override
+ public ZNRecord update(ZNRecord currentData) {
+ for (String partitionName : partitions) {
+ List<String> preferenceList = currentData.getListField(partitionName);
+ int numReplicas =
+ Integer.valueOf(currentData.getSimpleField(IdealStateProperty.REPLICAS.toString()));
+ List<String> newPrefList =
+ addInstanceToPreferenceList(preferenceList, instanceName, numReplicas);
+ currentData.setListField(partitionName, newPrefList);
+ prefList.clear();
+ prefList.addAll(newPrefList);
+ }
+ return currentData;
}
- return currentData;
- }
- };
+ };
- // Updater for instance config
- String instanceConfigPath = keyBuilder.instanceConfig(instanceName).getPath();
- DataUpdater<ZNRecord> instanceConfigUpdater = new DataUpdater<ZNRecord>()
{
- @Override
- public ZNRecord update(ZNRecord currentData) {
- // currentData.setBooleanField(InstanceConfigProperty.HELIX_ENABLED.toString(), true);
- return currentData;
- }
- };
-
- // Send update requests together
- List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList();
- updaters.add(idealStateUpdater);
- updaters.add(instanceConfigUpdater);
- accessor.updateChildren(Arrays.asList(idealStatePath, instanceConfigPath), updaters,
- AccessOption.PERSISTENT);
+ // Send update requests together
+ List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList();
+ updaters.add(idealStateUpdater);
+ accessor.updateChildren(Arrays.asList(idealStatePath), updaters, AccessOption.PERSISTENT);
+ _prefListHistory.add(prefList);
+ }
}
/**
@@ -455,6 +476,7 @@ public class TestPreferenceListAsQueue extends ZkTestBase {
newSize = _instanceList.size();
}
Assert.assertEquals(newSize, oldSize); // ensure nothing came in during this time
+ _onlineLatch.countDown();
}
public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
@@ -467,6 +489,7 @@ public class TestPreferenceListAsQueue extends ZkTestBase {
HelixManager manager = context.getManager();
LOG.info("onBecomeDroppedFromOffline for " + message.getPartitionName() + " on "
+ manager.getInstanceName());
+ _offlineLatch.countDown();
}
public void onBecomeOfflineFromError(Message message, NotificationContext context) {
|