SAMZA-2377: Batch mode should be computed based on generated configs (#1215)

diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
index f300804..f8e0684 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
@@ -31,6 +31,7 @@
 import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.application.LegacyTaskApplication;
 import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
@@ -85,6 +86,10 @@
       generatedConfig.putAll(getGeneratedConfig(runId));
     }
 
+    if (ApplicationConfig.ApplicationMode.BATCH.name().equals(generatedConfig.get(ApplicationConfig.APP_MODE))) {
+      allowedUserConfig.remove(ClusterManagerConfig.JOB_HOST_AFFINITY_ENABLED);
+    }
+
     // merge user-provided configuration with generated configuration. generated configuration has lower priority.
     Config mergedConfig = JobNodeConfigurationGenerator.mergeConfig(allowedUserConfig, generatedConfig);
 
@@ -126,18 +131,24 @@
       generatedConfig.put(ApplicationConfig.APP_RUN_ID, runId);
     }
 
-    StreamConfig streamConfig = new StreamConfig(userConfig);
-    Set<String> inputStreamIds = new HashSet<>(appDesc.getInputStreamIds());
-    inputStreamIds.removeAll(appDesc.getOutputStreamIds()); // exclude intermediate streams
-    ApplicationConfig.ApplicationMode mode =
-        inputStreamIds.stream().allMatch(streamConfig::getIsBounded)
-            ? ApplicationConfig.ApplicationMode.BATCH
-            : ApplicationConfig.ApplicationMode.STREAM;
-    generatedConfig.put(ApplicationConfig.APP_MODE, mode.name());
-
     Map<String, String> systemStreamConfigs = generateSystemStreamConfigs(appDesc);
     generatedConfig.putAll(systemStreamConfigs);
 
+    StreamConfig streamConfig = new StreamConfig(new MapConfig(generatedConfig));
+    Set<String> inputStreamIds = new HashSet<>(appDesc.getInputStreamIds());
+    inputStreamIds.removeAll(appDesc.getOutputStreamIds()); // exclude intermediate streams
+
+    final ApplicationConfig.ApplicationMode mode;
+    if (inputStreamIds.isEmpty()) {
+      mode = ApplicationConfig.ApplicationMode.STREAM; // use stream by default
+    } else {
+      mode = inputStreamIds.stream().allMatch(streamConfig::getIsBounded)
+          ? ApplicationConfig.ApplicationMode.BATCH
+          : ApplicationConfig.ApplicationMode.STREAM;
+    }
+
+    generatedConfig.put(ApplicationConfig.APP_MODE, mode.name());
+
     // adding app.class in the configuration, unless it is LegacyTaskApplication
     if (!LegacyTaskApplication.class.getName().equals(appDesc.getAppClass().getName())) {
       generatedConfig.put(ApplicationConfig.APP_CLASS, appDesc.getAppClass().getName());