blob: f01ee7c963625cf73b476a5ae352edd975aac278 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.util.RetryUtil;
import org.apache.pulsar.common.events.ActionType;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.events.TopicPoliciesEvent;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Cached topic policies service that caches the system topic readers and the topic policies.
*
* When the reader cache for a namespace is removed, the cached topic policies of that namespace
* are removed automatically.
*/
public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesService {
// Broker handle; this class uses it to obtain the client, the executor and the namespace service.
private final PulsarService pulsarService;
// Created lazily by createSystemTopicFactoryIfNeeded(); volatile because it is read outside the lock there.
private volatile NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
// Cached policies per topic; lookups in this class normalise the key through getPartitionedTopicName().
@VisibleForTesting
final Map<TopicName, TopicPolicies> policiesCache = new ConcurrentHashMap<>();
// Bundle counter per namespace; when it drops to zero the namespace's reader and cached policies are cleaned.
private final Map<NamespaceName, AtomicInteger> ownedBundlesCountPerNamespace = new ConcurrentHashMap<>();
// Future of the system-topic reader of each namespace; the entry is removed by cleanCacheAndCloseReader().
private final Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
readerCaches = new ConcurrentHashMap<>();
// Per namespace: false once loading has been started, true once the end of the system topic was reached.
@VisibleForTesting
final Map<NamespaceName, Boolean> policyCacheInitMap = new ConcurrentHashMap<>();
// Listeners per topic; they receive the new policies, or null when the policies were deleted.
@VisibleForTesting
final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>();
/**
 * Creates the service. Nothing is loaded here; the system topic factory and the readers are
 * created lazily by the other methods of this class.
 *
 * @param pulsarService the broker service this instance works on behalf of
 */
public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
this.pulsarService = pulsarService;
}
/**
 * Sends a {@link ActionType#DELETE} event (without policies) for the given topic.
 *
 * @param topicName the topic whose policies are deleted
 * @return the future returned by {@code sendTopicPolicyEvent}
 */
@Override
public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName) {
return sendTopicPolicyEvent(topicName, ActionType.DELETE, null);
}
/**
 * Sends an {@link ActionType#UPDATE} event carrying the given policies for the topic.
 *
 * @param topicName the topic whose policies are updated
 * @param policies the new policies to publish
 * @return the future returned by {@code sendTopicPolicyEvent}
 */
@Override
public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies) {
return sendTopicPolicyEvent(topicName, ActionType.UPDATE, policies);
}
/**
 * Writes a single topic-policy event to the policies system topic of the topic's namespace.
 *
 * <p>A writer is created for the call, the event is written (or deleted, for
 * {@link ActionType#DELETE}) and the writer is closed again once the write attempt finished.
 *
 * @param topicName the topic the event is about
 * @param actionType the kind of change
 * @param policies the policies to publish; {@code null} for a delete
 * @return a future that completes normally when a message id was returned for the event and
 *         exceptionally when the factory or writer could not be created, the write failed, or
 *         the returned message id was {@code null}
 */
private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, ActionType actionType,
        TopicPolicies policies) {
    final CompletableFuture<Void> outcome = new CompletableFuture<>();
    try {
        createSystemTopicFactoryIfNeeded();
    } catch (PulsarServerException factoryError) {
        outcome.completeExceptionally(factoryError);
        return outcome;
    }

    final SystemTopicClient<PulsarEvent> client =
            namespaceEventsSystemTopicFactory.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
    client.newWriterAsync().whenComplete((writer, writerError) -> {
        if (writerError != null) {
            outcome.completeExceptionally(writerError);
            return;
        }

        final PulsarEvent event = getPulsarEvent(topicName, actionType, policies);
        final CompletableFuture<MessageId> sendFuture;
        if (ActionType.DELETE.equals(actionType)) {
            sendFuture = writer.deleteAsync(event);
        } else {
            sendFuture = writer.writeAsync(event);
        }

        sendFuture.whenComplete((messageId, sendError) -> {
            // Settle the caller's future first, then release the writer.
            if (sendError != null) {
                outcome.completeExceptionally(sendError);
            } else if (messageId == null) {
                outcome.completeExceptionally(new RuntimeException("Got message id is null."));
            } else {
                outcome.complete(null);
            }

            writer.closeAsync().whenComplete((ignored, closeError) -> {
                if (closeError != null) {
                    log.error("[{}] Close writer error.", topicName, closeError);
                } else if (log.isDebugEnabled()) {
                    log.debug("[{}] Close writer success.", topicName);
                }
            });
        });
    });
    return outcome;
}
/**
 * Builds the {@link PulsarEvent} of type {@link EventType#TOPIC_POLICY} that describes a
 * policy change of the given topic.
 *
 * @param topicName the topic the event is about
 * @param actionType the action recorded in the event
 * @param policies the policies carried by the event; may be {@code null}
 * @return the assembled event
 */
