blob: 4ab33ef6cd074735bf68d559397155ac361e5291 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.fs.s3a.prefetch;
import java.io.EOFException;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.Test;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.impl.prefetch.ExceptionAsserts;
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.test.AbstractHadoopTestBase;
import static org.junit.Assert.assertEquals;
/**
* Applies the same set of tests to both S3ACachingInputStream and S3AInMemoryInputStream.
*/
public class TestS3ARemoteInputStream extends AbstractHadoopTestBase {
private static final int FILE_SIZE = 10;
private final ExecutorService threadPool = Executors.newFixedThreadPool(4);
private final ExecutorServiceFuturePool futurePool =
new ExecutorServiceFuturePool(threadPool);
private final S3AInputStream.InputStreamCallbacks client =
MockS3ARemoteObject.createClient("bucket");
@Test
public void testArgChecks() throws Exception {
S3AReadOpContext readContext =
S3APrefetchFakes.createReadContext(futurePool, "key", 10, 10, 1);
S3ObjectAttributes attrs =
S3APrefetchFakes.createObjectAttributes("bucket", "key", 10);
S3AInputStreamStatistics stats =
readContext.getS3AStatisticsContext().newInputStreamStatistics();
// Should not throw.
new S3ACachingInputStream(readContext, attrs, client, stats);
ExceptionAsserts.assertThrows(
NullPointerException.class,
() -> new S3ACachingInputStream(null, attrs, client, stats));
ExceptionAsserts.assertThrows(
NullPointerException.class,
() -> new S3ACachingInputStream(readContext, null, client, stats));
ExceptionAsserts.assertThrows(
NullPointerException.class,
() -> new S3ACachingInputStream(readContext, attrs, null, stats));
ExceptionAsserts.assertThrows(
NullPointerException.class,
() -> new S3ACachingInputStream(readContext, attrs, client, null));
}
@Test
public void testRead0SizedFile() throws Exception {
S3ARemoteInputStream inputStream =
S3APrefetchFakes.createS3InMemoryInputStream(futurePool, "bucket",
"key", 0);
testRead0SizedFileHelper(inputStream, 9);
inputStream =
S3APrefetchFakes.createS3CachingInputStream(futurePool, "bucket", "key",
0, 5, 2);
testRead0SizedFileHelper(inputStream, 5);
}
private void testRead0SizedFileHelper(S3ARemoteInputStream inputStream,
int bufferSize)
throws Exception {
assertEquals(0, inputStream.available());
assertEquals(-1, inputStream.read());
assertEquals(-1, inputStream.read());
byte[] buffer = new byte[2];
assertEquals(-1, inputStream.read(buffer));
assertEquals(-1, inputStream.read());
}
@Test
public void testRead() throws Exception {
S3ARemoteInputStream inputStream =
S3APrefetchFakes.createS3InMemoryInputStream(futurePool, "bucket",
"key", FILE_SIZE);
testReadHelper(inputStream, FILE_SIZE);
inputStream =
S3APrefetchFakes.createS3CachingInputStream(futurePool, "bucket", "key",
FILE_SIZE, 5, 2);
testReadHelper(inputStream, 5);
}
private void testReadHelper(S3ARemoteInputStream inputStream, int bufferSize)
throws Exception {
assertEquals(bufferSize, inputStream.available());
assertEquals(0, inputStream.read());
assertEquals(1, inputStream.read());
byte[] buffer = new byte[2];
assertEquals(2, inputStream.read(buffer));
assertEquals(2, buffer[0]);
assertEquals(3, buffer[1]);
assertEquals(4, inputStream.read());
buffer = new byte[10];
int curPos = (int) inputStream.getPos();
int expectedRemainingBytes = (int) (FILE_SIZE - curPos);
int readStartOffset = 2;
assertEquals(
expectedRemainingBytes,
inputStream.read(buffer, readStartOffset, expectedRemainingBytes));
for (int i = 0; i < expectedRemainingBytes; i++) {
assertEquals(curPos + i, buffer[readStartOffset + i]);
}
assertEquals(-1, inputStream.read());
Thread.sleep(100);
assertEquals(-1, inputStream.read());
assertEquals(-1, inputStream.read());
assertEquals(-1, inputStream.read(buffer));
assertEquals(-1, inputStream.read(buffer, 1, 3));
}
@Test
public void testSeek() throws Exception {
S3ARemoteInputStream inputStream;
inputStream =
S3APrefetchFakes.createS3InMemoryInputStream(futurePool, "bucket",
"key", 9);
testSeekHelper(inputStream, 9, 9);
inputStream =
S3APrefetchFakes.createS3CachingInputStream(futurePool, "bucket", "key",
9, 5, 1);
testSeekHelper(inputStream, 5, 9);
}
private void testSeekHelper(S3ARemoteInputStream inputStream,
int bufferSize,
int fileSize)
throws Exception {
assertEquals(0, inputStream.getPos());
inputStream.seek(7);
assertEquals(7, inputStream.getPos());
inputStream.seek(0);
assertEquals(bufferSize, inputStream.available());
for (int i = 0; i < fileSize; i++) {
assertEquals(i, inputStream.read());
}
for (int i = 0; i < fileSize; i++) {
inputStream.seek(i);
for (int j = i; j < fileSize; j++) {
assertEquals(j, inputStream.read());
}
}
// Test invalid seeks.
ExceptionAsserts.assertThrows(
EOFException.class,
FSExceptionMessages.NEGATIVE_SEEK,
() -> inputStream.seek(-1));
}
@Test
public void testRandomSeek() throws Exception {
S3ARemoteInputStream inputStream;
inputStream =
S3APrefetchFakes.createS3InMemoryInputStream(futurePool, "bucket",
"key", 9);
testRandomSeekHelper(inputStream, 9, 9);
inputStream =
S3APrefetchFakes.createS3CachingInputStream(futurePool, "bucket", "key",
9, 5, 1);
testRandomSeekHelper(inputStream, 5, 9);
}
private void testRandomSeekHelper(S3ARemoteInputStream inputStream,
int bufferSize,
int fileSize)
throws Exception {
assertEquals(0, inputStream.getPos());
inputStream.seek(7);
assertEquals(7, inputStream.getPos());
inputStream.seek(0);
assertEquals(bufferSize, inputStream.available());
for (int i = 0; i < fileSize; i++) {
assertEquals(i, inputStream.read());
}
for (int i = 0; i < fileSize; i++) {
inputStream.seek(i);
for (int j = i; j < fileSize; j++) {
assertEquals(j, inputStream.read());
}
int seekFromEndPos = fileSize - i - 1;
inputStream.seek(seekFromEndPos);
for (int j = seekFromEndPos; j < fileSize; j++) {
assertEquals(j, inputStream.read());
}
}
}
@Test
public void testClose() throws Exception {
S3ARemoteInputStream inputStream =
S3APrefetchFakes.createS3InMemoryInputStream(futurePool, "bucket",
"key", 9);
testCloseHelper(inputStream, 9);
inputStream =
S3APrefetchFakes.createS3CachingInputStream(futurePool, "bucket", "key",
9, 5, 3);
testCloseHelper(inputStream, 5);
}
private void testCloseHelper(S3ARemoteInputStream inputStream, int bufferSize)
throws Exception {
assertEquals(bufferSize, inputStream.available());
assertEquals(0, inputStream.read());
assertEquals(1, inputStream.read());
inputStream.close();
ExceptionAsserts.assertThrows(
IOException.class,
FSExceptionMessages.STREAM_IS_CLOSED,
() -> inputStream.available());
ExceptionAsserts.assertThrows(
IOException.class,
FSExceptionMessages.STREAM_IS_CLOSED,
() -> inputStream.read());
byte[] buffer = new byte[10];
ExceptionAsserts.assertThrows(
IOException.class,
FSExceptionMessages.STREAM_IS_CLOSED,
() -> inputStream.read(buffer));
// Verify a second close() does not throw.
inputStream.close();
}
}