blob: 8a5b53cb0ac2530f6e877e253594d4b3824b55e7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.producer
import async.AsyncProducerConfigShared
import java.util.Properties
import kafka.utils.{ZKConfig, Utils}
import kafka.common.InvalidConfigException
/**
 * Producer-side configuration, read from a java.util.Properties instance.
 *
 * Every setting is parsed eagerly while the object is being constructed, and the
 * consistency checks between settings run in the same pass. An invalid combination
 * therefore surfaces as an InvalidConfigException from the constructor.
 *
 * NOTE(review): zkConnect is not declared in this class; it is presumably inherited
 * from ZKConfig -- confirm against that class.
 *
 * @param props the raw producer properties
 */
class ProducerConfig(val props: Properties) extends ZKConfig(props)
  with AsyncProducerConfigShared with SyncProducerConfigShared {

  /**
   * Static broker information, used to bypass the zookeeper based automatic
   * partition discovery. Expected format:
   * brokerid1:host1:port1, brokerid2:host2:port2
   * Left as null when "broker.list" is not configured.
   */
  val brokerList = Utils.getString(props, "broker.list", null)

  // A custom partitioner cannot be combined with a static broker list.
  if (Utils.propertyExists(brokerList)) {
    if (Utils.getString(props, "partitioner.class", null) != null)
      throw new InvalidConfigException("partitioner.class cannot be used when broker.list is set")
  }

  /**
   * Number of times a send is retried after an error when DefaultEventHandler is
   * in use (default 0). This is currently only meaningful when broker.list points
   * to a VIP. With zk.connect the setting has no effect: the zk-based producer
   * does not re-select brokers on retry, so each retry would hit the same
   * (potentially still down) broker. KAFKA-253 is expected to help address this.
   */
  val numRetries = Utils.getInt(props, "num.retries", 0)

  // Exactly one of broker.list and zk.connect has to be configured:
  // both present and both absent are rejected.
  if (Utils.propertyExists(brokerList)) {
    if (Utils.propertyExists(zkConnect))
      throw new InvalidConfigException("only one of broker.list and zk.connect can be specified")
  } else if (!Utils.propertyExists(zkConnect)) {
    throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified")
  }

  /**
   * Name of the partitioner class that distributes events amongst sub-topics.
   * Falls back to kafka.producer.DefaultPartitioner when not configured.
   */
  val partitionerClass = Utils.getString(props, "partitioner.class", "kafka.producer.DefaultPartitioner")

  /**
   * Selects how messages are sent. Valid values:
   *  - async: asynchronous send
   *  - sync: synchronous send (the default)
   */
  val producerType = Utils.getString(props, "producer.type", "sync")

  /**
   * Compression codec applied to all data generated by this producer, read from
   * "compression.codec". When the property is unset the codec is expected to be
   * NoCompressionCodec (the fallback is chosen inside Utils.getCompressionCodec).
   */
  val compressionCodec = Utils.getCompressionCodec(props, "compression.codec")

  /**
   * Topics for which compression should be turned on, read from the
   * comma-separated "compressed.topics" property.
   *
   *  - Codec other than NoCompressionCodec and a non-empty list: only the listed
   *    topics are compressed.
   *  - Codec other than NoCompressionCodec and an empty list: the codec is enabled
   *    for all topics.
   *  - Codec is NoCompressionCodec: compression is disabled for all topics.
   */
  val compressedTopics = {
    val rawTopicList = Utils.getString(props, "compressed.topics", null)
    Utils.getCSVList(rawTopicList)
  }

  /**
   * Number of attempts the producer makes to refresh its ZK cache (default 3).
   *
   * A producer using the zookeeper software load balancer keeps a ZK cache that is
   * updated by the zookeeper watcher listeners. During events such as a broker
   * bounce the cache can be inconsistent for a short period, and the producer may
   * pick a broker partition that is unavailable. When that happens the cache
   * needs to be refreshed.
   */
  val zkReadRetries = Utils.getInt(props, "zk.read.num.retries", 3)
}