blob: ec565c36d874061cc838f383c72100fc2b256eb5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.fedbalance;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.tools.fedbalance.DistCpProcedure.Stage;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceJob;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure.RetryException;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Random;

import static junit.framework.TestCase.assertTrue;
import static org.apache.hadoop.test.GenericTestUtils.getMethodName;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.CURRENT_SNAPSHOT_NAME;
import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.LAST_SNAPSHOT_NAME;
import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.TrashOption;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
/**
 * Test DistCpProcedure.
 *
 * <p>All tests share one {@link MiniDFSCluster} started in
 * {@link #beforeClass()}. Each test works under its own directory whose name
 * is derived from the test method name.
 */
public class TestDistCpProcedure {
  private static MiniDFSCluster cluster;
  private static Configuration conf;
  static final String MOUNT = "mock_mount_point";
  private static final String SRCDAT = "srcdat";
  private static final String DSTDAT = "dstdat";
  private static final long BLOCK_SIZE = 1024;
  private static final long FILE_SIZE = BLOCK_SIZE * 100;
  /** Exclusive upper bound, in ms, of the random sleep in testShutdown. */
  private static final int MAX_SHUTDOWN_SLEEP_MS = 10000;
  /** Source tree: directories srcdat and srcdat/b, files srcdat/a and srcdat/b/c. */
  private final FileEntry[] srcfiles = {
      new FileEntry(SRCDAT, true),
      new FileEntry(SRCDAT + "/a", false),
      new FileEntry(SRCDAT + "/b", true),
      new FileEntry(SRCDAT + "/b/c", false)};
  private static String nnUri;

  @BeforeClass
  public static void beforeClass() throws IOException {
    DistCpProcedure.enabledForTest = true;
    conf = new Configuration();
    // Presumably the minimum block size must be lowered for the 1 KB
    // BLOCK_SIZE to be accepted by the NameNode.
    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
    cluster.waitActive();
    // The scheduler journal is stored inside the mini cluster.
    String workPath =
        "hdfs://" + cluster.getNameNode().getHostAndPort() + "/procedure";
    conf.set(SCHEDULER_JOURNAL_URI, workPath);
    nnUri = FileSystem.getDefaultUri(conf).toString();
  }

  @AfterClass
  public static void afterClass() {
    DistCpProcedure.enabledForTest = false;
    if (cluster != null) {
      cluster.shutdown();
    }
  }

  @Test(timeout = 30000)
  public void testSuccessfulDistCpProcedure() throws Exception {
    String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
    DistributedFileSystem fs = getFileSystem();
    createFiles(fs, testRoot, srcfiles);
    Path src = new Path(testRoot, SRCDAT);
    Path dst = new Path(testRoot, DSTDAT);
    // Octal 0777 (rwxrwxrwx). The former decimal literal 777 meant 01411.
    FsPermission originalPerm = new FsPermission((short) 0777);
    fs.setPermission(src, originalPerm);
    FedBalanceContext context = buildContext(src, dst, MOUNT);
    DistCpProcedure dcProcedure = newProcedure(context);
    BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(conf);
    scheduler.init(true);
    try {
      BalanceJob balanceJob =
          new BalanceJob.Builder<>().nextProcedure(dcProcedure).build();
      scheduler.submit(balanceJob);
      scheduler.waitUntilDone(balanceJob);
      assertTrue(balanceJob.isJobDone());
      // Rethrow the job's own failure so the test reports the real cause.
      if (balanceJob.getError() != null) {
        throw balanceJob.getError();
      }
      assertNull(balanceJob.getError());
    } finally {
      // Stop the scheduler's workers so they do not outlive this test.
      scheduler.shutDown();
    }
    assertTrue(fs.exists(dst));
    assertFalse(
        fs.exists(new Path(context.getSrc(), HdfsConstants.DOT_SNAPSHOT_DIR)));
    assertFalse(
        fs.exists(new Path(context.getDst(), HdfsConstants.DOT_SNAPSHOT_DIR)));
    assertEquals(originalPerm, fs.getFileStatus(dst).getPermission());
    assertEquals(0, fs.getFileStatus(src).getPermission().toShort());
    for (FileEntry e : srcfiles) { // verify file len.
      if (!e.isDirectory()) {
        Path targetFile =
            new Path(testRoot, e.getPath().replace(SRCDAT, DSTDAT));
        assertEquals(FILE_SIZE, fs.getFileStatus(targetFile).getLen());
      }
    }
    cleanup(fs, new Path(testRoot));
  }

