/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.scale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.util.LineReader;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.IOException;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
/**
* Look at the performance of S3a operations.
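* <p>
* Most of these tests work with the CSV test file named by the
* {@code KEY_CSVTEST_FILE} setting; if that property is set to an
* empty string, those tests are skipped via an assumption in
* {@link #requireCSVTestData()}.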
*/
public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
private static final Logger LOG = LoggerFactory.getLogger(
ITestS3AInputStreamPerformance.class);
private S3AFileSystem s3aFS;
private Path testData;
private FileStatus testDataStatus;
private FSDataInputStream in;
private S3AInstrumentation.InputStreamStatistics streamStatistics;
public static final int BLOCK_SIZE = 32 * 1024;
public static final int BIG_BLOCK_SIZE = 256 * 1024;
/** Tests only run if there is a named test file that can be read. */
private boolean testDataAvailable = true;
private String assumptionMessage = "test file";
/**
* Open the FS and locate the test data. The input streams themselves
* are opened in the individual test cases.
* @throws IOException IO Problems.
*/
@Before
public void openFS() throws IOException {
Configuration conf = getConf();
conf.setInt(SOCKET_SEND_BUFFER, 16 * 1024);
conf.setInt(SOCKET_RECV_BUFFER, 16 * 1024);
String testFile = conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE);
if (testFile.isEmpty()) {
assumptionMessage = "Empty test property: " + KEY_CSVTEST_FILE;
LOG.warn(assumptionMessage);
testDataAvailable = false;
} else {
testData = new Path(testFile);
LOG.info("Using {} as input stream source", testData);
bindS3aFS(testData);
try {
testDataStatus = s3aFS.getFileStatus(testData);
} catch (IOException e) {
LOG.warn("Failed to read file {} specified in {}",
testFile, KEY_CSVTEST_FILE, e);
throw e;
}
}
}
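/**
* Bind {@link #s3aFS} to the bucket of the given path.
* A new FS instance is created (rather than fetched from the FS cache)
* so that it can be safely closed in {@link #cleanup()}.
* @param path path identifying the target bucket
* @throws IOException failure to instantiate the filesystem
*/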
private void bindS3aFS(Path path) throws IOException {
s3aFS = (S3AFileSystem) FileSystem.newInstance(path.toUri(), getConf());
}
/**
* Cleanup: close the stream, close the FS.
*/
@After
public void cleanup() {
describe("cleanup");
IOUtils.closeStream(in);
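// the filesystem was created via newInstance(), so it is not in the
// FS cache and must be closed here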
IOUtils.closeStream(s3aFS);
}
/**
* Declare that the test requires the CSV test dataset.
*/
private void requireCSVTestData() {
assume(assumptionMessage, testDataAvailable);
}
/**
* Open the test file with the read buffer specified in the setting
* {@link org.apache.hadoop.fs.s3a.S3ATestConstants#KEY_READ_BUFFER_SIZE};
* uses the {@code Normal} input policy.
* @return the stream, wrapping an S3a one
* @throws IOException IO problems
*/
FSDataInputStream openTestFile() throws IOException {
return openTestFile(S3AInputPolicy.Normal, 0);
}
/**
* Open the test file with the read buffer specified in the setting
* {@link org.apache.hadoop.fs.s3a.S3ATestConstants#KEY_READ_BUFFER_SIZE}.
* This includes the {@link #requireCSVTestData()} assumption; so
* if called before any FS op, will automatically skip the test
* if the CSV file is absent.
*
* @param inputPolicy input policy to use
* @param readahead readahead/buffer size
* @return the stream, wrapping an S3a one
* @throws IOException IO problems
*/
FSDataInputStream openTestFile(S3AInputPolicy inputPolicy, long readahead)
throws IOException {
requireCSVTestData();
return openDataFile(s3aFS, this.testData, inputPolicy, readahead);
}
/**
* Open a test file with the read buffer specified in the setting
* {@link org.apache.hadoop.fs.s3a.S3ATestConstants#KEY_READ_BUFFER_SIZE}.
*
* @param fs filesystem in which to open the file
* @param path path to open
* @param inputPolicy input policy to use
* @param readahead readahead/buffer size
* @return the stream, wrapping an S3a one
* @throws IOException IO problems
*/
private FSDataInputStream openDataFile(S3AFileSystem fs,
Path path,
S3AInputPolicy inputPolicy,
long readahead) throws IOException {
int bufferSize = getConf().getInt(KEY_READ_BUFFER_SIZE,
DEFAULT_READ_BUFFER_SIZE);
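// apply the requested input policy for the duration of the open(),
// then restore the previous policy so that later opens against this
// filesystem instance are unaffected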
S3AInputPolicy policy = fs.getInputPolicy();
fs.setInputPolicy(inputPolicy);
try {
FSDataInputStream stream = fs.open(path, bufferSize);
if (readahead >= 0) {
stream.setReadahead(readahead);
}
streamStatistics = getInputStreamStatistics(stream);
return stream;
} finally {
fs.setInputPolicy(policy);
}
}
/**
* Assert that the stream was only ever opened once.
*/
protected void assertStreamOpenedExactlyOnce() {
assertOpenOperationCount(1);
}
/**
* Make an assertion about the number of stream open operations.
* @param expected the expected number
*/
private void assertOpenOperationCount(long expected) {
assertEquals("open operations in\n" + in,
expected, streamStatistics.openOperations);
}
/**
* Log how long an IOP took, by dividing the total time by the
* count of operations, printing in a human-readable form.
* @param operation operation being measured
* @param timer timing data
* @param count IOP count.
*/
protected void logTimePerIOP(String operation,
NanoTimer timer,
long count) {
LOG.info("Time per {}: {} nS",
operation, toHuman(timer.duration() / count));
}
@Test
public void testTimeToOpenAndReadWholeFileBlocks() throws Throwable {
requireCSVTestData();
int blockSize = _1MB;
describe("Open the test file %s and read it in blocks of size %d",
testData, blockSize);
long len = testDataStatus.getLen();
in = openTestFile();
byte[] block = new byte[blockSize];
NanoTimer timer2 = new NanoTimer();
long count = 0;
// implicitly rounding down here
long blockCount = len / blockSize;
long totalToRead = blockCount * blockSize;
long minimumBandwidth = 128 * 1024;
int maxResetCount = 4;
int resetCount = 0;
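// read the file block by block: any block delivered below the
// bandwidth floor triggers a connection reset, and the test fails
// once the number of resets exceeds maxResetCount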
for (long i = 0; i < blockCount; i++) {
int offset = 0;
int remaining = blockSize;
long blockId = i + 1;
NanoTimer blockTimer = new NanoTimer();
int reads = 0;
while (remaining > 0) {
NanoTimer readTimer = new NanoTimer();
int bytesRead = in.read(block, offset, remaining);
reads++;
if (bytesRead < 0) {
// EOF: stop reading this block
break;
}
remaining -= bytesRead;
offset += bytesRead;
count += bytesRead;
readTimer.end();
if (bytesRead != 0) {
LOG.debug("Bytes in read #{}: {} , block bytes: {}," +
" remaining in block: {}" +
" duration={} nS; ns/byte: {}, bandwidth={} MB/s",
reads, bytesRead, blockSize - remaining, remaining,
readTimer.duration(),
readTimer.nanosPerOperation(bytesRead),
readTimer.bandwidthDescription(bytesRead));
} else {
LOG.warn("0 bytes returned by read() operation #{}", reads);
}
}
blockTimer.end("Reading block %d in %d reads", blockId, reads);
String bw = blockTimer.bandwidthDescription(blockSize);
LOG.info("Bandwidth of block {}: {} MB/s: ", blockId, bw);
if (bandwidth(blockTimer, blockSize) < minimumBandwidth) {
LOG.warn("Bandwidth {} too low on block {}: resetting connection",
bw, blockId);
Assert.assertTrue("Bandwidth of " + bw +" too low after "
+ resetCount + " attempts", resetCount <= maxResetCount);
resetCount++;
// reset the connection
getS3AInputStream(in).resetConnection();
}
}
timer2.end("Time to read %d bytes in %d blocks", totalToRead, blockCount);
LOG.info("Overall Bandwidth {} MB/s; reset connections {}",
timer2.bandwidth(totalToRead), resetCount);
logStreamStatistics();
}
/**
* Work out the bandwidth in bytes/second.
* @param timer timer measuring the duration
* @param bytes bytes
* @return the number of bytes/second of the recorded operation
*/
public static double bandwidth(NanoTimer timer, long bytes) {
return bytes * 1.0e9 / timer.duration();
}
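// worked example, with hypothetical figures: reading 1,048,576 bytes
// in 8,000,000 ns gives 1048576 * 1.0e9 / 8.0e6 = 1.31072e8 bytes/second,
// i.e. roughly 131 MB/s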
@Test
public void testLazySeekEnabled() throws Throwable {
describe("Verify that seeks do not trigger any IO");
in = openTestFile();
long len = testDataStatus.getLen();
NanoTimer timer = new NanoTimer();
long blockCount = len / BLOCK_SIZE;
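// seek rather than read: with lazy seek, none of these calls
// should trigger an HTTP request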
for (long i = 0; i < blockCount; i++) {
in.seek(in.getPos() + BLOCK_SIZE - 1);
}
in.seek(0);
blockCount++;
timer.end("Time to execute %d seeks", blockCount);
logTimePerIOP("seek()", timer, blockCount);
logStreamStatistics();
assertOpenOperationCount(0);
assertEquals("bytes read", 0, streamStatistics.bytesRead);
}
@Test
public void testReadaheadOutOfRange() throws Throwable {
try {
in = openTestFile();
in.setReadahead(-1L);
fail("Stream should have rejected the request "+ in);
} catch (IllegalArgumentException e) {
// expected
}
}
@Test
public void testReadWithNormalPolicy() throws Throwable {
describe("Read big blocks with a big readahead");
executeSeekReadSequence(BIG_BLOCK_SIZE, BIG_BLOCK_SIZE * 2,
S3AInputPolicy.Normal);
assertStreamOpenedExactlyOnce();
}
@Test
public void testDecompressionSequential128K() throws Throwable {
describe("Decompress with a 128K readahead");
executeDecompression(128 * 1024, S3AInputPolicy.Sequential);
assertStreamOpenedExactlyOnce();
}
/**
* Execute a decompression + line read with the given input policy.
* @param readahead byte readahead
* @param inputPolicy read policy
* @throws IOException IO Problems
*/
private void executeDecompression(long readahead,
S3AInputPolicy inputPolicy) throws IOException {
CompressionCodecFactory factory
= new CompressionCodecFactory(getConf());
CompressionCodec codec = factory.getCodec(testData);
long bytesRead = 0;
int lines = 0;
FSDataInputStream objectIn = openTestFile(inputPolicy, readahead);
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
try (LineReader lineReader = new LineReader(
codec.createInputStream(objectIn), getConf())) {
Text line = new Text();
int read;
while ((read = lineReader.readLine(line)) > 0) {
bytesRead += read;
lines++;
}
} catch (EOFException eof) {
// the end of the compressed stream may surface as an EOFException;
// treat it as the normal end of the data
}
timer.end("Time to read %d lines [%d bytes expanded, %d raw]" +
" with readahead = %d",
lines,
bytesRead,
testDataStatus.getLen(),
readahead);
logTimePerIOP("line read", timer, lines);
logStreamStatistics();
}
private void logStreamStatistics() {
LOG.info("Stream Statistics\n{}", streamStatistics);
}
/**
* Execute a seek+read sequence.
* @param blockSize block size for seeks
* @param readahead what the readahead value of the stream should be
* @param policy input policy to use for the stream
* @throws IOException IO problems
*/
protected void executeSeekReadSequence(long blockSize,
long readahead,
S3AInputPolicy policy) throws IOException {
in = openTestFile(policy, readahead);
long len = testDataStatus.getLen();
NanoTimer timer = new NanoTimer();
long blockCount = len / blockSize;
LOG.info("Reading {} blocks, readahead = {}",
blockCount, readahead);
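// hop forward to just before the next block boundary, then read a
// single byte: the read is what makes the lazily-evaluated seek happen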
for (long i = 0; i < blockCount; i++) {
in.seek(in.getPos() + blockSize - 1);
// this is the read
assertTrue(in.read() >= 0);
}
timer.end("Time to execute %d seeks of distance %d with readahead = %d",
blockCount,
blockSize,
readahead);
logTimePerIOP("seek(pos + " + blockCount+"); read()", timer, blockCount);
LOG.info("Effective bandwidth {} MB/S",
timer.bandwidthDescription(streamStatistics.bytesRead -
streamStatistics.bytesSkippedOnSeek));
logStreamStatistics();
}
public static final int _4K = 4 * 1024;
public static final int _8K = 8 * 1024;
public static final int _16K = 16 * 1024;
public static final int _32K = 32 * 1024;
public static final int _64K = 64 * 1024;
public static final int _128K = 128 * 1024;
public static final int _256K = 256 * 1024;
public static final int _1MB = 1024 * 1024;
public static final int _2MB = 2 * _1MB;
public static final int _5MB = _1MB * 5;
public static final int _10MB = _1MB * 10;
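// each entry is a {seek position, read length} pair; the sequence hops
// forwards and backwards across the file, so a sequential ("normal")
// stream ends up aborting and reopening the connection on each hop,
// while the random policy avoids aborts entirely, as the tests below assert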
private static final int[][] RANDOM_IO_SEQUENCE = {
{_2MB, _128K},
{_128K, _128K},
{_5MB, _64K},
{_1MB, _1MB},
};
@Test
public void testRandomIORandomPolicy() throws Throwable {
executeRandomIO(S3AInputPolicy.Random, (long) RANDOM_IO_SEQUENCE.length);
assertEquals("streams aborted in " + streamStatistics,
0, streamStatistics.aborted);
}
@Test
public void testRandomIONormalPolicy() throws Throwable {
long expectedOpenCount = RANDOM_IO_SEQUENCE.length;
executeRandomIO(S3AInputPolicy.Normal, expectedOpenCount);
assertEquals("streams aborted in " + streamStatistics,
4, streamStatistics.aborted);
}
/**
* Execute the random IO {@code readFully(pos, bytes[])} sequence defined by
* {@link #RANDOM_IO_SEQUENCE}. The stream is closed after the timed
* sequence completes.
* @param policy read policy
* @param expectedOpenCount expected number of stream openings
* @return the timer
* @throws IOException IO problems
*/
private ContractTestUtils.NanoTimer executeRandomIO(S3AInputPolicy policy,
long expectedOpenCount)
throws IOException {
describe("Random IO with policy \"%s\"", policy);
byte[] buffer = new byte[_1MB];
long totalBytesRead = 0;
in = openTestFile(policy, 0);
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
for (int[] action : RANDOM_IO_SEQUENCE) {
int position = action[0];
int range = action[1];
in.readFully(position, buffer, 0, range);
totalBytesRead += range;
}
int reads = RANDOM_IO_SEQUENCE.length;
timer.end("Time to execute %d reads of total size %d bytes",
reads,
totalBytesRead);
in.close();
assertOpenOperationCount(expectedOpenCount);
logTimePerIOP("byte read", timer, totalBytesRead);
LOG.info("Effective bandwidth {} MB/S",
timer.bandwidthDescription(streamStatistics.bytesRead -
streamStatistics.bytesSkippedOnSeek));
logStreamStatistics();
return timer;
}
S3AInputStream getS3aStream() {
return (S3AInputStream) in.getWrappedStream();
}
@Test
public void testRandomReadOverBuffer() throws Throwable {
describe("read over a buffer, making sure that the requests" +
" spans readahead ranges");
int datasetLen = _32K;
S3AFileSystem fs = getFileSystem();
Path dataFile = path("testReadOverBuffer.bin");
byte[] sourceData = dataset(datasetLen, 0, 64);
// relies on the field 'fs' referring to the R/W FS
writeDataset(fs, dataFile, sourceData, datasetLen, _16K, true);
byte[] buffer = new byte[datasetLen];
int readahead = _8K;
int halfReadahead = _4K;
in = openDataFile(fs, dataFile, S3AInputPolicy.Random, readahead);
LOG.info("Starting initial reads");
S3AInputStream s3aStream = getS3aStream();
assertEquals(readahead, s3aStream.getReadahead());
byte[] oneByte = new byte[1];
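// a single-byte read at position 0 should open the stream exactly once,
// with a content range of [0, readahead), as the assertions below verify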
assertEquals(1, in.read(0, oneByte, 0, 1));
// make some assertions about the current state
assertEquals("remaining in\n" + in,
readahead - 1, s3aStream.remainingInCurrentRequest());
assertEquals("range start in\n" + in,
0, s3aStream.getContentRangeStart());
assertEquals("range finish in\n" + in,
readahead, s3aStream.getContentRangeFinish());
assertStreamOpenedExactlyOnce();
describe("Starting sequence of positioned read calls over\n%s", in);
NanoTimer readTimer = new NanoTimer();
int currentPos = halfReadahead;
int offset = currentPos;
int bytesRead = 0;
int readOps = 0;
// make multiple read() calls
while (bytesRead < halfReadahead) {
int length = buffer.length - offset;
int read = in.read(currentPos, buffer, offset, length);
bytesRead += read;
offset += read;
readOps++;
assertEquals("open operations on request #" + readOps
+ " after reading " + bytesRead
+ " current position in stream " + currentPos
+ " in\n" + fs
+ "\n " + in,
1, streamStatistics.openOperations);
for (int i = currentPos; i < currentPos + read; i++) {
assertEquals("Wrong value from byte " + i,
sourceData[i], buffer[i]);
}
currentPos += read;
}
assertStreamOpenedExactlyOnce();
// assert at the end of the original block
assertEquals(readahead, currentPos);
readTimer.end("read %d in %d operations", bytesRead, readOps);
LOG.info("Bandwidth {} bytes/second", bandwidth(readTimer, bytesRead));
LOG.info("Time per byte(): {} nS",
toHuman(readTimer.nanosPerOperation(bytesRead)));
LOG.info("Time per read(): {} nS",
toHuman(readTimer.nanosPerOperation(readOps)));
describe("read last byte");
// read one more
// store the byte at its own offset so the assertion below checks
// the byte just read
int read = in.read(currentPos, buffer, currentPos, 1);
assertTrue("-1 from last read", read >= 0);
assertOpenOperationCount(2);
assertEquals("Wrong value from read ", sourceData[currentPos],
(int) buffer[currentPos]);
currentPos++;
// now scan all the way to the end of the file, using single byte read()
// calls
describe("read() to EOF over \n%s", in);
long readCount = 0;
NanoTimer timer = new NanoTimer();
LOG.info("seeking");
in.seek(currentPos);
LOG.info("reading");
while(currentPos < datasetLen) {
int r = in.read();
assertTrue("Negative read() at position " + currentPos + " in\n" + in,
r >= 0);
buffer[currentPos] = (byte)r;
assertEquals("Wrong value from read from\n" + in,
sourceData[currentPos], r);
currentPos++;
readCount++;
}
timer.end("read %d bytes", readCount);
LOG.info("Bandwidth {} bytes/second", bandwidth(timer, readCount));
LOG.info("Time per read(): {} nS",
toHuman(timer.nanosPerOperation(readCount)));
assertEquals("last read in " + in, -1, in.read());
}
}