/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Random;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.integration.AbstractAzureScaleTest;
import org.apache.hadoop.util.Time;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.*;
/**
* Write data into a page blob and verify you can read back all of it
* or just a part of it.
*/
public class ITestReadAndSeekPageBlobAfterWrite extends AbstractAzureScaleTest {
private static final Logger LOG =
LoggerFactory.getLogger(ITestReadAndSeekPageBlobAfterWrite.class);
private FileSystem fs;
private byte[] randomData;
// Page blob physical page size
private static final int PAGE_SIZE = PageBlobFormatHelpers.PAGE_SIZE;
// Size of data on page (excluding header)
private static final int PAGE_DATA_SIZE = PAGE_SIZE - PageBlobFormatHelpers.PAGE_HEADER_SIZE;
private static final int MAX_BYTES = 33554432; // maximum bytes in a file that we'll test
private static final int MAX_PAGES = MAX_BYTES / PAGE_SIZE; // maximum number of pages we'll test
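// With the 512-byte physical pages and 2-byte page headers defined in
// PageBlobFormatHelpers at the time of writing, PAGE_DATA_SIZE works out to
// 510 usable bytes per page, and MAX_BYTES (32 MiB) to MAX_PAGES = 65536
// physical pages. The tests below only rely on MAX_BYTES being an exact
// multiple of PAGE_SIZE.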
private Random rand = new Random();
// A key with a prefix under /pageBlobs, which for the test file system will
// force use of a page blob.
private static final String KEY = "/pageBlobs/file.dat";
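// (This prefix-to-page-blob mapping is configuration-driven: the test
// account is assumed to list "pageBlobs" under fs.azure.page.blob.dir,
// the setting AzureNativeFileSystemStore consults in isPageBlobKey().)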
// path of page blob file to read and write
private Path blobPath;
@Override
public void setUp() throws Exception {
super.setUp();
fs = getTestAccount().getFileSystem();
// Make sure we are using an integral number of pages.
assertEquals(0, MAX_BYTES % PAGE_SIZE);
// generate an in-memory array of random data
randomData = new byte[PAGE_SIZE * MAX_PAGES];
rand.nextBytes(randomData);
blobPath = blobPath("ITestReadAndSeekPageBlobAfterWrite");
}
@Override
public void tearDown() throws Exception {
deleteQuietly(fs, blobPath, true);
super.tearDown();
}
/**
* Make sure the file name (key) is a page blob file name. If anybody changes that,
* we need to come back and update this test class.
*/
@Test
public void testIsPageBlobFileName() {
AzureNativeFileSystemStore store = ((NativeAzureFileSystem) fs).getStore();
// The first path component ("pageBlobs"), with a trailing slash, is the
// store-relative key prefix that isPageBlobKey() checks against the
// configured page blob directories.
String[] a = blobPath.toUri().getPath().split("/");
String key2 = a[1] + "/";
assertTrue("Not a page blob: " + blobPath, store.isPageBlobKey(key2));
}
/**
* For a set of different file sizes, write some random data to a page blob,
* read it back, and verify that what was read matches what was written.
*/
@Test
public void testReadAfterWriteRandomData() throws IOException {
// local shorthand
final int pds = PAGE_DATA_SIZE;
// Test for sizes at and near page boundaries
int[] dataSizes = {
// on first page
0, 1, 2, 3,
// Near first physical page boundary (because the implementation
// stores PDS + the page header size bytes on each page).
pds - 1, pds, pds + 1, pds + 2, pds + 3,
// near second physical page boundary
(2 * pds) - 1, (2 * pds), (2 * pds) + 1, (2 * pds) + 2, (2 * pds) + 3,
// near tenth physical page boundary
(10 * pds) - 1, (10 * pds), (10 * pds) + 1, (10 * pds) + 2, (10 * pds) + 3,
// test one big size, >> 4MB (an internal buffer size in the code)
MAX_BYTES
};
for (int i : dataSizes) {
testReadAfterWriteRandomData(i);
}
}
private void testReadAfterWriteRandomData(int size) throws IOException {
writeRandomData(size);
readRandomDataAndVerify(size);
}
/**
* Read "size" bytes of data and verify that what was read and what was written
* are the same.
*/
private void readRandomDataAndVerify(int size) throws AzureException, IOException {
byte[] b = new byte[size];
try (FSDataInputStream stream = fs.open(blobPath)) {
// A single read() may legally return fewer bytes than requested, so use
// readFully, which fills the whole buffer or throws EOFException.
stream.readFully(0, b);
}
// compare the data read to the data written
assertTrue(comparePrefix(randomData, b, size));
}
// return true if the beginning "size" values of the arrays are the same
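// (On Java 9+, Arrays.equals(a, 0, size, b, 0, size) performs the same
// range comparison; the explicit loop keeps this compatible with Java 8.)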
private boolean comparePrefix(byte[] a, byte[] b, int size) {
if (a.length < size || b.length < size) {
return false;
}
for (int i = 0; i < size; i++) {
if (a[i] != b[i]) {
return false;
}
}
return true;
}
// Write a specified amount of random data to the file path for this test class.
private void writeRandomData(int size) throws IOException {
try (OutputStream output = fs.create(blobPath)) {
output.write(randomData, 0, size);
}
}
/**
* Write data to a page blob, open it, seek, and then read a range of data.
* Then verify that the data read from that range matches the data
* originally written.
*/
@Test
public void testPageBlobSeekAndReadAfterWrite() throws IOException {
writeRandomData(PAGE_SIZE * MAX_PAGES);
int recordSize = 100;
byte[] b = new byte[recordSize];
try (FSDataInputStream stream = fs.open(blobPath)) {
// Seek to a boundary around the middle of the 6th page
int seekPosition = 5 * PAGE_SIZE + 250;
stream.seek(seekPosition);
// Read a record's worth of bytes and verify results
int bytesRead = stream.read(b);
verifyReadRandomData(b, bytesRead, seekPosition, recordSize);
// Seek to another spot and read a record greater than a page
seekPosition = 10 * PAGE_SIZE + 250;
stream.seek(seekPosition);
recordSize = 1000;
b = new byte[recordSize];
bytesRead = stream.read(b);
verifyReadRandomData(b, bytesRead, seekPosition, recordSize);
// Read the last 100 bytes of the file
recordSize = 100;
seekPosition = PAGE_SIZE * MAX_PAGES - recordSize;
stream.seek(seekPosition);
b = new byte[recordSize];
bytesRead = stream.read(b);
verifyReadRandomData(b, bytesRead, seekPosition, recordSize);
// Read past the end of the file; we should get only partial data.
recordSize = 100;
seekPosition = PAGE_SIZE * MAX_PAGES - recordSize + 50;
stream.seek(seekPosition);
b = new byte[recordSize];
bytesRead = stream.read(b);
assertEquals(50, bytesRead);
// compare last 50 bytes written with those read
byte[] tail = Arrays.copyOfRange(randomData, seekPosition, randomData.length);
assertTrue(comparePrefix(tail, b, 50));
}
}
// Verify that reading a record of data after seeking gives the expected data.
private void verifyReadRandomData(byte[] b, int bytesRead, int seekPosition, int recordSize) {
// Slice out exactly the record that should have been read.
byte[] originalRecordData =
Arrays.copyOfRange(randomData, seekPosition, seekPosition + recordSize);
assertEquals(recordSize, bytesRead);
assertTrue(comparePrefix(originalRecordData, b, recordSize));
}
// Test many small flushed writes interspersed with periodic hflush calls.
// For manual testing, increase the numWrites argument to a large number.
// The goal for a long-running manual test is to make sure that it finishes
// and the close() call does not time out. It also facilitates debugging into
// hflush/hsync.
@Test
public void testManySmallWritesWithHFlush() throws IOException {
writeAndReadOneFile(50, 100, 20);
}
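// A manual soak run might instead use, say, writeAndReadOneFile(50000, 100, 20)
// for roughly 5 MB of flushed 100-byte records; the small values above keep
// the automated run short.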
/**
* Write a total of numWrites * recordLength bytes to a file, read it back,
* and check that what was read is the same as what was written.
* The syncInterval is the number of writes after which to call hflush to
* force the data to storage.
*/
private void writeAndReadOneFile(int numWrites,
int recordLength, int syncInterval) throws IOException {
// A lower bound, in milliseconds, on the time we expect a single write
// to Azure storage to take. close() must flush any pending writes, so
// with unflushed data it should take at least this long.
final long MINIMUM_EXPECTED_TIME = 20;
LOG.info("Writing {} bytes to {}", numWrites * recordLength, blobPath.getName());
FSDataOutputStream output = fs.create(blobPath);
int writesSinceHFlush = 0;
try {
// Do a flush and hflush to exercise the empty-write-queue case in
// PageBlobOutputStream and to test its concurrent execution gates.
output.flush();
output.hflush();
for (int i = 0; i < numWrites; i++) {
output.write(randomData, i * recordLength, recordLength);
writesSinceHFlush++;
output.flush();
if ((i % syncInterval) == 0) {
output.hflush();
writesSinceHFlush = 0;
}
}
} finally {
long start = Time.monotonicNow();
output.close();
long end = Time.monotonicNow();
LOG.debug("close duration = " + (end - start) + " msec.");
if (writesSinceHFlush > 0) {
assertTrue(String.format(
"close duration with >= 1 pending write is %d, less than minimum expected of %d",
end - start, MINIMUM_EXPECTED_TIME),
end - start >= MINIMUM_EXPECTED_TIME);
}
}
// Read the data back and check it.
final int size = numWrites * recordLength;
byte[] b = new byte[size];
try (FSDataInputStream stream = fs.open(blobPath)) {
// readFully guards against a short read silently breaking the comparison.
stream.readFully(0, b);
verifyReadRandomData(b, size, 0, size);
}
// delete the file
fs.delete(blobPath, false);
}
// Test writing to a large file repeatedly as a stress test.
// Set the repetitions to a larger number for manual testing
// for a longer stress run.
@Test
public void testLargeFileStress() throws IOException {
int numWrites = 32;
int recordSize = 1024 * 1024;
int syncInterval = 10;
int repetitions = 1;
for (int i = 0; i < repetitions; i++) {
writeAndReadOneFile(numWrites, recordSize, syncInterval);
}
}
// Write to a file repeatedly to verify that it extends.
// The page blob file should start out at 128MB and finish at 256MB.
@Test
public void testFileSizeExtension() throws IOException {
final int writeSize = 1024 * 1024;
final int numWrites = 129;
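// 129 one-MiB writes total 129 MiB, just past the assumed 128 MiB initial
// allocation (PageBlobOutputStream.PAGE_BLOB_MIN_SIZE), so the blob must be
// extended at least once for the listed size below to match.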
final byte dataByte = 5;
byte[] data = new byte[writeSize];
Arrays.fill(data, dataByte);
try (FSDataOutputStream output = fs.create(blobPath)) {
for (int i = 0; i < numWrites; i++) {
output.write(data);
output.hflush();
LOG.debug("total writes = " + (i + 1));
}
}
// Show that we wrote more than the initial default page blob file size.
assertTrue(numWrites * writeSize > PageBlobOutputStream.PAGE_BLOB_MIN_SIZE);
// Verify we can list the new size. That will prove we expanded the file.
FileStatus[] status = fs.listStatus(blobPath);
assertEquals("File size hasn't changed: " + Arrays.toString(status),
numWrites * writeSize, status[0].getLen());
LOG.debug("Total bytes written to {} = {}", blobPath, status[0].getLen());
fs.delete(blobPath, false);
}
}