[apache/helix] -- Add SetPartitionToError for participants to self annotate a node to ERROR state (#2792)

Co-authored-by: Charanya Sudharsanan <csudhars@csudhars-mn2.linkedin.biz>

What: An API endpoint that validates the incoming request and sends a state transition message to set one or more partitions from any current state to the ERROR state.

Why: Currently, participants are unable to explicitly set a partition to the ERROR state when a replica appears to be stuck in a specific current state. The only way a replica can be set to ERROR is from within a state model. Having an endpoint that allows this lets clients then call the resetPartition endpoint to set the replica back to the INIT state and recover it. resetPartition works only on partitions in the ERROR state.
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index d2e0c26..84a7154 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -422,6 +422,18 @@
   ClusterManagementMode getClusterManagementMode(String clusterName);
 
   /**
+   * Set a list of partitions for an instance to ERROR state from any state.
+   * The partitions could be in any state and setPartitionsToError will bring them to ERROR
+   * state. ANY to ERROR state transition is required for this.
+   * @param clusterName name of the cluster that contains the instance
+   * @param instanceName name of the instance hosting the partitions
+   * @param resourceName name of the resource the partitions belong to
+   * @param partitionNames names of the partitions to set to ERROR state
+   */
+  void setPartitionsToError(String clusterName, String instanceName, String resourceName,
+      List<String> partitionNames);
+
+  /**
    * Reset a list of partitions in error state for an instance
    * The partitions are assume to be in error state and reset will bring them from error
    * to initial state. An error to initial state transition is required for reset.
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index c7fe086..8c873b4 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -1035,6 +1035,136 @@
         : new ClusterManagementMode(status.getManagementMode(), status.getManagementModeStatus());
   }
 
+  @Override
+  public void setPartitionsToError(String clusterName, String instanceName, String resourceName,
+      List<String> partitionNames) {
+    logger.info("Set partitions {} for resource {} on instance {} in cluster {} to ERROR state.",
+        partitionNames == null ? "NULL" : HelixUtil.serializeByComma(partitionNames), resourceName,
+        instanceName, clusterName);
+    sendStateTransitionMessage(clusterName, instanceName, resourceName, partitionNames,
+        StateTransitionType.SET_TO_ERROR);
+  }
+
+  private void sendStateTransitionMessage(String clusterName, String instanceName,
+      String resourceName, List<String> partitionNames, StateTransitionType stateTransitionType) {
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    // check the instance is alive
+    LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
+    if (liveInstance == null) {
+      // check if the instance exists in the cluster
+      String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
+      throw new HelixException(String.format(
+          (_zkClient.exists(instanceConfigPath) ? SetPartitionFailureReason.INSTANCE_NOT_ALIVE
+              : SetPartitionFailureReason.INSTANCE_NON_EXISTENT).getMessage(resourceName,
+                  partitionNames, instanceName, instanceName, clusterName, stateTransitionType)));
+    }
+
+    // check resource exists in ideal state
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName));
+    if (idealState == null) {
+      throw new HelixException(
+          String.format(SetPartitionFailureReason.RESOURCE_NON_EXISTENT.getMessage(resourceName,
+              partitionNames, instanceName, resourceName, clusterName, stateTransitionType)));
+    }
+
+    // check partition exists in resource
+    Set<String> partitionsNames = new HashSet<String>(partitionNames);
+    Set<String> partitions = (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED)
+        ? idealState.getRecord().getMapFields().keySet()
+        : idealState.getRecord().getListFields().keySet();
+    if (!partitions.containsAll(partitionsNames)) {
+      throw new HelixException(
+          String.format(SetPartitionFailureReason.PARTITION_NON_EXISTENT.getMessage(resourceName,
+              partitionNames, instanceName, partitionNames.toString(), clusterName, stateTransitionType)));
+    }
+
+    // for RESET only: check that every requested partition is currently in ERROR state
+    String sessionId = liveInstance.getEphemeralOwner();
+    CurrentState curState =
+        accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, resourceName));
+    if (stateTransitionType.equals(StateTransitionType.RESET)) {
+      for (String partitionName : partitionNames) {
+        if (!curState.getState(partitionName).equals(HelixDefinedState.ERROR.toString())) {
+          throw new HelixException(String.format(
+              SetPartitionFailureReason.PARTITION_NOT_ERROR.getMessage(resourceName, partitionNames,
+                  instanceName, partitionNames.toString(), clusterName, stateTransitionType)));
+        }
+      }
+    }
+
+    // check stateModelDef exists
+    String stateModelDef = idealState.getStateModelDefRef();
+    StateModelDefinition stateModel = accessor.getProperty(keyBuilder.stateModelDef(stateModelDef));
+    if (stateModel == null) {
+      throw new HelixException(
+          String.format(SetPartitionFailureReason.STATE_MODEL_NON_EXISTENT.getMessage(resourceName,
+              partitionNames, instanceName, stateModelDef, clusterName, stateTransitionType)));
+    }
+
+    // check that no pending state transition message exists for the requested partitions
+    List<Message> messages = accessor.getChildValues(keyBuilder.messages(instanceName), true);
+    for (Message message : messages) {
+      if (!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType())
+          || !sessionId.equals(message.getTgtSessionId())
+          || !resourceName.equals(message.getResourceName())
+          || !partitionsNames.contains(message.getPartitionName())) {
+        continue;
+      }
+
+      throw new HelixException(String.format(
+          "Can't %s state for %s.%s on %s, because a pending message %s exists for resource %s",
+          stateTransitionType.name(), resourceName, partitionNames, instanceName, message,
+          message.getResourceName()));
+    }
+
+    String adminName = null;
+    try {
+      adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN";
+    } catch (UnknownHostException e) {
+      logger.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e);
+      adminName = "UNKNOWN";
+    }
+
+    List<Message> stateTransitionMessages = new ArrayList<Message>();
+    List<PropertyKey> messageKeys = new ArrayList<PropertyKey>();
+    for (String partitionName : partitionNames) {
+      String msgId = UUID.randomUUID().toString();
+      Message message = new Message(MessageType.STATE_TRANSITION, msgId);
+      message.setSrcName(adminName);
+      message.setTgtName(instanceName);
+      message.setMsgState(MessageState.NEW);
+      message.setPartitionName(partitionName);
+      message.setResourceName(resourceName);
+      message.setTgtSessionId(sessionId);
+      message.setStateModelDef(stateModelDef);
+      message.setStateModelFactoryName(idealState.getStateModelFactoryName());
+      // RESET: send an ERROR -> initialState message
+      // SET_TO_ERROR: send a * -> ERROR message
+      if (stateTransitionType.equals(StateTransitionType.RESET)) {
+        message.setFromState(HelixDefinedState.ERROR.toString());
+        message.setToState(stateModel.getInitialState());
+      }
+      if (stateTransitionType.equals(StateTransitionType.SET_TO_ERROR)) {
+        message.setFromState("*");
+        message.setToState(HelixDefinedState.ERROR.toString());
+      }
+      if (idealState.getResourceGroupName() != null) {
+        message.setResourceGroupName(idealState.getResourceGroupName());
+      }
+      if (idealState.getInstanceGroupTag() != null) {
+        message.setResourceTag(idealState.getInstanceGroupTag());
+      }
+
+      stateTransitionMessages.add(message);
+      messageKeys.add(keyBuilder.message(instanceName, message.getId()));
+    }
+
+    accessor.setChildren(messageKeys, stateTransitionMessages);
+  }
+
   private void enableClusterPauseMode(String clusterName, boolean cancelPendingST, String reason) {
     String hostname = NetworkUtil.getLocalhostName();
     logger.info(
@@ -1180,7 +1310,7 @@
     }
   }
 
-  private enum ResetPartitionFailureReason {
+  private enum SetPartitionFailureReason {
     INSTANCE_NOT_ALIVE("%s is not alive in cluster %s"),
     INSTANCE_NON_EXISTENT("%s does not exist in cluster %s"),
     RESOURCE_NON_EXISTENT("resource %s is not added to cluster %s"),
@@ -1190,129 +1320,33 @@
 
     private String message;
 
-    ResetPartitionFailureReason(String message) {
+    SetPartitionFailureReason(String message) {
       this.message = message;
     }
 
     public String getMessage(String resourceName, List<String> partitionNames, String instanceName,
-        String errorStateEntity, String clusterName) {
-      return String.format("Can't reset state for %s.%s on %s, because " + message, resourceName,
-          partitionNames, instanceName, errorStateEntity, clusterName);
+        String errorStateEntity, String clusterName, StateTransitionType stateTransitionType) {
+      return String.format("Can't %s state for %s.%s on %s, because " + message,
+          stateTransitionType.name(), resourceName, partitionNames, instanceName, errorStateEntity,
+          clusterName);
     }
   }
 
+  private enum StateTransitionType {
+    // ERROR -> the state model's initial state; used by resetPartition.
+    RESET,
+    // any state ("*") -> ERROR; used by setPartitionsToError.
+    SET_TO_ERROR,
+    // Unknown StateTransitionType
+    UNDEFINED
+  }
   @Override
   public void resetPartition(String clusterName, String instanceName, String resourceName,
       List<String> partitionNames) {
     logger.info("Reset partitions {} for resource {} on instance {} in cluster {}.",
         partitionNames == null ? "NULL" : HelixUtil.serializeByComma(partitionNames), resourceName,
         instanceName, clusterName);
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-
-    // check the instance is alive
-    LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
-    if (liveInstance == null) {
-      // check if the instance exists in the cluster
-      String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
-      throw new HelixException(String.format(
-          (_zkClient.exists(instanceConfigPath) ? ResetPartitionFailureReason.INSTANCE_NOT_ALIVE
-              : ResetPartitionFailureReason.INSTANCE_NON_EXISTENT)
-              .getMessage(resourceName, partitionNames, instanceName, instanceName, clusterName)));
-    }
-
-    // check resource group exists
-    IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName));
-    if (idealState == null) {
-      throw new HelixException(String.format(ResetPartitionFailureReason.RESOURCE_NON_EXISTENT
-          .getMessage(resourceName, partitionNames, instanceName, resourceName, clusterName)));
-    }
-
-    // check partition exists in resource group
-    Set<String> resetPartitionNames = new HashSet<String>(partitionNames);
-    Set<String> partitions =
-        (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED) ? idealState.getRecord()
-            .getMapFields().keySet() : idealState.getRecord().getListFields().keySet();
-    if (!partitions.containsAll(resetPartitionNames)) {
-      throw new HelixException(String.format(ResetPartitionFailureReason.PARTITION_NON_EXISTENT
-          .getMessage(resourceName, partitionNames, instanceName, partitionNames.toString(),
-              clusterName)));
-    }
-
-    // check partition is in ERROR state
-    String sessionId = liveInstance.getEphemeralOwner();
-    CurrentState curState =
-        accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, resourceName));
-    for (String partitionName : resetPartitionNames) {
-      if (!curState.getState(partitionName).equals(HelixDefinedState.ERROR.toString())) {
-        throw new HelixException(String.format(ResetPartitionFailureReason.PARTITION_NOT_ERROR
-            .getMessage(resourceName, partitionNames, instanceName, partitionNames.toString(),
-                clusterName)));
-      }
-    }
-
-    // check stateModelDef exists and get initial state
-    String stateModelDef = idealState.getStateModelDefRef();
-    StateModelDefinition stateModel = accessor.getProperty(keyBuilder.stateModelDef(stateModelDef));
-    if (stateModel == null) {
-      throw new HelixException(String.format(ResetPartitionFailureReason.STATE_MODEL_NON_EXISTENT
-          .getMessage(resourceName, partitionNames, instanceName, stateModelDef, clusterName)));
-    }
-
-    // check there is no pending messages for the partitions exist
-    List<Message> messages = accessor.getChildValues(keyBuilder.messages(instanceName), true);
-    for (Message message : messages) {
-      if (!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType()) || !sessionId
-          .equals(message.getTgtSessionId()) || !resourceName.equals(message.getResourceName())
-          || !resetPartitionNames.contains(message.getPartitionName())) {
-        continue;
-      }
-
-      throw new HelixException(String.format(
-          "Can't reset state for %s.%s on %s, because a pending message %s exists for resource %s",
-          resourceName, partitionNames, instanceName, message.toString(),
-          message.getResourceName()));
-    }
-
-    String adminName = null;
-    try {
-      adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN";
-    } catch (UnknownHostException e) {
-      // can ignore it
-      logger.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e);
-      adminName = "UNKNOWN";
-    }
-
-    List<Message> resetMessages = new ArrayList<Message>();
-    List<PropertyKey> messageKeys = new ArrayList<PropertyKey>();
-    for (String partitionName : resetPartitionNames) {
-      // send ERROR to initialState message
-      String msgId = UUID.randomUUID().toString();
-      Message message = new Message(MessageType.STATE_TRANSITION, msgId);
-      message.setSrcName(adminName);
-      message.setTgtName(instanceName);
-      message.setMsgState(MessageState.NEW);
-      message.setPartitionName(partitionName);
-      message.setResourceName(resourceName);
-      message.setTgtSessionId(sessionId);
-      message.setStateModelDef(stateModelDef);
-      message.setFromState(HelixDefinedState.ERROR.toString());
-      message.setToState(stateModel.getInitialState());
-      message.setStateModelFactoryName(idealState.getStateModelFactoryName());
-
-      if (idealState.getResourceGroupName() != null) {
-        message.setResourceGroupName(idealState.getResourceGroupName());
-      }
-      if (idealState.getInstanceGroupTag() != null) {
-        message.setResourceTag(idealState.getInstanceGroupTag());
-      }
-
-      resetMessages.add(message);
-      messageKeys.add(keyBuilder.message(instanceName, message.getId()));
-    }
-
-    accessor.setChildren(messageKeys, resetMessages);
+    sendStateTransitionMessage(clusterName, instanceName, resourceName, partitionNames, StateTransitionType.RESET);
   }
 
   @Override
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index 0d67ced..0a91370 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -176,7 +176,8 @@
         deltaList.add(delta);
         _currentStateDelta.setDeltaList(deltaList);
         _stateModelFactory.removeStateModel(_message.getResourceName(), partitionKey);
-      } else if (_stateModel.getCurrentState().equals(_message.getFromState())) {
+      } else if (_message.getFromState().equals("*")
+          || _stateModel.getCurrentState().equals(_message.getFromState())) {
         // if the partition is not to be dropped, update _stateModel to the TO_STATE
         // need this check because TaskRunner may change _stateModel before reach here.
         _stateModel.updateState(toState);
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index 4470b99..6a9473e 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -323,6 +323,7 @@
         String fromState = message.getFromState();
         String toState = message.getToState();
         String transition = fromState + "--" + toState;
+        transition = transition.replaceAll("\\*", "ANY");
 
         StateTransitionContext cxt =
             new StateTransitionContext(manager.getClusterName(), manager.getInstanceName(),
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
index 143c14a..5bb2a19 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
@@ -115,4 +115,15 @@
   public boolean isCancelled() {
     return _cancelled;
   }
+
+  /**
+   * Default transition that moves a partition from any state ("*") to ERROR; it only logs.
+   * @param message the state transition message being handled
+   * @param context the notification context passed along with the message
+   * @throws Exception declared by the signature; this default implementation only logs
+   */
+  @Transition(to = "ERROR", from = "*")
+  public void onBecomeErrorFromAny(Message message, NotificationContext context) throws Exception {
+    logger.info("Default *->ERROR transition invoked.");
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 633ce03..a957863 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -134,6 +134,9 @@
   public static final String resetInstance = "resetInstance";
   public static final String resetResource = "resetResource";
 
+  // set partitions to ERROR
+  public static final String setPartitionsToError = "setPartitionsToError";
+
   // help
   public static final String help = "help";
 
@@ -1114,6 +1117,13 @@
     removeCloudConfigOption.setRequired(false);
     removeCloudConfigOption.setArgName("clusterName");
 
+    Option setPartitionsToErrorOption =
+        OptionBuilder.withLongOpt(setPartitionsToError)
+            .withDescription("Set a Partition to Error State").create();
+    setPartitionsToErrorOption.setArgs(4);
+    setPartitionsToErrorOption.setRequired(false);
+    setPartitionsToErrorOption.setArgName("clusterName instanceName resourceName partitionName");
+
     OptionGroup group = new OptionGroup();
     group.setRequired(true);
     group.addOption(rebalanceOption);
@@ -1153,6 +1163,7 @@
     group.addOption(listStateModelOption);
     group.addOption(addResourcePropertyOption);
     group.addOption(removeResourcePropertyOption);
+    group.addOption(setPartitionsToErrorOption);
 
     // set/get/remove config options
     group.addOption(setConfOption);
@@ -1561,6 +1572,16 @@
       String newInstanceName = cmd.getOptionValues(swapInstance)[2];
 
       setupTool.swapInstance(clusterName, oldInstanceName, newInstanceName);
+    } else if (cmd.hasOption(setPartitionsToError)) {
+      String[] args = cmd.getOptionValues(setPartitionsToError);
+
+      String clusterName = args[0];
+      String instanceName = args[1];
+      String resourceName = args[2];
+      List<String> partitionNames = Arrays.asList(Arrays.copyOfRange(args, 3, args.length));
+
+      setupTool.getClusterManagementTool().setPartitionsToError(clusterName, instanceName, resourceName, partitionNames);
+      return 0;
     }
     // set/get/remove config options
     else if (cmd.hasOption(setConfig)) {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSetPartitionsToErrorState.java b/helix-core/src/test/java/org/apache/helix/integration/TestSetPartitionsToErrorState.java
new file mode 100644
index 0000000..5b13703
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSetPartitionsToErrorState.java
@@ -0,0 +1,99 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestSetPartitionsToErrorState extends ZkTestBase {
+
+  @Test()
+  public void testSetPartitionsToErrorState() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 5;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        10, // partitions per resource
+        n, // number of nodes
+        3, // replicas
+        "MasterSlave", true); // do rebalance
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
+    // start mock participants
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    // verify cluster
+    HashMap<String, Map<String, String>> errStateMap = new HashMap<>();
+    errStateMap.put("TestDB0", new HashMap<>());
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        (new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName, errStateMap)));
+    Assert.assertTrue(result, "Cluster verification fails");
+
+    // set a non exist partition to ERROR, should throw exception
+    try {
+      String command = "--zkSvr " + ZK_ADDR + " --setPartitionsToError " + clusterName
+          + " localhost_12918 TestDB0 TestDB0_nonExist";
+      ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+      Assert.fail("Should throw exception on setting a non-exist partition to error");
+    } catch (Exception e) {
+      // OK
+    }
+
+    // set one partition not in ERROR state to ERROR
+    String command = "--zkSvr " + ZK_ADDR + " --setPartitionsToError " + clusterName
+        + " localhost_12918 TestDB0 TestDB0_4";
+    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+    errStateMap.get("TestDB0").put("TestDB0_4", "localhost_12918");
+    result = ClusterStateVerifier.verifyByZkCallback(
+        (new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName, errStateMap)));
+    Assert.assertTrue(result, "Cluster verification fails");
+
+    // set another partition not in ERROR state to ERROR
+    command = "--zkSvr " + ZK_ADDR + " --setPartitionsToError " + clusterName
+        + " localhost_12918 TestDB0 TestDB0_7";
+    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+    errStateMap.get("TestDB0").put("TestDB0_7", "localhost_12918");
+    result = ClusterStateVerifier.verifyByZkCallback(
+        (new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName, errStateMap)));
+    Assert.assertTrue(result, "Cluster verification fails");
+
+    // setting a partition already in ERROR state to ERROR - message does not get processed
+    command = "--zkSvr " + ZK_ADDR + " --setPartitionsToError " + clusterName
+        + " localhost_12918 TestDB0 TestDB0_7";
+    ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+    result = ClusterStateVerifier.verifyByZkCallback(
+        (new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName, errStateMap)));
+    Assert.assertTrue(result, "Cluster verification fails");
+
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < 5; i++) {
+      participants[i].syncStop();
+    }
+    deleteCluster(clusterName);
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index 59decd9..5581108 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -589,6 +589,117 @@
         2);
   }
 
