blob: 7909d255ff47ae78d5fbcbbf0296c8e9e315fe01 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools;
import joptsimple.*;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.utils.Utils;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This is a kafka 0.7 to 0.8 online migration tool used for migrating data from 0.7 to 0.8 cluster. Internally,
* it's composed of a kafka 0.7 consumer and kafka 0.8 producer. The kafka 0.7 consumer consumes data from the
* 0.7 cluster, and the kafka 0.8 producer produces data to the 0.8 cluster.
*
* The 0.7 consumer is loaded from kafka 0.7 jar using a "parent last, child first" java class loader.
* Ordinary class loader is "parent first, child last", and kafka 0.8 and 0.7 both have classes for a lot of
* class names like "kafka.consumer.Consumer", etc., so ordinary java URLClassLoader with kafka 0.7 jar will
* will still load the 0.8 version class.
*
* As kafka 0.7 and kafka 0.8 used different version of zkClient, the zkClient jar used by kafka 0.7 should
* also be used by the class loader.
*
* The user need to provide the configuration file for 0.7 consumer and 0.8 producer. For 0.8 producer,
* the "serializer.class" config is set to "kafka.serializer.DefaultEncoder" by the code.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public class KafkaMigrationTool
{
private static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName());
private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "kafka.consumer.Consumer";
private static final String KAFKA_07_CONSUMER_CONFIG_CLASS_NAME = "kafka.consumer.ConsumerConfig";
private static final String KAFKA_07_CONSUMER_STREAM_CLASS_NAME = "kafka.consumer.KafkaStream";
private static final String KAFKA_07_CONSUMER_ITERATOR_CLASS_NAME = "kafka.consumer.ConsumerIterator";
private static final String KAFKA_07_CONSUMER_CONNECTOR_CLASS_NAME = "kafka.javaapi.consumer.ConsumerConnector";
private static final String KAFKA_07_MESSAGE_AND_METADATA_CLASS_NAME = "kafka.message.MessageAndMetadata";
private static final String KAFKA_07_MESSAGE_CLASS_NAME = "kafka.message.Message";
private static final String KAFKA_07_WHITE_LIST_CLASS_NAME = "kafka.consumer.Whitelist";
private static final String KAFKA_07_TOPIC_FILTER_CLASS_NAME = "kafka.consumer.TopicFilter";
private static final String KAFKA_07_BLACK_LIST_CLASS_NAME = "kafka.consumer.Blacklist";
private static Class<?> KafkaStaticConsumer_07 = null;
private static Class<?> ConsumerConfig_07 = null;
private static Class<?> ConsumerConnector_07 = null;
private static Class<?> KafkaStream_07 = null;
private static Class<?> TopicFilter_07 = null;
private static Class<?> WhiteList_07 = null;
private static Class<?> BlackList_07 = null;
private static Class<?> KafkaConsumerIteratorClass_07 = null;
private static Class<?> KafkaMessageAndMetatDataClass_07 = null;
private static Class<?> KafkaMessageClass_07 = null;
public static void main(String[] args) throws InterruptedException, IOException {
OptionParser parser = new OptionParser();
ArgumentAcceptingOptionSpec<String> consumerConfigOpt
= parser.accepts("consumer.config", "Kafka 0.7 consumer config to consume from the source 0.7 cluster. " + "You man specify multiple of these.")
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
ArgumentAcceptingOptionSpec<String> producerConfigOpt
= parser.accepts("producer.config", "Producer config.")
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
ArgumentAcceptingOptionSpec<Integer> numProducersOpt
= parser.accepts("num.producers", "Number of producer instances")
.withRequiredArg()
.describedAs("Number of producers")
.ofType(Integer.class)
.defaultsTo(1);
ArgumentAcceptingOptionSpec<String> zkClient01JarOpt
= parser.accepts("zkclient.01.jar", "zkClient 0.1 jar file")
.withRequiredArg()
.describedAs("zkClient 0.1 jar file required by Kafka 0.7")
.ofType(String.class);
ArgumentAcceptingOptionSpec<String> kafka07JarOpt
= parser.accepts("kafka.07.jar", "Kafka 0.7 jar file")
.withRequiredArg()
.describedAs("kafka 0.7 jar")
.ofType(String.class);
ArgumentAcceptingOptionSpec<Integer> numStreamsOpt
= parser.accepts("num.streams", "Number of consumer streams")
.withRequiredArg()
.describedAs("Number of consumer threads")
.ofType(Integer.class)
.defaultsTo(1);
ArgumentAcceptingOptionSpec<String> whitelistOpt
= parser.accepts("whitelist", "Whitelist of topics to migrate from the 0.7 cluster")
.withRequiredArg()
.describedAs("Java regex (String)")
.ofType(String.class);
ArgumentAcceptingOptionSpec<String> blacklistOpt
= parser.accepts("blacklist", "Blacklist of topics to migrate from the 0.7 cluster")
.withRequiredArg()
.describedAs("Java regex (String)")
.ofType(String.class);
ArgumentAcceptingOptionSpec<Integer> queueSizeOpt
= parser.accepts("queue.size", "Number of messages that are buffered between the 0.7 consumer and 0.8 producer")
.withRequiredArg()
.describedAs("Queue size in terms of number of messages")
.ofType(Integer.class)
.defaultsTo(10000);
OptionSpecBuilder helpOpt
= parser.accepts("help", "Print this message.");
OptionSet options = parser.parse(args);
if (options.has(helpOpt)) {
parser.printHelpOn(System.out);
System.exit(0);
}
checkRequiredArgs(parser, options, new OptionSpec[]{consumerConfigOpt, producerConfigOpt, zkClient01JarOpt, kafka07JarOpt});
int whiteListCount = options.has(whitelistOpt) ? 1 : 0;
int blackListCount = options.has(blacklistOpt) ? 1 : 0;
if(whiteListCount + blackListCount != 1) {
System.err.println("Exactly one of whitelist or blacklist is required.");
System.exit(1);
}
String kafkaJarFile_07 = options.valueOf(kafka07JarOpt);
String zkClientJarFile = options.valueOf(zkClient01JarOpt);
String consumerConfigFile_07 = options.valueOf(consumerConfigOpt);
int numConsumers = options.valueOf(numStreamsOpt);
String producerConfigFile_08 = options.valueOf(producerConfigOpt);
int numProducers = options.valueOf(numProducersOpt);
final List<MigrationThread> migrationThreads = new ArrayList<MigrationThread>(numConsumers);
final List<ProducerThread> producerThreads = new ArrayList<ProducerThread>(numProducers);
try {
File kafkaJar_07 = new File(kafkaJarFile_07);
File zkClientJar = new File(zkClientJarFile);
ParentLastURLClassLoader c1 = new ParentLastURLClassLoader(new URL[] {
kafkaJar_07.toURI().toURL(),
zkClientJar.toURI().toURL()
});
/** Construct the 07 consumer config **/
ConsumerConfig_07 = c1.loadClass(KAFKA_07_CONSUMER_CONFIG_CLASS_NAME);
KafkaStaticConsumer_07 = c1.loadClass(KAFKA_07_STATIC_CONSUMER_CLASS_NAME);
ConsumerConnector_07 = c1.loadClass(KAFKA_07_CONSUMER_CONNECTOR_CLASS_NAME);
KafkaStream_07 = c1.loadClass(KAFKA_07_CONSUMER_STREAM_CLASS_NAME);
TopicFilter_07 = c1.loadClass(KAFKA_07_TOPIC_FILTER_CLASS_NAME);
WhiteList_07 = c1.loadClass(KAFKA_07_WHITE_LIST_CLASS_NAME);
BlackList_07 = c1.loadClass(KAFKA_07_BLACK_LIST_CLASS_NAME);
KafkaMessageClass_07 = c1.loadClass(KAFKA_07_MESSAGE_CLASS_NAME);
KafkaConsumerIteratorClass_07 = c1.loadClass(KAFKA_07_CONSUMER_ITERATOR_CLASS_NAME);
KafkaMessageAndMetatDataClass_07 = c1.loadClass(KAFKA_07_MESSAGE_AND_METADATA_CLASS_NAME);
Constructor ConsumerConfigConstructor_07 = ConsumerConfig_07.getConstructor(Properties.class);
Properties kafkaConsumerProperties_07 = new Properties();
kafkaConsumerProperties_07.load(new FileInputStream(consumerConfigFile_07));
/** Disable shallow iteration because the message format is different between 07 and 08, we have to get each individual message **/
if(kafkaConsumerProperties_07.getProperty("shallow.iterator.enable", "").equals("true")) {
logger.warn("Shallow iterator should not be used in the migration tool");
kafkaConsumerProperties_07.setProperty("shallow.iterator.enable", "false");
}
Object consumerConfig_07 = ConsumerConfigConstructor_07.newInstance(kafkaConsumerProperties_07);
/** Construct the 07 consumer connector **/
Method ConsumerConnectorCreationMethod_07 = KafkaStaticConsumer_07.getMethod("createJavaConsumerConnector", ConsumerConfig_07);
final Object consumerConnector_07 = ConsumerConnectorCreationMethod_07.invoke(null, consumerConfig_07);
Method ConsumerConnectorCreateMessageStreamsMethod_07 = ConsumerConnector_07.getMethod(
"createMessageStreamsByFilter",
TopicFilter_07, int.class);
final Method ConsumerConnectorShutdownMethod_07 = ConsumerConnector_07.getMethod("shutdown");
Constructor WhiteListConstructor_07 = WhiteList_07.getConstructor(String.class);
Constructor BlackListConstructor_07 = BlackList_07.getConstructor(String.class);
Object filterSpec = null;
if(options.has(whitelistOpt))
filterSpec = WhiteListConstructor_07.newInstance(options.valueOf(whitelistOpt));
else
filterSpec = BlackListConstructor_07.newInstance(options.valueOf(blacklistOpt));
Object retKafkaStreams = ConsumerConnectorCreateMessageStreamsMethod_07.invoke(consumerConnector_07, filterSpec, numConsumers);
Properties kafkaProducerProperties_08 = new Properties();
kafkaProducerProperties_08.load(new FileInputStream(producerConfigFile_08));
kafkaProducerProperties_08.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
// create a producer channel instead
int queueSize = options.valueOf(queueSizeOpt);
ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<byte[], byte[]>>(queueSize);
int threadId = 0;
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
ConsumerConnectorShutdownMethod_07.invoke(consumerConnector_07);
} catch(Exception e) {
logger.error("Error while shutting down Kafka consumer", e);
}
for(MigrationThread migrationThread : migrationThreads) {
migrationThread.shutdown();
}
for(ProducerThread producerThread : producerThreads) {
producerThread.shutdown();
}
for(ProducerThread producerThread : producerThreads) {
producerThread.awaitShutdown();
}
logger.info("Kafka migration tool shutdown successfully");
}
});
// start consumer threads
for(Object stream: (List)retKafkaStreams) {
MigrationThread thread = new MigrationThread(stream, producerDataChannel, threadId);
threadId ++;
thread.start();
migrationThreads.add(thread);
}
String clientId = kafkaProducerProperties_08.getProperty("client.id");
// start producer threads
for (int i = 0; i < numProducers; i++) {
kafkaProducerProperties_08.put("client.id", clientId + "-" + i);
ProducerConfig producerConfig_08 = new ProducerConfig(kafkaProducerProperties_08);
Producer producer = new Producer(producerConfig_08);
ProducerThread producerThread = new ProducerThread(producerDataChannel, producer, i);
producerThread.start();
producerThreads.add(producerThread);
}
}
catch (Throwable e){
System.out.println("Kafka migration tool failed due to: " + Utils.stackTrace(e));
logger.error("Kafka migration tool failed: ", e);
}
}
private static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec[] required) throws IOException {
for(OptionSpec arg : required) {
if(!options.has(arg)) {
System.err.println("Missing required argument \"" + arg + "\"");
parser.printHelpOn(System.err);
System.exit(1);
}
}
}
static class ProducerDataChannel<T> {
private final int producerQueueSize;
private final BlockingQueue<T> producerRequestQueue;
public ProducerDataChannel(int queueSize) {
producerQueueSize = queueSize;
producerRequestQueue = new ArrayBlockingQueue<T>(producerQueueSize);
}
public void sendRequest(T data) throws InterruptedException {
producerRequestQueue.put(data);
}
public T receiveRequest() throws InterruptedException {
return producerRequestQueue.take();
}
}
private static class MigrationThread extends Thread {
private final Object stream;
private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
private final int threadId;
private final String threadName;
private final org.apache.log4j.Logger logger;
private CountDownLatch shutdownComplete = new CountDownLatch(1);
private final AtomicBoolean isRunning = new AtomicBoolean(true);
MigrationThread(Object _stream, ProducerDataChannel<KeyedMessage<byte[], byte[]>> _producerDataChannel, int _threadId) {
stream = _stream;
producerDataChannel = _producerDataChannel;
threadId = _threadId;
threadName = "MigrationThread-" + threadId;
logger = org.apache.log4j.Logger.getLogger(MigrationThread.class.getName());
this.setName(threadName);
}
public void run() {
try {
Method MessageGetPayloadMethod_07 = KafkaMessageClass_07.getMethod("payload");
Method KafkaGetMessageMethod_07 = KafkaMessageAndMetatDataClass_07.getMethod("message");
Method KafkaGetTopicMethod_07 = KafkaMessageAndMetatDataClass_07.getMethod("topic");
Method ConsumerIteratorMethod = KafkaStream_07.getMethod("iterator");
Method KafkaStreamHasNextMethod_07 = KafkaConsumerIteratorClass_07.getMethod("hasNext");
Method KafkaStreamNextMethod_07 = KafkaConsumerIteratorClass_07.getMethod("next");
Object iterator = ConsumerIteratorMethod.invoke(stream);
while (((Boolean) KafkaStreamHasNextMethod_07.invoke(iterator)).booleanValue()) {
Object messageAndMetaData_07 = KafkaStreamNextMethod_07.invoke(iterator);
Object message_07 = KafkaGetMessageMethod_07.invoke(messageAndMetaData_07);
Object topic = KafkaGetTopicMethod_07.invoke(messageAndMetaData_07);
Object payload_07 = MessageGetPayloadMethod_07.invoke(message_07);
int size = ((ByteBuffer)payload_07).remaining();
byte[] bytes = new byte[size];
((ByteBuffer)payload_07).get(bytes);
if(logger.isDebugEnabled())
logger.debug("Migration thread " + threadId + " sending message of size " + bytes.length + " to topic "+ topic);
KeyedMessage<byte[], byte[]> producerData = new KeyedMessage((String)topic, null, bytes);
producerDataChannel.sendRequest(producerData);
}
logger.info("Migration thread " + threadName + " finished running");
} catch (InvocationTargetException t){
logger.fatal("Migration thread failure due to root cause ", t.getCause());
} catch (Throwable t){
logger.fatal("Migration thread failure due to ", t);
} finally {
shutdownComplete.countDown();
}
}
public void shutdown() {
logger.info("Migration thread " + threadName + " shutting down");
isRunning.set(false);
interrupt();
try {
shutdownComplete.await();
} catch(InterruptedException ie) {
logger.warn("Interrupt during shutdown of MigrationThread", ie);
}
logger.info("Migration thread " + threadName + " shutdown complete");
}
}
static class ProducerThread extends Thread {
private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
private final Producer<byte[], byte[]> producer;
private final int threadId;
private String threadName;
private org.apache.log4j.Logger logger;
private CountDownLatch shutdownComplete = new CountDownLatch(1);
private KeyedMessage<byte[], byte[]> shutdownMessage = new KeyedMessage("shutdown", null, null);
public ProducerThread(ProducerDataChannel<KeyedMessage<byte[], byte[]>> _producerDataChannel,
Producer<byte[], byte[]> _producer,
int _threadId) {
producerDataChannel = _producerDataChannel;
producer = _producer;
threadId = _threadId;
threadName = "ProducerThread-" + threadId;
logger = org.apache.log4j.Logger.getLogger(ProducerThread.class.getName());
this.setName(threadName);
}
public void run() {
try{
while(true) {
KeyedMessage<byte[], byte[]> data = producerDataChannel.receiveRequest();
if(!data.equals(shutdownMessage)) {
producer.send(data);
if(logger.isDebugEnabled()) logger.debug("Sending message %s".format(new String(data.message())));
}
else
break;
}
logger.info("Producer thread " + threadName + " finished running");
} catch (Throwable t){
logger.fatal("Producer thread failure due to ", t);
} finally {
shutdownComplete.countDown();
}
}
public void shutdown() {
try {
logger.info("Producer thread " + threadName + " shutting down");
producerDataChannel.sendRequest(shutdownMessage);
} catch(InterruptedException ie) {
logger.warn("Interrupt during shutdown of ProducerThread", ie);
}
}
public void awaitShutdown() {
try {
shutdownComplete.await();
producer.close();
logger.info("Producer thread " + threadName + " shutdown complete");
} catch(InterruptedException ie) {
logger.warn("Interrupt during shutdown of ProducerThread", ie);
}
}
}
/**
* A parent-last class loader that will try the child class loader first and then the parent.
* This takes a fair bit of doing because java really prefers parent-first.
*/
private static class ParentLastURLClassLoader extends ClassLoader {
private ChildURLClassLoader childClassLoader;
/**
* This class allows me to call findClass on a class loader
*/
private static class FindClassClassLoader extends ClassLoader {
public FindClassClassLoader(ClassLoader parent) {
super(parent);
}
@Override
public Class<?> findClass(String name) throws ClassNotFoundException {
return super.findClass(name);
}
}
/**
* This class delegates (child then parent) for the findClass method for a URLClassLoader.
* We need this because findClass is protected in URLClassLoader
*/
private static class ChildURLClassLoader extends URLClassLoader {
private FindClassClassLoader realParent;
public ChildURLClassLoader( URL[] urls, FindClassClassLoader realParent) {
super(urls, null);
this.realParent = realParent;
}
@Override
public Class<?> findClass(String name) throws ClassNotFoundException {
try{
// first try to use the URLClassLoader findClass
return super.findClass(name);
}
catch( ClassNotFoundException e ) {
// if that fails, we ask our real parent class loader to load the class (we give up)
return realParent.loadClass(name);
}
}
}
public ParentLastURLClassLoader(URL[] urls) {
super(Thread.currentThread().getContextClassLoader());
childClassLoader = new ChildURLClassLoader(urls, new FindClassClassLoader(this.getParent()));
}
@Override
protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
try {
// first we try to find a class inside the child class loader
return childClassLoader.findClass(name);
}
catch( ClassNotFoundException e ) {
// didn't find it, try the parent
return super.loadClass(name, resolve);
}
}
}
}