helix-commits mailing list archives

From zzh...@apache.org
Subject [1/2] [HELIX-109] Review Helix model package, first half of rebalance-pipeline refactor
Date Wed, 04 Sep 2013 20:15:00 GMT
Updated Branches:
  refs/heads/helix-logical-model 75b534ddb -> 9c7de4c33


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
new file mode 100644
index 0000000..95862ae
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
@@ -0,0 +1,154 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerProperties;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.log4j.Logger;
+
+public class NewTaskAssignmentStage extends AbstractBaseStage {
+  private static Logger logger = Logger.getLogger(NewTaskAssignmentStage.class);
+
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    long startTime = System.currentTimeMillis();
+    logger.info("START TaskAssignmentStage.process()");
+
+    HelixManager manager = event.getAttribute("helixmanager");
+    Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+    MessageThrottleStageOutput messageOutput =
+        event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+    Cluster cluster = event.getAttribute("ClusterDataCache");
+    Map<ParticipantId, Participant> liveParticipantMap = cluster.getLiveParticipantMap();
+
+    if (manager == null || resourceMap == null || messageOutput == null || cluster == null
+        || liveParticipantMap == null) {
+      throw new StageException("Missing attributes in event:" + event
+          + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|DataCache|liveInstanceMap");
+    }
+
+    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+    List<Message> messagesToSend = new ArrayList<Message>();
+    for (ResourceId resourceId : resourceMap.keySet()) {
+      Resource resource = resourceMap.get(resourceId);
+      // TODO fix it
+      // for (Partition partition : resource.getPartitions()) {
+      // List<Message> messages = messageOutput.getMessages(resourceName, partition);
+      // messagesToSend.addAll(messages);
+      // }
+    }
+
+    List<Message> outputMessages =
+        batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, liveParticipantMap,
+            manager.getProperties());
+    sendMessages(dataAccessor, outputMessages);
+
+    long endTime = System.currentTimeMillis();
+    logger.info("END TaskAssignmentStage.process(). took: " + (endTime - startTime) + " ms");
+
+  }
+
+  List<Message> batchMessage(Builder keyBuilder, List<Message> messages,
+      Map<ResourceId, Resource> resourceMap, Map<ParticipantId, Participant> liveParticipantMap,
+      HelixManagerProperties properties) {
+    // group messages by its CurrentState path + "/" + fromState + "/" + toState
+    Map<String, Message> batchMessages = new HashMap<String, Message>();
+    List<Message> outputMessages = new ArrayList<Message>();
+
+    Iterator<Message> iter = messages.iterator();
+    while (iter.hasNext()) {
+      Message message = iter.next();
+      ResourceId resourceId = message.getResourceId();
+      Resource resource = resourceMap.get(resourceId); // resourceMap is keyed by ResourceId, not String
+
+      ParticipantId participantId = new ParticipantId(message.getTgtName()); // map is keyed by ParticipantId
+      Participant liveParticipant = liveParticipantMap.get(participantId);
+      String participantVersion = null;
+      if (liveParticipant != null) {
+        participantVersion = liveParticipant.getRunningInstance().getVersion().toString();
+      }
+
+      if (resource == null || !resource.getRebalancerConfig().getBatchMessageMode()
+          || participantVersion == null
+          || !properties.isFeatureSupported("batch_message", participantVersion)) {
+        outputMessages.add(message);
+        continue;
+      }
+
+      String key =
+          keyBuilder.currentState(message.getTgtName(), message.getTgtSessionId().stringify(),
+              message.getResourceId().stringify()).getPath()
+              + "/" + message.getFromState() + "/" + message.getToState();
+
+      if (!batchMessages.containsKey(key)) {
+        Message batchMessage = new Message(message.getRecord());
+        batchMessage.setBatchMessageMode(true);
+        outputMessages.add(batchMessage);
+        batchMessages.put(key, batchMessage);
+      }
+      batchMessages.get(key).addPartitionName(message.getPartitionId().stringify());
+    }
+
+    return outputMessages;
+  }
+
+  protected void sendMessages(HelixDataAccessor dataAccessor, List<Message> messages) {
+    if (messages == null || messages.isEmpty()) {
+      return;
+    }
+
+    Builder keyBuilder = dataAccessor.keyBuilder();
+
+    List<PropertyKey> keys = new ArrayList<PropertyKey>();
+    for (Message message : messages) {
+      logger.info("Sending Message " + message.getMsgId() + " to " + message.getTgtName()
+          + " transit " + message.getPartitionId() + "|" + message.getPartitionIds() + "
from:"
+          + message.getFromState() + " to:" + message.getToState());
+
+      // System.out.println("[dbg] Sending Message " + message.getMsgId() + " to " +
+      // message.getTgtName()
+      // + " transit " + message.getPartitionName() + "|" + message.getPartitionNames()
+      // + " from: " + message.getFromState() + " to: " + message.getToState());
+
+      keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
+    }
+
+    dataAccessor.createChildren(keys, new ArrayList<Message>(messages));
+  }
+}
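
For context on batchMessage() above: messages bound for the same participant, resource, and from/to state pair are collapsed into a single batch message that carries every affected partition name, and batching is only applied when the target participant's version supports the "batch_message" feature. The standalone sketch below illustrates just the grouping idea with hypothetical, simplified types (plain strings instead of Helix's Message and PropertyKey-based keys); it is not part of this patch.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for a state-transition message; not the Helix Message class.
class TransitionMsg {
  final String target;    // target instance name
  final String resource;  // resource name
  final String fromState;
  final String toState;
  final List<String> partitions = new ArrayList<String>();

  TransitionMsg(String target, String resource, String partition, String fromState, String toState) {
    this.target = target;
    this.resource = resource;
    this.fromState = fromState;
    this.toState = toState;
    this.partitions.add(partition);
  }
}

public class BatchingSketch {
  // Collapse messages that share target/resource/fromState/toState into one
  // message carrying every affected partition, mirroring the grouping key
  // built in batchMessage() above.
  static List<TransitionMsg> batch(List<TransitionMsg> input) {
    Map<String, TransitionMsg> byKey = new HashMap<String, TransitionMsg>();
    List<TransitionMsg> output = new ArrayList<TransitionMsg>();
    for (TransitionMsg m : input) {
      String key = m.target + "/" + m.resource + "/" + m.fromState + "/" + m.toState;
      TransitionMsg batched = byKey.get(key);
      if (batched == null) {
        byKey.put(key, m);
        output.add(m);
      } else {
        batched.partitions.addAll(m.partitions);
      }
    }
    return output;
  }
}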

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
new file mode 100644
index 0000000..98ae60b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -0,0 +1,142 @@
+package org.apache.helix.api;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestNewStages extends ZkUnitTestBase {
+  final int n = 2;
+  final int p = 8;
+  MockParticipant[] _participants = new MockParticipant[n];
+  ClusterController _controller;
+
+  ClusterId _clusterId;
+  HelixDataAccessor _dataAccessor;
+
+  @Test
+  public void testReadClusterDataStage() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+    ClusterAccessor clusterAccessor = new ClusterAccessor(_clusterId, _dataAccessor);
+    Cluster cluster = clusterAccessor.readCluster();
+
+    ClusterId id = cluster.getId();
+    Assert.assertEquals(id, _clusterId);
+    Map<ParticipantId, Participant> liveParticipantMap = cluster.getLiveParticipantMap();
+    Assert.assertEquals(liveParticipantMap.size(), n);
+
+    for (ParticipantId participantId : liveParticipantMap.keySet()) {
+      Participant participant = liveParticipantMap.get(participantId);
+      Map<ResourceId, CurrentState> curStateMap = participant.getCurrentStateMap();
+      Assert.assertEquals(curStateMap.size(), 1);
+
+      ResourceId resourceId = new ResourceId("TestDB0");
+      Assert.assertTrue(curStateMap.containsKey(resourceId));
+      CurrentState curState = curStateMap.get(resourceId);
+      Map<PartitionId, State> partitionStateMap = curState.getPartitionStateMap();
+      Assert.assertEquals(partitionStateMap.size(), p);
+    }
+
+    Map<ResourceId, Resource> resourceMap = cluster.getResourceMap();
+    Assert.assertEquals(resourceMap.size(), 1);
+
+    ResourceId resourceId = new ResourceId("TestDB0");
+    Assert.assertTrue(resourceMap.containsKey(resourceId));
+    Resource resource = resourceMap.get(resourceId);
+    Assert
+        .assertEquals(resource.getRebalancerConfig().getRebalancerMode(), RebalanceMode.SEMI_AUTO);
+
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    // set up a running class
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    _clusterId = new ClusterId(clusterName);
+
+    System.out.println("START " + _clusterId + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        p, // partitions per resource
+        n, // number of nodes
+        2, // replicas
+        "MasterSlave", true); // do rebalance
+
+    _controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    _controller.syncStart();
+
+    // start participants
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+
+      _participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      _participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(result);
+
+    _dataAccessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+  }
+
+  @AfterClass
+  public void afterClass() {
+    // tear down the cluster
+    _controller.syncStop();
+    for (int i = 0; i < n; i++) {
+      _participants[i].syncStop();
+    }
+
+    System.out.println("END " + _clusterId + " at " + new Date(System.currentTimeMillis()));
+  }
+}
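
The stage in the first file is intended to run inside the controller pipeline that consumes the same Cluster snapshot this test reads. A rough sketch of wiring it up, assuming the controller's existing Pipeline/addStage API (as used by the current, non-refactored stages); the actual registration for the refactored pipeline may differ:

import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.NewTaskAssignmentStage;

public class MessagePipelineSketch {
  // Assumed wiring: register the new stage the same way the existing
  // TaskAssignmentStage is registered in the controller's message pipeline.
  static Pipeline buildMessagePipeline() {
    Pipeline pipeline = new Pipeline();
    pipeline.addStage(new NewTaskAssignmentStage());
    return pipeline;
  }

  // The driving ClusterEvent must already carry the "helixmanager",
  // RESOURCES, MESSAGES_THROTTLE and "ClusterDataCache" attributes that
  // NewTaskAssignmentStage.process() checks for.
  static void run(Pipeline pipeline, ClusterEvent event) throws Exception {
    pipeline.handle(event);
  }
}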

