[ISSUE #660] Add namespace in java client (#661)

* Add namespace for java client

* Add checkNotNull
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
index 2714810..042c352 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
@@ -28,17 +28,19 @@
     private final SessionCredentialsProvider sessionCredentialsProvider;
     private final Duration requestTimeout;
     private final boolean sslEnabled;
+    private final String namespace;
 
     /**
      * The caller is supposed to have validated the arguments and handled throwing exceptions or
      * logging warnings already, so we avoid repeating args check here.
      */
     ClientConfiguration(String endpoints, SessionCredentialsProvider sessionCredentialsProvider,
-        Duration requestTimeout, boolean sslEnabled) {
+        Duration requestTimeout, boolean sslEnabled, String namespace) {
         this.endpoints = endpoints;
         this.sessionCredentialsProvider = sessionCredentialsProvider;
         this.requestTimeout = requestTimeout;
         this.sslEnabled = sslEnabled;
+        this.namespace = namespace;
     }
 
     public static ClientConfigurationBuilder newBuilder() {
@@ -60,4 +62,8 @@
     public boolean isSslEnabled() {
         return sslEnabled;
     }
+
+    public String getNamespace() {
+        return namespace;
+    }
 }
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
index eb40c88..25cc54a 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
@@ -31,6 +31,7 @@
     private SessionCredentialsProvider sessionCredentialsProvider = null;
     private Duration requestTimeout = Duration.ofSeconds(3);
     private boolean sslEnabled = true;
+    private String namespace = "";
 
     /**
      * Configure the access point with which the SDK should communicate.
@@ -83,6 +84,16 @@
     }
 
     /**
+     * Configure namespace for client
+     * @param namespace namespace
+     * @return The {@link ClientConfigurationBuilder} instance, to allow for method chaining.
+     */
+    public ClientConfigurationBuilder setNamespace(String namespace) {
+        this.namespace = checkNotNull(namespace, "namespace should not be null");
+        return this;
+    }
+
+    /**
      * Finalize the build of {@link ClientConfiguration}.
      *
      * @return the client configuration builder instance.
@@ -90,6 +101,6 @@
     public ClientConfiguration build() {
         checkNotNull(endpoints, "endpoints should not be null");
         checkNotNull(requestTimeout, "requestTimeout should not be null");
-        return new ClientConfiguration(endpoints, sessionCredentialsProvider, requestTimeout, sslEnabled);
+        return new ClientConfiguration(endpoints, sessionCredentialsProvider, requestTimeout, sslEnabled, namespace);
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 846f0ce..dac5fe0 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -608,7 +608,10 @@
     }
 
     protected ListenableFuture<TopicRouteData> fetchTopicRoute0(final String topic) {
-        Resource topicResource = Resource.newBuilder().setName(topic).build();
+        Resource topicResource = Resource.newBuilder()
+            .setResourceNamespace(clientConfiguration.getNamespace())
+            .setName(topic)
+            .build();
         final QueryRouteRequest request = QueryRouteRequest.newBuilder().setTopic(topicResource)
             .setEndpoints(endpoints.toProtobuf()).build();
         final RpcFuture<QueryRouteRequest, QueryRouteResponse> future =
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
index f992301..88b335c 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
@@ -25,14 +25,16 @@
 import org.apache.rocketmq.client.java.route.Endpoints;
 
 public abstract class Settings {
+    protected final String namespace;
     protected final ClientId clientId;
     protected final ClientType clientType;
     protected final Endpoints accessPoint;
     protected volatile RetryPolicy retryPolicy;
     protected final Duration requestTimeout;
 
-    public Settings(ClientId clientId, ClientType clientType, Endpoints accessPoint, RetryPolicy retryPolicy,
-        Duration requestTimeout) {
+    public Settings(String namespace, ClientId clientId, ClientType clientType, Endpoints accessPoint,
+        RetryPolicy retryPolicy, Duration requestTimeout) {
+        this.namespace = namespace;
         this.clientId = clientId;
         this.clientType = clientType;
         this.accessPoint = accessPoint;
@@ -40,8 +42,9 @@
         this.requestTimeout = requestTimeout;
     }
 
-    public Settings(ClientId clientId, ClientType clientType, Endpoints accessPoint, Duration requestTimeout) {
-        this(clientId, clientType, accessPoint, null, requestTimeout);
+    public Settings(String namespace, ClientId clientId, ClientType clientType, Endpoints accessPoint,
+        Duration requestTimeout) {
+        this(namespace, clientId, clientType, accessPoint, null, requestTimeout);
     }
 
     public abstract apache.rocketmq.v2.Settings toProtobuf();
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index a807fd2..795c2ca 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -123,7 +123,10 @@
     }
 
     private AckMessageRequest wrapAckMessageRequest(MessageViewImpl messageView) {
-        final Resource topicResource = Resource.newBuilder().setName(messageView.getTopic()).build();
+        final Resource topicResource = Resource.newBuilder()
+            .setResourceNamespace(clientConfiguration.getNamespace())
+            .setName(messageView.getTopic())
+            .build();
         final AckMessageEntry entry = AckMessageEntry.newBuilder()
             .setMessageId(messageView.getMessageId().toString())
             .setReceiptHandle(messageView.getReceiptHandle())
@@ -134,7 +137,9 @@
 
     private ChangeInvisibleDurationRequest wrapChangeInvisibleDuration(MessageViewImpl messageView,
         Duration invisibleDuration) {
-        final Resource topicResource = Resource.newBuilder().setName(messageView.getTopic()).build();
+        final Resource topicResource = Resource.newBuilder()
+            .setResourceNamespace(clientConfiguration.getNamespace())
+            .setName(messageView.getTopic()).build();
         return ChangeInvisibleDurationRequest.newBuilder().setGroup(getProtobufGroup()).setTopic(topicResource)
             .setReceiptHandle(messageView.getReceiptHandle())
             .setInvisibleDuration(Durations.fromNanos(invisibleDuration.toNanos()))
@@ -219,7 +224,10 @@
     }
 
     protected Resource getProtobufGroup() {
-        return Resource.newBuilder().setName(consumerGroup).build();
+        return Resource.newBuilder()
+            .setResourceNamespace(clientConfiguration.getNamespace())
+            .setName(consumerGroup)
+            .build();
     }
 
     @Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 295367a..2cbc6d0 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -127,9 +127,9 @@
         int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount) {
         super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet());
         this.clientConfiguration = clientConfiguration;
-        Resource groupResource = new Resource(consumerGroup);
-        this.pushSubscriptionSettings = new PushSubscriptionSettings(clientId, endpoints, groupResource,
-            clientConfiguration.getRequestTimeout(), subscriptionExpressions);
+        Resource groupResource = new Resource(clientConfiguration.getNamespace(), consumerGroup);
+        this.pushSubscriptionSettings = new PushSubscriptionSettings(clientConfiguration.getNamespace(), clientId,
+            endpoints, groupResource, clientConfiguration.getRequestTimeout(), subscriptionExpressions);
         this.consumerGroup = consumerGroup;
         this.subscriptionExpressions = subscriptionExpressions;
         this.cacheAssignments = new ConcurrentHashMap<>();
@@ -261,7 +261,10 @@
     }
 
     private QueryAssignmentRequest wrapQueryAssignmentRequest(String topic) {
-        apache.rocketmq.v2.Resource topicResource = apache.rocketmq.v2.Resource.newBuilder().setName(topic).build();
+        apache.rocketmq.v2.Resource topicResource = apache.rocketmq.v2.Resource.newBuilder()
+            .setResourceNamespace(clientConfiguration.getNamespace())
+            .setName(topic)
+            .build();
         return QueryAssignmentRequest.newBuilder().setTopic(topicResource)
             .setEndpoints(endpoints.toProtobuf()).setGroup(getProtobufGroup()).build();
     }
@@ -500,7 +503,10 @@
     private ForwardMessageToDeadLetterQueueRequest wrapForwardMessageToDeadLetterQueueRequest(
         MessageViewImpl messageView) {
         final apache.rocketmq.v2.Resource topicResource =
-            apache.rocketmq.v2.Resource.newBuilder().setName(messageView.getTopic()).build();
+            apache.rocketmq.v2.Resource.newBuilder()
+                .setResourceNamespace(clientConfiguration.getNamespace())
+                .setName(messageView.getTopic())
+                .build();
         return ForwardMessageToDeadLetterQueueRequest.newBuilder().setGroup(getProtobufGroup()).setTopic(topicResource)
             .setReceiptHandle(messageView.getReceiptHandle())
             .setMessageId(messageView.getMessageId().toString())
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
index 70338b0..26a66a1 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
@@ -50,9 +50,9 @@
     private volatile int receiveBatchSize = 32;
     private volatile Duration longPollingTimeout = Duration.ofSeconds(30);
 
-    public PushSubscriptionSettings(ClientId clientId, Endpoints endpoints, Resource group,
+    public PushSubscriptionSettings(String namespace, ClientId clientId, Endpoints endpoints, Resource group,
         Duration requestTimeout, Map<String, FilterExpression> subscriptionExpression) {
-        super(clientId, ClientType.PUSH_CONSUMER, endpoints, requestTimeout);
+        super(namespace, clientId, ClientType.PUSH_CONSUMER, endpoints, requestTimeout);
         this.group = group;
         this.subscriptionExpressions = subscriptionExpression;
     }
@@ -75,7 +75,10 @@
         for (Map.Entry<String, FilterExpression> entry : subscriptionExpressions.entrySet()) {
             final FilterExpression filterExpression = entry.getValue();
             apache.rocketmq.v2.Resource topic =
-                apache.rocketmq.v2.Resource.newBuilder().setName(entry.getKey()).build();
+                apache.rocketmq.v2.Resource.newBuilder()
+                    .setResourceNamespace(namespace)
+                    .setName(entry.getKey())
+                    .build();
             final apache.rocketmq.v2.FilterExpression.Builder expressionBuilder =
                 apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
             final FilterExpressionType type = filterExpression.getFilterExpressionType();
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index 5d6092a..e73774e 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -74,9 +74,9 @@
     public SimpleConsumerImpl(ClientConfiguration clientConfiguration, String consumerGroup, Duration awaitDuration,
         Map<String, FilterExpression> subscriptionExpressions) {
         super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet());
-        Resource groupResource = new Resource(consumerGroup);
-        this.simpleSubscriptionSettings = new SimpleSubscriptionSettings(clientId, endpoints,
-            groupResource, clientConfiguration.getRequestTimeout(), awaitDuration, subscriptionExpressions);
+        Resource groupResource = new Resource(clientConfiguration.getNamespace(), consumerGroup);
+        this.simpleSubscriptionSettings = new SimpleSubscriptionSettings(clientConfiguration.getNamespace(), clientId,
+            endpoints, groupResource, clientConfiguration.getRequestTimeout(), awaitDuration, subscriptionExpressions);
         this.consumerGroup = consumerGroup;
         this.awaitDuration = awaitDuration;
 
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
index 0ee02ed..4719376 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
@@ -45,9 +45,9 @@
     private final Duration longPollingTimeout;
     private final Map<String, FilterExpression> subscriptionExpressions;
 
-    public SimpleSubscriptionSettings(ClientId clientId, Endpoints endpoints, Resource group,
+    public SimpleSubscriptionSettings(String namespace, ClientId clientId, Endpoints endpoints, Resource group,
         Duration requestTimeout, Duration longPollingTimeout, Map<String, FilterExpression> subscriptionExpression) {
-        super(clientId, ClientType.SIMPLE_CONSUMER, endpoints, requestTimeout);
+        super(namespace, clientId, ClientType.SIMPLE_CONSUMER, endpoints, requestTimeout);
         this.group = group;
         this.subscriptionExpressions = subscriptionExpression;
         this.longPollingTimeout = longPollingTimeout;
@@ -59,7 +59,9 @@
         for (Map.Entry<String, FilterExpression> entry : subscriptionExpressions.entrySet()) {
             final FilterExpression filterExpression = entry.getValue();
             apache.rocketmq.v2.Resource topic = apache.rocketmq.v2.Resource.newBuilder()
-                .setName(entry.getKey()).build();
+                .setResourceNamespace(namespace)
+                .setName(entry.getKey())
+                .build();
             final apache.rocketmq.v2.FilterExpression.Builder expressionBuilder =
                 apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
             final FilterExpressionType type = filterExpression.getFilterExpressionType();
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 1db6e17..450a68d 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -101,8 +101,8 @@
         TransactionChecker checker) {
         super(clientConfiguration, topics);
         ExponentialBackoffRetryPolicy retryPolicy = ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(maxAttempts);
-        this.publishingSettings = new PublishingSettings(clientId, endpoints, retryPolicy,
-            clientConfiguration.getRequestTimeout(), topics);
+        this.publishingSettings = new PublishingSettings(clientConfiguration.getNamespace(), clientId, endpoints,
+            retryPolicy, clientConfiguration.getRequestTimeout(), topics);
         this.checker = checker;
         this.publishingRouteDataCache = new ConcurrentHashMap<>();
     }
@@ -259,7 +259,10 @@
         String transactionId, final TransactionResolution resolution) throws ClientException {
         final EndTransactionRequest.Builder builder = EndTransactionRequest.newBuilder()
             .setMessageId(messageId.toString()).setTransactionId(transactionId)
-            .setTopic(apache.rocketmq.v2.Resource.newBuilder().setName(generalMessage.getTopic()).build());
+            .setTopic(apache.rocketmq.v2.Resource.newBuilder()
+                .setResourceNamespace(clientConfiguration.getNamespace())
+                .setName(generalMessage.getTopic())
+                .build());
         switch (resolution) {
             case COMMIT:
                 builder.setResolution(apache.rocketmq.v2.TransactionResolution.COMMIT);
@@ -415,7 +418,8 @@
      */
     private SendMessageRequest wrapSendMessageRequest(List<PublishingMessageImpl> pubMessages, MessageQueueImpl mq) {
         final List<apache.rocketmq.v2.Message> messages = pubMessages.stream()
-            .map(publishingMessage -> publishingMessage.toProtobuf(mq)).collect(Collectors.toList());
+            .map(publishingMessage -> publishingMessage.toProtobuf(clientConfiguration.getNamespace(), mq))
+            .collect(Collectors.toList());
         return SendMessageRequest.newBuilder().addAllMessages(messages).build();
     }
 
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
index f1605c3..29159ca 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
@@ -45,9 +45,9 @@
     private volatile int maxBodySizeBytes = 4 * 1024 * 1024;
     private volatile boolean validateMessageType = true;
 
-    public PublishingSettings(ClientId clientId, Endpoints accessPoint, ExponentialBackoffRetryPolicy retryPolicy,
-        Duration requestTimeout, Set<String> topics) {
-        super(clientId, ClientType.PRODUCER, accessPoint, retryPolicy, requestTimeout);
+    public PublishingSettings(String namespace, ClientId clientId, Endpoints accessPoint,
+        ExponentialBackoffRetryPolicy retryPolicy, Duration requestTimeout, Set<String> topics) {
+        super(namespace, clientId, ClientType.PRODUCER, accessPoint, retryPolicy, requestTimeout);
         this.topics = topics;
     }
 
@@ -62,8 +62,13 @@
     @Override
     public apache.rocketmq.v2.Settings toProtobuf() {
         final Publishing publishing = Publishing.newBuilder()
-            .addAllTopics(topics.stream().map(name -> Resource.newBuilder().setName(name).build())
-                .collect(Collectors.toList())).setValidateMessageType(validateMessageType).build();
+            .addAllTopics(topics.stream().map(name -> Resource.newBuilder()
+                    .setResourceNamespace(namespace)
+                    .setName(name)
+                    .build())
+                .collect(Collectors.toList()))
+            .setValidateMessageType(validateMessageType)
+            .build();
         final apache.rocketmq.v2.Settings.Builder builder = apache.rocketmq.v2.Settings.newBuilder()
             .setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
             .setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setPublishing(publishing);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
index 0795c82..6af6d73 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
@@ -87,7 +87,7 @@
      * <p>This method should be invoked before each message sending, because the born time is reset before each
      * invocation, which means that it should not be invoked ahead of time.
      */
-    public apache.rocketmq.v2.Message toProtobuf(MessageQueueImpl mq) {
+    public apache.rocketmq.v2.Message toProtobuf(String namespace, MessageQueueImpl mq) {
         final apache.rocketmq.v2.SystemProperties.Builder systemPropertiesBuilder =
             apache.rocketmq.v2.SystemProperties.newBuilder()
                 // Message keys
@@ -112,7 +112,7 @@
         // Message group
         this.getMessageGroup().ifPresent(systemPropertiesBuilder::setMessageGroup);
         final SystemProperties systemProperties = systemPropertiesBuilder.build();
-        Resource topicResource = Resource.newBuilder().setName(getTopic()).build();
+        Resource topicResource = Resource.newBuilder().setResourceNamespace(namespace).setName(getTopic()).build();
         return apache.rocketmq.v2.Message.newBuilder()
             // Topic
             .setTopic(topicResource)
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
index 20f31f8..a771fc2 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
@@ -41,12 +41,12 @@
 
     @Test
     public void testToProtobuf() {
-        Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+        Resource groupResource = new Resource(FAKE_NAMESPACE, FAKE_CONSUMER_GROUP_0);
         ClientId clientId = new ClientId();
         Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
         subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression());
         final Duration requestTimeout = Duration.ofSeconds(3);
-        final PushSubscriptionSettings pushSubscriptionSettings = new PushSubscriptionSettings(clientId,
+        final PushSubscriptionSettings pushSubscriptionSettings = new PushSubscriptionSettings(FAKE_NAMESPACE, clientId,
             fakeEndpoints(), groupResource, requestTimeout, subscriptionExpression);
         final Settings settings = pushSubscriptionSettings.toProtobuf();
         Assert.assertEquals(settings.getClientType(), ClientType.PUSH_CONSUMER);
@@ -54,26 +54,32 @@
         Assert.assertTrue(settings.hasSubscription());
         final Subscription subscription = settings.getSubscription();
         Assert.assertEquals(subscription.getGroup(),
-            apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build());
+            apache.rocketmq.v2.Resource.newBuilder()
+                .setResourceNamespace(FAKE_NAMESPACE)
+                .setName(FAKE_CONSUMER_GROUP_0)
+                .build());
         Assert.assertFalse(subscription.getFifo());
         final List<SubscriptionEntry> subscriptionsList = subscription.getSubscriptionsList();
         Assert.assertEquals(subscriptionsList.size(), 1);
         final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0);
         Assert.assertEquals(subscriptionEntry.getExpression().getType(), FilterType.TAG);
         Assert.assertEquals(subscriptionEntry.getTopic(),
-            apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build());
+            apache.rocketmq.v2.Resource.newBuilder()
+                .setResourceNamespace(FAKE_NAMESPACE)
+                .setName(FAKE_TOPIC_0)
+                .build());
     }
 
     @Test
     public void testToProtobufWithSqlExpression() {
-        Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+        Resource groupResource = new Resource(FAKE_NAMESPACE, FAKE_CONSUMER_GROUP_0);
         ClientId clientId = new ClientId();
 
         Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
         subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10 AND a < 100) OR (b IS NOT NULL AND "
             + "b=TRUE)", FilterExpressionType.SQL92));
         final Duration requestTimeout = Duration.ofSeconds(3);
-        final PushSubscriptionSettings pushSubscriptionSettings = new PushSubscriptionSettings(clientId,
+        final PushSubscriptionSettings pushSubscriptionSettings = new PushSubscriptionSettings(FAKE_NAMESPACE, clientId,
             fakeEndpoints(), groupResource, requestTimeout, subscriptionExpression);
         final Settings settings = pushSubscriptionSettings.toProtobuf();
         Assert.assertEquals(settings.getClientType(), ClientType.PUSH_CONSUMER);
@@ -81,14 +87,20 @@
         Assert.assertTrue(settings.hasSubscription());
         final Subscription subscription = settings.getSubscription();
         Assert.assertEquals(subscription.getGroup(),
-            apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build());
+            apache.rocketmq.v2.Resource.newBuilder()
+                .setResourceNamespace(FAKE_NAMESPACE)
+                .setName(FAKE_CONSUMER_GROUP_0)
+                .build());
         Assert.assertFalse(subscription.getFifo());
         final List<SubscriptionEntry> subscriptionsList = subscription.getSubscriptionsList();
         Assert.assertEquals(subscriptionsList.size(), 1);
         final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0);
         Assert.assertEquals(subscriptionEntry.getExpression().getType(), FilterType.SQL);
         Assert.assertEquals(subscriptionEntry.getTopic(),
-            apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build());
+            apache.rocketmq.v2.Resource.newBuilder()
+                .setResourceNamespace(FAKE_NAMESPACE)
+                .setName(FAKE_TOPIC_0)
+                .build());
     }
 
     @Test
