blob: 62940f64b34889ef9836122969df884b2401272b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.mapred;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.tools.CopyListing;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpContext;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import org.apache.hadoop.tools.GlobbedCopyListing;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.tools.util.TestDistCpUtils;
import org.apache.hadoop.security.Credentials;
import org.junit.*;
import java.io.IOException;
import java.util.*;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH;
import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_TARGET_WORK_PATH;
import static org.apache.hadoop.tools.util.TestDistCpUtils.*;
public class TestCopyCommitter {
// Logger used by the tests for diagnostic output.
private static final Logger LOG = LoggerFactory.getLogger(TestCopyCommitter.class);
// Source of random suffixes for the "/tmp1/..." paths and of the seed offset
// used when generating target data with a different checksum.
private static final Random rand = new Random();
// Block size configured on the mini cluster (minimum and default block size)
// and used to size the chunk files in the checksum-mismatch tests.
private static final long BLOCK_SIZE = 1024;
// Credentials object handed to every GlobbedCopyListing built by the tests.
private static final Credentials CREDENTIALS = new Credentials();
// Port placed in the "mapred.job.tracker" setting ("localhost:" + PORT).
public static final int PORT = 39737;
// Configuration the mini cluster is built from; populated in create().
private static Configuration clusterConfig;
// Mini DFS cluster shared by all tests; started in create(), stopped in destroy().
private static MiniDFSCluster cluster;
// Per-test copy of clusterConfig, re-created before each test in createMetaFolder().
private Configuration config;
/**
 * Creates the client-side job whose configuration is used to seed the
 * mini cluster. The job has zero reduce tasks and uses
 * {@link NullInputFormat} / {@link NullOutputFormat}.
 *
 * @return the newly created job
 * @throws IOException if the job instance cannot be created
 */
private static Job getJobForClient() throws IOException {
  final Job clientJob = Job.getInstance(new Configuration());
  final String trackerAddress = "localhost:" + PORT;
  clientJob.getConfiguration().set("mapred.job.tracker", trackerAddress);
  clientJob.setInputFormatClass(NullInputFormat.class);
  clientJob.setOutputFormatClass(NullOutputFormat.class);
  clientJob.setNumReduceTasks(0);
  return clientJob;
}
/**
 * Builds the shared cluster configuration and starts a freshly formatted
 * mini DFS cluster with a single datanode.
 *
 * @throws IOException if the job or the cluster cannot be created
 */
@BeforeClass
public static void create() throws IOException {
  clusterConfig = getJobForClient().getConfiguration();
  final Configuration seedConf = clusterConfig;
  seedConf.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, 0);
  // Minimum and default block size are both pinned to BLOCK_SIZE.
  seedConf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE);
  seedConf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);

  final MiniDFSCluster.Builder clusterBuilder =
      new MiniDFSCluster.Builder(seedConf).numDataNodes(1).format(true);
  cluster = clusterBuilder.build();
}
/**
 * Shuts the shared mini cluster down, if one was started.
 */
@AfterClass
public static void destroy() {
  if (cluster == null) {
    return;
  }
  cluster.shutdown();
}
/**
 * Prepares a per-test configuration copied from the cluster configuration,
 * points its meta folder label at "/meta" and creates that directory.
 *
 * @throws IOException if the directory cannot be created
 */
@Before
public void createMetaFolder() throws IOException {
  final Configuration perTestConf = new Configuration(clusterConfig);
  perTestConf.set(DistCpConstants.CONF_LABEL_META_FOLDER, "/meta");
  config = perTestConf;
  final Path metaFolder = new Path("/meta");
  cluster.getFileSystem().mkdirs(metaFolder);
}
/**
 * Fails the test if "/meta" still exists once the test has finished; the
 * leftover folder is removed first so later tests start clean.
 *
 * @throws IOException if the file system cannot be queried or modified
 */
