blob: db76d6678789ad78a01ddd549e36a4fc5f3b1f01 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.node;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic
.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic
.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_DEADNODE_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_STALENODE_INTERVAL;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
.HEALTHY;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Test the SCM Node Manager class.
*/
public class TestSCMNodeManager {
private File testDir;
private StorageContainerManager scm;
@Rule
public ExpectedException thrown = ExpectedException.none();
@BeforeClass
public static void init() throws IOException {
}
@Before
public void setup() {
testDir = PathUtils.getTestDir(
TestSCMNodeManager.class);
}
@After
public void cleanup() {
if (scm != null) {
scm.stop();
scm.join();
}
FileUtil.fullyDelete(testDir);
}
/**
* Returns a new copy of Configuration.
*
* @return Config
*/
OzoneConfiguration getConf() {
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
testDir.getAbsolutePath());
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100,
TimeUnit.MILLISECONDS);
return conf;
}
/**
* Creates a NodeManager.
*
* @param config - Config for the node manager.
* @return SCNNodeManager
* @throws IOException
*/
SCMNodeManager createNodeManager(OzoneConfiguration config)
throws IOException, AuthenticationException {
scm = HddsTestUtils.getScm(config);
return (SCMNodeManager) scm.getScmNodeManager();
}
/**
* Tests that Node manager handles heartbeats correctly, and comes out of
* safe Mode.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
public void testScmHeartbeat()
throws IOException, InterruptedException, AuthenticationException {
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
int registeredNodes = 5;
// Send some heartbeats from different nodes.
for (int x = 0; x < registeredNodes; x++) {
DatanodeDetails datanodeDetails = TestUtils
.createRandomDatanodeAndRegister(nodeManager);
nodeManager.processHeartbeat(datanodeDetails);
}
//TODO: wait for heartbeat to be processed
Thread.sleep(4 * 1000);
assertTrue("Heartbeat thread should have picked up the" +
"scheduled heartbeats.",
nodeManager.getAllNodes().size() == registeredNodes);
}
}
/**
* asserts that if we send no heartbeats node manager stays in safemode.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
public void testScmNoHeartbeats()
throws IOException, InterruptedException, AuthenticationException {
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
//TODO: wait for heartbeat to be processed
Thread.sleep(4 * 1000);
assertTrue("No heartbeats, 0 nodes should be registered",
nodeManager.getAllNodes().size() == 0);
}
}
/**
* Asserts that adding heartbeats after shutdown does not work. This implies
* that heartbeat thread has been shutdown safely by closing the node
* manager.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
public void testScmShutdown()
throws IOException, InterruptedException, AuthenticationException {
OzoneConfiguration conf = getConf();
conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
100, TimeUnit.MILLISECONDS);
SCMNodeManager nodeManager = createNodeManager(conf);
DatanodeDetails datanodeDetails = TestUtils
.createRandomDatanodeAndRegister(nodeManager);
nodeManager.close();
// These should never be processed.
nodeManager.processHeartbeat(datanodeDetails);
// Let us just wait for 2 seconds to prove that HBs are not processed.
Thread.sleep(2 * 1000);
//TODO: add assertion
}
/**
* Asserts that we detect as many healthy nodes as we have generated heartbeat
* for.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
public void testScmHealthyNodeCount()
throws IOException, InterruptedException, AuthenticationException {
OzoneConfiguration conf = getConf();
final int count = 10;
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
for (int x = 0; x < count; x++) {
DatanodeDetails datanodeDetails = TestUtils
.createRandomDatanodeAndRegister(nodeManager);
nodeManager.processHeartbeat(datanodeDetails);
}
//TODO: wait for heartbeat to be processed
Thread.sleep(4 * 1000);
assertEquals(count, nodeManager.getNodeCount(HEALTHY));
}
}
/**
* Asserts that if Stale Interval value is more than 5 times the value of HB
* processing thread it is a sane value.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
public void testScmSanityOfUserConfig2()
throws IOException, AuthenticationException {
OzoneConfiguration conf = getConf();
final int interval = 100;
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, TimeUnit.SECONDS);
// This should be 5 times more than OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL
// and 3 times more than OZONE_SCM_HEARTBEAT_INTERVAL
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3 * 1000, MILLISECONDS);
createNodeManager(conf).close();
}
/**
* Asserts that a single node moves from Healthy to stale node, then from
* stale node to dead node if it misses enough heartbeats.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
public void testScmDetectStaleAndDeadNode()
throws IOException, InterruptedException, AuthenticationException {
final int interval = 100;
final int nodeCount = 10;
OzoneConfiguration conf = getConf();
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval,
MILLISECONDS);
conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
List<DatanodeDetails> nodeList = createNodeSet(nodeManager, nodeCount);
DatanodeDetails staleNode = TestUtils.createRandomDatanodeAndRegister(
nodeManager);
// Heartbeat once
nodeManager.processHeartbeat(staleNode);
// Heartbeat all other nodes.
for (DatanodeDetails dn : nodeList) {
nodeManager.processHeartbeat(dn);
}
// Wait for 2 seconds .. and heartbeat good nodes again.
Thread.sleep(2 * 1000);
for (DatanodeDetails dn : nodeList) {
nodeManager.processHeartbeat(dn);
}
// Wait for 2 seconds, wait a total of 4 seconds to make sure that the
// node moves into stale state.
Thread.sleep(2 * 1000);
List<DatanodeDetails> staleNodeList = nodeManager.getNodes(STALE);
assertEquals("Expected to find 1 stale node",
1, nodeManager.getNodeCount(STALE));
assertEquals("Expected to find 1 stale node",
1, staleNodeList.size());
assertEquals("Stale node is not the expected ID", staleNode
.getUuid(), staleNodeList.get(0).getUuid());
Thread.sleep(1000);
// heartbeat good nodes again.
for (DatanodeDetails dn : nodeList) {
nodeManager.processHeartbeat(dn);
}
// 6 seconds is the dead window for this test , so we wait a total of
// 7 seconds to make sure that the node moves into dead state.
Thread.sleep(2 * 1000);
// the stale node has been removed
staleNodeList = nodeManager.getNodes(STALE);
assertEquals("Expected to find 1 stale node",
0, nodeManager.getNodeCount(STALE));
assertEquals("Expected to find 1 stale node",
0, staleNodeList.size());
// Check for the dead node now.
List<DatanodeDetails> deadNodeList = nodeManager.getNodes(DEAD);
assertEquals("Expected to find 1 dead node", 1,
nodeManager.getNodeCount(DEAD));
assertEquals("Expected to find 1 dead node",
1, deadNodeList.size());
assertEquals("Dead node is not the expected ID", staleNode
.getUuid(), deadNodeList.get(0).getUuid());
}
}
/**
* Simulate a JVM Pause by pausing the health check process
* Ensure that none of the nodes with heartbeats become Dead or Stale.
* @throws IOException
* @throws InterruptedException
* @throws AuthenticationException
*/
@Test
public void testScmHandleJvmPause()
throws IOException, InterruptedException, AuthenticationException {
final int healthCheckInterval = 200; // milliseconds
final int heartbeatInterval = 1; // seconds
final int staleNodeInterval = 3; // seconds
final int deadNodeInterval = 6; // seconds
ScheduledFuture schedFuture;
OzoneConfiguration conf = getConf();
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
healthCheckInterval, MILLISECONDS);
conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL,
heartbeatInterval, SECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL,
staleNodeInterval, SECONDS);
conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL,
deadNodeInterval, SECONDS);
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
DatanodeDetails node1 =
TestUtils.createRandomDatanodeAndRegister(nodeManager);
DatanodeDetails node2 =
TestUtils.createRandomDatanodeAndRegister(nodeManager);
nodeManager.processHeartbeat(node1);
nodeManager.processHeartbeat(node2);
// Sleep so that heartbeat processing thread gets to run.
Thread.sleep(1000);
//Assert all nodes are healthy.
assertEquals(2, nodeManager.getAllNodes().size());
assertEquals(2, nodeManager.getNodeCount(HEALTHY));
/**
* Simulate a JVM Pause and subsequent handling in following steps:
* Step 1 : stop heartbeat check process for stale node interval
* Step 2 : resume heartbeat check
* Step 3 : wait for 1 iteration of heartbeat check thread
* Step 4 : retrieve the state of all nodes - assert all are HEALTHY
* Step 5 : heartbeat for node1
* [TODO : what if there is scheduling delay of test thread in Step 5?]
* Step 6 : wait for some time to allow iterations of check process
* Step 7 : retrieve the state of all nodes - assert node2 is STALE
* and node1 is HEALTHY
*/
// Step 1 : stop health check process (simulate JVM pause)
nodeManager.pauseHealthCheck();
Thread.sleep(MILLISECONDS.convert(staleNodeInterval, SECONDS));
// Step 2 : resume health check
assertTrue("Unexpected, already skipped heartbeat checks",
(nodeManager.getSkippedHealthChecks() == 0));
schedFuture = nodeManager.unpauseHealthCheck();
// Step 3 : wait for 1 iteration of health check
try {
schedFuture.get();
assertTrue("We did not skip any heartbeat checks",
nodeManager.getSkippedHealthChecks() > 0);
} catch (ExecutionException e) {
assertEquals("Unexpected exception waiting for Scheduled Health Check",
0, 1);
}
// Step 4 : all nodes should still be HEALTHY
assertEquals(2, nodeManager.getAllNodes().size());
assertEquals(2, nodeManager.getNodeCount(HEALTHY));
// Step 5 : heartbeat for node1
nodeManager.processHeartbeat(node1);
// Step 6 : wait for health check process to run
Thread.sleep(1000);
// Step 7 : node2 should transition to STALE
assertEquals(1, nodeManager.getNodeCount(HEALTHY));
assertEquals(1, nodeManager.getNodeCount(STALE));
}
}
/**
* Check for NPE when datanodeDetails is passed null for sendHeartbeat.
*
* @throws IOException
*/
@Test
public void testScmCheckForErrorOnNullDatanodeDetails()
throws IOException, AuthenticationException {
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
nodeManager.processHeartbeat(null);
} catch (NullPointerException npe) {
GenericTestUtils.assertExceptionContains("Heartbeat is missing " +
"DatanodeDetails.", npe);
}
}
/**
* Asserts that a dead node, stale node and healthy nodes co-exist. The counts
* , lists and node ID match the expected node state.
* <p/>
* This test is pretty complicated because it explores all states of Node
* manager in a single test. Please read thru the comments to get an idea of
* the current state of the node Manager.
* <p/>
* This test is written like a state machine to avoid threads and concurrency
* issues. This test is replicated below with the use of threads. Avoiding
* threads make it easy to debug the state machine.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
/**
* These values are very important. Here is what it means so you don't
* have to look it up while reading this code.
*
* OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL - This the frequency of the
* HB processing thread that is running in the SCM. This thread must run
* for the SCM to process the Heartbeats.
*
* OZONE_SCM_HEARTBEAT_INTERVAL - This is the frequency at which
* datanodes will send heartbeats to SCM. Please note: This is the only
* config value for node manager that is specified in seconds. We don't
* want SCM heartbeat resolution to be more than in seconds.
* In this test it is not used, but we are forced to set it because we
* have validation code that checks Stale Node interval and Dead Node
* interval is larger than the value of
* OZONE_SCM_HEARTBEAT_INTERVAL.
*
* OZONE_SCM_STALENODE_INTERVAL - This is the time that must elapse
* from the last heartbeat for us to mark a node as stale. In this test
* we set that to 3. That is if a node has not heartbeat SCM for last 3
* seconds we will mark it as stale.
*
* OZONE_SCM_DEADNODE_INTERVAL - This is the time that must elapse
* from the last heartbeat for a node to be marked dead. We have an
* additional constraint that this must be at least 2 times bigger than
* Stale node Interval.
*
* With these we are trying to explore the state of this cluster with
* various timeouts. Each section is commented so that you can keep
* track of the state of the cluster nodes.
*
*/
@Test
public void testScmClusterIsInExpectedState1()
throws IOException, InterruptedException, AuthenticationException {
OzoneConfiguration conf = getConf();
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100,
MILLISECONDS);
conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
/**
* Cluster state: Healthy: All nodes are heartbeat-ing like normal.
*/
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
DatanodeDetails healthyNode =
TestUtils.createRandomDatanodeAndRegister(nodeManager);
DatanodeDetails staleNode =
TestUtils.createRandomDatanodeAndRegister(nodeManager);
DatanodeDetails deadNode =
TestUtils.createRandomDatanodeAndRegister(nodeManager);
nodeManager.processHeartbeat(healthyNode);
nodeManager.processHeartbeat(staleNode);
nodeManager.processHeartbeat(deadNode);
// Sleep so that heartbeat processing thread gets to run.
Thread.sleep(500);
//Assert all nodes are healthy.
assertEquals(3, nodeManager.getAllNodes().size());
assertEquals(3, nodeManager.getNodeCount(HEALTHY));
/**
* Cluster state: Quiesced: We are going to sleep for 3 seconds. Which
* means that no node is heartbeating. All nodes should move to Stale.
*/
Thread.sleep(3 * 1000);
assertEquals(3, nodeManager.getAllNodes().size());
assertEquals(3, nodeManager.getNodeCount(STALE));
/**
* Cluster State : Move healthy node back to healthy state, move other 2
* nodes to Stale State.
*
* We heartbeat healthy node after 1 second and let other 2 nodes elapse
* the 3 second windows.
*/
nodeManager.processHeartbeat(healthyNode);
nodeManager.processHeartbeat(staleNode);
nodeManager.processHeartbeat(deadNode);
Thread.sleep(1500);
nodeManager.processHeartbeat(healthyNode);
Thread.sleep(2 * 1000);
assertEquals(1, nodeManager.getNodeCount(HEALTHY));
// 3.5 seconds from last heartbeat for the stale and deadNode. So those
// 2 nodes must move to Stale state and the healthy node must
// remain in the healthy State.
List<DatanodeDetails> healthyList = nodeManager.getNodes(HEALTHY);
assertEquals("Expected one healthy node", 1, healthyList.size());
assertEquals("Healthy node is not the expected ID", healthyNode
.getUuid(), healthyList.get(0).getUuid());
assertEquals(2, nodeManager.getNodeCount(STALE));
/**
* Cluster State: Allow healthyNode to remain in healthy state and
* staleNode to move to stale state and deadNode to move to dead state.
*/
nodeManager.processHeartbeat(healthyNode);
nodeManager.processHeartbeat(staleNode);
Thread.sleep(1500);
nodeManager.processHeartbeat(healthyNode);
Thread.sleep(2 * 1000);
// 3.5 seconds have elapsed for stale node, so it moves into Stale.
// 7 seconds have elapsed for dead node, so it moves into dead.
// 2 Seconds have elapsed for healthy node, so it stays in healthy state.
healthyList = nodeManager.getNodes(HEALTHY);
List<DatanodeDetails> staleList = nodeManager.getNodes(STALE);
List<DatanodeDetails> deadList = nodeManager.getNodes(DEAD);
assertEquals(3, nodeManager.getAllNodes().size());
assertEquals(1, nodeManager.getNodeCount(HEALTHY));
assertEquals(1, nodeManager.getNodeCount(STALE));
assertEquals(1, nodeManager.getNodeCount(DEAD));
assertEquals("Expected one healthy node",
1, healthyList.size());
assertEquals("Healthy node is not the expected ID", healthyNode
.getUuid(), healthyList.get(0).getUuid());
assertEquals("Expected one stale node",
1, staleList.size());
assertEquals("Stale node is not the expected ID", staleNode
.getUuid(), staleList.get(0).getUuid());
assertEquals("Expected one dead node",
1, deadList.size());
assertEquals("Dead node is not the expected ID", deadNode
.getUuid(), deadList.get(0).getUuid());
/**
* Cluster State : let us heartbeat all the nodes and verify that we get
* back all the nodes in healthy state.
*/
nodeManager.processHeartbeat(healthyNode);
nodeManager.processHeartbeat(staleNode);
nodeManager.processHeartbeat(deadNode);
Thread.sleep(500);
//Assert all nodes are healthy.
assertEquals(3, nodeManager.getAllNodes().size());
assertEquals(3, nodeManager.getNodeCount(HEALTHY));
}
}
/**
* Heartbeat a given set of nodes at a specified frequency.
*
* @param manager - Node Manager
* @param list - List of datanodeIDs
* @param sleepDuration - Duration to sleep between heartbeats.
* @throws InterruptedException
*/
private void heartbeatNodeSet(SCMNodeManager manager,
List<DatanodeDetails> list,
int sleepDuration) throws InterruptedException {
while (!Thread.currentThread().isInterrupted()) {
for (DatanodeDetails dn : list) {
manager.processHeartbeat(dn);
}
Thread.sleep(sleepDuration);
}
}
/**
* Create a set of Nodes with a given prefix.
*
* @param count - number of nodes.
* @return List of Nodes.
*/
private List<DatanodeDetails> createNodeSet(SCMNodeManager nodeManager, int
count) {
List<DatanodeDetails> list = new ArrayList<>();
for (int x = 0; x < count; x++) {
DatanodeDetails datanodeDetails = TestUtils
.createRandomDatanodeAndRegister(nodeManager);
list.add(datanodeDetails);
}
return list;
}
/**
* Function that tells us if we found the right number of stale nodes.
*
* @param nodeManager - node manager
* @param count - number of stale nodes to look for.
* @return true if we found the expected number.
*/
private boolean findNodes(NodeManager nodeManager, int count,
HddsProtos.NodeState state) {
return count == nodeManager.getNodeCount(state);
}
/**
* Asserts that we can create a set of nodes that send its heartbeats from
* different threads and NodeManager behaves as expected.
*
* @throws IOException
* @throws InterruptedException
*/
@Test
public void testScmClusterIsInExpectedState2()
throws IOException, InterruptedException, TimeoutException,
AuthenticationException {
final int healthyCount = 5000;
final int staleCount = 100;
final int deadCount = 10;
OzoneConfiguration conf = getConf();
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100,
MILLISECONDS);
conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
List<DatanodeDetails> healthyNodeList = createNodeSet(nodeManager,
healthyCount);
List<DatanodeDetails> staleNodeList = createNodeSet(nodeManager,
staleCount);
List<DatanodeDetails> deadNodeList = createNodeSet(nodeManager,
deadCount);
Runnable healthyNodeTask = () -> {
try {
// 2 second heartbeat makes these nodes stay healthy.
heartbeatNodeSet(nodeManager, healthyNodeList, 2 * 1000);
} catch (InterruptedException ignored) {
}
};
Runnable staleNodeTask = () -> {
try {
// 4 second heartbeat makes these nodes go to stale and back to
// healthy again.
heartbeatNodeSet(nodeManager, staleNodeList, 4 * 1000);
} catch (InterruptedException ignored) {
}
};
// No Thread just one time HBs the node manager, so that these will be
// marked as dead nodes eventually.
for (DatanodeDetails dn : deadNodeList) {
nodeManager.processHeartbeat(dn);
}
Thread thread1 = new Thread(healthyNodeTask);
thread1.setDaemon(true);
thread1.start();
Thread thread2 = new Thread(staleNodeTask);
thread2.setDaemon(true);
thread2.start();
Thread.sleep(10 * 1000);
// Assert all healthy nodes are healthy now, this has to be a greater
// than check since Stale nodes can be healthy when we check the state.
assertTrue(nodeManager.getNodeCount(HEALTHY) >= healthyCount);
assertEquals(deadCount, nodeManager.getNodeCount(DEAD));
List<DatanodeDetails> deadList = nodeManager.getNodes(DEAD);
for (DatanodeDetails node : deadList) {
assertTrue(deadNodeList.contains(node));
}
// Checking stale nodes is tricky since they have to move between
// healthy and stale to avoid becoming dead nodes. So we search for
// that state for a while, if we don't find that state waitfor will
// throw.
GenericTestUtils.waitFor(() -> findNodes(nodeManager, staleCount, STALE),
500, 4 * 1000);
thread1.interrupt();
thread2.interrupt();
}
}
/**
* Asserts that we can handle 6000+ nodes heartbeating SCM.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
public void testScmCanHandleScale()
throws IOException, InterruptedException, TimeoutException,
AuthenticationException {
final int healthyCount = 3000;
final int staleCount = 3000;
OzoneConfiguration conf = getConf();
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100,
MILLISECONDS);
conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1,
SECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3 * 1000,
MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6 * 1000,
MILLISECONDS);
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
List<DatanodeDetails> healthyList = createNodeSet(nodeManager,
healthyCount);
List<DatanodeDetails> staleList = createNodeSet(nodeManager,
staleCount);
Runnable healthyNodeTask = () -> {
try {
heartbeatNodeSet(nodeManager, healthyList, 2 * 1000);
} catch (InterruptedException ignored) {
}
};
Runnable staleNodeTask = () -> {
try {
heartbeatNodeSet(nodeManager, staleList, 4 * 1000);
} catch (InterruptedException ignored) {
}
};
Thread thread1 = new Thread(healthyNodeTask);
thread1.setDaemon(true);
thread1.start();
Thread thread2 = new Thread(staleNodeTask);
thread2.setDaemon(true);
thread2.start();
Thread.sleep(3 * 1000);
GenericTestUtils.waitFor(() -> findNodes(nodeManager, staleCount, STALE),
500, 20 * 1000);
assertEquals("Node count mismatch",
healthyCount + staleCount, nodeManager.getAllNodes().size());
thread1.interrupt();
thread2.interrupt();
}
}
/**
* Test multiple nodes sending initial heartbeat with their node report.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
@Ignore
// TODO: Enable this after we implement NodeReportEvent handler.
public void testScmStatsFromNodeReport()
throws IOException, InterruptedException, AuthenticationException {
OzoneConfiguration conf = getConf();
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 1000,
MILLISECONDS);
final int nodeCount = 10;
final long capacity = 2000;
final long used = 100;
final long remaining = capacity - used;
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
for (int x = 0; x < nodeCount; x++) {
DatanodeDetails datanodeDetails = TestUtils
.createRandomDatanodeAndRegister(nodeManager);
UUID dnId = datanodeDetails.getUuid();
long free = capacity - used;
String storagePath = testDir.getAbsolutePath() + "/" + dnId;
StorageReportProto report = TestUtils
.createStorageReport(dnId, storagePath, capacity, used, free, null);
nodeManager.processHeartbeat(datanodeDetails);
}
//TODO: wait for heartbeat to be processed
Thread.sleep(4 * 1000);
assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY));
assertEquals(capacity * nodeCount, (long) nodeManager.getStats()
.getCapacity().get());
assertEquals(used * nodeCount, (long) nodeManager.getStats()
.getScmUsed().get());
assertEquals(remaining * nodeCount, (long) nodeManager.getStats()
.getRemaining().get());
}
}
/**
* Test single node stat update based on nodereport from different heartbeat
* status (healthy, stale and dead).
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
@Ignore
// TODO: Enable this after we implement NodeReportEvent handler.
public void testScmNodeReportUpdate()
throws IOException, InterruptedException, TimeoutException,
AuthenticationException {
OzoneConfiguration conf = getConf();
final int heartbeatCount = 5;
final int nodeCount = 1;
final int interval = 100;
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval,
MILLISECONDS);
conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
DatanodeDetails datanodeDetails =
TestUtils.createRandomDatanodeAndRegister(nodeManager);
final long capacity = 2000;
final long usedPerHeartbeat = 100;
UUID dnId = datanodeDetails.getUuid();
for (int x = 0; x < heartbeatCount; x++) {
long scmUsed = x * usedPerHeartbeat;
long remaining = capacity - scmUsed;
String storagePath = testDir.getAbsolutePath() + "/" + dnId;
StorageReportProto report = TestUtils
.createStorageReport(dnId, storagePath, capacity, scmUsed,
remaining, null);
nodeManager.processHeartbeat(datanodeDetails);
Thread.sleep(100);
}
final long expectedScmUsed = usedPerHeartbeat * (heartbeatCount - 1);
final long expectedRemaining = capacity - expectedScmUsed;
GenericTestUtils.waitFor(
() -> nodeManager.getStats().getScmUsed().get() == expectedScmUsed,
100, 4 * 1000);
long foundCapacity = nodeManager.getStats().getCapacity().get();
assertEquals(capacity, foundCapacity);
long foundScmUsed = nodeManager.getStats().getScmUsed().get();
assertEquals(expectedScmUsed, foundScmUsed);
long foundRemaining = nodeManager.getStats().getRemaining().get();
assertEquals(expectedRemaining, foundRemaining);
// Test NodeManager#getNodeStats
assertEquals(nodeCount, nodeManager.getNodeStats().size());
long nodeCapacity = nodeManager.getNodeStat(datanodeDetails).get()
.getCapacity().get();
assertEquals(capacity, nodeCapacity);
foundScmUsed = nodeManager.getNodeStat(datanodeDetails).get().getScmUsed()
.get();
assertEquals(expectedScmUsed, foundScmUsed);
foundRemaining = nodeManager.getNodeStat(datanodeDetails).get()
.getRemaining().get();
assertEquals(expectedRemaining, foundRemaining);
// Compare the result from
// NodeManager#getNodeStats and NodeManager#getNodeStat
SCMNodeStat stat1 = nodeManager.getNodeStats().
get(datanodeDetails.getUuid());
SCMNodeStat stat2 = nodeManager.getNodeStat(datanodeDetails).get();
assertEquals(stat1, stat2);
// Wait up to 4s so that the node becomes stale
// Verify the usage info should be unchanged.
GenericTestUtils.waitFor(
() -> nodeManager.getNodeCount(STALE) == 1, 100,
4 * 1000);
assertEquals(nodeCount, nodeManager.getNodeStats().size());
foundCapacity = nodeManager.getNodeStat(datanodeDetails).get()
.getCapacity().get();
assertEquals(capacity, foundCapacity);
foundScmUsed = nodeManager.getNodeStat(datanodeDetails).get()
.getScmUsed().get();
assertEquals(expectedScmUsed, foundScmUsed);
foundRemaining = nodeManager.getNodeStat(datanodeDetails).get().
getRemaining().get();
assertEquals(expectedRemaining, foundRemaining);
// Wait up to 4 more seconds so the node becomes dead
// Verify usage info should be updated.
GenericTestUtils.waitFor(
() -> nodeManager.getNodeCount(DEAD) == 1, 100,
4 * 1000);
assertEquals(0, nodeManager.getNodeStats().size());
foundCapacity = nodeManager.getStats().getCapacity().get();
assertEquals(0, foundCapacity);
foundScmUsed = nodeManager.getStats().getScmUsed().get();
assertEquals(0, foundScmUsed);
foundRemaining = nodeManager.getStats().getRemaining().get();
assertEquals(0, foundRemaining);
nodeManager.processHeartbeat(datanodeDetails);
// Wait up to 5 seconds so that the dead node becomes healthy
// Verify usage info should be updated.
GenericTestUtils.waitFor(
() -> nodeManager.getNodeCount(HEALTHY) == 1,
100, 5 * 1000);
GenericTestUtils.waitFor(
() -> nodeManager.getStats().getScmUsed().get() == expectedScmUsed,
100, 4 * 1000);
assertEquals(nodeCount, nodeManager.getNodeStats().size());
foundCapacity = nodeManager.getNodeStat(datanodeDetails).get()
.getCapacity().get();
assertEquals(capacity, foundCapacity);
foundScmUsed = nodeManager.getNodeStat(datanodeDetails).get().getScmUsed()
.get();
assertEquals(expectedScmUsed, foundScmUsed);
foundRemaining = nodeManager.getNodeStat(datanodeDetails).get()
.getRemaining().get();
assertEquals(expectedRemaining, foundRemaining);
}
}
@Test
public void testHandlingSCMCommandEvent()
throws IOException, AuthenticationException {
OzoneConfiguration conf = getConf();
conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
100, TimeUnit.MILLISECONDS);
DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
UUID dnId = datanodeDetails.getUuid();
String storagePath = testDir.getAbsolutePath() + "/" + dnId;
StorageReportProto report =
TestUtils.createStorageReport(dnId, storagePath, 100, 10, 90, null);
EventQueue eq = new EventQueue();
try (SCMNodeManager nodemanager = createNodeManager(conf)) {
eq.addHandler(DATANODE_COMMAND, nodemanager);
nodemanager
.register(datanodeDetails, TestUtils.createNodeReport(report),
TestUtils.getRandomPipelineReports());
eq.fireEvent(DATANODE_COMMAND,
new CommandForDatanode<>(datanodeDetails.getUuid(),
new CloseContainerCommand(1L,
PipelineID.randomId())));
eq.processAll(1000L);
List<SCMCommand> command =
nodemanager.processHeartbeat(datanodeDetails);
Assert.assertEquals(1, command.size());
Assert
.assertEquals(command.get(0).getClass(), CloseContainerCommand.class);
} catch (IOException e) {
e.printStackTrace();
throw e;
}
}
/**
* Test add node into network topology during node register. Datanode
* uses Ip address to resolve network location.
*/
@Test
public void testScmRegisterNodeWithIpAddress()
throws IOException, InterruptedException, AuthenticationException {
testScmRegisterNodeWithNetworkTopology(false);
}
/**
* Test add node into network topology during node register. Datanode
* uses hostname to resolve network location.
*/
@Test
public void testScmRegisterNodeWithHostname()
throws IOException, InterruptedException, AuthenticationException {
testScmRegisterNodeWithNetworkTopology(true);
}
/**
* Test getNodesByAddress when using IPs.
*
*/
@Test
public void testgetNodesByAddressWithIpAddress()
throws IOException, InterruptedException, AuthenticationException {
testGetNodesByAddress(false);
}
/**
* Test getNodesByAddress when using hostnames.
*/
@Test
public void testgetNodesByAddressWithHostname()
throws IOException, InterruptedException, AuthenticationException {
testGetNodesByAddress(true);
}
/**
* Test add node into a 4-layer network topology during node register.
*/
@Test
public void testScmRegisterNodeWith4LayerNetworkTopology()
throws IOException, InterruptedException, AuthenticationException {
OzoneConfiguration conf = getConf();
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 1000,
MILLISECONDS);
// create table mapping file
String[] hostNames = {"host1", "host2", "host3", "host4"};
String[] ipAddress = {"1.2.3.4", "2.3.4.5", "3.4.5.6", "4.5.6.7"};
String mapFile = this.getClass().getClassLoader()
.getResource("nodegroup-mapping").getPath();
// create and register nodes
conf.set(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
"org.apache.hadoop.net.TableMapping");
conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mapFile);
conf.set(ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE,
"network-topology-nodegroup.xml");
final int nodeCount = hostNames.length;
// use default IP address to resolve node
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
DatanodeDetails[] nodes = new DatanodeDetails[nodeCount];
for (int i = 0; i < nodeCount; i++) {
DatanodeDetails node = TestUtils.createDatanodeDetails(
UUID.randomUUID().toString(), hostNames[i], ipAddress[i], null);
nodeManager.register(node, null, null);
nodes[i] = node;
}
// verify network topology cluster has all the registered nodes
Thread.sleep(4 * 1000);
NetworkTopology clusterMap = scm.getClusterMap();
assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY));
assertEquals(nodeCount, clusterMap.getNumOfLeafNode(""));
assertEquals(4, clusterMap.getMaxLevel());
List<DatanodeDetails> nodeList = nodeManager.getAllNodes();
nodeList.stream().forEach(node ->
Assert.assertTrue(node.getNetworkLocation().startsWith("/rack1/ng")));
}
}
private void testScmRegisterNodeWithNetworkTopology(boolean useHostname)
throws IOException, InterruptedException, AuthenticationException {
OzoneConfiguration conf = getConf();
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 1000,
MILLISECONDS);
// create table mapping file
String[] hostNames = {"host1", "host2", "host3", "host4"};
String[] ipAddress = {"1.2.3.4", "2.3.4.5", "3.4.5.6", "4.5.6.7"};
String mapFile = this.getClass().getClassLoader()
.getResource("rack-mapping").getPath();
// create and register nodes
conf.set(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
"org.apache.hadoop.net.TableMapping");
conf.set(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mapFile);
if (useHostname) {
conf.set(DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME, "true");
}
final int nodeCount = hostNames.length;
// use default IP address to resolve node
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
DatanodeDetails[] nodes = new DatanodeDetails[nodeCount];
for (int i = 0; i < nodeCount; i++) {
DatanodeDetails node = TestUtils.createDatanodeDetails(
UUID.randomUUID().toString(), hostNames[i], ipAddress[i], null);
nodeManager.register(node, null, null);
nodes[i] = node;
}
// verify network topology cluster has all the registered nodes
Thread.sleep(4 * 1000);
NetworkTopology clusterMap = scm.getClusterMap();
assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY));
assertEquals(nodeCount, clusterMap.getNumOfLeafNode(""));
assertEquals(3, clusterMap.getMaxLevel());
List<DatanodeDetails> nodeList = nodeManager.getAllNodes();
nodeList.stream().forEach(node ->
Assert.assertTrue(node.getNetworkLocation().equals("/rack1")));
// test get node
if (useHostname) {
Arrays.stream(hostNames).forEach(hostname ->
Assert.assertNotEquals(0, nodeManager.getNodesByAddress(hostname)
.size()));
} else {
Arrays.stream(ipAddress).forEach(ip ->
Assert.assertNotEquals(0, nodeManager.getNodesByAddress(ip)
.size()));
}
}
}
/**
* Test add node into a 4-layer network topology during node register.
*/
private void testGetNodesByAddress(boolean useHostname)
throws IOException, InterruptedException, AuthenticationException {
OzoneConfiguration conf = getConf();
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 1000,
MILLISECONDS);
// create a set of hosts - note two hosts on "host1"
String[] hostNames = {"host1", "host1", "host2", "host3", "host4"};
String[] ipAddress =
{"1.2.3.4", "1.2.3.4", "2.3.4.5", "3.4.5.6", "4.5.6.7"};
if (useHostname) {
conf.set(DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME, "true");
}
final int nodeCount = hostNames.length;
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
DatanodeDetails[] nodes = new DatanodeDetails[nodeCount];
for (int i = 0; i < nodeCount; i++) {
DatanodeDetails node = TestUtils.createDatanodeDetails(
UUID.randomUUID().toString(), hostNames[i], ipAddress[i], null);
nodeManager.register(node, null, null);
}
// test get node
Assert.assertEquals(0, nodeManager.getNodesByAddress(null).size());
if (useHostname) {
Assert.assertEquals(2,
nodeManager.getNodesByAddress("host1").size());
Assert.assertEquals(1, nodeManager.getNodesByAddress("host2").size());
Assert.assertEquals(0, nodeManager.getNodesByAddress("unknown").size());
} else {
Assert.assertEquals(2,
nodeManager.getNodesByAddress("1.2.3.4").size());
Assert.assertEquals(1, nodeManager.getNodesByAddress("2.3.4.5").size());
Assert.assertEquals(0, nodeManager.getNodesByAddress("1.9.8.7").size());
}
}
}
}