KAFKA-19259: Async consumer fetch intermittent delays on console consumer (#19980)
The two consumers’ `pollForFetches()` methods behave differently in this case: `ClassicKafkaConsumer` doesn't block waiting for data to arrive in the fetch buffer, but `AsyncKafkaConsumer` does.
In `ClassicKafkaConsumer.pollForFetches()`, after enqueuing the `FETCH`
request, the consumer calls `ConsumerNetworkClient.poll()`. In
most cases `poll()` returns almost immediately because it successfully
sent the `FETCH` request. So even when the `pollTimeout` value is, e.g.,
3,000 ms, the call to `ConsumerNetworkClient.poll()` doesn't block that long
waiting for a response.
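In sketch form, the classic path looks roughly like this (a simplified, hypothetical model; the method names are illustrative stand-ins, not the real Kafka internals):

```java
import java.time.Duration;

// Hypothetical sketch of ClassicKafkaConsumer.pollForFetches(); the stub
// methods stand in for the real internals.
final class ClassicFlowSketch {

    void pollForFetches(Duration pollTimeout) {
        sendFetchRequests();           // enqueue FETCH requests for fetchable partitions
        // ConsumerNetworkClient.poll() returns as soon as the request is written
        // to the socket; it doesn't wait for the FETCH response, so the full
        // pollTimeout is rarely consumed here.
        pollNetworkClient(pollTimeout);
        collectCompletedFetches();     // drain whatever responses have arrived
    }

    void sendFetchRequests() { /* stub */ }

    void pollNetworkClient(Duration timeout) { /* stub */ }

    void collectCompletedFetches() { /* stub */ }
}
```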
After sending out a `FETCH` request, `AsyncKafkaConsumer` then calls
`FetchBuffer.awaitNotEmpty()` and blocks there for the full length of
the timeout. In some cases the `FETCH` response comes back with no
results, which does not unblock `FetchBuffer.awaitNotEmpty()`. Because
the application thread is still waiting for data in the buffer, it
remains blocked, preventing any further `FETCH` requests from being
sent and causing the long pauses in the console consumer.
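The async path, again as a hypothetical sketch with illustrative stand-in names, parks in the fetch buffer rather than in the network client:

```java
import java.time.Duration;

// Hypothetical sketch of AsyncKafkaConsumer.pollForFetches(); the stub
// methods stand in for the real internals.
final class AsyncFlowSketch {

    void pollForFetches(Duration pollTimeout) {
        enqueueFetchRequestEvent();    // the background thread actually sends the FETCH
        // Before this fix, the application thread parked in awaitNotEmpty() until
        // data landed in the buffer, so an empty FETCH response left it blocked for
        // the full pollTimeout, with no further FETCH requests being sent meanwhile.
        awaitFetchBuffer(pollTimeout);
        collectCompletedFetches();
    }

    void enqueueFetchRequestEvent() { /* stub */ }

    void awaitFetchBuffer(Duration timeout) { /* stub */ }

    void collectCompletedFetches() { /* stub */ }
}
```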
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Andrew Schofield
<aschofield@confluent.io>
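The fix, as implemented in the diff below, generalizes `FetchBuffer`'s "not empty" condition into a "woken" condition that is signaled on every `FETCH` response, including empty ones. Here is a minimal, runnable model of that protocol (a sketch with illustrative names, not the actual `FetchBuffer` class):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Minimal model of the revised FetchBuffer wakeup protocol. A waiter blocks
// until woken; wakeup() is invoked both when data is added and when an empty
// FETCH response arrives, so the poll loop can resume either way.
final class WakeupProtocolSketch {
    private final Lock lock = new ReentrantLock();
    private final Condition blockingCondition = lock.newCondition();
    private final AtomicBoolean wokenup = new AtomicBoolean(false);

    void awaitWakeup(long timeoutMs) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        lock.lock();
        try {
            // Consume the wakeup flag; loop to tolerate spurious wakeups.
            while (!wokenup.compareAndSet(true, false)) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0 || !blockingCondition.await(remaining, TimeUnit.NANOSECONDS))
                    break; // timed out
            }
        } finally {
            lock.unlock();
        }
    }

    void wakeup() {
        lock.lock();
        try {
            // Flag and signal are updated together under the lock, mirroring
            // the ordering in the patch below.
            wokenup.set(true);
            blockingCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        var buffer = new WakeupProtocolSketch();
        Thread waiter = new Thread(() -> {
            try {
                buffer.awaitWakeup(30_000); // would previously stall on an empty response
                System.out.println("poll loop unblocked");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        Thread.sleep(200);
        buffer.wakeup(); // simulate an empty FETCH response arriving
        waiter.join();
    }
}
```

Run standalone, the waiter unblocks after roughly 200 ms instead of the full 30-second timeout, which is the behavior the new `awaitWakeup()`/`wakeup()` pairing restores for the poll loop.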
diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
index c69c9c3..13e681c 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
@@ -63,6 +63,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -109,6 +110,7 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
@ClusterTestDefaults(
types = {Type.KRAFT},
@@ -1593,6 +1595,75 @@
}
}
+ @ClusterTest
+ public void testClassicConsumerStallBetweenPoll() throws Exception {
+ testStallBetweenPoll(GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerStallBetweenPoll() throws Exception {
+ testStallBetweenPoll(GroupProtocol.CONSUMER);
+ }
+
+ /**
+ * This test aims to prove that the intermittent stalling experienced when using the asynchronous
+ * consumer, as filed under KAFKA-19259, has been fixed.
+ *
+ * <p/>
+ *
+ * The basic idea is to have one thread produce a record every 500 ms while the main thread consumes
+ * records, verifying that the pause between polls never greatly exceeds the produce delay. In the test
+ * case filed in KAFKA-19259, the consumer sometimes paused for 5-10 seconds despite records being produced every second.
+ */
+ private void testStallBetweenPoll(GroupProtocol groupProtocol) throws Exception {
+ var testTopic = "stall-test-topic";
+ var numPartitions = 6;
+ cluster.createTopic(testTopic, numPartitions, (short) BROKER_COUNT);
+
+ // The producer must produce slowly to trigger the stall scenario.
+ var produceDelay = 500;
+
+ var executor = Executors.newScheduledThreadPool(1);
+
+ try (var producer = cluster.producer()) {
+ // Start a thread running that produces records at a relative trickle.
+ executor.scheduleWithFixedDelay(
+ () -> producer.send(new ProducerRecord<>(testTopic, TestUtils.randomBytes(64))),
+ 0,
+ produceDelay,
+ TimeUnit.MILLISECONDS
+ );
+
+ Map<String, Object> consumerConfig = Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT));
+
+ // Assign a tolerance for how much time is allowed to pass between Consumer.poll() calls given that there
+ // should be *at least* one record to read every second.
+ var pollDelayTolerance = 2000;
+
+ try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) {
+ consumer.subscribe(List.of(testTopic));
+
+ // This is here to allow the consumer time to settle the group membership/assignment.
+ awaitNonEmptyRecords(consumer, new TopicPartition(testTopic, 0));
+
+ // Time a pair of Consumer.poll() calls to ensure the delta between invocations doesn't
+ // exceed the tolerance defined above.
+ var beforePoll = System.currentTimeMillis();
+ consumer.poll(Duration.ofSeconds(5));
+ consumer.poll(Duration.ofSeconds(5));
+ var afterPoll = System.currentTimeMillis();
+ var pollDelay = afterPoll - beforePoll;
+
+ if (pollDelay > pollDelayTolerance)
+ fail("Detected a stall of " + pollDelay + " ms between Consumer.poll() invocations despite a Producer producing records every " + produceDelay + " ms");
+ } finally {
+ executor.shutdownNow();
+ // Wait for any active tasks to terminate to ensure the producer is not closed while still in use by the scheduled task
+ assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS), "Executor did not terminate");
+ }
+ }
+ }
+
private ConsumerRecords<byte[], byte[]> awaitNonEmptyRecords(
Consumer<byte[], byte[]> consumer,
TopicPartition tp
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
index 9d96712..e3e52f7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
@@ -147,6 +147,7 @@
* @param data {@link FetchSessionHandler.FetchRequestData} that represents the session data
* @param resp {@link ClientResponse} from which the {@link FetchResponse} will be retrieved
*/
+ @SuppressWarnings("NPathComplexity")
protected void handleFetchSuccess(final Node fetchTarget,
final FetchSessionHandler.FetchRequestData data,
final ClientResponse resp) {
@@ -174,6 +175,8 @@
final Set<TopicPartition> partitions = new HashSet<>(responseData.keySet());
final FetchMetricsAggregator metricAggregator = new FetchMetricsAggregator(metricsManager, partitions);
+ boolean needsWakeup = true;
+
Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo = new HashMap<>();
for (Map.Entry<TopicPartition, FetchResponseData.PartitionData> entry : responseData.entrySet()) {
TopicPartition partition = entry.getKey();
@@ -220,8 +223,14 @@
metricAggregator,
fetchOffset);
fetchBuffer.add(completedFetch);
+ needsWakeup = false;
}
+ // "Wake" the fetch buffer on any response, even if it's empty, to allow the consumer to not block
+ // indefinitely waiting on the fetch buffer to get data.
+ if (needsWakeup)
+ fetchBuffer.wakeup();
+
if (!partitionsWithUpdatedLeaderInfo.isEmpty()) {
List<Node> leaderNodes = new ArrayList<>();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 5512c96..93ed987 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -1811,7 +1811,7 @@
// use of a shorter, dedicated "pollTimer" here which updates "timer" so that calling method (poll) will
// correctly handle the overall timeout.
try {
- fetchBuffer.awaitNotEmpty(pollTimer);
+ fetchBuffer.awaitWakeup(pollTimer);
} catch (InterruptException e) {
log.trace("Interrupt during fetch", e);
throw e;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java
index 23adf9c..6cf5bc3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java
@@ -27,6 +27,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -51,7 +52,7 @@
private final Logger log;
private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
private final Lock lock;
- private final Condition notEmptyCondition;
+ private final Condition blockingCondition;
private final IdempotentCloser idempotentCloser = new IdempotentCloser();
private final AtomicBoolean wokenup = new AtomicBoolean(false);
@@ -62,7 +63,7 @@
this.log = logContext.logger(FetchBuffer.class);
this.completedFetches = new ConcurrentLinkedQueue<>();
this.lock = new ReentrantLock();
- this.notEmptyCondition = lock.newCondition();
+ this.blockingCondition = lock.newCondition();
}
/**
@@ -95,13 +96,7 @@
}
void add(CompletedFetch completedFetch) {
- try {
- lock.lock();
- completedFetches.add(completedFetch);
- notEmptyCondition.signalAll();
- } finally {
- lock.unlock();
- }
+ addAll(List.of(completedFetch));
}
void addAll(Collection<CompletedFetch> completedFetches) {
@@ -111,7 +106,8 @@
try {
lock.lock();
this.completedFetches.addAll(completedFetches);
- notEmptyCondition.signalAll();
+ wokenup.set(true);
+ blockingCondition.signalAll();
} finally {
lock.unlock();
}
@@ -154,23 +150,23 @@
}
/**
- * Allows the caller to await presence of data in the buffer. The method will block, returning only
+ * Allows the caller to await a response from the broker for requested data. The method will block, returning only
* under one of the following conditions:
*
* <ol>
- * <li>The buffer was already non-empty on entry</li>
- * <li>The buffer was populated during the wait</li>
+ * <li>The buffer was already woken</li>
+ * <li>The buffer was woken during the wait</li>
* <li>The remaining time on the {@link Timer timer} elapsed</li>
* <li>The thread was interrupted</li>
* </ol>
*
* @param timer Timer that provides time to wait
*/
- void awaitNotEmpty(Timer timer) {
+ void awaitWakeup(Timer timer) {
try {
lock.lock();
- while (isEmpty() && !wokenup.compareAndSet(true, false)) {
+ while (!wokenup.compareAndSet(true, false)) {
// Update the timer before we head into the loop in case it took a while to get the lock.
timer.update();
@@ -185,7 +181,7 @@
break;
}
- if (!notEmptyCondition.await(timer.remainingMs(), TimeUnit.MILLISECONDS)) {
+ if (!blockingCondition.await(timer.remainingMs(), TimeUnit.MILLISECONDS)) {
break;
}
}
@@ -198,10 +194,10 @@
}
void wakeup() {
- wokenup.set(true);
try {
lock.lock();
- notEmptyCondition.signalAll();
+ wokenup.set(true);
+ blockingCondition.signalAll();
} finally {
lock.unlock();
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java
index 9d6b0c2..5b2f6d6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java
@@ -177,7 +177,7 @@
try (FetchBuffer fetchBuffer = new FetchBuffer(logContext)) {
final Thread waitingThread = new Thread(() -> {
final Timer timer = time.timer(Duration.ofMinutes(1));
- fetchBuffer.awaitNotEmpty(timer);
+ fetchBuffer.awaitWakeup(timer);
});
waitingThread.start();
fetchBuffer.wakeup();