package org.apache.samza.config
import java.util.regex.Pattern
import org.apache.samza.util.Util
import org.apache.samza.util.Logging
import scala.collection.JavaConversions._
import kafka.consumer.ConsumerConfig
import java.util.{Properties, UUID}
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.samza.SamzaException
import java.util
import scala.collection.JavaConverters._
import org.apache.samza.system.kafka.KafkaSystemFactory
import org.apache.samza.config.SystemConfig.Config2System
import org.apache.kafka.common.serialization.ByteArraySerializer
/**
 * Configuration key constants used by the `KafkaConfig` wrapper class.
 *
 * Keys that contain `%s` are templates: the accessors in this file fill them in
 * with `format` before looking the key up.
 */
object KafkaConfig {
// Config-rewriter keys; `%s` is filled with the rewriter name by the getRegexResolved* accessors.
val REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex"
val REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system"
val REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config"
// Checkpoint keys (no template argument).
val CHECKPOINT_SYSTEM = "task.checkpoint.system"
val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor"
val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes"
// Changelog keys; `%s` is filled with the accessor's `name` argument (presumably the store name — confirm).
val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog.replication.factor"
// Trailing dot is intentional: this value is used as a prefix for `config.subset`.
val CHANGELOG_STREAM_KAFKA_SETTINGS = "stores.%s.changelog.kafka."
// The default segment size to use for changelog topics
// NOTE(review): no definition follows the comment above in this view, yet
// getChangelogKafkaProperties reads KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE — confirm the constant exists.
// Helper regular expression definitions to extract/match configurations
// Matches keys ending in ".changelog" that contain "stores."; capture group 1 is the text between the two.
val CHANGELOG_STREAM_NAMES_REGEX = "stores\\.(.*)\\.changelog$"
/**
 * Defines how low a queue can get for a single system/stream/partition
 * combination before trying to fetch more messages for it.
 *
 * The key is built on `SystemConfig.SYSTEM_PREFIX` and is passed through `format`
 * with a system name before lookup.
 */
val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold"
/**
 * Defines how many bytes to use for the buffered prefetch messages for the job as a whole.
 * The bytes for a single system/stream/partition are computed based on this.
 * This fetches whole messages, hence this byte limit is a soft one, and the actual usage can be
 * the byte limit + the size of the largest message in the partition for a given stream.
 * If the value of this property is > 0 then it takes precedence over the CONSUMER_FETCH_THRESHOLD config.
 */
val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold.bytes"
/**
 * Implicit view that wraps a `Config` in a `KafkaConfig`, so the Kafka-specific
 * accessors can be called directly on any `Config` once this is imported.
 *
 * The result type is spelled out because implicit definitions should not rely on
 * inference (it is fragile across compilation order and required by newer compilers).
 */
implicit def Config2Kafka(config: Config): KafkaConfig = new KafkaConfig(config)
// NOTE(review): the closing brace of `object KafkaConfig` is not visible after this line — confirm.
class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
// checkpoints

/** Looks up the value stored under `KafkaConfig.CHECKPOINT_SYSTEM`, if any. */
def getCheckpointSystem = {
  val key = KafkaConfig.CHECKPOINT_SYSTEM
  getOption(key)
}

/** Looks up the value stored under `KafkaConfig.CHECKPOINT_REPLICATION_FACTOR`, if any. */
def getCheckpointReplicationFactor() = {
  val key = KafkaConfig.CHECKPOINT_REPLICATION_FACTOR
  getOption(key)
}

/**
 * Reads `KafkaConfig.CHECKPOINT_SEGMENT_BYTES` through `getInt`, passing
 * `KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES` as the second argument
 * (presumably the fallback when the key is unset — confirm against `getInt`).
 */
def getCheckpointSegmentBytes() = {
  val fallback = KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES
  getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, fallback)
}
// custom consumer config

/** Looks up the fetch threshold key after substituting `name` into the key template. */
def getConsumerFetchThreshold(name: String) =
  getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD.format(name))

