add epoch to handle create producer timeout (#582)

* add epoch to producer

* fix CI

* address comments

* address comments

* update style

* better logging
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index abec4fc..ca6850d 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -79,6 +79,8 @@
 	schemaInfo       *SchemaInfo
 	partitionIdx     int32
 	metrics          *internal.TopicMetrics
+
+	epoch uint64
 }
 
 func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int,
@@ -114,6 +116,7 @@
 		lastSequenceID:   -1,
 		partitionIdx:     int32(partitionIdx),
 		metrics:          metrics,
+		epoch:            0,
 	}
 	p.setProducerState(producerInit)
 
@@ -176,12 +179,16 @@
 		p.log.Debug("The partition consumer schema is nil")
 	}
 
+	userProvidedProducerName := p.producerName != ""
+
 	cmdProducer := &pb.CommandProducer{
-		RequestId:  proto.Uint64(id),
-		Topic:      proto.String(p.topic),
-		Encrypted:  nil,
-		ProducerId: proto.Uint64(p.producerID),
-		Schema:     pbSchema,
+		RequestId:                proto.Uint64(id),
+		Topic:                    proto.String(p.topic),
+		Encrypted:                nil,
+		ProducerId:               proto.Uint64(p.producerID),
+		Schema:                   pbSchema,
+		Epoch:                    proto.Uint64(atomic.LoadUint64(&p.epoch)),
+		UserProvidedProducerName: proto.Bool(userProvidedProducerName),
 	}
 
 	if p.producerName != "" {
@@ -230,7 +237,10 @@
 	}
 	p.cnx = res.Cnx
 	p.cnx.RegisterListener(p.producerID, p)
-	p.log.WithField("cnx", res.Cnx.ID()).Debug("Connected producer")
+	p.log.WithFields(log.Fields{
+		"cnx":   res.Cnx.ID(),
+		"epoch": atomic.LoadUint64(&p.epoch),
+	}).Debug("Connected producer")
 
 	pendingItems := p.pendingQueue.ReadableSlice()
 	viewSize := len(pendingItems)
@@ -298,7 +308,7 @@
 		d := backoff.Next()
 		p.log.Info("Reconnecting to broker in ", d)
 		time.Sleep(d)
-
+		atomic.AddUint64(&p.epoch, 1)
 		err := p.grabCnx()
 		if err == nil {
 			// Successfully reconnected
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index bbe8028..dc7a5ef 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1127,3 +1127,39 @@
 	assert.Nil(t, ID)
 	assert.Error(t, err)
 }
+
+func TestExactlyOnceWithProducerNameSpecified(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: serviceURL,
+	})
+	assert.NoError(t, err)
+	defer client.Close()
+
+	topicName := newTopicName()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topicName,
+		Name:  "p-name-1",
+	})
+
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	producer2, err := client.CreateProducer(ProducerOptions{
+		Topic: topicName,
+		Name:  "p-name-2",
+	})
+
+	assert.NoError(t, err)
+	assert.NotNil(t, producer2)
+	defer producer2.Close()
+
+	producer3, err := client.CreateProducer(ProducerOptions{
+		Topic: topicName,
+		Name:  "p-name-2",
+	})
+
+	assert.NotNil(t, err)
+	assert.Nil(t, producer3)
+}