| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.hadoop.hdds.scm.container; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.hdds.HddsConfigKeys; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; |
| import org.apache.hadoop.hdds.scm.ScmConfigKeys; |
| import org.apache.hadoop.hdds.scm.XceiverClientManager; |
| import org.apache.hadoop.hdds.scm.pipeline.Pipeline; |
| import org.apache.hadoop.hdds.protocol.DatanodeDetails; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos; |
| import org.apache.hadoop.hdds.protocol.proto |
| .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; |
| import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; |
| import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager; |
| import org.apache.hadoop.hdds.server.events.EventQueue; |
| import org.apache.hadoop.ozone.container.common.SCMTestUtils; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.junit.AfterClass; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.ExpectedException; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.TreeSet; |
| import java.util.UUID; |
| import java.util.Iterator; |
| import java.util.Optional; |
| import java.util.List; |
| import java.util.ArrayList; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import java.util.stream.Collectors; |
| import java.util.stream.IntStream; |
| |
| /** |
| * Tests for Container ContainerManager. |
| */ |
| public class TestSCMContainerManager { |
| private static SCMContainerManager containerManager; |
| private static MockNodeManager nodeManager; |
| private static PipelineManager pipelineManager; |
| private static File testDir; |
| private static XceiverClientManager xceiverClientManager; |
| private static String containerOwner = "OZONE"; |
| private static Random random; |
| |
| private static final long TIMEOUT = 10000; |
| |
| @Rule |
| public ExpectedException thrown = ExpectedException.none(); |
| @BeforeClass |
| public static void setUp() throws Exception { |
| Configuration conf = SCMTestUtils.getConf(); |
| |
| testDir = GenericTestUtils |
| .getTestDir(TestSCMContainerManager.class.getSimpleName()); |
| conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, |
| testDir.getAbsolutePath()); |
| conf.setTimeDuration( |
| ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT, |
| TIMEOUT, |
| TimeUnit.MILLISECONDS); |
| boolean folderExisted = testDir.exists() || testDir.mkdirs(); |
| if (!folderExisted) { |
| throw new IOException("Unable to create test directory path"); |
| } |
| nodeManager = new MockNodeManager(true, 10); |
| pipelineManager = |
| new SCMPipelineManager(conf, nodeManager, new EventQueue(), null); |
| containerManager = new SCMContainerManager(conf, nodeManager, |
| pipelineManager, new EventQueue()); |
| xceiverClientManager = new XceiverClientManager(conf); |
| random = new Random(); |
| } |
| |
| @AfterClass |
| public static void cleanup() throws IOException { |
| if(containerManager != null) { |
| containerManager.close(); |
| } |
| if (pipelineManager != null) { |
| pipelineManager.close(); |
| } |
| FileUtil.fullyDelete(testDir); |
| } |
| |
| @Before |
| public void clearSafeMode() { |
| nodeManager.setSafemode(false); |
| } |
| |
| @Test |
| public void testallocateContainer() throws Exception { |
| ContainerInfo containerInfo = containerManager.allocateContainer( |
| xceiverClientManager.getType(), |
| xceiverClientManager.getFactor(), |
| containerOwner); |
| Assert.assertNotNull(containerInfo); |
| } |
| |
| @Test |
| public void testallocateContainerDistributesAllocation() throws Exception { |
| /* This is a lame test, we should really be testing something like |
| z-score or make sure that we don't have 3sigma kind of events. Too lazy |
| to write all that code. This test very lamely tests if we have more than |
| 5 separate nodes from the list of 10 datanodes that got allocated a |
| container. |
| */ |
| Set<UUID> pipelineList = new TreeSet<>(); |
| for (int x = 0; x < 30; x++) { |
| ContainerInfo containerInfo = containerManager.allocateContainer( |
| xceiverClientManager.getType(), |
| xceiverClientManager.getFactor(), |
| containerOwner); |
| |
| Assert.assertNotNull(containerInfo); |
| Assert.assertNotNull(containerInfo.getPipelineID()); |
| pipelineList.add(pipelineManager.getPipeline( |
| containerInfo.getPipelineID()).getFirstNode() |
| .getUuid()); |
| } |
| Assert.assertTrue(pipelineList.size() > 5); |
| } |
| |
| @Test |
| public void testAllocateContainerInParallel() throws Exception { |
| int threadCount = 20; |
| List<ExecutorService> executors = new ArrayList<>(threadCount); |
| for (int i = 0; i < threadCount; i++) { |
| executors.add(Executors.newSingleThreadExecutor()); |
| } |
| List<CompletableFuture<ContainerInfo>> futureList = |
| new ArrayList<>(threadCount); |
| for (int i = 0; i < threadCount; i++) { |
| final CompletableFuture<ContainerInfo> future = new CompletableFuture<>(); |
| CompletableFuture.supplyAsync(() -> { |
| try { |
| ContainerInfo containerInfo = containerManager |
| .allocateContainer(xceiverClientManager.getType(), |
| xceiverClientManager.getFactor(), containerOwner); |
| |
| Assert.assertNotNull(containerInfo); |
| Assert.assertNotNull(containerInfo.getPipelineID()); |
| future.complete(containerInfo); |
| return containerInfo; |
| } catch (IOException e) { |
| future.completeExceptionally(e); |
| } |
| return future; |
| }, executors.get(i)); |
| futureList.add(future); |
| } |
| try { |
| CompletableFuture |
| .allOf(futureList.toArray(new CompletableFuture[futureList.size()])) |
| .get(); |
| } catch (Exception e) { |
| Assert.fail("testAllocateBlockInParallel failed"); |
| } |
| } |
| |
| @Test |
| public void testGetContainer() throws IOException { |
| ContainerInfo containerInfo = containerManager.allocateContainer( |
| xceiverClientManager.getType(), |
| xceiverClientManager.getFactor(), |
| containerOwner); |
| Assert.assertNotNull(containerInfo); |
| Pipeline pipeline = pipelineManager |
| .getPipeline(containerInfo.getPipelineID()); |
| Assert.assertNotNull(pipeline); |
| Assert.assertEquals(containerInfo, |
| containerManager.getContainer(containerInfo.containerID())); |
| } |
| |
| @Test |
| public void testGetContainerWithPipeline() throws Exception { |
| ContainerInfo contInfo = containerManager |
| .allocateContainer(xceiverClientManager.getType(), |
| xceiverClientManager.getFactor(), containerOwner); |
| // Add dummy replicas for container. |
| Iterator<DatanodeDetails> nodes = pipelineManager |
| .getPipeline(contInfo.getPipelineID()).getNodes().iterator(); |
| DatanodeDetails dn1 = nodes.next(); |
| containerManager.updateContainerState(contInfo.containerID(), |
| LifeCycleEvent.FINALIZE); |
| containerManager |
| .updateContainerState(contInfo.containerID(), LifeCycleEvent.CLOSE); |
| ContainerInfo finalContInfo = contInfo; |
| Assert.assertEquals(0, |
| containerManager.getContainerReplicas( |
| finalContInfo.containerID()).size()); |
| |
| containerManager.updateContainerReplica(contInfo.containerID(), |
| ContainerReplica.newBuilder().setContainerID(contInfo.containerID()) |
| .setContainerState(ContainerReplicaProto.State.CLOSED) |
| .setDatanodeDetails(dn1).build()); |
| |
| Assert.assertEquals(1, |
| containerManager.getContainerReplicas( |
| finalContInfo.containerID()).size()); |
| |
| contInfo = containerManager.getContainer(contInfo.containerID()); |
| Assert.assertEquals(contInfo.getState(), LifeCycleState.CLOSED); |
| // After closing the container, we should get the replica and construct |
| // standalone pipeline. No more ratis pipeline. |
| |
| Set<DatanodeDetails> replicaNodes = containerManager |
| .getContainerReplicas(contInfo.containerID()) |
| .stream().map(ContainerReplica::getDatanodeDetails) |
| .collect(Collectors.toSet()); |
| Assert.assertTrue(replicaNodes.contains(dn1)); |
| } |
| |
| @Test |
| public void testGetContainerReplicaWithParallelUpdate() throws Exception { |
| testGetContainerWithPipeline(); |
| final Optional<ContainerID> id = containerManager.getContainerIDs() |
| .stream().findFirst(); |
| Assert.assertTrue(id.isPresent()); |
| final ContainerID cId = id.get(); |
| final Optional<ContainerReplica> replica = containerManager |
| .getContainerReplicas(cId).stream().findFirst(); |
| Assert.assertTrue(replica.isPresent()); |
| final ContainerReplica cReplica = replica.get(); |
| final AtomicBoolean runUpdaterThread = |
| new AtomicBoolean(true); |
| |
| Thread updaterThread = new Thread(() -> { |
| while (runUpdaterThread.get()) { |
| try { |
| containerManager.removeContainerReplica(cId, cReplica); |
| containerManager.updateContainerReplica(cId, cReplica); |
| } catch (ContainerException e) { |
| Assert.fail("Container Exception: " + e.getMessage()); |
| } |
| } |
| }); |
| |
| updaterThread.setDaemon(true); |
| updaterThread.start(); |
| |
| IntStream.range(0, 100).forEach(i -> { |
| try { |
| Assert.assertNotNull(containerManager |
| .getContainerReplicas(cId) |
| .stream().map(ContainerReplica::getDatanodeDetails) |
| .collect(Collectors.toSet())); |
| } catch (ContainerNotFoundException e) { |
| Assert.fail("Missing Container " + id); |
| } |
| }); |
| runUpdaterThread.set(false); |
| } |
| |
| @Test |
| public void testgetNoneExistentContainer() { |
| try { |
| containerManager.getContainer(ContainerID.valueof( |
| random.nextInt() & Integer.MAX_VALUE)); |
| Assert.fail(); |
| } catch (ContainerNotFoundException ex) { |
| // Success! |
| } |
| } |
| |
| @Test |
| public void testCloseContainer() throws IOException { |
| ContainerID id = createContainer().containerID(); |
| containerManager.updateContainerState(id, |
| HddsProtos.LifeCycleEvent.FINALIZE); |
| containerManager.updateContainerState(id, |
| HddsProtos.LifeCycleEvent.CLOSE); |
| ContainerInfo closedContainer = containerManager.getContainer(id); |
| Assert.assertEquals(LifeCycleState.CLOSED, closedContainer.getState()); |
| } |
| |
| /** |
| * Creates a container with the given name in SCMContainerManager. |
| * @throws IOException |
| */ |
| private ContainerInfo createContainer() |
| throws IOException { |
| nodeManager.setSafemode(false); |
| return containerManager |
| .allocateContainer(xceiverClientManager.getType(), |
| xceiverClientManager.getFactor(), containerOwner); |
| } |
| |
| } |