// blob: aea5b9fc65b463e4c4db353883b59bbe4fe8aade [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;
import java.util.regex.Pattern;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.TopicResources;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.metadata.api.NotificationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TopicListService {
/**
 * Listener for a single topic-list watcher: keeps the list of topics that match the watcher's pattern and
 * asks the owning {@link TopicListService} to send an update for every matching change.
 *
 * <p>NOTE(review): {@code matchingTopics} is mutated in {@link #accept} and handed out as-is by
 * {@link #getMatchingTopics()} without any synchronization — confirm both are used from the same thread.
 */
public static class TopicListWatcher implements BiConsumer<String, NotificationType> {
    private final TopicListService topicListService;
    private final long id;
    /** Regexp evaluated against the topic name without its partition suffix. **/
    private final Pattern topicsPattern;
    /** Currently matching topic names; each name still carries its partition suffix. **/
    private final List<String> matchingTopics;

    /***
     * @param topicListService service that is asked to send an update for every matching change.
     * @param id identifier handed to the service together with each update.
     * @param topicsPattern The regexp for the topic name(not contains partition suffix).
     * @param topics candidate topic names; only the result of {@code TopicList.filterTopics} is retained.
     */
    public TopicListWatcher(TopicListService topicListService, long id,
            Pattern topicsPattern, List<String> topics) {
        this.matchingTopics = TopicList.filterTopics(topics, topicsPattern);
        this.topicsPattern = topicsPattern;
        this.id = id;
        this.topicListService = topicListService;
    }

    /**
     * @return the internal list of matching topics (the list itself, not a copy).
     */
    public List<String> getMatchingTopics() {
        return this.matchingTopics;
    }

    /***
     * Applies one topic notification to the matching list and forwards it when the topic matches the pattern.
     *
     * @param topicName topic name which contains partition suffix.
     * @param notificationType {@code Deleted} removes the topic; any other type adds it.
     */
    @Override
    public void accept(String topicName, NotificationType notificationType) {
        // The pattern is evaluated against the name returned by getPartitionedTopicName().
        String nameForMatching = TopicName.get(topicName).getPartitionedTopicName();
        if (!topicsPattern.matcher(nameForMatching).matches()) {
            return;
        }

        boolean removed = notificationType == NotificationType.Deleted;
        List<String> changed = Collections.singletonList(topicName);
        List<String> untouched = Collections.emptyList();
        if (removed) {
            matchingTopics.remove(topicName);
        } else {
            matchingTopics.add(topicName);
        }

        // The hash is computed after the list was modified, so it describes the new state.
        topicListService.sendTopicListUpdate(id, TopicList.calculateHash(matchingTopics),
                removed ? changed : untouched,
                removed ? untouched : changed);
    }
}
private static final Logger log = LoggerFactory.getLogger(TopicListService.class);
/** Used to list the persistent topics of a namespace when a watcher is initialized. */
private final NamespaceService namespaceService;
/** Used to register and deregister watchers as persistent-topic listeners. */
private final TopicResources topicResources;
/** Connection whose command sender delivers responses and whose remote address appears in log lines. */
private final ServerCnx connection;
/** When false, every watch request is rejected with NotAllowedError. */
private final boolean enableSubscriptionPatternEvaluation;
/** Watch requests whose pattern string is longer than this are rejected with NotAllowedError. */
private final int maxSubscriptionPatternLength;
/** Watcher creation futures keyed by watcher id. */
private final ConcurrentLongHashMap<CompletableFuture<TopicListWatcher>> watchers;
/**
 * Builds the service for one connection.
 *
 * @param pulsar source of the namespace service and the topic resources.
 * @param connection connection used for sending responses and for log context.
 * @param enableSubscriptionPatternEvaluation whether watch requests are accepted at all.
 * @param maxSubscriptionPatternLength longest pattern string accepted by a watch request.
 */
public TopicListService(PulsarService pulsar, ServerCnx connection,
        boolean enableSubscriptionPatternEvaluation, int maxSubscriptionPatternLength) {
    // Plain configuration values first.
    this.connection = connection;
    this.enableSubscriptionPatternEvaluation = enableSubscriptionPatternEvaluation;
    this.maxSubscriptionPatternLength = maxSubscriptionPatternLength;

    // Collaborators obtained from the broker service.
    this.namespaceService = pulsar.getNamespaceService();
    this.topicResources = pulsar.getPulsarResources().getTopicResources();

    // Registry of watcher futures, keyed by watcher id.
    this.watchers = ConcurrentLongHashMap.<CompletableFuture<TopicListWatcher>>newBuilder()
            .expectedItems(8)
            .concurrencyLevel(1)
            .build();
}
/**
 * Deletes every watcher currently registered on this service.
 */
public void inactivate() {
    // Work on a snapshot of the ids because deleteTopicListWatcher removes entries from the map.
    HashSet<Long> registeredIds = new HashSet<>(watchers.keys());
    registeredIds.forEach(this::deleteTopicListWatcher);
}
/***
 * Creates (or reuses) the topic list watcher identified by {@code watcherId} and answers the request.
 *
 * <p>The request is rejected with {@code NotAllowedError} when pattern evaluation is disabled or the pattern is
 * longer than the configured maximum. If a watcher with the same id already completed successfully it is reused;
 * if its creation is still pending the request fails with {@code ServiceNotReady}; if its creation failed the
 * stale entry is removed and the request fails with {@code UnknownError}. {@code lookupSemaphore} is released on
 * each of these paths as well as after the success or error response of the watcher future.
 *
 * @param namespaceName namespace whose topics are watched.
 * @param watcherId id of the watcher on this connection.
 * @param requestId id echoed in the success or error response.
 * @param topicsPattern The regexp for the topic name(not contains partition suffix).
 * @param topicsHash hash sent with the request; when it equals the hash of the matching topics an empty topic
 *                   list is sent in the success response.
 * @param lookupSemaphore semaphore released once the request has been answered.
 */
public void handleWatchTopicList(NamespaceName namespaceName, long watcherId, long requestId, Pattern topicsPattern,
        String topicsHash, Semaphore lookupSemaphore) {
    if (!enableSubscriptionPatternEvaluation || topicsPattern.pattern().length() > maxSubscriptionPatternLength) {
        String msg = "Unable to create topic list watcher: ";
        if (!enableSubscriptionPatternEvaluation) {
            msg += "Evaluating subscription patterns is disabled.";
        } else {
            msg += "Pattern longer than maximum: " + maxSubscriptionPatternLength;
        }
        log.warn("[{}] {} on namespace {}", connection.getRemoteAddress(), msg, namespaceName);
        connection.getCommandSender().sendErrorResponse(requestId, ServerError.NotAllowedError, msg);
        lookupSemaphore.release();
        return;
    }

    CompletableFuture<TopicListWatcher> watcherFuture = new CompletableFuture<>();
    CompletableFuture<TopicListWatcher> existingWatcherFuture = watchers.putIfAbsent(watcherId, watcherFuture);

    if (existingWatcherFuture != null) {
        if (existingWatcherFuture.isDone() && !existingWatcherFuture.isCompletedExceptionally()) {
            TopicListWatcher watcher = existingWatcherFuture.getNow(null);
            log.info("[{}] Watcher with the same id is already created:"
                            + " watcherId={}, watcher={}",
                    connection.getRemoteAddress(), watcherId, watcher);
            // Reuse the existing watcher; the response below is built from its current state.
            watcherFuture = existingWatcherFuture;
        } else {
            // There was an earlier request to create a watcher with the same watcherId. This can happen when
            // the client timeout is lower than the broker timeouts. We need to wait until the previous watcher
            // creation request either completes or fails.
            log.warn("[{}] Watcher with id is already present on the connection,"
                    + " watcherId={}", connection.getRemoteAddress(), watcherId);
            ServerError error;
            if (!existingWatcherFuture.isDone()) {
                error = ServerError.ServiceNotReady;
            } else {
                // The earlier creation failed: drop the stale entry so that a later request can start fresh.
                error = ServerError.UnknownError;
                watchers.remove(watcherId, existingWatcherFuture);
            }
            connection.getCommandSender().sendErrorResponse(requestId, error,
                    "Topic list watcher is already present on the connection");
            lookupSemaphore.release();
            return;
        }
    } else {
        initializeTopicsListWatcher(watcherFuture, namespaceName, watcherId, topicsPattern);
    }

    CompletableFuture<TopicListWatcher> finalWatcherFuture = watcherFuture;
    finalWatcherFuture.thenAccept(watcher -> {
                List<String> topicList = watcher.getMatchingTopics();
                String hash = TopicList.calculateHash(topicList);
                if (hash.equals(topicsHash)) {
                    // Same hash as in the request: send an empty list instead of the full one.
                    topicList = Collections.emptyList();
                }
                if (log.isDebugEnabled()) {
                    log.debug(
                            "[{}] Received WatchTopicList for namespace [//{}] by {}",
                            connection.getRemoteAddress(), namespaceName, requestId);
                }
                connection.getCommandSender().sendWatchTopicListSuccess(requestId, watcherId, hash, topicList);
                lookupSemaphore.release();
            })
            .exceptionally(ex -> {
                // The trailing Throwable argument makes SLF4J log the cause together with the message.
                log.warn("[{}] Error WatchTopicList for namespace [//{}] by {}",
                        connection.getRemoteAddress(), namespaceName, requestId, ex);
                connection.getCommandSender().sendErrorResponse(requestId,
                        BrokerServiceException.getClientErrorCode(
                                new BrokerServiceException.ServerMetadataException(ex)), ex.getMessage());
                watchers.remove(watcherId, finalWatcherFuture);
                lookupSemaphore.release();
                return null;
            });
}
/***
 * Lists the persistent topics of {@code namespace}, creates a watcher from them, registers it as a
 * persistent-topic listener and completes {@code watcherFuture} with the outcome.
 *
 * @param watcherFuture future completed with the new watcher, or exceptionally when the chain fails.
 * @param namespace namespace whose persistent topics are listed and listened to.
 * @param watcherId id given to the new watcher.
 * @param topicsPattern The regexp for the topic name(not contains partition suffix).
 */
public void initializeTopicsListWatcher(CompletableFuture<TopicListWatcher> watcherFuture,
        NamespaceName namespace, long watcherId, Pattern topicsPattern) {
    namespaceService.getListOfPersistentTopics(namespace)
            .thenApply(topics -> {
                TopicListWatcher created = new TopicListWatcher(this, watcherId, topicsPattern, topics);
                topicResources.registerPersistentTopicListener(namespace, created);
                return created;
            })
            .whenComplete((created, failure) -> {
                if (failure != null) {
                    watcherFuture.completeExceptionally(failure);
                    return;
                }
                if (watcherFuture.complete(created)) {
                    return;
                }
                // Somebody else completed the future first: the freshly registered listener is not needed.
                log.warn("[{}] Watcher future was already completed. Deregistering watcherId={}.",
                        connection.getRemoteAddress(), watcherId);
                topicResources.deregisterPersistentTopicListener(created);
            });
}
/**
 * Handles a close command: deletes the watcher named in the command and then sends a success response
 * for the command's request id.
 *
 * @param commandWatchTopicListClose command carrying the request id and the watcher id.
 */
public void handleWatchTopicListClose(CommandWatchTopicListClose commandWatchTopicListClose) {
    // Both ids are read before anything is deleted.
    final long replyTo = commandWatchTopicListClose.getRequestId();
    final long closingWatcherId = commandWatchTopicListClose.getWatcherId();

    deleteTopicListWatcher(closingWatcherId);
    connection.getCommandSender().sendSuccessResponse(replyTo);
}
/**
 * Removes the watcher registered under {@code watcherId}, if there is one.
 *
 * <p>Three situations are distinguished: the watcher is still being created (its future is failed so the
 * creation result gets discarded), its creation already failed (only the map entry is removed), or it was
 * created successfully (it is deregistered as a listener before the map entry is removed).
 *
 * @param watcherId id of the watcher to remove.
 */
public void deleteTopicListWatcher(Long watcherId) {
    CompletableFuture<TopicListWatcher> registered = watchers.get(watcherId);
    if (registered == null) {
        log.info("[{}] TopicListWatcher was not registered on the connection: {}",
                watcherId, connection.getRemoteAddress());
        return;
    }

    // True only when this call is the one that failed a still-pending creation.
    boolean abortedPendingCreation = !registered.isDone()
            && registered.completeExceptionally(
                    new IllegalStateException("Closed watcher before creation was complete"));

    if (abortedPendingCreation) {
        // We have received a request to close the watcher before it was actually completed, we have marked the
        // watcher future as failed and we can tell the client the close operation was successful. When the actual
        // create operation will complete, the new watcher will be discarded.
        log.info("[{}] Closed watcher before its creation was completed. watcherId={}",
                connection.getRemoteAddress(), watcherId);
        watchers.remove(watcherId);
    } else if (registered.isCompletedExceptionally()) {
        log.info("[{}] Closed watcher that already failed to be created. watcherId={}",
                connection.getRemoteAddress(), watcherId);
        watchers.remove(watcherId);
    } else {
        // Proceed with normal watcher close
        topicResources.deregisterPersistentTopicListener(registered.getNow(null));
        watchers.remove(watcherId);
        log.info("[{}] Closed watcher, watcherId={}", connection.getRemoteAddress(), watcherId);
    }
}
/**
 * Sends a topic list update for the given watcher through the connection's command sender.
 *
 * @param watcherId id of the watcher the update belongs to.
 * @param topicsHash hash passed along with the update.
 * @param deletedTopics topic names deleted(contains the partition suffix).
 * @param newTopics topics names added(contains the partition suffix).
 */
public void sendTopicListUpdate(long watcherId, String topicsHash, List<String> deletedTopics,
        List<String> newTopics) {
    // The command sender takes the new topics before the deleted ones, and the hash last.
    connection.getCommandSender()
            .sendWatchTopicListUpdate(watcherId, newTopics, deletedTopics, topicsHash);
}
}