@After
public void cleanupMetaFolder() throws IOException {
  final Path metaFolder = new Path("/meta");
  if (!cluster.getFileSystem().exists(metaFolder)) {
    return;
  }
  cluster.getFileSystem().delete(metaFolder, true);
  Assert.fail("Expected meta folder to be deleted");
}
/**
 * Commits a job with no DistCp options configured and expects the task
 * status to read "Commit Successful", both on the first commit and when
 * the commit is repeated.
 */
@Test
public void testNoCommitAction() throws IOException {
  final TaskAttemptContext attemptContext = getTaskAttemptContext(config);
  final JobContext jobContext = new JobContextImpl(
      attemptContext.getConfiguration(),
      attemptContext.getTaskAttemptID().getJobID());
  final OutputCommitter committer = new CopyCommitter(null, attemptContext);

  // The second pass is the test for idempotent commit.
  for (int pass = 0; pass < 2; pass++) {
    committer.commitJob(jobContext);
    Assert.assertEquals("Commit Successful", attemptContext.getStatus());
  }
}
/**
 * With permission preservation requested, committing must leave every
 * directory below the target base carrying the source permission, on the
 * first commit as well as on a repeated one.
 */
@Test
public void testPreserveStatus() throws IOException {
  final TaskAttemptContext attemptContext = getTaskAttemptContext(config);
  final JobContext jobContext = new JobContextImpl(
      attemptContext.getConfiguration(),
      attemptContext.getTaskAttemptID().getJobID());
  final Configuration jobConf = jobContext.getConfiguration();

  FileSystem fs = null;
  try {
    final OutputCommitter committer = new CopyCommitter(null, attemptContext);
    fs = FileSystem.get(jobConf);

    // 0777 for the source tree, 0700 for the tree that plays the target.
    final FsPermission sourcePerm = new FsPermission((short) 0777);
    final FsPermission initialPerm = new FsPermission((short) 0700);
    final String sourceBase = TestDistCpUtils.createTestSetup(fs, sourcePerm);
    final String targetBase = TestDistCpUtils.createTestSetup(fs, initialPerm);

    final List<Path> sourcePaths =
        Collections.singletonList(new Path(sourceBase));
    final DistCpOptions options =
        new DistCpOptions.Builder(sourcePaths, new Path("/out"))
            .preserve(FileAttribute.PERMISSION)
            .build();
    options.appendToConf(jobConf);
    final DistCpContext context = new DistCpContext(options);
    context.setTargetPathExists(false);

    final CopyListing listing = new GlobbedCopyListing(jobConf, CREDENTIALS);
    final Path listingFile = new Path("/tmp1/" + rand.nextLong());
    listing.buildListing(listingFile, context);

    jobConf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);

    // The second pass is the test for idempotent commit.
    for (int pass = 0; pass < 2; pass++) {
      committer.commitJob(jobContext);
      checkDirectoryPermissions(fs, targetBase, sourcePerm);
    }
  } finally {
    TestDistCpUtils.delete(fs, "/tmp1");
    jobConf.unset(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
  }
}
/**
 * Same permission check as {@code testPreserveStatus}, but with atomic copy
 * enabled: the work path is a populated tree and the final path is a fresh
 * name under "/tmp1"; after the commit the directories below the final
 * path must carry the source permission.
 */
