[GOBBLIN-1496] Initialize arrays and maps related to gobblin as a service with an in… (#3339)

* optimize some maps and arrays that can be initialized with a fixed size

* use newHashmapWithExpectedSize

* fix checkstyle

* fix bug

* undo accidental change getTimeUnit
diff --git a/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/utils/HttpUtils.java b/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/utils/HttpUtils.java
index 9eab50d..0aba843 100644
--- a/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/utils/HttpUtils.java
+++ b/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/utils/HttpUtils.java
@@ -143,7 +143,7 @@
    */
   public static void updateStatusType(ResponseStatus status, int statusCode, Set<String> errorCodeWhitelist) {
     if (statusCode >= 300 & statusCode < 500) {
-      List<String> whitelist = new ArrayList<>();
+      List<String> whitelist = new ArrayList<>(2);
       whitelist.add(Integer.toString(statusCode));
       if (statusCode > 400) {
         whitelist.add(HttpConstants.CODE_4XX);
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
index 68b04bd..ed9972c 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
@@ -17,11 +17,11 @@
 
 package org.apache.gobblin.service;
 
+import com.google.common.collect.Maps;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -125,7 +125,7 @@
     matcher.find();
     String allFields = matcher.group("flowStatusIdParams");
     String[] flowStatusIdParams = allFields.split(",");
-    Map<String, String> paramsMap = new HashMap<>();
+    Map<String, String> paramsMap = Maps.newHashMapWithExpectedSize(flowStatusIdParams.length);
     for (String flowStatusIdParam : flowStatusIdParams) {
       paramsMap.put(flowStatusIdParam.split(":")[0], flowStatusIdParam.split(":")[1]);
     }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
index 9dad5a1..d5ab116 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
@@ -318,7 +318,7 @@
      */
     public Builder withResourceTemplates(List<URI> templateURIs) {
       try {
-        List<JobTemplate> templates = new ArrayList<>();
+        List<JobTemplate> templates = new ArrayList<>(templateURIs.size());
         for(URI uri : templateURIs) {
           templates.add(ResourceBasedJobTemplate.forResourcePath(uri.getPath()));
         }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/util/SchedulerUtils.java b/gobblin-runtime/src/main/java/org/apache/gobblin/util/SchedulerUtils.java
index efb3c2e..3fa9c76 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/util/SchedulerUtils.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/util/SchedulerUtils.java
@@ -76,7 +76,7 @@
     Collection<Config> configs =
         loader.loadPullFilesRecursively(rootPath, sysConfig, true);
 
-    List<Properties> jobConfigs = Lists.newArrayList();
+    List<Properties> jobConfigs = Lists.newArrayListWithCapacity(configs.size());
     for (Config config : configs) {
       try {
         jobConfigs.add(resolveTemplate(ConfigUtils.configToProperties(config), resolver));
@@ -107,7 +107,7 @@
     Collection<Config> configs =
         loader.loadPullFilesRecursively(commonPropsPath.getParent(), sysConfig, true);
 
-    List<Properties> jobConfigs = Lists.newArrayList();
+    List<Properties> jobConfigs = Lists.newArrayListWithCapacity(configs.size());
     for (Config config : configs) {
       try {
         jobConfigs.add(resolveTemplate(ConfigUtils.configToProperties(config), resolver));
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
index fb296f6..6df5b99 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
@@ -347,7 +347,7 @@
     //Get the logical names of SpecExecutors where the FlowEdge can be executed.
     List<String> specExecutorNames = ConfigUtils.getStringList(edgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY);
     //Load all the SpecExecutor configurations for this FlowEdge from the SpecExecutor Catalog.
-    List<SpecExecutor> specExecutors = new ArrayList<>();
+    List<SpecExecutor> specExecutors = new ArrayList<>(specExecutorNames.size());
     for (String specExecutorName: specExecutorNames) {
       URI specExecutorUri = new URI(specExecutorName);
       specExecutors.add(this.topologySpecMap.get(specExecutorUri).getSpecExecutor());
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
index 03bbbf9..6ce550d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
@@ -17,10 +17,10 @@
 
 package org.apache.gobblin.service.modules.flow;
 
+import com.google.common.collect.Maps;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -60,8 +60,8 @@
 public class FlowGraphPath {
   @Getter
   private List<List<FlowEdgeContext>> paths;
-  private FlowSpec flowSpec;
-  private Long flowExecutionId;
+  private final FlowSpec flowSpec;
+  private final Long flowExecutionId;
 
   public FlowGraphPath(FlowSpec flowSpec, Long flowExecutionId) {
     this.flowSpec = flowSpec;
@@ -142,26 +142,26 @@
    */
    private Dag<JobExecutionPlan> convertHopToDag(FlowEdgeContext flowEdgeContext, Config sysConfig)
       throws SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
-    FlowTemplate flowTemplate = flowEdgeContext.getEdge().getFlowTemplate();
-    DatasetDescriptor inputDatasetDescriptor = flowEdgeContext.getInputDatasetDescriptor();
-    DatasetDescriptor outputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor();
-    Config mergedConfig = flowEdgeContext.getMergedConfig();
-    SpecExecutor specExecutor = flowEdgeContext.getSpecExecutor();
+     FlowTemplate flowTemplate = flowEdgeContext.getEdge().getFlowTemplate();
+     DatasetDescriptor inputDatasetDescriptor = flowEdgeContext.getInputDatasetDescriptor();
+     DatasetDescriptor outputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor();
+     Config mergedConfig = flowEdgeContext.getMergedConfig();
+     SpecExecutor specExecutor = flowEdgeContext.getSpecExecutor();
 
-    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
-    Map<String, String> templateToJobNameMap = new HashMap<>();
+     //Get resolved job configs from the flow template
+     List<Config> resolvedJobConfigs = flowTemplate.getResolvedJobConfigs(mergedConfig, inputDatasetDescriptor, outputDatasetDescriptor);
 
-    //Get resolved job configs from the flow template
-    List<Config> resolvedJobConfigs = flowTemplate.getResolvedJobConfigs(mergedConfig, inputDatasetDescriptor, outputDatasetDescriptor);
-    //Iterate over each resolved job config and convert the config to a JobSpec.
-    for (Config resolvedJobConfig : resolvedJobConfigs) {
-      JobExecutionPlan jobExecutionPlan = new JobExecutionPlan.Factory().createPlan(flowSpec, resolvedJobConfig, specExecutor, flowExecutionId, sysConfig);
-      jobExecutionPlans.add(jobExecutionPlan);
-      templateToJobNameMap.put(getJobTemplateName(jobExecutionPlan), jobExecutionPlan.getJobSpec().getConfig().getString(
-          ConfigurationKeys.JOB_NAME_KEY));
-    }
-    updateJobDependencies(jobExecutionPlans, templateToJobNameMap);
-    return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+     List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>(resolvedJobConfigs.size());
+     Map<String, String> templateToJobNameMap = Maps.newHashMapWithExpectedSize(resolvedJobConfigs.size());
+     //Iterate over each resolved job config and convert the config to a JobSpec.
+     for (Config resolvedJobConfig : resolvedJobConfigs) {
+       JobExecutionPlan jobExecutionPlan = new JobExecutionPlan.Factory().createPlan(flowSpec, resolvedJobConfig, specExecutor, flowExecutionId, sysConfig);
+       jobExecutionPlans.add(jobExecutionPlan);
+       templateToJobNameMap.put(getJobTemplateName(jobExecutionPlan), jobExecutionPlan.getJobSpec().getConfig().getString(
+           ConfigurationKeys.JOB_NAME_KEY));
+     }
+     updateJobDependencies(jobExecutionPlans, templateToJobNameMap);
+     return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
   }
 
   /**
@@ -198,9 +198,10 @@
   private void updateJobDependencies(List<JobExecutionPlan> jobExecutionPlans, Map<String, String> templateToJobNameMap) {
     for (JobExecutionPlan jobExecutionPlan: jobExecutionPlans) {
       JobSpec jobSpec = jobExecutionPlan.getJobSpec();
-      List<String> updatedDependenciesList = new ArrayList<>();
       if (jobSpec.getConfig().hasPath(ConfigurationKeys.JOB_DEPENDENCIES)) {
-        for (String dependency : ConfigUtils.getStringList(jobSpec.getConfig(), ConfigurationKeys.JOB_DEPENDENCIES)) {
+        List<String> jobDependencies = ConfigUtils.getStringList(jobSpec.getConfig(), ConfigurationKeys.JOB_DEPENDENCIES);
+        List<String> updatedDependenciesList = new ArrayList<>(jobDependencies.size());
+        for (String dependency : jobDependencies) {
           if (!templateToJobNameMap.containsKey(dependency)) {
             //We should never hit this condition. The logic here is a safety check.
             throw new RuntimeException("TemplateToJobNameMap does not contain dependency " + dependency);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
index 382d623..fa46efd 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
@@ -82,7 +82,7 @@
 
     //Base condition 2: Check if we are already at the target. If so, return an empty path.
     if ((srcNode.equals(destNode)) && destDatasetDescriptor.contains(srcDatasetDescriptor)) {
-      return new ArrayList<>();
+      return new ArrayList<>(0);
     }
 
     LinkedList<FlowEdgeContext> edgeQueue =
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
index ed6f47a..fede631 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
@@ -17,10 +17,10 @@
 
 package org.apache.gobblin.service.modules.spec;
 
+import com.google.common.collect.Maps;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -44,8 +44,9 @@
 
   public Dag<JobExecutionPlan> createDag(List<JobExecutionPlan> jobExecutionPlans) {
     //Maintain a mapping between job name and the corresponding JobExecutionPlan.
-    Map<String, Dag.DagNode<JobExecutionPlan>> jobExecutionPlanMap = new HashMap<>();
-    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
+    Map<String, Dag.DagNode<JobExecutionPlan>> jobExecutionPlanMap =
+        Maps.newHashMapWithExpectedSize(jobExecutionPlans.size());
+    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>(jobExecutionPlans.size());
     /**
      * Create a {@link Dag.DagNode<JobExecutionPlan>} for every {@link JobSpec} in the flow. Add this node
      * to a HashMap.
@@ -89,7 +90,7 @@
    */
   private static List<String> getDependencies(Config config) {
     return config.hasPath(ConfigurationKeys.JOB_DEPENDENCIES) ? Arrays
-        .asList(config.getString(ConfigurationKeys.JOB_DEPENDENCIES).split(",")) : new ArrayList<>();
+        .asList(config.getString(ConfigurationKeys.JOB_DEPENDENCIES).split(",")) : new ArrayList<>(0);
   }
 
   /**
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
index c13e032..d8f3bf9 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
@@ -188,7 +188,7 @@
     Config outputDescriptorConfig = outputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
     userConfig = userConfig.withFallback(inputDescriptorConfig).withFallback(outputDescriptorConfig);
 
-    List<Config> resolvedJobConfigs = new ArrayList<>();
+    List<Config> resolvedJobConfigs = new ArrayList<>(getJobTemplates().size());
     JobSpec.Builder jobSpecBuilder = JobSpec.builder().withConfig(userConfig);
     for (JobTemplate jobTemplate: getJobTemplates()) {
       ResolvedJobSpec resolvedJobSpec = this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(jobTemplate).build());
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
index 21abc4e..c91fd83 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
@@ -72,8 +72,8 @@
     Predicate<String> flowExecutionIdPredicate = input -> input.startsWith(String.valueOf(flowExecutionId) + ".");
     String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
     try {
-      List<JobStatus> jobStatuses = new ArrayList<>();
       List<String> tableNames = this.stateStore.getTableNames(storeName, flowExecutionIdPredicate);
+      List<JobStatus> jobStatuses = new ArrayList<>(tableNames.size());
       for (String tableName: tableNames) {
         List<State> jobStates = this.stateStore.getAll(storeName, tableName);
         if (jobStates.isEmpty()) {
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/ConfigUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/ConfigUtils.java
index 0af6614..7508025 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/ConfigUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/ConfigUtils.java
@@ -23,7 +23,6 @@
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -97,7 +96,7 @@
    * @return a {@link Properties} instance
    */
   public static Properties configToProperties(Config config) {
-    return configToProperties(config, Optional.<String>absent());
+    return configToProperties(config, Optional.absent());
   }
 
   /**
@@ -181,7 +180,7 @@
    * @return a {@link Config} instance
    */
   public static Config propertiesToConfig(Properties properties) {
-    return propertiesToConfig(properties, Optional.<String>absent());
+    return propertiesToConfig(properties, Optional.absent());
   }
 
   /**
@@ -229,7 +228,7 @@
    * @return a {@link Config} instance
    */
   public static Config propertiesToConfig(Properties properties, Optional<String> prefix) {
-    Set<String> blacklistedKeys = new HashSet<>();
+    Set<String> blacklistedKeys = new HashSet<>(0);
     if (properties.containsKey(GOBBLIN_CONFIG_BLACKLIST_KEYS)) {
       blacklistedKeys = new HashSet<>(Splitter.on(',').omitEmptyStrings().trimResults()
           .splitToList(properties.getProperty(GOBBLIN_CONFIG_BLACKLIST_KEYS)));
@@ -303,7 +302,7 @@
    * values Strings. This implementation will try to recognize booleans and numbers. All keys are
    * treated as strings.*/
   private static Map<String, Object> guessPropertiesTypes(Map<Object, Object> srcProperties) {
-    Map<String, Object> res = new HashMap<>();
+    Map<String, Object> res = Maps.newHashMapWithExpectedSize(srcProperties.size());
     for (Map.Entry<Object, Object> prop : srcProperties.entrySet()) {
       Object value = prop.getValue();
       if (null != value && value instanceof String && !Strings.isNullOrEmpty(value.toString())) {
@@ -478,7 +477,7 @@
       return Collections.emptyList();
     }
 
-    List<String> valueList = Lists.newArrayList();
+    List<String> valueList;
     try {
       valueList = config.getStringList(path);
     } catch (ConfigException.WrongType e) {
@@ -494,7 +493,7 @@
        * b
        * 10,12
        */
-      try (CSVReader csvr = new CSVReader(new StringReader(config.getString(path)));) {
+      try (CSVReader csvr = new CSVReader(new StringReader(config.getString(path)))) {
         valueList = Lists.newArrayList(csvr.readNext());
       } catch (IOException ioe) {
         throw new RuntimeException(ioe);
@@ -561,7 +560,7 @@
     Config encryptedConfig = config.getConfig(encConfigPath.get());
 
     PasswordManager passwordManager = PasswordManager.getInstance(configToProperties(config));
-    Map<String, String> tmpMap = Maps.newHashMap();
+    Map<String, String> tmpMap = Maps.newHashMapWithExpectedSize(encryptedConfig.entrySet().size());
     for (Map.Entry<String, ConfigValue> entry : encryptedConfig.entrySet()) {
       String val = entry.getValue().unwrapped().toString();
       val = passwordManager.readPassword(val);