[Issue #123] Ensure message is sent if no room in current batch. (#124)

* [Issue #123] Ensure message is sent if no room in current batch.

* Add issue to test comment.

* Increase test wait timeout.

* Add timeout log for test.

* Add log when a single message fails to be added to batch.
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 5ff734c..c2a2073 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -787,7 +787,8 @@
 		t.Fatal(err)
 	}
 	subs := stats["subscriptions"].(map[string]interface{})
-	meta := subs["my-sub"].(map[string]interface{})["consumers"].([]interface{})[0].(map[string]interface{})["metadata"].(map[string]interface{})
+	cons := subs["my-sub"].(map[string]interface{})["consumers"].([]interface{})[0].(map[string]interface{})
+	meta := cons["metadata"].(map[string]interface{})
 	assert.Equal(t, len(props), len(meta))
 	for k, v := range props {
 		mv := meta[k].(string)
diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go
index 4dac66e..93c1af2 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -29,9 +29,9 @@
 const testNackDelay = 300 * time.Millisecond
 
 type nackMockedConsumer struct {
-	ch chan messageID
+	ch     chan messageID
 	closed bool
-	lock sync.Mutex
+	lock   sync.Mutex
 	msgIds []messageID
 }
 
@@ -42,7 +42,7 @@
 	go func() {
 		// since the client ticks at an interval of delay / 3
 		// wait another interval to ensure we get all messages
-		time.Sleep(testNackDelay + 101 * time.Millisecond)
+		time.Sleep(testNackDelay + 101*time.Millisecond)
 		t.lock.Lock()
 		defer t.lock.Unlock()
 		t.closed = true
@@ -69,7 +69,7 @@
 	return msgIds
 }
 
-func (nmc *nackMockedConsumer) Wait() <- chan messageID {
+func (nmc *nackMockedConsumer) Wait() <-chan messageID {
 	return nmc.ch
 }
 
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index c5345cb..b3b8937 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -256,14 +256,27 @@
 	}
 
 	if sendAsBatch {
-		ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters)
-		if ok == false {
+		added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters)
+		if !added {
 			// The current batch is full.. flush it and retry
 			p.internalFlushCurrentBatch()
+
+			// after flushing try again to add the current payload
+			if ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters); !ok {
+				p.log.WithField("size", len(msg.Payload)).
+					WithField("sequenceID", sequenceID).
+					WithField("properties", msg.Properties).
+					Error("unable to add message to batch")
+			}
 		}
 	} else {
 		// Send individually
-		p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters)
+		if added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters); !added {
+			p.log.WithField("size", len(msg.Payload)).
+				WithField("sequenceID", sequenceID).
+				WithField("properties", msg.Properties).
+				Error("unable to send single message")
+		}
 		p.internalFlushCurrentBatch()
 	}
 
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index bc3f8ea..4fcc011 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -558,3 +558,66 @@
 		assert.Equal(t, v, mv)
 	}
 }
+
+// test for issues #76, #114 and #123
+func TestBatchMessageFlushing(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	topic := newTopicName()
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topic,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer producer.Close()
+
+	maxBytes := internal.MaxBatchSize
+	genbytes := func(n int) []byte {
+		c := []byte("a")[0]
+		bytes := make([]byte, n)
+		for i := 0; i < n; i++ {
+			bytes[i] = c
+		}
+		return bytes
+	}
+
+	msgs := [][]byte{
+		genbytes(maxBytes - 10),
+		genbytes(11),
+	}
+
+	ch := make(chan struct{}, 2)
+	ctx := context.Background()
+	for _, msg := range msgs {
+		msg := &ProducerMessage{
+			Payload: msg,
+		}
+		producer.SendAsync(ctx, msg, func(id MessageID, producerMessage *ProducerMessage, err error) {
+			ch <- struct{}{}
+		})
+	}
+
+	published := 0
+	keepGoing := true
+	for keepGoing {
+		select {
+		case <-ch:
+			published++
+			if published == 2 {
+				keepGoing = false
+			}
+		case <-time.After(defaultBatchingMaxPublishDelay * 10):
+			fmt.Println("TestBatchMessageFlushing timeout waiting to publish messages")
+			keepGoing = false
+		}
+	}
+
+	assert.Equal(t, 2, published, "expected to publish two messages")
+}