/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
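/**
 * Tests {@link DistributedFileSystem#concat(Path, Path[])}, which moves the
 * blocks of the given source files onto the end of the target file and
 * removes the sources. A minimal call looks like:
 *
 * <pre>
 * dfs.concat(new Path("/target"), new Path[] {new Path("/src0"), new Path("/src1")});
 * </pre>
 */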
public class TestHDFSConcat {
public static final Log LOG = LogFactory.getLog(TestHDFSConcat.class);
private static final short REPL_FACTOR = 2;
private MiniDFSCluster cluster;
private NamenodeProtocols nn;
private DistributedFileSystem dfs;
private static final long blockSize = 512;
private static final Configuration conf;
static {
conf = new Configuration();
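// use a small block size so multi-block test files stay cheap to create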
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
}
@Before
public void startUpCluster() throws IOException {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL_FACTOR).build();
assertNotNull("Failed Cluster Creation", cluster);
cluster.waitClusterUp();
dfs = cluster.getFileSystem();
assertNotNull("Failed to get FileSystem", dfs);
nn = cluster.getNameNodeRpc();
assertNotNull("Failed to get NameNode", nn);
}
@After
public void shutDownCluster() throws IOException {
if(dfs != null) {
dfs.close();
dfs = null;
}
if(cluster != null) {
cluster.shutdownDataNodes();
cluster.shutdown();
cluster = null;
}
}
/**
 * Concatenates 10 files into one and verifies the resulting length,
 * the number of blocks, the removal of the source files, and the content.
 * @throws IOException
 */
@Test
public void testConcat() throws IOException, InterruptedException {
final int numFiles = 10;
long fileLen = blockSize*3;
HdfsFileStatus fStatus;
FSDataInputStream stm;
String trg = "/trg";
Path trgPath = new Path(trg);
DFSTestUtil.createFile(dfs, trgPath, fileLen, REPL_FACTOR, 1);
fStatus = nn.getFileInfo(trg);
long trgLen = fStatus.getLen();
long trgBlocks = nn.getBlockLocations(trg, 0, trgLen).locatedBlockCount();
Path [] files = new Path[numFiles];
byte[][] bytes = new byte[numFiles + 1][(int) fileLen];
LocatedBlocks [] lblocks = new LocatedBlocks[numFiles];
long [] lens = new long [numFiles];
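// bytes[0] holds the target file's content; bytes[i + 1] holds file i's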
stm = dfs.open(trgPath);
stm.readFully(0, bytes[0]);
stm.close();
int i;
for(i=0; i<files.length; i++) {
files[i] = new Path("/file"+i);
Path path = files[i];
System.out.println("Creating file " + path);
// make files with different content
DFSTestUtil.createFile(dfs, path, fileLen, REPL_FACTOR, i);
fStatus = nn.getFileInfo(path.toUri().getPath());
lens[i] = fStatus.getLen();
assertEquals(trgLen, lens[i]); // all files have the same length
lblocks[i] = nn.getBlockLocations(path.toUri().getPath(), 0, lens[i]);
//read the file
stm = dfs.open(path);
stm.readFully(0, bytes[i + 1]);
stm.close();
}
// check permissions - try the operation as the "wrong" user
final UserGroupInformation user1 = UserGroupInformation.createUserForTesting(
"theDoctor", new String[] { "tardis" });
DistributedFileSystem hdfs =
(DistributedFileSystem)DFSTestUtil.getFileSystemAs(user1, conf);
try {
hdfs.concat(trgPath, files);
fail("Permission exception expected");
} catch (IOException ie) {
System.out.println("Got expected exception for permissions:"
+ ie.getLocalizedMessage());
// expected
}
// check count update
ContentSummary cBefore = dfs.getContentSummary(trgPath.getParent());
// reverse the file array so the source INode ids are not in sorted order;
// swap the matching byte arrays to keep content aligned with the files
for (int j = 0; j < files.length / 2; j++) {
Path tempPath = files[j];
files[j] = files[files.length - 1 - j];
files[files.length - 1 - j] = tempPath;
byte[] tempBytes = bytes[1 + j];
bytes[1 + j] = bytes[files.length - 1 - j + 1];
bytes[files.length - 1 - j + 1] = tempBytes;
}
// now concatenate
dfs.concat(trgPath, files);
// verify count
ContentSummary cAfter = dfs.getContentSummary(trgPath.getParent());
assertEquals(cBefore.getFileCount(), cAfter.getFileCount()+files.length);
// verify the total length and block count
long totalLen = trgLen;
long totalBlocks = trgBlocks;
for(i=0; i<files.length; i++) {
totalLen += lens[i];
totalBlocks += lblocks[i].locatedBlockCount();
}
System.out.println("total len=" + totalLen + "; totalBlocks=" + totalBlocks);
fStatus = nn.getFileInfo(trg);
trgLen = fStatus.getLen(); // new length
// read the resulting file
stm = dfs.open(trgPath);
byte[] byteFileConcat = new byte[(int)trgLen];
stm.readFully(0, byteFileConcat);
stm.close();
trgBlocks = nn.getBlockLocations(trg, 0, trgLen).locatedBlockCount();
//verifications
// 1. number of blocks
assertEquals(trgBlocks, totalBlocks);
// 2. file lengths
assertEquals(trgLen, totalLen);
// 3. removal of the src file
for(Path p: files) {
fStatus = nn.getFileInfo(p.toUri().getPath());
assertNull("File " + p + " still exists", fStatus); // file shouldn't exist
// try to create a file with the same name
DFSTestUtil.createFile(dfs, p, fileLen, REPL_FACTOR, 1);
}
// 4. content
checkFileContent(byteFileConcat, bytes);
// add a small file (less than a block)
Path smallFile = new Path("/sfile");
int sFileLen = 10;
DFSTestUtil.createFile(dfs, smallFile, sFileLen, REPL_FACTOR, 1);
dfs.concat(trgPath, new Path [] {smallFile});
fStatus = nn.getFileInfo(trg);
trgLen = fStatus.getLen(); // new length
// check number of blocks
trgBlocks = nn.getBlockLocations(trg, 0, trgLen).locatedBlockCount();
assertEquals(trgBlocks, totalBlocks+1);
// and length
assertEquals(trgLen, totalLen+sFileLen);
}
/**
* Test that the concat operation is properly persisted in the
* edit log, and properly replayed on restart.
*/
@Test
public void testConcatInEditLog() throws Exception {
final Path TEST_DIR = new Path("/testConcatInEditLog");
final long FILE_LEN = blockSize;
// 1. Concat some files
Path[] srcFiles = new Path[3];
for (int i = 0; i < srcFiles.length; i++) {
Path path = new Path(TEST_DIR, "src-" + i);
DFSTestUtil.createFile(dfs, path, FILE_LEN, REPL_FACTOR, 1);
srcFiles[i] = path;
}
Path targetFile = new Path(TEST_DIR, "target");
DFSTestUtil.createFile(dfs, targetFile, FILE_LEN, REPL_FACTOR, 1);
dfs.concat(targetFile, srcFiles);
// 2. Verify the concat operation basically worked, and record
// file status.
assertTrue(dfs.exists(targetFile));
FileStatus origStatus = dfs.getFileStatus(targetFile);
// 3. Restart NN to force replay from edit log
cluster.restartNameNode(true);
// 4. Verify concat operation was replayed correctly and file status
// did not change.
assertTrue(dfs.exists(targetFile));
assertFalse(dfs.exists(srcFiles[0]));
FileStatus statusAfterRestart = dfs.getFileStatus(targetFile);
assertEquals(origStatus.getModificationTime(),
statusAfterRestart.getModificationTime());
}
// compare the concatenated file's content against the original byte arrays
private void checkFileContent(byte[] concat, byte[][] bytes ) {
int idx=0;
boolean mismatch = false;
for(byte [] bb: bytes) {
for(byte b: bb) {
if(b != concat[idx++]) {
mismatch=true;
break;
}
}
if(mismatch)
break;
}
assertFalse("File content of concatenated file is different", mismatch);
}
// test the case where the source file's last block is not full
@Test
public void testConcatNotCompleteBlock() throws IOException {
long trgFileLen = blockSize*3;
long srcFileLen = blockSize*3+20; // block at the end - not full
// create first file
String name1="/trg", name2="/src";
Path filePath1 = new Path(name1);
DFSTestUtil.createFile(dfs, filePath1, trgFileLen, REPL_FACTOR, 1);
HdfsFileStatus fStatus = nn.getFileInfo(name1);
long fileLen = fStatus.getLen();
assertEquals(fileLen, trgFileLen);
//read the file
FSDataInputStream stm = dfs.open(filePath1);
byte[] byteFile1 = new byte[(int)trgFileLen];
stm.readFully(0, byteFile1);
stm.close();
LocatedBlocks lb1 = nn.getBlockLocations(name1, 0, trgFileLen);
Path filePath2 = new Path(name2);
DFSTestUtil.createFile(dfs, filePath2, srcFileLen, REPL_FACTOR, 1);
fStatus = nn.getFileInfo(name2);
fileLen = fStatus.getLen();
assertEquals(srcFileLen, fileLen);
// read the file
stm = dfs.open(filePath2);
byte[] byteFile2 = new byte[(int)srcFileLen];
stm.readFully(0, byteFile2);
stm.close();
LocatedBlocks lb2 = nn.getBlockLocations(name2, 0, srcFileLen);
System.out.println("trg len="+trgFileLen+"; src len="+srcFileLen);
// move the blocks
dfs.concat(filePath1, new Path [] {filePath2});
long totalLen = trgFileLen + srcFileLen;
fStatus = nn.getFileInfo(name1);
fileLen = fStatus.getLen();
// read the resulting file
stm = dfs.open(filePath1);
byte[] byteFileConcat = new byte[(int)fileLen];
stm.readFully(0, byteFileConcat);
stm.close();
LocatedBlocks lbConcat = nn.getBlockLocations(name1, 0, fileLen);
//verifications
// 1. number of blocks
assertEquals(lbConcat.locatedBlockCount(),
lb1.locatedBlockCount() + lb2.locatedBlockCount());
// 2. file lengths
System.out.println("file1 len="+fileLen+"; total len="+totalLen);
assertEquals(fileLen, totalLen);
// 3. removal of the src file
fStatus = nn.getFileInfo(name2);
assertNull("File "+name2+ "still exists", fStatus); // file shouldn't exist
// 4. content
checkFileContent(byteFileConcat, new byte [] [] {byteFile1, byteFile2});
}
/**
 * Test illegal argument cases.
 */
@Test
public void testIllegalArg() throws IOException {
long fileLen = blockSize*3;
Path parentDir = new Path ("/parentTrg");
assertTrue(dfs.mkdirs(parentDir));
Path trg = new Path(parentDir, "trg");
DFSTestUtil.createFile(dfs, trg, fileLen, REPL_FACTOR, 1);
// sources must be in the same directory as the target
{
// create first file
Path dir1 = new Path ("/dir1");
assertTrue(dfs.mkdirs(dir1));
Path src = new Path(dir1, "src");
DFSTestUtil.createFile(dfs, src, fileLen, REPL_FACTOR, 1);
try {
dfs.concat(trg, new Path [] {src});
fail("didn't fail for src and trg in different directories");
} catch (Exception e) {
// expected
}
}
// non-existent source file
try {
dfs.concat(trg, new Path [] {new Path("test1/a")}); // non-existent file
fail("didn't fail with invalid arguments");
} catch (Exception e) {
//expected
}
// empty arg list
try {
dfs.concat(trg, new Path [] {}); // empty array
fail("didn't fail with invalid arguments");
} catch (Exception e) {
// expected
}
// the source file's preferred block size cannot be greater than the target
{
final Path src1 = new Path(parentDir, "src1");
DFSTestUtil.createFile(dfs, src1, fileLen, REPL_FACTOR, 0L);
final Path src2 = new Path(parentDir, "src2");
// create a file whose preferred block size is greater than the target
DFSTestUtil.createFile(dfs, src2, 1024, fileLen,
dfs.getDefaultBlockSize(trg) * 2, REPL_FACTOR, 0L);
try {
dfs.concat(trg, new Path[] {src1, src2});
fail("didn't fail for src with greater preferred block size");
} catch (Exception e) {
GenericTestUtils.assertExceptionContains("preferred block size", e);
}
}
}
/**
 * Make sure we update the quota correctly after concat drops the source
 * blocks' replication to the target's lower replication factor.
 */
@Test
public void testConcatWithQuotaDecrease() throws IOException {
final short srcRepl = 3; // note this differs from REPL_FACTOR
final int srcNum = 10;
final Path foo = new Path("/foo");
final Path[] srcs = new Path[srcNum];
final Path target = new Path(foo, "target");
DFSTestUtil.createFile(dfs, target, blockSize, REPL_FACTOR, 0L);
dfs.setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
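// a practically unlimited quota makes /foo a quota root, so concat exercises the quota update path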
for (int i = 0; i < srcNum; i++) {
srcs[i] = new Path(foo, "src" + i);
DFSTestUtil.createFile(dfs, srcs[i], blockSize * 2, srcRepl, 0L);
}
ContentSummary summary = dfs.getContentSummary(foo);
Assert.assertEquals(11, summary.getFileCount());
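// space so far: target = 1 block at replication 2; each src = 2 blocks at replication 3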
Assert.assertEquals(blockSize * REPL_FACTOR +
blockSize * 2 * srcRepl * srcNum, summary.getSpaceConsumed());
dfs.concat(target, srcs);
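// the moved source blocks take on the target's replication factor (2), so the space consumed decreases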
summary = dfs.getContentSummary(foo);
Assert.assertEquals(1, summary.getFileCount());
Assert.assertEquals(
blockSize * REPL_FACTOR + blockSize * 2 * REPL_FACTOR * srcNum,
summary.getSpaceConsumed());
}
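/**
 * Concat must fail while raising the source blocks' replication to the
 * target's would exceed the disk-space quota, and succeed once the quota
 * is lifted.
 */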
@Test
public void testConcatWithQuotaIncrease() throws IOException {
final short repl = 3;
final int srcNum = 10;
final Path foo = new Path("/foo");
final Path bar = new Path(foo, "bar");
final Path[] srcs = new Path[srcNum];
final Path target = new Path(bar, "target");
DFSTestUtil.createFile(dfs, target, blockSize, repl, 0L);
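// a disk-space quota exactly equal to what the files below will consume before concat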
final long dsQuota = blockSize * repl + blockSize * srcNum * REPL_FACTOR;
dfs.setQuota(foo, Long.MAX_VALUE - 1, dsQuota);
for (int i = 0; i < srcNum; i++) {
srcs[i] = new Path(bar, "src" + i);
DFSTestUtil.createFile(dfs, srcs[i], blockSize, REPL_FACTOR, 0L);
}
ContentSummary summary = dfs.getContentSummary(bar);
Assert.assertEquals(11, summary.getFileCount());
Assert.assertEquals(dsQuota, summary.getSpaceConsumed());
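// concat would raise the source blocks' replication from 2 to the target's 3, exceeding the quota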
try {
dfs.concat(target, srcs);
fail("QuotaExceededException expected");
} catch (RemoteException e) {
Assert.assertTrue(
e.unwrapRemoteException() instanceof QuotaExceededException);
}
dfs.setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
dfs.concat(target, srcs);
summary = dfs.getContentSummary(bar);
Assert.assertEquals(1, summary.getFileCount());
Assert.assertEquals(blockSize * repl * (srcNum + 1),
summary.getSpaceConsumed());
}
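/**
 * Concat should resolve a target path given relative to the working
 * directory.
 */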
@Test
public void testConcatRelativeTargetPath() throws IOException {
Path dir = new Path("/dir");
Path trg = new Path("trg");
Path src = new Path(dir, "src");
dfs.setWorkingDirectory(dir);
DFSTestUtil.createFile(dfs, trg, blockSize, REPL_FACTOR, 1);
DFSTestUtil.createFile(dfs, src, blockSize, REPL_FACTOR, 1);
dfs.concat(trg, new Path[]{src});
assertEquals(blockSize * 2, dfs.getFileStatus(trg).getLen());
assertFalse(dfs.exists(src));
}
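/**
 * Concat of paths under /.reserved is not supported and must fail.
 */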
@Test(timeout = 30000)
public void testConcatReservedRelativePaths() throws IOException {
String testPathDir = "/.reserved/raw/ezone";
Path dir = new Path(testPathDir);
dfs.mkdirs(dir);
Path trg = new Path(testPathDir, "trg");
Path src = new Path(testPathDir, "src");
DFSTestUtil.createFile(dfs, trg, blockSize, REPL_FACTOR, 1);
DFSTestUtil.createFile(dfs, src, blockSize, REPL_FACTOR, 1);
try {
dfs.concat(trg, new Path[] { src });
Assert.fail("Must throw Exception!");
} catch (IOException e) {
String errMsg = "Concat operation doesn't support "
+ FSDirectory.DOT_RESERVED_STRING + " relative path : " + trg;
GenericTestUtils.assertExceptionContains(errMsg, e);
}
}
}