| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdds.scm.node; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.hdds.DFSConfigKeysLegacy; |
| import org.apache.hadoop.hdds.HddsConfigKeys; |
| import org.apache.hadoop.hdds.client.RatisReplicationConfig; |
| import org.apache.hadoop.hdds.client.ReplicationConfig; |
| import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; |
| import org.apache.hadoop.hdds.conf.OzoneConfiguration; |
| import org.apache.hadoop.hdds.protocol.DatanodeDetails; |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos; |
| import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandQueueReportProto; |
| import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; |
| import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto; |
| import org.apache.hadoop.hdds.scm.exceptions.SCMException; |
| import org.apache.hadoop.hdds.scm.ha.SCMContext; |
| import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; |
| import org.apache.hadoop.hdds.scm.pipeline.Pipeline; |
| import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerImpl; |
| import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode; |
| import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; |
| import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; |
| import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto; |
| import org.apache.hadoop.hdds.scm.ScmConfigKeys; |
| import org.apache.hadoop.hdds.scm.HddsTestUtils; |
| import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; |
| import org.apache.hadoop.hdds.scm.net.NetworkTopology; |
| import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; |
| import org.apache.hadoop.hdds.scm.pipeline.PipelineID; |
| import org.apache.hadoop.hdds.scm.server.SCMStorageConfig; |
| import org.apache.hadoop.hdds.scm.server.StorageContainerManager; |
| import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationCheckpoint; |
| import org.apache.hadoop.hdds.server.events.EventPublisher; |
| import org.apache.hadoop.hdds.server.events.EventQueue; |
| import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager; |
| import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode; |
| import org.apache.hadoop.ozone.container.upgrade.UpgradeUtils; |
| import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; |
| import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; |
| import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand; |
| import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; |
| import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; |
| import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; |
| import org.apache.hadoop.ozone.protocol.commands.SCMCommand; |
| import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand; |
| import org.apache.hadoop.security.authentication.client.AuthenticationException; |
| import org.apache.hadoop.util.Time; |
| import org.apache.ozone.test.GenericTestUtils; |
| import org.apache.hadoop.test.PathUtils; |
| import org.junit.jupiter.api.AfterEach; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Test; |
| |
| import java.util.Map; |
| import java.util.function.Predicate; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| import static java.util.Collections.emptyList; |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| import static java.util.concurrent.TimeUnit.SECONDS; |
| import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY; |
| import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY; |
| import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL; |
| import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.createDatanodeDetails; |
| import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails; |
| import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE; |
| import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.finalizeNewLayoutVersionCommand; |
| import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode.errorNodeNotPermitted; |
| import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode.success; |
| import static org.apache.hadoop.hdds.scm.HddsTestUtils.getRandomPipelineReports; |
| import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL; |
| import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL; |
| import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE; |
| import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT; |
| import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; |
| import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND; |
| import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND_COUNT_UPDATED; |
| import static org.apache.hadoop.hdds.scm.events.SCMEvents.NEW_NODE; |
| import static org.apache.hadoop.ozone.container.upgrade.UpgradeUtils.toLayoutVersionProto; |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertNotEquals; |
| import static org.junit.jupiter.api.Assertions.assertNotNull; |
| import static org.junit.jupiter.api.Assertions.assertThrows; |
| import static org.junit.jupiter.api.Assertions.assertTrue; |
| import static org.junit.jupiter.api.Assertions.fail; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.when; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.eq; |
| |
| import org.junit.jupiter.params.ParameterizedTest; |
| import org.junit.jupiter.params.provider.Arguments; |
| import org.junit.jupiter.params.provider.MethodSource; |
| import org.junit.jupiter.params.provider.ValueSource; |
| import org.mockito.ArgumentCaptor; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Test the SCM Node Manager class. |
| */ |
| public class TestSCMNodeManager { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(TestSCMNodeManager.class); |
| |
| private File testDir; |
| private StorageContainerManager scm; |
| private SCMContext scmContext; |
| |
| private static final int MAX_LV = HDDSLayoutVersionManager.maxLayoutVersion(); |
| private static final LayoutVersionProto LARGER_SLV_LAYOUT_PROTO = |
| toLayoutVersionProto(MAX_LV, MAX_LV + 1); |
| private static final LayoutVersionProto SMALLER_MLV_LAYOUT_PROTO = |
| toLayoutVersionProto(MAX_LV - 1, MAX_LV); |
| // In a real cluster, startup is disallowed if MLV is larger than SLV, so |
| // move both numbers together to test a smaller SLV or a larger MLV. |
| private static final LayoutVersionProto SMALLER_MLV_SLV_LAYOUT_PROTO = |
| toLayoutVersionProto(MAX_LV - 1, MAX_LV - 1); |
| private static final LayoutVersionProto LARGER_MLV_SLV_LAYOUT_PROTO = |
| toLayoutVersionProto(MAX_LV + 1, MAX_LV + 1); |
| private static final LayoutVersionProto CORRECT_LAYOUT_PROTO = |
| toLayoutVersionProto(MAX_LV, MAX_LV); |
| |
| @BeforeEach |
| public void setup() { |
| testDir = PathUtils.getTestDir( |
| TestSCMNodeManager.class); |
| } |
| |
| @AfterEach |
| public void cleanup() { |
| if (scm != null) { |
| scm.stop(); |
| scm.join(); |
| } |
| FileUtil.fullyDelete(testDir); |
| } |
| |
| /** |
| * Returns a new copy of Configuration. |
| * |
| * @return Config |
| */ |
| OzoneConfiguration getConf() { |
| OzoneConfiguration conf = new OzoneConfiguration(); |
| conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, |
| testDir.getAbsolutePath()); |
| conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, |
| TimeUnit.MILLISECONDS); |
| conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false); |
| conf.setInt(OZONE_SCM_RATIS_PIPELINE_LIMIT, 10); |
| conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, false); |
| return conf; |
| } |
| |
| /** |
| * Creates a NodeManager. |
| * |
| * @param config - Config for the node manager. |
| * @return SCMNodeManager |
| * @throws IOException |
| */ |
| SCMNodeManager createNodeManager(OzoneConfiguration config) |
| throws IOException, AuthenticationException { |
| scm = HddsTestUtils.getScm(config); |
| scmContext = new SCMContext.Builder().setIsInSafeMode(true) |
| .setLeader(true).setIsPreCheckComplete(true) |
| .setSCM(scm).build(); |
| PipelineManagerImpl pipelineManager = |
| (PipelineManagerImpl) scm.getPipelineManager(); |
| pipelineManager.setScmContext(scmContext); |
| return (SCMNodeManager) scm.getScmNodeManager(); |
| } |
| |
| /** |
| * Tests that the node manager handles heartbeats correctly and comes out |
| * of safe mode. |
| * |
| * @throws IOException |
| * @throws InterruptedException |
| * @throws TimeoutException |
| */ |
| @Test |
| public void testScmHeartbeat() |
| throws IOException, InterruptedException, AuthenticationException { |
| |
| try (SCMNodeManager nodeManager = createNodeManager(getConf())) { |
| int registeredNodes = 5; |
| // Send some heartbeats from different nodes. |
| for (int x = 0; x < registeredNodes; x++) { |
| DatanodeDetails datanodeDetails = HddsTestUtils |
| .createRandomDatanodeAndRegister(nodeManager); |
| nodeManager.processHeartbeat(datanodeDetails); |
| } |
| |
| //TODO: wait for heartbeat to be processed |
| Thread.sleep(4 * 1000); |
| assertEquals(registeredNodes, nodeManager.getAllNodes().size(), |
| "Heartbeat thread should have picked up the scheduled heartbeats."); |
| } |
| } |
| |
| @Test |
| public void testGetLastHeartbeatTimeDiff() throws Exception { |
| try (SCMNodeManager nodeManager = createNodeManager(getConf())) { |
| String timeNow = nodeManager.getLastHeartbeatTimeDiff(Time.monotonicNow()); |
| assertEquals("Just now", timeNow); |
| |
| String time10s = nodeManager.getLastHeartbeatTimeDiff(Time.monotonicNow() - 10000); |
| assertEquals("10s ago", time10s); |
| |
| String time1m = nodeManager.getLastHeartbeatTimeDiff(Time.monotonicNow() - 60000); |
| assertEquals("1m ago", time1m); |
| |
| String time1m10s = nodeManager.getLastHeartbeatTimeDiff(Time.monotonicNow() - 70000); |
| assertEquals("1m 10s ago", time1m10s); |
| |
| // 1h 1m 10s |
| // 10000ms = 10s |
| // 60000ms = 1m |
| // 60000ms * 60 = 3600000ms = 1h |
| String time1h1m10s = nodeManager.getLastHeartbeatTimeDiff(Time.monotonicNow() - 3670000); |
| assertEquals("1h 1m 10s ago", time1h1m10s); |
| |
| // 1d 1h 1m 10s |
| // 10000ms = 10s |
| // 60000ms = 1m |
| // 60000ms * 60 = 3600000ms = 1h |
| // 3600000ms * 24 = 86400000ms = 1d |
| String time1d1h1m10s = nodeManager.getLastHeartbeatTimeDiff(Time.monotonicNow() - 90070000); |
| assertEquals("1d 1h 1m 10s ago", time1d1h1m10s); |
| } |
| } |
| |
| /** |
| * Tests that node manager handles layout version changes from heartbeats |
| * correctly. |
| * |
| * @throws IOException |
| * @throws InterruptedException |
| * @throws TimeoutException |
| */ |
| @Test |
| public void testScmLayoutOnHeartbeat() throws Exception { |
| OzoneConfiguration conf = getConf(); |
| conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, |
| 1, TimeUnit.DAYS); |
| |
| try (SCMNodeManager nodeManager = createNodeManager(conf)) { |
| assertTrue(scm.checkLeader()); |
| // Register 2 nodes correctly. |
| // These will be used with a faulty node to test pipeline creation. |
| DatanodeDetails goodNode1 = registerWithCapacity(nodeManager); |
| DatanodeDetails goodNode2 = registerWithCapacity(nodeManager); |
| |
| scm.exitSafeMode(); |
| |
| assertPipelineClosedAfterLayoutHeartbeat(goodNode1, goodNode2, |
| nodeManager, SMALLER_MLV_LAYOUT_PROTO); |
| assertPipelineClosedAfterLayoutHeartbeat(goodNode1, goodNode2, |
| nodeManager, LARGER_MLV_SLV_LAYOUT_PROTO); |
| assertPipelineClosedAfterLayoutHeartbeat(goodNode1, goodNode2, |
| nodeManager, SMALLER_MLV_SLV_LAYOUT_PROTO); |
| assertPipelineClosedAfterLayoutHeartbeat(goodNode1, goodNode2, |
| nodeManager, LARGER_SLV_LAYOUT_PROTO); |
| } |
| } |
| |
| /** |
| * Create {@link DatanodeDetails} to register with {@code nodeManager}, and |
| * provide the datanode maximum capacity so that space used does not block |
| * pipeline creation. |
| * @return The created {@link DatanodeDetails}. |
| */ |
| private DatanodeDetails registerWithCapacity(SCMNodeManager nodeManager) { |
| return registerWithCapacity(nodeManager, |
| UpgradeUtils.defaultLayoutVersionProto(), success); |
| } |
| |
| /** |
| * Create {@link DatanodeDetails} to register with {@code nodeManager}, and |
| * provide the datanode maximum capacity so that space used does not block |
| * pipeline creation. Also check that the result of registering matched |
| * {@code expectedResult}. |
| * @return The created {@link DatanodeDetails}. |
| */ |
| private DatanodeDetails registerWithCapacity(SCMNodeManager nodeManager, |
| LayoutVersionProto layout, ErrorCode expectedResult) { |
| DatanodeDetails details = MockDatanodeDetails.randomDatanodeDetails(); |
| |
| StorageReportProto storageReport = |
| HddsTestUtils.createStorageReport(details.getUuid(), |
| details.getNetworkFullPath(), Long.MAX_VALUE); |
| MetadataStorageReportProto metadataStorageReport = |
| HddsTestUtils.createMetadataStorageReport(details.getNetworkFullPath(), |
| Long.MAX_VALUE); |
| |
| RegisteredCommand cmd = nodeManager.register(details, |
| HddsTestUtils.createNodeReport(Arrays.asList(storageReport), |
| Arrays.asList(metadataStorageReport)), |
| getRandomPipelineReports(), layout); |
| |
| assertEquals(expectedResult, cmd.getError()); |
| return cmd.getDatanode(); |
| } |
| |
| private void assertPipelineClosedAfterLayoutHeartbeat( |
| DatanodeDetails originalNode1, DatanodeDetails originalNode2, |
| SCMNodeManager nodeManager, LayoutVersionProto layout) throws Exception { |
| |
| List<DatanodeDetails> originalNodes = |
| Arrays.asList(originalNode1, originalNode2); |
| |
| // Initial condition: 2 healthy nodes registered. |
| assertPipelines(HddsProtos.ReplicationFactor.ONE, count -> count == 2, |
| originalNodes); |
| assertPipelines(HddsProtos.ReplicationFactor.THREE, |
| count -> count == 0, new ArrayList<>()); |
| |
| // Even if safemode exit or new node addition triggers pipeline |
| // creation, it will fail because there are not enough healthy nodes for |
| // a Ratis 3 pipeline. Therefore we do not have to worry about this |
| // create call failing due to datanodes reaching their maximum pipeline |
| // limit. |
| assertPipelineCreationFailsWithNotEnoughNodes(2); |
| |
| // Register a new node correctly. |
| DatanodeDetails node = registerWithCapacity(nodeManager); |
| |
| List<DatanodeDetails> allNodes = new ArrayList<>(originalNodes); |
| allNodes.add(node); |
| |
| // Safemode exit and adding the new node should trigger pipeline creation. |
| assertPipelines(HddsProtos.ReplicationFactor.ONE, count -> count == 3, |
| allNodes); |
| assertPipelines(HddsProtos.ReplicationFactor.THREE, count -> count >= 1, |
| allNodes); |
| |
| // node sends incorrect layout. |
| nodeManager.processLayoutVersionReport(node, layout); |
| |
| // Its pipelines should be closed then removed, meaning there are not |
| // enough nodes for factor 3 pipelines. |
| assertPipelines(HddsProtos.ReplicationFactor.ONE, count -> count == 2, |
| originalNodes); |
| assertPipelines(HddsProtos.ReplicationFactor.THREE, |
| count -> count == 0, new ArrayList<>()); |
| |
| assertPipelineCreationFailsWithNotEnoughNodes(2); |
| } |
| |
| /** |
| * Tests that node manager handles layout versions for newly registered nodes |
| * correctly. |
| * |
| * @throws IOException |
| * @throws InterruptedException |
| * @throws TimeoutException |
| */ |
| @Test |
| public void testScmLayoutOnRegister() |
| throws Exception { |
| |
| OzoneConfiguration conf = getConf(); |
| conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, |
| 1, TimeUnit.DAYS); |
| |
| try (SCMNodeManager nodeManager = createNodeManager(conf)) { |
| assertTrue(scm.checkLeader()); |
| // Nodes with mismatched SLV cannot join the cluster. |
| registerWithCapacity(nodeManager, |
| LARGER_SLV_LAYOUT_PROTO, errorNodeNotPermitted); |
| registerWithCapacity(nodeManager, |
| SMALLER_MLV_SLV_LAYOUT_PROTO, errorNodeNotPermitted); |
| registerWithCapacity(nodeManager, |
| LARGER_MLV_SLV_LAYOUT_PROTO, errorNodeNotPermitted); |
| // Nodes with mismatched MLV can join, but should not be allowed in |
| // pipelines. |
| DatanodeDetails badMlvNode1 = registerWithCapacity(nodeManager, |
| SMALLER_MLV_LAYOUT_PROTO, success); |
| DatanodeDetails badMlvNode2 = registerWithCapacity(nodeManager, |
| SMALLER_MLV_LAYOUT_PROTO, success); |
| // This node has correct MLV and SLV, so it can join and be used in |
| // pipelines. |
| DatanodeDetails goodNode = registerWithCapacity(nodeManager, |
| CORRECT_LAYOUT_PROTO, success); |
| |
| assertEquals(3, nodeManager.getAllNodes().size()); |
| |
| scm.exitSafeMode(); |
| |
| // SCM should auto create a factor 1 pipeline for the one healthy node. |
| // There still should not be enough healthy nodes for a Ratis 3 pipeline. |
| assertPipelines(HddsProtos.ReplicationFactor.ONE, |
| count -> count == 1, |
| Collections.singletonList(goodNode)); |
| assertPipelines(HddsProtos.ReplicationFactor.THREE, |
| count -> count == 0, |
| new ArrayList<>()); |
| |
| // Even if safemode exit or new node addition triggers pipeline |
| // creation, it will fail because there are not enough healthy nodes for |
| // a Ratis 3 pipeline. Therefore we do not have to worry about this |
| // create call failing due to datanodes reaching their maximum pipeline |
| // limit. |
| assertPipelineCreationFailsWithNotEnoughNodes(1); |
| |
| // Heartbeat bad MLV nodes back to healthy. |
| nodeManager.processLayoutVersionReport(badMlvNode1, CORRECT_LAYOUT_PROTO); |
| nodeManager.processLayoutVersionReport(badMlvNode2, CORRECT_LAYOUT_PROTO); |
| nodeManager.processHeartbeat(badMlvNode1); |
| nodeManager.processHeartbeat(badMlvNode2); |
| |
| // After moving out of healthy readonly, pipeline creation should be |
| // triggered. |
| assertPipelines(HddsProtos.ReplicationFactor.ONE, |
| count -> count == 3, |
| Arrays.asList(badMlvNode1, badMlvNode2, goodNode)); |
| assertPipelines(HddsProtos.ReplicationFactor.THREE, |
| count -> count >= 1, |
| Arrays.asList(badMlvNode1, badMlvNode2, goodNode)); |
| } |
| } |
| |
| private void assertPipelineCreationFailsWithNotEnoughNodes( |
| int actualNodeCount) throws Exception { |
| SCMException ex = assertThrows(SCMException.class, () -> { |
| ReplicationConfig ratisThree = |
| ReplicationConfig.fromProtoTypeAndFactor( |
| HddsProtos.ReplicationType.RATIS, |
| HddsProtos.ReplicationFactor.THREE); |
| scm.getPipelineManager().createPipeline(ratisThree); |
| }, "3 nodes should not have been found for a pipeline."); |
| assertThat(ex.getMessage()).contains("Required 3. Found " + |
| actualNodeCount); |
| } |
| |
| private void assertPipelines(HddsProtos.ReplicationFactor factor, |
| Predicate<Integer> countCheck, Collection<DatanodeDetails> allowedDNs) |
| throws Exception { |
| |
| Set<String> allowedDnIds = allowedDNs.stream() |
| .map(DatanodeDetails::getUuidString) |
| .collect(Collectors.toSet()); |
| |
| RatisReplicationConfig replConfig = RatisReplicationConfig |
| .getInstance(factor); |
| |
| // Wait for the expected number of pipelines using allowed DNs. |
| GenericTestUtils.waitFor(() -> { |
| // Closed pipelines are no longer in operation so we should not count |
| // them. We cannot check for open pipelines only because this is a mock |
| // test so the pipelines may remain in ALLOCATED state. |
| List<Pipeline> pipelines = scm.getPipelineManager() |
| .getPipelines(replConfig) |
| .stream() |
| .filter(p -> p.getPipelineState() != Pipeline.PipelineState.CLOSED) |
| .collect(Collectors.toList()); |
| LOG.info("Found {} non-closed pipelines of type {} and factor {}.", |
| pipelines.size(), |
| replConfig.getReplicationType(), replConfig.getReplicationFactor()); |
| boolean success = countCheck.test(pipelines.size()); |
| |
| // If we have the correct number of pipelines, make sure that none of |
| // these pipelines use nodes outside of allowedDNs. |
| if (success) { |
| for (Pipeline pipeline: pipelines) { |
| for (DatanodeDetails pipelineDN: pipeline.getNodes()) { |
| // Do not wait for this condition to be true. Disallowed DNs should |
| // never be used once we have the expected number of pipelines. |
| if (!allowedDnIds.contains(pipelineDN.getUuidString())) { |
| String message = String.format("Pipeline %s used datanode %s " + |
| "which is not in the set of allowed datanodes: %s", |
| pipeline.getId().toString(), pipelineDN.getUuidString(), |
| allowedDnIds); |
| fail(message); |
| } |
| } |
| } |
| } |
| |
| return success; |
| }, 1000, 10000); |
| } |
| |
| /** |
| * Asserts that if we send no heartbeats, no nodes are registered with the |
| * node manager. |
| * |
| * @throws IOException |
| * @throws InterruptedException |
| * @throws TimeoutException |
| */ |
| @Test |
| public void testScmNoHeartbeats() |
| throws IOException, InterruptedException, AuthenticationException { |
| |
| try (SCMNodeManager nodeManager = createNodeManager(getConf())) { |
| //TODO: wait for heartbeat to be processed |
| Thread.sleep(4 * 1000); |
| assertEquals(0, nodeManager.getAllNodes().size(), |
| "No heartbeats, 0 nodes should be registered"); |
| } |
| } |
| |
| /** |
| * Asserts that adding heartbeats after shutdown does not work. This implies |
| * that the heartbeat thread has been shut down safely by closing the node |
| * manager. |
| * |
| * @throws IOException |
| * @throws InterruptedException |
| * @throws TimeoutException |
| */ |
| @Test |
| public void testScmShutdown() |
| throws IOException, InterruptedException, AuthenticationException { |
| OzoneConfiguration conf = getConf(); |
| conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, |
| 100, TimeUnit.MILLISECONDS); |
| SCMNodeManager nodeManager = createNodeManager(conf); |
| DatanodeDetails datanodeDetails = HddsTestUtils |
| .createRandomDatanodeAndRegister(nodeManager); |
| // Shut down the node manager so the heartbeat thread stops. |
| nodeManager.close(); |
| // These should never be processed. |
| nodeManager.processHeartbeat(datanodeDetails); |
| |
| // Let us just wait for 2 seconds to prove that HBs are not processed. |
| Thread.sleep(2 * 1000); |
| |
| //TODO: add assertion |
| } |
| |
| /** |
| * Asserts that we detect as many healthy nodes as we have generated |
| * heartbeats for. |
| * |
| * @throws IOException |
| * @throws InterruptedException |
| * @throws TimeoutException |
| */ |
| @Test |
| public void testScmHealthyNodeCount() |
| throws IOException, InterruptedException, AuthenticationException { |
| OzoneConfiguration conf = getConf(); |
| final int count = 10; |
| |
| try (SCMNodeManager nodeManager = createNodeManager(conf)) { |
| for (int x = 0; x < count; x++) { |
| DatanodeDetails datanodeDetails = HddsTestUtils |
| .createRandomDatanodeAndRegister(nodeManager); |
| nodeManager.processHeartbeat(datanodeDetails); |
| } |
| //TODO: wait for heartbeat to be processed |
| Thread.sleep(4 * 1000); |
| assertEquals(count, nodeManager.getNodeCount(NodeStatus.inServiceHealthy())); |
| |
| Map<String, Map<String, Integer>> nodeCounts = nodeManager.getNodeCount(); |
| assertEquals(count, |
| nodeCounts.get(HddsProtos.NodeOperationalState.IN_SERVICE.name()) |
| .get(HddsProtos.NodeState.HEALTHY.name()).intValue()); |
| } |
| } |
| |
| /** |
| * Asserts that a stale node interval of more than 5 times the heartbeat |
| * processing interval is accepted as a sane value. |
| * |
| * @throws IOException |
| * @throws InterruptedException |
| * @throws TimeoutException |
| */ |
| @Test |
| public void testScmSanityOfUserConfig2() |
| throws IOException, AuthenticationException { |
| OzoneConfiguration conf = getConf(); |
| final int interval = 100; |
| conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval, |
| TimeUnit.MILLISECONDS); |
| conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, TimeUnit.SECONDS); |
| |
| // The stale node interval (3000 ms) is more than 5 times |
| // OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL (100 ms) and 3 times |
| // HDDS_HEARTBEAT_INTERVAL (1 s), so it passes the sanity checks. |
| conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3 * 1000, MILLISECONDS); |
| createNodeManager(conf).close(); |
| } |
| |
| /** |
| * For leader SCM, ensure that a change to the operationalState of a node |
| * fires an SCMCommand of type SetNodeOperationalStateCommand. |
| * |
| * For follower SCM, no SetNodeOperationalStateCommand should be fired, yet |
| * operationalState of the node will be updated according to the heartbeat. |
| */ |
| @Test |
| public void testSetNodeOpStateAndCommandFired() |
| throws IOException, NodeNotFoundException, AuthenticationException { |
| final int interval = 100; |
| |
| OzoneConfiguration conf = getConf(); |
| conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval, |
| MILLISECONDS); |
| // If factor 1 pipelines are auto created, registering the new node will |
| // trigger a pipeline creation command which may interfere with command |
| // checking in this test. |
| conf.setBoolean(OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, false); |
| |
| try (SCMNodeManager nodeManager = createNodeManager(conf)) { |
| DatanodeDetails dn = HddsTestUtils.createRandomDatanodeAndRegister( |
| nodeManager); |
| |
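| // The operational state expiry is roughly 1000 seconds in the future, |
| // expressed in epoch seconds. |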
| long expiry = System.currentTimeMillis() / 1000 + 1000; |
| nodeManager.setNodeOperationalState(dn, |
| HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, expiry); |
| |
| // If a mismatch is found, the leader SCM fires a |
| // SetNodeOperationalStateCommand to update the opState persisted in the |
| // datanode. |
| scm.getScmContext().updateLeaderAndTerm(true, 1); |
| List<SCMCommand> commands = nodeManager.processHeartbeat(dn); |
| |
| assertEquals(1, commands.size()); |
| assertEquals(SetNodeOperationalStateCommand.class, |
| commands.get(0).getClass()); |
| |
| // If a mismatch is found, a follower SCM updates its own opState according |
| // to the heartbeat, and no SCMCommand is fired. |
| scm.getScmContext().updateLeaderAndTerm(false, 2); |
| commands = nodeManager.processHeartbeat(dn); |
| |
| assertEquals(0, commands.size()); |
| |
| NodeStatus scmStatus = nodeManager.getNodeStatus(dn); |
| assertEquals(dn.getPersistedOpState(), scmStatus.getOperationalState()); |
| assertEquals(dn.getPersistedOpStateExpiryEpochSec(), |
| scmStatus.getOpStateExpiryEpochSeconds()); |
| } |
| } |
| |
| /** |
| * Asserts that a single node moves from healthy to stale, and then from |
| * stale to dead, if it misses enough heartbeats. |
| * |
| * @throws IOException |
| * @throws InterruptedException |
| * @throws TimeoutException |
| */ |
| @Test |
| public void testScmDetectStaleAndDeadNode() |
| throws IOException, InterruptedException, AuthenticationException { |
| final int interval = 100; |
| final int nodeCount = 10; |
| |
| OzoneConfiguration conf = getConf(); |
| conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval, |
| MILLISECONDS); |
| conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS); |
| conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS); |
| conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS); |
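| // With these settings a node is considered stale after 3 seconds without |
| // a heartbeat and dead after 6 seconds. |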
| |
| |
| try (SCMNodeManager nodeManager = createNodeManager(conf)) { |
| List<DatanodeDetails> nodeList = createNodeSet(nodeManager, nodeCount); |
| |
| |
| DatanodeDetails staleNode = HddsTestUtils.createRandomDatanodeAndRegister( |
| nodeManager); |
| |
| // Heartbeat once |
| nodeManager.processHeartbeat(staleNode); |
| |
| // Heartbeat all other nodes. |
| for (DatanodeDetails dn : nodeList) { |
| nodeManager.processHeartbeat(dn); |
| } |
| |
| // Wait for 2 seconds .. and heartbeat good nodes again. |
| Thread.sleep(2 * 1000); |
| |
| for (DatanodeDetails dn : nodeList) { |
| nodeManager.processHeartbeat(dn); |
| } |
| |
| // Wait for 2 seconds, wait a total of 4 seconds to make sure that the |
| // node moves into stale state. |
| Thread.sleep(2 * 1000); |
| List<DatanodeDetails> staleNodeList = |
| nodeManager.getNodes(NodeStatus.inServiceStale()); |
| assertEquals(1, nodeManager.getNodeCount(NodeStatus.inServiceStale()), |
| "Expected to find 1 stale node"); |
| assertEquals(1, staleNodeList.size(), |
| "Expected to find 1 stale node"); |
| assertEquals(staleNode.getUuid(), staleNodeList.get(0).getUuid(), |
| "Stale node is not the expected ID"); |
| |
| Map<String, Map<String, Integer>> nodeCounts = nodeManager.getNodeCount(); |
| assertEquals(1, |
| nodeCounts.get(HddsProtos.NodeOperationalState.IN_SERVICE.name()) |
| .get(HddsProtos.NodeState.STALE.name()).intValue()); |
| |
| Thread.sleep(1000); |
| // heartbeat good nodes again. |
| for (DatanodeDetails dn : nodeList) { |
| nodeManager.processHeartbeat(dn); |
| } |
| |
| // 6 seconds is the dead window for this test, so we wait a total of |
| // 7 seconds to make sure that the node moves into dead state. |
| Thread.sleep(2 * 1000); |
| |
| // the stale node has been removed |
| staleNodeList = nodeManager.getNodes(NodeStatus.inServiceStale()); |
| nodeCounts = nodeManager.getNodeCount(); |
| assertEquals(0, nodeManager.getNodeCount(NodeStatus.inServiceStale()), |
| "Expected to find no stale nodes"); |
| assertEquals(0, staleNodeList.size(), |
| "Expected to find no stale nodes"); |
| assertEquals(0, |
| nodeCounts.get(HddsProtos.NodeOperationalState.IN_SERVICE.name()) |
| .get(HddsProtos.NodeState.STALE.name()).intValue()); |
| |
| // Check for the dead node now. |
| List<DatanodeDetails> deadNodeList = |
| nodeManager.getNodes(NodeStatus.inServiceDead()); |
| assertEquals(1, nodeManager.getNodeCount(NodeStatus.inServiceDead()), |
| "Expected to find 1 dead node"); |
| assertEquals(1, deadNodeList.size(), "Expected to find 1 dead node"); |
| assertEquals(1, |
| nodeCounts.get(HddsProtos.NodeOperationalState.IN_SERVICE.name()) |
| .get(HddsProtos.NodeState.DEAD.name()).intValue()); |
| assertEquals(staleNode.getUuid(), deadNodeList.get(0).getUuid(), |
| "Dead node is not the expected ID"); |
| } |
| } |
| |
| /** |
| * Simulate a JVM pause by pausing the health check process, and ensure |
| * that none of the nodes with heartbeats become dead or stale. |
| * @throws IOException |
| * @throws InterruptedException |
| * @throws AuthenticationException |
| */ |
| @Test |
| void testScmHandleJvmPause() throws Exception { |
| final int healthCheckInterval = 200; // milliseconds |
| final int heartbeatInterval = 1; // seconds |
| final int staleNodeInterval = 3; // seconds |
| final int deadNodeInterval = 6; // seconds |
| ScheduledFuture schedFuture; |
| |
| OzoneConfiguration conf = getConf(); |
| conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, |
| healthCheckInterval, MILLISECONDS); |
| conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, |
| heartbeatInterval, SECONDS); |
| conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, |
| staleNodeInterval, SECONDS); |
| conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, |
| deadNodeInterval, SECONDS); |
| |
| try (SCMNodeManager nodeManager = createNodeManager(conf)) { |
| DatanodeDetails node1 = |
| HddsTestUtils.createRandomDatanodeAndRegister(nodeManager); |
| DatanodeDetails node2 = |
| HddsTestUtils.createRandomDatanodeAndRegister(nodeManager); |
| |
| nodeManager.processHeartbeat(node1); |
| nodeManager.processHeartbeat(node2); |
| |
| // Sleep so that heartbeat processing thread gets to run. |
| Thread.sleep(1000); |
| |
| //Assert all nodes are healthy. |
| assertEquals(2, nodeManager.getAllNodes().size()); |
| assertEquals(2, |
| nodeManager.getNodeCount(NodeStatus.inServiceHealthy())); |
| /** |
| * Simulate a JVM Pause and subsequent handling in following steps: |
| * Step 1 : stop heartbeat check process for stale node interval |
| * Step 2 : resume heartbeat check |
| * Step 3 : wait for 1 iteration of heartbeat check thread |
| * Step 4 : retrieve the state of all nodes - assert all are HEALTHY |
| * Step 5 : heartbeat for node1 |
| * [TODO : what if there is scheduling delay of test thread in Step 5?] |
| * Step 6 : wait for some time to allow iterations of check process |
| * Step 7 : retrieve the state of all nodes - assert node2 is STALE |
| * and node1 is HEALTHY |
| */ |
| |
| // Step 1 : stop health check process (simulate JVM pause) |
| nodeManager.pauseHealthCheck(); |
| Thread.sleep(MILLISECONDS.convert(staleNodeInterval, SECONDS)); |
| |
| // Step 2 : resume health check |
| assertEquals(0, nodeManager.getSkippedHealthChecks(), |
| "Unexpected, already skipped heartbeat checks"); |
| schedFuture = nodeManager.unpauseHealthCheck(); |
| |
| // Step 3 : wait for 1 iteration of health check |
| |
| schedFuture.get(); |
| assertThat(nodeManager.getSkippedHealthChecks()) |
| .withFailMessage("We did not skip any heartbeat checks") |
| .isGreaterThan(0); |
| |
| // Step 4 : all nodes should still be HEALTHY |
| assertEquals(2, nodeManager.getAllNodes().size()); |
| assertEquals(2, nodeManager.getNodeCount(NodeStatus.inServiceHealthy())); |
| |
| // Step 5 : heartbeat for node1 |
| nodeManager.processHeartbeat(node1); |
| |
| // Step 6 : wait for health check process to run |
| Thread.sleep(1000); |
| |
| // Step 7 : node2 should transition to STALE |
| assertEquals(1, nodeManager.getNodeCount(NodeStatus.inServiceHealthy())); |
| assertEquals(1, nodeManager.getNodeCount(NodeStatus.inServiceStale())); |
| } |
| } |
| |
| @Test |
| public void testProcessLayoutVersion() throws IOException { |
| // TODO: Convert this to a parameterized test over the |
| // FinalizationCheckpoint values. |
| for (FinalizationCheckpoint checkpoint: FinalizationCheckpoint.values()) { |
| LOG.info("Testing with SCM finalization checkpoint {}", checkpoint); |
| testProcessLayoutVersionLowerMlv(checkpoint); |
| testProcessLayoutVersionReportHigherMlv(checkpoint); |
| } |
| } |
| |
| // Currently invoked by testProcessLayoutVersion. |
| public void testProcessLayoutVersionReportHigherMlv( |
| FinalizationCheckpoint currentCheckpoint) |
| throws IOException { |
| final int healthCheckInterval = 200; // milliseconds |
| final int heartbeatInterval = 1; // seconds |
| |
| OzoneConfiguration conf = getConf(); |
| conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, |
| healthCheckInterval, MILLISECONDS); |
| conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, |
| heartbeatInterval, SECONDS); |
| |
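| // Build an SCMNodeManager directly with a mocked storage config and event |
| // publisher so layout version reports can be processed without a full SCM. |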
| SCMStorageConfig scmStorageConfig = mock(SCMStorageConfig.class); |
| when(scmStorageConfig.getClusterID()).thenReturn("xyz111"); |
| EventPublisher eventPublisher = mock(EventPublisher.class); |
| HDDSLayoutVersionManager lvm = |
| new HDDSLayoutVersionManager(scmStorageConfig.getLayoutVersion()); |
| SCMContext nodeManagerContext = SCMContext.emptyContext(); |
| nodeManagerContext.setFinalizationCheckpoint(currentCheckpoint); |
| SCMNodeManager nodeManager = new SCMNodeManager(conf, |
| scmStorageConfig, eventPublisher, new NetworkTopologyImpl(conf), |
| nodeManagerContext, lvm); |
| |
| // Regardless of SCM's finalization checkpoint, datanodes with higher MLV |
| // than SCM should not be found in the cluster. |
| DatanodeDetails node1 = |
| HddsTestUtils.createRandomDatanodeAndRegister(nodeManager); |
| GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer |
| .captureLogs(SCMNodeManager.LOG); |
| int scmMlv = |
| nodeManager.getLayoutVersionManager().getMetadataLayoutVersion(); |
| int scmSlv = |
| nodeManager.getLayoutVersionManager().getSoftwareLayoutVersion(); |
| nodeManager.processLayoutVersionReport(node1, |
| LayoutVersionProto.newBuilder() |
| .setMetadataLayoutVersion(scmMlv + 1) |
| .setSoftwareLayoutVersion(scmSlv + 1) |
| .build()); |
| assertThat(logCapturer.getOutput()) |
| .contains("Invalid data node in the cluster"); |
| nodeManager.close(); |
| } |
| |
| // Currently invoked by testProcessLayoutVersion. |
| public void testProcessLayoutVersionLowerMlv(FinalizationCheckpoint |
| currentCheckpoint) throws IOException { |
| OzoneConfiguration conf = new OzoneConfiguration(); |
| SCMStorageConfig scmStorageConfig = mock(SCMStorageConfig.class); |
| when(scmStorageConfig.getClusterID()).thenReturn("xyz111"); |
| EventPublisher eventPublisher = mock(EventPublisher.class); |
| HDDSLayoutVersionManager lvm = |
| new HDDSLayoutVersionManager(scmStorageConfig.getLayoutVersion()); |
| SCMContext nodeManagerContext = SCMContext.emptyContext(); |
| nodeManagerContext.setFinalizationCheckpoint(currentCheckpoint); |
| SCMNodeManager nodeManager = new SCMNodeManager(conf, |
| scmStorageConfig, eventPublisher, new NetworkTopologyImpl(conf), |
| nodeManagerContext, lvm); |
| DatanodeDetails node1 = |
| HddsTestUtils.createRandomDatanodeAndRegister(nodeManager); |
| verify(eventPublisher, |
| times(1)).fireEvent(NEW_NODE, node1); |
| int scmMlv = |
| nodeManager.getLayoutVersionManager().getMetadataLayoutVersion(); |
| nodeManager.processLayoutVersionReport(node1, |
| LayoutVersionProto.newBuilder() |
| .setMetadataLayoutVersion(scmMlv - 1) |
| .setSoftwareLayoutVersion(scmMlv) |
| .build()); |
| ArgumentCaptor<CommandForDatanode> captor = |
| ArgumentCaptor.forClass(CommandForDatanode.class); |
| |
| if (currentCheckpoint.hasCrossed(FinalizationCheckpoint.MLV_EQUALS_SLV)) { |
| // If the mlv equals slv checkpoint passed, datanodes with older mlvs |
| // should be instructed to finalize. |
| verify(eventPublisher, times(1)) |
| .fireEvent(eq(DATANODE_COMMAND), captor.capture()); |
| assertEquals(node1.getUuid(), captor.getValue().getDatanodeId()); |
| assertEquals(finalizeNewLayoutVersionCommand, |
| captor.getValue().getCommand().getType()); |
| } else { |
| // SCM has not finished finalizing its mlv, so datanodes with older |
| // mlvs should not be instructed to finalize yet. |
| verify(eventPublisher, times(0)) |
| .fireEvent(eq(DATANODE_COMMAND), captor.capture()); |
| } |
| } |
| |
| @Test |
| public void testProcessCommandQueueReport() |
| throws IOException, NodeNotFoundException, AuthenticationException { |
| OzoneConfiguration conf = new OzoneConfiguration(); |
| SCMStorageConfig scmStorageConfig = mock(SCMStorageConfig.class); |
| when(scmStorageConfig.getClusterID()).thenReturn("xyz111"); |
| EventPublisher eventPublisher = mock(EventPublisher.class); |
| HDDSLayoutVersionManager lvm = |
| new HDDSLayoutVersionManager(scmStorageConfig.getLayoutVersion()); |
| createNodeManager(getConf()); |
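| // createNodeManager initializes scm and scmContext; the node manager under |
| // test is built separately with a mocked EventPublisher so that fired |
| // events can be verified. |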
| SCMNodeManager nodeManager = new SCMNodeManager(conf, |
| scmStorageConfig, eventPublisher, new NetworkTopologyImpl(conf), |
| scmContext, lvm); |
| |
| DatanodeDetails node1 = |
| HddsTestUtils.createRandomDatanodeAndRegister(nodeManager); |
| verify(eventPublisher, |
| times(1)).fireEvent(NEW_NODE, node1); |
| for (int i = 0; i < 3; i++) { |
| nodeManager.addDatanodeCommand(node1.getUuid(), ReplicateContainerCommand |
| .toTarget(1, MockDatanodeDetails.randomDatanodeDetails())); |
| } |
| for (int i = 0; i < 5; i++) { |
| nodeManager.addDatanodeCommand(node1.getUuid(), |
| new DeleteBlocksCommand(emptyList())); |
| } |
| |
| assertEquals(3, nodeManager.getTotalDatanodeCommandCount( |
| node1, SCMCommandProto.Type.replicateContainerCommand)); |
| assertEquals(5, nodeManager.getTotalDatanodeCommandCount( |
| node1, SCMCommandProto.Type.deleteBlocksCommand)); |
| |
| nodeManager.processHeartbeat(node1, |
| CommandQueueReportProto.newBuilder() |
| .addCommand(SCMCommandProto.Type.replicateContainerCommand) |
| .addCount(123) |
| .addCommand(SCMCommandProto.Type.closeContainerCommand) |
| .addCount(11) |
| .build()); |
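| // Queued counts combine the values reported by the datanode with commands |
| // still pending in SCM's queue, e.g. 123 reported + 3 pending replicate |
| // commands = 126, while the 5 delete block commands remain SCM-side only. |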
| assertEquals(-1, nodeManager.getNodeQueuedCommandCount( |
| node1, SCMCommandProto.Type.closePipelineCommand)); |
| assertEquals(126, nodeManager.getNodeQueuedCommandCount( |
| node1, SCMCommandProto.Type.replicateContainerCommand)); |
| assertEquals(11, nodeManager.getNodeQueuedCommandCount( |
| node1, SCMCommandProto.Type.closeContainerCommand)); |
| assertEquals(5, nodeManager.getNodeQueuedCommandCount( |
| node1, SCMCommandProto.Type.deleteBlocksCommand)); |
| |
| assertEquals(126, nodeManager.getTotalDatanodeCommandCount( |
| node1, SCMCommandProto.Type.replicateContainerCommand)); |
| assertEquals(5, nodeManager.getTotalDatanodeCommandCount( |
| node1, SCMCommandProto.Type.deleteBlocksCommand)); |
| |
| ArgumentCaptor<DatanodeDetails> captor = |
| ArgumentCaptor.forClass(DatanodeDetails.class); |
| verify(eventPublisher, times(1)) |
| .fireEvent(eq(DATANODE_COMMAND_COUNT_UPDATED), |
| captor.capture()); |
| assertEquals(node1, captor.getValue()); |
| |
| // Send another report missing an earlier entry, and ensure it is not |
| // still reported as a stale value. |
| nodeManager.processHeartbeat(node1, |
| CommandQueueReportProto.newBuilder() |
| .addCommand(SCMCommandProto.Type.closeContainerCommand) |
| .addCount(11) |
| .build()); |
| assertEquals(-1, nodeManager.getNodeQueuedCommandCount( |
| node1, SCMCommandProto.Type.replicateContainerCommand)); |
| assertEquals(11, nodeManager.getNodeQueuedCommandCount( |
| node1, SCMCommandProto.Type.closeContainerCommand)); |
| |
| verify(eventPublisher, times(2)) |
| .fireEvent(eq(DATANODE_COMMAND_COUNT_UPDATED), |
| captor.capture()); |
| assertEquals(node1, captor.getValue()); |
| |
| // Add a few more commands to the queue and check that the counts are the sum. |
| for (int i = 0; i < 5; i++) { |
| nodeManager.addDatanodeCommand(node1.getUuid(), |
| new CloseContainerCommand(1, PipelineID.randomId())); |
| } |
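| // Totals now combine the last reported count with the newly queued |
| // commands: 11 reported close container commands + 5 just queued = 16. |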
| assertEquals(0, nodeManager.getTotalDatanodeCommandCount( |
| node1, SCMCommandProto.Type.replicateContainerCommand)); |
| assertEquals(16, nodeManager.getTotalDatanodeCommandCount( |
| node1, SCMCommandProto.Type.closeContainerCommand)); |
| Map<SCMCommandProto.Type, Integer> counts = |
| nodeManager.getTotalDatanodeCommandCounts(node1, |
| SCMCommandProto.Type.replicateContainerCommand, |
| SCMCommandProto.Type.closeContainerCommand); |
| assertEquals(0, |
| counts.get(SCMCommandProto.Type.replicateContainerCommand)); |
| assertEquals(16, |
| counts.get(SCMCommandProto.Type.closeContainerCommand)); |
| } |
| |
| @Test |
| public void testCommandCount() |
| throws AuthenticationException, IOException { |
| SCMNodeManager nodeManager = createNodeManager(getConf()); |
| |
| UUID datanode1 = UUID.randomUUID(); |
| UUID datanode2 = UUID.randomUUID(); |
| long containerID = 1; |
| |
| SCMCommand<?> closeContainerCommand = |
| new CloseContainerCommand(containerID, PipelineID.randomId()); |
| SCMCommand<?> createPipelineCommand = |
| new CreatePipelineCommand(PipelineID.randomId(), |
| HddsProtos.ReplicationType.RATIS, |
| HddsProtos.ReplicationFactor.THREE, emptyList()); |
| |
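| // Queue two close container commands and one create pipeline command for |
| // datanode1; nothing is queued for datanode2. |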
| nodeManager.onMessage( |
| new CommandForDatanode<>(datanode1, closeContainerCommand), null); |
| nodeManager.onMessage( |
| new CommandForDatanode<>(datanode1, closeContainerCommand), null); |
| nodeManager.onMessage( |
| new CommandForDatanode<>(datanode1, createPipelineCommand), null); |
| |
| assertEquals(2, nodeManager.getCommandQueueCount( |
| datanode1, SCMCommandProto.Type.closeContainerCommand)); |
| assertEquals(1, nodeManager.getCommandQueueCount( |
| datanode1, SCMCommandProto.Type.createPipelineCommand)); |
| assertEquals(0, nodeManager.getCommandQueueCount( |
| datanode1, SCMCommandProto.Type.closePipelineCommand)); |
| |
| assertEquals(0, nodeManager.getCommandQueueCount( |
| datanode2, SCMCommandProto.Type.closeContainerCommand)); |
| } |
| |
| /** |
| * Check for NPE when a null DatanodeDetails is passed to processHeartbeat. |
| * |
| * @throws IOException |
| */ |
| @Test |
| public void testScmCheckForErrorOnNullDatanodeDetails() throws IOException, AuthenticationException { |
| try (SCMNodeManager nodeManager = createNodeManager(getConf())) { |
| NullPointerException npe = assertThrows(NullPointerException.class, |
| () -> nodeManager.processHeartbeat(null)); |
| assertThat(npe).hasMessage("Heartbeat is missing DatanodeDetails."); |
| } |
| } |
| |
| /** |
| * Asserts that a dead node, a stale node and a healthy node co-exist, and |
| * that the counts, lists and node IDs match the expected node states. |
| * <p/> |
| * This test is pretty complicated because it explores all states of Node |
| * manager in a single test. Please read thru the comments to get an idea of |
| * the current state of the node Manager. |
| * <p/> |
| * This test is written like a state machine to avoid threads and concurrency |
| * issues. This test is replicated below with the use of threads. Avoiding |
| * threads makes it easy to debug the state machine. |
| * |
| * @throws IOException |
| * @throws InterruptedException |
| * @throws TimeoutException |
| */ |
| /** |
| * These values are very important. Here is what it means so you don't |
| * have to look it up while reading this code. |
| * |
| * OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL - This is the frequency of the |
| * HB processing thread that is running in the SCM. This thread must run |
| * for the SCM to process the Heartbeats. |
| * |
| * OZONE_SCM_HEARTBEAT_INTERVAL - This is the frequency at which |
| * datanodes will send heartbeats to SCM. Please note: This is the only |
| * config value for node manager that is specified in seconds. We don't |
| * want SCM heartbeat resolution to be finer than seconds. |
| * In this test it is not used, but we are forced to set it because we |
| * have validation code that checks that the stale node interval and the |
| * dead node interval are larger than the value of |
| * OZONE_SCM_HEARTBEAT_INTERVAL. |
| * |
| * OZONE_SCM_STALENODE_INTERVAL - This is the time that must elapse |
| * from the last heartbeat for us to mark a node as stale. In this test |
| * we set that to 3 seconds. That is, if a node has not heartbeated SCM for |
| * the last 3 seconds, we will mark it as stale. |
| * |
| * OZONE_SCM_DEADNODE_INTERVAL - This is the time that must elapse |
| * from the last heartbeat for a node to be marked dead. We have an |
| * additional constraint that this must be at least 2 times bigger than |
| * Stale node Interval. |
| * |
| * With these we are trying to explore the state of this cluster with |
| * various timeouts. Each section is commented so that you can keep |
| * track of the state of the cluster nodes. |
| * |
| */ |
| |
| @Test |
| public void testScmClusterIsInExpectedState1() |
| throws IOException, InterruptedException, AuthenticationException { |
| OzoneConfiguration conf = getConf(); |
| conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, |
| MILLISECONDS); |
| conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS); |
| conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS); |
| conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS); |
| |
| |
| /** |
| * Cluster state: Healthy: All nodes are heartbeating like normal. |
| */ |
| try (SCMNodeManager nodeManager = createNodeManager(conf)) { |
| DatanodeDetails healthyNode = |
| HddsTestUtils.createRandomDatanodeAndRegister(nodeManager); |
| DatanodeDetails staleNode = |
| HddsTestUtils.createRandomDatanodeAndRegister(nodeManager); |
| DatanodeDetails deadNode = |
| HddsTestUtils.createRandomDatanodeAndRegister(nodeManager); |
| nodeManager.processHeartbeat(healthyNode); |
| nodeManager.processHeartbeat(staleNode); |
| nodeManager.processHeartbeat(deadNode); |
| |
| // Sleep so that heartbeat processing thread gets to run. |
| Thread.sleep(500); |
| |
| //Assert all nodes are healthy. |
| assertEquals(3, nodeManager.getAllNodes().size()); |
| assertEquals(3, nodeManager.getNodeCount(NodeStatus.inServiceHealthy())); |
| |
| /** |
| * Cluster state: Quiesced: We are going to sleep for 3 seconds, which |
| * means that no node is heartbeating. All nodes should move to stale. |
| */ |
| Thread.sleep(3 * 1000); |
| assertEquals(3, nodeManager.getAllNodes().size()); |
| assertEquals(3, nodeManager.getNodeCount(NodeStatus.inServiceStale())); |
| |
| |
| /** |
| * Cluster state: Move the healthy node back to the healthy state, and let |
| * the other 2 nodes move to the stale state. |
| * |
| * We heartbeat the healthy node after 1.5 seconds and let the other 2 |
| * nodes exceed the 3 second stale window. |
| */ |
| |
| nodeManager.processHeartbeat(healthyNode); |
| nodeManager.processHeartbeat(staleNode); |
| nodeManager.processHeartbeat(deadNode); |
| |
| Thread.sleep(1500); |
| nodeManager.processHeartbeat(healthyNode); |
| Thread.sleep(2 * 1000); |
| assertEquals(1, nodeManager.getNodeCount(NodeStatus.inServiceHealthy())); |
| |
| |
| // 3.5 seconds from last heartbeat for the stale and deadNode. So those |
| // 2 nodes must move to Stale state and the healthy node must |
| // remain in the healthy State. |
| List<DatanodeDetails> healthyList = nodeManager.getNodes( |
| NodeStatus.inServiceHealthy()); |
| assertEquals(1, healthyList.size(), "Expected one healthy node"); |
| assertEquals(healthyNode.getUuid(), healthyList.get(0).getUuid(), |
| "Healthy node is not the expected ID"); |
| |
| assertEquals(2, nodeManager.getNodeCount(NodeStatus.inServiceStale())); |
| |
| /** |
| * Cluster State: Allow healthyNode to remain in healthy state and |
| * staleNode to move to stale state and deadNode to move to dead state. |
| */ |
| |
| nodeManager.processHeartbeat(healthyNode); |
| nodeManager.processHeartbeat(staleNode); |
| Thread.sleep(1500); |
| nodeManager.processHeartbeat(healthyNode); |
| Thread.sleep(2 * 1000); |
| |
| // 3.5 seconds have elapsed for stale node, so it moves into Stale. |
| // 7 seconds have elapsed for dead node, so it moves into dead. |
| // 2 Seconds have elapsed for healthy node, so it stays in healthy state. |
| healthyList = nodeManager.getNodes((NodeStatus.inServiceHealthy())); |
| List<DatanodeDetails> staleList = |
| nodeManager.getNodes(NodeStatus.inServiceStale()); |
| List<DatanodeDetails> deadList = |
| nodeManager.getNodes(NodeStatus.inServiceDead()); |
| |
| assertEquals(3, nodeManager.getAllNodes().size()); |
| assertEquals(1, nodeManager.getNodeCount(NodeStatus.inServiceHealthy())); |
| assertEquals(1, nodeManager.getNodeCount(NodeStatus.inServiceStale())); |
| assertEquals(1, nodeManager.getNodeCount(NodeStatus.inServiceDead())); |
| |
| assertEquals(1, healthyList.size(), "Expected one healthy node"); |
| assertEquals(healthyNode.getUuid(), healthyList.get(0).getUuid(), |
| "Healthy node is not the expected ID"); |
| |
| assertEquals(1, staleList.size(), "Expected one stale node"); |
| assertEquals(staleNode.getUuid(), staleList.get(0).getUuid(), |
| "Stale node is not the expected ID"); |
| |
| assertEquals(1, deadList.size(), "Expected one dead node"); |
| assertEquals(deadNode.getUuid(), deadList.get(0).getUuid(), |
| "Dead node is not the expected ID"); |
| /** |
| * Cluster State : let us heartbeat all the nodes and verify that we get |
| * back all the nodes in healthy state. |
| */ |
| nodeManager.processHeartbeat(healthyNode); |
| nodeManager.processHeartbeat(staleNode); |
| nodeManager.processHeartbeat(deadNode); |
| Thread.sleep(500); |
| //Assert all nodes are healthy. |
| assertEquals(3, nodeManager.getAllNodes().size()); |
| assertEquals(3, nodeManager.getNodeCount(NodeStatus.inServiceHealthy())); |
| } |
| } |
| |
| /** |
| * Heartbeat a given set of nodes at a specified frequency. |
| * |
| * @param manager - Node Manager |
| * @param list - List of datanodes. |
| * @param sleepDuration - Duration to sleep between heartbeats. |
| * @throws InterruptedException |
| */ |
| private void heartbeatNodeSet(SCMNodeManager manager, |
| List<DatanodeDetails> list, |
| int sleepDuration) throws InterruptedException { |
| while (!Thread.currentThread().isInterrupted()) { |
| for (DatanodeDetails dn : list) { |
| manager.processHeartbeat(dn); |
| } |
| Thread.sleep(sleepDuration); |
| } |
| } |
| |
| /** |
| * Create and register a set of nodes. |
| * |
| * @param nodeManager - Node manager to register the nodes with. |
| * @param count - number of nodes. |
| * @return List of Nodes. |
| */ |
| private List<DatanodeDetails> createNodeSet(SCMNodeManager nodeManager, int |
| count) { |
| List<DatanodeDetails> list = new ArrayList<>(); |
| for (int x = 0; x < count; x++) { |
| DatanodeDetails datanodeDetails = HddsTestUtils |
| .createRandomDatanodeAndRegister(nodeManager); |
| list.add(datanodeDetails); |
| } |
| return list; |
| } |
| |
| /** |
| * Function that tells us if we found the right number of stale nodes. |
| * |
| * @param nodeManager - node manager |
| * @param count - number of stale nodes to look for. |
| * @return true if we found the expected number. |
| */ |
| private boolean findNodes(NodeManager nodeManager, int count, |
| HddsProtos.NodeState state) { |
| return count == nodeManager.getNodeCount(NodeStatus.inServiceStale()); |
| } |
| |
| /** |
| * Asserts that we can create a set of nodes that send their heartbeats from |
| * different threads and NodeManager behaves as expected. |
| * |
| * @throws IOException |
| * @throws InterruptedException |
| */ |
| @Test |
| public void testScmClusterIsInExpectedState2() |
| throws IOException, InterruptedException, TimeoutException, |
| AuthenticationException { |
| final int healthyCount = 5000; |
| final int staleCount = 100; |
| final int deadCount = 10; |
| |
| OzoneConfiguration conf = getConf(); |
| conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, |
| MILLISECONDS); |
| conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS); |
| conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS); |
| conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS); |
| |
| |
| try (SCMNodeManager nodeManager = createNodeManager(conf)) { |
| List<DatanodeDetails> healthyNodeList = createNodeSet(nodeManager, |
| healthyCount); |
| List<DatanodeDetails> staleNodeList = createNodeSet(nodeManager, |
| staleCount); |
| List<DatanodeDetails> deadNodeList = createNodeSet(nodeManager, |
| deadCount); |
| |
| Runnable healthyNodeTask = () -> { |
| try { |
| // 2 second heartbeat makes these nodes stay healthy. |
| heartbeatNodeSet(nodeManager, healthyNodeList, 2 * 1000); |
| } catch (InterruptedException ignored) { |
| } |
| }; |
| |
| Runnable staleNodeTask = () -> { |
| try { |
| // 4 second heartbeat makes these nodes go to stale and back to |
| // healthy again. |
| heartbeatNodeSet(nodeManager, staleNodeList, 4 * 1000); |
| } catch (InterruptedException ignored) { |
| } |
| }; |
| |
| // No thread for these; heartbeat them only once so that they will be |
| // marked as dead nodes eventually. |
| for (DatanodeDetails dn : deadNodeList) { |
| nodeManager.processHeartbeat(dn); |
| } |
| |
| |
| Thread thread1 = new Thread(healthyNodeTask); |
| thread1.setDaemon(true); |
| thread1.start(); |
| |
| |
| Thread thread2 = new Thread(staleNodeTask); |
| thread2.setDaemon(true); |
| thread2.start(); |
| |
| Thread.sleep(10 * 1000); |
| |
| // Assert all healthy nodes are healthy now. This has to be a |
| // greater-than check since stale nodes can be healthy when we check the |
| // state. |
| |
| assertThat(nodeManager.getNodeCount(NodeStatus.inServiceHealthy())) |
| .isGreaterThanOrEqualTo(healthyCount); |
| |
| assertEquals(deadCount, |
| nodeManager.getNodeCount(NodeStatus.inServiceDead())); |
| |
| List<DatanodeDetails> deadList = |
| nodeManager.getNodes(NodeStatus.inServiceDead()); |
| |
| for (DatanodeDetails node : deadList) { |
| assertThat(deadNodeList).contains(node); |
| } |
| |
| // Checking stale nodes is tricky since they have to move between |
| // healthy and stale to avoid becoming dead nodes. So we search for |
| // that state for a while; if we never observe it, waitFor throws a |
| // timeout exception. |
| GenericTestUtils.waitFor(() -> findNodes(nodeManager, staleCount, STALE), |
| 500, 4 * 1000); |
| |
| thread1.interrupt(); |
| thread2.interrupt(); |
| } |
| } |
| |
| /** |
| * Asserts that we can handle 6000+ nodes heartbeating SCM. |
| * |
| * @throws IOException |
| * @throws InterruptedException |
| * @throws TimeoutException |
| */ |
| @Test |
| public void testScmCanHandleScale() |
| throws IOException, InterruptedException, TimeoutException, |
| AuthenticationException { |
| final int healthyCount = 3000; |
| final int staleCount = 3000; |
| OzoneConfiguration conf = getConf(); |
| conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, |
| MILLISECONDS); |
| conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, |
| SECONDS); |
| conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3 * 1000, |
| MILLISECONDS); |
| conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6 * 1000, |
| MILLISECONDS); |
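| // 3000 nodes heartbeat every 2 seconds and stay healthy, while the other |
| // 3000 heartbeat every 4 seconds and periodically cross the 3 second |
| // stale threshold. |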
| |
| try (SCMNodeManager nodeManager = createNodeManager(conf)) { |
| List<DatanodeDetails> healthyList = createNodeSet(nodeManager, |
| healthyCount); |
| List<DatanodeDetails> staleList = createNodeSet(nodeManager, |
| staleCount); |
| |
| Runnable healthyNodeTask = () -> { |
| try { |
| heartbeatNodeSet(nodeManager, healthyList, 2 * 1000); |
| } catch (InterruptedException ignored) { |
| } |
| }; |
| |
| Runnable staleNodeTask = () -> { |
| try { |
| heartbeatNodeSet(nodeManager, staleList, 4 * 1000); |
| } catch (InterruptedException ignored) { |
| } |
| }; |
| |
| Thread thread1 = new Thread(healthyNodeTask); |
| thread1.setDaemon(true); |
| thread1.start(); |
| |
| Thread thread2 = new Thread(staleNodeTask); |
| thread2.setDaemon(true); |
| thread2.start(); |
| Thread.sleep(3 * 1000); |
| |
| GenericTestUtils.waitFor(() -> findNodes(nodeManager, staleCount, STALE), |
| 500, 20 * 1000); |
| assertEquals(healthyCount + staleCount, |
| nodeManager.getAllNodes().size(), "Node count mismatch"); |
| |
| thread1.interrupt(); |
| thread2.interrupt(); |
| } |
| } |
| |
| /** |
| * Test multiple nodes sending initial heartbeat with their node report. |
| * |
| * @throws IOException |
| * @throws InterruptedException |
| */ |
| @Test |
| public void testScmStatsFromNodeReport() |
| throws IOException, InterruptedException, AuthenticationException { |
| OzoneConfiguration conf = getConf(); |
| conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 1000, |
| MILLISECONDS); |
| final int nodeCount = 10; |
| final long capacity = 2000; |
| final long used = 100; |
| final long remaining = capacity - used; |
| List<DatanodeDetails> dnList = new ArrayList<>(nodeCount); |
| try (SCMNodeManager nodeManager = createNodeManager(conf)) { |
| |
| EventQueue eventQueue = (EventQueue) scm.getEventQueue(); |
| for (int x = 0; x < nodeCount; x++) { |
| DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails(); |
| dnList.add(dn); |
| UUID dnId = dn.getUuid(); |
| long free = capacity - used; |
| String storagePath = testDir.getAbsolutePath() + "/" + dnId; |
| StorageReportProto report = HddsTestUtils |
| .createStorageReport(dnId, storagePath, capacity, used, free, null); |
| nodeManager.register(dn, HddsTestUtils.createNodeReport( |
| Arrays.asList(report), emptyList()), null); |
| nodeManager.processHeartbeat(dn); |
| } |
| // Wait for the queued events to be processed (up to 8 seconds). |
| eventQueue.processAll(8000L); |
| |
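| // Aggregate SCM stats are the per-node report values multiplied by the |
| // node count; each node reported a single healthy volume, so the minimum |
| // healthy volume count across dnList is 1. |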
| assertEquals(nodeCount, nodeManager.getNodeCount(NodeStatus.inServiceHealthy())); |
| assertEquals(capacity * nodeCount, (long) nodeManager.getStats().getCapacity().get()); |
| assertEquals(used * nodeCount, (long) nodeManager.getStats().getScmUsed().get()); |
| assertEquals(remaining * nodeCount, (long) nodeManager.getStats().getRemaining().get()); |
| assertEquals(1, nodeManager.minHealthyVolumeNum(dnList)); |
| dnList.clear(); |
| } |
| } |
| |
| private List<StorageReportProto> generateStorageReportProto( |
| int volumeCount, UUID dnId, long capacity, long used, long remaining) { |
| List<StorageReportProto> reports = new ArrayList<>(volumeCount); |
| boolean failed = true; |
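| // Alternate the failed flag so that half of the generated volume reports |
| // are marked as failed. |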
| for (int x = 0; x < volumeCount; x++) { |
| String storagePath = testDir.getAbsolutePath() + "/" + dnId; |
| reports.add(HddsTestUtils |
| .createStorageReport(dnId, storagePath, capacity, |
| used, remaining, null, failed)); |
| failed = !failed; |
| } |
| return reports; |
| } |
| |
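| // Scenario arguments: per-volume capacity, used, remaining, volume count, |
| // expected formatted total capacity, expected SCM used percentage and |
| // expected non-SCM used percentage. A volume count of 0 produces a null |
| // report list, which is expected to format as "0.0B" and "N/A". |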
| private static Stream<Arguments> calculateStoragePercentageScenarios() { |
| return Stream.of( |
| Arguments.of(600, 65, 500, 1, "600.0B", "10.83", "5.83"), |
| Arguments.of(10000, 1000, 8800, 12, "117.2KB", "10.00", "2.00"), |
| Arguments.of(100000000, 1000, 899999, 12, "1.1GB", "0.00", "99.10"), |
| Arguments.of(10000, 1000, 0, 0, "0.0B", "N/A", "N/A"), |
| Arguments.of(0, 0, 0, 0, "0.0B", "N/A", "N/A"), |
| Arguments.of(1010, 547, 400, 5, "4.9KB", "54.16", "6.24") |
| ); |
| } |
| |
| @ParameterizedTest |
| @MethodSource("calculateStoragePercentageScenarios") |
| public void testCalculateStoragePercentage(long perCapacity, |
| long used, long remaining, int volumeCount, String totalCapacity, |
| String scmUsedPerc, String nonScmUsedPerc) { |
| DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails(); |
| UUID dnId = dn.getUuid(); |
| List<StorageReportProto> reports = volumeCount > 0 ? |
| generateStorageReportProto(volumeCount, dnId, perCapacity, |
| used, remaining) : null; |
| String capacityResult = SCMNodeManager.calculateStorageCapacity(reports); |
| assertEquals(totalCapacity, capacityResult); |
| String[] storagePercentage = SCMNodeManager.calculateStoragePercentage( |
| reports); |
| assertEquals(scmUsedPerc, storagePercentage[0]); |
| assertEquals(nonScmUsedPerc, storagePercentage[1]); |
| } |
| |
| /** |
| * Test a single node sending its initial heartbeat with a node report |
| * covering multiple volumes. |
| * |
| * @throws IOException |
| * @throws InterruptedException |
| */ |
| @Test |
| public void testVolumeInfoFromNodeReport() |
| throws IOException, InterruptedException, AuthenticationException { |
| OzoneConfiguration conf = getConf(); |
| conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 1000, |
| MILLISECONDS); |
| final int volumeCount = 10; |
| final long capacity = 2000; |
| final long used = 100; |
| List<DatanodeDetails> dnList = new ArrayList<>(1); |
| try (SCMNodeManager nodeManager = createNodeManager(conf)) { |
| EventQueue eventQueue = (EventQueue) scm.getEventQueue(); |
| DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails(); |
| dnList.add(dn); |
| UUID dnId = dn.getUuid(); |
| long free = capacity - used; |
| List<StorageReportProto> reports = new ArrayList<>(volumeCount); |
| boolean failed = true; |
| for (int x = 0; x < volumeCount; x++) { |
| String storagePath = testDir.getAbsolutePath() + "/" + dnId; |
| reports.add(HddsTestUtils |
| .createStorageReport(dnId, storagePath, capacity, |
| used, free, null, failed)); |
| failed = !failed; |
| } |
| nodeManager.register(dn, HddsTestUtils.createNodeReport(reports, |
| emptyList()), null); |
| nodeManager.processHeartbeat(dn); |
| // Wait for the queued events to be processed (up to 8 seconds). |
| eventQueue.processAll(8000L); |
| |
| assertEquals(1, nodeManager.getNodeCount(NodeStatus.inServiceHealthy())); |
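| // The failed flag alternates across the 10 reports, so only half of the |
| // volumes are healthy. |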
| assertEquals(volumeCount / 2, nodeManager.minHealthyVolumeNum(dnList)); |
| dnList.clear(); |
| } |
| } |
| |
| |
| /** |
| * Test single node stat updates based on node reports across the |
| * different heartbeat states (healthy, stale and dead). |
| * @throws IOException |
| * @throws InterruptedException |
| * @throws TimeoutException |
| */ |
| @Test |
| public void testScmNodeReportUpdate() |
| throws IOException, InterruptedException, TimeoutException, |
| AuthenticationException { |
| OzoneConfiguration conf = getConf(); |
| final int heartbeatCount = 5; |
| final int nodeCount = 1; |
| final int interval = 100; |
| |
| conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval, |
| MILLISECONDS); |
| conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS); |
| conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS); |
| conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS); |
| |
| try (SCMNodeManager nodeManager = createNodeManager(conf)) { |
| DatanodeDetails datanodeDetails = |
| HddsTestUtils.createRandomDatanodeAndRegister(nodeManager); |
| NodeReportHandler nodeReportHandler = new NodeReportHandler(nodeManager); |
| EventPublisher publisher = mock(EventPublisher.class); |
| final long capacity = 2000; |
| final long usedPerHeartbeat = 100; |
| UUID dnId = datanodeDetails.getUuid(); |
| for (int x = 0; x < heartbeatCount; x++) { |
| long scmUsed = x * usedPerHeartbeat; |
| long remaining = capacity - scmUsed; |
| String storagePath = testDir.getAbsolutePath() + "/" + dnId; |
| StorageReportProto report = HddsTestUtils |
| .createStorageReport(dnId, storagePath, capacity, scmUsed, |
| remaining, null); |
| NodeReportProto nodeReportProto = HddsTestUtils.createNodeReport( |
| Arrays.asList(report), emptyList()); |
| nodeReportHandler.onMessage( |
| new NodeReportFromDatanode(datanodeDetails, nodeReportProto), |
| publisher); |
| nodeManager.processHeartbeat(datanodeDetails); |
| Thread.sleep(100); |
| } |
| |
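| // Stats reflect the most recent report: the final loop iteration |
| // (x = heartbeatCount - 1) reported scmUsed of |
| // usedPerHeartbeat * (heartbeatCount - 1). |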
| final long expectedScmUsed = usedPerHeartbeat * (heartbeatCount - 1); |
| final long expectedRemaining = capacity - expectedScmUsed; |
| |
| GenericTestUtils.waitFor( |
| () -> nodeManager.getStats().getScmUsed().get() == expectedScmUsed, |
| 100, 4 * 1000); |
| |
| long foundCapacity = nodeManager.getStats().getCapacity().get(); |
| assertEquals(capacity, foundCapacity); |
| |
| long foundScmUsed = nodeManager.getStats().getScmUsed().get(); |
| assertEquals(expectedScmUsed, foundScmUsed); |
| |
| long foundRemaining = nodeManager.getStats().getRemaining().get(); |
| assertEquals(expectedRemaining, foundRemaining); |
| |
| // Test NodeManager#getNodeStats |
| assertEquals(nodeCount, nodeManager.getNodeStats().size()); |
| long nodeCapacity = nodeManager.getNodeStat(datanodeDetails).get() |
| .getCapacity().get(); |
| assertEquals(capacity, nodeCapacity); |
| |
| foundScmUsed = nodeManager.getNodeStat(datanodeDetails).get().getScmUsed() |
| .get(); |
| assertEquals(expectedScmUsed, foundScmUsed); |
| |
| foundRemaining = nodeManager.getNodeStat(datanodeDetails).get() |
| .getRemaining().get(); |
| assertEquals(expectedRemaining, foundRemaining); |
| |
| // Compare the result from |
| // NodeManager#getNodeStats and NodeManager#getNodeStat |
| SCMNodeStat stat1 = nodeManager.getNodeStats(). |
| get(datanodeDetails); |
| SCMNodeStat stat2 = nodeManager.getNodeStat(datanodeDetails).get(); |
| assertEquals(stat1, stat2); |
| |
| // Wait up to 4s so that the node becomes stale |
| // Verify the usage info should be unchanged. |
| GenericTestUtils.waitFor( |
| () -> nodeManager.getNodeCount(NodeStatus.inServiceStale()) == 1, 100, |
| 4 * 1000); |
| assertEquals(nodeCount, nodeManager.getNodeStats().size()); |
| |
| foundCapacity = nodeManager.getNodeStat(datanodeDetails).get() |
| .getCapacity().get(); |
| assertEquals(capacity, foundCapacity); |
| foundScmUsed = nodeManager.getNodeStat(datanodeDetails).get() |
| .getScmUsed().get(); |
| assertEquals(expectedScmUsed, foundScmUsed); |
| |
| foundRemaining = nodeManager.getNodeStat(datanodeDetails).get(). |
| getRemaining().get(); |
| assertEquals(expectedRemaining, foundRemaining); |
| |
| // Wait up to 4 more seconds so the node becomes dead |
| // Verify usage info should be updated. |
| GenericTestUtils.waitFor( |
| () -> nodeManager.getNodeCount(NodeStatus.inServiceDead()) == 1, 100, |
| 4 * 1000); |
| |
| assertEquals(0, nodeManager.getNodeStats().size()); |
| foundCapacity = nodeManager.getStats().getCapacity().get(); |
| assertEquals(0, foundCapacity); |
| |
| foundScmUsed = nodeManager.getStats().getScmUsed().get(); |
| assertEquals(0, foundScmUsed); |
| |
| foundRemaining = nodeManager.getStats().getRemaining().get(); |
| assertEquals(0, foundRemaining); |
| |
| nodeManager.processHeartbeat(datanodeDetails); |
| |
| // Wait up to 5 seconds so that the dead node becomes healthy |
| // Verify usage info should be updated. |
| GenericTestUtils.waitFor( |
| () -> nodeManager.getNodeCount(NodeStatus.inServiceHealthy()) == 1, |
| 100, 5 * 1000); |
| GenericTestUtils.waitFor( |
| () -> nodeManager.getStats().getScmUsed().get() == expectedScmUsed, |
| 100, 4 * 1000); |
| assertEquals(nodeCount, nodeManager.getNodeStats().size()); |
| foundCapacity = nodeManager.getNodeStat(datanodeDetails).get() |
| .getCapacity().get(); |
| assertEquals(capacity, foundCapacity); |
| foundScmUsed = nodeManager.getNodeStat(datanodeDetails).get().getScmUsed() |
| .get(); |
| assertEquals(expectedScmUsed, foundScmUsed); |
| foundRemaining = nodeManager.getNodeStat(datanodeDetails).get() |
| .getRemaining().get(); |
| assertEquals(expectedRemaining, foundRemaining); |
| } |
| } |
| |
| @Test |
| public void testHandlingSCMCommandEvent() |
| throws IOException, AuthenticationException { |
| OzoneConfiguration conf = getConf(); |
| conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, |
| 100, TimeUnit.MILLISECONDS); |
| |
| DatanodeDetails datanodeDetails = randomDatanodeDetails(); |
| UUID dnId = datanodeDetails.getUuid(); |
| String storagePath = testDir.getAbsolutePath() + "/" + dnId; |
| StorageReportProto report = |
| HddsTestUtils.createStorageReport(dnId, storagePath, 100, 10, 90, null); |
| |
| EventQueue eq = new EventQueue(); |
| try (SCMNodeManager nodemanager = createNodeManager(conf)) { |
| eq.addHandler(DATANODE_COMMAND, nodemanager); |
| |
| nodemanager |
| .register(datanodeDetails, HddsTestUtils.createNodeReport( |
| Arrays.asList(report), emptyList()), |
| HddsTestUtils.getRandomPipelineReports()); |
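| // Fire a close container command for the datanode; the node manager |
| // queues it and hands it back on the next heartbeat. |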
| eq.fireEvent(DATANODE_COMMAND, |
| new CommandForDatanode<>(datanodeDetails.getUuid(), |
| new CloseContainerCommand(1L, |
| PipelineID.randomId()))); |
| |
| eq.processAll(1000L); |
| List<SCMCommand> command = |
| nodemanager.processHeartbeat(datanodeDetails); |
| // With the datanode registered, SCM may also queue a create pipeline |
| // command, so the close container command is not necessarily first. |
| assertThat(command.size()).isGreaterThanOrEqualTo(1); |
| assertTrue(command.get(0).getClass().equals( |
| CloseContainerCommand.class) || |
| command.get(1).getClass().equals(CloseContainerCommand.class)); |
| } catch (IOException e) { |
| e.printStackTrace(); |
| throw e; |
| } |
| } |
| |
| /** |
| * Test adding nodes into a 4-layer network topology during registration. |
| */ |
| @Test |
| public void testScmRegisterNodeWith4LayerNetworkTopology() |
| throws IOException, InterruptedException, AuthenticationException { |
| OzoneConfiguration conf = getConf(); |
| conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 1000, |
| MILLISECONDS); |
| |
| // create table mapping file |
| String[] hostNames = {"host1", "host2", "host3", "host4"}; |
| String[] ipAddress = {"1.2.3.4", "2.3.4.5", "3.4.5.6", "4.5.6.7"}; |
| String mapFile = this.getClass().getClassLoader() |
| .getResource("nodegroup-mapping").getPath(); |
| |
| // create and register nodes |
| conf.set(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, |
| "org.apache.hadoop.net.TableMapping"); |
| conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mapFile); |
| conf.set(ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE, |
| "network-topology-nodegroup.xml"); |
| final int nodeCount = hostNames.length; |
| // use default IP address to resolve node |
| try (SCMNodeManager nodeManager = createNodeManager(conf)) { |
| for (int i = 0; i < nodeCount; i++) { |
| DatanodeDetails node = createDatanodeDetails( |
| UUID.randomUUID().toString(), hostNames[i], ipAddress[i], null); |
| nodeManager.register(node, null, null); |
| } |
| |
| // verify network topology cluster has all the registered nodes |
| Thread.sleep(4 * 1000); |
| NetworkTopology clusterMap = scm.getClusterMap(); |
| assertEquals(nodeCount, nodeManager.getNodeCount(NodeStatus.inServiceHealthy())); |
| assertEquals(nodeCount, clusterMap.getNumOfLeafNode("")); |
| assertEquals(4, clusterMap.getMaxLevel()); |
| List<DatanodeDetails> nodeList = nodeManager.getAllNodes(); |
| nodeList.forEach(node -> assertTrue( |
| node.getNetworkLocation().startsWith("/rack1/ng"))); |
| nodeList.forEach(node -> assertNotNull(node.getParent())); |
| } |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = {true, false}) |
| void testScmRegisterNodeWithNetworkTopology(boolean useHostname) |
| throws IOException, InterruptedException, AuthenticationException { |
| OzoneConfiguration conf = getConf(); |
| conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 1000, |
| MILLISECONDS); |
| conf.setBoolean(DFSConfigKeysLegacy.DFS_DATANODE_USE_DN_HOSTNAME, |
| useHostname); |
| |
| // create table mapping file |
| String[] hostNames = {"host1", "host2", "host3", "host4"}; |
| String[] ipAddress = {"1.2.3.4", "2.3.4.5", "3.4.5.6", "4.5.6.7"}; |
| String mapFile = this.getClass().getClassLoader() |
| .getResource("rack-mapping").getPath(); |
| |
| // create and register nodes |
| conf.set(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, |
| "org.apache.hadoop.net.TableMapping"); |
| conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mapFile); |
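| // The default rack schema produces a 3-level topology (root / rack / |
| // leaf), and the table mapping file places all four hosts under /rack1. |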
| |
| final int nodeCount = hostNames.length; |
| // use default IP address to resolve node |
| try (SCMNodeManager nodeManager = createNodeManager(conf)) { |
| for (int i = 0; i < nodeCount; i++) { |
| DatanodeDetails node = createDatanodeDetails( |
| UUID.randomUUID().toString(), hostNames[i], ipAddress[i], null); |
| nodeManager.register(node, null, null); |
| } |
| |
| // verify network topology cluster has all the registered nodes |
| Thread.sleep(4 * 1000); |
| NetworkTopology clusterMap = scm.getClusterMap(); |
| assertEquals(nodeCount, |
| nodeManager.getNodeCount(NodeStatus.inServiceHealthy())); |
| assertEquals(nodeCount, clusterMap.getNumOfLeafNode("")); |
| assertEquals(3, clusterMap.getMaxLevel()); |
| List<DatanodeDetails> nodeList = nodeManager.getAllNodes(); |
| nodeList.forEach(node -> |
| assertEquals("/rack1", node.getNetworkLocation())); |
| |
| // test get node |
| Arrays.stream(hostNames).forEach(hostname -> assertNotEquals(0, |
| nodeManager.getNodesByAddress(hostname).size())); |
| |
| Arrays.stream(ipAddress).forEach(ip -> assertNotEquals(0, |
| nodeManager.getNodesByAddress(ip).size())); |
| } |
| } |
| |
| @Test |
| public void testGetNodeInfo() |
| throws IOException, InterruptedException, NodeNotFoundException, |
| AuthenticationException { |
| OzoneConfiguration conf = getConf(); |
| final int nodeCount = 6; |
| SCMNodeManager nodeManager = createNodeManager(conf); |
| |
| for (int i = 0; i < nodeCount; i++) { |
| DatanodeDetails datanodeDetails = |
| MockDatanodeDetails.randomDatanodeDetails(); |
| final long capacity = 2000; |
| final long used = 100; |
| final long remaining = 1900; |
| UUID dnId = datanodeDetails.getUuid(); |
| String storagePath = testDir.getAbsolutePath() + "/" + dnId; |
| StorageReportProto report = HddsTestUtils |
| .createStorageReport(dnId, storagePath, capacity, used, |
| remaining, null); |
| |
| nodeManager.register(datanodeDetails, HddsTestUtils.createNodeReport( |
| Arrays.asList(report), emptyList()), |
| HddsTestUtils.getRandomPipelineReports()); |
| |
| nodeManager.processHeartbeat(datanodeDetails); |
| if (i == 5) { |
| nodeManager.setNodeOperationalState(datanodeDetails, |
| HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE); |
| } |
| if (i == 3 || i == 4) { |
| nodeManager.setNodeOperationalState(datanodeDetails, |
| HddsProtos.NodeOperationalState.DECOMMISSIONED); |
| } |
| } |
| Thread.sleep(100); |
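| // Six nodes each report 2000 capacity, 100 used and 1900 remaining: |
| // 3 IN_SERVICE, 2 DECOMMISSIONED and 1 ENTERING_MAINTENANCE. |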
| |
| Map<String, Long> stats = nodeManager.getNodeInfo(); |
| // 3 IN_SERVICE nodes: |
| assertEquals(6000, stats.get("DiskCapacity").longValue()); |
| assertEquals(300, stats.get("DiskUsed").longValue()); |
| assertEquals(5700, stats.get("DiskRemaining").longValue()); |
| |
| // 2 Decommissioned nodes |
| assertEquals(4000, stats.get("DecommissionedDiskCapacity").longValue()); |
| assertEquals(200, stats.get("DecommissionedDiskUsed").longValue()); |
| assertEquals(3800, stats.get("DecommissionedDiskRemaining").longValue()); |
| |
| // 1 Maintenance node |
| assertEquals(2000, stats.get("MaintenanceDiskCapacity").longValue()); |
| assertEquals(100, stats.get("MaintenanceDiskUsed").longValue()); |
| assertEquals(1900, stats.get("MaintenanceDiskRemaining").longValue()); |
| |
| // All nodes |
| assertEquals(12000, stats.get("TotalCapacity").longValue()); |
| assertEquals(600, stats.get("TotalUsed").longValue()); |
| } |
| |
| /** |
| * Test looking up registered nodes by hostname and IP address, including |
| * multiple nodes that share the same address. |
| */ |
| @ParameterizedTest |
| @ValueSource(booleans = {true, false}) |
| void testGetNodesByAddress(boolean useHostname) |
| throws IOException, AuthenticationException { |
| OzoneConfiguration conf = getConf(); |
| conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 1000, |
| MILLISECONDS); |
| conf.setBoolean(DFSConfigKeysLegacy.DFS_DATANODE_USE_DN_HOSTNAME, |
| useHostname); |
| |
| // create a set of hosts - note two hosts on "host1" |
| String[] hostNames = {"host1", "host1", "host2", "host3", "host4"}; |
| String[] ipAddress = |
| {"1.2.3.4", "1.2.3.4", "2.3.4.5", "3.4.5.6", "4.5.6.7"}; |
| |
| final int nodeCount = hostNames.length; |
| try (SCMNodeManager nodeManager = createNodeManager(conf)) { |
| for (int i = 0; i < nodeCount; i++) { |
| DatanodeDetails node = createDatanodeDetails( |
| UUID.randomUUID().toString(), hostNames[i], ipAddress[i], null); |
| nodeManager.register(node, null, null); |
| } |
| // test get node |
| assertEquals(0, nodeManager.getNodesByAddress(null).size()); |
| |
| assertEquals(2, nodeManager.getNodesByAddress("host1").size()); |
| assertEquals(1, nodeManager.getNodesByAddress("host2").size()); |
| assertEquals(0, nodeManager.getNodesByAddress("unknown").size()); |
| |
| assertEquals(2, nodeManager.getNodesByAddress("1.2.3.4").size()); |
| assertEquals(1, nodeManager.getNodesByAddress("2.3.4.5").size()); |
| assertEquals(0, nodeManager.getNodesByAddress("1.9.8.7").size()); |
| } |
| } |
| |
| /** |
| * Test node register with updated IP and host name. |
| */ |
| @Test |
| public void testScmRegisterNodeWithUpdatedIpAndHostname() |
| throws IOException, InterruptedException, AuthenticationException { |
| OzoneConfiguration conf = getConf(); |
| conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 1000, |
| MILLISECONDS); |
| |
| // create table mapping file |
| String hostName = "host1"; |
| String ipAddress = "1.2.3.4"; |
| String mapFile = this.getClass().getClassLoader() |
| .getResource("nodegroup-mapping").getPath(); |
| conf.set(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, |
| "org.apache.hadoop.net.TableMapping"); |
| conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mapFile); |
| conf.set(ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE, |
| "network-topology-nodegroup.xml"); |
| |
| // use default IP address to resolve node |
| try (SCMNodeManager nodeManager = createNodeManager(conf)) { |
| String nodeUuid = UUID.randomUUID().toString(); |
| DatanodeDetails node = createDatanodeDetails( |
| nodeUuid, hostName, ipAddress, null); |
| nodeManager.register(node, null, null); |
| |
| // verify network topology cluster has all the registered nodes |
| Thread.sleep(2 * 1000); |
| NetworkTopology clusterMap = scm.getClusterMap(); |
| assertEquals(1, |
| nodeManager.getNodeCount(NodeStatus.inServiceHealthy())); |
| assertEquals(1, clusterMap.getNumOfLeafNode("")); |
| assertEquals(4, clusterMap.getMaxLevel()); |
| List<DatanodeDetails> nodeList = nodeManager.getAllNodes(); |
| assertEquals(1, nodeList.size()); |
| |
| DatanodeDetails returnedNode = nodeList.get(0); |
| assertEquals(ipAddress, returnedNode.getIpAddress()); |
| assertEquals(hostName, returnedNode.getHostName()); |
| assertTrue(returnedNode.getNetworkLocation() |
| .startsWith("/rack1/ng")); |
| assertNotNull(returnedNode.getParent()); |
| |
| // test updating ip address and host name |
| String updatedIpAddress = "2.3.4.5"; |
| String updatedHostName = "host2"; |
| DatanodeDetails updatedNode = createDatanodeDetails( |
| nodeUuid, updatedHostName, updatedIpAddress, null); |
| nodeManager.register(updatedNode, null, null); |
| |
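| // Re-registering the same UUID updates the existing entry rather than |
| // adding a second node, and lookups by the old address return nothing. |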
| assertEquals(1, nodeManager.getNodeCount(NodeStatus.inServiceHealthy())); |
| assertEquals(1, clusterMap.getNumOfLeafNode("")); |
| assertEquals(4, clusterMap.getMaxLevel()); |
| List<DatanodeDetails> updatedNodeList = nodeManager.getAllNodes(); |
| assertEquals(1, updatedNodeList.size()); |
| |
| DatanodeDetails returnedUpdatedNode = updatedNodeList.get(0); |
| assertEquals(updatedIpAddress, returnedUpdatedNode.getIpAddress()); |
| assertEquals(updatedHostName, returnedUpdatedNode.getHostName()); |
| assertTrue(returnedUpdatedNode.getNetworkLocation() |
| .startsWith("/rack1/ng")); |
| assertNotNull(returnedUpdatedNode.getParent()); |
| |
| assertEquals(emptyList(), nodeManager.getNodesByAddress(hostName)); |
| assertEquals(emptyList(), nodeManager.getNodesByAddress(ipAddress)); |
| } |
| } |
| } |