private PulsarEvent getPulsarEvent(TopicName topicName, ActionType actionType, TopicPolicies policies) {
    // The event names the topic via getPartitionedTopicName(), i.e. without a partition suffix.
    final String topicLocalName = TopicName.get(topicName.getPartitionedTopicName()).getLocalName();

    final TopicPoliciesEvent policiesEvent = TopicPoliciesEvent.builder()
            .domain(topicName.getDomain().toString())
            .tenant(topicName.getTenant())
            .namespace(topicName.getNamespaceObject().getLocalName())
            .topic(topicLocalName)
            .policies(policies)
            .build();

    return PulsarEvent.builder()
            .actionType(actionType)
            .eventType(EventType.TOPIC_POLICY)
            .topicPoliciesEvent(policiesEvent)
            .build();
}
/**
 * Forwards a policy change read from the system topic to the listeners registered for the
 * affected topic.
 *
 * <p>A message without a value is treated as a deletion: the topic is derived from the message
 * key and the listeners receive {@code null}. Messages whose event type is not
 * {@link EventType#TOPIC_POLICY} are ignored.
 *
 * @param msg the message read from the system topic
 */
private void notifyListener(Message<PulsarEvent> msg) {
    final PulsarEvent value = msg.getValue();
    final TopicName topicName;
    final TopicPolicies policies;
    if (value == null) {
        // delete policies
        topicName = TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName());
        policies = null;
    } else if (EventType.TOPIC_POLICY.equals(value.getEventType())) {
        TopicPoliciesEvent event = value.getTopicPoliciesEvent();
        topicName = TopicName.get(event.getDomain(), event.getTenant(),
                event.getNamespace(), event.getTopic());
        policies = event.getPolicies();
    } else {
        return;
    }

    // Look the list up exactly once: the mapping can be removed concurrently (unregisterListener /
    // clean), so a second get() after a null check could return null.
    final List<TopicPolicyListener<TopicPolicies>> topicListeners = listeners.get(topicName);
    if (topicListeners == null) {
        return;
    }
    for (TopicPolicyListener<TopicPolicies> listener : topicListeners) {
        try {
            listener.onUpdate(policies);
        } catch (RuntimeException listenerError) {
            // One failing listener must not prevent the remaining ones from being notified.
            log.error("[{}] Topic policies listener failed.", topicName, listenerError);
        }
    }
}
/**
 * Returns the cached policies of the given topic.
 *
 * <p>If the namespace of the topic is not yet tracked, loading of its policies is started in
 * the background.
 *
 * @param topicName the topic to look up; a partition name is resolved to its partitioned topic
 * @return the cached policies, or {@code null} if none are cached for the topic
 * @throws TopicPoliciesCacheNotInitException if the policies of the namespace are still loading
 */
@Override
public TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException {
    final NamespaceName namespace = topicName.getNamespaceObject();
    if (!policyCacheInitMap.containsKey(namespace)) {
        prepareInitPoliciesCache(namespace, new CompletableFuture<>());
    }
    // Read the flag once. A containsKey() followed by get() can observe a concurrent removal in
    // between, and unboxing the resulting null would throw a NullPointerException.
    final Boolean initialized = policyCacheInitMap.get(namespace);
    if (initialized != null && !initialized) {
        throw new TopicPoliciesCacheNotInitException();
    }
    return policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
}
/**
 * Returns whatever is currently cached for the topic, without checking or starting the
 * initialisation of the namespace's policies cache.
 *
 * @param topicName the topic to look up; a partition name is resolved to its partitioned topic
 * @return the cached policies, or {@code null} if none are cached
 */
