blob: 235eeba615cbbb890dcaef8d21e707a032e05336 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.kafka;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.Properties;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.LoggerFactory;
import org.apache.commons.io.FileUtils;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import kafka.admin.TopicCommand;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.ZkUtils;
/**
* This is a base class setup/clean Kafka testing environment for all the input/output test If it's a multipartition
* test, this class creates 2 kafka partitions
*/
public class KafkaOperatorTestBase
{
public static final String END_TUPLE = "END_TUPLE";
public static final int[] TEST_ZOOKEEPER_PORT;
public static final int[] TEST_KAFKA_BROKER_PORT;
public static final String TEST_TOPIC = "testtopic";
public static int testCounter = 0;
// get available ports
static {
ServerSocket[] listeners = new ServerSocket[6];
int[] p = new int[6];
try {
for (int i = 0; i < 6; i++) {
listeners[i] = new ServerSocket(0);
p[i] = listeners[i].getLocalPort();
}
for (int i = 0; i < 6; i++) {
listeners[i].close();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
TEST_ZOOKEEPER_PORT = new int[]{p[0], p[1]};
TEST_KAFKA_BROKER_PORT = new int[]{p[2], p[3]};
}
static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaOperatorTestBase.class);
// since Kafka 0.8 use KafkaServerStatble instead of KafkaServer
// multiple brokers in multiple cluster
private static KafkaServerStartable[] broker = new KafkaServerStartable[2];
// multiple cluster
private static ServerCnxnFactory[] zkFactory = new ServerCnxnFactory[2];
private static ZooKeeperServer[] zkServer = new ZooKeeperServer[2];
public static String baseDir = "target";
private static final String zkBaseDir = "zookeeper-server-data";
private static final String kafkaBaseDir = "kafka-server-data";
private static final String[] zkdir = new String[]{"zookeeper-server-data/1", "zookeeper-server-data/2"};
private static final String[] kafkadir = new String[]{"kafka-server-data/1/1", "kafka-server-data/1/2"};
protected boolean hasMultiPartition = false;
protected boolean hasMultiCluster = false;
public static void startZookeeper(final int clusterId)
{
try {
int numConnections = 100;
int tickTime = 2000;
File dir = new File(baseDir, zkdir[clusterId]);
zkServer[clusterId] = new TestZookeeperServer(dir, dir, tickTime);
zkFactory[clusterId] = new NIOServerCnxnFactory();
zkFactory[clusterId].configure(new InetSocketAddress(TEST_ZOOKEEPER_PORT[clusterId]), numConnections);
zkFactory[clusterId].startup(zkServer[clusterId]); // start the zookeeper server.
Thread.sleep(2000);
//kserver.startup();
} catch (Exception ex) {
logger.error(ex.getLocalizedMessage());
}
}
public static void stopZookeeper()
{
for (ZooKeeperServer zs : zkServer) {
if (zs != null) {
zs.shutdown();
}
}
for (ServerCnxnFactory zkf : zkFactory) {
if (zkf != null) {
zkf.closeAll();
zkf.shutdown();
}
}
zkServer = new ZooKeeperServer[2];
zkFactory = new ServerCnxnFactory[2];
}
public static void startKafkaServer(int clusterid, int brokerid)
{
Properties props = new Properties();
props.setProperty("broker.id", "" + clusterid * 10 + brokerid);
props.setProperty("log.dirs", new File(baseDir, kafkadir[clusterid]).toString());
props.setProperty("zookeeper.connect", "localhost:" + TEST_ZOOKEEPER_PORT[clusterid]);
props.setProperty("port", "" + TEST_KAFKA_BROKER_PORT[clusterid]);
props.setProperty("default.replication.factor", "1");
// set this to 50000 to boost the performance so most test data are in memory before flush to disk
props.setProperty("log.flush.interval.messages", "50000");
broker[clusterid] = new KafkaServerStartable(new KafkaConfig(props));
broker[clusterid].startup();
}
public static void startKafkaServer()
{
FileUtils.deleteQuietly(new File(baseDir, kafkaBaseDir));
//boolean[][] startable = new boolean[][] { new boolean[] { true, hasMultiPartition },
// new boolean[] { hasMultiCluster, hasMultiCluster && hasMultiPartition } };
startKafkaServer(0, 0);
//startKafkaServer(0, 1);
startKafkaServer(1, 0);
//startKafkaServer(1, 1);
// startup is asynch operation. wait 2 sec for server to startup
}
public static void stopKafkaServer()
{
for (int i = 0; i < broker.length; i++) {
if (broker[i] != null) {
broker[i].shutdown();
broker[i].awaitShutdown();
broker[i] = null;
}
}
}
@BeforeClass
public static void beforeTest()
{
try {
startZookeeper();
startKafkaServer();
} catch (java.nio.channels.CancelledKeyException ex) {
logger.debug("LSHIL {}", ex.getLocalizedMessage());
}
}
public static void startZookeeper()
{
FileUtils.deleteQuietly(new File(baseDir, zkBaseDir));
startZookeeper(0);
startZookeeper(1);
}
public void createTopic(int clusterid, String topicName)
{
String[] args = new String[9];
args[0] = "--zookeeper";
args[1] = "localhost:" + TEST_ZOOKEEPER_PORT[clusterid];
args[2] = "--replication-factor";
args[3] = "1";
args[4] = "--partitions";
if (hasMultiPartition) {
args[5] = "2";
} else {
args[5] = "1";
}
args[6] = "--topic";
args[7] = topicName;
args[8] = "--create";
ZkUtils zu = ZkUtils.apply("localhost:" + TEST_ZOOKEEPER_PORT[clusterid], 30000, 30000, false);
TopicCommand.createTopic(zu, new TopicCommand.TopicCommandOptions(args));
}
@AfterClass
public static void afterTest()
{
try {
stopKafkaServer();
stopZookeeper();
} catch (Exception ex) {
logger.debug("LSHIL {}", ex.getLocalizedMessage());
}
}
public void setHasMultiPartition(boolean hasMultiPartition)
{
this.hasMultiPartition = hasMultiPartition;
}
public void setHasMultiCluster(boolean hasMultiCluster)
{
this.hasMultiCluster = hasMultiCluster;
}
public static class TestZookeeperServer extends ZooKeeperServer
{
public TestZookeeperServer()
{
super();
// TODO Auto-generated constructor stub
}
public TestZookeeperServer(File snapDir, File logDir, int tickTime) throws IOException
{
super(snapDir, logDir, tickTime);
// TODO Auto-generated constructor stub
}
public TestZookeeperServer(FileTxnSnapLog txnLogFactory, DataTreeBuilder treeBuilder) throws IOException
{
super(txnLogFactory, treeBuilder);
// TODO Auto-generated constructor stub
}
public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, DataTreeBuilder treeBuilder)
throws IOException
{
super(txnLogFactory, tickTime, treeBuilder);
// TODO Auto-generated constructor stub
}
public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout,
int maxSessionTimeout, DataTreeBuilder treeBuilder, ZKDatabase zkDb)
{
super(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, treeBuilder, zkDb);
// TODO Auto-generated constructor stub
}
@Override
protected void registerJMX()
{
}
@Override
protected void unregisterJMX()
{
}
}
}