Add REST API endpoints for WAGED Rebalancer (#611)

We want to make WAGED rebalancer (weight-aware) easier to use. One way to do this is to allow the user to easily add resources with weight configuration set by providing REST endpoints. This change adds the relevant REST endpoints based on the HelixAdmin APIs added in (#570).

Basically, this commit uses existing REST endpoints whose hierarchy is defined by REST resource. What this commit does to the existing endpoints is 1) Add extra commands 2) Add a WAGED command as a QueryParam so that WAGED logic could be included.

This change is backward-compatible because it keeps the original behavior when no commands are provided by using @DefaultValue annotation.
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
index 78bfd77..8e47b77 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
@@ -67,7 +67,15 @@
     rebalance,
     reset,
     resetPartitions,
-    removeInstanceTag
+    removeInstanceTag,
+    addResource,
+    addWagedResource,
+    getResource,
+    validateWeight,
+    enableWagedRebalance,
+    enableWagedRebalanceForAllResources,
+    getInstance,
+    getAllInstances
   }
 
   @Context
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
index e4f0358..f7f9af5 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
@@ -238,7 +238,15 @@
       helixAdmin.manuallyEnableMaintenanceMode(clusterId, command == Command.enableMaintenanceMode,
           content, customFieldsMap);
       break;
-
+    case enableWagedRebalanceForAllResources:
+      // Enable WAGED rebalance for all resources in the cluster
+      List<String> resources = helixAdmin.getResourcesInCluster(clusterId);
+      try {
+        helixAdmin.enableWagedRebalance(clusterId, resources);
+      } catch (HelixException e) {
+        return badRequest(e.getMessage());
+      }
+      break;
     default:
       return badRequest("Unsupported command " + command);
     }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
index 9aedddd..4578172 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
@@ -1,5 +1,24 @@
 package org.apache.helix.rest.server.resources.helix;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -8,6 +27,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
@@ -61,41 +81,65 @@
   }
 
   @GET
