| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hdds.scm.node; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.stream.Collectors; |
| |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.hdds.HddsConfigKeys; |
| import org.apache.hadoop.hdds.conf.OzoneConfiguration; |
| import org.apache.hadoop.hdds.protocol.DatanodeDetails; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos; |
| import org.apache.hadoop.hdds.protocol.proto |
| .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; |
| import org.apache.hadoop.hdds.protocol.proto |
| .StorageContainerDatanodeProtocolProtos.NodeReportProto; |
| import org.apache.hadoop.hdds.protocol.proto |
| .StorageContainerDatanodeProtocolProtos.StorageReportProto; |
| import org.apache.hadoop.hdds.scm.HddsTestUtils; |
| import org.apache.hadoop.hdds.scm.TestUtils; |
| import org.apache.hadoop.hdds.scm.container.ContainerID; |
| import org.apache.hadoop.hdds.scm.container.ContainerManager; |
| import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; |
| import org.apache.hadoop.hdds.scm.container.ContainerReplica; |
| import org.apache.hadoop.hdds.scm.container.ContainerInfo; |
| import org.apache.hadoop.hdds.scm.events.SCMEvents; |
| import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; |
| import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; |
| import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider; |
| import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager; |
| import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider; |
| import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher |
| .NodeReportFromDatanode; |
| import org.apache.hadoop.hdds.scm.server.StorageContainerManager; |
| import org.apache.hadoop.hdds.server.events.EventPublisher; |
| |
| import org.apache.hadoop.hdds.server.events.EventQueue; |
| import org.apache.hadoop.security.authentication.client |
| .AuthenticationException; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.mockito.Mockito; |
| |
| /** |
| * Test DeadNodeHandler. |
| */ |
| public class TestDeadNodeHandler { |
| |
| private StorageContainerManager scm; |
| private SCMNodeManager nodeManager; |
| private ContainerManager containerManager; |
| private NodeReportHandler nodeReportHandler; |
| private DeadNodeHandler deadNodeHandler; |
| private EventPublisher publisher; |
| private EventQueue eventQueue; |
| private String storageDir; |
| |
| @Before |
| public void setup() throws IOException, AuthenticationException { |
| OzoneConfiguration conf = new OzoneConfiguration(); |
| storageDir = GenericTestUtils.getTempPath( |
| TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID()); |
| conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir); |
| eventQueue = new EventQueue(); |
| scm = HddsTestUtils.getScm(conf); |
| nodeManager = (SCMNodeManager) scm.getScmNodeManager(); |
| SCMPipelineManager manager = |
| (SCMPipelineManager)scm.getPipelineManager(); |
| PipelineProvider mockRatisProvider = |
| new MockRatisPipelineProvider(nodeManager, manager.getStateManager(), |
| conf); |
| manager.setPipelineProvider(HddsProtos.ReplicationType.RATIS, |
| mockRatisProvider); |
| containerManager = scm.getContainerManager(); |
| deadNodeHandler = new DeadNodeHandler(nodeManager, |
| Mockito.mock(PipelineManager.class), containerManager); |
| eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler); |
| publisher = Mockito.mock(EventPublisher.class); |
| nodeReportHandler = new NodeReportHandler(nodeManager); |
| } |
| |
| @After |
| public void teardown() { |
| scm.stop(); |
| scm.join(); |
| FileUtil.fullyDelete(new File(storageDir)); |
| } |
| |
| @Test |
| public void testOnMessage() throws IOException, NodeNotFoundException { |
| //GIVEN |
| DatanodeDetails datanode1 = TestUtils.randomDatanodeDetails(); |
| DatanodeDetails datanode2 = TestUtils.randomDatanodeDetails(); |
| DatanodeDetails datanode3 = TestUtils.randomDatanodeDetails(); |
| |
| String storagePath = GenericTestUtils.getRandomizedTempPath() |
| .concat("/" + datanode1.getUuidString()); |
| |
| StorageReportProto storageOne = TestUtils.createStorageReport( |
| datanode1.getUuid(), storagePath, 100, 10, 90, null); |
| |
| // Standalone pipeline now excludes the nodes which are already used, |
| // is the a proper behavior. Adding 9 datanodes for now to make the |
| // test case happy. |
| |
| nodeManager.register(datanode1, |
| TestUtils.createNodeReport(storageOne), null); |
| nodeManager.register(datanode2, |
| TestUtils.createNodeReport(storageOne), null); |
| nodeManager.register(datanode3, |
| TestUtils.createNodeReport(storageOne), null); |
| |
| nodeManager.register(TestUtils.randomDatanodeDetails(), |
| TestUtils.createNodeReport(storageOne), null); |
| nodeManager.register(TestUtils.randomDatanodeDetails(), |
| TestUtils.createNodeReport(storageOne), null); |
| nodeManager.register(TestUtils.randomDatanodeDetails(), |
| TestUtils.createNodeReport(storageOne), null); |
| |
| nodeManager.register(TestUtils.randomDatanodeDetails(), |
| TestUtils.createNodeReport(storageOne), null); |
| nodeManager.register(TestUtils.randomDatanodeDetails(), |
| TestUtils.createNodeReport(storageOne), null); |
| nodeManager.register(TestUtils.randomDatanodeDetails(), |
| TestUtils.createNodeReport(storageOne), null); |
| |
| ContainerInfo container1 = |
| TestUtils.allocateContainer(containerManager); |
| ContainerInfo container2 = |
| TestUtils.allocateContainer(containerManager); |
| ContainerInfo container3 = |
| TestUtils.allocateContainer(containerManager); |
| ContainerInfo container4 = |
| TestUtils.allocateContainer(containerManager); |
| |
| registerContainers(datanode1, container1, container2, container4); |
| registerContainers(datanode2, container1, container2); |
| registerContainers(datanode3, container3); |
| |
| registerReplicas(containerManager, container1, datanode1, datanode2); |
| registerReplicas(containerManager, container2, datanode1, datanode2); |
| registerReplicas(containerManager, container3, datanode3); |
| registerReplicas(containerManager, container4, datanode1); |
| |
| TestUtils.closeContainer(containerManager, container1.containerID()); |
| TestUtils.closeContainer(containerManager, container2.containerID()); |
| TestUtils.quasiCloseContainer(containerManager, container3.containerID()); |
| |
| deadNodeHandler.onMessage(datanode1, publisher); |
| |
| Set<ContainerReplica> container1Replicas = containerManager |
| .getContainerReplicas(new ContainerID(container1.getContainerID())); |
| Assert.assertEquals(1, container1Replicas.size()); |
| Assert.assertEquals(datanode2, |
| container1Replicas.iterator().next().getDatanodeDetails()); |
| |
| Set<ContainerReplica> container2Replicas = containerManager |
| .getContainerReplicas(new ContainerID(container2.getContainerID())); |
| Assert.assertEquals(1, container2Replicas.size()); |
| Assert.assertEquals(datanode2, |
| container2Replicas.iterator().next().getDatanodeDetails()); |
| |
| Set<ContainerReplica> container3Replicas = containerManager |
| .getContainerReplicas(new ContainerID(container3.getContainerID())); |
| Assert.assertEquals(1, container3Replicas.size()); |
| Assert.assertEquals(datanode3, |
| container3Replicas.iterator().next().getDatanodeDetails()); |
| } |
| |
| private void registerReplicas(ContainerManager contManager, |
| ContainerInfo container, DatanodeDetails... datanodes) |
| throws ContainerNotFoundException { |
| for (DatanodeDetails datanode : datanodes) { |
| contManager.updateContainerReplica( |
| new ContainerID(container.getContainerID()), |
| ContainerReplica.newBuilder() |
| .setContainerState(ContainerReplicaProto.State.OPEN) |
| .setContainerID(container.containerID()) |
| .setDatanodeDetails(datanode).build()); |
| } |
| } |
| |
| /** |
| * Update containers available on the datanode. |
| * @param datanode |
| * @param containers |
| * @throws NodeNotFoundException |
| */ |
| private void registerContainers(DatanodeDetails datanode, |
| ContainerInfo... containers) |
| throws NodeNotFoundException { |
| nodeManager |
| .setContainers(datanode, |
| Arrays.stream(containers) |
| .map(container -> new ContainerID(container.getContainerID())) |
| .collect(Collectors.toSet())); |
| } |
| |
| private NodeReportFromDatanode getNodeReport(DatanodeDetails dn, |
| StorageReportProto... reports) { |
| NodeReportProto nodeReportProto = TestUtils.createNodeReport(reports); |
| return new NodeReportFromDatanode(dn, nodeReportProto); |
| } |
| } |