blob: 0f3e7d12d220fdc31f853597488ef3341184193a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.statemachine
.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode
.InitDatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode
.RunningDatanodeState;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_HEARTBEAT_RPC_TIMEOUT;
import static org.junit.Assert.assertTrue;
/**
* Tests the datanode state machine class and its states.
*/
public class TestDatanodeStateMachine {
// Logger used by the fixture and the tests in this class.
private static final Logger LOG =
LoggerFactory.getLogger(TestDatanodeStateMachine.class);
// Number of mock SCM RPC servers started in setUp(). Kept at 1 because the
// current code checks for multiple SCM directories and fails if they exist.
private final int scmServerCount = 1;
// "host:port" strings of the mock SCM servers; setUp() publishes them to the
// configuration under ScmConfigKeys.OZONE_SCM_NAMES.
private List<String> serverAddresses;
// RPC servers started in setUp() and stopped in tearDown().
private List<RPC.Server> scmServers;
// Mock SCM implementations backing scmServers; the tests read their RPC and
// heartbeat counters.
private List<ScmTestMock> mockServers;
// Cached pool of daemon threads on which the tests execute state tasks.
private ExecutorService executorService;
// Configuration built in setUp() and handed to each DatanodeStateMachine.
private Configuration conf;
// Root of the temporary directory tree for a test; deleted in tearDown().
private File testRoot;
/**
 * Prepares the fixture for a test: configuration, mock SCM RPC servers,
 * temporary directories and the executor used to run state tasks.
 *
 * @throws Exception if any part of the fixture cannot be created.
 */
@Before
public void setUp() throws Exception {
  // Base configuration with a 500 ms heartbeat RPC timeout and the
  // random-port switches for container IPC and Ratis IPC turned on.
  conf = SCMTestUtils.getConf();
  conf.setTimeDuration(OZONE_SCM_HEARTBEAT_RPC_TIMEOUT, 500,
      TimeUnit.MILLISECONDS);
  conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, true);
  conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, true);

  // Start scmServerCount mock SCM servers on the loopback address and
  // remember where each one listens.
  serverAddresses = new ArrayList<>();
  scmServers = new ArrayList<>();
  mockServers = new ArrayList<>();
  int started = 0;
  while (started < scmServerCount) {
    final int scmPort = SCMTestUtils.getReuseableAddress().getPort();
    final String host = "127.0.0.1";
    serverAddresses.add(host + ":" + scmPort);
    final ScmTestMock scmMock = new ScmTestMock();
    final InetSocketAddress bindAddress =
        new InetSocketAddress(host, scmPort);
    final RPC.Server rpcServer =
        SCMTestUtils.startScmRpcServer(conf, scmMock, bindAddress, 10);
    scmServers.add(rpcServer);
    mockServers.add(scmMock);
    started++;
  }
  final String[] scmNames = serverAddresses.toArray(new String[0]);
  conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, scmNames);

  // Every directory used by the test lives below a per-class temp root.
  final String rootPath = GenericTestUtils
      .getTempPath(TestDatanodeStateMachine.class.getSimpleName());
  testRoot = new File(rootPath);
  final boolean rootCreated = testRoot.mkdirs();
  if (!rootCreated) {
    LOG.info("Required directories {} already exist.", testRoot);
  }

  final File dataDir = new File(testRoot, "data");
  conf.set(HDDS_DATANODE_DIR_KEY, dataDir.getAbsolutePath());
  final boolean dataDirCreated = dataDir.mkdirs();
  if (!dataDirCreated) {
    LOG.info("Data dir create failed.");
  }

  final File metadataDir = new File(testRoot, "scm");
  conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
      metadataDir.getAbsolutePath());

  final File datanodeIdDir = new File(testRoot, "datanodeID");
  conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ID_DIR,
      datanodeIdDir.getAbsolutePath());

  // Daemon threads, so a stuck task cannot keep the JVM alive.
  final ThreadFactoryBuilder threadFactoryBuilder =
      new ThreadFactoryBuilder()
          .setDaemon(true)
          .setNameFormat("Test Data Node State Machine Thread - %d");
  executorService =
      HadoopExecutors.newCachedThreadPool(threadFactoryBuilder.build());
}
/**
 * Best-effort cleanup of everything created by {@link #setUp()}: shuts down
 * the executor, stops every mock SCM RPC server and deletes the temporary
 * directory tree.
 * <p>
 * Cleanup failures are logged rather than rethrown so that they never mask
 * the outcome of the test itself. Each step tolerates a partially
 * initialised fixture (for example when setUp() failed half way).
 *
 * @throws Exception declared for signature compatibility; cleanup failures
 *                   are logged and not propagated.
 */
