Fix unsubscribe blocked when consumer is closing or has closed (#457)

### Motivation

In the current consumer, `Close()` and `Unsubscribe()` are handled by the same event-loop goroutine.
The event loop exits after `Close()`, so a later unsubscribe event is never selected or handled, and the caller of `Unsubscribe()` blocks indefinitely.

Example:
```go
func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            "topic-1",
		SubscriptionName: "my-sub",
	})
	if err != nil {
		log.Fatal(err)
	}
	
	defer consumer.Unsubscribe() // unintentional
	defer consumer.Close()
}
```

`Unsubscribe()` then blocks:

![image](https://user-images.githubusercontent.com/24536920/106294060-ab5d6b80-6289-11eb-913c-85e1d18467a0.png)




### Modifications

Check the consumer state before sending the unsubscribe event; if the consumer is closing or has already closed, just log an error and return.

### Verifying this change

- [x] Make sure that the change passes the CI checks.
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index b9c9b12..b75a7d7 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -47,6 +47,21 @@
 	consumerClosed
 )
 
+func (s consumerState) String() string {
+	switch s {
+	case consumerInit:
+		return "Initializing"
+	case consumerReady:
+		return "Ready"
+	case consumerClosing:
+		return "Closing"
+	case consumerClosed:
+		return "Closed"
+	default:
+		return "Unknown"
+	}
+}
+
 type subscriptionMode int
 
 const (
@@ -195,6 +210,11 @@
 }
 
 func (pc *partitionConsumer) Unsubscribe() error {
+	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+		pc.log.WithField("state", state).Error("Failed to unsubscribe closing or closed consumer")
+		return nil
+	}
+
 	req := &unsubscribeRequest{doneCh: make(chan struct{})}
 	pc.eventsCh <- req
 
@@ -206,9 +226,8 @@
 func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
 	defer close(unsub.doneCh)
 
-	state := pc.getConsumerState()
-	if state == consumerClosed || state == consumerClosing {
-		pc.log.Error("Failed to unsubscribe consumer, the consumer is closing or consumer has been closed")
+	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+		pc.log.WithField("state", state).Error("Failed to unsubscribe closing or closed consumer")
 		return
 	}
 
@@ -354,7 +373,7 @@
 func (pc *partitionConsumer) requestSeek(msgID messageID) error {
 	state := pc.getConsumerState()
 	if state == consumerClosing || state == consumerClosed {
-		pc.log.Error("Consumer was already closed")
+		pc.log.WithField("state", state).Error("Consumer is closing or has closed")
 		return nil
 	}
 
@@ -398,7 +417,7 @@
 
 	state := pc.getConsumerState()
 	if state == consumerClosing || state == consumerClosed {
-		pc.log.Error("Consumer was already closed")
+		pc.log.WithField("state", state).Error("Consumer is closing or has closed")
 		return
 	}
 
@@ -798,7 +817,7 @@
 	}
 
 	if state == consumerClosed || state == consumerClosing {
-		pc.log.Error("The consumer is closing or has been closed")
+		pc.log.WithField("state", state).Error("Consumer is closing or has closed")
 		if pc.nackTracker != nil {
 			pc.nackTracker.Close()
 		}