[Issue 177] add multiple hosts support (#484)

Fixes #177 

Master Issue https://github.com/apache/pulsar/issues/3218

### Motivation

add multiple hosts support to go client

### Modifications

- add service uri & service name resolver
- add service name resolver to lookup service & rpc client
- add unit tests
- add integration tests

### Verifying this change

- [ ] Make sure that the change passes the CI checks.
diff --git a/go.sum b/go.sum
index 7f61837..a14857b 100644
--- a/go.sum
+++ b/go.sum
@@ -27,6 +27,7 @@
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 h1:CaO/zOnF8VvUfEbhRatPcwKVWamvbYd8tQGRWacE9kU=
 github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1/go.mod h1:+hnT3ywWDTAFrW5aE+u2Sa/wT555ZqwoCS+pk3p6ry4=
 github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
 github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 5882dd4..ee8ff50 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -123,8 +123,10 @@
 		log:     logger,
 		metrics: metrics,
 	}
-	c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout, logger, metrics)
-	c.lookupService = internal.NewLookupService(c.rpcClient, url, tlsConfig != nil, logger, metrics)
+	serviceNameResolver := internal.NewPulsarServiceNameResolver(url)
+
+	c.rpcClient = internal.NewRPCClient(url, serviceNameResolver, c.cnxPool, operationTimeout, logger, metrics)
+	c.lookupService = internal.NewLookupService(c.rpcClient, url, serviceNameResolver, tlsConfig != nil, logger, metrics)
 	c.handlers = internal.NewClientHandlers()
 
 	return c, nil
diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go
index 661eac2..65732ea 100644
--- a/pulsar/client_impl_test.go
+++ b/pulsar/client_impl_test.go
@@ -18,6 +18,7 @@
 package pulsar
 
 import (
+	"context"
 	"crypto/tls"
 	"fmt"
 	"io/ioutil"
@@ -458,3 +459,58 @@
 		"replication_clusters": []string{"standalone"},
 	}
 }
+
+func TestRetryWithMultipleHosts(t *testing.T) {
+	// Multi hosts included an unreached port and the actual port for verify retry logic
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6600,localhost:6650",
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := "persistent://public/default/retry-multiple-hosts-" + generateRandomName()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topic,
+	})
+
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	ctx := context.Background()
+	var msgIDs [][]byte
+
+	for i := 0; i < 10; i++ {
+		if msgID, err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		}); err != nil {
+			assert.Nil(t, err)
+		} else {
+			assert.NotNil(t, msgID)
+			msgIDs = append(msgIDs, msgID.Serialize())
+		}
+	}
+
+	assert.Equal(t, 10, len(msgIDs))
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:                       topic,
+		SubscriptionName:            "retry-multi-hosts-sub",
+		Type:                        Shared,
+		SubscriptionInitialPosition: SubscriptionPositionEarliest,
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	for i := 0; i < 10; i++ {
+		msg, err := consumer.Receive(context.Background())
+		assert.Nil(t, err)
+		assert.Contains(t, msgIDs, msg.ID().Serialize())
+		consumer.Ack(msg)
+	}
+
+	err = consumer.Unsubscribe()
+	assert.Nil(t, err)
+
+}
diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go
index 54d30b3..29e1267 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -103,6 +103,10 @@
 	}
 
 	if err := cnx.waitUntilReady(); err != nil {
+		if !wasCached {
+			p.pool.Delete(key)
+			p.log.Debug("Removed failed connection from pool:", cnx.logicalAddr, cnx.physicalAddr)
+		}
 		return nil, err
 	}
 	return cnx, nil
diff --git a/pulsar/internal/lookup_service.go b/pulsar/internal/lookup_service.go
index 65fa383..5b4e2ac 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -45,22 +45,22 @@
 }
 
 type lookupService struct {
-	rpcClient  RPCClient
-	serviceURL *url.URL
-	tlsEnabled bool
-	log        log.Logger
-	metrics    *Metrics
+	rpcClient           RPCClient
+	serviceNameResolver ServiceNameResolver
+	tlsEnabled          bool
+	log                 log.Logger
+	metrics             *Metrics
 }
 
 // NewLookupService init a lookup service struct and return an object of LookupService.
-func NewLookupService(rpcClient RPCClient, serviceURL *url.URL,
+func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, serviceNameResolver ServiceNameResolver,
 	tlsEnabled bool, logger log.Logger, metrics *Metrics) LookupService {
 	return &lookupService{
-		rpcClient:  rpcClient,
-		serviceURL: serviceURL,
-		tlsEnabled: tlsEnabled,
-		log:        logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
-		metrics:    metrics,
+		rpcClient:           rpcClient,
+		serviceNameResolver: serviceNameResolver,
+		tlsEnabled:          tlsEnabled,
+		log:                 logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
+		metrics:             metrics,
 	}
 }
 
