| package org.apache.helix.rest.server.resources.helix; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.io.IOException; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import javax.ws.rs.DELETE; |
| import javax.ws.rs.DefaultValue; |
| import javax.ws.rs.GET; |
| import javax.ws.rs.POST; |
| import javax.ws.rs.PUT; |
| import javax.ws.rs.Path; |
| import javax.ws.rs.PathParam; |
| import javax.ws.rs.QueryParam; |
| import javax.ws.rs.core.Response; |
| |
| import com.codahale.metrics.annotation.ResponseMetered; |
| import com.codahale.metrics.annotation.Timed; |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import com.fasterxml.jackson.databind.node.ArrayNode; |
| import com.fasterxml.jackson.databind.node.JsonNodeFactory; |
| import com.fasterxml.jackson.databind.node.ObjectNode; |
| import org.apache.helix.ConfigAccessor; |
| import org.apache.helix.HelixAdmin; |
| import org.apache.helix.HelixException; |
| import org.apache.helix.PropertyPathBuilder; |
| import org.apache.helix.model.CustomizedView; |
| import org.apache.helix.model.ExternalView; |
| import org.apache.helix.model.HelixConfigScope; |
| import org.apache.helix.model.IdealState; |
| import org.apache.helix.model.ResourceConfig; |
| import org.apache.helix.model.StateModelDefinition; |
| import org.apache.helix.model.builder.HelixConfigScopeBuilder; |
| import org.apache.helix.rest.common.HttpConstants; |
| import org.apache.helix.rest.server.filters.ClusterAuth; |
| import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; |
| import org.apache.helix.zookeeper.datamodel.ZNRecord; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| @ClusterAuth |
| @Path("/clusters/{clusterId}/resources") |
| public class ResourceAccessor extends AbstractHelixResource { |
| private final static Logger _logger = LoggerFactory.getLogger(ResourceAccessor.class); |
| |
| public enum ResourceProperties { |
| idealState, |
| idealStates, |
| externalView, |
| externalViews, |
| resourceConfig, |
| } |
| |
| public enum HealthStatus { |
| HEALTHY, |
| PARTIAL_HEALTHY, |
| UNHEALTHY |
| } |
| |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| public Response getResources(@PathParam("clusterId") String clusterId) { |
| ObjectNode root = JsonNodeFactory.instance.objectNode(); |
| root.put(Properties.id.name(), JsonNodeFactory.instance.textNode(clusterId)); |
| |
| RealmAwareZkClient zkClient = getRealmAwareZkClient(); |
| |
| ArrayNode idealStatesNode = root.putArray(ResourceProperties.idealStates.name()); |
| ArrayNode externalViewsNode = root.putArray(ResourceProperties.externalViews.name()); |
| |
| List<String> idealStates = zkClient.getChildren(PropertyPathBuilder.idealState(clusterId)); |
| List<String> externalViews = zkClient.getChildren(PropertyPathBuilder.externalView(clusterId)); |
| |
| if (idealStates != null) { |
| idealStatesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(idealStates)); |
| } else { |
| return notFound(); |
| } |
| |
| if (externalViews != null) { |
| externalViewsNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(externalViews)); |
| } |
| |
| return JSONRepresentation(root); |
| } |
| |
| /** |
| * Returns health profile of all resources in the cluster |
| * @param clusterId |
| * @return JSON result |
| */ |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| @Path("health") |
| public Response getResourceHealth(@PathParam("clusterId") String clusterId) { |
| |
| RealmAwareZkClient zkClient = getRealmAwareZkClient(); |
| |
| List<String> resourcesInIdealState = |
| zkClient.getChildren(PropertyPathBuilder.idealState(clusterId)); |
| List<String> resourcesInExternalView = |
| zkClient.getChildren(PropertyPathBuilder.externalView(clusterId)); |
| |
| Map<String, String> resourceHealthResult = new HashMap<>(); |
| |
| for (String resourceName : resourcesInIdealState) { |
| if (resourcesInExternalView.contains(resourceName)) { |
| Map<String, String> partitionHealth = computePartitionHealth(clusterId, resourceName); |
| |
| if (partitionHealth.isEmpty() |
| || partitionHealth.values().contains(HealthStatus.UNHEALTHY.name())) { |
| // No partitions for a resource or there exists one or more UNHEALTHY partitions in this |
| // resource, UNHEALTHY |
| resourceHealthResult.put(resourceName, HealthStatus.UNHEALTHY.name()); |
| } else if (partitionHealth.values().contains(HealthStatus.PARTIAL_HEALTHY.name())) { |
| // No UNHEALTHY partition, but one or more partially healthy partitions, resource is |
| // partially healthy |
| resourceHealthResult.put(resourceName, HealthStatus.PARTIAL_HEALTHY.name()); |
| } else { |
| // No UNHEALTHY or partially healthy partitions and non-empty, resource is healthy |
| resourceHealthResult.put(resourceName, HealthStatus.HEALTHY.name()); |
| } |
| } else { |
| // If a resource is not in ExternalView, then it is UNHEALTHY |
| resourceHealthResult.put(resourceName, HealthStatus.UNHEALTHY.name()); |
| } |
| } |
| |
| return JSONRepresentation(resourceHealthResult); |
| } |
| |
| /** |
| * Returns health profile of all partitions for the corresponding resource in the cluster |
| * @param clusterId |
| * @param resourceName |
| * @return JSON result |
| * @throws IOException |
| */ |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| @Path("{resourceName}/health") |
| public Response getPartitionHealth(@PathParam("clusterId") String clusterId, |
| @PathParam("resourceName") String resourceName) { |
| |
| return JSONRepresentation(computePartitionHealth(clusterId, resourceName)); |
| } |
| |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| @Path("{resourceName}") |
| public Response getResource(@PathParam("clusterId") String clusterId, |
| @PathParam("resourceName") String resourceName, |
| @DefaultValue("getResource") @QueryParam("command") String command) { |
| // Get the command. If not provided, the default would be "getResource" |
| Command cmd; |
| try { |
| cmd = Command.valueOf(command); |
| } catch (Exception e) { |
| return badRequest("Invalid command : " + command); |
| } |
| ConfigAccessor accessor = getConfigAccessor(); |
| HelixAdmin admin = getHelixAdmin(); |
| |
| switch (cmd) { |
| case getResource: |
| ResourceConfig resourceConfig = accessor.getResourceConfig(clusterId, resourceName); |
| IdealState idealState = admin.getResourceIdealState(clusterId, resourceName); |
| ExternalView externalView = admin.getResourceExternalView(clusterId, resourceName); |
| |
| Map<String, ZNRecord> resourceMap = new HashMap<>(); |
| if (idealState != null) { |
| resourceMap.put(ResourceProperties.idealState.name(), idealState.getRecord()); |
| } else { |
| return notFound(); |
| } |
| |
| resourceMap.put(ResourceProperties.resourceConfig.name(), null); |
| resourceMap.put(ResourceProperties.externalView.name(), null); |
| |
| if (resourceConfig != null) { |
| resourceMap.put(ResourceProperties.resourceConfig.name(), resourceConfig.getRecord()); |
| } |
| |
| if (externalView != null) { |
| resourceMap.put(ResourceProperties.externalView.name(), externalView.getRecord()); |
| } |
| return JSONRepresentation(resourceMap); |
| case validateWeight: |
| // Validate ResourceConfig for WAGED rebalance |
| Map<String, Boolean> validationResultMap; |
| try { |
| validationResultMap = admin.validateResourcesForWagedRebalance(clusterId, |
| Collections.singletonList(resourceName)); |
| } catch (HelixException e) { |
| return badRequest(e.getMessage()); |
| } |
| return JSONRepresentation(validationResultMap); |
| default: |
| _logger.error("Unsupported command :" + command); |
| return badRequest("Unsupported command :" + command); |
| } |
| } |
| |
| @ResponseMetered(name = HttpConstants.WRITE_REQUEST) |
| @Timed(name = HttpConstants.WRITE_REQUEST) |
| @PUT |
| @Path("{resourceName}") |
| public Response addResource(@PathParam("clusterId") String clusterId, |
| @PathParam("resourceName") String resourceName, |
| @DefaultValue("-1") @QueryParam("numPartitions") int numPartitions, |
| @DefaultValue("") @QueryParam("stateModelRef") String stateModelRef, |
| @DefaultValue("SEMI_AUTO") @QueryParam("rebalancerMode") String rebalancerMode, |
| @DefaultValue("DEFAULT") @QueryParam("rebalanceStrategy") String rebalanceStrategy, |
| @DefaultValue("0") @QueryParam("bucketSize") int bucketSize, |
| @DefaultValue("-1") @QueryParam("maxPartitionsPerInstance") int maxPartitionsPerInstance, |
| @DefaultValue("addResource") @QueryParam("command") String command, String content) { |
| // Get the command. If not provided, the default would be "addResource" |
| Command cmd; |
| try { |
| cmd = Command.valueOf(command); |
| } catch (Exception e) { |
| return badRequest("Invalid command : " + command); |
| } |
| HelixAdmin admin = getHelixAdmin(); |
| try { |
| switch (cmd) { |
| case addResource: |
| if (content.length() != 0) { |
| ZNRecord record; |
| try { |
| record = toZNRecord(content); |
| } catch (IOException e) { |
| _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e); |
| return badRequest("Input is not a valid ZNRecord!"); |
| } |
| |
| if (record.getSimpleFields() != null) { |
| admin.addResource(clusterId, resourceName, new IdealState(record)); |
| } |
| } else { |
| admin.addResource(clusterId, resourceName, numPartitions, stateModelRef, rebalancerMode, |
| rebalanceStrategy, bucketSize, maxPartitionsPerInstance); |
| } |
| break; |
| case addWagedResource: |
| // Check if content is valid |
| if (content == null || content.length() == 0) { |
| _logger.error("Input is null or empty!"); |
| return badRequest("Input is null or empty!"); |
| } |
| Map<String, ZNRecord> input; |
| // Content must supply both IdealState and ResourceConfig |
| try { |
| TypeReference<Map<String, ZNRecord>> typeRef = |
| new TypeReference<Map<String, ZNRecord>>() { |
| }; |
| input = ZNRECORD_READER.forType(typeRef).readValue(content); |
| } catch (IOException e) { |
| _logger.error("Failed to deserialize user's input {}, Exception: {}", content, e); |
| return badRequest("Input is not a valid map of String-ZNRecord pairs!"); |
| } |
| // Check if the map contains both IdealState and ResourceConfig |
| ZNRecord idealStateRecord = |
| input.get(ResourceAccessor.ResourceProperties.idealState.name()); |
| ZNRecord resourceConfigRecord = |
| input.get(ResourceAccessor.ResourceProperties.resourceConfig.name()); |
| |
| if (idealStateRecord == null || resourceConfigRecord == null) { |
| _logger.error("Input does not contain both IdealState and ResourceConfig!"); |
| return badRequest("Input does not contain both IdealState and ResourceConfig!"); |
| } |
| // Add using HelixAdmin API |
| try { |
| admin.addResourceWithWeight(clusterId, new IdealState(idealStateRecord), |
| new ResourceConfig(resourceConfigRecord)); |
| } catch (HelixException e) { |
| String errMsg = String.format("Failed to add resource %s with weight in cluster %s!", |
| idealStateRecord.getId(), clusterId); |
| _logger.error(errMsg, e); |
| return badRequest(errMsg); |
| } |
| break; |
| default: |
| _logger.error("Unsupported command :" + command); |
| return badRequest("Unsupported command :" + command); |
| } |
| } catch (Exception e) { |
| _logger.error("Error in adding a resource: " + resourceName, e); |
| return serverError(e); |
| } |
| return OK(); |
| } |
| |
| @ResponseMetered(name = HttpConstants.WRITE_REQUEST) |
| @Timed(name = HttpConstants.WRITE_REQUEST) |
| @POST |
| @Path("{resourceName}") |
| public Response updateResource(@PathParam("clusterId") String clusterId, |
| @PathParam("resourceName") String resourceName, @QueryParam("command") String command, |
| @DefaultValue("-1") @QueryParam("replicas") int replicas, |
| @DefaultValue("") @QueryParam("keyPrefix") String keyPrefix, |
| @DefaultValue("") @QueryParam("group") String group) { |
| Command cmd; |
| try { |
| cmd = Command.valueOf(command); |
| } catch (Exception e) { |
| return badRequest("Invalid command : " + command); |
| } |
| |
| HelixAdmin admin = getHelixAdmin(); |
| try { |
| switch (cmd) { |
| case enable: |
| admin.enableResource(clusterId, resourceName, true); |
| break; |
| case disable: |
| admin.enableResource(clusterId, resourceName, false); |
| break; |
| case rebalance: |
| if (replicas == -1) { |
| return badRequest("Number of replicas is needed for rebalancing!"); |
| } |
| keyPrefix = keyPrefix.length() == 0 ? resourceName : keyPrefix; |
| admin.rebalance(clusterId, resourceName, replicas, keyPrefix, group); |
| break; |
| case enableWagedRebalance: |
| try { |
| admin.enableWagedRebalance(clusterId, Collections.singletonList(resourceName)); |
| } catch (HelixException e) { |
| return badRequest(e.getMessage()); |
| } |
| break; |
| default: |
| _logger.error("Unsupported command :" + command); |
| return badRequest("Unsupported command :" + command); |
| } |
| } catch (Exception e) { |
| _logger.error("Failed in updating resource : " + resourceName, e); |
| return badRequest(e.getMessage()); |
| } |
| return OK(); |
| } |
| |
| @ResponseMetered(name = HttpConstants.WRITE_REQUEST) |
| @Timed(name = HttpConstants.WRITE_REQUEST) |
| @DELETE |
| @Path("{resourceName}") |
| public Response deleteResource(@PathParam("clusterId") String clusterId, |
| @PathParam("resourceName") String resourceName) { |
| HelixAdmin admin = getHelixAdmin(); |
| try { |
| admin.dropResource(clusterId, resourceName); |
| } catch (Exception e) { |
| _logger.error("Error in deleting a resource: " + resourceName, e); |
| return serverError(); |
| } |
| return OK(); |
| } |
| |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| @Path("{resourceName}/configs") |
| public Response getResourceConfig(@PathParam("clusterId") String clusterId, |
| @PathParam("resourceName") String resourceName) { |
| ConfigAccessor accessor = getConfigAccessor(); |
| ResourceConfig resourceConfig = accessor.getResourceConfig(clusterId, resourceName); |
| if (resourceConfig != null) { |
| return JSONRepresentation(resourceConfig.getRecord()); |
| } |
| |
| return notFound(); |
| } |
| |
| @ResponseMetered(name = HttpConstants.WRITE_REQUEST) |
| @Timed(name = HttpConstants.WRITE_REQUEST) |
| @POST |
| @Path("{resourceName}/configs") |
| public Response updateResourceConfig(@PathParam("clusterId") String clusterId, |
| @PathParam("resourceName") String resourceName, @QueryParam("command") String commandStr, |
| String content) { |
| Command command; |
| if (commandStr == null || commandStr.isEmpty()) { |
| command = Command.update; // Default behavior to keep it backward-compatible |
| } else { |
| try { |
| command = getCommand(commandStr); |
| } catch (HelixException ex) { |
| return badRequest(ex.getMessage()); |
| } |
| } |
| |
| ZNRecord record; |
| try { |
| record = toZNRecord(content); |
| } catch (IOException e) { |
| _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e); |
| return badRequest("Input is not a valid ZNRecord!"); |
| } |
| |
| if (!resourceName.equals(record.getId())) { |
| return badRequest("ID does not match the resourceName name in input!"); |
| } |
| |
| ResourceConfig resourceConfig = new ResourceConfig(record); |
| ConfigAccessor configAccessor = getConfigAccessor(); |
| try { |
| switch (command) { |
| case update: |
| configAccessor.updateResourceConfig(clusterId, resourceName, resourceConfig); |
| break; |
| case delete: |
| HelixConfigScope resourceScope = |
| new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.RESOURCE) |
| .forCluster(clusterId).forResource(resourceName).build(); |
| configAccessor.remove(resourceScope, record); |
| break; |
| default: |
| return badRequest(String.format("Unsupported command: %s", command)); |
| } |
| } catch (HelixException ex) { |
| return notFound(ex.getMessage()); |
| } catch (Exception ex) { |
| _logger.error(String.format("Error in update resource config for resource: %s", resourceName), |
| ex); |
| return serverError(ex); |
| } |
| return OK(); |
| } |
| |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| @Path("{resourceName}/idealState") |
| public Response getResourceIdealState(@PathParam("clusterId") String clusterId, |
| @PathParam("resourceName") String resourceName) { |
| HelixAdmin admin = getHelixAdmin(); |
| IdealState idealState = admin.getResourceIdealState(clusterId, resourceName); |
| if (idealState != null) { |
| return JSONRepresentation(idealState.getRecord()); |
| } |
| |
| return notFound(); |
| } |
| |
| @ResponseMetered(name = HttpConstants.WRITE_REQUEST) |
| @Timed(name = HttpConstants.WRITE_REQUEST) |
| @POST |
| @Path("{resourceName}/idealState") |
| public Response updateResourceIdealState(@PathParam("clusterId") String clusterId, |
| @PathParam("resourceName") String resourceName, @QueryParam("command") String commandStr, |
| String content) { |
| Command command; |
| if (commandStr == null || commandStr.isEmpty()) { |
| command = Command.update; // Default behavior is update |
| } else { |
| try { |
| command = getCommand(commandStr); |
| } catch (HelixException ex) { |
| return badRequest(ex.getMessage()); |
| } |
| } |
| |
| ZNRecord record; |
| try { |
| record = toZNRecord(content); |
| } catch (IOException e) { |
| _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e); |
| return badRequest("Input is not a valid ZNRecord!"); |
| } |
| IdealState idealState = new IdealState(record); |
| HelixAdmin helixAdmin = getHelixAdmin(); |
| try { |
| switch (command) { |
| case update: |
| helixAdmin.updateIdealState(clusterId, resourceName, idealState); |
| break; |
| case delete: { |
| helixAdmin.removeFromIdealState(clusterId, resourceName, idealState); |
| } |
| break; |
| default: |
| return badRequest(String.format("Unsupported command: %s", command)); |
| } |
| } catch (HelixException ex) { |
| return notFound(ex.getMessage()); // HelixAdmin throws a HelixException if it doesn't |
| // exist already |
| } catch (Exception ex) { |
| _logger.error(String.format("Failed to update the IdealState for resource: %s", resourceName), |
| ex); |
| return serverError(ex); |
| } |
| return OK(); |
| } |
| |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| @Path("{resourceName}/externalView") |
| public Response getResourceExternalView(@PathParam("clusterId") String clusterId, |
| @PathParam("resourceName") String resourceName) { |
| HelixAdmin admin = getHelixAdmin(); |
| ExternalView externalView = admin.getResourceExternalView(clusterId, resourceName); |
| if (externalView != null) { |
| return JSONRepresentation(externalView.getRecord()); |
| } |
| |
| return notFound(); |
| } |
| |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| @Path("{resourceName}/{customizedStateType}/customizedView") |
| public Response getResourceCustomizedView(@PathParam("clusterId") String clusterId, |
| @PathParam("resourceName") String resourceName, |
| @PathParam("customizedStateType") String customizedStateType) { |
| HelixAdmin admin = getHelixAdmin(); |
| CustomizedView customizedView = |
| admin.getResourceCustomizedView(clusterId, resourceName, customizedStateType); |
| if (customizedView != null) { |
| return JSONRepresentation(customizedView.getRecord()); |
| } |
| |
| return notFound(); |
| } |
| |
| private Map<String, String> computePartitionHealth(String clusterId, String resourceName) { |
| HelixAdmin admin = getHelixAdmin(); |
| IdealState idealState = admin.getResourceIdealState(clusterId, resourceName); |
| ExternalView externalView = admin.getResourceExternalView(clusterId, resourceName); |
| StateModelDefinition stateModelDef = |
| admin.getStateModelDef(clusterId, idealState.getStateModelDefRef()); |
| String initialState = stateModelDef.getInitialState(); |
| List<String> statesPriorityList = stateModelDef.getStatesPriorityList(); |
| statesPriorityList = statesPriorityList.subList(0, statesPriorityList.indexOf(initialState)); // Trim |
| // stateList |
| // to |
| // initialState |
| // and |
| // above |
| int minActiveReplicas = idealState.getMinActiveReplicas(); |
| |
| // Start the logic that determines the health status of each partition |
| Map<String, String> partitionHealthResult = new HashMap<>(); |
| Set<String> allPartitionNames = idealState.getPartitionSet(); |
| if (!allPartitionNames.isEmpty()) { |
| for (String partitionName : allPartitionNames) { |
| int replicaCount = |
| idealState.getReplicaCount(idealState.getPreferenceList(partitionName).size()); |
| // Simplify expectedStateCountMap by assuming that all instances are available to reduce |
| // computation load on this REST endpoint |
| LinkedHashMap<String, Integer> expectedStateCountMap = |
| stateModelDef.getStateCountMap(replicaCount, replicaCount); |
| // Extract all states into Collections from ExternalView |
| Map<String, String> stateMapInExternalView = externalView.getStateMap(partitionName); |
| Collection<String> allReplicaStatesInExternalView = |
| (stateMapInExternalView != null && !stateMapInExternalView.isEmpty()) |
| ? stateMapInExternalView.values() |
| : Collections.<String> emptyList(); |
| int numActiveReplicasInExternalView = 0; |
| HealthStatus status = HealthStatus.HEALTHY; |
| |
| // Go through all states that are "active" states (higher priority than InitialState) |
| for (int statePriorityIndex = 0; statePriorityIndex < statesPriorityList |
| .size(); statePriorityIndex++) { |
| String currentState = statesPriorityList.get(statePriorityIndex); |
| int currentStateCountInIdealState = expectedStateCountMap.get(currentState); |
| int currentStateCountInExternalView = |
| Collections.frequency(allReplicaStatesInExternalView, currentState); |
| numActiveReplicasInExternalView += currentStateCountInExternalView; |
| // Top state counts must match, if not, unhealthy |
| if (statePriorityIndex == 0 |
| && currentStateCountInExternalView != currentStateCountInIdealState) { |
| status = HealthStatus.UNHEALTHY; |
| break; |
| } else if (currentStateCountInExternalView < currentStateCountInIdealState) { |
| // For non-top states, if count in ExternalView is less than count in IdealState, |
| // partially healthy |
| status = HealthStatus.PARTIAL_HEALTHY; |
| } |
| } |
| if (numActiveReplicasInExternalView < minActiveReplicas) { |
| // If this partition does not satisfy the number of minimum active replicas, unhealthy |
| status = HealthStatus.UNHEALTHY; |
| } |
| partitionHealthResult.put(partitionName, status.name()); |
| } |
| } |
| return partitionHealthResult; |
| } |
| } |