import kafka.consumer._
import kafka.metrics.KafkaMetricsGroup
import kafka.producer.{BaseProducer, NewShinyProducer, OldProducer}
import kafka.serializer._
import kafka.utils._
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import java.util.Random
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{BlockingQueue, ArrayBlockingQueue, CountDownLatch, TimeUnit}
import scala.collection.JavaConversions._
import joptsimple.OptionParser
object MirrorMaker extends Logging {
// Shared mirror-maker state. The three sequences are assigned by main() and are
// null before that point, which is why cleanShutdown() null-checks each of them.
private var connectors: Seq[ZookeeperConsumerConnector] = _
private var consumerThreads: Seq[ConsumerThread] = _
private var producerThreads: Seq[ProducerThread] = _

// Flipped with compareAndSet in cleanShutdown() so the shutdown sequence runs once.
private val isShuttingdown = new AtomicBoolean(false)

// Sentinel record: ProducerThread.shutdown enqueues it and ProducerThread.run
// recognises it by reference (eq), not by content.
private val shutdownMessage =
  new ProducerRecord[Array[Byte], Array[Byte]]("shutdown", "shutdown".getBytes)
/**
 * Entry point of the mirror maker: parses the command line, wires consumers to
 * producers through a [[DataChannel]], starts all threads and blocks until the
 * producer threads have shut down.
 *
 * Command-line options:
 *  - `consumer.config` (required, repeatable): consumer properties file; one
 *    ZookeeperConsumerConnector is created per file.
 *  - `producer.config` (required): producer properties file.
 *  - `new.producer`: flag selecting NewShinyProducer instead of OldProducer.
 *  - `num.producers`, `num.streams`, `queue.size`: positive integers
 *    (defaults 1, 1 and 10000).
 *  - exactly one of `whitelist` / `blacklist`: Java regex selecting topics.
 *  - `help`: print usage and exit.
 *
 * @param args raw command-line arguments
 */
def main(args: Array[String]): Unit = {
  info("Starting mirror maker")
  val parser = new OptionParser

  // Every value-carrying option needs withRequiredArg()/ofType(): describedAs is
  // only available on argument-accepting specs, and valueOf() is typed by ofType().
  val consumerConfigOpt = parser.accepts("consumer.config",
      "Consumer config to consume from a source cluster. " +
      "You may specify multiple of these.")
    .withRequiredArg()
    .describedAs("config file")
    .ofType(classOf[String])

  val producerConfigOpt = parser.accepts("producer.config",
      "Embedded producer config.")
    .withRequiredArg()
    .describedAs("config file")
    .ofType(classOf[String])

  val useNewProducerOpt = parser.accepts("new.producer",
    "Use the new producer implementation.")

  val numProducersOpt = parser.accepts("num.producers",
      "Number of producer instances")
    .withRequiredArg()
    .describedAs("Number of producers")
    .ofType(classOf[java.lang.Integer])
    .defaultsTo(1)

  val numStreamsOpt = parser.accepts("num.streams",
      "Number of consumption streams.")
    .withRequiredArg()
    .describedAs("Number of threads")
    .ofType(classOf[java.lang.Integer])
    .defaultsTo(1)

  val bufferSizeOpt = parser.accepts("queue.size",
      "Number of messages that are buffered between the consumer and producer")
    .withRequiredArg()
    .describedAs("Queue size in terms of number of messages")
    .ofType(classOf[java.lang.Integer])
    .defaultsTo(10000)

  val whitelistOpt = parser.accepts("whitelist",
      "Whitelist of topics to mirror.")
    .withRequiredArg()
    .describedAs("Java regex (String)")
    .ofType(classOf[String])

  val blacklistOpt = parser.accepts("blacklist",
      "Blacklist of topics to mirror.")
    .withRequiredArg()
    .describedAs("Java regex (String)")
    .ofType(classOf[String])

  val helpOpt = parser.accepts("help", "Print this message.")

  if (args.length == 0)
    CommandLineUtils.printUsageAndDie(parser, "Continuously copy data between two Kafka clusters.")

  val options = parser.parse(args : _*)

  // --help must short-circuit before the required-argument checks below.
  if (options.has(helpOpt)) {
    parser.printHelpOn(System.out)
    System.exit(0)
  }

  CommandLineUtils.checkRequiredArgs(parser, options, consumerConfigOpt, producerConfigOpt)
  if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) {
    println("Exactly one of whitelist or blacklist is required.")
    System.exit(1)
  }

