// blob: 348b46b5c711230f80a07a285234fd20ad1161bf [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration.utils;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import java.io.File;
import java.util.Collections;
import java.util.List;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.CoreUtils;
import kafka.utils.SystemTime$;
import kafka.utils.TestUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.junit.rules.TemporaryFolder;
/**
* Runs an in-memory, "embedded" instance of a Kafka broker, which listens at `127.0.0.1:9092` by
* default.
*
* Requires a running ZooKeeper instance to connect to.
*/
public class KafkaEmbedded {

    private static final Logger log = LoggerFactory.getLogger(KafkaEmbedded.class);

    // Fallback reported by zookeeperConnect() when the config has no `zookeeper.connect` entry.
    private static final String DEFAULT_ZK_CONNECT = "127.0.0.1:2181";
    private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000;
    private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8 * 1000;

    private final Properties effectiveConfig;
    private final File logDir;
    public final TemporaryFolder tmpFolder;
    private final KafkaServer kafka;

    /**
     * Creates and starts an embedded Kafka broker.
     *
     * A temporary folder is created first and a fresh sub-folder of it becomes the broker's log
     * directory; both are removed again by {@link #stop()}.
     *
     * @param config Broker configuration settings. Used to modify, for example, on which port the
     *               broker should listen to. Note that you cannot change the `log.dirs` setting
     *               currently.
     * @throws IOException if the temporary folder or the log directory cannot be created
     */
    public KafkaEmbedded(Properties config) throws IOException {
        tmpFolder = new TemporaryFolder();
        tmpFolder.create();
        // logDir must be assigned before effectiveConfigFrom() runs, because that method reads it.
        logDir = tmpFolder.newFolder();
        effectiveConfig = effectiveConfigFrom(config);
        boolean loggingEnabled = true;
        KafkaConfig kafkaConfig = new KafkaConfig(effectiveConfig, loggingEnabled);
        log.debug("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...",
            logDir, zookeeperConnect());
        kafka = TestUtils.createServer(kafkaConfig, SystemTime$.MODULE$);
        log.debug("Startup of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...",
            brokerList(), zookeeperConnect());
    }

    /**
     * Creates the configuration for starting the Kafka broker by merging default values with
     * overwrites.
     *
     * The defaults are applied first, then every entry of {@code initialConfig} is copied on top,
     * and finally the log directory is forced to this instance's temporary log folder, so a
     * caller-supplied log directory is always replaced. No default is added for
     * `zookeeper.connect`; {@link #zookeeperConnect()} only falls back to a default when reading.
     *
     * @param initialConfig Broker configuration settings that override the default config.
     * @return the merged broker configuration
     */
    private Properties effectiveConfigFrom(Properties initialConfig) {
        Properties merged = new Properties();
        merged.put(KafkaConfig$.MODULE$.BrokerIdProp(), 0);
        merged.put(KafkaConfig$.MODULE$.HostNameProp(), "127.0.0.1");
        merged.put(KafkaConfig$.MODULE$.PortProp(), "9092");
        merged.put(KafkaConfig$.MODULE$.NumPartitionsProp(), 1);
        merged.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
        merged.put(KafkaConfig$.MODULE$.MessageMaxBytesProp(), 1000000);
        merged.put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), true);
        merged.putAll(initialConfig);
        merged.setProperty(KafkaConfig$.MODULE$.LogDirProp(), logDir.getAbsolutePath());
        return merged;
    }

    /**
     * This broker's `metadata.broker.list` value. Example: `127.0.0.1:9092`.
     *
     * You can use this to tell Kafka producers and consumers how to connect to this instance.
     *
     * @return the configured host name and the bound PLAINTEXT port, joined by a colon
     */
    public String brokerList() {
        return kafka.config().hostName() + ":" + kafka.boundPort(SecurityProtocol.PLAINTEXT);
    }

    /**
     * The ZooKeeper connection string aka `zookeeper.connect`.
     *
     * @return the `zookeeper.connect` value of the effective configuration, or
     *         {@code 127.0.0.1:2181} if none is set
     */
    public String zookeeperConnect() {
        return effectiveConfig.getProperty("zookeeper.connect", DEFAULT_ZK_CONNECT);
    }

    /**
     * Stop the broker and remove its log directory and temporary folder.
     */
    public void stop() {
        // Resolve the addresses once, while the server is still running, so that the final log
        // statement does not have to query a server that has already been shut down.
        final String brokerAddress = brokerList();
        final String zkAddress = zookeeperConnect();
        log.debug("Shutting down embedded Kafka broker at {} (with ZK ensemble at {}) ...",
            brokerAddress, zkAddress);
        kafka.shutdown();
        kafka.awaitShutdown();
        log.debug("Removing logs.dir at {} ...", logDir);
        List<String> logDirs = Collections.singletonList(logDir.getAbsolutePath());
        CoreUtils.delete(scala.collection.JavaConversions.asScalaBuffer(logDirs).seq());
        tmpFolder.delete();
        log.debug("Shutdown of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...",
            brokerAddress, zkAddress);
    }

    /**
     * Create a Kafka topic with 1 partition and a replication factor of 1.
     *
     * @param topic The name of the topic.
     */
    public void createTopic(String topic) {
        createTopic(topic, 1, 1, new Properties());
    }

    /**
     * Create a Kafka topic with the given parameters.
     *
     * @param topic       The name of the topic.
     * @param partitions  The number of partitions for this topic.
     * @param replication The replication factor for (the partitions of) this topic.
     */
    public void createTopic(String topic, int partitions, int replication) {
        createTopic(topic, partitions, replication, new Properties());
    }

    /**
     * Create a Kafka topic with the given parameters.
     *
     * A ZooKeeper client is opened for the duration of the call and is always closed again, even
     * when topic creation fails.
     *
     * @param topic       The name of the topic.
     * @param partitions  The number of partitions for this topic.
     * @param replication The replication factor for (partitions of) this topic.
     * @param topicConfig Additional topic-level configuration settings.
     */
    public void createTopic(String topic,
                            int partitions,
                            int replication,
                            Properties topicConfig) {
        log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }",
            topic, partitions, replication, topicConfig);
        // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then
        // createTopic() will only seem to work (it will return without error). The topic will exist in
        // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
        // topic.
        ZkClient zkClient = new ZkClient(
            zookeeperConnect(),
            DEFAULT_ZK_SESSION_TIMEOUT_MS,
            DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
            ZKStringSerializer$.MODULE$);
        try {
            boolean isSecure = false;
            ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()), isSecure);
            AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
        } finally {
            // Release the ZooKeeper session even if topic creation throws.
            zkClient.close();
        }
    }
}