blob: db752dd139c1cd53366556e2bbe5895010bd849c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util.Properties
import kafka.utils.{Utils, ZKConfig}
import kafka.message.Message
/**
* Configuration settings for the kafka server
*/
class KafkaConfig(props: Properties) extends ZKConfig(props) {
/* the port to listen and accept connections on */
val port: Int = Utils.getInt(props, "port", 6667)
/* hostname of broker. If not set, will pick up from the value returned from getLocalHost. If there are multiple interfaces getLocalHost may not be what you want. */
val hostName: String = Utils.getString(props, "hostname", null)
/* the broker id for this server */
val brokerId: Int = Utils.getInt(props, "brokerid")
/* the SO_SNDBUFF buffer of the socket sever sockets */
val socketSendBuffer: Int = Utils.getInt(props, "socket.send.buffer", 100*1024)
/* the SO_RCVBUFF buffer of the socket sever sockets */
val socketReceiveBuffer: Int = Utils.getInt(props, "socket.receive.buffer", 100*1024)
/* the maximum number of bytes in a socket request */
val maxSocketRequestSize: Int = Utils.getIntInRange(props, "max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
/* the maximum size of message that the server can receive */
val maxMessageSize = Utils.getIntInRange(props, "max.message.size", 1000000, (0, Int.MaxValue))
/* the number of worker threads that the server uses for handling all client requests*/
val numThreads = Utils.getIntInRange(props, "num.threads", Runtime.getRuntime().availableProcessors, (1, Int.MaxValue))
/* the interval in which to measure performance statistics */
val monitoringPeriodSecs = Utils.getIntInRange(props, "monitoring.period.secs", 600, (1, Int.MaxValue))
/* the default number of log partitions per topic */
val numPartitions = Utils.getIntInRange(props, "num.partitions", 1, (1, Int.MaxValue))
/* the directory in which the log data is kept */
val logDir = Utils.getString(props, "log.dir")
/* the maximum size of a single log file */
val logFileSize = Utils.getIntInRange(props, "log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
/* the maximum size of a single log file for some specific topic */
val logFileSizeMap = Utils.getTopicFileSize(Utils.getString(props, "topic.log.file.size", ""))
/* the maximum time before a new log segment is rolled out */
val logRollHours = Utils.getIntInRange(props, "log.roll.hours", 24*7, (1, Int.MaxValue))
/* the number of hours before rolling out a new log segment for some specific topic */
val logRollHoursMap = Utils.getTopicRollHours(Utils.getString(props, "topic.log.roll.hours", ""))
/* the number of hours to keep a log file before deleting it */
val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24*7, (1, Int.MaxValue))
/* the number of hours to keep a log file before deleting it for some specific topic*/
val logRetentionHoursMap = Utils.getTopicRetentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
/* the maximum size of the log before deleting it */
val logRetentionSize = Utils.getLong(props, "log.retention.size", -1)
/* the maximum size of the log for some specific topic before deleting it */
val logRetentionSizeMap = Utils.getTopicRetentionSize(Utils.getString(props, "topic.log.retention.size", ""))
/* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
val logCleanupIntervalMinutes = Utils.getIntInRange(props, "log.cleanup.interval.mins", 10, (1, Int.MaxValue))
/* enable zookeeper registration in the server */
val enableZookeeper = Utils.getBoolean(props, "enable.zookeeper", true)
/* the number of messages accumulated on a log partition before messages are flushed to disk */
val flushInterval = Utils.getIntInRange(props, "log.flush.interval", 500, (1, Int.MaxValue))
/* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */
val flushIntervalMap = Utils.getTopicFlushIntervals(Utils.getString(props, "topic.flush.intervals.ms", ""))
/* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */
val flushSchedulerThreadRate = Utils.getInt(props, "log.default.flush.scheduler.interval.ms", 3000)
/* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */
val defaultFlushIntervalMs = Utils.getInt(props, "log.default.flush.interval.ms", flushSchedulerThreadRate)
/* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */
val topicPartitionsMap = Utils.getTopicPartitions(Utils.getString(props, "topic.partition.count.map", ""))
/* the maximum length of topic name*/
val maxTopicNameLength = Utils.getIntInRange(props, "max.topic.name.length", 255, (1, Int.MaxValue))
}