/** Looks up the byte-based fetch threshold key after substituting `name` into the key template. */
def getConsumerFetchThresholdBytes(name: String) =
  getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES.format(name))

/**
 * True only when the byte-based threshold is configured and parses to a value greater than zero.
 * A configured value that is not a valid long makes `toLong` throw NumberFormatException.
 */
def isConsumerFetchThresholdBytesEnabled(name: String): Boolean =
  getConsumerFetchThresholdBytes(name) match {
    case Some(configuredBytes) => configuredBytes.toLong > 0
    case None => false
  }
/**
 * Returns a map of topic -> fetch.message.max.bytes value for all streams that
 * are defined with this property in the config.
 *
 * Only keys under `systems.<systemName>.streams.` that end with
 * `.consumer.fetch.message.max.bytes` are considered; the map key is the config key
 * with that suffix removed. (This presumes `subset(prefix, true)` returns keys with
 * the prefix already stripped — confirm against `Config.subset`.)
 *
 * @param systemName the system whose stream-level settings are scanned
 * @return an immutable map from the remaining key text to the value parsed as Int
 * @throws NumberFormatException if a configured value is not a valid integer
 */
def getFetchMessageMaxBytesTopics(systemName: String) = {
  val suffix = ".consumer.fetch.message.max.bytes"
  config.subset("systems.%s.streams." format systemName, true)
    .asScala
    .filterKeys(k => k.endsWith(suffix))
    .map {
      case (fetchMessageMaxBytes, fetchSizeValue) =>
        (fetchMessageMaxBytes.replace(suffix, ""), fetchSizeValue.toInt)
    }
    .toMap
}
/**
 * Returns a map of topic -> auto.offset.reset value for all streams that
 * are defined with this property in the config.
 *
 * Only keys under `systems.<systemName>.streams.` that end with
 * `.consumer.auto.offset.reset` are considered; the map key is the config key with
 * that suffix removed. With an empty suffix every stream-level key would match and
 * nothing would be stripped, so the suffix must be the full property name.
 *
 * NOTE(review): the suffix literal is reconstructed from the description above and
 * from the matching `.consumer.fetch.message.max.bytes` lookup in
 * getFetchMessageMaxBytesTopics — confirm.
 *
 * @param systemName the system whose stream-level settings are scanned
 * @return an immutable map from the remaining key text to the configured reset value
 */
def getAutoOffsetResetTopics(systemName: String) = {
  val suffix = ".consumer.auto.offset.reset"
  config.subset("systems.%s.streams." format systemName, true)
    .asScala
    .filterKeys(k => k.endsWith(suffix))
    .map {
      case (topicAutoOffsetReset, resetValue) =>
        (topicAutoOffsetReset.replace(suffix, ""), resetValue)
    }
    .toMap
}
// regex resolver

/** Looks up the streams regex key for the given rewriter. */
def getRegexResolvedStreams(rewriterName: String) =
  getOption(KafkaConfig.REGEX_RESOLVED_STREAMS.format(rewriterName))

/** Looks up the system key for the given rewriter. */
def getRegexResolvedSystem(rewriterName: String) =
  getOption(KafkaConfig.REGEX_RESOLVED_SYSTEM.format(rewriterName))

/** Returns the config subset found under the rewriter's inherited-config key followed by a dot. */
def getRegexResolvedInheritedConfig(rewriterName: String) = {
  val prefix = KafkaConfig.REGEX_INHERITED_CONFIG.format(rewriterName) + "."
  config.subset(prefix, true)
}

/** Looks up the changelog replication-factor key after substituting `name` into the key template. */
def getChangelogStreamReplicationFactor(name: String) =
  getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR.format(name))
/**
 * Returns a map of store names to changelog topic names, restricted to the stores
 * whose changelog system uses `KafkaSystemFactory` as its system factory.
 *
 * @return an immutable map of store name -> changelog stream name
 * @throws SamzaException if no factory is configured for a changelog's system, or if the
 *                        store name cannot be extracted from a changelog config key
 */