+  @Test(description = "Unit test for sanity check in setPartitionsToError()")
+  public void testSetPartitionsToError() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    String instanceName = "TestInstance";
+    String testResource = "TestResource";
+    String wrongTestInstance = "WrongTestInstance";
+    String wrongTestResource = "WrongTestResource";
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.addCluster(clusterName, true);
+    admin.addInstance(clusterName, new InstanceConfig(instanceName));
+    admin.enableInstance(clusterName, instanceName, true);
+    InstanceConfig instanceConfig = admin.getInstanceConfig(clusterName, instanceName);
+
+    IdealState idealState = new IdealState(testResource);
+    idealState.setNumPartitions(3);
+    admin.addStateModelDef(clusterName, "MasterSlave", new MasterSlaveSMD());
+    idealState.setStateModelDefRef("MasterSlave");
+    idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+    admin.addResource(clusterName, testResource, idealState);
+    admin.enableResource(clusterName, testResource, true);
+
+    /*
+     * This is a unit test for sanity check in setPartitionsToError().
+     * There is no running controller in this test. We have end-to-end tests for
+     * setPartitionsToError()
+     * under integration/TestSetPartitionsToErrorState.
+     */
+    // setPartitionsToError is expected to throw an exception when provided with a nonexistent
+    // instance.
+    try {
+      admin.setPartitionsToError(clusterName, wrongTestInstance, testResource,
+          Arrays.asList("1", "2"));
+      Assert.fail("Should throw HelixException");
+    } catch (HelixException expected) {
+      // This exception is expected because the instance name is made up.
+      Assert.assertEquals(expected.getMessage(), String.format(
+          "Can't SET_TO_ERROR state for %s.[1, 2] on WrongTestInstance, because %s does not exist in cluster %s",
+          testResource, wrongTestInstance, clusterName));
+    }
+
+    // setPartitionsToError is expected to throw an exception when provided with a non-live
+    // instance.
+    try {
+      admin.setPartitionsToError(clusterName, instanceName, testResource, Arrays.asList("1", "2"));
+      Assert.fail("Should throw HelixException");
+    } catch (HelixException expected) {
+      // This exception is expected because the instance is not alive.
+      Assert.assertEquals(expected.getMessage(),
+          String.format(
+              "Can't SET_TO_ERROR state for %s.[1, 2] on %s, because %s is not alive in cluster %s",
+              testResource, instanceName, instanceName, clusterName));
+    }
+
+    HelixManager manager = initializeHelixManager(clusterName, instanceConfig.getInstanceName());
+    manager.connect();
+
+    // setPartitionsToError is expected to throw an exception when provided with a nonexistent
+    // resource.
+    try {
+      admin.setPartitionsToError(clusterName, instanceName, wrongTestResource,
+          Arrays.asList("1", "2"));
+      Assert.fail("Should throw HelixException");
+    } catch (HelixException expected) {
+      // This exception is expected because the resource is not added.
+      Assert.assertEquals(expected.getMessage(), String.format(
+          "Can't SET_TO_ERROR state for %s.[1, 2] on %s, because resource %s is not added to cluster %s",
+          wrongTestResource, instanceName, wrongTestResource, clusterName));
+    }
+
+    // setPartitionsToError is expected to throw an exception when partition does not exist.
+    try {
+      admin.setPartitionsToError(clusterName, instanceName, testResource, Arrays.asList("1", "2"));
+      Assert.fail("Should throw HelixException");
+    } catch (HelixException expected) {
+      // This exception is expected because partitions do not exist.
+      Assert.assertEquals(expected.getMessage(), String.format(
+          "Can't SET_TO_ERROR state for %s.[1, 2] on %s, because not all [1, 2] exist in cluster %s",
+          testResource, instanceName, clusterName));
+    }
+
+    // clean up
+    manager.disconnect();
+    admin.dropCluster(clusterName);
+
+    // verify the cluster has been removed successfully
+    HelixDataAccessor dataAccessor =
+        new ZKHelixDataAccessor(className, new ZkBaseDataAccessor<>(_gZkClient));
+    try {
+      Assert.assertTrue(TestHelper.verify(
+          () -> dataAccessor.getChildNames(dataAccessor.keyBuilder().liveInstances()).isEmpty(),
+          1000));
+    } catch (Exception e) {
+      e.printStackTrace();
+      System.out.println("There're live instances not cleaned up yet");
+      assert false;
+    }
+
+    try {
+      Assert.assertTrue(TestHelper.verify(
+          () -> dataAccessor.getChildNames(dataAccessor.keyBuilder().clusterConfig()).isEmpty(),
+          1000));
+    } catch (Exception e) {
+      e.printStackTrace();
+      System.out.println("The cluster is not cleaned up yet");
+      assert false;
+    }
+  }
+
   @Test
   public void testResetPartition() throws Exception {
     String className = TestHelper.getTestClassName();
@@ -625,7 +736,7 @@
     } catch (HelixException expected) {
       // This exception is expected because the instance name is made up.
       Assert.assertEquals(expected.getMessage(), String.format(
-          "Can't reset state for %s.[1, 2] on WrongTestInstance, because %s does not exist in cluster %s",
+          "Can't RESET state for %s.[1, 2] on WrongTestInstance, because %s does not exist in cluster %s",
           testResource, wrongTestInstance, clusterName));
     }
 
