blob: f9c2371ef232da6eb522a6e613a0ba694974d9a3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.mapred;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptionSwitch;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.StubContext;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
/**
 * Tests for {@link CopyMapper}, driven directly through a {@link StubContext}
 * against a single-DataNode {@link MiniDFSCluster}.
 *
 * <p>Every test starts from {@link #deleteState()}, which wipes
 * {@link #SOURCE_PATH} and {@link #TARGET_PATH} and resets the static
 * bookkeeping in {@link #pathList} and {@link #nFiles}. Because that
 * bookkeeping is shared static state, the tests must not run concurrently.
 */
public class TestCopyMapper {
  private static final Log LOG = LogFactory.getLog(TestCopyMapper.class);

  /** Every path created through mkdirs/touchFile since the last deleteState(), in creation order. */
  private static final List<Path> pathList = new ArrayList<Path>();
  /** Number of files created through touchFile since the last deleteState(). */
  private static int nFiles = 0;
  /** Size in bytes of every file written by touchFile. */
  private static final int FILE_SIZE = 1024;

  private static MiniDFSCluster cluster;

  private static final String SOURCE_PATH = "/tmp/source";
  private static final String TARGET_PATH = "/tmp/target";

  /** Configuration the shared mini-cluster was started with. */
  private static Configuration configuration;

  /** Starts the freshly formatted, single-DataNode mini-cluster shared by all tests. */
  @BeforeClass
  public static void setup() throws Exception {
    configuration = getConfigurationForCluster();
    cluster = new MiniDFSCluster.Builder(configuration)
        .numDataNodes(1)
        .format(true)
        .build();
  }

  /** Shuts down the shared mini-cluster once every test in the class has run. */
  @AfterClass
  public static void tearDown() {
    if (cluster != null) {
      cluster.shutdown();
    }
  }

  /**
   * Builds the configuration the mini-cluster is started with.
   *
   * <p>Side effect: sets the {@code test.build.data} system property
   * (presumably consumed by MiniDFSCluster as its base directory — confirm).
   */
  private static Configuration getConfigurationForCluster() throws IOException {
    Configuration conf = new Configuration();
    System.setProperty("test.build.data", "target/tmp/build/TEST_COPY_MAPPER/data");
    conf.set("hadoop.log.dir", "target/tmp");
    LOG.debug("fs.default.name == " + conf.get("fs.default.name"));
    LOG.debug("dfs.http.address == " + conf.get("dfs.http.address"));
    return conf;
  }

