SAMZA-2477: JobModel isn't updated with the latest "task.inputs" when the regex topic rewriter is enabled (#1299)
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index e2dbf3f..37162c4 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -268,23 +268,10 @@
*/
private def getInputStreamPartitions(config: Config, streamMetadataCache: StreamMetadataCache): Set[SystemStreamPartition] = {
- def invokeRegexTopicRewriter(config: Config): Config = {
- val jobConfig = new JobConfig(config)
- JavaOptionals.toRichOptional(jobConfig.getConfigRewriters).toOption match {
- case Some(rewriters) => rewriters.split(",").
- filter(rewriterName => JavaOptionals.toRichOptional(jobConfig.getConfigRewriterClass(rewriterName)).toOption
- .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
- .equalsIgnoreCase(classOf[RegExTopicGenerator].getName)).
- foldLeft(config)(ConfigUtil.applyRewriter(_, _))
- case _ => config
- }
- }
-
- val configAfterRegexTopicRewrite = invokeRegexTopicRewriter(config)
- val taskConfigAfterRegexTopicRewrite = new TaskConfig(configAfterRegexTopicRewrite)
+ val taskConfig = new TaskConfig(config)
// Expand regex input, if a regex-rewriter is defined in config
val inputSystemStreams =
- JavaConverters.asScalaSetConverter(taskConfigAfterRegexTopicRewrite.getInputStreams).asScala.toSet
+ JavaConverters.asScalaSetConverter(taskConfig.getInputStreams).asScala.toSet
// Get the set of partitions for each SystemStream from the stream metadata
streamMetadataCache
@@ -339,20 +326,40 @@
}
/**
+ * Refreshes the Kafka topic list used as input streams when the {@link org.apache.samza.config.RegExTopicGenerator} config rewriter is enabled.
+ * @param config Samza job config
+ * @return refreshed config
+ */
+ private def refreshConfigByRegexTopicRewriter(config: Config): Config = {
+ val jobConfig = new JobConfig(config)
+ JavaOptionals.toRichOptional(jobConfig.getConfigRewriters).toOption match {
+ case Some(rewriters) => rewriters.split(",").
+ filter(rewriterName => JavaOptionals.toRichOptional(jobConfig.getConfigRewriterClass(rewriterName)).toOption
+ .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
+ .equalsIgnoreCase(classOf[RegExTopicGenerator].getName)).
+ foldLeft(config)(ConfigUtil.applyRewriter(_, _))
+ case _ => config
+ }
+ }
+
+ /**
* Does the following:
* 1. Fetches metadata of the input streams defined in configuration through {@param streamMetadataCache}.
* 2. Applies the {@see SystemStreamPartitionGrouper}, {@see TaskNameGrouper} defined in the configuration
* to build the {@see JobModel}.
- * @param config the configuration of the job.
+ * @param originalConfig the configuration of the job.
* @param changeLogPartitionMapping the task to changelog partition mapping of the job.
* @param streamMetadataCache the cache that holds the partition metadata of the input streams.
* @param grouperMetadata provides the historical metadata of the application.
* @return the built {@see JobModel}.
*/
- def readJobModel(config: Config,
+ def readJobModel(originalConfig: Config,
changeLogPartitionMapping: util.Map[TaskName, Integer],
streamMetadataCache: StreamMetadataCache,
grouperMetadata: GrouperMetadata): JobModel = {
+ // Refresh the config when the regex topic rewriter is enabled
+ val config = refreshConfigByRegexTopicRewriter(originalConfig)
+
val taskConfig = new TaskConfig(config)
// Do grouping to fetch TaskName to SSP mapping
val allSystemStreamPartitions = getMatchedInputStreamPartitions(config, streamMetadataCache)
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
index fe94e83..9908da5 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
@@ -29,9 +29,12 @@
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.RegExTopicGenerator;
+import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.TaskName;
import org.apache.samza.container.grouper.task.GroupByContainerCount;
+import org.apache.samza.container.grouper.task.GrouperMetadata;
import org.apache.samza.container.grouper.task.GrouperMetadataImpl;
import org.apache.samza.container.grouper.task.TaskAssignmentManager;
import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
@@ -47,6 +50,7 @@
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.testUtils.MockHttpServer;
+import org.apache.samza.util.ConfigUtil;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.Assert;
@@ -70,7 +74,7 @@
* Unit tests for {@link JobModelManager}
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({TaskAssignmentManager.class, GroupByContainerCount.class})
+@PrepareForTest({TaskAssignmentManager.class, GroupByContainerCount.class, ConfigUtil.class})
public class TestJobModelManager {
private final TaskAssignmentManager mockTaskManager = mock(TaskAssignmentManager.class);
private final Map<String, Map<String, String>> localityMappings = new HashMap<>();
@@ -303,4 +307,32 @@
testSystemStreamPartition4, ImmutableList.of("task-4")
));
}
+
+ @Test
+ public void testJobModelContainsLatestTaskInputsWhenEnabledRegexTopicRewriter() {
+ ImmutableMap<String, String> rewriterConfig = ImmutableMap.of(
+ JobConfig.CONFIG_REWRITERS, "regexTopicRewriter",
+ String.format(JobConfig.CONFIG_REWRITER_CLASS, "regexTopicRewriter"), RegExTopicGenerator.class.getCanonicalName()
+ );
+
+ Config config = new MapConfig(rewriterConfig);
+ String taskInputMatchedRegex = inputStream.getSystem() + "." + inputStream.getStream();
+ Config refreshedConfig = new MapConfig(ImmutableMap.<String, String>builder()
+ .putAll(rewriterConfig)
+ .put(TaskConfig.INPUT_STREAMS, taskInputMatchedRegex)
+ .build()
+ );
+
+ PowerMockito.mockStatic(ConfigUtil.class);
+ PowerMockito.when(ConfigUtil.applyRewriter(config, "regexTopicRewriter")).thenReturn(refreshedConfig);
+
+ Map<TaskName, Integer> changeLogPartitionMapping = new HashMap<>();
+ GrouperMetadata grouperMetadata = mock(GrouperMetadata.class);
+
+ JobModel jobModel =
+ JobModelManager.readJobModel(config, changeLogPartitionMapping, mockStreamMetadataCache, grouperMetadata);
+
+ Assert.assertNotNull(jobModel);
+ Assert.assertEquals(taskInputMatchedRegex, jobModel.getConfig().get(TaskConfig.INPUT_STREAMS));
+ }
}