-  public Response getAllInstances(@PathParam("clusterId") String clusterId) {
+  public Response getAllInstances(@PathParam("clusterId") String clusterId,
+      @DefaultValue("getAllInstances") @QueryParam("command") String command) {
+    // Get the command. If not provided, the default would be "getAllInstances"
+    Command cmd;
+    try {
+      cmd = Command.valueOf(command);
+    } catch (Exception e) {
+      return badRequest("Invalid command : " + command);
+    }
+
     HelixDataAccessor accessor = getDataAccssor(clusterId);
     List<String> instances = accessor.getChildNames(accessor.keyBuilder().instanceConfigs());
-
     if (instances == null) {
       return notFound();
     }
 
-    ObjectNode root = JsonNodeFactory.instance.objectNode();
-    root.put(Properties.id.name(), JsonNodeFactory.instance.textNode(clusterId));
+    switch (cmd) {
+    case getAllInstances:
+      ObjectNode root = JsonNodeFactory.instance.objectNode();
+      root.put(Properties.id.name(), JsonNodeFactory.instance.textNode(clusterId));
 
-    ArrayNode instancesNode = root.putArray(InstancesAccessor.InstancesProperties.instances.name());
-    instancesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(instances));
-    ArrayNode onlineNode = root.putArray(InstancesAccessor.InstancesProperties.online.name());
-    ArrayNode disabledNode = root.putArray(InstancesAccessor.InstancesProperties.disabled.name());
+      ArrayNode instancesNode =
+          root.putArray(InstancesAccessor.InstancesProperties.instances.name());
+      instancesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(instances));
+      ArrayNode onlineNode = root.putArray(InstancesAccessor.InstancesProperties.online.name());
+      ArrayNode disabledNode = root.putArray(InstancesAccessor.InstancesProperties.disabled.name());
 
-    List<String> liveInstances = accessor.getChildNames(accessor.keyBuilder().liveInstances());
-    ClusterConfig clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
+      List<String> liveInstances = accessor.getChildNames(accessor.keyBuilder().liveInstances());
+      ClusterConfig clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
 
-    for (String instanceName : instances) {
-      InstanceConfig instanceConfig =
-          accessor.getProperty(accessor.keyBuilder().instanceConfig(instanceName));
-      if (instanceConfig != null) {
-        if (!instanceConfig.getInstanceEnabled() || (clusterConfig.getDisabledInstances() != null
-            && clusterConfig.getDisabledInstances().containsKey(instanceName))) {
-          disabledNode.add(JsonNodeFactory.instance.textNode(instanceName));
-        }
+      for (String instanceName : instances) {
+        InstanceConfig instanceConfig =
+            accessor.getProperty(accessor.keyBuilder().instanceConfig(instanceName));
+        if (instanceConfig != null) {
+          if (!instanceConfig.getInstanceEnabled() || (clusterConfig.getDisabledInstances() != null
+              && clusterConfig.getDisabledInstances().containsKey(instanceName))) {
+            disabledNode.add(JsonNodeFactory.instance.textNode(instanceName));
+          }
 
-        if (liveInstances.contains(instanceName)){
-          onlineNode.add(JsonNodeFactory.instance.textNode(instanceName));
+          if (liveInstances.contains(instanceName)) {
+            onlineNode.add(JsonNodeFactory.instance.textNode(instanceName));
+          }
         }
       }
+      return JSONRepresentation(root);
+    case validateWeight:
+      // Validate all instances for WAGED rebalance
+      HelixAdmin admin = getHelixAdmin();
+      Map<String, Boolean> validationResultMap;
+      try {
+        validationResultMap = admin.validateInstancesForWagedRebalance(clusterId, instances);
+      } catch (HelixException e) {
+        return badRequest(e.getMessage());
+      }
+      return JSONRepresentation(validationResultMap);
+    default:
+      _logger.error("Unsupported command :" + command);
+      return badRequest("Unsupported command :" + command);
     }
-
-    return JSONRepresentation(root);
   }
 
   @POST
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
index a6dfb9a..368c730 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
@@ -20,9 +20,12 @@
  */
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
@@ -81,16 +84,41 @@
 
   @GET
   public Response getInstanceById(@PathParam("clusterId") String clusterId,
-      @PathParam("instanceName") String instanceName) throws IOException {
-    ObjectMapper objectMapper = new ObjectMapper();
-    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
-    // TODO reduce GC by dependency injection
-    InstanceService instanceService =
-        new InstanceServiceImpl(new HelixDataAccessorWrapper((ZKHelixDataAccessor) dataAccessor), getConfigAccessor());
-    InstanceInfo instanceInfo = instanceService.getInstanceInfo(clusterId, instanceName,
-        InstanceService.HealthCheck.STARTED_AND_HEALTH_CHECK_LIST);
+      @PathParam("instanceName") String instanceName,
+      @DefaultValue("getInstance") @QueryParam("command") String command) throws IOException {
+    // Get the command. If not provided, the default would be "getInstance"
+    Command cmd;
+    try {
+      cmd = Command.valueOf(command);
+    } catch (Exception e) {
+      return badRequest("Invalid command : " + command);
+    }
 
-    return OK(objectMapper.writeValueAsString(instanceInfo));
+    switch (cmd) {
+    case getInstance:
+      ObjectMapper objectMapper = new ObjectMapper();
+      HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
+      // TODO reduce GC by dependency injection
+      InstanceService instanceService = new InstanceServiceImpl(
+          new HelixDataAccessorWrapper((ZKHelixDataAccessor) dataAccessor), getConfigAccessor());
+      InstanceInfo instanceInfo = instanceService.getInstanceInfo(clusterId, instanceName,
+          InstanceService.HealthCheck.STARTED_AND_HEALTH_CHECK_LIST);
+      return OK(objectMapper.writeValueAsString(instanceInfo));
+    case validateWeight:
+      // Validates instanceConfig for WAGED rebalance
+      HelixAdmin admin = getHelixAdmin();
+      Map<String, Boolean> validationResultMap;
+      try {
+        validationResultMap = admin.validateInstancesForWagedRebalance(clusterId,
+            Collections.singletonList(instanceName));
+      } catch (HelixException e) {
+        return badRequest(e.getMessage());
+      }
+      return JSONRepresentation(validationResultMap);
+    default:
+      LOG.error("Unsupported command :" + command);
+      return badRequest("Unsupported command :" + command);
+    }
   }
 
   @POST
@@ -345,7 +373,8 @@
     return notFound();
   }
 
-  @GET @Path("errors")
+  @GET
+  @Path("errors")
   public Response getErrorsOnInstance(@PathParam("clusterId") String clusterId,
       @PathParam("instanceName") String instanceName) throws IOException {
     HelixDataAccessor accessor = getDataAccssor(clusterId);
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
index 47f7ec9..c41d024 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
@@ -49,6 +49,7 @@
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.codehaus.jackson.type.TypeReference;
 import org.codehaus.jackson.node.ArrayNode;
 import org.codehaus.jackson.node.JsonNodeFactory;
 import org.codehaus.jackson.node.ObjectNode;
@@ -161,33 +162,56 @@
   @GET
   @Path("{resourceName}")
   public Response getResource(@PathParam("clusterId") String clusterId,
-      @PathParam("resourceName") String resourceName) {
+      @PathParam("resourceName") String resourceName,
+      @DefaultValue("getResource") @QueryParam("command") String command) {
+    // Get the command. If not provided, the default would be "getResource"
+    Command cmd;
+    try {
+      cmd = Command.valueOf(command);
+    } catch (Exception e) {
+      return badRequest("Invalid command : " + command);
+    }
     ConfigAccessor accessor = getConfigAccessor();
     HelixAdmin admin = getHelixAdmin();
 
-    ResourceConfig resourceConfig = accessor.getResourceConfig(clusterId, resourceName);
-    IdealState idealState = admin.getResourceIdealState(clusterId, resourceName);
-    ExternalView externalView = admin.getResourceExternalView(clusterId, resourceName);
+    switch (cmd) {
+    case getResource:
+      ResourceConfig resourceConfig = accessor.getResourceConfig(clusterId, resourceName);
+      IdealState idealState = admin.getResourceIdealState(clusterId, resourceName);
+      ExternalView externalView = admin.getResourceExternalView(clusterId, resourceName);
 
-    Map<String, ZNRecord> resourceMap = new HashMap<>();
-    if (idealState != null) {
-      resourceMap.put(ResourceProperties.idealState.name(), idealState.getRecord());
-    } else {
-      return notFound();
+      Map<String, ZNRecord> resourceMap = new HashMap<>();
+      if (idealState != null) {
+        resourceMap.put(ResourceProperties.idealState.name(), idealState.getRecord());
+      } else {
+        return notFound();
+      }
+
+      resourceMap.put(ResourceProperties.resourceConfig.name(), null);
+      resourceMap.put(ResourceProperties.externalView.name(), null);
+
+      if (resourceConfig != null) {
+        resourceMap.put(ResourceProperties.resourceConfig.name(), resourceConfig.getRecord());
+      }
+
+      if (externalView != null) {
+        resourceMap.put(ResourceProperties.externalView.name(), externalView.getRecord());
+      }
+      return JSONRepresentation(resourceMap);
+    case validateWeight:
+      // Validate ResourceConfig for WAGED rebalance
+      Map<String, Boolean> validationResultMap;
+      try {
+        validationResultMap = admin.validateResourcesForWagedRebalance(clusterId,
+            Collections.singletonList(resourceName));
+      } catch (HelixException e) {
+        return badRequest(e.getMessage());
+      }
+      return JSONRepresentation(validationResultMap);
+    default:
+      _logger.error("Unsupported command :" + command);
+      return badRequest("Unsupported command :" + command);
     }
-
-    resourceMap.put(ResourceProperties.resourceConfig.name(), null);
-    resourceMap.put(ResourceProperties.externalView.name(), null);
-
-    if (resourceConfig != null) {
-      resourceMap.put(ResourceProperties.resourceConfig.name(), resourceConfig.getRecord());
-    }
-
-    if (externalView != null) {
-      resourceMap.put(ResourceProperties.externalView.name(), externalView.getRecord());
-    }
-
-    return JSONRepresentation(resourceMap);
   }
 
   @PUT
@@ -200,32 +224,81 @@
       @DefaultValue("DEFAULT") @QueryParam("rebalanceStrategy") String rebalanceStrategy,
       @DefaultValue("0") @QueryParam("bucketSize") int bucketSize,
       @DefaultValue("-1") @QueryParam("maxPartitionsPerInstance") int maxPartitionsPerInstance,
-      String content) {
-
-    HelixAdmin admin = getHelixAdmin();
-
+      @DefaultValue("addResource") @QueryParam("command") String command, String content) {
+    // Get the command. If not provided, the default would be "addResource"
+    Command cmd;
     try {
-      if (content.length() != 0) {
-        ZNRecord record;
-        try {
-          record = toZNRecord(content);
-        } catch (IOException e) {
-          _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e);
-          return badRequest("Input is not a vaild ZNRecord!");
-        }
+      cmd = Command.valueOf(command);
+    } catch (Exception e) {
+      return badRequest("Invalid command : " + command);
+    }
+    HelixAdmin admin = getHelixAdmin();
+    try {
+      switch (cmd) {
+      case addResource:
+        if (content.length() != 0) {
+          ZNRecord record;
+          try {
+            record = toZNRecord(content);
+          } catch (IOException e) {
+            _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e);
+            return badRequest("Input is not a valid ZNRecord!");
+          }
 
-        if (record.getSimpleFields() != null) {
-          admin.addResource(clusterId, resourceName, new IdealState(record));
+          if (record.getSimpleFields() != null) {
+            admin.addResource(clusterId, resourceName, new IdealState(record));
+          }
+        } else {
+          admin.addResource(clusterId, resourceName, numPartitions, stateModelRef, rebalancerMode,
+              rebalanceStrategy, bucketSize, maxPartitionsPerInstance);
         }
-      } else {
-        admin.addResource(clusterId, resourceName, numPartitions, stateModelRef, rebalancerMode,
-            rebalanceStrategy, bucketSize, maxPartitionsPerInstance);
+        break;
+      case addWagedResource:
+        // Check if content is valid
+        if (content == null || content.length() == 0) {
+          _logger.error("Input is null or empty!");
+          return badRequest("Input is null or empty!");
+        }
+        Map<String, ZNRecord> input;
+        // Content must supply both IdealState and ResourceConfig
+        try {
+          TypeReference<Map<String, ZNRecord>> typeRef =
+              new TypeReference<Map<String, ZNRecord>>() {
+              };
+          input = OBJECT_MAPPER.readValue(content, typeRef);
+        } catch (IOException e) {
+          _logger.error("Failed to deserialize user's input {}, Exception: {}", content, e);
+          return badRequest("Input is not a valid map of String-ZNRecord pairs!");
+        }
+        // Check if the map contains both IdealState and ResourceConfig
+        ZNRecord idealStateRecord =
+            input.get(ResourceAccessor.ResourceProperties.idealState.name());
+        ZNRecord resourceConfigRecord =
+            input.get(ResourceAccessor.ResourceProperties.resourceConfig.name());
+
+        if (idealStateRecord == null || resourceConfigRecord == null) {
+          _logger.error("Input does not contain both IdealState and ResourceConfig!");
+          return badRequest("Input does not contain both IdealState and ResourceConfig!");
+        }
+        // Add using HelixAdmin API
+        try {
+          admin.addResourceWithWeight(clusterId, new IdealState(idealStateRecord),
+              new ResourceConfig(resourceConfigRecord));
+        } catch (HelixException e) {
+          String errMsg = String.format("Failed to add resource %s with weight in cluster %s!",
+              idealStateRecord.getId(), clusterId);
+          _logger.error(errMsg, e);
+          return badRequest(errMsg);
+        }
+        break;
+      default:
+        _logger.error("Unsupported command :" + command);
+        return badRequest("Unsupported command :" + command);
       }
     } catch (Exception e) {
       _logger.error("Error in adding a resource: " + resourceName, e);
       return serverError(e);
     }
-
     return OK();
   }
 
@@ -259,6 +332,13 @@
         keyPrefix = keyPrefix.length() == 0 ? resourceName : keyPrefix;
         admin.rebalance(clusterId, resourceName, replicas, keyPrefix, group);
         break;
+      case enableWagedRebalance:
+        try {
+          admin.enableWagedRebalance(clusterId, Collections.singletonList(resourceName));
+        } catch (HelixException e) {
+          return badRequest(e.getMessage());
+        }
+        break;
       default:
         _logger.error("Unsupported command :" + command);
         return badRequest("Unsupported command :" + command);
@@ -318,7 +398,7 @@
       record = toZNRecord(content);
     } catch (IOException e) {
       _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e);
-      return badRequest("Input is not a vaild ZNRecord!");
+      return badRequest("Input is not a valid ZNRecord!");
     }
     ResourceConfig resourceConfig = new ResourceConfig(record);
     ConfigAccessor configAccessor = getConfigAccessor();
