blob: 189357f6bd04fd85cac8bc67e870fc689a62879e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.fs.impl.prefetch;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNegative;
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkState;
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.util.Preconditions.checkNotNull;
/**
* Manages a fixed pool of {@code ByteBuffer} instances.
* <p>
* Avoids creating a new buffer if a previously created buffer is already available.
*/
public class BufferPool implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(BufferPool.class);
/**
* Max number of buffers in this pool.
*/
private final int size;
/**
* Size in bytes of each buffer.
*/
private final int bufferSize;
/*
Invariants for internal state.
-- a buffer is either in this.pool or in this.allocated
-- transition between this.pool <==> this.allocated must be atomic
-- only one buffer allocated for a given blockNumber
*/
/**
* Underlying bounded resource pool.
*/
private BoundedResourcePool<ByteBuffer> pool;
/**
* Allows associating metadata to each buffer in the pool.
*/
private Map<BufferData, ByteBuffer> allocated;
/**
* Prefetching stats.
*/
private PrefetchingStatistics prefetchingStatistics;
/**
* Initializes a new instance of the {@code BufferPool} class.
* @param size number of buffer in this pool.
* @param bufferSize size in bytes of each buffer.
* @param prefetchingStatistics statistics for this stream.
* @throws IllegalArgumentException if size is zero or negative.
* @throws IllegalArgumentException if bufferSize is zero or negative.
*/
public BufferPool(int size,
int bufferSize,
PrefetchingStatistics prefetchingStatistics) {
Validate.checkPositiveInteger(size, "size");
Validate.checkPositiveInteger(bufferSize, "bufferSize");
this.size = size;
this.bufferSize = bufferSize;
this.allocated = new IdentityHashMap<BufferData, ByteBuffer>();
this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
this.pool = new BoundedResourcePool<ByteBuffer>(size) {
@Override
public ByteBuffer createNew() {
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
prefetchingStatistics.memoryAllocated(bufferSize);
return buffer;
}
};
}
/**
* Gets a list of all blocks in this pool.
* @return a list of all blocks in this pool.
*/
public List<BufferData> getAll() {
synchronized (allocated) {
return Collections.unmodifiableList(new ArrayList<>(allocated.keySet()));
}
}
/**
* Acquires a {@code ByteBuffer}; blocking if necessary until one becomes available.
* @param blockNumber the id of the block to acquire.
* @return the acquired block's {@code BufferData}.
*/
public synchronized BufferData acquire(int blockNumber) {
BufferData data;
final int maxRetryDelayMs = 600 * 1000;
final int statusUpdateDelayMs = 120 * 1000;
Retryer retryer = new Retryer(10, maxRetryDelayMs, statusUpdateDelayMs);
do {
if (retryer.updateStatus()) {
if (LOG.isDebugEnabled()) {
LOG.debug("waiting to acquire block: {}", blockNumber);
LOG.debug("state = {}", this);
}
releaseReadyBlock(blockNumber);
}
data = tryAcquire(blockNumber);
}
while ((data == null) && retryer.continueRetry());
if (data != null) {
return data;
} else {
String message =
String.format("Wait failed for acquire(%d)", blockNumber);
throw new IllegalStateException(message);
}
}
/**
* Acquires a buffer if one is immediately available. Otherwise returns null.
* @param blockNumber the id of the block to try acquire.
* @return the acquired block's {@code BufferData} or null.
*/
public synchronized BufferData tryAcquire(int blockNumber) {
return acquireHelper(blockNumber, false);
}
private synchronized BufferData acquireHelper(int blockNumber,
boolean canBlock) {
checkNotNegative(blockNumber, "blockNumber");
releaseDoneBlocks();
BufferData data = find(blockNumber);
if (data != null) {
return data;
}
ByteBuffer buffer = canBlock ? pool.acquire() : pool.tryAcquire();
if (buffer == null) {
return null;
}
buffer.clear();
data = new BufferData(blockNumber, buffer.duplicate());
synchronized (allocated) {
checkState(find(blockNumber) == null, "buffer data already exists");
allocated.put(data, buffer);
}
return data;
}
/**
* Releases resources for any blocks marked as 'done'.
*/
private synchronized void releaseDoneBlocks() {
for (BufferData data : getAll()) {
if (data.stateEqualsOneOf(BufferData.State.DONE)) {
release(data);
}
}
}
/**
* If no blocks were released after calling releaseDoneBlocks() a few times,
* we may end up waiting forever. To avoid that situation, we try releasing
* a 'ready' block farthest away from the given block.
*/
private synchronized void releaseReadyBlock(int blockNumber) {
BufferData releaseTarget = null;
for (BufferData data : getAll()) {
if (data.stateEqualsOneOf(BufferData.State.READY)) {
if (releaseTarget == null) {
releaseTarget = data;
} else {
if (distance(data, blockNumber) > distance(releaseTarget,
blockNumber)) {
releaseTarget = data;
}
}
}
}
if (releaseTarget != null) {
LOG.warn("releasing 'ready' block: {}", releaseTarget);
releaseTarget.setDone();
}
}
private int distance(BufferData data, int blockNumber) {
return Math.abs(data.getBlockNumber() - blockNumber);
}
/**
* Releases a previously acquired resource.
* @param data the {@code BufferData} instance to release.
* @throws IllegalArgumentException if data is null.
* @throws IllegalArgumentException if data cannot be released due to its state.
*/
public synchronized void release(BufferData data) {
checkNotNull(data, "data");
synchronized (data) {
checkArgument(
canRelease(data),
String.format("Unable to release buffer: %s", data));
ByteBuffer buffer = allocated.get(data);
if (buffer == null) {
// Likely released earlier.
return;
}
buffer.clear();
pool.release(buffer);
allocated.remove(data);
}
releaseDoneBlocks();
}
@Override
public synchronized void close() {
for (BufferData data : getAll()) {
Future<Void> actionFuture = data.getActionFuture();
if (actionFuture != null) {
actionFuture.cancel(true);
}
}
int currentPoolSize = pool.numCreated();
pool.close();
pool = null;
allocated.clear();
allocated = null;
prefetchingStatistics.memoryFreed(currentPoolSize * bufferSize);
}
// For debugging purposes.
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(pool.toString());
sb.append("\n");
List<BufferData> allData = new ArrayList<>(getAll());
Collections.sort(allData,
(d1, d2) -> d1.getBlockNumber() - d2.getBlockNumber());
for (BufferData data : allData) {
sb.append(data.toString());
sb.append("\n");
}
return sb.toString();
}
// Number of ByteBuffers created so far.
public synchronized int numCreated() {
return pool.numCreated();
}
// Number of ByteBuffers available to be acquired.
public synchronized int numAvailable() {
releaseDoneBlocks();
return pool.numAvailable();
}
private BufferData find(int blockNumber) {
synchronized (allocated) {
for (BufferData data : allocated.keySet()) {
if ((data.getBlockNumber() == blockNumber)
&& !data.stateEqualsOneOf(BufferData.State.DONE)) {
return data;
}
}
}
return null;
}
private boolean canRelease(BufferData data) {
return data.stateEqualsOneOf(
BufferData.State.DONE,
BufferData.State.READY);
}
}