/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools

import java.util
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.regex.Pattern
import java.util.{Collections, Properties}
import com.yammer.metrics.core.Gauge
import joptsimple.OptionParser
import kafka.consumer.{BaseConsumer, BaseConsumerRecord, Blacklist, ConsumerIterator, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector, ConsumerConfig => OldConsumerConfig}
import kafka.javaapi.consumer.ConsumerRebalanceListener
import kafka.metrics.KafkaMetricsGroup
import kafka.serializer.DefaultDecoder
import kafka.utils.{CommandLineUtils, CoreUtils, Logging, ZKConfig}
import org.apache.kafka.clients.consumer
import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.record.Record
import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.util.control.ControlThrowable
import org.apache.kafka.clients.consumer.{ConsumerConfig => NewConsumerConfig}

/**
* The mirror maker has the following architecture:
* - There are N mirror maker threads sharing one ZookeeperConsumerConnector, and each owns a Kafka stream.
* - All the mirror maker threads share one producer.
* - Each mirror maker thread periodically flushes the producer and then commits all offsets.
*
* @note For the mirror maker, the following settings are set by default to make sure there is no data loss:
* 1. use the new producer with the following settings:
*      acks=all
*      retries=max integer
*      max.block.ms=max long
*      max.in.flight.requests.per.connection=1
* 2. consumer settings:
*      auto.commit.enable=false
* 3. mirror maker settings:
*      abort.on.send.failure=true
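*
* An example invocation (illustrative only; the config file names are placeholders):
* {{{
*   bin/kafka-mirror-maker.sh --consumer.config source-consumer.properties \
*     --producer.config target-producer.properties \
*     --whitelist ".*" --num.streams 2
* }}}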
*/
object MirrorMaker extends Logging with KafkaMetricsGroup {
private[tools] var producer: MirrorMakerProducer = null
private var mirrorMakerThreads: Seq[MirrorMakerThread] = null
private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false)
// Track the messages not successfully sent by mirror maker.
private val numDroppedMessages: AtomicInteger = new AtomicInteger(0)
private var messageHandler: MirrorMakerMessageHandler = null
private var offsetCommitIntervalMs = 0
private var abortOnSendFailure: Boolean = true
@volatile private var exitingOnSendFailure: Boolean = false
// If a message send fails after retries are exhausted, the offset of that message is removed from
// the unacked offset list so that the offset commit does not get stuck on it. In this case the
// offset was not really acked, but skipped. This metric records the number of skipped offsets.
newGauge("MirrorMaker-numDroppedMessages",
new Gauge[Int] {
def value = numDroppedMessages.get()
})
def main(args: Array[String]) {
info("Starting mirror maker")
try {
val parser = new OptionParser
val consumerConfigOpt = parser.accepts("consumer.config",
"Embedded consumer config for consuming from the source cluster.")
.withRequiredArg()
.describedAs("config file")
.ofType(classOf[String])
val useNewConsumerOpt = parser.accepts("new.consumer",
"Use new consumer in mirror maker (this is the default).")
val producerConfigOpt = parser.accepts("producer.config",
"Embedded producer config.")
.withRequiredArg()
.describedAs("config file")
.ofType(classOf[String])
val numStreamsOpt = parser.accepts("num.streams",
"Number of consumption streams.")
.withRequiredArg()
.describedAs("Number of threads")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
val whitelistOpt = parser.accepts("whitelist",
"Whitelist of topics to mirror.")
.withRequiredArg()
.describedAs("Java regex (String)")
.ofType(classOf[String])
val blacklistOpt = parser.accepts("blacklist",
"Blacklist of topics to mirror. Only old consumer supports blacklist.")
.withRequiredArg()
.describedAs("Java regex (String)")
.ofType(classOf[String])
val offsetCommitIntervalMsOpt = parser.accepts("offset.commit.interval.ms",
"Offset commit interval in ms.")
.withRequiredArg()
.describedAs("offset commit interval in millisecond")
.ofType(classOf[java.lang.Integer])
.defaultsTo(60000)
val consumerRebalanceListenerOpt = parser.accepts("consumer.rebalance.listener",
"The consumer rebalance listener to use for mirror maker consumer.")
.withRequiredArg()
.describedAs("A custom rebalance listener of type ConsumerRebalanceListener")
.ofType(classOf[String])
val rebalanceListenerArgsOpt = parser.accepts("rebalance.listener.args",
"Arguments used by custom rebalance listener for mirror maker consumer.")
.withRequiredArg()
.describedAs("Arguments passed to custom rebalance listener constructor as a string.")
.ofType(classOf[String])
val messageHandlerOpt = parser.accepts("message.handler",
"Message handler which will process every record in-between consumer and producer.")
.withRequiredArg()
.describedAs("A custom message handler of type MirrorMakerMessageHandler")
.ofType(classOf[String])
val messageHandlerArgsOpt = parser.accepts("message.handler.args",
"Arguments used by custom message handler for mirror maker.")
.withRequiredArg()
.describedAs("Arguments passed to message handler constructor.")
.ofType(classOf[String])
val abortOnSendFailureOpt = parser.accepts("abort.on.send.failure",
"Configure the mirror maker to exit on a failed send.")
.withRequiredArg()
.describedAs("Stop the entire mirror maker when a send failure occurs")
.ofType(classOf[String])
.defaultsTo("true")
val helpOpt = parser.accepts("help", "Print this message.")
if (args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "Continuously copy data between two Kafka clusters.")
val options = parser.parse(args: _*)
if (options.has(helpOpt)) {
parser.printHelpOn(System.out)
sys.exit(0)
}
CommandLineUtils.checkRequiredArgs(parser, options, consumerConfigOpt, producerConfigOpt)
val consumerProps = Utils.loadProps(options.valueOf(consumerConfigOpt))
val useOldConsumer = consumerProps.containsKey(ZKConfig.ZkConnectProp)
if (useOldConsumer) {
if (options.has(useNewConsumerOpt)) {
error(s"The consumer configuration parameter `${ZKConfig.ZkConnectProp}` is not valid when using --new.consumer")
sys.exit(1)
}
if (consumerProps.containsKey(NewConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
error(s"The configuration parameters `${ZKConfig.ZkConnectProp}` (old consumer) and " +
s"`${NewConsumerConfig.BOOTSTRAP_SERVERS_CONFIG}` (new consumer) cannot be used together.")
sys.exit(1)
}
if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) {
error("Exactly one of whitelist or blacklist is required.")
sys.exit(1)
}
} else {
if (options.has(blacklistOpt)) {
error("blacklist can not be used when using new consumer in mirror maker. Use whitelist instead.")
sys.exit(1)
}
if (!options.has(whitelistOpt)) {
error("whitelist must be specified when using new consumer in mirror maker.")
sys.exit(1)
}
if (!consumerProps.containsKey(NewConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG))
System.err.println("WARNING: The default partition assignment strategy of the new-consumer-based mirror maker will " +
"change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If " +
"you prefer to make this switch in advance of that release add the following to the corresponding new-consumer " +
"config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'")
}
abortOnSendFailure = options.valueOf(abortOnSendFailureOpt).toBoolean
offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue()
val numStreams = options.valueOf(numStreamsOpt).intValue()
Runtime.getRuntime.addShutdownHook(new Thread("MirrorMakerShutdownHook") {
override def run() {
cleanShutdown()
}
})
// create producer
val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
// Default to no-data-loss settings.
maybeSetDefaultProperty(producerProps, ProducerConfig.RETRIES_CONFIG, Int.MaxValue.toString)
maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
maybeSetDefaultProperty(producerProps, ProducerConfig.ACKS_CONFIG, "all")
maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
// Always set producer key and value serializer to ByteArraySerializer.
producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
producer = new MirrorMakerProducer(producerProps)
// Create consumers
val mirrorMakerConsumers = if (useOldConsumer) {
val customRebalanceListener = {
val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt)
if (customRebalanceListenerClass != null) {
val rebalanceListenerArgs = options.valueOf(rebalanceListenerArgsOpt)
if (rebalanceListenerArgs != null) {
Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass, rebalanceListenerArgs))
} else {
Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass))
}
} else {
None
}
}
if (customRebalanceListener.exists(!_.isInstanceOf[ConsumerRebalanceListener]))
throw new IllegalArgumentException("The rebalance listener should be an instance of kafka.consumer.ConsumerRebalanceListener")
createOldConsumers(
numStreams,
consumerProps,
customRebalanceListener,
Option(options.valueOf(whitelistOpt)),
Option(options.valueOf(blacklistOpt)))
} else {
val customRebalanceListener = {
val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt)
if (customRebalanceListenerClass != null) {
val rebalanceListenerArgs = options.valueOf(rebalanceListenerArgsOpt)
if (rebalanceListenerArgs != null) {
Some(CoreUtils.createObject[org.apache.kafka.clients.consumer.ConsumerRebalanceListener](customRebalanceListenerClass, rebalanceListenerArgs))
} else {
Some(CoreUtils.createObject[org.apache.kafka.clients.consumer.ConsumerRebalanceListener](customRebalanceListenerClass))
}
} else {
None
}
}
if (customRebalanceListener.exists(!_.isInstanceOf[org.apache.kafka.clients.consumer.ConsumerRebalanceListener]))
throw new IllegalArgumentException("The rebalance listener should be an instance of" +
"org.apache.kafka.clients.consumer.ConsumerRebalanceListner")
createNewConsumers(
numStreams,
consumerProps,
customRebalanceListener,
Option(options.valueOf(whitelistOpt)))
}
// Create mirror maker threads.
mirrorMakerThreads = (0 until numStreams) map (i =>
new MirrorMakerThread(mirrorMakerConsumers(i), i))
// Create and initialize message handler
val customMessageHandlerClass = options.valueOf(messageHandlerOpt)
val messageHandlerArgs = options.valueOf(messageHandlerArgsOpt)
messageHandler = {
if (customMessageHandlerClass != null) {
if (messageHandlerArgs != null)
CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, messageHandlerArgs)
else
CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass)
} else {
defaultMirrorMakerMessageHandler
}
}
} catch {
case ct : ControlThrowable => throw ct
case t : Throwable =>
error("Exception when starting mirror maker.", t)
}
mirrorMakerThreads.foreach(_.start())
mirrorMakerThreads.foreach(_.awaitShutdown())
}
private def createOldConsumers(numStreams: Int,
consumerConfigProps: Properties,
customRebalanceListener: Option[ConsumerRebalanceListener],
whitelist: Option[String],
blacklist: Option[String]) : Seq[MirrorMakerBaseConsumer] = {
// Disable consumer auto offset commit to prevent data loss.
maybeSetDefaultProperty(consumerConfigProps, "auto.commit.enable", "false")
// Set the consumer timeout so we do not block forever on low volume pipelines. The timeout is
// necessary to make sure offsets are still committed for those pipelines.
maybeSetDefaultProperty(consumerConfigProps, "consumer.timeout.ms", "10000")
// The default client id is the group id; we manually set the client id to groupId-index to avoid metric collisions
val groupIdString = consumerConfigProps.getProperty("group.id")
val connectors = (0 until numStreams) map { i =>
consumerConfigProps.setProperty("client.id", groupIdString + "-" + i.toString)
val consumerConfig = new OldConsumerConfig(consumerConfigProps)
new ZookeeperConsumerConnector(consumerConfig)
}
// create filters
val filterSpec = if (whitelist.isDefined)
new Whitelist(whitelist.get)
else if (blacklist.isDefined)
new Blacklist(blacklist.get)
else
throw new IllegalArgumentException("Either whitelist or blacklist should be defined!")
(0 until numStreams) map { i =>
val consumer = new MirrorMakerOldConsumer(connectors(i), filterSpec)
val consumerRebalanceListener = new InternalRebalanceListenerForOldConsumer(consumer, customRebalanceListener)
connectors(i).setConsumerRebalanceListener(consumerRebalanceListener)
consumer
}
}
def createNewConsumers(numStreams: Int,
consumerConfigProps: Properties,
customRebalanceListener: Option[org.apache.kafka.clients.consumer.ConsumerRebalanceListener],
whitelist: Option[String]) : Seq[MirrorMakerBaseConsumer] = {
// Disable consumer auto offset commit to prevent data loss.
maybeSetDefaultProperty(consumerConfigProps, "enable.auto.commit", "false")
// Hardcode the deserializer to ByteArrayDeserializer
consumerConfigProps.setProperty("key.deserializer", classOf[ByteArrayDeserializer].getName)
consumerConfigProps.setProperty("value.deserializer", classOf[ByteArrayDeserializer].getName)
// The default client id is the group id; we manually set the client id to groupId-index to avoid metric collisions
val groupIdString = consumerConfigProps.getProperty("group.id")
val consumers = (0 until numStreams) map { i =>
consumerConfigProps.setProperty("client.id", groupIdString + "-" + i.toString)
new KafkaConsumer[Array[Byte], Array[Byte]](consumerConfigProps)
}
whitelist.getOrElse(throw new IllegalArgumentException("Whitelist cannot be empty for the new consumer"))
consumers.map(consumer => new MirrorMakerNewConsumer(consumer, customRebalanceListener, whitelist))
}
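// Commit the consumer's offsets, unless a send failure is forcing an exit; committing in that
// case could mark messages that were never successfully produced as mirrored.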
def commitOffsets(mirrorMakerConsumer: MirrorMakerBaseConsumer) {
if (!exitingOnSendFailure) {
trace("Committing offsets.")
try {
mirrorMakerConsumer.commit()
} catch {
case e: WakeupException =>
// we only call wakeup() once to close the consumer,
// so if we catch it in commit we can safely retry
// and re-throw to break the loop
mirrorMakerConsumer.commit()
throw e
case _: CommitFailedException =>
warn("Failed to commit offsets because the consumer group has rebalanced and assigned partitions to " +
"another instance. If you see this regularly, it could indicate that you need to either increase " +
s"the consumer's ${consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG} or reduce the number of records " +
s"handled on each iteration with ${consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG}")
}
} else {
info("Exiting on send failure, skip committing offsets.")
}
}
def cleanShutdown() {
if (isShuttingdown.compareAndSet(false, true)) {
info("Start clean shutdown.")
// Shutdown consumer threads.
info("Shutting down consumer threads.")
if (mirrorMakerThreads != null) {
mirrorMakerThreads.foreach(_.shutdown())
mirrorMakerThreads.foreach(_.awaitShutdown())
}
info("Closing producer.")
producer.close()
info("Kafka mirror maker shutdown successfully")
}
}
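// Set the property to the given no-data-loss default only if the user has not provided a value,
// and log a notice when a user-supplied value overrides that default.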
private def maybeSetDefaultProperty(properties: Properties, propertyName: String, defaultValue: String) {
val propertyValue = properties.getProperty(propertyName)
properties.setProperty(propertyName, Option(propertyValue).getOrElse(defaultValue))
if (properties.getProperty(propertyName) != defaultValue)
info("Property %s is overridden to %s - data loss or message reordering is possible.".format(propertyName, propertyValue))
}
class MirrorMakerThread(mirrorMakerConsumer: MirrorMakerBaseConsumer,
val threadId: Int) extends Thread with Logging with KafkaMetricsGroup {
private val threadName = "mirrormaker-thread-" + threadId
private val shutdownLatch: CountDownLatch = new CountDownLatch(1)
private var lastOffsetCommitMs = System.currentTimeMillis()
@volatile private var shuttingDown: Boolean = false
this.logIdent = "[%s] ".format(threadName)
setName(threadName)
override def run() {
info("Starting mirror maker thread " + threadName)
try {
mirrorMakerConsumer.init()
// We need the two while loops to make sure that, when the old consumer is used, offsets are still
// committed even when there are no messages. When the new consumer is used, this is handled by poll(timeout).
while (!exitingOnSendFailure && !shuttingDown) {
try {
while (!exitingOnSendFailure && !shuttingDown && mirrorMakerConsumer.hasData) {
val data = mirrorMakerConsumer.receive()
trace("Sending message with value size %d and offset %d".format(data.value.length, data.offset))
val records = messageHandler.handle(data)
records.asScala.foreach(producer.send)
maybeFlushAndCommitOffsets()
}
} catch {
case _: ConsumerTimeoutException =>
trace("Caught ConsumerTimeoutException, continue iteration.")
case _: WakeupException =>
trace("Caught ConsumerWakeupException, continue iteration.")
}
maybeFlushAndCommitOffsets()
}
} catch {
case t: Throwable =>
exitingOnSendFailure = true
fatal("Mirror maker thread failure due to ", t)
} finally {
CoreUtils.swallow {
info("Flushing producer.")
producer.flush()
// note that this commit is skipped if flush() fails which ensures that we don't lose messages
info("Committing consumer offsets.")
commitOffsets(mirrorMakerConsumer)
}
info("Shutting down consumer connectors.")
CoreUtils.swallow(mirrorMakerConsumer.stop())
CoreUtils.swallow(mirrorMakerConsumer.cleanup())
shutdownLatch.countDown()
info("Mirror maker thread stopped")
// if it exits accidentally, stop the entire mirror maker
if (!isShuttingdown.get()) {
fatal("Mirror maker thread exited abnormally, stopping the whole mirror maker.")
sys.exit(-1)
}
}
}
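// Flush the producer and commit offsets when a commit has been explicitly requested (e.g. by the
// old consumer's rebalance listener) or when the commit interval has elapsed.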
def maybeFlushAndCommitOffsets() {
val commitRequested = mirrorMakerConsumer.commitRequested()
if (commitRequested || System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) {
debug("Committing MirrorMaker state.")
producer.flush()
commitOffsets(mirrorMakerConsumer)
lastOffsetCommitMs = System.currentTimeMillis()
if (commitRequested)
mirrorMakerConsumer.notifyCommit()
}
}
def shutdown() {
try {
info(threadName + " shutting down")
shuttingDown = true
mirrorMakerConsumer.stop()
}
catch {
case _: InterruptedException =>
warn("Interrupt during shutdown of the mirror maker thread")
}
}
def awaitShutdown() {
try {
shutdownLatch.await()
info("Mirror maker thread shutdown complete")
} catch {
case _: InterruptedException =>
warn("Shutdown of the mirror maker thread interrupted")
}
}
}
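/**
* Common interface over the old and new consumers. init() must be called before the first
* receive(); commitRequested(), notifyCommit() and requestAndWaitForCommit() implement the
* commit handshake driven by the old consumer's rebalance listener.
*/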
private[kafka] trait MirrorMakerBaseConsumer extends BaseConsumer {
def init()
def commitRequested(): Boolean
def notifyCommit()
def requestAndWaitForCommit()
def hasData : Boolean
}
private class MirrorMakerOldConsumer(connector: ZookeeperConsumerConnector,
filterSpec: TopicFilter) extends MirrorMakerBaseConsumer {
private var iter: ConsumerIterator[Array[Byte], Array[Byte]] = null
private var immediateCommitRequested: Boolean = false
override def init() {
// Create one stream per connector instance
val streams = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())
require(streams.size == 1)
val stream = streams.head
iter = stream.iterator()
}
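// Called from the rebalance listener thread: request an immediate commit and block until the
// mirror maker thread has committed offsets and called notifyCommit().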
override def requestAndWaitForCommit() {
this.synchronized {
// skip wait() if mirrorMakerConsumer has not been initialized
if (iter != null) {
immediateCommitRequested = true
this.wait()
}
}
}
override def notifyCommit() {
this.synchronized {
immediateCommitRequested = false
this.notifyAll()
}
}
override def commitRequested(): Boolean = {
this.synchronized {
immediateCommitRequested
}
}
override def hasData = iter.hasNext()
override def receive() : BaseConsumerRecord = {
val messageAndMetadata = iter.next()
BaseConsumerRecord(messageAndMetadata.topic,
messageAndMetadata.partition,
messageAndMetadata.offset,
messageAndMetadata.timestamp,
messageAndMetadata.timestampType,
messageAndMetadata.key,
messageAndMetadata.message)
}
override def stop() {
// Do nothing
}
override def cleanup() {
connector.shutdown()
}
override def commit() {
connector.commitOffsets
}
}
// Visible for testing
private[tools] class MirrorMakerNewConsumer(consumer: Consumer[Array[Byte], Array[Byte]],
customRebalanceListener: Option[org.apache.kafka.clients.consumer.ConsumerRebalanceListener],
whitelistOpt: Option[String])
extends MirrorMakerBaseConsumer {
val regex = whitelistOpt.getOrElse(throw new IllegalArgumentException("New consumer only supports whitelist."))
var recordIter: java.util.Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = null
// TODO: we need to manually maintain the consumed offsets for the new consumer, since its
// internal consumed position is updated in batches rather than one record at a time. This can be
// resolved when we break the unification of both consumers.
private val offsets = new HashMap[TopicPartition, Long]()
override def init() {
debug("Initiating new consumer")
val consumerRebalanceListener = new InternalRebalanceListenerForNewConsumer(this, customRebalanceListener)
whitelistOpt.foreach { whitelist =>
try {
consumer.subscribe(Pattern.compile(Whitelist(whitelist).regex), consumerRebalanceListener)
} catch {
case pse: RuntimeException =>
error(s"Invalid expression syntax: $whitelist")
throw pse
}
}
}
override def requestAndWaitForCommit() {
// Do nothing
}
override def notifyCommit() {
// Do nothing
}
override def commitRequested(): Boolean = {
false
}
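// poll() blocks with a timeout, so the new consumer always reports data as available; receive()
// throws ConsumerTimeoutException when a poll returns no records.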
override def hasData = true
override def receive() : BaseConsumerRecord = {
if (recordIter == null || !recordIter.hasNext) {
// In scenarios where data does not arrive within offsetCommitIntervalMs and
// offsetCommitIntervalMs is less than poll's timeout, the offset commit is delayed for any
// records left uncommitted since the last poll. Using one second as poll's timeout ensures that
// any offsetCommitIntervalMs greater than one second does not see delayed offset commits.
recordIter = consumer.poll(1000).iterator
if (!recordIter.hasNext)
throw new ConsumerTimeoutException
}
val record = recordIter.next()
val tp = new TopicPartition(record.topic, record.partition)
offsets.put(tp, record.offset + 1)
BaseConsumerRecord(record.topic,
record.partition,
record.offset,
record.timestamp,
record.timestampType,
record.key,
record.value)
}
override def stop() {
consumer.wakeup()
}
override def cleanup() {
consumer.close()
}
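// Commit the manually tracked positions (committed offset = last consumed offset + 1), then
// reset the map for the next commit interval.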
override def commit() {
consumer.commitSync(offsets.map { case (tp, offset) => (tp, new OffsetAndMetadata(offset, ""))}.asJava)
offsets.clear()
}
}
private class InternalRebalanceListenerForNewConsumer(mirrorMakerConsumer: MirrorMakerBaseConsumer,
customRebalanceListenerForNewConsumer: Option[org.apache.kafka.clients.consumer.ConsumerRebalanceListener])
extends org.apache.kafka.clients.consumer.ConsumerRebalanceListener {
override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) {
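// Flush pending sends and commit offsets before giving up ownership of the partitions, so the
// next owner does not re-mirror records that were already produced.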
producer.flush()
commitOffsets(mirrorMakerConsumer)
customRebalanceListenerForNewConsumer.foreach(_.onPartitionsRevoked(partitions))
}
override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) {
customRebalanceListenerForNewConsumer.foreach(_.onPartitionsAssigned(partitions))
}
}
private class InternalRebalanceListenerForOldConsumer(mirrorMakerConsumer: MirrorMakerBaseConsumer,
customRebalanceListenerForOldConsumer: Option[ConsumerRebalanceListener])
extends ConsumerRebalanceListener {
override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) {
// The zookeeper listener thread, which executes this method, needs to wait for the MirrorMakerThread to flush data and commit offsets
mirrorMakerConsumer.requestAndWaitForCommit()
// invoke custom consumer rebalance listener
customRebalanceListenerForOldConsumer.foreach(_.beforeReleasingPartitions(partitionOwnership))
}
override def beforeStartingFetchers(consumerId: String,
partitionAssignment: java.util.Map[String, java.util.Map[java.lang.Integer, ConsumerThreadId]]) {
customRebalanceListenerForOldConsumer.foreach(_.beforeStartingFetchers(consumerId, partitionAssignment))
}
}
private[tools] class MirrorMakerProducer(val producerProps: Properties) {
val sync = producerProps.getProperty("producer.type", "async").equals("sync")
val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
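// In sync mode, block on the returned future so that send failures surface immediately;
// otherwise send asynchronously and handle failures in the producer callback.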
def send(record: ProducerRecord[Array[Byte], Array[Byte]]) {
if (sync) {
this.producer.send(record).get()
} else {
this.producer.send(record,
new MirrorMakerProducerCallback(record.topic(), record.key(), record.value()))
}
}
def flush() {
this.producer.flush()
}
def close() {
this.producer.close()
}
def close(timeout: Long) {
this.producer.close(timeout, TimeUnit.MILLISECONDS)
}
}
private class MirrorMakerProducerCallback (topic: String, key: Array[Byte], value: Array[Byte])
extends ErrorLoggingCallback(topic, key, value, false) {
override def onCompletion(metadata: RecordMetadata, exception: Exception) {
if (exception != null) {
// Use the default callback to log the error. Reaching this point means the producer's retries
// have been exhausted and the message still could not be sent.
super.onCompletion(metadata, exception)
// If abort.on.send.failure is set, stop the mirror maker. Otherwise log the skipped message and move on.
if (abortOnSendFailure) {
info("Closing producer due to send failure.")
exitingOnSendFailure = true
producer.close(0)
}
numDroppedMessages.incrementAndGet()
}
}
}
/**
* If message.handler.args is specified, a constructor that takes a String as its argument must exist.
*/
trait MirrorMakerMessageHandler {
def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]]
}
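/**
* An illustrative custom handler (a sketch, not part of the tool): it mirrors each record to a
* hypothetical "mirror."-prefixed topic in the target cluster while preserving key, value and
* timestamp. Per the contract above, a handler needs a no-arg constructor, or a one-String-arg
* constructor when message.handler.args is given.
*/
private[tools] class ExampleTopicPrefixMessageHandler extends MirrorMakerMessageHandler {
override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
val timestamp: java.lang.Long = if (record.timestamp == Record.NO_TIMESTAMP) null else record.timestamp
Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]]("mirror." + record.topic, null, timestamp, record.key, record.value))
}
}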
private[tools] object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler {
override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
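// Preserve the source record's timestamp; Record.NO_TIMESTAMP maps to null so the producer
// assigns one at send time.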
val timestamp: java.lang.Long = if (record.timestamp == Record.NO_TIMESTAMP) null else record.timestamp
Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](record.topic, null, timestamp, record.key, record.value))
}
}
}