SAMZA-2477: JobModel isn't updated with the latest "task.inputs" when the regex topic rewriter is enabled (#1299)

diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index e2dbf3f..37162c4 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -268,23 +268,10 @@
     */
   private def getInputStreamPartitions(config: Config, streamMetadataCache: StreamMetadataCache): Set[SystemStreamPartition] = {
 
-    def invokeRegexTopicRewriter(config: Config): Config = {
-      val jobConfig = new JobConfig(config)
-      JavaOptionals.toRichOptional(jobConfig.getConfigRewriters).toOption match {
-        case Some(rewriters) => rewriters.split(",").
-          filter(rewriterName => JavaOptionals.toRichOptional(jobConfig.getConfigRewriterClass(rewriterName)).toOption
-            .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
-            .equalsIgnoreCase(classOf[RegExTopicGenerator].getName)).
-          foldLeft(config)(ConfigUtil.applyRewriter(_, _))
-        case _ => config
-      }
-    }
-
-    val configAfterRegexTopicRewrite = invokeRegexTopicRewriter(config)
-    val taskConfigAfterRegexTopicRewrite = new TaskConfig(configAfterRegexTopicRewrite)
+    val taskConfig = new TaskConfig(config)
     // Expand regex input, if a regex-rewriter is defined in config
     val inputSystemStreams =
-      JavaConverters.asScalaSetConverter(taskConfigAfterRegexTopicRewrite.getInputStreams).asScala.toSet
+      JavaConverters.asScalaSetConverter(taskConfig.getInputStreams).asScala.toSet
 
     // Get the set of partitions for each SystemStream from the stream metadata
     streamMetadataCache
@@ -339,20 +326,40 @@
   }
 
   /**
+   * Re-applies each configured config rewriter whose class is {@link org.apache.samza.config.RegExTopicGenerator}
+   * @param config Samza job config
+   * @return the config produced by those rewriters, or {@code config} itself when no rewriters are configured
+   * @throws SamzaException if a configured rewriter name has no rewriter class config
+   */
+  private def refreshConfigByRegexTopicRewriter(config: Config): Config = {
+    val jobConfig = new JobConfig(config)
+    val regexRewriterClassName = classOf[RegExTopicGenerator].getName
+    def isRegexTopicRewriter(name: String): Boolean =
+      JavaOptionals.toRichOptional(jobConfig.getConfigRewriterClass(name)).toOption
+        .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format name))
+        .equalsIgnoreCase(regexRewriterClassName)
+    JavaOptionals.toRichOptional(jobConfig.getConfigRewriters).toOption.map(_.split(",").filter(isRegexTopicRewriter))
+      .fold(config)(_.foldLeft(config)((current, name) => ConfigUtil.applyRewriter(current, name)))
+  }
+
+  /**
     * Does the following:
     * 1. Fetches metadata of the input streams defined in configuration through {@param streamMetadataCache}.
     * 2. Applies the {@see SystemStreamPartitionGrouper}, {@see TaskNameGrouper} defined in the configuration
     * to build the {@see JobModel}.
-    * @param config the configuration of the job.
+    * @param originalConfig the configuration of the job.
     * @param changeLogPartitionMapping the task to changelog partition mapping of the job.
     * @param streamMetadataCache the cache that holds the partition metadata of the input streams.
     * @param grouperMetadata provides the historical metadata of the application.
     * @return the built {@see JobModel}.
     */
-  def readJobModel(config: Config,
+  def readJobModel(originalConfig: Config,
                    changeLogPartitionMapping: util.Map[TaskName, Integer],
                    streamMetadataCache: StreamMetadataCache,
                    grouperMetadata: GrouperMetadata): JobModel = {
+    // refresh config if enabled regex topic rewriter
+    val config = refreshConfigByRegexTopicRewriter(originalConfig)
+
     val taskConfig = new TaskConfig(config)
     // Do grouping to fetch TaskName to SSP mapping
     val allSystemStreamPartitions = getMatchedInputStreamPartitions(config, streamMetadataCache)
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
index fe94e83..9908da5 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
@@ -29,9 +29,12 @@
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.RegExTopicGenerator;
+import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.container.grouper.task.GroupByContainerCount;
+import org.apache.samza.container.grouper.task.GrouperMetadata;
 import org.apache.samza.container.grouper.task.GrouperMetadataImpl;
 import org.apache.samza.container.grouper.task.TaskAssignmentManager;
 import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
@@ -47,6 +50,7 @@
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.testUtils.MockHttpServer;
+import org.apache.samza.util.ConfigUtil;
 import org.eclipse.jetty.servlet.DefaultServlet;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.junit.Assert;
@@ -70,7 +74,7 @@
  * Unit tests for {@link JobModelManager}
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({TaskAssignmentManager.class, GroupByContainerCount.class})
+@PrepareForTest({TaskAssignmentManager.class, GroupByContainerCount.class, ConfigUtil.class})
 public class TestJobModelManager {
   private final TaskAssignmentManager mockTaskManager = mock(TaskAssignmentManager.class);
   private final Map<String, Map<String, String>> localityMappings = new HashMap<>();
@@ -303,4 +307,32 @@
         testSystemStreamPartition4, ImmutableList.of("task-4")
     ));
   }
+
+  @Test
+  public void testJobModelContainsLatestTaskInputsWhenEnabledRegexTopicRewriter() {
+    // Base config: one rewriter, registered by name and mapped to the RegExTopicGenerator class.
+    final String rewriterName = "regexTopicRewriter";
+    final String rewriterClassKey = String.format(JobConfig.CONFIG_REWRITER_CLASS, rewriterName);
+    final ImmutableMap<String, String> baseSettings = ImmutableMap.of(
+        JobConfig.CONFIG_REWRITERS, rewriterName, rewriterClassKey, RegExTopicGenerator.class.getCanonicalName());
+    final Config originalConfig = new MapConfig(baseSettings);
+
+    // Config returned by the stubbed rewriter: the base settings plus a TaskConfig.INPUT_STREAMS entry.
+    final String expectedTaskInputs = inputStream.getSystem() + "." + inputStream.getStream();
+    final ImmutableMap.Builder<String, String> rewrittenSettings = ImmutableMap.builder();
+    rewrittenSettings.putAll(baseSettings).put(TaskConfig.INPUT_STREAMS, expectedTaskInputs);
+    final Config rewrittenConfig = new MapConfig(rewrittenSettings.build());
+
+    // Stub the static ConfigUtil.applyRewriter call for exactly this config and rewriter name.
+    PowerMockito.mockStatic(ConfigUtil.class);
+    PowerMockito.when(ConfigUtil.applyRewriter(originalConfig, rewriterName)).thenReturn(rewrittenConfig);
+
+    final Map<TaskName, Integer> emptyChangelogMapping = new HashMap<>();
+    final GrouperMetadata grouperMetadata = mock(GrouperMetadata.class);
+    final JobModel jobModel = JobModelManager.readJobModel(
+        originalConfig, emptyChangelogMapping, mockStreamMetadataCache, grouperMetadata);
+    // The job model's config must expose the inputs supplied by the rewritten config.
+    Assert.assertNotNull(jobModel);
+    Assert.assertEquals(expectedTaskInputs, jobModel.getConfig().get(TaskConfig.INPUT_STREAMS));
+  }
 }