Add metadata to produce and subscribe commands. (#117)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index bbd09aa..752b0f3 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -140,6 +140,7 @@
}
receiverQueueSize := options.ReceiverQueueSize
+ metadata := options.Properties
var wg sync.WaitGroup
ch := make(chan ConsumerError, numPartitions)
wg.Add(numPartitions)
@@ -162,6 +163,7 @@
partitionIdx: idx,
receiverQueueSize: receiverQueueSize,
nackRedeliveryDelay: nackRedeliveryDelay,
+ metadata: metadata,
}
cons, err := newPartitionConsumer(consumer, client, opts, messageCh)
ch <- ConsumerError{
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 9bbf027..a217d55 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -58,6 +58,7 @@
partitionIdx int
receiverQueueSize int
nackRedeliveryDelay time.Duration
+ metadata map[string]string
}
type partitionConsumer struct {
@@ -524,6 +525,9 @@
InitialPosition: initialPosition.Enum(),
Schema: nil,
}
+ if len(pc.options.metadata) > 0 {
+ cmdSubscribe.Metadata = toKeyValues(pc.options.metadata)
+ }
res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index f342db4..5ff734c 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -759,3 +759,38 @@
consumer.Ack(msg)
}
}
+
+func TestConsumerMetadata(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer client.Close()
+
+ topic := newTopicName()
+ props := map[string]string{
+ "key1": "value1",
+ }
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ Properties: props,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer consumer.Close()
+ stats, err := topicStats(topic)
+ if err != nil {
+ t.Fatal(err)
+ }
+ subs := stats["subscriptions"].(map[string]interface{})
+ meta := subs["my-sub"].(map[string]interface{})["consumers"].([]interface{})[0].(map[string]interface{})["metadata"].(map[string]interface{})
+ assert.Equal(t, len(props), len(meta))
+ for k, v := range props {
+ mv := meta[k].(string)
+ assert.Equal(t, v, mv)
+ }
+}
diff --git a/pulsar/helper.go b/pulsar/helper.go
index 2517585..2591f52 100644
--- a/pulsar/helper.go
+++ b/pulsar/helper.go
@@ -22,6 +22,8 @@
pkgerrors "github.com/pkg/errors"
+ "github.com/golang/protobuf/proto"
+
"github.com/apache/pulsar-client-go/pkg/pb"
"github.com/apache/pulsar-client-go/pulsar/internal"
)
@@ -61,3 +63,16 @@
return errs
}
+
+func toKeyValues(metadata map[string]string) []*pb.KeyValue {
+ kvs := make([]*pb.KeyValue, 0, len(metadata))
+ for k, v := range metadata {
+ kv := &pb.KeyValue{
+ Key: proto.String(k),
+ Value: proto.String(v),
+ }
+ kvs = append(kvs, kv)
+ }
+
+ return kvs
+}
diff --git a/pulsar/helper_test.go b/pulsar/helper_test.go
new file mode 100644
index 0000000..3240fdf
--- /dev/null
+++ b/pulsar/helper_test.go
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestToKeyValues(t *testing.T) {
+ meta := map[string]string{
+ "key1": "value1",
+ }
+
+ kvs := toKeyValues(meta)
+ assert.Equal(t, 1, len(kvs))
+ assert.Equal(t, "key1", *kvs[0].Key)
+ assert.Equal(t, "value1", *kvs[0].Value)
+
+ meta = map[string]string{
+ "key1": "value1",
+ "key2": "value2",
+ }
+ kvs = toKeyValues(meta)
+ assert.Equal(t, 2, len(kvs))
+ for _, kv := range kvs {
+ v := meta[*kv.Key]
+ assert.Equal(t, v, *kv.Value)
+ }
+}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 474401a..c5345cb 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -125,16 +125,19 @@
p.log.Debug("Lookup result: ", lr)
id := p.client.rpcClient.NewRequestID()
- res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, &pb.CommandProducer{
- RequestId: &id,
- Topic: &p.topic,
+ cmdProducer := &pb.CommandProducer{
+ RequestId: proto.Uint64(id),
+ Topic: proto.String(p.topic),
Encrypted: nil,
- Metadata: nil,
- ProducerId: &p.producerID,
+ ProducerId: proto.Uint64(p.producerID),
ProducerName: p.producerName,
Schema: nil,
- })
+ }
+ if len(p.options.Properties) > 0 {
+ cmdProducer.Metadata = toKeyValues(p.options.Properties)
+ }
+ res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer)
if err != nil {
p.log.WithError(err).Error("Failed to create producer")
return err
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 196c7f3..bc3f8ea 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -523,3 +523,38 @@
})
assert.NotNil(t, err, "expected error when creating producer with same name")
}
+
+func TestProducerMetadata(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer client.Close()
+
+ topic := newTopicName()
+ props := map[string]string{
+ "key1": "value1",
+ }
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ Name: "my-producer",
+ Properties: props,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer producer.Close()
+ stats, err := topicStats(topic)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ meta := stats["publishers"].([]interface{})[0].(map[string]interface{})["metadata"].(map[string]interface{})
+ assert.Equal(t, len(props), len(meta))
+ for k, v := range props {
+ mv := meta[k].(string)
+ assert.Equal(t, v, mv)
+ }
+}
diff --git a/pulsar/test_helper.go b/pulsar/test_helper.go
index 330845d..d8cdc6d 100644
--- a/pulsar/test_helper.go
+++ b/pulsar/test_helper.go
@@ -21,6 +21,9 @@
"bytes"
"encoding/json"
"fmt"
+ "github.com/apache/pulsar-client-go/pulsar/internal"
+ "io"
+ "io/ioutil"
"net/http"
"path"
"strings"
@@ -54,61 +57,63 @@
return webServiceURL + "/" + path.Join(parts...)
}
+func jsonHeaders() http.Header {
+ headers := http.Header{}
+ headers.Add("Content-Type", "application/json")
+ headers.Add("Accept", "application/json")
+ return headers
+}
+
func httpDelete(requestPaths ...string) error {
- client := http.DefaultClient
var errs error
- doFn := func(requestPath string) error {
- endpoint := testEndpoint(requestPath)
- req, err := http.NewRequest(http.MethodDelete, endpoint, nil)
- if err != nil {
- return err
- }
-
- req.Header = map[string][]string{
- "Content-Type": {"application/json"},
- }
-
- resp, err := client.Do(req)
- if err != nil {
- return err
- }
- if resp.StatusCode > 299 {
- return fmt.Errorf("failed to delete topic status code: %d", resp.StatusCode)
- }
- if resp.Body != nil {
- _ = resp.Body.Close()
- }
- return nil
- }
for _, requestPath := range requestPaths {
- if err := doFn(requestPath); err != nil {
- err = pkgerrors.Wrapf(err, "unable to delete url: %s"+requestPath)
+ if err := httpDo(http.MethodDelete, requestPath, nil, nil); err != nil {
+ errs = pkgerrors.Wrapf(err, "unable to delete url: %s"+requestPath)
}
}
return errs
}
func httpPut(requestPath string, body interface{}) error {
- client := http.DefaultClient
+ return httpDo(http.MethodPut, requestPath, body, nil)
+}
- data, _ := json.Marshal(body)
+func httpGet(requestPath string, out interface{}) error {
+ return httpDo(http.MethodGet, requestPath, nil, out)
+}
+
+func httpDo(method string, requestPath string, in interface{}, out interface{}) error {
+ client := http.DefaultClient
endpoint := testEndpoint(requestPath)
- req, err := http.NewRequest(http.MethodPut, endpoint, bytes.NewReader(data))
+ var body io.Reader
+ inBytes, err := json.Marshal(in)
+ if err != nil {
+ return err
+ }
+ body = bytes.NewReader(inBytes)
+ req, err := http.NewRequest(method, endpoint, body)
if err != nil {
return err
}
- req.Header = map[string][]string{
- "Content-Type": {"application/json"},
- }
-
+ req.Header = jsonHeaders()
resp, err := client.Do(req)
if err != nil {
return err
}
- if resp.Body != nil {
- _ = resp.Body.Close()
+ defer resp.Body.Close()
+ if resp.StatusCode > 299 || resp.StatusCode < 200 {
+ return fmt.Errorf("http error status code: %d", resp.StatusCode)
}
+
+ if out != nil {
+ outBytes, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return err
+ }
+ return json.Unmarshal(outBytes, out)
+ }
+
return nil
}
@@ -147,3 +152,18 @@
func deleteTopic(topic string) error {
return httpDelete("admin/v2/persistent/" + fmt.Sprintf("%s?force=true", topic))
}
+
+func topicStats(topic string) (map[string]interface{}, error) {
+ var metadata map[string]interface{}
+ err := httpGet("admin/v2/persistent/"+topicPath(topic)+"/stats", &metadata)
+ return metadata, err
+}
+
+func topicPath(topic string) string {
+ tn, _ := internal.ParseTopicName(topic)
+ idx := strings.LastIndex(tn.Name, "/")
+ if idx > 0 {
+ return tn.Namespace + "/" + tn.Name[idx:]
+ }
+ return tn.Name
+}