// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar
import (
"context"
"errors"
"fmt"
"math"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
internalcrypto "github.com/apache/pulsar-client-go/pulsar/internal/crypto"
"google.golang.org/protobuf/proto"
"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
uAtomic "go.uber.org/atomic"
)
type producerState int32
const (
// producer states
producerInit = iota
producerReady
producerClosing
producerClosed
)
var (
ErrFailAddToBatch = newError(AddToBatchFailed, "message add to batch failed")
ErrSendTimeout = newError(TimeoutError, "message send timeout")
ErrSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue is full")
ErrContextExpired = newError(TimeoutError, "message send context expired")
ErrMessageTooLarge = newError(MessageTooBig, "message size exceeds MaxMessageSize")
ErrMetaTooLarge = newError(InvalidMessage, "message metadata size exceeds MaxMessageSize")
ErrProducerClosed = newError(ProducerClosed, "producer already closed")
ErrMemoryBufferIsFull = newError(ClientMemoryBufferIsFull, "client memory buffer is full")
ErrSchema = newError(SchemaFailure, "schema error")
ErrTransaction = errors.New("transaction error")
ErrInvalidMessage = newError(InvalidMessage, "invalid message")
ErrTopicNotfound = newError(TopicNotFound, "topic not found")
ErrTopicTerminated = newError(TopicTerminated, "topic terminated")
ErrProducerBlockedQuotaExceeded = newError(ProducerBlockedQuotaExceededException, "producer blocked")
ErrProducerFenced = newError(ProducerFenced, "producer fenced")
buffersPool sync.Pool
sendRequestPool *sync.Pool
)
const (
errMsgTopicNotFound = "TopicNotFound"
errMsgTopicTerminated = "TopicTerminatedError"
errMsgProducerBlockedQuotaExceededException = "ProducerBlockedQuotaExceededException"
errMsgProducerFenced = "ProducerFenced"
)
func init() {
sendRequestPool = &sync.Pool{
New: func() interface{} {
return &sendRequest{}
},
}
}
type partitionProducer struct {
state uAtomic.Int32
client *client
topic string
log log.Logger
conn uAtomic.Value
options *ProducerOptions
producerName string
userProvidedProducerName bool
producerID uint64
batchBuilder internal.BatchBuilder
sequenceIDGenerator *uint64
batchFlushTicker *time.Ticker
encryptor internalcrypto.Encryptor
compressionProvider compression.Provider
// Channel where the application posts messages to be published
dataChan chan *sendRequest
cmdChan chan interface{}
connectClosedCh chan connectionClosed
publishSemaphore internal.Semaphore
pendingQueue internal.BlockingQueue
lastSequenceID int64
schemaInfo *SchemaInfo
partitionIdx int32
metrics *internal.LeveledMetrics
epoch uint64
schemaCache *schemaCache
topicEpoch *uint64
}
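// schemaCache caches the schema versions assigned by the broker, keyed by
// schema hash, so repeated sends with the same schema can skip the
// GET_OR_CREATE_SCHEMA round trip.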
type schemaCache struct {
schemas sync.Map
}
func newSchemaCache() *schemaCache {
return &schemaCache{}
}
func (s *schemaCache) Put(schema *SchemaInfo, schemaVersion []byte) {
key := schema.hash()
s.schemas.Store(key, schemaVersion)
}
func (s *schemaCache) Get(schema *SchemaInfo) (schemaVersion []byte) {
val, ok := s.schemas.Load(schema.hash())
if !ok {
return nil
}
return val.([]byte)
}
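// newPartitionProducer creates the producer for a single partition: it applies
// option defaults, establishes the broker connection via grabCnx, and starts
// the event loop (and, when SendTimeout > 0, the timeout goroutine).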
func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int,
metrics *internal.LeveledMetrics) (
*partitionProducer, error) {
var batchingMaxPublishDelay time.Duration
if options.BatchingMaxPublishDelay != 0 {
batchingMaxPublishDelay = options.BatchingMaxPublishDelay
} else {
batchingMaxPublishDelay = defaultBatchingMaxPublishDelay
}
var maxPendingMessages int
if options.MaxPendingMessages == 0 {
maxPendingMessages = 1000
} else {
maxPendingMessages = options.MaxPendingMessages
}
logger := client.log.SubLogger(log.Fields{"topic": topic})
p := &partitionProducer{
client: client,
topic: topic,
log: logger,
options: options,
producerID: client.rpcClient.NewProducerID(),
dataChan: make(chan *sendRequest, maxPendingMessages),
cmdChan: make(chan interface{}, 10),
connectClosedCh: make(chan connectionClosed, 10),
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
compressionProvider: internal.GetCompressionProvider(pb.CompressionType(options.CompressionType),
compression.Level(options.CompressionLevel)),
publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
pendingQueue: internal.NewBlockingQueue(maxPendingMessages),
lastSequenceID: -1,
partitionIdx: int32(partitionIdx),
metrics: metrics,
epoch: 0,
schemaCache: newSchemaCache(),
}
if p.options.DisableBatching {
p.batchFlushTicker.Stop()
}
p.setProducerState(producerInit)
if options.Schema != nil && options.Schema.GetSchemaInfo() != nil {
p.schemaInfo = options.Schema.GetSchemaInfo()
} else {
p.schemaInfo = nil
}
if options.Name != "" {
p.producerName = options.Name
p.userProvidedProducerName = true
} else {
p.userProvidedProducerName = false
}
err := p.grabCnx()
if err != nil {
p.batchFlushTicker.Stop()
logger.WithError(err).Error("Failed to create producer at newPartitionProducer")
return nil, err
}
p.log = p.log.SubLogger(log.Fields{
"producer_name": p.producerName,
"producerID": p.producerID,
})
p.log.WithField("cnx", p._getConn().ID()).Info("Created producer")
p.setProducerState(producerReady)
if p.options.SendTimeout > 0 {
go p.failTimeoutMessages()
}
go p.runEventsLoop()
return p, nil
}
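// grabCnx looks up the topic's broker, registers this producer on the
// connection with a PRODUCER command, (re)creates the encryptor and batch
// builder if needed, and resends any batches still in the pending queue.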
func (p *partitionProducer) grabCnx() error {
lr, err := p.client.lookupService.Lookup(p.topic)
if err != nil {
p.log.WithError(err).Warn("Failed to lookup topic")
return err
}
p.log.Debug("Lookup result: ", lr)
id := p.client.rpcClient.NewRequestID()
// set schema info for producer
var pbSchema *pb.Schema
if p.schemaInfo != nil {
tmpSchemaType := pb.Schema_Type(int32(p.schemaInfo.Type))
pbSchema = &pb.Schema{
Name: proto.String(p.schemaInfo.Name),
Type: &tmpSchemaType,
SchemaData: []byte(p.schemaInfo.Schema),
Properties: internal.ConvertFromStringMap(p.schemaInfo.Properties),
}
p.log.Debugf("The partition producer schema name is: %s", pbSchema.Name)
} else {
p.log.Debug("The partition producer schema is nil")
}
cmdProducer := &pb.CommandProducer{
RequestId: proto.Uint64(id),
Topic: proto.String(p.topic),
Encrypted: nil,
ProducerId: proto.Uint64(p.producerID),
Schema: pbSchema,
Epoch: proto.Uint64(atomic.LoadUint64(&p.epoch)),
UserProvidedProducerName: proto.Bool(p.userProvidedProducerName),
ProducerAccessMode: toProtoProducerAccessMode(p.options.ProducerAccessMode).Enum(),
}
if p.topicEpoch != nil {
cmdProducer.TopicEpoch = proto.Uint64(*p.topicEpoch)
}
if p.producerName != "" {
cmdProducer.ProducerName = proto.String(p.producerName)
}
if len(p.options.Properties) > 0 {
cmdProducer.Metadata = toKeyValues(p.options.Properties)
}
res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer)
if err != nil {
p.log.WithError(err).Error("Failed to create producer at send PRODUCER request")
if errors.Is(err, internal.ErrRequestTimeOut) {
id := p.client.rpcClient.NewRequestID()
_, _ = p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_CLOSE_PRODUCER,
&pb.CommandCloseProducer{
ProducerId: &p.producerID,
RequestId: &id,
})
}
return err
}
p.producerName = res.Response.ProducerSuccess.GetProducerName()
nextTopicEpoch := res.Response.ProducerSuccess.GetTopicEpoch()
p.topicEpoch = &nextTopicEpoch
if p.options.Encryption != nil {
p.encryptor = internalcrypto.NewProducerEncryptor(p.options.Encryption.Keys,
p.options.Encryption.KeyReader,
p.options.Encryption.MessageCrypto,
p.options.Encryption.ProducerCryptoFailureAction, p.log)
} else {
p.encryptor = internalcrypto.NewNoopEncryptor()
}
if p.sequenceIDGenerator == nil {
nextSequenceID := uint64(res.Response.ProducerSuccess.GetLastSequenceId() + 1)
p.sequenceIDGenerator = &nextSequenceID
}
schemaVersion := res.Response.ProducerSuccess.GetSchemaVersion()
if len(schemaVersion) != 0 {
p.schemaCache.Put(p.schemaInfo, schemaVersion)
}
p._setConn(res.Cnx)
err = p._getConn().RegisterListener(p.producerID, p)
if err != nil {
return err
}
if !p.options.DisableBatching && p.batchBuilder == nil {
provider, err := GetBatcherBuilderProvider(p.options.BatcherBuilderType)
if err != nil {
return err
}
maxMessageSize := uint32(p._getConn().GetMaxMessageSize())
p.batchBuilder, err = provider(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
maxMessageSize, p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
compression.Level(p.options.CompressionLevel),
p,
p.log,
p.encryptor)
if err != nil {
return err
}
}
p.log.WithFields(log.Fields{
"cnx": res.Cnx.ID(),
"epoch": atomic.LoadUint64(&p.epoch),
}).Info("Connected producer")
pendingItems := p.pendingQueue.ReadableSlice()
viewSize := len(pendingItems)
if viewSize > 0 {
p.log.Infof("Resending %d pending batches", viewSize)
lastViewItem := pendingItems[viewSize-1].(*pendingItem)
// iterate over at most viewSize items
for i := 0; i < viewSize; i++ {
item := p.pendingQueue.Poll()
if item == nil {
continue
}
pi := item.(*pendingItem)
// when resending pending batches, we update the sentAt timestamp and put the item back
// at the end of the queue, so that failTimeoutMessages cannot remove it and cause a race condition
pi.Lock()
pi.sentAt = time.Now()
pi.Unlock()
p.pendingQueue.Put(pi)
p._getConn().WriteData(pi.buffer)
if pi == lastViewItem {
break
}
}
}
return nil
}
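// connectionClosed is the event posted to connectClosedCh to trigger a
// reconnection from the event loop goroutine.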
type connectionClosed struct{}
func (p *partitionProducer) GetBuffer() internal.Buffer {
b, ok := buffersPool.Get().(internal.Buffer)
if ok {
b.Clear()
}
return b
}
func (p *partitionProducer) ConnectionClosed() {
// Trigger reconnection in the producer event loop goroutine
p.log.WithField("cnx", p._getConn().ID()).Warn("Connection was closed")
p.connectClosedCh <- connectionClosed{}
}
func (p *partitionProducer) getOrCreateSchema(schemaInfo *SchemaInfo) (schemaVersion []byte, err error) {
tmpSchemaType := pb.Schema_Type(int32(schemaInfo.Type))
pbSchema := &pb.Schema{
Name: proto.String(schemaInfo.Name),
Type: &tmpSchemaType,
SchemaData: []byte(schemaInfo.Schema),
Properties: internal.ConvertFromStringMap(schemaInfo.Properties),
}
id := p.client.rpcClient.NewRequestID()
req := &pb.CommandGetOrCreateSchema{
RequestId: proto.Uint64(id),
Topic: proto.String(p.topic),
Schema: pbSchema,
}
res, err := p.client.rpcClient.RequestOnCnx(p._getConn(), id, pb.BaseCommand_GET_OR_CREATE_SCHEMA, req)
if err != nil {
return
}
if res.Response.Error != nil {
err = errors.New(res.Response.GetError().String())
return
}
if res.Response.GetOrCreateSchemaResponse.ErrorCode != nil {
err = errors.New(*res.Response.GetOrCreateSchemaResponse.ErrorMessage)
return
}
return res.Response.GetOrCreateSchemaResponse.SchemaVersion, nil
}
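// reconnectToBroker retries grabCnx with backoff until it succeeds, the retry
// budget is exhausted, or a non-retryable error (topic deleted or terminated,
// quota exceeded, producer fenced) closes the producer.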
func (p *partitionProducer) reconnectToBroker() {
var maxRetry int
if p.options.MaxReconnectToBroker == nil {
maxRetry = -1
} else {
maxRetry = int(*p.options.MaxReconnectToBroker)
}
var (
delayReconnectTime time.Duration
defaultBackoff = internal.DefaultBackoff{}
)
for maxRetry != 0 {
if p.getProducerState() != producerReady {
// Producer is already closing
p.log.Info("producer state not ready, exit reconnect")
return
}
if p.options.BackoffPolicy == nil {
delayReconnectTime = defaultBackoff.Next()
} else {
delayReconnectTime = p.options.BackoffPolicy.Next()
}
p.log.Info("Reconnecting to broker in ", delayReconnectTime)
time.Sleep(delayReconnectTime)
// double check
if p.getProducerState() != producerReady {
// Producer is already closing
p.log.Info("producer state not ready, exit reconnect")
return
}
atomic.AddUint64(&p.epoch, 1)
err := p.grabCnx()
if err == nil {
// Successfully reconnected
p.log.WithField("cnx", p._getConn().ID()).Info("Reconnected producer to broker")
return
}
p.log.WithError(err).Error("Failed to create producer at reconnect")
errMsg := err.Error()
if strings.Contains(errMsg, errMsgTopicNotFound) {
// when topic is deleted, we should give up reconnection.
p.log.Warn("Topic not found, stop reconnecting, close the producer")
p.doClose(joinErrors(ErrTopicNotfound, err))
break
}
if strings.Contains(errMsg, errMsgTopicTerminated) {
p.log.Warn("Topic was terminated, failing pending messages, stop reconnecting, close the producer")
p.doClose(joinErrors(ErrTopicTerminated, err))
break
}
if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) {
p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting")
p.failPendingMessages(joinErrors(ErrProducerBlockedQuotaExceeded, err))
break
}
if strings.Contains(errMsg, errMsgProducerFenced) {
p.log.Warn("Producer was fenced, failing pending messages, stop reconnecting")
p.doClose(joinErrors(ErrProducerFenced, err))
break
}
if maxRetry > 0 {
maxRetry--
}
p.metrics.ProducersReconnectFailure.Inc()
if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() {
p.metrics.ProducersReconnectMaxRetry.Inc()
}
}
}
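// runEventsLoop is the single goroutine that serializes sends, flushes,
// close requests, batch flush ticks and reconnections for this partition producer.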
func (p *partitionProducer) runEventsLoop() {
for {
select {
case data, ok := <-p.dataChan:
// when doClose() is called, p.dataChan is closed and ok will be false
if !ok {
return
}
p.internalSend(data)
case cmd, ok := <-p.cmdChan:
// when doClose() is called, p.cmdChan is closed and ok will be false
if !ok {
return
}
switch v := cmd.(type) {
case *flushRequest:
p.internalFlush(v)
case *closeProducer:
p.internalClose(v)
return
}
case <-p.connectClosedCh:
p.log.Info("runEventsLoop will reconnect in producer")
p.reconnectToBroker()
case <-p.batchFlushTicker.C:
p.internalFlushCurrentBatch()
}
}
}
func (p *partitionProducer) Topic() string {
return p.topic
}
func (p *partitionProducer) Name() string {
return p.producerName
}
func runCallback(cb func(MessageID, *ProducerMessage, error), id MessageID, msg *ProducerMessage, err error) {
if cb == nil {
return
}
cb(id, msg, err)
}
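// internalSend routes a send request to the batch builder, a single send, or
// a chunked send, depending on the batching settings and the payload size.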
func (p *partitionProducer) internalSend(sr *sendRequest) {
p.log.Debug("Received send request: ", *sr.msg)
if sr.sendAsBatch {
smm := p.genSingleMessageMetadataInBatch(sr.msg, int(sr.uncompressedSize))
multiSchemaEnabled := !p.options.DisableMultiSchema
added := addRequestToBatch(
smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled)
if !added {
// The current batch is full. Flush it and retry
p.internalFlushCurrentBatch()
// after flushing try again to add the current payload
ok := addRequestToBatch(
smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled)
if !ok {
p.log.WithField("size", sr.uncompressedSize).
WithField("properties", sr.msg.Properties).
Error("unable to add message to batch")
sr.done(nil, ErrFailAddToBatch)
return
}
}
if sr.flushImmediately {
p.internalFlushCurrentBatch()
}
return
}
if sr.totalChunks <= 1 {
p.internalSingleSend(sr.mm, sr.compressedPayload, sr, uint32(sr.maxMessageSize))
return
}
var lhs, rhs int
uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*sr.mm.SequenceId, 10))
sr.mm.Uuid = proto.String(uuid)
sr.mm.NumChunksFromMsg = proto.Int32(int32(sr.totalChunks))
sr.mm.TotalChunkMsgSize = proto.Int32(int32(sr.compressedSize))
cr := newChunkRecorder()
for chunkID := 0; chunkID < sr.totalChunks; chunkID++ {
lhs = chunkID * sr.payloadChunkSize
if rhs = lhs + sr.payloadChunkSize; rhs > sr.compressedSize {
rhs = sr.compressedSize
}
// update chunk id
sr.mm.ChunkId = proto.Int32(int32(chunkID))
nsr := sendRequestPool.Get().(*sendRequest)
*nsr = sendRequest{
pool: sendRequestPool,
ctx: sr.ctx,
msg: sr.msg,
producer: sr.producer,
callback: sr.callback,
callbackOnce: sr.callbackOnce,
publishTime: sr.publishTime,
flushImmediately: sr.flushImmediately,
totalChunks: sr.totalChunks,
chunkID: chunkID,
uuid: uuid,
chunkRecorder: cr,
transaction: sr.transaction,
memLimit: sr.memLimit,
semaphore: sr.semaphore,
reservedMem: int64(rhs - lhs),
sendAsBatch: sr.sendAsBatch,
schema: sr.schema,
schemaVersion: sr.schemaVersion,
uncompressedPayload: sr.uncompressedPayload,
uncompressedSize: sr.uncompressedSize,
compressedPayload: sr.compressedPayload,
compressedSize: sr.compressedSize,
payloadChunkSize: sr.payloadChunkSize,
mm: sr.mm,
deliverAt: sr.deliverAt,
maxMessageSize: sr.maxMessageSize,
}
p.internalSingleSend(nsr.mm, nsr.compressedPayload[lhs:rhs], nsr, uint32(nsr.maxMessageSize))
}
}
func addRequestToBatch(smm *pb.SingleMessageMetadata, p *partitionProducer,
uncompressedPayload []byte,
request *sendRequest, msg *ProducerMessage, deliverAt time.Time,
schemaVersion []byte, multiSchemaEnabled bool) bool {
var useTxn bool
var mostSigBits uint64
var leastSigBits uint64
if request.transaction != nil {
txnID := request.transaction.GetTxnID()
useTxn = true
mostSigBits = txnID.MostSigBits
leastSigBits = txnID.LeastSigBits
}
return p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled, useTxn, mostSigBits,
leastSigBits)
}
func (p *partitionProducer) genMetadata(msg *ProducerMessage,
uncompressedSize int,
deliverAt time.Time) (mm *pb.MessageMetadata) {
mm = &pb.MessageMetadata{
ProducerName: &p.producerName,
PublishTime: proto.Uint64(internal.TimestampMillis(time.Now())),
ReplicateTo: msg.ReplicationClusters,
UncompressedSize: proto.Uint32(uint32(uncompressedSize)),
}
if !msg.EventTime.IsZero() {
mm.EventTime = proto.Uint64(internal.TimestampMillis(msg.EventTime))
}
if msg.Key != "" {
mm.PartitionKey = proto.String(msg.Key)
}
if len(msg.OrderingKey) != 0 {
mm.OrderingKey = []byte(msg.OrderingKey)
}
if msg.Properties != nil {
mm.Properties = internal.ConvertFromStringMap(msg.Properties)
}
if deliverAt.UnixNano() > 0 {
mm.DeliverAtTime = proto.Int64(int64(internal.TimestampMillis(deliverAt)))
}
return
}
func (p *partitionProducer) updateMetadataSeqID(mm *pb.MessageMetadata, msg *ProducerMessage) {
if msg.SequenceID != nil {
mm.SequenceId = proto.Uint64(uint64(*msg.SequenceID))
} else {
mm.SequenceId = proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1))
}
}
func (p *partitionProducer) updateSingleMessageMetadataSeqID(smm *pb.SingleMessageMetadata, msg *ProducerMessage) {
if msg.SequenceID != nil {
smm.SequenceId = proto.Uint64(uint64(*msg.SequenceID))
} else {
smm.SequenceId = proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1))
}
}
func (p *partitionProducer) genSingleMessageMetadataInBatch(
msg *ProducerMessage,
uncompressedSize int,
) (smm *pb.SingleMessageMetadata) {
smm = &pb.SingleMessageMetadata{
PayloadSize: proto.Int32(int32(uncompressedSize)),
}
if !msg.EventTime.IsZero() {
smm.EventTime = proto.Uint64(internal.TimestampMillis(msg.EventTime))
}
if msg.Key != "" {
smm.PartitionKey = proto.String(msg.Key)
}
if len(msg.OrderingKey) != 0 {
smm.OrderingKey = []byte(msg.OrderingKey)
}
if msg.Properties != nil {
smm.Properties = internal.ConvertFromStringMap(msg.Properties)
}
p.updateSingleMessageMetadataSeqID(smm, msg)
return
}
func (p *partitionProducer) internalSingleSend(
mm *pb.MessageMetadata,
compressedPayload []byte,
sr *sendRequest,
maxMessageSize uint32,
) {
msg := sr.msg
payloadBuf := internal.NewBuffer(len(compressedPayload))
payloadBuf.Write(compressedPayload)
buffer := p.GetBuffer()
if buffer == nil {
buffer = internal.NewBuffer(int(payloadBuf.ReadableBytes() * 3 / 2))
}
sid := *mm.SequenceId
var useTxn bool
var mostSigBits uint64
var leastSigBits uint64
if sr.transaction != nil {
txnID := sr.transaction.GetTxnID()
useTxn = true
mostSigBits = txnID.MostSigBits
leastSigBits = txnID.LeastSigBits
}
err := internal.SingleSend(
buffer,
p.producerID,
sid,
mm,
payloadBuf,
p.encryptor,
maxMessageSize,
useTxn,
mostSigBits,
leastSigBits,
)
if err != nil {
sr.done(nil, err)
p.log.WithError(err).Errorf("Single message serialize failed %s", msg.Value)
return
}
p.pendingQueue.Put(&pendingItem{
sentAt: time.Now(),
buffer: buffer,
sequenceID: sid,
sendRequests: []interface{}{sr},
})
p._getConn().WriteData(buffer)
}
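// pendingItem tracks a serialized buffer that has been written to the
// connection and is awaiting a send receipt from the broker.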
type pendingItem struct {
sync.Mutex
buffer internal.Buffer
sequenceID uint64
sentAt time.Time
sendRequests []interface{}
isDone bool
flushCallback func(err error)
}
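// internalFlushCurrentBatch serializes the current batch, appends it to the
// pending queue and writes it to the connection.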
func (p *partitionProducer) internalFlushCurrentBatch() {
if p.batchBuilder == nil {
// batching is not enabled
// the batch flush ticker should already be stopped, but it might still fire once
// depending on when Stop() is called concurrently,
// so we check here to prevent the flow from continuing on a nil batchBuilder
return
}
if p.batchBuilder.IsMultiBatches() {
p.internalFlushCurrentBatches()
return
}
batchData, sequenceID, callbacks, err := p.batchBuilder.Flush()
if batchData == nil {
return
}
// error occurred in batch flush
// report it using callback
if err != nil {
for _, cb := range callbacks {
if sr, ok := cb.(*sendRequest); ok {
sr.done(nil, err)
}
}
if errors.Is(err, internal.ErrExceedMaxMessageSize) {
p.log.WithError(ErrMessageTooLarge).Errorf("internal err: %s", err)
}
return
}
p.pendingQueue.Put(&pendingItem{
sentAt: time.Now(),
buffer: batchData,
sequenceID: sequenceID,
sendRequests: callbacks,
})
p._getConn().WriteData(batchData)
}
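// failTimeoutMessages runs in its own goroutine and periodically fails every
// pending item that has been waiting longer than SendTimeout.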
func (p *partitionProducer) failTimeoutMessages() {
diff := func(sentAt time.Time) time.Duration {
return p.options.SendTimeout - time.Since(sentAt)
}
t := time.NewTimer(p.options.SendTimeout)
defer t.Stop()
for range t.C {
state := p.getProducerState()
if state == producerClosing || state == producerClosed {
return
}
item := p.pendingQueue.Peek()
if item == nil {
// pending queue is empty
t.Reset(p.options.SendTimeout)
continue
}
oldestItem := item.(*pendingItem)
if nextWaiting := diff(oldestItem.sentAt); nextWaiting > 0 {
// none of these pending messages have timed out, wait and retry
t.Reset(nextWaiting)
continue
}
// the pending queue is not thread safe since there is no global iteration lock
// to control polls from it, so this goroutine and the connection receipt handler
// may iterate the pending queue at the same time; this may be a performance trade-off
// see https://github.com/apache/pulsar-client-go/pull/301
curViewItems := p.pendingQueue.ReadableSlice()
viewSize := len(curViewItems)
if viewSize <= 0 {
// double check
t.Reset(p.options.SendTimeout)
continue
}
p.log.Infof("Failing %d messages on timeout %s", viewSize, p.options.SendTimeout)
lastViewItem := curViewItems[viewSize-1].(*pendingItem)
// iterate at most viewSize items
for i := 0; i < viewSize; i++ {
tickerNeedWaiting := time.Duration(0)
item := p.pendingQueue.CompareAndPoll(
func(m interface{}) bool {
if m == nil {
return false
}
pi := m.(*pendingItem)
pi.Lock()
defer pi.Unlock()
if nextWaiting := diff(pi.sentAt); nextWaiting > 0 {
// the current and subsequent items have not timed out yet, stop iterating
tickerNeedWaiting = nextWaiting
return false
}
return true
})
if item == nil {
t.Reset(p.options.SendTimeout)
break
}
if tickerNeedWaiting > 0 {
t.Reset(tickerNeedWaiting)
break
}
pi := item.(*pendingItem)
pi.Lock()
for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
sr.done(nil, ErrSendTimeout)
}
// flag the send as completed with an error, so a pending flush has no effect
pi.done(ErrSendTimeout)
pi.Unlock()
// we finally reached the last view item, so the current iteration ends
if pi == lastViewItem {
t.Reset(p.options.SendTimeout)
break
}
}
}
}
func (p *partitionProducer) internalFlushCurrentBatches() {
batchesData, sequenceIDs, callbacks, errs := p.batchBuilder.FlushBatches()
if batchesData == nil {
return
}
for i := range batchesData {
// error occurred in processing batch
// report it using callback
if errs[i] != nil {
for _, cb := range callbacks[i] {
if sr, ok := cb.(*sendRequest); ok {
sr.done(nil, errs[i])
}
}
if errors.Is(errs[i], internal.ErrExceedMaxMessageSize) {
p.log.WithError(ErrMessageTooLarge).Errorf("internal err: %s", errs[i])
return
}
continue
}
if batchesData[i] == nil {
continue
}
p.pendingQueue.Put(&pendingItem{
sentAt: time.Now(),
buffer: batchesData[i],
sequenceID: sequenceIDs[i],
sendRequests: callbacks[i],
})
p._getConn().WriteData(batchesData[i])
}
}
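// internalFlush drains any messages still queued in dataChan, flushes the
// current batch, and arranges for the flush request to complete once the
// last pending item is acknowledged.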
func (p *partitionProducer) internalFlush(fr *flushRequest) {
// drain all the messages which were sent to dataChan before the flush
if len(p.dataChan) != 0 {
oldDataChan := p.dataChan
p.dataChan = make(chan *sendRequest, p.options.MaxPendingMessages)
for len(oldDataChan) != 0 {
pendingData := <-oldDataChan
p.internalSend(pendingData)
}
}
if !p.options.DisableBatching {
p.internalFlushCurrentBatch()
}
pi, ok := p.pendingQueue.PeekLast().(*pendingItem)
if !ok {
close(fr.doneCh)
return
}
// lock the pending item while registering the flush callback,
// since the ReceivedSendReceipt func iterates over this list
pi.Lock()
defer pi.Unlock()
if pi.isDone {
// The last item in the queue has been completed while we were
// looking at it. It's safe at this point to assume that every
// message enqueued before Flush() was called has now been persisted
close(fr.doneCh)
return
}
pi.flushCallback = func(err error) {
fr.err = err
close(fr.doneCh)
}
}
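// Send publishes a message on this partition and blocks until the broker
// acknowledges it or the context is cancelled. A minimal usage sketch
// (assuming a producer and ctx are already set up):
//
//	msgID, err := producer.Send(ctx, &ProducerMessage{Payload: []byte("hello")})
//	if err != nil {
//		// handle the send failure
//	}
//	_ = msgID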
func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
var err error
var msgID MessageID
// use atomic bool to avoid race
isDone := uAtomic.NewBool(false)
doneCh := make(chan struct{})
p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
if isDone.CAS(false, true) {
err = e
msgID = ID
close(doneCh)
}
}, true)
// wait for send request to finish
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-doneCh:
// send request has been finished
return msgID, err
}
}
func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error)) {
p.internalSendAsync(ctx, msg, callback, false)
}
func (p *partitionProducer) validateMsg(msg *ProducerMessage) error {
if msg == nil {
return joinErrors(ErrInvalidMessage, fmt.Errorf("message is nil"))
}
if msg.Value != nil && msg.Payload != nil {
return joinErrors(ErrInvalidMessage, fmt.Errorf("can not set Value and Payload both"))
}
if p.options.DisableMultiSchema {
if msg.Schema != nil && p.options.Schema != nil &&
msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
p.log.Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
return joinErrors(ErrSchema, fmt.Errorf("msg schema can not match with producer schema"))
}
}
return nil
}
func (p *partitionProducer) prepareTransaction(sr *sendRequest) error {
if sr.msg.Transaction == nil {
return nil
}
txn := (sr.msg.Transaction).(*transaction)
if txn.state != TxnOpen {
p.log.WithField("state", txn.state).Error("Failed to send message" +
" by a non-open transaction.")
return joinErrors(ErrTransaction,
fmt.Errorf("failed to send message by a non-open transaction"))
}
if err := txn.registerProducerTopic(p.topic); err != nil {
return joinErrors(ErrTransaction, err)
}
if err := txn.registerSendOrAckOp(); err != nil {
return joinErrors(ErrTransaction, err)
}
sr.transaction = txn
return nil
}
func (p *partitionProducer) updateSchema(sr *sendRequest) error {
var schema Schema
var schemaVersion []byte
var err error
if sr.msg.Schema != nil {
schema = sr.msg.Schema
} else if p.options.Schema != nil {
schema = p.options.Schema
}
if schema == nil {
return nil
}
schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo())
if schemaVersion == nil {
schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
if err != nil {
return joinErrors(ErrSchema, fmt.Errorf("get schema version fail, err: %w", err))
}
p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
}
sr.schema = schema
sr.schemaVersion = schemaVersion
return nil
}
func (p *partitionProducer) updateUncompressedPayload(sr *sendRequest) error {
// read payload from message
sr.uncompressedPayload = sr.msg.Payload
if sr.msg.Value != nil {
if sr.schema == nil {
p.log.Errorf("Schema encode message failed %s", sr.msg.Value)
return joinErrors(ErrSchema, fmt.Errorf("set schema value without setting schema"))
}
// payload and schema value are mutually exclusive;
// encode the payload from the schema value only when the payload is not set
schemaPayload, err := sr.schema.Encode(sr.msg.Value)
if err != nil {
p.log.WithError(err).Errorf("Schema encode message failed %s", sr.msg.Value)
return joinErrors(ErrSchema, err)
}
sr.uncompressedPayload = schemaPayload
}
sr.uncompressedSize = int64(len(sr.uncompressedPayload))
return nil
}
func (p *partitionProducer) updateMetaData(sr *sendRequest) {
deliverAt := sr.msg.DeliverAt
if sr.msg.DeliverAfter.Nanoseconds() > 0 {
deliverAt = time.Now().Add(sr.msg.DeliverAfter)
}
// set default ReplicationClusters when DisableReplication
if sr.msg.DisableReplication {
sr.msg.ReplicationClusters = []string{"__local__"}
}
sr.mm = p.genMetadata(sr.msg, int(sr.uncompressedSize), deliverAt)
sr.sendAsBatch = !p.options.DisableBatching &&
sr.msg.ReplicationClusters == nil &&
deliverAt.UnixNano() < 0
if !sr.sendAsBatch {
// update the sequence ID in the metadata to make the size of msgMetadata more accurate
// (batch sends update the sequence ID in the BatchBuilder instead)
p.updateMetadataSeqID(sr.mm, sr.msg)
}
sr.deliverAt = deliverAt
}
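// updateChunkInfo compresses the payload for non-batched sends, validates the
// message size against the broker limit, and computes the chunk size and
// chunk count when chunking is enabled.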
func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error {
checkSize := sr.uncompressedSize
if !sr.sendAsBatch {
sr.compressedPayload = p.compressionProvider.Compress(nil, sr.uncompressedPayload)
sr.compressedSize = len(sr.compressedPayload)
// set the compress type in msgMetaData
compressionType := pb.CompressionType(p.options.CompressionType)
if compressionType != pb.CompressionType_NONE {
sr.mm.Compression = &compressionType
}
checkSize = int64(sr.compressedSize)
}
sr.maxMessageSize = p._getConn().GetMaxMessageSize()
// if msg is too large and chunking is disabled
if checkSize > int64(sr.maxMessageSize) && !p.options.EnableChunking {
p.log.WithError(ErrMessageTooLarge).
WithField("size", checkSize).
WithField("properties", sr.msg.Properties).
Errorf("MaxMessageSize %d", sr.maxMessageSize)
return ErrMessageTooLarge
}
if sr.sendAsBatch || !p.options.EnableChunking {
sr.totalChunks = 1
sr.payloadChunkSize = int(sr.maxMessageSize)
} else {
sr.payloadChunkSize = int(sr.maxMessageSize) - proto.Size(sr.mm)
if sr.payloadChunkSize <= 0 {
p.log.WithError(ErrMetaTooLarge).
WithField("metadata size", proto.Size(sr.mm)).
WithField("properties", sr.msg.Properties).
Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
return ErrMetaTooLarge
}
// set ChunkMaxMessageSize
if p.options.ChunkMaxMessageSize != 0 {
sr.payloadChunkSize = int(math.Min(float64(sr.payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
}
sr.totalChunks = int(math.Max(1, math.Ceil(float64(sr.compressedSize)/float64(sr.payloadChunkSize))))
}
return nil
}
func (p *partitionProducer) internalSendAsync(
ctx context.Context,
msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error),
flushImmediately bool,
) {
if err := p.validateMsg(msg); err != nil {
p.log.Error(err)
runCallback(callback, nil, msg, err)
return
}
sr := sendRequestPool.Get().(*sendRequest)
*sr = sendRequest{
pool: sendRequestPool,
ctx: ctx,
msg: msg,
producer: p,
callback: callback,
callbackOnce: &sync.Once{},
flushImmediately: flushImmediately,
publishTime: time.Now(),
chunkID: -1,
}
if err := p.prepareTransaction(sr); err != nil {
sr.done(nil, err)
return
}
if p.getProducerState() != producerReady {
sr.done(nil, ErrProducerClosed)
return
}
p.options.Interceptors.BeforeSend(p, msg)
if err := p.updateSchema(sr); err != nil {
p.log.Error(err)
sr.done(nil, err)
return
}
if err := p.updateUncompressedPayload(sr); err != nil {
p.log.Error(err)
sr.done(nil, err)
return
}
p.updateMetaData(sr)
if err := p.updateChunkInfo(sr); err != nil {
p.log.Error(err)
sr.done(nil, err)
return
}
if err := p.reserveResources(sr); err != nil {
p.log.Error(err)
sr.done(nil, err)
return
}
p.dataChan <- sr
}
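// ReceivedSendReceipt is invoked by the connection when the broker
// acknowledges a send; it matches the receipt against the head of the
// pending queue and completes the corresponding send requests.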
func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
pi, ok := p.pendingQueue.Peek().(*pendingItem)
if !ok {
// if we receive a receipt although the pending queue is empty, the state of the broker and the producer differs.
p.log.Warnf("Got ack %v for timed out msg", response.GetMessageId())
return
}
if pi.sequenceID < response.GetSequenceId() {
// Force connection closing so that messages can be re-transmitted in a new connection
p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, local < remote, closing connection",
response.GetMessageId(), response.GetSequenceId(), pi.sequenceID)
p._getConn().Close()
return
} else if pi.sequenceID > response.GetSequenceId() {
// Ignoring the ack since it's referring to a message that has already timed out.
p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, local > remote, ignore it",
response.GetMessageId(), response.GetSequenceId(), pi.sequenceID)
return
} else {
// The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
p.pendingQueue.Poll()
now := time.Now().UnixNano()
// lock the pending item while completing the requests
pi.Lock()
defer pi.Unlock()
p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9)
batchSize := int32(len(pi.sendRequests))
for idx, i := range pi.sendRequests {
sr := i.(*sendRequest)
atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
msgID := newMessageID(
int64(response.MessageId.GetLedgerId()),
int64(response.MessageId.GetEntryId()),
int32(idx),
p.partitionIdx,
batchSize,
)
if sr.totalChunks > 1 {
if sr.chunkID == 0 {
sr.chunkRecorder.setFirstChunkID(
&messageID{
int64(response.MessageId.GetLedgerId()),
int64(response.MessageId.GetEntryId()),
-1,
p.partitionIdx,
0,
})
} else if sr.chunkID == sr.totalChunks-1 {
sr.chunkRecorder.setLastChunkID(
&messageID{
int64(response.MessageId.GetLedgerId()),
int64(response.MessageId.GetEntryId()),
-1,
p.partitionIdx,
0,
})
// use chunkMsgID to set msgID
msgID = &sr.chunkRecorder.chunkedMsgID
}
}
sr.done(msgID, nil)
}
// Mark this pending item as done
pi.done(nil)
}
}
func (p *partitionProducer) internalClose(req *closeProducer) {
defer close(req.doneCh)
p.doClose(ErrProducerClosed)
}
func (p *partitionProducer) doClose(reason error) {
if !p.casProducerState(producerReady, producerClosing) {
return
}
p.log.Info("Closing producer")
defer close(p.dataChan)
defer close(p.cmdChan)
id := p.client.rpcClient.NewRequestID()
_, err := p.client.rpcClient.RequestOnCnx(p._getConn(), id, pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{
ProducerId: &p.producerID,
RequestId: &id,
})
if err != nil {
p.log.WithError(err).Warn("Failed to close producer")
} else {
p.log.Info("Closed producer")
}
p.failPendingMessages(reason)
if p.batchBuilder != nil {
if err = p.batchBuilder.Close(); err != nil {
p.log.WithError(err).Warn("Failed to close batch builder")
}
}
p.setProducerState(producerClosed)
p._getConn().UnregisterListener(p.producerID)
p.batchFlushTicker.Stop()
}
func (p *partitionProducer) failPendingMessages(err error) {
curViewItems := p.pendingQueue.ReadableSlice()
viewSize := len(curViewItems)
if viewSize <= 0 {
return
}
p.log.Infof("Failing %d messages on closing producer", viewSize)
lastViewItem := curViewItems[viewSize-1].(*pendingItem)
// iterate at most viewSize items
for i := 0; i < viewSize; i++ {
item := p.pendingQueue.CompareAndPoll(
func(m interface{}) bool {
return m != nil
})
if item == nil {
return
}
pi := item.(*pendingItem)
pi.Lock()
for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
sr.done(nil, err)
}
// flag the send as completed with an error, so a pending flush has no effect
pi.done(err)
pi.Unlock()
// we finally reached the last view item, so the current iteration ends
if pi == lastViewItem {
p.log.Infof("%d messages complete failed", viewSize)
return
}
}
}
func (p *partitionProducer) LastSequenceID() int64 {
return atomic.LoadInt64(&p.lastSequenceID)
}
func (p *partitionProducer) Flush() error {
return p.FlushWithCtx(context.Background())
}
func (p *partitionProducer) FlushWithCtx(ctx context.Context) error {
flushReq := &flushRequest{
doneCh: make(chan struct{}),
err: nil,
}
select {
case <-ctx.Done():
return ctx.Err()
case p.cmdChan <- flushReq:
}
// wait for the flush request to complete
select {
case <-ctx.Done():
return ctx.Err()
case <-flushReq.doneCh:
return flushReq.err
}
}
func (p *partitionProducer) getProducerState() producerState {
return producerState(p.state.Load())
}
func (p *partitionProducer) setProducerState(state producerState) {
p.state.Swap(int32(state))
}
// casProducerState atomically swaps oldState for newState
// and returns true if the state was updated
func (p *partitionProducer) casProducerState(oldState, newState producerState) bool {
return p.state.CAS(int32(oldState), int32(newState))
}
func (p *partitionProducer) Close() {
if p.getProducerState() != producerReady {
// Producer is closing
return
}
cp := &closeProducer{doneCh: make(chan struct{})}
p.cmdChan <- cp
// wait for close producer request to complete
<-cp.doneCh
}
type sendRequest struct {
pool *sync.Pool
ctx context.Context
msg *ProducerMessage
producer *partitionProducer
callback func(MessageID, *ProducerMessage, error)
callbackOnce *sync.Once
publishTime time.Time
flushImmediately bool
totalChunks int
chunkID int
uuid string
chunkRecorder *chunkRecorder
/// resource management
memLimit internal.MemoryLimitController
reservedMem int64
semaphore internal.Semaphore
reservedSemaphore int
/// convey settable state
sendAsBatch bool
transaction *transaction
schema Schema
schemaVersion []byte
uncompressedPayload []byte
uncompressedSize int64
compressedPayload []byte
compressedSize int
payloadChunkSize int
mm *pb.MessageMetadata
deliverAt time.Time
maxMessageSize int32
}
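// done completes a send request exactly once: it records metrics, releases
// the reserved semaphore slot and memory, invokes the user callback (for
// chunked messages, only on the last chunk or on an early failure), and
// returns the request to the pool.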
func (sr *sendRequest) done(msgID MessageID, err error) {
if err == nil {
sr.producer.metrics.PublishLatency.Observe(float64(time.Now().UnixNano()-sr.publishTime.UnixNano()) / 1.0e9)
sr.producer.metrics.MessagesPublished.Inc()
sr.producer.metrics.BytesPublished.Add(float64(sr.reservedMem))
if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
if sr.producer.options.Interceptors != nil {
sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID)
}
}
}
if err != nil {
sr.producer.log.WithError(err).
WithField("size", sr.reservedMem).
WithField("properties", sr.msg.Properties)
}
if errors.Is(err, ErrSendTimeout) {
sr.producer.metrics.PublishErrorsTimeout.Inc()
}
if errors.Is(err, ErrMessageTooLarge) {
sr.producer.metrics.PublishErrorsMsgTooLarge.Inc()
}
if sr.semaphore != nil {
sr.semaphore.Release()
sr.producer.metrics.MessagesPending.Dec()
}
if sr.memLimit != nil {
sr.memLimit.ReleaseMemory(sr.reservedMem)
sr.producer.metrics.BytesPending.Sub(float64(sr.reservedMem))
}
// sr.chunkID == -1 means the chunked message is not yet prepared, so we should fail it immediately
if sr.totalChunks <= 1 || sr.chunkID == -1 || sr.chunkID == sr.totalChunks-1 {
sr.callbackOnce.Do(func() {
runCallback(sr.callback, msgID, sr.msg, err)
})
if sr.transaction != nil {
sr.transaction.endSendOrAckOp(err)
}
}
pool := sr.pool
if pool != nil {
// reset all the fields
*sr = sendRequest{}
pool.Put(sr)
}
}
func (p *partitionProducer) blockIfQueueFull() bool {
// DisableBlockIfQueueFull == false means blocking is enabled
return !p.options.DisableBlockIfQueueFull
}
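// reserveSemaphore acquires one publish-semaphore slot per chunk, blocking or
// failing fast depending on DisableBlockIfQueueFull.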
func (p *partitionProducer) reserveSemaphore(sr *sendRequest) error {
for i := 0; i < sr.totalChunks; i++ {
if p.blockIfQueueFull() {
if !p.publishSemaphore.Acquire(sr.ctx) {
return ErrContextExpired
}
// update sr.semaphore and sr.reservedSemaphore here so that we can release the semaphore
// even if only a part of the chunks acquire it successfully
sr.semaphore = p.publishSemaphore
sr.reservedSemaphore++
p.metrics.MessagesPending.Inc()
} else {
if !p.publishSemaphore.TryAcquire() {
return ErrSendQueueIsFull
}
// update sr.semaphore and sr.reservedSemaphore here so that we can release the semaphore
// even if only a part of the chunks acquire it successfully
sr.semaphore = p.publishSemaphore
sr.reservedSemaphore++
p.metrics.MessagesPending.Inc()
}
}
return nil
}
func (p *partitionProducer) reserveMem(sr *sendRequest) error {
requiredMem := sr.uncompressedSize
if !sr.sendAsBatch {
requiredMem = int64(sr.compressedSize)
}
if p.blockIfQueueFull() {
if !p.client.memLimit.ReserveMemory(sr.ctx, requiredMem) {
return ErrContextExpired
}
} else {
if !p.client.memLimit.TryReserveMemory(requiredMem) {
return ErrMemoryBufferIsFull
}
}
sr.memLimit = p.client.memLimit
sr.reservedMem += requiredMem
p.metrics.BytesPending.Add(float64(requiredMem))
return nil
}
func (p *partitionProducer) reserveResources(sr *sendRequest) error {
if err := p.reserveSemaphore(sr); err != nil {
return err
}
if err := p.reserveMem(sr); err != nil {
return err
}
return nil
}
type closeProducer struct {
doneCh chan struct{}
}
type flushRequest struct {
doneCh chan struct{}
err error
}
func (i *pendingItem) done(err error) {
if i.isDone {
return
}
i.isDone = true
buffersPool.Put(i.buffer)
if i.flushCallback != nil {
i.flushCallback(err)
}
}
// _setConn sets the internal connection field of this partition producer atomically.
// Note: should only be called by this partition producer when a new connection is available.
func (p *partitionProducer) _setConn(conn internal.Connection) {
p.conn.Store(conn)
}
// _getConn returns the internal connection field of this partition producer atomically.
// Note: should only be called by this partition producer before attempting to use the connection.
func (p *partitionProducer) _getConn() internal.Connection {
// Invariant: p.conn must be non-nil for the lifetime of the partitionProducer.
// For this reason we leave this cast unchecked and panic() if the
// invariant is broken
return p.conn.Load().(internal.Connection)
}
type chunkRecorder struct {
chunkedMsgID chunkMessageID
}
func newChunkRecorder() *chunkRecorder {
return &chunkRecorder{
chunkedMsgID: chunkMessageID{},
}
}
func (c *chunkRecorder) setFirstChunkID(msgID *messageID) {
c.chunkedMsgID.firstChunkID = msgID
}
func (c *chunkRecorder) setLastChunkID(msgID *messageID) {
c.chunkedMsgID.messageID = msgID
}
func toProtoProducerAccessMode(accessMode ProducerAccessMode) pb.ProducerAccessMode {
switch accessMode {
case ProducerAccessModeShared:
return pb.ProducerAccessMode_Shared
case ProducerAccessModeExclusive:
return pb.ProducerAccessMode_Exclusive
case ProducerAccessModeWaitForExclusive:
return pb.ProducerAccessMode_WaitForExclusive
}
return pb.ProducerAccessMode_Shared
}