@After
public void tearDown() throws Exception {
  boolean interrupted = false;
  try {
    if (executorService != null) {
      executorService.shutdown();
      try {
        if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
          // Graceful shutdown timed out; cancel whatever is still running.
          executorService.shutdownNow();
          if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
            LOG.error("Unable to shutdown properly.");
          }
        }
      } catch (InterruptedException e) {
        LOG.error("Error attempting to shutdown.", e);
        executorService.shutdownNow();
        // Remember the interrupt; it is restored once cleanup is complete
        // so that the remaining steps are not disturbed by it.
        interrupted = true;
      }
    }
    if (scmServers != null) {
      for (RPC.Server server : scmServers) {
        // Stop servers independently: one failure must not leak the rest.
        try {
          server.stop();
        } catch (Exception e) {
          LOG.warn("Failed to stop a mock SCM RPC server.", e);
        }
      }
    }
  } catch (Exception e) {
    // Shutdown problems are deliberately non-fatal, but must be visible.
    LOG.warn("Ignoring exception raised while shutting down the test.", e);
  } finally {
    if (testRoot != null) {
      FileUtil.fullyDelete(testRoot);
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * Starts the state machine daemon, waits until the connection manager
 * reports one endpoint per mock SCM server started in setUp(), then stops
 * the daemon and asserts that it reports itself as stopped.
 *
 * @throws IOException if the state machine cannot be created or closed.
 * @throws InterruptedException if the wait for the endpoints is interrupted.
 * @throws TimeoutException if the endpoints do not appear within 30 seconds.
 */
@Test
public void testStartStopDatanodeStateMachine() throws IOException,
    InterruptedException, TimeoutException {
  try (DatanodeStateMachine stateMachine =
      new DatanodeStateMachine(getNewDatanodeDetails(), conf, null, null)) {
    stateMachine.startDaemon();
    SCMConnectionManager connectionManager =
        stateMachine.getConnectionManager();
    // Poll once a second, for up to 30 seconds. The expected count follows
    // scmServerCount instead of a literal so the two cannot drift apart.
    GenericTestUtils.waitFor(
        () -> {
          int size = connectionManager.getValues().size();
          LOG.info("connectionManager.getValues().size() is {}", size);
          return size == scmServerCount;
        }, 1000, 30000);
    stateMachine.stopDaemon();
    assertTrue(stateMachine.isDaemonStopped());
  }
}
/**
 * Drives the datanode state machine by hand, one task at a time, in the
 * order the state machine itself would run them, and checks the observable
 * result of every step.
 * <p>
 * Sequence under test:
 * <ol>
 * <li>A new state machine is in the INIT state and hands out an
 * InitDatanodeState task.</li>
 * <li>Executing that task yields RUNNING and leaves every endpoint in the
 * GETVERSION state.</li>
 * <li>The first running task brings every endpoint to REGISTER with a
 * non-null version; each mock SCM has served exactly one RPC.</li>
 * <li>The second running task brings each mock SCM to two RPCs.</li>
 * <li>The third running task results in exactly one heartbeat per mock
 * SCM.</li>
 * </ol>
 *
 * @throws IOException if the datanode ID file or state machine fails.
 * @throws InterruptedException if waiting on a task is interrupted.
 * @throws ExecutionException if a state task fails.
 * @throws TimeoutException if a task or endpoint transition takes too long.
 */
@Test
public void testDatanodeStateContext() throws IOException,
    InterruptedException, ExecutionException, TimeoutException {
  // No mini cluster runs here, so write an ID file by hand to give the
  // state machine a fake datanode ID to load.
  final String idDir = conf.get(ScmConfigKeys.OZONE_SCM_DATANODE_ID_DIR);
  final File idFile =
      new File(idDir, OzoneConsts.OZONE_SCM_DATANODE_ID_FILE_DEFAULT);
  idFile.delete();
  final DatanodeDetails details = getNewDatanodeDetails();
  details.setPort(DatanodeDetails.newPort(
      DatanodeDetails.Port.Name.STANDALONE,
      OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT));
  ContainerUtils.writeDatanodeDetailsTo(details, idFile);

  try (DatanodeStateMachine machine =
      new DatanodeStateMachine(details, conf, null, null)) {
    // Step 1: fresh machine is in INIT and offers the init task.
    Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
        machine.getContext().getState());
    DatanodeState<DatanodeStateMachine.DatanodeStates> stateTask =
        machine.getContext().getTask();
    Assert.assertEquals(InitDatanodeState.class, stateTask.getClass());

    // Step 2: run the init task; endpoints exist and await GETVERSION.
    stateTask.execute(executorService);
    DatanodeStateMachine.DatanodeStates reached =
        stateTask.await(2, TimeUnit.SECONDS);
    machine.getConnectionManager().getValues().forEach(ep ->
        Assert.assertEquals(EndpointStateMachine.EndPointStates.GETVERSION,
            ep.getState()));
    // Endpoints are created, so the datanode is ready to issue RPCs to the
    // SCMs and must report RUNNING.
    Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING, reached);

    // context.execute would have applied the new state for us; because each
    // state is invoked directly here, it has to be set explicitly.
    machine.getContext().setState(reached);
    stateTask = machine.getContext().getTask();
    Assert.assertEquals(RunningDatanodeState.class, stateTask.getClass());

    // Step 3: this run issues getVersion against every known SCM endpoint.
    stateTask.execute(executorService);
    reached = stateTask.await(10, TimeUnit.SECONDS);
    // Wait for those getVersion calls to complete, i.e. for every endpoint
    // to have advanced to REGISTER.
    GenericTestUtils.waitFor(
        () -> machine.getConnectionManager().getValues().stream()
            .allMatch(ep -> ep.getState()
                == EndpointStateMachine.EndPointStates.REGISTER),
        1000, 50000);
    // The datanode itself stays in RUNNING.
    Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING, reached);
    machine.getConnectionManager().getValues().forEach(ep -> {
      // getVersion has completed, so the endpoint is in REGISTER ...
      Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER,
          ep.getState());
      // ... and holds the version it received from its SCM server.
      Assert.assertNotNull(ep.getVersion());
    });
    // Every mock server has served exactly one RPC so far.
    for (ScmTestMock scm : mockServers) {
      Assert.assertEquals(1, scm.getRpcCount());
    }

    // Step 4: still the running task, but it dispatches on endpoint state,
    // so this run performs the Register call at the endpoint RPC level.
    stateTask = machine.getContext().getTask();
    stateTask.execute(executorService);
    reached = stateTask.await(2, TimeUnit.SECONDS);
    Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING, reached);
    for (ScmTestMock scm : mockServers) {
      Assert.assertEquals(2, scm.getRpcCount());
    }

    // Step 5: registration is done, so the next run is a HeartbeatTask at
    // the endpoint RPC level.
    stateTask = machine.getContext().getTask();
    stateTask.execute(executorService);
    reached = stateTask.await(2, TimeUnit.SECONDS);
    Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING, reached);
    for (ScmTestMock scm : mockServers) {
      Assert.assertEquals(1, scm.getHeartbeatCount());
    }
  }
}
/**
 * Verifies that the init task moves the datanode to SHUTDOWN when the
 * datanode ID file cannot be written because its parent directory is
 * read-only.
 * <p>
 * The directory is always made writable again, even when an assertion
 * fails, so that tearDown() can delete the test directory tree.
 *
 * @throws Exception if the state machine or the init task fails
 *                   unexpectedly.
 */