  @Test(timeout = 30000)
  public void testInitDistCp() throws Exception {
    String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
    DistributedFileSystem fs = getFileSystem();
    createFiles(fs, testRoot, srcfiles);
    Path src = new Path(testRoot, SRCDAT);
    Path dst = new Path(testRoot, DSTDAT);
    // set permission.
    fs.setPermission(src, FsPermission.createImmutable((short) 020));
    FedBalanceContext context = buildContext(src, dst, MOUNT);
    DistCpProcedure dcProcedure = newProcedure(context);
    // submit distcp.
    try {
      dcProcedure.initDistCp();
    } catch (RetryException ignored) {
      // A retry request only means the procedure must be called again; the
      // executeProcedure loop below drives it to completion.
    }
    fs.delete(new Path(src, "a"), true);
    // wait until job done.
    executeProcedure(dcProcedure, Stage.DIFF_DISTCP,
        () -> dcProcedure.initDistCp());
    assertTrue(fs.exists(dst));
    // Because we used snapshot, the file should be copied.
    assertTrue(fs.exists(new Path(dst, "a")));
    cleanup(fs, new Path(testRoot));
  }

  @Test(timeout = 30000)
  public void testDiffDistCp() throws Exception {
    String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
    DistributedFileSystem fs = getFileSystem();
    createFiles(fs, testRoot, srcfiles);
    Path src = new Path(testRoot, SRCDAT);
    Path dst = new Path(testRoot, DSTDAT);
    FedBalanceContext context = buildContext(src, dst, MOUNT);
    DistCpProcedure dcProcedure = newProcedure(context);
    executeProcedure(dcProcedure, Stage.DIFF_DISTCP,
        () -> dcProcedure.initDistCp());
    assertTrue(fs.exists(dst));
    // move file out of src and test distcp.
    fs.rename(new Path(src, "a"), new Path("/a"));
    executeProcedure(dcProcedure, Stage.FINISH,
        () -> dcProcedure.finalDistCp());
    assertFalse(fs.exists(new Path(dst, "a")));
    // move back file src/a and test distcp.
    fs.rename(new Path("/a"), new Path(src, "a"));
    executeProcedure(dcProcedure, Stage.FINISH,
        () -> dcProcedure.finalDistCp());
    assertTrue(fs.exists(new Path(dst, "a")));
    // append file src/a and test. The stream is closed even if write fails.
    try (OutputStream out = fs.append(new Path(src, "a"))) {
      out.write("hello".getBytes(StandardCharsets.UTF_8));
    }
    long len = fs.getFileStatus(new Path(src, "a")).getLen();
    executeProcedure(dcProcedure, Stage.FINISH,
        () -> dcProcedure.finalDistCp());
    assertEquals(len, fs.getFileStatus(new Path(dst, "a")).getLen());
    cleanup(fs, new Path(testRoot));
  }

  @Test(timeout = 30000)
  public void testStageFinalDistCp() throws Exception {
    String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
    DistributedFileSystem fs = getFileSystem();
    createFiles(fs, testRoot, srcfiles);
    Path src = new Path(testRoot, SRCDAT);
    Path dst = new Path(testRoot, DSTDAT);
    // Deliberately left open: the final stage is expected to close it, which
    // the intercept below verifies.
    OutputStream out = fs.append(new Path(src, "a"));
    FedBalanceContext context = buildContext(src, dst, MOUNT);
    DistCpProcedure dcProcedure = newProcedure(context);
    executeProcedure(dcProcedure, Stage.DIFF_DISTCP,
        () -> dcProcedure.initDistCp());
    executeProcedure(dcProcedure, Stage.FINISH,
        () -> dcProcedure.finalDistCp());
    // Verify all the open files have been closed.
    intercept(RemoteException.class, "LeaseExpiredException",
        "Expect RemoteException(LeaseExpiredException).", () -> out.close());
    cleanup(fs, new Path(testRoot));
  }

  @Test(timeout = 30000)
  public void testStageFinish() throws Exception {
    String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
    DistributedFileSystem fs = getFileSystem();
    Path src = new Path(testRoot, SRCDAT);
    Path dst = new Path(testRoot, DSTDAT);
    fs.mkdirs(src);
    fs.mkdirs(dst);
    fs.allowSnapshot(src);
    fs.allowSnapshot(dst);
    fs.createSnapshot(src, LAST_SNAPSHOT_NAME);
    fs.createSnapshot(src, CURRENT_SNAPSHOT_NAME);
    fs.createSnapshot(dst, LAST_SNAPSHOT_NAME);
    // Octal 0777 (rwxrwxrwx). The former decimal literal 777 meant 01411.
    FsPermission originalPerm = new FsPermission((short) 0777);
    fs.setPermission(src, originalPerm);
    // Test the finish stage.
    FedBalanceContext context = buildContext(src, dst, MOUNT);
    DistCpProcedure dcProcedure = newProcedure(context);
    dcProcedure.disableWrite();
    dcProcedure.finish();
    // Verify path and permission.
    assertTrue(fs.exists(dst));
    assertFalse(fs.exists(new Path(src, HdfsConstants.DOT_SNAPSHOT_DIR)));
    assertFalse(fs.exists(new Path(dst, HdfsConstants.DOT_SNAPSHOT_DIR)));
    assertEquals(originalPerm, fs.getFileStatus(dst).getPermission());
    assertEquals(0, fs.getFileStatus(src).getPermission().toShort());
    cleanup(fs, new Path(testRoot));
  }