@@ -636,7 +747,7 @@
     } catch (HelixException expected) {
       // This exception is expected because the instance is not alive.
       Assert.assertEquals(expected.getMessage(), String
-          .format("Can't reset state for %s.[1, 2] on %s, because %s is not alive in cluster %s",
+          .format("Can't RESET state for %s.[1, 2] on %s, because %s is not alive in cluster %s",
               testResource, instanceName, instanceName, clusterName));
     }
 
@@ -650,7 +761,7 @@
     } catch (HelixException expected) {
       // This exception is expected because the resource is not added.
       Assert.assertEquals(expected.getMessage(), String.format(
-          "Can't reset state for %s.[1, 2] on %s, because resource %s is not added to cluster %s",
+          "Can't RESET state for %s.[1, 2] on %s, because resource %s is not added to cluster %s",
           wrongTestResource, instanceName, wrongTestResource, clusterName));
     }
 
@@ -660,7 +771,7 @@
     } catch (HelixException expected) {
       // This exception is expected because partitions do not exist.
       Assert.assertEquals(expected.getMessage(), String.format(
-          "Can't reset state for %s.[1, 2] on %s, because not all [1, 2] exist in cluster %s",
+          "Can't RESET state for %s.[1, 2] on %s, because not all [1, 2] exist in cluster %s",
           testResource, instanceName, clusterName));
     }
 
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index 9a1311b..d9bc5d7 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -364,6 +364,12 @@
     return null;
   }
 
