blob: 73a9f333cc7a4c6205e0a003f16b575624edd97d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A class encapsulating some of the logic around metadata.
* <p>
* This class is shared by the client thread (for partitioning) and the background sender thread.
*
* Metadata is maintained for only a subset of topics, which can be added to over time. When we request metadata for a
* topic we don't have any metadata for it will trigger a metadata update.
*/
public final class Metadata {
private static final Logger log = LoggerFactory.getLogger(Metadata.class);
private final long refreshBackoffMs;
private final long metadataExpireMs;
private int version;
private long lastRefreshMs;
private long lastSuccessfulRefreshMs;
private Cluster cluster;
private boolean needUpdate;
private final Set<String> topics;
private final List<Listener> listeners;
private boolean needMetadataForAllTopics;
/**
* Create a metadata instance with reasonable defaults
*/
public Metadata() {
this(100L, 60 * 60 * 1000L);
}
/**
* Create a new Metadata instance
* @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
* polling
* @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
*/
public Metadata(long refreshBackoffMs, long metadataExpireMs) {
this.refreshBackoffMs = refreshBackoffMs;
this.metadataExpireMs = metadataExpireMs;
this.lastRefreshMs = 0L;
this.lastSuccessfulRefreshMs = 0L;
this.version = 0;
this.cluster = Cluster.empty();
this.needUpdate = false;
this.topics = new HashSet<String>();
this.listeners = new ArrayList<>();
this.needMetadataForAllTopics = false;
}
/**
* Get the current cluster info without blocking
*/
public synchronized Cluster fetch() {
return this.cluster;
}
/**
* Add the topic to maintain in the metadata
*/
public synchronized void add(String topic) {
topics.add(topic);
}
/**
* The next time to update the cluster info is the maximum of the time the current info will expire and the time the
* current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time
* is now
*/
public synchronized long timeToNextUpdate(long nowMs) {
long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
return Math.max(timeToExpire, timeToAllowUpdate);
}
/**
* Request an update of the current cluster metadata info, return the current version before the update
*/
public synchronized int requestUpdate() {
this.needUpdate = true;
return this.version;
}
/**
* Wait for metadata update until the current version is larger than the last version we know of
*/
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
if (maxWaitMs < 0) {
throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
}
long begin = System.currentTimeMillis();
long remainingWaitMs = maxWaitMs;
while (this.version <= lastVersion) {
if (remainingWaitMs != 0)
wait(remainingWaitMs);
long elapsed = System.currentTimeMillis() - begin;
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
remainingWaitMs = maxWaitMs - elapsed;
}
}
/**
* Replace the current set of topics maintained to the one provided
* @param topics
*/
public synchronized void setTopics(Collection<String> topics) {
if (!this.topics.containsAll(topics))
requestUpdate();
this.topics.clear();
this.topics.addAll(topics);
}
/**
* Get the list of topics we are currently maintaining metadata for
*/
public synchronized Set<String> topics() {
return new HashSet<String>(this.topics);
}
/**
* Check if a topic is already in the topic set.
* @param topic topic to check
* @return true if the topic exists, false otherwise
*/
public synchronized boolean containsTopic(String topic) {
return this.topics.contains(topic);
}
/**
* Update the cluster metadata
*/
public synchronized void update(Cluster cluster, long now) {
this.needUpdate = false;
this.lastRefreshMs = now;
this.lastSuccessfulRefreshMs = now;
this.version += 1;
for (Listener listener: listeners)
listener.onMetadataUpdate(cluster);
// Do this after notifying listeners as subscribed topics' list can be changed by listeners
this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;
notifyAll();
log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}
/**
* Record an attempt to update the metadata that failed. We need to keep track of this
* to avoid retrying immediately.
*/
public synchronized void failedUpdate(long now) {
this.lastRefreshMs = now;
}
/**
* @return The current metadata version
*/
public synchronized int version() {
return this.version;
}
/**
* The last time metadata was successfully updated.
*/
public synchronized long lastSuccessfulUpdate() {
return this.lastSuccessfulRefreshMs;
}
/**
* The metadata refresh backoff in ms
*/
public long refreshBackoff() {
return refreshBackoffMs;
}
/**
* Set state to indicate if metadata for all topics in Kafka cluster is required or not.
* @param needMetadaForAllTopics boolean indicating need for metadata of all topics in cluster.
*/
public synchronized void needMetadataForAllTopics(boolean needMetadaForAllTopics) {
this.needMetadataForAllTopics = needMetadaForAllTopics;
}
/**
* Get whether metadata for all topics is needed or not
*/
public synchronized boolean needMetadataForAllTopics() {
return this.needMetadataForAllTopics;
}
/**
* Add a Metadata listener that gets notified of metadata updates
*/
public synchronized void addListener(Listener listener) {
this.listeners.add(listener);
}
/**
* Stop notifying the listener of metadata updates
*/
public synchronized void removeListener(Listener listener) {
this.listeners.remove(listener);
}
/**
* MetadataUpdate Listener
*/
public interface Listener {
void onMetadataUpdate(Cluster cluster);
}
private Cluster getClusterForCurrentTopics(Cluster cluster) {
Set<String> unauthorizedTopics = new HashSet<>();
Collection<PartitionInfo> partitionInfos = new ArrayList<>();
List<Node> nodes = Collections.emptyList();
if (cluster != null) {
unauthorizedTopics.addAll(cluster.unauthorizedTopics());
unauthorizedTopics.retainAll(this.topics);
for (String topic : this.topics) {
partitionInfos.addAll(cluster.partitionsForTopic(topic));
}
nodes = cluster.nodes();
}
return new Cluster(nodes, partitionInfos, unauthorizedTopics);
}
}