| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.hadoop.hdds.scm.container; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hdds.conf.OzoneConfiguration; |
| import org.apache.hadoop.hdds.protocol.DatanodeDetails; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; |
| import org.apache.hadoop.hdds.protocol.proto |
| .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; |
| import org.apache.hadoop.hdds.protocol.proto |
| .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; |
| import org.apache.hadoop.hdds.scm.exceptions.SCMException; |
| import org.apache.hadoop.hdds.scm.node.NodeManager; |
| import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; |
| import org.apache.hadoop.hdds.scm.server |
| .SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode; |
| import org.apache.hadoop.hdds.server.events.EventPublisher; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.mockito.Mockito; |
| |
| import java.io.IOException; |
| import java.util.Iterator; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas; |
| import static org.apache.hadoop.hdds.scm.TestUtils.getContainer; |
| |
| /** |
| * Test the behaviour of the ContainerReportHandler. |
| */ |
| public class TestContainerReportHandler { |
| |
| private NodeManager nodeManager; |
| private ContainerManager containerManager; |
| private ContainerStateManager containerStateManager; |
| private EventPublisher publisher; |
| |
| @Before |
| public void setup() throws IOException { |
| final Configuration conf = new OzoneConfiguration(); |
| this.nodeManager = new MockNodeManager(true, 10); |
| this.containerManager = Mockito.mock(ContainerManager.class); |
| this.containerStateManager = new ContainerStateManager(conf); |
| this.publisher = Mockito.mock(EventPublisher.class); |
| |
| |
| Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class))) |
| .thenAnswer(invocation -> containerStateManager |
| .getContainer((ContainerID)invocation.getArguments()[0])); |
| |
| Mockito.when(containerManager.getContainerReplicas( |
| Mockito.any(ContainerID.class))) |
| .thenAnswer(invocation -> containerStateManager |
| .getContainerReplicas((ContainerID)invocation.getArguments()[0])); |
| |
| Mockito.doAnswer(invocation -> { |
| containerStateManager |
| .updateContainerState((ContainerID)invocation.getArguments()[0], |
| (HddsProtos.LifeCycleEvent)invocation.getArguments()[1]); |
| return null; |
| }).when(containerManager).updateContainerState( |
| Mockito.any(ContainerID.class), |
| Mockito.any(HddsProtos.LifeCycleEvent.class)); |
| |
| Mockito.doAnswer(invocation -> { |
| containerStateManager.updateContainerReplica( |
| (ContainerID) invocation.getArguments()[0], |
| (ContainerReplica) invocation.getArguments()[1]); |
| return null; |
| }).when(containerManager).updateContainerReplica( |
| Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class)); |
| |
| Mockito.doAnswer(invocation -> { |
| containerStateManager.removeContainerReplica( |
| (ContainerID) invocation.getArguments()[0], |
| (ContainerReplica) invocation.getArguments()[1]); |
| return null; |
| }).when(containerManager).removeContainerReplica( |
| Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class)); |
| |
| } |
| |
| @After |
| public void tearDown() throws IOException { |
| containerStateManager.close(); |
| } |
| |
| @Test |
| public void testUnderReplicatedContainer() |
| throws NodeNotFoundException, ContainerNotFoundException, SCMException { |
| |
| final ContainerReportHandler reportHandler = new ContainerReportHandler( |
| nodeManager, containerManager); |
| final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes( |
| NodeState.HEALTHY).iterator(); |
| final DatanodeDetails datanodeOne = nodeIterator.next(); |
| final DatanodeDetails datanodeTwo = nodeIterator.next(); |
| final DatanodeDetails datanodeThree = nodeIterator.next(); |
| |
| final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSED); |
| final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED); |
| final Set<ContainerID> containerIDSet = Stream.of( |
| containerOne.containerID(), containerTwo.containerID()) |
| .collect(Collectors.toSet()); |
| |
| nodeManager.setContainers(datanodeOne, containerIDSet); |
| nodeManager.setContainers(datanodeTwo, containerIDSet); |
| nodeManager.setContainers(datanodeThree, containerIDSet); |
| |
| containerStateManager.loadContainer(containerOne); |
| containerStateManager.loadContainer(containerTwo); |
| |
| getReplicas(containerOne.containerID(), |
| ContainerReplicaProto.State.CLOSED, |
| datanodeOne, datanodeTwo, datanodeThree) |
| .forEach(r -> { |
| try { |
| containerStateManager.updateContainerReplica( |
| containerOne.containerID(), r); |
| } catch (ContainerNotFoundException ignored) { |
| |
| } |
| }); |
| |
| getReplicas(containerTwo.containerID(), |
| ContainerReplicaProto.State.CLOSED, |
| datanodeOne, datanodeTwo, datanodeThree) |
| .forEach(r -> { |
| try { |
| containerStateManager.updateContainerReplica( |
| containerTwo.containerID(), r); |
| } catch (ContainerNotFoundException ignored) { |
| |
| } |
| }); |
| |
| |
| // SCM expects both containerOne and containerTwo to be in all the three |
| // datanodes datanodeOne, datanodeTwo and datanodeThree |
| |
| // Now datanodeOne is sending container report in which containerOne is |
| // missing. |
| |
| // containerOne becomes under replicated. |
| final ContainerReportsProto containerReport = getContainerReportsProto( |
| containerTwo.containerID(), ContainerReplicaProto.State.CLOSED, |
| datanodeOne.getUuidString()); |
| final ContainerReportFromDatanode containerReportFromDatanode = |
| new ContainerReportFromDatanode(datanodeOne, containerReport); |
| reportHandler.onMessage(containerReportFromDatanode, publisher); |
| Assert.assertEquals(2, containerManager.getContainerReplicas( |
| containerOne.containerID()).size()); |
| |
| } |
| |
| @Test |
| public void testOverReplicatedContainer() throws NodeNotFoundException, |
| SCMException, ContainerNotFoundException { |
| |
| final ContainerReportHandler reportHandler = new ContainerReportHandler( |
| nodeManager, containerManager); |
| |
| final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes( |
| NodeState.HEALTHY).iterator(); |
| final DatanodeDetails datanodeOne = nodeIterator.next(); |
| final DatanodeDetails datanodeTwo = nodeIterator.next(); |
| final DatanodeDetails datanodeThree = nodeIterator.next(); |
| final DatanodeDetails datanodeFour = nodeIterator.next(); |
| |
| final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSED); |
| final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED); |
| |
| final Set<ContainerID> containerIDSet = Stream.of( |
| containerOne.containerID(), containerTwo.containerID()) |
| .collect(Collectors.toSet()); |
| |
| nodeManager.setContainers(datanodeOne, containerIDSet); |
| nodeManager.setContainers(datanodeTwo, containerIDSet); |
| nodeManager.setContainers(datanodeThree, containerIDSet); |
| |
| containerStateManager.loadContainer(containerOne); |
| containerStateManager.loadContainer(containerTwo); |
| |
| getReplicas(containerOne.containerID(), |
| ContainerReplicaProto.State.CLOSED, |
| datanodeOne, datanodeTwo, datanodeThree) |
| .forEach(r -> { |
| try { |
| containerStateManager.updateContainerReplica( |
| containerOne.containerID(), r); |
| } catch (ContainerNotFoundException ignored) { |
| |
| } |
| }); |
| |
| getReplicas(containerTwo.containerID(), |
| ContainerReplicaProto.State.CLOSED, |
| datanodeOne, datanodeTwo, datanodeThree) |
| .forEach(r -> { |
| try { |
| containerStateManager.updateContainerReplica( |
| containerTwo.containerID(), r); |
| } catch (ContainerNotFoundException ignored) { |
| |
| } |
| }); |
| |
| |
| // SCM expects both containerOne and containerTwo to be in all the three |
| // datanodes datanodeOne, datanodeTwo and datanodeThree |
| |
| // Now datanodeFour is sending container report which has containerOne. |
| |
| // containerOne becomes over replicated. |
| |
| final ContainerReportsProto containerReport = getContainerReportsProto( |
| containerOne.containerID(), ContainerReplicaProto.State.CLOSED, |
| datanodeFour.getUuidString()); |
| final ContainerReportFromDatanode containerReportFromDatanode = |
| new ContainerReportFromDatanode(datanodeFour, containerReport); |
| reportHandler.onMessage(containerReportFromDatanode, publisher); |
| |
| Assert.assertEquals(4, containerManager.getContainerReplicas( |
| containerOne.containerID()).size()); |
| } |
| |
| |
| @Test |
| public void testClosingToClosed() throws NodeNotFoundException, IOException { |
| /* |
| * The container is in CLOSING state and all the replicas are in |
| * OPEN/CLOSING state. |
| * |
| * The datanode reports that one of the replica is now CLOSED. |
| * |
| * In this case SCM should mark the container as CLOSED. |
| */ |
| |
| final ContainerReportHandler reportHandler = new ContainerReportHandler( |
| nodeManager, containerManager); |
| |
| final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes( |
| NodeState.HEALTHY).iterator(); |
| final DatanodeDetails datanodeOne = nodeIterator.next(); |
| final DatanodeDetails datanodeTwo = nodeIterator.next(); |
| final DatanodeDetails datanodeThree = nodeIterator.next(); |
| |
| final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSING); |
| final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED); |
| |
| final Set<ContainerID> containerIDSet = Stream.of( |
| containerOne.containerID(), containerTwo.containerID()) |
| .collect(Collectors.toSet()); |
| |
| final Set<ContainerReplica> containerOneReplicas = getReplicas( |
| containerOne.containerID(), |
| ContainerReplicaProto.State.CLOSING, |
| datanodeOne); |
| |
| containerOneReplicas.addAll(getReplicas( |
| containerOne.containerID(), |
| ContainerReplicaProto.State.OPEN, |
| datanodeTwo, datanodeThree)); |
| |
| final Set<ContainerReplica> containerTwoReplicas = getReplicas( |
| containerTwo.containerID(), |
| ContainerReplicaProto.State.CLOSED, |
| datanodeOne, datanodeTwo, datanodeThree); |
| |
| nodeManager.setContainers(datanodeOne, containerIDSet); |
| nodeManager.setContainers(datanodeTwo, containerIDSet); |
| nodeManager.setContainers(datanodeThree, containerIDSet); |
| |
| containerStateManager.loadContainer(containerOne); |
| containerStateManager.loadContainer(containerTwo); |
| |
| containerOneReplicas.forEach(r -> { |
| try { |
| containerStateManager.updateContainerReplica( |
| containerTwo.containerID(), r); |
| } catch (ContainerNotFoundException ignored) { |
| |
| } |
| }); |
| |
| containerTwoReplicas.forEach(r -> { |
| try { |
| containerStateManager.updateContainerReplica( |
| containerTwo.containerID(), r); |
| } catch (ContainerNotFoundException ignored) { |
| |
| } |
| }); |
| |
| |
| final ContainerReportsProto containerReport = getContainerReportsProto( |
| containerOne.containerID(), ContainerReplicaProto.State.CLOSED, |
| datanodeOne.getUuidString()); |
| final ContainerReportFromDatanode containerReportFromDatanode = |
| new ContainerReportFromDatanode(datanodeOne, containerReport); |
| reportHandler.onMessage(containerReportFromDatanode, publisher); |
| |
| Assert.assertEquals(LifeCycleState.CLOSED, containerOne.getState()); |
| } |
| |
| @Test |
| public void testClosingToQuasiClosed() |
| throws NodeNotFoundException, IOException { |
| /* |
| * The container is in CLOSING state and all the replicas are in |
| * OPEN/CLOSING state. |
| * |
| * The datanode reports that the replica is now QUASI_CLOSED. |
| * |
| * In this case SCM should move the container to QUASI_CLOSED. |
| */ |
| |
| final ContainerReportHandler reportHandler = new ContainerReportHandler( |
| nodeManager, containerManager); |
| |
| final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes( |
| NodeState.HEALTHY).iterator(); |
| final DatanodeDetails datanodeOne = nodeIterator.next(); |
| final DatanodeDetails datanodeTwo = nodeIterator.next(); |
| final DatanodeDetails datanodeThree = nodeIterator.next(); |
| |
| final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSING); |
| final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED); |
| |
| final Set<ContainerID> containerIDSet = Stream.of( |
| containerOne.containerID(), containerTwo.containerID()) |
| .collect(Collectors.toSet()); |
| |
| final Set<ContainerReplica> containerOneReplicas = getReplicas( |
| containerOne.containerID(), |
| ContainerReplicaProto.State.CLOSING, |
| datanodeOne, datanodeTwo); |
| containerOneReplicas.addAll(getReplicas( |
| containerOne.containerID(), |
| ContainerReplicaProto.State.OPEN, |
| datanodeThree)); |
| final Set<ContainerReplica> containerTwoReplicas = getReplicas( |
| containerTwo.containerID(), |
| ContainerReplicaProto.State.CLOSED, |
| datanodeOne, datanodeTwo, datanodeThree); |
| |
| nodeManager.setContainers(datanodeOne, containerIDSet); |
| nodeManager.setContainers(datanodeTwo, containerIDSet); |
| nodeManager.setContainers(datanodeThree, containerIDSet); |
| |
| containerStateManager.loadContainer(containerOne); |
| containerStateManager.loadContainer(containerTwo); |
| |
| containerOneReplicas.forEach(r -> { |
| try { |
| containerStateManager.updateContainerReplica( |
| containerTwo.containerID(), r); |
| } catch (ContainerNotFoundException ignored) { |
| |
| } |
| }); |
| |
| containerTwoReplicas.forEach(r -> { |
| try { |
| containerStateManager.updateContainerReplica( |
| containerTwo.containerID(), r); |
| } catch (ContainerNotFoundException ignored) { |
| |
| } |
| }); |
| |
| |
| final ContainerReportsProto containerReport = getContainerReportsProto( |
| containerOne.containerID(), ContainerReplicaProto.State.QUASI_CLOSED, |
| datanodeOne.getUuidString()); |
| final ContainerReportFromDatanode containerReportFromDatanode = |
| new ContainerReportFromDatanode(datanodeOne, containerReport); |
| reportHandler.onMessage(containerReportFromDatanode, publisher); |
| |
| Assert.assertEquals(LifeCycleState.QUASI_CLOSED, containerOne.getState()); |
| } |
| |
| @Test |
| public void testQuasiClosedToClosed() |
| throws NodeNotFoundException, IOException { |
| /* |
| * The container is in QUASI_CLOSED state. |
| * - One of the replica is in QUASI_CLOSED state |
| * - The other two replica are in OPEN/CLOSING state |
| * |
| * The datanode reports the second replica is now CLOSED. |
| * |
| * In this case SCM should CLOSE the container. |
| */ |
| |
| final ContainerReportHandler reportHandler = new ContainerReportHandler( |
| nodeManager, containerManager); |
| final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes( |
| NodeState.HEALTHY).iterator(); |
| |
| final DatanodeDetails datanodeOne = nodeIterator.next(); |
| final DatanodeDetails datanodeTwo = nodeIterator.next(); |
| final DatanodeDetails datanodeThree = nodeIterator.next(); |
| |
| final ContainerInfo containerOne = |
| getContainer(LifeCycleState.QUASI_CLOSED); |
| final ContainerInfo containerTwo = |
| getContainer(LifeCycleState.CLOSED); |
| |
| final Set<ContainerID> containerIDSet = Stream.of( |
| containerOne.containerID(), containerTwo.containerID()) |
| .collect(Collectors.toSet()); |
| final Set<ContainerReplica> containerOneReplicas = getReplicas( |
| containerOne.containerID(), |
| ContainerReplicaProto.State.QUASI_CLOSED, |
| 10000L, |
| datanodeOne); |
| containerOneReplicas.addAll(getReplicas( |
| containerOne.containerID(), |
| ContainerReplicaProto.State.CLOSING, |
| datanodeTwo, datanodeThree)); |
| final Set<ContainerReplica> containerTwoReplicas = getReplicas( |
| containerTwo.containerID(), |
| ContainerReplicaProto.State.CLOSED, |
| datanodeOne, datanodeTwo, datanodeThree); |
| |
| nodeManager.setContainers(datanodeOne, containerIDSet); |
| nodeManager.setContainers(datanodeTwo, containerIDSet); |
| nodeManager.setContainers(datanodeThree, containerIDSet); |
| |
| containerStateManager.loadContainer(containerOne); |
| containerStateManager.loadContainer(containerTwo); |
| |
| containerOneReplicas.forEach(r -> { |
| try { |
| containerStateManager.updateContainerReplica( |
| containerTwo.containerID(), r); |
| } catch (ContainerNotFoundException ignored) { |
| |
| } |
| }); |
| |
| containerTwoReplicas.forEach(r -> { |
| try { |
| containerStateManager.updateContainerReplica( |
| containerTwo.containerID(), r); |
| } catch (ContainerNotFoundException ignored) { |
| |
| } |
| }); |
| |
| |
| final ContainerReportsProto containerReport = getContainerReportsProto( |
| containerOne.containerID(), ContainerReplicaProto.State.CLOSED, |
| datanodeOne.getUuidString()); |
| |
| final ContainerReportFromDatanode containerReportFromDatanode = |
| new ContainerReportFromDatanode(datanodeOne, containerReport); |
| reportHandler.onMessage(containerReportFromDatanode, publisher); |
| |
| Assert.assertEquals(LifeCycleState.CLOSED, containerOne.getState()); |
| } |
| |
| private static ContainerReportsProto getContainerReportsProto( |
| final ContainerID containerId, final ContainerReplicaProto.State state, |
| final String originNodeId) { |
| final ContainerReportsProto.Builder crBuilder = |
| ContainerReportsProto.newBuilder(); |
| final ContainerReplicaProto replicaProto = |
| ContainerReplicaProto.newBuilder() |
| .setContainerID(containerId.getId()) |
| .setState(state) |
| .setOriginNodeId(originNodeId) |
| .setFinalhash("e16cc9d6024365750ed8dbd194ea46d2") |
| .setSize(5368709120L) |
| .setUsed(2000000000L) |
| .setKeyCount(100000000L) |
| .setReadCount(100000000L) |
| .setWriteCount(100000000L) |
| .setReadBytes(2000000000L) |
| .setWriteBytes(2000000000L) |
| .setBlockCommitSequenceId(10000L) |
| .setDeleteTransactionId(0) |
| .build(); |
| return crBuilder.addReports(replicaProto).build(); |
| } |
| |
| } |