/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.fs.s3a.prefetch;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.impl.prefetch.BlockCache;
import org.apache.hadoop.fs.impl.prefetch.BlockData;
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.apache.hadoop.fs.impl.prefetch.SingleFilePerBlockCache;
import org.apache.hadoop.fs.impl.prefetch.Validate;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.VectoredIOContext;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.s3a.statistics.impl.CountingChangeTracker;
import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.util.functional.CallableRaisingIOE;

import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.emptyStatisticsStore;

/**
* Provides 'fake' implementations of S3ARemoteInputStream variants.
*
* These implementations avoid accessing the following real resources:
* -- S3 store
* -- local filesystem
*
* This arrangement allows thorough multi-threaded testing of those
* implementations without accessing external resources. It also helps
* avoid test flakiness introduced by external factors.
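 *
 * As a rough usage sketch (the test scaffolding shown here is
 * illustrative, not part of this class), a test can read through the
 * in-memory fake like this:
 * <pre>{@code
 * ExecutorServiceFuturePool pool =
 *     new ExecutorServiceFuturePool(Executors.newFixedThreadPool(4));
 * try (S3ARemoteInputStream in =
 *     S3APrefetchFakes.createS3InMemoryInputStream(
 *         pool, "bucket", "key", 16)) {
 *   byte[] data = new byte[16];
 *   int bytesRead = in.read(data, 0, data.length);
 * }
 * }</pre>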
*/
public final class S3APrefetchFakes {

  private S3APrefetchFakes() {
  }

public static final String E_TAG = "eTag";
public static final String OWNER = "owner";
public static final String VERSION_ID = "v1";
public static final long MODIFICATION_TIME = 0L;
public static final ChangeDetectionPolicy CHANGE_POLICY =
ChangeDetectionPolicy.createPolicy(
ChangeDetectionPolicy.Mode.None,
ChangeDetectionPolicy.Source.None,
false);
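
/**
 * Creates a fake file status; the block size is set to the file size,
 * so the object reads as a single block.
 */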
public static S3AFileStatus createFileStatus(String key, long fileSize) {
org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(key);
long blockSize = fileSize;
return new S3AFileStatus(
fileSize, MODIFICATION_TIME, path, blockSize, OWNER, E_TAG, VERSION_ID);
}
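
/**
 * Creates attributes for an unencrypted fake object of the given size.
 */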
public static S3ObjectAttributes createObjectAttributes(
String bucket,
String key,
long fileSize) {
org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(key);
String encryptionKey = "";
return new S3ObjectAttributes(
bucket,
path,
key,
S3AEncryptionMethods.NONE,
encryptionKey,
E_TAG,
VERSION_ID,
fileSize);
}
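
/**
 * Creates a read context wired to empty statistics, a short
 * fixed-sleep retry policy and the given prefetch settings.
 */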
public static S3AReadOpContext createReadContext(
ExecutorServiceFuturePool futurePool,
String key,
int fileSize,
int prefetchBlockSize,
int prefetchBlockCount) {
S3AFileStatus fileStatus = createFileStatus(key, fileSize);
org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(key);
FileSystem.Statistics statistics = new FileSystem.Statistics("s3a");
S3AStatisticsContext statisticsContext = new EmptyS3AStatisticsContext();
RetryPolicy retryPolicy =
RetryPolicies.retryUpToMaximumCountWithFixedSleep(3, 10,
TimeUnit.MILLISECONDS);
return new S3AReadOpContext(
path,
new Invoker(retryPolicy, Invoker.LOG_EVENT),
statistics,
statisticsContext,
fileStatus,
new VectoredIOContext()
.setMinSeekForVectoredReads(1)
.setMaxReadSizeForVectoredReads(1)
.build(),
emptyStatisticsStore(),
futurePool,
prefetchBlockSize,
prefetchBlockCount)
.withChangeDetectionPolicy(
ChangeDetectionPolicy.createPolicy(ChangeDetectionPolicy.Mode.None,
ChangeDetectionPolicy.Source.ETag, false))
.withInputPolicy(S3AInputPolicy.Normal);
}
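
/**
 * Builds the s3a URI for the given bucket and key.
 */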
public static URI createUri(String bucket, String key) {
return URI.create(String.format("s3a://%s/%s", bucket, key));
}
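
/**
 * Creates a change tracker over the fake object attributes,
 * using {@link #CHANGE_POLICY}.
 */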
public static ChangeTracker createChangeTracker(
String bucket,
String key,
long fileSize) {
return new ChangeTracker(
createUri(bucket, key).toString(),
CHANGE_POLICY,
new CountingChangeTracker(),
createObjectAttributes(bucket, key, fileSize));
}
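
/**
 * Wraps the given bytes as an S3ObjectInputStream; the HTTP request
 * argument is null because no network call is ever made.
 */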
public static S3ObjectInputStream createS3ObjectInputStream(byte[] buffer) {
return new S3ObjectInputStream(new ByteArrayInputStream(buffer), null);
}
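
/**
 * Creates callbacks whose GET calls return a canned 8-byte object
 * carrying the fake ETag; asynchronous submission is unsupported.
 */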
  public static S3AInputStream.InputStreamCallbacks createInputStreamCallbacks(
      String bucket,
      String key) {

    S3Object object = new S3Object() {

      @Override
      public S3ObjectInputStream getObjectContent() {
        return createS3ObjectInputStream(new byte[8]);
      }

      @Override
      public ObjectMetadata getObjectMetadata() {
        ObjectMetadata metadata = new ObjectMetadata();
        metadata.setHeader("ETag", E_TAG);
        return metadata;
      }
    };

    return new S3AInputStream.InputStreamCallbacks() {

      @Override
      public S3Object getObject(GetObjectRequest request) {
        return object;
      }

      @Override
      public <T> CompletableFuture<T> submit(CallableRaisingIOE<T> operation) {
        // Asynchronous submission is not exercised by these fakes.
        return null;
      }

      @Override
      public GetObjectRequest newGetRequest(String key) {
        return new GetObjectRequest(bucket, key);
      }

      @Override
      public void close() {
      }
    };
  }
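
/**
 * Creates a fake input stream of the requested class, wired up to
 * fake file status, object attributes, read context and callbacks.
 */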
public static S3ARemoteInputStream createInputStream(
Class<? extends S3ARemoteInputStream> clazz,
ExecutorServiceFuturePool futurePool,
String bucket,
String key,
int fileSize,
int prefetchBlockSize,
int prefetchBlockCount) {
org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(key);
S3AFileStatus fileStatus = createFileStatus(key, fileSize);
S3ObjectAttributes s3ObjectAttributes =
createObjectAttributes(bucket, key, fileSize);
S3AReadOpContext s3AReadOpContext = createReadContext(
futurePool,
key,
fileSize,
prefetchBlockSize,
prefetchBlockCount);
S3AInputStream.InputStreamCallbacks callbacks =
createInputStreamCallbacks(bucket, key);
S3AInputStreamStatistics stats =
s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics();
if (clazz == FakeS3AInMemoryInputStream.class) {
return new FakeS3AInMemoryInputStream(s3AReadOpContext,
s3ObjectAttributes, callbacks, stats);
} else if (clazz == FakeS3ACachingInputStream.class) {
return new FakeS3ACachingInputStream(s3AReadOpContext, s3ObjectAttributes,
callbacks, stats);
}
throw new IllegalArgumentException("Unsupported class: " + clazz);
}
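
/**
 * Creates an in-memory fake; prefetch block size and count are
 * both fixed at 1.
 */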
public static FakeS3AInMemoryInputStream createS3InMemoryInputStream(
ExecutorServiceFuturePool futurePool,
String bucket,
String key,
int fileSize) {
return (FakeS3AInMemoryInputStream) createInputStream(
FakeS3AInMemoryInputStream.class, futurePool, bucket, key, fileSize, 1,
1);
}
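
/**
 * Creates a caching fake with the given prefetch block size and count.
 */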
public static FakeS3ACachingInputStream createS3CachingInputStream(
ExecutorServiceFuturePool futurePool,
String bucket,
String key,
int fileSize,
int prefetchBlockSize,
int prefetchBlockCount) {
return (FakeS3ACachingInputStream) createInputStream(
FakeS3ACachingInputStream.class,
futurePool,
bucket,
key,
fileSize,
prefetchBlockSize,
prefetchBlockCount);
}
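
  /**
   * In-memory stream variant whose backing object is a mock with a
   * simulated open delay.
   */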
  public static class FakeS3AInMemoryInputStream
      extends S3AInMemoryInputStream {

    public FakeS3AInMemoryInputStream(
        S3AReadOpContext context,
        S3ObjectAttributes s3Attributes,
        S3AInputStream.InputStreamCallbacks client,
        S3AInputStreamStatistics streamStatistics) {
      super(context, s3Attributes, client, streamStatistics);
    }

    @Override
    protected S3ARemoteObject getS3File() {
      // Simulate the latency of opening the remote object.
      randomDelay(200);
      return new MockS3ARemoteObject(
          (int) this.getS3ObjectAttributes().getLen(), false);
    }
  }
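
  /**
   * Block cache that keeps "files" in a map rather than on disk and
   * injects random read/write delays.
   */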
  public static class FakeS3FilePerBlockCache extends SingleFilePerBlockCache {

    private final Map<Path, byte[]> files;
    private final int readDelay;
    private final int writeDelay;

    public FakeS3FilePerBlockCache(int readDelay, int writeDelay) {
      super(new EmptyS3AStatisticsContext().newInputStreamStatistics());
      this.files = new ConcurrentHashMap<>();
      this.readDelay = readDelay;
      this.writeDelay = writeDelay;
    }

    @Override
    protected int readFile(Path path, ByteBuffer buffer) {
      // Serve the read from the in-memory map, with a simulated delay.
      byte[] source = this.files.get(path);
      randomDelay(this.readDelay);
      buffer.put(source);
      return source.length;
    }

    @Override
    protected void writeFile(Path path, ByteBuffer buffer) throws IOException {
      Validate.checkPositiveInteger(buffer.limit(), "buffer.limit()");
      // Copy the buffer contents into the map, with a simulated delay.
      byte[] dest = new byte[buffer.limit()];
      randomDelay(this.writeDelay);
      buffer.rewind();
      buffer.get(dest);
      this.files.put(path, dest);
    }

    // Counter used to generate unique fake cache file paths.
    private long fileCount = 0;

    @Override
    protected Path getCacheFilePath() throws IOException {
      fileCount++;
      return Paths.get(Long.toString(fileCount));
    }

    @Override
    public void close() throws IOException {
      this.files.clear();
    }
  }

  private static final Random RANDOM = new Random();

  private static void randomDelay(int delay) {
    try {
      Thread.sleep(RANDOM.nextInt(delay));
    } catch (InterruptedException e) {
      // Restore the interrupt status rather than swallowing it.
      Thread.currentThread().interrupt();
    }
  }
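
  /**
   * Block manager that injects a read delay and caches blocks in a
   * {@link FakeS3FilePerBlockCache}.
   */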
  public static class FakeS3ACachingBlockManager
      extends S3ACachingBlockManager {

    public FakeS3ACachingBlockManager(
        ExecutorServiceFuturePool futurePool,
        S3ARemoteObjectReader reader,
        BlockData blockData,
        int bufferPoolSize) {
      super(futurePool, reader, blockData, bufferPoolSize,
          new EmptyS3AStatisticsContext().newInputStreamStatistics());
    }

    @Override
    public int read(ByteBuffer buffer, long offset, int size)
        throws IOException {
      // Inject a random delay before delegating to the real reader.
      randomDelay(100);
      return this.getReader().read(buffer, offset, size);
    }

    @Override
    protected BlockCache createCache() {
      final int readDelayMs = 50;
      final int writeDelayMs = 200;
      return new FakeS3FilePerBlockCache(readDelayMs, writeDelayMs);
    }
  }
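
  /**
   * Caching stream variant backed by a mock remote object and the
   * fake block manager above.
   */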
  public static class FakeS3ACachingInputStream extends S3ACachingInputStream {

    public FakeS3ACachingInputStream(
        S3AReadOpContext context,
        S3ObjectAttributes s3Attributes,
        S3AInputStream.InputStreamCallbacks client,
        S3AInputStreamStatistics streamStatistics) {
      super(context, s3Attributes, client, streamStatistics);
    }

    @Override
    protected S3ARemoteObject getS3File() {
      // Simulate the latency of opening the remote object.
      randomDelay(200);
      return new MockS3ARemoteObject(
          (int) this.getS3ObjectAttributes().getLen(), false);
    }

    @Override
    protected S3ACachingBlockManager createBlockManager(
        ExecutorServiceFuturePool futurePool,
        S3ARemoteObjectReader reader,
        BlockData blockData,
        int bufferPoolSize) {
      return new FakeS3ACachingBlockManager(futurePool, reader, blockData,
          bufferPoolSize);
    }
  }
}