[Issue #123] Ensure message is sent if no room in current batch. (#124)
* [Issue #123] Ensure message is sent if no room in current batch.
* Add issue to test comment.
* Increase test wait timeout.
* Add timeout log for test.
* Add log when a single message fails to be added to batch.
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 5ff734c..c2a2073 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -787,7 +787,8 @@
t.Fatal(err)
}
subs := stats["subscriptions"].(map[string]interface{})
- meta := subs["my-sub"].(map[string]interface{})["consumers"].([]interface{})[0].(map[string]interface{})["metadata"].(map[string]interface{})
+ cons := subs["my-sub"].(map[string]interface{})["consumers"].([]interface{})[0].(map[string]interface{})
+ meta := cons["metadata"].(map[string]interface{})
assert.Equal(t, len(props), len(meta))
for k, v := range props {
mv := meta[k].(string)
diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go
index 4dac66e..93c1af2 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -29,9 +29,9 @@
const testNackDelay = 300 * time.Millisecond
type nackMockedConsumer struct {
- ch chan messageID
+ ch chan messageID
closed bool
- lock sync.Mutex
+ lock sync.Mutex
msgIds []messageID
}
@@ -42,7 +42,7 @@
go func() {
// since the client ticks at an interval of delay / 3
// wait another interval to ensure we get all messages
- time.Sleep(testNackDelay + 101 * time.Millisecond)
+ time.Sleep(testNackDelay + 101*time.Millisecond)
t.lock.Lock()
defer t.lock.Unlock()
t.closed = true
@@ -69,7 +69,7 @@
return msgIds
}
-func (nmc *nackMockedConsumer) Wait() <- chan messageID {
+func (nmc *nackMockedConsumer) Wait() <-chan messageID {
return nmc.ch
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index c5345cb..b3b8937 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -256,14 +256,27 @@
}
if sendAsBatch {
- ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters)
- if ok == false {
+ added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters)
+ if !added {
// The current batch is full.. flush it and retry
p.internalFlushCurrentBatch()
+
+ // after flushing try again to add the current payload
+ if ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters); !ok {
+ p.log.WithField("size", len(msg.Payload)).
+ WithField("sequenceID", sequenceID).
+ WithField("properties", msg.Properties).
+ Error("unable to add message to batch")
+ }
}
} else {
// Send individually
- p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters)
+ if added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters); !added {
+ p.log.WithField("size", len(msg.Payload)).
+ WithField("sequenceID", sequenceID).
+ WithField("properties", msg.Properties).
+ Error("unable to send single message")
+ }
p.internalFlushCurrentBatch()
}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index bc3f8ea..4fcc011 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -558,3 +558,66 @@
assert.Equal(t, v, mv)
}
}
+
+// test for issues #76, #114 and #123
+func TestBatchMessageFlushing(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer client.Close()
+
+ topic := newTopicName()
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer producer.Close()
+
+ maxBytes := internal.MaxBatchSize
+ genbytes := func(n int) []byte {
+ c := []byte("a")[0]
+ bytes := make([]byte, n)
+ for i := 0; i < n; i++ {
+ bytes[i] = c
+ }
+ return bytes
+ }
+
+ msgs := [][]byte{
+ genbytes(maxBytes - 10),
+ genbytes(11),
+ }
+
+ ch := make(chan struct{}, 2)
+ ctx := context.Background()
+ for _, msg := range msgs {
+ msg := &ProducerMessage{
+ Payload: msg,
+ }
+ producer.SendAsync(ctx, msg, func(id MessageID, producerMessage *ProducerMessage, err error) {
+ ch <- struct{}{}
+ })
+ }
+
+ published := 0
+ keepGoing := true
+ for keepGoing {
+ select {
+ case <-ch:
+ published++
+ if published == 2 {
+ keepGoing = false
+ }
+ case <-time.After(defaultBatchingMaxPublishDelay * 10):
+ fmt.Println("TestBatchMessageFlushing timeout waiting to publish messages")
+ keepGoing = false
+ }
+ }
+
+ assert.Equal(t, 2, published, "expected to publish two messages")
+}