blob: 8e60540126b72a801d22247d09846be5d73425a5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.shell;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal;
import org.junit.BeforeClass;
import org.junit.AfterClass;
import org.junit.Test;
import org.junit.Assert;
import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.ThreadPoolExecutor;
import static org.junit.Assert.assertEquals;
/**
 * Tests for the {@code copyFromLocal} shell command, covering the default
 * invocation as well as the {@code -t} (thread count) option.
 *
 * Every test builds a randomly sized directory tree on the local file
 * system, runs the command through {@code TestMultiThreadedCopy} and checks
 * the exit code, the reported thread count and the executor state.
 */
public class TestCopyFromLocal {
  private static final String FROM_DIR_NAME = "fromDir";
  private static final String TO_DIR_NAME = "toDir";

  /** Exclusive upper bound on the number of sub-directories generated. */
  private static final int MAX_SUB_DIRS = 5;
  /** Exclusive upper bound on the number of files per sub-directory. */
  private static final int MAX_FILES_PER_DIR = 10;
  /** Number of integers written into each generated file. */
  private static final int INTS_PER_FILE = 100;

  private static FileSystem fs;
  private static Path testDir;
  private static Configuration conf;

  /**
   * Creates the source and destination directories below {@code dir} and
   * fills the source directory with a random number of sub-directories, each
   * holding a random number of small files.
   *
   * @param dir root directory for the generated test data
   * @return total number of files created below the source directory
   * @throws Exception if a file system operation fails
   */
  public static int initialize(Path dir) throws Exception {
    fs.mkdirs(dir);
    Path fromDirPath = new Path(dir, FROM_DIR_NAME);
    fs.mkdirs(fromDirPath);
    Path toDirPath = new Path(dir, TO_DIR_NAME);
    fs.mkdirs(toDirPath);

    int numTotalFiles = 0;
    int numDirs = RandomUtils.nextInt(MAX_SUB_DIRS);
    for (int dirCount = 0; dirCount < numDirs; ++dirCount) {
      Path subDirPath = new Path(fromDirPath, "subdir" + dirCount);
      fs.mkdirs(subDirPath);
      int numFiles = RandomUtils.nextInt(MAX_FILES_PER_DIR);
      for (int fileCount = 0; fileCount < numFiles; ++fileCount) {
        numTotalFiles++;
        writeTestFile(new Path(subDirPath, "file" + fileCount));
      }
    }
    return numTotalFiles;
  }

  /**
   * Writes a small, fixed payload to {@code file}, creating or overwriting
   * it. The stream is closed even if a write fails.
   *
   * @param file path of the file to write
   * @throws IOException if the file cannot be created or written
   */
  private static void writeTestFile(Path file) throws IOException {
    // create(..., true) creates the file when it is missing, so no separate
    // createNewFile() call is needed beforehand.
    FSDataOutputStream output = fs.create(file, true);
    try {
      for (int i = 0; i < INTS_PER_FILE; ++i) {
        output.writeInt(i);
        output.writeChar('\n');
      }
    } finally {
      output.close();
    }
  }

  /**
   * Picks a relative directory name of the form {@code dirNNNN} that does not
   * exist yet, so that two tests in the same run never share a data tree.
   *
   * @return a path that did not exist at the time of the check
   * @throws IOException if the existence check fails
   */
  private static Path newTestDir() throws IOException {
    Path dir;
    do {
      dir = new Path("dir" + RandomStringUtils.randomNumeric(4));
    } while (fs.exists(dir));
    return dir;
  }

  /**
   * Sets up a local file system whose working directory is the test root.
   *
   * @throws Exception if the file system cannot be initialised
   */
  @BeforeClass
  public static void init() throws Exception {
    conf = new Configuration(false);
    conf.set("fs.file.impl", LocalFileSystem.class.getName());
    fs = FileSystem.getLocal(conf);
    testDir = new FileSystemTestHelper().getTestRootPath(fs);
    // don't want scheme on the path, just an absolute path
    testDir = new Path(fs.makeQualified(testDir).toUri().getPath());

    FileSystem.setDefaultUri(conf, fs.getUri());
    fs.setWorkingDirectory(testDir);
  }

  /**
   * Removes the test root and closes the file system. The file system is
   * closed even when the delete fails.
   *
   * @throws Exception if deleting or closing fails
   */
  @AfterClass
  public static void cleanup() throws Exception {
    try {
      fs.delete(testDir, true);
    } finally {
      fs.close();
    }
  }