@@ -115,7 +127,7 @@
         subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10 AND a < 100) OR (b IS NOT NULL AND "
             + "b=TRUE)", FilterExpressionType.SQL92));
         final Duration requestTimeout = Duration.ofSeconds(3);
-        final PushSubscriptionSettings pushSubscriptionSettings = new PushSubscriptionSettings(clientId,
+        final PushSubscriptionSettings pushSubscriptionSettings = new PushSubscriptionSettings(FAKE_NAMESPACE, clientId,
             fakeEndpoints(), groupResource, requestTimeout, subscriptionExpression);
         pushSubscriptionSettings.sync(settings);
     }
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
index 06cf6e9..d3ea287 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
@@ -39,21 +39,24 @@
 
     @Test
     public void testToProtobuf() {
-        Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+        Resource groupResource = new Resource(FAKE_NAMESPACE, FAKE_CONSUMER_GROUP_0);
         ClientId clientId = new ClientId();
         Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
         subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression());
         final Duration requestTimeout = Duration.ofSeconds(3);
         final Duration longPollingTimeout = Duration.ofSeconds(15);
-        final SimpleSubscriptionSettings simpleSubscriptionSettings = new SimpleSubscriptionSettings(clientId,
-            fakeEndpoints(), groupResource, requestTimeout, longPollingTimeout, subscriptionExpression);
+        final SimpleSubscriptionSettings simpleSubscriptionSettings = new SimpleSubscriptionSettings(FAKE_NAMESPACE,
+            clientId, fakeEndpoints(), groupResource, requestTimeout, longPollingTimeout, subscriptionExpression);
         final Settings settings = simpleSubscriptionSettings.toProtobuf();
         Assert.assertEquals(settings.getClientType(), ClientType.SIMPLE_CONSUMER);
         Assert.assertEquals(settings.getRequestTimeout(), Durations.fromNanos(requestTimeout.toNanos()));
         Assert.assertTrue(settings.hasSubscription());
         final Subscription subscription = settings.getSubscription();
         Assert.assertEquals(subscription.getGroup(),
-            apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build());
+            apache.rocketmq.v2.Resource.newBuilder()
+                .setResourceNamespace(FAKE_NAMESPACE)
+                .setName(FAKE_CONSUMER_GROUP_0)
+                .build());
         Assert.assertFalse(subscription.getFifo());
         Assert.assertEquals(subscription.getLongPollingTimeout(), Durations.fromNanos(longPollingTimeout.toNanos()));
         final List<SubscriptionEntry> subscriptionsList = subscription.getSubscriptionsList();
