Create store directory paths in CSM constructor for disk space monitor (#1697)

* Create store directory paths in CSM constructor to be able to monitor the disk usage of the store directories

* Fix stylecheck issues

* Refactor - init all store paths together and do not mutate the storeDirPaths. Added test

* Remove unused method

* Remove unused method

* Stylecheck, Remove unused import
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index f46a823..a2b3b51 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -23,7 +23,6 @@
 import java.io.File;
 import java.nio.file.Path;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -178,12 +177,16 @@
           loggedStoreBaseDirectory);
     }
 
-    this.storeDirectoryPaths = new HashSet<>();
+    // Note: The store directory paths are used by SamzaContainer to add a metric that watches the disk space usage
+    // of the store directories. The stores themselves do not need to be created yet, but the store directory paths
+    // need to be set so that they can be monitored once the stores are created and in use.
+    this.storeDirectoryPaths = ContainerStorageManagerUtil.getStoreDirPaths(config, storageEngineFactories,
+        activeTaskChangelogSystemStreams, sideInputStoreNames, containerModel, storageManagerUtil,
+        loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory);
 
     this.inMemoryStores = ContainerStorageManagerUtil.createInMemoryStores(
-        activeTaskChangelogSystemStreams, storageEngineFactories, sideInputStoreNames,
-        storeDirectoryPaths, containerModel, jobContext, containerContext,
-        taskInstanceMetrics, taskInstanceCollectors, serdes, storageManagerUtil,
+        activeTaskChangelogSystemStreams, storageEngineFactories, sideInputStoreNames, containerModel, jobContext,
+        containerContext, taskInstanceMetrics, taskInstanceCollectors, serdes, storageManagerUtil,
         loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);
 
     // Refactor Note (prateekm): in previous version, there's a subtle difference between 'this.changelogSystemStreams'
@@ -226,8 +229,7 @@
     this.sideInputsManager = new SideInputsManager(
         sideInputSystemStreams, systemFactories,
         changelogSystemStreams, activeTaskChangelogSystemStreams,
-        storageEngineFactories, storeDirectoryPaths,
-        containerModel, jobContext, containerContext,
+        storageEngineFactories, containerModel, jobContext, containerContext,
         samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors,
         streamMetadataCache, systemAdmins, serdeManager, serdes, storageManagerUtil,
         loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock);
@@ -339,8 +341,7 @@
         .filter(s -> !inMemoryStoreNames.contains(s)).collect(Collectors.toSet());
     this.taskStores = ContainerStorageManagerUtil.createTaskStores(
         storesToCreate, this.storageEngineFactories, this.sideInputStoreNames,
-        this.activeTaskChangelogSystemStreams, this.storeDirectoryPaths,
-        this.containerModel, this.jobContext, this.containerContext,
+        this.activeTaskChangelogSystemStreams, this.containerModel, this.jobContext, this.containerContext,
         this.serdes, this.taskInstanceMetrics, this.taskInstanceCollectors, this.storageManagerUtil,
         this.loggedStoreBaseDirectory, this.nonLoggedStoreBaseDirectory, this.config);
 
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java
index a221552..d6aa7bd 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java
@@ -70,7 +70,6 @@
       Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
       Set<String> sideInputStoreNames,
       Map<String, SystemStream> activeTaskChangelogSystemStreams,
-      Set<Path> storeDirectoryPaths,
       ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
       Map<String, Serde<Object>> serdes,
       Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
