helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] branch zooscalability updated: Add integration tests for Helix Java APIs (#892)
Date Tue, 17 Mar 2020 17:35:50 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch zooscalability
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/zooscalability by this push:
     new 1faaaca  Add integration tests for Helix Java APIs (#892)
1faaaca is described below

commit 1faaaca379459eeaafcb49770582a3a731ad0439
Author: Hunter Lee <hulee@linkedin.com>
AuthorDate: Tue Mar 17 10:35:43 2020 -0700

    Add integration tests for Helix Java APIs (#892)
    
    This commit adds a comprehensive integration test for Helix Java APIs. All Helix Java
APIs are tested using regular resource rebalancing and task framework.
---
 .../helix/manager/zk/ZkCacheBaseDataAccessor.java  |  30 +-
 .../java/org/apache/helix/task/TaskDriver.java     |   6 +-
 .../BestPossibleExternalViewVerifier.java          |   4 +-
 .../StrictMatchExternalViewVerifier.java           |   4 +-
 .../src/test/java/org/apache/helix/TestHelper.java |   5 +-
 .../multizk/TestMultiZkHelixJavaApis.java          | 476 +++++++++++++++++++++
 .../helix/integration/task/WorkflowGenerator.java  |  27 +-
 7 files changed, 528 insertions(+), 24 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
index bd05ea7..72d5601 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
@@ -40,6 +40,7 @@ import org.apache.helix.store.zk.ZNode;
 import org.apache.helix.util.PathUtils;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
 import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
 import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
@@ -919,7 +920,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T>
{
     }
   }
 
-  public static class Builder {
+  public static class Builder<T> {
     private String _zkAddress;
     private RealmAwareZkClient.RealmMode _realmMode;
     private RealmAwareZkClient.RealmAwareZkConnectionConfig _realmAwareZkConnectionConfig;
@@ -934,39 +935,39 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T>
{
     public Builder() {
     }
 
-    public Builder setZkAddress(String zkAddress) {
+    public Builder<T> setZkAddress(String zkAddress) {
       _zkAddress = zkAddress;
       return this;
     }
 
-    public Builder setRealmMode(RealmAwareZkClient.RealmMode realmMode) {
+    public Builder<T> setRealmMode(RealmAwareZkClient.RealmMode realmMode) {
       _realmMode = realmMode;
       return this;
     }
 
-    public Builder setRealmAwareZkConnectionConfig(
+    public Builder<T> setRealmAwareZkConnectionConfig(
         RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig) {
       _realmAwareZkConnectionConfig = realmAwareZkConnectionConfig;
       return this;
     }
 
-    public Builder setRealmAwareZkClientConfig(
+    public Builder<T> setRealmAwareZkClientConfig(
         RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig) {
       _realmAwareZkClientConfig = realmAwareZkClientConfig;
       return this;
     }
 
-    public Builder setChrootPath(String chrootPath) {
+    public Builder<T> setChrootPath(String chrootPath) {
       _chrootPath = chrootPath;
       return this;
     }
 
-    public Builder setWtCachePaths(List<String> wtCachePaths) {
+    public Builder<T> setWtCachePaths(List<String> wtCachePaths) {
       _wtCachePaths = wtCachePaths;
       return this;
     }
 
-    public Builder setZkCachePaths(List<String> zkCachePaths) {
+    public Builder<T> setZkCachePaths(List<String> zkCachePaths) {
       _zkCachePaths = zkCachePaths;
       return this;
     }
@@ -977,14 +978,14 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T>
{
      * @param zkClientType
      * @return
      */
-    public Builder setZkClientType(ZkBaseDataAccessor.ZkClientType zkClientType) {
+    public Builder<T> setZkClientType(ZkBaseDataAccessor.ZkClientType zkClientType)
{
       _zkClientType = zkClientType;
       return this;
     }
 
-    public ZkCacheBaseDataAccessor build() {
+    public ZkCacheBaseDataAccessor<T> build() {
       validate();
-      return new ZkCacheBaseDataAccessor(this);
+      return new ZkCacheBaseDataAccessor<>(this);
     }
 
     private void validate() {
@@ -997,8 +998,8 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T>
{
         throw new HelixException(
             "ZkCacheBaseDataAccessor: you cannot set ZkClientType on multi-realm mode!");
       }
-      // If ZkClientType is not set, default to SHARED
-      if (!isZkClientTypeSet) {
+      // If ZkClientType is not set and realmMode is single-realm, default to SHARED
+      if (!isZkClientTypeSet && _realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM)
{
         _zkClientType = ZkBaseDataAccessor.ZkClientType.SHARED;
       }
 
@@ -1025,7 +1026,8 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T>
{
 
       // Resolve RealmAwareZkClientConfig
       if (_realmAwareZkClientConfig == null) {
-        _realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig();
+        _realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig()
+            .setZkSerializer(new ZNRecordSerializer());
       }
 
       // Resolve RealmAwareZkConnectionConfig
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 0b8fa17..987cc44 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -38,17 +38,17 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.SystemPropertyKeys;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.helix.store.HelixPropertyStore;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.util.HelixUtil;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -97,10 +97,12 @@ public class TaskDriver {
         manager.getHelixPropertyStore(), manager.getClusterName());
   }
 
+  @Deprecated
   public TaskDriver(HelixZkClient client, String clusterName) {
     this(client, new ZkBaseDataAccessor<>(client), clusterName);
   }
 
+  @Deprecated
   public TaskDriver(HelixZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor,
       String clusterName) {
     this(new ZKHelixAdmin(client), new ZKHelixDataAccessor(clusterName, baseAccessor),
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 7f57fa9..6d601c4 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -135,8 +135,8 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier
{
     }
 
     public BestPossibleExternalViewVerifier build() {
-      if (_clusterName == null || (_zkAddress == null && _zkClient == null)) {
-        throw new IllegalArgumentException("Cluster name or zookeeper info is missing!");
+      if (_clusterName == null) {
+        throw new IllegalArgumentException("Cluster name is missing!");
       }
 
       if (_zkClient != null) {
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
index 9c4a587..76f0d09 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
@@ -103,8 +103,8 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier
{
     private boolean _isDeactivatedNodeAware = false;
 
     public StrictMatchExternalViewVerifier build() {
-      if (_clusterName == null || (_zkAddress == null && _zkClient == null)) {
-        throw new IllegalArgumentException("Cluster name or zookeeper info is missing!");
+      if (_clusterName == null) {
+        throw new IllegalArgumentException("Cluster name is missing!");
       }
 
       if (_zkClient != null) {
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java
index 9cac992..adb4812 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java
@@ -47,6 +47,7 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
@@ -295,12 +296,12 @@ public class TestHelper {
     zkClient.close();
   }
 
-  public static void dropCluster(String clusterName, HelixZkClient zkClient) throws Exception
{
+  public static void dropCluster(String clusterName, RealmAwareZkClient zkClient) {
     ClusterSetup setupTool = new ClusterSetup(zkClient);
     dropCluster(clusterName, zkClient, setupTool);
   }
 
-  public static void dropCluster(String clusterName, HelixZkClient zkClient, ClusterSetup
setup) {
+  public static void dropCluster(String clusterName, RealmAwareZkClient zkClient, ClusterSetup
setup) {
     String namespace = "/" + clusterName;
     if (zkClient.exists(namespace)) {
       try {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
new file mode 100644
index 0000000..be8bb0b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
@@ -0,0 +1,476 @@
+package org.apache.helix.integration.multizk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.InstanceType;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.TestHelper;
+import org.apache.helix.api.config.RebalanceConfig;
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.integration.task.WorkflowGenerator;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKUtil;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.msdcommon.mock.MockMetadataStoreDirectoryServer;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowContext;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
+import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
+import org.apache.helix.zookeeper.zkclient.ZkServer;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * TestMultiZkHelixJavaApis spins up multiple in-memory ZooKeepers with a pre-configured
+ * cluster-Zk realm routing information.
+ * This test verifies that all Helix Java APIs work as expected.
+ */
+public class TestMultiZkHelixJavaApis {
+  private static final int NUM_ZK = 3;
+  private static final Map<String, ZkServer> ZK_SERVER_MAP = new HashMap<>();
+  private static final Map<String, HelixZkClient> ZK_CLIENT_MAP = new HashMap<>();
+  private static final Map<String, ClusterControllerManager> MOCK_CONTROLLERS = new
HashMap<>();
+  private static final Set<MockParticipantManager> MOCK_PARTICIPANTS = new HashSet<>();
+  private static final List<String> CLUSTER_LIST =
+      ImmutableList.of("CLUSTER_1", "CLUSTER_2", "CLUSTER_3");
+
+  private MockMetadataStoreDirectoryServer _msds;
+  private static final Map<String, Collection<String>> _rawRoutingData = new
HashMap<>();
+  private RealmAwareZkClient _zkClient;
+  private HelixAdmin _zkHelixAdmin;
+
+  // Save System property configs from before this test and pass onto after the test
+  private Map<String, String> _configStore = new HashMap<>();
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    // Create 3 in-memory zookeepers and routing mapping
+    final String zkPrefix = "localhost:";
+    final int zkStartPort = 8777;
+
+    for (int i = 0; i < NUM_ZK; i++) {
+      String zkAddress = zkPrefix + (zkStartPort + i);
+      ZK_SERVER_MAP.put(zkAddress, TestHelper.startZkServer(zkAddress));
+      ZK_CLIENT_MAP.put(zkAddress, DedicatedZkClientFactory.getInstance()
+          .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
+              new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer())));
+
+      // One cluster per ZkServer created
+      _rawRoutingData.put(zkAddress, Collections.singletonList("/" + CLUSTER_LIST.get(i)));
+    }
+
+    // Create a Mock MSDS
+    final String msdsHostName = "localhost";
+    final int msdsPort = 11117;
+    final String msdsNamespace = "multiZkTest";
+    _msds = new MockMetadataStoreDirectoryServer(msdsHostName, msdsPort, msdsNamespace,
+        _rawRoutingData);
+    _msds.startServer();
+
+    // Save previously-set system configs
+    String prevMultiZkEnabled = System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+    String prevMsdsServerEndpoint =
+        System.getProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY);
+    if (prevMultiZkEnabled != null) {
+      _configStore.put(SystemPropertyKeys.MULTI_ZK_ENABLED, prevMultiZkEnabled);
+    }
+    if (prevMsdsServerEndpoint != null) {
+      _configStore
+          .put(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY, prevMsdsServerEndpoint);
+    }
+
+    // Turn on multiZk mode in System config
+    System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "true");
+    // MSDS endpoint: http://localhost:11117/admin/v2/namespaces/multiZkTest
+    System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY,
+        "http://" + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" + msdsNamespace);
+
+    // Create a FederatedZkClient for admin work
+    _zkClient =
+        new FederatedZkClient(new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
+            new RealmAwareZkClient.RealmAwareZkClientConfig());
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    try {
+      // Kill all mock controllers and participants
+      MOCK_CONTROLLERS.values().forEach(ClusterControllerManager::syncStop);
+      MOCK_PARTICIPANTS.forEach(MockParticipantManager::syncStop);
+
+      // Tear down all clusters
+      CLUSTER_LIST.forEach(cluster -> TestHelper.dropCluster(cluster, _zkClient));
+
+      // Verify that all clusters are gone in each zookeeper
+      Assert.assertTrue(TestHelper.verify(() -> {
+        for (Map.Entry<String, HelixZkClient> zkClientEntry : ZK_CLIENT_MAP.entrySet())
{
+          List<String> children = zkClientEntry.getValue().getChildren("/");
+          if (children.stream().anyMatch(CLUSTER_LIST::contains)) {
+            return false;
+          }
+        }
+        return true;
+      }, TestHelper.WAIT_DURATION));
+
+      // Tear down zookeepers
+      ZK_SERVER_MAP.forEach((zkAddress, zkServer) -> zkServer.shutdown());
+
+      // Stop MockMSDS
+      _msds.stopServer();
+    } finally {
+      // Restore System property configs
+      if (_configStore.containsKey(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+        System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED,
+            _configStore.get(SystemPropertyKeys.MULTI_ZK_ENABLED));
+      } else {
+        System.clearProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+      }
+      if (_configStore.containsKey(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY))
{
+        System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY,
+            _configStore.get(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY));
+      } else {
+        System.clearProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY);
+      }
+    }
+  }
+
+  /**
+   * Test cluster creation according to the pre-set routing mapping.
+   * Helix Java API tested is ClusterSetup in this method.
+   */
+  @Test
+  public void testCreateClusters() {
+    // Create two ClusterSetups using two different constructors
+    // Note: ZK Address here could be anything because multiZk mode is on (it will be ignored)
+    ClusterSetup clusterSetupZkAddr = new ClusterSetup(ZK_SERVER_MAP.keySet().iterator().next());
+    ClusterSetup clusterSetupBuilder = new ClusterSetup.Builder().build();
+
+    createClusters(clusterSetupZkAddr);
+    verifyClusterCreation(clusterSetupZkAddr);
+
+    createClusters(clusterSetupBuilder);
+    verifyClusterCreation(clusterSetupBuilder);
+
+    // Create clusters again to continue with testing
+    createClusters(clusterSetupBuilder);
+  }
+
+  private void createClusters(ClusterSetup clusterSetup) {
+    // Create clusters
+    for (String clusterName : CLUSTER_LIST) {
+      clusterSetup.addCluster(clusterName, false);
+    }
+  }
+
+  private void verifyClusterCreation(ClusterSetup clusterSetup) {
+    // Verify that clusters have been created correctly according to routing mapping
+    _rawRoutingData.forEach((zkAddress, cluster) -> {
+      // Note: clusterNamePath already contains "/"
+      String clusterNamePath = cluster.iterator().next();
+
+      // Check with single-realm ZkClients
+      Assert.assertTrue(ZK_CLIENT_MAP.get(zkAddress).exists(clusterNamePath));
+      // Check with realm-aware ZkClient (federated)
+      Assert.assertTrue(_zkClient.exists(clusterNamePath));
+
+      // Remove clusters
+      clusterSetup
+          .deleteCluster(clusterNamePath.substring(1)); // Need to remove "/" at the beginning
+    });
+  }
+
+  /**
+   * Test Helix Participant creation and addition.
+   * Helix Java APIs tested in this method are:
+   * ZkHelixAdmin and ZKHelixManager (mock participant/controller)
+   */
+  @Test(dependsOnMethods = "testCreateClusters")
+  public void testCreateParticipants() throws Exception {
+    // Create two ClusterSetups using two different constructors
+    // Note: ZK Address here could be anything because multiZk mode is on (it will be ignored)
+    HelixAdmin helixAdminZkAddr = new ZKHelixAdmin(ZK_SERVER_MAP.keySet().iterator().next());
+    HelixAdmin helixAdminBuilder = new ZKHelixAdmin.Builder().build();
+    _zkHelixAdmin = helixAdminBuilder;
+
+    String participantNamePrefix = "Node_";
+    int numParticipants = 5;
+    createParticipantsAndVerify(helixAdminZkAddr, numParticipants, participantNamePrefix);
+    createParticipantsAndVerify(helixAdminBuilder, numParticipants, participantNamePrefix);
+
+    // Create mock controller and participants for next tests
+    for (String cluster : CLUSTER_LIST) {
+      // Start a controller
+      // Note: in multiZK mode, ZK Addr is ignored
+      ClusterControllerManager mockController =
+          new ClusterControllerManager("DummyZK", cluster, "controller");
+      mockController.syncStart();
+      MOCK_CONTROLLERS.put(cluster, mockController);
+
+      for (int i = 0; i < numParticipants; i++) {
+        // Note: in multiZK mode, ZK Addr is ignored
+        InstanceConfig instanceConfig = new InstanceConfig(participantNamePrefix + i);
+        helixAdminBuilder.addInstance(cluster, instanceConfig);
+        MockParticipantManager mockNode =
+            new MockParticipantManager("DummyZK", cluster, participantNamePrefix + i);
+
+        // Register task state model for task framework testing in later methods
+        Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+        taskFactoryReg.put(MockTask.TASK_COMMAND, MockTask::new);
+        // Register a Task state model factory.
+        StateMachineEngine stateMachine = mockNode.getStateMachineEngine();
+        stateMachine
+            .registerStateModelFactory("Task", new TaskStateModelFactory(mockNode, taskFactoryReg));
+
+        mockNode.syncStart();
+        MOCK_PARTICIPANTS.add(mockNode);
+      }
+      // Check that mockNodes are up
+      Assert.assertTrue(TestHelper
+          .verify(() -> helixAdminBuilder.getInstancesInCluster(cluster).size() == numParticipants,
+              TestHelper.WAIT_DURATION));
+    }
+  }
+
+  private void createParticipantsAndVerify(HelixAdmin admin, int numParticipants,
+      String participantNamePrefix) {
+    // Create participants in clusters
+    Set<String> participantNames = new HashSet<>();
+    CLUSTER_LIST.forEach(cluster -> {
+      for (int i = 0; i < numParticipants; i++) {
+        String participantName = participantNamePrefix + i;
+        participantNames.add(participantName);
+        InstanceConfig instanceConfig = new InstanceConfig(participantNamePrefix + i);
+        admin.addInstance(cluster, instanceConfig);
+      }
+    });
+
+    // Verify participants have been created properly
+    _rawRoutingData.forEach((zkAddress, cluster) -> {
+      // Note: clusterNamePath already contains "/"
+      String clusterNamePath = cluster.iterator().next();
+
+      // Check with single-realm ZkClients
+      List<String> instances =
+          ZK_CLIENT_MAP.get(zkAddress).getChildren(clusterNamePath + "/INSTANCES");
+      Assert.assertEquals(new HashSet<>(instances), participantNames);
+
+      // Check with realm-aware ZkClient (federated)
+      instances = _zkClient.getChildren(clusterNamePath + "/INSTANCES");
+      Assert.assertEquals(new HashSet<>(instances), participantNames);
+
+      // Remove Participants
+      participantNames.forEach(participant -> {
+        InstanceConfig instanceConfig = new InstanceConfig(participant);
+        admin.dropInstance(clusterNamePath.substring(1), instanceConfig);
+      });
+    });
+  }
+
+  /**
+   * Test that clusters and instances are set up properly.
+   * Helix Java APIs tested in this method is ZkUtil.
+   */
+  @Test(dependsOnMethods = "testCreateParticipants")
+  public void testZkUtil() {
+    CLUSTER_LIST.forEach(cluster -> {
+      _zkHelixAdmin.getInstancesInCluster(cluster).forEach(instance -> ZKUtil
+          .isInstanceSetup("DummyZk", cluster, instance, InstanceType.PARTICIPANT));
+    });
+  }
+
+  /**
+   * Create resources and see if things get rebalanced correctly.
+   * Helix Java API tested in this methods are:
+   * ZkBaseDataAccessor
+   * ZkHelixClusterVerifier (BestPossible)
+   */
+  @Test(dependsOnMethods = "testZkUtil")
+  public void testCreateAndRebalanceResources() {
+    BaseDataAccessor<ZNRecord> dataAccessorZkAddr = new ZkBaseDataAccessor<>("DummyZk");
+    BaseDataAccessor<ZNRecord> dataAccessorBuilder =
+        new ZkBaseDataAccessor.Builder<ZNRecord>().build();
+
+    String resourceNamePrefix = "DB_";
+    int numResources = 5;
+    int numPartitions = 3;
+    Map<String, Map<String, ZNRecord>> idealStateMap = new HashMap<>();
+
+    for (String cluster : CLUSTER_LIST) {
+      Set<String> resourceNames = new HashSet<>();
+      Set<String> liveInstancesNames = new HashSet<>(dataAccessorZkAddr
+          .getChildNames("/" + cluster + "/LIVEINSTANCES", AccessOption.PERSISTENT));
+
+      for (int i = 0; i < numResources; i++) {
+        String resource = cluster + "_" + resourceNamePrefix + i;
+        _zkHelixAdmin.addResource(cluster, resource, numPartitions, "MasterSlave",
+            IdealState.RebalanceMode.FULL_AUTO.name());
+        _zkHelixAdmin.rebalance(cluster, resource, 3);
+        resourceNames.add(resource);
+
+        // Update IdealState fields with ZkBaseDataAccessor
+        String resourcePath = "/" + cluster + "/IDEALSTATES/" + resource;
+        ZNRecord is = dataAccessorZkAddr.get(resourcePath, null, AccessOption.PERSISTENT);
+        is.setSimpleField(RebalanceConfig.RebalanceConfigProperty.REBALANCER_CLASS_NAME.name(),
+            DelayedAutoRebalancer.class.getName());
+        is.setSimpleField(RebalanceConfig.RebalanceConfigProperty.REBALANCE_STRATEGY.name(),
+            CrushEdRebalanceStrategy.class.getName());
+        dataAccessorZkAddr.set(resourcePath, is, AccessOption.PERSISTENT);
+        idealStateMap.computeIfAbsent(cluster, recordList -> new HashMap<>())
+            .putIfAbsent(is.getId(), is); // Save ZNRecord for comparison later
+      }
+
+      // Create a verifier to make sure all resources have been rebalanced
+      ZkHelixClusterVerifier verifier =
+          new BestPossibleExternalViewVerifier.Builder(cluster).setResources(resourceNames)
+              .setExpectLiveInstances(liveInstancesNames).build();
+      Assert.assertTrue(verifier.verifyByPolling());
+    }
+
+    // Using the ZkBaseDataAccessor created using the Builder, check that the correct IS
is read
+    for (String cluster : CLUSTER_LIST) {
+      Map<String, ZNRecord> savedIdealStates = idealStateMap.get(cluster);
+      List<String> resources = dataAccessorBuilder
+          .getChildNames("/" + cluster + "/IDEALSTATES", AccessOption.PERSISTENT);
+      resources.forEach(resource -> {
+        ZNRecord is = dataAccessorBuilder
+            .get("/" + cluster + "/IDEALSTATES/" + resource, null, AccessOption.PERSISTENT);
+        Assert
+            .assertEquals(is.getSimpleFields(), savedIdealStates.get(is.getId()).getSimpleFields());
+      });
+    }
+  }
+
+  /**
+   * This method tests ConfigAccessor.
+   */
+  @Test(dependsOnMethods = "testCreateAndRebalanceResources")
+  public void testConfigAccessor() {
+    // Build two ConfigAccessors to read and write:
+    // 1. ConfigAccessor using a deprecated constructor
+    // 2. ConfigAccessor using the Builder
+    ConfigAccessor configAccessorZkAddr = new ConfigAccessor("DummyZk");
+    ConfigAccessor configAccessorBuilder = new ConfigAccessor.Builder().build();
+
+    setClusterConfigAndVerify(configAccessorZkAddr);
+    setClusterConfigAndVerify(configAccessorBuilder);
+  }
+
+  private void setClusterConfigAndVerify(ConfigAccessor configAccessorMultiZk) {
+    _rawRoutingData.forEach((zkAddr, clusterNamePathList) -> {
+      // Need to rid of "/" because this is a sharding key
+      String cluster = clusterNamePathList.iterator().next().substring(1);
+      ClusterConfig clusterConfig = new ClusterConfig(cluster);
+      clusterConfig.getRecord().setSimpleField("configAccessor", cluster);
+      configAccessorMultiZk.setClusterConfig(cluster, clusterConfig);
+
+      // Now check with a single-realm ConfigAccessor
+      ConfigAccessor configAccessorSingleZk =
+          new ConfigAccessor.Builder().setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+              .setZkAddress(zkAddr).build();
+      Assert.assertEquals(configAccessorSingleZk.getClusterConfig(cluster), clusterConfig);
+
+      // Also check with a single-realm dedicated ZkClient
+      ZNRecord clusterConfigRecord =
+          ZK_CLIENT_MAP.get(zkAddr).readData("/" + cluster + "/CONFIGS/CLUSTER/" + cluster);
+      Assert.assertEquals(clusterConfigRecord, clusterConfig.getRecord());
+
+      // Clean up
+      clusterConfig = new ClusterConfig(cluster);
+      configAccessorMultiZk.setClusterConfig(cluster, clusterConfig);
+    });
+  }
+
+  /**
+   * This test submits multiple tasks to be run.
+   * The Helix Java APIs tested in this method are TaskDriver (HelixManager) and
+   * ZkHelixPropertyStore/ZkCacheBaseDataAccessor.
+   */
+  @Test(dependsOnMethods = "testConfigAccessor")
+  public void testTaskFramework() throws InterruptedException {
+    // Note: TaskDriver is like HelixManager - it only operates on one designated
+    // Create TaskDrivers for all clusters
+    Map<String, TaskDriver> taskDriverMap = new HashMap<>();
+    MOCK_CONTROLLERS
+        .forEach((cluster, controller) -> taskDriverMap.put(cluster, new TaskDriver(controller)));
+
+    // Create a Task Framework workload and start
+    Workflow workflow = WorkflowGenerator.generateNonTargetedSingleWorkflowBuilder("job").build();
+    for (TaskDriver taskDriver : taskDriverMap.values()) {
+      taskDriver.start(workflow);
+    }
+
+    // Use multi-ZK ZkHelixPropertyStore/ZkCacheBaseDataAccessor to query for workflow/job
states
+    HelixPropertyStore<ZNRecord> propertyStore =
+        new ZkHelixPropertyStore.Builder<ZNRecord>().build();
+    for (Map.Entry<String, TaskDriver> entry : taskDriverMap.entrySet()) {
+      String cluster = entry.getKey();
+      TaskDriver driver = entry.getValue();
+      // Wait until workflow has completed
+      TaskState wfStateFromTaskDriver =
+          driver.pollForWorkflowState(workflow.getName(), TaskState.COMPLETED);
+      String workflowContextPath =
+          "/" + cluster + "/PROPERTYSTORE/TaskRebalancer/" + workflow.getName() + "/Context";
+      ZNRecord workflowContextRecord =
+          propertyStore.get(workflowContextPath, null, AccessOption.PERSISTENT);
+      WorkflowContext context = new WorkflowContext(workflowContextRecord);
+
+      // Compare the workflow state read from PropertyStore and TaskDriver
+      Assert.assertEquals(context.getWorkflowState(), wfStateFromTaskDriver);
+    }
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
index 40e2dcf..c582ab3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
@@ -19,11 +19,14 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.Workflow;
 
 /**
@@ -35,12 +38,14 @@ public class WorkflowGenerator {
   public static final String JOB_NAME_2 = "SomeJob2";
 
   public static final Map<String, String> DEFAULT_JOB_CONFIG;
+  public static final Map<String, String> DEFAULT_JOB_CONFIG_NOT_TARGETED;
   static {
     Map<String, String> tmpMap = new TreeMap<String, String>();
-    tmpMap.put("TargetResource", DEFAULT_TGT_DB);
-    tmpMap.put("TargetPartitionStates", "MASTER");
     tmpMap.put("Command", MockTask.TASK_COMMAND);
     tmpMap.put("TimeoutPerPartition", String.valueOf(10 * 1000));
+    DEFAULT_JOB_CONFIG_NOT_TARGETED = Collections.unmodifiableMap(new TreeMap<>(tmpMap));
+    tmpMap.put("TargetResource", DEFAULT_TGT_DB);
+    tmpMap.put("TargetPartitionStates", "MASTER");
     DEFAULT_JOB_CONFIG = Collections.unmodifiableMap(tmpMap);
   }
 
@@ -57,6 +62,24 @@ public class WorkflowGenerator {
     return generateSingleJobWorkflowBuilder(jobName, jobBuilder);
   }
 
+  public static Workflow.Builder generateNonTargetedSingleWorkflowBuilder(String jobName)
{
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(DEFAULT_JOB_CONFIG_NOT_TARGETED);
+    jobBuilder.setJobCommandConfigMap(DEFAULT_COMMAND_CONFIG);
+
+    // Create 5 TaskConfigs
+    List<TaskConfig> taskConfigs = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder();
+      taskConfigBuilder.setTaskId("task_" + i);
+      taskConfigBuilder.addConfig("Timeout", String.valueOf(2000));
+      taskConfigBuilder.setCommand(MockTask.TASK_COMMAND);
+      taskConfigs.add(taskConfigBuilder.build());
+    }
+
+    jobBuilder.addTaskConfigs(taskConfigs);
+    return generateSingleJobWorkflowBuilder(jobName, jobBuilder);
+  }
+
   public static Workflow.Builder generateSingleJobWorkflowBuilder(String jobName,
       JobConfig.Builder jobBuilder) {
     return new Workflow.Builder(jobName).addJobConfig(jobName, jobBuilder);


Mime
View raw message