blob: 76106773fa0e3d1ce43ac7e0ff7d36fe7b94d547 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import io.netty.buffer.ByteBuf;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
@Slf4j
public abstract class AbstractBaseDispatcher implements Dispatcher {
protected final Subscription subscription;
protected AbstractBaseDispatcher(Subscription subscription) {
this.subscription = subscription;
}
/**
* Filter messages that are being sent to a consumers.
* <p>
* Messages can be filtered out for multiple reasons:
* <ul>
* <li>Checksum or metadata corrupted
* <li>Message is an internal marker
* <li>Message is not meant to be delivered immediately
* </ul>
*
* @param entries
* a list of entries as read from storage
*
* @param batchSizes
* an array where the batch size for each entry (the number of messages within an entry) is stored. This
* array needs to be of at least the same size as the entries list
*
* @param sendMessageInfo
* an object where the total size in messages and bytes will be returned back to the caller
*/
public void filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSizes,
SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks,
ManagedCursor cursor, boolean isReplayRead) {
int totalMessages = 0;
long totalBytes = 0;
int totalChunkedMessages = 0;
for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
Entry entry = entries.get(i);
if (entry == null) {
continue;
}
ByteBuf metadataAndPayload = entry.getDataBuffer();
MessageMetadata msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1);
if (!isReplayRead && msgMetadata != null
&& msgMetadata.hasTxnidMostBits() && msgMetadata.hasTxnidLeastBits()) {
if (Markers.isTxnMarker(msgMetadata)) {
entries.set(i, null);
entry.release();
continue;
} else if (((PersistentTopic) subscription.getTopic()).isTxnAborted(
new TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()))) {
subscription.acknowledgeMessage(Collections.singletonList(entry.getPosition()), AckType.Individual,
Collections.emptyMap());
entries.set(i, null);
entry.release();
continue;
}
} else if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
PositionImpl pos = (PositionImpl) entry.getPosition();
// Message metadata was corrupted or the messages was a server-only marker
if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) {
processReplicatedSubscriptionSnapshot(pos, metadataAndPayload);
}
entries.set(i, null);
entry.release();
subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual,
Collections.emptyMap());
continue;
} else if (msgMetadata.hasDeliverAtTime()
&& trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
// The message is marked for delayed delivery. Ignore for now.
entries.set(i, null);
entry.release();
continue;
}
int batchSize = msgMetadata.getNumMessagesInBatch();
totalMessages += batchSize;
totalBytes += metadataAndPayload.readableBytes();
totalChunkedMessages += msgMetadata.hasChunkId() ? 1 : 0;
batchSizes.setBatchSize(i, batchSize);
long[] ackSet = null;
if (indexesAcks != null && cursor != null) {
ackSet = cursor.getDeletedBatchIndexesAsLongArray(
PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
if (ackSet != null) {
indexesAcks.setIndexesAcks(i, Pair.of(batchSize, ackSet));
} else {
indexesAcks.setIndexesAcks(i, null);
}
}
BrokerInterceptor interceptor = subscription.interceptor();
if (null != interceptor) {
interceptor.beforeSendMessage(
subscription,
entry,
ackSet,
msgMetadata
);
}
}
sendMessageInfo.setTotalMessages(totalMessages);
sendMessageInfo.setTotalBytes(totalBytes);
sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
}
/**
* Determine whether the number of consumers on the subscription reaches the threshold.
* @return
*/
protected abstract boolean isConsumersExceededOnSubscription();
protected boolean isConsumersExceededOnSubscription(BrokerService brokerService,
String topic, int consumerSize) {
Policies policies = null;
Integer maxConsumersPerSubscription = null;
try {
maxConsumersPerSubscription = Optional.ofNullable(brokerService
.getTopicPolicies(TopicName.get(topic)))
.map(TopicPolicies::getMaxConsumersPerSubscription)
.orElse(null);
if (maxConsumersPerSubscription == null) {
// Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()));
}
} catch (Exception e) {
log.debug("Get topic or namespace policies fail", e);
}
if (maxConsumersPerSubscription == null) {
maxConsumersPerSubscription = policies != null
&& policies.max_consumers_per_subscription != null
&& policies.max_consumers_per_subscription >= 0
? policies.max_consumers_per_subscription :
brokerService.pulsar().getConfiguration().getMaxConsumersPerSubscription();
}
return maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerSize;
}
private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) {
// Remove the protobuf headers
Commands.skipMessageMetadata(headersAndPayload);
try {
ReplicatedSubscriptionsSnapshot snapshot = Markers.parseReplicatedSubscriptionsSnapshot(headersAndPayload);
subscription.processReplicatedSubscriptionSnapshot(snapshot);
} catch (Throwable t) {
log.warn("Failed to process replicated subscription snapshot at {} -- {}", pos, t.getMessage(), t);
return;
}
}
public void resetCloseFuture() {
// noop
}
protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
return Commands.peekStickyKey(metadataAndPayload, subscription.getTopicName(), subscription.getName());
}
protected void addMessageToReplay(long ledgerId, long entryId) {
// No-op
}
}