/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.streaming.connectors.kafka.table;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.DockerImageVersions;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.stream.Collectors;
/** Base class for Kafka Table IT Cases. */
public abstract class KafkaTableTestBase extends AbstractTestBase {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaTableTestBase.class);

    private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
    private static final Network NETWORK = Network.newNetwork();
    private static final int ZK_TIMEOUT_MILLIS = 30000;

    @ClassRule
    public static final KafkaContainer KAFKA_CONTAINER =
            new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA)) {
                @Override
                protected void doStart() {
                    super.doStart();
                    if (LOG.isInfoEnabled()) {
                        this.followOutput(new Slf4jLogConsumer(LOG));
                    }
                }
            }.withEmbeddedZookeeper()
                    .withNetwork(NETWORK)
                    .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS)
                    // Raise the broker-side cap (transaction.max.timeout.ms) so tests
                    // may configure producers with long transaction timeouts
                    .withEnv(
                            "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
                            String.valueOf(Duration.ofHours(2).toMillis()))
                    // Disable log deletion to prevent records from being deleted during the test run
                    .withEnv("KAFKA_LOG_RETENTION_MS", "-1");

    protected StreamExecutionEnvironment env;
    protected StreamTableEnvironment tEnv;

    // Timer for scheduling a periodic debug-logging task, useful if the test hangs
    private final Timer loggingTimer = new Timer("Debug Logging Timer");

    @Before
    public void setup() {
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        tEnv = StreamTableEnvironment.create(env);
        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());

        // Probe the Kafka broker status every 30 seconds
        scheduleTimeoutLogger(
                Duration.ofSeconds(30),
                () -> {
                    // List all non-internal topics
                    final Map<String, TopicDescription> topicDescriptions =
                            describeExternalTopics();
                    LOG.info("Current existing topics: {}", topicDescriptions.keySet());

                    // Log the status of each topic partition
                    logTopicPartitionStatus(topicDescriptions);
                });
    }
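
    // A minimal sketch (not part of the original base class) of how a subclass might
    // register a Kafka-backed SQL table against this container; the topic name
    // "test-topic", the single-column schema, and the JSON format are assumptions:
    //
    //   createTestTopic("test-topic", 1, 1);
    //   tEnv.executeSql(
    //           "CREATE TABLE kafka_source (msg STRING) WITH ("
    //                   + "'connector' = 'kafka',"
    //                   + "'topic' = 'test-topic',"
    //                   + "'properties.bootstrap.servers' = '" + getBootstrapServers() + "',"
    //                   + "'properties.group.id' = 'flink-tests',"
    //                   + "'scan.startup.mode' = 'earliest-offset',"
    //                   + "'format' = 'json')");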

    @After
    public void after() {
        // Cancel the timer for debug logging
        cancelTimeoutLogger();
    }

    public Properties getStandardProps() {
        Properties standardProps = new Properties();
        standardProps.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers());
        standardProps.put("group.id", "flink-tests");
        standardProps.put("enable.auto.commit", false);
        standardProps.put("auto.offset.reset", "earliest");
        standardProps.put("max.partition.fetch.bytes", 256);
        standardProps.put("zookeeper.session.timeout.ms", ZK_TIMEOUT_MILLIS);
        standardProps.put("zookeeper.connection.timeout.ms", ZK_TIMEOUT_MILLIS);
        return standardProps;
    }
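
    // With "auto.offset.reset" set to "earliest" and auto-commit disabled, a fresh
    // consumer group re-reads each topic from the beginning on every run. A minimal
    // sketch, assuming an existing topic "test-topic" and String deserializers:
    //
    //   Properties props = getStandardProps();
    //   props.put("key.deserializer", StringDeserializer.class.getName());
    //   props.put("value.deserializer", StringDeserializer.class.getName());
    //   try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    //       consumer.subscribe(Collections.singletonList("test-topic"));
    //       ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
    //   }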

    public String getBootstrapServers() {
        return KAFKA_CONTAINER.getBootstrapServers();
    }

    public void createTestTopic(String topic, int numPartitions, int replicationFactor) {
        Map<String, Object> properties = new HashMap<>();
        properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
        try (AdminClient admin = AdminClient.create(properties)) {
            // Block until the topic is actually created instead of relying on
            // AdminClient#close to flush the pending request
            admin.createTopics(
                            Collections.singletonList(
                                    new NewTopic(topic, numPartitions, (short) replicationFactor)))
                    .all()
                    .get();
        } catch (Exception e) {
            throw new RuntimeException(String.format("Failed to create topic %s", topic), e);
        }
    }
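
    // Typical usage in a subclass (the topic name "orders" is an assumption): create
    // the topic before the pipeline runs and drop it afterwards to keep tests isolated:
    //
    //   createTestTopic("orders", 3, 1);
    //   ... run the test against the "orders" topic ...
    //   deleteTestTopic("orders");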

    public void deleteTestTopic(String topic) {
        Map<String, Object> properties = new HashMap<>();
        properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
        try (AdminClient admin = AdminClient.create(properties)) {
            // Block until the deletion has been acknowledged by the broker
            admin.deleteTopics(Collections.singletonList(topic)).all().get();
        } catch (Exception e) {
            throw new RuntimeException(String.format("Failed to delete topic %s", topic), e);
        }
    }

    // ------------------------ For Debug Logging Purpose ----------------------------------

    private void scheduleTimeoutLogger(Duration period, Runnable loggingAction) {
        TimerTask timeoutLoggerTask =
                new TimerTask() {
                    @Override
                    public void run() {
                        try {
                            loggingAction.run();
                        } catch (Exception e) {
                            throw new RuntimeException("Failed to execute logging action", e);
                        }
                    }
                };
        // Fixed-delay execution: run once immediately, then once per period
        loggingTimer.schedule(timeoutLoggerTask, 0L, period.toMillis());
    }

    private void cancelTimeoutLogger() {
        loggingTimer.cancel();
    }

    private Map<String, TopicDescription> describeExternalTopics() {
        // Use try-with-resources so the AdminClient is closed even if listing fails
        try (AdminClient adminClient = AdminClient.create(getStandardProps())) {
            final List<String> topics =
                    adminClient.listTopics().listings().get().stream()
                            .filter(listing -> !listing.isInternal())
                            .map(TopicListing::name)
                            .collect(Collectors.toList());
            return adminClient.describeTopics(topics).all().get();
        } catch (Exception e) {
            throw new RuntimeException("Failed to list Kafka topics", e);
        }
    }

    private void logTopicPartitionStatus(Map<String, TopicDescription> topicDescriptions) {
        final Properties properties = getStandardProps();
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-tests-debugging");
        properties.setProperty(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getCanonicalName());
        properties.setProperty(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getCanonicalName());
        // Close the consumer after use to avoid leaking one on every probe
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            List<TopicPartition> partitions = new ArrayList<>();
            topicDescriptions.forEach(
                    (topic, description) ->
                            description
                                    .partitions()
                                    .forEach(
                                            tpInfo ->
                                                    partitions.add(
                                                            new TopicPartition(
                                                                    topic, tpInfo.partition()))));
            final Map<TopicPartition, Long> beginningOffsets =
                    consumer.beginningOffsets(partitions);
            final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            partitions.forEach(
                    partition ->
                            LOG.info(
                                    "TopicPartition \"{}\": starting offset: {}, stopping offset: {}",
                                    partition,
                                    beginningOffsets.get(partition),
                                    endOffsets.get(partition)));
        }
    }
}