[GOBBLIN-1497] Reduce the number of calls on FlowSpec initialization where possible,… (#3340)
* Reduce the number of calls on FlowSpec initialization where possible, and configToProperties/vice versa
* address review
* Address comments
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/util/SchedulerUtils.java b/gobblin-runtime/src/main/java/org/apache/gobblin/util/SchedulerUtils.java
index 3fa9c76..1dd1b3b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/util/SchedulerUtils.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/util/SchedulerUtils.java
@@ -171,15 +171,18 @@
}
private static Properties resolveTemplate(Properties jobProps, JobSpecResolver resolver) throws IOException {
+ // If there is no job template, do not spend resources creating a new JobSpec
+ if (!jobProps.containsKey(ConfigurationKeys.JOB_TEMPLATE_PATH)) {
+ return jobProps;
+ }
+
try {
JobSpec.Builder jobSpecBuilder = JobSpec.builder().withConfig(ConfigUtils.propertiesToConfig(jobProps));
- if (jobProps.containsKey(ConfigurationKeys.JOB_TEMPLATE_PATH)) {
JobTemplate jobTemplate = ResourceBasedJobTemplate
.forResourcePath(jobProps.getProperty(ConfigurationKeys.JOB_TEMPLATE_PATH),
new PackagedTemplatesJobCatalogDecorator());
jobSpecBuilder.withTemplate(jobTemplate);
- }
- return ConfigUtils.configToProperties(resolver.resolveJobSpec(jobSpecBuilder.build()).getConfig());
+ return resolver.resolveJobSpec(jobSpecBuilder.build()).getConfigAsProperties();
} catch (JobTemplate.TemplateException | SpecNotFoundException | URISyntaxException exc) {
throw new IOException(exc);
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigUtils.java
index a59fc85..3af1cf2 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigUtils.java
@@ -22,12 +22,10 @@
import com.google.common.collect.Maps;
import com.linkedin.data.template.StringMap;
-import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.service.FlowConfig;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.Schedule;
-import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
@@ -60,7 +58,8 @@
}
public static String serializeFlowConfig(FlowConfig flowConfig) throws IOException {
- Properties properties = ConfigUtils.configToProperties(ConfigFactory.parseMap(flowConfig.getProperties()));
+ Properties properties = new Properties();
+ properties.putAll(flowConfig.getProperties());
properties.setProperty(FLOWCONFIG_ID_NAME, flowConfig.getId().getFlowName());
properties.setProperty(FLOWCONFIG_ID_GROUP, flowConfig.getId().getFlowGroup());
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 1a257f2..f83bf7d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -202,8 +202,9 @@
while (specUris.hasNext()) {
Spec spec = this.flowCatalog.get().getSpecWrapper(specUris.next());
try {
- // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change
- if (spec instanceof FlowSpec) {
+ // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
+ if (spec instanceof FlowSpec && PropertiesUtils.getPropAsBoolean((
+ (FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
onAddSpec(modifiedSpec);
} else {