blob: 9b223f878fa48131ccc6188ba21a44fe6fd8dfc4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.swift;
import org.apache.commons.httpclient.Header;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.swift.http.SwiftProtocolConstants;
import org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem;
import org.apache.hadoop.fs.swift.util.SwiftTestUtils;
import org.apache.hadoop.fs.swift.util.SwiftUtils;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
import org.junit.internal.AssumptionViolatedException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import static org.apache.hadoop.fs.swift.util.SwiftTestUtils.assertPathExists;
import static org.apache.hadoop.fs.swift.util.SwiftTestUtils.readDataset;
/**
* Test partitioned uploads.
* This is done by forcing a very small partition size and verifying that it
* is picked up.
*/
public class TestSwiftFileSystemPartitionedUploads extends
SwiftFileSystemBaseTest {
public static final String WRONG_PARTITION_COUNT =
"wrong number of partitions written into ";
public static final int PART_SIZE = 1;
public static final int PART_SIZE_BYTES = PART_SIZE * 1024;
public static final int BLOCK_SIZE = 1024;
private URI uri;
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
//set the partition size to 1 KB
conf.setInt(SwiftProtocolConstants.SWIFT_PARTITION_SIZE, PART_SIZE);
return conf;
}
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testPartitionPropertyPropagatesToConf() throws Throwable {
assertEquals(1,
getConf().getInt(SwiftProtocolConstants.SWIFT_PARTITION_SIZE,
0));
}
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testPartionPropertyPropagatesToStore() throws Throwable {
assertEquals(1, fs.getStore().getPartsizeKB());
}
/**
* tests functionality for big files ( > 5Gb) upload
*/
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testFilePartUpload() throws Throwable {
final Path path = new Path("/test/testFilePartUpload");
int len = 8192;
final byte[] src = SwiftTestUtils.dataset(len, 32, 144);
FSDataOutputStream out = fs.create(path,
false,
getBufferSize(),
(short) 1,
BLOCK_SIZE);
try {
int totalPartitionsToWrite = len / PART_SIZE_BYTES;
assertPartitionsWritten("Startup", out, 0);
//write 2048
int firstWriteLen = 2048;
out.write(src, 0, firstWriteLen);
//assert
long expected = getExpectedPartitionsWritten(firstWriteLen,
PART_SIZE_BYTES,
false);
SwiftUtils.debug(LOG, "First write: predict %d partitions written",
expected);
assertPartitionsWritten("First write completed", out, expected);
//write the rest
int remainder = len - firstWriteLen;
SwiftUtils.debug(LOG, "remainder: writing: %d bytes", remainder);
out.write(src, firstWriteLen, remainder);
expected =
getExpectedPartitionsWritten(len, PART_SIZE_BYTES, false);
assertPartitionsWritten("Remaining data", out, expected);
out.close();
expected =
getExpectedPartitionsWritten(len, PART_SIZE_BYTES, true);
assertPartitionsWritten("Stream closed", out, expected);
Header[] headers = fs.getStore().getObjectHeaders(path, true);
for (Header header : headers) {
LOG.info(header.toString());
}
byte[] dest = readDataset(fs, path, len);
LOG.info("Read dataset from " + path + ": data length =" + len);
//compare data
SwiftTestUtils.compareByteArrays(src, dest, len);
FileStatus status;
final Path qualifiedPath = path.makeQualified(fs);
status = fs.getFileStatus(qualifiedPath);
//now see what block location info comes back.
//This will vary depending on the Swift version, so the results
//aren't checked -merely that the test actually worked
BlockLocation[] locations = fs.getFileBlockLocations(status, 0, len);
assertNotNull("Null getFileBlockLocations()", locations);
assertTrue("empty array returned for getFileBlockLocations()",
locations.length > 0);
//last bit of test -which seems to play up on partitions, which we download
//to a skip
try {
validatePathLen(path, len);
} catch (AssertionError e) {
//downgrade to a skip
throw new AssumptionViolatedException(e, null);
}
} finally {
IOUtils.closeStream(out);
}
}
/**
* tests functionality for big files ( > 5Gb) upload
*/
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testFilePartUploadNoLengthCheck() throws IOException, URISyntaxException {
final Path path = new Path("/test/testFilePartUploadLengthCheck");
int len = 8192;
final byte[] src = SwiftTestUtils.dataset(len, 32, 144);
FSDataOutputStream out = fs.create(path,
false,
getBufferSize(),
(short) 1,
BLOCK_SIZE);
try {
int totalPartitionsToWrite = len / PART_SIZE_BYTES;
assertPartitionsWritten("Startup", out, 0);
//write 2048
int firstWriteLen = 2048;
out.write(src, 0, firstWriteLen);
//assert
long expected = getExpectedPartitionsWritten(firstWriteLen,
PART_SIZE_BYTES,
false);
SwiftUtils.debug(LOG, "First write: predict %d partitions written",
expected);
assertPartitionsWritten("First write completed", out, expected);
//write the rest
int remainder = len - firstWriteLen;
SwiftUtils.debug(LOG, "remainder: writing: %d bytes", remainder);
out.write(src, firstWriteLen, remainder);
expected =
getExpectedPartitionsWritten(len, PART_SIZE_BYTES, false);
assertPartitionsWritten("Remaining data", out, expected);
out.close();
expected =
getExpectedPartitionsWritten(len, PART_SIZE_BYTES, true);
assertPartitionsWritten("Stream closed", out, expected);
Header[] headers = fs.getStore().getObjectHeaders(path, true);
for (Header header : headers) {
LOG.info(header.toString());
}
byte[] dest = readDataset(fs, path, len);
LOG.info("Read dataset from " + path + ": data length =" + len);
//compare data
SwiftTestUtils.compareByteArrays(src, dest, len);
FileStatus status = fs.getFileStatus(path);
//now see what block location info comes back.
//This will vary depending on the Swift version, so the results
//aren't checked -merely that the test actually worked
BlockLocation[] locations = fs.getFileBlockLocations(status, 0, len);
assertNotNull("Null getFileBlockLocations()", locations);
assertTrue("empty array returned for getFileBlockLocations()",
locations.length > 0);
} finally {
IOUtils.closeStream(out);
}
}
private FileStatus validatePathLen(Path path, int len) throws IOException {
//verify that the length is what was written in a direct status check
final Path qualifiedPath = path.makeQualified(fs);
FileStatus[] parentDirListing = fs.listStatus(qualifiedPath.getParent());
StringBuilder listing = lsToString(parentDirListing);
String parentDirLS = listing.toString();
FileStatus status = fs.getFileStatus(qualifiedPath);
assertEquals("Length of written file " + qualifiedPath
+ " from status check " + status
+ " in dir " + listing,
len,
status.getLen());
String fileInfo = qualifiedPath + " " + status;
assertFalse("File claims to be a directory " + fileInfo,
status.isDir());
FileStatus listedFileStat = resolveChild(parentDirListing, qualifiedPath);
assertNotNull("Did not find " + path + " in " + parentDirLS,
listedFileStat);
//file is in the parent dir. Now validate it's stats
assertEquals("Wrong len for " + path + " in listing " + parentDirLS,
len,
listedFileStat.getLen());
listedFileStat.toString();
return status;
}
private FileStatus resolveChild(FileStatus[] parentDirListing,
Path childPath) {
FileStatus listedFileStat = null;
for (FileStatus stat : parentDirListing) {
if (stat.getPath().equals(childPath)) {
listedFileStat = stat;
}
}
return listedFileStat;
}
private StringBuilder lsToString(FileStatus[] parentDirListing) {
StringBuilder listing = new StringBuilder();
for (FileStatus stat : parentDirListing) {
listing.append(stat).append("\n");
}
return listing;
}
/**
* Calculate the #of partitions expected from the upload
* @param uploaded number of bytes uploaded
* @param partSizeBytes the partition size
* @param closed whether or not the stream has closed
* @return the expected number of partitions, for use in assertions.
*/
private int getExpectedPartitionsWritten(long uploaded,
int partSizeBytes,
boolean closed) {
//#of partitions in total
int partitions = (int) (uploaded / partSizeBytes);
//#of bytes past the last partition
int remainder = (int) (uploaded % partSizeBytes);
if (closed) {
//all data is written, so if there was any remainder, it went up
//too
return partitions + ((remainder > 0) ? 1 : 0);
} else {
//not closed. All the remainder is buffered,
return partitions;
}
}
private int getBufferSize() {
return fs.getConf().getInt("io.file.buffer.size", 4096);
}
/**
* Test sticks up a very large partitioned file and verifies that
* it comes back unchanged.
* @throws Throwable
*/
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testManyPartitionedFile() throws Throwable {
final Path path = new Path("/test/testManyPartitionedFile");
int len = PART_SIZE_BYTES * 15;
final byte[] src = SwiftTestUtils.dataset(len, 32, 144);
FSDataOutputStream out = fs.create(path,
false,
getBufferSize(),
(short) 1,
BLOCK_SIZE);
out.write(src, 0, src.length);
int expected =
getExpectedPartitionsWritten(len, PART_SIZE_BYTES, true);
out.close();
assertPartitionsWritten("write completed", out, expected);
assertEquals("too few bytes written", len,
SwiftNativeFileSystem.getBytesWritten(out));
assertEquals("too few bytes uploaded", len,
SwiftNativeFileSystem.getBytesUploaded(out));
//now we verify that the data comes back. If it
//doesn't, it means that the ordering of the partitions
//isn't right
byte[] dest = readDataset(fs, path, len);
//compare data
SwiftTestUtils.compareByteArrays(src, dest, len);
//finally, check the data
FileStatus[] stats = fs.listStatus(path);
assertEquals("wrong entry count in "
+ SwiftTestUtils.dumpStats(path.toString(), stats),
expected, stats.length);
}
/**
* Test that when a partitioned file is overwritten by a smaller one,
* all the old partitioned files go away
* @throws Throwable
*/
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testOverwritePartitionedFile() throws Throwable {
final Path path = new Path("/test/testOverwritePartitionedFile");
final int len1 = 8192;
final byte[] src1 = SwiftTestUtils.dataset(len1, 'A', 'Z');
FSDataOutputStream out = fs.create(path,
false,
getBufferSize(),
(short) 1,
1024);
out.write(src1, 0, len1);
out.close();
long expected = getExpectedPartitionsWritten(len1,
PART_SIZE_BYTES,
false);
assertPartitionsWritten("initial upload", out, expected);
assertExists("Exists", path);
FileStatus status = fs.getFileStatus(path);
assertEquals("Length", len1, status.getLen());
//now write a shorter file with a different dataset
final int len2 = 4095;
final byte[] src2 = SwiftTestUtils.dataset(len2, 'a', 'z');
out = fs.create(path,
true,
getBufferSize(),
(short) 1,
1024);
out.write(src2, 0, len2);
out.close();
status = fs.getFileStatus(path);
assertEquals("Length", len2, status.getLen());
byte[] dest = readDataset(fs, path, len2);
//compare data
SwiftTestUtils.compareByteArrays(src2, dest, len2);
}
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testDeleteSmallPartitionedFile() throws Throwable {
final Path path = new Path("/test/testDeleteSmallPartitionedFile");
final int len1 = 1024;
final byte[] src1 = SwiftTestUtils.dataset(len1, 'A', 'Z');
SwiftTestUtils.writeDataset(fs, path, src1, len1, 1024, false);
assertExists("Exists", path);
Path part_0001 = new Path(path, SwiftUtils.partitionFilenameFromNumber(1));
Path part_0002 = new Path(path, SwiftUtils.partitionFilenameFromNumber(2));
String ls = SwiftTestUtils.ls(fs, path);
assertExists("Partition 0001 Exists in " + ls, part_0001);
assertPathDoesNotExist("partition 0002 found under " + ls, part_0002);
assertExists("Partition 0002 Exists in " + ls, part_0001);
fs.delete(path, false);
assertPathDoesNotExist("deleted file still there", path);
ls = SwiftTestUtils.ls(fs, path);
assertPathDoesNotExist("partition 0001 file still under " + ls, part_0001);
}
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testDeletePartitionedFile() throws Throwable {
final Path path = new Path("/test/testDeletePartitionedFile");
SwiftTestUtils.writeDataset(fs, path, data, data.length, 1024, false);
assertExists("Exists", path);
Path part_0001 = new Path(path, SwiftUtils.partitionFilenameFromNumber(1));
Path part_0002 = new Path(path, SwiftUtils.partitionFilenameFromNumber(2));
String ls = SwiftTestUtils.ls(fs, path);
assertExists("Partition 0001 Exists in " + ls, part_0001);
assertExists("Partition 0002 Exists in " + ls, part_0001);
fs.delete(path, false);
assertPathDoesNotExist("deleted file still there", path);
ls = SwiftTestUtils.ls(fs, path);
assertPathDoesNotExist("partition 0001 file still under " + ls, part_0001);
assertPathDoesNotExist("partition 0002 file still under " + ls, part_0002);
}
@Test(timeout = SWIFT_BULK_IO_TEST_TIMEOUT)
public void testRenamePartitionedFile() throws Throwable {
Path src = new Path("/test/testRenamePartitionedFileSrc");
int len = data.length;
SwiftTestUtils.writeDataset(fs, src, data, len, 1024, false);
assertExists("Exists", src);
String partOneName = SwiftUtils.partitionFilenameFromNumber(1);
Path srcPart = new Path(src, partOneName);
Path dest = new Path("/test/testRenamePartitionedFileDest");
Path destPart = new Path(src, partOneName);
assertExists("Partition Exists", srcPart);
fs.rename(src, dest);
assertPathExists(fs, "dest file missing", dest);
FileStatus status = fs.getFileStatus(dest);
assertEquals("Length of renamed file is wrong", len, status.getLen());
byte[] destData = readDataset(fs, dest, len);
//compare data
SwiftTestUtils.compareByteArrays(data, destData, len);
String srcLs = SwiftTestUtils.ls(fs, src);
String destLs = SwiftTestUtils.ls(fs, dest);
assertPathDoesNotExist("deleted file still found in " + srcLs, src);
assertPathDoesNotExist("partition file still found in " + srcLs, srcPart);
}
}