@@ -79,7 +78,6 @@
       File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory,
       Config config) {
     Map<TaskName, Map<String, StorageEngine>> taskStores = new HashMap<>();
-    StorageConfig storageConfig = new StorageConfig(config);
 
     // iterate over each task and each storeName
     for (Map.Entry<TaskName, TaskModel> task : containerModel.getTasks().entrySet()) {
@@ -90,16 +88,8 @@
       }
 
       for (String storeName : storesToCreate) {
-        List<String> storeBackupManagers = storageConfig.getStoreBackupFactories(storeName);
-        // A store is considered durable if it is backed by a changelog or another backupManager factory
-        boolean isDurable = activeTaskChangelogSystemStreams.containsKey(storeName) || !storeBackupManagers.isEmpty();
-        boolean isSideInput = sideInputStoreNames.contains(storeName);
-        // Use the logged-store-base-directory for change logged stores and sideInput stores, and non-logged-store-base-dir
-        // for non logged stores
-        File storeBaseDir = isDurable || isSideInput ? loggedStoreBaseDirectory : nonLoggedStoreBaseDirectory;
-        File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskName,
-            taskModel.getTaskMode());
-        storeDirectoryPaths.add(storeDirectory.toPath());
+        File storeDirectory = getStoreDirPath(storeName, config, activeTaskChangelogSystemStreams,
+            sideInputStoreNames, taskName, taskModel, storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory);
 
         // if taskInstanceMetrics are specified use those for store metrics,
         // otherwise (in case of StorageRecovery) use a blank MetricsRegistryMap
@@ -164,7 +154,6 @@
       Map<String, SystemStream> activeTaskChangelogSystemStreams,
       Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
       Set<String> sideInputStoreNames,
-      Set<Path> storeDirectoryPaths,
       ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
       Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
       Map<TaskName, TaskInstanceCollector> taskInstanceCollectors,
@@ -175,8 +164,7 @@
     Set<String> inMemoryStoreNames = getInMemoryStoreNames(storageEngineFactories, config);
     return ContainerStorageManagerUtil.createTaskStores(
         inMemoryStoreNames, storageEngineFactories, sideInputStoreNames,
-        activeTaskChangelogSystemStreams, storeDirectoryPaths,
-        containerModel, jobContext, containerContext, serdes,
+        activeTaskChangelogSystemStreams, containerModel, jobContext, containerContext, serdes,
         taskInstanceMetrics, taskInstanceCollectors, storageManagerUtil,
         loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);
   }
@@ -412,4 +400,42 @@
     }
     return sideInputStores;
   }
+
+  public static Set<Path> getStoreDirPaths(Config config, Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
+      Map<String, SystemStream> activeTaskChangelogSystemStreams, Set<String> sideInputStoreNames,
+      ContainerModel containerModel, StorageManagerUtil storageManagerUtil, File loggedStoreBaseDirectory,
+      File nonLoggedStoreBaseDirectory) {
+    Set<Path> storeDirectoryPaths = new HashSet<>();
+    StorageConfig storageConfig = new StorageConfig(config);
+    Set<String> storeNames = new HashSet<>();
+    // Add all side input and regular stores
+    storeNames.addAll(storageConfig.getStoreNames());
+    // Add all in-memory store names
+    storeNames.addAll(getInMemoryStoreNames(storageEngineFactories, config));
+
+    for (String storeName : storeNames) {
+      for (Map.Entry<TaskName, TaskModel> task : containerModel.getTasks().entrySet()) {
+        File storeDirPath =
+            getStoreDirPath(storeName, config, activeTaskChangelogSystemStreams, sideInputStoreNames, task.getKey(),
+                task.getValue(), storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory);
+        storeDirectoryPaths.add(storeDirPath.toPath());
+      }
+    }
+    return storeDirectoryPaths;
+  }
+  public static File getStoreDirPath(String storeName, Config config, Map<String, SystemStream> activeTaskChangelogSystemStreams,
+      Set<String> sideInputStoreNames, TaskName taskName, TaskModel taskModel, StorageManagerUtil storageManagerUtil,
+      File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory) {
+    StorageConfig storageConfig = new StorageConfig(config);
+    List<String> storeBackupManagers = storageConfig.getStoreBackupFactories(storeName);
+    // A store is considered durable if it is backed by a changelog or another backupManager factory
+    boolean isDurable = activeTaskChangelogSystemStreams.containsKey(storeName) || !storeBackupManagers.isEmpty();
+    boolean isSideInput = sideInputStoreNames.contains(storeName);
+    // Use the logged-store-base-directory for change logged stores and sideInput stores, and non-logged-store-base-dir
+    // for non logged stores
+    File storeBaseDir = isDurable || isSideInput ? loggedStoreBaseDirectory : nonLoggedStoreBaseDirectory;
+    File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskName,
+        taskModel.getTaskMode());
+    return storeDirectory;
+  }
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java b/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
index 128d7db..f0665e1 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
@@ -26,7 +26,6 @@
 import scala.collection.JavaConversions;
 
 import java.io.File;
