Add producer check state before send msg. (#569)

Add producer state check before send msg.
diff --git a/pulsar/error.go b/pulsar/error.go
index 60a832b..f433bfc 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -99,6 +99,8 @@
 	AddToBatchFailed
 	// SeekFailed seek failed
 	SeekFailed
+	// ProducerClosed means producer already been closed
+	ProducerClosed
 )
 
 // Error implement error interface, composed of two parts: msg and result.
@@ -201,6 +203,8 @@
 		return "AddToBatchFailed"
 	case SeekFailed:
 		return "SeekFailed"
+	case ProducerClosed:
+		return "ProducerClosed"
 	default:
 		return fmt.Sprintf("Result(%d)", r)
 	}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 8b3d33d..7e83bfa 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -50,6 +50,7 @@
 	errSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue is full")
 	errContextExpired  = newError(TimeoutError, "message send context expired")
 	errMessageTooLarge = newError(MessageTooBig, "message size exceeds MaxMessageSize")
+	errProducerClosed  = newError(ProducerClosed, "producer already been closed")
 
 	buffersPool sync.Pool
 )
@@ -658,6 +659,12 @@
 
 func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
 	callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
+	if p.getProducerState() != producerReady {
+		// Producer is closing
+		callback(nil, msg, errProducerClosed)
+		return
+	}
+
 	sr := &sendRequest{
 		ctx:              ctx,
 		msg:              msg,
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 7c3dbd7..bbe8028 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1097,3 +1097,33 @@
 	assert.Equal(t, 10, metric.sendn)
 	assert.Equal(t, 10, metric.ackn)
 }
+
+func TestProducerSendAfterClose(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: serviceURL,
+	})
+	assert.NoError(t, err)
+	defer client.Close()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: newTopicName(),
+	})
+
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	ID, err := producer.Send(context.Background(), &ProducerMessage{
+		Payload: []byte("hello"),
+	})
+
+	assert.NoError(t, err)
+	assert.NotNil(t, ID)
+
+	producer.Close()
+	ID, err = producer.Send(context.Background(), &ProducerMessage{
+		Payload: []byte("hello"),
+	})
+	assert.Nil(t, ID)
+	assert.Error(t, err)
+}