[Issue #153] Fix handler memory leak (#154)
* fix producer handler memory leak
* consumer removed from client handler when closed
* golanglint
* add tests & fix deadlock when handler close
* make handlers delete in closeOnce to prevent unnecessary calls
* goimports
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 3bad95a..ede02ef 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -41,6 +41,7 @@
}
type consumer struct {
+ client *client
options ConsumerOptions
consumers []*partitionConsumer
@@ -114,6 +115,7 @@
func internalTopicSubscribe(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage) (*consumer, error) {
consumer := &consumer{
+ client: client,
options: options,
messageCh: messageCh,
closeCh: make(chan struct{}),
@@ -316,6 +318,7 @@
}
wg.Wait()
close(c.closeCh)
+ c.client.handlers.Del(c)
})
}
diff --git a/pulsar/internal/client_handlers.go b/pulsar/internal/client_handlers.go
index 1ecfdd9..39b9bb7 100644
--- a/pulsar/internal/client_handlers.go
+++ b/pulsar/internal/client_handlers.go
@@ -36,6 +36,13 @@
defer h.l.Unlock()
h.handlers[c] = true
}
+
+func (h *ClientHandlers) Del(c Closable) {
+ h.l.Lock()
+ defer h.l.Unlock()
+ delete(h.handlers, c)
+}
+
func (h *ClientHandlers) Val(c Closable) bool {
h.l.RLock()
defer h.l.RUnlock()
@@ -44,9 +51,13 @@
func (h *ClientHandlers) Close() {
h.l.Lock()
- defer h.l.Unlock()
-
+ handlers := make([]Closable, 0, len(h.handlers))
for handler := range h.handlers {
+ handlers = append(handlers, handler)
+ }
+ h.l.Unlock()
+
+ for _, handler := range handlers {
handler.Close()
}
}
diff --git a/pulsar/internal/client_handlers_test.go b/pulsar/internal/client_handlers_test.go
index baaf899..09dcf44 100644
--- a/pulsar/internal/client_handlers_test.go
+++ b/pulsar/internal/client_handlers_test.go
@@ -28,7 +28,7 @@
assert.NotNil(t, h.l)
assert.Equal(t, h.handlers, map[Closable]bool{})
- closable := &testClosable{false}
+ closable := &testClosable{h: &h, closed: false}
h.Add(closable)
assert.True(t, h.Val(closable))
@@ -37,10 +37,40 @@
assert.True(t, closable.closed)
}
+func TestClientHandlers_Del(t *testing.T) {
+ h := NewClientHandlers()
+ assert.NotNil(t, h.l)
+ assert.Equal(t, h.handlers, map[Closable]bool{})
+
+ closable1 := &testClosable{h: &h, closed: false}
+ h.Add(closable1)
+
+ closable2 := &testClosable{h: &h, closed: false}
+ h.Add(closable2)
+
+ assert.Len(t, h.handlers, 2)
+ assert.True(t, h.Val(closable1))
+ assert.True(t, h.Val(closable2))
+
+ closable1.Close()
+ assert.False(t, h.Val(closable1))
+ assert.True(t, h.Val(closable2))
+ assert.Len(t, h.handlers, 1)
+
+ h.Close()
+ t.Log("closable1 is: closed ", closable1.closed)
+ t.Log("closable2 is: closed ", closable2.closed)
+ assert.True(t, closable1.closed)
+ assert.True(t, closable2.closed)
+ assert.Len(t, h.handlers, 0)
+}
+
type testClosable struct {
+ h *ClientHandlers
closed bool
}
func (t *testClosable) Close() {
t.closed = true
+ t.h.Del(t)
}
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 101929d..83a6f75 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -24,6 +24,7 @@
)
type producer struct {
+ client *client
topic string
producers []Producer
messageRouter func(*ProducerMessage, TopicMetadata) int
@@ -46,7 +47,8 @@
}
p := &producer{
- topic: options.Topic,
+ topic: options.Topic,
+ client: client,
}
if options.MessageRouter == nil {
@@ -160,4 +162,5 @@
for _, pp := range p.producers {
pp.Close()
}
+ p.client.handlers.Del(p)
}