Remove error return on Close() methods (#99)

diff --git a/pulsar/client.go b/pulsar/client.go
index e69a9b2..6916371 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -108,5 +108,5 @@
 	TopicPartitions(topic string) ([]string, error)
 
 	// Close the Client and free associated resources
-	Close() error
+	Close()
 }
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 9fec475..23ec78c 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -165,5 +165,5 @@
 	NackID(MessageID)
 
 	// Close the consumer and stop the broker to push more messages
-	Close() error
+	Close()
 }
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 216ad0d..d2a3474 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -163,7 +163,7 @@
 		// cleanup all the partitions that succeeded in creating the consumer
 		for _, c := range consumer.consumers {
 			if c != nil {
-				_ = c.Close()
+				c.Close()
 			}
 		}
 		return nil, err
@@ -253,7 +253,7 @@
 	c.consumers[partition].NackID(mid)
 }
 
-func (c *consumer) Close() error {
+func (c *consumer) Close() {
 	var wg sync.WaitGroup
 	for i := range c.consumers {
 		wg.Add(1)
@@ -263,8 +263,6 @@
 		}(c.consumers[i])
 	}
 	wg.Wait()
-
-	return nil
 }
 
 var random = rand.New(rand.NewSource(time.Now().UnixNano()))
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index b1d7b15..d49626f 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -183,9 +183,9 @@
 		})
 }
 
