blob: 5098890242b6cb0786ee97a59bf6ee493315a683 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import static com.google.common.base.Preconditions.checkArgument;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.client.impl.Murmur3Hash32;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBaseDispatcher {
protected final String topicName;
protected static final AtomicReferenceFieldUpdater<AbstractDispatcherSingleActiveConsumer, Consumer>
ACTIVE_CONSUMER_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
AbstractDispatcherSingleActiveConsumer.class, Consumer.class, "activeConsumer");
private volatile Consumer activeConsumer = null;
protected final CopyOnWriteArrayList<Consumer> consumers;
protected StickyKeyConsumerSelector stickyKeyConsumerSelector;
protected boolean isKeyHashRangeFiltered = false;
protected CompletableFuture<Void> closeFuture = null;
protected final int partitionIndex;
protected final ManagedCursor cursor;
// This dispatcher supports both the Exclusive and Failover subscription types
protected final SubType subscriptionType;
protected static final int FALSE = 0;
protected static final int TRUE = 1;
protected static final AtomicIntegerFieldUpdater<AbstractDispatcherSingleActiveConsumer> IS_CLOSED_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AbstractDispatcherSingleActiveConsumer.class, "isClosed");
private volatile int isClosed = FALSE;
protected boolean isFirstRead = true;
private static final int CONSUMER_CONSISTENT_HASH_REPLICAS = 100;
public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
String topicName, Subscription subscription,
ServiceConfiguration serviceConfig, ManagedCursor cursor) {
super(subscription, serviceConfig);
this.topicName = topicName;
this.consumers = new CopyOnWriteArrayList<>();
this.partitionIndex = partitionIndex;
this.subscriptionType = subscriptionType;
this.cursor = cursor;
ACTIVE_CONSUMER_UPDATER.set(this, null);
}
protected abstract void scheduleReadOnActiveConsumer();
protected abstract void readMoreEntries(Consumer consumer);
protected abstract void cancelPendingRead();
protected void notifyActiveConsumerChanged(Consumer activeConsumer) {
if (null != activeConsumer && subscriptionType == SubType.Failover) {
consumers.forEach(consumer ->
consumer.notifyActiveConsumerChange(activeConsumer));
}
}
/**
* Pick active consumer for a topic for {@link SubType#Failover} subscription.
* If it's a non-partitioned topic then it'll pick consumer based on order they subscribe to the topic.
* If is's a partitioned topic, first sort consumers based on their priority level and consumer name then
* distributed partitions evenly across consumers with highest priority level.
*
* @return the true consumer if the consumer is changed, otherwise false.
*/
protected boolean pickAndScheduleActiveConsumer() {
checkArgument(!consumers.isEmpty());
AtomicBoolean hasPriorityConsumer = new AtomicBoolean(false);
consumers.sort((c1, c2) -> {
int priority = c1.getPriorityLevel() - c2.getPriorityLevel();
if (priority != 0) {
hasPriorityConsumer.set(true);
return priority;
}
return c1.consumerName().compareTo(c2.consumerName());
});
int consumersSize = consumers.size();
// find number of consumers which are having the highest priorities. so partitioned-topic assignment happens
// evenly across highest priority consumers
if (hasPriorityConsumer.get()) {
int highestPriorityLevel = consumers.get(0).getPriorityLevel();
for (int i = 0; i < consumers.size(); i++) {
if (highestPriorityLevel != consumers.get(i).getPriorityLevel()) {
consumersSize = i;
break;
}
}
}
int index = partitionIndex >= 0
? partitionIndex % consumersSize
: peekConsumerIndexFromHashRing(makeHashRing(consumersSize));
Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, consumers.get(index));
Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
if (prevConsumer == activeConsumer) {
// Active consumer did not change. Do nothing at this point
return false;
} else {
// If the active consumer is changed, send notification.
scheduleReadOnActiveConsumer();
return true;
}
}
private int peekConsumerIndexFromHashRing(NavigableMap<Integer, Integer> hashRing) {
int hash = Murmur3Hash32.getInstance().makeHash(topicName);
Map.Entry<Integer, Integer> ceilingEntry = hashRing.ceilingEntry(hash);
return ceilingEntry != null ? ceilingEntry.getValue() : hashRing.firstEntry().getValue();
}
private NavigableMap<Integer, Integer> makeHashRing(int consumerSize) {
NavigableMap<Integer, Integer> hashRing = new TreeMap<>();
for (int i = 0; i < consumerSize; i++) {
for (int j = 0; j < CONSUMER_CONSISTENT_HASH_REPLICAS; j++) {
String key = consumers.get(i).consumerName() + j;
int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes());
hashRing.put(hash, i);
}
}
return Collections.unmodifiableNavigableMap(hashRing);
}
public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
if (IS_CLOSED_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher is already closed. Closing consumer {}", this.topicName, consumer);
consumer.disconnect();
return CompletableFuture.completedFuture(null);
}
if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
return FutureUtil.failedFuture(new ConsumerBusyException("Exclusive consumer is already connected"));
}
if (subscriptionType == SubType.Failover && isConsumersExceededOnSubscription()) {
log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit",
this.topicName);
return FutureUtil.failedFuture(new ConsumerBusyException("Subscription reached max consumers limit"));
}
if (subscriptionType == SubType.Exclusive
&& consumer.getKeySharedMeta() != null
&& consumer.getKeySharedMeta().getHashRangesList() != null
&& consumer.getKeySharedMeta().getHashRangesList().size() > 0) {
stickyKeyConsumerSelector = new HashRangeExclusiveStickyKeyConsumerSelector();
stickyKeyConsumerSelector.addConsumer(consumer);
isKeyHashRangeFiltered = true;
} else {
isKeyHashRangeFiltered = false;
}
if (consumers.isEmpty()) {
isFirstRead = true;
}
consumers.add(consumer);
if (!pickAndScheduleActiveConsumer()) {
// the active consumer is not changed
Consumer currentActiveConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
if (null == currentActiveConsumer) {
if (log.isDebugEnabled()) {
log.debug("Current active consumer disappears while adding consumer {}", consumer);
}
} else {
consumer.notifyActiveConsumerChange(currentActiveConsumer);
}
}
return CompletableFuture.completedFuture(null);
}
public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
log.info("Removing consumer {}", consumer);
if (!consumers.remove(consumer)) {
throw new ServerMetadataException("Consumer was not connected");
}
if (consumers.isEmpty()) {
ACTIVE_CONSUMER_UPDATER.set(this, null);
}
if (closeFuture == null && !consumers.isEmpty()) {
pickAndScheduleActiveConsumer();
return;
}
cancelPendingRead();
if (consumers.isEmpty() && closeFuture != null && !closeFuture.isDone()) {
// Control reaches here only when closeFuture is created
// and no more connected consumers left.
closeFuture.complete(null);
}
}
/**
* Handle unsubscribe command from the client API For failover subscription, if consumer is connected consumer, we
* can unsubscribe.
*
* @param consumer
* Calling consumer object
*/
public synchronized boolean canUnsubscribe(Consumer consumer) {
return (consumers.size() == 1) && Objects.equals(consumer, ACTIVE_CONSUMER_UPDATER.get(this));
}
public CompletableFuture<Void> close() {
IS_CLOSED_UPDATER.set(this, TRUE);
return disconnectAllConsumers();
}
public boolean isClosed() {
return isClosed == TRUE;
}
/**
* Disconnect all consumers on this dispatcher (server side close). This triggers channelInactive on the inbound
* handler which calls dispatcher.removeConsumer(), where the closeFuture is completed
*
* @return
*/
public synchronized CompletableFuture<Void> disconnectAllConsumers(boolean isResetCursor) {
closeFuture = new CompletableFuture<>();
if (!consumers.isEmpty()) {
consumers.forEach(consumer -> consumer.disconnect(isResetCursor));
cancelPendingRead();
} else {
// no consumer connected, complete disconnect immediately
closeFuture.complete(null);
}
return closeFuture;
}
public synchronized CompletableFuture<Void> disconnectActiveConsumers(boolean isResetCursor) {
closeFuture = new CompletableFuture<>();
if (activeConsumer != null) {
activeConsumer.disconnect(isResetCursor);
}
closeFuture.complete(null);
return closeFuture;
}
@Override
public synchronized void resetCloseFuture() {
closeFuture = null;
}
public void reset() {
resetCloseFuture();
IS_CLOSED_UPDATER.set(this, FALSE);
}
public SubType getType() {
return subscriptionType;
}
public Consumer getActiveConsumer() {
return ACTIVE_CONSUMER_UPDATER.get(this);
}
@Override
public List<Consumer> getConsumers() {
return consumers;
}
public boolean isConsumerConnected() {
return ACTIVE_CONSUMER_UPDATER.get(this) != null;
}
private static final Logger log = LoggerFactory.getLogger(AbstractDispatcherSingleActiveConsumer.class);
}