blob: 2a2b63e8e396e2af858e0344da63af3fe1d942e5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar
import (
"context"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
log "github.com/sirupsen/logrus"
"github.com/apache/pulsar-client-go/pulsar/internal"
)
// Package-level Prometheus metrics describing producer lifecycle.
var (
// producersOpened is incremented once for every producer successfully
// returned by newProducer.
producersOpened = promauto.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_producers_opened",
Help: "Counter of producers created by the client",
})
// producersClosed is incremented on every call to producer.Close.
producersClosed = promauto.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_producers_closed",
Help: "Counter of producers closed by the client",
})
// producersPartitions is a gauge (not a counter): it is increased when
// partition producers are added and decreased by the number of partition
// producers when a producer is closed.
producersPartitions = promauto.NewGauge(prometheus.GaugeOpts{
Name: "pulsar_client_producers_partitions_active",
Help: "Counter of individual partitions the producers are currently active",
})
)
// producer is the topic-level producer. It owns one Producer per topic
// partition and routes every message to one of them via messageRouter.
type producer struct {
// The embedded RWMutex guards the producers slice.
sync.RWMutex
// client is used to look up the topic partitions and to deregister this
// producer from the client's handlers on Close.
client *client
// options are the settings the producer was created with; they are also
// handed to every partition producer.
options *ProducerOptions
// topic is the topic name taken from options.Topic.
topic string
// producers holds one Producer per partition, indexed by partition index.
producers []Producer
// producersPtr points to a []Producer. It is stored atomically by
// internalCreatePartitionsProducers and loaded atomically by getPartition,
// which does not take the mutex.
producersPtr unsafe.Pointer
// numPartitions is the current number of partitions; accessed atomically.
numPartitions uint32
// messageRouter picks the partition index for a message.
messageRouter func(*ProducerMessage, TopicMetadata) int
// ticker drives the periodic partition auto-discovery; set to nil by Close.
ticker *time.Ticker
// tickerStop is closed by Close to terminate the auto-discovery goroutine.
tickerStop chan struct{}
// log is a logger entry carrying the "topic" field.
log *log.Entry
}
// defaultBatchingMaxPublishDelay is used by newProducer when
// ProducerOptions.BatchingMaxPublishDelay is left at zero.
const defaultBatchingMaxPublishDelay = 10 * time.Millisecond
// partitionsAutoDiscoveryInterval is the period of the ticker that re-checks
// the topic's partition count. It is a variable rather than a constant,
// presumably so it can be overridden (e.g. in tests) -- TODO confirm.
var partitionsAutoDiscoveryInterval = 1 * time.Minute
// getHashingFunction maps a HashingScheme to the matching hash function.
// Murmur3_32Hash selects internal.Murmur3_32Hash; JavaStringHash and any
// other (unknown) value select internal.JavaStringHash.
func getHashingFunction(s HashingScheme) func(string) uint32 {
	if s == Murmur3_32Hash {
		return internal.Murmur3_32Hash
	}
	// JavaStringHash is both the explicit choice and the fallback.
	return internal.JavaStringHash
}
// newProducer creates the topic-level producer for options.Topic.
//
// It validates that a topic name is present, fills in defaults (batching
// publish delay, interceptors, message router), creates one partition
// producer per topic partition and starts a background goroutine that
// re-checks the partition count every partitionsAutoDiscoveryInterval.
// The goroutine runs until Close closes tickerStop.
//
// Note that options is retained and, when options.Interceptors is nil, it is
// updated in place with defaultProducerInterceptors.
//
// It returns an error when the topic name is empty or when the initial
// creation of the partition producers fails.
func newProducer(client *client, options *ProducerOptions) (*producer, error) {
	if options.Topic == "" {
		return nil, newError(ResultInvalidTopicName, "Topic name is required for producer")
	}

	p := &producer{
		options: options,
		topic:   options.Topic,
		client:  client,
		log:     log.WithField("topic", options.Topic),
	}

	// A zero delay means "not configured": fall back to the default.
	batchingMaxPublishDelay := options.BatchingMaxPublishDelay
	if batchingMaxPublishDelay == 0 {
		batchingMaxPublishDelay = defaultBatchingMaxPublishDelay
	}

	if options.Interceptors == nil {
		options.Interceptors = defaultProducerInterceptors
	}

	if options.MessageRouter == nil {
		internalRouter := internal.NewDefaultRouter(
			internal.NewSystemClock(),
			getHashingFunction(options.HashingScheme),
			batchingMaxPublishDelay, options.DisableBatching)
		p.messageRouter = func(message *ProducerMessage, metadata TopicMetadata) int {
			return internalRouter(message.Key, metadata.NumPartitions())
		}
	} else {
		p.messageRouter = options.MessageRouter
	}

	if err := p.internalCreatePartitionsProducers(); err != nil {
		return nil, err
	}

	ticker := time.NewTicker(partitionsAutoDiscoveryInterval)
	p.ticker = ticker
	p.tickerStop = make(chan struct{})

	go func() {
		for {
			select {
			case <-ticker.C:
				p.log.Debug("Auto discovering new partitions")
				// A failed discovery round must not go unnoticed; it is
				// retried on the next tick.
				if err := p.internalCreatePartitionsProducers(); err != nil {
					p.log.WithError(err).Warn("Failed to auto discover new partitions")
				}
			case <-p.tickerStop:
				return
			}
		}
	}()

	producersOpened.Inc()
	return p, nil
}
// internalCreatePartitionsProducers synchronizes the set of partition
// producers with the current partitions of the topic.
//
// It is called once from newProducer and then periodically from the
// auto-discovery goroutine. The write lock is held while the list is rebuilt.
//
//   - Unchanged partition count: nothing happens.
//   - More partitions: producers for the new partitions are created
//     concurrently. Only if all of them succeed is the new list published.
//     On any failure the producers created in this round are closed, the
//     previously published producers are left untouched and the error is
//     returned, so a later round can retry.
//   - Fewer partitions: the list is truncated and the producers of the
//     partitions that no longer exist are closed.
//
// The new list is built in a separate slice and published as a whole, so
// getPartition, which reads producersPtr without the lock, never observes a
// partially filled list.
func (p *producer) internalCreatePartitionsProducers() error {
	partitions, err := p.client.TopicPartitions(p.topic)
	if err != nil {
		return err
	}
	newNumPartitions := len(partitions)

	p.Lock()
	defer p.Unlock()

	oldProducers := p.producers
	oldNumPartitions := len(oldProducers)

	if oldProducers != nil {
		if oldNumPartitions == newNumPartitions {
			p.log.Debug("Number of partitions in topic has not changed")
			return nil
		}

		p.log.WithField("old_partitions", oldNumPartitions).
			WithField("new_partitions", newNumPartitions).
			Info("Changed number of partitions in topic")
	}

	// Copy over the existing producer instances. copy stops at the shorter
	// of the two slices, so this is safe when the partition count shrinks.
	producers := make([]Producer, newNumPartitions)
	copy(producers, oldProducers)

	type producerResult struct {
		partition int
		prod      Producer
		err       error
	}

	partitionsToAdd := newNumPartitions - oldNumPartitions
	if partitionsToAdd > 0 {
		// Buffered so that every creator goroutine can deliver its result
		// without blocking.
		c := make(chan producerResult, partitionsToAdd)

		for partitionIdx := oldNumPartitions; partitionIdx < newNumPartitions; partitionIdx++ {
			go func(partitionIdx int, partition string) {
				prod, e := newPartitionProducer(p.client, partition, p.options, partitionIdx)
				c <- producerResult{
					partition: partitionIdx,
					prod:      prod,
					err:       e,
				}
			}(partitionIdx, partitions[partitionIdx])
		}

		var createErr error
		for i := 0; i < partitionsToAdd; i++ {
			res := <-c
			if res.err != nil {
				createErr = res.err
				continue
			}
			producers[res.partition] = res.prod
		}

		if createErr != nil {
			// Clean up only the producers created in this round; the
			// already published ones are still in use.
			for _, created := range producers[oldNumPartitions:] {
				if created != nil {
					created.Close()
				}
			}
			return createErr
		}
	}

	p.producers = producers
	// partitionsToAdd is negative when partitions were removed, which lowers
	// the gauge accordingly.
	producersPartitions.Add(float64(partitionsToAdd))
	// Publish the list first and the count second: a reader that sees the new
	// count together with an older, shorter list wraps the index around.
	atomic.StorePointer(&p.producersPtr, unsafe.Pointer(&producers))
	atomic.StoreUint32(&p.numPartitions, uint32(len(producers)))

	if partitionsToAdd < 0 {
		// Close the producers of removed partitions only after the shorter
		// list has been published.
		for _, removed := range oldProducers[newNumPartitions:] {
			removed.Close()
		}
	}

	return nil
}
// Topic returns the topic name the producer was created with.
func (p *producer) Topic() string {
return p.topic
}
// Name returns the name reported by the producer of the first partition.
// The read lock is held while the producers list is accessed.
func (p *producer) Name() string {
	p.RLock()
	defer p.RUnlock()

	first := p.producers[0]
	return first.Name()
}
// NumPartitions returns the current partition count. The value is read
// atomically, without taking the mutex.
func (p *producer) NumPartitions() uint32 {
return atomic.LoadUint32(&p.numPartitions)
}
// Send routes msg to a partition producer and forwards the call to it,
// returning that producer's result.
func (p *producer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
	target := p.getPartition(msg)
	return target.Send(ctx, msg)
}
// SendAsync routes msg to a partition producer and forwards the call,
// including the callback, to it.
func (p *producer) SendAsync(ctx context.Context, msg *ProducerMessage,
	callback func(MessageID, *ProducerMessage, error)) {
	target := p.getPartition(msg)
	target.SendAsync(ctx, msg, callback)
}
// getPartition asks the message router for a partition index and returns the
// producer registered for it. It does not take the mutex; the producers list
// is obtained through an atomic load of producersPtr.
func (p *producer) getPartition(msg *ProducerMessage) Producer {
	// The router sees the partition count through p.NumPartitions(). That
	// count is stored only after the list, so the index it yields may exceed
	// the length of the list loaded below.
	idx := p.messageRouter(msg, p)

	listPtr := (*[]Producer)(atomic.LoadPointer(&p.producersPtr))
	list := *listPtr

	if n := len(list); idx >= n {
		// The count was already updated but we still read the older,
		// shorter list: wrap the index around.
		idx %= n
	}
	return list[idx]
}
// LastSequenceID returns the largest LastSequenceID reported by any of the
// partition producers, or -1 when none of them reports a larger value.
func (p *producer) LastSequenceID() int64 {
	p.RLock()
	defer p.RUnlock()

	highest := int64(-1)
	for i := range p.producers {
		if seq := p.producers[i].LastSequenceID(); seq > highest {
			highest = seq
		}
	}
	return highest
}
// Flush flushes the partition producers one after another in partition
// order. It stops at the first failure and returns that error; the remaining
// producers are not flushed in that case.
func (p *producer) Flush() error {
	p.RLock()
	defer p.RUnlock()

	for i := 0; i < len(p.producers); i++ {
		err := p.producers[i].Flush()
		if err != nil {
			return err
		}
	}
	return nil
}
// Close stops the partition auto-discovery goroutine, closes every partition
// producer, removes this producer from the client's handlers and updates the
// metrics.
//
// The write lock is taken because Close modifies p.ticker and closes
// p.tickerStop. Under a read lock two concurrent Close calls could both see a
// non-nil ticker and both close the channel, which panics.
func (p *producer) Close() {
	p.Lock()
	defer p.Unlock()

	if p.ticker != nil {
		p.ticker.Stop()
		// Closing the channel makes the auto-discovery goroutine return.
		close(p.tickerStop)
		// Mark the ticker as stopped so the channel is closed only once.
		p.ticker = nil
	}

	for _, pp := range p.producers {
		pp.Close()
	}

	p.client.handlers.Del(p)
	producersPartitions.Sub(float64(len(p.producers)))
	producersClosed.Inc()
}