  /**
   * Runs {@code cmd} with the shared configuration and asserts that it exits
   * with status 0.
   *
   * @param cmd command under test
   * @param args command line arguments handed to the command
   */
  private void run(CommandWithDestination cmd, String... args) {
    cmd.setConf(conf);
    assertEquals("Unexpected exit code from command", 0, cmd.run(args));
  }

  /**
   * Copies without the {@code -t} option; the test expects one thread and no
   * tasks completed by the executor.
   */
  @Test(timeout = 10000)
  public void testCopyFromLocal() throws Exception {
    Path dir = newTestDir();
    TestCopyFromLocal.initialize(dir);
    run(new TestMultiThreadedCopy(1, 0),
        new Path(dir, FROM_DIR_NAME).toString(),
        new Path(dir, TO_DIR_NAME).toString());
  }

  /**
   * Copies with a random thread count between 1 and
   * {@code 2 * availableProcessors - 1}; with more than one thread the test
   * expects one completed executor task per generated file.
   */
  @Test(timeout = 10000)
  public void testCopyFromLocalWithThreads() throws Exception {
    Path dir = newTestDir();
    int numFiles = TestCopyFromLocal.initialize(dir);
    int maxThreads = Runtime.getRuntime().availableProcessors() * 2;
    int randThreads = RandomUtils.nextInt(maxThreads - 1) + 1;
    String numThreads = Integer.toString(randThreads);
    // A single thread is expected to behave like the default copy, i.e. no
    // tasks are counted as completed by the executor.
    int expectedTasks = (randThreads == 1) ? 0 : numFiles;
    run(new TestMultiThreadedCopy(randThreads, expectedTasks),
        "-t", numThreads,
        new Path(dir, FROM_DIR_NAME).toString(),
        new Path(dir, TO_DIR_NAME).toString());
  }

  /**
   * Requests twice the maximum thread count; the test expects the command to
   * fall back to {@code 2 * availableProcessors} threads.
   */
  @Test(timeout = 10000)
  public void testCopyFromLocalWithThreadWrong() throws Exception {
    Path dir = newTestDir();
    int numFiles = TestCopyFromLocal.initialize(dir);
    int maxThreads = Runtime.getRuntime().availableProcessors() * 2;
    String numThreads = Integer.toString(maxThreads * 2);
    run(new TestMultiThreadedCopy(maxThreads, numFiles), "-t", numThreads,
        new Path(dir, FROM_DIR_NAME).toString(),
        new Path(dir, TO_DIR_NAME).toString());
  }

  /**
   * Requests zero threads; the test expects the command to use one thread and
   * complete no executor tasks.
   */
  @Test(timeout = 10000)
  public void testCopyFromLocalWithZeroThreads() throws Exception {
    Path dir = newTestDir();
    TestCopyFromLocal.initialize(dir);
    run(new TestMultiThreadedCopy(1, 0), "-t", "0",
        new Path(dir, FROM_DIR_NAME).toString(),
        new Path(dir, TO_DIR_NAME).toString());
  }

  /**
   * {@link CopyFromLocal} variant that checks the thread count before the
   * copy and the executor state after it. Declared static because it does not
   * use any state of the enclosing test instance.
   */
  private static class TestMultiThreadedCopy extends CopyFromLocal {
    public static final String NAME = "testCopyFromLocal";
    private final int expectedThreads;
    private final int expectedCompletedTaskCount;

    /**
     * @param expectedThreads thread count the command is expected to report
     * @param expectedCompletedTaskCount number of tasks the executor is
     *        expected to have completed once the copy is done
     */
    TestMultiThreadedCopy(int expectedThreads,
        int expectedCompletedTaskCount) {
      this.expectedThreads = expectedThreads;
      this.expectedCompletedTaskCount = expectedCompletedTaskCount;
    }

    @Override
    protected void processArguments(LinkedList<PathData> args)
        throws IOException {
      // Check if the correct number of threads are spawned
      Assert.assertEquals(expectedThreads, getNumThreads());
      super.processArguments(args);

      // Once the copy is complete, check following
      // 1) number of completed tasks are same as expected
      // 2) There are no active tasks in the executor
      // 3) Executor has shutdown correctly
      ThreadPoolExecutor executor = getExecutor();
      Assert.assertEquals(expectedCompletedTaskCount,
          executor.getCompletedTaskCount());
      Assert.assertEquals(0, executor.getActiveCount());
      Assert.assertTrue(executor.isTerminated());
    }
  }
}