| commit | 52800a3b92b561aa51efd8d5abbec7d053067633 | [log] [tgz] |
|---|---|---|
| author | Claus Ibsen <claus.ibsen@gmail.com> | Fri Jul 05 12:40:03 2024 +0200 |
| committer | Claus Ibsen <claus.ibsen@gmail.com> | Fri Jul 05 12:40:03 2024 +0200 |
| tree | e5a35c57724992296eaa54951ec2cf331301ac9c | |
| parent | 2d1eea8c7c6c6ba3abeadb7fe6de2fd6a9bb3e27 | [diff] |
CAMEL-20227: camel-kafka - Kafka offset advances when using Pausable EIP and messages are lost when resumed
```diff
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index d93b616..2719a2b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -346,8 +346,8 @@
                 ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration);
                 if (consumerListener != null) {
                     if (!consumerListener.afterConsume(consumer)) {
-                        // because the consumer will just poll the same messages again (then we need to avoid consuming asap
-                        // and need to simulate a poll duration
+                        // because the consumer will just poll the same messages again
+                        // there we need to avoid consuming asap, by simulating 1 idle poll duration
                         try {
                             Thread.sleep(pollTimeoutMs);
                         } catch (InterruptedException e) {
```
```diff
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerPauseableEeiIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerPauseableEeiIT.java
index 3ad26fc..60ac902 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerPauseableEeiIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerPauseableEeiIT.java
@@ -70,12 +70,14 @@
     @RepeatedTest(1)
     public void kafkaPauseableEip() throws Exception {
         MockEndpoint result = contextExtension.getMockEndpoint("mock:result");
+        // we receive all 15 records
         result.expectedMessageCount(15);
 
         sendRecords(0, 15, TOPIC);
 
         result.assertIsSatisfied();
 
+        // but some were temporary paused so the counter should be higher
         assertTrue(count > 20);
     }
 
```