// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package internal

import (
	"time"

	"github.com/gogo/protobuf/proto"

	"github.com/apache/pulsar-client-go/pulsar/internal/compression"
	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
	"github.com/apache/pulsar-client-go/pulsar/log"
)

// BuffersPool is a pool that provides reusable buffers for assembling
// outgoing batches.
type BuffersPool interface {
	GetBuffer() Buffer
}
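
// A minimal sketch of satisfying BuffersPool (illustrative only, not part
// of this package): returning nil is legal, since Flush falls back to
// allocating a fresh buffer when the pool comes up empty.
//
//	type simplePool struct{}
//
//	func (simplePool) GetBuffer() Buffer { return nil }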

// BatcherBuilderProvider is a factory function that creates a BatchBuilder.
type BatcherBuilderProvider func(
maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level,
bufferPool BuffersPool, logger log.Logger,
) (BatchBuilder, error)
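
// NewBatchBuilder (defined below) is one such provider; this compile-time
// assertion simply documents that its signature matches the type above.
var _ BatcherBuilderProvider = NewBatchBuilder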

// BatchBuilder is the interface implemented by batch containers.
type BatchBuilder interface {
	// IsFull reports whether the batch has reached the maximum number of
	// messages or the maximum batch size.
IsFull() bool
	// Add appends a single message to the batch. It returns false when the
	// batch must be flushed before the message can be accepted.
Add(
metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64,
payload []byte,
callback interface{}, replicateTo []string, deliverAt time.Time,
) bool
	// Flush serializes all the messages buffered in this batch into a
	// single frame and returns it with the batch sequence ID and the
	// per-message callbacks.
Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{})
	// FlushBatches serializes the messages buffered across multiple
	// batches and returns one frame, one sequence ID, and one callback
	// slice per batch.
FlushBatches() (
batchData []Buffer, sequenceID []uint64, callbacks [][]interface{},
)
	// IsMultiBatches reports whether this builder batches messages into
	// multiple batches (as key-based batching does) rather than a single one.
IsMultiBatches() bool
	// reset clears the batch so the container can be reused.
	reset()
	// Close releases resources held by the builder.
	Close() error
}
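
// A hedged sketch of the intended call pattern (the real driver lives in
// the producer; pool, logger, smm, seqGen, payload, and cb are placeholder
// names, not identifiers from this package):
//
//	builder, _ := NewBatchBuilder(1000, 128*1024, "producer-1", 1,
//		pb.CompressionType_NONE, compression.Level(0), pool, logger)
//	if !builder.Add(smm, &seqGen, payload, cb, nil, time.Time{}) {
//		frame, seqID, cbs := builder.Flush()
//		// hand frame/seqID/cbs to the connection layer, then retry the Add
//		_, _, _ = frame, seqID, cbs
//	}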

// batchContainer wraps the objects needed to build a batch.
// It implements BatchBuilder as a single-batch container.
type batchContainer struct {
buffer Buffer
// Current number of messages in the batch
numMessages uint
	// Max number of messages allowed in the batch
maxMessages uint
// The largest size for a batch sent from this particular producer.
// This is used as a baseline to allocate a new buffer that can hold the entire batch
// without needing costly re-allocations.
maxBatchSize uint
producerName string
producerID uint64
cmdSend *pb.BaseCommand
msgMetadata *pb.MessageMetadata
callbacks []interface{}
compressionProvider compression.Provider
buffersPool BuffersPool
log log.Logger
}

// newBatchContainer initializes a batchContainer.
func newBatchContainer(
maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level,
bufferPool BuffersPool, logger log.Logger,
) batchContainer {
bc := batchContainer{
buffer: NewBuffer(4096),
numMessages: 0,
maxMessages: maxMessages,
maxBatchSize: maxBatchSize,
producerName: producerName,
producerID: producerID,
cmdSend: baseCommand(
pb.BaseCommand_SEND,
&pb.CommandSend{
ProducerId: &producerID,
},
),
msgMetadata: &pb.MessageMetadata{
ProducerName: &producerName,
},
callbacks: []interface{}{},
compressionProvider: getCompressionProvider(compressionType, level),
buffersPool: bufferPool,
log: logger,
}
if compressionType != pb.CompressionType_NONE {
bc.msgMetadata.Compression = &compressionType
}
return bc
}

// NewBatchBuilder creates a BatchBuilder that assembles messages into a
// single batch; it satisfies BatcherBuilderProvider.
func NewBatchBuilder(
maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level,
bufferPool BuffersPool, logger log.Logger,
) (BatchBuilder, error) {
bc := newBatchContainer(
maxMessages, maxBatchSize, producerName, producerID, compressionType,
level, bufferPool, logger,
)
return &bc, nil
}

// IsFull reports whether the batch has reached the maximum number of
// messages or the maximum batch size.
func (bc *batchContainer) IsFull() bool {
return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() > uint32(bc.maxBatchSize)
}

// hasSpace reports whether adding payload would overflow maxBatchSize.
// Note the inverted sense: a true result means the batch does NOT have
// room for the payload (an empty batch always accepts it).
func (bc *batchContainer) hasSpace(payload []byte) bool {
msgSize := uint32(len(payload))
return bc.numMessages > 0 && (bc.buffer.ReadableBytes()+msgSize) > uint32(bc.maxBatchSize)
}
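
// A worked example of the check above, with assumed numbers: if
// maxBatchSize is 128KiB, the buffer already holds 100KiB, and a 40KiB
// payload arrives, then 100KiB+40KiB = 140KiB > 128KiB, so hasSpace
// returns true and Add below rejects the message until a Flush.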

// Add appends a single message to the batch. It returns false when the
// caller must Flush before the message can be accepted.
func (bc *batchContainer) Add(
metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64,
payload []byte,
callback interface{}, replicateTo []string, deliverAt time.Time,
) bool {
	if replicateTo != nil && bc.numMessages != 0 {
		// If the current batch is not empty and we're trying to set the
		// replication clusters, we must force the current batch to flush
		// and send this message individually.
		return false
	} else if bc.msgMetadata.ReplicateTo != nil {
		// There's already a message with a cluster replication list; flush
		// before the next message can be sent.
		return false
	} else if bc.hasSpace(payload) {
		// The current batch is full; the producer has to call Flush()
		// before adding this message.
		return false
	}
if bc.numMessages == 0 {
var sequenceID uint64
if metadata.SequenceId != nil {
sequenceID = *metadata.SequenceId
} else {
sequenceID = GetAndAdd(sequenceIDGenerator, 1)
}
bc.msgMetadata.SequenceId = proto.Uint64(sequenceID)
bc.msgMetadata.PublishTime = proto.Uint64(TimestampMillis(time.Now()))
bc.msgMetadata.ProducerName = &bc.producerName
bc.msgMetadata.ReplicateTo = replicateTo
bc.msgMetadata.PartitionKey = metadata.PartitionKey
if deliverAt.UnixNano() > 0 {
bc.msgMetadata.DeliverAtTime = proto.Int64(int64(TimestampMillis(deliverAt)))
}
bc.cmdSend.Send.SequenceId = proto.Uint64(sequenceID)
}
addSingleMessageToBatch(bc.buffer, metadata, payload)
bc.numMessages++
bc.callbacks = append(bc.callbacks, callback)
return true
}

// reset clears the container so it can be reused for the next batch.
func (bc *batchContainer) reset() {
bc.numMessages = 0
bc.buffer.Clear()
bc.callbacks = []interface{}{}
bc.msgMetadata.ReplicateTo = nil
bc.msgMetadata.DeliverAtTime = nil
}

// Flush serializes the buffered messages into a single frame and returns
// it with the batch sequence ID and the per-message callbacks.
func (bc *batchContainer) Flush() (
batchData Buffer, sequenceID uint64, callbacks []interface{},
) {
if bc.numMessages == 0 {
// No-Op for empty batch
return nil, 0, nil
}
bc.log.Debug("BatchBuilder flush: messages: ", bc.numMessages)
bc.msgMetadata.NumMessagesInBatch = proto.Int32(int32(bc.numMessages))
bc.cmdSend.Send.NumMessages = proto.Int32(int32(bc.numMessages))
uncompressedSize := bc.buffer.ReadableBytes()
bc.msgMetadata.UncompressedSize = &uncompressedSize
	buffer := bc.buffersPool.GetBuffer()
	if buffer == nil {
		// No pooled buffer available: allocate one at 1.5x the uncompressed
		// size, leaving headroom for the command, metadata, and framing.
		buffer = NewBuffer(int(uncompressedSize * 3 / 2))
	}
serializeBatch(
buffer, bc.cmdSend, bc.msgMetadata, bc.buffer, bc.compressionProvider,
)
callbacks = bc.callbacks
sequenceID = bc.cmdSend.Send.GetSequenceId()
bc.reset()
return buffer, sequenceID, callbacks
}

// FlushBatches is only supported by multi-batch containers; for the
// single-batch batchContainer it panics, and Flush should be used instead.
func (bc *batchContainer) FlushBatches() (
batchData []Buffer, sequenceID []uint64, callbacks [][]interface{},
) {
panic("single batch container not support FlushBatches(), please use Flush() instead")
}

// IsMultiBatches returns false: batchContainer is a single-batch container.
func (bc *batchContainer) IsMultiBatches() bool {
return false
}
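
// A hedged sketch of how a caller is expected to branch on the flag
// (names are placeholders, not this package's producer code):
//
//	if builder.IsMultiBatches() {
//		frames, seqIDs, cbs := builder.FlushBatches()
//		_, _, _ = frames, seqIDs, cbs // one frame per batch
//	} else {
//		frame, seqID, cbs := builder.Flush()
//		_, _, _ = frame, seqID, cbs // a single frame
//	}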

// Close releases the underlying compression provider's resources.
func (bc *batchContainer) Close() error {
return bc.compressionProvider.Close()
}

// getCompressionProvider maps a wire-protocol compression type to the
// matching compression.Provider; the level only applies to ZSTD here.
func getCompressionProvider(
compressionType pb.CompressionType,
level compression.Level,
) compression.Provider {
switch compressionType {
case pb.CompressionType_NONE:
return compression.NewNoopProvider()
case pb.CompressionType_LZ4:
return compression.NewLz4Provider()
case pb.CompressionType_ZLIB:
return compression.NewZLibProvider()
case pb.CompressionType_ZSTD:
return compression.NewZStdProvider(level)
default:
panic("unsupported compression type")
}
}
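
// For instance (a sketch; the named compression.Level constants live in
// the compression package), a ZSTD producer would be wired up as:
//
//	provider := getCompressionProvider(pb.CompressionType_ZSTD, compression.Level(1))
//	defer provider.Close()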