  val numProducers = options.valueOf(numProducersOpt).intValue()
  val numStreams = options.valueOf(numStreamsOpt).intValue()
  val bufferSize = options.valueOf(bufferSizeOpt).intValue()
  // numProducers is used as a modulus and bufferSize as a queue capacity, so
  // non-positive values would only fail later and less readably.
  if (numProducers < 1 || numStreams < 1 || bufferSize < 1)
    CommandLineUtils.printUsageAndDie(parser, "num.producers, num.streams and queue.size must all be positive.")

  // create consumer streams
  connectors = options.valuesOf(consumerConfigOpt).toList
    .map(cfg => new ConsumerConfig(Utils.loadProps(cfg)))
    .map(new ZookeeperConsumerConnector(_))
  val numConsumers = connectors.size * numStreams

  // create a data channel between the consumers and the producers
  val mirrorDataChannel = new DataChannel(bufferSize, numConsumers, numProducers)

  // create producer threads; each producer gets its own client id derived from
  // the configured one by appending "-<index>"
  val useNewProducer = options.has(useNewProducerOpt)
  val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
  val clientId = producerProps.getProperty("client.id", "")
  producerThreads = (0 until numProducers).map { i =>
    producerProps.setProperty("client.id", clientId + "-" + i)
    val producer: BaseProducer =
      if (useNewProducer) {
        // Records travel through the channel as raw bytes, so both serializers
        // are forced to the byte-array serializer.
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
        new NewShinyProducer(producerProps)
      } else {
        new OldProducer(producerProps)
      }
    new ProducerThread(mirrorDataChannel, producer, i)
  }

  // create consumer threads
  val filterSpec =
    if (options.has(whitelistOpt))
      new Whitelist(options.valueOf(whitelistOpt))
    else
      new Blacklist(options.valueOf(blacklistOpt))

  val streams: Seq[KafkaStream[Array[Byte], Array[Byte]]] =
    try {
      connectors.flatMap(_.createMessageStreamsByFilter(filterSpec, numStreams, new DefaultDecoder(), new DefaultDecoder()))
    } catch {
      // Throwable on purpose: whatever went wrong, the process cannot continue.
      case t: Throwable =>
        fatal("Unable to create stream - shutting down mirror maker.", t)
        connectors.foreach(_.shutdown)
        sys.exit(1)
    }
  consumerThreads = streams.zipWithIndex.map(streamAndIndex => new ConsumerThread(streamAndIndex._1, mirrorDataChannel, streamAndIndex._2))
  assert(consumerThreads.size == numConsumers)

  Runtime.getRuntime.addShutdownHook(new Thread() {
    override def run(): Unit = cleanShutdown()
  })

  consumerThreads.foreach(_.start())
  producerThreads.foreach(_.start())

  // we wait on producer's shutdown latch instead of consumers
  // since the consumer threads can hit a timeout/other exception;
  // but in this case the producer should still be able to shutdown
  // based on the shutdown message in the channel
  producerThreads.foreach(_.awaitShutdown)
}
/**
 * Shuts the mirror maker down in source-to-sink order.
 *
 * Only the first invocation does any work; later calls return immediately
 * because `isShuttingdown` has already been flipped. Each collection is
 * null-checked because this can run before main() has assigned it.
 */
def cleanShutdown(): Unit = {
  if (isShuttingdown.compareAndSet(false, true)) {
    // Stop the connectors first, then wait for the consumer threads to finish.
    if (connectors != null) connectors.foreach(_.shutdown)
    if (consumerThreads != null) consumerThreads.foreach(_.awaitShutdown())
    if (producerThreads != null) {
      // Enqueue the shutdown sentinel for every producer before waiting on any
      // of them, so they all drain their queues concurrently.
      producerThreads.foreach(_.shutdown)
      producerThreads.foreach(_.awaitShutdown)
    }
    info("Kafka mirror maker shutdown successfully")
  }
}
/**
 * Bounded buffer that hands records from consumer threads to producer threads.
 *
 * It holds one blocking queue per output; a record is routed to a queue either
 * explicitly by id or implicitly from its key.
 *
 * @param capacity   maximum number of records held by each individual queue
 * @param numInputs  number of threads calling `put`; used to scale the put wait-time meter
 * @param numOutputs number of queues, i.e. valid queue ids are `0 until numOutputs`
 */
