Add client function to list topics in a namespace. (#101)
* Add client function to list topics in a namespace.
* Fix import ordering.
* Pass pointer to struct instead of interface.
* Ensure producer is removed from connection listener on close.
* Remove dead code.
diff --git a/pulsar/impl_client.go b/pulsar/client_impl.go
similarity index 73%
rename from pulsar/impl_client.go
rename to pulsar/client_impl.go
index b9869a5..9cb3442 100644
--- a/pulsar/impl_client.go
+++ b/pulsar/client_impl.go
@@ -18,10 +18,11 @@
package pulsar
import (
+ "errors"
"fmt"
"net/url"
- "github.com/pkg/errors"
+ "github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
@@ -89,36 +90,36 @@
return c, nil
}
-func (client *client) CreateProducer(options ProducerOptions) (Producer, error) {
- producer, err := newProducer(client, &options)
+func (c *client) CreateProducer(options ProducerOptions) (Producer, error) {
+ producer, err := newProducer(c, &options)
if err == nil {
- client.handlers[producer] = true
+ c.handlers[producer] = true
}
return producer, err
}
-func (client *client) Subscribe(options ConsumerOptions) (Consumer, error) {
- consumer, err := newConsumer(client, options)
+func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
+ consumer, err := newConsumer(c, options)
if err != nil {
return nil, err
}
- client.handlers[consumer] = true
+ c.handlers[consumer] = true
return consumer, nil
}
-func (client *client) CreateReader(options ReaderOptions) (Reader, error) {
+func (c *client) CreateReader(options ReaderOptions) (Reader, error) {
// TODO: Implement reader
return nil, nil
}
-func (client *client) TopicPartitions(topic string) ([]string, error) {
+func (c *client) TopicPartitions(topic string) ([]string, error) {
topicName, err := internal.ParseTopicName(topic)
if err != nil {
return nil, err
}
- id := client.rpcClient.NewRequestID()
- res, err := client.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_PARTITIONED_METADATA,
+ id := c.rpcClient.NewRequestID()
+ res, err := c.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_PARTITIONED_METADATA,
&pb.CommandPartitionedTopicMetadata{
RequestId: &id,
Topic: &topicName.Name,
@@ -143,8 +144,26 @@
return []string{topicName.Name}, nil
}
-func (client *client) Close() {
- for handler := range client.handlers {
+func (c *client) Close() {
+ for handler := range c.handlers {
handler.Close()
}
}
+
+func (c *client) namespaceTopics(namespace string) ([]string, error) {
+ id := c.rpcClient.NewRequestID()
+ req := &pb.CommandGetTopicsOfNamespace{
+ RequestId: proto.Uint64(id),
+ Namespace: proto.String(namespace),
+ Mode: pb.CommandGetTopicsOfNamespace_PERSISTENT.Enum(),
+ }
+ res, err := c.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_GET_TOPICS_OF_NAMESPACE, req)
+ if err != nil {
+ return nil, err
+ }
+ if res.Response.Error != nil {
+ return []string{}, newError(ResultLookupError, res.Response.GetError().String())
+ }
+
+ return res.Response.GetTopicsOfNamespaceResponse.GetTopics(), nil
+}
diff --git a/pulsar/impl_client_test.go b/pulsar/client_impl_test.go
similarity index 69%
rename from pulsar/impl_client_test.go
rename to pulsar/client_impl_test.go
index 7c28da5..d9849bd 100644
--- a/pulsar/impl_client_test.go
+++ b/pulsar/client_impl_test.go
@@ -19,10 +19,9 @@
import (
"fmt"
+ "github.com/stretchr/testify/assert"
"io/ioutil"
"testing"
-
- "github.com/stretchr/testify/assert"
)
func TestClient(t *testing.T) {
@@ -201,7 +200,7 @@
defer client.Close()
// Create topic with 5 partitions
- httpPut("http://localhost:8080/admin/v2/persistent/public/default/TestGetTopicPartitions/partitions",
+ httpPut("admin/v2/persistent/public/default/TestGetTopicPartitions/partitions",
5)
partitionedTopic := "persistent://public/default/TestGetTopicPartitions"
@@ -222,3 +221,94 @@
assert.Equal(t, len(partitions), 1)
assert.Equal(t, partitions[0], topic)
}
+
+func TestNamespaceTopicsNamespaceDoesNotExit(t *testing.T) {
+ c, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ if err != nil {
+ t.Errorf("failed to create client error: %+v", err)
+ return
+ }
+ defer c.Close()
+ ci := c.(*client)
+
+ // fetch from namespace that does not exist
+ name := generateRandomName()
+ topics, err := ci.namespaceTopics(fmt.Sprintf("%s/%s", name, name))
+ assert.Equal(t, 0, len(topics))
+}
+
+func TestNamespaceTopics(t *testing.T) {
+ name := generateRandomName()
+ namespace := fmt.Sprintf("public/%s", name)
+ namespaceUrl := fmt.Sprintf("admin/v2/namespaces/%s", namespace)
+ err := httpPut(namespaceUrl, anonymousNamespacePolicy())
+ if err != nil {
+ t.Fatal()
+ }
+ defer func() {
+ _ = httpDelete(fmt.Sprintf("admin/v2/namespaces/%s", namespace))
+ }()
+
+ // create topics
+ topic1 := fmt.Sprintf("%s/topic-1", namespace)
+ if err := httpPut("admin/v2/persistent/"+topic1, nil); err != nil {
+ t.Fatal(err)
+ }
+ topic2 := fmt.Sprintf("%s/topic-2", namespace)
+ if err := httpPut("admin/v2/persistent/"+topic2, namespace); err != nil {
+ t.Fatal(err)
+ }
+ defer func() {
+ _ = httpDelete("admin/v2/persistent/"+topic1, "admin/v2/persistent/"+topic2)
+ }()
+
+ c, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ if err != nil {
+ t.Errorf("failed to create client error: %+v", err)
+ return
+ }
+ defer c.Close()
+ ci := c.(*client)
+
+ topics, err := ci.namespaceTopics(namespace)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.Equal(t, 2, len(topics))
+
+ // add a non-persistent topic
+ topicName := fmt.Sprintf("non-persistent://%s/testNonPersistentTopic", namespace)
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ topics, err = ci.namespaceTopics(namespace)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.Equal(t, 2, len(topics))
+}
+
+func anonymousNamespacePolicy() map[string]interface{} {
+ return map[string]interface{}{
+ "auth_policies": map[string]interface{}{
+ "namespace_auth": map[string]interface{}{
+ "anonymous": []string{"produce", "consume"},
+ },
+ },
+ "replication_clusters": []string{"standalone"},
+ }
+}
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 5d09242..4e2ffd9 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -399,8 +399,8 @@
}
func (p *partitionProducer) internalClose(req *closeProducer) {
+ defer req.waitGroup.Done()
if p.state != producerReady {
- req.waitGroup.Done()
return
}
@@ -417,12 +417,11 @@
p.log.WithError(err).Warn("Failed to close producer")
} else {
p.log.Info("Closed producer")
- p.state = producerClosed
- p.cnx.UnregisterListener(p.producerID)
- p.batchFlushTicker.Stop()
}
- req.waitGroup.Done()
+ p.state = producerClosed
+ p.cnx.UnregisterListener(p.producerID)
+ p.batchFlushTicker.Stop()
}
func (p *partitionProducer) LastSequenceID() int64 {
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index c046f43..2880170 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -182,6 +182,8 @@
cmd.Unsubscribe = msg.(*pb.CommandUnsubscribe)
case pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES:
cmd.RedeliverUnacknowledgedMessages = msg.(*pb.CommandRedeliverUnacknowledgedMessages)
+ case pb.BaseCommand_GET_TOPICS_OF_NAMESPACE:
+ cmd.GetTopicsOfNamespace = msg.(*pb.CommandGetTopicsOfNamespace)
default:
log.Panic("Missing command type: ", cmdType)
}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 7ee42e4..d6c8599 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -426,7 +426,10 @@
func TestMessageRouter(t *testing.T) {
// Create topic with 5 partitions
- httpPut("http://localhost:8080/admin/v2/persistent/public/default/my-partitioned-topic/partitions", 5)
+ err := httpPut("admin/v2/persistent/public/default/my-partitioned-topic/partitions", 5)
+ if err != nil {
+ t.Fatal(err)
+ }
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
diff --git a/pulsar/test_helper.go b/pulsar/test_helper.go
index 3e1f2ea..5b2063c 100644
--- a/pulsar/test_helper.go
+++ b/pulsar/test_helper.go
@@ -21,17 +21,21 @@
"bytes"
"encoding/json"
"fmt"
- "log"
"net/http"
+ "path"
"strings"
"testing"
"time"
+
+ pkgerrors "github.com/pkg/errors"
)
const (
serviceURL = "pulsar://localhost:6650"
serviceURLTLS = "pulsar+ssl://localhost:6651"
+ webServiceURL = "http://localhost:8080"
+
caCertsPath = "../integration-tests/certs/cacert.pem"
tlsClientCertPath = "../integration-tests/certs/client-cert.pem"
tlsClientKeyPath = "../integration-tests/certs/client-key.pem"
@@ -46,13 +50,52 @@
return fmt.Sprintf("private/auth/my-topic-%v", time.Now().Nanosecond())
}
-func httpPut(url string, body interface{}) {
+func testEndpoint(parts ...string) string {
+ return webServiceURL + "/" + path.Join(parts...)
+}
+
+func httpDelete(requestPaths ...string) error {
+ client := http.DefaultClient
+ var errs error
+ doFn := func(requestPath string) error {
+ endpoint := testEndpoint(requestPath)
+ req, err := http.NewRequest(http.MethodDelete, endpoint, nil)
+ if err != nil {
+ return err
+ }
+
+ req.Header = map[string][]string{
+ "Content-Type": {"application/json"},
+ }
+
+ resp, err := client.Do(req)
+ if err != nil {
+ return err
+ }
+ if resp.StatusCode > 299 {
+ return fmt.Errorf("failed to delete topic status code: %d", resp.StatusCode)
+ }
+ if resp.Body != nil {
+ _ = resp.Body.Close()
+ }
+ return nil
+ }
+ for _, requestPath := range requestPaths {
+ if err := doFn(requestPath); err != nil {
+ err = pkgerrors.Wrapf(err, "unable to delete url: %s"+requestPath)
+ }
+ }
+ return errs
+}
+
+func httpPut(requestPath string, body interface{}) error {
client := http.DefaultClient
data, _ := json.Marshal(body)
- req, err := http.NewRequest(http.MethodPut, url, bytes.NewReader(data))
+ endpoint := testEndpoint(requestPath)
+ req, err := http.NewRequest(http.MethodPut, endpoint, bytes.NewReader(data))
if err != nil {
- log.Fatal(err)
+ return err
}
req.Header = map[string][]string{
@@ -61,17 +104,18 @@
resp, err := client.Do(req)
if err != nil {
- log.Fatal(err)
+ return err
}
if resp.Body != nil {
_ = resp.Body.Close()
}
+ return nil
}
-func makeHTTPCall(t *testing.T, method string, urls string, body string) {
+func makeHTTPCall(t *testing.T, method string, url string, body string) {
client := http.Client{}
- req, err := http.NewRequest(method, urls, strings.NewReader(body))
+ req, err := http.NewRequest(method, url, strings.NewReader(body))
if err != nil {
t.Fatal(err)
}