blob: a9b10267b44192ab1a34625dbf70dceef5b8aa55 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.naming;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.common.policies.data.Policies.FIRST_BOUNDARY;
import static org.apache.pulsar.common.policies.data.Policies.LAST_BOUNDARY;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.hash.HashFunction;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.stats.CacheMetricsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NamespaceBundleFactory {
private static final Logger LOG = LoggerFactory.getLogger(NamespaceBundleFactory.class);
private final HashFunction hashFunc;
private final AsyncLoadingCache<NamespaceName, NamespaceBundles> bundlesCache;
private final PulsarService pulsar;
private final MetadataCache<Policies> policiesCache;
private final Duration maxRetryDuration = Duration.ofSeconds(10);
public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
this.hashFunc = hashFunc;
this.bundlesCache = Caffeine.newBuilder()
.recordStats()
.buildAsync(this::loadBundles);
CacheMetricsCollector.CAFFEINE.addCache("bundles", this.bundlesCache);
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification);
this.pulsar = pulsar;
this.policiesCache = pulsar.getConfigurationMetadataStore().getMetadataCache(Policies.class);
}
private CompletableFuture<NamespaceBundles> loadBundles(NamespaceName namespace, Executor executor) {
if (LOG.isDebugEnabled()) {
LOG.debug("Loading cache with bundles for {}", namespace);
}
if (pulsar == null) {
return CompletableFuture.completedFuture(getBundles(namespace, Optional.empty()));
}
CompletableFuture<NamespaceBundles> future = new CompletableFuture<>();
doLoadBundles(namespace, future, createBackoff(), System.nanoTime() + maxRetryDuration.toNanos());
return future;
}
private void doLoadBundles(NamespaceName namespace, CompletableFuture<NamespaceBundles> future,
Backoff backoff, long retryDeadline) {
// Read the static bundle data from the policies
pulsar.getPulsarResources().getLocalPolicies().getLocalPoliciesWithVersion(namespace).thenAccept(result -> {
if (result.isPresent()) {
try {
future.complete(readBundles(namespace,
result.get().getValue(), result.get().getStat().getVersion()));
} catch (IOException e) {
handleLoadBundlesRetry(namespace, future, backoff, retryDeadline, e);
}
} else {
// If no local policies defined for namespace, copy from global config
copyToLocalPolicies(namespace)
.thenAccept(b -> future.complete(b))
.exceptionally(ex -> {
handleLoadBundlesRetry(namespace, future, backoff, retryDeadline, ex);
return null;
});
}
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
});
}
private void handleLoadBundlesRetry(NamespaceName namespace,
CompletableFuture<NamespaceBundles> future,
Backoff backoff, long retryDeadline, Throwable e) {
if (e instanceof Error || System.nanoTime() > retryDeadline) {
future.completeExceptionally(e);
} else {
LOG.warn("Error loading bundle for {}. Retrying exception", namespace, e);
long retryDelay = backoff.next();
pulsar.getExecutor().schedule(() ->
doLoadBundles(namespace, future, backoff, retryDeadline), retryDelay, TimeUnit.MILLISECONDS);
}
}
private static Backoff createBackoff() {
return new Backoff(100, TimeUnit.MILLISECONDS, 5, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS);
}
private NamespaceBundles readBundles(NamespaceName namespace, LocalPolicies localPolicies, long version)
throws IOException {
NamespaceBundles namespaceBundles = getBundles(namespace,
Optional.of(Pair.of(localPolicies, version)));
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Get bundles from getLocalZkCacheService: bundles: {}, version: {}",
namespace,
(localPolicies.bundles.getBoundaries() != null) ? localPolicies.bundles : "null",
namespaceBundles.getVersion());
}
return namespaceBundles;
}
private CompletableFuture<NamespaceBundles> copyToLocalPolicies(NamespaceName namespace) {
return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace)
.thenCompose(optPolicies -> {
if (!optPolicies.isPresent()) {
return CompletableFuture.completedFuture(getBundles(namespace, Optional.empty()));
}
Policies policies = optPolicies.get();
LocalPolicies localPolicies = new LocalPolicies(policies.bundles,
null,
null);
return pulsar.getPulsarResources().getLocalPolicies()
.createLocalPoliciesAsync(namespace, localPolicies)
.thenApply(stat -> getBundles(namespace,
Optional.of(Pair.of(localPolicies, 0L))));
});
}
private void handleMetadataStoreNotification(Notification n) {
if (LocalPoliciesResources.isLocalPoliciesPath(n.getPath())) {
try {
final Optional<NamespaceName> namespace = NamespaceName.getIfValid(
getNamespaceFromPoliciesPath(n.getPath()));
if (namespace.isPresent()) {
LOG.info("Policy updated for namespace {}, refreshing the bundle cache.", namespace);
// Trigger a background refresh to fetch new bundle data from the policies
bundlesCache.synchronous().invalidate(namespace.get());
}
} catch (Exception e) {
LOG.error("Failed to update the policy change for path {}", n.getPath(), e);
}
}
}
public void invalidateBundleCache(NamespaceName namespace) {
bundlesCache.synchronous().invalidate(namespace);
}
public CompletableFuture<NamespaceBundles> getBundlesAsync(NamespaceName nsname) {
return bundlesCache.get(nsname);
}
public NamespaceBundle getBundleWithHighestTopics(NamespaceName nsname) {
try {
return getBundleWithHighestTopicsAsync(nsname).get(PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC,
TimeUnit.SECONDS);
} catch (Exception e) {
LOG.info("failed to derive bundle for {}", nsname, e);
throw new IllegalStateException(e instanceof ExecutionException ? e.getCause() : e);
}
}
public CompletableFuture<NamespaceBundle> getBundleWithHighestTopicsAsync(NamespaceName nsname) {
return pulsar.getPulsarResources().getTopicResources().listPersistentTopicsAsync(nsname).thenCompose(topics -> {
return bundlesCache.get(nsname).handle((bundles, e) -> {
Map<String, Integer> countMap = new HashMap<>();
NamespaceBundle resultBundle = null;
int maxCount = 0;
for (String topic : topics) {
NamespaceBundle bundle = bundles.findBundle(TopicName.get(topic));
String bundleRange = bundle.getBundleRange();
int count = countMap.getOrDefault(bundleRange, 0) + 1;
countMap.put(bundleRange, count);
if (count > maxCount) {
maxCount = count;
resultBundle = bundle;
}
}
return resultBundle;
});
});
}
public NamespaceBundle getBundle(TopicName topic) {
NamespaceBundles bundles = bundlesCache.synchronous().get(topic.getNamespaceObject());
return bundles != null ? bundles.findBundle(topic) : null;
}
public NamespaceBundle getBundleWithHighestThroughput(NamespaceName nsName) {
LoadManager loadManager = pulsar.getLoadManager().get();
if (loadManager instanceof ModularLoadManagerWrapper) {
NamespaceBundles bundles = getBundles(nsName);
double maxMsgThroughput = -1;
NamespaceBundle bundleWithHighestThroughput = null;
for (NamespaceBundle bundle : bundles.getBundles()) {
BundleData bundleData = ((ModularLoadManagerWrapper) loadManager).getLoadManager()
.getBundleDataOrDefault(bundle.toString());
if (bundleData.getTopics() > 0
&& bundleData.getLongTermData().totalMsgThroughput() > maxMsgThroughput) {
maxMsgThroughput = bundleData.getLongTermData().totalMsgThroughput();
bundleWithHighestThroughput = bundle;
}
}
return bundleWithHighestThroughput;
}
return getBundleWithHighestTopics(nsName);
}
public NamespaceBundles getBundles(NamespaceName nsname) {
return bundlesCache.synchronous().get(nsname);
}
public Optional<NamespaceBundles> getBundlesIfPresent(NamespaceName nsname) {
return Optional.ofNullable(bundlesCache.synchronous().getIfPresent(nsname));
}
public NamespaceBundle getBundle(NamespaceName nsname, Range<Long> hashRange) {
return new NamespaceBundle(nsname, hashRange, this);
}
public NamespaceBundle getBundle(String namespace, String bundleRange) {
checkArgument(bundleRange.contains("_"), "Invalid bundle range");
String[] boundaries = bundleRange.split("_");
Long lowerEndpoint = Long.decode(boundaries[0]);
Long upperEndpoint = Long.decode(boundaries[1]);
Range<Long> hashRange = Range.range(lowerEndpoint, BoundType.CLOSED, upperEndpoint,
(upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND)) ? BoundType.CLOSED : BoundType.OPEN);
return getBundle(NamespaceName.get(namespace), hashRange);
}
public NamespaceBundle getFullBundle(NamespaceName fqnn) throws Exception {
return bundlesCache.synchronous().get(fqnn).getFullBundle();
}
public CompletableFuture<NamespaceBundle> getFullBundleAsync(NamespaceName fqnn) {
return bundlesCache.get(fqnn).thenApply(NamespaceBundles::getFullBundle);
}
public long getLongHashCode(String name) {
return this.hashFunc.hashString(name, Charsets.UTF_8).padToLong();
}
public NamespaceBundles getBundles(NamespaceName nsname, BundlesData bundleData) {
return new NamespaceBundles(nsname, this, Optional.empty(), NamespaceBundles.getPartitions(bundleData));
}
private NamespaceBundles getBundles(NamespaceName nsname, Optional<Pair<LocalPolicies, Long>> localPolicies) {
return new NamespaceBundles(nsname, this, localPolicies);
}
/**
* Fetches {@link NamespaceBundles} from cache for a given namespace. finds target bundle, split into numBundles and
* returns new {@link NamespaceBundles} with newly split bundles into it.
*
* @param targetBundle
* {@link NamespaceBundle} needs to be split
* @param argNumBundles
* split into numBundles
* @param splitBoundary
* split into 2 numBundles by the given split key. The given split key must between the key range of the
* given split bundle.
* @return List of split {@link NamespaceBundle} and {@link NamespaceBundles} that contains final bundles including
* split bundles for a given namespace
*/
public CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>> splitBundles(
NamespaceBundle targetBundle, int argNumBundles, Long splitBoundary) {
checkArgument(canSplitBundle(targetBundle), "%s bundle can't be split further", targetBundle);
if (splitBoundary != null) {
checkArgument(splitBoundary > targetBundle.getLowerEndpoint()
&& splitBoundary < targetBundle.getUpperEndpoint(),
"The given fixed key must between the key range of the %s bundle", targetBundle);
argNumBundles = 2;
}
checkNotNull(targetBundle, "can't split null bundle");
checkNotNull(targetBundle.getNamespaceObject(), "namespace must be present");
NamespaceName nsname = targetBundle.getNamespaceObject();
final int numBundles = argNumBundles;
return bundlesCache.get(nsname).thenApply(sourceBundle -> {
final int lastIndex = sourceBundle.partitions.length - 1;
final long[] partitions = new long[sourceBundle.partitions.length + (numBundles - 1)];
int pos = 0;
int splitPartition = -1;
final Range<Long> range = targetBundle.getKeyRange();
for (int i = 0; i < lastIndex; i++) {
if (sourceBundle.partitions[i] == range.lowerEndpoint()
&& (range.upperEndpoint() == sourceBundle.partitions[i + 1])) {
splitPartition = i;
long maxVal = sourceBundle.partitions[i + 1];
long minVal = sourceBundle.partitions[i];
long segSize = splitBoundary == null ? (maxVal - minVal) / numBundles : splitBoundary - minVal;
partitions[pos++] = minVal;
long curPartition = minVal + segSize;
for (int j = 0; j < numBundles - 1; j++) {
partitions[pos++] = curPartition;
curPartition += segSize;
}
} else {
partitions[pos++] = sourceBundle.partitions[i];
}
}
partitions[pos] = sourceBundle.partitions[lastIndex];
if (splitPartition != -1) {
// keep version of sourceBundle
NamespaceBundles splitNsBundles =
new NamespaceBundles(nsname, this, sourceBundle.getLocalPolicies(), partitions);
List<NamespaceBundle> splitBundles = splitNsBundles.getBundles().subList(splitPartition,
(splitPartition + numBundles));
return new ImmutablePair<>(splitNsBundles, splitBundles);
}
return null;
});
}
public boolean canSplitBundle(NamespaceBundle bundle) {
Range<Long> range = bundle.getKeyRange();
return range.upperEndpoint() - range.lowerEndpoint() > 1;
}
public static void validateFullRange(SortedSet<String> partitions) {
checkArgument(partitions.first().equals(FIRST_BOUNDARY) && partitions.last().equals(LAST_BOUNDARY));
}
public static NamespaceBundleFactory createFactory(PulsarService pulsar, HashFunction hashFunc) {
return new NamespaceBundleFactory(pulsar, hashFunc);
}
public static boolean isFullBundle(String bundleRange) {
return bundleRange.equals(String.format("%s_%s", FIRST_BOUNDARY, LAST_BOUNDARY));
}
public static String getDefaultBundleRange() {
return String.format("%s_%s", FIRST_BOUNDARY, LAST_BOUNDARY);
}
/*
* @param path - path for the namespace policies ex. /admin/policies/prop/cluster/namespace
*
* @returns namespace with path, ex. prop/cluster/namespace
*/
public static String getNamespaceFromPoliciesPath(String path) {
if (path.isEmpty()) {
return path;
}
// String before / is considered empty string by splitter
Iterable<String> splitter = Splitter.on("/").limit(6).split(path);
Iterator<String> i = splitter.iterator();
// skip first three - "","admin", "policies"
i.next();
i.next();
i.next();
// prop, cluster, namespace
return Joiner.on("/").join(i);
}
}