blob: 056fd0fb645c9f1f3035e9993def46665910ee2e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.nonpersistent;
import com.google.common.base.MoreObjects;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentSubscriptionStatsImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NonPersistentSubscription implements Subscription {
// Topic that owns this subscription.
private final NonPersistentTopic topic;
// Dispatcher currently serving this subscription; not set by the constructor, it is created in addConsumer().
private volatile NonPersistentDispatcher dispatcher;
// Name of the owning topic, captured from topic.getName() at construction time.
private final String topicName;
// Name of this subscription.
private final String subName;
// Precomputed string returned by toString().
private final String fullName;
// Integer encoding of the fenced flag (AtomicIntegerFieldUpdater works on int fields).
private static final int FALSE = 0;
private static final int TRUE = 1;
private static final AtomicIntegerFieldUpdater<NonPersistentSubscription> IS_FENCED_UPDATER =
AtomicIntegerFieldUpdater
.newUpdater(NonPersistentSubscription.class, "isFenced");
// Fenced flag: set to TRUE by close()/disconnect(); addConsumer() rejects consumers while it is TRUE.
// It is only accessed through IS_FENCED_UPDATER, hence the "unused" suppression.
@SuppressWarnings("unused")
private volatile int isFenced = FALSE;
// Timestamp of when this subscription was last seen active
private volatile long lastActive;
// Totals carried over from consumers that were removed, so getStats() keeps counting their output.
private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
// If isDurable is false (such as a Reader), the subscription is unsubscribed from the topic when a consumer
// is removed (see removeConsumer()).
private final boolean isDurable;
// NOTE(review): this field is never assigned anywhere in this class, so it stays null and getStats() never
// reports a key-shared mode -- confirm whether addConsumer() should set it for Key_Shared consumers.
private KeySharedMode keySharedMode = null;
/**
 * Creates a subscription attached to the given topic. The new subscription is unfenced and its
 * last-active timestamp is initialised to the current time.
 *
 * @param topic            topic that owns this subscription
 * @param subscriptionName name of the subscription
 * @param isDurable        when {@code false}, the subscription is unsubscribed from the topic as soon as a
 *                         consumer is removed
 */
public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName, boolean isDurable) {
    this.isDurable = isDurable;
    this.topic = topic;
    this.subName = subscriptionName;
    this.topicName = topic.getName();
    // Built once because the topic and subscription names never change afterwards.
    this.fullName = MoreObjects.toStringHelper(this)
            .add("topic", this.topicName)
            .add("name", this.subName)
            .toString();
    this.lastActive = System.currentTimeMillis();
    IS_FENCED_UPDATER.set(this, FALSE);
}
/**
 * Returns the broker interceptor obtained from the owning topic's broker service.
 *
 * @return the interceptor returned by {@code topic.getBrokerService().getInterceptor()}
 */
@Override
public BrokerInterceptor interceptor() {
return this.topic.getBrokerService().getInterceptor();
}
/**
 * Returns the name of this subscription.
 *
 * @return the subscription name passed to the constructor
 */
@Override
public String getName() {
return this.subName;
}
/**
 * Returns the topic that owns this subscription.
 *
 * @return the topic passed to the constructor
 */
@Override
public Topic getTopic() {
return topic;
}
/**
 * Reports whether this subscription is replicated.
 *
 * @return always {@code false} for this implementation
 */
@Override
public boolean isReplicated() {
return false;
}
/**
 * Attaches a consumer to this subscription, creating or replacing the dispatcher when needed.
 *
 * <p>The call refreshes the last-active timestamp and is rejected while the subscription is fenced. When there
 * is no dispatcher yet, or the current dispatcher reports no connected consumer, a dispatcher matching the
 * consumer's subscription type is selected; an existing dispatcher of a different type (or, for Key_Shared, with
 * a different key-shared policy) is replaced and then closed. Otherwise the consumer's type must equal the
 * current dispatcher's type.
 *
 * @param consumer consumer to attach
 * @return a completed future on success; a future failed with {@link SubscriptionFencedException} when fenced,
 *         {@link ServerMetadataException} for an unsupported subscription type, {@link SubscriptionBusyException}
 *         when the type differs from the active dispatcher's, or the {@link BrokerServiceException} thrown by
 *         the dispatcher while adding the consumer
 */
