blob: 50ba4ea7180120aa633772685880ee26ac6671d7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.test.kafka;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
/**
* Provides a JUnit Rule for interacting with the {@link EmbeddedKafkaSingleton}.
*
*/
public class KafkaTestInstanceRule extends ExternalResource {
private static final Logger logger = LoggerFactory.getLogger(KafkaTestInstanceRule.class);
private static final EmbeddedKafkaInstance kafkaInstance = EmbeddedKafkaSingleton.getInstance();
private String kafkaTopicName;
private final boolean createTopic;
/**
* @param createTopic - If true, a topic shall be created for the value
* provided by {@link #getKafkaTopicName()}. If false, no topics
* shall be created.
*/
public KafkaTestInstanceRule(final boolean createTopic) {
this.createTopic = createTopic;
}
/**
* @return A unique topic name for this test execution. If multiple topics are required by a test, use this value as
* a prefix.
*/
public String getKafkaTopicName() {
if (kafkaTopicName == null) {
throw new IllegalStateException("Cannot get Kafka Topic Name outside of a test execution.");
}
return kafkaTopicName;
}
@Override
protected void before() throws Throwable {
// Get the next kafka topic name.
kafkaTopicName = kafkaInstance.getUniqueTopicName();
if(createTopic) {
createTopic(kafkaTopicName);
}
}
@Override
protected void after() {
kafkaTopicName = null;
}
/**
* Utility method to provide additional unique topics if they are required.
* @param topicName - The Kafka topic to create.
*/
public void createTopic(final String topicName) {
// Setup Kafka.
ZkUtils zkUtils = null;
try {
logger.info("Creating Kafka Topic: '{}'", topicName);
zkUtils = ZkUtils.apply(new ZkClient(kafkaInstance.getZookeeperConnect(), 30000, 30000, ZKStringSerializer$.MODULE$), false);
AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
}
finally {
if(zkUtils != null) {
zkUtils.close();
}
}
}
/**
* Marks a topic for deletion. You may have to wait some time for the delete to actually complete.
*
* @param topicName - The topic that will be deleted. (not null)
*/
public void deleteTopic(final String topicName) {
ZkUtils zkUtils = null;
try {
logger.info("Deleting Kafka Topic: '{}'", topicName);
zkUtils = ZkUtils.apply(new ZkClient(kafkaInstance.getZookeeperConnect(), 30000, 30000, ZKStringSerializer$.MODULE$), false);
AdminUtils.deleteTopic(zkUtils, topicName);
}
finally {
if(zkUtils != null) {
zkUtils.close();
}
}
}
/**
* Delete all of the topics that are in the embedded Kafka instance.
*
* @throws InterruptedException Interrupted while waiting for the topics to be deleted.
*/
public void deleteAllTopics() throws InterruptedException {
// Setup the consumer that is used to list topics for the source.
final Properties consumerProperties = createBootstrapServerConfig();
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try(final Consumer<String, String> listTopicsConsumer = new KafkaConsumer<>(consumerProperties)) {
// Mark all existing topics for deletion.
Set<String> topics = listTopicsConsumer.listTopics().keySet();
for(final String topic : topics) {
deleteTopic(topic);
}
// Loop and wait until they are all gone.
while(!topics.isEmpty()) {
Thread.sleep(100);
topics = listTopicsConsumer.listTopics().keySet();
}
}
}
/**
* @return A new Property object containing the correct value for Kafka's
* {@link CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG}.
*/
public Properties createBootstrapServerConfig() {
return kafkaInstance.createBootstrapServerConfig();
}
/**
* @return The hostnames of the Zookeeper servers.
*/
public String getZookeeperServers() {
return kafkaInstance.getZookeeperConnect();
}
/**
* @return The hostname of the Kafka Broker.
*/
public String getKafkaHostname() {
return kafkaInstance.getBrokerHost();
}
/**
* @return The port of the Kafka Broker.
*/
public String getKafkaPort() {
return kafkaInstance.getBrokerPort();
}
}