+  @Override
+  public void setPartitionsToError(String clusterName, String instanceName, String resourceName,
+      List<String> partitionNames) {
+    // No-op in this mock: the arguments are accepted and ignored.
+  }
+
   @Override public void resetPartition(String clusterName, String instanceName, String resourceName,
       List<String> partitionNames) {
 
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
index ce3d272..fdad634 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
@@ -89,7 +89,8 @@
     canCompleteSwap,
     completeSwapIfPossible,
     onDemandRebalance,
-    isEvacuateFinished
+    isEvacuateFinished,
+    setPartitionsToError
   }
 
   @Context
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
index efeeee7..ea98f66 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
@@ -434,6 +434,16 @@
                   OBJECT_MAPPER.getTypeFactory()
                       .constructCollectionType(List.class, String.class)));
           break;
+        case setPartitionsToError:
+          if (!validInstance(node, instanceName)) {
+            return badRequest("Instance names are not a match!");
+          }
+          admin.setPartitionsToError(clusterId, instanceName,
+              node.get(PerInstanceProperties.resource.name()).textValue(),
+              (List<String>) OBJECT_MAPPER.readValue(
+                  node.get(PerInstanceProperties.partitions.name()).toString(), OBJECT_MAPPER
+                      .getTypeFactory().constructCollectionType(List.class, String.class)));
+          break;
         case setInstanceOperation:
           admin.setInstanceOperation(clusterId, instanceName, state);
           break;
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
index 943444c..395f9bf 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
@@ -37,11 +37,13 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
 import org.apache.helix.TestHelper;
 import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Message;
