/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.contrib.kafka;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Properties;
import kafka.admin.TopicCommand;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.commons.io.FileUtils;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.junit.After;
import org.junit.Before;
import org.slf4j.LoggerFactory;
/**
 * Base class that sets up and cleans up the Kafka test environment for all the input/output tests.
 * For a multi-partition test, this class creates 2 Kafka partitions.
*/
public class KafkaOperatorTestBase
{
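// Illustrative usage sketch (the test class and test method below are hypothetical, not part of this code base):
// a concrete test typically extends this base class and lets beforeTest()/afterTest() manage the embedded
// ZooKeeper and Kafka instances, e.g.
//
//   public class MyKafkaInputOperatorTest extends KafkaOperatorTestBase
//   {
//     public MyKafkaInputOperatorTest()
//     {
//       setHasMultiPartition(true); // exercise the 2-partition setup
//     }
//
//     @Test
//     public void testPartitionedInput() { /* produce to TEST_TOPIC and verify the operator output */ }
//   }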
public static final String END_TUPLE = "END_TUPLE";
public static final int[] TEST_ZOOKEEPER_PORT = new int[] { 2182, 2183 };
public static final int[][] TEST_KAFKA_BROKER_PORT = new int[][] { new int[] { 9092, 9093 }, new int[] { 9094, 9095 } };
public static final String TEST_TOPIC = "test_topic";
static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaOperatorTestBase.class);
// since Kafka 0.8, use KafkaServerStartable instead of KafkaServer
// multiple brokers in multiple clusters
private final KafkaServerStartable[][] broker = new KafkaServerStartable[2][2];
// multiple cluster
private final ServerCnxnFactory[] zkFactory = new ServerCnxnFactory[2];
public String baseDir = "target";
private final String zkBaseDir = "zookeeper-server-data";
private final String kafkaBaseDir = "kafka-server-data";
private final String[] zkdir = new String[] { "zookeeper-server-data/1", "zookeeper-server-data/2" };
private final String[][] kafkadir = new String[][] { new String[] { "kafka-server-data/1/1", "kafka-server-data/1/2" }, new String[] { "kafka-server-data/2/1", "kafka-server-data/2/2" } };
protected boolean hasMultiPartition = false;
protected boolean hasMultiCluster = false;
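/**
 * Starts an embedded ZooKeeper server for the given cluster id, listening on TEST_ZOOKEEPER_PORT[clusterId]
 * behind an NIOServerCnxnFactory, after deleting any leftover ZooKeeper data from a previous run.
 */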
public void startZookeeper(final int clusterId)
{
try {
// before starting, clean the ZooKeeper files if they exist
FileUtils.deleteQuietly(new File(baseDir, zkBaseDir));
int clientPort = TEST_ZOOKEEPER_PORT[clusterId];
int numConnections = 10;
int tickTime = 2000;
File dir = new File(baseDir, zkdir[clusterId]);
TestZookeeperServer kserver = new TestZookeeperServer(dir, dir, tickTime);
zkFactory[clusterId] = new NIOServerCnxnFactory();
zkFactory[clusterId].configure(new InetSocketAddress(clientPort), numConnections);
zkFactory[clusterId].startup(kserver); // start the zookeeper server.
Thread.sleep(2000);
kserver.startup();
} catch (Exception ex) {
logger.debug(ex.getLocalizedMessage());
}
}
public void stopZookeeper()
{
for (ServerCnxnFactory zkf : zkFactory) {
if (zkf != null) {
zkf.shutdown();
}
}
}
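/**
 * Starts a single embedded Kafka broker for the given cluster and broker ids, pointing it at the matching
 * test ZooKeeper instance, after deleting any leftover Kafka data from a previous run. Note: the
 * defaultPartitions argument is currently unused; the partition count is driven by the hasMultiPartition flag.
 */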
public void startKafkaServer(int clusterid, int brokerid, int defaultPartitions)
{
// before starting, clean the Kafka dir if it exists
FileUtils.deleteQuietly(new File(baseDir, kafkaBaseDir));
Properties props = new Properties();
props.setProperty("broker.id", "" + brokerid);
props.setProperty("log.dirs", new File(baseDir, kafkadir[clusterid][brokerid]).toString());
props.setProperty("zookeeper.connect", "localhost:" + TEST_ZOOKEEPER_PORT[clusterid]);
props.setProperty("port", "" + TEST_KAFKA_BROKER_PORT[clusterid][brokerid]);
props.setProperty("default.replication.factor", "1");
// set this to 50000 to boost performance so most test data stays in memory before being flushed to disk
props.setProperty("log.flush.interval.messages", "50000");
if (hasMultiPartition) {
props.setProperty("num.partitions", "2");
} else {
props.setProperty("num.partitions", "1");
}
broker[clusterid][brokerid] = new KafkaServerStartable(new KafkaConfig(props));
broker[clusterid][brokerid].startup();
}
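/**
 * Starts the embedded Kafka brokers required by the current test configuration: one broker in the first
 * cluster by default, a second broker per cluster when hasMultiPartition is set, and the corresponding
 * brokers in a second cluster when hasMultiCluster is set.
 */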
public void startKafkaServer()
{
boolean[][] startable = new boolean[][] { new boolean[] { true, hasMultiPartition }, new boolean[] { hasMultiCluster, hasMultiCluster && hasMultiPartition } };
for (int i = 0; i < startable.length; i++) {
for (int j = 0; j < startable[i].length; j++) {
if (startable[i][j]) {
startKafkaServer(i, j, hasMultiPartition ? 2 : 1);
}
}
}
// startup is an asynchronous operation; wait 2 seconds for the servers to start up
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void stopKafkaServer()
{
for (int i = 0; i < broker.length; i++) {
for (int j = 0; j < broker[i].length; j++) {
if (broker[i][j] != null) {
broker[i][j].shutdown();
broker[i][j].awaitShutdown();
}
}
}
}
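/**
 * JUnit setup: starts ZooKeeper and Kafka for every configured cluster and creates TEST_TOPIC on each of them.
 */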
@Before
public void beforeTest()
{
try {
startZookeeper();
startKafkaServer();
createTopic(0, TEST_TOPIC);
if (hasMultiCluster) {
createTopic(1, TEST_TOPIC);
}
} catch (java.nio.channels.CancelledKeyException ex) {
logger.debug("LSHIL {}", ex.getLocalizedMessage());
}
}
public void startZookeeper()
{
startZookeeper(0);
if (hasMultiCluster) {
startZookeeper(1);
}
}
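/**
 * Creates the topic on the given cluster through Kafka's TopicCommand CLI entry point, using 1 or 2 partitions
 * depending on hasMultiPartition, then sleeps briefly because topic creation is asynchronous.
 */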
public void createTopic(int clusterid, String topicName)
{
String[] args = new String[9];
args[0] = "--zookeeper";
args[1] = "localhost:" + TEST_ZOOKEEPER_PORT[clusterid];
args[2] = "--replication-factor";
args[3] = "1";
args[4] = "--partitions";
if (hasMultiPartition) {
args[5] = "2";
} else {
args[5] = "1";
}
args[6] = "--topic";
args[7] = topicName;
args[8] = "--create";
TopicCommand.main(args);
// Right now there is no programmatic, synchronous way to create the topic, so wait 3 seconds to make sure
// the topic is created and the tests do not hit any spurious failures
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@After
public void afterTest()
{
try {
stopKafkaServer();
stopZookeeper();
} catch (Exception ex) {
logger.debug("LSHIL {}", ex.getLocalizedMessage());
}
}
public void setHasMultiPartition(boolean hasMultiPartition)
{
this.hasMultiPartition = hasMultiPartition;
}
public void setHasMultiCluster(boolean hasMultiCluster)
{
this.hasMultiCluster = hasMultiCluster;
}
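/**
 * ZooKeeperServer subclass used for tests, with registerJMX()/unregisterJMX() overridden as no-ops so the
 * embedded server does not register any JMX MBeans.
 */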
public static class TestZookeeperServer extends ZooKeeperServer
{
public TestZookeeperServer()
{
super();
}
public TestZookeeperServer(File snapDir, File logDir, int tickTime) throws IOException
{
super(snapDir, logDir, tickTime);
}
public TestZookeeperServer(FileTxnSnapLog txnLogFactory, DataTreeBuilder treeBuilder) throws IOException
{
super(txnLogFactory, treeBuilder);
}
public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, DataTreeBuilder treeBuilder) throws IOException
{
super(txnLogFactory, tickTime, treeBuilder);
}
public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, DataTreeBuilder treeBuilder, ZKDatabase zkDb)
{
super(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, treeBuilder, zkDb);
}
@Override
protected void registerJMX()
{
}
@Override
protected void unregisterJMX()
{
}
}
}