blob: f4e3f48661078e50dcd847e38c7d6bb009be1b77 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.config
import org.apache.samza.config.JobConfig._
import org.junit.Assert._
import org.junit.Test
import scala.collection.JavaConverters._
/**
 * Unit tests for the `RegExTopicGenerator` config rewriter.
 *
 * Each test subclasses the rewriter anonymously and overrides
 * `getTopicsFromSystemAdmin` to return a fixed topic list, so no external
 * system is contacted while the rewrite logic is exercised.
 */
class TestRegExTopicGenerator {
  /** Rewriter name substituted into every rewriter-scoped config key template. */
  private def REWRITER_NAME: String = "test"

  /** Config key holding the regular expression that topic names are matched against. */
  private def getRegexConfigKey: String = REGEX_RESOLVED_STREAMS.format(REWRITER_NAME)

  /** Config key naming the system whose topics are matched. */
  private def getRegexConfigSystem: String = REGEX_RESOLVED_SYSTEM.format(REWRITER_NAME)

  /** Key prefix for settings that the test expects to be copied onto every matched stream. */
  private def getRegexConfigInherited: String = REGEX_INHERITED_CONFIG.format(REWRITER_NAME)

  /**
   * Rewrites a config whose regex is `.*cat` against a fixed topic list and
   * verifies that only the matching topics become task inputs, that each
   * matching stream receives the inherited settings, and that a key unrelated
   * to the rewriter is still present afterwards.
   */
  @Test
  def configsAreRewrittenCorrectly: Unit = {
    // Config that does not concern the rewriter; expected to pass through untouched.
    val unrelated = "unrelated.key.howdy" -> "should.survive"

    val map = Map(
      getRegexConfigKey -> ".*cat",
      getRegexConfigSystem -> "test",
      getRegexConfigInherited + ".ford" -> "mustang",
      getRegexConfigInherited + ".alfa.romeo" -> "spider",
      getRegexConfigInherited + ".b.triumph" -> "spitfire",
      unrelated)

    val config = new MapConfig(map.asJava)

    // Don't actually talk to ZooKeeper
    val rewriter = new RegExTopicGenerator() {
      override def getTopicsFromSystemAdmin(rewriterName: String, config: Config): Seq[String] =
        List("catdog", "dogtired", "cow", "scaredycat", "Homer", "crazycat")
    }

    val rewritten = rewriter.rewrite(REWRITER_NAME, config)

    // Only "scaredycat" and "crazycat" match ".*cat"; each one must carry all
    // three inherited settings under its own stream-scoped key.
    val expected = Map(
      unrelated,
      "task.inputs" -> "test.crazycat,test.scaredycat",
      "systems.test.streams.scaredycat.ford" -> "mustang",
      "systems.test.streams.scaredycat.alfa.romeo" -> "spider",
      "systems.test.streams.scaredycat.b.triumph" -> "spitfire",
      "systems.test.streams.crazycat.ford" -> "mustang",
      "systems.test.streams.crazycat.alfa.romeo" -> "spider",
      "systems.test.streams.crazycat.b.triumph" -> "spitfire")

    // Include the key in the failure message so a mismatch is easy to locate.
    expected.foreach { case (key, value) =>
      assertEquals("Unexpected value for config key '" + key + "'", value, rewritten.get(key))
    }

    // Compare as a set as well, so the check does not depend on topic ordering.
    val inputStreams = rewritten.get(TaskConfig.INPUT_STREAMS).split(",").toSet
    assertEquals(2, inputStreams.size)
    assertEquals(Set("test.crazycat", "test.scaredycat"), inputStreams)
  }

  /**
   * Verifies that an input-streams entry that is present but empty is
   * replaced by the single matched topic after the rewrite.
   */
  @Test
  def emptyInputStreamsWorks: Unit = {
    // input.streams is required but appears as the empty string, which has been problematic.
    val map = Map(
      TaskConfig.INPUT_STREAMS -> "",
      getRegexConfigKey -> "yo.*",
      getRegexConfigSystem -> "test",
      getRegexConfigInherited + ".config.zorp" -> "morp")

    val rewriter = new RegExTopicGenerator() {
      override def getTopicsFromSystemAdmin(rewriterName: String, config: Config): Seq[String] =
        List("yoyoyo")
    }

    val config = rewriter.rewrite(REWRITER_NAME, new MapConfig(map.asJava))
    assertEquals("test.yoyoyo", config.get(TaskConfig.INPUT_STREAMS))
  }
}