@@ -380,7 +460,7 @@
       record = toZNRecord(content);
     } catch (IOException e) {
       _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e);
-      return badRequest("Input is not a vaild ZNRecord!");
+      return badRequest("Input is not a valid ZNRecord!");
     }
     IdealState idealState = new IdealState(record);
     HelixAdmin helixAdmin = getHelixAdmin();
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index 11a450d..e3b33fb 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -471,8 +471,9 @@
     final Response response = webTarget.request().get();
     Assert.assertEquals(response.getStatus(), expectedReturnStatus);
 
-    // NOT_FOUND will throw text based html
-    if (expectedReturnStatus != Response.Status.NOT_FOUND.getStatusCode()) {
+    // NOT_FOUND and BAD_REQUEST will throw text based html
+    if (expectedReturnStatus != Response.Status.NOT_FOUND.getStatusCode()
+        && expectedReturnStatus != Response.Status.BAD_REQUEST.getStatusCode()) {
       Assert.assertEquals(response.getMediaType().getType(), "application");
     } else {
       Assert.assertEquals(response.getMediaType().getType(), "text");
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
index b8d8018..05525c0 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
@@ -39,6 +39,7 @@
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
 import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.integration.manager.ClusterDistributedController;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZKUtil;
@@ -563,6 +564,18 @@
     System.out.println("End test :" + TestHelper.getTestMethodName());
   }
 
+  @Test(dependsOnMethods = "testActivateSuperCluster")
+  public void testEnableWagedRebalanceForAllResources() {
+    String cluster = "TestCluster_2";
+    post("clusters/" + cluster, ImmutableMap.of("command", "enableWagedRebalanceForAllResources"),
+        Entity.entity("", MediaType.APPLICATION_JSON_TYPE), Response.Status.OK.getStatusCode());
+    for (String resource : _gSetupTool.getClusterManagementTool().getResourcesInCluster(cluster)) {
+      IdealState idealState =
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(cluster, resource);
+      Assert.assertEquals(idealState.getRebalancerClassName(), WagedRebalancer.class.getName());
+    }
+  }
+
   private ClusterConfig getClusterConfigFromRest(String cluster) throws IOException {
     String body = get("clusters/" + cluster + "/configs", null, Response.Status.OK.getStatusCode(), true);
 
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
index da8f911..bc13af2 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
@@ -1,6 +1,26 @@
 package org.apache.helix.rest.server;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -152,6 +172,55 @@
     System.out.println("End test :" + TestHelper.getTestMethodName());
   }
 
+  @Test(dependsOnMethods = "testGetAllInstances")
+  public void testValidateWeightForAllInstances() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+    // Empty out ClusterConfig's weight key setting for testing
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.getRecord().setListField(
+        ClusterConfig.ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(), new ArrayList<>());
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+    // Issue a validate call
+    String body = new JerseyUriRequestBuilder("clusters/{}/instances?command=validateWeight")
+        .isBodyReturnExpected(true).format(CLUSTER_NAME).get(this);
+
+    JsonNode node = OBJECT_MAPPER.readTree(body);
+    // Must have the results saying they are all valid (true) because there's no capacity keys set
+    // in ClusterConfig
+    node.iterator().forEachRemaining(child -> Assert.assertTrue(child.booleanValue()));
+
+    clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setInstanceCapacityKeys(Arrays.asList("FOO", "BAR"));
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    body = new JerseyUriRequestBuilder("clusters/{}/instances?command=validateWeight")
+        .isBodyReturnExpected(true).format(CLUSTER_NAME)
+        .expectedReturnStatusCode(Response.Status.BAD_REQUEST.getStatusCode()).get(this);
+    node = OBJECT_MAPPER.readTree(body);
+    // Since instances do not have weight-related configs, the result should return error
+    Assert.assertTrue(node.has("error"));
+
+    // Now set weight-related configs in InstanceConfigs
+    List<String> instances =
+        _gSetupTool.getClusterManagementTool().getInstancesInCluster(CLUSTER_NAME);
+    for (String instance : instances) {
+      InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, instance);
+      instanceConfig.setInstanceCapacityMap(ImmutableMap.of("FOO", 1000, "BAR", 1000));
+      _configAccessor.setInstanceConfig(CLUSTER_NAME, instance, instanceConfig);
+    }
+
+    body = new JerseyUriRequestBuilder("clusters/{}/instances?command=validateWeight")
+        .isBodyReturnExpected(true).format(CLUSTER_NAME)
+        .expectedReturnStatusCode(Response.Status.OK.getStatusCode()).get(this);
+    node = OBJECT_MAPPER.readTree(body);
+    // Must have the results saying they are all valid (true) because capacity keys are set
+    // in ClusterConfig
+    node.iterator().forEachRemaining(child -> Assert.assertTrue(child.booleanValue()));
+
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
   private Set<String> getStringSet(JsonNode jsonNode, String key) {
     Set<String> result = new HashSet<>();
     jsonNode.withArray(key).forEach(s -> result.add(s.textValue()));
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
index cc56ef2..aa46c00 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
@@ -37,6 +37,7 @@
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Message;
 import org.apache.helix.rest.server.resources.AbstractResource;
@@ -398,4 +399,51 @@
         .format(CLUSTER_NAME, instanceName).post(this, entity);
     System.out.println("End test :" + TestHelper.getTestMethodName());
   }
