/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.config

import java.util.regex.Pattern
import org.apache.samza.util.Util
import org.apache.samza.util.Logging
import scala.collection.JavaConversions._
import kafka.consumer.ConsumerConfig
import java.util.{Properties, UUID}
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.samza.SamzaException
import java.util
import scala.collection.JavaConverters._
import org.apache.samza.system.kafka.KafkaSystemFactory
import org.apache.samza.config.SystemConfig.Config2System
import org.apache.kafka.common.serialization.ByteArraySerializer

object KafkaConfig {
  val REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex"
  val REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system"
  val REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config"

  val CHECKPOINT_SYSTEM = "task.checkpoint.system"
  val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor"
  val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes"

  val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog.replication.factor"
  val CHANGELOG_STREAM_KAFKA_SETTINGS = "stores.%s.changelog.kafka."
  // The default segment size to use for changelog topics
  val CHANGELOG_DEFAULT_SEGMENT_SIZE = "536870912"

  // Helper regular expression definitions to extract/match configurations
  val CHANGELOG_STREAM_NAMES_REGEX = "stores\\.(.*)\\.changelog$"

  /**
   * Defines how low a queue can get for a single system/stream/partition
   * combination before trying to fetch more messages for it.
   */
  val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold"

  val DEFAULT_CHECKPOINT_SEGMENT_BYTES = 26214400

  /**
   * Defines how many bytes to buffer as prefetched messages for the job as a whole.
   * The byte budget for a single system/stream/partition is computed from this value.
   * Because whole messages are fetched, this limit is a soft one: actual usage can reach
   * the limit plus the size of the largest message in the partition of a given stream.
   * If this property is set to a value > 0, it takes precedence over the CONSUMER_FETCH_THRESHOLD config.
   */
  val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold.bytes"
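  // A hypothetical example of the resolved key, assuming SystemConfig.SYSTEM_PREFIX resolves to "systems.%s.":
  //   systems.kafka.samza.fetch.threshold.bytes=100000000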

  implicit def Config2Kafka(config: Config) = new KafkaConfig(config)
}

class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
  // checkpoints
  def getCheckpointSystem = getOption(KafkaConfig.CHECKPOINT_SYSTEM)
  def getCheckpointReplicationFactor() = getOption(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR)
  def getCheckpointSegmentBytes() = getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES)
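  // Hypothetical checkpoint settings these getters read, e.g.:
  //   task.checkpoint.system=kafka
  //   task.checkpoint.replication.factor=3
  //   task.checkpoint.segment.bytes=26214400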

  // custom consumer config
  def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)
  def getConsumerFetchThresholdBytes(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES format name)
  def isConsumerFetchThresholdBytesEnabled(name: String): Boolean = getConsumerFetchThresholdBytes(name).getOrElse("-1").toLong > 0

  /**
   * Returns a map of topic -> fetch.message.max.bytes value for all streams that
   * are defined with this property in the config.
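   * For example, a hypothetical entry
   *   systems.kafka.streams.PageViewEvent.consumer.fetch.message.max.bytes=1048576
   * would yield the map entry PageViewEvent -> 1048576.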
   */
  def getFetchMessageMaxBytesTopics(systemName: String) = {
    val subConf = config.subset("systems.%s.streams." format systemName, true)
    subConf
      .filterKeys(k => k.endsWith(".consumer.fetch.message.max.bytes"))
      .map {
        case (fetchMessageMaxBytes, fetchSizeValue) =>
          (fetchMessageMaxBytes.replace(".consumer.fetch.message.max.bytes", ""), fetchSizeValue.toInt)
      }.toMap
  }

  /**
   * Returns a map of topic -> auto.offset.reset value for all streams that
   * are defined with this property in the config.
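   * For example, a hypothetical entry
   *   systems.kafka.streams.PageViewEvent.consumer.auto.offset.reset=smallest
   * would yield the map entry PageViewEvent -> smallest.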
   */
  def getAutoOffsetResetTopics(systemName: String) = {
    val subConf = config.subset("systems.%s.streams." format systemName, true)
    subConf
      .filterKeys(k => k.endsWith(".consumer.auto.offset.reset"))
      .map {
        case (topicAutoOffsetReset, resetValue) =>
          (topicAutoOffsetReset.replace(".consumer.auto.offset.reset", ""), resetValue)
      }.toMap
  }

  // regex resolver
  def getRegexResolvedStreams(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_STREAMS format rewriterName)
  def getRegexResolvedSystem(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
  def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true)
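  // A hypothetical rewriter named "regex-rewriter" would be configured with entries such as:
  //   job.config.rewriter.regex-rewriter.regex=page-view-.*
  //   job.config.rewriter.regex-rewriter.system=kafka
  //   job.config.rewriter.regex-rewriter.config.samza.msg.serde=json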

  def getChangelogStreamReplicationFactor(name: String) = getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format name)

  // Returns a map of store names to changelog topic names for all stores that are configured
  // to use Kafka as their changelog stream.
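  // For example, a hypothetical entry stores.my-store.changelog=kafka-system.my-store-changelog
  // would map "my-store" -> "my-store-changelog", provided the system "kafka-system" uses KafkaSystemFactory.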
  def getKafkaChangelogEnabledStores() = {
    val changelogConfigs = config.regexSubset(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX).asScala
    var storeToChangelog = Map[String, String]()
    for ((changelogConfig, changelogName) <- changelogConfigs) {
      // Look up the factory for this particular stream and verify that it is a Kafka system
      val systemStream = Util.getSystemStreamFromNames(changelogName)
      val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(throw new SamzaException("Unable to determine factory for system: " + systemStream.getSystem))
      if (classOf[KafkaSystemFactory].getCanonicalName == factoryName) {
        val pattern = Pattern.compile(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX)
        val matcher = pattern.matcher(changelogConfig)
        val storeName = if (matcher.find()) matcher.group(1) else throw new SamzaException("Unable to find store name in the changelog configuration: " + changelogConfig + " with SystemStream: " + systemStream)
        storeToChangelog += storeName -> systemStream.getStream
      }
    }
    storeToChangelog
  }

  // Get all Kafka properties for changelog stream topic creation
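  // For example, a hypothetical entry stores.my-store.changelog.kafka.cleanup.policy=compact
  // is forwarded as the Kafka topic-level property cleanup.policy=compact.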
  def getChangelogKafkaProperties(name: String) = {
    val filteredConfigs = config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true)
    val kafkaChangeLogProperties = new Properties
    kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
    kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE)
    filteredConfigs.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) }
    kafkaChangeLogProperties
  }

  // kafka config
  def getKafkaSystemConsumerConfig(
    systemName: String,
    clientId: String = "undefined-samza-consumer-%s" format UUID.randomUUID.toString,
    groupId: String = "undefined-samza-consumer-group-%s" format UUID.randomUUID.toString,
    injectedProps: Map[String, String] = Map()) = {
    val subConf = config.subset("systems.%s.consumer." format systemName, true)
    val consumerProps = new Properties()
    consumerProps.putAll(subConf)
    consumerProps.put("group.id", groupId)
    consumerProps.put("client.id", clientId)
    consumerProps.putAll(injectedProps)
    new ConsumerConfig(consumerProps)
  }

  def getKafkaSystemProducerConfig(
    systemName: String,
    clientId: String = "undefined-samza-producer-%s" format UUID.randomUUID.toString,
    injectedProps: Map[String, String] = Map()) = {
    val subConf = config.subset("systems.%s.producer." format systemName, true)
    val producerProps = new util.HashMap[String, Object]()
    producerProps.putAll(subConf)
    producerProps.put("client.id", clientId)
    producerProps.putAll(injectedProps)
    new KafkaProducerConfig(systemName, clientId, producerProps)
  }
}
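
// A minimal usage sketch (hypothetical system name and client ids), relying on the Config2Kafka
// implicit from the KafkaConfig companion object and an existing Config instance `config`:
//   import org.apache.samza.config.KafkaConfig.Config2Kafka
//   val consumerConfig = config.getKafkaSystemConsumerConfig("kafka", clientId = "my-consumer")
//   val producerConfig = config.getKafkaSystemProducerConfig("kafka", clientId = "my-producer")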

class KafkaProducerConfig(val systemName: String,
                          val clientId: String = "",
                          properties: java.util.Map[String, Object] = new util.HashMap[String, Object]()) extends Logging {
  // Copied from new Kafka API - Workaround until KAFKA-1794 is resolved
  val RECONNECT_BACKOFF_MS_DEFAULT = 10L

  // Overrides specific to samza-kafka (these are treated as defaults in Samza and can be overridden by the user)
  val MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT: java.lang.Integer = 1.asInstanceOf[Integer]
  val RETRIES_DEFAULT: java.lang.Integer = Integer.MAX_VALUE

  def getProducerProperties = {
    val byteArraySerializerClassName = classOf[ByteArraySerializer].getCanonicalName
    val producerProperties: java.util.Map[String, Object] = new util.HashMap[String, Object]()
    producerProperties.putAll(properties)

    if (!producerProperties.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
      debug("%s undefined. Defaulting to %s." format (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName))
      producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName)
    }

    if (!producerProperties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
      debug("%s undefined. Defaulting to %s." format (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName))
      producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName)
    }

    if (producerProperties.containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)
      && producerProperties.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION).asInstanceOf[String].toInt > MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT) {
      warn("Setting '%s' to a value other than %d does not guarantee message ordering because new messages will be sent without waiting for previous ones to be acknowledged."
        format (ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT))
    } else {
      producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT)
    }

    if (producerProperties.containsKey(ProducerConfig.RETRIES_CONFIG)
      && producerProperties.get(ProducerConfig.RETRIES_CONFIG).asInstanceOf[String].toInt < RETRIES_DEFAULT) {
      warn("Samza does not provide producer failure handling. Consider setting '%s' to a large value, like Integer.MAX_VALUE." format ProducerConfig.RETRIES_CONFIG)
    } else {
      // Retries is set to the max so that when all attempts fail, Samza also fails the send. We do not
      // have any special handler for producer failure.
      producerProperties.put(ProducerConfig.RETRIES_CONFIG, RETRIES_DEFAULT)
    }

    producerProperties
  }

  val reconnectIntervalMs = Option(properties.get(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG))
    .getOrElse(RECONNECT_BACKOFF_MS_DEFAULT).asInstanceOf[Long]

  val bootsrapServers = {
    if (properties.containsKey("metadata.broker.list")) {
      warn("Kafka producer configuration contains 'metadata.broker.list'. This configuration is deprecated. Samza has been upgraded " +
        "to use Kafka's new producer API. Please update your configurations based on the documentation at http://kafka.apache.org/documentation.html#newproducerconfigs")
    }
    Option(properties.get("bootstrap.servers"))
      .getOrElse(throw new SamzaException("No bootstrap servers defined in config for %s." format systemName))
      .asInstanceOf[String]
  }
}