@Test
public void testPreserveStatusWithAtomicCommit() throws IOException {
  final TaskAttemptContext attemptContext = getTaskAttemptContext(config);
  final JobContext jobContext = new JobContextImpl(
      attemptContext.getConfiguration(),
      attemptContext.getTaskAttemptID().getJobID());
  final Configuration jobConf = jobContext.getConfiguration();

  FileSystem fs = null;
  try {
    final OutputCommitter committer = new CopyCommitter(null, attemptContext);
    fs = FileSystem.get(jobConf);

    // 0777 for the source tree, 0700 for the work tree.
    final FsPermission sourcePerm = new FsPermission((short) 0777);
    final FsPermission initialPerm = new FsPermission((short) 0700);
    final String sourceBase = TestDistCpUtils.createTestSetup(fs, sourcePerm);
    final String workBase = TestDistCpUtils.createTestSetup(fs, initialPerm);
    final String targetBase = "/tmp1/" + rand.nextLong();

    final List<Path> sourcePaths =
        Collections.singletonList(new Path(sourceBase));
    final DistCpOptions options =
        new DistCpOptions.Builder(sourcePaths, new Path("/out"))
            .preserve(FileAttribute.PERMISSION)
            .build();
    options.appendToConf(jobConf);
    final DistCpContext context = new DistCpContext(options);
    context.setTargetPathExists(false);

    final CopyListing listing = new GlobbedCopyListing(jobConf, CREDENTIALS);
    final Path listingFile = new Path("/tmp1/" + rand.nextLong());
    listing.buildListing(listingFile, context);

    jobConf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);
    jobConf.set(CONF_LABEL_TARGET_WORK_PATH, workBase);
    jobConf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);

    committer.commitJob(jobContext);
    checkDirectoryPermissions(fs, targetBase, sourcePerm);
  } finally {
    TestDistCpUtils.delete(fs, "/tmp1");
    jobConf.unset(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
  }
}
/**
 * With sync-folder and delete-missing enabled, a target tree that has had
 * an additional test tree renamed into it must be in sync with the source
 * (checked in both directions) after the commit and after a repeated commit.
 */
@Test
public void testDeleteMissing() throws IOException {
  final TaskAttemptContext attemptContext = getTaskAttemptContext(config);
  final JobContext jobContext = new JobContextImpl(
      attemptContext.getConfiguration(),
      attemptContext.getTaskAttemptID().getJobID());
  final Configuration jobConf = jobContext.getConfiguration();

  FileSystem fs = null;
  try {
    final OutputCommitter committer = new CopyCommitter(null, attemptContext);
    fs = FileSystem.get(jobConf);

    final FsPermission defaultPerm = FsPermission.getDefault();
    final String sourceBase = TestDistCpUtils.createTestSetup(fs, defaultPerm);
    final String targetBase = TestDistCpUtils.createTestSetup(fs, defaultPerm);
    // Move a third tree into the target so it holds entries the source lacks.
    final String targetBaseAdd =
        TestDistCpUtils.createTestSetup(fs, FsPermission.getDefault());
    fs.rename(new Path(targetBaseAdd), new Path(targetBase));

    final List<Path> sourcePaths =
        Collections.singletonList(new Path(sourceBase));
    final DistCpOptions options =
        new DistCpOptions.Builder(sourcePaths, new Path("/out"))
            .withSyncFolder(true)
            .withDeleteMissing(true)
            .build();
    options.appendToConf(jobConf);
    final DistCpContext context = new DistCpContext(options);

    final CopyListing listing = new GlobbedCopyListing(jobConf, CREDENTIALS);
    final Path listingFile = new Path("/tmp1/" + rand.nextLong());
    listing.buildListing(listingFile, context);

    jobConf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase);
    jobConf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);

    // The second pass is the test for idempotent commit.
    for (int pass = 0; pass < 2; pass++) {
      committer.commitJob(jobContext);
      verifyFoldersAreInSync(fs, targetBase, sourceBase);
      verifyFoldersAreInSync(fs, sourceBase, targetBase);
    }
  } finally {
    TestDistCpUtils.delete(fs, "/tmp1");
    jobConf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
  }
}
/**
 * Delete-missing variant where source and target are each created by
 * {@code createTestSetupWithOnlyFile}; both must be in sync (checked in both
 * directions) after the commit and after a repeated commit.
 */
