[ISSUE #660] Add namespace in java client (#661)
* Add namespace for java client
* Add checkNotNull
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
index 2714810..042c352 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
@@ -28,17 +28,19 @@
private final SessionCredentialsProvider sessionCredentialsProvider;
private final Duration requestTimeout;
private final boolean sslEnabled;
+ private final String namespace;
/**
* The caller is supposed to have validated the arguments and handled throwing exceptions or
* logging warnings already, so we avoid repeating args check here.
*/
ClientConfiguration(String endpoints, SessionCredentialsProvider sessionCredentialsProvider,
- Duration requestTimeout, boolean sslEnabled) {
+ Duration requestTimeout, boolean sslEnabled, String namespace) {
this.endpoints = endpoints;
this.sessionCredentialsProvider = sessionCredentialsProvider;
this.requestTimeout = requestTimeout;
this.sslEnabled = sslEnabled;
+ this.namespace = namespace;
}
public static ClientConfigurationBuilder newBuilder() {
@@ -60,4 +62,8 @@
public boolean isSslEnabled() {
return sslEnabled;
}
+
+ public String getNamespace() {
+ return namespace;
+ }
}
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
index eb40c88..25cc54a 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
@@ -31,6 +31,7 @@
private SessionCredentialsProvider sessionCredentialsProvider = null;
private Duration requestTimeout = Duration.ofSeconds(3);
private boolean sslEnabled = true;
+ private String namespace = "";
/**
* Configure the access point with which the SDK should communicate.
@@ -83,6 +84,16 @@
}
/**
+ * Configure the namespace for the client, which is the empty string unless set here.
+ * @param namespace the namespace to use, must not be {@code null}.
+ * @return The {@link ClientConfigurationBuilder} instance, to allow for method chaining.
+ */
+ public ClientConfigurationBuilder setNamespace(String namespace) {
+ this.namespace = checkNotNull(namespace, "namespace should not be null");
+ return this;
+ }
+
+ /**
* Finalize the build of {@link ClientConfiguration}.
*
* @return the client configuration builder instance.
@@ -90,6 +101,6 @@
public ClientConfiguration build() {
checkNotNull(endpoints, "endpoints should not be null");
checkNotNull(requestTimeout, "requestTimeout should not be null");
- return new ClientConfiguration(endpoints, sessionCredentialsProvider, requestTimeout, sslEnabled);
+ return new ClientConfiguration(endpoints, sessionCredentialsProvider, requestTimeout, sslEnabled, namespace);
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 846f0ce..dac5fe0 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -608,7 +608,10 @@
}
protected ListenableFuture<TopicRouteData> fetchTopicRoute0(final String topic) {
- Resource topicResource = Resource.newBuilder().setName(topic).build();
+ Resource topicResource = Resource.newBuilder()
+ .setResourceNamespace(clientConfiguration.getNamespace())
+ .setName(topic)
+ .build();
final QueryRouteRequest request = QueryRouteRequest.newBuilder().setTopic(topicResource)
.setEndpoints(endpoints.toProtobuf()).build();
final RpcFuture<QueryRouteRequest, QueryRouteResponse> future =
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
index f992301..88b335c 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
@@ -25,14 +25,16 @@
import org.apache.rocketmq.client.java.route.Endpoints;
public abstract class Settings {
+ protected final String namespace;
protected final ClientId clientId;
protected final ClientType clientType;
protected final Endpoints accessPoint;
protected volatile RetryPolicy retryPolicy;
protected final Duration requestTimeout;
- public Settings(ClientId clientId, ClientType clientType, Endpoints accessPoint, RetryPolicy retryPolicy,
- Duration requestTimeout) {
+ public Settings(String namespace, ClientId clientId, ClientType clientType, Endpoints accessPoint,
+ RetryPolicy retryPolicy, Duration requestTimeout) {
+ this.namespace = namespace;
this.clientId = clientId;
this.clientType = clientType;
this.accessPoint = accessPoint;
@@ -40,8 +42,9 @@
this.requestTimeout = requestTimeout;
}
- public Settings(ClientId clientId, ClientType clientType, Endpoints accessPoint, Duration requestTimeout) {
- this(clientId, clientType, accessPoint, null, requestTimeout);
+ public Settings(String namespace, ClientId clientId, ClientType clientType, Endpoints accessPoint,
+ Duration requestTimeout) {
+ this(namespace, clientId, clientType, accessPoint, null, requestTimeout);
}
public abstract apache.rocketmq.v2.Settings toProtobuf();
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index a807fd2..795c2ca 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -123,7 +123,10 @@
}
private AckMessageRequest wrapAckMessageRequest(MessageViewImpl messageView) {
- final Resource topicResource = Resource.newBuilder().setName(messageView.getTopic()).build();
+ final Resource topicResource = Resource.newBuilder()
+ .setResourceNamespace(clientConfiguration.getNamespace())
+ .setName(messageView.getTopic())
+ .build();
final AckMessageEntry entry = AckMessageEntry.newBuilder()
.setMessageId(messageView.getMessageId().toString())
.setReceiptHandle(messageView.getReceiptHandle())
@@ -134,7 +137,9 @@
private ChangeInvisibleDurationRequest wrapChangeInvisibleDuration(MessageViewImpl messageView,
Duration invisibleDuration) {
- final Resource topicResource = Resource.newBuilder().setName(messageView.getTopic()).build();
+ final Resource topicResource = Resource.newBuilder()
+ .setResourceNamespace(clientConfiguration.getNamespace())
+ .setName(messageView.getTopic()).build();
return ChangeInvisibleDurationRequest.newBuilder().setGroup(getProtobufGroup()).setTopic(topicResource)
.setReceiptHandle(messageView.getReceiptHandle())
.setInvisibleDuration(Durations.fromNanos(invisibleDuration.toNanos()))
@@ -219,7 +224,10 @@
}
protected Resource getProtobufGroup() {
- return Resource.newBuilder().setName(consumerGroup).build();
+ return Resource.newBuilder()
+ .setResourceNamespace(clientConfiguration.getNamespace())
+ .setName(consumerGroup)
+ .build();
}
@Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 295367a..2cbc6d0 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -127,9 +127,9 @@
int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount) {
super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet());
this.clientConfiguration = clientConfiguration;
- Resource groupResource = new Resource(consumerGroup);
- this.pushSubscriptionSettings = new PushSubscriptionSettings(clientId, endpoints, groupResource,
- clientConfiguration.getRequestTimeout(), subscriptionExpressions);
+ Resource groupResource = new Resource(clientConfiguration.getNamespace(), consumerGroup);
+ this.pushSubscriptionSettings = new PushSubscriptionSettings(clientConfiguration.getNamespace(), clientId,
+ endpoints, groupResource, clientConfiguration.getRequestTimeout(), subscriptionExpressions);
this.consumerGroup = consumerGroup;
this.subscriptionExpressions = subscriptionExpressions;
this.cacheAssignments = new ConcurrentHashMap<>();
@@ -261,7 +261,10 @@
}
private QueryAssignmentRequest wrapQueryAssignmentRequest(String topic) {
- apache.rocketmq.v2.Resource topicResource = apache.rocketmq.v2.Resource.newBuilder().setName(topic).build();
+ apache.rocketmq.v2.Resource topicResource = apache.rocketmq.v2.Resource.newBuilder()
+ .setResourceNamespace(clientConfiguration.getNamespace())
+ .setName(topic)
+ .build();
return QueryAssignmentRequest.newBuilder().setTopic(topicResource)
.setEndpoints(endpoints.toProtobuf()).setGroup(getProtobufGroup()).build();
}
@@ -500,7 +503,10 @@
private ForwardMessageToDeadLetterQueueRequest wrapForwardMessageToDeadLetterQueueRequest(
MessageViewImpl messageView) {
final apache.rocketmq.v2.Resource topicResource =
- apache.rocketmq.v2.Resource.newBuilder().setName(messageView.getTopic()).build();
+ apache.rocketmq.v2.Resource.newBuilder()
+ .setResourceNamespace(clientConfiguration.getNamespace())
+ .setName(messageView.getTopic())
+ .build();
return ForwardMessageToDeadLetterQueueRequest.newBuilder().setGroup(getProtobufGroup()).setTopic(topicResource)
.setReceiptHandle(messageView.getReceiptHandle())
.setMessageId(messageView.getMessageId().toString())
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
index 70338b0..26a66a1 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
@@ -50,9 +50,9 @@
private volatile int receiveBatchSize = 32;
private volatile Duration longPollingTimeout = Duration.ofSeconds(30);
- public PushSubscriptionSettings(ClientId clientId, Endpoints endpoints, Resource group,
+ public PushSubscriptionSettings(String namespace, ClientId clientId, Endpoints endpoints, Resource group,
Duration requestTimeout, Map<String, FilterExpression> subscriptionExpression) {
- super(clientId, ClientType.PUSH_CONSUMER, endpoints, requestTimeout);
+ super(namespace, clientId, ClientType.PUSH_CONSUMER, endpoints, requestTimeout);
this.group = group;
this.subscriptionExpressions = subscriptionExpression;
}
@@ -75,7 +75,10 @@
for (Map.Entry<String, FilterExpression> entry : subscriptionExpressions.entrySet()) {
final FilterExpression filterExpression = entry.getValue();
apache.rocketmq.v2.Resource topic =
- apache.rocketmq.v2.Resource.newBuilder().setName(entry.getKey()).build();
+ apache.rocketmq.v2.Resource.newBuilder()
+ .setResourceNamespace(namespace)
+ .setName(entry.getKey())
+ .build();
final apache.rocketmq.v2.FilterExpression.Builder expressionBuilder =
apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
final FilterExpressionType type = filterExpression.getFilterExpressionType();
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index 5d6092a..e73774e 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -74,9 +74,9 @@
public SimpleConsumerImpl(ClientConfiguration clientConfiguration, String consumerGroup, Duration awaitDuration,
Map<String, FilterExpression> subscriptionExpressions) {
super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet());
- Resource groupResource = new Resource(consumerGroup);
- this.simpleSubscriptionSettings = new SimpleSubscriptionSettings(clientId, endpoints,
- groupResource, clientConfiguration.getRequestTimeout(), awaitDuration, subscriptionExpressions);
+ Resource groupResource = new Resource(clientConfiguration.getNamespace(), consumerGroup);
+ this.simpleSubscriptionSettings = new SimpleSubscriptionSettings(clientConfiguration.getNamespace(), clientId,
+ endpoints, groupResource, clientConfiguration.getRequestTimeout(), awaitDuration, subscriptionExpressions);
this.consumerGroup = consumerGroup;
this.awaitDuration = awaitDuration;
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
index 0ee02ed..4719376 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
@@ -45,9 +45,9 @@
private final Duration longPollingTimeout;
private final Map<String, FilterExpression> subscriptionExpressions;
- public SimpleSubscriptionSettings(ClientId clientId, Endpoints endpoints, Resource group,
+ public SimpleSubscriptionSettings(String namespace, ClientId clientId, Endpoints endpoints, Resource group,
Duration requestTimeout, Duration longPollingTimeout, Map<String, FilterExpression> subscriptionExpression) {
- super(clientId, ClientType.SIMPLE_CONSUMER, endpoints, requestTimeout);
+ super(namespace, clientId, ClientType.SIMPLE_CONSUMER, endpoints, requestTimeout);
this.group = group;
this.subscriptionExpressions = subscriptionExpression;
this.longPollingTimeout = longPollingTimeout;
@@ -59,7 +59,9 @@
for (Map.Entry<String, FilterExpression> entry : subscriptionExpressions.entrySet()) {
final FilterExpression filterExpression = entry.getValue();
apache.rocketmq.v2.Resource topic = apache.rocketmq.v2.Resource.newBuilder()
- .setName(entry.getKey()).build();
+ .setResourceNamespace(namespace)
+ .setName(entry.getKey())
+ .build();
final apache.rocketmq.v2.FilterExpression.Builder expressionBuilder =
apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
final FilterExpressionType type = filterExpression.getFilterExpressionType();
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 1db6e17..450a68d 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -101,8 +101,8 @@
TransactionChecker checker) {
super(clientConfiguration, topics);
ExponentialBackoffRetryPolicy retryPolicy = ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(maxAttempts);
- this.publishingSettings = new PublishingSettings(clientId, endpoints, retryPolicy,
- clientConfiguration.getRequestTimeout(), topics);
+ this.publishingSettings = new PublishingSettings(clientConfiguration.getNamespace(), clientId, endpoints,
+ retryPolicy, clientConfiguration.getRequestTimeout(), topics);
this.checker = checker;
this.publishingRouteDataCache = new ConcurrentHashMap<>();
}
@@ -259,7 +259,10 @@
String transactionId, final TransactionResolution resolution) throws ClientException {
final EndTransactionRequest.Builder builder = EndTransactionRequest.newBuilder()
.setMessageId(messageId.toString()).setTransactionId(transactionId)
- .setTopic(apache.rocketmq.v2.Resource.newBuilder().setName(generalMessage.getTopic()).build());
+ .setTopic(apache.rocketmq.v2.Resource.newBuilder()
+ .setResourceNamespace(clientConfiguration.getNamespace())
+ .setName(generalMessage.getTopic())
+ .build());
switch (resolution) {
case COMMIT:
builder.setResolution(apache.rocketmq.v2.TransactionResolution.COMMIT);
@@ -415,7 +418,8 @@
*/
private SendMessageRequest wrapSendMessageRequest(List<PublishingMessageImpl> pubMessages, MessageQueueImpl mq) {
final List<apache.rocketmq.v2.Message> messages = pubMessages.stream()
- .map(publishingMessage -> publishingMessage.toProtobuf(mq)).collect(Collectors.toList());
+ .map(publishingMessage -> publishingMessage.toProtobuf(clientConfiguration.getNamespace(), mq))
+ .collect(Collectors.toList());
return SendMessageRequest.newBuilder().addAllMessages(messages).build();
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
index f1605c3..29159ca 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
@@ -45,9 +45,9 @@
private volatile int maxBodySizeBytes = 4 * 1024 * 1024;
private volatile boolean validateMessageType = true;
- public PublishingSettings(ClientId clientId, Endpoints accessPoint, ExponentialBackoffRetryPolicy retryPolicy,
- Duration requestTimeout, Set<String> topics) {
- super(clientId, ClientType.PRODUCER, accessPoint, retryPolicy, requestTimeout);
+ public PublishingSettings(String namespace, ClientId clientId, Endpoints accessPoint,
+ ExponentialBackoffRetryPolicy retryPolicy, Duration requestTimeout, Set<String> topics) {
+ super(namespace, clientId, ClientType.PRODUCER, accessPoint, retryPolicy, requestTimeout);
this.topics = topics;
}
@@ -62,8 +62,13 @@
@Override
public apache.rocketmq.v2.Settings toProtobuf() {
final Publishing publishing = Publishing.newBuilder()
- .addAllTopics(topics.stream().map(name -> Resource.newBuilder().setName(name).build())
- .collect(Collectors.toList())).setValidateMessageType(validateMessageType).build();
+ .addAllTopics(topics.stream().map(name -> Resource.newBuilder()
+ .setResourceNamespace(namespace)
+ .setName(name)
+ .build())
+ .collect(Collectors.toList()))
+ .setValidateMessageType(validateMessageType)
+ .build();
final apache.rocketmq.v2.Settings.Builder builder = apache.rocketmq.v2.Settings.newBuilder()
.setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
.setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setPublishing(publishing);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
index 0795c82..6af6d73 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
@@ -87,7 +87,7 @@
* <p>This method should be invoked before each message sending, because the born time is reset before each
* invocation, which means that it should not be invoked ahead of time.
*/
- public apache.rocketmq.v2.Message toProtobuf(MessageQueueImpl mq) {
+ public apache.rocketmq.v2.Message toProtobuf(String namespace, MessageQueueImpl mq) {
final apache.rocketmq.v2.SystemProperties.Builder systemPropertiesBuilder =
apache.rocketmq.v2.SystemProperties.newBuilder()
// Message keys
@@ -112,7 +112,7 @@
// Message group
this.getMessageGroup().ifPresent(systemPropertiesBuilder::setMessageGroup);
final SystemProperties systemProperties = systemPropertiesBuilder.build();
- Resource topicResource = Resource.newBuilder().setName(getTopic()).build();
+ Resource topicResource = Resource.newBuilder().setResourceNamespace(namespace).setName(getTopic()).build();
return apache.rocketmq.v2.Message.newBuilder()
// Topic
.setTopic(topicResource)
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
index 20f31f8..a771fc2 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
@@ -41,12 +41,12 @@
@Test
public void testToProtobuf() {
- Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+ Resource groupResource = new Resource(FAKE_NAMESPACE, FAKE_CONSUMER_GROUP_0);
ClientId clientId = new ClientId();
Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression());
final Duration requestTimeout = Duration.ofSeconds(3);
- final PushSubscriptionSettings pushSubscriptionSettings = new PushSubscriptionSettings(clientId,
+ final PushSubscriptionSettings pushSubscriptionSettings = new PushSubscriptionSettings(FAKE_NAMESPACE, clientId,
fakeEndpoints(), groupResource, requestTimeout, subscriptionExpression);
final Settings settings = pushSubscriptionSettings.toProtobuf();
Assert.assertEquals(settings.getClientType(), ClientType.PUSH_CONSUMER);
@@ -54,26 +54,32 @@
Assert.assertTrue(settings.hasSubscription());
final Subscription subscription = settings.getSubscription();
Assert.assertEquals(subscription.getGroup(),
- apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build());
+ apache.rocketmq.v2.Resource.newBuilder()
+ .setResourceNamespace(FAKE_NAMESPACE)
+ .setName(FAKE_CONSUMER_GROUP_0)
+ .build());
Assert.assertFalse(subscription.getFifo());
final List<SubscriptionEntry> subscriptionsList = subscription.getSubscriptionsList();
Assert.assertEquals(subscriptionsList.size(), 1);
final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0);
Assert.assertEquals(subscriptionEntry.getExpression().getType(), FilterType.TAG);
Assert.assertEquals(subscriptionEntry.getTopic(),
- apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build());
+ apache.rocketmq.v2.Resource.newBuilder()
+ .setResourceNamespace(FAKE_NAMESPACE)
+ .setName(FAKE_TOPIC_0)
+ .build());
}
@Test
public void testToProtobufWithSqlExpression() {
- Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+ Resource groupResource = new Resource(FAKE_NAMESPACE, FAKE_CONSUMER_GROUP_0);
ClientId clientId = new ClientId();
Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10 AND a < 100) OR (b IS NOT NULL AND "
+ "b=TRUE)", FilterExpressionType.SQL92));
final Duration requestTimeout = Duration.ofSeconds(3);
- final PushSubscriptionSettings pushSubscriptionSettings = new PushSubscriptionSettings(clientId,
+ final PushSubscriptionSettings pushSubscriptionSettings = new PushSubscriptionSettings(FAKE_NAMESPACE, clientId,
fakeEndpoints(), groupResource, requestTimeout, subscriptionExpression);
final Settings settings = pushSubscriptionSettings.toProtobuf();
Assert.assertEquals(settings.getClientType(), ClientType.PUSH_CONSUMER);
@@ -81,14 +87,20 @@
Assert.assertTrue(settings.hasSubscription());
final Subscription subscription = settings.getSubscription();
Assert.assertEquals(subscription.getGroup(),
- apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build());
+ apache.rocketmq.v2.Resource.newBuilder()
+ .setResourceNamespace(FAKE_NAMESPACE)
+ .setName(FAKE_CONSUMER_GROUP_0)
+ .build());
Assert.assertFalse(subscription.getFifo());
final List<SubscriptionEntry> subscriptionsList = subscription.getSubscriptionsList();
Assert.assertEquals(subscriptionsList.size(), 1);
final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0);
Assert.assertEquals(subscriptionEntry.getExpression().getType(), FilterType.SQL);
Assert.assertEquals(subscriptionEntry.getTopic(),
- apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build());
+ apache.rocketmq.v2.Resource.newBuilder()
+ .setResourceNamespace(FAKE_NAMESPACE)
+ .setName(FAKE_TOPIC_0)
+ .build());
}
@Test
@@ -115,7 +127,7 @@
subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10 AND a < 100) OR (b IS NOT NULL AND "
+ "b=TRUE)", FilterExpressionType.SQL92));
final Duration requestTimeout = Duration.ofSeconds(3);
- final PushSubscriptionSettings pushSubscriptionSettings = new PushSubscriptionSettings(clientId,
+ final PushSubscriptionSettings pushSubscriptionSettings = new PushSubscriptionSettings(FAKE_NAMESPACE, clientId,
fakeEndpoints(), groupResource, requestTimeout, subscriptionExpression);
pushSubscriptionSettings.sync(settings);
}
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
index 06cf6e9..d3ea287 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
@@ -39,21 +39,24 @@
@Test
public void testToProtobuf() {
- Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+ Resource groupResource = new Resource(FAKE_NAMESPACE, FAKE_CONSUMER_GROUP_0);
ClientId clientId = new ClientId();
Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression());
final Duration requestTimeout = Duration.ofSeconds(3);
final Duration longPollingTimeout = Duration.ofSeconds(15);
- final SimpleSubscriptionSettings simpleSubscriptionSettings = new SimpleSubscriptionSettings(clientId,
- fakeEndpoints(), groupResource, requestTimeout, longPollingTimeout, subscriptionExpression);
+ final SimpleSubscriptionSettings simpleSubscriptionSettings = new SimpleSubscriptionSettings(FAKE_NAMESPACE,
+ clientId, fakeEndpoints(), groupResource, requestTimeout, longPollingTimeout, subscriptionExpression);
final Settings settings = simpleSubscriptionSettings.toProtobuf();
Assert.assertEquals(settings.getClientType(), ClientType.SIMPLE_CONSUMER);
Assert.assertEquals(settings.getRequestTimeout(), Durations.fromNanos(requestTimeout.toNanos()));
Assert.assertTrue(settings.hasSubscription());
final Subscription subscription = settings.getSubscription();
Assert.assertEquals(subscription.getGroup(),
- apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build());
+ apache.rocketmq.v2.Resource.newBuilder()
+ .setResourceNamespace(FAKE_NAMESPACE)
+ .setName(FAKE_CONSUMER_GROUP_0)
+ .build());
Assert.assertFalse(subscription.getFifo());
Assert.assertEquals(subscription.getLongPollingTimeout(), Durations.fromNanos(longPollingTimeout.toNanos()));
final List<SubscriptionEntry> subscriptionsList = subscription.getSubscriptionsList();
@@ -61,27 +64,33 @@
final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0);
Assert.assertEquals(subscriptionEntry.getExpression().getType(), FilterType.TAG);
Assert.assertEquals(subscriptionEntry.getTopic(),
- apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build());
+ apache.rocketmq.v2.Resource.newBuilder()
+ .setResourceNamespace(FAKE_NAMESPACE)
+ .setName(FAKE_TOPIC_0)
+ .build());
}
@Test
public void testToProtobufWithSqlExpression() {
- Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+ Resource groupResource = new Resource(FAKE_NAMESPACE, FAKE_CONSUMER_GROUP_0);
ClientId clientId = new ClientId();
Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10 AND a < 100) OR (b IS NOT NULL AND "
+ "b=TRUE)", FilterExpressionType.SQL92));
final Duration requestTimeout = Duration.ofSeconds(3);
final Duration longPollingTimeout = Duration.ofSeconds(15);
- final SimpleSubscriptionSettings simpleSubscriptionSettings = new SimpleSubscriptionSettings(clientId,
- fakeEndpoints(), groupResource, requestTimeout, longPollingTimeout, subscriptionExpression);
+ final SimpleSubscriptionSettings simpleSubscriptionSettings = new SimpleSubscriptionSettings(FAKE_NAMESPACE,
+ clientId, fakeEndpoints(), groupResource, requestTimeout, longPollingTimeout, subscriptionExpression);
final Settings settings = simpleSubscriptionSettings.toProtobuf();
Assert.assertEquals(settings.getClientType(), ClientType.SIMPLE_CONSUMER);
Assert.assertEquals(settings.getRequestTimeout(), Durations.fromNanos(requestTimeout.toNanos()));
Assert.assertTrue(settings.hasSubscription());
final Subscription subscription = settings.getSubscription();
Assert.assertEquals(subscription.getGroup(),
- apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build());
+ apache.rocketmq.v2.Resource.newBuilder()
+ .setResourceNamespace(FAKE_NAMESPACE)
+ .setName(FAKE_CONSUMER_GROUP_0)
+ .build());
Assert.assertFalse(subscription.getFifo());
Assert.assertEquals(subscription.getLongPollingTimeout(), Durations.fromNanos(longPollingTimeout.toNanos()));
final List<SubscriptionEntry> subscriptionsList = subscription.getSubscriptionsList();
@@ -89,7 +98,10 @@
final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0);
Assert.assertEquals(subscriptionEntry.getExpression().getType(), FilterType.SQL);
Assert.assertEquals(subscriptionEntry.getTopic(),
- apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build());
+ apache.rocketmq.v2.Resource.newBuilder()
+ .setResourceNamespace(FAKE_NAMESPACE)
+ .setName(FAKE_TOPIC_0)
+ .build());
}
}
\ No newline at end of file
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index ef6723c..8b74d04 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -86,6 +86,7 @@
import org.mockito.Mockito;
public class TestBase {
+ protected static final String FAKE_NAMESPACE = "foo-bar-namespace";
protected static final ClientId FAKE_CLIENT_ID = new ClientId();
protected static final String FAKE_TOPIC_0 = "foo-bar-topic-0";
@@ -191,6 +192,14 @@
return new MessageQueueImpl(fakePbMessageQueue0(Resource.newBuilder().setName(topic).build()));
}
+ protected MessageQueueImpl fakeMessageQueueImpl(String namespace, String topic) {
+ return new MessageQueueImpl(fakePbMessageQueue0(
+ Resource.newBuilder()
+ .setResourceNamespace(namespace)
+ .setName(topic)
+ .build()));
+ }
+
protected MessageQueueImpl fakeMessageQueueImpl0() {
return new MessageQueueImpl(fakePbMessageQueue0());
}
@@ -379,8 +388,8 @@
}
protected PublishingSettings fakeProducerSettings() {
- return new PublishingSettings(FAKE_CLIENT_ID, fakeEndpoints(), fakeExponentialBackoffRetryPolicy(),
- Duration.ofSeconds(1), new HashSet<>());
+ return new PublishingSettings(FAKE_NAMESPACE, FAKE_CLIENT_ID, fakeEndpoints(),
+ fakeExponentialBackoffRetryPolicy(), Duration.ofSeconds(1), new HashSet<>());
}
protected SendReceiptImpl fakeSendReceiptImpl(