[fix] Fix event time not being set when batching is disabled (#1015)
Fixes #1013
### Motivation
The event time is not set when batching is disabled. The event time will be lost.
This is a regression bug in 0.10.0.
### Modifications
* Set the event time when sending single message
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index e77c929..fc67f51 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -735,6 +735,10 @@
UncompressedSize: proto.Uint32(uint32(uncompressedSize)),
}
+ if !msg.EventTime.IsZero() {
+ mm.EventTime = proto.Uint64(internal.TimestampMillis(msg.EventTime))
+ }
+
if msg.Key != "" {
mm.PartitionKey = proto.String(msg.Key)
}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index a6f5e39..b587975 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1905,3 +1905,36 @@
})
assert.NoError(t, err)
}
+
+func TestSendMessagesWithMetadata(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ })
+ assert.Nil(t, err)
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ })
+ assert.Nil(t, err)
+
+ msg := &ProducerMessage{EventTime: time.Now().Local(),
+ Payload: []byte("msg")}
+
+ _, err = producer.Send(context.Background(), msg)
+ assert.Nil(t, err)
+
+ recvMsg, err := consumer.Receive(context.Background())
+ assert.Nil(t, err)
+
+ assert.Equal(t, internal.TimestampMillis(recvMsg.EventTime()), internal.TimestampMillis(msg.EventTime))
+}