helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject [09/38] helix git commit: Restructure ClusterVerifiers. Add HelixClusterVerifier interface, add abstract class ZkHelixClusterVerifier, and a BestPossibleExternViewVerifier implementation.
Date Wed, 08 Feb 2017 17:59:44 GMT
Restructure ClusterVerifiers. Add HelixClusterVerifier interface, add abstract class ZkHelixClusterVerifier,
and a BestPossibleExternViewVerifier implementation.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/04495e70
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/04495e70
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/04495e70

Branch: refs/heads/helix-0.6.x
Commit: 04495e707a3a1a9067fd487cac96a169dc45c6dd
Parents: 695228e
Author: Lei Xia <lxia@linkedin.com>
Authored: Tue Jul 19 18:17:19 2016 -0700
Committer: Lei Xia <lxia@linkedin.com>
Committed: Sun Feb 5 18:56:03 2017 -0800

----------------------------------------------------------------------
 .../BestPossibleExternalViewVerifier.java       | 359 +++++++++++++++++++
 .../ClusterExternalViewVerifier.java            |   6 +-
 .../ClusterLiveNodesVerifier.java               |  27 +-
 .../ClusterStateVerifier.java                   |   6 +
 .../ClusterStateVerifier/ClusterVerifier.java   |   1 +
 .../HelixClusterVerifier.java                   |  40 +++
 .../ZkHelixClusterVerifier.java                 | 269 ++++++++++++++
 .../apache/helix/tools/IntegrationTestUtil.java |   2 +-
 .../helix/tools/TestClusterStateVerifier.java   |   1 +
 .../apache/helix/tools/TestClusterVerifier.java | 139 +++++++
 10 files changed, 839 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/04495e70/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/BestPossibleExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/BestPossibleExternalViewVerifier.java