+
+  /**
+   * Check that validateWeightForInstance() works by
+   * 1. First call validate -> We should get "true" because nothing is set in ClusterConfig.
+   * 2. Define keys in ClusterConfig and call validate -> We should get BadRequest.
+   * 3. Define weight configs in InstanceConfig and call validate -> We should get OK with "true".
+   */
+  @Test(dependsOnMethods = "checkUpdateFails")
+  public void testValidateWeightForInstance() throws IOException {
+    // Get one instance in the cluster
+    String instance = _gSetupTool.getClusterManagementTool().getInstancesInCluster(CLUSTER_NAME)
+        .iterator().next();
+
+    // Issue a validate call
+    String body = new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=validateWeight")
+        .isBodyReturnExpected(true).format(CLUSTER_NAME, instance).get(this);
+
+    JsonNode node = OBJECT_MAPPER.readTree(body);
+    // Must have the result saying (true) because there's no capacity keys set
+    // in ClusterConfig
+    node.iterator().forEachRemaining(child -> Assert.assertTrue(child.getBooleanValue()));
+
+    // Define keys in ClusterConfig
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setInstanceCapacityKeys(Arrays.asList("FOO", "BAR"));
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    body = new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=validateWeight")
+        .isBodyReturnExpected(true).format(CLUSTER_NAME, instance)
+        .expectedReturnStatusCode(Response.Status.BAD_REQUEST.getStatusCode()).get(this);
+    node = OBJECT_MAPPER.readTree(body);
+    // Since instance does not have weight-related configs, the result should return error
+    Assert.assertTrue(node.has("error"));
+
+    // Now set weight-related config in InstanceConfig
+    InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, instance);
+    instanceConfig.setInstanceCapacityMap(ImmutableMap.of("FOO", 1000, "BAR", 1000));
+    _configAccessor.setInstanceConfig(CLUSTER_NAME, instance, instanceConfig);
+
+    body = new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=validateWeight")
+        .isBodyReturnExpected(true).format(CLUSTER_NAME, instance)
+        .expectedReturnStatusCode(Response.Status.OK.getStatusCode()).get(this);
+    node = OBJECT_MAPPER.readTree(body);
+    // Must have the results saying they are all valid (true) because capacity keys are set
+    // in ClusterConfig
+    node.iterator().forEachRemaining(child -> Assert.assertTrue(child.getBooleanValue()));
+  }
 }
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java
index 62e68c5..f865674 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java
@@ -41,8 +41,11 @@
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.FullAutoModeISBuilder;
 import org.apache.helix.rest.server.resources.helix.ResourceAccessor;
