/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone;

import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.test.TestGenericTestUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomUtils;
import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port;
import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.yaml.snakeyaml.Yaml;

/**
* Test cases for mini ozone cluster.
*/
public class TestMiniOzoneCluster {
/**
* Set a timeout for each test.
*/
@Rule
public Timeout timeout = new Timeout(300000);
private MiniOzoneCluster cluster;
private static OzoneConfiguration conf;
private static final File TEST_ROOT = TestGenericTestUtils.getTestDir();
private static final File WRITE_TMP = new File(TEST_ROOT, "write");
private static final File READ_TMP = new File(TEST_ROOT, "read");

@BeforeClass
public static void setup() {
conf = new OzoneConfiguration();
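// Shared test configuration: metadata under the test root, a single
// pipeline per datanode, and a random Ratis IPC port for the containers.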
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, TEST_ROOT.toString());
conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1);
conf.setBoolean(DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, true);
WRITE_TMP.mkdirs();
READ_TMP.mkdirs();
}

@After
public void cleanup() {
if (cluster != null) {
cluster.shutdown();
}
}

@AfterClass
public static void afterClass() {
FileUtils.deleteQuietly(WRITE_TMP);
FileUtils.deleteQuietly(READ_TMP);
}

@Test(timeout = 60000)
public void testStartMultipleDatanodes() throws Exception {
final int numberOfNodes = 3;
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(numberOfNodes)
.build();
cluster.waitForClusterToBeReady();
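// The builder should have started exactly the requested number of datanodes.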
List<HddsDatanodeService> datanodes = cluster.getHddsDatanodes();
assertEquals(numberOfNodes, datanodes.size());
for (HddsDatanodeService dn : datanodes) {
// Create a single-member pipeline for this datanode
List<DatanodeDetails> dns = new ArrayList<>();
dns.add(dn.getDatanodeDetails());
Pipeline pipeline = Pipeline.newBuilder()
.setState(Pipeline.PipelineState.OPEN)
.setId(PipelineID.randomId())
.setType(HddsProtos.ReplicationType.STAND_ALONE)
.setFactor(HddsProtos.ReplicationFactor.ONE)
.setNodes(dns)
.build();
// Verify client is able to connect to the container
try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf)) {
client.connect();
assertTrue(client.isConnected(pipeline.getFirstNode()));
}
}
}

@Test
public void testDatanodeIDPersistent() throws Exception {
// Generate IDs for testing
DatanodeDetails id1 = randomDatanodeDetails();
DatanodeDetails id2 = randomDatanodeDetails();
DatanodeDetails id3 = randomDatanodeDetails();
id1.setPort(DatanodeDetails.newPort(Port.Name.STANDALONE, 1));
id2.setPort(DatanodeDetails.newPort(Port.Name.STANDALONE, 2));
id3.setPort(DatanodeDetails.newPort(Port.Name.STANDALONE, 3));
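// Assign each ID a distinct STANDALONE port so the generated details are unique.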
// Add certificate serial id.
String certSerialId = String.valueOf(RandomUtils.nextLong());
id1.setCertSerialId(certSerialId);
// Write a single ID to the file and read it out
File validIdsFile = new File(WRITE_TMP, "valid-values.id");
validIdsFile.delete();
ContainerUtils.writeDatanodeDetailsTo(id1, validIdsFile);
// Validate using yaml parser
Yaml yaml = new Yaml();
try (FileReader reader = new FileReader(validIdsFile)) {
yaml.load(reader);
} catch (Exception e) {
Assert.fail("Failed parsing datanode id yaml.");
}
DatanodeDetails validId = ContainerUtils.readDatanodeDetailsFrom(
validIdsFile);
assertEquals(certSerialId, validId.getCertSerialId());
assertEquals(id1, validId);
assertEquals(id1.getProtoBufMessage(), validId.getProtoBufMessage());
// Read should fail with an IOException if the file doesn't exist
File nonExistFile = new File(READ_TMP, "non_exist.id");
nonExistFile.delete();
try {
ContainerUtils.readDatanodeDetailsFrom(nonExistFile);
Assert.fail("Reading a non-existent ID file should fail");
} catch (Exception e) {
assertTrue(e instanceof IOException);
}
// Read should fail if the file is malformed
File malformedFile = new File(READ_TMP, "malformed.id");
createMalformedIDFile(malformedFile);
try {
ContainerUtils.readDatanodeDetailsFrom(malformedFile);
fail("Read a malformed ID file should fail");
} catch (Exception e) {
assertTrue(e instanceof IOException);
}
// Test upgrade scenario - protobuf file instead of yaml
File protoFile = new File(WRITE_TMP, "valid-proto.id");
try (FileOutputStream out = new FileOutputStream(protoFile)) {
HddsProtos.DatanodeDetailsProto proto = id1.getProtoBufMessage();
proto.writeTo(out);
}
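// The reader is expected to fall back to the legacy protobuf format here.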
validId = ContainerUtils.readDatanodeDetailsFrom(protoFile);
assertEquals(certSerialId, validId.getCertSerialId());
assertEquals(id1, validId);
assertEquals(id1.getProtoBufMessage(), validId.getProtoBufMessage());
}

@Test
public void testContainerRandomPort() throws IOException {
OzoneConfiguration ozoneConf = SCMTestUtils.getConf();
File testDir = PathUtils.getTestDir(TestOzoneContainer.class);
ozoneConf.set(DFSConfigKeysLegacy.DFS_DATANODE_DATA_DIR_KEY,
testDir.getAbsolutePath());
ozoneConf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
TEST_ROOT.toString());
// Each instance of the DatanodeStateMachine will create an ozone container
// that binds to a random port.
ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, true);
ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
true);
List<DatanodeStateMachine> stateMachines = new ArrayList<>();
try {
for (int i = 0; i < 3; i++) {
stateMachines.add(new DatanodeStateMachine(
randomDatanodeDetails(), ozoneConf, null, null));
}
// We need to start all the servers to get the actual bound ports.
for (DatanodeStateMachine dsm : stateMachines) {
dsm.getContainer().getReadChannel().start();
dsm.getContainer().getWriteChannel().start();
}
for (DatanodeStateMachine dsm : stateMachines) {
dsm.getContainer().getWriteChannel().stop();
dsm.getContainer().getReadChannel().stop();
}
// After the start, the real port numbers should be available and unique.
HashSet<Integer> ports = new HashSet<>();
for (DatanodeStateMachine dsm : stateMachines) {
int readPort = dsm.getContainer().getReadChannel().getIPCPort();
assertNotEquals("Port number of the service is not updated", 0,
readPort);
assertTrue("Port of datanode service is conflicted with other server.",
ports.add(readPort));
int writePort = dsm.getContainer().getWriteChannel().getIPCPort();
assertNotEquals("Port number of the service is not updated", 0,
writePort);
assertTrue("Port of datanode service is conflicted with other server.",
ports.add(writePort));
}
} finally {
for (DatanodeStateMachine dsm : stateMachines) {
dsm.close();
}
}
// Turn off the random port flag and test again
ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
try (
DatanodeStateMachine sm1 = new DatanodeStateMachine(
randomDatanodeDetails(), ozoneConf, null, null);
DatanodeStateMachine sm2 = new DatanodeStateMachine(
randomDatanodeDetails(), ozoneConf, null, null);
DatanodeStateMachine sm3 = new DatanodeStateMachine(
randomDatanodeDetails(), ozoneConf, null, null);
) {
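// With the random port flag off, every datanode falls back to the same
// configured default port, so only the first add to the set succeeds.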
HashSet<Integer> ports = new HashSet<>();
assertTrue(ports.add(sm1.getContainer().getReadChannel().getIPCPort()));
assertFalse(ports.add(sm2.getContainer().getReadChannel().getIPCPort()));
assertFalse(ports.add(sm3.getContainer().getReadChannel().getIPCPort()));
assertEquals(ports.iterator().next().intValue(),
conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT));
}
}

private void createMalformedIDFile(File malformedFile)
throws IOException {
malformedFile.delete();
DatanodeDetails id = randomDatanodeDetails();
ContainerUtils.writeDatanodeDetailsTo(id, malformedFile);
// Overwrite the valid yaml content with garbage so that parsing fails.
try (FileOutputStream out = new FileOutputStream(malformedFile)) {
out.write("malformed".getBytes());
}
}

/**
* Test that a DN can register with SCM even if it was started before the SCM.
* @throws Exception
*/
@Test(timeout = 100000)
public void testDNstartAfterSCM() throws Exception {
// Start a cluster with 3 DN
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(3)
.build();
cluster.waitForClusterToBeReady();
// Stop the SCM
StorageContainerManager scm = cluster.getStorageContainerManager();
scm.stop();
// Restart DN
cluster.restartHddsDatanode(0, false);
// The DN should stay in the GETVERSION state until the SCM is restarted.
// Check the DN endpoint state once a second for 20 seconds.
DatanodeStateMachine dnStateMachine = cluster.getHddsDatanodes().get(0)
.getDatanodeStateMachine();
for (int i = 0; i < 20; i++) {
for (EndpointStateMachine endpoint :
dnStateMachine.getConnectionManager().getValues()) {
Assert.assertEquals(
EndpointStateMachine.EndPointStates.GETVERSION,
endpoint.getState());
}
Thread.sleep(1000);
}
// DN should successfully register with the SCM after SCM is restarted.
// Restart the SCM
cluster.restartStorageContainerManager(true);
// Wait for DN to register
cluster.waitForClusterToBeReady();
// DN should be in HEARTBEAT state after registering with the SCM
for (EndpointStateMachine endpoint :
dnStateMachine.getConnectionManager().getValues()) {
Assert.assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT,
endpoint.getState());
}
}

/**
* Test that multiple datanode directories are created in MiniOzoneCluster.
* @throws Exception
*/
@Test(timeout = 60000)
public void testMultipleDataDirs() throws Exception {
// Start a cluster with 3 DN
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(1)
.setNumDataVolumes(3)
.build();
cluster.waitForClusterToBeReady();
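// The single datanode should expose all three configured data volumes.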
Assert.assertEquals(3, cluster.getHddsDatanodes().get(0)
.getDatanodeStateMachine().getContainer().getVolumeSet()
.getVolumesList().size());
}
}