blob: bf91c67fa5a2018b379fd3395d3dea7423e68c00 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar
import (
"context"
"fmt"
"sync"
"time"
"github.com/apache/pulsar-client-go/pulsar/crypto"
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/log"
)
const (
defaultReceiverQueueSize = 1000
)
type reader struct {
sync.Mutex
client *client
messageCh chan ConsumerMessage
log log.Logger
metrics *internal.LeveledMetrics
c *consumer
}
func newReader(client *client, options ReaderOptions) (Reader, error) {
if options.Topic == "" {
return nil, newError(InvalidConfiguration, "Topic is required")
}
if options.StartMessageID == nil {
return nil, newError(InvalidConfiguration, "StartMessageID is required")
}
var startMessageID *trackingMessageID
if !checkMessageIDType(options.StartMessageID) {
// a custom type satisfying MessageID may not be a messageID or trackingMessageID
// so re-create messageID using its data
deserMsgID, err := deserializeMessageID(options.StartMessageID.Serialize())
if err != nil {
return nil, err
}
// de-serialized MessageID is a messageID
startMessageID = toTrackingMessageID(deserMsgID)
} else {
startMessageID = toTrackingMessageID(options.StartMessageID)
}
subscriptionName := options.SubscriptionName
if subscriptionName == "" {
subscriptionName = options.SubscriptionRolePrefix
if subscriptionName == "" {
subscriptionName = "reader"
}
subscriptionName += "-" + generateRandomName()
}
receiverQueueSize := options.ReceiverQueueSize
if receiverQueueSize <= 0 {
receiverQueueSize = defaultReceiverQueueSize
}
// decryption is enabled, use default message crypto if not provided
if options.Decryption != nil && options.Decryption.MessageCrypto == nil {
messageCrypto, err := crypto.NewDefaultMessageCrypto("decrypt",
false,
client.log.SubLogger(log.Fields{"topic": options.Topic}))
if err != nil {
return nil, err
}
options.Decryption.MessageCrypto = messageCrypto
}
if options.MaxPendingChunkedMessage == 0 {
options.MaxPendingChunkedMessage = 100
}
if options.ExpireTimeOfIncompleteChunk == 0 {
options.ExpireTimeOfIncompleteChunk = time.Minute
}
consumerOptions := &ConsumerOptions{
Topic: options.Topic,
Name: options.Name,
SubscriptionName: subscriptionName,
Type: Exclusive,
ReceiverQueueSize: receiverQueueSize,
SubscriptionMode: NonDurable,
ReadCompacted: options.ReadCompacted,
Properties: options.Properties,
NackRedeliveryDelay: defaultNackRedeliveryDelay,
ReplicateSubscriptionState: false,
Decryption: options.Decryption,
Schema: options.Schema,
BackoffPolicy: options.BackoffPolicy,
MaxPendingChunkedMessage: options.MaxPendingChunkedMessage,
ExpireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk,
AutoAckIncompleteChunk: options.AutoAckIncompleteChunk,
startMessageID: startMessageID,
StartMessageIDInclusive: options.StartMessageIDInclusive,
}
reader := &reader{
client: client,
messageCh: make(chan ConsumerMessage),
log: client.log.SubLogger(log.Fields{"topic": options.Topic}),
metrics: client.metrics.GetLeveledMetrics(options.Topic),
}
// Provide dummy dlq router with not dlq policy
dlq, err := newDlqRouter(client, nil, options.Topic, options.SubscriptionName, options.Name, client.log)
if err != nil {
return nil, err
}
// Provide dummy rlq router with not dlq policy
rlq, err := newRetryRouter(client, nil, false, client.log)
if err != nil {
return nil, err
}
c, err := newInternalConsumer(client, *consumerOptions, options.Topic, reader.messageCh, dlq, rlq, false)
if err != nil {
close(reader.messageCh)
return nil, err
}
reader.c = c
reader.metrics.ReadersOpened.Inc()
return reader, nil
}
func (r *reader) Topic() string {
return r.c.topic
}
func (r *reader) Next(ctx context.Context) (Message, error) {
for {
select {
case cm, ok := <-r.messageCh:
if !ok {
return nil, newError(ConsumerClosed, "consumer closed")
}
// Acknowledge message immediately because the reader is based on non-durable subscription. When it reconnects,
// it will specify the subscription position anyway
msgID := cm.Message.ID()
err := r.c.setLastDequeuedMsg(msgID)
if err != nil {
return nil, err
}
err = r.c.AckID(msgID)
if err != nil {
return nil, err
}
return cm.Message, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
func (r *reader) HasNext() bool {
return r.c.hasNext()
}
func (r *reader) Close() {
r.c.Close()
r.client.handlers.Del(r)
r.metrics.ReadersClosed.Inc()
}
func (r *reader) messageID(msgID MessageID) *trackingMessageID {
mid := toTrackingMessageID(msgID)
partition := int(mid.partitionIdx)
// did we receive a valid partition index?
if partition < 0 {
r.log.Warnf("invalid partition index %d expected", partition)
return nil
}
return mid
}
func (r *reader) Seek(msgID MessageID) error {
r.Lock()
defer r.Unlock()
if !checkMessageIDType(msgID) {
r.log.Warnf("invalid message id type %T", msgID)
return fmt.Errorf("invalid message id type %T", msgID)
}
mid := r.messageID(msgID)
if mid == nil {
return nil
}
return r.c.Seek(mid)
}
func (r *reader) SeekByTime(time time.Time) error {
r.Lock()
defer r.Unlock()
return r.c.SeekByTime(time)
}
func (r *reader) GetLastMessageID() (MessageID, error) {
if len(r.c.consumers) > 1 {
return nil, fmt.Errorf("GetLastMessageID is not supported for multi-topics reader")
}
return r.c.consumers[0].getLastMessageID()
}