Add client function to list topics in a namespace. (#101)

* Add client function to list topics in a namespace.

* Fix import ordering.

* Pass pointer to struct instead of interface.

* Ensure producer is removed from connection listener on close.

* Remove dead code.
diff --git a/pulsar/impl_client.go b/pulsar/client_impl.go
similarity index 73%
rename from pulsar/impl_client.go
rename to pulsar/client_impl.go
index b9869a5..9cb3442 100644
--- a/pulsar/impl_client.go
+++ b/pulsar/client_impl.go
@@ -18,10 +18,11 @@
 package pulsar
 
 import (
+	"errors"
 	"fmt"
 	"net/url"
 
-	"github.com/pkg/errors"
+	"github.com/golang/protobuf/proto"
 
 	log "github.com/sirupsen/logrus"
 
@@ -89,36 +90,36 @@
 	return c, nil
 }
 
-func (client *client) CreateProducer(options ProducerOptions) (Producer, error) {
-	producer, err := newProducer(client, &options)
+func (c *client) CreateProducer(options ProducerOptions) (Producer, error) {
+	producer, err := newProducer(c, &options)
 	if err == nil {
-		client.handlers[producer] = true
+		c.handlers[producer] = true
 	}
 	return producer, err
 }
 
-func (client *client) Subscribe(options ConsumerOptions) (Consumer, error) {
-	consumer, err := newConsumer(client, options)
+func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
+	consumer, err := newConsumer(c, options)
 	if err != nil {
 		return nil, err
 	}
-	client.handlers[consumer] = true
+	c.handlers[consumer] = true
 	return consumer, nil
 }
 
-func (client *client) CreateReader(options ReaderOptions) (Reader, error) {
+func (c *client) CreateReader(options ReaderOptions) (Reader, error) {
 	// TODO: Implement reader
 	return nil, nil
 }
 
-func (client *client) TopicPartitions(topic string) ([]string, error) {
+func (c *client) TopicPartitions(topic string) ([]string, error) {
 	topicName, err := internal.ParseTopicName(topic)
 	if err != nil {
 		return nil, err
 	}
 
-	id := client.rpcClient.NewRequestID()
-	res, err := client.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_PARTITIONED_METADATA,
+	id := c.rpcClient.NewRequestID()
+	res, err := c.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_PARTITIONED_METADATA,
 		&pb.CommandPartitionedTopicMetadata{
 			RequestId: &id,
 			Topic:     &topicName.Name,
@@ -143,8 +144,26 @@
 	return []string{topicName.Name}, nil
 }
 
-func (client *client) Close() {
-	for handler := range client.handlers {
+func (c *client) Close() {
+	for handler := range c.handlers {
 		handler.Close()
 	}
 }
+
+func (c *client) namespaceTopics(namespace string) ([]string, error) {
+	id := c.rpcClient.NewRequestID()
+	req := &pb.CommandGetTopicsOfNamespace{
+		RequestId: proto.Uint64(id),
+		Namespace: proto.String(namespace),
+		Mode:      pb.CommandGetTopicsOfNamespace_PERSISTENT.Enum(),
+	}
+	res, err := c.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_GET_TOPICS_OF_NAMESPACE, req)
+	if err != nil {
+		return nil, err
+	}
+	if res.Response.Error != nil {
+		return []string{}, newError(ResultLookupError, res.Response.GetError().String())
+	}
+
+	return res.Response.GetTopicsOfNamespaceResponse.GetTopics(), nil
+}
diff --git a/pulsar/impl_client_test.go b/pulsar/client_impl_test.go
similarity index 69%
rename from pulsar/impl_client_test.go
rename to pulsar/client_impl_test.go
index 7c28da5..d9849bd 100644
--- a/pulsar/impl_client_test.go
+++ b/pulsar/client_impl_test.go
@@ -19,10 +19,9 @@
 
 import (
 	"fmt"
+	"github.com/stretchr/testify/assert"
 	"io/ioutil"
 	"testing"
-
-	"github.com/stretchr/testify/assert"
 )
 
 func TestClient(t *testing.T) {
@@ -201,7 +200,7 @@
 	defer client.Close()
 
 	// Create topic with 5 partitions
-	httpPut("http://localhost:8080/admin/v2/persistent/public/default/TestGetTopicPartitions/partitions",
+	httpPut("admin/v2/persistent/public/default/TestGetTopicPartitions/partitions",
 		5)
 
 	partitionedTopic := "persistent://public/default/TestGetTopicPartitions"
@@ -222,3 +221,94 @@
 	assert.Equal(t, len(partitions), 1)
 	assert.Equal(t, partitions[0], topic)
 }
+
+func TestNamespaceTopicsNamespaceDoesNotExit(t *testing.T) {
+	c, err := NewClient(ClientOptions{
+		URL: serviceURL,
+	})
+	if err != nil {
+		t.Errorf("failed to create client error: %+v", err)
+		return
+	}
+	defer c.Close()
+	ci := c.(*client)
+
+	// fetch from namespace that does not exist
+	name := generateRandomName()
+	topics, err := ci.namespaceTopics(fmt.Sprintf("%s/%s", name, name))
+	assert.Equal(t, 0, len(topics))
+}
+
+func TestNamespaceTopics(t *testing.T) {
+	name := generateRandomName()
+	namespace := fmt.Sprintf("public/%s", name)
+	namespaceUrl := fmt.Sprintf("admin/v2/namespaces/%s", namespace)
+	err := httpPut(namespaceUrl, anonymousNamespacePolicy())
+	if err != nil {
+		t.Fatal()
+	}
+	defer func() {
+		_ = httpDelete(fmt.Sprintf("admin/v2/namespaces/%s", namespace))
+	}()
+
+	// create topics
+	topic1 := fmt.Sprintf("%s/topic-1", namespace)
+	if err := httpPut("admin/v2/persistent/"+topic1, nil); err != nil {
+		t.Fatal(err)
+	}
+	topic2 := fmt.Sprintf("%s/topic-2", namespace)
+	if err := httpPut("admin/v2/persistent/"+topic2, namespace); err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		_ = httpDelete("admin/v2/persistent/"+topic1, "admin/v2/persistent/"+topic2)
+	}()
+
+	c, err := NewClient(ClientOptions{
+		URL: serviceURL,
+	})
+	if err != nil {
+		t.Errorf("failed to create client error: %+v", err)
+		return
+	}
+	defer c.Close()
+	ci := c.(*client)
+
+	topics, err := ci.namespaceTopics(namespace)
+	if err != nil {
+		t.Fatal(err)
+	}
+	assert.Equal(t, 2, len(topics))
+
+	// add a non-persistent topic
+	topicName := fmt.Sprintf("non-persistent://%s/testNonPersistentTopic", namespace)
+	client, err := NewClient(ClientOptions{
+		URL: serviceURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topicName,
+	})
+
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	topics, err = ci.namespaceTopics(namespace)
+	if err != nil {
+		t.Fatal(err)
+	}
+	assert.Equal(t, 2, len(topics))
+}
+
+func anonymousNamespacePolicy() map[string]interface{} {
+	return map[string]interface{}{
+		"auth_policies": map[string]interface{}{
+			"namespace_auth": map[string]interface{}{
+				"anonymous": []string{"produce", "consume"},
+			},
+		},
+		"replication_clusters": []string{"standalone"},
+	}
+}
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 5d09242..4e2ffd9 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -399,8 +399,8 @@
 }
 
 func (p *partitionProducer) internalClose(req *closeProducer) {
+	defer req.waitGroup.Done()
 	if p.state != producerReady {
-		req.waitGroup.Done()
 		return
 	}
 
@@ -417,12 +417,11 @@
 		p.log.WithError(err).Warn("Failed to close producer")
 	} else {
 		p.log.Info("Closed producer")
-		p.state = producerClosed
-		p.cnx.UnregisterListener(p.producerID)
-		p.batchFlushTicker.Stop()
 	}
 
-	req.waitGroup.Done()
+	p.state = producerClosed
+	p.cnx.UnregisterListener(p.producerID)
+	p.batchFlushTicker.Stop()
 }
 
 func (p *partitionProducer) LastSequenceID() int64 {
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index c046f43..2880170 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -182,6 +182,8 @@
 		cmd.Unsubscribe = msg.(*pb.CommandUnsubscribe)
 	case pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES:
 		cmd.RedeliverUnacknowledgedMessages = msg.(*pb.CommandRedeliverUnacknowledgedMessages)
+	case pb.BaseCommand_GET_TOPICS_OF_NAMESPACE:
+		cmd.GetTopicsOfNamespace = msg.(*pb.CommandGetTopicsOfNamespace)
 	default:
 		log.Panic("Missing command type: ", cmdType)
 	}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 7ee42e4..d6c8599 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -426,7 +426,10 @@
 
 func TestMessageRouter(t *testing.T) {
 	// Create topic with 5 partitions
-	httpPut("http://localhost:8080/admin/v2/persistent/public/default/my-partitioned-topic/partitions", 5)
+	err := httpPut("admin/v2/persistent/public/default/my-partitioned-topic/partitions", 5)
+	if err != nil {
+		t.Fatal(err)
+	}
 	client, err := NewClient(ClientOptions{
 		URL: serviceURL,
 	})
diff --git a/pulsar/test_helper.go b/pulsar/test_helper.go
index 3e1f2ea..5b2063c 100644
--- a/pulsar/test_helper.go
+++ b/pulsar/test_helper.go
@@ -21,17 +21,21 @@
 	"bytes"
 	"encoding/json"
 	"fmt"
-	"log"
 	"net/http"
+	"path"
 	"strings"
 	"testing"
 	"time"
+
+	pkgerrors "github.com/pkg/errors"
 )
 
 const (
 	serviceURL    = "pulsar://localhost:6650"
 	serviceURLTLS = "pulsar+ssl://localhost:6651"
 
+	webServiceURL = "http://localhost:8080"
+
 	caCertsPath       = "../integration-tests/certs/cacert.pem"
 	tlsClientCertPath = "../integration-tests/certs/client-cert.pem"
 	tlsClientKeyPath  = "../integration-tests/certs/client-key.pem"
@@ -46,13 +50,52 @@
 	return fmt.Sprintf("private/auth/my-topic-%v", time.Now().Nanosecond())
 }
 
-func httpPut(url string, body interface{}) {
+func testEndpoint(parts ...string) string {
+	return webServiceURL + "/" + path.Join(parts...)
+}
+
+func httpDelete(requestPaths ...string) error {
+	client := http.DefaultClient
+	var errs error
+	doFn := func(requestPath string) error {
+		endpoint := testEndpoint(requestPath)
+		req, err := http.NewRequest(http.MethodDelete, endpoint, nil)
+		if err != nil {
+			return err
+		}
+
+		req.Header = map[string][]string{
+			"Content-Type": {"application/json"},
+		}
+
+		resp, err := client.Do(req)
+		if err != nil {
+			return err
+		}
+		if resp.StatusCode > 299 {
+			return fmt.Errorf("failed to delete topic status code: %d", resp.StatusCode)
+		}
+		if resp.Body != nil {
+			_ = resp.Body.Close()
+		}
+		return nil
+	}
+	for _, requestPath := range requestPaths {
+		if err := doFn(requestPath); err != nil {
+			err = pkgerrors.Wrapf(err, "unable to delete url: %s"+requestPath)
+		}
+	}
+	return errs
+}
+
+func httpPut(requestPath string, body interface{}) error {
 	client := http.DefaultClient
 
 	data, _ := json.Marshal(body)
-	req, err := http.NewRequest(http.MethodPut, url, bytes.NewReader(data))
+	endpoint := testEndpoint(requestPath)
+	req, err := http.NewRequest(http.MethodPut, endpoint, bytes.NewReader(data))
 	if err != nil {
-		log.Fatal(err)
+		return err
 	}
 
 	req.Header = map[string][]string{
@@ -61,17 +104,18 @@
 
 	resp, err := client.Do(req)
 	if err != nil {
-		log.Fatal(err)
+		return err
 	}
 	if resp.Body != nil {
 		_ = resp.Body.Close()
 	}
+	return nil
 }
 
-func makeHTTPCall(t *testing.T, method string, urls string, body string) {
+func makeHTTPCall(t *testing.T, method string, url string, body string) {
 	client := http.Client{}
 
-	req, err := http.NewRequest(method, urls, strings.NewReader(body))
+	req, err := http.NewRequest(method, url, strings.NewReader(body))
 	if err != nil {
 		t.Fatal(err)
 	}