helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject git commit: HELIX-50: Ensure num replicas and preference list size in idealstate matches
Date Thu, 21 Feb 2013 01:03:11 GMT
Updated Branches:
  refs/heads/master 11ba1bd01 -> 723d014e0


HELIX-50: Ensure num replicas and preference list size in idealstate matches


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/723d014e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/723d014e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/723d014e

Branch: refs/heads/master
Commit: 723d014e0974f29c977d80dc7520cc0c019ab5db
Parents: 11ba1bd
Author: zzhang <zzhang5@uci.edu>
Authored: Wed Feb 20 17:03:01 2013 -0800
Committer: zzhang <zzhang5@uci.edu>
Committed: Wed Feb 20 17:03:01 2013 -0800

----------------------------------------------------------------------
 .../helix/controller/GenericHelixController.java   |    4 +-
 .../controller/stages/MessageGenerationOutput.java |    8 +-
 .../controller/stages/MessageGenerationPhase.java  |   30 ++++-
 .../integration/TestInvalidAutoIdealState.java     |  106 +++++++++++++++
 4 files changed, 137 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/723d014e/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index deb2bdb..914f964 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -606,13 +606,13 @@ public class GenericHelixController implements
 	for (String instance : curInstances.keySet()) {
 		if (lastInstances == null || !lastInstances.containsKey(instance)) {
 	        try {
-	          // add message listeners for new sessions
+	          // add message listeners for new instances
 	          manager.addMessageListener(this, instance);
 	          logger.info("Succeed in adding message listener for " + instance);
 	        }
 	        catch (Exception e)
 	        {
-	          logger.error("Fail to add message listener for instance:" + instance, e);
+	          logger.error("Fail to add message listener for instance: " + instance, e);
 	        }
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/723d014e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationOutput.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationOutput.java
index a461589..23723a7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationOutput.java
@@ -40,7 +40,7 @@ public class MessageGenerationOutput
 
   }
 
-  public void addMessage(String resourceName, Partition resource,
+  public void addMessage(String resourceName, Partition partition,
       Message message)
   {
     if (!_messagesMap.containsKey(resourceName))
@@ -48,13 +48,13 @@ public class MessageGenerationOutput
       _messagesMap.put(resourceName,
           new HashMap<Partition, List<Message>>());
     }
-    if (!_messagesMap.get(resourceName).containsKey(resource))
+    if (!_messagesMap.get(resourceName).containsKey(partition))
     {
-      _messagesMap.get(resourceName).put(resource,
+      _messagesMap.get(resourceName).put(partition,
           new ArrayList<Message>());
 
     }
-    _messagesMap.get(resourceName).get(resource).add(message);
+    _messagesMap.get(resourceName).get(partition).add(message);
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/723d014e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index adb81ab..32d9a8e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -19,7 +19,9 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
@@ -37,10 +39,8 @@ import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.log4j.Logger;
 
-
 /**
- * Compares the currentState,pendingState with IdealState and generate messages
- * 
+ * Compares the currentState, pendingState with IdealState and generate messages
  * 
  */
 public class MessageGenerationPhase extends AbstractBaseStage
@@ -85,6 +85,11 @@ public class MessageGenerationPhase extends AbstractBaseStage
         Map<String, String> instanceStateMap = bestPossibleStateOutput.getInstanceStateMap(
             resourceName, partition);
 
+        // we should generate message based on the desired-state priority
+        // so keep generated messages in a temp map keyed by state
+        // desired-state->list of generated-messages
+        Map<String, List<Message>> messageMap = new HashMap<String, List<Message>>();
+        
         for (String instanceName : instanceStateMap.keySet())
         {
           String desiredState = instanceStateMap.get(instanceName);
@@ -173,10 +178,25 @@ public class MessageGenerationPhase extends AbstractBaseStage
               }
             }
             message.getRecord().setSimpleField("ClusterEventName", event.getName());
-            output.addMessage(resourceName, partition, message);
+            // output.addMessage(resourceName, partition, message);
+            if (!messageMap.containsKey(desiredState)) {
+            	messageMap.put(desiredState, new ArrayList<Message>());
+            }
+            messageMap.get(desiredState).add(message);
           }
         }
