| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.kafka.streams.processor.internals; |
| |
| import java.util.Comparator; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.function.Function; |
| import java.util.Optional; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| import org.apache.kafka.common.PartitionInfo; |
| import org.apache.kafka.common.TopicPartition; |
| import org.apache.kafka.common.serialization.Serializer; |
| import org.apache.kafka.common.utils.LogContext; |
| import org.apache.kafka.streams.KafkaStreams; |
| import org.apache.kafka.streams.KeyQueryMetadata; |
| import org.apache.kafka.streams.processor.StateStore; |
| import org.apache.kafka.streams.processor.StreamPartitioner; |
| import org.apache.kafka.streams.state.HostInfo; |
| import org.apache.kafka.streams.StreamsMetadata; |
| import org.apache.kafka.streams.state.internals.StreamsMetadataImpl; |
| |
| import org.slf4j.Logger; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| |
| import static org.apache.kafka.clients.producer.RecordMetadata.UNKNOWN_PARTITION; |
| |
| /** |
| * Provides access to the {@link StreamsMetadata} in a KafkaStreams application. This can be used |
| * to discover the locations of {@link org.apache.kafka.streams.processor.StateStore}s |
| * in a KafkaStreams application |
| */ |
public class StreamsMetadataState {
    private final Logger log;
    // sentinel returned as the "active host" when the owner of a key cannot be determined
    public static final HostInfo UNKNOWN_HOST = HostInfo.unavailable();
    private final TopologyMetadata topologyMetadata;
    // names of all global state stores; global stores are present on every instance
    private final Set<String> globalStores;
    // the HostInfo of this instance; may equal UNKNOWN_HOST (see getKeyQueryMetadataForKey)
    private final HostInfo thisHost;
    // metadata for every host in the application; replaced wholesale on each rebuild
    private List<StreamsMetadata> allMetadata = Collections.emptyList();
    // partition info grouped by topic name; null until the first onChange() call
    private Map<String, List<PartitionInfo>> partitionsByTopic;
    // this instance's own metadata; null until the first rebuild completes
    private final AtomicReference<StreamsMetadata> localMetadata = new AtomicReference<>(null);
| |
| public StreamsMetadataState(final TopologyMetadata topologyMetadata, |
| final HostInfo thisHost, |
| final LogContext logContext) { |
| this.topologyMetadata = topologyMetadata; |
| this.globalStores = this.topologyMetadata.globalStateStores().keySet(); |
| this.thisHost = thisHost; |
| this.log = logContext.logger(getClass()); |
| } |
| |
| @Override |
| public String toString() { |
| return toString(""); |
| } |
| |
| public String toString(final String indent) { |
| final StringBuilder builder = new StringBuilder(); |
| builder.append(indent).append("GlobalMetadata: ").append(allMetadata).append("\n"); |
| builder.append(indent).append("GlobalStores: ").append(globalStores).append("\n"); |
| builder.append(indent).append("My HostInfo: ").append(thisHost).append("\n"); |
| builder.append(indent).append("PartitionsByTopic: ").append(partitionsByTopic).append("\n"); |
| |
| return builder.toString(); |
| } |
| |
    /**
     * Get the {@link StreamsMetadata}s for the local instance in a {@link KafkaStreams} application
     *
     * @return the {@link StreamsMetadata}s for the local instance in a {@link KafkaStreams} application,
     *         or {@code null} if no rebalance has delivered an assignment yet (see onChange)
     */
    public StreamsMetadata getLocalMetadata() {
        return localMetadata.get();
    }
| |
    /**
     * Find all of the {@link StreamsMetadata}s in a
     * {@link KafkaStreams} application
     *
     * @return all the {@link StreamsMetadata}s in a {@link KafkaStreams} application,
     *         as an unmodifiable view of the current metadata list
     */
    public Collection<StreamsMetadata> getAllMetadata() {
        return Collections.unmodifiableList(allMetadata);
    }
| |
| /** |
| * Find all of the {@link StreamsMetadata}s for a given storeName |
| * |
| * @param storeName the storeName to find metadata for |
| * @return A collection of {@link StreamsMetadata} that have the provided storeName |
| */ |
| public synchronized Collection<StreamsMetadata> getAllMetadataForStore(final String storeName) { |
| Objects.requireNonNull(storeName, "storeName cannot be null"); |
| if (topologyMetadata.hasNamedTopologies()) { |
| throw new IllegalArgumentException("Cannot invoke the getAllMetadataForStore(storeName) method when" |
| + "using named topologies, please use the overload that accepts" |
| + "a topologyName parameter to identify the correct store"); |
| } |
| |
| if (!isInitialized()) { |
| return Collections.emptyList(); |
| } |
| |
| if (globalStores.contains(storeName)) { |
| return allMetadata; |
| } |
| |
| final Collection<String> sourceTopics = topologyMetadata.sourceTopicsForStore(storeName, null); |
| if (sourceTopics.isEmpty()) { |
| return Collections.emptyList(); |
| } |
| |
| final ArrayList<StreamsMetadata> results = new ArrayList<>(); |
| for (final StreamsMetadata metadata : allMetadata) { |
| if (metadata.stateStoreNames().contains(storeName) || metadata.standbyStateStoreNames().contains(storeName)) { |
| results.add(metadata); |
| } |
| } |
| return results; |
| } |
| |
| /** |
| * Find all of the {@link StreamsMetadata}s for a given storeName in the given topology |
| * |
| * @param storeName the storeName to find metadata for |
| * @param topologyName the storeName to find metadata for |
| * @return A collection of {@link StreamsMetadata} that have the provided storeName |
| */ |
| public synchronized Collection<StreamsMetadata> getAllMetadataForStore(final String storeName, final String topologyName) { |
| Objects.requireNonNull(storeName, "storeName cannot be null"); |
| Objects.requireNonNull(topologyName, "topologyName cannot be null"); |
| |
| if (!isInitialized()) { |
| return Collections.emptyList(); |
| } |
| |
| final Collection<String> sourceTopics = topologyMetadata.sourceTopicsForStore(storeName, topologyName); |
| if (sourceTopics.isEmpty()) { |
| return Collections.emptyList(); |
| } |
| |
| final ArrayList<StreamsMetadata> results = new ArrayList<>(); |
| for (final StreamsMetadata metadata : allMetadata) { |
| final String metadataTopologyName = ((StreamsMetadataImpl) metadata).topologyName(); |
| if (metadataTopologyName != null && metadataTopologyName.equals(topologyName) |
| && metadata.stateStoreNames().contains(storeName) || metadata.standbyStateStoreNames().contains(storeName)) { |
| results.add(metadata); |
| } |
| } |
| return results; |
| } |
| |
| public synchronized Collection<StreamsMetadata> getAllMetadataForTopology(final String topologyName) { |
| Objects.requireNonNull(topologyName, "topologyName cannot be null"); |
| |
| if (!isInitialized()) { |
| return Collections.emptyList(); |
| } |
| |
| final ArrayList<StreamsMetadata> results = new ArrayList<>(); |
| for (final StreamsMetadata metadata : allMetadata) { |
| final String metadataTopologyName = ((StreamsMetadataImpl) metadata).topologyName(); |
| if (metadataTopologyName != null && metadataTopologyName.equals(topologyName)) { |
| results.add(metadata); |
| } |
| } |
| return results; |
| } |
| |
| /** |
| * Find the {@link KeyQueryMetadata}s for a given storeName and key. This method will use the |
| * {@link DefaultStreamPartitioner} to locate the store. If a custom partitioner has been used |
| * please use {@link StreamsMetadataState#getKeyQueryMetadataForKey(String, Object, StreamPartitioner)} instead. |
| * |
| * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore}, |
| * this method provides a way of finding which {@link KeyQueryMetadata} it would exist on. |
| * |
| * @param storeName Name of the store |
| * @param key Key to use |
| * @param keySerializer Serializer for the key |
| * @param <K> key type |
| * @return The {@link KeyQueryMetadata} for the storeName and key or {@link KeyQueryMetadata#NOT_AVAILABLE} |
| * if streams is (re-)initializing or {@code null} if the corresponding topic cannot be found, |
| * or null if no matching metadata could be found. |
| */ |
| public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName, |
| final K key, |
| final Serializer<K> keySerializer) { |
| Objects.requireNonNull(keySerializer, "keySerializer can't be null"); |
| if (topologyMetadata.hasNamedTopologies()) { |
| throw new IllegalArgumentException("Cannot invoke the getKeyQueryMetadataForKey(storeName, key, keySerializer)" |
| + "method when using named topologies, please use the overload that" |
| + "accepts a topologyName parameter to identify the correct store"); |
| } |
| return getKeyQueryMetadataForKey(storeName, |
| key, |
| new DefaultStreamPartitioner<>(keySerializer)); |
| } |
| |
| /** |
| * See {@link StreamsMetadataState#getKeyQueryMetadataForKey(String, Object, Serializer)} |
| */ |
| public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName, |
| final K key, |
| final Serializer<K> keySerializer, |
| final String topologyName) { |
| Objects.requireNonNull(keySerializer, "keySerializer can't be null"); |
| return getKeyQueryMetadataForKey(storeName, |
| key, |
| new DefaultStreamPartitioner<>(keySerializer), |
| topologyName); |
| } |
| |
| /** |
| * Find the {@link KeyQueryMetadata}s for a given storeName and key |
| * |
| * Note: the key may not exist in the {@link StateStore},this method provides a way of finding which |
| * {@link StreamsMetadata} it would exist on. |
| * |
| * @param storeName Name of the store |
| * @param key Key to use |
| * @param partitioner partitioner to use to find correct partition for key |
| * @param <K> key type |
| * @return The {@link KeyQueryMetadata} for the storeName and key or {@link KeyQueryMetadata#NOT_AVAILABLE} |
| * if streams is (re-)initializing, or {@code null} if no matching metadata could be found. |
| */ |
| public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName, |
| final K key, |
| final StreamPartitioner<? super K, ?> partitioner) { |
| Objects.requireNonNull(storeName, "storeName can't be null"); |
| Objects.requireNonNull(key, "key can't be null"); |
| Objects.requireNonNull(partitioner, "partitioner can't be null"); |
| if (topologyMetadata.hasNamedTopologies()) { |
| throw new IllegalArgumentException("Cannot invoke the getKeyQueryMetadataForKey(storeName, key, partitioner)" |
| + "method when using named topologies, please use the overload that" |
| + "accepts a topologyName parameter to identify the correct store"); |
| } |
| |
| if (!isInitialized()) { |
| return KeyQueryMetadata.NOT_AVAILABLE; |
| } |
| |
| if (globalStores.contains(storeName)) { |
| // global stores are on every node. if we don't have the host info |
| // for this host then just pick the first metadata |
| if (thisHost.equals(UNKNOWN_HOST)) { |
| return new KeyQueryMetadata(allMetadata.get(0).hostInfo(), Collections.emptySet(), UNKNOWN_PARTITION); |
| } |
| return new KeyQueryMetadata(localMetadata.get().hostInfo(), Collections.emptySet(), UNKNOWN_PARTITION); |
| } |
| |
| final SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName); |
| if (sourceTopicsInfo == null) { |
| return null; |
| } |
| return getKeyQueryMetadataForKey(storeName, key, partitioner, sourceTopicsInfo); |
| } |
| |
| /** |
| * See {@link StreamsMetadataState#getKeyQueryMetadataForKey(String, Object, StreamPartitioner)} |
| */ |
| public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName, |
| final K key, |
| final StreamPartitioner<? super K, ?> partitioner, |
| final String topologyName) { |
| Objects.requireNonNull(storeName, "storeName can't be null"); |
| Objects.requireNonNull(key, "key can't be null"); |
| Objects.requireNonNull(partitioner, "partitioner can't be null"); |
| Objects.requireNonNull(topologyName, "topologyName can't be null"); |
| |
| |
| if (!isInitialized()) { |
| return KeyQueryMetadata.NOT_AVAILABLE; |
| } |
| |
| final SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName, topologyName); |
| if (sourceTopicsInfo == null) { |
| return null; |
| } |
| return getKeyQueryMetadataForKey(storeName, key, partitioner, sourceTopicsInfo, topologyName); |
| } |
| |
| /** |
| * Respond to changes to the HostInfo -> TopicPartition mapping. Will rebuild the |
| * metadata |
| * |
| * @param activePartitionHostMap the current mapping of {@link HostInfo} -> {@link TopicPartition}s for active partitions |
| * @param standbyPartitionHostMap the current mapping of {@link HostInfo} -> {@link TopicPartition}s for standby partitions |
| * @param topicPartitionInfo the current mapping of {@link TopicPartition} -> {@Link PartitionInfo} |
| */ |
| synchronized void onChange(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap, |
| final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap, |
| final Map<TopicPartition, PartitionInfo> topicPartitionInfo) { |
| this.partitionsByTopic = new HashMap<>(); |
| topicPartitionInfo.entrySet().forEach(entry -> this.partitionsByTopic |
| .computeIfAbsent(entry.getKey().topic(), topic -> new ArrayList<>()) |
| .add(entry.getValue()) |
| ); |
| |
| rebuildMetadata(activePartitionHostMap, standbyPartitionHostMap); |
| } |
| |
| private boolean hasPartitionsForAnyTopics(final List<String> topicNames, final Set<TopicPartition> partitionForHost) { |
| for (final TopicPartition topicPartition : partitionForHost) { |
| if (topicNames.contains(topicPartition.topic())) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| private Set<String> getStoresOnHost(final Map<String, List<String>> storeToSourceTopics, |
| final Set<TopicPartition> sourceTopicPartitions) { |
| final Set<String> storesOnHost = new HashSet<>(); |
| for (final Map.Entry<String, List<String>> storeTopicEntry : storeToSourceTopics.entrySet()) { |
| final List<String> topicsForStore = storeTopicEntry.getValue(); |
| if (hasPartitionsForAnyTopics(topicsForStore, sourceTopicPartitions)) { |
| storesOnHost.add(storeTopicEntry.getKey()); |
| } |
| } |
| return storesOnHost; |
| } |
| |
| private void rebuildMetadata(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap, |
| final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap) { |
| if (activePartitionHostMap.isEmpty() && standbyPartitionHostMap.isEmpty()) { |
| allMetadata = Collections.emptyList(); |
| localMetadata.set(new StreamsMetadataImpl( |
| thisHost, |
| Collections.emptySet(), |
| Collections.emptySet(), |
| Collections.emptySet(), |
| Collections.emptySet() |
| )); |
| return; |
| } |
| |
| allMetadata = topologyMetadata.hasNamedTopologies() ? |
| rebuildMetadataForNamedTopologies(activePartitionHostMap, standbyPartitionHostMap) : |
| rebuildMetadataForSingleTopology(activePartitionHostMap, standbyPartitionHostMap); |
| } |
| |
| private List<StreamsMetadata> rebuildMetadataForNamedTopologies(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap, |
| final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap) { |
| final List<StreamsMetadata> rebuiltMetadata = new ArrayList<>(); |
| Stream.concat(activePartitionHostMap.keySet().stream(), standbyPartitionHostMap.keySet().stream()) |
| .distinct() |
| .sorted(Comparator.comparing(HostInfo::host).thenComparingInt(HostInfo::port)) |
| .forEach(hostInfo -> { |
| for (final String topologyName : topologyMetadata.namedTopologiesView()) { |
| final Map<String, List<String>> storeToSourceTopics = |
| topologyMetadata.stateStoreNameToSourceTopicsForTopology(topologyName); |
| |
| final Set<TopicPartition> activePartitionsOnHost = new HashSet<>(); |
| final Set<String> activeStoresOnHost = new HashSet<>(); |
| if (activePartitionHostMap.containsKey(hostInfo)) { |
| // filter out partitions for topics that are not connected to this topology |
| activePartitionsOnHost.addAll( |
| activePartitionHostMap.get(hostInfo).stream() |
| .filter(tp -> topologyMetadata.fullSourceTopicNamesForTopology(topologyName).contains(tp.topic())) |
| .collect(Collectors.toSet()) |
| ); |
| activeStoresOnHost.addAll(getStoresOnHost(storeToSourceTopics, activePartitionsOnHost)); |
| } |
| |
| // TODO KAFKA-13281: when we add support for global stores with named topologies we will |
| // need to add the global stores to the activeStoresOnHost set |
| |
| final Set<TopicPartition> standbyPartitionsOnHost = new HashSet<>(); |
| final Set<String> standbyStoresOnHost = new HashSet<>(); |
| if (standbyPartitionHostMap.containsKey(hostInfo)) { |
| standbyPartitionsOnHost.addAll( |
| standbyPartitionHostMap.get(hostInfo).stream() |
| .filter(tp -> topologyMetadata.fullSourceTopicNamesForTopology(topologyName).contains(tp.topic())) |
| .collect(Collectors.toSet())); |
| standbyStoresOnHost.addAll(getStoresOnHost(storeToSourceTopics, standbyPartitionsOnHost)); |
| } |
| |
| final StreamsMetadata metadata = new StreamsMetadataImpl( |
| hostInfo, |
| activeStoresOnHost, |
| activePartitionsOnHost, |
| standbyStoresOnHost, |
| standbyPartitionsOnHost, |
| topologyName |
| ); |
| |
| rebuiltMetadata.add(metadata); |
| if (hostInfo.equals(thisHost)) { |
| localMetadata.set(metadata); |
| } |
| } |
| |
| // Construct metadata across all topologies on this host for the `localMetadata` field |
| final Map<String, List<String>> storeToSourceTopics = topologyMetadata.stateStoreNameToSourceTopics(); |
| final Set<TopicPartition> localActivePartitions = activePartitionHostMap.get(thisHost); |
| final Set<TopicPartition> localStandbyPartitions = standbyPartitionHostMap.get(thisHost); |
| |
| localMetadata.set( |
| new StreamsMetadataImpl(thisHost, |
| getStoresOnHost(storeToSourceTopics, localActivePartitions), |
| localActivePartitions, |
| getStoresOnHost(storeToSourceTopics, localStandbyPartitions), |
| localStandbyPartitions) |
| ); |
| }); |
| return rebuiltMetadata; |
| } |
| |
| private List<StreamsMetadata> rebuildMetadataForSingleTopology(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap, |
| final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap) { |
| final List<StreamsMetadata> rebuiltMetadata = new ArrayList<>(); |
| final Map<String, List<String>> storeToSourceTopics = topologyMetadata.stateStoreNameToSourceTopics(); |
| Stream.concat(activePartitionHostMap.keySet().stream(), standbyPartitionHostMap.keySet().stream()) |
| .distinct() |
| .sorted(Comparator.comparing(HostInfo::host).thenComparingInt(HostInfo::port)) |
| .forEach(hostInfo -> { |
| final Set<TopicPartition> activePartitionsOnHost = new HashSet<>(); |
| final Set<String> activeStoresOnHost = new HashSet<>(); |
| if (activePartitionHostMap.containsKey(hostInfo)) { |
| activePartitionsOnHost.addAll(activePartitionHostMap.get(hostInfo)); |
| activeStoresOnHost.addAll(getStoresOnHost(storeToSourceTopics, activePartitionsOnHost)); |
| } |
| activeStoresOnHost.addAll(globalStores); |
| |
| final Set<TopicPartition> standbyPartitionsOnHost = new HashSet<>(); |
| final Set<String> standbyStoresOnHost = new HashSet<>(); |
| if (standbyPartitionHostMap.containsKey(hostInfo)) { |
| standbyPartitionsOnHost.addAll(standbyPartitionHostMap.get(hostInfo)); |
| standbyStoresOnHost.addAll(getStoresOnHost(storeToSourceTopics, standbyPartitionsOnHost)); |
| } |
| |
| final StreamsMetadata metadata = new StreamsMetadataImpl( |
| hostInfo, |
| activeStoresOnHost, |
| activePartitionsOnHost, |
| standbyStoresOnHost, |
| standbyPartitionsOnHost |
| ); |
| |
| rebuiltMetadata.add(metadata); |
| if (hostInfo.equals(thisHost)) { |
| localMetadata.set(metadata); |
| } |
| }); |
| return rebuiltMetadata; |
| } |
| |
| private final Function<Optional<Set<Integer>>, Integer> getPartition = maybeMulticastPartitions -> { |
| if (!maybeMulticastPartitions.isPresent()) { |
| return null; |
| } |
| if (maybeMulticastPartitions.get().size() != 1) { |
| throw new IllegalArgumentException("The partitions returned by StreamPartitioner#partitions method when used for fetching KeyQueryMetadata for key should be a singleton set"); |
| } |
| return maybeMulticastPartitions.get().iterator().next(); |
| }; |
| |
| private <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName, |
| final K key, |
| final StreamPartitioner<? super K, ?> partitioner, |
| final SourceTopicsInfo sourceTopicsInfo) { |
| |
| final Integer partition = getPartition.apply(partitioner.partitions(sourceTopicsInfo.topicWithMostPartitions, key, null, sourceTopicsInfo.maxPartitions)); |
| final Set<TopicPartition> matchingPartitions = new HashSet<>(); |
| for (final String sourceTopic : sourceTopicsInfo.sourceTopics) { |
| matchingPartitions.add(new TopicPartition(sourceTopic, partition)); |
| } |
| |
| HostInfo activeHost = UNKNOWN_HOST; |
| final Set<HostInfo> standbyHosts = new HashSet<>(); |
| for (final StreamsMetadata streamsMetadata : allMetadata) { |
| final Set<String> activeStateStoreNames = streamsMetadata.stateStoreNames(); |
| final Set<TopicPartition> topicPartitions = new HashSet<>(streamsMetadata.topicPartitions()); |
| final Set<String> standbyStateStoreNames = streamsMetadata.standbyStateStoreNames(); |
| final Set<TopicPartition> standbyTopicPartitions = new HashSet<>(streamsMetadata.standbyTopicPartitions()); |
| |
| topicPartitions.retainAll(matchingPartitions); |
| if (activeStateStoreNames.contains(storeName) && !topicPartitions.isEmpty()) { |
| activeHost = streamsMetadata.hostInfo(); |
| } |
| |
| standbyTopicPartitions.retainAll(matchingPartitions); |
| if (standbyStateStoreNames.contains(storeName) && !standbyTopicPartitions.isEmpty()) { |
| standbyHosts.add(streamsMetadata.hostInfo()); |
| } |
| } |
| |
| return new KeyQueryMetadata(activeHost, standbyHosts, partition); |
| } |
| |
| private <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName, |
| final K key, |
| final StreamPartitioner<? super K, ?> partitioner, |
| final SourceTopicsInfo sourceTopicsInfo, |
| final String topologyName) { |
| Objects.requireNonNull(topologyName, "topology name must not be null"); |
| final Integer partition = getPartition.apply(partitioner.partitions(sourceTopicsInfo.topicWithMostPartitions, key, null, sourceTopicsInfo.maxPartitions)); |
| final Set<TopicPartition> matchingPartitions = new HashSet<>(); |
| for (final String sourceTopic : sourceTopicsInfo.sourceTopics) { |
| matchingPartitions.add(new TopicPartition(sourceTopic, partition)); |
| } |
| |
| HostInfo activeHost = UNKNOWN_HOST; |
| final Set<HostInfo> standbyHosts = new HashSet<>(); |
| for (final StreamsMetadata streamsMetadata : allMetadata) { |
| final String metadataTopologyName = ((StreamsMetadataImpl) streamsMetadata).topologyName(); |
| if (metadataTopologyName != null && metadataTopologyName.equals(topologyName)) { |
| |
| final Set<String> activeStateStoreNames = streamsMetadata.stateStoreNames(); |
| final Set<TopicPartition> topicPartitions = new HashSet<>(streamsMetadata.topicPartitions()); |
| final Set<String> standbyStateStoreNames = streamsMetadata.standbyStateStoreNames(); |
| final Set<TopicPartition> standbyTopicPartitions = new HashSet<>(streamsMetadata.standbyTopicPartitions()); |
| |
| topicPartitions.retainAll(matchingPartitions); |
| if (activeStateStoreNames.contains(storeName) && !topicPartitions.isEmpty()) { |
| activeHost = streamsMetadata.hostInfo(); |
| } |
| |
| standbyTopicPartitions.retainAll(matchingPartitions); |
| if (standbyStateStoreNames.contains(storeName) && !standbyTopicPartitions.isEmpty()) { |
| standbyHosts.add(streamsMetadata.hostInfo()); |
| } |
| } |
| } |
| |
| return new KeyQueryMetadata(activeHost, standbyHosts, partition); |
| } |
| |
    // Convenience overload for the single-topology case (no topology name).
    private SourceTopicsInfo getSourceTopicsInfo(final String storeName) {
        return getSourceTopicsInfo(storeName, null);
    }