@@ -377,7 +379,7 @@
   }
 
   @Test(dependsOnMethods = "testDeleteInstance")
-  public void updateInstance() throws IOException {
+  public void updateInstance() throws Exception {
     System.out.println("Start test :" + TestHelper.getTestMethodName());
     // Disable instance
     Entity entity = Entity.entity("", MediaType.APPLICATION_JSON_TYPE);
@@ -461,11 +463,11 @@
     String dbName = "_db_0_";
     List<String> partitionsToDisable = Arrays.asList(CLUSTER_NAME + dbName + "0",
         CLUSTER_NAME + dbName + "1", CLUSTER_NAME + dbName + "3");
+    String RESOURCE_NAME = CLUSTER_NAME + dbName.substring(0, dbName.length() - 1);
 
     entity = Entity.entity(
         OBJECT_MAPPER.writeValueAsString(ImmutableMap.of(AbstractResource.Properties.id.name(),
-            INSTANCE_NAME, PerInstanceAccessor.PerInstanceProperties.resource.name(),
-            CLUSTER_NAME + dbName.substring(0, dbName.length() - 1),
+            INSTANCE_NAME, PerInstanceAccessor.PerInstanceProperties.resource.name(), RESOURCE_NAME,
             PerInstanceAccessor.PerInstanceProperties.partitions.name(), partitionsToDisable)),
         MediaType.APPLICATION_JSON_TYPE);
 
@@ -474,13 +476,11 @@
 
     InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME);
     Assert.assertEquals(
-        new HashSet<>(instanceConfig.getDisabledPartitionsMap()
-            .get(CLUSTER_NAME + dbName.substring(0, dbName.length() - 1))),
+        new HashSet<>(instanceConfig.getDisabledPartitionsMap().get(RESOURCE_NAME)),
         new HashSet<>(partitionsToDisable));
     entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(ImmutableMap
         .of(AbstractResource.Properties.id.name(), INSTANCE_NAME,
-            PerInstanceAccessor.PerInstanceProperties.resource.name(),
-            CLUSTER_NAME + dbName.substring(0, dbName.length() - 1),
+            PerInstanceAccessor.PerInstanceProperties.resource.name(), RESOURCE_NAME,
             PerInstanceAccessor.PerInstanceProperties.partitions.name(),
             ImmutableList.of(CLUSTER_NAME + dbName + "1"))), MediaType.APPLICATION_JSON_TYPE);
 
