[ISSUE #10276] Fix PopConsumerService changeInvisibilityDuration losing CK record when visibilityTimeout collision (#10277)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index da3ccdc..9ab5eb6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -511,8 +511,10 @@
// No need to generate new records when the group does not exist,
// because these retry messages will not be consumed by anyone.
- if (brokerConfig.isPopReviveSkipIfGroupAbsent() &&
- !brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(groupId)) {
+ boolean skipWrite = brokerConfig.isPopReviveSkipIfGroupAbsent() &&
+ !brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(groupId);
+
+ if (skipWrite) {
log.info("PopConsumerService change invisibility skip, time={}, " +
"groupId={}, topicId={}, queueId={}, offset={}", popTime, groupId, topicId, queueId, offset);
} else {
@@ -525,7 +527,12 @@
}
}
- this.popConsumerStore.deleteRecords(Collections.singletonList(ackRecord));
+ // If the new CK has the same key as the old CK (same visibilityTimeout),
+ // the write already overwrites the old record in RocksDB, skip delete
+ // to avoid removing the newly written record.
+ if (skipWrite || ckRecord.getVisibilityTimeout() != ackRecord.getVisibilityTimeout()) {
+ this.popConsumerStore.deleteRecords(Collections.singletonList(ackRecord));
+ }
}
// Use broker escape bridge to support remote read