blob: 66863ce55366a8f1c4f61b5dcfe58ef6e1f801f4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container.replication;
import com.google.common.primitives.Longs;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
import org.apache.hadoop.hdds.scm.container.ContainerStateManagerImpl;
import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
import org.apache.hadoop.hdds.scm.container.SimpleMockNodeManager;
import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
.ReplicationManagerConfiguration;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager.MoveResult;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.metadata.SCMDBTransactionBufferImpl;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.TestClock;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.createDatanodeDetails;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
import static org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
import static org.apache.hadoop.hdds.scm.HddsTestUtils.CONTAINER_NUM_KEYS_DEFAULT;
import static org.apache.hadoop.hdds.scm.HddsTestUtils.CONTAINER_USED_BYTES_DEFAULT;
import static org.apache.hadoop.hdds.scm.HddsTestUtils.getContainer;
import static org.apache.hadoop.hdds.scm.HddsTestUtils.getReplicas;
import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
import static org.mockito.Mockito.when;
/**
* Test cases to verify the functionality of ReplicationManager.
*/
public class TestLegacyReplicationManager {
private ReplicationManager replicationManager;
private ContainerStateManager containerStateManager;
private PlacementPolicy containerPlacementPolicy;
private EventQueue eventQueue;
private DatanodeCommandHandler datanodeCommandHandler;
private SimpleMockNodeManager nodeManager;
private ContainerManager containerManager;
private GenericTestUtils.LogCapturer scmLogs;
private SCMServiceManager serviceManager;
private TestClock clock;
private File testDir;
private DBStore dbStore;
private PipelineManager pipelineManager;
private SCMHAManager scmhaManager;
private ContainerReplicaPendingOps containerReplicaPendingOps;
int getInflightCount(InflightType type) {
return replicationManager.getLegacyReplicationManager()
.getInflightCount(type);
}
@BeforeEach
public void setup()
throws IOException, InterruptedException,
NodeNotFoundException, InvalidStateTransitionException {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setTimeDuration(
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
0, TimeUnit.SECONDS);
scmLogs = GenericTestUtils.LogCapturer.
captureLogs(LegacyReplicationManager.LOG);
containerManager = Mockito.mock(ContainerManager.class);
nodeManager = new SimpleMockNodeManager();
eventQueue = new EventQueue();
scmhaManager = SCMHAManagerStub.getInstance(true);
testDir = GenericTestUtils.getTestDir(
TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
dbStore = DBStoreBuilder.createDBStore(
conf, new SCMDBDefinition());
pipelineManager = Mockito.mock(PipelineManager.class);
when(pipelineManager.containsPipeline(Mockito.any(PipelineID.class)))
.thenReturn(true);
containerStateManager = ContainerStateManagerImpl.newBuilder()
.setConfiguration(conf)
.setPipelineManager(pipelineManager)
.setRatisServer(scmhaManager.getRatisServer())
.setContainerStore(SCMDBDefinition.CONTAINERS.getTable(dbStore))
.setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
.build();
serviceManager = new SCMServiceManager();
datanodeCommandHandler = new DatanodeCommandHandler();
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, datanodeCommandHandler);
Mockito.when(containerManager.getContainers())
.thenAnswer(invocation -> {
Set<ContainerID> ids = containerStateManager.getContainerIDs();
List<ContainerInfo> containers = new ArrayList<>();
for (ContainerID id : ids) {
containers.add(containerStateManager.getContainer(
id));
}
return containers;
});
Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class)))
.thenAnswer(invocation -> containerStateManager
.getContainer(((ContainerID)invocation
.getArguments()[0])));
Mockito.when(containerManager.getContainerReplicas(
Mockito.any(ContainerID.class)))
.thenAnswer(invocation -> containerStateManager
.getContainerReplicas(((ContainerID)invocation
.getArguments()[0])));
containerPlacementPolicy = Mockito.mock(PlacementPolicy.class);
Mockito.when(containerPlacementPolicy.chooseDatanodes(
Mockito.any(), Mockito.any(), Mockito.anyInt(),
Mockito.anyLong(), Mockito.anyLong()))
.thenAnswer(invocation -> {
int count = (int) invocation.getArguments()[2];
return IntStream.range(0, count)
.mapToObj(i -> randomDatanodeDetails())
.collect(Collectors.toList());
});
Mockito.when(containerPlacementPolicy.validateContainerPlacement(
Mockito.any(),
Mockito.anyInt()
)).thenAnswer(invocation ->
new ContainerPlacementStatusDefault(2, 2, 3));
clock = new TestClock(Instant.now(), ZoneId.of("UTC"));
containerReplicaPendingOps = new ContainerReplicaPendingOps(conf, clock);
createReplicationManager(new ReplicationManagerConfiguration());
}
void createReplicationManager(int replicationLimit, int deletionLimit)
throws Exception {
replicationManager.stop();
dbStore.close();
final LegacyReplicationManager.ReplicationManagerConfiguration conf
= new LegacyReplicationManager.ReplicationManagerConfiguration();
conf.setContainerInflightReplicationLimit(replicationLimit);
conf.setContainerInflightDeletionLimit(deletionLimit);
createReplicationManager(conf);
}
void createReplicationManager(
LegacyReplicationManager.ReplicationManagerConfiguration conf)
throws Exception {
createReplicationManager(null, conf);
}
private void createReplicationManager(ReplicationManagerConfiguration rmConf)
throws InterruptedException, IOException {
createReplicationManager(rmConf, null);
}
void createReplicationManager(ReplicationManagerConfiguration rmConf,
LegacyReplicationManager.ReplicationManagerConfiguration lrmConf)
throws InterruptedException, IOException {
OzoneConfiguration config = new OzoneConfiguration();
testDir = GenericTestUtils
.getTestDir(TestContainerManagerImpl.class.getSimpleName());
config.set(HddsConfigKeys.OZONE_METADATA_DIRS,
testDir.getAbsolutePath());
config.setTimeDuration(
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
0, TimeUnit.SECONDS);
Optional.ofNullable(rmConf).ifPresent(config::setFromObject);
Optional.ofNullable(lrmConf).ifPresent(config::setFromObject);
SCMHAManager scmHAManager = SCMHAManagerStub
.getInstance(true, new SCMDBTransactionBufferImpl());
dbStore = DBStoreBuilder.createDBStore(
config, new SCMDBDefinition());
LegacyReplicationManager legacyRM = new LegacyReplicationManager(
config, containerManager, containerPlacementPolicy, eventQueue,
SCMContext.emptyContext(), nodeManager, scmHAManager, clock,
SCMDBDefinition.MOVE.getTable(dbStore));
replicationManager = new ReplicationManager(
config,
containerManager,
containerPlacementPolicy,
eventQueue,
SCMContext.emptyContext(),
nodeManager,
clock,
legacyRM,
containerReplicaPendingOps);
serviceManager.register(replicationManager);
serviceManager.notifyStatusChanged();
scmLogs.clearOutput();
Thread.sleep(100L);
}
@AfterEach
public void tearDown() throws Exception {
containerStateManager.close();
if (dbStore != null) {
dbStore.close();
}
FileUtil.fullyDelete(testDir);
}
/**
* Checks if restarting of replication manager works.
*/
@Test
public void testReplicationManagerRestart() throws InterruptedException {
Assertions.assertTrue(replicationManager.isRunning());
replicationManager.stop();
// Stop is a non-blocking call, it might take sometime for the
// ReplicationManager to shutdown
Thread.sleep(500);
Assertions.assertFalse(replicationManager.isRunning());
replicationManager.start();
Assertions.assertTrue(replicationManager.isRunning());
}
/**
* Open containers are not handled by ReplicationManager.
* This test-case makes sure that ReplicationManages doesn't take
* any action on OPEN containers.
*/
@Test
public void testOpenContainer() throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.OPEN);
containerStateManager.addContainer(container.getProtobuf());
replicationManager.processAll();
eventQueue.processAll(1000);
ReplicationManagerReport report = replicationManager.getContainerReport();
Assertions.assertEquals(1, report.getStat(LifeCycleState.OPEN));
Assertions.assertEquals(0, datanodeCommandHandler.getInvocation());
}
/**
* If the container is in CLOSING state we resend close container command
* to all the datanodes.
*/
@Test
public void testClosingContainer() throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
final ContainerID id = container.containerID();
containerStateManager.addContainer(container.getProtobuf());
// Two replicas in CLOSING state
final Set<ContainerReplica> replicas = getReplicas(id, State.CLOSING,
randomDatanodeDetails(),
randomDatanodeDetails());
// One replica in OPEN state
final DatanodeDetails datanode = randomDatanodeDetails();
replicas.addAll(getReplicas(id, State.OPEN, datanode));
for (ContainerReplica replica : replicas) {
containerStateManager.updateContainerReplica(id, replica);
}
final int currentCloseCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(currentCloseCommandCount + 3, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
// Update the OPEN to CLOSING
for (ContainerReplica replica : getReplicas(id, State.CLOSING, datanode)) {
containerStateManager.updateContainerReplica(id, replica);
}
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(currentCloseCommandCount + 6, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
ReplicationManagerReport report = replicationManager.getContainerReport();
Assertions.assertEquals(1, report.getStat(LifeCycleState.CLOSING));
}
/**
* The container is QUASI_CLOSED but two of the replica is still in
* open state. ReplicationManager should resend close command to those
* datanodes.
*/
@Test
public void testQuasiClosedContainerWithTwoOpenReplica() throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
final ContainerReplica replicaOne = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaTwo = getReplicas(
id, State.OPEN, 1000L, originNodeId, randomDatanodeDetails());
final DatanodeDetails datanodeDetails = randomDatanodeDetails();
final ContainerReplica replicaThree = getReplicas(
id, State.OPEN, 1000L, datanodeDetails.getUuid(), datanodeDetails);
containerStateManager.addContainer(container.getProtobuf());
containerStateManager.updateContainerReplica(id, replicaOne);
containerStateManager.updateContainerReplica(id, replicaTwo);
containerStateManager.updateContainerReplica(
id, replicaThree);
final int currentCloseCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
// Two of the replicas are in OPEN state
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(currentCloseCommandCount + 2, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
Assertions.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.closeContainerCommand,
replicaTwo.getDatanodeDetails()));
Assertions.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.closeContainerCommand,
replicaThree.getDatanodeDetails()));
ReplicationManagerReport report = replicationManager.getContainerReport();
Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
}
/**
* When the container is in QUASI_CLOSED state and all the replicas are
* also in QUASI_CLOSED state and doesn't have a quorum to force close
* the container, ReplicationManager will not do anything.
*/
@Test
public void testHealthyQuasiClosedContainer() throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
final ContainerReplica replicaOne = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaTwo = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaThree = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
containerStateManager.addContainer(container.getProtobuf());
containerStateManager.updateContainerReplica(id, replicaOne);
containerStateManager.updateContainerReplica(id, replicaTwo);
containerStateManager.updateContainerReplica(
id, replicaThree);
// All the QUASI_CLOSED replicas have same originNodeId, so the
// container will not be closed. ReplicationManager should take no action.
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(0, datanodeCommandHandler.getInvocation());
ReplicationManagerReport report = replicationManager.getContainerReport();
Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
Assertions.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
}
/**
* When a container is QUASI_CLOSED and we don't have quorum to force close
* the container, the container should have all the replicas in QUASI_CLOSED
* state, else ReplicationManager will take action.
*
* In this test case we make one of the replica unhealthy, replication manager
* will send delete container command to the datanode which has the unhealthy
* replica.
*/
@Test
public void testQuasiClosedContainerWithUnhealthyReplica()
throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
container.setUsedBytes(100);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
final ContainerReplica replicaOne = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaTwo = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaThree = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
containerStateManager.addContainer(container.getProtobuf());
containerStateManager.updateContainerReplica(id, replicaOne);
containerStateManager.updateContainerReplica(id, replicaTwo);
containerStateManager.updateContainerReplica(
id, replicaThree);
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
final int currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
// All the QUASI_CLOSED replicas have same originNodeId, so the
// container will not be closed. ReplicationManager should take no action.
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(0, datanodeCommandHandler.getInvocation());
// Make the first replica unhealthy
final ContainerReplica unhealthyReplica = getReplicas(
id, State.UNHEALTHY, 1000L, originNodeId,
replicaOne.getDatanodeDetails());
containerStateManager.updateContainerReplica(
id, unhealthyReplica);
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(currentDeleteCommandCount + 1,
datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
Assertions.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.deleteContainerCommand,
replicaOne.getDatanodeDetails()));
Assertions.assertEquals(currentDeleteCommandCount + 1,
replicationManager.getMetrics().getNumDeletionCmdsSent());
// Now we will delete the unhealthy replica from in-memory.
containerStateManager.removeContainerReplica(id, replicaOne);
final long currentBytesToReplicate = replicationManager.getMetrics()
.getNumReplicationBytesTotal();
// The container is under replicated as unhealthy replica is removed
replicationManager.processAll();
eventQueue.processAll(1000);
// We should get replicate command
Assertions.assertEquals(currentReplicateCommandCount + 1,
datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
Assertions.assertEquals(currentReplicateCommandCount + 1,
replicationManager.getMetrics().getNumReplicationCmdsSent());
Assertions.assertEquals(currentBytesToReplicate + 100L,
replicationManager.getMetrics().getNumReplicationBytesTotal());
Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightReplication());
// We should have one under replicated and one quasi_closed_stuck
ReplicationManagerReport report = replicationManager.getContainerReport();
Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
Assertions.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
Assertions.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
// Now we add the missing replica back
DatanodeDetails targetDn = replicationManager.getLegacyReplicationManager()
.getFirstDatanode(InflightType.REPLICATION, id);
final ContainerReplica replicatedReplicaOne = getReplicas(
id, State.CLOSED, 1000L, originNodeId, targetDn);
containerStateManager.updateContainerReplica(
id, replicatedReplicaOne);
final long currentReplicationCommandCompleted = replicationManager
.getMetrics().getNumReplicationCmdsCompleted();
final long currentBytesCompleted = replicationManager.getMetrics()
.getNumReplicationBytesCompleted();
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(0, getInflightCount(InflightType.REPLICATION));
Assertions.assertEquals(0, replicationManager.getMetrics()
.getInflightReplication());
Assertions.assertEquals(currentReplicationCommandCompleted + 1,
replicationManager.getMetrics().getNumReplicationCmdsCompleted());
Assertions.assertEquals(currentBytesCompleted + 100L,
replicationManager.getMetrics().getNumReplicationBytesCompleted());
report = replicationManager.getContainerReport();
Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
Assertions.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
Assertions.assertEquals(0, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
}
/**
* When a QUASI_CLOSED container is over replicated, ReplicationManager
* deletes the excess replicas.
*/
@Test
public void testOverReplicatedQuasiClosedContainer() throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
container.setUsedBytes(101);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
final ContainerReplica replicaOne = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaTwo = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaThree = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaFour = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
containerStateManager.addContainer(container.getProtobuf());
containerStateManager.updateContainerReplica(id, replicaOne);
containerStateManager.updateContainerReplica(id, replicaTwo);
containerStateManager.updateContainerReplica(
id, replicaThree);
containerStateManager.updateContainerReplica(id, replicaFour);
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(currentDeleteCommandCount + 1,
datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
Assertions.assertEquals(currentDeleteCommandCount + 1,
replicationManager.getMetrics().getNumDeletionCmdsSent());
Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightDeletion());
ReplicationManagerReport report = replicationManager.getContainerReport();
Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
Assertions.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
Assertions.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.OVER_REPLICATED));
// Now we remove the replica according to inflight
DatanodeDetails targetDn = replicationManager.getLegacyReplicationManager()
.getFirstDatanode(InflightType.DELETION, id);
if (targetDn.equals(replicaOne.getDatanodeDetails())) {
containerStateManager.removeContainerReplica(
id, replicaOne);
} else if (targetDn.equals(replicaTwo.getDatanodeDetails())) {
containerStateManager.removeContainerReplica(
id, replicaTwo);
} else if (targetDn.equals(replicaThree.getDatanodeDetails())) {
containerStateManager.removeContainerReplica(
id, replicaThree);
} else if (targetDn.equals(replicaFour.getDatanodeDetails())) {
containerStateManager.removeContainerReplica(
id, replicaFour);
}
final long currentDeleteCommandCompleted = replicationManager.getMetrics()
.getNumDeletionCmdsCompleted();
final long deleteBytesCompleted =
replicationManager.getMetrics().getNumDeletionBytesCompleted();
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(0, getInflightCount(InflightType.DELETION));
Assertions.assertEquals(0, replicationManager.getMetrics()
.getInflightDeletion());
Assertions.assertEquals(currentDeleteCommandCompleted + 1,
replicationManager.getMetrics().getNumDeletionCmdsCompleted());
Assertions.assertEquals(deleteBytesCompleted + 101,
replicationManager.getMetrics().getNumDeletionBytesCompleted());
report = replicationManager.getContainerReport();
Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
Assertions.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
Assertions.assertEquals(0, report.getStat(
ReplicationManagerReport.HealthState.OVER_REPLICATED));
}
/**
* When a QUASI_CLOSED container is over replicated, ReplicationManager
* deletes the excess replicas. While choosing the replica for deletion
* ReplicationManager should prioritize unhealthy replica over QUASI_CLOSED
* replica.
*/
@Test
public void testOverReplicatedQuasiClosedContainerWithUnhealthyReplica()
throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
final ContainerReplica replicaOne = getReplicas(
id, State.UNHEALTHY, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaTwo = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaThree = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaFour = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
containerStateManager.addContainer(container.getProtobuf());
containerStateManager.updateContainerReplica(id, replicaOne);
containerStateManager.updateContainerReplica(id, replicaTwo);
containerStateManager.updateContainerReplica(
id, replicaThree);
containerStateManager.updateContainerReplica(id, replicaFour);
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(currentDeleteCommandCount + 1,
datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
Assertions.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.deleteContainerCommand,
replicaOne.getDatanodeDetails()));
Assertions.assertEquals(currentDeleteCommandCount + 1,
replicationManager.getMetrics().getNumDeletionCmdsSent());
Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightDeletion());
ReplicationManagerReport report = replicationManager.getContainerReport();
Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
Assertions.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
Assertions.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.OVER_REPLICATED));
final long currentDeleteCommandCompleted = replicationManager.getMetrics()
.getNumDeletionCmdsCompleted();
// Now we remove the replica to simulate deletion complete
containerStateManager.removeContainerReplica(id, replicaOne);
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(currentDeleteCommandCompleted + 1,
replicationManager.getMetrics().getNumDeletionCmdsCompleted());
Assertions.assertEquals(0, getInflightCount(InflightType.DELETION));
Assertions.assertEquals(0, replicationManager.getMetrics()
.getInflightDeletion());
report = replicationManager.getContainerReport();
Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
Assertions.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
Assertions.assertEquals(0, report.getStat(
ReplicationManagerReport.HealthState.OVER_REPLICATED));
}
/**
* ReplicationManager should replicate an QUASI_CLOSED replica if it is
* under replicated.
*/
@Test
public void testUnderReplicatedQuasiClosedContainer() throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
container.setUsedBytes(100);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
final ContainerReplica replicaOne = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaTwo = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
containerStateManager.addContainer(container.getProtobuf());
containerStateManager.updateContainerReplica(id, replicaOne);
containerStateManager.updateContainerReplica(id, replicaTwo);
final int currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
final long currentBytesToReplicate = replicationManager.getMetrics()
.getNumReplicationBytesTotal();
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(currentReplicateCommandCount + 1,
datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
Assertions.assertEquals(currentReplicateCommandCount + 1,
replicationManager.getMetrics().getNumReplicationCmdsSent());
Assertions.assertEquals(currentBytesToReplicate + 100,
replicationManager.getMetrics().getNumReplicationBytesTotal());
Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightReplication());
ReplicationManagerReport report = replicationManager.getContainerReport();
Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
Assertions.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
Assertions.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
final long currentReplicateCommandCompleted = replicationManager
.getMetrics().getNumReplicationCmdsCompleted();
final long currentReplicateBytesCompleted = replicationManager
.getMetrics().getNumReplicationBytesCompleted();
// Now we add the replicated new replica
DatanodeDetails targetDn = replicationManager.getLegacyReplicationManager()
.getFirstDatanode(InflightType.REPLICATION, id);
final ContainerReplica replicatedReplicaThree = getReplicas(
id, State.CLOSED, 1000L, originNodeId, targetDn);
containerStateManager.updateContainerReplica(
id, replicatedReplicaThree);
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(currentReplicateCommandCompleted + 1,
replicationManager.getMetrics().getNumReplicationCmdsCompleted());
Assertions.assertEquals(currentReplicateBytesCompleted + 100,
replicationManager.getMetrics().getNumReplicationBytesCompleted());
Assertions.assertEquals(0, getInflightCount(InflightType.REPLICATION));
Assertions.assertEquals(0, replicationManager.getMetrics()
.getInflightReplication());
report = replicationManager.getContainerReport();
Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
Assertions.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
Assertions.assertEquals(0, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
}
/**
* When a QUASI_CLOSED container is under replicated, ReplicationManager
* should re-replicate it. If there are any unhealthy replica, it has to
* be deleted.
*
* In this test case, the container is QUASI_CLOSED and is under replicated
* and also has an unhealthy replica.
*
* In the first iteration of ReplicationManager, it should re-replicate
* the container so that it has enough replicas.
*
* In the second iteration, ReplicationManager should delete the unhealthy
* replica.
*
* In the third iteration, ReplicationManager will re-replicate as the
* container has again become under replicated after the unhealthy
* replica has been deleted.
*
*/
@Test
public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica()
throws IOException, InterruptedException,
TimeoutException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
container.setUsedBytes(99);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
final ContainerReplica replicaOne = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaTwo = getReplicas(
id, State.UNHEALTHY, 1000L, originNodeId, randomDatanodeDetails());
containerStateManager.addContainer(container.getProtobuf());
containerStateManager.updateContainerReplica(id, replicaOne);
containerStateManager.updateContainerReplica(id, replicaTwo);
final int currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
final long currentBytesToDelete = replicationManager.getMetrics()
.getNumDeletionBytesTotal();
replicationManager.processAll();
GenericTestUtils.waitFor(
() -> (currentReplicateCommandCount + 1) == datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand),
50, 5000);
Optional<CommandForDatanode> replicateCommand = datanodeCommandHandler
.getReceivedCommands().stream()
.filter(c -> c.getCommand().getType()
.equals(SCMCommandProto.Type.replicateContainerCommand))
.findFirst();
Assertions.assertTrue(replicateCommand.isPresent());
DatanodeDetails newNode = createDatanodeDetails(
replicateCommand.get().getDatanodeId());
ContainerReplica newReplica = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, newNode);
containerStateManager.updateContainerReplica(id, newReplica);
ReplicationManagerReport report = replicationManager.getContainerReport();
Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
Assertions.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
Assertions.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
Assertions.assertEquals(0, report.getStat(
ReplicationManagerReport.HealthState.UNHEALTHY));
/*
* We have report the replica to SCM, in the next ReplicationManager
* iteration it should delete the unhealthy replica.
*/
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(currentDeleteCommandCount + 1,
datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
// ReplicaTwo should be deleted, that is the unhealthy one
Assertions.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.deleteContainerCommand,
replicaTwo.getDatanodeDetails()));
Assertions.assertEquals(currentDeleteCommandCount + 1,
replicationManager.getMetrics().getNumDeletionCmdsSent());
Assertions.assertEquals(currentBytesToDelete + 99,
replicationManager.getMetrics().getNumDeletionBytesTotal());
Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightDeletion());
containerStateManager.removeContainerReplica(id, replicaTwo);
final long currentDeleteCommandCompleted = replicationManager.getMetrics()
.getNumDeletionCmdsCompleted();
report = replicationManager.getContainerReport();
Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
Assertions.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
Assertions.assertEquals(0, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
Assertions.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.UNHEALTHY));
/*
* We have now removed unhealthy replica, next iteration of
* ReplicationManager should re-replicate the container as it
* is under replicated now
*/
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(0, getInflightCount(InflightType.DELETION));
Assertions.assertEquals(0, replicationManager.getMetrics()
.getInflightDeletion());
Assertions.assertEquals(currentDeleteCommandCompleted + 1,
replicationManager.getMetrics().getNumDeletionCmdsCompleted());
Assertions.assertEquals(currentReplicateCommandCount + 2,
datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
Assertions.assertEquals(currentReplicateCommandCount + 2,
replicationManager.getMetrics().getNumReplicationCmdsSent());
Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightReplication());
report = replicationManager.getContainerReport();
Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
Assertions.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
Assertions.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
Assertions.assertEquals(0, report.getStat(
ReplicationManagerReport.HealthState.UNHEALTHY));
}
/**
* When a container is QUASI_CLOSED and it has >50% of its replica
* in QUASI_CLOSED state with unique origin node id,
* ReplicationManager should force close the replica(s) with
* highest BCSID.
*/
@Test
public void testQuasiClosedToClosed() throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
final ContainerID id = container.containerID();
final Set<ContainerReplica> replicas = getReplicas(id, State.QUASI_CLOSED,
randomDatanodeDetails(),
randomDatanodeDetails(),
randomDatanodeDetails());
containerStateManager.addContainer(container.getProtobuf());
for (ContainerReplica replica : replicas) {
containerStateManager.updateContainerReplica(id, replica);
}
final int currentCloseCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
replicationManager.processAll();
eventQueue.processAll(1000);
// All the replicas have same BCSID, so all of them will be closed.
Assertions.assertEquals(currentCloseCommandCount + 3, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
ReplicationManagerReport report = replicationManager.getContainerReport();
Assertions.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED));
Assertions.assertEquals(0, report.getStat(
ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
}
/**
* ReplicationManager should not take any action if the container is
* CLOSED and healthy.
*/
@Test
public void testHealthyClosedContainer() throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
final ContainerID id = container.containerID();
final Set<ContainerReplica> replicas = getReplicas(id, State.CLOSED,
randomDatanodeDetails(),
randomDatanodeDetails(),
randomDatanodeDetails());
containerStateManager.addContainer(container.getProtobuf());
for (ContainerReplica replica : replicas) {
containerStateManager.updateContainerReplica(id, replica);
}
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(0, datanodeCommandHandler.getInvocation());
ReplicationManagerReport report = replicationManager.getContainerReport();
Assertions.assertEquals(1, report.getStat(LifeCycleState.CLOSED));
for (ReplicationManagerReport.HealthState s :
ReplicationManagerReport.HealthState.values()) {
Assertions.assertEquals(0, report.getStat(s));
}
}
/**
* ReplicationManager should close the unhealthy OPEN container.
*/
@Test
public void testUnhealthyOpenContainer() throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.OPEN);
final ContainerID id = container.containerID();
final Set<ContainerReplica> replicas = getReplicas(id, State.OPEN,
randomDatanodeDetails(),
randomDatanodeDetails());
replicas.addAll(getReplicas(id, State.UNHEALTHY, randomDatanodeDetails()));
containerStateManager.addContainer(container.getProtobuf());
for (ContainerReplica replica : replicas) {
containerStateManager.updateContainerReplica(id, replica);
}
final CloseContainerEventHandler closeContainerHandler =
Mockito.mock(CloseContainerEventHandler.class);
eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
replicationManager.processAll();
eventQueue.processAll(1000);
Mockito.verify(closeContainerHandler, Mockito.times(1))
.onMessage(id, eventQueue);
ReplicationManagerReport report = replicationManager.getContainerReport();
Assertions.assertEquals(1, report.getStat(LifeCycleState.OPEN));
Assertions.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.OPEN_UNHEALTHY));
}
/**
* ReplicationManager should skip send close command to unhealthy replica.
*/
@Test
public void testCloseUnhealthyReplica() throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
final ContainerID id = container.containerID();
final Set<ContainerReplica> replicas = getReplicas(id, State.UNHEALTHY,
randomDatanodeDetails());
replicas.addAll(getReplicas(id, State.OPEN, randomDatanodeDetails()));
replicas.addAll(getReplicas(id, State.OPEN, randomDatanodeDetails()));
containerStateManager.addContainer(container.getProtobuf());
for (ContainerReplica replica : replicas) {
containerStateManager.updateContainerReplica(id, replica);
}
replicationManager.processAll();
// Wait for EventQueue to call the event handler
eventQueue.processAll(1000);
Assertions.assertEquals(2, datanodeCommandHandler.getInvocation());
}
@Test
public void testGeneratedConfig() {
ReplicationManagerConfiguration rmc =
OzoneConfiguration.newInstanceOf(ReplicationManagerConfiguration.class);
//default is not included in ozone-site.xml but generated from annotation
//to the ozone-site-generated.xml which should be loaded by the
// OzoneConfiguration.
Assertions.assertEquals(1800000, rmc.getEventTimeout());
}
@Test
public void additionalReplicaScheduledWhenMisReplicated() throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
container.setUsedBytes(100);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
final ContainerReplica replicaOne = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaTwo = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaThree = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
containerStateManager.addContainer(container.getProtobuf());
containerStateManager.updateContainerReplica(id, replicaOne);
containerStateManager.updateContainerReplica(id, replicaTwo);
containerStateManager.updateContainerReplica(
id, replicaThree);
// Ensure a mis-replicated status is returned for any containers in this
// test where there are 3 replicas. When there are 2 or 4 replicas
// the status returned will be healthy.
Mockito.when(containerPlacementPolicy.validateContainerPlacement(
Mockito.argThat(list -> list.size() == 3),
Mockito.anyInt()
)).thenAnswer(invocation -> {
return new ContainerPlacementStatusDefault(1, 2, 3);
});
int currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
final long currentBytesToReplicate = replicationManager.getMetrics()
.getNumReplicationBytesTotal();
replicationManager.processAll();
eventQueue.processAll(1000);
// At this stage, due to the mocked calls to validateContainerPlacement
// the policy will not be satisfied, and replication will be triggered.
Assertions.assertEquals(currentReplicateCommandCount + 1,
datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
Assertions.assertEquals(currentReplicateCommandCount + 1,
replicationManager.getMetrics().getNumReplicationCmdsSent());
Assertions.assertEquals(currentBytesToReplicate + 100,
replicationManager.getMetrics().getNumReplicationBytesTotal());
Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightReplication());
ReplicationManagerReport report = replicationManager.getContainerReport();
Assertions.assertEquals(1, report.getStat(LifeCycleState.CLOSED));
Assertions.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.MIS_REPLICATED));
// Now make it so that all containers seem mis-replicated no matter how
// many replicas. This will test replicas are not scheduled if the new
// replica does not fix the mis-replication.
Mockito.when(containerPlacementPolicy.validateContainerPlacement(
Mockito.anyList(),
Mockito.anyInt()
)).thenAnswer(invocation -> {
return new ContainerPlacementStatusDefault(1, 2, 3);
});
currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
replicationManager.processAll();
eventQueue.processAll(1000);
// At this stage, due to the mocked calls to validateContainerPlacement
// the mis-replicated racks will not have improved, so expect to see nothing
// scheduled.
Assertions.assertEquals(currentReplicateCommandCount, datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand));
Assertions.assertEquals(currentReplicateCommandCount,
replicationManager.getMetrics().getNumReplicationCmdsSent());
Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightReplication());
}
@Test
public void overReplicatedButRemovingMakesMisReplicated() throws IOException {
// In this test, the excess replica should not be removed.
final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
final ContainerReplica replicaOne = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaTwo = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaThree = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaFour = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaFive = getReplicas(
id, State.UNHEALTHY, 1000L, originNodeId, randomDatanodeDetails());
containerStateManager.addContainer(container.getProtobuf());
containerStateManager.updateContainerReplica(id, replicaOne);
containerStateManager.updateContainerReplica(id, replicaTwo);
containerStateManager.updateContainerReplica(
id, replicaThree);
containerStateManager.updateContainerReplica(id, replicaFour);
containerStateManager.updateContainerReplica(id, replicaFive);
// Ensure a mis-replicated status is returned for any containers in this
// test where there are exactly 3 replicas checked.
Mockito.when(containerPlacementPolicy.validateContainerPlacement(
Mockito.argThat(list -> list.size() == 3),
Mockito.anyInt()
)).thenAnswer(
invocation -> new ContainerPlacementStatusDefault(1, 2, 3));
int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
replicationManager.processAll();
eventQueue.processAll(1000);
// The unhealthy replica should be removed, but not the other replica
// as each time we test with 3 replicas, Mockito ensures it returns
// mis-replicated
Assertions.assertEquals(currentDeleteCommandCount + 1,
datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
Assertions.assertEquals(currentDeleteCommandCount + 1,
replicationManager.getMetrics().getNumDeletionCmdsSent());
Assertions.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.deleteContainerCommand,
replicaFive.getDatanodeDetails()));
Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightDeletion());
assertOverReplicatedCount(1);
}
@Test
public void testOverReplicatedAndPolicySatisfied() throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
final ContainerReplica replicaOne = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaTwo = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaThree = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaFour = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
containerStateManager.addContainer(container.getProtobuf());
containerStateManager.updateContainerReplica(id, replicaOne);
containerStateManager.updateContainerReplica(id, replicaTwo);
containerStateManager.updateContainerReplica(
id, replicaThree);
containerStateManager.updateContainerReplica(id, replicaFour);
Mockito.when(containerPlacementPolicy.validateContainerPlacement(
Mockito.argThat(list -> list.size() == 3),
Mockito.anyInt()
)).thenAnswer(
invocation -> new ContainerPlacementStatusDefault(2, 2, 3));
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(currentDeleteCommandCount + 1,
datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
Assertions.assertEquals(currentDeleteCommandCount + 1,
replicationManager.getMetrics().getNumDeletionCmdsSent());
Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightDeletion());
assertOverReplicatedCount(1);
}
@Test
public void testOverReplicatedAndPolicyUnSatisfiedAndDeleted() throws
IOException {
final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
final ContainerReplica replicaOne = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaTwo = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaThree = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaFour = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
final ContainerReplica replicaFive = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
containerStateManager.addContainer(container.getProtobuf());
containerStateManager.updateContainerReplica(id, replicaOne);
containerStateManager.updateContainerReplica(id, replicaTwo);
containerStateManager.updateContainerReplica(
id, replicaThree);
containerStateManager.updateContainerReplica(id, replicaFour);
containerStateManager.updateContainerReplica(id, replicaFive);
Mockito.when(containerPlacementPolicy.validateContainerPlacement(
Mockito.argThat(list -> list != null && list.size() <= 4),
Mockito.anyInt()
)).thenAnswer(
invocation -> new ContainerPlacementStatusDefault(1, 2, 3));
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(currentDeleteCommandCount + 2,
datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
Assertions.assertEquals(currentDeleteCommandCount + 2,
replicationManager.getMetrics().getNumDeletionCmdsSent());
Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightDeletion());
}
/**
* ReplicationManager should replicate an additional replica if there are
* decommissioned replicas.
*/
@Test
public void testUnderReplicatedDueToDecommission() throws IOException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
assertReplicaScheduled(2);
assertUnderReplicatedCount(1);
}
/**
* ReplicationManager should replicate an additional replica when all copies
* are decommissioning.
*/
@Test
public void testUnderReplicatedDueToAllDecommission() throws IOException {
runTestUnderReplicatedDueToAllDecommission(3);
}
Void runTestUnderReplicatedDueToAllDecommission(int expectedReplication)
throws IOException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
assertReplicaScheduled(expectedReplication);
assertUnderReplicatedCount(1);
return null;
}
@Test
public void testReplicationLimit() throws Exception {
runTestLimit(1, 0, 2, 0,
() -> runTestUnderReplicatedDueToAllDecommission(1));
}
void runTestLimit(int replicationLimit, int deletionLimit,
int expectedReplicationSkipped, int expectedDeletionSkipped,
Callable<Void> testcase) throws Exception {
createReplicationManager(replicationLimit, deletionLimit);
final ReplicationManagerMetrics metrics = replicationManager.getMetrics();
final long replicationSkipped = metrics.getInflightReplicationSkipped();
final long deletionSkipped = metrics.getInflightDeletionSkipped();
testcase.call();
Assertions.assertEquals(replicationSkipped + expectedReplicationSkipped,
metrics.getInflightReplicationSkipped());
Assertions.assertEquals(deletionSkipped + expectedDeletionSkipped,
metrics.getInflightDeletionSkipped());
//reset limits for other tests.
createReplicationManager(0, 0);
}
/**
* ReplicationManager should not take any action when the container is
* correctly replicated with decommissioned replicas still present.
*/
@Test
public void testCorrectlyReplicatedWithDecommission() throws IOException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
assertReplicaScheduled(0);
assertUnderReplicatedCount(0);
}
/**
* ReplicationManager should replicate an additional replica when min rep
* is not met for maintenance.
*/
@Test
public void testUnderReplicatedDueToMaintenance() throws IOException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
assertReplicaScheduled(1);
assertUnderReplicatedCount(1);
}
/**
* ReplicationManager should not replicate an additional replica when if
* min replica for maintenance is 1 and another replica is available.
*/
@Test
public void testNotUnderReplicatedDueToMaintenanceMinRepOne()
throws Exception {
replicationManager.stop();
ReplicationManagerConfiguration newConf =
new ReplicationManagerConfiguration();
newConf.setMaintenanceReplicaMinimum(1);
dbStore.close();
createReplicationManager(newConf);
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
assertReplicaScheduled(0);
assertUnderReplicatedCount(0);
}
/**
* ReplicationManager should replicate an additional replica when all copies
* are going off line and min rep is 1.
*/
@Test
public void testUnderReplicatedDueToMaintenanceMinRepOne()
throws Exception {
replicationManager.stop();
ReplicationManagerConfiguration newConf =
new ReplicationManagerConfiguration();
newConf.setMaintenanceReplicaMinimum(1);
dbStore.close();
createReplicationManager(newConf);
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
assertReplicaScheduled(1);
assertUnderReplicatedCount(1);
}
/**
* ReplicationManager should replicate additional replica when all copies
* are going into maintenance.
*/
@Test
public void testUnderReplicatedDueToAllMaintenance() throws IOException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
assertReplicaScheduled(2);
assertUnderReplicatedCount(1);
}
/**
* ReplicationManager should not replicate additional replica sufficient
* replica are available.
*/
@Test
public void testCorrectlyReplicatedWithMaintenance() throws IOException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
assertReplicaScheduled(0);
assertUnderReplicatedCount(0);
}
/**
* ReplicationManager should replicate additional replica when all copies
* are decommissioning or maintenance.
*/
@Test
public void testUnderReplicatedWithDecommissionAndMaintenance()
throws IOException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONED, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONED, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
assertReplicaScheduled(2);
assertUnderReplicatedCount(1);
}
/**
* ReplicationManager should replicate zero replica when all copies
* are missing.
*/
@Test
public void testContainerWithMissingReplicas()
throws IOException {
createContainer(LifeCycleState.CLOSED);
assertReplicaScheduled(0);
assertUnderReplicatedCount(1);
assertMissingCount(1);
}
/**
* When a CLOSED container is over replicated, ReplicationManager
* deletes the excess replicas. While choosing the replica for deletion
* ReplicationManager should not attempt to remove a DECOMMISSION or
* MAINTENANCE replica.
*/
@Test
public void testOverReplicatedClosedContainerWithDecomAndMaint()
throws IOException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, NodeStatus.inServiceHealthy(), CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONED, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
addReplica(container, NodeStatus.inServiceHealthy(), CLOSED);
addReplica(container, NodeStatus.inServiceHealthy(), CLOSED);
addReplica(container, NodeStatus.inServiceHealthy(), CLOSED);
addReplica(container, NodeStatus.inServiceHealthy(), CLOSED);
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(currentDeleteCommandCount + 2,
datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
Assertions.assertEquals(currentDeleteCommandCount + 2,
replicationManager.getMetrics().getNumDeletionCmdsSent());
Assertions.assertEquals(1, getInflightCount(InflightType.DELETION));
Assertions.assertEquals(1, replicationManager.getMetrics()
.getInflightDeletion());
// Get the DECOM and Maint replica and ensure none of them are scheduled
// for removal
Set<ContainerReplica> decom =
containerStateManager.getContainerReplicas(
container.containerID())
.stream()
.filter(r -> r.getDatanodeDetails().getPersistedOpState() != IN_SERVICE)
.collect(Collectors.toSet());
for (ContainerReplica r : decom) {
Assertions.assertFalse(datanodeCommandHandler.received(
SCMCommandProto.Type.deleteContainerCommand,
r.getDatanodeDetails()));
}
assertOverReplicatedCount(1);
}
/**
* Replication Manager should not attempt to replicate from an unhealthy
* (stale or dead) node. To test this, setup a scenario where a replia needs
* to be created, but mark all nodes stale. That way, no new replica will be
* scheduled.
*/
@Test
public void testUnderReplicatedNotHealthySource() throws IOException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, NodeStatus.inServiceStale(), CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONED, STALE), CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONED, STALE), CLOSED);
// There should be replica scheduled, but as all nodes are stale, nothing
// gets scheduled.
assertReplicaScheduled(0);
assertUnderReplicatedCount(1);
}
/**
* if all the prerequisites are satisfied, move should work as expected.
*/
@Test
public void testMove() throws IOException, NodeNotFoundException,
InterruptedException, ExecutionException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
ContainerID id = container.containerID();
ContainerReplica dn1 = addReplica(container,
new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container,
new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container,
new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
CompletableFuture<MoveResult> cf =
replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
Assertions.assertTrue(scmLogs.getOutput().contains(
"receive a move request about container"));
Thread.sleep(100L);
Assertions.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.replicateContainerCommand, dn3));
Assertions.assertEquals(1, datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
//replicate container to dn3
addReplicaToDn(container, dn3, CLOSED);
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.deleteContainerCommand, dn1.getDatanodeDetails()));
Assertions.assertEquals(1, datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.deleteContainerCommand));
containerStateManager.removeContainerReplica(id, dn1);
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertTrue(cf.isDone() && cf.get() == MoveResult.COMPLETED);
}
/**
* if crash happened and restarted, move option should work as expected.
*/
@Test
public void testMoveCrashAndRestart() throws IOException,
NodeNotFoundException, InterruptedException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
ContainerID id = container.containerID();
ContainerReplica dn1 = addReplica(container,
new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container,
new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container,
new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
Assertions.assertTrue(scmLogs.getOutput().contains(
"receive a move request about container"));
Thread.sleep(100L);
Assertions.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.replicateContainerCommand, dn3));
Assertions.assertEquals(1, datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
//crash happens, restart scm.
//clear current inflight actions and reload inflightMove from DBStore.
resetReplicationManager();
replicationManager.getMoveScheduler()
.reinitialize(SCMDBDefinition.MOVE.getTable(dbStore));
Assertions.assertTrue(replicationManager.getMoveScheduler()
.getInflightMove().containsKey(id));
MoveDataNodePair kv = replicationManager.getMoveScheduler()
.getInflightMove().get(id);
Assertions.assertEquals(kv.getSrc(), dn1.getDatanodeDetails());
Assertions.assertEquals(kv.getTgt(), dn3);
serviceManager.notifyStatusChanged();
Thread.sleep(100L);
// now, the container is not over-replicated,
// so no deleteContainerCommand will be sent
Assertions.assertFalse(datanodeCommandHandler.received(
SCMCommandProto.Type.deleteContainerCommand, dn1.getDatanodeDetails()));
//replica does not exist in target datanode, so a replicateContainerCommand
//will be sent again at notifyStatusChanged#onLeaderReadyAndOutOfSafeMode
Assertions.assertEquals(2, datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
//replicate container to dn3, now, over-replicated
addReplicaToDn(container, dn3, CLOSED);
replicationManager.processAll();
eventQueue.processAll(1000);
//deleteContainerCommand is sent, but the src replica is not deleted now
Assertions.assertEquals(1, datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.deleteContainerCommand));
//crash happens, restart scm.
//clear current inflight actions and reload inflightMove from DBStore.
resetReplicationManager();
replicationManager.getMoveScheduler()
.reinitialize(SCMDBDefinition.MOVE.getTable(dbStore));
Assertions.assertTrue(replicationManager.getMoveScheduler()
.getInflightMove().containsKey(id));
kv = replicationManager.getMoveScheduler()
.getInflightMove().get(id);
Assertions.assertEquals(kv.getSrc(), dn1.getDatanodeDetails());
Assertions.assertEquals(kv.getTgt(), dn3);
serviceManager.notifyStatusChanged();
//after restart and the container is over-replicated now,
//deleteContainerCommand will be sent again
Assertions.assertEquals(2, datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.deleteContainerCommand));
containerStateManager.removeContainerReplica(id, dn1);
//replica in src datanode is deleted now
containerStateManager.removeContainerReplica(id, dn1);
replicationManager.processAll();
eventQueue.processAll(1000);
//since the move is complete,so after scm crash and restart
//inflightMove should not contain the container again
resetReplicationManager();
replicationManager.getMoveScheduler()
.reinitialize(SCMDBDefinition.MOVE.getTable(dbStore));
Assertions.assertFalse(replicationManager.getMoveScheduler()
.getInflightMove().containsKey(id));
//completeableFuture is not stored in DB, so after scm crash and
//restart ,completeableFuture is missing
}
/**
* make sure RM does not delete replica if placement policy is not satisfied.
*/
@Test
public void testMoveNotDeleteSrcIfPolicyNotSatisfied()
throws IOException, NodeNotFoundException,
InterruptedException, ExecutionException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
ContainerID id = container.containerID();
ContainerReplica dn1 = addReplica(container,
new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
ContainerReplica dn2 = addReplica(container,
new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container,
new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
DatanodeDetails dn4 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
CompletableFuture<MoveResult> cf =
replicationManager.move(id, dn1.getDatanodeDetails(), dn4);
Assertions.assertTrue(scmLogs.getOutput().contains(
"receive a move request about container"));
Thread.sleep(100L);
Assertions.assertTrue(datanodeCommandHandler.received(
SCMCommandProto.Type.replicateContainerCommand, dn4));
Assertions.assertEquals(1, datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
//replicate container to dn4
addReplicaToDn(container, dn4, CLOSED);
//now, replication succeeds, but replica in dn2 lost,
//and there are only tree replicas totally, so rm should
//not delete the replica on dn1
containerStateManager.removeContainerReplica(id, dn2);
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertFalse(datanodeCommandHandler.received(
SCMCommandProto.Type.deleteContainerCommand, dn1.getDatanodeDetails()));
Assertions.assertTrue(cf.isDone() &&
cf.get() == MoveResult.DELETE_FAIL_POLICY);
}
/**
* test src and target datanode become unhealthy when moving.
*/
@Test
public void testDnBecameUnhealthyWhenMoving() throws IOException,
NodeNotFoundException, InterruptedException, ExecutionException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
ContainerID id = container.containerID();
ContainerReplica dn1 = addReplica(container,
new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container,
new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container,
new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
CompletableFuture<MoveResult> cf =
replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
Assertions.assertTrue(scmLogs.getOutput().contains(
"receive a move request about container"));
nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, STALE));
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
addReplicaToDn(container, dn3, CLOSED);
replicationManager.processAll();
eventQueue.processAll(1000);
nodeManager.setNodeStatus(dn1.getDatanodeDetails(),
new NodeStatus(IN_SERVICE, STALE));
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertTrue(cf.isDone() && cf.get() ==
MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
}
/**
* before Replication Manager generates a completablefuture for a move option,
* some Prerequisites should be satisfied.
*/
@Test
public void testMovePrerequisites() throws IOException, NodeNotFoundException,
InterruptedException, ExecutionException,
InvalidStateTransitionException {
//all conditions is met
final ContainerInfo container = createContainer(LifeCycleState.OPEN);
ContainerID id = container.containerID();
ContainerReplica dn1 = addReplica(container,
new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
ContainerReplica dn2 = addReplica(container,
new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
ContainerReplica dn4 = addReplica(container,
new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
CompletableFuture<MoveResult> cf;
//the above move is executed successfully, so there may be some item in
//inflightReplication or inflightDeletion. here we stop replication manager
//to clear these states, which may impact the tests below.
//we don't need a running replicationManamger now
replicationManager.stop();
Thread.sleep(100L);
cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
Assertions.assertTrue(cf.isDone() && cf.get() ==
MoveResult.FAIL_NOT_RUNNING);
replicationManager.start();
Thread.sleep(100L);
//container in not in OPEN state
cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
Assertions.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
//open -> closing
containerStateManager.updateContainerState(id.getProtobuf(),
LifeCycleEvent.FINALIZE);
cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
Assertions.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
//closing -> quasi_closed
containerStateManager.updateContainerState(id.getProtobuf(),
LifeCycleEvent.QUASI_CLOSE);
cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
Assertions.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
//quasi_closed -> closed
containerStateManager.updateContainerState(id.getProtobuf(),
LifeCycleEvent.FORCE_CLOSE);
Assertions.assertSame(LifeCycleState.CLOSED,
containerStateManager.getContainer(id).getState());
//Node is not in healthy state
for (HddsProtos.NodeState state : HddsProtos.NodeState.values()) {
if (state != HEALTHY) {
nodeManager.setNodeStatus(dn3,
new NodeStatus(IN_SERVICE, state));
cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
Assertions.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails());
Assertions.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
}
}
nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
//Node is not in IN_SERVICE state
for (HddsProtos.NodeOperationalState state :
HddsProtos.NodeOperationalState.values()) {
if (state != IN_SERVICE) {
nodeManager.setNodeStatus(dn3,
new NodeStatus(state, HEALTHY));
cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
Assertions.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails());
Assertions.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
}
}
nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
//container exists in target datanode
cf = replicationManager.move(id, dn1.getDatanodeDetails(),
dn2.getDatanodeDetails());
Assertions.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
//container does not exist in source datanode
cf = replicationManager.move(id, dn3, dn3);
Assertions.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
//make container over relplicated to test the
// case that container is in inflightDeletion
ContainerReplica dn5 = addReplica(container,
new NodeStatus(IN_SERVICE, HEALTHY), State.CLOSED);
replicationManager.processAll();
//waiting for inflightDeletion generation
eventQueue.processAll(1000);
cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
Assertions.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
resetReplicationManager();
//make the replica num be 2 to test the case
//that container is in inflightReplication
containerStateManager.removeContainerReplica(id, dn5);
containerStateManager.removeContainerReplica(id, dn4);
//replication manager should generate inflightReplication
replicationManager.processAll();
//waiting for inflightReplication generation
eventQueue.processAll(1000);
cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
Assertions.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
}
@Test
public void testReplicateCommandTimeout() throws IOException {
long timeout = new ReplicationManagerConfiguration().getEventTimeout();
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
assertReplicaScheduled(1);
// Already a pending replica, so nothing scheduled
assertReplicaScheduled(0);
// Advance the clock past the timeout, and there should be a replica
// scheduled
clock.fastForward(timeout + 1000);
assertReplicaScheduled(1);
Assertions.assertEquals(1, replicationManager.getMetrics()
.getNumReplicationCmdsTimeout());
}
@Test
public void testDeleteCommandTimeout() throws
IOException, InterruptedException {
long timeout = new ReplicationManagerConfiguration().getEventTimeout();
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
assertDeleteScheduled(1);
// Already a pending replica, so nothing scheduled
assertReplicaScheduled(0);
// Advance the clock past the timeout, and there should be a replica
// scheduled
clock.fastForward(timeout + 1000);
assertDeleteScheduled(1);
Assertions.assertEquals(1, replicationManager.getMetrics()
.getNumDeletionCmdsTimeout());
}
/**
* A closed empty container with all the replicas also closed and empty
* should be deleted.
* A container/ replica should be deemed empty when it has 0 keyCount even
* if the usedBytes is not 0 (usedBytes should not be used to determine if
* the container or replica is empty).
*/
@Test
public void testDeleteEmptyContainer() throws Exception {
runTestDeleteEmptyContainer(3);
}
Void runTestDeleteEmptyContainer(int expectedDelete) throws Exception {
// Create container with usedBytes = 1000 and keyCount = 0
final ContainerInfo container = createContainer(LifeCycleState.CLOSED, 1000,
0);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
// Create a replica with usedBytes != 0 and keyCount = 0
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED, 100, 0);
assertDeleteScheduled(expectedDelete);
return null;
}
@Test
public void testDeletionLimit() throws Exception {
runTestLimit(0, 2, 0, 1,
() -> runTestDeleteEmptyContainer(2));
}
/**
* A closed empty container with a non-empty replica should not be deleted.
*/
@Test
public void testDeleteEmptyContainerNonEmptyReplica() throws Exception {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED, 0,
0);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
// Create the 3rd replica with non-zero key count and used bytes
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED, 100, 1);
assertDeleteScheduled(0);
}
private ContainerInfo createContainer(LifeCycleState containerState)
throws IOException {
return createContainer(containerState, CONTAINER_USED_BYTES_DEFAULT,
CONTAINER_NUM_KEYS_DEFAULT);
}
private ContainerInfo createContainer(LifeCycleState containerState,
long usedBytes, long numKeys) throws IOException {
final ContainerInfo container = getContainer(containerState);
container.setUsedBytes(usedBytes);
container.setNumberOfKeys(numKeys);
containerStateManager.addContainer(container.getProtobuf());
return container;
}
private DatanodeDetails addNode(NodeStatus nodeStatus) {
DatanodeDetails dn = randomDatanodeDetails();
dn.setPersistedOpState(nodeStatus.getOperationalState());
dn.setPersistedOpStateExpiryEpochSec(
nodeStatus.getOpStateExpiryEpochSeconds());
nodeManager.register(dn, nodeStatus);
return dn;
}
private void resetReplicationManager() throws InterruptedException {
replicationManager.stop();
Thread.sleep(100L);
replicationManager.start();
Thread.sleep(100L);
}
private ContainerReplica addReplica(ContainerInfo container,
NodeStatus nodeStatus, State replicaState)
throws ContainerNotFoundException {
DatanodeDetails dn = addNode(nodeStatus);
return addReplicaToDn(container, dn, replicaState);
}
private ContainerReplica addReplica(ContainerInfo container,
NodeStatus nodeStatus, State replicaState, long usedBytes, long numOfKeys)
throws ContainerNotFoundException {
DatanodeDetails dn = addNode(nodeStatus);
return addReplicaToDn(container, dn, replicaState, usedBytes, numOfKeys);
}
private ContainerReplica addReplicaToDn(ContainerInfo container,
DatanodeDetails dn, State replicaState)
throws ContainerNotFoundException {
// Using the same originID for all replica in the container set. If each
// replica has a unique originID, it causes problems in ReplicationManager
// when processing over-replicated containers.
final UUID originNodeId =
UUID.nameUUIDFromBytes(Longs.toByteArray(container.getContainerID()));
final ContainerReplica replica = getReplicas(container.containerID(),
replicaState, container.getUsedBytes(), container.getNumberOfKeys(),
1000L, originNodeId, dn);
containerStateManager
.updateContainerReplica(container.containerID(), replica);
return replica;
}
private ContainerReplica addReplicaToDn(ContainerInfo container,
DatanodeDetails dn, State replicaState, long usedBytes, long numOfKeys)
throws ContainerNotFoundException {
// Using the same originID for all replica in the container set. If each
// replica has a unique originID, it causes problems in ReplicationManager
// when processing over-replicated containers.
final UUID originNodeId =
UUID.nameUUIDFromBytes(Longs.toByteArray(container.getContainerID()));
final ContainerReplica replica = getReplicas(container.containerID(),
replicaState, usedBytes, numOfKeys, 1000L, originNodeId, dn);
containerStateManager
.updateContainerReplica(container.containerID(), replica);
return replica;
}
private void assertReplicaScheduled(int delta) {
final int currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(currentReplicateCommandCount + delta,
datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.replicateContainerCommand));
Assertions.assertEquals(currentReplicateCommandCount + delta,
replicationManager.getMetrics().getNumReplicationCmdsSent());
}
private void assertDeleteScheduled(int delta) {
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
replicationManager.processAll();
eventQueue.processAll(1000);
Assertions.assertEquals(currentDeleteCommandCount + delta,
datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.deleteContainerCommand));
Assertions.assertEquals(currentDeleteCommandCount + delta,
replicationManager.getMetrics().getNumDeletionCmdsSent());
}
private void assertUnderReplicatedCount(int count) {
ReplicationManagerReport report = replicationManager.getContainerReport();
Assertions.assertEquals(count, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
}
private void assertMissingCount(int count) {
ReplicationManagerReport report = replicationManager.getContainerReport();
Assertions.assertEquals(count, report.getStat(
ReplicationManagerReport.HealthState.MISSING));
}
private void assertOverReplicatedCount(int count) {
ReplicationManagerReport report = replicationManager.getContainerReport();
Assertions.assertEquals(count, report.getStat(
ReplicationManagerReport.HealthState.OVER_REPLICATED));
}
@AfterEach
public void teardown() throws Exception {
containerStateManager.close();
replicationManager.stop();
if (dbStore != null) {
dbStore.close();
}
FileUtils.deleteDirectory(testDir);
}
private static class DatanodeCommandHandler implements
EventHandler<CommandForDatanode> {
private AtomicInteger invocation = new AtomicInteger(0);
private Map<SCMCommandProto.Type, AtomicInteger> commandInvocation =
new HashMap<>();
private List<CommandForDatanode> commands = new ArrayList<>();
@Override
public void onMessage(final CommandForDatanode command,
final EventPublisher publisher) {
final SCMCommandProto.Type type = command.getCommand().getType();
commandInvocation.computeIfAbsent(type, k -> new AtomicInteger(0));
commandInvocation.get(type).incrementAndGet();
invocation.incrementAndGet();
commands.add(command);
}
private int getInvocation() {
return invocation.get();
}
private int getInvocationCount(SCMCommandProto.Type type) {
return commandInvocation.containsKey(type) ?
commandInvocation.get(type).get() : 0;
}
private List<CommandForDatanode> getReceivedCommands() {
return commands;
}
/**
* Returns true if the command handler has received the given
* command type for the provided datanode.
*
* @param type Command Type
* @param datanode DatanodeDetails
* @return True if command was received, false otherwise
*/
private boolean received(final SCMCommandProto.Type type,
final DatanodeDetails datanode) {
return commands.stream().anyMatch(dc ->
dc.getCommand().getType().equals(type) &&
dc.getDatanodeId().equals(datanode.getUuid()));
}
}
}