helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] branch wagedRebalancer updated: Add REST API endpoints for WAGED Rebalancer (#611)
Date Tue, 26 Nov 2019 06:23:57 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/wagedRebalancer by this push:
     new 83efcfa  Add REST API endpoints for WAGED Rebalancer (#611)
83efcfa is described below

commit 83efcfa91a7069af7414e27ba515eef1cbb43f5c
Author: Hunter Lee <hulee@linkedin.com>
AuthorDate: Mon Nov 25 22:23:47 2019 -0800

    Add REST API endpoints for WAGED Rebalancer (#611)
    
    We want to make WAGED rebalancer (weight-aware) easier to use. One way to do this is to
allow the user to easily add resources with weight configuration set by providing REST endpoints.
This change adds the relevant REST endpoints based on the HelixAdmin APIs added in (#570).
    
    Basically, this commit uses existing REST endpoints whose hierarchy is defined by REST
resource. What this commit does to the existing endpoints is 1) Add extra commands 2) Add
a WAGED command as a QueryParam so that WAGED logic could be included.
    
    This change is backward-compatible because it keeps the original behavior when no commands
are provided by using @DefaultValue annotation.
---
 .../rest/server/resources/AbstractResource.java    |  10 +-
 .../server/resources/helix/ClusterAccessor.java    |  10 +-
 .../server/resources/helix/InstancesAccessor.java  |  88 +++++++++---
 .../resources/helix/PerInstanceAccessor.java       |  48 +++++--
 .../server/resources/helix/ResourceAccessor.java   | 152 ++++++++++++++++-----
 .../helix/rest/server/AbstractTestClass.java       |   5 +-
 .../helix/rest/server/TestClusterAccessor.java     |  13 ++
 .../helix/rest/server/TestInstancesAccessor.java   |  69 ++++++++++
 .../helix/rest/server/TestPerInstanceAccessor.java |  48 +++++++
 .../helix/rest/server/TestResourceAccessor.java    | 117 +++++++++++++++-
 .../rest/server/util/JerseyUriRequestBuilder.java  |   3 +-
 11 files changed, 489 insertions(+), 74 deletions(-)

diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
index 5128493..90d4add 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
@@ -66,7 +66,15 @@ public class AbstractResource {
     rebalance,
     reset,
     resetPartitions,
-    removeInstanceTag
+    removeInstanceTag,
+    addResource,
+    addWagedResource,
+    getResource,
+    validateWeight,
+    enableWagedRebalance,
+    enableWagedRebalanceForAllResources,
+    getInstance,
+    getAllInstances
   }
 
   @Context
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
index edfc847..a0f2b55 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
@@ -240,7 +240,15 @@ public class ClusterAccessor extends AbstractHelixResource {
       helixAdmin.manuallyEnableMaintenanceMode(clusterId, command == Command.enableMaintenanceMode,
           content, customFieldsMap);
       break;
-
+    case enableWagedRebalanceForAllResources:
+      // Enable WAGED rebalance for all resources in the cluster
+      List<String> resources = helixAdmin.getResourcesInCluster(clusterId);
+      try {
+        helixAdmin.enableWagedRebalance(clusterId, resources);
+      } catch (HelixException e) {
+        return badRequest(e.getMessage());
+      }
+      break;
     default:
       return badRequest("Unsupported command " + command);
     }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
index 7191e517..94a44bc 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
@@ -1,5 +1,24 @@
 package org.apache.helix.rest.server.resources.helix;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -8,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
@@ -60,41 +80,65 @@ public class InstancesAccessor extends AbstractHelixResource {
   }
 
   @GET
-  public Response getAllInstances(@PathParam("clusterId") String clusterId) {
+  public Response getAllInstances(@PathParam("clusterId") String clusterId,
+      @DefaultValue("getAllInstances") @QueryParam("command") String command) {
+    // Get the command. If not provided, the default would be "getAllInstances"
+    Command cmd;
+    try {
+      cmd = Command.valueOf(command);
+    } catch (Exception e) {
+      return badRequest("Invalid command : " + command);
+    }
+
     HelixDataAccessor accessor = getDataAccssor(clusterId);
     List<String> instances = accessor.getChildNames(accessor.keyBuilder().instanceConfigs());
-
     if (instances == null) {
       return notFound();
     }
 
-    ObjectNode root = JsonNodeFactory.instance.objectNode();
-    root.put(Properties.id.name(), JsonNodeFactory.instance.textNode(clusterId));
+    switch (cmd) {
+    case getAllInstances:
+      ObjectNode root = JsonNodeFactory.instance.objectNode();
+      root.put(Properties.id.name(), JsonNodeFactory.instance.textNode(clusterId));
 
-    ArrayNode instancesNode = root.putArray(InstancesAccessor.InstancesProperties.instances.name());
-    instancesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(instances));
-    ArrayNode onlineNode = root.putArray(InstancesAccessor.InstancesProperties.online.name());
-    ArrayNode disabledNode = root.putArray(InstancesAccessor.InstancesProperties.disabled.name());
+      ArrayNode instancesNode =
+          root.putArray(InstancesAccessor.InstancesProperties.instances.name());
+      instancesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(instances));
+      ArrayNode onlineNode = root.putArray(InstancesAccessor.InstancesProperties.online.name());
+      ArrayNode disabledNode = root.putArray(InstancesAccessor.InstancesProperties.disabled.name());
 
-    List<String> liveInstances = accessor.getChildNames(accessor.keyBuilder().liveInstances());
-    ClusterConfig clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
+      List<String> liveInstances = accessor.getChildNames(accessor.keyBuilder().liveInstances());
+      ClusterConfig clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
 
-    for (String instanceName : instances) {
-      InstanceConfig instanceConfig =
-          accessor.getProperty(accessor.keyBuilder().instanceConfig(instanceName));
-      if (instanceConfig != null) {
-        if (!instanceConfig.getInstanceEnabled() || (clusterConfig.getDisabledInstances()
!= null
-            && clusterConfig.getDisabledInstances().containsKey(instanceName))) {
-          disabledNode.add(JsonNodeFactory.instance.textNode(instanceName));
-        }
+      for (String instanceName : instances) {
+        InstanceConfig instanceConfig =
+            accessor.getProperty(accessor.keyBuilder().instanceConfig(instanceName));
+        if (instanceConfig != null) {
+          if (!instanceConfig.getInstanceEnabled() || (clusterConfig.getDisabledInstances()
!= null
+              && clusterConfig.getDisabledInstances().containsKey(instanceName)))
{
+            disabledNode.add(JsonNodeFactory.instance.textNode(instanceName));
+          }
 
-        if (liveInstances.contains(instanceName)){
-          onlineNode.add(JsonNodeFactory.instance.textNode(instanceName));
+          if (liveInstances.contains(instanceName)) {
+            onlineNode.add(JsonNodeFactory.instance.textNode(instanceName));
+          }
         }
       }
+      return JSONRepresentation(root);
+    case validateWeight:
+      // Validate all instances for WAGED rebalance
+      HelixAdmin admin = getHelixAdmin();
+      Map<String, Boolean> validationResultMap;
+      try {
+        validationResultMap = admin.validateInstancesForWagedRebalance(clusterId, instances);
+      } catch (HelixException e) {
+        return badRequest(e.getMessage());
+      }
+      return JSONRepresentation(validationResultMap);
+    default:
+      _logger.error("Unsupported command :" + command);
+      return badRequest("Unsupported command :" + command);
     }
-
-    return JSONRepresentation(root);
   }
 
   @POST
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
index 6b615ee..ac67350 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
@@ -20,7 +20,9 @@ package org.apache.helix.rest.server.resources.helix;
  */
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
@@ -67,16 +69,41 @@ public class PerInstanceAccessor extends AbstractHelixResource {
 
   @GET
   public Response getInstanceById(@PathParam("clusterId") String clusterId,
-      @PathParam("instanceName") String instanceName) throws IOException {
-    ObjectMapper objectMapper = new ObjectMapper();
-    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
-    // TODO reduce GC by dependency injection
-    InstanceService instanceService =
-        new InstanceServiceImpl(new HelixDataAccessorWrapper((ZKHelixDataAccessor) dataAccessor),
getConfigAccessor());
-    InstanceInfo instanceInfo = instanceService.getInstanceInfo(clusterId, instanceName,
-        InstanceService.HealthCheck.STARTED_AND_HEALTH_CHECK_LIST);
+      @PathParam("instanceName") String instanceName,
+      @DefaultValue("getInstance") @QueryParam("command") String command) throws IOException
{
+    // Get the command. If not provided, the default would be "getInstance"
+    Command cmd;
+    try {
+      cmd = Command.valueOf(command);
+    } catch (Exception e) {
+      return badRequest("Invalid command : " + command);
+    }
 
-    return OK(objectMapper.writeValueAsString(instanceInfo));
+    switch (cmd) {
+    case getInstance:
+      ObjectMapper objectMapper = new ObjectMapper();
+      HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
+      // TODO reduce GC by dependency injection
+      InstanceService instanceService = new InstanceServiceImpl(
+          new HelixDataAccessorWrapper((ZKHelixDataAccessor) dataAccessor), getConfigAccessor());
+      InstanceInfo instanceInfo = instanceService.getInstanceInfo(clusterId, instanceName,
+          InstanceService.HealthCheck.STARTED_AND_HEALTH_CHECK_LIST);
+      return OK(objectMapper.writeValueAsString(instanceInfo));
+    case validateWeight:
+      // Validates instanceConfig for WAGED rebalance
+      HelixAdmin admin = getHelixAdmin();
+      Map<String, Boolean> validationResultMap;
+      try {
+        validationResultMap = admin.validateInstancesForWagedRebalance(clusterId,
+            Collections.singletonList(instanceName));
+      } catch (HelixException e) {
+        return badRequest(e.getMessage());
+      }
+      return JSONRepresentation(validationResultMap);
+    default:
+      LOG.error("Unsupported command :" + command);
+      return badRequest("Unsupported command :" + command);
+    }
   }
 
   @POST
@@ -331,7 +358,8 @@ public class PerInstanceAccessor extends AbstractHelixResource {
     return notFound();
   }
 
-  @GET @Path("errors")
+  @GET
+  @Path("errors")
   public Response getErrorsOnInstance(@PathParam("clusterId") String clusterId,
       @PathParam("instanceName") String instanceName) throws IOException {
     HelixDataAccessor accessor = getDataAccssor(clusterId);
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
index cce88bf..61cf88a 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
@@ -48,6 +48,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.codehaus.jackson.node.ArrayNode;
@@ -160,33 +161,56 @@ public class ResourceAccessor extends AbstractHelixResource {
   @GET
   @Path("{resourceName}")
   public Response getResource(@PathParam("clusterId") String clusterId,
-      @PathParam("resourceName") String resourceName) {
+      @PathParam("resourceName") String resourceName,
+      @DefaultValue("getResource") @QueryParam("command") String command) {
+    // Get the command. If not provided, the default would be "getResource"
+    Command cmd;
+    try {
+      cmd = Command.valueOf(command);
+    } catch (Exception e) {
+      return badRequest("Invalid command : " + command);
+    }
     ConfigAccessor accessor = getConfigAccessor();
     HelixAdmin admin = getHelixAdmin();
 
-    ResourceConfig resourceConfig = accessor.getResourceConfig(clusterId, resourceName);
-    IdealState idealState = admin.getResourceIdealState(clusterId, resourceName);
-    ExternalView externalView = admin.getResourceExternalView(clusterId, resourceName);
+    switch (cmd) {
+    case getResource:
+      ResourceConfig resourceConfig = accessor.getResourceConfig(clusterId, resourceName);
+      IdealState idealState = admin.getResourceIdealState(clusterId, resourceName);
+      ExternalView externalView = admin.getResourceExternalView(clusterId, resourceName);
 
-    Map<String, ZNRecord> resourceMap = new HashMap<>();
-    if (idealState != null) {
-      resourceMap.put(ResourceProperties.idealState.name(), idealState.getRecord());
-    } else {
-      return notFound();
-    }
+      Map<String, ZNRecord> resourceMap = new HashMap<>();
+      if (idealState != null) {
+        resourceMap.put(ResourceProperties.idealState.name(), idealState.getRecord());
+      } else {
+        return notFound();
+      }
 
-    resourceMap.put(ResourceProperties.resourceConfig.name(), null);
-    resourceMap.put(ResourceProperties.externalView.name(), null);
+      resourceMap.put(ResourceProperties.resourceConfig.name(), null);
+      resourceMap.put(ResourceProperties.externalView.name(), null);
 
-    if (resourceConfig != null) {
-      resourceMap.put(ResourceProperties.resourceConfig.name(), resourceConfig.getRecord());
-    }
+      if (resourceConfig != null) {
+        resourceMap.put(ResourceProperties.resourceConfig.name(), resourceConfig.getRecord());
+      }
 
-    if (externalView != null) {
-      resourceMap.put(ResourceProperties.externalView.name(), externalView.getRecord());
+      if (externalView != null) {
+        resourceMap.put(ResourceProperties.externalView.name(), externalView.getRecord());
+      }
+      return JSONRepresentation(resourceMap);
+    case validateWeight:
+      // Validate ResourceConfig for WAGED rebalance
+      Map<String, Boolean> validationResultMap;
+      try {
+        validationResultMap = admin.validateResourcesForWagedRebalance(clusterId,
+            Collections.singletonList(resourceName));
+      } catch (HelixException e) {
+        return badRequest(e.getMessage());
+      }
+      return JSONRepresentation(validationResultMap);
+    default:
+      _logger.error("Unsupported command :" + command);
+      return badRequest("Unsupported command :" + command);
     }
-
-    return JSONRepresentation(resourceMap);
   }
 
   @PUT
@@ -199,32 +223,81 @@ public class ResourceAccessor extends AbstractHelixResource {
       @DefaultValue("DEFAULT") @QueryParam("rebalanceStrategy") String rebalanceStrategy,
       @DefaultValue("0") @QueryParam("bucketSize") int bucketSize,
       @DefaultValue("-1") @QueryParam("maxPartitionsPerInstance") int maxPartitionsPerInstance,
-      String content) {
-
+      @DefaultValue("addResource") @QueryParam("command") String command, String content)
{
+    // Get the command. If not provided, the default would be "addResource"
+    Command cmd;
+    try {
+      cmd = Command.valueOf(command);
+    } catch (Exception e) {
+      return badRequest("Invalid command : " + command);
+    }
     HelixAdmin admin = getHelixAdmin();
-
     try {
-      if (content.length() != 0) {
-        ZNRecord record;
+      switch (cmd) {
+      case addResource:
+        if (content.length() != 0) {
+          ZNRecord record;
+          try {
+            record = toZNRecord(content);
+          } catch (IOException e) {
+            _logger.error("Failed to deserialize user's input " + content + ", Exception:
" + e);
+            return badRequest("Input is not a valid ZNRecord!");
+          }
+
+          if (record.getSimpleFields() != null) {
+            admin.addResource(clusterId, resourceName, new IdealState(record));
+          }
+        } else {
+          admin.addResource(clusterId, resourceName, numPartitions, stateModelRef, rebalancerMode,
+              rebalanceStrategy, bucketSize, maxPartitionsPerInstance);
+        }
+        break;
+      case addWagedResource:
+        // Check if content is valid
+        if (content == null || content.length() == 0) {
+          _logger.error("Input is null or empty!");
+          return badRequest("Input is null or empty!");
+        }
+        Map<String, ZNRecord> input;
+        // Content must supply both IdealState and ResourceConfig
         try {
-          record = toZNRecord(content);
+          TypeReference<Map<String, ZNRecord>> typeRef =
+              new TypeReference<Map<String, ZNRecord>>() {
+              };
+          input = OBJECT_MAPPER.readValue(content, typeRef);
         } catch (IOException e) {
-          _logger.error("Failed to deserialize user's input " + content + ", Exception: "
+ e);
-          return badRequest("Input is not a vaild ZNRecord!");
+          _logger.error("Failed to deserialize user's input {}, Exception: {}", content,
e);
+          return badRequest("Input is not a valid map of String-ZNRecord pairs!");
         }
-
-        if (record.getSimpleFields() != null) {
-          admin.addResource(clusterId, resourceName, new IdealState(record));
+        // Check if the map contains both IdealState and ResourceConfig
+        ZNRecord idealStateRecord =
+            input.get(ResourceAccessor.ResourceProperties.idealState.name());
+        ZNRecord resourceConfigRecord =
+            input.get(ResourceAccessor.ResourceProperties.resourceConfig.name());
+
+        if (idealStateRecord == null || resourceConfigRecord == null) {
+          _logger.error("Input does not contain both IdealState and ResourceConfig!");
+          return badRequest("Input does not contain both IdealState and ResourceConfig!");
         }
-      } else {
-        admin.addResource(clusterId, resourceName, numPartitions, stateModelRef, rebalancerMode,
-            rebalanceStrategy, bucketSize, maxPartitionsPerInstance);
+        // Add using HelixAdmin API
+        try {
+          admin.addResourceWithWeight(clusterId, new IdealState(idealStateRecord),
+              new ResourceConfig(resourceConfigRecord));
+        } catch (HelixException e) {
+          String errMsg = String.format("Failed to add resource %s with weight in cluster
%s!",
+              idealStateRecord.getId(), clusterId);
+          _logger.error(errMsg, e);
+          return badRequest(errMsg);
+        }
+        break;
+      default:
+        _logger.error("Unsupported command :" + command);
+        return badRequest("Unsupported command :" + command);
       }
     } catch (Exception e) {
       _logger.error("Error in adding a resource: " + resourceName, e);
       return serverError(e);
     }
-
     return OK();
   }
 
@@ -258,6 +331,13 @@ public class ResourceAccessor extends AbstractHelixResource {
         keyPrefix = keyPrefix.length() == 0 ? resourceName : keyPrefix;
         admin.rebalance(clusterId, resourceName, replicas, keyPrefix, group);
         break;
+      case enableWagedRebalance:
+        try {
+          admin.enableWagedRebalance(clusterId, Collections.singletonList(resourceName));
+        } catch (HelixException e) {
+          return badRequest(e.getMessage());
+        }
+        break;
       default:
         _logger.error("Unsupported command :" + command);
         return badRequest("Unsupported command :" + command);
@@ -317,7 +397,7 @@ public class ResourceAccessor extends AbstractHelixResource {
       record = toZNRecord(content);
     } catch (IOException e) {
       _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e);
-      return badRequest("Input is not a vaild ZNRecord!");
+      return badRequest("Input is not a valid ZNRecord!");
     }
     ResourceConfig resourceConfig = new ResourceConfig(record);
     ConfigAccessor configAccessor = getConfigAccessor();
@@ -379,7 +459,7 @@ public class ResourceAccessor extends AbstractHelixResource {
       record = toZNRecord(content);
     } catch (IOException e) {
       _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e);
-      return badRequest("Input is not a vaild ZNRecord!");
+      return badRequest("Input is not a valid ZNRecord!");
     }
     IdealState idealState = new IdealState(record);
     HelixAdmin helixAdmin = getHelixAdmin();
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index 347be89..5e73c37 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -473,8 +473,9 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest
{
     final Response response = webTarget.request().get();
     Assert.assertEquals(response.getStatus(), expectedReturnStatus);
 
-    // NOT_FOUND will throw text based html
-    if (expectedReturnStatus != Response.Status.NOT_FOUND.getStatusCode()) {
+    // NOT_FOUND and BAD_REQUEST will throw text based html
+    if (expectedReturnStatus != Response.Status.NOT_FOUND.getStatusCode()
+        && expectedReturnStatus != Response.Status.BAD_REQUEST.getStatusCode()) {
       Assert.assertEquals(response.getMediaType().getType(), "application");
     } else {
       Assert.assertEquals(response.getMediaType().getType(), "text");
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
index 32ea5ed..d39b2d8 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
@@ -38,6 +38,7 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
 import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.integration.manager.ClusterDistributedController;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZKUtil;
@@ -565,6 +566,18 @@ public class TestClusterAccessor extends AbstractTestClass {
     System.out.println("End test :" + TestHelper.getTestMethodName());
   }
 
+  @Test(dependsOnMethods = "testActivateSuperCluster")
+  public void testEnableWagedRebalanceForAllResources() {
+    String cluster = "TestCluster_2";
+    post("clusters/" + cluster, ImmutableMap.of("command", "enableWagedRebalanceForAllResources"),
+        Entity.entity("", MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode());
+    for (String resource : _gSetupTool.getClusterManagementTool().getResourcesInCluster(cluster))
{
+      IdealState idealState =
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(cluster, resource);
+      Assert.assertEquals(idealState.getRebalancerClassName(), WagedRebalancer.class.getName());
+    }
+  }
+
   private ClusterConfig getClusterConfigFromRest(String cluster) throws IOException {
     String body = get("clusters/" + cluster + "/configs", null, Response.Status.OK.getStatusCode(),
true);
 
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
index 645866e..c5b28d5 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
@@ -1,6 +1,26 @@
 package org.apache.helix.rest.server;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -154,6 +174,55 @@ public class TestInstancesAccessor extends AbstractTestClass {
     System.out.println("End test :" + TestHelper.getTestMethodName());
   }
 
+  @Test(dependsOnMethods = "testGetAllInstances")
+  public void testValidateWeightForAllInstances() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+    // Empty out ClusterConfig's weight key setting for testing
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.getRecord().setListField(
+        ClusterConfig.ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(), new ArrayList<>());
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+    // Issue a validate call
+    String body = new JerseyUriRequestBuilder("clusters/{}/instances?command=validateWeight")
+        .isBodyReturnExpected(true).format(CLUSTER_NAME).get(this);
+
+    JsonNode node = OBJECT_MAPPER.readTree(body);
+    // Must have the results saying they are all valid (true) because there's no capacity
keys set
+    // in ClusterConfig
+    node.iterator().forEachRemaining(child -> Assert.assertTrue(child.booleanValue()));
+
+    clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setInstanceCapacityKeys(Arrays.asList("FOO", "BAR"));
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    body = new JerseyUriRequestBuilder("clusters/{}/instances?command=validateWeight")
+        .isBodyReturnExpected(true).format(CLUSTER_NAME)
+        .expectedReturnStatusCode(Response.Status.BAD_REQUEST.getStatusCode()).get(this);
+    node = OBJECT_MAPPER.readTree(body);
+    // Since instances do not have weight-related configs, the result should return error
+    Assert.assertTrue(node.has("error"));
+
+    // Now set weight-related configs in InstanceConfigs
+    List<String> instances =
+        _gSetupTool.getClusterManagementTool().getInstancesInCluster(CLUSTER_NAME);
+    for (String instance : instances) {
+      InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, instance);
+      instanceConfig.setInstanceCapacityMap(ImmutableMap.of("FOO", 1000, "BAR", 1000));
+      _configAccessor.setInstanceConfig(CLUSTER_NAME, instance, instanceConfig);
+    }
+
+    body = new JerseyUriRequestBuilder("clusters/{}/instances?command=validateWeight")
+        .isBodyReturnExpected(true).format(CLUSTER_NAME)
+        .expectedReturnStatusCode(Response.Status.OK.getStatusCode()).get(this);
+    node = OBJECT_MAPPER.readTree(body);
+    // Must have the results saying they are all valid (true) because capacity keys are set
+    // in ClusterConfig
+    node.iterator().forEachRemaining(child -> Assert.assertTrue(child.booleanValue()));
+
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
   private Set<String> getStringSet(JsonNode jsonNode, String key) {
     Set<String> result = new HashSet<>();
     jsonNode.withArray(key).forEach(s -> result.add(s.textValue()));
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
index 9a368fc..2062933 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
@@ -36,6 +36,7 @@ import org.apache.helix.HelixException;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Message;
 import org.apache.helix.rest.server.resources.AbstractResource;
@@ -400,4 +401,51 @@ public class TestPerInstanceAccessor extends AbstractTestClass {
         .format(CLUSTER_NAME, instanceName).post(this, entity);
     System.out.println("End test :" + TestHelper.getTestMethodName());
   }
+
+  /**
+   * Check that validateWeightForInstance() works by
+   * 1. First call validate -> We should get "true" because nothing is set in ClusterConfig.
+   * 2. Define keys in ClusterConfig and call validate -> We should get BadRequest.
+   * 3. Define weight configs in InstanceConfig and call validate -> We should get OK
with "true".
+   */
+  @Test(dependsOnMethods = "checkUpdateFails")
+  public void testValidateWeightForInstance() throws IOException {
+    // Get one instance in the cluster
+    String instance = _gSetupTool.getClusterManagementTool().getInstancesInCluster(CLUSTER_NAME)
+        .iterator().next();
+
+    // Issue a validate call
+    String body = new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=validateWeight")
+        .isBodyReturnExpected(true).format(CLUSTER_NAME, instance).get(this);
+
+    JsonNode node = OBJECT_MAPPER.readTree(body);
+    // Must have the result saying (true) because there's no capacity keys set
+    // in ClusterConfig
+    node.iterator().forEachRemaining(child -> Assert.assertTrue(child.getBooleanValue()));
+
+    // Define keys in ClusterConfig
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setInstanceCapacityKeys(Arrays.asList("FOO", "BAR"));
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    body = new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=validateWeight")
+        .isBodyReturnExpected(true).format(CLUSTER_NAME, instance)
+        .expectedReturnStatusCode(Response.Status.BAD_REQUEST.getStatusCode()).get(this);
+    node = OBJECT_MAPPER.readTree(body);
+    // Since instance does not have weight-related configs, the result should return error
+    Assert.assertTrue(node.has("error"));
+
+    // Now set weight-related config in InstanceConfig
+    InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, instance);
+    instanceConfig.setInstanceCapacityMap(ImmutableMap.of("FOO", 1000, "BAR", 1000));
+    _configAccessor.setInstanceConfig(CLUSTER_NAME, instance, instanceConfig);
+
+    body = new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=validateWeight")
+        .isBodyReturnExpected(true).format(CLUSTER_NAME, instance)
+        .expectedReturnStatusCode(Response.Status.OK.getStatusCode()).get(this);
+    node = OBJECT_MAPPER.readTree(body);
+    // Must have the results saying they are all valid (true) because capacity keys are set
+    // in ClusterConfig
+    node.iterator().forEachRemaining(child -> Assert.assertTrue(child.getBooleanValue()));
+  }
 }
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java
index acc6ce4..4f75442 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java
@@ -41,8 +41,11 @@ import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.FullAutoModeISBuilder;
 import org.apache.helix.rest.server.resources.helix.ResourceAccessor;
@@ -431,10 +434,30 @@ public class TestResourceAccessor extends AbstractTestClass {
   }
 
   /**
+   * Test "enableWagedRebalance" command of updateResource.
+   */
+  @Test(dependsOnMethods = "updateResourceIdealState")
+  public void testEnableWagedRebalance() {
+    IdealState idealState =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, RESOURCE_NAME);
+    Assert.assertNotSame(idealState.getRebalancerClassName(), WagedRebalancer.class.getName());
+
+    // Enable waged rebalance, which should change the rebalancer class name
+    Entity entity = Entity.entity(null, MediaType.APPLICATION_JSON_TYPE);
+    post("clusters/" + CLUSTER_NAME + "/resources/" + RESOURCE_NAME,
+        Collections.singletonMap("command", "enableWagedRebalance"), entity,
+        Response.Status.OK.getStatusCode());
+
+    idealState =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, RESOURCE_NAME);
+    Assert.assertEquals(idealState.getRebalancerClassName(), WagedRebalancer.class.getName());
+  }
+
+  /**
    * Test "delete" command of updateResourceIdealState.
    * @throws Exception
    */
-  @Test(dependsOnMethods = "updateResourceIdealState")
+  @Test(dependsOnMethods = "testEnableWagedRebalance")
   public void deleteFromResourceIdealState() throws Exception {
     String zkPath = PropertyPathBuilder.idealState(CLUSTER_NAME, RESOURCE_NAME);
     ZNRecord record = new ZNRecord(RESOURCE_NAME);
@@ -472,6 +495,98 @@ public class TestResourceAccessor extends AbstractTestClass {
     System.out.println("End test :" + TestHelper.getTestMethodName());
   }
 
+  @Test(dependsOnMethods = "deleteFromResourceIdealState")
+  public void testAddResourceWithWeight() throws IOException {
+    // Test case 1: Add a valid resource with valid weights
+    // Create a resource with IdealState and ResourceConfig
+    String wagedResourceName = "newWagedResource";
+
+    // Create an IdealState on full-auto with 1 partition
+    IdealState idealState = new IdealState(wagedResourceName);
+    idealState.getRecord().getSimpleFields().putAll(_gSetupTool.getClusterManagementTool()
+        .getResourceIdealState(CLUSTER_NAME, RESOURCE_NAME).getRecord().getSimpleFields());
+    idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+    idealState.setRebalancerClassName(WagedRebalancer.class.getName());
+    idealState.setNumPartitions(1); // 1 partition for convenience of testing
+
+    // Create a ResourceConfig with FOO and BAR at 100 respectively
+    ResourceConfig resourceConfig = new ResourceConfig(wagedResourceName);
+    Map<String, Map<String, Integer>> partitionCapacityMap = new HashMap<>();
+    Map<String, Integer> partitionCapacity = ImmutableMap.of("FOO", 100, "BAR", 100);
+    partitionCapacityMap.put(wagedResourceName + "_0", partitionCapacity);
+    // Also add a default key
+    partitionCapacityMap.put(ResourceConfig.DEFAULT_PARTITION_KEY, partitionCapacity);
+    resourceConfig.setPartitionCapacityMap(partitionCapacityMap);
+
+    // Put both IdealState and ResourceConfig into a map as required
+    Map<String, ZNRecord> inputMap = ImmutableMap.of(
+        ResourceAccessor.ResourceProperties.idealState.name(), idealState.getRecord(),
+        ResourceAccessor.ResourceProperties.resourceConfig.name(), resourceConfig.getRecord());
+
+    // Create an entity using the inputMap
+    Entity entity =
+        Entity.entity(OBJECT_MAPPER.writeValueAsString(inputMap), MediaType.APPLICATION_JSON_TYPE);
+
+    // Make a HTTP call to the REST endpoint
+    put("clusters/" + CLUSTER_NAME + "/resources/" + wagedResourceName,
+        ImmutableMap.of("command", "addWagedResource"), entity, Response.Status.OK.getStatusCode());
+
+    // Test case 2: Add a resource with invalid weights
+    String invalidResourceName = "invalidWagedResource";
+    ResourceConfig invalidWeightResourceConfig = new ResourceConfig(invalidResourceName);
+    IdealState invalidWeightIdealState = new IdealState(invalidResourceName);
+
+    Map<String, ZNRecord> invalidInputMap = ImmutableMap.of(
+        ResourceAccessor.ResourceProperties.idealState.name(), invalidWeightIdealState.getRecord(),
+        ResourceAccessor.ResourceProperties.resourceConfig.name(),
+        invalidWeightResourceConfig.getRecord());
+
+    // Create an entity using invalidInputMap
+    entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(invalidInputMap),
+        MediaType.APPLICATION_JSON_TYPE);
+
+    // Make a HTTP call to the REST endpoint
+    put("clusters/" + CLUSTER_NAME + "/resources/" + invalidResourceName,
+        ImmutableMap.of("command", "addWagedResource"), entity,
+        Response.Status.BAD_REQUEST.getStatusCode());
+  }
+
+  @Test(dependsOnMethods = "testAddResourceWithWeight")
+  public void testValidateResource() throws IOException {
+    // Define weight keys in ClusterConfig
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setInstanceCapacityKeys(Arrays.asList("FOO", "BAR"));
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    // Remove all weight configs in InstanceConfig for testing
+    for (String instance : _instancesMap.get(CLUSTER_NAME)) {
+      InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, instance);
+      instanceConfig.setInstanceCapacityMap(Collections.emptyMap());
+      _configAccessor.setInstanceConfig(CLUSTER_NAME, instance, instanceConfig);
+    }
+
+    // Validate the resource added in testAddResourceWithWeight()
+    String resourceToValidate = "newWagedResource";
+    // This should fail because none of the instances have weight configured
+    get("clusters/" + CLUSTER_NAME + "/resources/" + resourceToValidate,
+        ImmutableMap.of("command", "validateWeight"), Response.Status.BAD_REQUEST.getStatusCode(),
+        true);
+
+    // Add back weight configurations to all instance configs
+    Map<String, Integer> instanceCapacityMap = ImmutableMap.of("FOO", 1000, "BAR",
1000);
+    for (String instance : _instancesMap.get(CLUSTER_NAME)) {
+      InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, instance);
+      instanceConfig.setInstanceCapacityMap(instanceCapacityMap);
+      _configAccessor.setInstanceConfig(CLUSTER_NAME, instance, instanceConfig);
+    }
+
+    // Now try validating again - it should go through and return a 200
+    String body = get("clusters/" + CLUSTER_NAME + "/resources/" + resourceToValidate,
+        ImmutableMap.of("command", "validateWeight"), Response.Status.OK.getStatusCode(),
true);
+    JsonNode node = OBJECT_MAPPER.readTree(body);
+    Assert.assertEquals(node.get(resourceToValidate).toString(), "true");
+  }
+
   /**
    * Creates a setup where the health API can be tested.
    * @param clusterName
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/util/JerseyUriRequestBuilder.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/util/JerseyUriRequestBuilder.java
index e0642b3..5398e36 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/util/JerseyUriRequestBuilder.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/util/JerseyUriRequestBuilder.java
@@ -78,7 +78,8 @@ public class JerseyUriRequestBuilder {
     Assert.assertEquals(response.getStatus(), _expectedStatusCode);
 
     // NOT_FOUND will throw text based html
-    if (_expectedStatusCode != Response.Status.NOT_FOUND.getStatusCode()) {
+    if (_expectedStatusCode != Response.Status.NOT_FOUND.getStatusCode()
+        && _expectedStatusCode != Response.Status.BAD_REQUEST.getStatusCode()) {
       Assert.assertEquals(response.getMediaType().getType(), "application");
     } else {
       Assert.assertEquals(response.getMediaType().getType(), "text");


Mime
View raw message