| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| |
| package pulsar |
| |
| import ( |
| "context" |
| "github.com/apache/pulsar-client-go/pulsar/internal" |
| ) |
| |
| type producer struct { |
| topic string |
| producers []Producer |
| messageRouter func(*ProducerMessage, TopicMetadata) int |
| } |
| |
| func getHashingFunction(s HashingScheme) func(string) uint32 { |
| switch s { |
| case JavaStringHash: |
| return internal.JavaStringHash |
| case Murmur3_32Hash: |
| return internal.Murmur3_32Hash |
| default: |
| return internal.JavaStringHash |
| } |
| } |
| |
| func newProducer(client *client, options *ProducerOptions) (*producer, error) { |
| if options.Topic == "" { |
| return nil, newError(ResultInvalidTopicName, "Topic name is required for producer") |
| } |
| |
| p := &producer{ |
| topic: options.Topic, |
| } |
| |
| if options.MessageRouter == nil { |
| internalRouter := internal.NewDefaultRouter( |
| internal.NewSystemClock(), |
| getHashingFunction(options.HashingScheme), |
| options.BatchingMaxPublishDelay) |
| p.messageRouter = func(message *ProducerMessage, metadata TopicMetadata) int { |
| return internalRouter(message.Key, metadata.NumPartitions()) |
| } |
| } |
| |
| partitions, err := client.TopicPartitions(options.Topic) |
| if err != nil { |
| return nil, err |
| } |
| |
| numPartitions := len(partitions) |
| p.producers = make([]Producer, numPartitions) |
| |
| type ProducerError struct { |
| partition int |
| Producer |
| error |
| } |
| |
| c := make(chan ProducerError, numPartitions) |
| |
| for partitionIdx, partition := range partitions { |
| go func(partitionIdx int, partition string) { |
| prod, err := newPartitionProducer(client, partition, options, partitionIdx) |
| c <- ProducerError{partitionIdx, prod, err} |
| }(partitionIdx, partition) |
| } |
| |
| for i := 0; i < numPartitions; i++ { |
| pe := <-c |
| err = pe.error |
| p.producers[pe.partition] = pe.Producer |
| } |
| |
| if err != nil { |
| // Since there were some failures, cleanup all the partitions that succeeded in creating the producers |
| for _, producer := range p.producers { |
| if producer != nil { |
| _ = producer.Close() |
| } |
| } |
| return nil, err |
| } |
| |
| return p, nil |
| } |
| |
| func (p *producer) Topic() string { |
| return p.topic |
| } |
| |
| func (p *producer) Name() string { |
| return p.producers[0].Name() |
| } |
| |
| func (p *producer) NumPartitions() uint32 { |
| return uint32(len(p.producers)) |
| } |
| |
| func (p *producer) Send(ctx context.Context, msg *ProducerMessage) error { |
| partition := p.messageRouter(msg, p) |
| return p.producers[partition].Send(ctx, msg) |
| } |
| |
| func (p *producer) SendAsync(ctx context.Context, msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error)) { |
| partition := p.messageRouter(msg, p) |
| p.producers[partition].SendAsync(ctx, msg, callback) |
| } |
| |
| func (p *producer) LastSequenceID() int64 { |
| var maxSeq int64 = -1 |
| for _, pp := range p.producers { |
| s := pp.LastSequenceID() |
| if s > maxSeq { |
| maxSeq = s |
| } |
| } |
| return maxSeq |
| } |
| |
| func (p *producer) Flush() error { |
| for _, pp := range p.producers { |
| return pp.Flush() |
| } |
| return nil |
| } |
| |
| func (p *producer) Close() error { |
| for _, pp := range p.producers { |
| return pp.Close() |
| } |
| return nil |
| } |