@@ -61,27 +64,33 @@
         final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0);
         Assert.assertEquals(subscriptionEntry.getExpression().getType(), FilterType.TAG);
         Assert.assertEquals(subscriptionEntry.getTopic(),
-            apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build());
+            apache.rocketmq.v2.Resource.newBuilder()
+                .setResourceNamespace(FAKE_NAMESPACE)
+                .setName(FAKE_TOPIC_0)
+                .build());
     }
 
     @Test
     public void testToProtobufWithSqlExpression() {
-        Resource groupResource = new Resource(FAKE_CONSUMER_GROUP_0);
+        Resource groupResource = new Resource(FAKE_NAMESPACE, FAKE_CONSUMER_GROUP_0);
         ClientId clientId = new ClientId();
         Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
         subscriptionExpression.put(FAKE_TOPIC_0, new FilterExpression("(a > 10 AND a < 100) OR (b IS NOT NULL AND "
             + "b=TRUE)", FilterExpressionType.SQL92));
         final Duration requestTimeout = Duration.ofSeconds(3);
         final Duration longPollingTimeout = Duration.ofSeconds(15);
-        final SimpleSubscriptionSettings simpleSubscriptionSettings = new SimpleSubscriptionSettings(clientId,
-            fakeEndpoints(), groupResource, requestTimeout, longPollingTimeout, subscriptionExpression);
+        final SimpleSubscriptionSettings simpleSubscriptionSettings = new SimpleSubscriptionSettings(FAKE_NAMESPACE,
+            clientId, fakeEndpoints(), groupResource, requestTimeout, longPollingTimeout, subscriptionExpression);
         final Settings settings = simpleSubscriptionSettings.toProtobuf();
         Assert.assertEquals(settings.getClientType(), ClientType.SIMPLE_CONSUMER);
         Assert.assertEquals(settings.getRequestTimeout(), Durations.fromNanos(requestTimeout.toNanos()));
         Assert.assertTrue(settings.hasSubscription());
         final Subscription subscription = settings.getSubscription();
         Assert.assertEquals(subscription.getGroup(),
-            apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_CONSUMER_GROUP_0).build());
+            apache.rocketmq.v2.Resource.newBuilder()
+                .setResourceNamespace(FAKE_NAMESPACE)
+                .setName(FAKE_CONSUMER_GROUP_0)
+                .build());
         Assert.assertFalse(subscription.getFifo());
         Assert.assertEquals(subscription.getLongPollingTimeout(), Durations.fromNanos(longPollingTimeout.toNanos()));
         final List<SubscriptionEntry> subscriptionsList = subscription.getSubscriptionsList();
