SAMZA-2377: Batch mode should be computed based on generated configs (#1215)
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
index f300804..f8e0684 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
@@ -31,6 +31,7 @@
import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.application.LegacyTaskApplication;
import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
@@ -85,6 +86,10 @@
generatedConfig.putAll(getGeneratedConfig(runId));
}
+ if (ApplicationConfig.ApplicationMode.BATCH.name().equals(generatedConfig.get(ApplicationConfig.APP_MODE))) {
+ allowedUserConfig.remove(ClusterManagerConfig.JOB_HOST_AFFINITY_ENABLED);
+ }
+
// merge user-provided configuration with generated configuration. generated configuration has lower priority.
Config mergedConfig = JobNodeConfigurationGenerator.mergeConfig(allowedUserConfig, generatedConfig);
@@ -126,18 +131,24 @@
generatedConfig.put(ApplicationConfig.APP_RUN_ID, runId);
}
- StreamConfig streamConfig = new StreamConfig(userConfig);
- Set<String> inputStreamIds = new HashSet<>(appDesc.getInputStreamIds());
- inputStreamIds.removeAll(appDesc.getOutputStreamIds()); // exclude intermediate streams
- ApplicationConfig.ApplicationMode mode =
- inputStreamIds.stream().allMatch(streamConfig::getIsBounded)
- ? ApplicationConfig.ApplicationMode.BATCH
- : ApplicationConfig.ApplicationMode.STREAM;
- generatedConfig.put(ApplicationConfig.APP_MODE, mode.name());
-
Map<String, String> systemStreamConfigs = generateSystemStreamConfigs(appDesc);
generatedConfig.putAll(systemStreamConfigs);
+ StreamConfig streamConfig = new StreamConfig(new MapConfig(generatedConfig));
+ Set<String> inputStreamIds = new HashSet<>(appDesc.getInputStreamIds());
+ inputStreamIds.removeAll(appDesc.getOutputStreamIds()); // exclude intermediate streams
+
+ final ApplicationConfig.ApplicationMode mode;
+ if (inputStreamIds.isEmpty()) {
+ mode = ApplicationConfig.ApplicationMode.STREAM; // use stream by default
+ } else {
+ mode = inputStreamIds.stream().allMatch(streamConfig::getIsBounded)
+ ? ApplicationConfig.ApplicationMode.BATCH
+ : ApplicationConfig.ApplicationMode.STREAM;
+ }
+
+ generatedConfig.put(ApplicationConfig.APP_MODE, mode.name());
+
// adding app.class in the configuration, unless it is LegacyTaskApplication
if (!LegacyTaskApplication.class.getName().equals(appDesc.getAppClass().getName())) {
generatedConfig.put(ApplicationConfig.APP_CLASS, appDesc.getAppClass().getName());