  @Test(timeout = 30000)
  public void testRecoveryByStage() throws Exception {
    String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
    DistributedFileSystem fs = getFileSystem();
    createFiles(fs, testRoot, srcfiles);
    Path src = new Path(testRoot, SRCDAT);
    Path dst = new Path(testRoot, DSTDAT);
    FedBalanceContext context = buildContext(src, dst, MOUNT);
    // Single-element holder so the lambdas below always see the latest
    // (re-deserialized) procedure instance.
    final DistCpProcedure[] dcp = new DistCpProcedure[1];
    dcp[0] = newProcedure(context);
    // Serialize and deserialize before each stage to simulate recovery.
    dcp[0] = serializeProcedure(dcp[0]);
    executeProcedure(dcp[0], Stage.INIT_DISTCP, () -> dcp[0].preCheck());
    dcp[0] = serializeProcedure(dcp[0]);
    executeProcedure(dcp[0], Stage.DIFF_DISTCP, () -> dcp[0].initDistCp());
    fs.delete(new Path(src, "a"), true); // make some difference.
    dcp[0] = serializeProcedure(dcp[0]);
    executeProcedure(dcp[0], Stage.DISABLE_WRITE, () -> dcp[0].diffDistCp());
    dcp[0] = serializeProcedure(dcp[0]);
    executeProcedure(dcp[0], Stage.FINAL_DISTCP, () -> dcp[0].disableWrite());
    dcp[0] = serializeProcedure(dcp[0]);
    // Deliberately left open: the final distcp stage is expected to close it.
    OutputStream out = fs.append(new Path(src, "b/c"));
    executeProcedure(dcp[0], Stage.FINISH, () -> dcp[0].finalDistCp());
    intercept(RemoteException.class, "LeaseExpiredException",
        "Expect RemoteException(LeaseExpiredException).", () -> out.close());
    dcp[0] = serializeProcedure(dcp[0]);
    assertTrue(dcp[0].execute());
    assertTrue(fs.exists(dst));
    assertFalse(
        fs.exists(new Path(context.getSrc(), HdfsConstants.DOT_SNAPSHOT_DIR)));
    assertFalse(
        fs.exists(new Path(context.getDst(), HdfsConstants.DOT_SNAPSHOT_DIR)));
    cleanup(fs, new Path(testRoot));
  }

  @Test(timeout = 30000)
  public void testShutdown() throws Exception {
    String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
    DistributedFileSystem fs = getFileSystem();
    createFiles(fs, testRoot, srcfiles);
    Path src = new Path(testRoot, SRCDAT);
    Path dst = new Path(testRoot, DSTDAT);
    FedBalanceContext context = buildContext(src, dst, MOUNT);
    DistCpProcedure dcProcedure = newProcedure(context);
    BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(conf);
    scheduler.init(true);
    BalanceJob balanceJob =
        new BalanceJob.Builder<>().nextProcedure(dcProcedure).build();
    scheduler.submit(balanceJob);
    // Shut down at a random point of the job. nextInt(bound) is never
    // negative, unlike Math.abs(nextLong()) % bound for Long.MIN_VALUE.
    long sleep = new Random().nextInt(MAX_SHUTDOWN_SLEEP_MS);
    Thread.sleep(sleep);
    scheduler.shutDown();
    cleanup(fs, new Path(testRoot));
  }

  @Test(timeout = 30000)
  public void testDisableWrite() throws Exception {
    String testRoot = nnUri + "/user/foo/testdir." + getMethodName();
    DistributedFileSystem fs = getFileSystem();
    createFiles(fs, testRoot, srcfiles);
    Path src = new Path(testRoot, SRCDAT);
    Path dst = new Path(testRoot, DSTDAT);
    FedBalanceContext context = buildContext(src, dst, MOUNT);
    DistCpProcedure dcProcedure = newProcedure(context);
    assertNotEquals(0, fs.getFileStatus(src).getPermission().toShort());
    executeProcedure(dcProcedure, Stage.FINAL_DISTCP,
        () -> dcProcedure.disableWrite());
    assertEquals(0, fs.getFileStatus(src).getPermission().toShort());
    cleanup(fs, new Path(testRoot));
  }

