/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.config
import java.util
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern
import java.util.Properties
import com.google.common.collect.ImmutableMap
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.samza.SamzaException
import org.apache.samza.config.ApplicationConfig.ApplicationMode
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import org.apache.samza.util.{Logging, StreamUtil}
import scala.collection.JavaConverters._
object KafkaConfig {
val TOPIC_REPLICATION_FACTOR = "replication.factor"
val TOPIC_DEFAULT_REPLICATION_FACTOR = "2"
val SEGMENT_BYTES = "segment.bytes"
val MAX_MESSAGE_BYTES = "max.message.bytes"
// The default max message bytes for log-compacted topics
val DEFAULT_LOG_COMPACT_TOPIC_MAX_MESSAGE_BYTES = "1000012"
val CHECKPOINT_SYSTEM = "task.checkpoint.system"
val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint." + TOPIC_REPLICATION_FACTOR
val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint." + SEGMENT_BYTES
val CHECKPOINT_MAX_MESSAGE_BYTES = "task.checkpoint." + MAX_MESSAGE_BYTES
val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog." + TOPIC_REPLICATION_FACTOR
val DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR = CHANGELOG_STREAM_REPLICATION_FACTOR format "default"
val CHANGELOG_STREAM_KAFKA_SETTINGS = "stores.%s.changelog.kafka."
// The default segment size to use for changelog topics
val CHANGELOG_DEFAULT_SEGMENT_SIZE = "536870912"
val CHANGELOG_MAX_MESSAGE_BYTES = "stores.%s.changelog." + MAX_MESSAGE_BYTES
// Helper regular expression definitions to extract/match configurations
val CHANGELOG_STREAM_NAMES_REGEX = "stores\\.(.*)\\.changelog$"
val JOB_COORDINATOR_REPLICATION_FACTOR = "job.coordinator." + TOPIC_REPLICATION_FACTOR
val JOB_COORDINATOR_SEGMENT_BYTES = "job.coordinator." + SEGMENT_BYTES
val JOB_COORDINATOR_MAX_MESSAGE_BYTES = "job.coordinator." + MAX_MESSAGE_BYTES
val CONSUMER_CONFIGS_CONFIG_KEY = "systems.%s.consumer.%s"
val PRODUCER_BOOTSTRAP_SERVERS_CONFIG_KEY = "systems.%s.producer.bootstrap.servers"
val PRODUCER_CONFIGS_CONFIG_KEY = "systems.%s.producer.%s"
val CONSUMER_ZK_CONNECT_CONFIG_KEY = "systems.%s.consumer.zookeeper.connect"
/**
* Defines how low a queue can get for a single system/stream/partition
* combination before trying to fetch more messages for it.
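*
* As a sketch: for a hypothetical system named "kafka", the resolved key would be
* "systems.kafka.samza.fetch.threshold" (the system name fills the
* SystemConfig.SYSTEM_ID_PREFIX placeholder).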
*/
val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_ID_PREFIX + "samza.fetch.threshold"
val DEFAULT_CHECKPOINT_SEGMENT_BYTES = 26214400
/**
* Defines how many bytes to use for buffered prefetch messages for the job as a whole.
* The bytes for a single system/stream/partition are computed based on this.
* Fetches happen in whole messages, so this byte limit is a soft one: actual usage can reach
* the limit plus the size of the largest message in the partition for a given stream.
* If the value of this property is > 0, it takes precedence over the CONSUMER_FETCH_THRESHOLD config.
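*
* A minimal sketch (hypothetical system name "kafka", value illustrative):
* {{{
* systems.kafka.samza.fetch.threshold.bytes=1073741824
* }}}
* With a value > 0, isConsumerFetchThresholdBytesEnabled returns true for that system.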
*/
val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_ID_PREFIX + "samza.fetch.threshold.bytes"
val DEFAULT_RETENTION_MS_FOR_BATCH = TimeUnit.DAYS.toMillis(1)
implicit def Config2Kafka(config: Config) = new KafkaConfig(config)
}
class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
/**
* Gets the System to use for reading/writing checkpoints. Uses the following precedence.
*
* 1. If task.checkpoint.system is defined, that value is used.
* 2. If job.default.system is defined, that value is used.
* 3. None
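*
* A minimal sketch, assuming an in-memory MapConfig (names illustrative):
* {{{
* import scala.collection.JavaConverters._
* val config = new MapConfig(Map("job.default.system" -> "kafka").asJava)
* new KafkaConfig(config).getCheckpointSystem // Some("kafka")
* }}}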
*/
def getCheckpointSystem = Option(getOrElse(KafkaConfig.CHECKPOINT_SYSTEM,
new JobConfig(config).getDefaultSystem().orElse(null)))
/**
* Gets the replication factor for the checkpoint topic. Uses the following precedence.
*
* 1. If task.checkpoint.replication.factor is configured, that value is used.
* 2. If systems.checkpoint-system.default.stream.replication.factor is configured, that value is used.
* 3. 3
*
* Note that the checkpoint-system has a similar precedence. See [[getCheckpointSystem]]
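*
* A sketch of the precedence (keys are the configs above; "kafka" is a hypothetical
* checkpoint system, values illustrative):
* {{{
* systems.kafka.default.stream.replication.factor=4  // => Some("4")
* task.checkpoint.replication.factor=5               // => Some("5"), overrides the above
* }}}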
*/
def getCheckpointReplicationFactor() = {
val defaultReplicationFactor: String = getSystemDefaultReplicationFactor(getCheckpointSystem.orNull, "3")
val replicationFactor = getOrDefault(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR, defaultReplicationFactor)
Option(replicationFactor)
}
def getSystemDefaultReplicationFactor(systemName: String, defaultValue: String) = {
val defaultReplicationFactor = new SystemConfig(config).getDefaultStreamProperties(systemName).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, defaultValue)
defaultReplicationFactor
}
/**
* Gets the max message bytes for the checkpoint topic. Uses the following precedence.
*
* 1. If task.checkpoint.max.message.bytes is configured, that value is used.
* 2. If systems.checkpoint-system.default.stream.max.message.bytes is configured, that value is used.
* 3. 1000012
*
* Note that the checkpoint-system has a similar precedence. See [[getCheckpointSystem]]
*/
def getCheckpointMaxMessageBytes() = {
val defaultMessageBytes = new SystemConfig(config).getDefaultStreamProperties(getCheckpointSystem.orNull).getInt(KafkaConfig.MAX_MESSAGE_BYTES, KafkaConfig.DEFAULT_LOG_COMPACT_TOPIC_MAX_MESSAGE_BYTES.toInt)
getInt(KafkaConfig.CHECKPOINT_MAX_MESSAGE_BYTES, defaultMessageBytes)
}
/**
* Gets the segment bytes for the checkpoint topic. Uses the following precedence.
*
* 1. If task.checkpoint.segment.bytes is configured, that value is used.
* 2. If systems.checkpoint-system.default.stream.segment.bytes is configured, that value is used.
* 3. 26214400
*
* Note that the checkpoint-system has a similar precedence. See [[getCheckpointSystem]]
*/
def getCheckpointSegmentBytes() = {
val defaultSegmentBytes = new SystemConfig(config).getDefaultStreamProperties(getCheckpointSystem.orNull).getInt(KafkaConfig.SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES)
getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, defaultSegmentBytes)
}
/**
* Gets the max message bytes for the coordinator topic. Uses the following precedence.
*
* 1. If job.coordinator.max.message.bytes is configured, that value is used.
* 2. If systems.coordinator-system.default.stream.max.message.bytes is configured, that value is used.
* 3. 1000012
*
* Note that the coordinator-system has a similar precedence. See [[JobConfig.getCoordinatorSystemName]]
*/
def getCoordinatorMaxMessageByte = getOption(KafkaConfig.JOB_COORDINATOR_MAX_MESSAGE_BYTES) match {
case Some(maxMessageBytes) => maxMessageBytes
case _ =>
val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull
val systemMaxMessageBytes = new SystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.MAX_MESSAGE_BYTES, KafkaConfig.DEFAULT_LOG_COMPACT_TOPIC_MAX_MESSAGE_BYTES)
systemMaxMessageBytes
}
/**
* Gets the replication factor for the coordinator topic. Uses the following precedence.
*
* 1. If job.coordinator.replication.factor is configured, that value is used.
* 2. If systems.coordinator-system.default.stream.replication.factor is configured, that value is used.
* 3. 3
*
* Note that the coordinator-system has a similar precedence. See [[JobConfig.getCoordinatorSystemName]]
*/
def getCoordinatorReplicationFactor = getOption(KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR) match {
case Some(rplFactor) => rplFactor
case _ =>
val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull
val systemReplicationFactor = new SystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, "3")
systemReplicationFactor
}
/**
* Gets the segment bytes for the coordinator topic. Uses the following precedence.
*
* 1. If job.coordinator.segment.bytes is configured, that value is used.
* 2. If systems.coordinator-system.default.stream.segment.bytes is configured, that value is used.
* 3. 26214400
*
* Note that the coordinator-system has a similar precedence. See [[JobConfig.getCoordinatorSystemName]]
*/
def getCoordinatorSegmentBytes = getOption(KafkaConfig.JOB_COORDINATOR_SEGMENT_BYTES) match {
case Some(segBytes) => segBytes
case _ =>
val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull
val segBytes = new SystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.SEGMENT_BYTES, "26214400")
segBytes
}
// custom consumer config
def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)
def getConsumerFetchThresholdBytes(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES format name)
def isConsumerFetchThresholdBytesEnabled(name: String): Boolean = getConsumerFetchThresholdBytes(name).getOrElse("-1").toLong > 0
/**
* Returns a map of topic -> fetch.message.max.bytes values for all streams for
* which this property is defined in the config.
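*
* A minimal sketch (hypothetical system "kafka" and stream "PageViewEvent"):
* {{{
* systems.kafka.streams.PageViewEvent.consumer.fetch.message.max.bytes=1048576
* // => Map("PageViewEvent" -> 1048576)
* }}}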
*/
def getFetchMessageMaxBytesTopics(systemName: String) = {
val subConf = config.subset("systems.%s.streams." format systemName, true)
subConf
.asScala
.filterKeys(k => k.endsWith(".consumer.fetch.message.max.bytes"))
.map {
case (configKey, fetchSizeValue) =>
(configKey.replace(".consumer.fetch.message.max.bytes", ""), fetchSizeValue.toInt)
}.toMap
}
/**
* Returns a map of topic -> auto.offset.reset values for all streams for
* which this property is defined in the config.
*/
def getAutoOffsetResetTopics(systemName: String) = {
val subConf = config.subset("systems.%s.streams." format systemName, true)
subConf
.asScala
.filterKeys(k => k.endsWith(".consumer.auto.offset.reset"))
.map {
case (configKey, resetValue) =>
(configKey.replace(".consumer.auto.offset.reset", ""), resetValue)
}.toMap
}
/**
* Gets the replication factor for the changelog topics. Uses the following precedence.
*
* 1. If stores.{storeName}.changelog.replication.factor is configured, that value is used.
* 2. If it is not configured, the value configured for stores.default.changelog.replication.factor is used.
* 3. If it is not configured, the RF value configured for the store's changelog's system, configured using
* stores.{storeName}.changelog={systemName}.{streamName}, is used.
* 4. If it is not configured, the value for the RF of job.changelog.system is used.
* 5. If it is not configured, the value for the RF of job.default.system is used.
* 6. If it is not configured, the RF is chosen as 2.
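*
* A sketch for a hypothetical store "my-store" whose changelog lives on system "kafka":
* {{{
* stores.my-store.changelog=kafka.my-store-changelog
* systems.kafka.default.stream.replication.factor=4  // used via step 3 => "4"
* stores.default.changelog.replication.factor=3      // step 2, wins over step 3 => "3"
* stores.my-store.changelog.replication.factor=5     // step 1, wins over all => "5"
* }}}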
*/
def getChangelogStreamReplicationFactor(storeName: String) = {
var changelogRF = getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format storeName)
if (changelogRF.isEmpty) {
changelogRF = getOption(KafkaConfig.DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR)
}
if (changelogRF.isEmpty) {
val changelogSystemStream = new StorageConfig(config).getChangelogStream(storeName)
if (!changelogSystemStream.isPresent) {
throw new SamzaException("Cannot deduce replication factor. Changelog system-stream not defined for store " + storeName)
}
val changelogSystem = StreamUtil.getSystemStreamFromNames(changelogSystemStream.get()).getSystem
changelogRF = Option(getSystemDefaultReplicationFactor(changelogSystem, null))
}
changelogRF.getOrElse(KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR)
}
/**
* Gets the max message bytes for the changelog topics. Uses the following precedence.
*
* 1. If stores.myStore.changelog.max.message.bytes is configured, that value is used.
* 2. If systems.changelog-system.default.stream.max.message.bytes is configured, that value is used.
* 3. 1000012
*
* Note that the changelog-system has a similar precedence. See [[StorageConfig]]
*/
def getChangelogStreamMaxMessageByte(name: String) = getOption(KafkaConfig.CHANGELOG_MAX_MESSAGE_BYTES format name) match {
case Some(maxMessageBytes) => maxMessageBytes
case _ =>
val changelogStream = JavaOptionals.toRichOptional(new StorageConfig(config).getChangelogStream(name)).toOption
.getOrElse(throw new SamzaException("System-stream not defined for store: " + name))
val changelogSystem = StreamUtil.getSystemStreamFromNames(changelogStream).getSystem
val systemMaxMessageBytes = new SystemConfig(config).getDefaultStreamProperties(changelogSystem).getOrDefault(KafkaConfig.MAX_MESSAGE_BYTES, KafkaConfig.DEFAULT_LOG_COMPACT_TOPIC_MAX_MESSAGE_BYTES)
systemMaxMessageBytes
}
// Returns a map of store names to changelog topic names for the stores configured to use Kafka as the changelog stream.
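// As a sketch, for a hypothetical entry "stores.my-store.changelog=kafka.my-store-changelog",
// the regex captures "my-store" and the result contains "my-store" -> "my-store-changelog".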
def getKafkaChangelogEnabledStores(): util.HashMap[String, String] = {
val changelogConfigs = config.regexSubset(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX).asScala
val storeToChangelog = new util.HashMap[String, String]()
val storageConfig = new StorageConfig(config)
val pattern = Pattern.compile(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX)
for ((changelogConfig, cn) <- changelogConfigs) {
// Lookup the factory for this particular stream and verify if it's a kafka system
val matcher = pattern.matcher(changelogConfig)
val storeName = if (matcher.find()) matcher.group(1) else throw new SamzaException("Unable to find store name in the changelog configuration: " + changelogConfig + " with SystemStream: " + cn)
JavaOptionals.toRichOptional(storageConfig.getChangelogStream(storeName)).toOption.foreach(changelogName => {
val systemStream = StreamUtil.getSystemStreamFromNames(changelogName)
storeToChangelog.put(storeName, systemStream.getStream)
})
}
storeToChangelog
}
// Get all kafka properties for changelog stream topic creation
def getChangelogKafkaProperties(name: String) = {
val filteredConfigs = config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true)
val kafkaChangeLogProperties = new Properties
// SAMZA-1600: do not use the combination of "compact,delete" as cleanup policy until we pick up Kafka broker 0.11.0.57,
// 1.0.2, or 1.1.0 (see KAFKA-6568)
// Adjust changelog topic setting, when TTL is set on a RocksDB store
// - Disable log compaction on Kafka changelog topic
// - Set topic TTL to be the same as RocksDB TTL
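// As a sketch (hypothetical store "my-store"): with stores.my-store.rocksdb.ttl.ms=3600000 and
// no explicit overrides, the changelog gets cleanup.policy=delete and retention.ms=3600000;
// a negative TTL (TTL disabled) keeps cleanup.policy=compact.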
Option(config.get("stores.%s.rocksdb.ttl.ms" format name)) match {
case Some(rocksDbTtl) =>
if (!rocksDbTtl.isEmpty && rocksDbTtl.toLong < 0) {
kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
kafkaChangeLogProperties.setProperty("max.message.bytes", getChangelogStreamMaxMessageByte(name))
} else if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) {
kafkaChangeLogProperties.setProperty("cleanup.policy", "delete")
if (!config.containsKey("stores.%s.changelog.kafka.retention.ms" format name)) {
kafkaChangeLogProperties.setProperty("retention.ms", String.valueOf(rocksDbTtl))
}
}
case _ =>
kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
kafkaChangeLogProperties.setProperty("max.message.bytes", getChangelogStreamMaxMessageByte(name))
}
val storageConfig = new StorageConfig(config)
kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE)
kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(storageConfig.getChangeLogDeleteRetentionInMs(name)))
// To enable transactional state, we will need to avoid the head of the changelog
// (the messages after last checkpoint) being log-compacted so we can trim the rest of the updates.
// We use min.compaction.lag.ms to control the compaction time.
if (new TaskConfig(this).getTransactionalStateRestoreEnabled) {
kafkaChangeLogProperties.setProperty(StorageConfig.MIN_COMPACTION_LAG_MS,
String.valueOf(storageConfig.getChangelogMinCompactionLagMs(name)))
}
filteredConfigs.asScala.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) }
kafkaChangeLogProperties
}
// Set the checkpoint topic configs to have a very small segment size and
// enable log compaction. This keeps job startup time small since there
// are fewer useless (overwritten) messages to read from the checkpoint
// topic.
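// For example, under the default settings in STREAM mode this yields cleanup.policy=compact,
// segment.bytes=26214400, and max.message.bytes=1000012.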
def getCheckpointTopicProperties() = {
val segmentBytes: Int = getCheckpointSegmentBytes()
val maxMessageBytes: Int = getCheckpointMaxMessageBytes()
val appConfig = new ApplicationConfig(config)
val isStreamMode = appConfig.getAppMode == ApplicationMode.STREAM
val properties = new Properties()
if (isStreamMode) {
properties.putAll(ImmutableMap.of(
"cleanup.policy", "compact",
"segment.bytes", String.valueOf(segmentBytes),
"max.message.bytes", String.valueOf(maxMessageBytes)))
} else {
properties.putAll(ImmutableMap.of(
"cleanup.policy", "compact,delete",
"retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH),
"segment.bytes", String.valueOf(segmentBytes),
"max.message.bytes", String.valueOf(maxMessageBytes)))
}
properties
}
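// Builds the producer config for a system: every "systems.<system-name>.producer.*" entry is
// flattened (prefix stripped), client.id is set, and injectedProps overrides both.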
def getKafkaSystemProducerConfig(systemName: String,
clientId: String,
injectedProps: Map[String, String] = Map()) = {
val subConf = config.subset("systems.%s.producer." format systemName, true)
val producerProps = new util.HashMap[String, String]()
producerProps.putAll(subConf)
producerProps.put("client.id", clientId)
producerProps.putAll(injectedProps.asJava)
new KafkaProducerConfig(systemName, clientId, producerProps)
}
}
class KafkaProducerConfig(val systemName: String,
val clientId: String = "",
properties: java.util.Map[String, String] = new util.HashMap[String, String]()) extends Logging {
// Copied from new Kafka API - Workaround until KAFKA-1794 is resolved
val RECONNECT_BACKOFF_MS_DEFAULT = 10L
// Overrides specific to samza-kafka (these are considered defaults in Samza and can be overridden by the user).
val MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT: java.lang.Integer = Integer.valueOf(1)
val RETRIES_DEFAULT: java.lang.Integer = Integer.MAX_VALUE
val LINGER_MS_DEFAULT: java.lang.Integer = 10
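// As a sketch: with no user overrides, getProducerProperties yields ByteArraySerializer for both
// key and value serializers, max.in.flight.requests.per.connection=1, retries=Integer.MAX_VALUE,
// and linger.ms=10.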
def getProducerProperties = {
val byteArraySerializerClassName = classOf[ByteArraySerializer].getCanonicalName
val producerProperties: java.util.Map[String, Object] = new util.HashMap[String, Object]()
producerProperties.putAll(properties)
if (!producerProperties.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
debug("%s undefined. Defaulting to %s." format(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName))
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName)
}
if (!producerProperties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
debug("%s undefined. Defaulting to %s." format(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName))
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName)
}
if (producerProperties.containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)
&& producerProperties.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION).asInstanceOf[String].toInt > MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT) {
warn("Setting '%s' to a value other than %d does not guarantee message ordering because new messages will be sent without waiting for previous ones to be acknowledged."
format(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT))
} else {
producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT)
}
if (!producerProperties.containsKey(ProducerConfig.RETRIES_CONFIG)) {
debug("%s undefined. Defaulting to %s." format(ProducerConfig.RETRIES_CONFIG, RETRIES_DEFAULT))
producerProperties.put(ProducerConfig.RETRIES_CONFIG, RETRIES_DEFAULT)
}
producerProperties.get(ProducerConfig.RETRIES_CONFIG).toString.toInt // Verify int
if (!producerProperties.containsKey(ProducerConfig.LINGER_MS_CONFIG)) {
debug("%s undefined. Defaulting to %s." format(ProducerConfig.LINGER_MS_CONFIG, LINGER_MS_DEFAULT))
producerProperties.put(ProducerConfig.LINGER_MS_CONFIG, LINGER_MS_DEFAULT)
}
producerProperties.get(ProducerConfig.LINGER_MS_CONFIG).toString.toInt // Verify int
producerProperties
}
val reconnectIntervalMs = Option(properties.get(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG))
.map(_.toLong).getOrElse(RECONNECT_BACKOFF_MS_DEFAULT)
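// When built via getKafkaSystemProducerConfig, the properties map comes from the
// "systems.<system-name>.producer." subset, so bootstrap.servers corresponds to the
// systems.<system-name>.producer.bootstrap.servers config.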
val bootstrapServers = {
if (properties.containsKey("metadata.broker.list"))
warn("Kafka producer configuration contains 'metadata.broker.list'. This configuration is deprecated. Samza has been upgraded " +
"to use Kafka's new producer API. Please update your configurations based on the documentation at http://kafka.apache.org/documentation.html#newproducerconfigs")
Option(properties.get("bootstrap.servers"))
.getOrElse(throw new SamzaException("No bootstrap servers defined in config for %s." format systemName))
.asInstanceOf[String]
}
}