blob: 9a44d46d29a1997958a9d2bbf7be0bde860fff64 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.system.kafka
import org.apache.samza.util.Logging
import org.apache.kafka.clients.producer.{RecordMetadata, Callback, ProducerRecord, Producer}
import org.apache.samza.system.SystemProducer
import org.apache.samza.system.OutgoingMessageEnvelope
import org.apache.samza.util.ExponentialSleepStrategy
import org.apache.samza.util.TimerUtils
import org.apache.samza.util.KafkaUtil
import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
import java.util.{Map => javaMap}
import org.apache.samza.SamzaException
import org.apache.kafka.common.errors.RetriableException
import org.apache.kafka.common.PartitionInfo
import java.util
import java.util.concurrent.Future
import scala.collection.JavaConversions._
/**
 * A [[SystemProducer]] that writes [[OutgoingMessageEnvelope]]s to Kafka through a Java Kafka [[Producer]].
 *
 * The underlying producer is created lazily on the first `send` (and re-created after it has been closed)
 * by calling `getProducer`. Synchronous `RetriableException`s thrown by `producer.send` are retried using
 * `retryBackoff`. Any failure reported through the producer callback is treated as fatal. For each
 * registered source, the future of the most recent send is remembered so that `flush` can wait for it.
 *
 * NOTE(review): `producer` and `latestFuture` are unsynchronized; this presumably assumes `send`, `flush`
 * and `stop` are called from a single thread — confirm against the container's usage.
 *
 * @param systemName   name of the Samza system; used in log and error messages
 * @param retryBackoff sleep strategy driving retries of synchronous retriable send failures
 * @param getProducer  factory invoked whenever a new Kafka producer is needed
 * @param metrics      metrics updated on sends, successes, retries, failures and flushes
 * @param clock        time source (defaults to `System.nanoTime`), exposed as `val clock` for [[TimerUtils]]
 */
