Bump rocketmq-proto to 2.0.2 (#364)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index c4ee975..3efcb54 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -242,17 +242,19 @@
}
ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, MessageQueueImpl mq,
- FilterExpression filterExpression) {
+ FilterExpression filterExpression, Duration longPollingTimeout) {
return ReceiveMessageRequest.newBuilder().setGroup(getProtobufGroup())
.setMessageQueue(mq.toProtobuf()).setFilterExpression(wrapFilterExpression(filterExpression))
+ .setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos()))
.setBatchSize(batchSize).setAutoRenew(true).build();
}
ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, MessageQueueImpl mq,
- FilterExpression filterExpression, Duration invisibleDuration) {
+ FilterExpression filterExpression, Duration invisibleDuration, Duration longPollingTimeout) {
final com.google.protobuf.Duration duration = Durations.fromNanos(invisibleDuration.toNanos());
return ReceiveMessageRequest.newBuilder().setGroup(getProtobufGroup())
.setMessageQueue(mq.toProtobuf()).setFilterExpression(wrapFilterExpression(filterExpression))
+ .setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos()))
.setBatchSize(batchSize).setAutoRenew(false).setInvisibleDuration(duration).build();
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index 8443c29..ff4a13d 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -219,7 +219,9 @@
try {
final Endpoints endpoints = mq.getBroker().getEndpoints();
final int batchSize = this.getReceptionBatchSize();
- final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression);
+ final Duration longPollingTimeout = consumer.getPushConsumerSettings().getLongPollingTimeout();
+ final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression,
+ longPollingTimeout);
activityNanoTime = System.nanoTime();
// Intercept before message reception.
@@ -227,7 +229,7 @@
consumer.doBefore(context, Collections.emptyList());
final ListenableFuture<ReceiveMessageResult> future = consumer.receiveMessage(request, mq,
- consumer.getPushConsumerSettings().getLongPollingTimeout());
+ longPollingTimeout);
Futures.addCallback(future, new FutureCallback<ReceiveMessageResult>() {
@Override
public void onSuccess(ReceiveMessageResult result) {
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index 4b7ddb1..5d6092a 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -202,7 +202,7 @@
final ListenableFuture<ReceiveMessageResult> future0 = Futures.transformAsync(routeFuture, result -> {
final MessageQueueImpl mq = result.takeMessageQueue();
final ReceiveMessageRequest request = wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression,
- invisibleDuration);
+ invisibleDuration, awaitDuration);
return receiveMessage(request, mq, awaitDuration);
}, MoreExecutors.directExecutor());
return Futures.transformAsync(future0, result -> Futures.immediateFuture(result.getMessageViews()),
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
index 4e497cd..b4a2971 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
@@ -134,7 +134,7 @@
when(pushSubscriptionSettings.getReceiveBatchSize()).thenReturn(32);
ReceiveMessageRequest request = ReceiveMessageRequest.newBuilder().build();
when(pushConsumer.wrapReceiveMessageRequest(anyInt(), any(MessageQueueImpl.class),
- any(FilterExpression.class))).thenReturn(request);
+ any(FilterExpression.class), any(Duration.class))).thenReturn(request);
processQueue.fetchMessageImmediately();
await().atMost(Duration.ofSeconds(3))
.untilAsserted(() -> verify(pushConsumer, times(cachedMessagesCountThresholdPerQueue))
diff --git a/java/pom.xml b/java/pom.xml
index 689b3df..b97290e 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -47,7 +47,7 @@
~ 1. Whether it is essential, because the current shaded jar is fat enough.
~ 2. Make sure that it is compatible with Java 8.
-->
- <rocketmq-proto.version>2.0.1</rocketmq-proto.version>
+ <rocketmq-proto.version>2.0.2</rocketmq-proto.version>
<annotations-api.version>6.0.53</annotations-api.version>
<protobuf.version>3.21.7</protobuf.version>
<grpc.version>1.50.0</grpc.version>