blob: ed8b1e3d796aa9f25b414b71bddf7ca1842347cb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.container;
import com.google.common.primitives.Longs;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.Random;
import org.slf4j.event.Level;
/**
* Tests for ContainerStateManager.
*/
public class TestContainerStateManagerIntegration {
private OzoneConfiguration conf;
private MiniOzoneCluster cluster;
private XceiverClientManager xceiverClientManager;
private StorageContainerManager scm;
private ContainerManager containerManager;
private ContainerStateManager containerStateManager;
private PipelineSelector selector;
private String containerOwner = "OZONE";
@Before
public void setup() throws Exception {
conf = new OzoneConfiguration();
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
cluster.waitForClusterToBeReady();
cluster.waitTobeOutOfChillMode();
xceiverClientManager = new XceiverClientManager(conf);
scm = cluster.getStorageContainerManager();
containerManager = scm.getContainerManager();
containerStateManager = containerManager.getStateManager();
selector = containerManager.getPipelineSelector();
}
@After
public void cleanUp() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testAllocateContainer() throws IOException {
// Allocate a container and verify the container info
ContainerWithPipeline container1 = scm.getClientProtocolServer()
.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
ContainerInfo info = containerStateManager
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(container1.getContainerInfo().getContainerID(),
info.getContainerID());
Assert.assertEquals(OzoneConsts.GB * 3, info.getAllocatedBytes());
Assert.assertEquals(containerOwner, info.getOwner());
Assert.assertEquals(xceiverClientManager.getType(),
info.getReplicationType());
Assert.assertEquals(xceiverClientManager.getFactor(),
info.getReplicationFactor());
Assert.assertEquals(HddsProtos.LifeCycleState.ALLOCATED, info.getState());
// Check there are two containers in ALLOCATED state after allocation
ContainerWithPipeline container2 = scm.getClientProtocolServer()
.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
int numContainers = containerStateManager
.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.ALLOCATED).size();
Assert.assertNotEquals(container1.getContainerInfo().getContainerID(),
container2.getContainerInfo().getContainerID());
Assert.assertEquals(2, numContainers);
}
@Test
public void testContainerStateManagerRestart() throws IOException {
// Allocate 5 containers in ALLOCATED state and 5 in CREATING state
List<ContainerInfo> containers = new ArrayList<>();
for (int i = 0; i < 10; i++) {
ContainerWithPipeline container = scm.getClientProtocolServer()
.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
containers.add(container.getContainerInfo());
if (i >= 5) {
scm.getContainerManager().updateContainerState(container
.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
}
}
// New instance of ContainerStateManager should load all the containers in
// container store.
ContainerStateManager stateManager =
new ContainerStateManager(conf, containerManager, selector);
int matchCount = stateManager
.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.ALLOCATED).size();
Assert.assertEquals(5, matchCount);
matchCount = stateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.CREATING).size();
Assert.assertEquals(5, matchCount);
}
@Test
public void testGetMatchingContainer() throws IOException {
ContainerWithPipeline container1 = scm.getClientProtocolServer().
allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
containerManager
.updateContainerState(container1.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
containerManager
.updateContainerState(container1.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATED);
ContainerWithPipeline container2 = scm.getClientProtocolServer().
allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
ContainerInfo info = containerStateManager
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.OPEN);
Assert.assertEquals(container1.getContainerInfo().getContainerID(),
info.getContainerID());
info = containerStateManager
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(container2.getContainerInfo().getContainerID(),
info.getContainerID());
containerManager
.updateContainerState(container2.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
containerManager
.updateContainerState(container2.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATED);
// space has already been allocated in container1, now container 2 should
// be chosen.
info = containerStateManager
.getMatchingContainer(OzoneConsts.GB * 3, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.OPEN);
Assert.assertEquals(container2.getContainerInfo().getContainerID(),
info.getContainerID());
}
@Test
public void testUpdateContainerState() throws IOException {
NavigableSet<ContainerID> containerList = containerStateManager
.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.ALLOCATED);
int containers = containerList == null ? 0 : containerList.size();
Assert.assertEquals(0, containers);
// Allocate container1 and update its state from ALLOCATED -> CREATING ->
// OPEN -> CLOSING -> CLOSED -> DELETING -> DELETED
ContainerWithPipeline container1 = scm.getClientProtocolServer()
.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.ALLOCATED).size();
Assert.assertEquals(1, containers);
containerManager
.updateContainerState(container1.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.CREATING).size();
Assert.assertEquals(1, containers);
containerManager
.updateContainerState(container1.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATED);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.OPEN).size();
Assert.assertEquals(1, containers);
containerManager
.updateContainerState(container1.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.FINALIZE);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.CLOSING).size();
Assert.assertEquals(1, containers);
containerManager
.updateContainerState(container1.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CLOSE);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.CLOSED).size();
Assert.assertEquals(1, containers);
containerManager
.updateContainerState(container1.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.DELETE);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.DELETING).size();
Assert.assertEquals(1, containers);
containerManager
.updateContainerState(container1.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CLEANUP);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.DELETED).size();
Assert.assertEquals(1, containers);
// Allocate container1 and update its state from ALLOCATED -> CREATING ->
// DELETING
ContainerWithPipeline container2 = scm.getClientProtocolServer()
.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
containerManager
.updateContainerState(container2.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
containerManager
.updateContainerState(container2.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.TIMEOUT);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.DELETING).size();
Assert.assertEquals(1, containers);
// Allocate container1 and update its state from ALLOCATED -> CREATING ->
// OPEN -> CLOSING -> CLOSED
ContainerWithPipeline container3 = scm.getClientProtocolServer()
.allocateContainer(
xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
containerManager
.updateContainerState(container3.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
containerManager
.updateContainerState(container3.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATED);
containerManager
.updateContainerState(container3.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.FINALIZE);
containerManager
.updateContainerState(container3.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CLOSE);
containers = containerStateManager.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.CLOSED).size();
Assert.assertEquals(1, containers);
}
@Test
public void testUpdatingAllocatedBytes() throws Exception {
ContainerWithPipeline container1 = scm.getClientProtocolServer()
.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
containerManager.updateContainerState(container1
.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATE);
containerManager.updateContainerState(container1
.getContainerInfo().getContainerID(),
HddsProtos.LifeCycleEvent.CREATED);
Random ran = new Random();
long allocatedSize = 0;
for (int i = 0; i<5; i++) {
long size = Math.abs(ran.nextLong() % OzoneConsts.GB);
allocatedSize += size;
// trigger allocating bytes by calling getMatchingContainer
ContainerInfo info = containerStateManager
.getMatchingContainer(size, containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
HddsProtos.LifeCycleState.OPEN);
Assert.assertEquals(container1.getContainerInfo().getContainerID(),
info.getContainerID());
SCMContainerManager containerMapping =
(SCMContainerManager) containerManager;
// manually trigger a flush, this will persist the allocated bytes value
// to disk
containerMapping.flushContainerInfo();
// the persisted value should always be equal to allocated size.
byte[] containerBytes = containerMapping.getContainerStore().get(
Longs.toByteArray(container1.getContainerInfo().getContainerID()));
HddsProtos.SCMContainerInfo infoProto =
HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
ContainerInfo currentInfo = ContainerInfo.fromProtobuf(infoProto);
Assert.assertEquals(allocatedSize, currentInfo.getAllocatedBytes());
}
}
@Test
public void testReplicaMap() throws Exception {
GenericTestUtils.setLogLevel(ContainerStateMap.getLOG(), Level.DEBUG);
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
.captureLogs(ContainerStateMap.getLOG());
DatanodeDetails dn1 = DatanodeDetails.newBuilder().setHostName("host1")
.setIpAddress("1.1.1.1")
.setUuid(UUID.randomUUID().toString()).build();
DatanodeDetails dn2 = DatanodeDetails.newBuilder().setHostName("host2")
.setIpAddress("2.2.2.2")
.setUuid(UUID.randomUUID().toString()).build();
// Test 1: no replica's exist
ContainerID containerID = ContainerID.valueof(RandomUtils.nextLong());
Set<DatanodeDetails> replicaSet;
LambdaTestUtils.intercept(SCMException.class, "", () -> {
containerStateManager.getContainerReplicas(containerID);
});
// Test 2: Add replica nodes and then test
containerStateManager.addContainerReplica(containerID, dn1);
containerStateManager.addContainerReplica(containerID, dn2);
replicaSet = containerStateManager.getContainerReplicas(containerID);
Assert.assertEquals(2, replicaSet.size());
Assert.assertTrue(replicaSet.contains(dn1));
Assert.assertTrue(replicaSet.contains(dn2));
// Test 3: Remove one replica node and then test
containerStateManager.removeContainerReplica(containerID, dn1);
replicaSet = containerStateManager.getContainerReplicas(containerID);
Assert.assertEquals(1, replicaSet.size());
Assert.assertFalse(replicaSet.contains(dn1));
Assert.assertTrue(replicaSet.contains(dn2));
// Test 3: Remove second replica node and then test
containerStateManager.removeContainerReplica(containerID, dn2);
replicaSet = containerStateManager.getContainerReplicas(containerID);
Assert.assertEquals(0, replicaSet.size());
Assert.assertFalse(replicaSet.contains(dn1));
Assert.assertFalse(replicaSet.contains(dn2));
// Test 4: Re-insert dn1
containerStateManager.addContainerReplica(containerID, dn1);
replicaSet = containerStateManager.getContainerReplicas(containerID);
Assert.assertEquals(1, replicaSet.size());
Assert.assertTrue(replicaSet.contains(dn1));
Assert.assertFalse(replicaSet.contains(dn2));
// Re-insert dn2
containerStateManager.addContainerReplica(containerID, dn2);
replicaSet = containerStateManager.getContainerReplicas(containerID);
Assert.assertEquals(2, replicaSet.size());
Assert.assertTrue(replicaSet.contains(dn1));
Assert.assertTrue(replicaSet.contains(dn2));
Assert.assertFalse(logCapturer.getOutput().contains(
"ReplicaMap already contains entry for container Id: " + containerID
.toString() + ",DataNode: " + dn1.toString()));
// Re-insert dn1
containerStateManager.addContainerReplica(containerID, dn1);
replicaSet = containerStateManager.getContainerReplicas(containerID);
Assert.assertEquals(2, replicaSet.size());
Assert.assertTrue(replicaSet.contains(dn1));
Assert.assertTrue(replicaSet.contains(dn2));
Assert.assertTrue(logCapturer.getOutput().contains(
"ReplicaMap already contains entry for container Id: " + containerID
.toString() + ",DataNode: " + dn1.toString()));
}
}