HDDS-7192. EC: ReplicationManager - create handlers to perform various container checks (#3743)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerCheckRequest.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerCheckRequest.java
new file mode 100644
index 0000000..0433bd5
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerCheckRequest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * Simple class to wrap the parameters needed to check a container's health
+ * in ReplicationManager.
+ */
+public final class ContainerCheckRequest {
+
+ private final ContainerInfo containerInfo;
+ private final Set<ContainerReplica> containerReplicas;
+ private final List<ContainerReplicaOp> pendingOps;
+ private final int maintenanceRedundancy;
+ private final ReplicationManagerReport report;
+ private final Queue<ContainerHealthResult.UnderReplicatedHealthResult>
+ underRepQueue;
+ private final Queue<ContainerHealthResult.OverReplicatedHealthResult>
+ overRepQueue;
+
+
+ private ContainerCheckRequest(Builder builder) {
+ this.containerInfo = builder.containerInfo;
+ this.containerReplicas = builder.containerReplicas;
+ this.pendingOps = builder.pendingOps;
+ this.maintenanceRedundancy = builder.maintenanceRedundancy;
+ this.report = builder.report;
+ this.overRepQueue = builder.overRepQueue;
+ this.underRepQueue = builder.underRepQueue;
+ }
+
+ public List<ContainerReplicaOp> getPendingOps() {
+ return pendingOps;
+ }
+
+ public int getMaintenanceRedundancy() {
+ return maintenanceRedundancy;
+ }
+
+ public Set<ContainerReplica> getContainerReplicas() {
+ return containerReplicas;
+ }
+
+ public ContainerInfo getContainerInfo() {
+ return containerInfo;
+ }
+
+ public ReplicationManagerReport getReport() {
+ return report;
+ }
+
+ public Queue<ContainerHealthResult.UnderReplicatedHealthResult>
+ getUnderRepQueue() {
+ return underRepQueue;
+ }
+
+ public Queue<ContainerHealthResult.OverReplicatedHealthResult>
+ getOverRepQueue() {
+ return overRepQueue;
+ }
+
+ /**
+ * Builder class for ContainerCheckRequest.
+ */
+ public static class Builder {
+
+ private ContainerInfo containerInfo;
+ private Set<ContainerReplica> containerReplicas;
+ private List<ContainerReplicaOp> pendingOps;
+ private int maintenanceRedundancy;
+ private ReplicationManagerReport report;
+ private Queue<ContainerHealthResult.UnderReplicatedHealthResult>
+ underRepQueue;
+ private Queue<ContainerHealthResult.OverReplicatedHealthResult>
+ overRepQueue;
+
+ public Builder setContainerInfo(ContainerInfo containerInfo) {
+ this.containerInfo = containerInfo;
+ return this;
+ }
+
+ public Builder setContainerReplicas(
+ Set<ContainerReplica> containerReplicas) {
+ this.containerReplicas = containerReplicas;
+ return this;
+ }
+
+ public Builder setPendingOps(List<ContainerReplicaOp> pendingOps) {
+ this.pendingOps = pendingOps;
+ return this;
+ }
+
+ public Builder setMaintenanceRedundancy(int maintenanceRedundancy) {
+ this.maintenanceRedundancy = maintenanceRedundancy;
+ return this;
+ }
+
+ public Builder setUnderRepQueue(
+ Queue<ContainerHealthResult.UnderReplicatedHealthResult> queue) {
+ this.underRepQueue = queue;
+ return this;
+ }
+
+ public Builder setOverRepQueue(
+ Queue<ContainerHealthResult.OverReplicatedHealthResult> queue) {
+ this.overRepQueue = queue;
+ return this;
+ }
+
+ public Builder setReport(ReplicationManagerReport report) {
+ this.report = report;
+ return this;
+ }
+
+ public ContainerCheckRequest build() {
+ return new ContainerCheckRequest(this);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthCheck.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthCheck.java
deleted file mode 100644
index 5d14d04..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthCheck.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container.replication;
-
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-
-import java.util.List;
-import java.util.Set;
-
-/**
- * Interface used by ReplicationManager to check if containers are healthy or
- * not.
- */
-public interface ContainerHealthCheck {
-
- ContainerHealthResult checkHealth(
- ContainerInfo container, Set<ContainerReplica> replicas,
- List<ContainerReplicaOp> replicaPendingOps,
- int remainingRedundancyForMaintenance);
-}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
index 70ec74b..de29029 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
@@ -41,7 +41,8 @@
private final HealthState healthState;
private final List<SCMCommand> commands = new ArrayList<>();
- ContainerHealthResult(ContainerInfo containerInfo, HealthState healthState) {
+ public ContainerHealthResult(ContainerInfo containerInfo,
+ HealthState healthState) {
this.containerInfo = containerInfo;
this.healthState = healthState;
}
@@ -67,7 +68,7 @@
*/
public static class HealthyResult extends ContainerHealthResult {
- HealthyResult(ContainerInfo containerInfo) {
+ public HealthyResult(ContainerInfo containerInfo) {
super(containerInfo, HealthState.HEALTHY);
}
}
@@ -108,7 +109,7 @@
private final boolean unrecoverable;
private int requeueCount = 0;
- UnderReplicatedHealthResult(ContainerInfo containerInfo,
+ public UnderReplicatedHealthResult(ContainerInfo containerInfo,
int remainingRedundancy, boolean dueToDecommission,
boolean replicatedOkWithPending, boolean unrecoverable) {
super(containerInfo, HealthState.UNDER_REPLICATED);
@@ -207,7 +208,7 @@
private final boolean sufficientlyReplicatedAfterPending;
- OverReplicatedHealthResult(ContainerInfo containerInfo,
+ public OverReplicatedHealthResult(ContainerInfo containerInfo,
int excessRedundancy, boolean replicatedOkWithPending) {
super(containerInfo, HealthState.OVER_REPLICATED);
this.excessRedundancy = excessRedundancy;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerHealthCheck.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerHealthCheck.java
deleted file mode 100644
index 092cc13..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerHealthCheck.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container.replication;
-
-import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-
-import java.util.List;
-import java.util.Set;
-
-/**
- * Class to determine the health state of an EC Container. Given the container
- * and current replica details, along with replicas pending add and delete,
- * this class will return a ContainerHealthResult indicating if the container
- * is healthy, or under / over replicated etc.
- *
- * For EC Containers, it is possible for a container to be both under and over
- * replicated, if there are multiple copies of one index and no copies of
- * another. This class only returns a single status, keeping the container in a
- * single health state at any given time. Under replicated is a more serious
- * state than over replicated, so it will take precedence over any over
- * replication.
- */
-public class ECContainerHealthCheck implements ContainerHealthCheck {
-
- // TODO - mis-replicated containers are not yet handled.
- // TODO - should this class handle empty / deleting containers, or would it
- // be better handled elsewhere?
-
- @Override
- public ContainerHealthResult checkHealth(ContainerInfo container,
- Set<ContainerReplica> replicas,
- List<ContainerReplicaOp> replicaPendingOps,
- int remainingRedundancyForMaintenance) {
- ECContainerReplicaCount replicaCount =
- new ECContainerReplicaCount(container, replicas, replicaPendingOps,
- remainingRedundancyForMaintenance);
-
- ECReplicationConfig repConfig =
- (ECReplicationConfig) container.getReplicationConfig();
-
- if (!replicaCount.isSufficientlyReplicated(false)) {
- List<Integer> missingIndexes = replicaCount.unavailableIndexes(false);
- int remainingRedundancy = repConfig.getParity();
- boolean dueToDecommission = true;
- if (missingIndexes.size() > 0) {
- // The container has reduced redundancy and will need reconstructed
- // via an EC reconstruction command. Note that it may also have some
- // replicas in decommission / maintenance states, but as the under
- // replication is not caused only by decommission, we say it is not
- // due to decommission/
- dueToDecommission = false;
- remainingRedundancy = repConfig.getParity() - missingIndexes.size();
- }
- return new ContainerHealthResult.UnderReplicatedHealthResult(
- container, remainingRedundancy, dueToDecommission,
- replicaCount.isSufficientlyReplicated(true),
- replicaCount.isUnrecoverable());
- }
-
- if (replicaCount.isOverReplicated(false)) {
- List<Integer> overRepIndexes = replicaCount.overReplicatedIndexes(false);
- return new ContainerHealthResult
- .OverReplicatedHealthResult(container, overRepIndexes.size(),
- !replicaCount.isOverReplicated(true));
- }
-
- // No issues detected, so return healthy.
- return new ContainerHealthResult.HealthyResult(container);
- }
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
index 98dda59..4fba8be 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECOverReplicationHandler.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.replication.health.ECReplicationCheckHandler;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
@@ -47,14 +48,15 @@
public static final Logger LOG =
LoggerFactory.getLogger(ECOverReplicationHandler.class);
- private final ECContainerHealthCheck ecContainerHealthCheck =
- new ECContainerHealthCheck();
+ private final ECReplicationCheckHandler ecReplicationCheck;
private final NodeManager nodeManager;
- public ECOverReplicationHandler(PlacementPolicy placementPolicy,
- NodeManager nodeManager) {
+ public ECOverReplicationHandler(ECReplicationCheckHandler ecReplicationCheck,
+ PlacementPolicy placementPolicy, NodeManager nodeManager) {
super(placementPolicy);
+ this.ecReplicationCheck = ecReplicationCheck;
this.nodeManager = nodeManager;
+
}
/**
@@ -74,9 +76,15 @@
Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
ContainerHealthResult result, int remainingMaintenanceRedundancy) {
ContainerInfo container = result.getContainerInfo();
- ContainerHealthResult currentUnderRepRes = ecContainerHealthCheck
- .checkHealth(container, replicas, pendingOps,
- remainingMaintenanceRedundancy);
+
+ ContainerCheckRequest request = new ContainerCheckRequest.Builder()
+ .setContainerInfo(container)
+ .setContainerReplicas(replicas)
+ .setPendingOps(pendingOps)
+ .setMaintenanceRedundancy(remainingMaintenanceRedundancy)
+ .build();
+ ContainerHealthResult currentUnderRepRes = ecReplicationCheck
+ .checkHealth(request);
LOG.debug("Handling over-replicated EC container: {}", container);
//sanity check
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index 9b8014f..5feb160 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.replication.health.ECReplicationCheckHandler;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
@@ -57,15 +58,15 @@
public static final Logger LOG =
LoggerFactory.getLogger(ECUnderReplicationHandler.class);
- private final ECContainerHealthCheck ecContainerHealthCheck =
- new ECContainerHealthCheck();
+ private final ECReplicationCheckHandler ecReplicationCheck;
private final PlacementPolicy containerPlacement;
private final long currentContainerSize;
private final NodeManager nodeManager;
- public ECUnderReplicationHandler(
+ public ECUnderReplicationHandler(ECReplicationCheckHandler ecReplicationCheck,
final PlacementPolicy containerPlacement, final ConfigurationSource conf,
- NodeManager nodeManager) {
+ NodeManager nodeManager) {
+ this.ecReplicationCheck = ecReplicationCheck;
this.containerPlacement = containerPlacement;
this.currentContainerSize = (long) conf
.getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
@@ -102,9 +103,15 @@
new ECContainerReplicaCount(container, replicas, pendingOps,
remainingMaintenanceRedundancy);
- ContainerHealthResult currentUnderRepRes = ecContainerHealthCheck
- .checkHealth(container, replicas, pendingOps,
- remainingMaintenanceRedundancy);
+ ContainerCheckRequest request = new ContainerCheckRequest.Builder()
+ .setContainerInfo(container)
+ .setContainerReplicas(replicas)
+ .setPendingOps(pendingOps)
+ .setMaintenanceRedundancy(remainingMaintenanceRedundancy)
+ .build();
+
+ ContainerHealthResult currentUnderRepRes = ecReplicationCheck
+ .checkHealth(request);
LOG.debug("Handling under-replicated EC container: {}", container);
if (currentUnderRepRes
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 3017a71..699d523 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -27,7 +27,6 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -36,7 +35,10 @@
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
-import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport.HealthState;
+import org.apache.hadoop.hdds.scm.container.replication.health.ClosedWithMismatchedReplicasHandler;
+import org.apache.hadoop.hdds.scm.container.replication.health.ECReplicationCheckHandler;
+import org.apache.hadoop.hdds.scm.container.replication.health.HealthCheck;
+import org.apache.hadoop.hdds.scm.container.replication.health.OpenContainerHandler;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMService;
@@ -57,7 +59,6 @@
import java.time.Clock;
import java.time.Duration;
import java.util.Comparator;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -69,7 +70,6 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
@@ -143,7 +143,7 @@
private long lastTimeToBeReadyInMillis = 0;
private final Clock clock;
private final ContainerReplicaPendingOps containerReplicaPendingOps;
- private final ContainerHealthCheck ecContainerHealthCheck;
+ private final ECReplicationCheckHandler ecReplicationCheckHandler;
private final EventPublisher eventPublisher;
private final ReentrantLock lock = new ReentrantLock();
private Queue<ContainerHealthResult.UnderReplicatedHealthResult>
@@ -157,6 +157,7 @@
private Thread overReplicatedProcessorThread;
private final UnderReplicatedProcessor underReplicatedProcessor;
private final OverReplicatedProcessor overReplicatedProcessor;
+ private final HealthCheck containerCheckChain;
/**
* Constructs ReplicationManager instance with the given configuration.
@@ -191,21 +192,28 @@
TimeUnit.MILLISECONDS);
this.containerReplicaPendingOps = replicaPendingOps;
this.legacyReplicationManager = legacyReplicationManager;
- this.ecContainerHealthCheck = new ECContainerHealthCheck();
+ this.ecReplicationCheckHandler = new ECReplicationCheckHandler();
this.nodeManager = nodeManager;
this.underRepQueue = createUnderReplicatedQueue();
this.overRepQueue = new LinkedList<>();
this.maintenanceRedundancy = rmConf.maintenanceRemainingRedundancy;
ecUnderReplicationHandler = new ECUnderReplicationHandler(
- containerPlacement, conf, nodeManager);
+ ecReplicationCheckHandler, containerPlacement, conf, nodeManager);
ecOverReplicationHandler =
- new ECOverReplicationHandler(containerPlacement, nodeManager);
+ new ECOverReplicationHandler(ecReplicationCheckHandler,
+ containerPlacement, nodeManager);
underReplicatedProcessor =
new UnderReplicatedProcessor(this, containerReplicaPendingOps,
eventPublisher, rmConf.getUnderReplicatedInterval());
overReplicatedProcessor =
new OverReplicatedProcessor(this, containerReplicaPendingOps,
eventPublisher, rmConf.getOverReplicatedInterval());
+
+ // Chain together the series of checks that are needed to validate the
+ // containers when they are checked by RM.
+ containerCheckChain = new OpenContainerHandler(this);
+ containerCheckChain.addNext(new ClosedWithMismatchedReplicasHandler(this))
+ .addNext(ecReplicationCheckHandler);
start();
}
@@ -358,6 +366,10 @@
}
}
+ public void sendCloseContainerEvent(ContainerID containerID) {
+ eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
+ }
+
/**
* Add an under replicated container back to the queue if it was unable to
* be processed. Its retry count will be incremented before it is re-queued,
@@ -419,7 +431,7 @@
return scmContext.getTermOfLeader();
}
- protected ContainerHealthResult processContainer(ContainerInfo containerInfo,
+ protected void processContainer(ContainerInfo containerInfo,
Queue<ContainerHealthResult.UnderReplicatedHealthResult> underRep,
Queue<ContainerHealthResult.OverReplicatedHealthResult> overRep,
ReplicationManagerReport report) throws ContainerNotFoundException {
@@ -427,77 +439,25 @@
ContainerID containerID = containerInfo.containerID();
Set<ContainerReplica> replicas = containerManager.getContainerReplicas(
containerID);
-
- if (containerInfo.getState() == HddsProtos.LifeCycleState.OPEN) {
- if (!isOpenContainerHealthy(containerInfo, replicas)) {
- report.incrementAndSample(
- HealthState.OPEN_UNHEALTHY, containerID);
- eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
- return new ContainerHealthResult.UnHealthyResult(containerInfo);
- }
- return new ContainerHealthResult.HealthyResult(containerInfo);
- }
-
- if (containerInfo.getState() == HddsProtos.LifeCycleState.CLOSED) {
- List<ContainerReplica> unhealthyReplicas = replicas.stream()
- .filter(r -> !compareState(containerInfo.getState(), r.getState()))
- .collect(Collectors.toList());
-
- if (unhealthyReplicas.size() > 0) {
- handleUnhealthyReplicas(containerInfo, unhealthyReplicas);
- }
- }
-
List<ContainerReplicaOp> pendingOps =
containerReplicaPendingOps.getPendingOps(containerID);
- ContainerHealthResult health = ecContainerHealthCheck.checkHealth(
- containerInfo, replicas, pendingOps, maintenanceRedundancy);
- // TODO - should the report have a HEALTHY state, rather than just bad
- // states? It would need to be added to legacy RM too.
- if (health.getHealthState()
- == ContainerHealthResult.HealthState.UNDER_REPLICATED) {
- report.incrementAndSample(HealthState.UNDER_REPLICATED, containerID);
- ContainerHealthResult.UnderReplicatedHealthResult underHealth
- = ((ContainerHealthResult.UnderReplicatedHealthResult) health);
- if (underHealth.isUnrecoverable()) {
- // TODO - do we need a new health state for unrecoverable EC?
- report.incrementAndSample(HealthState.MISSING, containerID);
- }
- if (!underHealth.isSufficientlyReplicatedAfterPending() &&
- !underHealth.isUnrecoverable()) {
- underRep.add(underHealth);
- }
- } else if (health.getHealthState()
- == ContainerHealthResult.HealthState.OVER_REPLICATED) {
- report.incrementAndSample(HealthState.OVER_REPLICATED, containerID);
- ContainerHealthResult.OverReplicatedHealthResult overHealth
- = ((ContainerHealthResult.OverReplicatedHealthResult) health);
- if (!overHealth.isSufficientlyReplicatedAfterPending()) {
- overRep.add(overHealth);
- }
- }
- return health;
- }
- /**
- * Handles unhealthy container.
- * A container is inconsistent if any of the replica state doesn't
- * match the container state. We have to take appropriate action
- * based on state of the replica.
- *
- * @param container ContainerInfo
- * @param unhealthyReplicas List of ContainerReplica
- */
- private void handleUnhealthyReplicas(final ContainerInfo container,
- List<ContainerReplica> unhealthyReplicas) {
- Iterator<ContainerReplica> iterator = unhealthyReplicas.iterator();
- while (iterator.hasNext()) {
- final ContainerReplica replica = iterator.next();
- final ContainerReplicaProto.State state = replica.getState();
- if (state == State.OPEN || state == State.CLOSING) {
- sendCloseCommand(container, replica.getDatanodeDetails(), true);
- iterator.remove();
- }
+ ContainerCheckRequest checkRequest = new ContainerCheckRequest.Builder()
+ .setContainerInfo(containerInfo)
+ .setContainerReplicas(replicas)
+ .setMaintenanceRedundancy(maintenanceRedundancy)
+ .setReport(report)
+ .setPendingOps(pendingOps)
+ .setUnderRepQueue(underRep)
+ .setOverRepQueue(overRep)
+ .build();
+ // This will call the chain of container health handlers in turn which
+ // will issue commands as needed, update the report and perhaps add
+ // containers to the over and under replicated queue.
+ boolean handled = containerCheckChain.handleChain(checkRequest);
+ if (!handled) {
+ LOG.debug("Container {} had no actions after passing through the " +
+ "check chain", containerInfo.containerID());
}
}
@@ -510,7 +470,7 @@
* has to be closed
* @param force Should be set to true if we want to force close.
*/
- private void sendCloseCommand(final ContainerInfo container,
+ public void sendCloseContainerReplicaCommand(final ContainerInfo container,
final DatanodeDetails datanode, final boolean force) {
ContainerID containerID = container.containerID();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/AbstractCheck.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/AbstractCheck.java
new file mode 100644
index 0000000..5fd83ef
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/AbstractCheck.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+
+/**
+ * Abstract base class for Container Health Checks to extend.
+ */
+public abstract class AbstractCheck implements HealthCheck {
+
+ private HealthCheck successor = null;
+
+ /**
+ * Handle the container and apply the given check to it. This may result in
+ * commands getting generated to correct container states, or nothing
+ * happening if the container is healthy.
+ * @param request ContainerCheckRequest object representing the container
+ * @return True if the request was handled or false if it was not and should
+ * be handled by the next handler in the chain.
+ */
+ public boolean handleChain(ContainerCheckRequest request) {
+ boolean result = handle(request);
+ if (!result && successor != null) {
+ return successor.handleChain(request);
+ }
+ return result;
+ }
+
+ /**
+ * Add a subsequent HealthCheck that will be tried only if the current check
+ * returns false. This allows handlers to be chained together, and each will
+ * be tried in turn until one succeeds.
+ * @param healthCheck The HealthCheck to add to the successor, which will be
+ * tried if the current check returns false.
+ * @return The passed in health check, so the next check in the chain can be
+ * easily added to the successor.
+ */
+ @Override
+ public HealthCheck addNext(HealthCheck healthCheck) {
+ successor = healthCheck;
+ return healthCheck;
+ }
+
+ /**
+ * Handle the container and apply the given check to it. This may result in
+ * commands getting generated to correct container states, or nothing
+ * happening if the container is healthy.
+ * @param request ContainerCheckRequest object representing the container
+ * @return True if the request was handled or false if it was not and should
+ * be handled by the next handler in the chain.
+ */
+ @Override
+ public abstract boolean handle(ContainerCheckRequest request);
+
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ClosedWithMismatchedReplicasHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ClosedWithMismatchedReplicasHandler.java
new file mode 100644
index 0000000..f5bb193
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ClosedWithMismatchedReplicasHandler.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Handler to process containers which are closed, but some replicas are still
+ * open or closing. This handler will send a command to the datanodes for each
+ * mis-matched replica to close it.
+ */
+public class ClosedWithMismatchedReplicasHandler extends AbstractCheck {
+
+ private ReplicationManager replicationManager;
+
+ public ClosedWithMismatchedReplicasHandler(
+ ReplicationManager replicationManager) {
+ this.replicationManager = replicationManager;
+ }
+
+ @Override
+ public boolean handle(ContainerCheckRequest request) {
+ ContainerInfo containerInfo = request.getContainerInfo();
+ Set<ContainerReplica> replicas = request.getContainerReplicas();
+ if (containerInfo.getState() != HddsProtos.LifeCycleState.CLOSED) {
+ // Handler is only relevant for CLOSED containers.
+ return false;
+ }
+ List<ContainerReplica> unhealthyReplicas = replicas.stream()
+ .filter(r -> !ReplicationManager
+ .compareState(containerInfo.getState(), r.getState()))
+ .collect(Collectors.toList());
+
+ if (unhealthyReplicas.size() > 0) {
+ handleUnhealthyReplicas(containerInfo, unhealthyReplicas);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Handles unhealthy container.
+ * A container is inconsistent if any of the replica state doesn't
+ * match the container state. We have to take appropriate action
+ * based on state of the replica.
+ *
+ * @param container ContainerInfo
+ * @param unhealthyReplicas List of ContainerReplica
+ */
+ private void handleUnhealthyReplicas(final ContainerInfo container,
+ List<ContainerReplica> unhealthyReplicas) {
+ Iterator<ContainerReplica> iterator = unhealthyReplicas.iterator();
+ while (iterator.hasNext()) {
+ final ContainerReplica replica = iterator.next();
+ final ContainerReplicaProto.State state = replica.getState();
+ if (state == ContainerReplicaProto.State.OPEN
+ || state == ContainerReplicaProto.State.CLOSING) {
+ replicationManager.sendCloseContainerReplicaCommand(
+ container, replica.getDatanodeDetails(), true);
+ iterator.remove();
+ }
+ }
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECReplicationCheckHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECReplicationCheckHandler.java
new file mode 100644
index 0000000..67936c8
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECReplicationCheckHandler.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.apache.hadoop.hdds.scm.container.replication.ECContainerReplicaCount;
+
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
+
+/**
+ * Container Check handler to check the under / over replication state for
+ * EC containers. If any containers are found to be over or under replicated
+ * they are added to the queue passed within the request object.
+ */
+public class ECReplicationCheckHandler extends AbstractCheck {
+
+ public ECReplicationCheckHandler() {
+ }
+
+ @Override
+ public boolean handle(ContainerCheckRequest request) {
+ if (request.getContainerInfo().getReplicationType() != EC) {
+ // This handler is only for EC containers.
+ return false;
+ }
+ ReplicationManagerReport report = request.getReport();
+ ContainerInfo container = request.getContainerInfo();
+ ContainerID containerID = container.containerID();
+ ContainerHealthResult health = checkHealth(request);
+ if (health.getHealthState() == ContainerHealthResult.HealthState.HEALTHY) {
+ // If the container is healthy, there is nothing else to do in this
+ // handler so return as unhandled so any further handlers will be tried.
+ return false;
+ }
+ // TODO - should the report have a HEALTHY state, rather than just bad
+ // states? It would need to be added to legacy RM too.
+ if (health.getHealthState()
+ == ContainerHealthResult.HealthState.UNDER_REPLICATED) {
+ report.incrementAndSample(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED, containerID);
+ ContainerHealthResult.UnderReplicatedHealthResult underHealth
+ = ((ContainerHealthResult.UnderReplicatedHealthResult) health);
+ if (underHealth.isUnrecoverable()) {
+ // TODO - do we need a new health state for unrecoverable EC?
+ report.incrementAndSample(
+ ReplicationManagerReport.HealthState.MISSING, containerID);
+ }
+ // TODO - if it is unrecoverable, should we return false to other
+ // handlers can be tried?
+ if (!underHealth.isSufficientlyReplicatedAfterPending() &&
+ !underHealth.isUnrecoverable()) {
+ request.getUnderRepQueue().add(underHealth);
+ }
+ return true;
+ } else if (health.getHealthState()
+ == ContainerHealthResult.HealthState.OVER_REPLICATED) {
+ report.incrementAndSample(
+ ReplicationManagerReport.HealthState.OVER_REPLICATED, containerID);
+ ContainerHealthResult.OverReplicatedHealthResult overHealth
+ = ((ContainerHealthResult.OverReplicatedHealthResult) health);
+ if (!overHealth.isSufficientlyReplicatedAfterPending()) {
+ request.getOverRepQueue().add(overHealth);
+ }
+ return true;
+ }
+ // Should not get here, but incase it does the container is not healthy,
+ // but is also not under or over replicated.
+ return false;
+ }
+
+ public ContainerHealthResult checkHealth(ContainerCheckRequest request) {
+ ContainerInfo container = request.getContainerInfo();
+ Set<ContainerReplica> replicas = request.getContainerReplicas();
+ List<ContainerReplicaOp> replicaPendingOps = request.getPendingOps();
+ ECContainerReplicaCount replicaCount =
+ new ECContainerReplicaCount(container, replicas, replicaPendingOps,
+ request.getMaintenanceRedundancy());
+
+ ECReplicationConfig repConfig =
+ (ECReplicationConfig) container.getReplicationConfig();
+
+ if (!replicaCount.isSufficientlyReplicated(false)) {
+ List<Integer> missingIndexes = replicaCount.unavailableIndexes(false);
+ int remainingRedundancy = repConfig.getParity();
+ boolean dueToDecommission = true;
+ if (missingIndexes.size() > 0) {
+ // The container has reduced redundancy and will need reconstructed
+ // via an EC reconstruction command. Note that it may also have some
+ // replicas in decommission / maintenance states, but as the under
+ // replication is not caused only by decommission, we say it is not
+ // due to decommission/
+ dueToDecommission = false;
+ remainingRedundancy = repConfig.getParity() - missingIndexes.size();
+ }
+ return new ContainerHealthResult.UnderReplicatedHealthResult(
+ container, remainingRedundancy, dueToDecommission,
+ replicaCount.isSufficientlyReplicated(true),
+ replicaCount.isUnrecoverable());
+ }
+
+ if (replicaCount.isOverReplicated(false)) {
+ List<Integer> overRepIndexes = replicaCount.overReplicatedIndexes(false);
+ return new ContainerHealthResult
+ .OverReplicatedHealthResult(container, overRepIndexes.size(),
+ !replicaCount.isOverReplicated(true));
+ }
+ // No issues detected, so return healthy.
+ return new ContainerHealthResult.HealthyResult(container);
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/HealthCheck.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/HealthCheck.java
new file mode 100644
index 0000000..a95c0d3
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/HealthCheck.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+
+/**
+ * Interface used by Container Health Check Handlers.
+ */
+public interface HealthCheck {
+
+ /**
+ * Handle the container and apply the given check to it. This may result in
+ * commands getting generated to correct container states, or nothing
+ * happening if the container is healthy.
+ * @param request ContainerCheckRequest object representing the container
+ * @return True if the request was handled or false if it was not and should
+ * be handled by the next handler in the chain.
+ */
+ boolean handle(ContainerCheckRequest request);
+
+ /**
+ * Starting from this HealthCheck, call the handle method. If it returns
+ * false, indicating the request was not handled, then forward the request to
+ * the next handler in the chain via its handleChain method. Repeating until
+ * the request is handled, or there are no further handlers to try.
+ * @param request
+ * @return True if the request was handled or false if not handler handled it.
+ */
+ boolean handleChain(ContainerCheckRequest request);
+
+ /**
+ * Add a subsequent HealthCheck that will be tried only if the current check
+ * returns false. This allows handlers to be chained together, and each will
+ * be tried in turn until one succeeds.
+ * @param handler
+ * @return
+ */
+ HealthCheck addNext(HealthCheck handler);
+
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/OpenContainerHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/OpenContainerHandler.java
new file mode 100644
index 0000000..666b1a2
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/OpenContainerHandler.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.compareState;
+
+/**
+ * Class used in Replication Manager to check open container health for both
+ * EC and Ratis containers. Healthy open containers are skipped, while
+ * containers where some replicas are not in the same state as the container
+ * will be closed.
+ */
+public class OpenContainerHandler extends AbstractCheck {
+
+ private ReplicationManager replicationManager;
+
+ public OpenContainerHandler(ReplicationManager replicationManager) {
+ this.replicationManager = replicationManager;
+ }
+
+ @Override
+ public boolean handle(ContainerCheckRequest request) {
+ ContainerInfo containerInfo = request.getContainerInfo();
+ if (containerInfo.getState() == HddsProtos.LifeCycleState.OPEN) {
+ if (!isOpenContainerHealthy(
+ containerInfo, request.getContainerReplicas())) {
+ // This is an unhealthy open container, so we need to trigger the
+ // close process on it.
+ request.getReport().incrementAndSample(
+ ReplicationManagerReport.HealthState.OPEN_UNHEALTHY,
+ containerInfo.containerID());
+ replicationManager.sendCloseContainerEvent(containerInfo.containerID());
+ return true;
+ }
+ // For open containers we do not want to do any further processing in RM
+ // so return true to stop the command chain.
+ return true;
+ }
+ // The container is not open, so we return false to let the next handler in
+ // the chain process it.
+ return false;
+ }
+
+ /**
+ * An open container is healthy if all its replicas are in the same state as
+ * the container.
+ * @param container The container to check
+ * @param replicas The replicas belonging to the container
+ * @return True if the container is healthy, false otherwise
+ */
+ private boolean isOpenContainerHealthy(
+ ContainerInfo container, Set< ContainerReplica > replicas) {
+ HddsProtos.LifeCycleState state = container.getState();
+ return replicas.stream()
+ .allMatch(r -> compareState(state, r.getState()));
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/package-info.java
new file mode 100644
index 0000000..cf8396d
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+/**
+ * HDDS Container replicaton check classes.
+ */
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerHealthCheck.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerHealthCheck.java
deleted file mode 100644
index d8060da..0000000
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerHealthCheck.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container.replication;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.OverReplicatedHealthResult;
-import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.UnderReplicatedHealthResult;
-import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.HealthState;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
-import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.ADD;
-import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.DELETE;
-import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerInfo;
-import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas;
-
-/**
- * Tests for the ECContainerHealthCheck class.
- */
-public class TestECContainerHealthCheck {
-
- private ECContainerHealthCheck healthCheck;
- private ECReplicationConfig repConfig;
-
- @Before
- public void setup() {
- healthCheck = new ECContainerHealthCheck();
- repConfig = new ECReplicationConfig(3, 2);
- }
-
- @Test
- public void testHealthyContainerIsHealthy() {
- ContainerInfo container = createContainerInfo(repConfig);
- Set<ContainerReplica> replicas
- = createReplicas(container.containerID(), 1, 2, 3, 4, 5);
- ContainerHealthResult result = healthCheck.checkHealth(container, replicas,
- Collections.emptyList(), 2);
- Assert.assertEquals(HealthState.HEALTHY, result.getHealthState());
- }
-
- @Test
- public void testUnderReplicatedContainerIsUnderReplicated() {
- ContainerInfo container = createContainerInfo(repConfig);
- Set<ContainerReplica> replicas
- = createReplicas(container.containerID(), 1, 2, 4, 5);
- UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
- healthCheck.checkHealth(container, replicas,
- Collections.emptyList(), 2);
- Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
- Assert.assertEquals(1, result.getRemainingRedundancy());
- Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
- Assert.assertFalse(result.underReplicatedDueToDecommission());
- }
-
- @Test
- public void testUnderReplicatedContainerFixedWithPending() {
- ContainerInfo container = createContainerInfo(repConfig);
- Set<ContainerReplica> replicas
- = createReplicas(container.containerID(), 1, 2, 4, 5);
- List<ContainerReplicaOp> pending = new ArrayList<>();
- pending.add(ContainerReplicaOp.create(
- ADD, MockDatanodeDetails.randomDatanodeDetails(), 3));
- UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
- healthCheck.checkHealth(container, replicas, pending, 2);
- Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
- Assert.assertEquals(1, result.getRemainingRedundancy());
- Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
- Assert.assertFalse(result.underReplicatedDueToDecommission());
- }
-
- @Test
- public void testUnderReplicatedDueToDecommission() {
- ContainerInfo container = createContainerInfo(repConfig);
- Set<ContainerReplica> replicas = createReplicas(container.containerID(),
- Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
- Pair.of(IN_SERVICE, 3), Pair.of(DECOMMISSIONING, 4),
- Pair.of(DECOMMISSIONED, 5));
-
- UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
- healthCheck.checkHealth(container, replicas, Collections.emptyList(),
- 2);
- Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
- Assert.assertEquals(2, result.getRemainingRedundancy());
- Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
- Assert.assertTrue(result.underReplicatedDueToDecommission());
- }
-
- @Test
- public void testUnderReplicatedDueToDecommissionFixedWithPending() {
- ContainerInfo container = createContainerInfo(repConfig);
- Set<ContainerReplica> replicas = createReplicas(container.containerID(),
- Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
- Pair.of(IN_SERVICE, 3), Pair.of(DECOMMISSIONING, 4),
- Pair.of(IN_SERVICE, 4), Pair.of(DECOMMISSIONED, 5));
- List<ContainerReplicaOp> pending = new ArrayList<>();
- pending.add(ContainerReplicaOp.create(
- ADD, MockDatanodeDetails.randomDatanodeDetails(), 5));
-
- UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
- healthCheck.checkHealth(container, replicas, pending, 2);
- Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
- Assert.assertEquals(2, result.getRemainingRedundancy());
- Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
- Assert.assertTrue(result.underReplicatedDueToDecommission());
- }
-
- @Test
- public void testUnderReplicatedDueToDecommissionAndMissing() {
- ContainerInfo container = createContainerInfo(repConfig);
- Set<ContainerReplica> replicas = createReplicas(container.containerID(),
- Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
- Pair.of(DECOMMISSIONING, 4), Pair.of(DECOMMISSIONED, 5));
- List<ContainerReplicaOp> pending = new ArrayList<>();
- pending.add(ContainerReplicaOp.create(
- ADD, MockDatanodeDetails.randomDatanodeDetails(), 3));
-
- UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
- healthCheck.checkHealth(container, replicas, pending, 2);
- Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
- Assert.assertEquals(1, result.getRemainingRedundancy());
- Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
- Assert.assertFalse(result.underReplicatedDueToDecommission());
- }
-
- @Test
- public void testUnderReplicatedAndUnrecoverable() {
- ContainerInfo container = createContainerInfo(repConfig);
- Set<ContainerReplica> replicas = createReplicas(container.containerID(),
- Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2));
-
- UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
- healthCheck.checkHealth(container, replicas,
- Collections.emptyList(), 2);
- Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
- Assert.assertEquals(-1, result.getRemainingRedundancy());
- Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
- Assert.assertFalse(result.underReplicatedDueToDecommission());
- Assert.assertTrue(result.isUnrecoverable());
- }
-
- @Test
- public void testOverReplicatedContainer() {
- ContainerInfo container = createContainerInfo(repConfig);
- Set<ContainerReplica> replicas = createReplicas(container.containerID(),
- Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
- Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
- Pair.of(IN_SERVICE, 5),
- Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2));
-
- List<ContainerReplicaOp> pending = new ArrayList<>();
- pending.add(ContainerReplicaOp.create(
- DELETE, MockDatanodeDetails.randomDatanodeDetails(), 1));
- pending.add(ContainerReplicaOp.create(
- DELETE, MockDatanodeDetails.randomDatanodeDetails(), 2));
-
- OverReplicatedHealthResult result = (OverReplicatedHealthResult)
- healthCheck.checkHealth(container, replicas, pending, 2);
- Assert.assertEquals(HealthState.OVER_REPLICATED, result.getHealthState());
- Assert.assertEquals(2, result.getExcessRedundancy());
- Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
- }
-
- @Test
- public void testOverReplicatedContainerDueToMaintenance() {
- ContainerInfo container = createContainerInfo(repConfig);
- Set<ContainerReplica> replicas = createReplicas(container.containerID(),
- Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
- Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
- Pair.of(IN_SERVICE, 5),
- Pair.of(IN_MAINTENANCE, 1), Pair.of(IN_MAINTENANCE, 2));
-
- ContainerHealthResult result = healthCheck.checkHealth(container, replicas,
- Collections.emptyList(), 2);
- Assert.assertEquals(HealthState.HEALTHY, result.getHealthState());
- }
-
- @Test
- public void testOverAndUnderReplicated() {
- ContainerInfo container = createContainerInfo(repConfig);
- Set<ContainerReplica> replicas = createReplicas(container.containerID(),
- Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
- Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
- Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2));
-
- ContainerHealthResult result = healthCheck.checkHealth(container, replicas,
- Collections.emptyList(), 2);
- Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
- Assert.assertEquals(1,
- ((UnderReplicatedHealthResult)result).getRemainingRedundancy());
- }
-
-}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
index 42db8ab..f96790a 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECOverReplicationHandler.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.replication.health.ECReplicationCheckHandler;
import org.apache.hadoop.hdds.scm.net.NodeSchema;
import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -61,6 +62,7 @@
private NodeManager nodeManager;
private OzoneConfiguration conf;
private PlacementPolicy policy;
+ private ECReplicationCheckHandler replicationCheck;
@BeforeEach
public void setup() {
@@ -80,6 +82,7 @@
NodeSchema[] schemas =
new NodeSchema[] {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
NodeSchemaManager.getInstance().init(schemas, true);
+ replicationCheck = new ECReplicationCheckHandler();
}
@Test
@@ -126,7 +129,7 @@
Set<ContainerReplica> availableReplicas,
Map<Integer, Integer> index2excessNum) {
ECOverReplicationHandler ecORH =
- new ECOverReplicationHandler(policy, nodeManager);
+ new ECOverReplicationHandler(replicationCheck, policy, nodeManager);
ContainerHealthResult.OverReplicatedHealthResult result =
Mockito.mock(ContainerHealthResult.OverReplicatedHealthResult.class);
Mockito.when(result.getContainerInfo()).thenReturn(container);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index 87d4144..7c1c442 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.replication.health.ECReplicationCheckHandler;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.net.NodeSchema;
import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
@@ -68,6 +69,7 @@
private PlacementPolicy policy;
private static final int DATA = 3;
private static final int PARITY = 2;
+ private ECReplicationCheckHandler replicationCheck;
@BeforeEach
public void setup() {
@@ -87,6 +89,7 @@
NodeSchema[] schemas =
new NodeSchema[] {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
NodeSchemaManager.getInstance().init(schemas, true);
+ replicationCheck = new ECReplicationCheckHandler();
}
@Test
@@ -260,7 +263,8 @@
int decomIndexes, int maintenanceIndexes,
PlacementPolicy placementPolicy) throws IOException {
ECUnderReplicationHandler ecURH =
- new ECUnderReplicationHandler(placementPolicy, conf, nodeManager);
+ new ECUnderReplicationHandler(replicationCheck,
+ placementPolicy, conf, nodeManager);
ContainerHealthResult.UnderReplicatedHealthResult result =
Mockito.mock(ContainerHealthResult.UnderReplicatedHealthResult.class);
Mockito.when(result.isUnrecoverable()).thenReturn(false);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index 82b49d5..6138176 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -139,10 +139,8 @@
HddsProtos.LifeCycleState.OPEN);
// It is under replicated, but as its still open it is seen as healthy.
addReplicas(container, ContainerReplicaProto.State.OPEN, 1, 2, 3, 4);
- ContainerHealthResult result = replicationManager.processContainer(
+ replicationManager.processContainer(
container, underRep, overRep, repReport);
- Assert.assertEquals(ContainerHealthResult.HealthState.HEALTHY,
- result.getHealthState());
Assert.assertEquals(0, underRep.size());
Assert.assertEquals(0, overRep.size());
}
@@ -154,10 +152,8 @@
HddsProtos.LifeCycleState.OPEN);
// Container is open but replicas are closed, so it is open but unhealthy.
addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4);
- ContainerHealthResult result = replicationManager.processContainer(
+ replicationManager.processContainer(
container, underRep, overRep, repReport);
- Assert.assertEquals(ContainerHealthResult.HealthState.UNHEALTHY,
- result.getHealthState());
Mockito.verify(eventPublisher, Mockito.times(1))
.fireEvent(SCMEvents.CLOSE_CONTAINER, container.containerID());
Assert.assertEquals(1, repReport.getStat(
@@ -172,10 +168,8 @@
HddsProtos.LifeCycleState.CLOSED);
addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4, 5);
- ContainerHealthResult result = replicationManager.processContainer(
+ replicationManager.processContainer(
container, underRep, overRep, repReport);
- Assert.assertEquals(ContainerHealthResult.HealthState.HEALTHY,
- result.getHealthState());
Assert.assertEquals(0, underRep.size());
Assert.assertEquals(0, overRep.size());
}
@@ -186,10 +180,8 @@
HddsProtos.LifeCycleState.CLOSED);
addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4);
- ContainerHealthResult result = replicationManager.processContainer(
+ replicationManager.processContainer(
container, underRep, overRep, repReport);
- Assert.assertEquals(ContainerHealthResult.HealthState.UNDER_REPLICATED,
- result.getHealthState());
Assert.assertEquals(1, underRep.size());
Assert.assertEquals(0, overRep.size());
Assert.assertEquals(1, repReport.getStat(
@@ -205,18 +197,14 @@
containerReplicaPendingOps.scheduleAddReplica(container.containerID(),
MockDatanodeDetails.randomDatanodeDetails(), 5);
- ContainerHealthResult result = replicationManager.processContainer(
+ replicationManager.processContainer(
container, underRep, overRep, repReport);
- Assert.assertEquals(ContainerHealthResult.HealthState.UNDER_REPLICATED,
- result.getHealthState());
// As the pending replication fixes the under replication, nothing is added
// to the under replication list.
Assert.assertEquals(0, underRep.size());
Assert.assertEquals(0, overRep.size());
- Assert.assertTrue(((ContainerHealthResult.UnderReplicatedHealthResult)
- result).isSufficientlyReplicatedAfterPending());
// As the container is still under replicated, as the pending have not
- // completed yet, the container is still marked as under-replciated in the
+ // completed yet, the container is still marked as under-replicated in the
// report.
Assert.assertEquals(1, repReport.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
@@ -229,10 +217,8 @@
HddsProtos.LifeCycleState.CLOSED);
addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2);
- ContainerHealthResult result = replicationManager.processContainer(
+ replicationManager.processContainer(
container, underRep, overRep, repReport);
- Assert.assertEquals(ContainerHealthResult.HealthState.UNDER_REPLICATED,
- result.getHealthState());
// If it is unrecoverable, there is no point in putting it into the under
// replication list. It will be checked again on the next RM run.
Assert.assertEquals(0, underRep.size());
@@ -250,13 +236,11 @@
HddsProtos.LifeCycleState.CLOSED);
addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 5, 5);
- ContainerHealthResult result = replicationManager.processContainer(
+ replicationManager.processContainer(
container, underRep, overRep, repReport);
// If it is both under and over replicated, we set it to the most important
// state, which is under-replicated. When that is fixed, over replication
// will be handled.
- Assert.assertEquals(ContainerHealthResult.HealthState.UNDER_REPLICATED,
- result.getHealthState());
Assert.assertEquals(1, underRep.size());
Assert.assertEquals(0, overRep.size());
Assert.assertEquals(1, repReport.getStat(
@@ -271,10 +255,8 @@
HddsProtos.LifeCycleState.CLOSED);
addReplicas(container, ContainerReplicaProto.State.CLOSED,
1, 2, 3, 4, 5, 5);
- ContainerHealthResult result = replicationManager.processContainer(
+ replicationManager.processContainer(
container, underRep, overRep, repReport);
- Assert.assertEquals(ContainerHealthResult.HealthState.OVER_REPLICATED,
- result.getHealthState());
Assert.assertEquals(0, underRep.size());
Assert.assertEquals(1, overRep.size());
Assert.assertEquals(1, repReport.getStat(
@@ -290,10 +272,8 @@
1, 2, 3, 4, 5, 5);
containerReplicaPendingOps.scheduleDeleteReplica(container.containerID(),
MockDatanodeDetails.randomDatanodeDetails(), 5);
- ContainerHealthResult result = replicationManager.processContainer(
+ replicationManager.processContainer(
container, underRep, overRep, repReport);
- Assert.assertEquals(ContainerHealthResult.HealthState.OVER_REPLICATED,
- result.getHealthState());
Assert.assertEquals(0, underRep.size());
// If the pending replication fixes the over-replication, nothing is added
// to the over replication list.
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestClosedWithMismatchedReplicasHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestClosedWithMismatchedReplicasHandler.java
new file mode 100644
index 0000000..5a1e399
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestClosedWithMismatchedReplicasHandler.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.times;
+
+/**
+ * Tests for the ClosedWithMismatchedReplicasHandler.
+ */
+public class TestClosedWithMismatchedReplicasHandler {
+
+ private ReplicationManager replicationManager;
+ private ClosedWithMismatchedReplicasHandler handler;
+ private ECReplicationConfig replicationConfig;
+
+ @BeforeEach
+ public void setup() {
+ replicationConfig = new ECReplicationConfig(3, 2);
+ replicationManager = Mockito.mock(ReplicationManager.class);
+ handler = new ClosedWithMismatchedReplicasHandler(replicationManager);
+ }
+
+ @Test
+ public void testOpenContainerReturnsFalse() {
+ ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo(
+ replicationConfig, 1, OPEN);
+ ContainerCheckRequest request = new ContainerCheckRequest.Builder()
+ .setPendingOps(Collections.EMPTY_LIST)
+ .setReport(new ReplicationManagerReport())
+ .setContainerInfo(containerInfo)
+ .setContainerReplicas(Collections.emptySet())
+ .build();
+
+ Assertions.assertFalse(handler.handle(request));
+ Mockito.verify(replicationManager, times(0))
+ .sendCloseContainerReplicaCommand(
+ any(), any(), anyBoolean());
+ }
+
+ @Test
+ public void testClosedHealthyContainerReturnsFalse() {
+ ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo(
+ replicationConfig, 1, CLOSED);
+ Set<ContainerReplica> containerReplicas =
+ ReplicationTestUtil.createReplicas(containerInfo.containerID(),
+ ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4, 5);
+ ContainerCheckRequest request = new ContainerCheckRequest.Builder()
+ .setPendingOps(Collections.EMPTY_LIST)
+ .setReport(new ReplicationManagerReport())
+ .setContainerInfo(containerInfo)
+ .setContainerReplicas(containerReplicas)
+ .build();
+ Assertions.assertFalse(handler.handle(request));
+
+ Mockito.verify(replicationManager, times(0))
+ .sendCloseContainerReplicaCommand(
+ any(), any(), anyBoolean());
+ }
+
+ @Test
+ public void testClosedMissMatchContainerReturnsTrue() {
+ ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo(
+ replicationConfig, 1, CLOSED);
+ Set<ContainerReplica> containerReplicas = ReplicationTestUtil
+ .createReplicas(containerInfo.containerID(),
+ ContainerReplicaProto.State.CLOSED, 1, 2);
+ ContainerReplica mismatch1 = ReplicationTestUtil.createContainerReplica(
+ containerInfo.containerID(), 4,
+ HddsProtos.NodeOperationalState.IN_SERVICE,
+ ContainerReplicaProto.State.OPEN);
+ ContainerReplica mismatch2 = ReplicationTestUtil.createContainerReplica(
+ containerInfo.containerID(), 5,
+ HddsProtos.NodeOperationalState.IN_SERVICE,
+ ContainerReplicaProto.State.CLOSING);
+ ContainerReplica mismatch3 = ReplicationTestUtil.createContainerReplica(
+ containerInfo.containerID(), 3,
+ HddsProtos.NodeOperationalState.IN_SERVICE,
+ ContainerReplicaProto.State.UNHEALTHY);
+ containerReplicas.add(mismatch1);
+ containerReplicas.add(mismatch2);
+ containerReplicas.add(mismatch3);
+ ContainerCheckRequest request = new ContainerCheckRequest.Builder()
+ .setPendingOps(Collections.EMPTY_LIST)
+ .setReport(new ReplicationManagerReport())
+ .setContainerInfo(containerInfo)
+ .setContainerReplicas(containerReplicas)
+ .build();
+ Assertions.assertTrue(handler.handle(request));
+
+ Mockito.verify(replicationManager, times(1))
+ .sendCloseContainerReplicaCommand(
+ containerInfo, mismatch1.getDatanodeDetails(), true);
+ Mockito.verify(replicationManager, times(1))
+ .sendCloseContainerReplicaCommand(
+ containerInfo, mismatch2.getDatanodeDetails(), true);
+ Mockito.verify(replicationManager, times(0))
+ .sendCloseContainerReplicaCommand(
+ containerInfo, mismatch3.getDatanodeDetails(), true);
+ }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECReplicationCheckHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECReplicationCheckHandler.java
new file mode 100644
index 0000000..6222c23
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECReplicationCheckHandler.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.OverReplicatedHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.UnderReplicatedHealthResult;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.HealthState;
+
+import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.ADD;
+import static org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.DELETE;
+import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerInfo;
+import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas;
+
+/**
+ * Tests for the ECContainerHealthCheck class.
+ */
+public class TestECReplicationCheckHandler {
+
+ private ECReplicationCheckHandler healthCheck;
+ private ECReplicationConfig repConfig;
+ private Queue<UnderReplicatedHealthResult> underRepQueue;
+ private Queue<OverReplicatedHealthResult> overRepQueue;
+ private int maintenanceRedundancy = 2;
+ private ContainerCheckRequest.Builder requestBuilder;
+ private ReplicationManagerReport report;
+
+
+ @Before
+ public void setup() {
+ healthCheck = new ECReplicationCheckHandler();
+ repConfig = new ECReplicationConfig(3, 2);
+ underRepQueue = new LinkedList<>();
+ overRepQueue = new LinkedList<>();
+ report = new ReplicationManagerReport();
+ requestBuilder = new ContainerCheckRequest.Builder()
+ .setOverRepQueue(overRepQueue)
+ .setUnderRepQueue(underRepQueue)
+ .setMaintenanceRedundancy(maintenanceRedundancy)
+ .setPendingOps(Collections.emptyList())
+ .setReport(report);
+ }
+
+ @Test
+ public void testHealthyContainerIsHealthy() {
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas
+ = createReplicas(container.containerID(), 1, 2, 3, 4, 5);
+ ContainerCheckRequest request = requestBuilder
+ .setContainerReplicas(replicas)
+ .setContainerInfo(container)
+ .build();
+
+ ContainerHealthResult result = healthCheck.checkHealth(request);
+ Assert.assertEquals(HealthState.HEALTHY, result.getHealthState());
+
+ Assert.assertFalse(healthCheck.handle(request));
+ Assert.assertEquals(0, underRepQueue.size());
+ Assert.assertEquals(0, overRepQueue.size());
+ }
+
+ @Test
+ public void testUnderReplicatedContainerIsUnderReplicated() {
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas
+ = createReplicas(container.containerID(), 1, 2, 4, 5);
+ ContainerCheckRequest request = requestBuilder
+ .setContainerReplicas(replicas)
+ .setContainerInfo(container)
+ .build();
+ UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+ healthCheck.checkHealth(request);
+ Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+ Assert.assertEquals(1, result.getRemainingRedundancy());
+ Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+ Assert.assertFalse(result.underReplicatedDueToDecommission());
+
+ Assert.assertTrue(healthCheck.handle(request));
+ Assert.assertEquals(1, underRepQueue.size());
+ Assert.assertEquals(0, overRepQueue.size());
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ }
+
+ @Test
+ public void testUnderReplicatedContainerFixedWithPending() {
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas
+ = createReplicas(container.containerID(), 1, 2, 4, 5);
+ List<ContainerReplicaOp> pending = new ArrayList<>();
+ pending.add(ContainerReplicaOp.create(
+ ADD, MockDatanodeDetails.randomDatanodeDetails(), 3));
+ ContainerCheckRequest request = requestBuilder
+ .setContainerReplicas(replicas)
+ .setContainerInfo(container)
+ .setPendingOps(pending)
+ .build();
+ UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+ healthCheck.checkHealth(request);
+ Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+ Assert.assertEquals(1, result.getRemainingRedundancy());
+ Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
+ Assert.assertFalse(result.underReplicatedDueToDecommission());
+
+ Assert.assertTrue(healthCheck.handle(request));
+ // Fixed with pending so nothing added to the queue
+ Assert.assertEquals(0, underRepQueue.size());
+ Assert.assertEquals(0, overRepQueue.size());
+ // Still under replicated until the pending complete
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ }
+
+ @Test
+ public void testUnderReplicatedDueToDecommission() {
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+ Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
+ Pair.of(IN_SERVICE, 3), Pair.of(DECOMMISSIONING, 4),
+ Pair.of(DECOMMISSIONED, 5));
+ ContainerCheckRequest request = requestBuilder
+ .setContainerReplicas(replicas)
+ .setContainerInfo(container)
+ .build();
+
+ UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+ healthCheck.checkHealth(request);
+ Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+ Assert.assertEquals(2, result.getRemainingRedundancy());
+ Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+ Assert.assertTrue(result.underReplicatedDueToDecommission());
+
+ Assert.assertTrue(healthCheck.handle(request));
+ Assert.assertEquals(1, underRepQueue.size());
+ Assert.assertEquals(0, overRepQueue.size());
+ // Still under replicated until the pending complete
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ }
+
+ @Test
+ public void testUnderReplicatedDueToDecommissionFixedWithPending() {
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+ Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
+ Pair.of(IN_SERVICE, 3), Pair.of(DECOMMISSIONING, 4),
+ Pair.of(IN_SERVICE, 4), Pair.of(DECOMMISSIONED, 5));
+ List<ContainerReplicaOp> pending = new ArrayList<>();
+ pending.add(ContainerReplicaOp.create(
+ ADD, MockDatanodeDetails.randomDatanodeDetails(), 5));
+ ContainerCheckRequest request = requestBuilder
+ .setContainerReplicas(replicas)
+ .setContainerInfo(container)
+ .setPendingOps(pending)
+ .build();
+
+ UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+ healthCheck.checkHealth(request);
+ Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+ Assert.assertEquals(2, result.getRemainingRedundancy());
+ Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
+ Assert.assertTrue(result.underReplicatedDueToDecommission());
+
+ Assert.assertTrue(healthCheck.handle(request));
+ // Fixed with pending so nothing added to the queue
+ Assert.assertEquals(0, underRepQueue.size());
+ Assert.assertEquals(0, overRepQueue.size());
+ // Still under replicated until the pending complete
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ }
+
+ @Test
+ public void testUnderReplicatedDueToDecommissionAndMissingReplica() {
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+ Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
+ Pair.of(DECOMMISSIONING, 4), Pair.of(DECOMMISSIONED, 5));
+ List<ContainerReplicaOp> pending = new ArrayList<>();
+ pending.add(ContainerReplicaOp.create(
+ ADD, MockDatanodeDetails.randomDatanodeDetails(), 3));
+
+ ContainerCheckRequest request = requestBuilder
+ .setContainerReplicas(replicas)
+ .setContainerInfo(container)
+ .setPendingOps(pending)
+ .build();
+ UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+ healthCheck.checkHealth(request);
+ Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+ Assert.assertEquals(1, result.getRemainingRedundancy());
+ Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+ Assert.assertFalse(result.underReplicatedDueToDecommission());
+
+ Assert.assertTrue(healthCheck.handle(request));
+ Assert.assertEquals(1, underRepQueue.size());
+ Assert.assertEquals(0, overRepQueue.size());
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ }
+
+ @Test
+ public void testUnderReplicatedAndUnrecoverable() {
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+ Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2));
+ ContainerCheckRequest request = requestBuilder
+ .setContainerReplicas(replicas)
+ .setContainerInfo(container)
+ .build();
+
+ UnderReplicatedHealthResult result = (UnderReplicatedHealthResult)
+ healthCheck.checkHealth(request);
+ Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+ Assert.assertEquals(-1, result.getRemainingRedundancy());
+ Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+ Assert.assertFalse(result.underReplicatedDueToDecommission());
+ Assert.assertTrue(result.isUnrecoverable());
+
+ Assert.assertTrue(healthCheck.handle(request));
+ // Unrecoverable so not added to the queue
+ Assert.assertEquals(0, underRepQueue.size());
+ Assert.assertEquals(0, overRepQueue.size());
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.MISSING));
+ }
+
+ @Test
+ public void testOverReplicatedContainer() {
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+ Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
+ Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
+ Pair.of(IN_SERVICE, 5),
+ Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2));
+
+ ContainerCheckRequest request = requestBuilder
+ .setContainerReplicas(replicas)
+ .setContainerInfo(container)
+ .build();
+
+ OverReplicatedHealthResult result = (OverReplicatedHealthResult)
+ healthCheck.checkHealth(request);
+ Assert.assertEquals(HealthState.OVER_REPLICATED, result.getHealthState());
+ Assert.assertEquals(2, result.getExcessRedundancy());
+ Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
+
+ Assert.assertTrue(healthCheck.handle(request));
+ Assert.assertEquals(0, underRepQueue.size());
+ Assert.assertEquals(1, overRepQueue.size());
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.OVER_REPLICATED));
+ }
+
+ @Test
+ public void testOverReplicatedContainerFixedByPending() {
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+ Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
+ Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
+ Pair.of(IN_SERVICE, 5),
+ Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2));
+
+ List<ContainerReplicaOp> pending = new ArrayList<>();
+ pending.add(ContainerReplicaOp.create(
+ DELETE, MockDatanodeDetails.randomDatanodeDetails(), 1));
+ pending.add(ContainerReplicaOp.create(
+ DELETE, MockDatanodeDetails.randomDatanodeDetails(), 2));
+ ContainerCheckRequest request = requestBuilder
+ .setContainerReplicas(replicas)
+ .setContainerInfo(container)
+ .setPendingOps(pending)
+ .build();
+
+ OverReplicatedHealthResult result = (OverReplicatedHealthResult)
+ healthCheck.checkHealth(request);
+ Assert.assertEquals(HealthState.OVER_REPLICATED, result.getHealthState());
+ Assert.assertEquals(2, result.getExcessRedundancy());
+ Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
+
+ Assert.assertTrue(healthCheck.handle(request));
+ Assert.assertEquals(0, underRepQueue.size());
+ Assert.assertEquals(0, overRepQueue.size());
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.OVER_REPLICATED));
+ }
+
+ @Test
+ public void testOverReplicatedContainerDueToMaintenance() {
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+ Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
+ Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
+ Pair.of(IN_SERVICE, 5),
+ Pair.of(IN_MAINTENANCE, 1), Pair.of(IN_MAINTENANCE, 2));
+ ContainerCheckRequest request = requestBuilder
+ .setContainerReplicas(replicas)
+ .setContainerInfo(container)
+ .build();
+ ContainerHealthResult result = healthCheck.checkHealth(request);
+ Assert.assertEquals(HealthState.HEALTHY, result.getHealthState());
+
+ // As it is maintenance replicas causing the over replication, the container
+ // is not really over-replicated.
+ Assert.assertFalse(healthCheck.handle(request));
+ Assert.assertEquals(0, underRepQueue.size());
+ Assert.assertEquals(0, overRepQueue.size());
+ Assert.assertEquals(0, report.getStat(
+ ReplicationManagerReport.HealthState.OVER_REPLICATED));
+ }
+
+ @Test
+ public void testOverAndUnderReplicated() {
+ ContainerInfo container = createContainerInfo(repConfig);
+ Set<ContainerReplica> replicas = createReplicas(container.containerID(),
+ Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
+ Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
+ Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2));
+ ContainerCheckRequest request = requestBuilder
+ .setContainerReplicas(replicas)
+ .setContainerInfo(container)
+ .build();
+ ContainerHealthResult result = healthCheck.checkHealth(request);
+ Assert.assertEquals(HealthState.UNDER_REPLICATED, result.getHealthState());
+ Assert.assertEquals(1,
+ ((UnderReplicatedHealthResult)result).getRemainingRedundancy());
+
+ // Under-replicated takes precedence and the over-replication is ignored
+ // for now.
+ Assert.assertTrue(healthCheck.handle(request));
+ Assert.assertEquals(1, underRepQueue.size());
+ Assert.assertEquals(0, overRepQueue.size());
+ Assert.assertEquals(1, report.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ Assert.assertEquals(0, report.getStat(
+ ReplicationManagerReport.HealthState.OVER_REPLICATED));
+ }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestOpenContainerHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestOpenContainerHandler.java
new file mode 100644
index 0000000..1acb4ec
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestOpenContainerHandler.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN;
+import static org.mockito.Mockito.times;
+
+/**
+ * Tests for the OpenContainerHandler class.
+ */
+public class TestOpenContainerHandler {
+
+ private ReplicationManager replicationManager;
+ private OpenContainerHandler openContainerHandler;
+ private ECReplicationConfig replicationConfig;
+
+ @BeforeEach
+ public void setup() {
+ replicationConfig = new ECReplicationConfig(3, 2);
+ replicationManager = Mockito.mock(ReplicationManager.class);
+ openContainerHandler = new OpenContainerHandler(replicationManager);
+ }
+
+ @Test
+ public void testClosedContainerReturnsTrue() {
+ ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo(
+ replicationConfig, 1, CLOSED);
+ Set<ContainerReplica> containerReplicas = ReplicationTestUtil
+ .createReplicas(containerInfo.containerID(),
+ ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4, 5);
+ ContainerCheckRequest request = new ContainerCheckRequest.Builder()
+ .setPendingOps(Collections.EMPTY_LIST)
+ .setReport(new ReplicationManagerReport())
+ .setContainerInfo(containerInfo)
+ .setContainerReplicas(containerReplicas)
+ .build();
+ Assertions.assertFalse(openContainerHandler.handle(request));
+ Mockito.verify(replicationManager, times(0))
+ .sendCloseContainerEvent(Mockito.any());
+ }
+
+ @Test
+ public void testOpenContainerReturnsFalse() {
+ ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo(
+ replicationConfig, 1, OPEN);
+ Set<ContainerReplica> containerReplicas = ReplicationTestUtil
+ .createReplicas(containerInfo.containerID(),
+ ContainerReplicaProto.State.OPEN, 1, 2, 3, 4, 5);
+ ContainerCheckRequest request = new ContainerCheckRequest.Builder()
+ .setPendingOps(Collections.EMPTY_LIST)
+ .setReport(new ReplicationManagerReport())
+ .setContainerInfo(containerInfo)
+ .setContainerReplicas(containerReplicas)
+ .build();
+ Assertions.assertTrue(openContainerHandler.handle(request));
+ Mockito.verify(replicationManager, times(0))
+ .sendCloseContainerEvent(Mockito.any());
+ }
+
+ @Test
+ public void testOpenUnhealthyContainerIsClosed() {
+ ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo(
+ replicationConfig, 1, OPEN);
+ Set<ContainerReplica> containerReplicas = ReplicationTestUtil
+ .createReplicas(containerInfo.containerID(),
+ ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4);
+ ContainerCheckRequest request = new ContainerCheckRequest.Builder()
+ .setPendingOps(Collections.EMPTY_LIST)
+ .setReport(new ReplicationManagerReport())
+ .setContainerInfo(containerInfo)
+ .setContainerReplicas(containerReplicas)
+ .build();
+ Assertions.assertTrue(openContainerHandler.handle(request));
+ Mockito.verify(replicationManager, times(1))
+ .sendCloseContainerEvent(containerInfo.containerID());
+ }
+
+}