/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.kafka.testutils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.DumperOptions;
import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.TypeDescription;
import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.Yaml;
import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.constructor.Constructor;
import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.nodes.Node;
import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.nodes.SequenceNode;
import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.nodes.Tag;
import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.representer.Representer;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.CommonClientConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * Reads metadata from a yaml file and lazily refreshes it periodically. This implementation
 * assumes that the specified topics exist in the clusters contained in the yaml metadata.
 * Therefore, the topic is used as the stream name. This is designed to integrate with a Kubernetes
 * ConfigMap and with cluster migration.
*
* <p>Files must be of the form:
*
 * <pre>{@code
 * - streamId: stream0
 *   clusterMetadataList:
 *     - clusterId: cluster0
 *       bootstrapServers: bootstrap-server-0:443
 *       topics:
 *         - topic0
 *         - topic1
 *     - clusterId: cluster1
 *       bootstrapServers: bootstrap-server-1:443
 *       topics:
 *         - topic2
 *         - topic3
 * - streamId: stream1
 *   clusterMetadataList:
 *     - clusterId: cluster2
 *       bootstrapServers: bootstrap-server-2:443
 *       topics:
 *         - topic4
 *         - topic5
 * }</pre>
*
 * <p>Typically, usage looks like this: first consume from one cluster, then add a new cluster and
 * consume from both clusters, and finally consume only from the new cluster once all data from the
 * old cluster has been read.
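 *
 * <p>A minimal usage sketch (the file path and ttl below are hypothetical; the stream and cluster
 * names follow the example above):
 *
 * <pre>{@code
 * // Re-read the yaml file at most every 30 seconds, lazily on access.
 * YamlFileMetadataService metadataService =
 *         new YamlFileMetadataService("/path/to/metadata.yaml", Duration.ofSeconds(30));
 * Set<KafkaStream> streams = metadataService.getAllStreams();
 * boolean active = metadataService.isClusterActive("cluster0");
 * }</pre>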
*/
public class YamlFileMetadataService implements KafkaMetadataService {
private static final Logger logger = LoggerFactory.getLogger(YamlFileMetadataService.class);
private final String metadataFilePath;
private final Duration refreshInterval;
private Instant lastRefresh;
// current metadata should be accessed from #getAllStreams()
private transient Set<KafkaStream> streamMetadata;
private transient Yaml yaml;
/**
* Constructs a metadata service based on cluster information stored in a file.
*
* @param metadataFilePath location of the metadata file
     * @param metadataTtl the ttl of the cached metadata, which controls how often the file is re-read
*/
public YamlFileMetadataService(String metadataFilePath, Duration metadataTtl) {
this.metadataFilePath = metadataFilePath;
this.refreshInterval = metadataTtl;
this.lastRefresh = Instant.MIN;
}
/**
* {@inheritDoc}
*
     * <p>This obtains all the stream metadata and enforces the ttl configuration on the metadata.
*/
@Override
public Set<KafkaStream> getAllStreams() {
refreshIfNeeded();
return streamMetadata;
}
/** {@inheritDoc} */
@Override
public Map<String, KafkaStream> describeStreams(Collection<String> streamIds) {
ImmutableMap.Builder<String, KafkaStream> builder = ImmutableMap.builder();
Set<KafkaStream> streams = getAllStreams();
for (KafkaStream stream : streams) {
if (streamIds.contains(stream.getStreamId())) {
builder.put(stream.getStreamId(), stream);
}
}
return builder.build();
}
/** {@inheritDoc} */
@Override
public boolean isClusterActive(String kafkaClusterId) {
return getAllStreams().stream()
.flatMap(kafkaStream -> kafkaStream.getClusterMetadataMap().keySet().stream())
.anyMatch(cluster -> cluster.equals(kafkaClusterId));
}
/** {@inheritDoc} */
@Override
public void close() throws Exception {}
/**
* A utility method for writing metadata in the expected yaml format.
*
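     * <p>For example, the following sketch (the file path is hypothetical; stream, cluster, and
     * topic names follow the class-level example) writes a single stream backed by one cluster:
     *
     * <pre>{@code
     * StreamMetadata.ClusterMetadata cluster =
     *         new StreamMetadata.ClusterMetadata(
     *                 "cluster0", "bootstrap-server-0:443", Arrays.asList("topic0", "topic1"));
     * StreamMetadata stream = new StreamMetadata("stream0", Collections.singletonList(cluster));
     * YamlFileMetadataService.saveToYaml(
     *         Collections.singletonList(stream), new File("/path/to/metadata.yaml"));
     * }</pre>
     *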
* @param streamMetadata list of {@link StreamMetadata}
* @param metadataFile the metadata {@link File}
*/
public static void saveToYaml(List<StreamMetadata> streamMetadata, File metadataFile)
throws IOException {
logger.debug("Writing stream infos to file: {}", streamMetadata);
Yaml yaml = initYamlParser();
FileWriter fileWriter = new FileWriter(metadataFile, false);
yaml.dump(streamMetadata, fileWriter);
fileWriter.close();
}
/**
* A utility method for writing metadata in the expected yaml format.
*
* @param kafkaStreams list of {@link KafkaStream}
* @param metadataFile the metadata {@link File}
*/
public static void saveToYamlFromKafkaStreams(List<KafkaStream> kafkaStreams, File metadataFile)
throws IOException {
saveToYaml(
kafkaStreams.stream()
.map(YamlFileMetadataService::convertToStreamMetadata)
.collect(Collectors.toList()),
metadataFile);
}
private static StreamMetadata convertToStreamMetadata(KafkaStream kafkaStream) {
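        // Flatten the KafkaStream back into the mutable yaml-friendly representation, pulling the
        // bootstrap servers out of each cluster's Kafka client properties.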
return new StreamMetadata(
kafkaStream.getStreamId(),
kafkaStream.getClusterMetadataMap().entrySet().stream()
.map(
entry ->
new StreamMetadata.ClusterMetadata(
entry.getKey(),
entry.getValue()
.getProperties()
.getProperty(
CommonClientConfigs
.BOOTSTRAP_SERVERS_CONFIG),
new ArrayList<>(entry.getValue().getTopics())))
.collect(Collectors.toList()));
}
private void refreshIfNeeded() {
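        // Lazy refresh: the file is only re-read when metadata is requested and the previously
        // loaded metadata is older than the configured refresh interval.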
Instant now = Instant.now();
try {
if (now.isAfter(lastRefresh.plus(refreshInterval.toMillis(), ChronoUnit.MILLIS))) {
streamMetadata = parseFile();
lastRefresh = now;
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@VisibleForTesting
Set<KafkaStream> parseFile() throws IOException {
if (yaml == null) {
yaml = initYamlParser();
}
        final List<StreamMetadata> streamMetadataList;
        try (InputStream inputStream = Files.newInputStream(Paths.get(metadataFilePath))) {
            streamMetadataList = yaml.load(inputStream);
        }
        if (logger.isDebugEnabled()) {
            logger.debug(
                    "Metadata file {} has size: {} bytes",
                    metadataFilePath,
                    Files.size(Paths.get(metadataFilePath)));
        }
Set<KafkaStream> kafkaStreams = new HashSet<>();
for (StreamMetadata streamMetadata : streamMetadataList) {
Map<String, ClusterMetadata> clusterMetadataMap = new HashMap<>();
for (StreamMetadata.ClusterMetadata clusterMetadata :
streamMetadata.getClusterMetadataList()) {
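                // Fall back to the bootstrap servers as the cluster id when none is specified.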
final String kafkaClusterId;
if (clusterMetadata.getClusterId() != null) {
kafkaClusterId = clusterMetadata.getClusterId();
} else {
kafkaClusterId = clusterMetadata.getBootstrapServers();
}
Properties properties = new Properties();
properties.setProperty(
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
clusterMetadata.getBootstrapServers());
clusterMetadataMap.put(
kafkaClusterId,
new ClusterMetadata(
new HashSet<>(clusterMetadata.getTopics()), properties));
}
kafkaStreams.add(new KafkaStream(streamMetadata.getStreamId(), clusterMetadataMap));
}
logger.debug("From {} loaded metadata: {}", metadataFilePath, kafkaStreams);
return kafkaStreams;
}
private static Yaml initYamlParser() {
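        // Represent StreamMetadata as a plain yaml mapping (no java class tags) and emit
        // block-style yaml so the output matches the format documented in the class Javadoc.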
Representer representer = new Representer();
representer.addClassTag(StreamMetadata.class, Tag.MAP);
TypeDescription typeDescription = new TypeDescription(StreamMetadata.class);
representer.addTypeDescription(typeDescription);
representer.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK);
return new Yaml(new ListConstructor<>(StreamMetadata.class), representer);
}
/** A custom constructor is required to read yaml lists at the root. */
private static class ListConstructor<T> extends Constructor {
private final Class<T> clazz;
public ListConstructor(final Class<T> clazz) {
this.clazz = clazz;
}
@Override
protected Object constructObject(final Node node) {
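            // The yaml document is a list at the root; setting the element type of the root
            // sequence lets SnakeYAML construct typed StreamMetadata objects instead of raw maps.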
if (node instanceof SequenceNode && isRootNode(node)) {
((SequenceNode) node).setListType(clazz);
}
return super.constructObject(node);
}
private boolean isRootNode(final Node node) {
return node.getStartMark().getIndex() == 0;
}
}
    /**
     * Internal class for snakeyaml parsing. A mutable, public class with a no-arg constructor is
     * required.
     */
public static class StreamMetadata {
private String streamId;
private List<ClusterMetadata> clusterMetadataList;
public StreamMetadata() {}
public StreamMetadata(String streamId, List<ClusterMetadata> clusterMetadataList) {
this.streamId = streamId;
this.clusterMetadataList = clusterMetadataList;
}
public String getStreamId() {
return streamId;
}
public void setStreamId(String streamId) {
this.streamId = streamId;
}
public List<ClusterMetadata> getClusterMetadataList() {
return clusterMetadataList;
}
public void setClusterMetadataList(List<ClusterMetadata> clusterMetadata) {
this.clusterMetadataList = clusterMetadata;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("streamId", streamId)
.add("clusterMetadataList", clusterMetadataList)
.toString();
}
/** Information to connect to a particular cluster. */
public static class ClusterMetadata {
private String clusterId;
private String bootstrapServers;
private List<String> topics;
public ClusterMetadata() {}
public ClusterMetadata(String clusterId, String bootstrapServers, List<String> topics) {
this.clusterId = clusterId;
this.bootstrapServers = bootstrapServers;
this.topics = topics;
}
public String getClusterId() {
return clusterId;
}
public void setClusterId(String clusterId) {
this.clusterId = clusterId;
}
public String getBootstrapServers() {
return bootstrapServers;
}
public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
public List<String> getTopics() {
return topics;
}
public void setTopics(List<String> topics) {
this.topics = topics;
}
}
}
}