// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar

import (
"fmt"
"math"
"time"

"github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"

"github.com/apache/pulsar-client-go/pkg/pb"
"github.com/apache/pulsar-client-go/pulsar/internal"
)
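// consumerState tracks the lifecycle of a partitionConsumer, from consumerInit
// through consumerReady to consumerClosing and consumerClosed.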
type consumerState int
const (
consumerInit consumerState = iota
consumerReady
consumerClosing
consumerClosed
)
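// partitionConsumerOpts holds the per-partition configuration used when creating a partitionConsumer.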
type partitionConsumerOpts struct {
topic string
consumerName string
subscription string
subscriptionType SubscriptionType
subscriptionInitPos SubscriptionInitialPosition
partitionIdx int
receiverQueueSize int
nackRedeliveryDelay time.Duration
}
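// partitionConsumer manages the subscription, flow control and message dispatch
// for a single partition of a topic.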
type partitionConsumer struct {
client *client
// parentConsumer is the Consumer that owns this partition; it is embedded in every ConsumerMessage sent on messageCh
parentConsumer Consumer
state consumerState
options *partitionConsumerOpts
conn internal.Connection
topic string
name string
consumerID uint64
partitionIdx int
// messageCh is shared with the parent consumer across all partitions; dispatched messages are delivered to the application on it
messageCh chan ConsumerMessage
// availablePermits counts messages delivered to the application since permits were last granted to the broker
availablePermits int32
// the size of the queue channel for buffering messages
queueSize int32
queueCh chan []*message
eventsCh chan interface{}
connectedCh chan struct{}
closeCh chan struct{}
nackTracker *negativeAcksTracker
log *log.Entry
}
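// newPartitionConsumer creates the consumer for one partition, subscribes to it
// on the broker and starts the dispatcher and events-loop goroutines.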
func newPartitionConsumer(parent Consumer, client *client, options *partitionConsumerOpts,
messageCh chan ConsumerMessage) (*partitionConsumer, error) {
pc := &partitionConsumer{
state: consumerInit,
parentConsumer: parent,
client: client,
options: options,
topic: options.topic,
name: options.consumerName,
consumerID: client.rpcClient.NewConsumerID(),
partitionIdx: options.partitionIdx,
eventsCh: make(chan interface{}, 3),
queueSize: int32(options.receiverQueueSize),
queueCh: make(chan []*message, options.receiverQueueSize),
connectedCh: make(chan struct{}),
messageCh: messageCh,
closeCh: make(chan struct{}),
log: log.WithField("topic", options.topic),
}
pc.log = pc.log.WithField("name", pc.name).WithField("subscription", options.subscription)
pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay)
err := pc.grabConn()
if err != nil {
pc.log.WithError(err).Error("Failed to create consumer")
return nil, err
}
pc.log.Info("Created consumer")
pc.state = consumerReady
go pc.dispatcher()
go pc.runEventsLoop()
return pc, nil
}
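// Unsubscribe removes the subscription from the broker, blocking until the request completes.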
func (pc *partitionConsumer) Unsubscribe() error {
req := &unsubscribeRequest{doneCh: make(chan struct{})}
pc.eventsCh <- req
// wait for the request to complete
<-req.doneCh
return req.err
}
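// internalUnsubscribe sends the UNSUBSCRIBE command to the broker and detaches
// this consumer from the connection.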
func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
defer close(unsub.doneCh)
requestID := pc.client.rpcClient.NewRequestID()
cmdUnsubscribe := &pb.CommandUnsubscribe{
RequestId: proto.Uint64(requestID),
ConsumerId: proto.Uint64(pc.consumerID),
}
_, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_UNSUBSCRIBE, cmdUnsubscribe)
if err != nil {
pc.log.WithError(err).Error("Failed to unsubscribe consumer")
unsub.err = err
}
pc.conn.DeleteConsumeHandler(pc.consumerID)
}
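// AckID acknowledges a single message. For a message that is part of a batch,
// the ACK command is only sent once all messages of the batch have been acknowledged.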
func (pc *partitionConsumer) AckID(msgID *messageID) {
if msgID != nil && msgID.ack() {
req := &ackRequest{
msgID: msgID,
}
pc.eventsCh <- req
}
}
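// NackID negatively acknowledges a message; the nack tracker will request
// redelivery once the configured delay has elapsed.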
func (pc *partitionConsumer) NackID(msgID *messageID) {
pc.nackTracker.Add(msgID)
}
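// Redeliver asks the broker to redeliver the given messages; it is invoked by
// the negative acks tracker when the redelivery delay expires.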
func (pc *partitionConsumer) Redeliver(msgIds []messageID) {
pc.eventsCh <- &redeliveryRequest{msgIds}
}
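// internalRedeliver sends the REDELIVER_UNACKNOWLEDGED_MESSAGES command for the given message IDs.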
func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
msgIds := req.msgIds
pc.log.Debug("Request redelivery after negative ack for messages", msgIds)
msgIdDataList := make([]*pb.MessageIdData, len(msgIds))
for i := 0; i < len(msgIds); i++ {
msgIdDataList[i] = &pb.MessageIdData{
LedgerId: proto.Uint64(uint64(msgIds[i].ledgerID)),
EntryId: proto.Uint64(uint64(msgIds[i].entryID)),
}
}
pc.client.rpcClient.RequestOnCnxNoWait(pc.conn,
pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, &pb.CommandRedeliverUnacknowledgedMessages{
ConsumerId: proto.Uint64(pc.consumerID),
MessageIds: msgIdDataList,
})
}
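// Close shuts the partition consumer down, blocking until the broker has confirmed the close.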
func (pc *partitionConsumer) Close() {
if pc.state != consumerReady {
return
}
req := &closeRequest{doneCh: make(chan struct{})}
pc.eventsCh <- req
// wait for request to finish
<-req.doneCh
}
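// internalAck sends an individual ACK command for a single message ID.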
func (pc *partitionConsumer) internalAck(req *ackRequest) {
msgId := req.msgID
messageIDs := make([]*pb.MessageIdData, 1)
messageIDs[0] = &pb.MessageIdData{
LedgerId: proto.Uint64(uint64(msgId.ledgerID)),
EntryId: proto.Uint64(uint64(msgId.entryID)),
}
cmdAck := &pb.CommandAck{
ConsumerId: proto.Uint64(pc.consumerID),
MessageId: messageIDs,
AckType: pb.CommandAck_Individual.Enum(),
}
pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, pb.BaseCommand_ACK, cmdAck)
}
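// MessageReceived is invoked by the connection when a MESSAGE command arrives for
// this consumer. It decodes the metadata and the (possibly batched) payload and
// hands the resulting messages to the dispatcher through queueCh.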
func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
pbMsgID := response.GetMessageId()
reader := internal.NewMessageReader(headersAndPayload)
msgMeta, err := reader.ReadMessageMetadata()
if err != nil {
// TODO send discardCorruptedMessage
return err
}
numMsgs := 1
if msgMeta.NumMessagesInBatch != nil {
numMsgs = int(msgMeta.GetNumMessagesInBatch())
}
messages := make([]*message, numMsgs)
var ackTracker *ackTracker
// are there multiple messages in this batch?
if numMsgs > 1 {
ackTracker = newAckTracker(numMsgs)
}
for i := 0; i < numMsgs; i++ {
smm, payload, err := reader.ReadMessage()
if err != nil {
// TODO send corrupted message
return err
}
msgID := newTrackingMessageID(
int64(pbMsgID.GetLedgerId()),
int64(pbMsgID.GetEntryId()),
i,
pc.partitionIdx,
ackTracker)
var msg *message
if smm != nil {
msg = &message{
publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
eventTime: timeFromUnixTimestampMillis(smm.GetEventTime()),
key: smm.GetPartitionKey(),
properties: internal.ConvertToStringMap(smm.GetProperties()),
topic: pc.topic,
msgID: msgID,
payLoad: payload,
}
} else {
msg = &message{
publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
eventTime: timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
key: msgMeta.GetPartitionKey(),
properties: internal.ConvertToStringMap(msgMeta.GetProperties()),
topic: pc.topic,
msgID: msgID,
payLoad: payload,
}
}
messages[i] = msg
}
// send messages to the dispatcher
pc.queueCh <- messages
return nil
}
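// ConnectionClosed is invoked by the connection when it is closed; it schedules
// a reconnection on the events goroutine.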
func (pc *partitionConsumer) ConnectionClosed() {
// Trigger reconnection in the consumer goroutine
pc.eventsCh <- &connectionClosed{}
}
// internalFlow sends a FLOW command to the broker, granting it permits to push more
// messages to this consumer. A typical consumer implementation buffers incoming messages
// in a queue; once the application has consumed enough of them, the client grants the
// broker additional permits so it can continue pushing.
func (pc *partitionConsumer) internalFlow(permits uint32) error {
if permits == 0 {
return fmt.Errorf("invalid number of permits requested: %d", permits)
}
cmdFlow := &pb.CommandFlow{
ConsumerId: proto.Uint64(pc.consumerID),
MessagePermits: proto.Uint32(permits),
}
pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, pb.BaseCommand_FLOW, cmdFlow)
return nil
}
// dispatcher reads batches from the internal queue channel, delivers messages to the
// application one at a time on messageCh, and handles flow control by granting the
// broker more permits as messages are consumed.
func (pc *partitionConsumer) dispatcher() {
defer func() {
pc.log.Info("exiting dispatch loop")
}()
var messages []*message
for {
var queueCh chan []*message
var messageCh chan ConsumerMessage
var nextMessage ConsumerMessage
// are there more messages to send?
if len(messages) > 0 {
nextMessage = ConsumerMessage{
Consumer: pc.parentConsumer,
Message: messages[0],
}
messageCh = pc.messageCh
} else {
// we are ready for more messages
queueCh = pc.queueCh
}
select {
case <-pc.closeCh:
return
case _, ok := <-pc.connectedCh:
if !ok {
return
}
pc.log.Debug("dispatcher received connection event")
// discard messages buffered locally before the reconnection
messages = nil
// drain the queue channel on any new connection by enqueuing a nil sentinel;
// everything read before the sentinel is stale and is dropped
go func() {
pc.queueCh <- nil
}()
for m := range pc.queueCh {
// the queue has been drained
if m == nil {
break
}
}
// reset available permits
pc.availablePermits = 0
initialPermits := uint32(pc.queueSize)
pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits)
// send initial permits
if err := pc.internalFlow(initialPermits); err != nil {
pc.log.WithError(err).Error("unable to send initial permits to broker")
}
case msgs, ok := <-queueCh:
if !ok {
return
}
// we only read messages here after the consumer has processed all messages
// in the previous batch
messages = msgs
// if messageCh is nil or full, this case will not be selected
case messageCh <- nextMessage:
// allow this message to be garbage collected
messages[0] = nil
messages = messages[1:]
// TODO implement a better flow controller
// send more permits if needed
pc.availablePermits++
flowThreshold := int32(math.Max(float64(pc.queueSize/2), 1))
if pc.availablePermits >= flowThreshold {
availablePermits := pc.availablePermits
requestedPermits := availablePermits
pc.availablePermits = 0
pc.log.Debugf("requesting more permits=%d available=%d", requestedPermits, availablePermits)
if err := pc.internalFlow(uint32(requestedPermits)); err != nil {
pc.log.WithError(err).Error("unable to send permits")
}
}
}
}
}
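// ackRequest and the request types below are the events sent on eventsCh and
// handled serially by runEventsLoop.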
type ackRequest struct {
msgID *messageID
}
type unsubscribeRequest struct {
doneCh chan struct{}
err error
}
type closeRequest struct {
doneCh chan struct{}
}
type redeliveryRequest struct {
msgIds []messageID
}
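// runEventsLoop processes the requests arriving on eventsCh one at a time, so that
// all broker interactions for this consumer happen on a single goroutine.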
func (pc *partitionConsumer) runEventsLoop() {
defer func() {
pc.log.Info("exiting events loop")
}()
for {
select {
case <-pc.closeCh:
return
case i := <-pc.eventsCh:
switch v := i.(type) {
case *ackRequest:
pc.internalAck(v)
case *redeliveryRequest:
pc.internalRedeliver(v)
case *unsubscribeRequest:
pc.internalUnsubscribe(v)
case *connectionClosed:
pc.reconnectToBroker()
case *closeRequest:
pc.internalClose(v)
return
}
}
}
}
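// internalClose sends the CLOSE_CONSUMER command to the broker, removes the consume
// handler and the nack tracker, and signals the other goroutines to exit by closing closeCh.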
func (pc *partitionConsumer) internalClose(req *closeRequest) {
defer close(req.doneCh)
if pc.state != consumerReady {
return
}
pc.state = consumerClosing
pc.log.Infof("Closing consumer=%d", pc.consumerID)
requestID := pc.client.rpcClient.NewRequestID()
cmdClose := &pb.CommandCloseConsumer{
ConsumerId: proto.Uint64(pc.consumerID),
RequestId: proto.Uint64(requestID),
}
_, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_CLOSE_CONSUMER, cmdClose)
if err != nil {
pc.log.WithError(err).Warn("Failed to close consumer")
} else {
pc.log.Info("Closed consumer")
}
pc.state = consumerClosed
pc.conn.DeleteConsumeHandler(pc.consumerID)
pc.nackTracker.Close()
close(pc.closeCh)
}
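// reconnectToBroker keeps retrying grabConn with backoff until the consumer is
// re-established or has been closed.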
func (pc *partitionConsumer) reconnectToBroker() {
backoff := internal.Backoff{}
for {
if pc.state != consumerReady {
// Consumer is already closing
return
}
d := backoff.Next()
pc.log.Info("Reconnecting to broker in ", d)
time.Sleep(d)
err := pc.grabConn()
if err == nil {
// Successfully reconnected
pc.log.Info("Reconnected consumer to broker")
return
}
}
}
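// grabConn looks up the broker that owns the topic, obtains a connection to it and
// sends the SUBSCRIBE command. On success it registers the consume handler and
// unblocks the dispatcher through connectedCh.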
func (pc *partitionConsumer) grabConn() error {
lr, err := pc.client.lookupService.Lookup(pc.topic)
if err != nil {
pc.log.WithError(err).Warn("Failed to lookup topic")
return err
}
pc.log.Debugf("Lookup result: %+v", lr)
subType := toProtoSubType(pc.options.subscriptionType)
initialPosition := toProtoInitialPosition(pc.options.subscriptionInitPos)
requestID := pc.client.rpcClient.NewRequestID()
cmdSubscribe := &pb.CommandSubscribe{
RequestId: proto.Uint64(requestID),
Topic: proto.String(pc.topic),
SubType: subType.Enum(),
Subscription: proto.String(pc.options.subscription),
ConsumerId: proto.Uint64(pc.consumerID),
ConsumerName: proto.String(pc.name),
InitialPosition: initialPosition.Enum(),
Schema: nil,
}
res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
if err != nil {
pc.log.WithError(err).Error("Failed to create consumer")
return err
}
if res.Response.ConsumerStatsResponse != nil {
pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
}
pc.conn = res.Cnx
pc.log.Info("Connected consumer")
pc.conn.AddConsumeHandler(pc.consumerID, pc)
msgType := res.Response.GetType()
switch msgType {
case pb.BaseCommand_SUCCESS:
// notify the dispatcher that we have a connection
go func() {
pc.connectedCh <- struct{}{}
}()
return nil
case pb.BaseCommand_ERROR:
errMsg := res.Response.GetError()
return fmt.Errorf("%s: %s", errMsg.GetError().String(), errMsg.GetMessage())
default:
return newUnexpectedErrMsg(msgType, requestID)
}
}