@Test
public void testDatanodeStateMachineWithIdWriteFail() throws Exception {
  File idPath = new File(
      conf.get(ScmConfigKeys.OZONE_SCM_DATANODE_ID_DIR),
      OzoneConsts.OZONE_SCM_DATANODE_ID_FILE_DEFAULT);
  idPath.delete();
  DatanodeDetails datanodeDetails = getNewDatanodeDetails();
  DatanodeDetails.Port port = DatanodeDetails.newPort(
      DatanodeDetails.Port.Name.STANDALONE,
      OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
  datanodeDetails.setPort(port);
  File idDir = idPath.getParentFile();

  try (DatanodeStateMachine stateMachine =
      new DatanodeStateMachine(datanodeDetails, conf, null, null)) {
    DatanodeStateMachine.DatanodeStates currentState =
        stateMachine.getContext().getState();
    Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
        currentState);
    DatanodeState<DatanodeStateMachine.DatanodeStates> task =
        stateMachine.getContext().getTask();
    Assert.assertEquals(InitDatanodeState.class, task.getClass());

    // Make the directory that should receive the ID file read-only, so the
    // state machine fails to write the datanode ID file.
    idDir.mkdirs();
    try {
      // Without a read-only directory the scenario under test never
      // happens; fail with a clear message instead of a confusing state
      // mismatch further down.
      assertTrue("Could not make " + idDir + " read-only",
          idDir.setReadOnly());
      task.execute(executorService);
      DatanodeStateMachine.DatanodeStates newState =
          task.await(2, TimeUnit.SECONDS);
      // Writing the ID file fails, which must drive the state to SHUTDOWN.
      Assert.assertEquals(DatanodeStateMachine.DatanodeStates.SHUTDOWN,
          newState);
    } finally {
      // Restore write permission on every path out of the test.
      idDir.setWritable(true);
    }
  }
}
/**
 * Runs the init task once per invalid configuration entry and verifies that
 * the state machine transitions to SHUTDOWN every time.
 * <p>
 * Each entry is applied on top of a private copy of the shared test
 * configuration, so the cases do not influence one another.
 *
 * @throws Exception if creating the state machine or running the init task
 *                   fails; the exception propagates with its original stack
 *                   trace so the failing case can be diagnosed.
 */