@Test
public void testDeleteMissingWithOnlyFile() throws IOException {
  final TaskAttemptContext attemptContext = getTaskAttemptContext(config);
  final JobContext jobContext = new JobContextImpl(
      attemptContext.getConfiguration(),
      attemptContext.getTaskAttemptID().getJobID());
  final Configuration jobConf = jobContext.getConfiguration();

  FileSystem fs = null;
  try {
    final OutputCommitter committer = new CopyCommitter(null, attemptContext);
    fs = FileSystem.get(jobConf);

    final String sourceBase = TestDistCpUtils.createTestSetupWithOnlyFile(
        fs, FsPermission.getDefault());
    final String targetBase = TestDistCpUtils.createTestSetupWithOnlyFile(
        fs, FsPermission.getDefault());

    final List<Path> sourcePaths =
        Collections.singletonList(new Path(sourceBase));
    final DistCpOptions options =
        new DistCpOptions.Builder(sourcePaths, new Path(targetBase))
            .withSyncFolder(true)
            .withDeleteMissing(true)
            .build();
    options.appendToConf(jobConf);
    final DistCpContext context = new DistCpContext(options);

    final CopyListing listing = new GlobbedCopyListing(jobConf, CREDENTIALS);
    // Note: the listing is built at the source base path itself.
    final Path listingFile = new Path(sourceBase);
    listing.buildListing(listingFile, context);

    jobConf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
    jobConf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);

    // The second pass is the test for idempotent commit.
    for (int pass = 0; pass < 2; pass++) {
      committer.commitJob(jobContext);
      verifyFoldersAreInSync(fs, targetBase, sourceBase);
      verifyFoldersAreInSync(fs, sourceBase, targetBase);
    }
  } finally {
    TestDistCpUtils.delete(fs, "/tmp1");
    jobConf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
  }
}
// for HDFS-14621, should preserve times after -delete
/**
 * Checks that modification and access times recorded in the source listing
 * are present on the corresponding target paths after a commit that runs
 * with sync-folder, delete-missing and time preservation enabled, and that
 * this still holds after the commit is repeated.
 *
 * <p>{@code checkDirectoryTimes} reads its reader to the end and closes it,
 * so a fresh reader over the listing file is opened for each verification.
 * Reusing one reader would make the second verification iterate over
 * nothing and silently pass.
 *
 * @throws IOException if setup, commit or verification hits an I/O error
 */
@Test
public void testPreserveTimeWithDeleteMiss() throws IOException {
  TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
  JobContext jobContext = new JobContextImpl(
      taskAttemptContext.getConfiguration(),
      taskAttemptContext.getTaskAttemptID().getJobID());
  Configuration conf = jobContext.getConfiguration();

  FileSystem fs = null;
  try {
    OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
    fs = FileSystem.get(conf);
    String sourceBase = TestDistCpUtils.createTestSetup(
        fs, FsPermission.getDefault());
    String targetBase = TestDistCpUtils.createTestSetup(
        fs, FsPermission.getDefault());
    // Move a third tree into the target so it holds entries the source lacks.
    String targetBaseAdd = TestDistCpUtils.createTestSetup(
        fs, FsPermission.getDefault());
    fs.rename(new Path(targetBaseAdd), new Path(targetBase));

    final DistCpOptions options = new DistCpOptions.Builder(
        Collections.singletonList(new Path(sourceBase)), new Path("/out"))
        .withSyncFolder(true).withDeleteMissing(true)
        .preserve(FileAttribute.TIMES).build();
    options.appendToConf(conf);
    final DistCpContext context = new DistCpContext(options);

    CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
    Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
    listing.buildListing(listingFile, context);

    conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase);
    conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);

    Path sourceListing = new Path(
        conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
    Path targetRoot = new Path(targetBase);

    committer.commitJob(jobContext);
    // Reader is opened only once the commit succeeded, so a failing commit
    // cannot leak it; checkDirectoryTimes closes it when done.
    checkDirectoryTimes(fs,
        new SequenceFile.Reader(conf, SequenceFile.Reader.file(sourceListing)),
        targetRoot);

    //Test for idempotent commit
    committer.commitJob(jobContext);
    // Fresh reader: the previous one has been consumed and closed.
    checkDirectoryTimes(fs,
        new SequenceFile.Reader(conf, SequenceFile.Reader.file(sourceListing)),
        targetRoot);
  } finally {
    TestDistCpUtils.delete(fs, "/tmp1");
    conf.unset(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
    conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
  }
}
/**
 * Delete-missing over two flat directories whose file names interleave:
 * the source holds 1,3,4,5,7,8,9 and the target holds 2,4,5,7,9,A. After the
 * commit (and after repeating it) the target must be in sync with the
 * source and contain exactly four entries.
 */