@Override
public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
return policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
}
/**
 * Reads the policies of a topic directly from the system topic, bypassing
 * {@link #policiesCache}. A dedicated reader is created for the call.
 *
 * @param topicName the topic whose policies are wanted
 * @return a future with the policies found (possibly {@code null}); it completes with
 *         {@code null} when the system topic factory cannot be created and exceptionally when
 *         the reader cannot be created or reading fails
 */
@Override
public CompletableFuture<TopicPolicies> getTopicPoliciesBypassCacheAsync(TopicName topicName) {
    CompletableFuture<TopicPolicies> result = new CompletableFuture<>();
    try {
        createSystemTopicFactoryIfNeeded();
    } catch (PulsarServerException e) {
        // Existing contract: report "no policies" here; the failure was already logged by
        // createSystemTopicFactoryIfNeeded().
        result.complete(null);
        return result;
    }
    SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
            .createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
    systemTopicClient.newReaderAsync()
            .thenAccept(r -> fetchTopicPoliciesAsyncAndCloseReader(r, topicName, null, result))
            .exceptionally(ex -> {
                // Without this, a failed reader creation would leave the result pending forever.
                result.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
                return null;
            });
    return result;
}
/**
 * Registers a bundle with this service.
 *
 * <p>Heartbeat namespaces are ignored. For the first bundle of a namespace the policies cache
 * is initialised; for further bundles only the bundle counter of the namespace is incremented.
 *
 * @param namespaceBundle the bundle to register
 * @return a future that is already complete when the namespace is a heartbeat namespace or
 *         already has a reader, and otherwise is handed to {@code prepareInitPoliciesCache}
 */
@Override
public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
    final NamespaceName namespace = namespaceBundle.getNamespaceObject();
    final boolean heartbeatNamespace = NamespaceService.checkHeartbeatNamespace(namespace) != null
            || NamespaceService.checkHeartbeatNamespaceV2(namespace) != null;
    if (heartbeatNamespace) {
        return CompletableFuture.completedFuture(null);
    }

    final CompletableFuture<Void> result = new CompletableFuture<>();
    synchronized (this) {
        if (readerCaches.get(namespace) == null) {
            prepareInitPoliciesCache(namespace, result);
        } else {
            ownedBundlesCountPerNamespace.get(namespace).incrementAndGet();
            result.complete(null);
        }
    }
    return result;
}
/**
 * Starts loading the policies of a namespace unless the namespace is already tracked in
 * {@link #policyCacheInitMap}.
 *
 * <p>When loading is started, a reader is created and cached, the bundle counter of the
 * namespace is set to 1 if absent, the existing events are loaded and, once {@code result}
 * completes, the reader keeps following new events. When the namespace is already tracked,
 * nothing is done and {@code result} is left untouched.
 *
 * @param namespace the namespace to load
 * @param result completed by the initial load, or exceptionally if the reader cannot be created
 */
private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, CompletableFuture<Void> result) {
    if (policyCacheInitMap.putIfAbsent(namespace, false) != null) {
        // Somebody else already registered this namespace.
        return;
    }

    final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture =
            createSystemTopicClientWithRetry(namespace);
    readerCaches.put(namespace, readerFuture);
    ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));

    readerFuture
            .thenAccept(reader -> {
                initPolicesCache(reader, result);
                // Only start following new events after the initial load succeeded.
                result.thenRun(() -> readMorePolicies(reader));
            })
            .exceptionally(failure -> {
                log.error("[{}] Failed to create reader on __change_events topic", namespace, failure);
                cleanCacheAndCloseReader(namespace, false);
                result.completeExceptionally(failure);
                return null;
            });
}
/**
 * Creates a reader on the policies system topic of the given namespace; reader creation is
 * delegated to {@code RetryUtil.retryAsynchronously} with a backoff.
 *
 * @param namespace the namespace whose system topic is read
 * @return a future for the reader; it fails immediately when the system topic factory cannot
 *         be created
 */
protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemTopicClientWithRetry(
        NamespaceName namespace) {
    final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture = new CompletableFuture<>();
    try {
        createSystemTopicFactoryIfNeeded();
    } catch (PulsarServerException factoryError) {
        readerFuture.completeExceptionally(factoryError);
        return readerFuture;
    }

    final SystemTopicClient<PulsarEvent> client =
            namespaceEventsSystemTopicFactory.createTopicPoliciesSystemTopicClient(namespace);
    // NOTE(review): presumably initial delay 1s, max delay 3s, mandatory stop 10s - confirm against Backoff.
    final Backoff retryBackoff = new Backoff(1, TimeUnit.SECONDS, 3, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
    RetryUtil.retryAsynchronously(client::newReaderAsync, retryBackoff, pulsarService.getExecutor(),
            readerFuture);
    return readerFuture;
}
/**
 * Unregisters a bundle from this service.
 *
 * <p>Heartbeat namespaces are ignored. Otherwise the bundle counter of the namespace is
 * decremented; when no counter exists or it reaches zero, the reader is closed and the cached
 * state of the namespace is dropped.
 *
 * @param namespaceBundle the bundle to unregister
 * @return an already completed future
 */
@Override
public CompletableFuture<Void> removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
    final NamespaceName namespace = namespaceBundle.getNamespaceObject();
    final boolean heartbeatNamespace = NamespaceService.checkHeartbeatNamespace(namespace) != null
            || NamespaceService.checkHeartbeatNamespaceV2(namespace) != null;
    if (!heartbeatNamespace) {
        final AtomicInteger remainingBundles = ownedBundlesCountPerNamespace.get(namespace);
        final boolean nothingLeft = remainingBundles == null || remainingBundles.decrementAndGet() <= 0;
        if (nothingLeft) {
            cleanCacheAndCloseReader(namespace, true);
        }
    }
    return CompletableFuture.completedFuture(null);
}
/**
 * Registers a bundle ownership listener with the namespace service so that this service is
 * told about every bundle that is loaded or unloaded.
 */
@Override
public void start() {
    final NamespaceBundleOwnershipListener ownershipListener = new NamespaceBundleOwnershipListener() {
        @Override
        public void onLoad(NamespaceBundle bundle) {
            addOwnedNamespaceBundleAsync(bundle);
        }

        @Override
        public void unLoad(NamespaceBundle bundle) {
            removeOwnedNamespaceBundleAsync(bundle);
        }

        @Override
        public boolean test(NamespaceBundle namespaceBundle) {
            // Accept every bundle.
            return true;
        }
    };
    pulsarService.getNamespaceService().addNamespaceBundleOwnershipListener(ownershipListener);
}
/**
 * Loads all events currently available on the system topic into {@link #policiesCache}, one
 * event per asynchronous step.
 *
 * <p>When the end of the topic is reached the namespace is marked as initialised, every cached
 * policy is replayed to the listeners registered for its topic and {@code future} is completed.
 * When checking for or reading an event fails, {@code future} is completed exceptionally and
 * the reader and cached state of the namespace are cleaned up.
 *
 * @param reader the reader on the system topic of the namespace
 * @param future completed when the initial load finished or failed
 */
