[Issue 345] Add a new method to create auth provider from tls cert supplier (#347)

* add a new method to create auth provider from tls cert supplier

* fixup typo
diff --git a/pulsar/client.go b/pulsar/client.go
index 460b275..2253607 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -18,6 +18,7 @@
 package pulsar
 
 import (
+	"crypto/tls"
 	"time"
 
 	"github.com/apache/pulsar-client-go/pulsar/internal/auth"
@@ -57,6 +58,11 @@
 	return auth.NewAuthenticationTLS(certificatePath, privateKeyPath)
 }
 
+// Create new Authentication provider with specified TLS certificate supplier
+func NewAuthenticationFromTLSCertSupplier(tlsCertSupplier func() (*tls.Certificate, error)) Authentication {
+	return auth.NewAuthenticationFromTLSCertSupplier(tlsCertSupplier)
+}
+
 func NewAuthenticationAthenz(authParams map[string]string) Authentication {
 	athenz, _ := auth.NewAuthenticationAthenzWithParams(authParams)
 	return athenz
diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go
index ba17219..96fd68e 100644
--- a/pulsar/client_impl_test.go
+++ b/pulsar/client_impl_test.go
@@ -18,6 +18,7 @@
 package pulsar
 
 import (
+	"crypto/tls"
 	"fmt"
 	"io/ioutil"
 	"net/http"
@@ -162,6 +163,28 @@
 	client.Close()
 }
 
+func TestTLSAuthWithCertSupplier(t *testing.T) {
+	supplier := func() (*tls.Certificate, error) {
+		cert, err := tls.LoadX509KeyPair(tlsClientCertPath, tlsClientKeyPath)
+		return &cert, err
+	}
+	client, err := NewClient(ClientOptions{
+		URL:                   serviceURLTLS,
+		TLSTrustCertsFilePath: caCertsPath,
+		Authentication:        NewAuthenticationFromTLSCertSupplier(supplier),
+	})
+	assert.NoError(t, err)
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: newAuthTopicName(),
+	})
+
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+
+	client.Close()
+}
+
 func TestTokenAuth(t *testing.T) {
 	token, err := ioutil.ReadFile(tokenFilePath)
 	assert.NoError(t, err)
diff --git a/pulsar/internal/auth/tls.go b/pulsar/internal/auth/tls.go
index f04b755..8dc6ee7 100644
--- a/pulsar/internal/auth/tls.go
+++ b/pulsar/internal/auth/tls.go
@@ -22,6 +22,7 @@
 type tlsAuthProvider struct {
 	certificatePath string
 	privateKeyPath  string
+	tlsCertSupplier func() (*tls.Certificate, error)
 }
 
 // NewAuthenticationTLSWithParams initialize the authentication provider with map param.
@@ -40,6 +41,12 @@
 	}
 }
 
+func NewAuthenticationFromTLSCertSupplier(tlsCertSupplier func() (*tls.Certificate, error)) Provider {
+	return &tlsAuthProvider{
+		tlsCertSupplier: tlsCertSupplier,
+	}
+}
+
 func (p *tlsAuthProvider) Init() error {
 	// Try to read certificates immediately to provide better error at startup
 	_, err := p.GetTLSCertificate()
@@ -51,6 +58,9 @@
 }
 
 func (p *tlsAuthProvider) GetTLSCertificate() (*tls.Certificate, error) {
+	if p.tlsCertSupplier != nil {
+		return p.tlsCertSupplier()
+	}
 	cert, err := tls.LoadX509KeyPair(p.certificatePath, p.privateKeyPath)
 	return &cert, err
 }