@Override
public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
updateLastActive();
if (IS_FENCED_UPDATER.get(this) == TRUE) {
log.warn("Attempting to add consumer {} on a fenced subscription", consumer);
return FutureUtil.failedFuture(new SubscriptionFencedException("Subscription is fenced"));
}
// The dispatcher may only be (re)created while no consumer is connected to the current one.
if (dispatcher == null || !dispatcher.isConsumerConnected()) {
// Holds the dispatcher being replaced, if any, so it can be closed after the switch.
Dispatcher previousDispatcher = null;
switch (consumer.subType()) {
case Exclusive:
if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) {
previousDispatcher = dispatcher;
dispatcher = new NonPersistentDispatcherSingleActiveConsumer(SubType.Exclusive, 0, topic, this);
}
break;
case Shared:
if (dispatcher == null || dispatcher.getType() != SubType.Shared) {
previousDispatcher = dispatcher;
dispatcher = new NonPersistentDispatcherMultipleConsumers(topic, this);
}
break;
case Failover:
int partitionIndex = TopicName.getPartitionIndex(topicName);
if (partitionIndex < 0) {
// For non partition topics, assume index 0 to pick a predictable consumer
partitionIndex = 0;
}
if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
previousDispatcher = dispatcher;
dispatcher = new NonPersistentDispatcherSingleActiveConsumer(SubType.Failover, partitionIndex,
topic, this);
}
break;
case Key_Shared:
KeySharedMeta ksm = consumer.getKeySharedMeta();
// The cast is only evaluated after the type check confirmed a Key_Shared dispatcher.
if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared
|| !((NonPersistentStickyKeyDispatcherMultipleConsumers) dispatcher)
.hasSameKeySharedPolicy(ksm)) {
previousDispatcher = dispatcher;
this.dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this, ksm);
}
break;
default:
return FutureUtil.failedFuture(new ServerMetadataException("Unsupported subscription type"));
}
// The outcome of closing the replaced dispatcher is only logged; it does not affect the returned future.
if (previousDispatcher != null) {
previousDispatcher.close().thenRun(() -> {
log.info("[{}][{}] Successfully closed previous dispatcher", topicName, subName);
}).exceptionally(ex -> {
log.error("[{}][{}] Failed to close previous dispatcher", topicName, subName, ex);
return null;
});
}
} else {
// Consumers are connected: the dispatcher cannot be swapped, so the types must match.
if (consumer.subType() != dispatcher.getType()) {
return FutureUtil.failedFuture(new SubscriptionBusyException("Subscription is of different type"));
}
}
try {
dispatcher.addConsumer(consumer);
return CompletableFuture.completedFuture(null);
} catch (BrokerServiceException brokerServiceException) {
return FutureUtil.failedFuture(brokerServiceException);
}
}
/**
 * Detaches a consumer from this subscription and keeps its output counters for later statistics.
 *
 * <p>Refreshes the last-active timestamp, removes the consumer from the dispatcher (when one exists), adds the
 * consumer's byte/message counters to the "removed consumers" totals, unsubscribes the subscription from the
 * topic when it is not durable, and finally decrements the topic usage count.
 *
 * @param consumer      consumer being removed
 * @param isResetCursor not used by this implementation
 * @throws BrokerServiceException declared by the signature; presumably raised by the dispatcher when the
 *                                consumer is not valid for it -- confirm against the dispatcher implementations
 */
