blob: c0c7bbc4e7577e1664370fbd0606c67c00336b3b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// We have it in this package because we could not mock the methods otherwise
package org.apache.flink.runtime.io.network.buffer;
import org.apache.flink.core.memory.MemorySegment;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
/** Implementation of {@link BufferPool} for testing. */
public class TestingBufferPool implements BufferPool {
public static final TestingBufferPool NO_OP = TestingBufferPool.builder().build();
private Consumer<Integer> reserveSegmentsConsumer;
private Runnable lazyDestroyRunnable;
private Supplier<Buffer> requestBufferSupplier;
private Supplier<BufferBuilder> requestBufferBuilderSupplier;
private Supplier<BufferBuilder> requestBufferBuilderBlockingSupplier;
private Function<Integer, BufferBuilder> requestBufferBuilderFunction;
private Function<Integer, BufferBuilder> requestBufferBuilderBlockingFunction;
private Function<BufferListener, Boolean> addBufferListenerFunction;
private Supplier<Boolean> isDestroyedSupplier;
private Supplier<MemorySegment> requestMemorySegmentSupplier;
private Supplier<MemorySegment> requestMemorySegmentBlockingSupplier;
private Supplier<Integer> getExpectedNumberOfMemorySegmentsSupplier;
private Supplier<Integer> getMinNumberOfMemorySegmentsSupplier;
private Supplier<Integer> getMaxNumberOfMemorySegmentsSupplier;
private Supplier<Integer> getNumBuffersSupplier;
private Consumer<Integer> setNumBuffersConsumer;
private Consumer<Integer> setMaxOverdraftBuffersPerGateConsumer;
private Supplier<Integer> getMaxOverdraftBuffersPerGateSupplier;
private Supplier<Integer> getNumberOfAvailableMemorySegmentsSupplier;
private Supplier<Integer> bestEffortGetNumOfUsedBuffersSupplier;
private Consumer<MemorySegment> recycleConsumer;
private Supplier<CompletableFuture<?>> getAvailableFutureSupplier;
private TestingBufferPool(
Consumer<Integer> reserveSegmentsConsumer,
Runnable lazyDestroyRunnable,
Supplier<Buffer> requestBufferSupplier,
Supplier<BufferBuilder> requestBufferBuilderSupplier,
Supplier<BufferBuilder> requestBufferBuilderBlockingSupplier,
Function<Integer, BufferBuilder> requestBufferBuilderFunction,
Function<Integer, BufferBuilder> requestBufferBuilderBlockingFunction,
Function<BufferListener, Boolean> addBufferListenerFunction,
Supplier<Boolean> isDestroyedSupplier,
Supplier<MemorySegment> requestMemorySegmentSupplier,
Supplier<MemorySegment> requestMemorySegmentBlockingSupplier,
Supplier<Integer> getExpectedNumberOfMemorySegmentsSupplier,
Supplier<Integer> getMinNumberOfMemorySegmentsSupplier,
Supplier<Integer> getMaxNumberOfMemorySegmentsSupplier,
Supplier<Integer> getNumBuffersSupplier,
Consumer<Integer> setNumBuffersConsumer,
Consumer<Integer> setMaxOverdraftBuffersPerGateConsumer,
Supplier<Integer> getMaxOverdraftBuffersPerGateSupplier,
Supplier<Integer> getNumberOfAvailableMemorySegmentsSupplier,
Supplier<Integer> bestEffortGetNumOfUsedBuffersSupplier,
Consumer<MemorySegment> recycleConsumer,
Supplier<CompletableFuture<?>> getAvailableFutureSupplier) {
this.reserveSegmentsConsumer = reserveSegmentsConsumer;
this.lazyDestroyRunnable = lazyDestroyRunnable;
this.requestBufferSupplier = requestBufferSupplier;
this.requestBufferBuilderSupplier = requestBufferBuilderSupplier;
this.requestBufferBuilderBlockingSupplier = requestBufferBuilderBlockingSupplier;
this.requestBufferBuilderFunction = requestBufferBuilderFunction;
this.requestBufferBuilderBlockingFunction = requestBufferBuilderBlockingFunction;
this.addBufferListenerFunction = addBufferListenerFunction;
this.isDestroyedSupplier = isDestroyedSupplier;
this.requestMemorySegmentSupplier = requestMemorySegmentSupplier;
this.requestMemorySegmentBlockingSupplier = requestMemorySegmentBlockingSupplier;
this.getExpectedNumberOfMemorySegmentsSupplier = getExpectedNumberOfMemorySegmentsSupplier;
this.getMinNumberOfMemorySegmentsSupplier = getMinNumberOfMemorySegmentsSupplier;
this.getMaxNumberOfMemorySegmentsSupplier = getMaxNumberOfMemorySegmentsSupplier;
this.getNumBuffersSupplier = getNumBuffersSupplier;
this.setNumBuffersConsumer = setNumBuffersConsumer;
this.setMaxOverdraftBuffersPerGateConsumer = setMaxOverdraftBuffersPerGateConsumer;
this.getMaxOverdraftBuffersPerGateSupplier = getMaxOverdraftBuffersPerGateSupplier;
this.getNumberOfAvailableMemorySegmentsSupplier =
getNumberOfAvailableMemorySegmentsSupplier;
this.bestEffortGetNumOfUsedBuffersSupplier = bestEffortGetNumOfUsedBuffersSupplier;
this.recycleConsumer = recycleConsumer;
this.getAvailableFutureSupplier = getAvailableFutureSupplier;
}
public static TestingBufferPool.Builder builder() {
return new TestingBufferPool.Builder();
}
@Override
public void reserveSegments(int numberOfSegmentsToReserve) throws IOException {
reserveSegmentsConsumer.accept(numberOfSegmentsToReserve);
}
@Override
public void lazyDestroy() {
lazyDestroyRunnable.run();
}
@Nullable
@Override
public Buffer requestBuffer() {
return requestBufferSupplier.get();
}
@Nullable
@Override
public BufferBuilder requestBufferBuilder() {
return requestBufferBuilderSupplier.get();
}
@Nullable
@Override
public BufferBuilder requestBufferBuilder(int targetChannel) {
return requestBufferBuilderFunction.apply(targetChannel);
}
@Override
public BufferBuilder requestBufferBuilderBlocking() throws InterruptedException {
return requestBufferBuilderBlockingSupplier.get();
}
@Override
public BufferBuilder requestBufferBuilderBlocking(int targetChannel)
throws InterruptedException {
return requestBufferBuilderBlockingFunction.apply(targetChannel);
}
@Override
public boolean addBufferListener(BufferListener listener) {
return addBufferListenerFunction.apply(listener);
}
@Override
public boolean isDestroyed() {
return isDestroyedSupplier.get();
}
@Nullable
@Override
public MemorySegment requestMemorySegment() {
return requestMemorySegmentSupplier.get();
}
@Override
public MemorySegment requestMemorySegmentBlocking() throws InterruptedException {
return requestMemorySegmentBlockingSupplier.get();
}
@Override
public int getExpectedNumberOfMemorySegments() {
return getExpectedNumberOfMemorySegmentsSupplier.get();
}
@Override
public int getMinNumberOfMemorySegments() {
return getMinNumberOfMemorySegmentsSupplier.get();
}
@Override
public int getMaxNumberOfMemorySegments() {
return getMaxNumberOfMemorySegmentsSupplier.get();
}
@Override
public int getNumBuffers() {
return getNumBuffersSupplier.get();
}
@Override
public void setNumBuffers(int numBuffers) {
setNumBuffersConsumer.accept(numBuffers);
}
@Override
public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) {
setMaxOverdraftBuffersPerGateConsumer.accept(maxOverdraftBuffersPerGate);
}
@Override
public int getMaxOverdraftBuffersPerGate() {
return getMaxOverdraftBuffersPerGateSupplier.get();
}
@Override
public int getNumberOfAvailableMemorySegments() {
return getNumberOfAvailableMemorySegmentsSupplier.get();
}
@Override
public int bestEffortGetNumOfUsedBuffers() {
return bestEffortGetNumOfUsedBuffersSupplier.get();
}
@Override
public void recycle(MemorySegment memorySegment) {
recycleConsumer.accept(memorySegment);
}
@Override
public CompletableFuture<?> getAvailableFuture() {
return getAvailableFutureSupplier.get();
}
/** Builder for {@link TestingBufferPool}. */
public static class Builder {
private Consumer<Integer> reserveSegmentsConsumer = (ignore) -> {};
private Runnable lazyDestroyRunnable = () -> {};
private Supplier<Buffer> requestBufferSupplier =
() -> {
throw new UnsupportedOperationException();
};
private Supplier<BufferBuilder> requestBufferBuilderSupplier =
() -> {
throw new UnsupportedOperationException();
};
private Supplier<BufferBuilder> requestBufferBuilderBlockingSupplier =
() -> {
throw new UnsupportedOperationException();
};
private Function<Integer, BufferBuilder> requestBufferBuilderFunction =
(ignore) -> {
throw new UnsupportedOperationException();
};
private Function<Integer, BufferBuilder> requestBufferBuilderBlockingFunction =
(ignore) -> {
throw new UnsupportedOperationException();
};
private Function<BufferListener, Boolean> addBufferListenerFunction =
(ignore) -> {
throw new UnsupportedOperationException();
};
private Supplier<Boolean> isDestroyedSupplier =
() -> {
throw new UnsupportedOperationException();
};
private Supplier<MemorySegment> requestMemorySegmentSupplier =
() -> {
throw new UnsupportedOperationException();
};
private Supplier<MemorySegment> requestMemorySegmentBlockingSupplier =
() -> {
throw new UnsupportedOperationException();
};
private Supplier<Integer> getExpectedNumberOfMemorySegmentsSupplier =
() -> {
throw new UnsupportedOperationException();
};
private Supplier<Integer> getMinNumberOfMemorySegmentsSupplier =
() -> {
throw new UnsupportedOperationException();
};
private Supplier<Integer> getMaxNumberOfMemorySegmentsSupplier =
() -> {
throw new UnsupportedOperationException();
};
private Supplier<Integer> getNumBuffersSupplier = () -> 0;
private Consumer<Integer> setNumBuffersConsumer =
(ignore) -> {
throw new UnsupportedOperationException();
};
private Consumer<Integer> setMaxOverdraftBuffersPerGateConsumer =
(ignore) -> {
throw new UnsupportedOperationException();
};
private Supplier<Integer> getMaxOverdraftBuffersPerGateSupplier =
() -> {
throw new UnsupportedOperationException();
};
private Supplier<Integer> getNumberOfAvailableMemorySegmentsSupplier =
() -> {
throw new UnsupportedOperationException();
};
private Supplier<Integer> bestEffortGetNumOfUsedBuffersSupplier =
() -> {
throw new UnsupportedOperationException();
};
private Consumer<MemorySegment> recycleConsumer =
(ignore) -> {
throw new UnsupportedOperationException();
};
private Supplier<CompletableFuture<?>> getAvailableFutureSupplier = () -> AVAILABLE;
private Builder() {}
public TestingBufferPool build() {
return new TestingBufferPool(
reserveSegmentsConsumer,
lazyDestroyRunnable,
requestBufferSupplier,
requestBufferBuilderSupplier,
requestBufferBuilderBlockingSupplier,
requestBufferBuilderFunction,
requestBufferBuilderBlockingFunction,
addBufferListenerFunction,
isDestroyedSupplier,
requestMemorySegmentSupplier,
requestMemorySegmentBlockingSupplier,
getExpectedNumberOfMemorySegmentsSupplier,
getMinNumberOfMemorySegmentsSupplier,
getMaxNumberOfMemorySegmentsSupplier,
getNumBuffersSupplier,
setNumBuffersConsumer,
setMaxOverdraftBuffersPerGateConsumer,
getMaxOverdraftBuffersPerGateSupplier,
getNumberOfAvailableMemorySegmentsSupplier,
bestEffortGetNumOfUsedBuffersSupplier,
recycleConsumer,
getAvailableFutureSupplier);
}
public Builder setReserveSegmentsConsumer(Consumer<Integer> reserveSegmentsConsumer) {
this.reserveSegmentsConsumer = reserveSegmentsConsumer;
return this;
}
public Builder setLazyDestroyRunnable(Runnable lazyDestroyRunnable) {
this.lazyDestroyRunnable = lazyDestroyRunnable;
return this;
}
public Builder setRequestBufferSupplier(Supplier<Buffer> requestBufferSupplier) {
this.requestBufferSupplier = requestBufferSupplier;
return this;
}
public Builder setRequestBufferBuilderSupplier(
Supplier<BufferBuilder> requestBufferBuilderSupplier) {
this.requestBufferBuilderSupplier = requestBufferBuilderSupplier;
return this;
}
public Builder setRequestBufferBuilderBlockingSupplier(
Supplier<BufferBuilder> requestBufferBuilderBlockingSupplier) {
this.requestBufferBuilderBlockingSupplier = requestBufferBuilderBlockingSupplier;
return this;
}
public Builder setRequestBufferBuilderFunction(
Function<Integer, BufferBuilder> requestBufferBuilderFunction) {
this.requestBufferBuilderFunction = requestBufferBuilderFunction;
return this;
}
public Builder setRequestBufferBuilderBlockingFunction(
Function<Integer, BufferBuilder> requestBufferBuilderBlockingFunction) {
this.requestBufferBuilderBlockingFunction = requestBufferBuilderBlockingFunction;
return this;
}
public Builder setAddBufferListenerFunction(
Function<BufferListener, Boolean> addBufferListenerFunction) {
this.addBufferListenerFunction = addBufferListenerFunction;
return this;
}
public Builder setIsDestroyedSupplier(Supplier<Boolean> isDestroyedSupplier) {
this.isDestroyedSupplier = isDestroyedSupplier;
return this;
}
public Builder setRequestMemorySegmentSupplier(
Supplier<MemorySegment> requestMemorySegmentSupplier) {
this.requestMemorySegmentSupplier = requestMemorySegmentSupplier;
return this;
}
public Builder setRequestMemorySegmentBlockingSupplier(
Supplier<MemorySegment> requestMemorySegmentBlockingSupplier) {
this.requestMemorySegmentBlockingSupplier = requestMemorySegmentBlockingSupplier;
return this;
}
public Builder setGetExpectedNumberOfMemorySegmentsSupplier(
Supplier<Integer> getExpectedNumberOfMemorySegmentsSupplier) {
this.getExpectedNumberOfMemorySegmentsSupplier =
getExpectedNumberOfMemorySegmentsSupplier;
return this;
}
public Builder setGetMinNumberOfMemorySegmentsSupplier(
Supplier<Integer> getMinNumberOfMemorySegmentsSupplier) {
this.getMinNumberOfMemorySegmentsSupplier = getMinNumberOfMemorySegmentsSupplier;
return this;
}
public Builder setGetMaxNumberOfMemorySegmentsSupplier(
Supplier<Integer> getMaxNumberOfMemorySegmentsSupplier) {
this.getMaxNumberOfMemorySegmentsSupplier = getMaxNumberOfMemorySegmentsSupplier;
return this;
}
public Builder setGetNumBuffersSupplier(Supplier<Integer> getNumBuffersSupplier) {
this.getNumBuffersSupplier = getNumBuffersSupplier;
return this;
}
public Builder setSetNumBuffersConsumer(Consumer<Integer> setNumBuffersConsumer) {
this.setNumBuffersConsumer = setNumBuffersConsumer;
return this;
}
public Builder setSetMaxOverdraftBuffersPerGateConsumer(
Consumer<Integer> setMaxOverdraftBuffersPerGateConsumer) {
this.setMaxOverdraftBuffersPerGateConsumer = setMaxOverdraftBuffersPerGateConsumer;
return this;
}
public Builder setGetMaxOverdraftBuffersPerGateSupplier(
Supplier<Integer> getMaxOverdraftBuffersPerGateSupplier) {
this.getMaxOverdraftBuffersPerGateSupplier = getMaxOverdraftBuffersPerGateSupplier;
return this;
}
public Builder setGetNumberOfAvailableMemorySegmentsSupplier(
Supplier<Integer> getNumberOfAvailableMemorySegmentsSupplier) {
this.getNumberOfAvailableMemorySegmentsSupplier =
getNumberOfAvailableMemorySegmentsSupplier;
return this;
}
public Builder setBestEffortGetNumOfUsedBuffersSupplier(
Supplier<Integer> bestEffortGetNumOfUsedBuffersSupplier) {
this.bestEffortGetNumOfUsedBuffersSupplier = bestEffortGetNumOfUsedBuffersSupplier;
return this;
}
public Builder setRecycleConsumer(Consumer<MemorySegment> recycleConsumer) {
this.recycleConsumer = recycleConsumer;
return this;
}
public Builder setGetAvailableFutureSupplier(
Supplier<CompletableFuture<?>> getAvailableFutureSupplier) {
this.getAvailableFutureSupplier = getAvailableFutureSupplier;
return this;
}
}
}