Add producer check state before send msg. (#569)
Add producer state check before send msg.
diff --git a/pulsar/error.go b/pulsar/error.go
index 60a832b..f433bfc 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -99,6 +99,8 @@
AddToBatchFailed
// SeekFailed seek failed
SeekFailed
+ // ProducerClosed means producer already been closed
+ ProducerClosed
)
// Error implement error interface, composed of two parts: msg and result.
@@ -201,6 +203,8 @@
return "AddToBatchFailed"
case SeekFailed:
return "SeekFailed"
+ case ProducerClosed:
+ return "ProducerClosed"
default:
return fmt.Sprintf("Result(%d)", r)
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 8b3d33d..7e83bfa 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -50,6 +50,7 @@
errSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue is full")
errContextExpired = newError(TimeoutError, "message send context expired")
errMessageTooLarge = newError(MessageTooBig, "message size exceeds MaxMessageSize")
+ errProducerClosed = newError(ProducerClosed, "producer already been closed")
buffersPool sync.Pool
)
@@ -658,6 +659,12 @@
func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
+ if p.getProducerState() != producerReady {
+ // Producer is closing
+ callback(nil, msg, errProducerClosed)
+ return
+ }
+
sr := &sendRequest{
ctx: ctx,
msg: msg,
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 7c3dbd7..bbe8028 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1097,3 +1097,33 @@
assert.Equal(t, 10, metric.sendn)
assert.Equal(t, 10, metric.ackn)
}
+
+func TestProducerSendAfterClose(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: newTopicName(),
+ })
+
+ assert.NoError(t, err)
+ assert.NotNil(t, producer)
+ defer producer.Close()
+
+ ID, err := producer.Send(context.Background(), &ProducerMessage{
+ Payload: []byte("hello"),
+ })
+
+ assert.NoError(t, err)
+ assert.NotNil(t, ID)
+
+ producer.Close()
+ ID, err = producer.Send(context.Background(), &ProducerMessage{
+ Payload: []byte("hello"),
+ })
+ assert.Nil(t, ID)
+ assert.Error(t, err)
+}