// blob: c531cd1feea649202438500a7762833fa7fec34b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.consumer
import java.util.Properties
import kafka.utils.{ZKConfig, Utils}
import kafka.api.OffsetRequest
/**
 * Default values and property-name constants for the consumer configuration.
 * The ConsumerConfig class imports these members and uses them as fallbacks
 * when the corresponding property is not supplied.
 */
object ConsumerConfig {

  // ---- network / fetch defaults ----

  /** default for "socket.timeout.ms" */
  val SocketTimeout: Int = 30 * 1000

  /** default for "socket.buffersize" */
  val SocketBufferSize: Int = 64 * 1024

  /** default for "fetch.size" */
  val FetchSize: Int = 1024 * 1024

  /** ten times the default fetch size; must stay after FetchSize because it is derived from it */
  val MaxFetchSize: Int = 10 * FetchSize

  /** default for "fetcher.backoff.ms" */
  val DefaultFetcherBackoffMs: Int = 1000

  // ---- offset commit / rebalance defaults ----

  /** default for "autocommit.enable" */
  val AutoCommit: Boolean = true

  /** default for "autocommit.interval.ms" */
  val AutoCommitInterval: Int = 10 * 1000

  /** default for "queuedchunks.max" */
  val MaxQueuedChunks: Int = 10

  /** default for "rebalance.retries.max" */
  val MaxRebalanceRetries: Int = 4

  /** default for "autooffset.reset" */
  val AutoOffsetReset: String = OffsetRequest.SmallestTimeString

  /** default for "consumer.timeout.ms" */
  val ConsumerTimeoutMs: Int = -1

  // ---- mirroring: default values ----

  /** default (empty) whitelist of topics to mirror */
  val MirrorTopicsWhitelist: String = ""

  /** default (empty) blacklist of topics to mirror */
  val MirrorTopicsBlacklist: String = ""

  /** default number of mirror consumer threads */
  val MirrorConsumerNumThreads: Int = 1

  // ---- mirroring: property names ----

  /** property name for the mirror topics whitelist */
  val MirrorTopicsWhitelistProp: String = "mirror.topics.whitelist"

  /** property name for the mirror topics blacklist */
  val MirrorTopicsBlacklistProp: String = "mirror.topics.blacklist"

  /** property name for the number of mirror consumer threads */
  val MirrorConsumerNumThreadsProp: String = "mirror.consumer.numthreads"
}
/**
 * Consumer configuration backed by a java.util.Properties instance.
 *
 * Each setting is read from `props` through kafka.utils.Utils; where a default is
 * given it comes from the ConsumerConfig companion object (or, for the rebalance
 * backoff, from zkSyncTimeMs inherited from ZKConfig).
 *
 * @param props the properties to read the consumer settings from; also passed to ZKConfig
 */
class ConsumerConfig(props: Properties) extends ZKConfig(props) {
  import ConsumerConfig._

  /** a string that uniquely identifies a set of consumers within the same consumer group
   *  (property "groupid"; no default is supplied here) */
  val groupId = Utils.getString(props, "groupid")

  /** consumer id: generated automatically if not set.
   *  Set this explicitly for only testing purpose.
   *  The lookup uses null as its default; Option(...) turns that null into None,
   *  so the property is read once instead of twice. */
  val consumerId: Option[String] = Option(Utils.getString(props, "consumerid", null))

  /** the socket timeout for network requests */
  val socketTimeoutMs = Utils.getInt(props, "socket.timeout.ms", SocketTimeout)

  /** the socket receive buffer for network requests */
  val socketBufferSize = Utils.getInt(props, "socket.buffersize", SocketBufferSize)

  /** the number of bytes of messages to attempt to fetch */
  val fetchSize = Utils.getInt(props, "fetch.size", FetchSize)

  /** to avoid repeatedly polling a broker node which has no new data
   *  we will backoff every time we get an empty set from the broker.
   *  Read as an int and widened to Long by the declared type. */
  val fetcherBackoffMs: Long = Utils.getInt(props, "fetcher.backoff.ms", DefaultFetcherBackoffMs)

  /** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
  val autoCommit = Utils.getBoolean(props, "autocommit.enable", AutoCommit)

  /** the frequency in ms that the consumer offsets are committed to zookeeper */
  val autoCommitIntervalMs = Utils.getInt(props, "autocommit.interval.ms", AutoCommitInterval)

  /** max number of chunks (per the property name "queuedchunks.max") buffered for consumption */
  val maxQueuedChunks = Utils.getInt(props, "queuedchunks.max", MaxQueuedChunks)

  /** max number of retries during rebalance */
  val maxRebalanceRetries = Utils.getInt(props, "rebalance.retries.max", MaxRebalanceRetries)

  /** backoff time between retries during rebalance; defaults to zkSyncTimeMs from ZKConfig */
  val rebalanceBackoffMs = Utils.getInt(props, "rebalance.backoff.ms", zkSyncTimeMs)

  /** what to do if an offset is out of range.
   *  smallest : automatically reset the offset to the smallest offset
   *  largest : automatically reset the offset to the largest offset
   *  anything else: throw exception to the consumer */
  val autoOffsetReset = Utils.getString(props, "autooffset.reset", AutoOffsetReset)

  /** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
  val consumerTimeoutMs = Utils.getInt(props, "consumer.timeout.ms", ConsumerTimeoutMs)

  /** Use shallow iterator over compressed messages directly. This feature should be used very carefully.
   *  Typically, it's only used for mirroring raw messages from one kafka cluster to another to save the
   *  overhead of decompression.
   */
  val enableShallowIterator = Utils.getBoolean(props, "shallowiterator.enable", false)
}