@Override
public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor) throws BrokerServiceException {
updateLastActive();
if (dispatcher != null) {
dispatcher.removeConsumer(consumer);
}
// preserve accumulative stats from removed consumer
ConsumerStatsImpl stats = consumer.getStats();
bytesOutFromRemovedConsumers.add(stats.bytesOutCounter);
msgOutFromRemovedConsumer.add(stats.msgOutCounter);
// The future returned by topic.unsubscribe() is not awaited here.
if (!isDurable) {
topic.unsubscribe(subName);
}
// invalid consumer remove will throw an exception
// decrement usage is triggered only for valid consumer close
topic.decrementUsageCount();
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] [{}] Removed consumer -- count: {}", topic.getName(), subName, consumer.consumerName(),
topic.currentUsageCount());
}
}
/**
 * Forwards a flow-control request to the dispatcher.
 *
 * @param consumer                   consumer the request applies to
 * @param additionalNumberOfMessages value passed through unchanged to the dispatcher
 */
@Override
public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
// NOTE(review): no null check here; assumes addConsumer() has already created the dispatcher -- confirm.
dispatcher.consumerFlow(consumer, additionalNumberOfMessages);
}
/**
 * Does nothing; acknowledgements are ignored by this implementation.
 *
 * @param position   ignored
 * @param ackType    ignored
 * @param properties ignored
 */
@Override
public void acknowledgeMessage(List<Position> position, AckType ackType, Map<String, Long> properties) {
// No-op
}
/**
 * Returns the description string that was precomputed in the constructor from the topic and subscription names.
 *
 * @return the cached description of this subscription
 */
@Override
public String toString() {
return fullName;
}
/**
 * Returns the name of the owning topic.
 *
 * @return the topic name captured at construction time
 */
@Override
public String getTopicName() {
return this.topicName;
}
/**
 * Returns the subscription type reported by the current dispatcher.
 *
 * @return the dispatcher's type, or {@code null} when no dispatcher exists yet
 */
@Override
public SubType getType() {
    if (dispatcher == null) {
        return null;
    }
    return dispatcher.getType();
}
/**
 * Returns a printable name for the current subscription type.
 *
 * @return {@code "None"} when there is no type yet, the type's name for Exclusive, Failover, Shared and
 *         Key_Shared, and {@code "Null"} for any other value
 */
@Override
public String getTypeString() {
    final SubType subType = getType();
    if (subType != null) {
        switch (subType) {
            case Exclusive:
                return "Exclusive";
            case Failover:
                return "Failover";
            case Shared:
                return "Shared";
            case Key_Shared:
                return "Key_Shared";
            default:
                // Fall through to the generic answer below.
                break;
        }
        return "Null";
    }
    return "None";
}
/**
 * Does nothing for this implementation.
 *
 * @return an already-completed future
 */
@Override
public CompletableFuture<Void> clearBacklog() {
// No-op
return CompletableFuture.completedFuture(null);
}
/**
 * Does nothing for this implementation.
 *
 * @param numMessagesToSkip ignored
 * @return an already-completed future
 */
@Override
public CompletableFuture<Void> skipMessages(int numMessagesToSkip) {
// No-op
return CompletableFuture.completedFuture(null);
}
/**
 * Does nothing for this implementation.
 *
 * @param timestamp ignored
 * @return an already-completed future
 */
@Override
public CompletableFuture<Void> resetCursor(long timestamp) {
// No-op
return CompletableFuture.completedFuture(null);
}
/**
 * Does not look up any message; this implementation has nothing to peek at.
 *
 * @param messagePosition ignored
 * @return an already-completed future whose value is {@code null}, so callers must handle a null entry
 */
@Override
public CompletableFuture<Entry> peekNthMessage(int messagePosition) {
// No-op
return CompletableFuture.completedFuture(null); // TODO: throw exception
}
/**
 * Reports the backlog size, which is constant for this implementation.
 *
 * @param getPreciseBacklog ignored
 * @return always {@code 0}
 */