-func (pc *partitionConsumer) Close() error {
+func (pc *partitionConsumer) Close() {
 	if pc.state != consumerReady {
-		return nil
+		return
 	}
 
 	req := &closeRequest{doneCh: make(chan struct{})}
@@ -193,7 +193,6 @@
 
 	// wait for request to finish
 	<-req.doneCh
-	return req.err
 }
 
 func (pc *partitionConsumer) internalAck(req *ackRequest) {
@@ -403,7 +402,6 @@
 
 type closeRequest struct {
 	doneCh chan struct{}
-	err    error
 }
 
 type redeliveryRequest struct {
@@ -452,14 +450,15 @@
 	}
 	_, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_CLOSE_CONSUMER, cmdClose)
 	if err != nil {
-		req.err = err
+		pc.log.WithError(err).Warn("Failed to close consumer")
 	} else {
 		pc.log.Info("Closed consumer")
-		pc.state = consumerClosed
-		pc.conn.DeleteConsumeHandler(pc.consumerID)
-		pc.nackTracker.Close()
-		close(pc.closeCh)
 	}
+
+	pc.state = consumerClosed
+	pc.conn.DeleteConsumeHandler(pc.consumerID)
+	pc.nackTracker.Close()
+	close(pc.closeCh)
 }
 
 func (pc *partitionConsumer) reconnectToBroker() {
diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go
index 851b473..71273c5 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -28,8 +28,8 @@
 func TestSingleMessageIDNoAckTracker(t *testing.T) {
 	eventsCh := make(chan interface{}, 1)
 	pc := partitionConsumer{
-		queueCh:          make(chan []*message, 1),
-		eventsCh:         eventsCh,
+		queueCh:  make(chan []*message, 1),
+		eventsCh: eventsCh,
 	}
 
 	headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage)
@@ -46,8 +46,8 @@
 	// ack the message id
 	pc.AckID(messages[0].msgID.(*messageID))
 
-	select{
-	case <- eventsCh:
+	select {
+	case <-eventsCh:
 	default:
 		t.Error("Expected an ack request to be triggered!")
 	}
@@ -56,8 +56,8 @@
 func TestBatchMessageIDNoAckTracker(t *testing.T) {
 	eventsCh := make(chan interface{}, 1)
 	pc := partitionConsumer{
-		queueCh:          make(chan []*message, 1),
-		eventsCh:         eventsCh,
+		queueCh:  make(chan []*message, 1),
+		eventsCh: eventsCh,
 	}
 
 	headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1)
@@ -74,8 +74,8 @@
 	// ack the message id
 	pc.AckID(messages[0].msgID.(*messageID))
 
-	select{
-	case <- eventsCh:
+	select {
+	case <-eventsCh:
 	default:
 		t.Error("Expected an ack request to be triggered!")
 	}
@@ -84,8 +84,8 @@
 func TestBatchMessageIDWithAckTracker(t *testing.T) {
 	eventsCh := make(chan interface{}, 1)
 	pc := partitionConsumer{
-		queueCh:          make(chan []*message, 1),
-		eventsCh:         eventsCh,
+		queueCh:  make(chan []*message, 1),
+		eventsCh: eventsCh,
 	}
 
 	headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
@@ -104,8 +104,8 @@
 		pc.AckID(messages[i].msgID.(*messageID))
 	}
 
-	select{
-	case <- eventsCh:
+	select {
+	case <-eventsCh:
 		t.Error("The message id should not be acked!")
 	default:
 	}
@@ -113,8 +113,8 @@
 	// ack last message
 	pc.AckID(messages[9].msgID.(*messageID))
 
-	select{
-	case <- eventsCh:
+	select {
+	case <-eventsCh:
 	default:
 		t.Error("Expected an ack request to be triggered!")
 	}
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index cc40d22..f34966b 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -593,8 +593,7 @@
 		}
 	}
 
-	err = consumer.Close()
-	assert.Nil(t, err)
+	consumer.Close()
 
 	// Subscribe again
 	consumer, err = client.Subscribe(ConsumerOptions{
diff --git a/pulsar/impl_client.go b/pulsar/impl_client.go
index 53c2074..b9869a5 100644
--- a/pulsar/impl_client.go
+++ b/pulsar/impl_client.go
@@ -143,12 +143,8 @@
 	return []string{topicName.Name}, nil
 }
 
-func (client *client) Close() error {
+func (client *client) Close() {
 	for handler := range client.handlers {
-		if err := handler.Close(); err != nil {
-			return err
-		}
+		handler.Close()
 	}
-
-	return nil
 }
diff --git a/pulsar/impl_client_test.go b/pulsar/impl_client_test.go
index 55e227f..7c28da5 100644
--- a/pulsar/impl_client_test.go
+++ b/pulsar/impl_client_test.go
@@ -47,8 +47,7 @@
 	assert.Error(t, err)
 	assert.Nil(t, producer)
 
-	err = client.Close()
-	assert.NoError(t, err)
+	client.Close()
 }
 
 func TestTLSInsecureConnection(t *testing.T) {
@@ -65,8 +64,7 @@
 	assert.NoError(t, err)
 	assert.NotNil(t, producer)
 
-	err = client.Close()
-	assert.NoError(t, err)
+	client.Close()
 }
 
 func TestTLSConnection(t *testing.T) {
@@ -83,8 +81,7 @@
 	assert.NoError(t, err)
 	assert.NotNil(t, producer)
 
-	err = client.Close()
-	assert.NoError(t, err)
+	client.Close()
 }
 
 func TestTLSConnectionHostNameVerification(t *testing.T) {
@@ -102,8 +99,7 @@
 	assert.NoError(t, err)
 	assert.NotNil(t, producer)
 
-	err = client.Close()
-	assert.NoError(t, err)
+	client.Close()
 }
 
 func TestTLSConnectionHostNameVerificationError(t *testing.T) {
@@ -121,8 +117,7 @@
 	assert.Error(t, err)
 	assert.Nil(t, producer)
 
-	err = client.Close()
-	assert.NoError(t, err)
+	client.Close()
 }
 
 func TestTLSAuthError(t *testing.T) {
@@ -139,8 +134,7 @@
 	assert.Error(t, err)
 	assert.Nil(t, producer)
 
-	err = client.Close()
-	assert.NoError(t, err)
+	client.Close()
 }
 
 func TestTLSAuth(t *testing.T) {
@@ -158,8 +152,7 @@
 	assert.NoError(t, err)
 	assert.NotNil(t, producer)
 
-	err = client.Close()
-	assert.NoError(t, err)
+	client.Close()
 }
 
 func TestTokenAuth(t *testing.T) {
@@ -179,8 +172,7 @@
 	assert.NoError(t, err)
 	assert.NotNil(t, producer)
 
-	err = client.Close()
-	assert.NoError(t, err)
+	client.Close()
 }
 
 func TestTokenAuthFromFile(t *testing.T) {
@@ -197,8 +189,7 @@
 	assert.NoError(t, err)
 	assert.NotNil(t, producer)
 
-	err = client.Close()
-	assert.NoError(t, err)
+	client.Close()
 }
 
 func TestTopicPartitions(t *testing.T) {
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index dd420a4..5d09242 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -414,7 +414,7 @@
 	})
 
 	if err != nil {
-		req.err = err
+		p.log.WithError(err).Warn("Failed to close producer")
 	} else {
 		p.log.Info("Closed producer")
 		p.state = producerClosed
@@ -440,20 +440,19 @@
 	return cp.err
 }
 
-func (p *partitionProducer) Close() error {
+func (p *partitionProducer) Close() {
 	if p.state != producerReady {
 		// Producer is closing
-		return nil
+		return
 	}
 
 	wg := sync.WaitGroup{}
 	wg.Add(1)
 
-	cp := &closeProducer{&wg, nil}
+	cp := &closeProducer{&wg}
 	p.eventsChan <- cp
 
 	wg.Wait()
-	return cp.err
 }
 
 type sendRequest struct {
@@ -465,7 +464,6 @@
 
 type closeProducer struct {
 	waitGroup *sync.WaitGroup
-	err       error
 }
 
 type flushRequest struct {
diff --git a/pulsar/impl_producer.go b/pulsar/impl_producer.go
index b0620c2..c1a9902 100644
--- a/pulsar/impl_producer.go
+++ b/pulsar/impl_producer.go
@@ -19,8 +19,6 @@
 
 import (
 	"context"
-	"fmt"
-	"github.com/pkg/errors"
 
 	"github.com/apache/pulsar-client-go/pulsar/internal"
 )
@@ -102,7 +100,7 @@
 		// Since there were some failures, cleanup all the partitions that succeeded in creating the producers
 		for _, producer := range p.producers {
 			if producer != nil {
-				_ = producer.Close()
+				producer.Close()
 			}
 		}
 		return nil, err
@@ -154,13 +152,8 @@
 	return nil
 }
 
-func (p *producer) Close() error {
-	var errs error
+func (p *producer) Close() {
 	for _, pp := range p.producers {
-		if err := pp.Close(); err != nil {
-			errs = errors.Wrap(err, fmt.Sprintf("unable to close producer %s", p.Name()))
-		}
-
+		pp.Close()
 	}
-	return errs
 }
diff --git a/pulsar/internal/closable.go b/pulsar/internal/closable.go
index 7906c89..ac26e1e 100644
--- a/pulsar/internal/closable.go
+++ b/pulsar/internal/closable.go
@@ -18,5 +18,5 @@
 package internal
 
 type Closable interface {
-	Close() error
+	Close()
 }
diff --git a/pulsar/producer.go b/pulsar/producer.go
index a199e23..968b2bf 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -162,5 +162,5 @@
 	// Close the producer and releases resources allocated
 	// No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case
 	// of errors, pending writes will not be retried.
-	Close() error
+	Close()
 }
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 1961172..7ee42e4 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -85,6 +85,7 @@
 		URL: serviceURL,
 	})
 	assert.NoError(t, err)
+	defer client.Close()
 
 	producer, err := client.CreateProducer(ProducerOptions{
 		Topic: newTopicName(),
@@ -92,6 +93,7 @@
 
 	assert.NoError(t, err)
 	assert.NotNil(t, producer)
+	defer producer.Close()
 
 	for i := 0; i < 10; i++ {
 		err = producer.Send(context.Background(), &ProducerMessage{
@@ -100,12 +102,6 @@
 
 		assert.NoError(t, err)
 	}
-
-	err = producer.Close()
-	assert.NoError(t, err)
-
-	err = client.Close()
-	assert.NoError(t, err)
 }
 
 func TestProducerAsyncSend(t *testing.T) {
@@ -113,6 +109,7 @@
 		URL: serviceURL,
 	})
 	assert.NoError(t, err)
+	defer client.Close()
 
 	producer, err := client.CreateProducer(ProducerOptions{
 		Topic:                   newTopicName(),
@@ -121,6 +118,7 @@
 
 	assert.NoError(t, err)
 	assert.NotNil(t, producer)
+	defer producer.Close()
 
 	wg := sync.WaitGroup{}
 	wg.Add(10)
@@ -148,12 +146,6 @@
 	wg.Wait()
 
 	assert.Equal(t, 0, errors.Size())
-
-	err = producer.Close()
-	assert.NoError(t, err)
-
-	err = client.Close()
-	assert.NoError(t, err)
 }
 
 func TestProducerCompression(t *testing.T) {
@@ -176,6 +168,7 @@
 				URL: serviceURL,
 			})
 			assert.NoError(t, err)
+			defer client.Close()
 
 			producer, err := client.CreateProducer(ProducerOptions{
 				Topic:           newTopicName(),
@@ -184,6 +177,7 @@
 
 			assert.NoError(t, err)
 			assert.NotNil(t, producer)
+			defer producer.Close()
 
 			for i := 0; i < 10; i++ {
 				err = producer.Send(context.Background(), &ProducerMessage{
@@ -192,12 +186,6 @@
 
 				assert.NoError(t, err)
 			}
-
-			err = producer.Close()
-			assert.NoError(t, err)
-
-			err = client.Close()
-			assert.NoError(t, err)
 		})
 	}
 }
@@ -207,6 +195,7 @@
 		URL: serviceURL,
 	})
 	assert.NoError(t, err)
+	defer client.Close()
 
 	producer, err := client.CreateProducer(ProducerOptions{
 		Topic: newTopicName(),
@@ -214,6 +203,7 @@
 
 	assert.NoError(t, err)
 	assert.NotNil(t, producer)
+	defer producer.Close()
 
 	assert.Equal(t, int64(-1), producer.LastSequenceID())
 
@@ -225,12 +215,6 @@
 		assert.NoError(t, err)
 		assert.Equal(t, int64(i), producer.LastSequenceID())
 	}
-
-	err = producer.Close()
-	assert.NoError(t, err)
-
-	err = client.Close()
-	assert.NoError(t, err)
 }
 
 func TestEventTime(t *testing.T) {
diff --git a/pulsar/reader.go b/pulsar/reader.go
index 3e088b4..8c01d7b 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -79,5 +79,5 @@
 	HasNext() (bool, error)
 
 	// Close the reader and stop the broker to push more messages
-	Close() error
+	Close()
 }