  /**
   * Returns the mini cluster's file system.
   *
   * @return the DistributedFileSystem for {@code nnUri}.
   * @throws IOException if the file system cannot be obtained.
   */
  private static DistributedFileSystem getFileSystem() throws IOException {
    return (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
  }

  /**
   * Creates the DistCpProcedure used by the tests: named "distcp-procedure",
   * with no next procedure and 1000 as its third (delay) argument.
   *
   * @param context the balance context to run with.
   * @return a new procedure.
   * @throws IOException if the procedure cannot be constructed.
   */
  private static DistCpProcedure newProcedure(FedBalanceContext context)
      throws IOException {
    return new DistCpProcedure("distcp-procedure", null, 1000, context);
  }

  /**
   * Builds the balance context shared by all tests: 10 maps, bandwidth
   * limit 1, TRASH option, and a delay duration of 1000.
   *
   * @param src the source path.
   * @param dst the destination path.
   * @param mount the mount point name.
   * @return the context.
   */
  private static FedBalanceContext buildContext(Path src, Path dst,
      String mount) {
    return new FedBalanceContext.Builder(src, dst, mount, conf).setMapNum(10)
        .setBandwidthLimit(1).setTrash(TrashOption.TRASH).setDelayDuration(1000)
        .build();
  }

  /** One step of a procedure that may ask to be retried. */
  @FunctionalInterface
  interface Call {
    void execute() throws IOException, RetryException;
  }

  /**
   * Execute the procedure until its stage is updated to the target stage.
   *
   * <p>The procedure's stage is first reset to {@code PRE_CHECK}. Then
   * {@code call} is invoked repeatedly until the procedure reports
   * {@code target}. A {@link RetryException} means "call again"; an
   * {@link IOException} propagates. There is no iteration limit, so a stage
   * that never changes is bounded only by the test timeout.
   *
   * @param procedure the procedure to be executed and verified.
   * @param target the target stage.
   * @param call the function executing the procedure.
   * @throws IOException if {@code call} fails.
   */
  private static void executeProcedure(DistCpProcedure procedure, Stage target,
      Call call) throws IOException {
    Stage stage = Stage.PRE_CHECK;
    procedure.updateStage(stage);
    while (stage != target) {
      try {
        call.execute();
      } catch (RetryException ignored) {
        // Not an error: the loop simply invokes the call again.
      } finally {
        stage = procedure.getStage();
      }
    }
  }

  /** A path (relative to a test root) and whether it is a directory. */
  static class FileEntry {
    private final String path;
    private final boolean isDir;

    FileEntry(String path, boolean isDir) {
      this.path = path;
      this.isDir = isDir;
    }

    String getPath() {
      return path;
    }

    boolean isDirectory() {
      return isDir;
    }
  }

  /**
   * Create directories and files with random data.
   *
   * <p>Each file is FILE_SIZE bytes with block size BLOCK_SIZE and replication
   * 2. Its contents are generated from a per-file seed.
   *
   * @param fs the file system obj.
   * @param topdir the base dir of the directories and files.
   * @param entries the directory and file entries to be created.
   * @throws IOException if a directory or file cannot be created.
   */
  private static void createFiles(DistributedFileSystem fs, String topdir,
      FileEntry[] entries) throws IOException {
    final short replicationFactor = 2;
    final int bufSize = 128;
    long seed = System.currentTimeMillis();
    Random rand = new Random(seed);
    for (FileEntry entry : entries) {
      Path newPath = new Path(topdir + "/" + entry.getPath());
      if (entry.isDirectory()) {
        fs.mkdirs(newPath);
      } else {
        DFSTestUtil.createFile(fs, newPath, bufSize, FILE_SIZE, BLOCK_SIZE,
            replicationFactor, seed);
      }
      seed = System.currentTimeMillis() + rand.nextLong();
    }
  }

  /**
   * Round-trips a procedure through its {@code write}/{@code readFields}
   * serialization, as a recovery would.
   *
   * @param dcp the procedure to serialize.
   * @return a new procedure read back from the serialized bytes.
   * @throws IOException if serialization or deserialization fails.
   */
  private static DistCpProcedure serializeProcedure(DistCpProcedure dcp)
      throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutput dataOut = new DataOutputStream(bytes);
    dcp.write(dataOut);
    DistCpProcedure restored = new DistCpProcedure();
    restored.readFields(
        new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    return restored;
  }

  /**
   * Removes the snapshots under {@code root}/srcdat and {@code root}/dstdat
   * via DistCpProcedure.cleanupSnapshot, then deletes {@code root}
   * recursively.
   *
   * @param dfs the file system.
   * @param root the test root directory.
   * @throws IOException if cleanup fails.
   */
  private static void cleanup(DistributedFileSystem dfs, Path root)
      throws IOException {
    Path src = new Path(root, SRCDAT);
    Path dst = new Path(root, DSTDAT);
    DistCpProcedure.cleanupSnapshot(dfs, src);
    DistCpProcedure.cleanupSnapshot(dfs, dst);
    dfs.delete(root, true);
  }
}