@@ -78,7 +78,10 @@
 
 	var physicalAddr *url.URL
 	if lr.GetProxyThroughServiceUrl() {
-		physicalAddr = ls.serviceURL
+		physicalAddr, err = ls.serviceNameResolver.ResolveHost()
+		if err != nil {
+			return nil, nil, err
+		}
 	} else {
 		physicalAddr = logicalAddress
 	}
diff --git a/pulsar/internal/lookup_service_test.go b/pulsar/internal/lookup_service_test.go
index ad0e1bd..a3f9bfe 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -112,6 +112,7 @@
 func TestLookupSuccess(t *testing.T) {
 	url, err := url.Parse("pulsar://example:6650")
 	assert.NoError(t, err)
+	serviceNameResolver := NewPulsarServiceNameResolver(url)
 
 	ls := NewLookupService(&mockedLookupRPCClient{
 		t: t,
@@ -131,7 +132,7 @@
 				BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
 			},
 		},
-	}, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+	}, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
 
 	lr, err := ls.Lookup("my-topic")
 	assert.NoError(t, err)
@@ -144,6 +145,7 @@
 func TestTlsLookupSuccess(t *testing.T) {
 	url, err := url.Parse("pulsar+ssl://example:6651")
 	assert.NoError(t, err)
+	serviceNameResolver := NewPulsarServiceNameResolver(url)
 
 	ls := NewLookupService(&mockedLookupRPCClient{
 		t: t,
@@ -163,7 +165,7 @@
 				BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"),
 			},
 		},
-	}, url, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+	}, url, serviceNameResolver, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
 
 	lr, err := ls.Lookup("my-topic")
 	assert.NoError(t, err)
@@ -176,6 +178,7 @@
 func TestLookupWithProxy(t *testing.T) {
 	url, err := url.Parse("pulsar://example:6650")
 	assert.NoError(t, err)
+	serviceNameResolver := NewPulsarServiceNameResolver(url)
 
 	ls := NewLookupService(&mockedLookupRPCClient{
 		t: t,
@@ -196,7 +199,7 @@
 				ProxyThroughServiceUrl: proto.Bool(true),
 			},
 		},
-	}, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+	}, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
 
 	lr, err := ls.Lookup("my-topic")
 	assert.NoError(t, err)
@@ -229,7 +232,7 @@
 				ProxyThroughServiceUrl: proto.Bool(true),
 			},
 		},
-	}, url, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+	}, url, NewPulsarServiceNameResolver(url), true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
 
 	lr, err := ls.Lookup("my-topic")
 	assert.NoError(t, err)
@@ -273,7 +276,7 @@
 				BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
 			},
 		},
-	}, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+	}, url, NewPulsarServiceNameResolver(url), false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
 
 	lr, err := ls.Lookup("my-topic")
 	assert.NoError(t, err)
@@ -317,7 +320,7 @@
 				BrokerServiceUrlTls: proto.String("pulsar+ssl://broker-1:6651"),
 			},
 		},
-	}, url, true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+	}, url, NewPulsarServiceNameResolver(url), true, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
 
 	lr, err := ls.Lookup("my-topic")
 	assert.NoError(t, err)
@@ -350,7 +353,7 @@
 				ProxyThroughServiceUrl: proto.Bool(false),
 			},
 		},
-	}, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+	}, url, NewPulsarServiceNameResolver(url), false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
 
 	lr, err := ls.Lookup("my-topic")
 	assert.Error(t, err)
@@ -378,7 +381,7 @@
 				Authoritative: proto.Bool(true),
 			},
 		},
-	}, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+	}, url, NewPulsarServiceNameResolver(url), false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
 
 	lr, err := ls.Lookup("my-topic")
 	assert.Error(t, err)
@@ -447,6 +450,7 @@
 func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
 	url, err := url.Parse("pulsar://example:6650")
 	assert.NoError(t, err)
+	serviceNameResolver := NewPulsarServiceNameResolver(url)
 
 	ls := NewLookupService(&mockedPartitionedTopicMetadataRPCClient{
 		t: t,
@@ -464,10 +468,43 @@
 				Response:   pb.CommandPartitionedTopicMetadataResponse_Success.Enum(),
 			},
 		},
-	}, url, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+	}, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
 
 	metadata, err := ls.GetPartitionedTopicMetadata("my-topic")
 	assert.NoError(t, err)
 	assert.NotNil(t, metadata)
 	assert.Equal(t, metadata.GetPartitions(), uint32(1))
 }