def getKafkaChangelogEnabledStores() = {
  // Compiled once; the regex's single capture group is the store name.
  val storeNamePattern = Pattern.compile(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX)
  val kafkaFactoryName = classOf[KafkaSystemFactory].getCanonicalName

  config.regexSubset(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX).asScala.flatMap {
    case (changelogConfig, changelogName) =>
      // Lookup the factory for this particular stream and verify if it's a kafka system
      val systemStream = Util.getSystemStreamFromNames(changelogName)
      // `throw` is required here: without it getOrElse would hand back the exception
      // object as the "factory name" and the comparison below would silently be false.
      val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(
        throw new SamzaException("Unable to determine factory for system: " + systemStream.getSystem))
      if (kafkaFactoryName == factoryName) {
        val matcher = storeNamePattern.matcher(changelogConfig)
        val storeName =
          if (matcher.find()) matcher.group(1)
          else throw new SamzaException("Unable to find store name in the changelog configuration: " + changelogConfig + " with SystemStream: " + systemStream)
        Some(storeName -> systemStream.getStream)
      } else {
        None
      }
  }.toMap
}
/**
 * Get all kafka properties for changelog stream topic creation.
 *
 * Two defaults are set first (`cleanup.policy=compact` and
 * `segment.bytes=KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE`); every entry found under the
 * `KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS` prefix for `name` is then written on top,
 * so explicitly configured values replace the defaults.
 *
 * @param name substituted into the changelog settings key template
 * @return the populated `Properties`
 */
def getChangelogKafkaProperties(name: String) = {
  val kafkaChangeLogProperties = new Properties
  kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
  kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE)
  config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true)
    .asScala
    .foreach { case (key, value) => kafkaChangeLogProperties.setProperty(key, value) }
  kafkaChangeLogProperties
}
// kafka config

/**
 * Builds a Kafka `ConsumerConfig` for the given system.
 *
 * Properties are layered in this order, later writes replacing earlier ones:
 * everything under `systems.<systemName>.consumer.`, then `group.id` and `client.id`
 * from the arguments, then `injectedProps`.
 *
 * NOTE(review): the `group.id` / `client.id` key names and the layering order are
 * reconstructed — the visible code wrote both values under the same empty key (so the
 * client id overwrote the group id) and never applied `subConf` or `injectedProps`. Confirm.
 *
 * @param systemName    system whose consumer settings are read
 * @param clientId      consumer client id; defaults to a random "undefined" id
 * @param groupId       consumer group id; defaults to a random "undefined" id
 * @param injectedProps extra properties applied last
 * @return the resulting `ConsumerConfig`
 */
def getKafkaSystemConsumerConfig(
  systemName: String,
  clientId: String = "undefined-samza-consumer-%s" format UUID.randomUUID.toString,
  groupId: String = "undefined-samza-consumer-group-%s" format UUID.randomUUID.toString,
  injectedProps: Map[String, String] = Map()) = {
  val subConf = config.subset("systems.%s.consumer." format systemName, true)
  val consumerProps = new Properties()
  subConf.asScala.foreach { case (key, value) => consumerProps.setProperty(key, value) }
  consumerProps.setProperty("group.id", groupId)
  consumerProps.setProperty("client.id", clientId)
  injectedProps.foreach { case (key, value) => consumerProps.setProperty(key, value) }
  new ConsumerConfig(consumerProps)
}
/**
 * Builds a `KafkaProducerConfig` for the given system.
 *
 * Properties are layered in this order, later writes replacing earlier ones:
 * everything under `systems.<systemName>.producer.`, then `client.id` from the
 * argument, then `injectedProps`.
 *
 * NOTE(review): the `client.id` key name and the layering order are reconstructed —
 * the visible code stored the client id under an empty key and never applied `subConf`
 * or `injectedProps`. Confirm.
 *
 * @param systemName    system whose producer settings are read
 * @param clientId      producer client id; defaults to a random "undefined" id
 * @param injectedProps extra properties applied last
 * @return the resulting `KafkaProducerConfig`
 */
