blob: db52b75db93afdcdb8ac52b8df45360bcdbae3a2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.common.api.Commands.readChecksum;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.util.DateFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
/**
* A Consumer is a consumer currently connected and associated with a Subscription
*/
public class Consumer {
private final Subscription subscription;
private final SubType subType;
private final ServerCnx cnx;
private final String appId;
private final String topicName;
private final long consumerId;
private final int priorityLevel;
private final String consumerName;
private final Rate msgOut;
private final Rate msgRedeliver;
// Represents how many messages we can safely send to the consumer without
// overflowing its receiving queue. The consumer will use Flow commands to
// increase its availability
private static final AtomicIntegerFieldUpdater<Consumer> MESSAGE_PERMITS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "messagePermits");
private volatile int messagePermits = 0;
// It starts keep tracking of messagePermits once consumer gets blocked, as consumer needs two separate counts:
// messagePermits (1) before and (2) after being blocked: to dispatch only blockedPermit number of messages at the
// time of redelivery
private static final AtomicIntegerFieldUpdater<Consumer> PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "permitsReceivedWhileConsumerBlocked");
private volatile int permitsReceivedWhileConsumerBlocked = 0;
private final ConcurrentLongLongPairHashMap pendingAcks;
private final ConsumerStats stats;
private final int maxUnackedMessages;
private static final AtomicIntegerFieldUpdater<Consumer> UNACKED_MESSAGES_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "unackedMessages");
private volatile int unackedMessages = 0;
private volatile boolean blockedConsumerOnUnackedMsgs = false;
public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName,
int maxUnackedMessages, ServerCnx cnx, String appId) throws BrokerServiceException {
this.subscription = subscription;
this.subType = subType;
this.topicName = topicName;
this.consumerId = consumerId;
this.priorityLevel = priorityLevel;
this.consumerName = consumerName;
this.maxUnackedMessages = maxUnackedMessages;
this.cnx = cnx;
this.msgOut = new Rate();
this.msgRedeliver = new Rate();
this.appId = appId;
PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
MESSAGE_PERMITS_UPDATER.set(this, 0);
UNACKED_MESSAGES_UPDATER.set(this, 0);
stats = new ConsumerStats();
stats.address = cnx.clientAddress().toString();
stats.consumerName = consumerName;
stats.connectedSince = DateFormatter.now();
stats.clientVersion = cnx.getClientVersion();
if (subType == SubType.Shared) {
this.pendingAcks = new ConcurrentLongLongPairHashMap(256, 1);
} else {
// We don't need to keep track of pending acks if the subscription is not shared
this.pendingAcks = null;
}
}
public SubType subType() {
return subType;
}
public long consumerId() {
return consumerId;
}
public String consumerName() {
return consumerName;
}
/**
* Dispatch a list of entries to the consumer. <br/>
* <b>It is also responsible to release entries data and recycle entries object.</b>
*
* @return a promise that can be use to track when all the data has been written into the socket
*/
public SendMessageInfo sendMessages(final List<Entry> entries) {
final ChannelHandlerContext ctx = cnx.ctx();
final SendMessageInfo sentMessages = new SendMessageInfo();
final ChannelPromise writePromise = ctx.newPromise();
sentMessages.channelPromse = writePromise;
if (entries.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] List of messages is empty, triggering write future immediately for consumerId {}",
topicName, subscription, consumerId);
}
writePromise.setSuccess();
sentMessages.totalSentMessages = 0;
sentMessages.totalSentMessageBytes = 0;
return sentMessages;
}
try {
updatePermitsAndPendingAcks(entries, sentMessages);
} catch (PulsarServerException pe) {
log.warn("[{}] [{}] consumer doesn't support batch-message {}", subscription, consumerId,
cnx.getRemoteEndpointProtocolVersion());
subscription.markTopicWithBatchMessagePublished();
sentMessages.totalSentMessages = 0;
sentMessages.totalSentMessageBytes = 0;
// disconnect consumer: it will update dispatcher's availablePermits and resend pendingAck-messages of this
// consumer to other consumer
disconnect();
return sentMessages;
}
ctx.channel().eventLoop().execute(() -> {
for (int i = 0; i < entries.size(); i++) {
Entry entry = entries.get(i);
PositionImpl pos = (PositionImpl) entry.getPosition();
MessageIdData.Builder messageIdBuilder = MessageIdData.newBuilder();
MessageIdData messageId = messageIdBuilder.setLedgerId(pos.getLedgerId()).setEntryId(pos.getEntryId())
.build();
ByteBuf metadataAndPayload = entry.getDataBuffer();
// increment ref-count of data and release at the end of process: so, we can get chance to call entry.release
metadataAndPayload.retain();
// skip checksum by incrementing reader-index if consumer-client doesn't support checksum verification
if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v6.getNumber()) {
readChecksum(metadataAndPayload);
}
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Sending message to consumerId {}, entry id {}", topicName, subscription,
consumerId, pos.getEntryId());
}
// We only want to pass the "real" promise on the last entry written
ChannelPromise promise = ctx.voidPromise();
if (i == (entries.size() - 1)) {
promise = writePromise;
}
ctx.write(Commands.newMessage(consumerId, messageId, metadataAndPayload), promise);
messageId.recycle();
messageIdBuilder.recycle();
entry.release();
}
ctx.flush();
});
return sentMessages;
}
private void incrementUnackedMessages(int ackedMessages) {
if (shouldBlockConsumerOnUnackMsgs() && addAndGetUnAckedMsgs(this, ackedMessages) >= maxUnackedMessages) {
blockedConsumerOnUnackedMsgs = true;
}
}
public static int getBatchSizeforEntry(ByteBuf metadataAndPayload, String subscription, long consumerId) {
try {
// save the reader index and restore after parsing
metadataAndPayload.markReaderIndex();
PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
metadataAndPayload.resetReaderIndex();
int batchSize = metadata.getNumMessagesInBatch();
metadata.recycle();
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] num messages in batch are {} ", subscription, consumerId, batchSize);
}
return batchSize;
} catch (Throwable t) {
log.error("[{}] [{}] Failed to parse message metadata", subscription, consumerId, t);
}
return -1;
}
void updatePermitsAndPendingAcks(final List<Entry> entries, SendMessageInfo sentMessages) throws PulsarServerException {
int permitsToReduce = 0;
Iterator<Entry> iter = entries.iterator();
boolean unsupportedVersion = false;
long totalReadableBytes = 0;
boolean clientSupportBatchMessages = cnx.isBatchMessageCompatibleVersion();
while (iter.hasNext()) {
Entry entry = iter.next();
ByteBuf metadataAndPayload = entry.getDataBuffer();
int batchSize = getBatchSizeforEntry(metadataAndPayload, subscription.toString(), consumerId);
if (batchSize == -1) {
// this would suggest that the message might have been corrupted
iter.remove();
PositionImpl pos = (PositionImpl) entry.getPosition();
entry.release();
subscription.acknowledgeMessage(pos, AckType.Individual);
continue;
}
if (pendingAcks != null) {
pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), batchSize, 0);
}
// check if consumer supports batch message
if (batchSize > 1 && !clientSupportBatchMessages) {
unsupportedVersion = true;
}
totalReadableBytes += metadataAndPayload.readableBytes();
permitsToReduce += batchSize;
}
// reduce permit and increment unackedMsg count with total number of messages in batch-msgs
int permits = MESSAGE_PERMITS_UPDATER.addAndGet(this, -permitsToReduce);
incrementUnackedMessages(permitsToReduce);
if (unsupportedVersion) {
throw new PulsarServerException("Consumer does not support batch-message");
}
if (permits < 0) {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] [{}] message permits dropped below 0 - {}", topicName, subscription, consumerId,
permits);
}
}
msgOut.recordMultipleEvents(permitsToReduce, totalReadableBytes);
sentMessages.totalSentMessages = permitsToReduce;
sentMessages.totalSentMessageBytes = totalReadableBytes;
}
public boolean isWritable() {
return cnx.isWritable();
}
public void sendError(ByteBuf error) {
cnx.ctx().writeAndFlush(error).addListener(ChannelFutureListener.CLOSE);
}
/**
* Close the consumer if: a. the connection is dropped b. connection is open (graceful close) and there are no
* pending message acks
*/
public void close() throws BrokerServiceException {
subscription.removeConsumer(this);
cnx.removedConsumer(this);
}
public void disconnect() {
log.info("Disconnecting consumer: {}", this);
cnx.closeConsumer(this);
try {
close();
} catch (BrokerServiceException e) {
log.warn("Consumer {} was already closed: {}", this, e.getMessage(), e);
}
}
void doUnsubscribe(final long requestId) {
final ChannelHandlerContext ctx = cnx.ctx();
subscription.doUnsubscribe(this).thenAccept(v -> {
log.info("Unsubscribed successfully from {}", subscription);
cnx.removedConsumer(this);
ctx.writeAndFlush(Commands.newSuccess(requestId));
}).exceptionally(exception -> {
log.warn("Unsubscribe failed for {}", subscription, exception);
ctx.writeAndFlush(
Commands.newError(requestId, BrokerServiceException.getClientErrorCode(exception.getCause()),
exception.getCause().getMessage()));
return null;
});
}
void messageAcked(CommandAck ack) {
MessageIdData msgId = ack.getMessageId();
PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
if (ack.hasValidationError()) {
log.error("[{}] [{}] Received ack for corrupted message at {} - Reason: {}", subscription, consumerId,
position, ack.getValidationError());
}
if (subType == SubType.Shared) {
// On shared subscriptions, cumulative ack is not supported
checkArgument(ack.getAckType() == AckType.Individual);
// Only ack a single message
removePendingAcks(position);
subscription.acknowledgeMessage(position, AckType.Individual);
} else {
subscription.acknowledgeMessage(position, ack.getAckType());
}
}
void flowPermits(int additionalNumberOfMessages) {
checkArgument(additionalNumberOfMessages > 0);
// block shared consumer when unacked-messages reaches limit
if (shouldBlockConsumerOnUnackMsgs() && unackedMessages >= maxUnackedMessages) {
blockedConsumerOnUnackedMsgs = true;
}
int oldPermits;
if (!blockedConsumerOnUnackedMsgs) {
oldPermits = MESSAGE_PERMITS_UPDATER.getAndAdd(this, additionalNumberOfMessages);
subscription.consumerFlow(this, additionalNumberOfMessages);
} else {
oldPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this, additionalNumberOfMessages);
}
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Added more flow control message permits {} (old was: {}), blocked = ", topicName,
subscription, additionalNumberOfMessages, oldPermits, blockedConsumerOnUnackedMsgs);
}
}
/**
* Triggers dispatcher to dispatch {@code blockedPermits} number of messages and adds same number of permits to
* {@code messagePermits} as it maintains count of actual dispatched message-permits.
*
* @param consumer:
* Consumer whose blockedPermits needs to be dispatched
*/
void flowConsumerBlockedPermits(Consumer consumer) {
int additionalNumberOfPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(consumer, 0);
// add newly flow permits to actual consumer.messagePermits
MESSAGE_PERMITS_UPDATER.getAndAdd(consumer, additionalNumberOfPermits);
// dispatch pending permits to flow more messages: it will add more permits to dispatcher and consumer
subscription.consumerFlow(consumer, additionalNumberOfPermits);
}
public int getAvailablePermits() {
return MESSAGE_PERMITS_UPDATER.get(this);
}
public boolean isBlocked() {
return blockedConsumerOnUnackedMsgs;
}
public void reachedEndOfTopic() {
// Only send notification if the client understand the command
if (cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v9_VALUE) {
log.info("[{}] Notifying consumer that end of topic has been reached", this);
cnx.ctx().writeAndFlush(Commands.newReachedEndOfTopic(consumerId));
}
}
/**
* Checks if consumer-blocking on unAckedMessages is allowed for below conditions:<br/>
* a. consumer must have Shared-subscription<br/>
* b. {@link maxUnackedMessages} value > 0
*
* @return
*/
private boolean shouldBlockConsumerOnUnackMsgs() {
return SubType.Shared.equals(subType) && maxUnackedMessages > 0;
}
public void updateRates() {
msgOut.calculateRate();
msgRedeliver.calculateRate();
stats.msgRateOut = msgOut.getRate();
stats.msgThroughputOut = msgOut.getValueRate();
stats.msgRateRedeliver = msgRedeliver.getRate();
}
public ConsumerStats getStats() {
stats.availablePermits = getAvailablePermits();
stats.unackedMessages = unackedMessages;
stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
return stats;
}
public int getUnackedMessages() {
return unackedMessages;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("subscription", subscription).add("consumerId", consumerId)
.add("consumerName", consumerName).add("address", this.cnx.clientAddress()).toString();
}
public ChannelHandlerContext ctx() {
return cnx.ctx();
}
public void checkPermissions() {
DestinationName destination = DestinationName.get(subscription.getDestination());
if (cnx.getBrokerService().getAuthorizationManager() != null) {
try {
if (cnx.getBrokerService().getAuthorizationManager().canConsume(destination, appId)) {
return;
}
} catch (Exception e) {
log.warn("[{}] Get unexpected error while autorizing [{}] {}", appId, subscription.getDestination(),
e.getMessage(), e);
}
log.info("[{}] is not allowed to consume from Destination" + " [{}] anymore", appId,
subscription.getDestination());
disconnect();
}
}
@Override
public boolean equals(Object obj) {
if (obj instanceof Consumer) {
Consumer other = (Consumer) obj;
return Objects.equals(cnx.clientAddress(), other.cnx.clientAddress()) && consumerId == other.consumerId;
}
return false;
}
@Override
public int hashCode() {
return consumerName.hashCode() + 31 * cnx.hashCode();
}
/**
* first try to remove ack-position from the current_consumer's pendingAcks.
* if ack-message doesn't present into current_consumer's pendingAcks
* a. try to remove from other connected subscribed consumers (It happens when client
* tries to acknowledge message through different consumer under the same subscription)
*
*
* @param position
*/
private void removePendingAcks(PositionImpl position) {
Consumer ackOwnedConsumer = null;
if (pendingAcks.get(position.getLedgerId(), position.getEntryId()) == null) {
for (Consumer consumer : subscription.getConsumers()) {
if (!consumer.equals(this) && consumer.getPendingAcks().containsKey(position.getLedgerId(), position.getEntryId())) {
ackOwnedConsumer = consumer;
break;
}
}
} else {
ackOwnedConsumer = this;
}
// remove pending message from appropriate consumer and unblock unAckMsg-flow if requires
if (ackOwnedConsumer != null) {
int totalAckedMsgs = (int) ackOwnedConsumer.getPendingAcks().get(position.getLedgerId(), position.getEntryId()).first;
if (!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), position.getEntryId())) {
// Message was already removed by the other consumer
return;
}
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position);
}
// unblock consumer-throttling when receives half of maxUnackedMessages => consumer can start again
// consuming messages
if (((addAndGetUnAckedMsgs(ackOwnedConsumer, -totalAckedMsgs) <= (maxUnackedMessages / 2))
&& ackOwnedConsumer.blockedConsumerOnUnackedMsgs)
&& ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs()) {
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
flowConsumerBlockedPermits(ackOwnedConsumer);
}
}
}
public ConcurrentLongLongPairHashMap getPendingAcks() {
return pendingAcks;
}
public int getPriorityLevel() {
return priorityLevel;
}
public void redeliverUnacknowledgedMessages() {
// cleanup unackedMessage bucket and redeliver those unack-msgs again
clearUnAckedMsgs(this);
blockedConsumerOnUnackedMsgs = false;
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received redelivery", topicName, subscription, consumerId);
}
// redeliver unacked-msgs
subscription.redeliverUnacknowledgedMessages(this);
flowConsumerBlockedPermits(this);
if (pendingAcks != null) {
AtomicInteger totalRedeliveryMessages = new AtomicInteger(0);
pendingAcks.forEach(
(ledgerId, entryId, batchSize, none) -> totalRedeliveryMessages.addAndGet((int) batchSize));
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages.get(), totalRedeliveryMessages.get());
pendingAcks.clear();
}
}
public void redeliverUnacknowledgedMessages(List<MessageIdData> messageIds) {
int totalRedeliveryMessages = 0;
List<PositionImpl> pendingPositions = Lists.newArrayList();
for (MessageIdData msg : messageIds) {
PositionImpl position = PositionImpl.get(msg.getLedgerId(), msg.getEntryId());
LongPair batchSize = pendingAcks.get(position.getLedgerId(), position.getEntryId());
if (batchSize != null) {
pendingAcks.remove(position.getLedgerId(), position.getEntryId());
totalRedeliveryMessages += batchSize.first;
pendingPositions.add(position);
}
}
addAndGetUnAckedMsgs(this, -totalRedeliveryMessages);
blockedConsumerOnUnackedMsgs = false;
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received {} msg-redelivery {}", topicName, subscription, consumerId,
totalRedeliveryMessages, pendingPositions.size());
}
subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages);
int numberOfBlockedPermits = Math.min(totalRedeliveryMessages,
PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.get(this));
// if permitsReceivedWhileConsumerBlocked has been accumulated then pass it to Dispatcher to flow messages
if (numberOfBlockedPermits > 0) {
PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this, -numberOfBlockedPermits);
MESSAGE_PERMITS_UPDATER.getAndAdd(this, numberOfBlockedPermits);
subscription.consumerFlow(this, numberOfBlockedPermits);
}
}
public Subscription getSubscription() {
return subscription;
}
private int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) {
subscription.addUnAckedMessages(ackedMessages);
return UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages);
}
private void clearUnAckedMsgs(Consumer consumer) {
int unaAckedMsgs = UNACKED_MESSAGES_UPDATER.getAndSet(this, 0);
subscription.addUnAckedMessages(-unaAckedMsgs);
}
public static class SendMessageInfo {
ChannelPromise channelPromse;
int totalSentMessages;
long totalSentMessageBytes;
public ChannelPromise getChannelPromse() {
return channelPromse;
}
public void setChannelPromse(ChannelPromise channelPromse) {
this.channelPromse = channelPromse;
}
public int getTotalSentMessages() {
return totalSentMessages;
}
public void setTotalSentMessages(int totalSentMessages) {
this.totalSentMessages = totalSentMessages;
}
public long getTotalSentMessageBytes() {
return totalSentMessageBytes;
}
public void setTotalSentMessageBytes(long totalSentMessageBytes) {
this.totalSentMessageBytes = totalSentMessageBytes;
}
}
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
}