-import java.nio.file.Path;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -119,7 +118,6 @@
       Map<String, SystemStream> changelogSystemStreams,
       Map<String, SystemStream> activeTaskChangelogSystemStreams,
       Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
-      Set<Path> storeDirectoryPaths,
       ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
       SamzaContainerMetrics samzaContainerMetrics,
       Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
@@ -147,8 +145,7 @@
     // create side input taskStores for all tasks in the containerModel and each store in storageEngineFactories
     this.sideInputStores = ContainerStorageManagerUtil.createTaskStores(
         sideInputStoreNames, storageEngineFactories, sideInputStoreNames,
-        activeTaskChangelogSystemStreams, storeDirectoryPaths,
-        containerModel, jobContext, containerContext, serdes,
+        activeTaskChangelogSystemStreams, containerModel, jobContext, containerContext, serdes,
         taskInstanceMetrics, taskInstanceCollectors, storageManagerUtil,
         loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);
 
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
index fc7d21e..b8996d4 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -23,6 +23,7 @@
 import com.google.common.collect.ImmutableSet;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -732,6 +733,39 @@
   }
 
   @Test
+  public void testStoreDirectoriesInitialized() {
+    String sideInputStore = "sideInputStore";
+    String inMemoryStore = "inMemoryStore";
+    String regularStore = "regularStore";
+    Map<String, String> storeFactories = new HashMap<>();
+    storeFactories.put(String.format("stores.%s.side.inputs.processor.factory", sideInputStore), "sideinputfactory");
+    storeFactories.put(String.format("stores.%s.factory", regularStore), "regularstorefactory");
+    storeFactories.put(String.format("stores.%s.factory", inMemoryStore),
+        "org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory");
+    Map<String, String> configMap = new HashMap<>(storeFactories);
+    Config config = new MapConfig(configMap);
+    Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories = new HashMap<>();
+    storageEngineFactories.put(sideInputStore, (StorageEngineFactory<Object, Object>) mock(StorageEngineFactory.class));
+    storageEngineFactories.put(inMemoryStore, (StorageEngineFactory<Object, Object>) mock(StorageEngineFactory.class));
+    storageEngineFactories.put(regularStore, (StorageEngineFactory<Object, Object>) mock(StorageEngineFactory.class));
+
+    Map<String, SystemStream> activeTaskChangelogSystemStreams = new HashMap<>();
+    activeTaskChangelogSystemStreams.put(regularStore, new SystemStream("kafka", "changelog"));
+    Set<String> sideInputStoreNames = new HashSet<>();
+    sideInputStoreNames.add(sideInputStore);
+    ContainerModel containerModel = mock(ContainerModel.class);
+    when(containerModel.getTasks())
+        .thenReturn(ImmutableMap.of(new TaskName("task"),
+            new TaskModel(new TaskName("task"), Collections.emptySet(), new Partition(1))));
+
+    Set<Path> storeDirPaths = ContainerStorageManagerUtil.getStoreDirPaths(config, storageEngineFactories,
+        activeTaskChangelogSystemStreams, sideInputStoreNames, containerModel, new StorageManagerUtil(),
+        new File("/tmp"), new File("/tmp2"));
+
+    assertEquals(3, storeDirPaths.size());
+  }
+
+  @Test
   public void getActiveTaskChangelogSystemStreams() {
     Map<String, SystemStream> storeToChangelogSystemStreams =
         ContainerStorageManagerUtil.getActiveTaskChangelogSystemStreams(testContext.storesToSystemStreams, testContext.standbyContainerModel