blob: e1a92edb213387f72c7fb064c3aa9305898eede8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.distribution.journal.kafka.util;
import java.io.Closeable;
import java.io.IOException;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.metrics.KafkaMetricsReporter;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.VerifiableProperties;
import scala.Some;
import scala.collection.Seq;
public class KafkaLocal implements Closeable {
Logger LOG = LoggerFactory.getLogger(KafkaLocal.class);
public KafkaServer kafka;
public ZooKeeperLocal zookeeper;
public KafkaLocal() throws Exception {
this(kafkaProperties(), zookeeperProperties());
}
public KafkaLocal(Properties kafkaProperties, Properties zkProperties) throws Exception {
zookeeper = new ZooKeeperLocal(zkProperties);
KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
Seq<KafkaMetricsReporter> reporters = KafkaMetricsReporter.startReporters(new VerifiableProperties(kafkaProperties));
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
kafka = new KafkaServer(kafkaConfig, Time.SYSTEM, new Some<String>("kafka"), reporters);
kafka.startup();
}
@Override
public void close() throws IOException {
System.out.println("stopping kafka...");
try {
kafka.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
zookeeper.close();
System.out.println("done");
}
private static Properties kafkaProperties() {
String logDir = "target/kafka-" + UUID.randomUUID().toString();
Properties kafkaProps = new Properties();
kafkaProps.put("zookeeper.connect", "localhost:2181");
kafkaProps.put("advertised.host.name", "localhost");
kafkaProps.put("host.name","localhost");
kafkaProps.put("port", "9092");
kafkaProps.put("broker.id", "0");
kafkaProps.put("log.dir",logDir);
kafkaProps.put("group.initial.rebalance.delay.ms", "0");
kafkaProps.put("group.min.session.timeout.ms", "1000");
kafkaProps.put("offsets.topic.replication.factor", "1");
return kafkaProps;
}
private static Properties zookeeperProperties() {
Properties zkProps = new Properties();
UUID uuid = UUID.randomUUID();
zkProps.put("dataDir", "target/zookeeper/"+uuid.toString());
zkProps.put("clientPort", "2181");
return zkProps;
}
}