@@ -429,10 +432,30 @@
   }
 
   /**
+   * Test "enableWagedRebalance" command of updateResource.
+   */
+  @Test(dependsOnMethods = "updateResourceIdealState")
+  public void testEnableWagedRebalance() {
+    IdealState idealState =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, RESOURCE_NAME);
+    Assert.assertNotSame(idealState.getRebalancerClassName(), WagedRebalancer.class.getName());
+
+    // Enable waged rebalance, which should change the rebalancer class name
+    Entity entity = Entity.entity(null, MediaType.APPLICATION_JSON_TYPE);
+    post("clusters/" + CLUSTER_NAME + "/resources/" + RESOURCE_NAME,
+        Collections.singletonMap("command", "enableWagedRebalance"), entity,
+        Response.Status.OK.getStatusCode());
+
+    idealState =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, RESOURCE_NAME);
+    Assert.assertEquals(idealState.getRebalancerClassName(), WagedRebalancer.class.getName());
+  }
+
+  /**
    * Test "delete" command of updateResourceIdealState.
    * @throws Exception
    */
-  @Test(dependsOnMethods = "updateResourceIdealState")
+  @Test(dependsOnMethods = "testEnableWagedRebalance")
   public void deleteFromResourceIdealState() throws Exception {
     String zkPath = PropertyPathBuilder.idealState(CLUSTER_NAME, RESOURCE_NAME);
     ZNRecord record = new ZNRecord(RESOURCE_NAME);
@@ -470,6 +493,98 @@
     System.out.println("End test :" + TestHelper.getTestMethodName());
   }
 
