blob: b4521c3a584cff1a22e2b13363e67f8f475ecfc2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* A class encapsulating some of the logic around metadata.
* <p>
* This class is shared by the client thread (for partitioning) and the background sender thread.
*
* Metadata is maintained for only a subset of topics, which can be added to over time. When we request metadata for a
* topic we don't have any metadata for it will trigger a metadata update.
* <p>
* If topic expiry is enabled for the metadata, any topic that has not been used within the expiry interval
* is removed from the metadata refresh set after an update. Consumers disable topic expiry since they explicitly
* manage topics while producers rely on topic expiry to limit the refresh set.
*/
public final class Metadata {
private static final Logger log = LoggerFactory.getLogger(Metadata.class);
public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;
private final long refreshBackoffMs;
private final long metadataExpireMs;
private int version;
private long lastRefreshMs;
private long lastSuccessfulRefreshMs;
private Cluster cluster;
private boolean needUpdate;
/* Topics with expiry time */
private final Map<String, Long> topics;
private final List<Listener> listeners;
private final ClusterResourceListeners clusterResourceListeners;
private boolean needMetadataForAllTopics;
private final boolean allowAutoTopicCreation;
private final boolean topicExpiryEnabled;
public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation) {
this(refreshBackoffMs, metadataExpireMs, allowAutoTopicCreation, false, new ClusterResourceListeners());
}
/**
* Create a new Metadata instance
* @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
* polling
* @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
* @param allowAutoTopicCreation If this and the broker config 'auto.create.topics.enable' are true, topics that
* don't exist will be created by the broker when a metadata request is sent
* @param topicExpiryEnabled If true, enable expiry of unused topics
* @param clusterResourceListeners List of ClusterResourceListeners which will receive metadata updates.
*/
public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation,
boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) {
this.refreshBackoffMs = refreshBackoffMs;
this.metadataExpireMs = metadataExpireMs;
this.allowAutoTopicCreation = allowAutoTopicCreation;
this.topicExpiryEnabled = topicExpiryEnabled;
this.lastRefreshMs = 0L;
this.lastSuccessfulRefreshMs = 0L;
this.version = 0;
this.cluster = Cluster.empty();
this.needUpdate = false;
this.topics = new HashMap<>();
this.listeners = new ArrayList<>();
this.clusterResourceListeners = clusterResourceListeners;
this.needMetadataForAllTopics = false;
}
/**
* Get the current cluster info without blocking
*/
public synchronized Cluster fetch() {
return this.cluster;
}
/**
* Add the topic to maintain in the metadata. If topic expiry is enabled, expiry time
* will be reset on the next update.
*/
public synchronized void add(String topic) {
Objects.requireNonNull(topic, "topic cannot be null");
if (topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE) == null) {
requestUpdateForNewTopics();
}
}
/**
* The next time to update the cluster info is the maximum of the time the current info will expire and the time the
* current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time
* is now
*/
public synchronized long timeToNextUpdate(long nowMs) {
long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
return Math.max(timeToExpire, timeToAllowUpdate);
}
/**
* Request an update of the current cluster metadata info, return the current version before the update
*/
public synchronized int requestUpdate() {
this.needUpdate = true;
return this.version;
}
/**
* Check whether an update has been explicitly requested.
* @return true if an update was requested, false otherwise
*/
public synchronized boolean updateRequested() {
return this.needUpdate;
}
/**
* Wait for metadata update until the current version is larger than the last version we know of
*/
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
if (maxWaitMs < 0) {
throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
}
long begin = System.currentTimeMillis();
long remainingWaitMs = maxWaitMs;
while (this.version <= lastVersion) {
if (remainingWaitMs != 0)
wait(remainingWaitMs);
long elapsed = System.currentTimeMillis() - begin;
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
remainingWaitMs = maxWaitMs - elapsed;
}
}
/**
* Replace the current set of topics maintained to the one provided.
* If topic expiry is enabled, expiry time of the topics will be
* reset on the next update.
* @param topics
*/
public synchronized void setTopics(Collection<String> topics) {
if (!this.topics.keySet().containsAll(topics)) {
requestUpdateForNewTopics();
}
this.topics.clear();
for (String topic : topics)
this.topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE);
}
/**
* Get the list of topics we are currently maintaining metadata for
*/
public synchronized Set<String> topics() {
return new HashSet<>(this.topics.keySet());
}
/**
* Check if a topic is already in the topic set.
* @param topic topic to check
* @return true if the topic exists, false otherwise
*/
public synchronized boolean containsTopic(String topic) {
return this.topics.containsKey(topic);
}
/**
* Updates the cluster metadata. If topic expiry is enabled, expiry time
* is set for topics if required and expired topics are removed from the metadata.
*
* @param cluster the cluster containing metadata for topics with valid metadata
* @param unavailableTopics topics which are non-existent or have one or more partitions whose
* leader is not known
* @param now current time in milliseconds
*/
public synchronized void update(Cluster cluster, Set<String> unavailableTopics, long now) {
Objects.requireNonNull(cluster, "cluster should not be null");
this.needUpdate = false;
this.lastRefreshMs = now;
this.lastSuccessfulRefreshMs = now;
this.version += 1;
if (topicExpiryEnabled) {
// Handle expiry of topics from the metadata refresh set.
for (Iterator<Map.Entry<String, Long>> it = topics.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<String, Long> entry = it.next();
long expireMs = entry.getValue();
if (expireMs == TOPIC_EXPIRY_NEEDS_UPDATE)
entry.setValue(now + TOPIC_EXPIRY_MS);
else if (expireMs <= now) {
it.remove();
log.debug("Removing unused topic {} from the metadata list, expiryMs {} now {}", entry.getKey(), expireMs, now);
}
}
}
for (Listener listener: listeners)
listener.onMetadataUpdate(cluster, unavailableTopics);
String previousClusterId = cluster.clusterResource().clusterId();
if (this.needMetadataForAllTopics) {
// the listener may change the interested topics, which could cause another metadata refresh.
// If we have already fetched all topics, however, another fetch should be unnecessary.
this.needUpdate = false;
this.cluster = getClusterForCurrentTopics(cluster);
} else {
this.cluster = cluster;
}
// The bootstrap cluster is guaranteed not to have any useful information
if (!cluster.isBootstrapConfigured()) {
String clusterId = cluster.clusterResource().clusterId();
if (clusterId == null ? previousClusterId != null : !clusterId.equals(previousClusterId))
log.info("Cluster ID: {}", cluster.clusterResource().clusterId());
clusterResourceListeners.onUpdate(cluster.clusterResource());
}
notifyAll();
log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}
/**
* Record an attempt to update the metadata that failed. We need to keep track of this
* to avoid retrying immediately.
*/
public synchronized void failedUpdate(long now) {
this.lastRefreshMs = now;
}
/**
* @return The current metadata version
*/
public synchronized int version() {
return this.version;
}
/**
* The last time metadata was successfully updated.
*/
public synchronized long lastSuccessfulUpdate() {
return this.lastSuccessfulRefreshMs;
}
public boolean allowAutoTopicCreation() {
return allowAutoTopicCreation;
}
/**
* Set state to indicate if metadata for all topics in Kafka cluster is required or not.
* @param needMetadataForAllTopics boolean indicating need for metadata of all topics in cluster.
*/
public synchronized void needMetadataForAllTopics(boolean needMetadataForAllTopics) {
if (needMetadataForAllTopics && !this.needMetadataForAllTopics) {
requestUpdateForNewTopics();
}
this.needMetadataForAllTopics = needMetadataForAllTopics;
}
/**
* Get whether metadata for all topics is needed or not
*/
public synchronized boolean needMetadataForAllTopics() {
return this.needMetadataForAllTopics;
}
/**
* Add a Metadata listener that gets notified of metadata updates
*/
public synchronized void addListener(Listener listener) {
this.listeners.add(listener);
}
/**
* Stop notifying the listener of metadata updates
*/
public synchronized void removeListener(Listener listener) {
this.listeners.remove(listener);
}
/**
* MetadataUpdate Listener
*/
public interface Listener {
/**
* Callback invoked on metadata update.
*
* @param cluster the cluster containing metadata for topics with valid metadata
* @param unavailableTopics topics which are non-existent or have one or more partitions whose
* leader is not known
*/
void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics);
}
private synchronized void requestUpdateForNewTopics() {
// Override the timestamp of last refresh to let immediate update.
this.lastRefreshMs = 0;
requestUpdate();
}
private Cluster getClusterForCurrentTopics(Cluster cluster) {
Set<String> unauthorizedTopics = new HashSet<>();
Collection<PartitionInfo> partitionInfos = new ArrayList<>();
List<Node> nodes = Collections.emptyList();
Set<String> internalTopics = Collections.emptySet();
Node controller = null;
String clusterId = null;
if (cluster != null) {
clusterId = cluster.clusterResource().clusterId();
internalTopics = cluster.internalTopics();
unauthorizedTopics.addAll(cluster.unauthorizedTopics());
unauthorizedTopics.retainAll(this.topics.keySet());
for (String topic : this.topics.keySet()) {
List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
if (!partitionInfoList.isEmpty()) {
partitionInfos.addAll(partitionInfoList);
}
}
nodes = cluster.nodes();
controller = cluster.controller();
}
return new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics, internalTopics, controller);
}
}