Add metadata to produce and subscribe commands. (#117)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index bbd09aa..752b0f3 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -140,6 +140,7 @@
 	}
 
 	receiverQueueSize := options.ReceiverQueueSize
+	metadata := options.Properties
 	var wg sync.WaitGroup
 	ch := make(chan ConsumerError, numPartitions)
 	wg.Add(numPartitions)
@@ -162,6 +163,7 @@
 				partitionIdx:        idx,
 				receiverQueueSize:   receiverQueueSize,
 				nackRedeliveryDelay: nackRedeliveryDelay,
+				metadata:            metadata,
 			}
 			cons, err := newPartitionConsumer(consumer, client, opts, messageCh)
 			ch <- ConsumerError{
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 9bbf027..a217d55 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -58,6 +58,7 @@
 	partitionIdx        int
 	receiverQueueSize   int
 	nackRedeliveryDelay time.Duration
+	metadata            map[string]string
 }
 
 type partitionConsumer struct {
@@ -524,6 +525,9 @@
 		InitialPosition: initialPosition.Enum(),
 		Schema:          nil,
 	}
+	if len(pc.options.metadata) > 0 {
+		cmdSubscribe.Metadata = toKeyValues(pc.options.metadata)
+	}
 
 	res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
 		pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index f342db4..5ff734c 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -759,3 +759,38 @@
 		consumer.Ack(msg)
 	}
 }
+
+func TestConsumerMetadata(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	topic := newTopicName()
+	props := map[string]string{
+		"key1": "value1",
+	}
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "my-sub",
+		Properties:       props,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer consumer.Close()
+	stats, err := topicStats(topic)
+	if err != nil {
+		t.Fatal(err)
+	}
+	subs := stats["subscriptions"].(map[string]interface{})
+	meta := subs["my-sub"].(map[string]interface{})["consumers"].([]interface{})[0].(map[string]interface{})["metadata"].(map[string]interface{})
+	assert.Equal(t, len(props), len(meta))
+	for k, v := range props {
+		mv := meta[k].(string)
+		assert.Equal(t, v, mv)
+	}
+}
diff --git a/pulsar/helper.go b/pulsar/helper.go
index 2517585..2591f52 100644
--- a/pulsar/helper.go
+++ b/pulsar/helper.go
@@ -22,6 +22,8 @@
 
 	pkgerrors "github.com/pkg/errors"
 
+	"github.com/golang/protobuf/proto"
+
 	"github.com/apache/pulsar-client-go/pkg/pb"
 	"github.com/apache/pulsar-client-go/pulsar/internal"
 )
@@ -61,3 +63,16 @@
 
 	return errs
 }
+
+func toKeyValues(metadata map[string]string) []*pb.KeyValue {
+	kvs := make([]*pb.KeyValue, 0, len(metadata))
+	for k, v := range metadata {
+		kv := &pb.KeyValue{
+			Key:   proto.String(k),
+			Value: proto.String(v),
+		}
+		kvs = append(kvs, kv)
+	}
+
+	return kvs
+}
diff --git a/pulsar/helper_test.go b/pulsar/helper_test.go
new file mode 100644
index 0000000..3240fdf
--- /dev/null
+++ b/pulsar/helper_test.go
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestToKeyValues(t *testing.T) {
+	meta := map[string]string{
+		"key1": "value1",
+	}
+
+	kvs := toKeyValues(meta)
+	assert.Equal(t, 1, len(kvs))
+	assert.Equal(t, "key1", *kvs[0].Key)
+	assert.Equal(t, "value1", *kvs[0].Value)
+
+	meta = map[string]string{
+		"key1": "value1",
+		"key2": "value2",
+	}
+	kvs = toKeyValues(meta)
+	assert.Equal(t, 2, len(kvs))
+	for _, kv := range kvs {
+		v := meta[*kv.Key]
+		assert.Equal(t, v, *kv.Value)
+	}
+}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 474401a..c5345cb 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -125,16 +125,19 @@
 
 	p.log.Debug("Lookup result: ", lr)
 	id := p.client.rpcClient.NewRequestID()
-	res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, &pb.CommandProducer{
-		RequestId:    &id,
-		Topic:        &p.topic,
+	cmdProducer := &pb.CommandProducer{
+		RequestId:    proto.Uint64(id),
+		Topic:        proto.String(p.topic),
 		Encrypted:    nil,
-		Metadata:     nil,
-		ProducerId:   &p.producerID,
+		ProducerId:   proto.Uint64(p.producerID),
 		ProducerName: p.producerName,
 		Schema:       nil,
-	})
+	}
+	if len(p.options.Properties) > 0 {
+		cmdProducer.Metadata = toKeyValues(p.options.Properties)
+	}
 
+	res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer)
 	if err != nil {
 		p.log.WithError(err).Error("Failed to create producer")
 		return err
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 196c7f3..bc3f8ea 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -523,3 +523,38 @@
 	})
 	assert.NotNil(t, err, "expected error when creating producer with same name")
 }