+  @Test(dependsOnMethods = "deleteFromResourceIdealState")
+  public void testAddResourceWithWeight() throws IOException {
+    // Test case 1: Add a valid resource with valid weights
+    // Create a resource with IdealState and ResourceConfig
+    String wagedResourceName = "newWagedResource";
+
+    // Create an IdealState on full-auto with 1 partition
+    IdealState idealState = new IdealState(wagedResourceName);
+    idealState.getRecord().getSimpleFields().putAll(_gSetupTool.getClusterManagementTool()
+        .getResourceIdealState(CLUSTER_NAME, RESOURCE_NAME).getRecord().getSimpleFields());
+    idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+    idealState.setRebalancerClassName(WagedRebalancer.class.getName());
+    idealState.setNumPartitions(1); // 1 partition for convenience of testing
+
+    // Create a ResourceConfig with FOO and BAR at 100 respectively
+    ResourceConfig resourceConfig = new ResourceConfig(wagedResourceName);
+    Map<String, Map<String, Integer>> partitionCapacityMap = new HashMap<>();
+    Map<String, Integer> partitionCapacity = ImmutableMap.of("FOO", 100, "BAR", 100);
+    partitionCapacityMap.put(wagedResourceName + "_0", partitionCapacity);
+    // Also add a default key
+    partitionCapacityMap.put(ResourceConfig.DEFAULT_PARTITION_KEY, partitionCapacity);
+    resourceConfig.setPartitionCapacityMap(partitionCapacityMap);
+
+    // Put both IdealState and ResourceConfig into a map as required
+    Map<String, ZNRecord> inputMap = ImmutableMap.of(
+        ResourceAccessor.ResourceProperties.idealState.name(), idealState.getRecord(),
+        ResourceAccessor.ResourceProperties.resourceConfig.name(), resourceConfig.getRecord());
+
+    // Create an entity using the inputMap
+    Entity entity =
+        Entity.entity(OBJECT_MAPPER.writeValueAsString(inputMap), MediaType.APPLICATION_JSON_TYPE);
+
+    // Make a HTTP call to the REST endpoint
+    put("clusters/" + CLUSTER_NAME + "/resources/" + wagedResourceName,
+        ImmutableMap.of("command", "addWagedResource"), entity, Response.Status.OK.getStatusCode());
+
+    // Test case 2: Add a resource with invalid weights
+    String invalidResourceName = "invalidWagedResource";
+    ResourceConfig invalidWeightResourceConfig = new ResourceConfig(invalidResourceName);
+    IdealState invalidWeightIdealState = new IdealState(invalidResourceName);
+
+    Map<String, ZNRecord> invalidInputMap = ImmutableMap.of(
+        ResourceAccessor.ResourceProperties.idealState.name(), invalidWeightIdealState.getRecord(),
+        ResourceAccessor.ResourceProperties.resourceConfig.name(),
+        invalidWeightResourceConfig.getRecord());
+
+    // Create an entity using invalidInputMap
+    entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(invalidInputMap),
+        MediaType.APPLICATION_JSON_TYPE);
+
+    // Make a HTTP call to the REST endpoint
+    put("clusters/" + CLUSTER_NAME + "/resources/" + invalidResourceName,
+        ImmutableMap.of("command", "addWagedResource"), entity,
+        Response.Status.BAD_REQUEST.getStatusCode());
+  }
+
+  @Test(dependsOnMethods = "testAddResourceWithWeight")
+  public void testValidateResource() throws IOException {
+    // Define weight keys in ClusterConfig
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setInstanceCapacityKeys(Arrays.asList("FOO", "BAR"));
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    // Remove all weight configs in InstanceConfig for testing
+    for (String instance : _instancesMap.get(CLUSTER_NAME)) {
+      InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, instance);
+      instanceConfig.setInstanceCapacityMap(Collections.emptyMap());
+      _configAccessor.setInstanceConfig(CLUSTER_NAME, instance, instanceConfig);
+    }
+
+    // Validate the resource added in testAddResourceWithWeight()
+    String resourceToValidate = "newWagedResource";
+    // This should fail because none of the instances have weight configured
+    get("clusters/" + CLUSTER_NAME + "/resources/" + resourceToValidate,
+        ImmutableMap.of("command", "validateWeight"), Response.Status.BAD_REQUEST.getStatusCode(),
+        true);
+
+    // Add back weight configurations to all instance configs
+    Map<String, Integer> instanceCapacityMap = ImmutableMap.of("FOO", 1000, "BAR", 1000);
+    for (String instance : _instancesMap.get(CLUSTER_NAME)) {
+      InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, instance);
+      instanceConfig.setInstanceCapacityMap(instanceCapacityMap);
+      _configAccessor.setInstanceConfig(CLUSTER_NAME, instance, instanceConfig);
+    }
+
+    // Now try validating again - it should go through and return a 200
+    String body = get("clusters/" + CLUSTER_NAME + "/resources/" + resourceToValidate,
+        ImmutableMap.of("command", "validateWeight"), Response.Status.OK.getStatusCode(), true);
+    JsonNode node = OBJECT_MAPPER.readTree(body);
+    Assert.assertEquals(node.get(resourceToValidate).toString(), "true");
+  }
+
   /**
    * Creates a setup where the health API can be tested.
    * @param clusterName
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/util/JerseyUriRequestBuilder.java b/helix-rest/src/test/java/org/apache/helix/rest/server/util/JerseyUriRequestBuilder.java
index 359999e..4552240 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/util/JerseyUriRequestBuilder.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/util/JerseyUriRequestBuilder.java
@@ -76,7 +76,8 @@
     Assert.assertEquals(response.getStatus(), _expectedStatusCode);
 
     // NOT_FOUND will throw text based html
-    if (_expectedStatusCode != Response.Status.NOT_FOUND.getStatusCode()) {
+    if (_expectedStatusCode != Response.Status.NOT_FOUND.getStatusCode()
+        && _expectedStatusCode != Response.Status.BAD_REQUEST.getStatusCode()) {
       Assert.assertEquals(response.getMediaType().getType(), "application");
     } else {
       Assert.assertEquals(response.getMediaType().getType(), "text");