/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.sps.ExternalSPSContext;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;
import com.google.common.base.Supplier;
import java.io.IOException;
import java.util.List;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import static org.junit.Assert.*;
/**
 * Test persistence of satisfy-storage-policy requests: the SPS xAttrs
 * added to files/directories must survive checkpoints and restarts, and
 * must be removed once the requested block movement completes.
 */
public class TestPersistentStoragePolicySatisfier {
private static Configuration conf;
private static MiniDFSCluster cluster;
private static DistributedFileSystem fs;
private NameNodeConnector nnc;
private StoragePolicySatisfier sps;
private ExternalSPSContext ctxt;
private static Path testFile =
new Path("/testFile");
private static String testFileName = testFile.toString();
private static Path parentDir = new Path("/parentDir");
private static Path parentFile = new Path(parentDir, "parentFile");
private static String parentFileName = parentFile.toString();
private static Path childDir = new Path(parentDir, "childDir");
private static Path childFile = new Path(childDir, "childFile");
private static String childFileName = childFile.toString();
private static final String COLD = "COLD";
private static final String WARM = "WARM";
private static final String ONE_SSD = "ONE_SSD";
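// Each datanode exposes one DISK, one ARCHIVE and one SSD volume, so all
// of the policies exercised below (COLD, WARM, ONE_SSD) can be satisfied.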
private static StorageType[][] storageTypes = new StorageType[][] {
{StorageType.DISK, StorageType.ARCHIVE, StorageType.SSD},
{StorageType.DISK, StorageType.ARCHIVE, StorageType.SSD},
{StorageType.DISK, StorageType.ARCHIVE, StorageType.SSD}
};
private final int timeout = 90000;
/**
 * Setup environment for every test case.
 * @throws Exception
 */
public void clusterSetUp() throws Exception {
clusterSetUp(false, new HdfsConfiguration());
}
/**
 * Setup environment for every test case with the given configuration.
 * @param hdfsConf hdfs configuration to use.
 * @throws Exception
 */
public void clusterSetUp(Configuration hdfsConf) throws Exception {
clusterSetUp(false, hdfsConf);
}
/**
 * Setup cluster environment.
 * @param isHAEnabled if true, enable simple HA.
 * @param newConf the configuration to use.
 * @throws Exception
 */
private void clusterSetUp(boolean isHAEnabled, Configuration newConf)
throws Exception {
conf = newConf;
conf.set(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
"3000");
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.EXTERNAL.toString());
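// Run the satisfier outside the NameNode (external mode); the external
// service itself is started explicitly further below.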
// Reduce the refresh interval so the satisfier picks up datanode changes quickly.
conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
1000);
final int dnNumber = storageTypes.length;
final short replication = 3;
MiniDFSCluster.Builder clusterBuilder = new MiniDFSCluster.Builder(conf)
.storageTypes(storageTypes).storagesPerDatanode(3)
.numDataNodes(dnNumber);
if (isHAEnabled) {
clusterBuilder.nnTopology(MiniDFSNNTopology.simpleHATopology());
}
cluster = clusterBuilder.build();
cluster.waitActive();
if (isHAEnabled) {
cluster.transitionToActive(0);
fs = HATestUtil.configureFailoverFs(cluster, conf);
} else {
fs = cluster.getFileSystem();
}
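// Wire up the external satisfier: the NameNodeConnector talks to the
// active NameNode under the mover ID path, and the ExternalSPSContext
// adapts that connection for the StoragePolicySatisfier service.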
nnc = DFSTestUtil.getNameNodeConnector(conf,
HdfsServerConstants.MOVER_ID_PATH, 1, false);
sps = new StoragePolicySatisfier(conf);
ctxt = new ExternalSPSContext(sps, nnc);
sps.init(ctxt);
sps.start(true, StoragePolicySatisfierMode.EXTERNAL);
createTestFiles(fs, replication);
}
/**
 * Setup test files for testing.
 * @param dfs the cluster file system.
 * @param replication the replication factor for each file.
 * @throws Exception
 */
private void createTestFiles(DistributedFileSystem dfs,
short replication) throws Exception {
DFSTestUtil.createFile(dfs, testFile, 1024L, replication, 0L);
DFSTestUtil.createFile(dfs, parentFile, 1024L, replication, 0L);
DFSTestUtil.createFile(dfs, childFile, 1024L, replication, 0L);
DFSTestUtil.waitReplication(dfs, testFile, replication);
DFSTestUtil.waitReplication(dfs, parentFile, replication);
DFSTestUtil.waitReplication(dfs, childFile, replication);
}
/**
 * Tear down the environment after every test case.
 * @throws IOException
 */
private void clusterShutdown() throws IOException {
if (fs != null) {
fs.close();
fs = null;
}
if (cluster != null) {
cluster.shutdown(true);
cluster = null;
}
if (sps != null) {
sps.stopGracefully();
}
}
/**
 * While satisfying a file/directory, trigger a checkpoint to make sure
 * the satisfier persistence works as expected. This test case runs
 * as below:
 * 1. Call satisfyStoragePolicy, which adds the SPS xAttr to the file.
 * 2. Do a checkpoint via the secondary NameNode.
 * 3. Restart the cluster immediately.
 * 4. Make sure all the storage policies are satisfied.
 * @throws Exception
 */
@Test(timeout = 300000)
public void testWithCheckpoint() throws Exception {
try {
clusterSetUp();
fs.setStoragePolicy(testFile, WARM);
fs.satisfyStoragePolicy(testFile);
// Start the checkpoint.
conf.set(
DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, "0.0.0.0:0");
SecondaryNameNode secondary = new SecondaryNameNode(conf);
secondary.doCheckpoint();
restartCluster();
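// WARM keeps one replica on DISK and moves the remaining ones to ARCHIVE.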
DFSTestUtil.waitExpectedStorageType(
testFileName, StorageType.DISK, 1, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
testFileName, StorageType.ARCHIVE, 2, timeout, fs);
fs.setStoragePolicy(parentDir, COLD);
fs.satisfyStoragePolicy(parentDir);
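// COLD moves all replicas to ARCHIVE; satisfying the parent directory
// covers the files in its subtree as well.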
DFSTestUtil.waitExpectedStorageType(
parentFileName, StorageType.ARCHIVE, 3, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
childFileName, StorageType.ARCHIVE, 3, timeout, fs);
} finally {
clusterShutdown();
}
}
/**
 * Tests to verify that satisfier persistence works across multiple
 * cluster restarts. This test case runs as below:
 * 1. Satisfy the storage policy of file1.
 * 2. Restart the cluster.
 * 3. Check whether all the blocks are satisfied.
 * 4. Satisfy the storage policy of file2.
 * 5. Restart the cluster.
 * 6. Check whether all the blocks are satisfied.
 * @throws Exception
 */
@Test(timeout = 300000)
public void testWithRestarts() throws Exception {
try {
clusterSetUp();
fs.setStoragePolicy(testFile, ONE_SSD);
fs.satisfyStoragePolicy(testFile);
restartCluster();
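// ONE_SSD places one replica on SSD and the remaining replicas on DISK.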
DFSTestUtil.waitExpectedStorageType(
testFileName, StorageType.SSD, 1, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
testFileName, StorageType.DISK, 2, timeout, fs);
// test directory
fs.setStoragePolicy(parentDir, COLD);
fs.satisfyStoragePolicy(parentDir);
restartCluster();
DFSTestUtil.waitExpectedStorageType(
parentFileName, StorageType.ARCHIVE, 3, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
childFileName, StorageType.ARCHIVE, 3, timeout, fs);
} finally {
clusterShutdown();
}
}
/**
 * Tests to verify that the SPS xattr is removed once the satisfy work
 * has finished, and that satisfyStoragePolicy can be invoked on the
 * same file again after the block movement has finished:
 * 1. Satisfy the storage policy of file1.
 * 2. Wait until the storage policy is satisfied.
 * 3. Satisfy the storage policy of file1 again.
 * 4. Make sure step 3 works as expected.
 * @throws Exception
 */
@Test(timeout = 300000)
public void testMultipleSatisfyStoragePolicy() throws Exception {
try {
// Lower the block movement recheck interval for testing.
conf = new HdfsConfiguration();
final long minCheckTimeout = 500; // minimum value
conf.setLong(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
minCheckTimeout);
clusterSetUp(conf);
fs.setStoragePolicy(testFile, ONE_SSD);
fs.satisfyStoragePolicy(testFile);
DFSTestUtil.waitExpectedStorageType(
testFileName, StorageType.SSD, 1, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
testFileName, StorageType.DISK, 2, timeout, fs);
// Make sure satisfy xattr has been removed.
DFSTestUtil.waitForXattrRemoved(testFileName,
XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000);
fs.setStoragePolicy(testFile, COLD);
fs.satisfyStoragePolicy(testFile);
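// The repeated request should be honored: COLD moves all three replicas
// to ARCHIVE.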
DFSTestUtil.waitExpectedStorageType(
testFileName, StorageType.ARCHIVE, 3, timeout, fs);
} finally {
clusterShutdown();
}
}
/**
 * Tests to verify that the SPS xattr is removed when SPS is
 * disabled/dropped:
 * 1. Satisfy the storage policy of file1.
 * 2. Drop the SPS thread in the block manager.
 * 3. Make sure the SPS xattr is removed.
 * @throws Exception
 */
@Test(timeout = 300000)
public void testDropSPS() throws Exception {
try {
clusterSetUp();
fs.setStoragePolicy(testFile, ONE_SSD);
fs.satisfyStoragePolicy(testFile);
cluster.getNamesystem().getBlockManager().getSPSManager()
.changeModeEvent(StoragePolicySatisfierMode.NONE);
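// Disabling SPS should make the namenode clean up the pending SPS xattrs.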
// Make sure satisfy xattr has been removed.
DFSTestUtil.waitForXattrRemoved(testFileName,
XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000);
} finally {
clusterShutdown();
}
}
/**
 * Tests that the xattr is cleaned up if all blocks already satisfy
 * the storage policy.
 * @throws Exception
 */
@Test(timeout = 300000)
public void testSPSShouldNotLeakXattrIfStorageAlreadySatisfied()
throws Exception {
try {
clusterSetUp();
DFSTestUtil.waitExpectedStorageType(testFileName, StorageType.DISK, 3,
timeout, fs);
fs.satisfyStoragePolicy(testFile);
DFSTestUtil.waitExpectedStorageType(testFileName, StorageType.DISK, 3,
timeout, fs);
// Make sure satisfy xattr has been removed.
DFSTestUtil.waitForXattrRemoved(testFileName,
XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000);
} finally {
clusterShutdown();
}
}
/**
 * Test loading of SPS xAttrs from the edit log when satisfyStoragePolicy
 * is called on a child file and its parent directory.
 * 1. Create one directory and one child file in it.
 * 2. Set the storage policy for the child file and call
 * satisfyStoragePolicy.
 * 3. Wait for SPS to remove the xAttr from the child file.
 * 4. Set the storage policy for the parent directory and call
 * satisfyStoragePolicy.
 * 5. Restart the namenode.
 * The NameNode should start successfully.
 */
@Test(timeout = 300000)
public void testNameNodeRestartWhenSPSCalledOnChildFileAndParentDir()
throws Exception {
try {
clusterSetUp();
fs.setStoragePolicy(childFile, COLD);
fs.satisfyStoragePolicy(childFile);
DFSTestUtil.waitExpectedStorageType(childFile.toUri().getPath(),
StorageType.ARCHIVE, 3, 30000, cluster.getFileSystem());
// wait for SPS to remove the xAttr from the child file
DFSTestUtil.waitForXattrRemoved(childFileName,
XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000);
fs.setStoragePolicy(childDir, COLD);
fs.satisfyStoragePolicy(childDir);
try {
cluster.restartNameNodes();
} catch (Exception e) {
assertFalse(e.getMessage().contains(
"Cannot request to call satisfy storage policy"));
}
} finally {
clusterShutdown();
}
}
/**
 * Test SPS when satisfyStoragePolicy is called on a child directory and
 * its parent directory.
 * 1. Create a parent directory and a child directory.
 * 2. Create a file in each directory.
 * 3. Set the storage policy on the parent directory and call
 * satisfyStoragePolicy on the child directory.
 * 4. Call satisfyStoragePolicy on the parent directory.
 * All the file blocks should satisfy the policy.
 */
@Test(timeout = 300000)
public void testSPSOnChildAndParentDirectory() throws Exception {
try {
clusterSetUp();
fs.setStoragePolicy(parentDir, COLD);
fs.satisfyStoragePolicy(childDir);
DFSTestUtil.waitExpectedStorageType(childFileName, StorageType.ARCHIVE,
3, 30000, cluster.getFileSystem());
fs.satisfyStoragePolicy(parentDir);
DFSTestUtil.waitExpectedStorageType(parentFileName, StorageType.ARCHIVE,
3, 30000, cluster.getFileSystem());
} finally {
clusterShutdown();
}
}
/**
 * Test the SPS xAttr on a directory. The xAttr should be removed from the
 * directory once all the files' blocks have moved to the specified storage.
 */
@Test(timeout = 300000)
public void testSPSxAttrWhenSpsCalledForDir() throws Exception {
try {
clusterSetUp();
Path parent = new Path("/parent");
// create parent dir
fs.mkdirs(parent);
// create 5 child files
for (int i = 0; i < 5; i++) {
DFSTestUtil.createFile(fs, new Path(parent, "f" + i), 1024, (short) 3,
0);
}
// Set storage policy for parent directory
fs.setStoragePolicy(parent, COLD);
// Stop one DN so we can check the SPS xAttr for directory.
DataNodeProperties stopDataNode = cluster.stopDataNode(0);
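// With a datanode down the movement cannot complete yet, so the xAttr
// should remain on the directory.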
fs.satisfyStoragePolicy(parent);
// Check xAttr for parent directory
FSNamesystem namesystem = cluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode("/parent");
XAttrFeature f = inode.getXAttrFeature();
assertTrue("SPS xAttr should be exist",
f.getXAttr(XATTR_SATISFY_STORAGE_POLICY) != null);
// check for the child, SPS xAttr should not be there
for (int i = 0; i < 5; i++) {
inode = namesystem.getFSDirectory().getINode("/parent/f" + i);
f = inode.getXAttrFeature();
assertNull("SPS xAttr should not be present on the child file", f);
}
cluster.restartDataNode(stopDataNode, false);
// wait and check that all the file blocks moved to ARCHIVE
for (int i = 0; i < 5; i++) {
DFSTestUtil.waitExpectedStorageType("/parent/f" + i,
StorageType.ARCHIVE, 3, 30000, cluster.getFileSystem());
}
DFSTestUtil.waitForXattrRemoved("/parent", XATTR_SATISFY_STORAGE_POLICY,
namesystem, 10000);
} finally {
clusterShutdown();
}
}
/**
 * Test the SPS xAttr on a file. The xAttr should be removed from the file
 * once all the blocks have moved to the specified storage.
 */
@Test(timeout = 300000)
public void testSPSxAttrWhenSpsCalledForFile() throws Exception {
try {
clusterSetUp();
Path file = new Path("/file");
DFSTestUtil.createFile(fs, file, 1024, (short) 3, 0);
// Set storage policy for file
fs.setStoragePolicy(file, COLD);
// Stop one DN so we can check the SPS xAttr for file.
DataNodeProperties stopDataNode = cluster.stopDataNode(0);
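// With a datanode down the movement cannot complete yet, so the xAttr
// should remain on the file.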
fs.satisfyStoragePolicy(file);
// Check the xAttr on the file
FSNamesystem namesystem = cluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode("/file");
XAttrFeature f = inode.getXAttrFeature();
assertTrue("SPS xAttr should be exist",
f.getXAttr(XATTR_SATISFY_STORAGE_POLICY) != null);
cluster.restartDataNode(stopDataNode, false);
// wait and check that all the file blocks moved to ARCHIVE
DFSTestUtil.waitExpectedStorageType("/file", StorageType.ARCHIVE, 3,
30000, cluster.getFileSystem());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
return !existingXAttrs.contains(XATTR_SATISFY_STORAGE_POLICY);
}
}, 100, 10000);
} finally {
clusterShutdown();
}
}
/**
 * Restart the whole cluster and trigger the DataNodes' heartbeats.
 * @throws Exception
 */
private void restartCluster() throws Exception {
cluster.restartDataNodes();
cluster.restartNameNodes();
cluster.waitActive();
cluster.triggerHeartbeats();
}
}