[Issue 177] add multiple hosts support (#484)
Fixes #177
Master Issue https://github.com/apache/pulsar/issues/3218
### Motivation
add multiple hosts support to go client
### Modifications
- add service uri & service name resolver
- add service name resolver to lookup service & rpc client
- add unit tests
- add integration tests
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
diff --git a/go.sum b/go.sum
index 7f61837..a14857b 100644
--- a/go.sum
+++ b/go.sum
@@ -27,6 +27,7 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 h1:CaO/zOnF8VvUfEbhRatPcwKVWamvbYd8tQGRWacE9kU=
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1/go.mod h1:+hnT3ywWDTAFrW5aE+u2Sa/wT555ZqwoCS+pk3p6ry4=
github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 5882dd4..ee8ff50 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -123,8 +123,10 @@
log: logger,
metrics: metrics,
}
- c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout, logger, metrics)
- c.lookupService = internal.NewLookupService(c.rpcClient, url, tlsConfig != nil, logger, metrics)
+ serviceNameResolver := internal.NewPulsarServiceNameResolver(url)
+
+ c.rpcClient = internal.NewRPCClient(url, serviceNameResolver, c.cnxPool, operationTimeout, logger, metrics)
+ c.lookupService = internal.NewLookupService(c.rpcClient, url, serviceNameResolver, tlsConfig != nil, logger, metrics)
c.handlers = internal.NewClientHandlers()
return c, nil
diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go
index 661eac2..65732ea 100644
--- a/pulsar/client_impl_test.go
+++ b/pulsar/client_impl_test.go
@@ -18,6 +18,7 @@
package pulsar
import (
+ "context"
"crypto/tls"
"fmt"
"io/ioutil"
@@ -458,3 +459,58 @@
"replication_clusters": []string{"standalone"},
}
}
+
+func TestRetryWithMultipleHosts(t *testing.T) {
+ // Multi hosts included an unreached port and the actual port for verify retry logic
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6600,localhost:6650",
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := "persistent://public/default/retry-multiple-hosts-" + generateRandomName()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ })
+
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ ctx := context.Background()
+ var msgIDs [][]byte
+
+ for i := 0; i < 10; i++ {
+ if msgID, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ }); err != nil {
+ assert.Nil(t, err)
+ } else {
+ assert.NotNil(t, msgID)
+ msgIDs = append(msgIDs, msgID.Serialize())
+ }
+ }
+
+ assert.Equal(t, 10, len(msgIDs))
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "retry-multi-hosts-sub",
+ Type: Shared,
+ SubscriptionInitialPosition: SubscriptionPositionEarliest,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ for i := 0; i < 10; i++ {
+ msg, err := consumer.Receive(context.Background())
+ assert.Nil(t, err)
+ assert.Contains(t, msgIDs, msg.ID().Serialize())
+ consumer.Ack(msg)
+ }
+
+ err = consumer.Unsubscribe()
+ assert.Nil(t, err)
+
+}
diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go
index 54d30b3..29e1267 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -103,6 +103,10 @@
}
if err := cnx.waitUntilReady(); err != nil {
+ if !wasCached {
+ p.pool.Delete(key)
+ p.log.Debug("Removed failed connection from pool:", cnx.logicalAddr, cnx.physicalAddr)
+ }
return nil, err
}
return cnx, nil
diff --git a/pulsar/internal/lookup_service.go b/pulsar/internal/lookup_service.go
index 65fa383..5b4e2ac 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -45,22 +45,22 @@
}
type lookupService struct {
- rpcClient RPCClient
- serviceURL *url.URL
- tlsEnabled bool
- log log.Logger
- metrics *Metrics
+ rpcClient RPCClient
+ serviceNameResolver ServiceNameResolver
+ tlsEnabled bool
+ log log.Logger
+ metrics *Metrics
}
// NewLookupService init a lookup service struct and return an object of LookupService.
-func NewLookupService(rpcClient RPCClient, serviceURL *url.URL,
+func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, serviceNameResolver ServiceNameResolver,
tlsEnabled bool, logger log.Logger, metrics *Metrics) LookupService {
return &lookupService{
- rpcClient: rpcClient,
- serviceURL: serviceURL,
- tlsEnabled: tlsEnabled,
- log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
- metrics: metrics,
+ rpcClient: rpcClient,
+ serviceNameResolver: serviceNameResolver,
+ tlsEnabled: tlsEnabled,
+ log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
+ metrics: metrics,
}
}
@@ -78,7 +78,10 @@
var physicalAddr *url.URL
if lr.GetProxyThroughServiceUrl() {
- physicalAddr = ls.serviceURL
+ physicalAddr, err = ls.serviceNameResolver.ResolveHost()
+ if err != nil {
+ return nil, nil, err
+ }
} else {
physicalAddr = logicalAddress
}
diff --git a/pulsar/internal/lookup_service_test.go b/pulsar/internal/lookup_service_test.go
index ad0e1bd..a3f9bfe 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -112,6 +112,7 @@
func TestLookupSuccess(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)
+ serviceNameResolver := NewPulsarServiceNameResolver(url)
ls := NewLookupService(&mockedLookupRPCClient{
t: t,
@@ -131,7 +132,7 @@
BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
},
},
- }, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+ }, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -144,6 +145,7 @@
func TestTlsLookupSuccess(t *testing.T) {
url, err := url.Parse("pulsar+ssl://example:6651")
assert.NoError(t, err)
+ serviceNameResolver := NewPulsarServiceNameResolver(url)
ls := NewLookupService(&mockedLookupRPCClient{
t: t,
@@ -163,7 +165,7 @@
BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"),
},
},
- }, url, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+ }, url, serviceNameResolver, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -176,6 +178,7 @@
func TestLookupWithProxy(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)
+ serviceNameResolver := NewPulsarServiceNameResolver(url)
ls := NewLookupService(&mockedLookupRPCClient{
t: t,
@@ -196,7 +199,7 @@
ProxyThroughServiceUrl: proto.Bool(true),
},
},
- }, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+ }, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -229,7 +232,7 @@
ProxyThroughServiceUrl: proto.Bool(true),
},
},
- }, url, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+ }, url, NewPulsarServiceNameResolver(url), true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -273,7 +276,7 @@
BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
},
},
- }, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+ }, url, NewPulsarServiceNameResolver(url), false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -317,7 +320,7 @@
BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"),
},
},
- }, url, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+ }, url, NewPulsarServiceNameResolver(url), true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -350,7 +353,7 @@
ProxyThroughServiceUrl: proto.Bool(false),
},
},
- }, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+ }, url, NewPulsarServiceNameResolver(url), false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
lr, err := ls.Lookup("my-topic")
assert.Error(t, err)
@@ -378,7 +381,7 @@
Authoritative: proto.Bool(true),
},
},
- }, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+ }, url, NewPulsarServiceNameResolver(url), false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
lr, err := ls.Lookup("my-topic")
assert.Error(t, err)
@@ -447,6 +450,7 @@
func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)
+ serviceNameResolver := NewPulsarServiceNameResolver(url)
ls := NewLookupService(&mockedPartitionedTopicMetadataRPCClient{
t: t,
@@ -464,10 +468,43 @@
Response: pb.CommandPartitionedTopicMetadataResponse_Success.Enum(),
},
},
- }, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+ }, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
metadata, err := ls.GetPartitionedTopicMetadata("my-topic")
assert.NoError(t, err)
assert.NotNil(t, metadata)
assert.Equal(t, metadata.GetPartitions(), uint32(1))
}
+
+func TestLookupSuccessWithMultipleHosts(t *testing.T) {
+ url, err := url.Parse("pulsar://host1,host2,host3:6650")
+ assert.NoError(t, err)
+ serviceNameResolver := NewPulsarServiceNameResolver(url)
+
+ ls := NewLookupService(&mockedLookupRPCClient{
+ t: t,
+
+ expectedRequests: []pb.CommandLookupTopic{
+ {
+ RequestId: proto.Uint64(1),
+ Topic: proto.String("my-topic"),
+ Authoritative: proto.Bool(false),
+ },
+ },
+ mockedResponses: []pb.CommandLookupTopicResponse{
+ {
+ RequestId: proto.Uint64(1),
+ Response: responseType(pb.CommandLookupTopicResponse_Connect),
+ Authoritative: proto.Bool(true),
+ BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
+ },
+ },
+ }, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+
+ lr, err := ls.Lookup("my-topic")
+ assert.NoError(t, err)
+ assert.NotNil(t, lr)
+
+ assert.Equal(t, "pulsar://broker-1:6650", lr.LogicalAddr.String())
+ assert.Equal(t, "pulsar://broker-1:6650", lr.PhysicalAddr.String())
+}
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 0bb482c..b51e4e3 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -56,7 +56,7 @@
}
type rpcClient struct {
- serviceURL *url.URL
+ serviceNameResolver ServiceNameResolver
pool ConnectionPool
requestTimeout time.Duration
requestIDGenerator uint64
@@ -66,22 +66,26 @@
metrics *Metrics
}
-func NewRPCClient(serviceURL *url.URL, pool ConnectionPool,
+func NewRPCClient(serviceURL *url.URL, serviceNameResolver ServiceNameResolver, pool ConnectionPool,
requestTimeout time.Duration, logger log.Logger, metrics *Metrics) RPCClient {
return &rpcClient{
- serviceURL: serviceURL,
- pool: pool,
- requestTimeout: requestTimeout,
- log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
- metrics: metrics,
+ serviceNameResolver: serviceNameResolver,
+ pool: pool,
+ requestTimeout: requestTimeout,
+ log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
+ metrics: metrics,
}
}
func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type,
message proto.Message) (*RPCResult, error) {
-
- rpcResult, err := c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message)
- if _, ok := err.(net.Error); ok {
+ host, err := c.serviceNameResolver.ResolveHost()
+ if err != nil {
+ c.log.Errorf("request host resolve failed with error: {%v}", err)
+ return nil, err
+ }
+ rpcResult, err := c.Request(host, host, requestID, cmdType, message)
+ if _, ok := err.(net.Error); ok || (err != nil && err.Error() == "connection error") {
// We can retry this kind of requests over a connection error because they're
// not specific to a particular broker.
backoff := Backoff{100 * time.Millisecond}
@@ -92,9 +96,13 @@
retryTime = backoff.Next()
c.log.Debugf("Retrying request in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
time.Sleep(retryTime)
-
- rpcResult, err = c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message)
- if _, ok := err.(net.Error); ok {
+ host, err = c.serviceNameResolver.ResolveHost()
+ if err != nil {
+ c.log.Errorf("Retrying request host resolve failed with error: {%v}", err)
+ continue
+ }
+ rpcResult, err = c.Request(host, host, requestID, cmdType, message)
+ if _, ok := err.(net.Error); ok || (err != nil && err.Error() == "connection error") {
continue
} else {
// We either succeeded or encountered a non connection error
@@ -102,7 +110,6 @@
}
}
}
-
return rpcResult, err
}
diff --git a/pulsar/internal/service_name_resolver.go b/pulsar/internal/service_name_resolver.go
new file mode 100644
index 0000000..3b1209c
--- /dev/null
+++ b/pulsar/internal/service_name_resolver.go
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+ "errors"
+ "fmt"
+ "math/rand"
+ "net/url"
+ "sync/atomic"
+ "time"
+
+ log "github.com/sirupsen/logrus"
+)
+
+type ServiceNameResolver interface {
+ ResolveHost() (*url.URL, error)
+ ResolveHostURI() (*PulsarServiceURI, error)
+ UpdateServiceURL(url *url.URL) error
+ GetServiceURI() *PulsarServiceURI
+ GetServiceURL() *url.URL
+ GetAddressList() []*url.URL
+}
+
+type pulsarServiceNameResolver struct {
+ ServiceURI *PulsarServiceURI
+ ServiceURL *url.URL
+ CurrentIndex int32
+ AddressList []*url.URL
+}
+
+func NewPulsarServiceNameResolver(url *url.URL) ServiceNameResolver {
+ r := &pulsarServiceNameResolver{}
+ err := r.UpdateServiceURL(url)
+ if err != nil {
+ log.Errorf("create pulsar service name resolver failed : %v", err)
+ }
+ return r
+}
+
+func (r *pulsarServiceNameResolver) ResolveHost() (*url.URL, error) {
+ if r.AddressList == nil {
+ return nil, errors.New("no service url is provided yet")
+ }
+ if len(r.AddressList) == 0 {
+ return nil, fmt.Errorf("no hosts found for service url : %v", r.ServiceURL)
+ }
+ if len(r.AddressList) == 1 {
+ return r.AddressList[0], nil
+ }
+ idx := (r.CurrentIndex + 1) % int32(len(r.AddressList))
+ atomic.StoreInt32(&r.CurrentIndex, idx)
+ return r.AddressList[idx], nil
+}
+
+func (r *pulsarServiceNameResolver) ResolveHostURI() (*PulsarServiceURI, error) {
+ host, err := r.ResolveHost()
+ if err != nil {
+ return nil, err
+ }
+ hostURL := host.Scheme + "://" + host.Hostname() + ":" + host.Port()
+ return NewPulsarServiceURIFromURIString(hostURL)
+}
+
+func (r *pulsarServiceNameResolver) UpdateServiceURL(u *url.URL) error {
+ uri, err := NewPulsarServiceURIFromURL(u)
+ if err != nil {
+ log.Errorf("invalid service-url instance %s provided %v", u, err)
+ return err
+ }
+
+ hosts := uri.ServiceHosts
+ addresses := []*url.URL{}
+ for _, host := range hosts {
+ hostURL := uri.URL.Scheme + "://" + host
+ u, err := url.Parse(hostURL)
+ if err != nil {
+ log.Errorf("invalid host-url %s provided %v", hostURL, err)
+ return err
+ }
+ addresses = append(addresses, u)
+ }
+ r.AddressList = addresses
+ r.ServiceURL = u
+ r.ServiceURI = uri
+ rand.Seed(time.Now().Unix()) // initialize global pseudo random generator
+ atomic.StoreInt32(&r.CurrentIndex, int32(rand.Intn(len(addresses))))
+ return nil
+}
+
+func (r *pulsarServiceNameResolver) GetServiceURI() *PulsarServiceURI {
+ return r.ServiceURI
+}
+
+func (r *pulsarServiceNameResolver) GetServiceURL() *url.URL {
+ return r.ServiceURL
+}
+
+func (r *pulsarServiceNameResolver) GetAddressList() []*url.URL {
+ return r.AddressList
+}
diff --git a/pulsar/internal/service_name_resolver_test.go b/pulsar/internal/service_name_resolver_test.go
new file mode 100644
index 0000000..e190633
--- /dev/null
+++ b/pulsar/internal/service_name_resolver_test.go
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+ "net/url"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestResolveBeforeUpdateServiceUrl(t *testing.T) {
+ resolver := NewPulsarServiceNameResolver(nil)
+ u, err := resolver.ResolveHost()
+ assert.Nil(t, u)
+ assert.NotNil(t, err)
+ assert.EqualError(t, err, "no service url is provided yet")
+}
+
+func TestResolveUriBeforeUpdateServiceUrl(t *testing.T) {
+ resolver := NewPulsarServiceNameResolver(nil)
+ u, err := resolver.ResolveHostURI()
+ assert.Nil(t, u)
+ assert.NotNil(t, err)
+ assert.EqualError(t, err, "no service url is provided yet")
+}
+
+func TestUpdateInvalidServiceUrl(t *testing.T) {
+ resolver := NewPulsarServiceNameResolver(nil)
+ url, _ := url.Parse("pulsar:///")
+ err := resolver.UpdateServiceURL(url)
+ assert.NotNil(t, err)
+ assert.Empty(t, resolver.GetServiceURL())
+ assert.Nil(t, resolver.GetServiceURI())
+}
+
+func TestSimpleHostUrl(t *testing.T) {
+ resolver := NewPulsarServiceNameResolver(nil)
+ serviceURL, _ := url.Parse("pulsar://host1:6650")
+ err := resolver.UpdateServiceURL(serviceURL)
+ assert.Nil(t, err)
+ assert.Equal(t, serviceURL, resolver.GetServiceURL())
+ expectedURI, err := NewPulsarServiceURIFromURL(serviceURL)
+ assert.Nil(t, err)
+ assert.Equal(t, expectedURI, resolver.GetServiceURI())
+ actualHost, err := resolver.ResolveHost()
+ assert.Nil(t, err)
+ assert.Equal(t, "host1", actualHost.Hostname())
+ assert.Equal(t, "6650", actualHost.Port())
+
+ newServiceURL, _ := url.Parse("pulsar://host2:6650")
+ err = resolver.UpdateServiceURL(newServiceURL)
+ assert.Nil(t, err)
+ assert.Equal(t, newServiceURL, resolver.GetServiceURL())
+ expectedURI, err = NewPulsarServiceURIFromURL(newServiceURL)
+ assert.Nil(t, err)
+ assert.Equal(t, expectedURI, resolver.GetServiceURI())
+ actualHost, err = resolver.ResolveHost()
+ assert.Nil(t, err)
+ assert.Equal(t, "host2", actualHost.Hostname())
+ assert.Equal(t, "6650", actualHost.Port())
+}
+
+func TestMultipleHostsUrl(t *testing.T) {
+ resolver := NewPulsarServiceNameResolver(nil)
+ serviceURL, _ := url.Parse("pulsar://host1:6650,host2:6650")
+ err := resolver.UpdateServiceURL(serviceURL)
+ assert.Nil(t, err)
+ assert.Equal(t, serviceURL, resolver.GetServiceURL())
+ expectedURI, err := NewPulsarServiceURIFromURL(serviceURL)
+ assert.Nil(t, err)
+ assert.Equal(t, expectedURI, resolver.GetServiceURI())
+ host1, _ := url.Parse("pulsar://host1:6650")
+ host2, _ := url.Parse("pulsar://host2:6650")
+ host1uri, _ := NewPulsarServiceURIFromURIString("pulsar://host1:6650")
+ host2uri, _ := NewPulsarServiceURIFromURIString("pulsar://host2:6650")
+ assert.Contains(t, resolver.GetAddressList(), host1)
+ assert.Contains(t, resolver.GetAddressList(), host2)
+ hosts := []*url.URL{host1, host2}
+ hosturis := []*PulsarServiceURI{host1uri, host2uri}
+ for i := 0; i < 10; i++ {
+ host, err := resolver.ResolveHost()
+ assert.Nil(t, err)
+ hosturi, err := resolver.ResolveHostURI()
+ assert.Nil(t, err)
+ assert.Contains(t, hosts, host)
+ assert.Contains(t, hosturis, hosturi)
+ }
+}
+
+func TestMultipleHostsTlsUrl(t *testing.T) {
+ resolver := NewPulsarServiceNameResolver(nil)
+ serviceURL, _ := url.Parse("pulsar+ssl://host1:6651,host2:6651")
+ err := resolver.UpdateServiceURL(serviceURL)
+ assert.Nil(t, err)
+ assert.Equal(t, serviceURL, resolver.GetServiceURL())
+ expectedURI, err := NewPulsarServiceURIFromURL(serviceURL)
+ assert.Nil(t, err)
+ assert.Equal(t, expectedURI, resolver.GetServiceURI())
+ host1, _ := url.Parse("pulsar+ssl://host1:6651")
+ host2, _ := url.Parse("pulsar+ssl://host2:6651")
+ host1uri, _ := NewPulsarServiceURIFromURIString("pulsar+ssl://host1:6651")
+ host2uri, _ := NewPulsarServiceURIFromURIString("pulsar+ssl://host2:6651")
+ assert.Contains(t, resolver.GetAddressList(), host1)
+ assert.Contains(t, resolver.GetAddressList(), host2)
+ hosts := []*url.URL{host1, host2}
+ hosturis := []*PulsarServiceURI{host1uri, host2uri}
+ for i := 0; i < 10; i++ {
+ host, err := resolver.ResolveHost()
+ assert.Nil(t, err)
+ hosturi, err := resolver.ResolveHostURI()
+ assert.Nil(t, err)
+ assert.Contains(t, hosts, host)
+ assert.Contains(t, hosturis, hosturi)
+ }
+}
diff --git a/pulsar/internal/service_uri.go b/pulsar/internal/service_uri.go
new file mode 100644
index 0000000..7b75339
--- /dev/null
+++ b/pulsar/internal/service_uri.go
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+ "errors"
+ "fmt"
+ "net"
+ "net/url"
+ "strings"
+
+ log "github.com/sirupsen/logrus"
+)
+
+const (
+ BinaryService = "pulsar"
+ HTTPService = "http"
+ HTTPSService = "https"
+ SSLService = "ssl"
+ BinaryPort = 6650
+ BinaryTLSPort = 6651
+ HTTPPort = 80
+ HTTPSPort = 443
+)
+
+type PulsarServiceURI struct {
+ ServiceName string
+ ServiceInfos []string
+ ServiceHosts []string
+ servicePath string
+ URL *url.URL
+}
+
+func NewPulsarServiceURIFromURIString(uri string) (*PulsarServiceURI, error) {
+ u, err := fromString(uri)
+ if err != nil {
+ log.Error(err)
+ return nil, err
+ }
+ return u, nil
+}
+
+func NewPulsarServiceURIFromURL(url *url.URL) (*PulsarServiceURI, error) {
+ u, err := fromURL(url)
+ if err != nil {
+ log.Error(err)
+ return nil, err
+ }
+ return u, nil
+}
+
+func fromString(uriStr string) (*PulsarServiceURI, error) {
+ if uriStr == "" || len(uriStr) == 0 {
+ return nil, errors.New("service uriStr string is null")
+ }
+ if strings.Contains(uriStr, "[") && strings.Contains(uriStr, "]") {
+ // deal with ipv6 address
+ hosts := strings.FieldsFunc(uriStr, splitURI)
+ if len(hosts) > 1 {
+ // deal with ipv6 address
+ firstHost := hosts[0]
+ lastHost := hosts[len(hosts)-1]
+ hasPath := strings.Contains(lastHost, "/")
+ path := ""
+ if hasPath {
+ idx := strings.Index(lastHost, "/")
+ path = lastHost[idx:]
+ }
+ firstHost += path
+ url, err := url.Parse(firstHost)
+ if err != nil {
+ return nil, err
+ }
+ serviceURI, err := fromURL(url)
+ if err != nil {
+ return nil, err
+ }
+ var mHosts []string
+ var multiHosts []string
+ mHosts = append(mHosts, serviceURI.ServiceHosts[0])
+ mHosts = append(mHosts, hosts[1:]...)
+
+ for _, v := range mHosts {
+ h, err := validateHostName(serviceURI.ServiceName, serviceURI.ServiceInfos, v)
+ if err == nil {
+ multiHosts = append(multiHosts, h)
+ } else {
+ return nil, err
+ }
+ }
+
+ return &PulsarServiceURI{
+ serviceURI.ServiceName,
+ serviceURI.ServiceInfos,
+ multiHosts,
+ serviceURI.servicePath,
+ serviceURI.URL,
+ }, nil
+ }
+ }
+
+ url, err := url.Parse(uriStr)
+ if err != nil {
+ return nil, err
+ }
+
+ return fromURL(url)
+}
+
+func fromURL(url *url.URL) (*PulsarServiceURI, error) {
+ if url == nil {
+ return nil, errors.New("service url instance is null")
+ }
+
+ if url.Host == "" || len(url.Host) == 0 {
+ return nil, errors.New("service host is null")
+ }
+
+ var serviceName string
+ var serviceInfos []string
+ scheme := url.Scheme
+ if scheme != "" {
+ scheme = strings.ToLower(scheme)
+ schemeParts := strings.Split(scheme, "+")
+ serviceName = schemeParts[0]
+ serviceInfos = schemeParts[1:]
+ }
+
+ var serviceHosts []string
+ hosts := strings.FieldsFunc(url.Host, splitURI)
+ for _, v := range hosts {
+ h, err := validateHostName(serviceName, serviceInfos, v)
+ if err == nil {
+ serviceHosts = append(serviceHosts, h)
+ } else {
+ return nil, err
+ }
+ }
+
+ return &PulsarServiceURI{
+ serviceName,
+ serviceInfos,
+ serviceHosts,
+ url.Path,
+ url,
+ }, nil
+}
+
+func splitURI(r rune) bool {
+ return r == ',' || r == ';'
+}
+
+func validateHostName(serviceName string, serviceInfos []string, hostname string) (string, error) {
+ uri, err := url.Parse("dummyscheme://" + hostname)
+ if err != nil {
+ return "", err
+ }
+ host := uri.Hostname()
+ if strings.Contains(hostname, "[") && strings.Contains(hostname, "]") {
+ host = fmt.Sprintf("[%s]", host)
+ }
+ if host == "" || uri.Scheme == "" {
+ return "", errors.New("Invalid hostname : " + hostname)
+ }
+
+ port := uri.Port()
+ if uri.Port() == "" {
+ p := getServicePort(serviceName, serviceInfos)
+ if p == -1 {
+ return "", fmt.Errorf("invalid port : %d", p)
+ }
+ port = fmt.Sprint(p)
+ }
+ result := host + ":" + port
+ _, _, err = net.SplitHostPort(result)
+ if err != nil {
+ return "", err
+ }
+ return result, nil
+}
+
+func getServicePort(serviceName string, serviceInfos []string) int {
+ switch strings.ToLower(serviceName) {
+ case BinaryService:
+ if len(serviceInfos) == 0 {
+ return BinaryPort
+ } else if len(serviceInfos) == 1 && strings.ToLower(serviceInfos[0]) == SSLService {
+ return BinaryTLSPort
+ }
+ case HTTPService:
+ return HTTPPort
+ case HTTPSService:
+ return HTTPSPort
+ }
+ return -1
+}
diff --git a/pulsar/internal/service_uri_test.go b/pulsar/internal/service_uri_test.go
new file mode 100644
index 0000000..445b325
--- /dev/null
+++ b/pulsar/internal/service_uri_test.go
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestInvalidServiceUris(t *testing.T) {
+ uris := []string{
+ "://localhost:6650", // missing scheme
+ "pulsar:///", // missing authority
+ "pulsar://localhost:6650:6651/", // invalid hostname pair
+ "pulsar://localhost:xyz/", // invalid port
+ "pulsar://localhost:-6650/", // negative port
+ "pulsar://fec0:0:0:ffff::1:6650", // missing brackets
+ }
+
+ for _, uri := range uris {
+ testInvalidServiceURI(t, uri)
+ }
+}
+
+func TestEmptyServiceUriString(t *testing.T) {
+ u, err := NewPulsarServiceURIFromURIString("")
+ assert.Nil(t, u)
+ assert.NotNil(t, err)
+}
+
+func TestNullServiceUrlInstance(t *testing.T) {
+ u, err := NewPulsarServiceURIFromURL(nil)
+ assert.Nil(t, u)
+ assert.NotNil(t, err)
+}
+
+func TestMissingServiceName(t *testing.T) {
+ serviceURI := "//localhost:6650/path/to/namespace"
+ assertServiceURI(t, serviceURI, "", nil, []string{"localhost:6650"}, "/path/to/namespace", "")
+}
+
+func TestEmptyPath(t *testing.T) {
+ serviceURI := "pulsar://localhost:6650"
+ assertServiceURI(t, serviceURI, "pulsar", nil, []string{"localhost:6650"}, "", "")
+}
+
+func TestRootPath(t *testing.T) {
+ serviceURI := "pulsar://localhost:6650/"
+ assertServiceURI(t, serviceURI, "pulsar", nil, []string{"localhost:6650"}, "/", "")
+}
+
+func TestUserInfo(t *testing.T) {
+ serviceURI := "pulsar://pulsaruser@localhost:6650/path/to/namespace"
+ assertServiceURI(t, serviceURI, "pulsar", nil, []string{"localhost:6650"}, "/path/to/namespace", "pulsaruser")
+}
+
+func TestIpv6Uri(t *testing.T) {
+ serviceURI := "pulsar://pulsaruser@[fec0:0:0:ffff::1]:6650/path/to/namespace"
+ assertServiceURI(t, serviceURI, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650"}, "/path/to/namespace",
+ "pulsaruser")
+}
+
+func TestIpv6UriWithoutPulsarPort(t *testing.T) {
+ serviceURI := "pulsar://[fec0:0:0:ffff::1]/path/to/namespace"
+ assertServiceURI(t, serviceURI, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650"}, "/path/to/namespace", "")
+}
+
+func TestMultiIpv6Uri(t *testing.T) {
+ serviceURI := "pulsar://pulsaruser@[fec0:0:0:ffff::1]:6650,[fec0:0:0:ffff::2]:6650;[fec0:0:0:ffff::3]:6650" +
+ "/path/to/namespace"
+ assertServiceURI(t, serviceURI, "pulsar", nil,
+ []string{"[fec0:0:0:ffff::1]:6650", "[fec0:0:0:ffff::2]:6650", "[fec0:0:0:ffff::3]:6650"}, "/path/to/namespace",
+ "pulsaruser")
+}
+
+func TestMultiIpv6UriWithoutPulsarPort(t *testing.T) {
+ serviceURI := "pulsar://pulsaruser@[fec0:0:0:ffff::1],[fec0:0:0:ffff::2];[fec0:0:0:ffff::3]/path/to/namespace"
+ assertServiceURI(t, serviceURI, "pulsar", nil,
+ []string{"[fec0:0:0:ffff::1]:6650", "[fec0:0:0:ffff::2]:6650", "[fec0:0:0:ffff::3]:6650"}, "/path/to/namespace",
+ "pulsaruser")
+}
+
+func TestMultipleHostsSemiColon(t *testing.T) {
+ serviceURI := "pulsar://host1:6650;host2:6650;host3:6650/path/to/namespace"
+ assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"},
+ "/path/to/namespace", "")
+}
+
+func TestMultipleHostsComma(t *testing.T) {
+ serviceURI := "pulsar://host1:6650,host2:6650,host3:6650/path/to/namespace"
+ assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"},
+ "/path/to/namespace", "")
+}
+
+func TestMultipleHostsWithoutPulsarPorts(t *testing.T) {
+ serviceURI := "pulsar://host1,host2,host3/path/to/namespace"
+ assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"},
+ "/path/to/namespace", "")
+}
+
+func TestMultipleHostsWithoutPulsarTlsPorts(t *testing.T) {
+ serviceURI := "pulsar+ssl://host1,host2,host3/path/to/namespace"
+ assertServiceURI(t, serviceURI, "pulsar", []string{"ssl"}, []string{"host1:6651", "host2:6651", "host3:6651"},
+ "/path/to/namespace", "")
+}
+
+func TestMultipleHostsWithoutHttpPorts(t *testing.T) {
+ serviceURI := "http://host1,host2,host3/path/to/namespace"
+ assertServiceURI(t, serviceURI, "http", nil, []string{"host1:80", "host2:80", "host3:80"}, "/path/to/namespace", "")
+}
+
+func TestMultipleHostsWithoutHttpsPorts(t *testing.T) {
+ serviceURI := "https://host1,host2,host3/path/to/namespace"
+ assertServiceURI(t, serviceURI, "https", nil, []string{"host1:443", "host2:443", "host3:443"}, "/path/to/namespace",
+ "")
+}
+
+func TestMultipleHostsMixedPorts(t *testing.T) {
+ serviceURI := "pulsar://host1:6640,host2:6650,host3:6660/path/to/namespace"
+ assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6640", "host2:6650", "host3:6660"},
+ "/path/to/namespace", "")
+}
+
+func TestMultipleHostsMixed(t *testing.T) {
+ serviceURI := "pulsar://host1:6640,host2,host3:6660/path/to/namespace"
+ assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6640", "host2:6650", "host3:6660"},
+ "/path/to/namespace", "")
+}
+
+func TestUserInfoWithMultipleHosts(t *testing.T) {
+ serviceURI := "pulsar://pulsaruser@host1:6650;host2:6650;host3:6650/path/to/namespace"
+ assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"},
+ "/path/to/namespace", "pulsaruser")
+}
+
+func testInvalidServiceURI(t *testing.T, serviceURI string) {
+ u, err := NewPulsarServiceURIFromURIString(serviceURI)
+ t.Logf("testInvalidServiceURI %s", serviceURI)
+ assert.Nil(t, u)
+ assert.NotNil(t, err)
+}
+
+func assertServiceURI(t *testing.T, serviceURI, expectedServiceName string,
+ expectedServiceInfo, expectedServiceHosts []string, expectedServicePath, expectedServiceUser string) {
+ uri, err := NewPulsarServiceURIFromURIString(serviceURI)
+ assert.Nil(t, err)
+ assert.NotNil(t, serviceURI)
+ assert.Equal(t, expectedServiceName, uri.ServiceName)
+ assert.Equal(t, expectedServicePath, uri.servicePath)
+ if expectedServiceUser != "" {
+ assert.Equal(t, expectedServiceUser, uri.URL.User.Username())
+ }
+ assert.ElementsMatch(t, expectedServiceInfo, uri.ServiceInfos)
+ assert.ElementsMatch(t, expectedServiceHosts, uri.ServiceHosts)
+}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 5708cad..4d62cac 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -621,7 +621,7 @@
}
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
- Name: "my-producer",
+ Name: "meta-data-producer",
Properties: props,
})
if err != nil {
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index cd97964..f72ba1d 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -586,3 +586,55 @@
assert.False(t, reader.HasNext())
}
+
+func TestReaderWithMultiHosts(t *testing.T) {
+ // Multi hosts included an unreached port and the actual port for verify retry logic
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6600,localhost:6650",
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ ctx := context.Background()
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ // send 10 messages
+ for i := 0; i < 10; i++ {
+ msgID, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, msgID)
+ }
+
+ // create reader on 5th message (not included)
+ reader, err := client.CreateReader(ReaderOptions{
+ Topic: topic,
+ StartMessageID: EarliestMessageID(),
+ })
+
+ assert.Nil(t, err)
+ defer reader.Close()
+
+ i := 0
+ for reader.HasNext() {
+ msg, err := reader.Next(context.Background())
+ assert.NoError(t, err)
+
+ expectMsg := fmt.Sprintf("hello-%d", i)
+ assert.Equal(t, []byte(expectMsg), msg.Payload())
+
+ i++
+ }
+
+ assert.Equal(t, 10, i)
+}