blob: aa42cb968d61fc21bce15408ec08bc50c1ae6b94 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.contract;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_DISTCP_JOB_ID;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.SimpleCopyListing;
import org.apache.hadoop.tools.mapred.CopyMapper;
import org.apache.hadoop.tools.util.DistCpTestUtils;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Contract test suite covering a file system's integration with DistCp. The
* tests coordinate two file system instances: one "local", which is the local
* file system, and the other "remote", which is the file system implementation
* under test. The tests in the suite cover both copying from local to remote
* (e.g. a backup use case) and copying from remote to local (e.g. a restore use
* case).
* The HDFS contract test needs to be run explicitly.
*/
public abstract class AbstractContractDistCpTest
extends AbstractFSContractTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractContractDistCpTest.class);
/** Using offset to change modification time in tests. */
private static final long MODIFICATION_TIME_OFFSET = 10000;
public static final String SCALE_TEST_DISTCP_FILE_SIZE_KB
= "scale.test.distcp.file.size.kb";
public static final int DEFAULT_DISTCP_SIZE_KB = 1024;
protected static final int MB = 1024 * 1024;
/**
* Default depth for a directory tree: {@value}.
*/
protected static final int DEFAULT_DEPTH = 3;
/**
* Default width for a directory tree: {@value}.
* Total dir size is
* <pre>
* DEFAULT_WITH^DEFAULT_DEPTH
* </pre>
* So the duration of a test run grows rapidly with this value.
* This has very significant consequences for object storage runs.
*/
protected static final int DEFAULT_WIDTH = 2;
@Rule
public TestName testName = new TestName();
/**
* The timeout value is extended over the default so that large updates
* are allowed to take time, especially to remote stores.
* @return the current test timeout
*/
protected int getTestTimeoutMillis() {
return 15 * 60 * 1000;
}
private Configuration conf;
private FileSystem localFS, remoteFS;
private Path localDir, remoteDir;
private Path inputDir;
private Path inputSubDir1;
private Path inputSubDir2;
private Path inputSubDir4;
private Path inputFile1;
private Path inputFile2;
private Path inputFile3;
private Path inputFile4;
private Path inputFile5;
private Path outputDir;
private Path outputSubDir1;
private Path outputSubDir2;
private Path outputSubDir4;
private Path outputFile1;
private Path outputFile2;
private Path outputFile3;
private Path outputFile4;
private Path outputFile5;
private Path inputDirUnderOutputDir;
@Override
protected Configuration createConfiguration() {
Configuration newConf = new Configuration();
newConf.set("mapred.job.tracker", "local");
return newConf;
}
@Before
@Override
public void setup() throws Exception {
super.setup();
conf = getContract().getConf();
localFS = FileSystem.getLocal(conf);
remoteFS = getFileSystem();
// Test paths are isolated by concrete subclass name and test method name.
// All paths are fully qualified including scheme (not taking advantage of
// default file system), so if something fails, the messages will make it
// clear which paths are local and which paths are remote.
String className = getClass().getSimpleName();
String testSubDir = className + "/" + testName.getMethodName();
localDir =
localFS.makeQualified(new Path(new Path(
GenericTestUtils.getTestDir().toURI()), testSubDir + "/local"));
localFS.delete(localDir, true);
mkdirs(localFS, localDir);
Path testSubPath = path(testSubDir);
remoteDir = new Path(testSubPath, "remote");
// test teardown does this, but IDE-based test debugging can skip
// that teardown; this guarantees the initial state is clean
remoteFS.delete(remoteDir, true);
}
@Override
public void teardown() throws Exception {
// if remote FS supports IOStatistics log it.
logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, getRemoteFS());
super.teardown();
}
/**
* Set up both input and output fields.
* @param src source tree
* @param dest dest tree
*/
protected void initPathFields(final Path src, final Path dest) {
initInputFields(src);
initOutputFields(dest);
}
/**
* Output field setup.
* @param path path to set up
*/
protected void initOutputFields(final Path path) {
outputDir = new Path(path, "outputDir");
inputDirUnderOutputDir = new Path(outputDir, "inputDir");
outputFile1 = new Path(inputDirUnderOutputDir, "file1");
outputSubDir1 = new Path(inputDirUnderOutputDir, "subDir1");
outputFile2 = new Path(outputSubDir1, "file2");
outputSubDir2 = new Path(inputDirUnderOutputDir, "subDir2/subDir2");
outputFile3 = new Path(outputSubDir2, "file3");
outputSubDir4 = new Path(inputDirUnderOutputDir, "subDir4/subDir4");
outputFile4 = new Path(outputSubDir4, "file4");
outputFile5 = new Path(outputSubDir4, "file5");
}
/**
* this path setup is used across different methods (copy, update, track)
* so they are set up as fields.
* @param srcDir source directory for these to go under.
*/
protected void initInputFields(final Path srcDir) {
inputDir = new Path(srcDir, "inputDir");
inputFile1 = new Path(inputDir, "file1");
inputSubDir1 = new Path(inputDir, "subDir1");
inputFile2 = new Path(inputSubDir1, "file2");
inputSubDir2 = new Path(inputDir, "subDir2/subDir2");
inputFile3 = new Path(inputSubDir2, "file3");
inputSubDir4 = new Path(inputDir, "subDir4/subDir4");
inputFile4 = new Path(inputSubDir4, "file4");
inputFile5 = new Path(inputSubDir4, "file5");
}
protected FileSystem getLocalFS() {
return localFS;
}
protected FileSystem getRemoteFS() {
return remoteFS;
}
protected Path getLocalDir() {
return localDir;
}
protected Path getRemoteDir() {
return remoteDir;
}
@Test
public void testUpdateDeepDirectoryStructureToRemote() throws Exception {
describe("update a deep directory structure from local to remote");
distCpDeepDirectoryStructure(localFS, localDir, remoteFS, remoteDir);
distCpUpdateDeepDirectoryStructure(inputDirUnderOutputDir);
}
@Test
public void testUpdateDeepDirectoryStructureNoChange() throws Exception {
describe("update an unchanged directory structure"
+ " from local to remote; expect no copy");
Path target = distCpDeepDirectoryStructure(localFS, localDir, remoteFS,
remoteDir);
describe("\nExecuting Update\n");
Job job = distCpUpdate(localDir, target);
assertCounterInRange(job, CopyMapper.Counter.SKIP, 1, -1);
assertCounterInRange(job, CopyMapper.Counter.BYTESCOPIED, 0, 0);
}
/**
* Assert that a counter is in a range; min and max values are inclusive.
* @param job job to query
* @param counter counter to examine
* @param min min value, if negative "no minimum"
* @param max max value, if negative "no maximum"
* @throws IOException IO problem
*/
void assertCounterInRange(Job job, Enum<?> counter, long min, long max)
throws IOException {
Counter c = job.getCounters().findCounter(counter);
long value = c.getValue();
String description =
String.format("%s value %s", c.getDisplayName(), value, false);
if (min >= 0) {
assertTrue(description + " too below minimum " + min,
value >= min);
}
if (max >= 0) {
assertTrue(description + " above maximum " + max,
value <= max);
}
}
/**
* Do a distcp from the local source to the destination filesystem.
* This is executed as part of
* {@link #testUpdateDeepDirectoryStructureToRemote()}; it's designed to be
* overidden or wrapped by subclasses which wish to add more assertions.
*
* Life is complicated here by the way that the src/dest paths
* on a distcp is different with -update.
* @param destDir output directory used by the initial distcp
* @return the distcp job
*/
protected Job distCpUpdateDeepDirectoryStructure(final Path destDir)
throws Exception {
describe("Now do an incremental update with deletion of missing files");
Path srcDir = inputDir;
LOG.info("Source directory = {}, dest={}", srcDir, destDir);
ContractTestUtils.assertPathsExist(localFS,
"Paths for test are wrong",
inputFile1, inputFile2, inputFile3, inputFile4, inputFile5);
modifySourceDirectories();
Job job = distCpUpdate(srcDir, destDir);
Path outputFileNew1 = new Path(outputSubDir2, "newfile1");
lsR("Updated Remote", remoteFS, destDir);
ContractTestUtils.assertPathDoesNotExist(remoteFS,
" deleted from " + inputFile1, outputFile1);
ContractTestUtils.assertIsFile(remoteFS, outputFileNew1);
ContractTestUtils.assertPathsDoNotExist(remoteFS,
"DistCP should have deleted",
outputFile3, outputFile4, outputSubDir4);
assertCounterInRange(job, CopyMapper.Counter.COPY, 1, 1);
assertCounterInRange(job, CopyMapper.Counter.SKIP, 1, -1);
return job;
}
/**
* Run distcp -update srcDir destDir.
* @param srcDir local source directory
* @param destDir remote destination directory.
* @return the completed job
* @throws Exception any failure.
*/
private Job distCpUpdate(final Path srcDir, final Path destDir)
throws Exception {
describe("\nDistcp -update from " + srcDir + " to " + destDir);
lsR("Local to update", localFS, srcDir);
lsR("Remote before update", remoteFS, destDir);
return runDistCp(buildWithStandardOptions(
new DistCpOptions.Builder(
Collections.singletonList(srcDir), destDir)
.withDeleteMissing(true)
.withSyncFolder(true)
.withSkipCRC(true)
.withDirectWrite(shouldUseDirectWrite())
.withOverwrite(false)));
}
/**
* Run distcp -update srcDir destDir.
* @param srcDir local source directory
* @param destDir remote destination directory.
* @return the completed job
* @throws Exception any failure.
*/
private Job distCpUpdateWithFs(final Path srcDir, final Path destDir,
FileSystem sourceFs, FileSystem targetFs)
throws Exception {
describe("\nDistcp -update from " + srcDir + " to " + destDir);
lsR("Source Fs to update", sourceFs, srcDir);
lsR("Target Fs before update", targetFs, destDir);
return runDistCp(buildWithStandardOptions(
new DistCpOptions.Builder(
Collections.singletonList(srcDir), destDir)
.withDeleteMissing(true)
.withSyncFolder(true)
.withSkipCRC(false)
.withDirectWrite(shouldUseDirectWrite())
.withOverwrite(false)));
}
/**
* Update the source directories as various tests expect,
* including adding a new file.
* @return the path to the newly created file
* @throws IOException IO failure
*/
private Path modifySourceDirectories() throws IOException {
localFS.delete(inputFile1, false);
localFS.delete(inputFile3, false);
// delete all of subdir4, so input/output file 4 & 5 will go
localFS.delete(inputSubDir4, true);
// add one new file
Path inputFileNew1 = new Path(inputSubDir2, "newfile1");
ContractTestUtils.touch(localFS, inputFileNew1);
return inputFileNew1;
}
@Test
public void testTrackDeepDirectoryStructureToRemote() throws Exception {
describe("copy a deep directory structure from local to remote");
Path destDir = distCpDeepDirectoryStructure(localFS, localDir, remoteFS,
remoteDir);
ContractTestUtils.assertIsDirectory(remoteFS, destDir);
describe("Now do an incremental update and save of missing files");
Path srcDir = inputDir;
// same path setup as in deepDirectoryStructure()
Path trackDir = new Path(localDir, "trackDir");
describe("\nDirectories\n");
lsR("Local to update", localFS, srcDir);
lsR("Remote before update", remoteFS, destDir);
ContractTestUtils.assertPathsExist(localFS,
"Paths for test are wrong",
inputFile2, inputFile3, inputFile4, inputFile5);
Path inputFileNew1 = modifySourceDirectories();
// Distcp set to track but not delete
runDistCp(buildWithStandardOptions(
new DistCpOptions.Builder(
Collections.singletonList(srcDir),
inputDirUnderOutputDir)
.withTrackMissing(trackDir)
.withSyncFolder(true)
.withDirectWrite(shouldUseDirectWrite())
.withOverwrite(false)));
lsR("tracked udpate", remoteFS, destDir);
// new file went over
Path outputFileNew1 = new Path(outputSubDir2, "newfile1");
ContractTestUtils.assertIsFile(remoteFS, outputFileNew1);
ContractTestUtils.assertPathExists(localFS, "tracking directory",
trackDir);
// now read in the listings
Path sortedSourceListing = new Path(trackDir,
DistCpConstants.SOURCE_SORTED_FILE);
ContractTestUtils.assertIsFile(localFS, sortedSourceListing);
Path sortedTargetListing = new Path(trackDir,
DistCpConstants.TARGET_SORTED_FILE);
ContractTestUtils.assertIsFile(localFS, sortedTargetListing);
// deletion didn't happen
ContractTestUtils.assertPathsExist(remoteFS,
"DistCP should have retained",
outputFile2, outputFile3, outputFile4, outputSubDir4);
// now scan the table and see that things are there.
Map<String, Path> sourceFiles = new HashMap<>(10);
Map<String, Path> targetFiles = new HashMap<>(10);
try (SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf,
SequenceFile.Reader.file(sortedSourceListing));
SequenceFile.Reader targetReader = new SequenceFile.Reader(conf,
SequenceFile.Reader.file(sortedTargetListing))) {
CopyListingFileStatus copyStatus = new CopyListingFileStatus();
Text name = new Text();
while(sourceReader.next(name, copyStatus)) {
String key = name.toString();
Path path = copyStatus.getPath();
LOG.info("{}: {}", key, path);
sourceFiles.put(key, path);
}
while(targetReader.next(name, copyStatus)) {
String key = name.toString();
Path path = copyStatus.getPath();
LOG.info("{}: {}", key, path);
targetFiles.put(name.toString(), copyStatus.getPath());
}
}
// look for the new file in both lists
assertTrue("No " + outputFileNew1 + " in source listing",
sourceFiles.containsValue(inputFileNew1));
assertTrue("No " + outputFileNew1 + " in target listing",
targetFiles.containsValue(outputFileNew1));
assertTrue("No " + outputSubDir4 + " in target listing",
targetFiles.containsValue(outputSubDir4));
assertFalse("Found " + inputSubDir4 + " in source listing",
sourceFiles.containsValue(inputSubDir4));
}
public void lsR(final String description,
final FileSystem fs,
final Path dir) throws IOException {
RemoteIterator<LocatedFileStatus> files = fs.listFiles(dir, true);
LOG.info("{}: {}:", description, dir);
StringBuilder sb = new StringBuilder();
while(files.hasNext()) {
LocatedFileStatus status = files.next();
sb.append(String.format(" %s; type=%s; length=%d",
status.getPath(),
status.isDirectory()? "dir" : "file",
status.getLen()));
}
LOG.info("{}", sb);
}
@Test
public void largeFilesToRemote() throws Exception {
describe("copy multiple large files from local to remote");
largeFiles(localFS, localDir, remoteFS, remoteDir);
}
@Test
public void testDeepDirectoryStructureFromRemote() throws Exception {
describe("copy a deep directory structure from remote to local");
distCpDeepDirectoryStructure(remoteFS, remoteDir, localFS, localDir);
}
@Test
public void testLargeFilesFromRemote() throws Exception {
describe("copy multiple large files from remote to local");
largeFiles(remoteFS, remoteDir, localFS, localDir);
}
@Test
public void testSetJobId() throws Exception {
describe("check jobId is set in the conf");
remoteFS.create(new Path(remoteDir, "file1")).close();
DistCpTestUtils
.assertRunDistCp(DistCpConstants.SUCCESS, remoteDir.toString(),
localDir.toString(), getDefaultCLIOptionsOrNull(), conf);
assertNotNull("DistCp job id isn't set",
conf.get(CONF_LABEL_DISTCP_JOB_ID));
}
/**
* Executes a DistCp using a file system sub-tree with multiple nesting
* levels.
* The filenames are those of the fields initialized in setup.
*
* @param srcFS source FileSystem
* @param srcDir source directory
* @param dstFS destination FileSystem
* @param dstDir destination directory
* @return the target directory of the copy
* @throws Exception if there is a failure
*/
private Path distCpDeepDirectoryStructure(FileSystem srcFS,
Path srcDir,
FileSystem dstFS,
Path dstDir) throws Exception {
initPathFields(srcDir, dstDir);
mkdirs(srcFS, inputSubDir1);
mkdirs(srcFS, inputSubDir2);
byte[] data1 = dataset(100, 33, 43);
createFile(srcFS, inputFile1, true, data1);
byte[] data2 = dataset(200, 43, 53);
createFile(srcFS, inputFile2, true, data2);
byte[] data3 = dataset(300, 53, 63);
createFile(srcFS, inputFile3, true, data3);
createFile(srcFS, inputFile4, true, dataset(400, 53, 63));
createFile(srcFS, inputFile5, true, dataset(500, 53, 63));
Path target = new Path(dstDir, "outputDir");
runDistCp(inputDir, target);
ContractTestUtils.assertIsDirectory(dstFS, target);
lsR("Destination tree after distcp", dstFS, target);
verifyFileContents(dstFS, new Path(target, "inputDir/file1"), data1);
verifyFileContents(dstFS,
new Path(target, "inputDir/subDir1/file2"), data2);
verifyFileContents(dstFS,
new Path(target, "inputDir/subDir2/subDir2/file3"), data3);
return target;
}
/**
* Executes a test using multiple large files.
*
* @param srcFS source FileSystem
* @param srcDir source directory
* @param dstFS destination FileSystem
* @param dstDir destination directory
* @throws Exception if there is a failure
*/
private void largeFiles(FileSystem srcFS, Path srcDir, FileSystem dstFS,
Path dstDir) throws Exception {
int fileSizeKb = conf.getInt(SCALE_TEST_DISTCP_FILE_SIZE_KB,
getDefaultDistCPSizeKb());
if (fileSizeKb < 1) {
skip("File size in " + SCALE_TEST_DISTCP_FILE_SIZE_KB + " is zero");
}
initPathFields(srcDir, dstDir);
Path largeFile1 = new Path(inputDir, "file1");
Path largeFile2 = new Path(inputDir, "file2");
Path largeFile3 = new Path(inputDir, "file3");
int fileSizeMb = fileSizeKb / 1024;
getLogger().info("{} with file size {}", testName.getMethodName(), fileSizeMb);
byte[] data1 = dataset((fileSizeMb + 1) * MB, 33, 43);
createFile(srcFS, largeFile1, true, data1);
byte[] data2 = dataset((fileSizeMb + 2) * MB, 43, 53);
createFile(srcFS, largeFile2, true, data2);
byte[] data3 = dataset((fileSizeMb + 3) * MB, 53, 63);
createFile(srcFS, largeFile3, true, data3);
Path target = new Path(dstDir, "outputDir");
runDistCp(inputDir, target);
verifyFileContents(dstFS, new Path(target, "inputDir/file1"), data1);
verifyFileContents(dstFS, new Path(target, "inputDir/file2"), data2);
verifyFileContents(dstFS, new Path(target, "inputDir/file3"), data3);
}
/**
* Override point. What is the default distcp size
* for large files if not overridden by
* {@link #SCALE_TEST_DISTCP_FILE_SIZE_KB}.
* If 0 then, unless overridden in the configuration,
* the large file tests will not run.
* @return file size.
*/
protected int getDefaultDistCPSizeKb() {
return DEFAULT_DISTCP_SIZE_KB;
}
/**
* Executes DistCp and asserts that the job finished successfully.
* The choice of direct/indirect is based on the value of
* {@link #shouldUseDirectWrite()}.
* @param src source path
* @param dst destination path
* @throws Exception if there is a failure
*/
private void runDistCp(Path src, Path dst) throws Exception {
if (shouldUseDirectWrite()) {
runDistCpDirectWrite(src, dst);
} else {
runDistCpWithRename(src, dst);
}
}
/**
* Run the distcp job.
* @param options distcp options
* @return the job. It will have already completed.
* @throws Exception failure
*/
private Job runDistCp(final DistCpOptions options) throws Exception {
Job job = new DistCp(conf, options).execute();
assertNotNull("Unexpected null job returned from DistCp execution.", job);
assertTrue("DistCp job did not complete.", job.isComplete());
assertTrue("DistCp job did not complete successfully.", job.isSuccessful());
return job;
}
/**
* Add any standard options and then build.
* @param builder DistCp option builder
* @return the build options
*/
private DistCpOptions buildWithStandardOptions(
DistCpOptions.Builder builder) {
return builder
.withNumListstatusThreads(DistCpOptions.MAX_NUM_LISTSTATUS_THREADS)
.build();
}
/**
* Creates a directory and any ancestor directories required.
*
* @param fs FileSystem in which to create directories
* @param dir path of directory to create
* @throws Exception if there is a failure
*/
private static void mkdirs(FileSystem fs, Path dir) throws Exception {
assertTrue("Failed to mkdir " + dir, fs.mkdirs(dir));
}
@Test
public void testDirectWrite() throws Exception {
describe("copy file from local to remote using direct write option");
if (shouldUseDirectWrite()) {
skip("not needed as all other tests use the -direct option.");
}
directWrite(localFS, localDir, remoteFS, remoteDir, true);
}
@Test
public void testNonDirectWrite() throws Exception {
describe("copy file from local to remote without using direct write " +
"option");
directWrite(localFS, localDir, remoteFS, remoteDir, false);
}
@Test
public void testDistCpWithIterator() throws Exception {
describe("Build listing in distCp using the iterator option.");
Path source = new Path(remoteDir, "src");
Path dest = new Path(localDir, "dest");
dest = localFS.makeQualified(dest);
GenericTestUtils
.createFiles(remoteFS, source, getDepth(), getWidth(), getWidth());
GenericTestUtils.LogCapturer log =
GenericTestUtils.LogCapturer.captureLogs(SimpleCopyListing.LOG);
String options = "-useiterator -update -delete" + getDefaultCLIOptions();
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(),
dest.toString(), options, conf);
// Check the target listing was also done using iterator.
Assertions.assertThat(log.getOutput()).contains(
"Building listing using iterator mode for " + dest.toString());
Assertions.assertThat(RemoteIterators.toList(localFS.listFiles(dest, true)))
.describedAs("files").hasSize(getTotalFiles());
}
public int getDepth() {
return DEFAULT_DEPTH;
}
public int getWidth() {
return DEFAULT_WIDTH;
}
private int getTotalFiles() {
int totalFiles = 0;
for (int i = 1; i <= getDepth(); i++) {
totalFiles += Math.pow(getWidth(), i);
}
return totalFiles;
}
/**
* Override point: should direct write always be used?
* false by default; enable for stores where rename is slow.
* @return true if direct write should be used in all tests.
*/
protected boolean shouldUseDirectWrite() {
return false;
}
/**
* Return the default options for distcp, including,
* if {@link #shouldUseDirectWrite()} is true,
* the -direct option.
* Append or prepend this to string CLIs.
* @return default options.
*/
protected String getDefaultCLIOptions() {
return shouldUseDirectWrite()
? " -direct "
: "";
}
/**
* Return the default options for distcp, including,
* if {@link #shouldUseDirectWrite()} is true,
* the -direct option, null if there are no
* defaults.
* @return default options.
*/
protected String getDefaultCLIOptionsOrNull() {
return shouldUseDirectWrite()
? " -direct "
: null;
}
/**
* Executes a test with support for using direct write option.
*
* @param srcFS source FileSystem
* @param srcDir source directory
* @param dstFS destination FileSystem
* @param dstDir destination directory
* @param directWrite whether to use -directwrite option
* @throws Exception if there is a failure
*/
private void directWrite(FileSystem srcFS, Path srcDir, FileSystem dstFS,
Path dstDir, boolean directWrite) throws Exception {
initPathFields(srcDir, dstDir);
// Create 2 test files
mkdirs(srcFS, inputSubDir1);
byte[] data1 = dataset(64, 33, 43);
createFile(srcFS, inputFile1, true, data1);
byte[] data2 = dataset(200, 43, 53);
createFile(srcFS, inputFile2, true, data2);
Path target = new Path(dstDir, "outputDir");
if (directWrite) {
runDistCpDirectWrite(inputDir, target);
} else {
runDistCpWithRename(inputDir, target);
}
ContractTestUtils.assertIsDirectory(dstFS, target);
lsR("Destination tree after distcp", dstFS, target);
// Verify copied file contents
verifyFileContents(dstFS, new Path(target, "inputDir/file1"), data1);
verifyFileContents(dstFS, new Path(target, "inputDir/subDir1/file2"),
data2);
}
/**
* Run distcp -direct srcDir destDir.
* @param srcDir local source directory
* @param destDir remote destination directory
* @return the completed job
* @throws Exception any failure.
*/
private Job runDistCpDirectWrite(final Path srcDir, final Path destDir)
throws Exception {
describe("\nDistcp -direct from " + srcDir + " to " + destDir);
return runDistCp(buildWithStandardOptions(
new DistCpOptions.Builder(
Collections.singletonList(srcDir), destDir)
.withDirectWrite(true)));
}
/**
* Run distcp srcDir destDir.
* @param srcDir local source directory
* @param destDir remote destination directory
* @return the completed job
* @throws Exception any failure.
*/
private Job runDistCpWithRename(Path srcDir, final Path destDir)
throws Exception {
describe("\nDistcp from " + srcDir + " to " + destDir);
return runDistCp(buildWithStandardOptions(
new DistCpOptions.Builder(
Collections.singletonList(srcDir), destDir)
.withDirectWrite(false)));
}
@Test
public void testDistCpWithFile() throws Exception {
describe("Distcp only file");
Path source = new Path(remoteDir, "file");
Path dest = new Path(localDir, "file");
dest = localFS.makeQualified(dest);
mkdirs(localFS, localDir);
int len = 4;
int base = 0x40;
byte[] block = dataset(len, base, base + len);
ContractTestUtils.createFile(remoteFS, source, true, block);
verifyPathExists(remoteFS, "", source);
verifyPathExists(localFS, "", localDir);
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(),
dest.toString(), getDefaultCLIOptionsOrNull(), conf);
Assertions
.assertThat(RemoteIterators.toList(localFS.listFiles(dest, true)))
.describedAs("files").hasSize(1);
verifyFileContents(localFS, dest, block);
}
@Test
public void testDistCpWithUpdateExistFile() throws Exception {
describe("Now update an existing file.");
Path source = new Path(remoteDir, "file");
Path dest = new Path(localDir, "file");
dest = localFS.makeQualified(dest);
int len = 4;
int base = 0x40;
byte[] block = dataset(len, base, base + len);
byte[] destBlock = dataset(len, base, base + len + 1);
ContractTestUtils.createFile(remoteFS, source, true, block);
ContractTestUtils.createFile(localFS, dest, true, destBlock);
verifyPathExists(remoteFS, "", source);
verifyPathExists(localFS, "", dest);
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(),
dest.toString(), "-delete -update" + getDefaultCLIOptions(), conf);
Assertions.assertThat(RemoteIterators.toList(localFS.listFiles(dest, true)))
.hasSize(1);
verifyFileContents(localFS, dest, block);
}
@Test
public void testDistCpUpdateCheckFileSkip() throws Exception {
describe("Distcp update to check file skips.");
Path source = new Path(remoteDir, "file");
Path dest = new Path(localDir, "file");
Path source0byte = new Path(remoteDir, "file_0byte");
Path dest0byte = new Path(localDir, "file_0byte");
dest = localFS.makeQualified(dest);
dest0byte = localFS.makeQualified(dest0byte);
// Creating a source file with certain dataset.
byte[] sourceBlock = dataset(10, 'a', 'z');
// Write the dataset.
ContractTestUtils
.writeDataset(remoteFS, source, sourceBlock, sourceBlock.length,
1024, true);
// Create 0 byte source and target files.
ContractTestUtils.createFile(remoteFS, source0byte, true, new byte[0]);
ContractTestUtils.createFile(localFS, dest0byte, true, new byte[0]);
// Execute the distcp -update job.
Job job = distCpUpdateWithFs(remoteDir, localDir, remoteFS, localFS);
// First distcp -update would normally copy the source to dest.
verifyFileContents(localFS, dest, sourceBlock);
// Verify 1 file was skipped in the distcp -update (The 0 byte file).
// Verify 1 file was copied in the distcp -update (The new source file).
verifySkipAndCopyCounter(job, 1, 1);
// Remove the source file and replace with a file with same name and size
// but different content.
remoteFS.delete(source, false);
Path updatedSource = new Path(remoteDir, "file");
byte[] updatedSourceBlock = dataset(10, 'b', 'z');
ContractTestUtils.writeDataset(remoteFS, updatedSource,
updatedSourceBlock, updatedSourceBlock.length, 1024, true);
// For testing purposes we would take the modification time of the
// updated Source file and add an offset or subtract the offset and set
// that time as the modification time for target file, this way we can
// ensure that our test can emulate a scenario where source is either more
// recently changed after -update so that copy takes place or target file
// is more recently changed which would skip the copying since the source
// has not been recently updated.
FileStatus fsSourceUpd = remoteFS.getFileStatus(updatedSource);
long modTimeSourceUpd = fsSourceUpd.getModificationTime();
// Add by an offset which would ensure enough gap for the test to
// not fail due to race conditions.
long newTargetModTimeNew = modTimeSourceUpd + MODIFICATION_TIME_OFFSET;
localFS.setTimes(dest, newTargetModTimeNew, -1);
// Execute the distcp -update job.
Job updatedSourceJobOldSrc =
distCpUpdateWithFs(remoteDir, localDir, remoteFS,
localFS);
// File contents should remain same since the mod time for target is
// newer than the updatedSource which indicates that the sync happened
// more recently and there is no update.
verifyFileContents(localFS, dest, sourceBlock);
// Skipped both 0 byte file and sourceFile (since mod time of target is
// older than the source it is perceived that source is of older version
// and we can skip it's copy).
verifySkipAndCopyCounter(updatedSourceJobOldSrc, 2, 0);
// Subtract by an offset which would ensure enough gap for the test to
// not fail due to race conditions.
long newTargetModTimeOld =
Math.min(modTimeSourceUpd - MODIFICATION_TIME_OFFSET, 0);
localFS.setTimes(dest, newTargetModTimeOld, -1);
// Execute the distcp -update job.
Job updatedSourceJobNewSrc = distCpUpdateWithFs(remoteDir, localDir,
remoteFS,
localFS);
// Verifying the target directory have both 0 byte file and the content
// file.
Assertions
.assertThat(RemoteIterators.toList(localFS.listFiles(localDir, true)))
.hasSize(2);
// Now the copy should take place and the file contents should change
// since the mod time for target is older than the source file indicating
// that there was an update to the source after the last sync took place.
verifyFileContents(localFS, dest, updatedSourceBlock);
// Verifying we skipped the 0 byte file and copied the updated source
// file (since the modification time of the new source is older than the
// target now).
verifySkipAndCopyCounter(updatedSourceJobNewSrc, 1, 1);
}
/**
* Method to check the skipped and copied counters of a distcp job.
*
* @param job job to check.
* @param skipExpectedValue expected skip counter value.
* @param copyExpectedValue expected copy counter value.
* @throws IOException throw in case of failures.
*/
private void verifySkipAndCopyCounter(Job job,
int skipExpectedValue, int copyExpectedValue) throws IOException {
// get the skip and copy counters from the job.
long skipActualValue = job.getCounters()
.findCounter(CopyMapper.Counter.SKIP).getValue();
long copyActualValue = job.getCounters()
.findCounter(CopyMapper.Counter.COPY).getValue();
// Verify if the actual values equals the expected ones.
assertEquals("Mismatch in COPY counter value", copyExpectedValue,
copyActualValue);
assertEquals("Mismatch in SKIP counter value", skipExpectedValue,
skipActualValue);
}
}