| |
| private SourceTopicsInfo getSourceTopicsInfo(final String storeName, final String topologyName) { |
| final List<String> sourceTopics = new ArrayList<>(topologyMetadata.sourceTopicsForStore(storeName, topologyName)); |
| |
| if (sourceTopics.isEmpty()) { |
| return null; |
| } |
| return new SourceTopicsInfo(sourceTopics); |
| } |
| |
    // Metadata queries are only meaningful once onChange() has delivered the first assignment:
    // partitionsByTopic must be populated and a local metadata snapshot must have been set.
    private boolean isInitialized() {
        return partitionsByTopic != null && !partitionsByTopic.isEmpty() && localMetadata.get() != null;
    }
| |
    /**
     * Returns the name of the state store backed by the given changelog topic,
     * delegating the lookup to the topology metadata.
     */
    public String getStoreForChangelogTopic(final String topicName) {
        return topologyMetadata.getStoreForChangelogTopic(topicName);
    }
| |
| private class SourceTopicsInfo { |
| private final List<String> sourceTopics; |
| private int maxPartitions; |
| private String topicWithMostPartitions; |
| |
| private SourceTopicsInfo(final List<String> sourceTopics) { |
| this.sourceTopics = sourceTopics; |
| for (final String topic : sourceTopics) { |
| final List<PartitionInfo> partitions = partitionsByTopic.getOrDefault(topic, Collections.emptyList()); |
| if (partitions.size() > maxPartitions) { |
| maxPartitions = partitions.size(); |
| topicWithMostPartitions = topic; |
| } |
| } |
| } |
| } |
| } |