private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, CompletableFuture<Void> future) {
    reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {
        final NamespaceName namespace = reader.getSystemTopic().getTopicName().getNamespaceObject();
        if (ex != null) {
            log.error("[{}] Failed to check the more events for the system topic",
                    reader.getSystemTopic().getTopicName(), ex);
            future.completeExceptionally(ex);
            cleanCacheAndCloseReader(namespace, false);
            return;
        }
        if (hasMore) {
            reader.readNextAsync().thenAccept(msg -> {
                refreshTopicPoliciesCache(msg);
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Loop next event reading for system topic.", namespace);
                }
                initPolicesCache(reader, future);
            }).exceptionally(readError -> {
                log.error("[{}] Failed to read event from the system topic.",
                        reader.getSystemTopic().getTopicName(), readError);
                future.completeExceptionally(readError);
                cleanCacheAndCloseReader(namespace, false);
                return null;
            });
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}] Reach the end of the system topic.", reader.getSystemTopic().getTopicName());
        }
        policyCacheInitMap.computeIfPresent(namespace, (k, v) -> true);
        // replay policy message
        policiesCache.forEach((topicName, topicPolicies) -> {
            // Single lookup: the mapping may be removed concurrently between two get() calls.
            final List<TopicPolicyListener<TopicPolicies>> topicListeners = listeners.get(topicName);
            if (topicListeners == null) {
                return;
            }
            for (TopicPolicyListener<TopicPolicies> listener : topicListeners) {
                try {
                    listener.onUpdate(topicPolicies);
                } catch (RuntimeException listenerError) {
                    // An exception escaping this callback would skip future.complete() below and
                    // leave the initialisation pending forever.
                    log.error("[{}] Topic policies listener failed during replay.", topicName, listenerError);
                }
            }
        });
        future.complete(null);
    });
}
/**
 * Drops everything cached for a namespace and closes its reader.
 *
 * <p>The reader future is removed from {@link #readerCaches}, the policies of all topics of
 * the namespace are removed from {@link #policiesCache}, the reader is closed (unless its
 * creation already failed) and the initialisation flag of the namespace is removed.
 *
 * @param namespace the namespace to clean up
 * @param cleanOwnedBundlesCount whether the bundle counter of the namespace is removed as well
 */
private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean cleanOwnedBundlesCount) {
    final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> cachedReader =
            readerCaches.remove(namespace);
    policiesCache.entrySet()
            .removeIf(cached -> Objects.equals(cached.getKey().getNamespaceObject(), namespace));
    if (cleanOwnedBundlesCount) {
        ownedBundlesCountPerNamespace.remove(namespace);
    }

    final boolean readerMayExist = cachedReader != null && !cachedReader.isCompletedExceptionally();
    if (readerMayExist) {
        cachedReader
                .thenCompose(SystemTopicClient.Reader::closeAsync)
                .exceptionally(closeError -> {
                    log.warn("[{}] Close change_event reader fail.", namespace, closeError);
                    return null;
                });
    }
    policyCacheInitMap.remove(namespace);
}
/**
 * Follows the system topic: reads the next event, applies it to the cache, notifies the
 * listeners and then schedules itself again.
 *
 * <p>Reading stops, and the namespace is cleaned up, only when the failure is an
 * {@link PulsarClientException.AlreadyClosedException}; any other failure is logged and the
 * read is attempted again.
 *
 * @param reader the reader on the system topic of the namespace
 */
private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
    reader.readNextAsync()
            .thenAccept(msg -> {
                refreshTopicPoliciesCache(msg);
                notifyListener(msg);
            })
            .whenComplete((ignored, failure) -> {
                if (failure == null) {
                    readMorePolicies(reader);
                    return;
                }
                final boolean readerClosed = FutureUtil.unwrapCompletionException(failure)
                        instanceof PulsarClientException.AlreadyClosedException;
                if (!readerClosed) {
                    log.warn("Read more topic polices exception, read again.", failure);
                    readMorePolicies(reader);
                    return;
                }
                log.error("Read more topic policies exception, close the read now!", failure);
                cleanCacheAndCloseReader(
                        reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
            });
}
/**
 * Applies one message read from the system topic to {@link #policiesCache}.
 *
 * <p>A message without a value removes the policies of the topic named by the message key.
 * Otherwise only {@link EventType#TOPIC_POLICY} events are handled, according to their action
 * type: INSERT adds the policies if absent, UPDATE replaces them, DELETE removes them and
 * additionally writes a delete through a new writer (kept for compatibility, see below).
 *
 * @param msg the message read from the system topic
 */
