// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar
import (
"context"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/log"
)
const (
	// defaultSendTimeout is the default timeout for a sent message to be acknowledged.
	defaultSendTimeout = 30 * time.Second

	// defaultBatchingMaxPublishDelay is the default maximum delay before a batch of messages is published.
	defaultBatchingMaxPublishDelay = 10 * time.Millisecond

	// defaultMaxBatchSize is the default maximum number of bytes per batch.
	defaultMaxBatchSize = 128 * 1024

	// defaultMaxMessagesPerBatch is the default maximum number of messages per batch.
	defaultMaxMessagesPerBatch = 1000
)
var (
producersOpened = promauto.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_producers_opened",
Help: "Counter of producers created by the client",
})
producersClosed = promauto.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_producers_closed",
Help: "Counter of producers closed by the client",
})
producersPartitions = promauto.NewGauge(prometheus.GaugeOpts{
Name: "pulsar_client_producers_partitions_active",
Help: "Counter of individual partitions the producers are currently active",
})
)
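
// producer is the default implementation of the Producer interface. It wraps
// one partition producer per partition of the topic and periodically
// discovers newly added partitions.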
type producer struct {
sync.RWMutex
client *client
options *ProducerOptions
topic string
producers []Producer
producersPtr unsafe.Pointer
numPartitions uint32
messageRouter func(*ProducerMessage, TopicMetadata) int
ticker *time.Ticker
tickerStop chan struct{}
log log.Logger
}
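
// partitionsAutoDiscoveryInterval controls how often the producer re-checks
// the topic metadata for newly added partitions.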
var partitionsAutoDiscoveryInterval = 1 * time.Minute
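
// getHashingFunction maps a HashingScheme to the corresponding string hash
// implementation, falling back to JavaStringHash for unknown schemes.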
func getHashingFunction(s HashingScheme) func(string) uint32 {
switch s {
case JavaStringHash:
return internal.JavaStringHash
case Murmur3_32Hash:
return internal.Murmur3_32Hash
default:
return internal.JavaStringHash
}
}
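
// newProducer validates and defaults the given ProducerOptions, creates one
// partition producer per partition of the topic and starts a goroutine that
// periodically discovers newly added partitions. It is reached through the
// public Client API, roughly as in this sketch (error handling elided,
// "client" is an already connected Client):
//
//	producer, err := client.CreateProducer(ProducerOptions{Topic: "my-topic"})
//	if err != nil {
//		// handle the error
//	}
//	defer producer.Close()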
func newProducer(client *client, options *ProducerOptions) (*producer, error) {
if options.Topic == "" {
return nil, newError(ResultInvalidTopicName, "Topic name is required for producer")
}
if options.SendTimeout == 0 {
options.SendTimeout = defaultSendTimeout
}
if options.BatchingMaxMessages == 0 {
options.BatchingMaxMessages = defaultMaxMessagesPerBatch
}
if options.BatchingMaxSize == 0 {
options.BatchingMaxSize = defaultMaxBatchSize
}
if options.BatchingMaxPublishDelay <= 0 {
options.BatchingMaxPublishDelay = defaultBatchingMaxPublishDelay
}
p := &producer{
options: options,
topic: options.Topic,
client: client,
log: client.log.SubLogger(log.Fields{"topic": options.Topic}),
}
if options.Interceptors == nil {
options.Interceptors = defaultProducerInterceptors
}
if options.MessageRouter == nil {
internalRouter := NewDefaultRouter(
getHashingFunction(options.HashingScheme),
options.BatchingMaxMessages,
options.BatchingMaxSize,
options.BatchingMaxPublishDelay,
options.DisableBatching)
p.messageRouter = func(message *ProducerMessage, metadata TopicMetadata) int {
return internalRouter(message, metadata.NumPartitions())
}
} else {
p.messageRouter = options.MessageRouter
}
if options.Schema != nil && options.Schema.GetSchemaInfo() != nil {
if options.Schema.GetSchemaInfo().Type == NONE {
options.Schema = NewBytesSchema(nil)
}
}
err := p.internalCreatePartitionsProducers()
if err != nil {
return nil, err
}
ticker := time.NewTicker(partitionsAutoDiscoveryInterval)
p.ticker = ticker
p.tickerStop = make(chan struct{})
go func() {
for {
select {
case <-ticker.C:
p.log.Debug("Auto discovering new partitions")
				if err := p.internalCreatePartitionsProducers(); err != nil {
					p.log.WithError(err).Warn("Failed to discover new partitions of topic")
				}
case <-p.tickerStop:
return
}
}
}()
producersOpened.Inc()
return p, nil
}
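
// internalCreatePartitionsProducers queries the current number of partitions
// for the topic and, if it grew, creates partition producers for the newly
// added partitions in parallel. On any failure it closes the producers it
// holds and returns the error.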
func (p *producer) internalCreatePartitionsProducers() error {
partitions, err := p.client.TopicPartitions(p.topic)
if err != nil {
return err
}
oldNumPartitions := 0
newNumPartitions := len(partitions)
p.Lock()
defer p.Unlock()
oldProducers := p.producers
if oldProducers != nil {
oldNumPartitions = len(oldProducers)
if oldNumPartitions == newNumPartitions {
p.log.Debug("Number of partitions in topic has not changed")
return nil
}
p.log.WithField("old_partitions", oldNumPartitions).
WithField("new_partitions", newNumPartitions).
Info("Changed number of partitions in topic")
}
p.producers = make([]Producer, newNumPartitions)
	// Copy over the existing producer instances
for i := 0; i < oldNumPartitions; i++ {
p.producers[i] = oldProducers[i]
}
type ProducerError struct {
partition int
prod Producer
err error
}
partitionsToAdd := newNumPartitions - oldNumPartitions
c := make(chan ProducerError, partitionsToAdd)
for partitionIdx := oldNumPartitions; partitionIdx < newNumPartitions; partitionIdx++ {
partition := partitions[partitionIdx]
go func(partitionIdx int, partition string) {
prod, e := newPartitionProducer(p.client, partition, p.options, partitionIdx)
c <- ProducerError{
partition: partitionIdx,
prod: prod,
err: e,
}
}(partitionIdx, partition)
}
for i := 0; i < partitionsToAdd; i++ {
pe, ok := <-c
if ok {
if pe.err != nil {
err = pe.err
} else {
p.producers[pe.partition] = pe.prod
}
}
}
if err != nil {
		// Since there were some failures, clean up all the partition producers that were successfully created
for _, producer := range p.producers {
if producer != nil {
producer.Close()
}
}
return err
}
producersPartitions.Add(float64(partitionsToAdd))
atomic.StorePointer(&p.producersPtr, unsafe.Pointer(&p.producers))
atomic.StoreUint32(&p.numPartitions, uint32(len(p.producers)))
return nil
}
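
// Topic returns the topic this producer publishes to.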
func (p *producer) Topic() string {
return p.topic
}
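
// Name returns the producer name, taken from the first partition producer.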
func (p *producer) Name() string {
p.RLock()
defer p.RUnlock()
return p.producers[0].Name()
}
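
// NumPartitions returns the number of partitions the producer is currently
// aware of, satisfying the TopicMetadata interface used by message routers.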
func (p *producer) NumPartitions() uint32 {
return atomic.LoadUint32(&p.numPartitions)
}
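
// Send routes the message to a partition producer and publishes it
// synchronously, blocking until the message is acknowledged or an error
// (such as a send timeout) occurs.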
func (p *producer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
return p.getPartition(msg).Send(ctx, msg)
}
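
// SendAsync routes the message to a partition producer and publishes it
// asynchronously. The callback is invoked once the send completes or fails,
// for example (sketch):
//
//	p.SendAsync(ctx, msg, func(id MessageID, m *ProducerMessage, err error) {
//		if err != nil {
//			// handle the publish failure
//		}
//	})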
func (p *producer) SendAsync(ctx context.Context, msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error)) {
p.getPartition(msg).SendAsync(ctx, msg, callback)
}
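
// getPartition picks the partition producer for a message using the
// configured message router.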
func (p *producer) getPartition(msg *ProducerMessage) Producer {
	// Since partitions can only increase, it's ok if the producers list
	// is updated in between. numPartitions is updated only after the list.
partition := p.messageRouter(msg, p)
producers := *(*[]Producer)(atomic.LoadPointer(&p.producersPtr))
if partition >= len(producers) {
		// We read an older producers list while the partition count
		// had already been updated
partition %= len(producers)
}
return producers[partition]
}
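
// LastSequenceID returns the highest sequence ID published across all
// partition producers, or -1 if nothing has been published yet.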
func (p *producer) LastSequenceID() int64 {
p.RLock()
defer p.RUnlock()
var maxSeq int64 = -1
for _, pp := range p.producers {
s := pp.LastSequenceID()
if s > maxSeq {
maxSeq = s
}
}
return maxSeq
}
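
// Flush flushes every partition producer, returning the first error
// encountered.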
func (p *producer) Flush() error {
p.RLock()
defer p.RUnlock()
for _, pp := range p.producers {
if err := pp.Flush(); err != nil {
return err
}
}
return nil
}
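
// Close stops the partition auto-discovery goroutine, closes all partition
// producers and unregisters the producer from the client.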
func (p *producer) Close() {
p.Lock()
defer p.Unlock()
if p.ticker != nil {
p.ticker.Stop()
close(p.tickerStop)
p.ticker = nil
}
for _, pp := range p.producers {
pp.Close()
}
p.client.handlers.Del(p)
producersPartitions.Sub(float64(len(p.producers)))
producersClosed.Inc()
}