[fix][broker][branch-2.10] limit the memory used by reads end-to-end
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
index b946dc0..f3848b6 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index 5401ae3..3e7401b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -29,7 +29,6 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.bookkeeper.client.api.LedgerEntry;
@@ -72,7 +71,6 @@
public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl ml, boolean copyEntries) {
this.manager = manager;
this.ml = ml;
- this.pendingReadsManager = new PendingReadsManager(this);
this.interceptor = ml.getManagedLedgerInterceptor();
this.readEntryTimeoutMillis = getManagedLedgerConfig().getReadEntryTimeoutSeconds();
this.entries = new RangeCache<>(EntryImpl::getLength, EntryImpl::getTimestamp);
@@ -281,14 +279,14 @@
@SuppressWarnings({ "unchecked", "rawtypes" })
private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
final ReadEntriesCallback callback, Object ctx) {
- asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx, null);
+ asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, isSlowestReader, callback, ctx, null);
}
- void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
+ void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
final ReadEntriesCallback originalCallback, Object ctx, InflightReadsLimiter.Handle handle) {
final AsyncCallbacks.ReadEntriesCallback callback =
- handlePendingReadsLimits(lh, firstEntry, lastEntry, shouldCacheEntry,
+ handlePendingReadsLimits(lh, firstEntry, lastEntry, isSlowestReader,
originalCallback, ctx, handle);
if (callback == null) {
return;
@@ -371,8 +369,10 @@
private AsyncCallbacks.ReadEntriesCallback handlePendingReadsLimits(ReadHandle lh,
long firstEntry, long lastEntry,
boolean shouldCacheEntry,
- AsyncCallbacks.ReadEntriesCallback originalCallback,
- Object ctx, InflightReadsLimiter.Handle handle) {
+ AsyncCallbacks.ReadEntriesCallback
+ originalCallback,
+ Object ctx,
+ InflightReadsLimiter.Handle handle) {
InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter();
if (pendingReadsLimiter.isDisabled()) {
return originalCallback;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java
index 2b69581..e134512 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 86db5eb..23f6ca9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -527,12 +527,9 @@
}
long size = entries.stream().mapToLong(Entry::getLength).sum();
updatePendingBytesToDispatch(size);
- if (sendMessagesToConsumers(readType, entries)) {
- updatePendingBytesToDispatch(-size);
- readMoreEntriesAsync();
- } else {
- updatePendingBytesToDispatch(-size);
- } }
+ sendMessagesToConsumers(readType, entries);
+ updatePendingBytesToDispatch(-size);
+ }
protected final synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
sendInProgress = true;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
index 2c90e89..6f826cf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
@@ -93,11 +93,8 @@
.getNextValidPosition((PositionImpl) entry.getPosition()));
long size = entry.getLength();
updatePendingBytesToDispatch(size);
- if (sendMessagesToConsumers(readType, Lists.newArrayList(entry))) {
- readMoreEntriesAsync();
- } else {
- updatePendingBytesToDispatch(-size);
- }
+ sendMessagesToConsumers(readType, Lists.newArrayList(entry));
+ updatePendingBytesToDispatch(-size);
ctx.recycle();
}