// blob: c465da58019f84b44c49d8556674accfda1857ac [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.javaapi.producer
import kafka.producer.ProducerConfig
import kafka.producer.KeyedMessage
import scala.collection.mutable
/**
 * Java-facing wrapper around [[kafka.producer.Producer]].
 *
 * Every operation is forwarded to the wrapped Scala producer; the only work done here is
 * adapting a `java.util.List` argument to the Scala varargs `send`.
 *
 * @tparam K key type of the [[kafka.producer.KeyedMessage]]s being sent
 * @tparam V message type of the [[kafka.producer.KeyedMessage]]s being sent
 * @param underlying the Scala producer all calls are delegated to; passing it in directly
 *                   is meant for testing only
 */
class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for testing only
{
  /**
   * Builds the wrapper by constructing a new [[kafka.producer.Producer]] from `config`.
   * @param config the producer configuration handed to the wrapped producer
   */
  def this(config: ProducerConfig) = this(new kafka.producer.Producer[K,V](config))

  /**
   * Sends the data to a single topic, partitioned by key, using either the
   * synchronous or the asynchronous producer
   * @param message the producer data object that encapsulates the topic, key and message data
   */
  def send(message: KeyedMessage[K,V]): Unit = {
    underlying.send(message)
  }

  /**
   * Use this API to send data to multiple topics
   * @param messages list of producer data objects that encapsulate the topic, key and message data
   */
  def send(messages: java.util.List[KeyedMessage[K,V]]): Unit = {
    // The wildcard import supplies the implicit java.util.List => mutable.Buffer view that the
    // type ascription below triggers. It is kept (rather than an explicit converter call)
    // because the conversion method names differ between Scala versions.
    import collection.JavaConversions._
    underlying.send((messages: mutable.Buffer[KeyedMessage[K,V]]).toSeq: _*)
  }

  /**
   * Close API to close the producer pool connections to all Kafka brokers. Also closes
   * the zookeeper client connection if one exists
   *
   * This method only delegates to the wrapped producer's `close`; the cleanup described
   * above is performed there.
   * NOTE(review): the result type is declared as Unit on the assumption that the wrapped
   * `close` returns Unit -- confirm against kafka.producer.Producer.
   */
  def close(): Unit = underlying.close
}