[Issue 118][client_impl.go] fix race condition when accessing client.handlers (#119)

* fix race condition when access client.handlers

* added newline to statisfy lic test

* refactor Set(Closable, bool) to Add(Closable) for a more explicit
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 9cb3442..d0c36fe 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -39,7 +39,7 @@
 	lookupService internal.LookupService
 	auth          auth.Provider
 
-	handlers            map[internal.Closable]bool
+	handlers            internal.ClientHandlers
 	producerIDGenerator uint64
 	consumerIDGenerator uint64
 }
@@ -86,14 +86,14 @@
 	}
 	c.rpcClient = internal.NewRPCClient(url, c.cnxPool)
 	c.lookupService = internal.NewLookupService(c.rpcClient, url)
-	c.handlers = make(map[internal.Closable]bool)
+	c.handlers = internal.NewClientHandlers()
 	return c, nil
 }
 
 func (c *client) CreateProducer(options ProducerOptions) (Producer, error) {
 	producer, err := newProducer(c, &options)
 	if err == nil {
-		c.handlers[producer] = true
+		c.handlers.Add(producer)
 	}
 	return producer, err
 }
@@ -103,7 +103,7 @@
 	if err != nil {
 		return nil, err
 	}
-	c.handlers[consumer] = true
+	c.handlers.Add(consumer)
 	return consumer, nil
 }
 
@@ -145,9 +145,7 @@
 }
 
 func (c *client) Close() {
-	for handler := range c.handlers {
-		handler.Close()
-	}
+	c.handlers.Close()
 }
 
 func (c *client) namespaceTopics(namespace string) ([]string, error) {
diff --git a/pulsar/internal/client_handlers.go b/pulsar/internal/client_handlers.go
new file mode 100644
index 0000000..1ecfdd9
--- /dev/null
+++ b/pulsar/internal/client_handlers.go
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import "sync"
+
+// ClientHandlerMap is a simple concurrent-safe map for the client type
+type ClientHandlers struct {
+	handlers map[Closable]bool
+	l        *sync.RWMutex
+}
+
+func NewClientHandlers() ClientHandlers {
+	return ClientHandlers{
+		handlers: map[Closable]bool{},
+		l:        &sync.RWMutex{},
+	}
+}
+func (h *ClientHandlers) Add(c Closable) {
+	h.l.Lock()
+	defer h.l.Unlock()
+	h.handlers[c] = true
+}
+func (h *ClientHandlers) Val(c Closable) bool {
+	h.l.RLock()
+	defer h.l.RUnlock()
+	return h.handlers[c]
+}
+
+func (h *ClientHandlers) Close() {
+	h.l.Lock()
+	defer h.l.Unlock()
+
+	for handler := range h.handlers {
+		handler.Close()
+	}
+}
diff --git a/pulsar/internal/client_handlers_test.go b/pulsar/internal/client_handlers_test.go
new file mode 100644
index 0000000..baaf899
--- /dev/null
+++ b/pulsar/internal/client_handlers_test.go
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestClientHandlers(t *testing.T) {
+	h := NewClientHandlers()
+	assert.NotNil(t, h.l)
+	assert.Equal(t, h.handlers, map[Closable]bool{})
+
+	closable := &testClosable{false}
+	h.Add(closable)
+	assert.True(t, h.Val(closable))
+
+	h.Close()
+	t.Log("closable is: ", closable.closed)
+	assert.True(t, closable.closed)
+}
+
+type testClosable struct {
+	closed bool
+}
+
+func (t *testClosable) Close() {
+	t.closed = true
+}