blob: 77eb60139a6c3dc563742218a40584025fddd00a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.buffer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayDeque;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/**
* A buffer pool used to manage a number of {@link Buffer} instances from the
* {@link NetworkBufferPool}.
*
* <p>Buffer requests are mediated to the network buffer pool to ensure dead-lock
* free operation of the network stack by limiting the number of buffers per
* local buffer pool. It also implements the default mechanism for buffer
* recycling, which ensures that every buffer is ultimately returned to the
* network buffer pool.
*
* <p>The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}. It
* will then lazily return the required number of buffers to the {@link NetworkBufferPool} to
* match its new size.
*/
class LocalBufferPool implements BufferPool {
private static final Logger LOG = LoggerFactory.getLogger(LocalBufferPool.class);
/** Global network buffer pool to get buffers from. */
private final NetworkBufferPool networkBufferPool;
/** The minimum number of required segments for this pool. */
private final int numberOfRequiredMemorySegments;
/**
* The currently available memory segments. These are segments, which have been requested from
* the network buffer pool and are currently not handed out as Buffer instances.
*
* <p><strong>BEWARE:</strong> Take special care with the interactions between this lock and
* locks acquired before entering this class vs. locks being acquired during calls to external
* code inside this class, e.g. with
* {@link org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel#bufferQueue}
* via the {@link #registeredListeners} callback.
*/
private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();
/**
* Buffer availability listeners, which need to be notified when a Buffer becomes available.
* Listeners can only be registered at a time/state where no Buffer instance was available.
*/
private final ArrayDeque<BufferListener> registeredListeners = new ArrayDeque<>();
/** Maximum number of network buffers to allocate. */
private final int maxNumberOfMemorySegments;
/** The current size of this pool. */
private int currentPoolSize;
/**
* Number of all memory segments, which have been requested from the network buffer pool and are
* somehow referenced through this pool (e.g. wrapped in Buffer instances or as available segments).
*/
private int numberOfRequestedMemorySegments;
private boolean isDestroyed;
private BufferPoolOwner owner;
/**
* Local buffer pool based on the given <tt>networkBufferPool</tt> with a minimal number of
* network buffers being available.
*
* @param networkBufferPool
* global network buffer pool to get buffers from
* @param numberOfRequiredMemorySegments
* minimum number of network buffers
*/
LocalBufferPool(NetworkBufferPool networkBufferPool, int numberOfRequiredMemorySegments) {
this(networkBufferPool, numberOfRequiredMemorySegments, Integer.MAX_VALUE);
}
/**
* Local buffer pool based on the given <tt>networkBufferPool</tt> with a minimal and maximal
* number of network buffers being available.
*
* @param networkBufferPool
* global network buffer pool to get buffers from
* @param numberOfRequiredMemorySegments
* minimum number of network buffers
* @param maxNumberOfMemorySegments
* maximum number of network buffers to allocate
*/
LocalBufferPool(NetworkBufferPool networkBufferPool, int numberOfRequiredMemorySegments,
int maxNumberOfMemorySegments) {
checkArgument(maxNumberOfMemorySegments >= numberOfRequiredMemorySegments,
"Maximum number of memory segments (%s) should not be smaller than minimum (%s).",
maxNumberOfMemorySegments, numberOfRequiredMemorySegments);
checkArgument(maxNumberOfMemorySegments > 0,
"Maximum number of memory segments (%s) should be larger than 0.",
maxNumberOfMemorySegments);
LOG.debug("Using a local buffer pool with {}-{} buffers",
numberOfRequiredMemorySegments, maxNumberOfMemorySegments);
this.networkBufferPool = networkBufferPool;
this.numberOfRequiredMemorySegments = numberOfRequiredMemorySegments;
this.currentPoolSize = numberOfRequiredMemorySegments;
this.maxNumberOfMemorySegments = maxNumberOfMemorySegments;
}
// ------------------------------------------------------------------------
// Properties
// ------------------------------------------------------------------------
@Override
public boolean isDestroyed() {
synchronized (availableMemorySegments) {
return isDestroyed;
}
}
@Override
public int getMemorySegmentSize() {
return networkBufferPool.getMemorySegmentSize();
}
@Override
public int getNumberOfRequiredMemorySegments() {
return numberOfRequiredMemorySegments;
}
@Override
public int getMaxNumberOfMemorySegments() {
return maxNumberOfMemorySegments;
}
@Override
public int getNumberOfAvailableMemorySegments() {
synchronized (availableMemorySegments) {
return availableMemorySegments.size();
}
}
@Override
public int getNumBuffers() {
synchronized (availableMemorySegments) {
return currentPoolSize;
}
}
@Override
public int bestEffortGetNumOfUsedBuffers() {
return Math.max(0, numberOfRequestedMemorySegments - availableMemorySegments.size());
}
@Override
public void setBufferPoolOwner(BufferPoolOwner owner) {
synchronized (availableMemorySegments) {
checkState(this.owner == null, "Buffer pool owner has already been set.");
this.owner = checkNotNull(owner);
}
}
@Override
public Buffer requestBuffer() throws IOException {
try {
return toBuffer(requestMemorySegment(false));
}
catch (InterruptedException e) {
throw new IOException(e);
}
}
@Override
public Buffer requestBufferBlocking() throws IOException, InterruptedException {
return toBuffer(requestMemorySegment(true));
}
@Override
public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException {
return toBufferBuilder(requestMemorySegment(true));
}
private Buffer toBuffer(MemorySegment memorySegment) {
if (memorySegment == null) {
return null;
}
return new NetworkBuffer(memorySegment, this);
}
private BufferBuilder toBufferBuilder(MemorySegment memorySegment) {
if (memorySegment == null) {
return null;
}
return new BufferBuilder(memorySegment, this);
}
private MemorySegment requestMemorySegment(boolean isBlocking) throws InterruptedException, IOException {
synchronized (availableMemorySegments) {
returnExcessMemorySegments();
boolean askToRecycle = owner != null;
// fill availableMemorySegments with at least one element, wait if required
while (availableMemorySegments.isEmpty()) {
if (isDestroyed) {
throw new IllegalStateException("Buffer pool is destroyed.");
}
if (numberOfRequestedMemorySegments < currentPoolSize) {
final MemorySegment segment = networkBufferPool.requestMemorySegment();
if (segment != null) {
numberOfRequestedMemorySegments++;
return segment;
}
}
if (askToRecycle) {
owner.releaseMemory(1);
}
if (isBlocking) {
availableMemorySegments.wait(2000);
}
else {
return null;
}
}
return availableMemorySegments.poll();
}
}
@Override
public void recycle(MemorySegment segment) {
BufferListener listener;
synchronized (availableMemorySegments) {
if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
returnMemorySegment(segment);
return;
}
else {
listener = registeredListeners.poll();
if (listener == null) {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
return;
}
}
}
// We do not know which locks have been acquired before the recycle() or are needed in the
// notification and which other threads also access them.
// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
boolean success = false;
boolean needMoreBuffers = false;
try {
needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
success = true;
} catch (Throwable ignored) {
// handled below, under the lock
}
if (!success || needMoreBuffers) {
synchronized (availableMemorySegments) {
if (isDestroyed) {
// cleanup tasks how they would have been done if we only had one synchronized block
if (needMoreBuffers) {
listener.notifyBufferDestroyed();
}
if (!success) {
returnMemorySegment(segment);
}
} else {
if (needMoreBuffers) {
registeredListeners.add(listener);
}
if (!success) {
if (numberOfRequestedMemorySegments > currentPoolSize) {
returnMemorySegment(segment);
} else {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
}
}
}
}
}
}
/**
* Destroy is called after the produce or consume phase of a task finishes.
*/
@Override
public void lazyDestroy() {
// NOTE: if you change this logic, be sure to update recycle() as well!
synchronized (availableMemorySegments) {
if (!isDestroyed) {
MemorySegment segment;
while ((segment = availableMemorySegments.poll()) != null) {
returnMemorySegment(segment);
}
BufferListener listener;
while ((listener = registeredListeners.poll()) != null) {
listener.notifyBufferDestroyed();
}
isDestroyed = true;
}
}
try {
networkBufferPool.destroyBufferPool(this);
} catch (IOException e) {
ExceptionUtils.rethrow(e);
}
}
@Override
public boolean addBufferListener(BufferListener listener) {
synchronized (availableMemorySegments) {
if (!availableMemorySegments.isEmpty() || isDestroyed) {
return false;
}
registeredListeners.add(listener);
return true;
}
}
@Override
public void setNumBuffers(int numBuffers) throws IOException {
synchronized (availableMemorySegments) {
checkArgument(numBuffers >= numberOfRequiredMemorySegments,
"Buffer pool needs at least %s buffers, but tried to set to %s",
numberOfRequiredMemorySegments, numBuffers);
if (numBuffers > maxNumberOfMemorySegments) {
currentPoolSize = maxNumberOfMemorySegments;
} else {
currentPoolSize = numBuffers;
}
returnExcessMemorySegments();
// If there is a registered owner and we have still requested more buffers than our
// size, trigger a recycle via the owner.
if (owner != null && numberOfRequestedMemorySegments > currentPoolSize) {
owner.releaseMemory(numberOfRequestedMemorySegments - numBuffers);
}
}
}
@Override
public String toString() {
synchronized (availableMemorySegments) {
return String.format(
"[size: %d, required: %d, requested: %d, available: %d, max: %d, listeners: %d, destroyed: %s]",
currentPoolSize, numberOfRequiredMemorySegments, numberOfRequestedMemorySegments,
availableMemorySegments.size(), maxNumberOfMemorySegments, registeredListeners.size(), isDestroyed);
}
}
// ------------------------------------------------------------------------
private void returnMemorySegment(MemorySegment segment) {
assert Thread.holdsLock(availableMemorySegments);
numberOfRequestedMemorySegments--;
networkBufferPool.recycle(segment);
}
private void returnExcessMemorySegments() {
assert Thread.holdsLock(availableMemorySegments);
while (numberOfRequestedMemorySegments > currentPoolSize) {
MemorySegment segment = availableMemorySegments.poll();
if (segment == null) {
return;
}
returnMemorySegment(segment);
}
}
}