@Override
public long getNumberOfEntriesInBacklog(boolean getPreciseBacklog) {
// No-op
return 0;
}
/**
 * Returns the dispatcher currently serving this subscription.
 *
 * @return the current dispatcher, or {@code null} when none has been created yet
 */
@Override
public NonPersistentDispatcher getDispatcher() {
return this.dispatcher;
}
/**
 * Fences this subscription so that {@code addConsumer} rejects further consumers. The dispatcher is not
 * touched by this method.
 *
 * @return an already-completed future
 */
@Override
public CompletableFuture<Void> close() {
IS_FENCED_UPDATER.set(this, TRUE);
return CompletableFuture.completedFuture(null);
}
/**
 * Fences this subscription, closes its dispatcher (when one exists) and then closes the subscription itself.
 * If any of those steps fails, the fence is lifted again and the dispatcher is reset.
 *
 * @return CompletableFuture that completes when the disconnect finished, or fails with the underlying error
 */
@Override
public synchronized CompletableFuture<Void> disconnect() {
    final CompletableFuture<Void> result = new CompletableFuture<>();

    // block any further consumers on this subscription
    IS_FENCED_UPDATER.set(this, TRUE);

    final CompletableFuture<Void> dispatcherClosed;
    if (dispatcher != null) {
        dispatcherClosed = dispatcher.close();
    } else {
        dispatcherClosed = CompletableFuture.completedFuture(null);
    }

    dispatcherClosed
            .thenCompose(ignored -> close())
            .thenRun(() -> {
                log.info("[{}][{}] Successfully disconnected and closed subscription", topicName, subName);
                result.complete(null);
            })
            .exceptionally(failure -> {
                // Roll back: accept consumers again and let the dispatcher recover.
                IS_FENCED_UPDATER.set(this, FALSE);
                if (dispatcher != null) {
                    dispatcher.reset();
                }
                log.error("[{}][{}] Error disconnecting consumers from subscription", topicName, subName,
                        failure);
                result.completeExceptionally(failure);
                return null;
            });

    return result;
}
/**
 * Delete the subscription without explicitly disconnecting its consumers first; delegates to
 * {@code delete(false)}. Handle unsubscribe call from admin layer.
 *
 * <p>NOTE(review): earlier wording referred to closing and deleting a managed cursor, but no cursor is
 * referenced anywhere in this class.
 *
 * @return CompletableFuture indicating the completion of delete operation
 */
@Override
public CompletableFuture<Void> delete() {
return delete(false);
}
/**
 * Forcefully close all consumers and deletes the subscription; delegates to {@code delete(true)}.
 *
 * @return CompletableFuture indicating the completion of delete operation
 */
@Override
public CompletableFuture<Void> deleteForcefully() {
return delete(true);
}
/**
 * Delete the subscription: close it, unsubscribe it from the topic and then close the dispatcher. Handle
 * unsubscribe call from admin layer.
 *
 * <p>On failure the fenced flag is cleared again so the subscription can accept consumers, and the returned
 * future is completed exceptionally.
 *
 * <p>NOTE(review): earlier documentation said the operation fails when consumers are connected and the flag is
 * disabled. No such check is visible in this method (the non-forced path only calls {@code close()}); confirm
 * whether {@code topic.unsubscribe} or the dispatcher enforces it.
 *
 * @param closeIfConsumersConnected
 *            when {@code true}, {@code disconnect()} is used to close the dispatcher and its consumers before
 *            unsubscribing; when {@code false}, only {@code close()} is called first
 * @return CompletableFuture indicating the completion of delete operation
 */
