blob: f15d0d4c5cf397d314f46eba4bd0dc7c6a7e150e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.tools.util.TestDistCpUtils;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@RunWith(value = Parameterized.class)
public class TestIntegration {
private static final Log LOG = LogFactory.getLog(TestIntegration.class);
private static FileSystem fs;
private static Path listFile;
private static Path target;
private static String root;
private int numListstatusThreads;
public TestIntegration(int numListstatusThreads) {
this.numListstatusThreads = numListstatusThreads;
}
@Parameters
public static Collection<Object[]> data() {
Object[][] data = new Object[][] { { 1 }, { 2 }, { 10 } };
return Arrays.asList(data);
}
private static Configuration getConf() {
Configuration conf = new Configuration();
conf.set("fs.default.name", "file:///");
conf.set("mapred.job.tracker", "local");
return conf;
}
@BeforeClass
public static void setup() {
try {
fs = FileSystem.get(getConf());
root = new Path("target/TestIntegration").makeQualified(fs.getUri(),
fs.getWorkingDirectory()).toString();
listFile = new Path(root, "listing").makeQualified(fs.getUri(),
fs.getWorkingDirectory());
target = new Path(root, "target").makeQualified(fs.getUri(),
fs.getWorkingDirectory());
TestDistCpUtils.delete(fs, root);
} catch (IOException e) {
LOG.error("Exception encountered ", e);
}
}
@Test(timeout=100000)
public void testSingleFileMissingTarget() {
caseSingleFileMissingTarget(false);
caseSingleFileMissingTarget(true);
}
private void caseSingleFileMissingTarget(boolean sync) {
try {
addEntries(listFile, "singlefile1/file1");
createFiles("singlefile1/file1");
runTest(listFile, target, false, sync);
checkResult(target, 1);
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
@Test(timeout=100000)
public void testSingleFileTargetFile() {
caseSingleFileTargetFile(false);
caseSingleFileTargetFile(true);
}
private void caseSingleFileTargetFile(boolean sync) {
try {
addEntries(listFile, "singlefile1/file1");
createFiles("singlefile1/file1", "target");
runTest(listFile, target, false, sync);
checkResult(target, 1);
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
@Test(timeout=100000)
public void testSingleFileTargetDir() {
caseSingleFileTargetDir(false);
caseSingleFileTargetDir(true);
}
private void caseSingleFileTargetDir(boolean sync) {
try {
addEntries(listFile, "singlefile2/file2");
createFiles("singlefile2/file2");
mkdirs(target.toString());
runTest(listFile, target, true, sync);
checkResult(target, 1, "file2");
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
@Test(timeout=100000)
public void testSingleDirTargetMissing() {
caseSingleDirTargetMissing(false);
caseSingleDirTargetMissing(true);
}
private void caseSingleDirTargetMissing(boolean sync) {
try {
addEntries(listFile, "singledir");
mkdirs(root + "/singledir/dir1");
runTest(listFile, target, false, sync);
checkResult(target, 1, "dir1");
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
@Test(timeout=100000)
public void testSingleDirTargetPresent() {
try {
addEntries(listFile, "singledir");
mkdirs(root + "/singledir/dir1");
mkdirs(target.toString());
runTest(listFile, target, true, false);
checkResult(target, 1, "singledir/dir1");
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
@Test(timeout=100000)
public void testUpdateSingleDirTargetPresent() {
try {
addEntries(listFile, "Usingledir");
mkdirs(root + "/Usingledir/Udir1");
mkdirs(target.toString());
runTest(listFile, target, true, true);
checkResult(target, 1, "Udir1");
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
@Test(timeout=100000)
public void testMultiFileTargetPresent() {
caseMultiFileTargetPresent(false);
caseMultiFileTargetPresent(true);
}
private void caseMultiFileTargetPresent(boolean sync) {
try {
addEntries(listFile, "multifile/file3", "multifile/file4", "multifile/file5");
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
mkdirs(target.toString());
runTest(listFile, target, true, sync);
checkResult(target, 3, "file3", "file4", "file5");
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
@Test(timeout=100000)
public void testMultiFileTargetMissing() {
caseMultiFileTargetMissing(false);
caseMultiFileTargetMissing(true);
}
private void caseMultiFileTargetMissing(boolean sync) {
try {
addEntries(listFile, "multifile/file3", "multifile/file4", "multifile/file5");
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
runTest(listFile, target, false, sync);
checkResult(target, 3, "file3", "file4", "file5");
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
@Test(timeout=100000)
public void testMultiDirTargetPresent() {
try {
addEntries(listFile, "multifile", "singledir");
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
mkdirs(target.toString(), root + "/singledir/dir1");
runTest(listFile, target, true, false);
checkResult(target, 2, "multifile/file3", "multifile/file4", "multifile/file5", "singledir/dir1");
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
@Test(timeout=100000)
public void testUpdateMultiDirTargetPresent() {
try {
addEntries(listFile, "Umultifile", "Usingledir");
createFiles("Umultifile/Ufile3", "Umultifile/Ufile4", "Umultifile/Ufile5");
mkdirs(target.toString(), root + "/Usingledir/Udir1");
runTest(listFile, target, true, true);
checkResult(target, 4, "Ufile3", "Ufile4", "Ufile5", "Udir1");
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
@Test(timeout=100000)
public void testMultiDirTargetMissing() {
try {
addEntries(listFile, "multifile", "singledir");
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
mkdirs(root + "/singledir/dir1");
runTest(listFile, target, false, false);
checkResult(target, 2, "multifile/file3", "multifile/file4",
"multifile/file5", "singledir/dir1");
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
@Test(timeout=100000)
public void testUpdateMultiDirTargetMissing() {
try {
addEntries(listFile, "multifile", "singledir");
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
mkdirs(root + "/singledir/dir1");
runTest(listFile, target, false, true);
checkResult(target, 4, "file3", "file4", "file5", "dir1");
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
}
}
@Test(timeout=100000)
public void testDeleteMissingInDestination() {
try {
addEntries(listFile, "srcdir");
createFiles("srcdir/file1", "dstdir/file1", "dstdir/file2");
Path target = new Path(root + "/dstdir");
runTest(listFile, target, false, true, true, false);
checkResult(target, 1, "file1");
} catch (IOException e) {
LOG.error("Exception encountered while running distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
TestDistCpUtils.delete(fs, "target/tmp1");
}
}
@Test(timeout=100000)
public void testOverwrite() {
byte[] contents1 = "contents1".getBytes();
byte[] contents2 = "contents2".getBytes();
Assert.assertEquals(contents1.length, contents2.length);
try {
addEntries(listFile, "srcdir");
createWithContents("srcdir/file1", contents1);
createWithContents("dstdir/file1", contents2);
Path target = new Path(root + "/dstdir");
runTest(listFile, target, false, false, false, true);
checkResult(target, 1, "file1");
// make sure dstdir/file1 has been overwritten with the contents
// of srcdir/file1
FSDataInputStream is = fs.open(new Path(root + "/dstdir/file1"));
byte[] dstContents = new byte[contents1.length];
is.readFully(dstContents);
is.close();
Assert.assertArrayEquals(contents1, dstContents);
} catch (IOException e) {
LOG.error("Exception encountered while running distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
TestDistCpUtils.delete(fs, "target/tmp1");
}
}
@Test(timeout=100000)
public void testGlobTargetMissingSingleLevel() {
try {
Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(),
fs.getWorkingDirectory());
addEntries(listFile, "*");
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
createFiles("singledir/dir2/file6");
runTest(listFile, target, false, false);
checkResult(target, 2, "multifile/file3", "multifile/file4", "multifile/file5",
"singledir/dir2/file6");
} catch (IOException e) {
LOG.error("Exception encountered while testing distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
TestDistCpUtils.delete(fs, "target/tmp1");
}
}
@Test(timeout=100000)
public void testUpdateGlobTargetMissingSingleLevel() {
try {
Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(),
fs.getWorkingDirectory());
addEntries(listFile, "*");
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
createFiles("singledir/dir2/file6");
runTest(listFile, target, false, true);
checkResult(target, 4, "file3", "file4", "file5", "dir2/file6");
} catch (IOException e) {
LOG.error("Exception encountered while running distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
TestDistCpUtils.delete(fs, "target/tmp1");
}
}
@Test(timeout=100000)
public void testGlobTargetMissingMultiLevel() {
try {
Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(),
fs.getWorkingDirectory());
addEntries(listFile, "*/*");
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
createFiles("singledir1/dir3/file7", "singledir1/dir3/file8",
"singledir1/dir3/file9");
runTest(listFile, target, false, false);
checkResult(target, 4, "file3", "file4", "file5",
"dir3/file7", "dir3/file8", "dir3/file9");
} catch (IOException e) {
LOG.error("Exception encountered while running distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
TestDistCpUtils.delete(fs, "target/tmp1");
}
}
@Test(timeout=100000)
public void testUpdateGlobTargetMissingMultiLevel() {
try {
Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(),
fs.getWorkingDirectory());
addEntries(listFile, "*/*");
createFiles("multifile/file3", "multifile/file4", "multifile/file5");
createFiles("singledir1/dir3/file7", "singledir1/dir3/file8",
"singledir1/dir3/file9");
runTest(listFile, target, false, true);
checkResult(target, 6, "file3", "file4", "file5",
"file7", "file8", "file9");
} catch (IOException e) {
LOG.error("Exception encountered while running distcp", e);
Assert.fail("distcp failure");
} finally {
TestDistCpUtils.delete(fs, root);
TestDistCpUtils.delete(fs, "target/tmp1");
}
}
@Test(timeout=100000)
public void testCleanup() {
try {
Path sourcePath = new Path("noscheme:///file");
List<Path> sources = new ArrayList<Path>();
sources.add(sourcePath);
DistCpOptions options = new DistCpOptions(sources, target);
Configuration conf = getConf();
Path stagingDir = JobSubmissionFiles.getStagingDir(
new Cluster(conf), conf);
stagingDir.getFileSystem(conf).mkdirs(stagingDir);
try {
new DistCp(conf, options).execute();
} catch (Throwable t) {
Assert.assertEquals(stagingDir.getFileSystem(conf).
listStatus(stagingDir).length, 0);
}
} catch (Exception e) {
LOG.error("Exception encountered ", e);
Assert.fail("testCleanup failed " + e.getMessage());
}
}
private void addEntries(Path listFile, String... entries) throws IOException {
OutputStream out = fs.create(listFile);
try {
for (String entry : entries){
out.write((root + "/" + entry).getBytes());
out.write("\n".getBytes());
}
} finally {
out.close();
}
}
private void createFiles(String... entries) throws IOException {
for (String entry : entries){
OutputStream out = fs.create(new Path(root + "/" + entry));
try {
out.write((root + "/" + entry).getBytes());
out.write("\n".getBytes());
} finally {
out.close();
}
}
}
private void createWithContents(String entry, byte[] contents) throws IOException {
OutputStream out = fs.create(new Path(root + "/" + entry));
try {
out.write(contents);
} finally {
out.close();
}
}
private void mkdirs(String... entries) throws IOException {
for (String entry : entries){
fs.mkdirs(new Path(entry));
}
}
private void runTest(Path listFile, Path target, boolean targetExists,
boolean sync) throws IOException {
runTest(listFile, target, targetExists, sync, false, false);
}
private void runTest(Path listFile, Path target, boolean targetExists,
boolean sync, boolean delete,
boolean overwrite) throws IOException {
DistCpOptions options = new DistCpOptions(listFile, target);
options.setSyncFolder(sync);
options.setDeleteMissing(delete);
options.setOverwrite(overwrite);
options.setTargetPathExists(targetExists);
options.setNumListstatusThreads(numListstatusThreads);
try {
new DistCp(getConf(), options).execute();
} catch (Exception e) {
LOG.error("Exception encountered ", e);
throw new IOException(e);
}
}
private void checkResult(Path target, int count, String... relPaths) throws IOException {
Assert.assertEquals(count, fs.listStatus(target).length);
if (relPaths == null || relPaths.length == 0) {
Assert.assertTrue(target.toString(), fs.exists(target));
return;
}
for (String relPath : relPaths) {
Assert.assertTrue(new Path(target, relPath).toString(), fs.exists(new Path(target, relPath)));
}
}
}