@Test
public void testDeleteMissingFlatInterleavedFiles() throws IOException {
  final TaskAttemptContext attemptContext = getTaskAttemptContext(config);
  final JobContext jobContext = new JobContextImpl(
      attemptContext.getConfiguration(),
      attemptContext.getTaskAttemptID().getJobID());
  final Configuration jobConf = jobContext.getConfiguration();

  FileSystem fs = null;
  try {
    final OutputCommitter committer = new CopyCommitter(null, attemptContext);
    fs = FileSystem.get(jobConf);
    final String sourceBase = "/tmp1/" + rand.nextLong();
    final String targetBase = "/tmp1/" + rand.nextLong();

    final String[] sourceFiles = {"/1", "/3", "/4", "/5", "/7", "/8", "/9"};
    final String[] targetFiles = {"/2", "/4", "/5", "/7", "/9", "/A"};
    for (String name : sourceFiles) {
      createFile(fs, sourceBase + name);
    }
    for (String name : targetFiles) {
      createFile(fs, targetBase + name);
    }

    final List<Path> sourcePaths =
        Collections.singletonList(new Path(sourceBase));
    final DistCpOptions options =
        new DistCpOptions.Builder(sourcePaths, new Path("/out"))
            .withSyncFolder(true)
            .withDeleteMissing(true)
            .build();
    options.appendToConf(jobConf);
    final DistCpContext context = new DistCpContext(options);

    final CopyListing listing = new GlobbedCopyListing(jobConf, CREDENTIALS);
    final Path listingFile = new Path("/tmp1/" + rand.nextLong());
    listing.buildListing(listingFile, context);

    jobConf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase);
    jobConf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);

    // The second pass is the test for idempotent commit.
    for (int pass = 0; pass < 2; pass++) {
      committer.commitJob(jobContext);
      verifyFoldersAreInSync(fs, targetBase, sourceBase);
      Assert.assertEquals(4, fs.listStatus(new Path(targetBase)).length);
    }
  } finally {
    TestDistCpUtils.delete(fs, "/tmp1");
    jobConf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
  }
}
/**
 * Atomic commit where only the work path exists beforehand: after the
 * commit (and after repeating it) the work path must be gone and the final
 * path must exist.
 */
@Test
public void testAtomicCommitMissingFinal() throws IOException {
  final TaskAttemptContext attemptContext = getTaskAttemptContext(config);
  final JobContext jobContext = new JobContextImpl(
      attemptContext.getConfiguration(),
      attemptContext.getTaskAttemptID().getJobID());
  final Configuration jobConf = jobContext.getConfiguration();

  final String workPath = "/tmp1/" + rand.nextLong();
  final String finalPath = "/tmp1/" + rand.nextLong();
  final Path workDir = new Path(workPath);
  final Path finalDir = new Path(finalPath);

  FileSystem fs = null;
  try {
    final OutputCommitter committer = new CopyCommitter(null, attemptContext);
    fs = FileSystem.get(jobConf);
    fs.mkdirs(workDir);

    jobConf.set(CONF_LABEL_TARGET_WORK_PATH, workPath);
    jobConf.set(CONF_LABEL_TARGET_FINAL_PATH, finalPath);
    jobConf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);

    // Precondition: work path present, final path absent.
    assertPathExists(fs, "Work path", workDir);
    assertPathDoesNotExist(fs, "Final path", finalDir);

    // The second pass is the test for idempotent commit.
    for (int pass = 0; pass < 2; pass++) {
      committer.commitJob(jobContext);
      assertPathDoesNotExist(fs, "Work path", workDir);
      assertPathExists(fs, "Final path", finalDir);
    }
  } finally {
    TestDistCpUtils.delete(fs, workPath);
    TestDistCpUtils.delete(fs, finalPath);
    jobConf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false);
  }
}
/**
 * Atomic commit where the final path already exists: the commit is
 * expected to throw, and both the work path and the final path must still
 * exist afterwards.
 */