private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
    final PulsarEvent pulsarEvent = msg.getValue();
    // delete policies
    if (pulsarEvent == null) {
        policiesCache.remove(TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName()));
        return;
    }
    if (!EventType.TOPIC_POLICY.equals(pulsarEvent.getEventType())) {
        return;
    }
    TopicPoliciesEvent event = pulsarEvent.getTopicPoliciesEvent();
    TopicName topicName =
            TopicName.get(event.getDomain(), event.getTenant(), event.getNamespace(), event.getTopic());
    switch (pulsarEvent.getActionType()) {
        case INSERT:
            TopicPolicies old = policiesCache.putIfAbsent(topicName, event.getPolicies());
            if (old != null) {
                log.warn("Policy insert failed, the topic: {}' policy already exist", topicName);
            }
            break;
        case UPDATE:
            policiesCache.put(topicName, event.getPolicies());
            break;
        case DELETE:
            // Since PR #11928, this branch is no longer needed.
            // However, due to compatibility, it is temporarily retained here
            // and can be deleted in the future.
            policiesCache.remove(topicName);
            try {
                createSystemTopicFactoryIfNeeded();
            } catch (PulsarServerException e) {
                // Keep the cause and the topic in the log instead of dropping them.
                log.error("[{}] Failed to create system topic factory", topicName, e);
                break;
            }
            SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
                    .createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
            systemTopicClient.newWriterAsync()
                    .thenAccept(writer -> writer
                            .deleteAsync(getPulsarEvent(topicName, ActionType.DELETE, null))
                            .whenComplete((ignoredId, deleteError) -> {
                                if (deleteError != null) {
                                    // Best effort: report the failure, the cache entry is already gone.
                                    log.warn("[{}] Failed to delete topic policies from the system topic",
                                            topicName, deleteError);
                                }
                                writer.closeAsync().whenComplete((res, closeError) -> {
                                    if (closeError != null) {
                                        log.error("close writer failed ", closeError);
                                    }
                                });
                            }))
                    .exceptionally(writerError -> {
                        // Previously a failed writer creation went completely unnoticed.
                        log.warn("[{}] Failed to create writer to delete topic policies",
                                topicName, writerError);
                        return null;
                    });
            break;
        case NONE:
            break;
        default:
            log.warn("Unknown event action type: {}", pulsarEvent.getActionType());
            break;
    }
}
/**
 * Lazily creates {@link #namespaceEventsSystemTopicFactory}. The volatile field is checked
 * once without the lock and once more while holding the lock on {@code this}, so the factory
 * is created at most once.
 *
 * @throws PulsarServerException if the factory cannot be created; the error is logged first
 */
private void createSystemTopicFactoryIfNeeded() throws PulsarServerException {
    if (namespaceEventsSystemTopicFactory != null) {
        return;
    }
    synchronized (this) {
        if (namespaceEventsSystemTopicFactory != null) {
            // Another thread created it while this one was waiting for the lock.
            return;
        }
        try {
            namespaceEventsSystemTopicFactory =
                    new NamespaceEventsSystemTopicFactory(pulsarService.getClient());
        } catch (PulsarServerException creationError) {
            log.error("Create namespace event system topic factory error.", creationError);
            throw creationError;
        }
    }
}
/**
 * Scans the system topic event by event, remembering the most recent policies seen for
 * {@code topicName}, and completes {@code future} with them once no more events are available.
 *
 * <p>The reader is closed on every path that ends the scan, including failures.
 *
 * @param reader the reader to scan with; closed when the scan ends
 * @param topicName the topic whose policies are wanted
 * @param policies the policies found so far ({@code null} if none yet)
 * @param future receives the result, or the failure of checking for or reading an event
 */
private void fetchTopicPoliciesAsyncAndCloseReader(SystemTopicClient.Reader<PulsarEvent> reader,
        TopicName topicName, TopicPolicies policies,
        CompletableFuture<TopicPolicies> future) {
    reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {
        if (ex != null) {
            future.completeExceptionally(ex);
            closeReaderAfterFetch(reader, topicName);
            // Must stop here: hasMore is null on failure and unboxing it would throw.
            return;
        }
        if (!hasMore) {
            future.complete(policies);
            closeReaderAfterFetch(reader, topicName);
            return;
        }
        reader.readNextAsync().whenComplete((msg, e) -> {
            if (e != null) {
                future.completeExceptionally(e);
                closeReaderAfterFetch(reader, topicName);
                // Must stop here: msg is null on failure.
                return;
            }
            if (msg.getValue() == null
                    || !EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) {
                // Existing behaviour, kept as is: the scan ends with null on such a message.
                // NOTE(review): this also discards policies found earlier for topicName - confirm
                // that this is intended.
                future.complete(null);
                closeReaderAfterFetch(reader, topicName);
                return;
            }
            TopicPoliciesEvent topicPoliciesEvent = msg.getValue().getTopicPoliciesEvent();
            TopicName eventTopic = TopicName.get(
                    topicPoliciesEvent.getDomain(),
                    topicPoliciesEvent.getTenant(),
                    topicPoliciesEvent.getNamespace(),
                    topicPoliciesEvent.getTopic());
            // Later events win: replace the remembered policies when the event is about our topic.
            TopicPolicies latest = topicName.equals(eventTopic) ? topicPoliciesEvent.getPolicies() : policies;
            fetchTopicPoliciesAsyncAndCloseReader(reader, topicName, latest, future);
        });
    });
}