+
+func TestProducerMetadata(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	topic := newTopicName()
+	props := map[string]string{
+		"key1": "value1",
+	}
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:      topic,
+		Name:       "my-producer",
+		Properties: props,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer producer.Close()
+	stats, err := topicStats(topic)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	meta := stats["publishers"].([]interface{})[0].(map[string]interface{})["metadata"].(map[string]interface{})
+	assert.Equal(t, len(props), len(meta))
+	for k, v := range props {
+		mv := meta[k].(string)
+		assert.Equal(t, v, mv)
+	}
+}
diff --git a/pulsar/test_helper.go b/pulsar/test_helper.go
index 330845d..d8cdc6d 100644
--- a/pulsar/test_helper.go
+++ b/pulsar/test_helper.go
@@ -21,6 +21,9 @@
 	"bytes"
 	"encoding/json"
 	"fmt"
+	"github.com/apache/pulsar-client-go/pulsar/internal"
+	"io"
+	"io/ioutil"
 	"net/http"
 	"path"
 	"strings"
@@ -54,61 +57,63 @@
 	return webServiceURL + "/" + path.Join(parts...)
 }
 
+func jsonHeaders() http.Header {
+	headers := http.Header{}
+	headers.Add("Content-Type", "application/json")
+	headers.Add("Accept", "application/json")
+	return headers
+}
+
 func httpDelete(requestPaths ...string) error {
-	client := http.DefaultClient
 	var errs error
-	doFn := func(requestPath string) error {
-		endpoint := testEndpoint(requestPath)
-		req, err := http.NewRequest(http.MethodDelete, endpoint, nil)
-		if err != nil {
-			return err
-		}
-
-		req.Header = map[string][]string{
-			"Content-Type": {"application/json"},
-		}
-
-		resp, err := client.Do(req)
-		if err != nil {
-			return err
-		}
-		if resp.StatusCode > 299 {
-			return fmt.Errorf("failed to delete topic status code: %d", resp.StatusCode)
-		}
-		if resp.Body != nil {
-			_ = resp.Body.Close()
-		}
-		return nil
-	}
 	for _, requestPath := range requestPaths {
-		if err := doFn(requestPath); err != nil {
-			err = pkgerrors.Wrapf(err, "unable to delete url: %s"+requestPath)
+		if err := httpDo(http.MethodDelete, requestPath, nil, nil); err != nil {
+			errs = pkgerrors.Wrapf(err, "unable to delete url: %s"+requestPath)
 		}
 	}
 	return errs
 }
 
 func httpPut(requestPath string, body interface{}) error {
-	client := http.DefaultClient
+	return httpDo(http.MethodPut, requestPath, body, nil)
+}
 
-	data, _ := json.Marshal(body)
+func httpGet(requestPath string, out interface{}) error {
+	return httpDo(http.MethodGet, requestPath, nil, out)
+}
+
+func httpDo(method string, requestPath string, in interface{}, out interface{}) error {
+	client := http.DefaultClient
 	endpoint := testEndpoint(requestPath)
-	req, err := http.NewRequest(http.MethodPut, endpoint, bytes.NewReader(data))
+	var body io.Reader
+	inBytes, err := json.Marshal(in)
+	if err != nil {
+		return err
+	}
+	body = bytes.NewReader(inBytes)
+	req, err := http.NewRequest(method, endpoint, body)
 	if err != nil {
 		return err
 	}
 
-	req.Header = map[string][]string{
-		"Content-Type": {"application/json"},
-	}
-
+	req.Header = jsonHeaders()
 	resp, err := client.Do(req)
 	if err != nil {
 		return err
 	}
-	if resp.Body != nil {
-		_ = resp.Body.Close()
+	defer resp.Body.Close()
+	if resp.StatusCode > 299 || resp.StatusCode < 200 {
+		return fmt.Errorf("http error status code: %d", resp.StatusCode)
 	}
+
+	if out != nil {
+		outBytes, err := ioutil.ReadAll(resp.Body)
+		if err != nil {
+			return err
+		}
+		return json.Unmarshal(outBytes, out)
+	}
+
 	return nil
 }
 
@@ -147,3 +152,18 @@
 func deleteTopic(topic string) error {
 	return httpDelete("admin/v2/persistent/" + fmt.Sprintf("%s?force=true", topic))
 }
+
+func topicStats(topic string) (map[string]interface{}, error) {
+	var metadata map[string]interface{}
+	err := httpGet("admin/v2/persistent/"+topicPath(topic)+"/stats", &metadata)
+	return metadata, err
+}
+
+func topicPath(topic string) string {
+	tn, _ := internal.ParseTopicName(topic)
+	idx := strings.LastIndex(tn.Name, "/")
+	if idx > 0 {
+		return tn.Namespace + "/" + tn.Name[idx:]
+	}
+	return tn.Name
+}