add epoch to handle create producer timeout (#582)
* add epoch to producer
* fix CI
* address comments
* address comments
* update style
* better logging
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index abec4fc..ca6850d 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -79,6 +79,8 @@
schemaInfo *SchemaInfo
partitionIdx int32
metrics *internal.TopicMetrics
+
+ epoch uint64
}
func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int,
@@ -114,6 +116,7 @@
lastSequenceID: -1,
partitionIdx: int32(partitionIdx),
metrics: metrics,
+ epoch: 0,
}
p.setProducerState(producerInit)
@@ -176,12 +179,16 @@
p.log.Debug("The partition consumer schema is nil")
}
+ userProvidedProducerName := p.producerName != ""
+
cmdProducer := &pb.CommandProducer{
- RequestId: proto.Uint64(id),
- Topic: proto.String(p.topic),
- Encrypted: nil,
- ProducerId: proto.Uint64(p.producerID),
- Schema: pbSchema,
+ RequestId: proto.Uint64(id),
+ Topic: proto.String(p.topic),
+ Encrypted: nil,
+ ProducerId: proto.Uint64(p.producerID),
+ Schema: pbSchema,
+ Epoch: proto.Uint64(atomic.LoadUint64(&p.epoch)),
+ UserProvidedProducerName: proto.Bool(userProvidedProducerName),
}
if p.producerName != "" {
@@ -230,7 +237,10 @@
}
p.cnx = res.Cnx
p.cnx.RegisterListener(p.producerID, p)
- p.log.WithField("cnx", res.Cnx.ID()).Debug("Connected producer")
+ p.log.WithFields(log.Fields{
+ "cnx": res.Cnx.ID(),
+ "epoch": atomic.LoadUint64(&p.epoch),
+ }).Debug("Connected producer")
pendingItems := p.pendingQueue.ReadableSlice()
viewSize := len(pendingItems)
@@ -298,7 +308,7 @@
d := backoff.Next()
p.log.Info("Reconnecting to broker in ", d)
time.Sleep(d)
-
+ atomic.AddUint64(&p.epoch, 1)
err := p.grabCnx()
if err == nil {
// Successfully reconnected
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index bbe8028..dc7a5ef 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1127,3 +1127,39 @@
assert.Nil(t, ID)
assert.Error(t, err)
}
+
+func TestExactlyOnceWithProducerNameSpecified(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ topicName := newTopicName()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ Name: "p-name-1",
+ })
+
+ assert.NoError(t, err)
+ assert.NotNil(t, producer)
+ defer producer.Close()
+
+ producer2, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ Name: "p-name-2",
+ })
+
+ assert.NoError(t, err)
+ assert.NotNil(t, producer2)
+ defer producer2.Close()
+
+ producer3, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ Name: "p-name-2",
+ })
+
+ assert.NotNil(t, err)
+ assert.Nil(t, producer3)
+}