/** Closes the reader used by a bypass-cache fetch and logs a failure to close it. */
private void closeReaderAfterFetch(SystemTopicClient.Reader<PulsarEvent> reader, TopicName topicName) {
    reader.closeAsync().whenComplete((v, closeError) -> {
        if (closeError != null) {
            log.error("[{}] Close reader error.", topicName, closeError);
        }
    });
}
/**
 * @return the number of topics that currently have policies in {@code policiesCache}
 */
@VisibleForTesting
long getPoliciesCacheSize() {
return policiesCache.size();
}
/**
 * @return the number of namespaces that currently have an entry in {@code readerCaches}
 */
@VisibleForTesting
long getReaderCacheCount() {
return readerCaches.size();
}
/**
 * @param namespaceName the namespace to check
 * @return {@code true} if {@code readerCaches} holds a reader future for the namespace
 */
@VisibleForTesting
boolean checkReaderIsCached(NamespaceName namespaceName) {
return readerCaches.get(namespaceName) != null;
}
/**
 * @param namespaceName the namespace to check
 * @return the initialisation flag stored in {@code policyCacheInitMap} for the namespace, or
 *         {@code null} if the namespace has no entry
 */
@VisibleForTesting
public Boolean getPoliciesCacheInit(NamespaceName namespaceName) {
return policyCacheInitMap.get(namespaceName);
}
/**
 * Adds a listener for the given topic. The listener list of the topic is created on first
 * use; creation and insertion happen inside a single {@code compute} call on the map.
 *
 * @param topicName the topic to listen on
 * @param listener the listener to add
 */
@Override
public void registerListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener) {
    listeners.compute(topicName, (key, existing) -> {
        final List<TopicPolicyListener<TopicPolicies>> target;
        if (existing != null) {
            target = existing;
        } else {
            target = Lists.newCopyOnWriteArrayList();
        }
        target.add(listener);
        return target;
    });
}
/**
 * Removes a listener from the given topic. When the last listener of the topic is removed,
 * the topic's entry is removed from the map as well.
 *
 * @param topicName the topic the listener was registered for
 * @param listener the listener to remove
 */
@Override
public void unregisterListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener) {
    listeners.computeIfPresent(topicName, (key, registered) -> {
        registered.remove(listener);
        // Returning null drops the mapping.
        return registered.isEmpty() ? null : registered;
    });
}
/**
 * Removes all listeners registered for the given topic. For a partition, the listeners of the
 * partitioned topic it belongs to are removed.
 *
 * @param topicName the topic (or partition) to clean up
 */
@Override
public void clean(TopicName topicName) {
    // persistent://tenant/namespace/xxx-partition-0 is mapped to persistent://tenant/namespace/xxx
    final TopicName listenerKey = topicName.isPartitioned()
            ? TopicName.get(topicName.getPartitionedTopicName())
            : topicName;
    listeners.remove(listenerKey);
}
/**
 * @return the live policies cache map itself (not a copy)
 */
@VisibleForTesting
protected Map<TopicName, TopicPolicies> getPoliciesCache() {
return policiesCache;
}
/**
 * @return the live listener map itself (not a copy)
 */
@VisibleForTesting
protected Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> getListeners() {
return listeners;
}
// SLF4J logger shared by all instances of this class.
private static final Logger log = LoggerFactory.getLogger(SystemTopicBasedTopicPoliciesService.class);
}