class DataChannel(capacity: Int, numInputs: Int, numOutputs: Int) extends KafkaMetricsGroup {

  val queues = new Array[BlockingQueue[ProducerRecord[Array[Byte],Array[Byte]]]](numOutputs)
  for (i <- 0 until numOutputs)
    queues(i) = new ArrayBlockingQueue[ProducerRecord[Array[Byte],Array[Byte]]](capacity)

  // Round-robin cursor for records without a key; starts at a random position.
  private val counter = new AtomicInteger(new Random().nextInt())

  // We use a single meter for aggregated wait percentage for the data channel.
  // Since meter is calculated as total_recorded_value / time_window and
  // time_window is independent of the number of threads, each recorded wait
  // time should be discounted by # threads.
  private val waitPut = newMeter("MirrorMaker-DataChannel-WaitOnPut", "percent", TimeUnit.NANOSECONDS)
  private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.NANOSECONDS)
  private val channelSizeHist = newHistogram("MirrorMaker-DataChannel-Size")

  /**
   * Enqueues `record` on a queue chosen from its key, blocking until there is room.
   *
   * @param record the record to hand over to a producer thread
   */
  def put(record: ProducerRecord[Array[Byte],Array[Byte]]): Unit = {
    // If the key of the message is empty, use round-robin to select the queue
    // Otherwise use the queue based on the key value so that same key-ed messages go to the same queue
    val queueId =
      if (record.key() != null) {
        Utils.abs(java.util.Arrays.hashCode(record.key())) % numOutputs
      } else {
        Utils.abs(counter.getAndIncrement()) % numOutputs
      }
    put(record, queueId)
  }

  /**
   * Enqueues `record` on the queue with the given id, blocking until there is room.
   *
   * @param record  the record to enqueue
   * @param queueId index into `queues`
   */
  def put(record: ProducerRecord[Array[Byte],Array[Byte]], queueId: Int): Unit = {
    val queue = queues(queueId)

    // Offer with a timeout in a loop rather than a single blocking put, so the
    // time spent waiting is recorded at least every 500 ms.
    var putSucceed = false
    while (!putSucceed) {
      val startPutTime = SystemTime.nanoseconds
      putSucceed = queue.offer(record, 500, TimeUnit.MILLISECONDS)
      waitPut.mark((SystemTime.nanoseconds - startPutTime) / numInputs)
    }
    channelSizeHist.update(queue.size)
  }

  /**
   * Removes and returns the next record from the queue with the given id,
   * blocking until one is available.
   *
   * @param queueId index into `queues`
   * @return the dequeued record; never null
   */
  def take(queueId: Int): ProducerRecord[Array[Byte],Array[Byte]] = {
    val queue = queues(queueId)
    var data: ProducerRecord[Array[Byte],Array[Byte]] = null
    // poll returns null on timeout, so keep polling until a record arrives.
    while (data == null) {
      val startTakeTime = SystemTime.nanoseconds
      data = queue.poll(500, TimeUnit.MILLISECONDS)
      waitTake.mark((SystemTime.nanoseconds - startTakeTime) / numOutputs)
    }
    channelSizeHist.update(queue.size)
    data
  }
}
/**
 * Thread that drains one consumer stream into the data channel.
 *
 * Every message read from `stream` is wrapped in a ProducerRecord (same topic,
 * key and payload) and put on `mirrorDataChannel`. If the loop ends because of
 * an exception, the whole process is terminated.
 *
 * @param stream            the stream this thread iterates over
 * @param mirrorDataChannel channel the records are handed to
 * @param threadId          index used to build the thread name and log prefix
 */
class ConsumerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
                     mirrorDataChannel: DataChannel,
                     threadId: Int)
  extends Thread with Logging with KafkaMetricsGroup {

  // Released once run() has finished, whatever the reason.
  private val shutdownLatch = new CountDownLatch(1)
  private val threadName = "mirrormaker-consumer-" + threadId
  private var isCleanShutdown: Boolean = true
  this.logIdent = "[%s] ".format(threadName)
  // Give the JVM thread the same name as the log prefix so thread dumps line up with logs.
  this.setName(threadName)

  override def run(): Unit = {
    info("Starting mirror maker consumer thread " + threadName)
    try {
      for (msgAndMetadata <- stream) {
        val data = new ProducerRecord[Array[Byte],Array[Byte]](msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message)
        mirrorDataChannel.put(data)
      }
    } catch {
      // Throwable on purpose: any failure here must take the process down
      // (see the finally block) instead of silently losing one consumer.
      case e: Throwable =>
        fatal("Stream unexpectedly exited.", e)
        isCleanShutdown = false
    } finally {
      // Release the latch before a possible System.exit: exiting runs the JVM
      // shutdown hooks, and cleanShutdown() waits on this latch.
      shutdownLatch.countDown()
      info("Consumer thread stopped")
      // If it exits accidentally, stop the entire mirror maker.
      if (!isCleanShutdown) {
        fatal("Consumer thread exited abnormally, stopping the whole mirror maker.")
        System.exit(-1)
      }
    }
  }

  /** Blocks until run() has finished; an interrupt while waiting is logged, not rethrown. */
  def awaitShutdown(): Unit = {
    try {
      shutdownLatch.await()
      info("Consumer thread shutdown complete")
    } catch {
      case e: InterruptedException => fatal("Shutdown of the consumer thread interrupted. This might leak data!")
    }
  }
}
/**
 * Thread that takes records from one queue of the data channel and sends them
 * with `producer`.
 *
 * The thread reads only the queue whose id equals `threadId`. It stops when it
 * dequeues the shared `shutdownMessage` sentinel (compared by reference). If
 * the loop ends because of an exception, the whole process is terminated.
 *
 * @param dataChannel channel to take records from
 * @param producer    producer used to send each record
 * @param threadId    queue id to read from; also used in the thread name and log prefix
 */
class ProducerThread (val dataChannel: DataChannel,
                      val producer: BaseProducer,
                      val threadId: Int) extends Thread with Logging with KafkaMetricsGroup {
  private val threadName = "mirrormaker-producer-" + threadId
  // Released once run() has finished, whatever the reason.
  private val shutdownComplete: CountDownLatch = new CountDownLatch(1)
  private var isCleanShutdown: Boolean = true
  this.logIdent = "[%s] ".format(threadName)
  // Give the JVM thread the same name as the log prefix so thread dumps line up with logs.
  this.setName(threadName)

  override def run(): Unit = {
    info("Starting mirror maker producer thread " + threadName)
    try {
      var running = true
      while (running) {
        val data: ProducerRecord[Array[Byte],Array[Byte]] = dataChannel.take(threadId)
        trace("Sending message with value size %d".format(data.value().size))
        if (data eq shutdownMessage) {
          // The sentinel only signals termination; it must never be sent downstream.
          info("Received shutdown message")
          running = false
        } else {
          producer.send(data.topic(), data.key(), data.value())
        }
      }
    } catch {
      // Throwable on purpose: any failure here must take the process down
      // (see the finally block) instead of silently losing one producer.
      case t: Throwable =>
        fatal("Producer thread failure due to ", t)
        isCleanShutdown = false
    } finally {
      // Release the latch before a possible System.exit: exiting runs the JVM
      // shutdown hooks, and cleanShutdown() waits on this latch.
      shutdownComplete.countDown()
      info("Producer thread stopped")
      // If it exits accidentally, stop the entire mirror maker.
      if (!isCleanShutdown) {
        fatal("Producer thread exited abnormally, stopping the whole mirror maker.")
        System.exit(-1)
      }
    }
  }

  /**
   * Requests termination by enqueueing the shutdown sentinel on this thread's
   * queue; records already queued ahead of it are still sent.
   */
  def shutdown: Unit = {
    try {
      info("Producer thread " + threadName + " shutting down")
      dataChannel.put(shutdownMessage, threadId)
    } catch {
      case ie: InterruptedException =>
        warn("Interrupt during shutdown of ProducerThread")
    }
  }

  /**
   * Blocks until run() has finished, then closes the producer. An interrupt
   * while waiting is logged, not rethrown.
   */
  def awaitShutdown: Unit = {
    try {
      shutdownComplete.await()
      producer.close()
      info("Producer thread shutdown complete")
    } catch {
      case ie: InterruptedException =>
        warn("Shutdown of the producer thread interrupted")
    }
  }
}