golang: Add namespace in Resource and metadata (#753)

* metadata和resource支持namespace字段

* add ut

---------

Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
diff --git a/golang/client.go b/golang/client.go
index 71c4819..9f60e06 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -377,7 +377,8 @@
 func (cli *defaultClient) getQueryRouteRequest(topic string) *v2.QueryRouteRequest {
 	return &v2.QueryRouteRequest{
 		Topic: &v2.Resource{
-			Name: topic,
+			Name:              topic,
+			ResourceNamespace: cli.config.NameSpace,
 		},
 		Endpoints: cli.accessPoint,
 	}
@@ -599,6 +600,8 @@
 		innerMD.VersionValue,
 		innerMD.ClintID,
 		cli.clientID,
+		innerMD.NameSpace,
+		cli.config.NameSpace,
 		innerMD.DateTime,
 		now,
 		innerMD.Authorization,
diff --git a/golang/client_test.go b/golang/client_test.go
index 4549bdf..716ca8c 100644
--- a/golang/client_test.go
+++ b/golang/client_test.go
@@ -298,7 +298,8 @@
 func Test_routeEqual(t *testing.T) {
 	oldMq := &v2.MessageQueue{
 		Topic: &v2.Resource{
-			Name: "topic-test",
+			Name:              "topic-test",
+			ResourceNamespace: "ns-test",
 		},
 		Id:         0,
 		Permission: v2.Permission_READ_WRITE,
@@ -313,7 +314,8 @@
 	}
 	newMq := &v2.MessageQueue{
 		Topic: &v2.Resource{
-			Name: "topic-test",
+			Name:              "topic-test",
+			ResourceNamespace: "ns-test",
 		},
 		Id:         0,
 		Permission: v2.Permission_READ_WRITE,
diff --git a/golang/metadata/metadata.go b/golang/metadata/metadata.go
index 5a8b919..ccc6627 100644
--- a/golang/metadata/metadata.go
+++ b/golang/metadata/metadata.go
@@ -18,11 +18,11 @@
 package metadata
 
 const (
-	LanguageKey = "x-mq-language"
-	ProtocolKey = "x-mq-protocol"
-	RequestID   = "x-mq-request-id"
-	VersionKey  = "x-mq-client-version"
-	// NameSpace     = "x-mq-namespace"
+	LanguageKey   = "x-mq-language"
+	ProtocolKey   = "x-mq-protocol"
+	RequestID     = "x-mq-request-id"
+	VersionKey    = "x-mq-client-version"
+	NameSpace     = "x-mq-namespace"
 	DateTime      = "x-mq-date-time"
 	ClintID       = "x-mq-client-id"
 	Authorization = "authorization"
diff --git a/golang/producer.go b/golang/producer.go
index f2fbf59..3e878a9 100644
--- a/golang/producer.go
+++ b/golang/producer.go
@@ -155,7 +155,8 @@
 	}
 	for _, topic := range po.topics {
 		topicResource := &v2.Resource{
-			Name: topic,
+			Name:              topic,
+			ResourceNamespace: config.NameSpace,
 		}
 		p.pSetting.topics.Store(topic, topicResource)
 	}
@@ -287,7 +288,7 @@
 		var err error
 		pubMessage = uMsg.pubMsg
 		if uMsg.pubMsg == nil {
-			pubMessage, err = NewPublishingMessage(msg, p.pSetting, txEnabled)
+			pubMessage, err = NewPublishingMessage(msg, p.cli.config.NameSpace, p.pSetting, txEnabled)
 			if err != nil {
 				return nil, err
 			}
@@ -315,7 +316,8 @@
 	}
 	if _, ok := p.pSetting.topics.Load(topicName); !ok {
 		p.pSetting.topics.Store(topicName, &v2.Resource{
-			Name: topicName,
+			Name:              topicName,
+			ResourceNamespace: p.cli.config.NameSpace,
 		})
 	}
 	pubLoadBalancer, err := p.getPublishingTopicRouteResult(ctx, topicName)
@@ -362,7 +364,7 @@
 		return nil, fmt.Errorf("producer is not running")
 	}
 	t := transaction.(*transactionImpl)
-	pubMessage, err := t.tryAddMessage(msg)
+	pubMessage, err := t.tryAddMessage(msg, p.cli.config.NameSpace)
 	if err != nil {
 		return nil, err
 	}
@@ -394,7 +396,8 @@
 	ctx = p.cli.Sign(ctx)
 	request := &v2.EndTransactionRequest{
 		Topic: &v2.Resource{
-			Name: messageCommon.topic,
+			Name:              messageCommon.topic,
+			ResourceNamespace: p.cli.config.NameSpace,
 		},
 		MessageId:     messageId,
 		TransactionId: transactionId,
diff --git a/golang/publishing_message.go b/golang/publishing_message.go
index 456202a..ea55dd1 100644
--- a/golang/publishing_message.go
+++ b/golang/publishing_message.go
@@ -26,6 +26,7 @@
 )
 
 type PublishingMessage struct {
+	namespace    string
 	msg          *Message
 	encoding     v2.Encoding
 	messageId    string
@@ -33,7 +34,7 @@
 	traceContext *string
 }
 
-var NewPublishingMessage = func(msg *Message, settings *producerSettings, txEnabled bool) (*PublishingMessage, error) {
+var NewPublishingMessage = func(msg *Message, namespace string, settings *producerSettings, txEnabled bool) (*PublishingMessage, error) {
 	if msg == nil {
 		return nil, fmt.Errorf("message is nil")
 	}
@@ -51,6 +52,8 @@
 	// No need to compress message body.
 	pMsg.encoding = v2.Encoding_IDENTITY
 
+	pMsg.namespace = namespace
+
 	// Generate message id.
 	pMsg.messageId = GetMessageIdCodecInstance().NextMessageId().String()
 	// Normal message.
@@ -84,7 +87,8 @@
 	msg := &v2.Message{
 		Topic: &v2.Resource{
 			// ResourceNamespace: b.conn.Config().NameSpace,
-			Name: pMsg.msg.Topic,
+			Name:              pMsg.msg.Topic,
+			ResourceNamespace: pMsg.namespace,
 		},
 		SystemProperties: &v2.SystemProperties{
 			Keys:          pMsg.msg.GetKeys(),
diff --git a/golang/publishing_message_test.go b/golang/publishing_message_test.go
new file mode 100644
index 0000000..5a030e4
--- /dev/null
+++ b/golang/publishing_message_test.go
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package golang
+
+import "testing"
+
+func TestNewPublishingMessage(t *testing.T) {
+	namespace := "ns-test"
+	pSetting := &producerSettings{}
+	msg := &Message{}
+	pMsg, err := NewPublishingMessage(msg, namespace, pSetting, false)
+	if err != nil {
+		t.Error(err)
+	}
+	v2Msg, err := pMsg.toProtobuf()
+	if err != nil {
+		t.Error(err)
+	}
+	if v2Msg.GetTopic().GetResourceNamespace() != namespace {
+		t.Error("namespace not equal")
+	}
+}
diff --git a/golang/simple_consumer.go b/golang/simple_consumer.go
index 8abfb41..f867696 100644
--- a/golang/simple_consumer.go
+++ b/golang/simple_consumer.go
@@ -75,10 +75,12 @@
 	ctx := sc.cli.Sign(context.Background())
 	request := &v2.ChangeInvisibleDurationRequest{
 		Topic: &v2.Resource{
-			Name: messageView.GetTopic(),
+			Name:              messageView.GetTopic(),
+			ResourceNamespace: sc.cli.config.NameSpace,
 		},
 		Group: &v2.Resource{
-			Name: sc.groupName,
+			Name:              sc.groupName,
+			ResourceNamespace: sc.cli.config.NameSpace,
 		},
 		ReceiptHandle:     messageView.GetReceiptHandle(),
 		InvisibleDuration: durationpb.New(invisibleDuration),
@@ -166,7 +168,8 @@
 
 	return &v2.ReceiveMessageRequest{
 		Group: &v2.Resource{
-			Name: sc.groupName,
+			Name:              sc.groupName,
+			ResourceNamespace: sc.cli.config.NameSpace,
 		},
 		MessageQueue: messageQueue,
 		FilterExpression: &v2.FilterExpression{
@@ -183,7 +186,8 @@
 	return &v2.AckMessageRequest{
 		Group: sc.scSettings.groupName,
 		Topic: &v2.Resource{
-			Name: messageView.GetTopic(),
+			Name:              messageView.GetTopic(),
+			ResourceNamespace: sc.cli.config.NameSpace,
 		},
 		Entries: []*v2.AckMessageEntry{
 			{
@@ -369,7 +373,8 @@
 		requestTimeout: sc.cli.opts.timeout,
 
 		groupName: &v2.Resource{
-			Name: sc.groupName,
+			Name:              sc.groupName,
+			ResourceNamespace: config.NameSpace,
 		},
 		longPollingTimeout:      scOpts.awaitDuration,
 		subscriptionExpressions: scOpts.subscriptionExpressions,
diff --git a/golang/simple_consumer_options.go b/golang/simple_consumer_options.go
index 857a419..253b3c3 100644
--- a/golang/simple_consumer_options.go
+++ b/golang/simple_consumer_options.go
@@ -156,7 +156,8 @@
 	subscriptions := make([]*v2.SubscriptionEntry, 0)
 	for k, v := range sc.subscriptionExpressions {
 		topic := &v2.Resource{
-			Name: k,
+			Name:              k,
+			ResourceNamespace: sc.groupName.GetResourceNamespace(),
 		}
 		filterExpression := &v2.FilterExpression{
 			Expression: v.expression,
diff --git a/golang/transaction.go b/golang/transaction.go
index f674110..8983961 100644
--- a/golang/transaction.go
+++ b/golang/transaction.go
@@ -88,7 +88,7 @@
 	return nil
 }
 
-func (t *transactionImpl) tryAddMessage(message *Message) (*PublishingMessage, error) {
+func (t *transactionImpl) tryAddMessage(message *Message, namespace string) (*PublishingMessage, error) {
 	t.messagesLock.RLock()
 	if len(t.messages) > MAX_MESSAGE_NUM {
 		return nil, fmt.Errorf("message in transaction has exceeded the threshold: %d", MAX_MESSAGE_NUM)
@@ -100,7 +100,7 @@
 	if len(t.messages) > MAX_MESSAGE_NUM {
 		return nil, fmt.Errorf("message in transaction has exceeded the threshold: %d", MAX_MESSAGE_NUM)
 	}
-	pubMessage, err := NewPublishingMessage(message, t.producerImpl.(*defaultProducer).pSetting, true)
+	pubMessage, err := NewPublishingMessage(message, namespace, t.producerImpl.(*defaultProducer).pSetting, true)
 	if err != nil {
 		return nil, err
 	}