golang: Add namespace in Resource and metadata (#753)
* metadata和resource支持namespace字段
* add ut
---------
Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
diff --git a/golang/client.go b/golang/client.go
index 71c4819..9f60e06 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -377,7 +377,8 @@
func (cli *defaultClient) getQueryRouteRequest(topic string) *v2.QueryRouteRequest {
return &v2.QueryRouteRequest{
Topic: &v2.Resource{
- Name: topic,
+ Name: topic,
+ ResourceNamespace: cli.config.NameSpace,
},
Endpoints: cli.accessPoint,
}
@@ -599,6 +600,8 @@
innerMD.VersionValue,
innerMD.ClintID,
cli.clientID,
+ innerMD.NameSpace,
+ cli.config.NameSpace,
innerMD.DateTime,
now,
innerMD.Authorization,
diff --git a/golang/client_test.go b/golang/client_test.go
index 4549bdf..716ca8c 100644
--- a/golang/client_test.go
+++ b/golang/client_test.go
@@ -298,7 +298,8 @@
func Test_routeEqual(t *testing.T) {
oldMq := &v2.MessageQueue{
Topic: &v2.Resource{
- Name: "topic-test",
+ Name: "topic-test",
+ ResourceNamespace: "ns-test",
},
Id: 0,
Permission: v2.Permission_READ_WRITE,
@@ -313,7 +314,8 @@
}
newMq := &v2.MessageQueue{
Topic: &v2.Resource{
- Name: "topic-test",
+ Name: "topic-test",
+ ResourceNamespace: "ns-test",
},
Id: 0,
Permission: v2.Permission_READ_WRITE,
diff --git a/golang/metadata/metadata.go b/golang/metadata/metadata.go
index 5a8b919..ccc6627 100644
--- a/golang/metadata/metadata.go
+++ b/golang/metadata/metadata.go
@@ -18,11 +18,11 @@
package metadata
const (
- LanguageKey = "x-mq-language"
- ProtocolKey = "x-mq-protocol"
- RequestID = "x-mq-request-id"
- VersionKey = "x-mq-client-version"
- // NameSpace = "x-mq-namespace"
+ LanguageKey = "x-mq-language"
+ ProtocolKey = "x-mq-protocol"
+ RequestID = "x-mq-request-id"
+ VersionKey = "x-mq-client-version"
+ NameSpace = "x-mq-namespace"
DateTime = "x-mq-date-time"
ClintID = "x-mq-client-id"
Authorization = "authorization"
diff --git a/golang/producer.go b/golang/producer.go
index f2fbf59..3e878a9 100644
--- a/golang/producer.go
+++ b/golang/producer.go
@@ -155,7 +155,8 @@
}
for _, topic := range po.topics {
topicResource := &v2.Resource{
- Name: topic,
+ Name: topic,
+ ResourceNamespace: config.NameSpace,
}
p.pSetting.topics.Store(topic, topicResource)
}
@@ -287,7 +288,7 @@
var err error
pubMessage = uMsg.pubMsg
if uMsg.pubMsg == nil {
- pubMessage, err = NewPublishingMessage(msg, p.pSetting, txEnabled)
+ pubMessage, err = NewPublishingMessage(msg, p.cli.config.NameSpace, p.pSetting, txEnabled)
if err != nil {
return nil, err
}
@@ -315,7 +316,8 @@
}
if _, ok := p.pSetting.topics.Load(topicName); !ok {
p.pSetting.topics.Store(topicName, &v2.Resource{
- Name: topicName,
+ Name: topicName,
+ ResourceNamespace: p.cli.config.NameSpace,
})
}
pubLoadBalancer, err := p.getPublishingTopicRouteResult(ctx, topicName)
@@ -362,7 +364,7 @@
return nil, fmt.Errorf("producer is not running")
}
t := transaction.(*transactionImpl)
- pubMessage, err := t.tryAddMessage(msg)
+ pubMessage, err := t.tryAddMessage(msg, p.cli.config.NameSpace)
if err != nil {
return nil, err
}
@@ -394,7 +396,8 @@
ctx = p.cli.Sign(ctx)
request := &v2.EndTransactionRequest{
Topic: &v2.Resource{
- Name: messageCommon.topic,
+ Name: messageCommon.topic,
+ ResourceNamespace: p.cli.config.NameSpace,
},
MessageId: messageId,
TransactionId: transactionId,
diff --git a/golang/publishing_message.go b/golang/publishing_message.go
index 456202a..ea55dd1 100644
--- a/golang/publishing_message.go
+++ b/golang/publishing_message.go
@@ -26,6 +26,7 @@
)
type PublishingMessage struct {
+ namespace string
msg *Message
encoding v2.Encoding
messageId string
@@ -33,7 +34,7 @@
traceContext *string
}
-var NewPublishingMessage = func(msg *Message, settings *producerSettings, txEnabled bool) (*PublishingMessage, error) {
+var NewPublishingMessage = func(msg *Message, namespace string, settings *producerSettings, txEnabled bool) (*PublishingMessage, error) {
if msg == nil {
return nil, fmt.Errorf("message is nil")
}
@@ -51,6 +52,8 @@
// No need to compress message body.
pMsg.encoding = v2.Encoding_IDENTITY
+ pMsg.namespace = namespace
+
// Generate message id.
pMsg.messageId = GetMessageIdCodecInstance().NextMessageId().String()
// Normal message.
@@ -84,7 +87,8 @@
msg := &v2.Message{
Topic: &v2.Resource{
// ResourceNamespace: b.conn.Config().NameSpace,
- Name: pMsg.msg.Topic,
+ Name: pMsg.msg.Topic,
+ ResourceNamespace: pMsg.namespace,
},
SystemProperties: &v2.SystemProperties{
Keys: pMsg.msg.GetKeys(),
diff --git a/golang/publishing_message_test.go b/golang/publishing_message_test.go
new file mode 100644
index 0000000..5a030e4
--- /dev/null
+++ b/golang/publishing_message_test.go
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package golang
+
+import "testing"
+
+func TestNewPublishingMessage(t *testing.T) {
+ namespace := "ns-test"
+ pSetting := &producerSettings{}
+ msg := &Message{}
+ pMsg, err := NewPublishingMessage(msg, namespace, pSetting, false)
+ if err != nil {
+ t.Error(err)
+ }
+ v2Msg, err := pMsg.toProtobuf()
+ if err != nil {
+ t.Error(err)
+ }
+ if v2Msg.GetTopic().GetResourceNamespace() != namespace {
+ t.Error("namespace not equal")
+ }
+}
diff --git a/golang/simple_consumer.go b/golang/simple_consumer.go
index 8abfb41..f867696 100644
--- a/golang/simple_consumer.go
+++ b/golang/simple_consumer.go
@@ -75,10 +75,12 @@
ctx := sc.cli.Sign(context.Background())
request := &v2.ChangeInvisibleDurationRequest{
Topic: &v2.Resource{
- Name: messageView.GetTopic(),
+ Name: messageView.GetTopic(),
+ ResourceNamespace: sc.cli.config.NameSpace,
},
Group: &v2.Resource{
- Name: sc.groupName,
+ Name: sc.groupName,
+ ResourceNamespace: sc.cli.config.NameSpace,
},
ReceiptHandle: messageView.GetReceiptHandle(),
InvisibleDuration: durationpb.New(invisibleDuration),
@@ -166,7 +168,8 @@
return &v2.ReceiveMessageRequest{
Group: &v2.Resource{
- Name: sc.groupName,
+ Name: sc.groupName,
+ ResourceNamespace: sc.cli.config.NameSpace,
},
MessageQueue: messageQueue,
FilterExpression: &v2.FilterExpression{
@@ -183,7 +186,8 @@
return &v2.AckMessageRequest{
Group: sc.scSettings.groupName,
Topic: &v2.Resource{
- Name: messageView.GetTopic(),
+ Name: messageView.GetTopic(),
+ ResourceNamespace: sc.cli.config.NameSpace,
},
Entries: []*v2.AckMessageEntry{
{
@@ -369,7 +373,8 @@
requestTimeout: sc.cli.opts.timeout,
groupName: &v2.Resource{
- Name: sc.groupName,
+ Name: sc.groupName,
+ ResourceNamespace: config.NameSpace,
},
longPollingTimeout: scOpts.awaitDuration,
subscriptionExpressions: scOpts.subscriptionExpressions,
diff --git a/golang/simple_consumer_options.go b/golang/simple_consumer_options.go
index 857a419..253b3c3 100644
--- a/golang/simple_consumer_options.go
+++ b/golang/simple_consumer_options.go
@@ -156,7 +156,8 @@
subscriptions := make([]*v2.SubscriptionEntry, 0)
for k, v := range sc.subscriptionExpressions {
topic := &v2.Resource{
- Name: k,
+ Name: k,
+ ResourceNamespace: sc.groupName.GetResourceNamespace(),
}
filterExpression := &v2.FilterExpression{
Expression: v.expression,
diff --git a/golang/transaction.go b/golang/transaction.go
index f674110..8983961 100644
--- a/golang/transaction.go
+++ b/golang/transaction.go
@@ -88,7 +88,7 @@
return nil
}
-func (t *transactionImpl) tryAddMessage(message *Message) (*PublishingMessage, error) {
+func (t *transactionImpl) tryAddMessage(message *Message, namespace string) (*PublishingMessage, error) {
t.messagesLock.RLock()
if len(t.messages) > MAX_MESSAGE_NUM {
return nil, fmt.Errorf("message in transaction has exceeded the threshold: %d", MAX_MESSAGE_NUM)
@@ -100,7 +100,7 @@
if len(t.messages) > MAX_MESSAGE_NUM {
return nil, fmt.Errorf("message in transaction has exceeded the threshold: %d", MAX_MESSAGE_NUM)
}
- pubMessage, err := NewPublishingMessage(message, t.producerImpl.(*defaultProducer).pSetting, true)
+ pubMessage, err := NewPublishingMessage(message, namespace, t.producerImpl.(*defaultProducer).pSetting, true)
if err != nil {
return nil, err
}