@Test
public void testAtomicCommitExistingFinal() throws IOException {
  final TaskAttemptContext attemptContext = getTaskAttemptContext(config);
  final JobContext jobContext = new JobContextImpl(
      attemptContext.getConfiguration(),
      attemptContext.getTaskAttemptID().getJobID());
  final Configuration jobConf = jobContext.getConfiguration();

  final String workPath = "/tmp1/" + rand.nextLong();
  final String finalPath = "/tmp1/" + rand.nextLong();
  final Path workDir = new Path(workPath);
  final Path finalDir = new Path(finalPath);

  FileSystem fs = null;
  try {
    final OutputCommitter committer = new CopyCommitter(null, attemptContext);
    fs = FileSystem.get(jobConf);
    fs.mkdirs(workDir);
    fs.mkdirs(finalDir);

    jobConf.set(CONF_LABEL_TARGET_WORK_PATH, workPath);
    jobConf.set(CONF_LABEL_TARGET_FINAL_PATH, finalPath);
    jobConf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);

    // Precondition: both paths present.
    assertPathExists(fs, "Work path", workDir);
    assertPathExists(fs, "Final path", finalDir);

    try {
      committer.commitJob(jobContext);
      // Raises an AssertionError, which the catch clause below does not
      // intercept because it only handles Exception subclasses.
      Assert.fail("Should not be able to atomic-commit to pre-existing path.");
    } catch (Exception expected) {
      assertPathExists(fs, "Work path", workDir);
      assertPathExists(fs, "Final path", finalDir);
      LOG.info("Atomic-commit Test pass.");
    }
  } finally {
    TestDistCpUtils.delete(fs, workPath);
    TestDistCpUtils.delete(fs, finalPath);
    jobConf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false);
  }
}
/**
 * Runs the checksum-mismatch scenario with CRC checking skipped.
 */
@Test
public void testCommitWithChecksumMismatchAndSkipCrc() throws IOException {
  final boolean skipCrc = true;
  testCommitWithChecksumMismatch(skipCrc);
}
/**
 * Runs the checksum-mismatch scenario with CRC checking left enabled.
 */
@Test
public void testCommitWithChecksumMismatchWithoutSkipCrc() throws IOException {
  final boolean skipCrc = false;
  testCommitWithChecksumMismatch(skipCrc);
}
/**
 * Shared body of the checksum-mismatch tests. A source file and two target
 * chunk files with different content are created, then the job is
 * committed with chunked copying configured.
 *
 * <ul>
 *   <li>{@code skipCrc == true}: the commit must succeed, and the source and
 *       target checksums must be reported as not equal afterwards.</li>
 *   <li>{@code skipCrc == false}: the commit must fail with an
 *       {@link IOException} whose cause mentions "Checksum mismatch".</li>
 * </ul>
 *
 * @param skipCrc value passed to {@code DistCpOptions.Builder#withCRC}
 * @throws IOException on setup failure, or when the commit fails although
 *                     {@code skipCrc} is set
 */
