Fix unsubscribe blocked when consumer is closing or has closed (#457)
### Motivation
For the present consumer, `Close()` and `Unsubscribe()` handled by the same eventloop goroutine.
The eventloop exited after `Close()`, then unsubscribe event wouldn't be selected and handled anymore, lead to block.
example:
```go
func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
log.Fatal(err)
}
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "topic-1",
SubscriptionName: "my-sub",
})
if err != nil {
log.Fatal(err)
}
defer consumer.Unsubscribe() // unintentional
defer consumer.Close()
}
```
`Unsubscribe()` blocked:
![image](https://user-images.githubusercontent.com/24536920/106294060-ab5d6b80-6289-11eb-913c-85e1d18467a0.png)
### Modifications
Check consumer state before send unsubscribe event, if consumer is closing or has closed, just logging it
### Verifying this change
- [x] Make sure that the change passes the CI checks.
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index b9c9b12..b75a7d7 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -47,6 +47,21 @@
consumerClosed
)
+func (s consumerState) String() string {
+ switch s {
+ case consumerInit:
+ return "Initializing"
+ case consumerReady:
+ return "Ready"
+ case consumerClosing:
+ return "Closing"
+ case consumerClosed:
+ return "Closed"
+ default:
+ return "Unknown"
+ }
+}
+
type subscriptionMode int
const (
@@ -195,6 +210,11 @@
}
func (pc *partitionConsumer) Unsubscribe() error {
+ if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+ pc.log.WithField("state", state).Error("Failed to unsubscribe closing or closed consumer")
+ return nil
+ }
+
req := &unsubscribeRequest{doneCh: make(chan struct{})}
pc.eventsCh <- req
@@ -206,9 +226,8 @@
func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
defer close(unsub.doneCh)
- state := pc.getConsumerState()
- if state == consumerClosed || state == consumerClosing {
- pc.log.Error("Failed to unsubscribe consumer, the consumer is closing or consumer has been closed")
+ if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+ pc.log.WithField("state", state).Error("Failed to unsubscribe closing or closed consumer")
return
}
@@ -354,7 +373,7 @@
func (pc *partitionConsumer) requestSeek(msgID messageID) error {
state := pc.getConsumerState()
if state == consumerClosing || state == consumerClosed {
- pc.log.Error("Consumer was already closed")
+ pc.log.WithField("state", state).Error("Consumer is closing or has closed")
return nil
}
@@ -398,7 +417,7 @@
state := pc.getConsumerState()
if state == consumerClosing || state == consumerClosed {
- pc.log.Error("Consumer was already closed")
+ pc.log.WithField("state", pc.state).Error("Consumer is closing or has closed")
return
}
@@ -798,7 +817,7 @@
}
if state == consumerClosed || state == consumerClosing {
- pc.log.Error("The consumer is closing or has been closed")
+ pc.log.WithField("state", state).Error("Consumer is closing or has closed")
if pc.nackTracker != nil {
pc.nackTracker.Close()
}