def getKafkaSystemProducerConfig(
  systemName: String,
  clientId: String = "undefined-samza-producer-%s" format UUID.randomUUID.toString,
  injectedProps: Map[String, String] = Map()) = {
  val subConf = config.subset("systems.%s.producer." format systemName, true)
  val producerProps = new util.HashMap[String, Object]()
  producerProps.putAll(subConf)
  producerProps.put("client.id", clientId)
  injectedProps.foreach { case (key, value) => producerProps.put(key, value) }
  new KafkaProducerConfig(systemName, clientId, producerProps)
}
// NOTE(review): the closing brace of `class KafkaConfig` is not visible after this method — confirm.
class KafkaProducerConfig(val systemName: String,
val clientId: String = "",
properties: java.util.Map[String, Object] = new util.HashMap[String, Object]()) extends Logging {
// Copied from new Kafka API - Workaround until KAFKA-1794 is resolved
// Overrides specific to samza-kafka (these are considered as defaults in Samza and can be overridden by the user)
// Boxed explicitly because these values are stored in a java.util.Map[String, Object].
val MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT: java.lang.Integer = Integer.valueOf(1)
val RETRIES_DEFAULT: java.lang.Integer = Integer.valueOf(Integer.MAX_VALUE)
/**
 * Returns the producer properties to hand to the Kafka producer: a copy of the
 * constructor's `properties` with Samza defaults filled in.
 *
 *  - Key and value serializers default to `ByteArraySerializer` when not set.
 *  - `max.in.flight.requests.per.connection` is kept (with a warning) only when the
 *    caller set it above the Samza default; otherwise the Samza default is written.
 *  - `retries` is kept (with a warning) only when the caller set it below the Samza
 *    default; otherwise the Samza default is written.
 *
 * Numeric settings are read through `toString.toInt`, so both String and numeric
 * values are accepted; a non-numeric value throws NumberFormatException.
 *
 * NOTE(review): the copy from `properties`, the two `containsKey` guards, the `warn`
 * format arguments, the in-flight else-branch and the return value are reconstructed
 * from the surrounding structure — confirm.
 *
 * @return a new mutable map; the constructor's `properties` map is not modified
 */
def getProducerProperties = {
  val byteArraySerializerClassName = classOf[ByteArraySerializer].getCanonicalName
  val producerProperties: java.util.Map[String, Object] = new util.HashMap[String, Object]()
  // Start from the caller's settings so the containsKey checks below can see overrides.
  producerProperties.putAll(properties)

  if (!producerProperties.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
    debug("%s undefined. Defaulting to %s." format (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName))
    producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName)
  }

  if (!producerProperties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
    debug("%s undefined. Defaulting to %s." format (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName))
    producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName)
  }

  if (producerProperties.containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)
    && producerProperties.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION).toString.toInt > MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT) {
    warn("Setting '%s' to a value other than %d does not guarantee message ordering because new messages will be sent without waiting for previous ones to be acknowledged."
      .format(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT))
  } else {
    producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT)
  }

  if (producerProperties.containsKey(ProducerConfig.RETRIES_CONFIG)
    && producerProperties.get(ProducerConfig.RETRIES_CONFIG).toString.toInt < RETRIES_DEFAULT) {
    warn("Samza does not provide producer failure handling. Consider setting '%s' to a large value, like Int.MAX." format ProducerConfig.RETRIES_CONFIG)
  } else {
    // Retries config is set to Max so that when all attempts fail, Samza also fails the send. We do not have any special handler
    // for producer failure
    producerProperties.put(ProducerConfig.RETRIES_CONFIG, RETRIES_DEFAULT)
  }

  producerProperties
}
// Reconnect backoff as supplied in `properties`; Option(...) turns the null that
// java.util.Map.get returns for an absent key into None.
// NOTE(review): no fallback value is applied on this line as shown — confirm whether a default belongs here.
val reconnectIntervalMs = Option(properties.get(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG))
val bootsrapServers = {
warn("Kafka producer configuration contains ''. This configuration is deprecated . Samza has been upgraded " +
"to use Kafka's new producer API. Please update your configurations based on the documentation at")
.getOrElse(throw new SamzaException("No bootstrap servers defined in config for %s." format systemName))