private void testCommitWithChecksumMismatch(boolean skipCrc)
    throws IOException {
  final TaskAttemptContext attemptContext = getTaskAttemptContext(config);
  final JobContext jobContext = new JobContextImpl(
      attemptContext.getConfiguration(),
      attemptContext.getTaskAttemptID().getJobID());
  final Configuration jobConf = jobContext.getConfiguration();

  FileSystem fs = null;
  try {
    fs = FileSystem.get(jobConf);
    final String sourceBase = "/tmp1/" + rand.nextLong();
    final String targetBase = "/tmp1/" + rand.nextLong();
    final int blocksPerChunk = 5;
    final String srcFilename = "/srcdata";
    createSrcAndWorkFilesWithDifferentChecksum(
        fs, targetBase, sourceBase, srcFilename, blocksPerChunk);

    final List<Path> sourcePaths =
        Collections.singletonList(new Path(sourceBase));
    final DistCpOptions options =
        new DistCpOptions.Builder(sourcePaths, new Path("/out"))
            .withBlocksPerChunk(blocksPerChunk)
            .withCRC(skipCrc)
            .build();
    options.appendToConf(jobConf);
    jobConf.setBoolean(
        DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, false);
    final DistCpContext context = new DistCpContext(options);
    context.setTargetPathExists(false);

    final CopyListing listing = new GlobbedCopyListing(jobConf, CREDENTIALS);
    final Path listingFile = new Path("/tmp1/" + rand.nextLong());
    listing.buildListing(listingFile, context);

    jobConf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase);
    jobConf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);

    final OutputCommitter committer = new CopyCommitter(null, attemptContext);
    try {
      committer.commitJob(jobContext);
      if (!skipCrc) {
        Assert.fail("Expected commit to fail");
      }
      // Commit went through with CRC skipped: the data must still differ.
      final Path sourcePath = new Path(sourceBase + srcFilename);
      final Path targetPath = new Path(targetBase + srcFilename);
      final CopyListingFileStatus sourceCurrStatus =
          new CopyListingFileStatus(fs.getFileStatus(sourcePath));
      final long sourceLen = sourceCurrStatus.getLen();
      Assert.assertFalse(DistCpUtils.checksumsAreEqual(
          fs, sourcePath, null, fs, targetPath, sourceLen));
    } catch (IOException commitFailure) {
      if (skipCrc) {
        LOG.error("Unexpected exception is found", commitFailure);
        throw commitFailure;
      }
      GenericTestUtils.assertExceptionContains(
          "Checksum mismatch", commitFailure.getCause());
    }
  } finally {
    TestDistCpUtils.delete(fs, "/tmp1");
    TestDistCpUtils.delete(fs, "/meta");
  }
}
/**
 * Writes one source file plus the two DistCp chunk ("split") files that
 * would be produced for it under the target, using a different random seed
 * for the chunks so that lengths match while checksums do not.
 *
 * <p>The source file is laid out as two chunks:
 * <ul>
 *   <li>first chunk: {@code BLOCK_SIZE * blocksPerChunk} bytes,</li>
 *   <li>second chunk: {@code BLOCK_SIZE / 2} bytes, appended as a new
 *       block.</li>
 * </ul>
 * The chunk files under the target are named
 * {@code <filename>.____distcpSplit____<offset>.<length>}.
 *
 * @param fs the FileSystem; it is cast to {@link DistributedFileSystem}
 *           for the append
 * @param targetBase the path to the working files
 * @param sourceBase the path to the source file
 * @param filename the filename to copy and work on
 * @param blocksPerChunk the blocks per chunk config that enables copying
 *                       blocks in parallel
 * @throws IOException when it fails to create files
 */
private void createSrcAndWorkFilesWithDifferentChecksum(FileSystem fs,
                                                        String targetBase,
                                                        String sourceBase,
                                                        String filename,
                                                        int blocksPerChunk)
    throws IOException {
  final long srcSeed = System.currentTimeMillis();
  final long dstSeed = srcSeed + rand.nextLong();
  final int bufferLen = 128;
  final short replFactor = 2;

  // Two chunks: the second one is half a block long.
  final long firstChunkLength = BLOCK_SIZE * blocksPerChunk;
  final long secondChunkLength = BLOCK_SIZE / 2;

  // Source: first chunk written directly, second chunk appended as a block.
  final Path srcData = new Path(sourceBase + filename);
  DFSTestUtil.createFile(fs, srcData, bufferLen, firstChunkLength,
      BLOCK_SIZE, replFactor, srcSeed);
  DFSTestUtil.appendFileNewBlock(
      (DistributedFileSystem) fs, srcData, (int) secondChunkLength);

  // Target: same lengths, different seed.
  final String workFileBase = targetBase + filename;
  final Path firstChunk = new Path(
      workFileBase + ".____distcpSplit____0." + firstChunkLength);
  final Path secondChunk = new Path(
      workFileBase + ".____distcpSplit____"
          + firstChunkLength + "." + secondChunkLength);
  DFSTestUtil.createFile(fs, firstChunk, bufferLen, firstChunkLength,
      BLOCK_SIZE, replFactor, dstSeed);
  DFSTestUtil.createFile(fs, secondChunk, bufferLen, secondChunkLength,
      BLOCK_SIZE, replFactor, dstSeed);
}
/**
 * Wraps the given configuration in a task attempt context that uses a
 * fixed map-task attempt id.
 *
 * @param conf configuration handed to the context
 * @return a new task attempt context
 */