class KafkaSystemProducer(systemName: String,
                          retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
                          getProducer: () => Producer[Array[Byte], Array[Byte]],
                          metrics: KafkaSystemProducerMetrics,
                          val clock: () => Long = () => System.nanoTime) extends SystemProducer with Logging with TimerUtils
{
  /** Current Kafka producer; null until the first send and after it has been closed. */
  var producer: Producer[Array[Byte], Array[Byte]] = null
  /** Future of the latest send per registered source; null means nothing has been sent for that source yet. */
  val latestFuture: javaMap[String, Future[RecordMetadata]] = new util.HashMap[String, Future[RecordMetadata]]()
  /** Set by a producer callback when an asynchronous send fails. */
  val sendFailed: AtomicBoolean = new AtomicBoolean(false)
  /** First exception reported by a producer callback (only the first one is kept, via compareAndSet). */
  var exceptionThrown: AtomicReference[Exception] = new AtomicReference[Exception]()
  val StreamNameNullOrEmptyErrorMsg = "Stream Name should be specified in the stream configuration file."

  /** No-op: the Kafka producer is created lazily by `send`. */
  def start() {
  }

  /**
   * Flushes every registered source and then closes the producer, if one exists.
   * If a flush fails, its SamzaException propagates; the failing flush has already closed the producer.
   */
  def stop() {
    if (producer != null) {
      latestFuture.keys.foreach(flush(_))
      producer.close
      producer = null
    }
  }

  /**
   * Registers `source` with this producer.
   *
   * @throws SamzaException if `source` is already registered
   */
  def register(source: String) {
    if (latestFuture.containsKey(source)) {
      throw new SamzaException("%s is already registered with the %s system producer" format (source, systemName))
    }
    // A null future marks "registered, nothing sent yet"; flush treats it as nothing to wait for.
    latestFuture.put(source, null)
  }

  /**
   * Sends `envelope` on behalf of `source`, creating the Kafka producer first if necessary.
   *
   * Synchronous `RetriableException`s from `producer.send` are retried with `retryBackoff`. A failure
   * reported through the producer callback, or any non-retriable exception, closes the producer and fails
   * the send.
   *
   * @throws IllegalArgumentException if the envelope's stream name is null or empty
   * @throws SamzaException           if the send fails fatally
   */
  def send(source: String, envelope: OutgoingMessageEnvelope) {
    trace("Enqueueing message: %s, %s." format (source, envelope))
    if (producer == null) {
      info("Creating a new producer for system %s." format systemName)
      producer = getProducer()
      debug("Created a new producer for system %s." format systemName)
    }
    // The Java Kafka producer API takes an Integer partition and does not allow overriding the Partitioner,
    // so any custom partitioning has to be done here, on the client side.
    val topicName = envelope.getSystemStream.getStream
    if (topicName == null || topicName == "") {
      throw new IllegalArgumentException(StreamNameNullOrEmptyErrorMsg)
    }
    val partitions: java.util.List[PartitionInfo] = producer.partitionsFor(topicName)
    val partitionKey = if (envelope.getPartitionKey != null) KafkaUtil.getIntegerPartitionKey(envelope, partitions) else null
    val record = new ProducerRecord(topicName,
      partitionKey,
      envelope.getKey.asInstanceOf[Array[Byte]],
      envelope.getMessage.asInstanceOf[Array[Byte]])
    // NOTE(review): this reset also clears a failure reported by an earlier record's callback that no flush
    // has observed yet. exceptionThrown is not reset anywhere in this class, so a later failure is reported
    // with the first exception ever recorded.
    sendFailed.set(false)
    retryBackoff.run(
      loop => {
        // A callback has reported a failure: surface it. The exception handler below treats it as fatal.
        if (sendFailed.get()) {
          throw exceptionThrown.get()
        }
        val futureRef: Future[RecordMetadata] =
          producer.send(record, new Callback {
            def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
              if (exception == null) {
                // Send was successful; nothing to retry.
                metrics.sendSuccess.inc
              } else {
                // An exception in the callback means the Kafka producer has exhausted its own retries,
                // so the container must fail. Keep only the first exception.
                exceptionThrown.compareAndSet(null, exception)
                sendFailed.set(true)
              }
            }
          })
        latestFuture.put(source, futureRef)
        metrics.sends.inc
        // Leave the loop running if a callback already failed; the next iteration rethrows that failure.
        if (!sendFailed.get())
          loop.done
      },
      (exception, loop) => {
        // A callback-reported failure is fatal even if its type is a RetriableException. Retrying it would
        // only rethrow the same stored exception on every iteration without ever re-sending the record.
        val failedInCallback = sendFailed.get()
        if (exception != null && (failedInCallback || !exception.isInstanceOf[RetriableException])) {
          debug("Exception detail : ", exception)
          // Record the failure before tearing down, so the metric is counted even if closing misbehaves.
          metrics.sendFailed.inc
          // Mark the loop as done: we are not going to retry.
          loop.done
          // Close without flushing. Calling stop() here would flush, and that flush throws its own exception
          // when the failure came from a callback, masking this one.
          closeProducerQuietly()
          throw new SamzaException("Failed to send message. Exception:\n %s".format(exception), exception)
        } else {
          warn("Retrying send messsage due to RetriableException - %s. Turn on debugging to get a full stack trace".format(exception))
          debug("Exception detail:", exception)
          metrics.retries.inc
        }
      }
    )
  }

  /**
   * Waits until the latest send from `source` completes, or until any callback reports a failure.
   *
   * @throws SamzaException if a send has failed; the producer is closed before throwing
   */
  def flush(source: String) {
    updateTimer(metrics.flushNs) {
      metrics.flushes.inc
      // A null future means nothing has been sent for this source yet, so there is nothing to flush.
      if (latestFuture.get(source) != null) {
        // NOTE(review): this is a busy-wait with no sleep or yield, so it keeps a CPU core busy while
        // the send is in flight.
        while (!latestFuture.get(source).isDone && !sendFailed.get()) {
          // do nothing
        }
        if (sendFailed.get()) {
          logger.error("Unable to send message from %s to system %s" format(source, systemName))
          closeProducerQuietly()
          metrics.flushFailed.inc
          throw new SamzaException("Unable to send message from %s to system %s." format(source, systemName), exceptionThrown.get)
        } else {
          trace("Flushed %s." format (source))
        }
      }
    }
  }

  /**
   * Closes the current producer, if any, and clears the reference so the next `send` creates a new one.
   * Errors from `close` are logged and suppressed so they cannot mask the send failure being reported.
   */
  private def closeProducerQuietly(): Unit = {
    if (producer != null) {
      try {
        producer.close
      } catch {
        case e: Exception =>
          warn("Failed to close producer for system %s: %s" format (systemName, e))
          debug("Exception detail:", e)
      } finally {
        producer = null
      }
    }
  }
}