blob: 78467bf1298653242e0b14d810e53a92a2ed139f [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.samza.config
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{ ZkUtils, ZKStringSerializer }
import org.apache.samza.config.KafkaConfig.{ Config2Kafka, REGEX_RESOLVED_STREAMS }
import org.apache.samza.SamzaException
import org.apache.samza.util.Util
import collection.JavaConversions._
import org.apache.samza.util.Logging
import scala.collection._
import org.apache.samza.config.TaskConfig.Config2Task
import org.apache.samza.system.SystemStream
import scala.util.Sorting
* Dynamically determine the Kafka topics to use as input streams to the task via a regular expression.
* For each topic that matches the regular expression, generate a series of config values for it and
* add it to the task's input streams setting.
* job.config.rewriter.regex-input-rewriter.regex=.*stream
* job.config.rewriter.regex-input-rewriter.system=kafka
* Would result in:
* task.inputs=kafka.somestream
* @see samza.config.KafkaConfig.getRegexResolvedStreams
class RegExTopicGenerator extends ConfigRewriter with Logging {
def rewrite(rewriterName: String, config: Config): Config = {
val regex = config
.getOrElse(throw new SamzaException("No %s defined in config" format REGEX_RESOLVED_STREAMS))
val systemName = config
.getOrElse(throw new SamzaException("No system defined for %s." format rewriterName))
val topics = getTopicsFromZK(rewriterName, config)
val existingInputStreams = config.getInputStreams
val newInputStreams = new mutable.HashSet[SystemStream]
val keysAndValsToAdd = new mutable.HashMap[String, String]
// Find all the topics that match this regex
val matchingStreams = topics
.map(new SystemStream(systemName, _))
for (m <- matchingStreams) {
info("Generating new configs for matching stream %s." format m)
if (existingInputStreams.contains(m)) {
throw new SamzaException("Regex '%s' matches existing, statically defined input %s." format (regex, m))
// For each topic that matched, generate all the specified configs
.foreach(kv => keysAndValsToAdd.put("systems." + m.getSystem + ".streams." + m.getStream + "." + kv._1, kv._2))
// Build new inputs
info("Generated config values for %d new topics" format newInputStreams.size)
val inputStreams = TaskConfig.INPUT_STREAMS -> (existingInputStreams ++ newInputStreams)
.sortWith(_ < _)
new MapConfig((keysAndValsToAdd ++ config) += inputStreams)
def getTopicsFromZK(rewriterName: String, config: Config): Seq[String] = {
val systemName = config
.getOrElse(throw new SamzaException("No system defined in config for rewriter %s." format rewriterName))
val consumerConfig = config.getKafkaSystemConsumerConfig(systemName)
val zkConnect = Option(consumerConfig.zkConnect)
.getOrElse(throw new SamzaException("No zookeeper.connect for system %s defined in config." format systemName))
val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
try {
} finally {