+
+func TestLookupSuccessWithMultipleHosts(t *testing.T) {
+	url, err := url.Parse("pulsar://host1,host2,host3:6650")
+	assert.NoError(t, err)
+	serviceNameResolver := NewPulsarServiceNameResolver(url)
+
+	ls := NewLookupService(&mockedLookupRPCClient{
+		t: t,
+
+		expectedRequests: []pb.CommandLookupTopic{
+			{
+				RequestId:     proto.Uint64(1),
+				Topic:         proto.String("my-topic"),
+				Authoritative: proto.Bool(false),
+			},
+		},
+		mockedResponses: []pb.CommandLookupTopicResponse{
+			{
+				RequestId:        proto.Uint64(1),
+				Response:         responseType(pb.CommandLookupTopicResponse_Connect),
+				Authoritative:    proto.Bool(true),
+				BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
+			},
+		},
+	}, url, serviceNameResolver, false, log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+
+	lr, err := ls.Lookup("my-topic")
+	assert.NoError(t, err)
+	assert.NotNil(t, lr)
+
+	assert.Equal(t, "pulsar://broker-1:6650", lr.LogicalAddr.String())
+	assert.Equal(t, "pulsar://broker-1:6650", lr.PhysicalAddr.String())
+}
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 0bb482c..b51e4e3 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -56,7 +56,7 @@
 }
 
 type rpcClient struct {
-	serviceURL          *url.URL
+	serviceNameResolver ServiceNameResolver
 	pool                ConnectionPool
 	requestTimeout      time.Duration
 	requestIDGenerator  uint64
@@ -66,22 +66,26 @@
 	metrics             *Metrics
 }
 
-func NewRPCClient(serviceURL *url.URL, pool ConnectionPool,
+func NewRPCClient(serviceURL *url.URL, serviceNameResolver ServiceNameResolver, pool ConnectionPool,
 	requestTimeout time.Duration, logger log.Logger, metrics *Metrics) RPCClient {
 	return &rpcClient{
-		serviceURL:     serviceURL,
-		pool:           pool,
-		requestTimeout: requestTimeout,
-		log:            logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
-		metrics:        metrics,
+		serviceNameResolver: serviceNameResolver,
+		pool:                pool,
+		requestTimeout:      requestTimeout,
+		log:                 logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
+		metrics:             metrics,
 	}
 }
 
 func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type,
 	message proto.Message) (*RPCResult, error) {
-
-	rpcResult, err := c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message)
-	if _, ok := err.(net.Error); ok {
+	host, err := c.serviceNameResolver.ResolveHost()
+	if err != nil {
+		c.log.Errorf("request host resolve failed with error: {%v}", err)
+		return nil, err
+	}
+	rpcResult, err := c.Request(host, host, requestID, cmdType, message)
+	if _, ok := err.(net.Error); ok || (err != nil && err.Error() == "connection error") {
 		// We can retry this kind of requests over a connection error because they're
 		// not specific to a particular broker.
 		backoff := Backoff{100 * time.Millisecond}
@@ -92,9 +96,13 @@
 			retryTime = backoff.Next()
 			c.log.Debugf("Retrying request in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
 			time.Sleep(retryTime)
-
-			rpcResult, err = c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message)
-			if _, ok := err.(net.Error); ok {
+			host, err = c.serviceNameResolver.ResolveHost()
+			if err != nil {
+				c.log.Errorf("Retrying request host resolve failed with error: {%v}", err)
+				continue
+			}
+			rpcResult, err = c.Request(host, host, requestID, cmdType, message)
+			if _, ok := err.(net.Error); ok || (err != nil && err.Error() == "connection error") {
 				continue
 			} else {
 				// We either succeeded or encountered a non connection error
@@ -102,7 +110,6 @@
 			}
 		}
 	}
-
 	return rpcResult, err
 }
 
