Remove error return on Close() methods (#99)
diff --git a/pulsar/client.go b/pulsar/client.go
index e69a9b2..6916371 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -108,5 +108,5 @@
TopicPartitions(topic string) ([]string, error)
// Close the Client and free associated resources
- Close() error
+ Close()
}
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 9fec475..23ec78c 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -165,5 +165,5 @@
NackID(MessageID)
// Close the consumer and stop the broker to push more messages
- Close() error
+ Close()
}
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 216ad0d..d2a3474 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -163,7 +163,7 @@
// cleanup all the partitions that succeeded in creating the consumer
for _, c := range consumer.consumers {
if c != nil {
- _ = c.Close()
+ c.Close()
}
}
return nil, err
@@ -253,7 +253,7 @@
c.consumers[partition].NackID(mid)
}
-func (c *consumer) Close() error {
+func (c *consumer) Close() {
var wg sync.WaitGroup
for i := range c.consumers {
wg.Add(1)
@@ -263,8 +263,6 @@
}(c.consumers[i])
}
wg.Wait()
-
- return nil
}
var random = rand.New(rand.NewSource(time.Now().UnixNano()))
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index b1d7b15..d49626f 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -183,9 +183,9 @@
})
}
-func (pc *partitionConsumer) Close() error {
+func (pc *partitionConsumer) Close() {
if pc.state != consumerReady {
- return nil
+ return
}
req := &closeRequest{doneCh: make(chan struct{})}
@@ -193,7 +193,6 @@
// wait for request to finish
<-req.doneCh
- return req.err
}
func (pc *partitionConsumer) internalAck(req *ackRequest) {
@@ -403,7 +402,6 @@
type closeRequest struct {
doneCh chan struct{}
- err error
}
type redeliveryRequest struct {
@@ -452,14 +450,15 @@
}
_, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_CLOSE_CONSUMER, cmdClose)
if err != nil {
- req.err = err
+ pc.log.WithError(err).Warn("Failed to close consumer")
} else {
pc.log.Info("Closed consumer")
- pc.state = consumerClosed
- pc.conn.DeleteConsumeHandler(pc.consumerID)
- pc.nackTracker.Close()
- close(pc.closeCh)
}
+
+ pc.state = consumerClosed
+ pc.conn.DeleteConsumeHandler(pc.consumerID)
+ pc.nackTracker.Close()
+ close(pc.closeCh)
}
func (pc *partitionConsumer) reconnectToBroker() {
diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go
index 851b473..71273c5 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -28,8 +28,8 @@
func TestSingleMessageIDNoAckTracker(t *testing.T) {
eventsCh := make(chan interface{}, 1)
pc := partitionConsumer{
- queueCh: make(chan []*message, 1),
- eventsCh: eventsCh,
+ queueCh: make(chan []*message, 1),
+ eventsCh: eventsCh,
}
headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage)
@@ -46,8 +46,8 @@
// ack the message id
pc.AckID(messages[0].msgID.(*messageID))
- select{
- case <- eventsCh:
+ select {
+ case <-eventsCh:
default:
t.Error("Expected an ack request to be triggered!")
}
@@ -56,8 +56,8 @@
func TestBatchMessageIDNoAckTracker(t *testing.T) {
eventsCh := make(chan interface{}, 1)
pc := partitionConsumer{
- queueCh: make(chan []*message, 1),
- eventsCh: eventsCh,
+ queueCh: make(chan []*message, 1),
+ eventsCh: eventsCh,
}
headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1)
@@ -74,8 +74,8 @@
// ack the message id
pc.AckID(messages[0].msgID.(*messageID))
- select{
- case <- eventsCh:
+ select {
+ case <-eventsCh:
default:
t.Error("Expected an ack request to be triggered!")
}
@@ -84,8 +84,8 @@
func TestBatchMessageIDWithAckTracker(t *testing.T) {
eventsCh := make(chan interface{}, 1)
pc := partitionConsumer{
- queueCh: make(chan []*message, 1),
- eventsCh: eventsCh,
+ queueCh: make(chan []*message, 1),
+ eventsCh: eventsCh,
}
headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
@@ -104,8 +104,8 @@
pc.AckID(messages[i].msgID.(*messageID))
}
- select{
- case <- eventsCh:
+ select {
+ case <-eventsCh:
t.Error("The message id should not be acked!")
default:
}
@@ -113,8 +113,8 @@
// ack last message
pc.AckID(messages[9].msgID.(*messageID))
- select{
- case <- eventsCh:
+ select {
+ case <-eventsCh:
default:
t.Error("Expected an ack request to be triggered!")
}
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index cc40d22..f34966b 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -593,8 +593,7 @@
}
}
- err = consumer.Close()
- assert.Nil(t, err)
+ consumer.Close()
// Subscribe again
consumer, err = client.Subscribe(ConsumerOptions{
diff --git a/pulsar/impl_client.go b/pulsar/impl_client.go
index 53c2074..b9869a5 100644
--- a/pulsar/impl_client.go
+++ b/pulsar/impl_client.go
@@ -143,12 +143,8 @@
return []string{topicName.Name}, nil
}
-func (client *client) Close() error {
+func (client *client) Close() {
for handler := range client.handlers {
- if err := handler.Close(); err != nil {
- return err
- }
+ handler.Close()
}
-
- return nil
}
diff --git a/pulsar/impl_client_test.go b/pulsar/impl_client_test.go
index 55e227f..7c28da5 100644
--- a/pulsar/impl_client_test.go
+++ b/pulsar/impl_client_test.go
@@ -47,8 +47,7 @@
assert.Error(t, err)
assert.Nil(t, producer)
- err = client.Close()
- assert.NoError(t, err)
+ client.Close()
}
func TestTLSInsecureConnection(t *testing.T) {
@@ -65,8 +64,7 @@
assert.NoError(t, err)
assert.NotNil(t, producer)
- err = client.Close()
- assert.NoError(t, err)
+ client.Close()
}
func TestTLSConnection(t *testing.T) {
@@ -83,8 +81,7 @@
assert.NoError(t, err)
assert.NotNil(t, producer)
- err = client.Close()
- assert.NoError(t, err)
+ client.Close()
}
func TestTLSConnectionHostNameVerification(t *testing.T) {
@@ -102,8 +99,7 @@
assert.NoError(t, err)
assert.NotNil(t, producer)
- err = client.Close()
- assert.NoError(t, err)
+ client.Close()
}
func TestTLSConnectionHostNameVerificationError(t *testing.T) {
@@ -121,8 +117,7 @@
assert.Error(t, err)
assert.Nil(t, producer)
- err = client.Close()
- assert.NoError(t, err)
+ client.Close()
}
func TestTLSAuthError(t *testing.T) {
@@ -139,8 +134,7 @@
assert.Error(t, err)
assert.Nil(t, producer)
- err = client.Close()
- assert.NoError(t, err)
+ client.Close()
}
func TestTLSAuth(t *testing.T) {
@@ -158,8 +152,7 @@
assert.NoError(t, err)
assert.NotNil(t, producer)
- err = client.Close()
- assert.NoError(t, err)
+ client.Close()
}
func TestTokenAuth(t *testing.T) {
@@ -179,8 +172,7 @@
assert.NoError(t, err)
assert.NotNil(t, producer)
- err = client.Close()
- assert.NoError(t, err)
+ client.Close()
}
func TestTokenAuthFromFile(t *testing.T) {
@@ -197,8 +189,7 @@
assert.NoError(t, err)
assert.NotNil(t, producer)
- err = client.Close()
- assert.NoError(t, err)
+ client.Close()
}
func TestTopicPartitions(t *testing.T) {
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index dd420a4..5d09242 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -414,7 +414,7 @@
})
if err != nil {
- req.err = err
+ p.log.WithError(err).Warn("Failed to close producer")
} else {
p.log.Info("Closed producer")
p.state = producerClosed
@@ -440,20 +440,19 @@
return cp.err
}
-func (p *partitionProducer) Close() error {
+func (p *partitionProducer) Close() {
if p.state != producerReady {
// Producer is closing
- return nil
+ return
}
wg := sync.WaitGroup{}
wg.Add(1)
- cp := &closeProducer{&wg, nil}
+ cp := &closeProducer{&wg}
p.eventsChan <- cp
wg.Wait()
- return cp.err
}
type sendRequest struct {
@@ -465,7 +464,6 @@
type closeProducer struct {
waitGroup *sync.WaitGroup
- err error
}
type flushRequest struct {
diff --git a/pulsar/impl_producer.go b/pulsar/impl_producer.go
index b0620c2..c1a9902 100644
--- a/pulsar/impl_producer.go
+++ b/pulsar/impl_producer.go
@@ -19,8 +19,6 @@
import (
"context"
- "fmt"
- "github.com/pkg/errors"
"github.com/apache/pulsar-client-go/pulsar/internal"
)
@@ -102,7 +100,7 @@
// Since there were some failures, cleanup all the partitions that succeeded in creating the producers
for _, producer := range p.producers {
if producer != nil {
- _ = producer.Close()
+ producer.Close()
}
}
return nil, err
@@ -154,13 +152,8 @@
return nil
}
-func (p *producer) Close() error {
- var errs error
+func (p *producer) Close() {
for _, pp := range p.producers {
- if err := pp.Close(); err != nil {
- errs = errors.Wrap(err, fmt.Sprintf("unable to close producer %s", p.Name()))
- }
-
+ pp.Close()
}
- return errs
}
diff --git a/pulsar/internal/closable.go b/pulsar/internal/closable.go
index 7906c89..ac26e1e 100644
--- a/pulsar/internal/closable.go
+++ b/pulsar/internal/closable.go
@@ -18,5 +18,5 @@
package internal
type Closable interface {
- Close() error
+ Close()
}
diff --git a/pulsar/producer.go b/pulsar/producer.go
index a199e23..968b2bf 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -162,5 +162,5 @@
// Close the producer and releases resources allocated
// No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case
// of errors, pending writes will not be retried.
- Close() error
+ Close()
}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 1961172..7ee42e4 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -85,6 +85,7 @@
URL: serviceURL,
})
assert.NoError(t, err)
+ defer client.Close()
producer, err := client.CreateProducer(ProducerOptions{
Topic: newTopicName(),
@@ -92,6 +93,7 @@
assert.NoError(t, err)
assert.NotNil(t, producer)
+ defer producer.Close()
for i := 0; i < 10; i++ {
err = producer.Send(context.Background(), &ProducerMessage{
@@ -100,12 +102,6 @@
assert.NoError(t, err)
}
-
- err = producer.Close()
- assert.NoError(t, err)
-
- err = client.Close()
- assert.NoError(t, err)
}
func TestProducerAsyncSend(t *testing.T) {
@@ -113,6 +109,7 @@
URL: serviceURL,
})
assert.NoError(t, err)
+ defer client.Close()
producer, err := client.CreateProducer(ProducerOptions{
Topic: newTopicName(),
@@ -121,6 +118,7 @@
assert.NoError(t, err)
assert.NotNil(t, producer)
+ defer producer.Close()
wg := sync.WaitGroup{}
wg.Add(10)
@@ -148,12 +146,6 @@
wg.Wait()
assert.Equal(t, 0, errors.Size())
-
- err = producer.Close()
- assert.NoError(t, err)
-
- err = client.Close()
- assert.NoError(t, err)
}
func TestProducerCompression(t *testing.T) {
@@ -176,6 +168,7 @@
URL: serviceURL,
})
assert.NoError(t, err)
+ defer client.Close()
producer, err := client.CreateProducer(ProducerOptions{
Topic: newTopicName(),
@@ -184,6 +177,7 @@
assert.NoError(t, err)
assert.NotNil(t, producer)
+ defer producer.Close()
for i := 0; i < 10; i++ {
err = producer.Send(context.Background(), &ProducerMessage{
@@ -192,12 +186,6 @@
assert.NoError(t, err)
}
-
- err = producer.Close()
- assert.NoError(t, err)
-
- err = client.Close()
- assert.NoError(t, err)
})
}
}
@@ -207,6 +195,7 @@
URL: serviceURL,
})
assert.NoError(t, err)
+ defer client.Close()
producer, err := client.CreateProducer(ProducerOptions{
Topic: newTopicName(),
@@ -214,6 +203,7 @@
assert.NoError(t, err)
assert.NotNil(t, producer)
+ defer producer.Close()
assert.Equal(t, int64(-1), producer.LastSequenceID())
@@ -225,12 +215,6 @@
assert.NoError(t, err)
assert.Equal(t, int64(i), producer.LastSequenceID())
}
-
- err = producer.Close()
- assert.NoError(t, err)
-
- err = client.Close()
- assert.NoError(t, err)
}
func TestEventTime(t *testing.T) {
diff --git a/pulsar/reader.go b/pulsar/reader.go
index 3e088b4..8c01d7b 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -79,5 +79,5 @@
HasNext() (bool, error)
// Close the reader and stop the broker to push more messages
- Close() error
+ Close()
}