@Test
public void testDatanodeStateMachineWithInvalidConfiguration()
    throws Exception {
  List<Map.Entry<String, String>> confList = new ArrayList<>();

  // Invalid ozone.scm.names
  // Empty
  confList.add(Maps.immutableEntry(
      ScmConfigKeys.OZONE_SCM_NAMES, ""));
  // Invalid schema
  confList.add(Maps.immutableEntry(
      ScmConfigKeys.OZONE_SCM_NAMES, "x..y"));
  // Invalid port
  confList.add(Maps.immutableEntry(
      ScmConfigKeys.OZONE_SCM_NAMES, "scm:xyz"));
  // Port out of range
  confList.add(Maps.immutableEntry(
      ScmConfigKeys.OZONE_SCM_NAMES, "scm:123456"));

  // Invalid ozone.scm.datanode.id.dir
  // Empty
  confList.add(Maps.immutableEntry(
      ScmConfigKeys.OZONE_SCM_DATANODE_ID_DIR, ""));

  // A plain loop (rather than forEach with a lambda) lets checked
  // exceptions reach the test runner instead of being replaced by a bare
  // Assert.fail without a cause.
  for (Map.Entry<String, String> entry : confList) {
    Configuration perTestConf = new Configuration(conf);
    perTestConf.setStrings(entry.getKey(), entry.getValue());
    LOG.info("Test with {} = {}", entry.getKey(), entry.getValue());
    try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(
        getNewDatanodeDetails(), perTestConf, null, null)) {
      DatanodeStateMachine.DatanodeStates currentState =
          stateMachine.getContext().getState();
      Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
          currentState);
      DatanodeState<DatanodeStateMachine.DatanodeStates> task =
          stateMachine.getContext().getTask();
      task.execute(executorService);
      DatanodeStateMachine.DatanodeStates newState =
          task.await(2, TimeUnit.SECONDS);
      // Name the offending entry so a failure identifies the case.
      Assert.assertEquals(
          "Unexpected state for " + entry.getKey() + " = '"
              + entry.getValue() + "'",
          DatanodeStateMachine.DatanodeStates.SHUTDOWN, newState);
    }
  }
}
/**
 * Builds a DatanodeDetails for a test: a random UUID, host name
 * "localhost", IP address 127.0.0.1, and STANDALONE, RATIS and REST ports
 * that are all set to 0.
 *
 * @return a newly built DatanodeDetails instance.
 */
private DatanodeDetails getNewDatanodeDetails() {
  final String uuid = UUID.randomUUID().toString();
  return DatanodeDetails.newBuilder()
      .setUuid(uuid)
      .setHostName("localhost")
      .setIpAddress("127.0.0.1")
      .addPort(DatanodeDetails.newPort(
          DatanodeDetails.Port.Name.STANDALONE, 0))
      .addPort(DatanodeDetails.newPort(
          DatanodeDetails.Port.Name.RATIS, 0))
      .addPort(DatanodeDetails.newPort(
          DatanodeDetails.Port.Name.REST, 0))
      .build();
}
}