private TaskAttemptContext getTaskAttemptContext(Configuration conf) {
  final TaskAttemptID attemptId =
      new TaskAttemptID("200707121733", 1, TaskType.MAP, 1, 1);
  return new TaskAttemptContextImpl(conf, attemptId);
}
/**
 * Walks the tree below {@code targetBase} and asserts that every directory
 * found in it carries {@code sourcePerm}. The base directory itself is not
 * checked, only its descendants; regular files are ignored.
 *
 * @param fs file system holding the tree
 * @param targetBase root of the tree to inspect
 * @param sourcePerm permission every sub-directory is expected to have
 * @throws IOException if listing the tree fails
 */
private void checkDirectoryPermissions(FileSystem fs, String targetBase,
    FsPermission sourcePerm) throws IOException {
  // ArrayDeque used as a LIFO stack in place of the legacy synchronized Stack.
  final Deque<Path> pending = new ArrayDeque<>();
  pending.push(new Path(targetBase));
  while (!pending.isEmpty()) {
    final Path dir = pending.pop();
    if (!fs.exists(dir)) {
      continue;
    }
    final FileStatus[] children = fs.listStatus(dir);
    if (children == null || children.length == 0) {
      continue;
    }
    for (FileStatus child : children) {
      if (child.isDirectory()) {
        pending.push(child.getPath());
        Assert.assertEquals("Unexpected permission on " + child.getPath(),
            sourcePerm, child.getPermission());
      }
    }
  }
}
/**
 * For every entry read from {@code sourceReader}, asserts that the path
 * with the same relative name under {@code targetRoot} has the entry's
 * modification time and access time. The reader is always closed before
 * this method returns.
 *
 * @param fs file system holding the target tree
 * @param sourceReader reader over the source listing; consumed and closed
 * @param targetRoot root under which the relative paths are resolved
 * @throws IOException if reading the listing or a file status fails
 */
private void checkDirectoryTimes(
    FileSystem fs, SequenceFile.Reader sourceReader, Path targetRoot)
    throws IOException {
  try {
    final CopyListingFileStatus listedStatus = new CopyListingFileStatus();
    final Text relativePath = new Text();

    // One iteration per entry of the source listing.
    while (sourceReader.next(relativePath, listedStatus)) {
      final String targetName = targetRoot.toString() + "/" + relativePath;
      final FileStatus actualStatus = fs.getFileStatus(new Path(targetName));
      Assert.assertEquals(listedStatus.getModificationTime(),
          actualStatus.getModificationTime());
      Assert.assertEquals(listedStatus.getAccessTime(),
          actualStatus.getAccessTime());
    }
  } finally {
    IOUtils.closeStream(sourceReader);
  }
}
/**
 * Placeholder input format for the client job: it reports no input splits
 * and supplies no record reader.
 */
private static class NullInputFormat extends InputFormat<Object, Object> {
  /**
   * @param context the job context (unused)
   * @return an empty, immutable list of splits
   */
  @Override
  public List<InputSplit> getSplits(JobContext context)
      throws IOException, InterruptedException {
    return Collections.emptyList();
  }

  /**
   * @param split the split to read (unused)
   * @param context the task attempt context (unused)
   * @return always {@code null}; presumably never invoked because
   *         {@link #getSplits} yields no splits -- verify before relying
   *         on this format in a job that actually runs
   */
  @Override
  public RecordReader<Object, Object> createRecordReader(InputSplit split,
      TaskAttemptContext context)
      throws IOException, InterruptedException {
    return null;
  }
}
}