blob: 0b32aeeffcd9d4755ac90573448d197d3f729749 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import java.util.Properties
import scala.collection._
import kafka.common._
/**
* Configuration settings for a log
* @param segmentSize The soft maximum for the size of a segment file in the log
* @param segmentMs The soft maximum on the amount of time before a new log segment is rolled
* @param flushInterval The number of messages that can be written to the log before a flush is forced
* @param flushMs The amount of time the log can have dirty data before a flush is forced
* @param retentionSize The approximate total number of bytes this log can use
* @param retentionMs The approximate maximum age of the last segment that is retained
* @param maxMessageSize The maximum size of a message in this log, in bytes
* @param maxIndexSize The maximum size of an index file
* @param indexInterval The approximate number of bytes between index entries
* @param fileDeleteDelayMs The time to wait before deleting a file from the filesystem
* @param deleteRetentionMs The time to retain delete markers in the log. Only applicable for logs that are being compacted.
* @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned
* @param dedupe Should old segments in this log be deleted or deduplicated?
*/
case class LogConfig(segmentSize: Int = 1024 * 1024,
                     segmentMs: Long = Long.MaxValue,
                     flushInterval: Long = Long.MaxValue,
                     flushMs: Long = Long.MaxValue,
                     retentionSize: Long = Long.MaxValue,
                     retentionMs: Long = Long.MaxValue,
                     maxMessageSize: Int = Int.MaxValue,
                     maxIndexSize: Int = 1024 * 1024,
                     indexInterval: Int = 4096,
                     fileDeleteDelayMs: Long = 60 * 1000,
                     deleteRetentionMs: Long = 24 * 60 * 60 * 1000L,
                     minCleanableRatio: Double = 0.5,
                     dedupe: Boolean = false) {

  /**
   * Render this configuration as a java.util.Properties table. Every field is stored as a
   * string under the matching key constant of the LogConfig companion object; the cleanup
   * policy is written as "dedupe" when `dedupe` is set and as "delete" otherwise.
   *
   * @return a newly created Properties instance holding one entry per field
   */
  def toProps: Properties = {
    import LogConfig._
    val cleanupPolicy = if (dedupe) "dedupe" else "delete"
    // Key/value pairs, listed in the order in which they are inserted into the table.
    val entries = List(
      SegmentBytesProp -> segmentSize.toString,
      SegmentMsProp -> segmentMs.toString,
      SegmentIndexBytesProp -> maxIndexSize.toString,
      FlushMessagesProp -> flushInterval.toString,
      FlushMsProp -> flushMs.toString,
      RetentionBytesProp -> retentionSize.toString,
      RententionMsProp -> retentionMs.toString,
      MaxMessageBytesProp -> maxMessageSize.toString,
      IndexIntervalBytesProp -> indexInterval.toString,
      DeleteRetentionMsProp -> deleteRetentionMs.toString,
      FileDeleteDelayMsProp -> fileDeleteDelayMs.toString,
      MinCleanableDirtyRatioProp -> minCleanableRatio.toString,
      CleanupPolicyProp -> cleanupPolicy
    )
    val table = new Properties()
    entries.foreach { case (key, value) => table.put(key, value) }
    table
  }
}
object LogConfig {
  // Property keys under which the individual LogConfig fields are stored in a Properties table.
  val SegmentBytesProp = "segment.bytes"
  val SegmentMsProp = "segment.ms"
  val SegmentIndexBytesProp = "segment.index.bytes"
  val FlushMessagesProp = "flush.messages"
  val FlushMsProp = "flush.ms"
  val RetentionBytesProp = "retention.bytes"
  // NOTE: this identifier is misspelled ("Rentention"); it is kept so that existing callers
  // continue to compile. Prefer RetentionMsProp in new code.
  val RententionMsProp = "retention.ms"
  // Correctly spelled alias of RententionMsProp.
  val RetentionMsProp = RententionMsProp
  val MaxMessageBytesProp = "max.message.bytes"
  val IndexIntervalBytesProp = "index.interval.bytes"
  val DeleteRetentionMsProp = "delete.retention.ms"
  val FileDeleteDelayMsProp = "file.delete.delay.ms"
  val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio"
  val CleanupPolicyProp = "cleanup.policy"