diff --git a/pulsar/internal/service_name_resolver.go b/pulsar/internal/service_name_resolver.go
new file mode 100644
index 0000000..3b1209c
--- /dev/null
+++ b/pulsar/internal/service_name_resolver.go
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+	"errors"
+	"fmt"
+	"math/rand"
+	"net/url"
+	"sync/atomic"
+	"time"
+
+	log "github.com/sirupsen/logrus"
+)
+
+type ServiceNameResolver interface {
+	ResolveHost() (*url.URL, error)
+	ResolveHostURI() (*PulsarServiceURI, error)
+	UpdateServiceURL(url *url.URL) error
+	GetServiceURI() *PulsarServiceURI
+	GetServiceURL() *url.URL
+	GetAddressList() []*url.URL
+}
+
+type pulsarServiceNameResolver struct {
+	ServiceURI   *PulsarServiceURI
+	ServiceURL   *url.URL
+	CurrentIndex int32
+	AddressList  []*url.URL
+}
+
+func NewPulsarServiceNameResolver(url *url.URL) ServiceNameResolver {
+	r := &pulsarServiceNameResolver{}
+	err := r.UpdateServiceURL(url)
+	if err != nil {
+		log.Errorf("create pulsar service name resolver failed : %v", err)
+	}
+	return r
+}
+
+func (r *pulsarServiceNameResolver) ResolveHost() (*url.URL, error) {
+	if r.AddressList == nil {
+		return nil, errors.New("no service url is provided yet")
+	}
+	if len(r.AddressList) == 0 {
+		return nil, fmt.Errorf("no hosts found for service url : %v", r.ServiceURL)
+	}
+	if len(r.AddressList) == 1 {
+		return r.AddressList[0], nil
+	}
+	idx := (r.CurrentIndex + 1) % int32(len(r.AddressList))
+	atomic.StoreInt32(&r.CurrentIndex, idx)
+	return r.AddressList[idx], nil
+}
+
+func (r *pulsarServiceNameResolver) ResolveHostURI() (*PulsarServiceURI, error) {
+	host, err := r.ResolveHost()
+	if err != nil {
+		return nil, err
+	}
+	hostURL := host.Scheme + "://" + host.Hostname() + ":" + host.Port()
+	return NewPulsarServiceURIFromURIString(hostURL)
+}
+
+func (r *pulsarServiceNameResolver) UpdateServiceURL(u *url.URL) error {
+	uri, err := NewPulsarServiceURIFromURL(u)
+	if err != nil {
+		log.Errorf("invalid service-url instance %s provided %v", u, err)
+		return err
+	}
+
+	hosts := uri.ServiceHosts
+	addresses := []*url.URL{}
+	for _, host := range hosts {
+		hostURL := uri.URL.Scheme + "://" + host
+		u, err := url.Parse(hostURL)
+		if err != nil {
+			log.Errorf("invalid host-url %s provided %v", hostURL, err)
+			return err
+		}
+		addresses = append(addresses, u)
+	}
+	r.AddressList = addresses
+	r.ServiceURL = u
+	r.ServiceURI = uri
+	rand.Seed(time.Now().Unix()) // initialize global pseudo random generator
+	atomic.StoreInt32(&r.CurrentIndex, int32(rand.Intn(len(addresses))))
+	return nil
+}
+
+func (r *pulsarServiceNameResolver) GetServiceURI() *PulsarServiceURI {
+	return r.ServiceURI
+}
+
+func (r *pulsarServiceNameResolver) GetServiceURL() *url.URL {
+	return r.ServiceURL
+}
+
+func (r *pulsarServiceNameResolver) GetAddressList() []*url.URL {
+	return r.AddressList
+}
diff --git a/pulsar/internal/service_name_resolver_test.go b/pulsar/internal/service_name_resolver_test.go
new file mode 100644
index 0000000..e190633
--- /dev/null
+++ b/pulsar/internal/service_name_resolver_test.go
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+	"net/url"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestResolveBeforeUpdateServiceUrl(t *testing.T) {
+	resolver := NewPulsarServiceNameResolver(nil)
+	u, err := resolver.ResolveHost()
+	assert.Nil(t, u)
+	assert.NotNil(t, err)
+	assert.EqualError(t, err, "no service url is provided yet")
+}
+
+func TestResolveUriBeforeUpdateServiceUrl(t *testing.T) {
+	resolver := NewPulsarServiceNameResolver(nil)
+	u, err := resolver.ResolveHostURI()
+	assert.Nil(t, u)
+	assert.NotNil(t, err)
+	assert.EqualError(t, err, "no service url is provided yet")
+}
+
+func TestUpdateInvalidServiceUrl(t *testing.T) {
+	resolver := NewPulsarServiceNameResolver(nil)
+	url, _ := url.Parse("pulsar:///")
+	err := resolver.UpdateServiceURL(url)
+	assert.NotNil(t, err)
+	assert.Empty(t, resolver.GetServiceURL())
+	assert.Nil(t, resolver.GetServiceURI())
+}
+
+func TestSimpleHostUrl(t *testing.T) {
+	resolver := NewPulsarServiceNameResolver(nil)
+	serviceURL, _ := url.Parse("pulsar://host1:6650")
+	err := resolver.UpdateServiceURL(serviceURL)
+	assert.Nil(t, err)
+	assert.Equal(t, serviceURL, resolver.GetServiceURL())
+	expectedURI, err := NewPulsarServiceURIFromURL(serviceURL)
+	assert.Nil(t, err)
+	assert.Equal(t, expectedURI, resolver.GetServiceURI())
+	actualHost, err := resolver.ResolveHost()
+	assert.Nil(t, err)
+	assert.Equal(t, "host1", actualHost.Hostname())
+	assert.Equal(t, "6650", actualHost.Port())
+
+	newServiceURL, _ := url.Parse("pulsar://host2:6650")
+	err = resolver.UpdateServiceURL(newServiceURL)
+	assert.Nil(t, err)
+	assert.Equal(t, newServiceURL, resolver.GetServiceURL())
+	expectedURI, err = NewPulsarServiceURIFromURL(newServiceURL)
+	assert.Nil(t, err)
+	assert.Equal(t, expectedURI, resolver.GetServiceURI())
+	actualHost, err = resolver.ResolveHost()
+	assert.Nil(t, err)
+	assert.Equal(t, "host2", actualHost.Hostname())
+	assert.Equal(t, "6650", actualHost.Port())
+}
+
+func TestMultipleHostsUrl(t *testing.T) {
+	resolver := NewPulsarServiceNameResolver(nil)
+	serviceURL, _ := url.Parse("pulsar://host1:6650,host2:6650")
+	err := resolver.UpdateServiceURL(serviceURL)
+	assert.Nil(t, err)
+	assert.Equal(t, serviceURL, resolver.GetServiceURL())
+	expectedURI, err := NewPulsarServiceURIFromURL(serviceURL)
+	assert.Nil(t, err)
+	assert.Equal(t, expectedURI, resolver.GetServiceURI())
+	host1, _ := url.Parse("pulsar://host1:6650")
+	host2, _ := url.Parse("pulsar://host2:6650")
+	host1uri, _ := NewPulsarServiceURIFromURIString("pulsar://host1:6650")
+	host2uri, _ := NewPulsarServiceURIFromURIString("pulsar://host2:6650")
+	assert.Contains(t, resolver.GetAddressList(), host1)
+	assert.Contains(t, resolver.GetAddressList(), host2)
+	hosts := []*url.URL{host1, host2}
+	hosturis := []*PulsarServiceURI{host1uri, host2uri}
+	for i := 0; i < 10; i++ {
+		host, err := resolver.ResolveHost()
+		assert.Nil(t, err)
+		hosturi, err := resolver.ResolveHostURI()
+		assert.Nil(t, err)
+		assert.Contains(t, hosts, host)
+		assert.Contains(t, hosturis, hosturi)
+	}
+}
+
+func TestMultipleHostsTlsUrl(t *testing.T) {
+	resolver := NewPulsarServiceNameResolver(nil)
+	serviceURL, _ := url.Parse("pulsar+ssl://host1:6651,host2:6651")
+	err := resolver.UpdateServiceURL(serviceURL)
+	assert.Nil(t, err)
+	assert.Equal(t, serviceURL, resolver.GetServiceURL())
+	expectedURI, err := NewPulsarServiceURIFromURL(serviceURL)
+	assert.Nil(t, err)
+	assert.Equal(t, expectedURI, resolver.GetServiceURI())
+	host1, _ := url.Parse("pulsar+ssl://host1:6651")
+	host2, _ := url.Parse("pulsar+ssl://host2:6651")
+	host1uri, _ := NewPulsarServiceURIFromURIString("pulsar+ssl://host1:6651")
+	host2uri, _ := NewPulsarServiceURIFromURIString("pulsar+ssl://host2:6651")
+	assert.Contains(t, resolver.GetAddressList(), host1)
+	assert.Contains(t, resolver.GetAddressList(), host2)
+	hosts := []*url.URL{host1, host2}
+	hosturis := []*PulsarServiceURI{host1uri, host2uri}
+	for i := 0; i < 10; i++ {
+		host, err := resolver.ResolveHost()
+		assert.Nil(t, err)
+		hosturi, err := resolver.ResolveHostURI()
+		assert.Nil(t, err)
+		assert.Contains(t, hosts, host)
+		assert.Contains(t, hosturis, hosturi)
+	}
+}
diff --git a/pulsar/internal/service_uri.go b/pulsar/internal/service_uri.go
new file mode 100644
index 0000000..7b75339
--- /dev/null
+++ b/pulsar/internal/service_uri.go
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+	"errors"
+	"fmt"
+	"net"
+	"net/url"
+	"strings"
+
+	log "github.com/sirupsen/logrus"
+)
+
+const (
+	BinaryService = "pulsar"
+	HTTPService   = "http"
+	HTTPSService  = "https"
+	SSLService    = "ssl"
+	BinaryPort    = 6650
+	BinaryTLSPort = 6651
+	HTTPPort      = 80
+	HTTPSPort     = 443
+)
+
+type PulsarServiceURI struct {
+	ServiceName  string
+	ServiceInfos []string
+	ServiceHosts []string
+	servicePath  string
+	URL          *url.URL
+}
+
+func NewPulsarServiceURIFromURIString(uri string) (*PulsarServiceURI, error) {
+	u, err := fromString(uri)
+	if err != nil {
+		log.Error(err)
+		return nil, err
+	}
+	return u, nil
+}
+
+func NewPulsarServiceURIFromURL(url *url.URL) (*PulsarServiceURI, error) {
+	u, err := fromURL(url)
+	if err != nil {
+		log.Error(err)
+		return nil, err
+	}
+	return u, nil
+}
+
+func fromString(uriStr string) (*PulsarServiceURI, error) {
+	if uriStr == "" || len(uriStr) == 0 {
+		return nil, errors.New("service uriStr string is null")
+	}
+	if strings.Contains(uriStr, "[") && strings.Contains(uriStr, "]") {
+		// deal with ipv6 address
+		hosts := strings.FieldsFunc(uriStr, splitURI)
+		if len(hosts) > 1 {
+			// deal with ipv6 address
+			firstHost := hosts[0]
+			lastHost := hosts[len(hosts)-1]
+			hasPath := strings.Contains(lastHost, "/")
+			path := ""
+			if hasPath {
+				idx := strings.Index(lastHost, "/")
+				path = lastHost[idx:]
+			}
+			firstHost += path
+			url, err := url.Parse(firstHost)
+			if err != nil {
+				return nil, err
+			}
+			serviceURI, err := fromURL(url)
+			if err != nil {
+				return nil, err
+			}
+			var mHosts []string
+			var multiHosts []string
+			mHosts = append(mHosts, serviceURI.ServiceHosts[0])
+			mHosts = append(mHosts, hosts[1:]...)
+
+			for _, v := range mHosts {
+				h, err := validateHostName(serviceURI.ServiceName, serviceURI.ServiceInfos, v)
+				if err == nil {
+					multiHosts = append(multiHosts, h)
+				} else {
+					return nil, err
+				}
+			}
+
+			return &PulsarServiceURI{
+				serviceURI.ServiceName,
+				serviceURI.ServiceInfos,
+				multiHosts,
+				serviceURI.servicePath,
+				serviceURI.URL,
+			}, nil
+		}
+	}
+
+	url, err := url.Parse(uriStr)
+	if err != nil {
+		return nil, err
+	}
+
+	return fromURL(url)
+}
+
+func fromURL(url *url.URL) (*PulsarServiceURI, error) {
+	if url == nil {
+		return nil, errors.New("service url instance is null")
+	}
+
+	if url.Host == "" || len(url.Host) == 0 {
+		return nil, errors.New("service host is null")
+	}
+
+	var serviceName string
+	var serviceInfos []string
+	scheme := url.Scheme
+	if scheme != "" {
+		scheme = strings.ToLower(scheme)
+		schemeParts := strings.Split(scheme, "+")
+		serviceName = schemeParts[0]
+		serviceInfos = schemeParts[1:]
+	}
+
+	var serviceHosts []string
+	hosts := strings.FieldsFunc(url.Host, splitURI)
+	for _, v := range hosts {
+		h, err := validateHostName(serviceName, serviceInfos, v)
+		if err == nil {
+			serviceHosts = append(serviceHosts, h)
+		} else {
+			return nil, err
+		}
+	}
+
+	return &PulsarServiceURI{
+		serviceName,
+		serviceInfos,
+		serviceHosts,
+		url.Path,
+		url,
+	}, nil
+}
+
+func splitURI(r rune) bool {
+	return r == ',' || r == ';'
+}
+
+func validateHostName(serviceName string, serviceInfos []string, hostname string) (string, error) {
+	uri, err := url.Parse("dummyscheme://" + hostname)
+	if err != nil {
+		return "", err
+	}
+	host := uri.Hostname()
+	if strings.Contains(hostname, "[") && strings.Contains(hostname, "]") {
+		host = fmt.Sprintf("[%s]", host)
+	}
+	if host == "" || uri.Scheme == "" {
+		return "", errors.New("Invalid hostname : " + hostname)
+	}
+
+	port := uri.Port()
+	if uri.Port() == "" {
+		p := getServicePort(serviceName, serviceInfos)
+		if p == -1 {
+			return "", fmt.Errorf("invalid port : %d", p)
+		}
+		port = fmt.Sprint(p)
+	}
+	result := host + ":" + port
+	_, _, err = net.SplitHostPort(result)
+	if err != nil {
+		return "", err
+	}
+	return result, nil
+}
+
+func getServicePort(serviceName string, serviceInfos []string) int {
+	switch strings.ToLower(serviceName) {
+	case BinaryService:
+		if len(serviceInfos) == 0 {
+			return BinaryPort
+		} else if len(serviceInfos) == 1 && strings.ToLower(serviceInfos[0]) == SSLService {
+			return BinaryTLSPort
+		}
+	case HTTPService:
+		return HTTPPort
+	case HTTPSService:
+		return HTTPSPort
+	}
+	return -1
+}
diff --git a/pulsar/internal/service_uri_test.go b/pulsar/internal/service_uri_test.go
new file mode 100644
index 0000000..445b325
--- /dev/null
+++ b/pulsar/internal/service_uri_test.go
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestInvalidServiceUris(t *testing.T) {
+	uris := []string{
+		"://localhost:6650",              // missing scheme
+		"pulsar:///",                     // missing authority
+		"pulsar://localhost:6650:6651/",  // invalid hostname pair
+		"pulsar://localhost:xyz/",        // invalid port
+		"pulsar://localhost:-6650/",      // negative port
+		"pulsar://fec0:0:0:ffff::1:6650", // missing brackets
+	}
+
+	for _, uri := range uris {
+		testInvalidServiceURI(t, uri)
+	}
+}
+
+func TestEmptyServiceUriString(t *testing.T) {
+	u, err := NewPulsarServiceURIFromURIString("")
+	assert.Nil(t, u)
+	assert.NotNil(t, err)
+}
+
+func TestNullServiceUrlInstance(t *testing.T) {
+	u, err := NewPulsarServiceURIFromURL(nil)
+	assert.Nil(t, u)
+	assert.NotNil(t, err)
+}
+
+func TestMissingServiceName(t *testing.T) {
+	serviceURI := "//localhost:6650/path/to/namespace"
+	assertServiceURI(t, serviceURI, "", nil, []string{"localhost:6650"}, "/path/to/namespace", "")
+}
+
+func TestEmptyPath(t *testing.T) {
+	serviceURI := "pulsar://localhost:6650"
+	assertServiceURI(t, serviceURI, "pulsar", nil, []string{"localhost:6650"}, "", "")
+}
+
+func TestRootPath(t *testing.T) {
+	serviceURI := "pulsar://localhost:6650/"
+	assertServiceURI(t, serviceURI, "pulsar", nil, []string{"localhost:6650"}, "/", "")
+}
+
+func TestUserInfo(t *testing.T) {
+	serviceURI := "pulsar://pulsaruser@localhost:6650/path/to/namespace"
+	assertServiceURI(t, serviceURI, "pulsar", nil, []string{"localhost:6650"}, "/path/to/namespace", "pulsaruser")
+}
+
+func TestIpv6Uri(t *testing.T) {
+	serviceURI := "pulsar://pulsaruser@[fec0:0:0:ffff::1]:6650/path/to/namespace"
+	assertServiceURI(t, serviceURI, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650"}, "/path/to/namespace",
+		"pulsaruser")
+}
+
+func TestIpv6UriWithoutPulsarPort(t *testing.T) {
+	serviceURI := "pulsar://[fec0:0:0:ffff::1]/path/to/namespace"
+	assertServiceURI(t, serviceURI, "pulsar", nil, []string{"[fec0:0:0:ffff::1]:6650"}, "/path/to/namespace", "")
+}
+
+func TestMultiIpv6Uri(t *testing.T) {
+	serviceURI := "pulsar://pulsaruser@[fec0:0:0:ffff::1]:6650,[fec0:0:0:ffff::2]:6650;[fec0:0:0:ffff::3]:6650" +
+		"/path/to/namespace"
+	assertServiceURI(t, serviceURI, "pulsar", nil,
+		[]string{"[fec0:0:0:ffff::1]:6650", "[fec0:0:0:ffff::2]:6650", "[fec0:0:0:ffff::3]:6650"}, "/path/to/namespace",
+		"pulsaruser")
+}
+
+func TestMultiIpv6UriWithoutPulsarPort(t *testing.T) {
+	serviceURI := "pulsar://pulsaruser@[fec0:0:0:ffff::1],[fec0:0:0:ffff::2];[fec0:0:0:ffff::3]/path/to/namespace"
+	assertServiceURI(t, serviceURI, "pulsar", nil,
+		[]string{"[fec0:0:0:ffff::1]:6650", "[fec0:0:0:ffff::2]:6650", "[fec0:0:0:ffff::3]:6650"}, "/path/to/namespace",
+		"pulsaruser")
+}
+
+func TestMultipleHostsSemiColon(t *testing.T) {
+	serviceURI := "pulsar://host1:6650;host2:6650;host3:6650/path/to/namespace"
+	assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"},
+		"/path/to/namespace", "")
+}
+
+func TestMultipleHostsComma(t *testing.T) {
+	serviceURI := "pulsar://host1:6650,host2:6650,host3:6650/path/to/namespace"
+	assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"},
+		"/path/to/namespace", "")
+}
+
+func TestMultipleHostsWithoutPulsarPorts(t *testing.T) {
+	serviceURI := "pulsar://host1,host2,host3/path/to/namespace"
+	assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"},
+		"/path/to/namespace", "")
+}
+
+func TestMultipleHostsWithoutPulsarTlsPorts(t *testing.T) {
+	serviceURI := "pulsar+ssl://host1,host2,host3/path/to/namespace"
+	assertServiceURI(t, serviceURI, "pulsar", []string{"ssl"}, []string{"host1:6651", "host2:6651", "host3:6651"},
+		"/path/to/namespace", "")
+}
+
+func TestMultipleHostsWithoutHttpPorts(t *testing.T) {
+	serviceURI := "http://host1,host2,host3/path/to/namespace"
+	assertServiceURI(t, serviceURI, "http", nil, []string{"host1:80", "host2:80", "host3:80"}, "/path/to/namespace", "")
+}
+
+func TestMultipleHostsWithoutHttpsPorts(t *testing.T) {
+	serviceURI := "https://host1,host2,host3/path/to/namespace"
+	assertServiceURI(t, serviceURI, "https", nil, []string{"host1:443", "host2:443", "host3:443"}, "/path/to/namespace",
+		"")
+}
+
+func TestMultipleHostsMixedPorts(t *testing.T) {
+	serviceURI := "pulsar://host1:6640,host2:6650,host3:6660/path/to/namespace"
+	assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6640", "host2:6650", "host3:6660"},
+		"/path/to/namespace", "")
+}
+
+func TestMultipleHostsMixed(t *testing.T) {
+	serviceURI := "pulsar://host1:6640,host2,host3:6660/path/to/namespace"
+	assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6640", "host2:6650", "host3:6660"},
+		"/path/to/namespace", "")
+}
+
+func TestUserInfoWithMultipleHosts(t *testing.T) {
+	serviceURI := "pulsar://pulsaruser@host1:6650;host2:6650;host3:6650/path/to/namespace"
+	assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6650", "host2:6650", "host3:6650"},
+		"/path/to/namespace", "pulsaruser")
+}
+
+func testInvalidServiceURI(t *testing.T, serviceURI string) {
+	u, err := NewPulsarServiceURIFromURIString(serviceURI)
+	t.Logf("testInvalidServiceURI %s", serviceURI)
+	assert.Nil(t, u)
+	assert.NotNil(t, err)
+}
+
+func assertServiceURI(t *testing.T, serviceURI, expectedServiceName string,
+	expectedServiceInfo, expectedServiceHosts []string, expectedServicePath, expectedServiceUser string) {
+	uri, err := NewPulsarServiceURIFromURIString(serviceURI)
+	assert.Nil(t, err)
+	assert.NotNil(t, serviceURI)
+	assert.Equal(t, expectedServiceName, uri.ServiceName)
+	assert.Equal(t, expectedServicePath, uri.servicePath)
+	if expectedServiceUser != "" {
+		assert.Equal(t, expectedServiceUser, uri.URL.User.Username())
+	}
+	assert.ElementsMatch(t, expectedServiceInfo, uri.ServiceInfos)
+	assert.ElementsMatch(t, expectedServiceHosts, uri.ServiceHosts)
+}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 5708cad..4d62cac 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -621,7 +621,7 @@
 	}
 	producer, err := client.CreateProducer(ProducerOptions{
 		Topic:      topic,
-		Name:       "my-producer",
+		Name:       "meta-data-producer",
 		Properties: props,
 	})
 	if err != nil {
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index cd97964..f72ba1d 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -586,3 +586,55 @@
 
 	assert.False(t, reader.HasNext())
 }
+
+func TestReaderWithMultiHosts(t *testing.T) {
+	// Multi hosts included an unreached port and the actual port for verify retry logic
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6600,localhost:6650",
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+	ctx := context.Background()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           topic,
+		DisableBatching: true,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	// send 10 messages
+	for i := 0; i < 10; i++ {
+		msgID, err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		})
+		assert.NoError(t, err)
+		assert.NotNil(t, msgID)
+	}
+
+	// create reader on 5th message (not included)
+	reader, err := client.CreateReader(ReaderOptions{
+		Topic:          topic,
+		StartMessageID: EarliestMessageID(),
+	})
+
+	assert.Nil(t, err)
+	defer reader.Close()
+
+	i := 0
+	for reader.HasNext() {
+		msg, err := reader.Next(context.Background())
+		assert.NoError(t, err)
+
+		expectMsg := fmt.Sprintf("hello-%d", i)
+		assert.Equal(t, []byte(expectMsg), msg.Payload())
+
+		i++
+	}
+
+	assert.Equal(t, 10, i)
+}