private CompletableFuture<Void> delete(boolean closeIfConsumersConnected) {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
log.info("[{}][{}] Unsubscribing", topicName, subName);
// Completed by whichever closing strategy is selected below; gates the unsubscribe step.
CompletableFuture<Void> closeSubscriptionFuture = new CompletableFuture<>();
if (closeIfConsumersConnected) {
this.disconnect().thenRun(() -> {
closeSubscriptionFuture.complete(null);
}).exceptionally(ex -> {
log.error("[{}][{}] Error disconnecting and closing subscription", topicName, subName, ex);
closeSubscriptionFuture.completeExceptionally(ex);
return null;
});
} else {
this.close().thenRun(() -> {
closeSubscriptionFuture.complete(null);
}).exceptionally(exception -> {
log.error("[{}][{}] Error closing subscription", topicName, subName, exception);
closeSubscriptionFuture.completeExceptionally(exception);
return null;
});
}
// once the subscription is closed, remove it from the topic and then close the dispatcher
closeSubscriptionFuture.thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> {
synchronized (this) {
(dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null)).thenRun(() -> {
log.info("[{}][{}] Successfully deleted subscription", topicName, subName);
deleteFuture.complete(null);
}).exceptionally(ex -> {
// Dispatcher close failed: lift the fence and reset the dispatcher before reporting the error.
IS_FENCED_UPDATER.set(this, FALSE);
if (dispatcher != null) {
dispatcher.reset();
}
log.error("[{}][{}] Error deleting subscription", topicName, subName, ex);
deleteFuture.completeExceptionally(ex);
return null;
});
}
}).exceptionally(exception -> {
// Closing or unsubscribing failed: lift the fence and report the error.
IS_FENCED_UPDATER.set(this, FALSE);
log.error("[{}][{}] Error deleting subscription", topicName, subName, exception);
deleteFuture.completeExceptionally(exception);
return null;
});
return deleteFuture;
}
/**
 * Handle unsubscribe command from the client API. Checks with the dispatcher whether this consumer can proceed
 * with the unsubscribe; if so the consumer is closed and the subscription is deleted.
 *
 * <p>When no dispatcher exists yet, no consumer can be connected, so the request is rejected the same way as
 * for a consumer the dispatcher refuses.
 *
 * @param consumer consumer object that is initiating the unsubscribe operation
 * @return CompletableFuture indicating the completion of unsubscribe operation; it fails with
 *         {@link ServerMetadataException} when the consumer is not allowed to unsubscribe, or with the
 *         {@link BrokerServiceException} raised while closing the consumer
 */
@Override
public CompletableFuture<Void> doUnsubscribe(Consumer consumer) {
    // Read the volatile field once so the null check and the call use the same dispatcher.
    final NonPersistentDispatcher currentDispatcher = this.dispatcher;
    try {
        if (currentDispatcher != null && currentDispatcher.canUnsubscribe(consumer)) {
            consumer.close();
            return delete();
        }
        return FutureUtil.failedFuture(
                new ServerMetadataException("Unconnected or shared consumer attempting to unsubscribe"));
    } catch (BrokerServiceException e) {
        // Pass the exception as the last argument so the cause and stack trace are logged.
        log.warn("Error removing consumer {}", consumer, e);
        return FutureUtil.failedFuture(e);
    }
}
/**
 * Returns the consumers known to the current dispatcher.
 *
 * @return the dispatcher's consumer list, or an empty list when no dispatcher exists yet
 */
@Override
public List<Consumer> getConsumers() {
    // Single read of the volatile field.
    final Dispatcher current = this.dispatcher;
    if (current == null) {
        return Collections.emptyList();
    }
    return current.getConsumers();
}
/**
 * Not supported by this implementation.
 *
 * @param messageTTLInSeconds ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public boolean expireMessages(int messageTTLInSeconds) {
    final String reason = "Expire message by timestamp is not supported for non-persistent topic.";
    throw new UnsupportedOperationException(reason);
}
/**
 * Not supported by this implementation.
 *
 * @param position ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public boolean expireMessages(Position position) {
    final String reason = "Expire message by position is not supported for non-persistent topic.";
    throw new UnsupportedOperationException(reason);
}
/**
 * Builds a statistics snapshot for this subscription.
 *
 * <p>The byte/message counters start from the totals kept for already-removed consumers and are then increased
 * by the counters of every consumer currently held by the dispatcher. Rates are summed over the current
 * consumers only. When no dispatcher exists yet, the per-consumer figures and the drop rate keep whatever
 * values a freshly created stats object has.
 *
 * @return a new stats object describing this subscription
 */