  /**
   * Returns a fresh configuration for a CopyMapper run.
   *
   * <p>Both the target work path and the target final path point at the
   * cluster-qualified {@link #TARGET_PATH}. Overwrite is disabled, the CRC
   * check is skipped, folder sync is enabled, and the preserve-status is
   * {@code "br"} (presumably BLOCKSIZE + REPLICATION — confirm against
   * DistCpOptions.FileAttribute).
   */
  private static Configuration getConfiguration() throws IOException {
    Configuration conf = getConfigurationForCluster();
    final FileSystem fs = cluster.getFileSystem();
    Path workPath = new Path(TARGET_PATH)
        .makeQualified(fs.getUri(), fs.getWorkingDirectory());
    conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH,
        workPath.toString());
    conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH,
        workPath.toString());
    conf.setBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(),
        false);
    conf.setBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(),
        true);
    conf.setBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(),
        true);
    conf.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
        "br");
    return conf;
  }

  /** Creates the standard source tree: five directory entries plus two files. */
  private static void createSourceData() throws Exception {
    mkdirs(SOURCE_PATH + "/1");
    mkdirs(SOURCE_PATH + "/2");
    mkdirs(SOURCE_PATH + "/2/3/4");
    mkdirs(SOURCE_PATH + "/2/3");
    mkdirs(SOURCE_PATH + "/5");
    touchFile(SOURCE_PATH + "/5/6");
    mkdirs(SOURCE_PATH + "/7");
    mkdirs(SOURCE_PATH + "/7/8");
    touchFile(SOURCE_PATH + "/7/8/9");
  }

  /** Creates {@code path} (with parents) on the cluster and records it in {@link #pathList}. */
  private static void mkdirs(String path) throws Exception {
    FileSystem fileSystem = cluster.getFileSystem();
    final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(),
        fileSystem.getWorkingDirectory());
    pathList.add(qualifiedPath);
    fileSystem.mkdirs(qualifiedPath);
  }

  /**
   * Writes a {@link #FILE_SIZE}-byte zero-filled file at {@code path}, records
   * it in {@link #pathList} and increments {@link #nFiles}.
   *
   * <p>The file deliberately uses twice the default block size and twice the
   * default replication. A copy that does not preserve those attributes
   * therefore yields visibly different values, which the preserve tests
   * below rely on.
   */
  private static void touchFile(String path) throws Exception {
    DataOutputStream outputStream = null;
    try {
      FileSystem fs = cluster.getFileSystem();
      final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(),
          fs.getWorkingDirectory());
      final long blockSize = fs.getDefaultBlockSize(qualifiedPath) * 2;
      outputStream = fs.create(qualifiedPath, true, 0,
          (short) (fs.getDefaultReplication(qualifiedPath) * 2),
          blockSize);
      outputStream.write(new byte[FILE_SIZE]);
      pathList.add(qualifiedPath);
      ++nFiles;

      FileStatus fileStatus = fs.getFileStatus(qualifiedPath);
      LOG.debug("Created " + qualifiedPath + " with block size "
          + fileStatus.getBlockSize() + " and replication "
          + fileStatus.getReplication());
    } finally {
      IOUtils.cleanup(null, outputStream);
    }
  }

  /**
   * Creates a {@link StubContext} with {@link #getConfiguration()} while
   * running as {@code user}. Any failure is logged and rethrown unchecked.
   */
  private static StubContext createStubContextAs(UserGroupInformation user) {
    return user.doAs(new PrivilegedAction<StubContext>() {
      @Override
      public StubContext run() {
        try {
          return new StubContext(getConfiguration(), null, 0);
        } catch (Exception e) {
          LOG.error("Exception encountered ", e);
          throw new RuntimeException(e);
        }
      }
    });
  }

  /**
   * Returns {@code FileSystem.get(configuration)} evaluated while running as
   * {@code user}, failing the test if the file system cannot be obtained.
   */
  private static FileSystem getFileSystemAs(UserGroupInformation user) {
    return user.doAs(new PrivilegedAction<FileSystem>() {
      @Override
      public FileSystem run() {
        try {
          return FileSystem.get(configuration);
        } catch (IOException e) {
          LOG.error("Exception encountered ", e);
          Assert.fail("Test failed: " + e.getMessage());
          throw new RuntimeException("Test ought to fail here");
        }
      }
    });
  }

  /**
   * Copies the whole source tree and checks the target mirrors it.
   *
   * <p>The checks cover existence, file/dir type, replication, block size,
   * checksum and the COPY/BYTESCOPIED counters. A second pass over the same
   * paths must then skip every file.
   */
  @Test
  public void testRun() {
    try {
      deleteState();
      createSourceData();

      FileSystem fs = cluster.getFileSystem();
      CopyMapper copyMapper = new CopyMapper();
      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
      Mapper<Text, FileStatus, Text, Text>.Context context
          = stubContext.getContext();
      copyMapper.setup(context);

      for (Path path : pathList) {
        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
            fs.getFileStatus(path), context);
      }

      // Check that the maps worked.
      for (Path path : pathList) {
        final Path targetPath = new Path(path.toString()
            .replaceAll(SOURCE_PATH, TARGET_PATH));
        Assert.assertTrue(fs.exists(targetPath));
        Assert.assertEquals(fs.isFile(path), fs.isFile(targetPath));
        Assert.assertEquals(fs.getFileStatus(path).getReplication(),
            fs.getFileStatus(targetPath).getReplication());
        Assert.assertEquals(fs.getFileStatus(path).getBlockSize(),
            fs.getFileStatus(targetPath).getBlockSize());
        Assert.assertTrue(!fs.isFile(targetPath) ||
            fs.getFileChecksum(targetPath).equals(fs.getFileChecksum(path)));
      }

      Assert.assertEquals(pathList.size(),
          stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue());
      Assert.assertEquals(nFiles * FILE_SIZE,
          stubContext.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());

      testCopyingExistingFiles(fs, copyMapper, context);
      for (Text value : stubContext.getWriter().values()) {
        Assert.assertTrue(value.toString() + " is not skipped", value.toString().startsWith("SKIP:"));
      }
    } catch (Exception e) {
      LOG.error("Unexpected exception: ", e);
      Assert.fail("Unexpected exception: " + e.getMessage());
    }
  }

  /**
   * Re-maps every path in {@link #pathList} (already copied) and expects the
   * SKIP counter to equal the number of files.
   */
  private void testCopyingExistingFiles(FileSystem fs, CopyMapper copyMapper,
      Mapper<Text, FileStatus, Text, Text>.Context context) {
    try {
      for (Path path : pathList) {
        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
            fs.getFileStatus(path), context);
      }
      Assert.assertEquals(nFiles,
          context.getCounter(CopyMapper.Counter.SKIP).getValue());
    } catch (Exception exception) {
      LOG.error("Unexpected exception: ", exception);
      Assert.fail("Caught unexpected exception:" + exception.getMessage());
    }
  }

  /**
   * Points the target work path at an unusable hftp location and expects
   * setup/map to throw. Fixture preparation is kept outside the expected-
   * exception region so that a setup failure is not mistaken for success.
   */
  @Test
  public void testMakeDirFailure() throws Exception {
    deleteState();
    createSourceData();

    FileSystem fs = cluster.getFileSystem();
    CopyMapper copyMapper = new CopyMapper();
    StubContext stubContext = new StubContext(getConfiguration(), null, 0);
    Mapper<Text, FileStatus, Text, Text>.Context context
        = stubContext.getContext();

    try {
      Configuration conf = context.getConfiguration();
      String workPath = new Path("hftp://localhost:1234/*/*/*/?/")
          .makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString();
      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH,
          workPath);
      copyMapper.setup(context);

      copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), pathList.get(0))),
          fs.getFileStatus(pathList.get(0)), context);

      Assert.fail("There should have been an exception.");
    } catch (Exception expected) {
      // Expected: mapping into the bogus hftp work path must fail.
    }
  }

  /** Runs the ignore-failures scenario with the option both enabled and disabled. */
  @Test
  public void testIgnoreFailures() {
    doTestIgnoreFailures(true);
    doTestIgnoreFailures(false);
  }

  /**
   * A source directory whose target already exists as a plain file must be
   * rejected with an IOException whose message starts with "Can't replace".
   */
  @Test
  public void testDirToFile() {
    try {
      deleteState();
      createSourceData();

      FileSystem fs = cluster.getFileSystem();
      CopyMapper copyMapper = new CopyMapper();
      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
      Mapper<Text, FileStatus, Text, Text>.Context context
          = stubContext.getContext();

      mkdirs(SOURCE_PATH + "/src/file");
      touchFile(TARGET_PATH + "/src/file");
      try {
        copyMapper.setup(context);
        copyMapper.map(new Text("/src/file"),
            fs.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
            context);
        Assert.fail("Expected an IOException when replacing a file with a directory");
      } catch (IOException e) {
        Assert.assertTrue("Unexpected message: " + e.getMessage(),
            e.getMessage().startsWith("Can't replace"));
      }
    } catch (Exception e) {
      LOG.error("Exception encountered ", e);
      Assert.fail("Test failed: " + e.getMessage());
    }
  }

  /**
   * The unprivileged "guest" user copies a file while preserving every file
   * attribute into a world-writable target directory. The copy is expected
   * to fail with an AccessControlException.
   */
  @Test
  public void testPreserve() {
    try {
      deleteState();
      createSourceData();

      UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");

      final CopyMapper copyMapper = new CopyMapper();

      final Mapper<Text, FileStatus, Text, Text>.Context context =
          createStubContextAs(tmpUser).getContext();

      EnumSet<DistCpOptions.FileAttribute> preserveStatus =
          EnumSet.allOf(DistCpOptions.FileAttribute.class);

      context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
          DistCpUtils.packAttributes(preserveStatus));

      touchFile(SOURCE_PATH + "/src/file");
      mkdirs(TARGET_PATH);
      // 511 == 0777 (rwxrwxrwx)
      cluster.getFileSystem().setPermission(new Path(TARGET_PATH), new FsPermission((short) 511));

      final FileSystem tmpFS = getFileSystemAs(tmpUser);

      tmpUser.doAs(new PrivilegedAction<Integer>() {
        @Override
        public Integer run() {
          try {
            copyMapper.setup(context);
            copyMapper.map(new Text("/src/file"),
                tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
                context);
            Assert.fail("Expected copy to fail");
          } catch (AccessControlException e) {
            // Expected: the guest user's attribute-preserving copy is rejected.
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
          return null;
        }
      });
    } catch (Exception e) {
      LOG.error("Exception encountered ", e);
      Assert.fail("Test failed: " + e.getMessage());
    }
  }

  /**
   * The "guest" user can copy a world-readable (r--r--r--) source file into a
   * world-writable target directory without any exception.
   */
  @Test
  public void testCopyReadableFiles() {
    try {
      deleteState();
      createSourceData();

      UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");

      final CopyMapper copyMapper = new CopyMapper();

      final Mapper<Text, FileStatus, Text, Text>.Context context =
          createStubContextAs(tmpUser).getContext();

      touchFile(SOURCE_PATH + "/src/file");
      mkdirs(TARGET_PATH);
      cluster.getFileSystem().setPermission(new Path(SOURCE_PATH + "/src/file"),
          new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
      // 511 == 0777 (rwxrwxrwx)
      cluster.getFileSystem().setPermission(new Path(TARGET_PATH), new FsPermission((short) 511));

      final FileSystem tmpFS = getFileSystemAs(tmpUser);

      tmpUser.doAs(new PrivilegedAction<Integer>() {
        @Override
        public Integer run() {
          try {
            copyMapper.setup(context);
            copyMapper.map(new Text("/src/file"),
                tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
                context);
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
          return null;
        }
      });
    } catch (Exception e) {
      LOG.error("Exception encountered ", e);
      Assert.fail("Test failed: " + e.getMessage());
    }
  }

  /**
   * When an identical read-only target already exists, the "guest" user's
   * copy is skipped. Exactly one writer record is expected; it starts with
   * "SKIP" and mentions the source path.
   */
  @Test
  public void testSkipCopyNoPerms() {
    try {
      deleteState();
      createSourceData();

      UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");

      final CopyMapper copyMapper = new CopyMapper();

      final StubContext stubContext = createStubContextAs(tmpUser);
      final Mapper<Text, FileStatus, Text, Text>.Context context = stubContext.getContext();

      EnumSet<DistCpOptions.FileAttribute> preserveStatus =
          EnumSet.allOf(DistCpOptions.FileAttribute.class);

      context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
          DistCpUtils.packAttributes(preserveStatus));

      touchFile(SOURCE_PATH + "/src/file");
      touchFile(TARGET_PATH + "/src/file");
      cluster.getFileSystem().setPermission(new Path(SOURCE_PATH + "/src/file"),
          new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
      cluster.getFileSystem().setPermission(new Path(TARGET_PATH + "/src/file"),
          new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));

      final FileSystem tmpFS = getFileSystemAs(tmpUser);

      tmpUser.doAs(new PrivilegedAction<Integer>() {
        @Override
        public Integer run() {
          try {
            copyMapper.setup(context);
            copyMapper.map(new Text("/src/file"),
                tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
                context);
            Assert.assertEquals(1, stubContext.getWriter().values().size());
            Assert.assertTrue(stubContext.getWriter().values().get(0).toString().startsWith("SKIP"));
            Assert.assertTrue(stubContext.getWriter().values().get(0).toString().
                contains(SOURCE_PATH + "/src/file"));
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
          return null;
        }
      });
    } catch (Exception e) {
      LOG.error("Exception encountered ", e);
      Assert.fail("Test failed: " + e.getMessage());
    }
  }

  /**
   * When a read-only target with different content already exists, the
   * "guest" user's copy must fail. The AccessControlException may arrive
   * directly or wrapped twice.
   */
  @Test
  public void testFailCopyWithAccessControlException() {
    try {
      deleteState();
      createSourceData();

      UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");

      final CopyMapper copyMapper = new CopyMapper();

      final StubContext stubContext = createStubContextAs(tmpUser);

      EnumSet<DistCpOptions.FileAttribute> preserveStatus =
          EnumSet.allOf(DistCpOptions.FileAttribute.class);

      final Mapper<Text, FileStatus, Text, Text>.Context context
          = stubContext.getContext();

      context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
          DistCpUtils.packAttributes(preserveStatus));

      touchFile(SOURCE_PATH + "/src/file");
      OutputStream out = cluster.getFileSystem().create(new Path(TARGET_PATH + "/src/file"));
      try {
        out.write("hello world".getBytes());
      } finally {
        out.close();
      }
      cluster.getFileSystem().setPermission(new Path(SOURCE_PATH + "/src/file"),
          new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
      cluster.getFileSystem().setPermission(new Path(TARGET_PATH + "/src/file"),
          new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));

      final FileSystem tmpFS = getFileSystemAs(tmpUser);

      tmpUser.doAs(new PrivilegedAction<Integer>() {
        @Override
        public Integer run() {
          try {
            copyMapper.setup(context);
            copyMapper.map(new Text("/src/file"),
                tmpFS.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
                context);
            Assert.fail("Didn't expect the file to be copied");
          } catch (AccessControlException ignore) {
            // Expected: the permission failure surfaced directly.
          } catch (Exception e) {
            // We want to make sure the underlying cause of the exception is
            // due to permissions error. The exception we're interested in is
            // wrapped twice - once in RetriableCommand and again in CopyMapper
            // itself.
            if (e.getCause() == null || e.getCause().getCause() == null ||
                !(e.getCause().getCause() instanceof AccessControlException)) {
              throw new RuntimeException(e);
            }
          }
          return null;
        }
      });
    } catch (Exception e) {
      LOG.error("Exception encountered ", e);
      Assert.fail("Test failed: " + e.getMessage());
    }
  }

  /**
   * A source file whose target already exists as a directory must be
   * rejected with an IOException whose message starts with "Can't replace".
   */
  @Test
  public void testFileToDir() {
    try {
      deleteState();
      createSourceData();

      FileSystem fs = cluster.getFileSystem();
      CopyMapper copyMapper = new CopyMapper();
      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
      Mapper<Text, FileStatus, Text, Text>.Context context
          = stubContext.getContext();

      touchFile(SOURCE_PATH + "/src/file");
      mkdirs(TARGET_PATH + "/src/file");
      try {
        copyMapper.setup(context);
        copyMapper.map(new Text("/src/file"),
            fs.getFileStatus(new Path(SOURCE_PATH + "/src/file")),
            context);
        Assert.fail("Expected an IOException when replacing a directory with a file");
      } catch (IOException e) {
        Assert.assertTrue("Unexpected message: " + e.getMessage(),
            e.getMessage().startsWith("Can't replace"));
      }
    } catch (Exception e) {
      LOG.error("Exception encountered ", e);
      Assert.fail("Test failed: " + e.getMessage());
    }
  }

  /**
   * Forces every file copy to fail by deleting each source file before
   * mapping its (now stale) FileStatus.
   *
   * <p>With {@code ignoreFailures} set, no exception may escape and every
   * writer record must start with "FAIL:". Without it, an exception must
   * escape the mapper.
   */
  private void doTestIgnoreFailures(boolean ignoreFailures) {
    try {
      deleteState();
      createSourceData();

      FileSystem fs = cluster.getFileSystem();
      CopyMapper copyMapper = new CopyMapper();
      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
      Mapper<Text, FileStatus, Text, Text>.Context context
          = stubContext.getContext();

      Configuration conf = context.getConfiguration();
      conf.setBoolean(
          DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), ignoreFailures);
      conf.setBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(),
          true);
      conf.setBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(),
          true);
      copyMapper.setup(context);

      for (Path path : pathList) {
        final FileStatus fileStatus = fs.getFileStatus(path);
        if (!fileStatus.isDirectory()) {
          fs.delete(path, true);
          copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
              fileStatus, context);
        }
      }
      if (ignoreFailures) {
        for (Text value : stubContext.getWriter().values()) {
          Assert.assertTrue(value.toString() + " is not marked as failed",
              value.toString().startsWith("FAIL:"));
        }
      }
      Assert.assertTrue("There should have been an exception.", ignoreFailures);
    } catch (Exception e) {
      // An exception is legitimate only when failures are not being ignored.
      LOG.debug("Exception from CopyMapper (ignoreFailures=" + ignoreFailures + "): ", e);
      Assert.assertTrue("Unexpected exception: " + e.getMessage(),
          !ignoreFailures);
    }
  }

  /** Clears the path bookkeeping and recursively deletes the source and target trees. */
  private static void deleteState() throws IOException {
    pathList.clear();
    nFiles = 0;
    cluster.getFileSystem().delete(new Path(SOURCE_PATH), true);
    cluster.getFileSystem().delete(new Path(TARGET_PATH), true);
  }

  /** Checks block-size/replication handling with preservation both on and off. */
  @Test
  public void testPreserveBlockSizeAndReplication() {
    testPreserveBlockSizeAndReplicationImpl(true);
    testPreserveBlockSizeAndReplicationImpl(false);
  }

  /**
   * Copies the source tree with BLOCKSIZE/REPLICATION preservation enabled
   * or disabled.
   *
   * <p>For each copied file, the block size and replication must equal the
   * source's when {@code preserve} is true, and differ from them otherwise.
   */
  private void testPreserveBlockSizeAndReplicationImpl(boolean preserve) {
    try {
      deleteState();
      createSourceData();

      FileSystem fs = cluster.getFileSystem();
      CopyMapper copyMapper = new CopyMapper();
      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
      Mapper<Text, FileStatus, Text, Text>.Context context
          = stubContext.getContext();

      Configuration conf = context.getConfiguration();
      EnumSet<DistCpOptions.FileAttribute> fileAttributes
          = EnumSet.noneOf(DistCpOptions.FileAttribute.class);
      if (preserve) {
        fileAttributes.add(DistCpOptions.FileAttribute.BLOCKSIZE);
        fileAttributes.add(DistCpOptions.FileAttribute.REPLICATION);
      }
      conf.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
          DistCpUtils.packAttributes(fileAttributes));

      copyMapper.setup(context);

      for (Path path : pathList) {
        final FileStatus fileStatus = fs.getFileStatus(path);
        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
            fileStatus, context);
      }

      // Block size/replication must match the source exactly when preserved,
      // and differ from it when not.
      for (Path path : pathList) {
        final Path targetPath = new Path(path.toString()
            .replaceAll(SOURCE_PATH, TARGET_PATH));
        final FileStatus source = fs.getFileStatus(path);
        final FileStatus target = fs.getFileStatus(targetPath);
        if (!source.isDirectory()) {
          Assert.assertTrue(preserve ||
              source.getBlockSize() != target.getBlockSize());
          Assert.assertTrue(preserve ||
              source.getReplication() != target.getReplication());
          Assert.assertTrue(!preserve ||
              source.getBlockSize() == target.getBlockSize());
          Assert.assertTrue(!preserve ||
              source.getReplication() == target.getReplication());
        }
      }
    } catch (Exception e) {
      LOG.error("Unexpected exception: ", e);
      Assert.fail("Unexpected exception: " + e.getMessage());
    }
  }

  /** Sets the owner/group and rwxrwxrwx permission on every file in {@link #pathList}. */
  private static void changeUserGroup(String user, String group)
      throws IOException {
    FileSystem fs = cluster.getFileSystem();
    FsPermission changedPermission = new FsPermission(
        FsAction.ALL, FsAction.ALL, FsAction.ALL
    );
    for (Path path : pathList) {
      if (fs.isFile(path)) {
        fs.setOwner(path, user, group);
        fs.setPermission(path, changedPermission);
      }
    }
  }

  /**
   * If a single file is being copied to a location where the file (of the same
   * name) already exists, then the file shouldn't be skipped.
   *
   * <p>With the final path set to the parent directory the existing target is
   * left untouched (modification time unchanged). With the final path set to
   * the file itself the target is rewritten (modification time advances).
   */
  @Test
  public void testSingleFileCopy() {
    try {
      deleteState();
      touchFile(SOURCE_PATH + "/1");
      Path sourceFilePath = pathList.get(0);
      Path targetFilePath = new Path(sourceFilePath.toString().replaceAll(
          SOURCE_PATH, TARGET_PATH));
      touchFile(targetFilePath.toString());

      FileSystem fs = cluster.getFileSystem();
      CopyMapper copyMapper = new CopyMapper();
      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
      Mapper<Text, FileStatus, Text, Text>.Context context
          = stubContext.getContext();

      context.getConfiguration().set(
          DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH,
          targetFilePath.getParent().toString()); // Parent directory.
      copyMapper.setup(context);

      final FileStatus sourceFileStatus = fs.getFileStatus(sourceFilePath);

      long before = fs.getFileStatus(targetFilePath).getModificationTime();
      copyMapper.map(new Text(DistCpUtils.getRelativePath(
          new Path(SOURCE_PATH), sourceFilePath)), sourceFileStatus, context);
      long after = fs.getFileStatus(targetFilePath).getModificationTime();

      Assert.assertEquals("File should have been skipped", before, after);

      context.getConfiguration().set(
          DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH,
          targetFilePath.toString()); // Specify the file path.
      copyMapper.setup(context);

      before = fs.getFileStatus(targetFilePath).getModificationTime();
      // Sleep briefly so the rewritten file gets a strictly later modification time.
      try {
        Thread.sleep(2);
      } catch (InterruptedException ignored) {
        Thread.currentThread().interrupt();
      }
      copyMapper.map(new Text(DistCpUtils.getRelativePath(
          new Path(SOURCE_PATH), sourceFilePath)), sourceFileStatus, context);
      after = fs.getFileStatus(targetFilePath).getModificationTime();

      Assert.assertTrue("File should have been overwritten.", before < after);
    } catch (Exception exception) {
      LOG.error("Unexpected exception: ", exception);
      Assert.fail("Unexpected exception: " + exception.getMessage());
    }
  }

  /** Checks owner/group/permission handling with preservation both on and off. */
  @Test
  public void testPreserveUserGroup() {
    testPreserveUserGroupImpl(true);
    testPreserveUserGroupImpl(false);
  }

  /**
   * Copies the source tree after re-owning its files. The copy runs with
   * USER/GROUP/PERMISSION preservation enabled or disabled.
   *
   * <p>For each copied file, owner, group and permission must equal the
   * source's when {@code preserve} is true and differ otherwise. Replication
   * must always differ, since "br" is not in effect here.
   */
  private void testPreserveUserGroupImpl(boolean preserve) {
    try {
      deleteState();
      createSourceData();
      changeUserGroup("Michael", "Corleone");

      FileSystem fs = cluster.getFileSystem();
      CopyMapper copyMapper = new CopyMapper();
      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
      Mapper<Text, FileStatus, Text, Text>.Context context
          = stubContext.getContext();

      Configuration conf = context.getConfiguration();
      EnumSet<DistCpOptions.FileAttribute> fileAttributes
          = EnumSet.noneOf(DistCpOptions.FileAttribute.class);
      if (preserve) {
        fileAttributes.add(DistCpOptions.FileAttribute.USER);
        fileAttributes.add(DistCpOptions.FileAttribute.GROUP);
        fileAttributes.add(DistCpOptions.FileAttribute.PERMISSION);
      }

      conf.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
          DistCpUtils.packAttributes(fileAttributes));
      copyMapper.setup(context);

      for (Path path : pathList) {
        final FileStatus fileStatus = fs.getFileStatus(path);
        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
            fileStatus, context);
      }

      // Check that the user/group attributes are preserved
      // (only) as necessary.
      for (Path path : pathList) {
        final Path targetPath = new Path(path.toString()
            .replaceAll(SOURCE_PATH, TARGET_PATH));
        final FileStatus source = fs.getFileStatus(path);
        final FileStatus target = fs.getFileStatus(targetPath);
        if (!source.isDirectory()) {
          Assert.assertTrue(!preserve || source.getOwner().equals(target.getOwner()));
          Assert.assertTrue(!preserve || source.getGroup().equals(target.getGroup()));
          Assert.assertTrue(!preserve || source.getPermission().equals(target.getPermission()));
          Assert.assertTrue(preserve || !source.getOwner().equals(target.getOwner()));
          Assert.assertTrue(preserve || !source.getGroup().equals(target.getGroup()));
          Assert.assertTrue(preserve || !source.getPermission().equals(target.getPermission()));
          Assert.assertTrue(source.getReplication() != target.getReplication());
        }
      }
    } catch (Exception e) {
      LOG.error("Unexpected exception: ", e);
      Assert.fail("Unexpected exception: " + e.getMessage());
    }
  }
}