@@ -89,7 +98,10 @@
         final SubscriptionEntry subscriptionEntry = subscriptionsList.get(0);
         Assert.assertEquals(subscriptionEntry.getExpression().getType(), FilterType.SQL);
         Assert.assertEquals(subscriptionEntry.getTopic(),
-            apache.rocketmq.v2.Resource.newBuilder().setName(FAKE_TOPIC_0).build());
+            apache.rocketmq.v2.Resource.newBuilder()
+                .setResourceNamespace(FAKE_NAMESPACE)
+                .setName(FAKE_TOPIC_0)
+                .build());
     }
 
 }
\ No newline at end of file
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index ef6723c..8b74d04 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -86,6 +86,7 @@
 import org.mockito.Mockito;
 
 public class TestBase {
+    protected static final String FAKE_NAMESPACE = "foo-bar-namespace";
     protected static final ClientId FAKE_CLIENT_ID = new ClientId();
 
     protected static final String FAKE_TOPIC_0 = "foo-bar-topic-0";
@@ -191,6 +192,14 @@
         return new MessageQueueImpl(fakePbMessageQueue0(Resource.newBuilder().setName(topic).build()));
     }
 
+    protected MessageQueueImpl fakeMessageQueueImpl(String namespace, String topic) {
+        return new MessageQueueImpl(fakePbMessageQueue0(
+            Resource.newBuilder()
+                .setResourceNamespace(namespace)
+                .setName(topic)
+                .build()));
+    }
+
     protected MessageQueueImpl fakeMessageQueueImpl0() {
         return new MessageQueueImpl(fakePbMessageQueue0());
     }
@@ -379,8 +388,8 @@
     }
 
     protected PublishingSettings fakeProducerSettings() {
-        return new PublishingSettings(FAKE_CLIENT_ID, fakeEndpoints(), fakeExponentialBackoffRetryPolicy(),
-            Duration.ofSeconds(1), new HashSet<>());
+        return new PublishingSettings(FAKE_NAMESPACE, FAKE_CLIENT_ID, fakeEndpoints(),
+            fakeExponentialBackoffRetryPolicy(), Duration.ofSeconds(1), new HashSet<>());
     }
 
     protected SendReceiptImpl fakeSendReceiptImpl(