public NonPersistentSubscriptionStatsImpl getStats() {
    NonPersistentSubscriptionStatsImpl subStats = new NonPersistentSubscriptionStatsImpl();
    subStats.bytesOutCounter = bytesOutFromRemovedConsumers.longValue();
    subStats.msgOutCounter = msgOutFromRemovedConsumer.longValue();

    // Read the volatile field once; every dispatcher access below goes through this snapshot.
    NonPersistentDispatcher currentDispatcher = this.dispatcher;
    if (currentDispatcher != null) {
        currentDispatcher.getConsumers().forEach(consumer -> {
            ConsumerStatsImpl consumerStats = consumer.getStats();
            subStats.consumers.add(consumerStats);
            subStats.msgRateOut += consumerStats.msgRateOut;
            subStats.msgThroughputOut += consumerStats.msgThroughputOut;
            subStats.bytesOutCounter += consumerStats.bytesOutCounter;
            subStats.msgOutCounter += consumerStats.msgOutCounter;
            subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
        });
        // The drop rate lives on the dispatcher, so it must stay inside the null check: a subscription
        // that never had a consumer has no dispatcher and would otherwise fail with a NullPointerException.
        subStats.msgDropRate = currentDispatcher.getMessageDropRate().getValueRate();
    }

    subStats.type = getTypeString();

    KeySharedMode keySharedMode = this.keySharedMode;
    if (getType() == SubType.Key_Shared && keySharedMode != null) {
        subStats.keySharedMode = keySharedMode.toString();
    }
    return subStats;
}
/**
 * Does nothing for this implementation.
 *
 * @param consumer ignored
 */
@Override
public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
// No-op
}
/**
 * Does nothing for this implementation.
 *
 * @param consumer  ignored
 * @param positions ignored
 */
@Override
public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
// No-op
}
/**
 * Does nothing for this implementation.
 *
 * @param unAckMessages ignored
 */
@Override
public void addUnAckedMessages(int unAckMessages) {
// No-op
}
/**
 * Reports the expired-message rate, which is constant for this implementation.
 *
 * @return always {@code 0}
 */
@Override
public double getExpiredMessageRate() {
// No-op
return 0;
}
/**
 * Forwards the notification to the owning topic by calling {@code topic.markBatchMessagePublished()}.
 */
@Override
public void markTopicWithBatchMessagePublished() {
topic.markBatchMessagePublished();
}
/**
 * Does nothing for this implementation.
 *
 * @param position ignored
 * @return an already-completed future
 */
@Override
public CompletableFuture<Void> resetCursor(Position position) {
return CompletableFuture.completedFuture(null);
}
/**
 * Rejects the request: ending a transaction is not supported by this implementation.
 *
 * @param txnidMostBits  ignored
 * @param txnidLeastBits ignored
 * @param txnAction      ignored
 * @param lowWaterMark   ignored
 * @return a future that is already completed exceptionally
 */
@Override
public CompletableFuture<Void> endTxn(long txnidMostBits, long txnidLeastBits, int txnAction, long lowWaterMark) {
    final Exception unsupported =
            new Exception("Unsupported operation end txn for NonPersistentSubscription");
    return FutureUtil.failedFuture(unsupported);
}
// Class-wide SLF4J logger.
private static final Logger log = LoggerFactory.getLogger(NonPersistentSubscription.class);
/**
 * Returns the time this subscription was last marked active.
 *
 * @return the stored {@code System.currentTimeMillis()} value from construction or the latest
 *         {@code updateLastActive()} call
 */
public long getLastActive() {
return lastActive;
}
/**
 * Marks this subscription as active now by storing the current {@code System.currentTimeMillis()} value.
 */
public void updateLastActive() {
this.lastActive = System.currentTimeMillis();
}
}