| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package kafka.producer |
| |
| import java.util.concurrent.atomic.AtomicBoolean |
| import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} |
| |
| import kafka.common.{AppInfo, QueueFullException} |
| import kafka.metrics._ |
| import kafka.producer.async.{DefaultEventHandler, EventHandler, ProducerSendThread} |
| import kafka.serializer.Encoder |
| import kafka.utils._ |
| |
| |
| class Producer[K,V](val config: ProducerConfig, |
| private val eventHandler: EventHandler[K,V]) // only for unit testing |
| extends Logging { |
| |
| private val hasShutdown = new AtomicBoolean(false) |
| private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages) |
| |
| private var sync: Boolean = true |
| private var producerSendThread: ProducerSendThread[K,V] = null |
| private val lock = new Object() |
| |
| config.producerType match { |
| case "sync" => |
| case "async" => |
| sync = false |
| producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId, |
| queue, |
| eventHandler, |
| config.queueBufferingMaxMs, |
| config.batchNumMessages, |
| config.clientId) |
| producerSendThread.start() |
| } |
| |
| private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId) |
| |
| KafkaMetricsReporter.startReporters(config.props) |
| AppInfo.registerInfo() |
| |
| def this(config: ProducerConfig) = |
| this(config, |
| new DefaultEventHandler[K,V](config, |
| Utils.createObject[Partitioner](config.partitionerClass, config.props), |
| Utils.createObject[Encoder[V]](config.serializerClass, config.props), |
| Utils.createObject[Encoder[K]](config.keySerializerClass, config.props), |
| new ProducerPool(config))) |
| |
| /** |
| * Sends the data, partitioned by key to the topic using either the |
| * synchronous or the asynchronous producer |
| * @param messages the producer data object that encapsulates the topic, key and message data |
| */ |
| def send(messages: KeyedMessage[K,V]*) { |
| lock synchronized { |
| if (hasShutdown.get) |
| throw new ProducerClosedException |
| recordStats(messages) |
| sync match { |
| case true => eventHandler.handle(messages) |
| case false => asyncSend(messages) |
| } |
| } |
| } |
| |
| private def recordStats(messages: Seq[KeyedMessage[K,V]]) { |
| for (message <- messages) { |
| producerTopicStats.getProducerTopicStats(message.topic).messageRate.mark() |
| producerTopicStats.getProducerAllTopicsStats.messageRate.mark() |
| } |
| } |
| |
| private def asyncSend(messages: Seq[KeyedMessage[K,V]]) { |
| for (message <- messages) { |
| val added = config.queueEnqueueTimeoutMs match { |
| case 0 => |
| queue.offer(message) |
| case _ => |
| try { |
| config.queueEnqueueTimeoutMs < 0 match { |
| case true => |
| queue.put(message) |
| true |
| case _ => |
| queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS) |
| } |
| } |
| catch { |
| case e: InterruptedException => |
| false |
| } |
| } |
| if(!added) { |
| producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark() |
| producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark() |
| throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString) |
| }else { |
| trace("Added to send queue an event: " + message.toString) |
| trace("Remaining queue size: " + queue.remainingCapacity) |
| } |
| } |
| } |
| |
| /** |
| * Close API to close the producer pool connections to all Kafka brokers. Also closes |
| * the zookeeper client connection if one exists |
| */ |
| def close() = { |
| lock synchronized { |
| val canShutdown = hasShutdown.compareAndSet(false, true) |
| if(canShutdown) { |
| info("Shutting down producer") |
| val startTime = System.nanoTime() |
| KafkaMetricsGroup.removeAllProducerMetrics(config.clientId) |
| if (producerSendThread != null) |
| producerSendThread.shutdown |
| eventHandler.close |
| info("Producer shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms") |
| } |
| } |
| } |
| } |
| |
| |