blob: b17f5d630d52bc109c57f027fedcf3392de7c703 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container.replication;
import com.google.common.collect.ImmutableList;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.net.NodeSchema;
import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
import static org.junit.Assert.assertThrows;
/**
* Tests the ECUnderReplicationHandling functionality.
*/
public class TestECUnderReplicationHandler {
private ECReplicationConfig repConfig;
private ContainerInfo container;
private NodeManager nodeManager;
private OzoneConfiguration conf;
private PlacementPolicy policy;
@BeforeEach
public void setup() {
nodeManager = new MockNodeManager(true, 10) {
@Override
public NodeStatus getNodeStatus(DatanodeDetails dd) {
return NodeStatus.inServiceHealthy();
}
};
conf = SCMTestUtils.getConf();
repConfig = new ECReplicationConfig(3, 2);
container = ReplicationTestUtil
.createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
policy = ReplicationTestUtil
.getSimpleTestPlacementPolicy(nodeManager, conf);
NodeSchema[] schemas =
new NodeSchema[] {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
NodeSchemaManager.getInstance().init(schemas, true);
}
@Test
public void testUnderReplicationWithMissingParityIndex5() throws IOException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4));
testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
availableReplicas, 0, policy);
}
@Test
public void testUnderReplicationWithMissingIndex34() throws IOException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
Pair.of(IN_SERVICE, 5));
testUnderReplicationWithMissingIndexes(ImmutableList.of(3, 4),
availableReplicas, 0, policy);
}
@Test
public void testUnderReplicationWithMissingIndex2345() throws IOException {
Set<ContainerReplica> availableReplicas =
ReplicationTestUtil.createReplicas(Pair.of(IN_SERVICE, 1));
testUnderReplicationWithMissingIndexes(ImmutableList.of(2, 3, 4, 5),
availableReplicas, 0, policy);
}
@Test
public void testUnderReplicationWithMissingIndex12345() throws IOException {
Set<ContainerReplica> availableReplicas = new HashSet<>();
testUnderReplicationWithMissingIndexes(ImmutableList.of(1, 2, 3, 4, 5),
availableReplicas, 0, policy);
}
@Test
public void testUnderReplicationWithDecomIndex1() throws IOException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(DECOMMISSIONING, 1), Pair.of(IN_SERVICE, 2),
Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
Pair.of(IN_SERVICE, 5));
Map<DatanodeDetails, SCMCommand<?>> cmds =
testUnderReplicationWithMissingIndexes(
Lists.emptyList(), availableReplicas, 1, policy);
Assertions.assertEquals(1, cmds.size());
// Check the replicate command has index 1 set
ReplicateContainerCommand cmd = (ReplicateContainerCommand) cmds.values()
.iterator().next();
Assertions.assertEquals(1, cmd.getReplicaIndex());
}
@Test
public void testUnderReplicationWithDecomIndex12() throws IOException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(DECOMMISSIONING, 1),
Pair.of(DECOMMISSIONING, 2), Pair.of(IN_SERVICE, 3),
Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5));
testUnderReplicationWithMissingIndexes(Lists.emptyList(), availableReplicas,
2, policy);
}
@Test
public void testUnderReplicationWithMixedDecomAndMissingIndexes()
throws IOException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(DECOMMISSIONING, 1),
Pair.of(DECOMMISSIONING, 2), Pair.of(IN_SERVICE, 3),
Pair.of(IN_SERVICE, 4));
testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
availableReplicas, 2, policy);
}
@Test
public void testExceptionIfNoNodesFound() {
PlacementPolicy noNodesPolicy = ReplicationTestUtil
.getNoNodesTestPlacementPolicy(nodeManager, conf);
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(DECOMMISSIONING, 1),
Pair.of(DECOMMISSIONING, 2), Pair.of(IN_SERVICE, 3),
Pair.of(IN_SERVICE, 4));
assertThrows(SCMException.class, () ->
testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
availableReplicas, 2, noNodesPolicy));
}
@Test
public void testSameNodeIsNotReused() throws IOException {
DatanodeDetails newDn = MockDatanodeDetails.randomDatanodeDetails();
PlacementPolicy sameNodePolicy = ReplicationTestUtil
.getSameNodeTestPlacementPolicy(nodeManager, conf, newDn);
// Just have a missing index, this should return OK.
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 1),
Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
Pair.of(IN_SERVICE, 4));
// Passing zero for decommIndexes, as we don't expect the decom command to
// get created due to the placement policy returning an already used node
testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
availableReplicas, 0, sameNodePolicy);
// Now add a decommissioning index - we should get an exception as the
// placement policy should throw. It will have used up the node for
// reconstruction, and hence no nodes will be found to fix the decommission
// index.
Set<ContainerReplica> replicas = ReplicationTestUtil
.createReplicas(Pair.of(DECOMMISSIONING, 1),
Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
Pair.of(IN_SERVICE, 4));
assertThrows(SCMException.class, () ->
testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
replicas, 1, sameNodePolicy));
}
public Map<DatanodeDetails, SCMCommand<?>>
testUnderReplicationWithMissingIndexes(
List<Integer> missingIndexes, Set<ContainerReplica> availableReplicas,
int decomIndexes, PlacementPolicy placementPolicy) throws IOException {
ECUnderReplicationHandler ecURH =
new ECUnderReplicationHandler(placementPolicy, conf, nodeManager);
ContainerHealthResult.UnderReplicatedHealthResult result =
Mockito.mock(ContainerHealthResult.UnderReplicatedHealthResult.class);
Mockito.when(result.isUnrecoverable()).thenReturn(false);
Mockito.when(result.getContainerInfo()).thenReturn(container);
Map<DatanodeDetails, SCMCommand<?>> datanodeDetailsSCMCommandMap = ecURH
.processAndCreateCommands(availableReplicas, ImmutableList.of(),
result, 1);
Set<Map.Entry<DatanodeDetails, SCMCommand<?>>> entries =
datanodeDetailsSCMCommandMap.entrySet();
int replicateCommand = 0;
int reconstructCommand = 0;
byte[] missingIndexesByteArr = new byte[missingIndexes.size()];
for (int i = 0; i < missingIndexes.size(); i++) {
missingIndexesByteArr[i] = missingIndexes.get(i).byteValue();
}
boolean shouldReconstructCommandExist =
missingIndexes.size() > 0 && missingIndexes.size() <= repConfig
.getParity();
for (Map.Entry<DatanodeDetails, SCMCommand<?>> dnCommand : entries) {
if (dnCommand.getValue() instanceof ReplicateContainerCommand) {
replicateCommand++;
} else if (dnCommand
.getValue() instanceof ReconstructECContainersCommand) {
if (shouldReconstructCommandExist) {
Assertions.assertArrayEquals(missingIndexesByteArr,
((ReconstructECContainersCommand) dnCommand.getValue())
.getMissingContainerIndexes());
}
reconstructCommand++;
}
}
Assertions.assertEquals(decomIndexes, replicateCommand);
Assertions.assertEquals(shouldReconstructCommandExist ? 1 : 0,
reconstructCommand);
return datanodeDetailsSCMCommandMap;
}
}