/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration.utils;
import kafka.server.ConfigType;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
/**
* Runs an in-memory, "embedded" Kafka cluster with one ZooKeeper instance and a configurable number of Kafka brokers.
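* <p>
* A minimal usage sketch (test wiring omitted; the topic name is illustrative):
* <pre>{@code
* final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(1);
* cluster.start();
* cluster.createTopic("input-topic");
* // produce to and consume from cluster.bootstrapServers() ...
* cluster.stop();
* }</pre>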
*/
public class EmbeddedKafkaCluster {
private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected
private static final int TOPIC_CREATION_TIMEOUT = 30000;
private static final int TOPIC_DELETION_TIMEOUT = 30000;
private EmbeddedZookeeper zookeeper = null;
private final KafkaEmbedded[] brokers;
private final Properties brokerConfig;
public final MockTime time;
public EmbeddedKafkaCluster(final int numBrokers) {
this(numBrokers, new Properties());
}
public EmbeddedKafkaCluster(final int numBrokers,
final Properties brokerConfig) {
this(numBrokers, brokerConfig, System.currentTimeMillis());
}
public EmbeddedKafkaCluster(final int numBrokers,
final Properties brokerConfig,
final long mockTimeMillisStart) {
this(numBrokers, brokerConfig, mockTimeMillisStart, System.nanoTime());
}
public EmbeddedKafkaCluster(final int numBrokers,
final Properties brokerConfig,
final long mockTimeMillisStart,
final long mockTimeNanoStart) {
brokers = new KafkaEmbedded[numBrokers];
this.brokerConfig = brokerConfig;
time = new MockTime(mockTimeMillisStart, mockTimeNanoStart);
}
/**
* Creates and starts a Kafka cluster.
*/
public void start() throws IOException {
log.debug("Initiating embedded Kafka cluster startup");
log.debug("Starting a ZooKeeper instance");
zookeeper = new EmbeddedZookeeper();
log.debug("ZooKeeper instance is running at {}", zKConnectString());
brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
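// The settings below are defaults only; any value already present in the supplied brokerConfig takes precedence.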
putIfAbsent(brokerConfig, KafkaConfig.ListenersProp(), "PLAINTEXT://localhost:" + DEFAULT_BROKER_PORT);
putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
putIfAbsent(brokerConfig, KafkaConfig.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
putIfAbsent(brokerConfig, KafkaConfig.GroupMinSessionTimeoutMsProp(), 0);
putIfAbsent(brokerConfig, KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
putIfAbsent(brokerConfig, KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) 1);
putIfAbsent(brokerConfig, KafkaConfig.OffsetsTopicPartitionsProp(), 5);
putIfAbsent(brokerConfig, KafkaConfig.TransactionsTopicPartitionsProp(), 5);
putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), true);
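// Start the brokers, assigning sequential broker ids starting at 0.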
for (int i = 0; i < brokers.length; i++) {
brokerConfig.put(KafkaConfig.BrokerIdProp(), i);
log.debug("Starting a Kafka instance on {} ...", brokerConfig.get(KafkaConfig.ListenersProp()));
brokers[i] = new KafkaEmbedded(brokerConfig, time);
log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}",
brokers[i].brokerList(), brokers[i].zookeeperConnect());
}
}
private void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) {
if (!props.containsKey(propertyKey)) {
props.put(propertyKey, propertyValue);
}
}
/**
* Stop the Kafka cluster.
*/
public void stop() {
if (brokers.length > 1) {
// delete the topics first to avoid cascading leader elections while shutting down the brokers
final Set<String> topics = getAllTopicsInCluster();
if (!topics.isEmpty()) {
try (final Admin adminClient = brokers[0].createAdminClient()) {
adminClient.deleteTopics(topics).all().get();
} catch (final InterruptedException e) {
log.warn("Got interrupted while deleting topics in preparation for stopping embedded brokers", e);
throw new RuntimeException(e);
} catch (final ExecutionException | RuntimeException e) {
log.warn("Couldn't delete all topics before stopping brokers", e);
}
}
}
for (final KafkaEmbedded broker : brokers) {
broker.stopAsync();
}
for (final KafkaEmbedded broker : brokers) {
broker.awaitStoppedAndPurge();
}
zookeeper.shutdown();
}
/**
* The ZooKeeper connection string, aka `zookeeper.connect`, in `hostnameOrIp:port` format.
* Example: `127.0.0.1:2181`.
* <p>
* You can use this to, for example, tell Kafka brokers how to connect to this instance.
*/
public String zKConnectString() {
return "127.0.0.1:" + zookeeper.port();
}
/**
* This cluster's `bootstrap.servers` value. Example: `127.0.0.1:9092`.
* <p>
* You can use this to tell Kafka producers and consumers how to connect to this cluster.
*/
public String bootstrapServers() {
return brokers[0].brokerList();
}
/**
* Create multiple Kafka topics, each with 1 partition and a replication factor of 1.
*
* @param topics The names of the topics.
*/
public void createTopics(final String... topics) throws InterruptedException {
for (final String topic : topics) {
createTopic(topic, 1, 1, Collections.emptyMap());
}
}
/**
* Create a Kafka topic with 1 partition and a replication factor of 1.
*
* @param topic The name of the topic.
*/
public void createTopic(final String topic) throws InterruptedException {
createTopic(topic, 1, 1, Collections.emptyMap());
}
/**
* Create a Kafka topic with the given parameters.
*
* @param topic The name of the topic.
* @param partitions The number of partitions for this topic.
* @param replication The replication factor for (the partitions of) this topic.
*/
public void createTopic(final String topic, final int partitions, final int replication) throws InterruptedException {
createTopic(topic, partitions, replication, Collections.emptyMap());
}
/**
* Create a Kafka topic with the given parameters.
*
* @param topic The name of the topic.
* @param partitions The number of partitions for this topic.
* @param replication The replication factor for (partitions of) this topic.
* @param topicConfig Additional topic-level configuration settings.
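* <p>
* A usage sketch (topic name and settings are illustrative):
* <pre>{@code
* cluster.createTopic("compacted-topic", 3, 1,
*     Collections.singletonMap("cleanup.policy", "compact"));
* }</pre>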
*/
public void createTopic(final String topic,
final int partitions,
final int replication,
final Map<String, String> topicConfig) throws InterruptedException {
brokers[0].createTopic(topic, partitions, replication, topicConfig);
final List<TopicPartition> topicPartitions = new ArrayList<>();
for (int partition = 0; partition < partitions; partition++) {
topicPartitions.add(new TopicPartition(topic, partition));
}
IntegrationTestUtils.waitForTopicPartitions(brokers(), topicPartitions, TOPIC_CREATION_TIMEOUT);
}
/**
* Deletes a topic and returns immediately.
*
* @param topic the name of the topic
*/
public void deleteTopic(final String topic) throws InterruptedException {
deleteTopicsAndWait(-1L, topic);
}
/**
* Deletes a topic and blocks for up to 30 seconds until the topic has been deleted.
*
* @param topic the name of the topic
*/
public void deleteTopicAndWait(final String topic) throws InterruptedException {
deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topic);
}
/**
* Deletes multiple topics and returns immediately.
*
* @param topics the names of the topics
*/
public void deleteTopics(final String... topics) throws InterruptedException {
deleteTopicsAndWait(-1, topics);
}
/**
* Deletes multiple topics and blocks for up to 30 seconds until all topics have been deleted.
*
* @param topics the names of the topics
*/
public void deleteTopicsAndWait(final String... topics) throws InterruptedException {
deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topics);
}
/**
* Deletes multiple topics and blocks until all topics have been deleted.
*
* @param timeoutMs the max time to wait for the topics to be deleted (does not block if {@code <= 0})
* @param topics the names of the topics
*/
public void deleteTopicsAndWait(final long timeoutMs, final String... topics) throws InterruptedException {
for (final String topic : topics) {
try {
brokers[0].deleteTopic(topic);
} catch (final UnknownTopicOrPartitionException ignored) { }
}
if (timeoutMs > 0) {
TestUtils.waitForCondition(new TopicsDeletedCondition(topics), timeoutMs, "Topics not deleted after " + timeoutMs + " milliseconds.");
}
}
/**
* Deletes all topics and blocks until all topics have been deleted.
*
* @param timeoutMs the max time to wait for the topics to be deleted (does not block if {@code <= 0})
*/
public void deleteAllTopicsAndWait(final long timeoutMs) throws InterruptedException {
final Set<String> topics = getAllTopicsInCluster();
for (final String topic : topics) {
try {
brokers[0].deleteTopic(topic);
} catch (final UnknownTopicOrPartitionException ignored) { }
}
if (timeoutMs > 0) {
TestUtils.waitForCondition(new TopicsDeletedCondition(topics), timeoutMs, "Topics not deleted after " + timeoutMs + " milliseconds.");
}
}
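/**
* Waits until the set of topics in the cluster is exactly the given set of topics.
*
* @param timeoutMs the max time to wait for the remaining topics to match
* @param topics the names of the topics expected to remain
*/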
public void waitForRemainingTopics(final long timeoutMs, final String... topics) throws InterruptedException {
TestUtils.waitForCondition(new TopicsRemainingCondition(topics), timeoutMs, "Topics did not match the expected set after " + timeoutMs + " milliseconds.");
}
private final class TopicsDeletedCondition implements TestCondition {
final Set<String> deletedTopics = new HashSet<>();
private TopicsDeletedCondition(final String... topics) {
Collections.addAll(deletedTopics, topics);
}
private TopicsDeletedCondition(final Collection<String> topics) {
deletedTopics.addAll(topics);
}
@Override
public boolean conditionMet() {
final Set<String> allTopics = getAllTopicsInCluster();
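// removeAll returns true if any of the deleted topics are still present, so the condition holds once none remain.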
return !allTopics.removeAll(deletedTopics);
}
}
private final class TopicsRemainingCondition implements TestCondition {
final Set<String> remainingTopics = new HashSet<>();
private TopicsRemainingCondition(final String... topics) {
Collections.addAll(remainingTopics, topics);
}
@Override
public boolean conditionMet() {
final Set<String> allTopics = getAllTopicsInCluster();
return allTopics.equals(remainingTopics);
}
}
private List<KafkaServer> brokers() {
final List<KafkaServer> servers = new ArrayList<>();
for (final KafkaEmbedded broker : brokers) {
servers.add(broker.kafkaServer());
}
return servers;
}
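/**
* Returns the topic-level configuration of the given topic, as read from ZooKeeper.
*/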
public Properties getLogConfig(final String topic) {
return brokers[0].kafkaServer().zkClient().getEntityConfigs(ConfigType.Topic(), topic);
}
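/**
* Returns the names of all topics currently in the cluster, as read from ZooKeeper.
*/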
public Set<String> getAllTopicsInCluster() {
final scala.collection.Iterator<String> topicsIterator = brokers[0].kafkaServer().zkClient().getAllTopicsInCluster(false).iterator();
final Set<String> topics = new HashSet<>();
while (topicsIterator.hasNext()) {
topics.add(topicsIterator.next());
}
return topics;
}
}