@@ -488,8 +488,7 @@
         .format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
 
     instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME);
-    Assert.assertEquals(new HashSet<>(instanceConfig.getDisabledPartitionsMap()
-            .get(CLUSTER_NAME + dbName.substring(0, dbName.length() - 1))),
+    Assert.assertEquals(new HashSet<>(instanceConfig.getDisabledPartitionsMap().get(RESOURCE_NAME)),
         new HashSet<>(Arrays.asList(CLUSTER_NAME + dbName + "0", CLUSTER_NAME + dbName + "3")));
 
     // test set instance operation
@@ -595,6 +594,32 @@
     evacuateFinishedResult = OBJECT_MAPPER.readValue(response.readEntity(String.class), Map.class);
     Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
     Assert.assertTrue(evacuateFinishedResult.get("successful"));
+
+    // test setPartitionsToError
+    List<String> partitionsToSetToError = Arrays.asList(CLUSTER_NAME + dbName + "7");
+
+    entity = Entity.entity(
+        OBJECT_MAPPER.writeValueAsString(ImmutableMap.of(AbstractResource.Properties.id.name(),
+            INSTANCE_NAME, PerInstanceAccessor.PerInstanceProperties.resource.name(), RESOURCE_NAME,
+            PerInstanceAccessor.PerInstanceProperties.partitions.name(), partitionsToSetToError)),
+        MediaType.APPLICATION_JSON_TYPE);
+
+    response = new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setPartitionsToError")
+        .format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
+
+    Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+
+    // Wait until the external view reports ERROR for every requested partition.
+    Assert.assertTrue(TestHelper.verify(() -> {
+      ExternalView externalView = _gSetupTool.getClusterManagementTool()
+          .getResourceExternalView(CLUSTER_NAME, RESOURCE_NAME);
+      return externalView != null && partitionsToSetToError.stream().allMatch(partition -> {
+        Map<String, String> stateMap = externalView.getStateMap(partition);
+        return stateMap != null
+            && HelixDefinedState.ERROR.toString().equals(stateMap.get(INSTANCE_NAME));
+      });
+    }, TestHelper.WAIT_DURATION));
+
     System.out.println("End test :" + TestHelper.getTestMethodName());
   }