blob: 72422dc89c28b2b12dd4bcc156f06f0625e152e7 [file] [log] [blame]
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package pulsar
import (
"context"
"time"
)
// Pair of a Consumer and Message
type ConsumerMessage struct {
Consumer
Message
}
// Types of subscription supported by Pulsar
type SubscriptionType int
const (
// There can be only 1 consumer on the same topic with the same subscription name
Exclusive SubscriptionType = iota
// Multiple consumer will be able to use the same subscription name and the messages will be dispatched according to
// a round-robin rotation between the connected consumers
Shared
// Multiple consumer will be able to use the same subscription name but only 1 consumer will receive the messages.
// If that consumer disconnects, one of the other connected consumers will start receiving messages.
Failover
// Multiple consumer will be able to use the same subscription and all messages with the same key
// will be dispatched to only one consumer
KeyShared
)
type InitialPosition int
const (
// Latest position which means the start consuming position will be the last message
Latest InitialPosition = iota
// Earliest position which means the start consuming position will be the first message
Earliest
)
// ConsumerBuilder is used to configure and create instances of Consumer
type ConsumerOptions struct {
// Specify the topic this consumer will subscribe on.
// Either a topic, a list of topics or a topics pattern are required when subscribing
Topic string
// Specify a list of topics this consumer will subscribe on.
// Either a topic, a list of topics or a topics pattern are required when subscribing
Topics []string
// Specify a regular expression to subscribe to multiple topics under the same namespace.
// Either a topic, a list of topics or a topics pattern are required when subscribing
TopicsPattern string
// Specify the subscription name for this consumer
// This argument is required when subscribing
SubscriptionName string
// Attach a set of application defined properties to the consumer
// This properties will be visible in the topic stats
Properties map[string]string
// Set the timeout for unacked messages
// Message not acknowledged within the give time, will be replayed by the broker to the same or a different consumer
// Default is 0, which means message are not being replayed based on ack time
AckTimeout time.Duration
// The delay after which to redeliver the messages that failed to be
// processed. Default is 1min. (See `Consumer.Nack()`)
NackRedeliveryDelay *time.Duration
// Select the subscription type to be used when subscribing to the topic.
// Default is `Exclusive`
Type SubscriptionType
// InitialPosition at which the cursor will be set when subscribe
// Default is `Latest`
SubscriptionInitPos InitialPosition
// Sets a `MessageChannel` for the consumer
// When a message is received, it will be pushed to the channel for consumption
MessageChannel chan ConsumerMessage
// Sets the size of the consumer receive queue.
// The consumer receive queue controls how many messages can be accumulated by the `Consumer` before the
// application calls `Consumer.receive()`. Using a higher value could potentially increase the consumer
// throughput at the expense of bigger memory utilization.
// Default value is `1000` messages and should be good for most use cases.
// Set to -1 to disable prefetching in consumer
ReceiverQueueSize int
// Set the max total receiver queue size across partitions.
// This setting will be used to reduce the receiver queue size for individual partitions
// ReceiverQueueSize(int) if the total exceeds this value (default: 50000).
MaxTotalReceiverQueueSizeAcrossPartitions int
// Set the consumer name.
Name string
// If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog
// of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for
// each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
// point, the messages will be sent as normal.
//
// ReadCompacted can only be enabled subscriptions to persistent topics, which have a single active consumer (i.e.
// failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent topics or on a
// shared subscription, will lead to the subscription call throwing a PulsarClientException.
ReadCompacted bool
Schema
}
// An interface that abstracts behavior of Pulsar's consumer
type Consumer interface {
// Get the topic for the consumer
Topic() string
// Get a subscription for the consumer
Subscription() string
// Unsubscribe the consumer
Unsubscribe() error
// Receives a single message.
// This calls blocks until a message is available.
Receive(context.Context) (Message, error)
//Ack the consumption of a single message
Ack(Message) error
// Ack the consumption of a single message, identified by its MessageID
AckID(MessageID) error
// Ack the reception of all the messages in the stream up to (and including) the provided message.
// This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
// re-delivered to this consumer.
//
// Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
//
// It's equivalent to calling asyncAcknowledgeCumulative(Message) and waiting for the callback to be triggered.
AckCumulative(Message) error
// Ack the reception of all the messages in the stream up to (and including) the provided message.
// This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
// re-delivered to this consumer.
//
// Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
//
// It's equivalent to calling asyncAcknowledgeCumulative(MessageID) and waiting for the callback to be triggered.
AckCumulativeID(MessageID) error
// Acknowledge the failure to process a single message.
//
// When a message is "negatively acked" it will be marked for redelivery after
// some fixed delay. The delay is configurable when constructing the consumer
// with ConsumerOptions.NAckRedeliveryDelay .
//
// This call is not blocking.
Nack(Message) error
// Acknowledge the failure to process a single message.
//
// When a message is "negatively acked" it will be marked for redelivery after
// some fixed delay. The delay is configurable when constructing the consumer
// with ConsumerOptions.NackRedeliveryDelay .
//
// This call is not blocking.
NackID(MessageID) error
// Close the consumer and stop the broker to push more messages
Close() error
// Reset the subscription associated with this consumer to a specific message id.
// The message id can either be a specific message or represent the first or last messages in the topic.
//
// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
// seek() on the individual partitions.
Seek(msgID MessageID) error
// Redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
// active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all
// the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
// breaks, the messages are redelivered after reconnect.
RedeliverUnackedMessages()
Schema() Schema
}