b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/BestPossibleExternalViewVerifier.java
new file mode 100644
index 0000000..b350d91
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/BestPossibleExternalViewVerifier.java
@@ -0,0 +1,359 @@
+package org.apache.helix.tools.ClusterStateVerifier;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.controller.pipeline.Stage;
+import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.task.TaskConstants;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * verifier that the ExternalViews of given resources (or all resources in the cluster)
+ * match its best possible mapping states.
+ */
+public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
+  private static Logger LOG = Logger.getLogger(BestPossibleExternalViewVerifier.class);
+
+  private final Map<String, Map<String, String>> _errStates;
+  private final Set<String> _resources;
+  private final Set<String> _expectLiveInstances;
+
+  public BestPossibleExternalViewVerifier(String zkAddr, String clusterName, Set<String>
resources,
+      Map<String, Map<String, String>> errStates, Set<String> expectLiveInstances)
{
+    super(zkAddr, clusterName);
+    _errStates = errStates;
+    _resources = resources;
+    _expectLiveInstances = expectLiveInstances;
+  }
+
+  public BestPossibleExternalViewVerifier(ZkClient zkClient, String clusterName,
+      Set<String> resources, Map<String, Map<String, String>> errStates,
+      Set<String> expectLiveInstances) {
+    super(zkClient, clusterName);
+    _errStates = errStates;
+    _resources = resources;
+    _expectLiveInstances = expectLiveInstances;
+  }
+
+  public static class Builder {
+    private String _clusterName;
+    private Map<String, Map<String, String>> _errStates;
+    private Set<String> _resources;
+    private Set<String> _expectLiveInstances;
+    private String _zkAddr;
+    private ZkClient _zkClient;
+
+    public BestPossibleExternalViewVerifier build() {
+      if (_clusterName == null || (_zkAddr == null && _zkClient == null)) {
+        throw new IllegalArgumentException("Cluster name or zookeeper info is missing!");
+      }
+
+      if (_zkClient != null) {
+        return new BestPossibleExternalViewVerifier(_zkClient, _clusterName, _resources,
_errStates,
+            _expectLiveInstances);
+      }
+      return new BestPossibleExternalViewVerifier(_zkAddr, _clusterName, _resources, _errStates,
+          _expectLiveInstances);
+    }
+
+    public String getClusterName() {
+      return _clusterName;
+    }
+
+    public Builder setClusterName(String clusterName) {
+      _clusterName = clusterName;
+      return this;
+    }
+
+    public Map<String, Map<String, String>> getErrStates() {
+      return _errStates;
+    }
+
+    public Builder setErrStates(Map<String, Map<String, String>> errStates) {
+      _errStates = errStates;
+      return this;
+    }
+
+    public Set<String> getResources() {
+      return _resources;
+    }
+
+    public Builder setResources(Set<String> resources) {
+      _resources = resources;
+      return this;
+    }
+
+    public Set<String> getExpectLiveInstances() {
+      return _expectLiveInstances;
+    }
+
+    public Builder setExpectLiveInstances(Set<String> expectLiveInstances) {
+      _expectLiveInstances = expectLiveInstances;
+      return this;
+    }
+
+    public String getZkAddr() {
+      return _zkAddr;
+    }
+
+    public Builder setZkAddr(String zkAddr) {
+      _zkAddr = zkAddr;
+      return this;
+    }
+
+    public ZkClient getZkClient() {
+      return _zkClient;
+    }
+
+    public Builder setZkClient(ZkClient zkClient) {
+      _zkClient = zkClient;
+      return this;
+    }
+  }
+
+  @Override
+  public boolean verify(long timeout) {
+    return verifyByZkCallback(timeout);
+  }
+
+  @Override
+  public boolean verifyByZkCallback(long timeout) {
+    List<ClusterVerifyTrigger> triggers = new ArrayList<ClusterVerifyTrigger>();
+
+    // setup triggers
+    if (_resources != null && !_resources.isEmpty()) {
+      for (String resource : _resources) {
+        triggers
+            .add(new ClusterVerifyTrigger(_keyBuilder.idealStates(resource), true, false,
false));
+        triggers
+            .add(new ClusterVerifyTrigger(_keyBuilder.externalView(resource), true, false,
false));
+      }
+
+    } else {
+      triggers.add(new ClusterVerifyTrigger(_keyBuilder.idealStates(), false, true, true));
+      triggers.add(new ClusterVerifyTrigger(_keyBuilder.externalViews(), false, true, true));
+    }
+
+    return verifyByCallback(timeout, triggers);
+  }
+
+  @Override
+  protected boolean verifyState() {
+    try {
+      PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
+      // read cluster once and do verification
+      ClusterDataCache cache = new ClusterDataCache();
+      cache.refresh(_accessor);
+
+      Map<String, IdealState> idealStates = cache.getIdealStates();
+      if (idealStates == null) {
+        // ideal state is null because ideal state is dropped
+        idealStates = Collections.emptyMap();
+      }
+
+      // filter out all resources that use Task state model
+      Iterator<Map.Entry<String, IdealState>> it = idealStates.entrySet().iterator();
+      while (it.hasNext()) {
+        Map.Entry<String, IdealState> pair = it.next();
+        if (pair.getValue().getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME))
{
+          it.remove();
+        }
+      }
+
+      // verify live instances.
+      if (_expectLiveInstances != null && !_expectLiveInstances.isEmpty()) {
+        Set<String> actualLiveNodes = cache.getLiveInstances().keySet();
+        if (!_expectLiveInstances.equals(actualLiveNodes)) {
+          return false;
+        }
+      }
+
+      Map<String, ExternalView> extViews = _accessor.getChildValuesMap(keyBuilder.externalViews());
+      if (extViews == null) {
+        extViews = Collections.emptyMap();
+      }
+
+      // Filter resources if requested
+      if (_resources != null && !_resources.isEmpty()) {
+        idealStates.keySet().retainAll(_resources);
+        extViews.keySet().retainAll(_resources);
+      }
+
+      // if externalView is not empty and idealState doesn't exist
+      // add empty idealState for the resource
+      for (String resource : extViews.keySet()) {
+        if (!idealStates.containsKey(resource)) {
+          idealStates.put(resource, new IdealState(resource));
+        }
+      }
+
+      // calculate best possible state
+      BestPossibleStateOutput bestPossOutput = calcBestPossState(cache);
+      Map<String, Map<Partition, Map<String, String>>> bestPossStateMap
=
+          bestPossOutput.getStateMap();
+
+      // set error states
+      if (_errStates != null) {
+        for (String resourceName : _errStates.keySet()) {
+          Map<String, String> partErrStates = _errStates.get(resourceName);
+          for (String partitionName : partErrStates.keySet()) {
+            String instanceName = partErrStates.get(partitionName);
+
+            if (!bestPossStateMap.containsKey(resourceName)) {
+              bestPossStateMap.put(resourceName, new HashMap<Partition, Map<String,
String>>());
+            }
+            Partition partition = new Partition(partitionName);
+            if (!bestPossStateMap.get(resourceName).containsKey(partition)) {
+              bestPossStateMap.get(resourceName).put(partition, new HashMap<String, String>());
+            }
+            bestPossStateMap.get(resourceName).get(partition)
+                .put(instanceName, HelixDefinedState.ERROR.toString());
+          }
+        }
+      }
+
+      for (String resourceName : idealStates.keySet()) {
+        ExternalView extView = extViews.get(resourceName);
+        if (extView == null) {
+          IdealState is = idealStates.get(resourceName);
+          if (is.isExternalViewDisabled()) {
+            continue;
+          } else {
+            LOG.debug("externalView for " + resourceName + " is not available");
+            return false;
+          }
+        }
+
+        // step 0: remove empty map and DROPPED state from best possible state
+        Map<Partition, Map<String, String>> bpStateMap =
+            bestPossOutput.getResourceMap(resourceName);
+
+        boolean result = verifyExternalView(extView, bpStateMap);
+        if (!result) {
+          return false;
+        }
+      }
+      return true;
+    } catch (Exception e) {
+      LOG.error("exception in verification", e);
+      return false;
+    }
+  }
+
+  private boolean verifyExternalView(ExternalView externalView,
+      Map<Partition, Map<String, String>> bestPossibleState) {
+
+    // TODO: Is this necessary?
+    // remove empty and dropped items.
+    Iterator<Map.Entry<Partition, Map<String, String>>> iter = bestPossibleState.entrySet().iterator();
+    while (iter.hasNext()) {
+      Map.Entry<Partition, Map<String, String>> entry = iter.next();
+      Map<String, String> instanceStateMap = entry.getValue();
+      if (instanceStateMap.isEmpty()) {
+        iter.remove();
+      } else {
+        // remove instances with DROPPED state
+        Iterator<Map.Entry<String, String>> insIter = instanceStateMap.entrySet().iterator();
+        while (insIter.hasNext()) {
+          Map.Entry<String, String> insEntry = insIter.next();
+          String state = insEntry.getValue();
+          if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.toString())) {
+            insIter.remove();
+          }
+        }
+      }
+    }
+
+    Map<String, Map<String, String>> bestPossibleStateMap =
+        convertBestPossibleState(bestPossibleState);
+    Map<String, Map<String, String>> externalViewMap = externalView.getRecord().getMapFields();
+
+    return externalViewMap.equals(bestPossibleStateMap);
+  }
+
+  private Map<String, Map<String, String>> convertBestPossibleState(
+      Map<Partition, Map<String, String>> bestPossibleState) {
+    Map<String, Map<String, String>> result = new HashMap<String, Map<String,
String>>();
+    for (Partition partition : bestPossibleState.keySet()) {
+      result.put(partition.getPartitionName(), bestPossibleState.get(partition));
+    }
+    return result;
+  }
+
+  /**
+   * calculate the best possible state note that DROPPED states are not checked since when
+   * kick off the BestPossibleStateCalcStage we are providing an empty current state map
+   *
+   * @param cache
+   * @return
+   * @throws Exception
+   */
+  private BestPossibleStateOutput calcBestPossState(ClusterDataCache cache) throws Exception
{
+    ClusterEvent event = new ClusterEvent("sampleEvent");
+    event.addAttribute("ClusterDataCache", cache);
+
+    runStage(event, new ResourceComputationStage());
+    runStage(event, new CurrentStateComputationStage());
+    runStage(event, new BestPossibleStateCalcStage());
+
+    BestPossibleStateOutput output =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+
+    return output;
+  }
+
+  private void runStage(ClusterEvent event, Stage stage) throws Exception {
+    StageContext context = new StageContext();
+    stage.init(context);
+    stage.preProcess();
+    stage.process(event);
+    stage.postProcess();
+  }
+
+  @Override
+  public String toString() {
+    String verifierName = getClass().getName();
+    verifierName = verifierName.substring(verifierName.lastIndexOf('.') + 1, verifierName.length());
+    return verifierName + "(" + _clusterName + "@" + _zkClient.getServers() + "@resources["
+        + _resources != null ? Arrays.toString(_resources.toArray()) : "" + "])";
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/04495e70/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterExternalViewVerifier.java
b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterExternalViewVerifier.java
index 4ec882e..af69e63 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterExternalViewVerifier.java
@@ -42,8 +42,12 @@ import org.apache.log4j.Logger;
 /**
  * given zk, cluster, and a list of expected live-instances
  * check whether cluster's external-view reaches best-possible states
- *
  */
+
+/**
+ * This class is deprecated, please use BestPossibleExternalViewVerifier instead.
+ */
+@Deprecated
 public class ClusterExternalViewVerifier extends ClusterVerifier {
   private static Logger LOG = Logger.getLogger(ClusterExternalViewVerifier.class);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/04495e70/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterLiveNodesVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterLiveNodesVerifier.java
b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterLiveNodesVerifier.java
index 5c502e0..51de34e 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterLiveNodesVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterLiveNodesVerifier.java
@@ -19,27 +19,36 @@ package org.apache.helix.tools.ClusterStateVerifier;
  * under the License.
  */
 
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.helix.manager.zk.ZkClient;
 
-public class ClusterLiveNodesVerifier extends ClusterVerifier {
+public class ClusterLiveNodesVerifier extends ZkHelixClusterVerifier {
 
-  final List<String> _expectSortedLiveNodes; // always sorted
+  final Set<String> _expectLiveNodes;
 
   public ClusterLiveNodesVerifier(ZkClient zkclient, String clusterName,
       List<String> expectLiveNodes) {
     super(zkclient, clusterName);
-    _expectSortedLiveNodes = expectLiveNodes;
-    Collections.sort(_expectSortedLiveNodes);
+    _expectLiveNodes = new HashSet<String>(expectLiveNodes);
   }
 
   @Override
-  public boolean verify() throws Exception {
-    List<String> actualLiveNodes = _accessor.getChildNames(_keyBuilder.liveInstances());
-    Collections.sort(actualLiveNodes);
-    return _expectSortedLiveNodes.equals(actualLiveNodes);
+  public boolean verifyByZkCallback(long timeout) {
+    List<ClusterVerifyTrigger> triggers = new ArrayList<ClusterVerifyTrigger>();
+    triggers.add(new ClusterVerifyTrigger(_keyBuilder.liveInstances(), false, true, true));
+
+    return verifyByCallback(timeout, triggers);
+  }
+
+  @Override
+  protected boolean verifyState() throws Exception {
+    Set<String> actualLiveNodes =
+        new HashSet<String>(_accessor.getChildNames(_keyBuilder.liveInstances()));
+    return _expectLiveNodes.equals(actualLiveNodes);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/04495e70/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterStateVerifier.java
b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterStateVerifier.java
index f399f1f..9bfa786 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterStateVerifier.java
@@ -70,6 +70,10 @@ import org.apache.log4j.Logger;
 
 import com.google.common.collect.Sets;
 
+/**
+ * This class is deprecated, please use dedicated verifier classes, such as BestPossibleExternViewVerifier,
etc.
+ */
+@Deprecated
 public class ClusterStateVerifier {
   public static String cluster = "cluster";
   public static String zkServerAddress = "zkSvr";
@@ -90,6 +94,8 @@ public class ClusterStateVerifier {
     String getClusterName();
   }
 
+  /** Use BestPossibleExternViewVerifier instead */
+  @Deprecated
   static class ExtViewVeriferZkListener implements IZkChildListener, IZkDataListener {
     final CountDownLatch _countDown;
     final ZkClient _zkClient;

http://git-wip-us.apache.org/repos/asf/helix/blob/04495e70/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterVerifier.java
b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterVerifier.java
index c7560ef..4012ef2 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterVerifier.java
@@ -33,6 +33,7 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.log4j.Logger;
 
+@Deprecated
 public abstract class ClusterVerifier implements IZkChildListener, IZkDataListener {
   private static Logger LOG = Logger.getLogger(ClusterVerifier.class);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/04495e70/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/HelixClusterVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/HelixClusterVerifier.java
b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/HelixClusterVerifier.java
new file mode 100644
index 0000000..bf94e5f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/HelixClusterVerifier.java
@@ -0,0 +1,40 @@
+package org.apache.helix.tools.ClusterStateVerifier;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public interface HelixClusterVerifier {
+  /**
+   *  Verify the cluster.
+   *  The method will be blocked at most {@code timeout}.
+   *  Return true if the verify succeed, otherwise return false.
+   *
+   * @param timeout in milliseconds
+   * @return true if succeed, false if not.
+   */
+  boolean verify(long timeout);
+
+  /**
+   *  Verify the cluster.
+   *  Return true if the verify succeed, otherwise return false.
+   *
+   *  @return true if succeed, false if not.
+   */
+  boolean verify();
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/04495e70/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ZkHelixClusterVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ZkHelixClusterVerifier.java
b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ZkHelixClusterVerifier.java
new file mode 100644
index 0000000..094deb8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ZkHelixClusterVerifier.java
@@ -0,0 +1,269 @@
+package org.apache.helix.tools.ClusterStateVerifier;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.util.ZKClientPool;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public abstract class ZkHelixClusterVerifier
+    implements IZkChildListener, IZkDataListener, HelixClusterVerifier {
+  private static Logger LOG = Logger.getLogger(ZkHelixClusterVerifier.class);
+  protected static int DEFAULT_TIMEOUT = 30 * 1000;
+  protected static int DEFAULT_PERIOD = 1000;
+
+
+  protected final ZkClient _zkClient;
+  protected final String _clusterName;
+  protected final HelixDataAccessor _accessor;
+  protected final PropertyKey.Builder _keyBuilder;
+  private CountDownLatch _countdown;
+
+  protected static class ClusterVerifyTrigger {
+    final PropertyKey _triggerKey;
+    final boolean _triggerOnDataChange;
+    final boolean _triggerOnChildChange;
+    final boolean _triggerOnChildDataChange;
+
+    public ClusterVerifyTrigger(PropertyKey triggerKey, boolean triggerOnDataChange,
+        boolean triggerOnChildChange, boolean triggerOnChildDataChange) {
+      _triggerKey = triggerKey;
+      _triggerOnDataChange = triggerOnDataChange;
+      _triggerOnChildChange = triggerOnChildChange;
+      _triggerOnChildDataChange = triggerOnChildDataChange;
+    }
+
+    public boolean isTriggerOnDataChange() {
+      return _triggerOnDataChange;
+    }
+
+    public PropertyKey getTriggerKey() {
+      return _triggerKey;
+    }
+
+    public boolean isTriggerOnChildChange() {
+      return _triggerOnChildChange;
+    }
+
+    public boolean isTriggerOnChildDataChange() {
+      return _triggerOnChildDataChange;
+    }
+  }
+
+  public ZkHelixClusterVerifier(ZkClient zkClient, String clusterName) {
+    if (zkClient == null || clusterName == null) {
+      throw new IllegalArgumentException("requires zkClient|clusterName");
+    }
+    _zkClient = zkClient;
+    _clusterName = clusterName;
+    _accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    _keyBuilder = _accessor.keyBuilder();
+  }
+
+  public ZkHelixClusterVerifier(String zkAddr, String clusterName) {
+    if (zkAddr == null || clusterName == null) {
+      throw new IllegalArgumentException("requires zkAddr|clusterName");
+    }
+    _zkClient = ZKClientPool.getZkClient(zkAddr);
+    _clusterName = clusterName;
+    _accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    _keyBuilder = _accessor.keyBuilder();
+  }
+
+  /**
+   *  Verify the cluster.
+   *  The method will be blocked at most {@code timeout}.
+   *  Return true if the verify succeed, otherwise return false.
+   *
+   * @param timeout in milliseconds
+   * @return true if succeed, false if not.
+   */
+  public boolean verify(long timeout) {
+    return verifyByZkCallback(timeout);
+  }
+
+  /**
+   *  Verify the cluster.
+   *  The method will be blocked at most 30 seconds.
+   *  Return true if the verify succeed, otherwise return false.
+   *
+   * @return true if succeed, false if not.
+   */
+  public boolean verify() {
+    return verify(DEFAULT_TIMEOUT);
+  }
+
+  /**
+   *  Verify the cluster by relying on zookeeper callback and verify.
+   *  The method will be blocked at most {@code timeout}.
+   *  Return true if the verify succeed, otherwise return false.
+   *
+   * @param timeout in milliseconds
+   * @return true if succeed, false if not.
+   */
+  public abstract boolean verifyByZkCallback(long timeout);
+
+  /**
+   *  Verify the cluster by relying on zookeeper callback and verify.
+   *  The method will be blocked at most 30 seconds.
+   *  Return true if the verify succeed, otherwise return false.
+   *
+   * @return true if succeed, false if not.
+   */
+  public boolean verifyByZkCallback() {
+    return verifyByZkCallback(DEFAULT_TIMEOUT);
+  }
+
+  /**
+   *  Verify the cluster by periodically polling the cluster status and verify.
+   *  The method will be blocked at most {@code timeout}.
+   *  Return true if the verify succeed, otherwise return false.
+   *
+   * @param timeout
+   * @param period polling interval
+   * @return
+   */
+  public boolean verifyByPolling(long timeout, long period) {
+    try {
+      long start = System.currentTimeMillis();
+      boolean success;
+      do {
+        success = verifyState();
+        if (success) {
+          return true;
+        }
+        TimeUnit.MILLISECONDS.sleep(period);
+      } while ((System.currentTimeMillis() - start) <= timeout);
+    } catch (Exception e) {
+      LOG.error("Exception in verifier", e);
+    }
+    return false;
+  }
+
+  /**
+   *  Verify the cluster by periodically polling the cluster status and verify.
+   *  The method will be blocked at most 30 seconds.
+   *  Return true if the verify succeed, otherwise return false.
+   *
+   * @return true if succeed, false if not.
+   */
+  public boolean verifyByPolling() {
+    return verifyByPolling(DEFAULT_TIMEOUT, DEFAULT_PERIOD);
+  }
+
+  protected boolean verifyByCallback(long timeout, List<ClusterVerifyTrigger> triggers)
{
+    _countdown = new CountDownLatch(1);
+
+    for (ClusterVerifyTrigger trigger : triggers) {
+      subscribeTrigger(trigger);
+    }
+
+    boolean success = false;
+    try {
+      success = verifyState();
+      if (!success) {
+
+        success = _countdown.await(timeout, TimeUnit.MILLISECONDS);
+        if (!success) {
+          // make a final try if timeout
+          success = verifyState();
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Exception in verifier", e);
+    }
+
+    // clean up
+    _zkClient.unsubscribeAll();
+
+    return success;
+  }
+
+  private void subscribeTrigger(ClusterVerifyTrigger trigger) {
+    String path = trigger.getTriggerKey().getPath();
+    if (trigger.isTriggerOnDataChange()) {
+      _zkClient.subscribeDataChanges(path, this);
+    }
+
+    if (trigger.isTriggerOnChildChange()) {
+      _zkClient.subscribeChildChanges(path, this);
+    }
+
+    if (trigger.isTriggerOnChildDataChange()) {
+      List<String> childs = _zkClient.getChildren(path);
+      for (String child : childs) {
+        String childPath = String.format("%s/%s", path, child);
+        _zkClient.subscribeDataChanges(childPath, this);
+      }
+    }
+  }
+
+  /**
+   * The method actually performs the required verifications.
+   * @return
+   * @throws Exception
+   */
+  protected abstract boolean verifyState() throws Exception;
+
+  @Override
+  public void handleDataChange(String dataPath, Object data) throws Exception {
+    boolean success = verifyState();
+    if (success) {
+      _countdown.countDown();
+    }
+  }
+
+  @Override
+  public void handleDataDeleted(String dataPath) throws Exception {
+    _zkClient.unsubscribeDataChanges(dataPath, this);
+  }
+
+  @Override
+  public void handleChildChange(String parentPath, List<String> currentChilds) throws
Exception {
+    for (String child : currentChilds) {
+      String childPath = String.format("%s/%s", parentPath, child);
+      _zkClient.subscribeDataChanges(childPath, this);
+    }
+
+    boolean success = verifyState();
+    if (success) {
+      _countdown.countDown();
+    }
+  }
+
+  public ZkClient getZkClient() {
+    return _zkClient;
+  }
+
+  public String getClusterName() {
+    return _clusterName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/04495e70/helix-core/src/main/java/org/apache/helix/tools/IntegrationTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/IntegrationTestUtil.java b/helix-core/src/main/java/org/apache/helix/tools/IntegrationTestUtil.java
index 5cf3a56..2cc145c 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/IntegrationTestUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/IntegrationTestUtil.java
@@ -100,7 +100,7 @@ public class IntegrationTestUtil {
 
     ClusterLiveNodesVerifier verifier =
         new ClusterLiveNodesVerifier(_zkclient, clusterName, liveNodes);
-    boolean success = verifier.verifyByPolling(timeoutValue);
+    boolean success = verifier.verify(timeoutValue);
     System.out.println(success ? "Successful" : "Failed");
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/04495e70/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java
b/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java
index c1718a7..27e3b07 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java
@@ -37,6 +37,7 @@ import org.testng.annotations.Test;
 
 import com.google.common.collect.Sets;
 
+@Deprecated
 public class TestClusterStateVerifier extends ZkUnitTestBase {
   final String[] RESOURCES = {
       "resource0", "resource1"

http://git-wip-us.apache.org/repos/asf/helix/blob/04495e70/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java b/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java
new file mode 100644
index 0000000..92d8640
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java
@@ -0,0 +1,139 @@
+package org.apache.helix.tools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Sets;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.HelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+
+public class TestClusterVerifier extends ZkUnitTestBase {
+  final String[] RESOURCES = {
+      "resource0", "resource1"
+  };
+  private HelixAdmin _admin;
+  private MockParticipantManager[] _participants;
+  private ClusterControllerManager _controller;
+  private String _clusterName;
+
+  @BeforeMethod
+  public void beforeMethod() throws InterruptedException {
+    final int NUM_PARTITIONS = 1;
+    final int NUM_REPLICAS = 1;
+
+    // Cluster and resource setup
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    _clusterName = className + "_" + methodName;
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    _admin = setupTool.getClusterManagementTool();
+    setupTool.addCluster(_clusterName, true);
+    setupTool.addResourceToCluster(_clusterName, RESOURCES[0], NUM_PARTITIONS,
+        BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.SEMI_AUTO.toString());
+    setupTool.addResourceToCluster(_clusterName, RESOURCES[1], NUM_PARTITIONS,
+        BuiltInStateModelDefinitions.OnlineOffline.name(), RebalanceMode.SEMI_AUTO.toString());
+
+    // Configure and start the participants
+    _participants = new MockParticipantManager[RESOURCES.length];
+    for (int i = 0; i < _participants.length; i++) {
+      String host = "localhost";
+      int port = 12918 + i;
+      String id = host + '_' + port;
+      setupTool.addInstanceToCluster(_clusterName, id);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, id);
+      _participants[i].syncStart();
+    }
+
+    // Rebalance the resources
+    for (int i = 0; i < RESOURCES.length; i++) {
+      IdealState idealState = _admin.getResourceIdealState(_clusterName, RESOURCES[i]);
+      idealState.setReplicas(Integer.toString(NUM_REPLICAS));
+      idealState.getRecord().setListField(RESOURCES[i] + "_0",
+          Arrays.asList(_participants[i].getInstanceName()));
+      _admin.setResourceIdealState(_clusterName, RESOURCES[i], idealState);
+    }
+
+    // Start the controller
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
+    _controller.syncStart();
+    Thread.sleep(1000);
+  }
+
+  @AfterMethod
+  public void afterMethod() {
+    // Cleanup
+    _controller.syncStop();
+    for (MockParticipantManager participant : _participants) {
+      participant.syncStop();
+    }
+    _admin.dropCluster(_clusterName);
+  }
+
+  @Test public void testEntireCluster() {
+    // Just ensure that the entire cluster passes
+    // ensure that the external view coalesces
+    HelixClusterVerifier verifier =
+        new BestPossibleExternalViewVerifier.Builder().setClusterName(_clusterName)
+            .setZkClient(_gZkClient).build();
+
+    boolean result = verifier.verify(10000);
+    Assert.assertTrue(result);
+  }
+
+  @Test
+  public void testResourceSubset() throws InterruptedException {
+    // Ensure that this passes even when one resource is down
+    _admin.enableInstance(_clusterName, "localhost_12918", false);
+    Thread.sleep(1000);
+    _admin.enableCluster(_clusterName, false);
+    _admin.enableInstance(_clusterName, "localhost_12918", true);
+
+
+    HelixClusterVerifier verifier =
+        new BestPossibleExternalViewVerifier.Builder().setClusterName(_clusterName)
+            .setZkClient(_gZkClient).setResources(Sets.newHashSet(RESOURCES[1])).build();
+
+    boolean result = verifier.verify();
+    Assert.assertTrue(result);
+
+    // But the full cluster verification should fail
+
+    verifier =
+        new BestPossibleExternalViewVerifier.Builder().setClusterName(_clusterName)
+            .setZkClient(_gZkClient).build();
+
+    result = verifier.verify();
+    Assert.assertFalse(result);
+    _admin.enableCluster(_clusterName, true);
+  }
+}


Mime
View raw message