  /** The complete set of property names accepted by validateNames. */
  val ConfigNames = Set(SegmentBytesProp,
                        SegmentMsProp,
                        SegmentIndexBytesProp,
                        FlushMessagesProp,
                        FlushMsProp,
                        RetentionBytesProp,
                        RetentionMsProp,
                        MaxMessageBytesProp,
                        IndexIntervalBytesProp,
                        FileDeleteDelayMsProp,
                        DeleteRetentionMsProp,
                        MinCleanableDirtyRatioProp,
                        CleanupPolicyProp)

  /**
   * Parse the given properties instance into a LogConfig object.
   *
   * A setting that `props` does not supply falls back to the default value of the
   * corresponding LogConfig constructor parameter, instead of failing on a null lookup.
   *
   * @param props the property table to read the settings from
   * @return the parsed configuration
   * @throws NumberFormatException if a numeric setting is present but cannot be parsed
   */
  def fromProps(props: Properties): LogConfig = {
    // The defaults rendered as strings, so that an absent key goes through exactly the same
    // parsing path as a supplied one.
    val fallback = LogConfig().toProps
    def setting(name: String): String = props.getProperty(name, fallback.getProperty(name))

    new LogConfig(segmentSize = setting(SegmentBytesProp).toInt,
                  segmentMs = setting(SegmentMsProp).toLong,
                  maxIndexSize = setting(SegmentIndexBytesProp).toInt,
                  flushInterval = setting(FlushMessagesProp).toLong,
                  flushMs = setting(FlushMsProp).toLong,
                  retentionSize = setting(RetentionBytesProp).toLong,
                  retentionMs = setting(RetentionMsProp).toLong,
                  maxMessageSize = setting(MaxMessageBytesProp).toInt,
                  indexInterval = setting(IndexIntervalBytesProp).toInt,
                  // The field is a Long and toProps writes it as one, so it has to be read back
                  // as a Long; parsing it as an Int rejected every value above Int.MaxValue.
                  fileDeleteDelayMs = setting(FileDeleteDelayMsProp).toLong,
                  deleteRetentionMs = setting(DeleteRetentionMsProp).toLong,
                  minCleanableRatio = setting(MinCleanableDirtyRatioProp).toDouble,
                  // Any value other than "dedupe" (ignoring case and surrounding whitespace)
                  // selects deletion; unrecognised policies are not rejected here.
                  dedupe = setting(CleanupPolicyProp).trim.toLowerCase == "dedupe")
  }

  /**
   * Create a log config instance using the given properties and defaults.
   *
   * @param defaults the table consulted for any setting that `overrides` does not contain
   * @param overrides the entries that take precedence over `defaults`
   * @return the configuration parsed from the combined table
   */
  def fromProps(defaults: Properties, overrides: Properties): LogConfig = {
    val props = new Properties(defaults)
    props.putAll(overrides)
    fromProps(props)
  }

  /**
   * Check that property names are valid, i.e. that every key of `props` is contained in
   * ConfigNames.
   *
   * @param props the property table whose keys are checked
   * @throws IllegalArgumentException (raised by `require`) on the first unknown name
   */
  def validateNames(props: Properties): Unit = {
    import JavaConversions._
    for(name <- props.keys)
      require(LogConfig.ConfigNames.contains(name), "Unknown configuration \"%s\".".format(name))
  }

  /**
   * Check that the given properties contain only valid log config names, and that all values can be parsed.
   *
   * @param props the property table to check
   * @throws IllegalArgumentException if a name is unknown or a numeric value cannot be parsed
   */
  def validate(props: Properties): Unit = {
    validateNames(props)
    // Parse on top of a full set of defaults so that only the supplied values are exercised;
    // the resulting LogConfig is discarded.
    LogConfig.fromProps(LogConfig().toProps, props)
  }
}