-      }
+        
+        // add generated messages to output according to state priority
+        List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
+        for (String state : statesPriorityList) {
+        	if (messageMap.containsKey(state)) {
+        		for (Message message : messageMap.get(state)) {
+        			output.addMessage(resourceName, partition, message);
+        		}
+        	}
+        }
+        
+      }	// end of for-each-partition
     }
     event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/723d014e/helix-core/src/test/java/org/apache/helix/integration/TestInvalidAutoIdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestInvalidAutoIdealState.java
b/helix-core/src/test/java/org/apache/helix/integration/TestInvalidAutoIdealState.java
new file mode 100644
index 0000000..ab19d4a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestInvalidAutoIdealState.java
@@ -0,0 +1,106 @@
+package org.apache.helix.integration;
+
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+// Helix-50: integration test for generate message based on state priority
+public class TestInvalidAutoIdealState extends ZkUnitTestBase {
+    @Test void testInvalidReplica2() throws Exception
+    {
+        HelixAdmin admin = new ZKHelixAdmin(ZK_ADDR);
+        
+        // create cluster
+	    String className = TestHelper.getTestClassName();
+	    String methodName = TestHelper.getTestMethodName();
+	    String clusterName = className + "_" + methodName;
+	    String db = "TestDB";
+
+	    System.out.println("START " + clusterName + " at "
+	    		+ new Date(System.currentTimeMillis()));
+
+        // System.out.println("Creating cluster: " + clusterName);
+        admin.addCluster(clusterName, true);
+
+        // add MasterSlave state mode definition
+        admin.addStateModelDef(clusterName, "MasterSlave", new StateModelDefinition(
+                new StateModelConfigGenerator().generateConfigForMasterSlave()));
+
+        // Add nodes to the cluster
+	    int n = 3;
+        System.out.println("Adding " + n + " participants to the cluster");
+        for (int i = 0; i < n; i++) {
+        	int port = 12918 + i;
+            InstanceConfig instanceConfig = new InstanceConfig("localhost_" + port);
+            instanceConfig.setHostName("localhost");
+            instanceConfig.setPort("" + port);
+            instanceConfig.setInstanceEnabled(true);
+            admin.addInstance(clusterName, instanceConfig);
+            // System.out.println("\t Added participant: " + instanceConfig.getInstanceName());
+        }
+
+        // construct ideal-state manually
+        IdealState idealState = new IdealState(db);
+        idealState.setIdealStateMode(IdealStateModeProperty.AUTO.toString());
+        idealState.setNumPartitions(2);
+        idealState.setReplicas("" + 2);	// should be 3
+        idealState.setStateModelDefRef("MasterSlave");
+        idealState.getRecord().setListField("TestDB_0", Arrays.asList("localhost_12918",
"localhost_12919", "localhost_12920"));
+        idealState.getRecord().setListField("TestDB_1", Arrays.asList("localhost_12919",
"localhost_12918", "localhost_12920"));
+        
+        admin.setResourceIdealState(clusterName, "TestDB", idealState);
+
+	    // start participants
+	    MockParticipant[] participants = new MockParticipant[n];
+	    for (int i = 0; i < n; i++)
+	    {
+	      String instanceName = "localhost_" + (12918 + i);
+
+	      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+	      participants[i].syncStart();
+	    }
+
+	    ClusterController controller =
+	        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+	    controller.syncStart();
+
+	    boolean result =
+	        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+	                                                                                 clusterName));
+	    Assert.assertTrue(result);
+
+	    // make sure localhost_12919 is master on TestDB_1
+	    HelixDataAccessor accessor = controller.getManager().getHelixDataAccessor();
+	    Builder keyBuilder = accessor.keyBuilder();
+	    ExternalView extView = accessor.getProperty(keyBuilder.externalView(db));
+	    Map<String, String> stateMap = extView.getStateMap(db + "_1");
+	    Assert.assertEquals(stateMap.get("localhost_12919"), "MASTER", 
+	    		"localhost_12919 should be MASTER even though replicas is set to 2, since we genereate
message based on target-state priority");
+	    
+	    System.out.println("END " + clusterName + " at "
+	            + new Date(System.currentTimeMillis()));
+    }
+}


Mime
View raw message