Create store directory paths in CSM constructor for disk space monitor (#1697)
* Compute the store directory paths in the ContainerStorageManager (CSM) constructor so that the disk usage of the store directories can be monitored
* Fix stylecheck issues
* Refactor: initialize all store paths together instead of mutating storeDirectoryPaths during store creation. Add test
* Remove unused method
* Remove unused method
* Fix stylecheck: remove unused import
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index f46a823..a2b3b51 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -23,7 +23,6 @@
import java.io.File;
import java.nio.file.Path;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -178,12 +177,16 @@
loggedStoreBaseDirectory);
}
- this.storeDirectoryPaths = new HashSet<>();
+ // Note: The store directory paths are used by SamzaContainer to add a metric to watch the disk space usage
+ // of the store directories. The stores themselves do not need to exist yet, but their directory paths must be
+ // known up front so that the directories can be monitored once they are created and in use.
+ this.storeDirectoryPaths = ContainerStorageManagerUtil.getStoreDirPaths(config, storageEngineFactories,
+ activeTaskChangelogSystemStreams, sideInputStoreNames, containerModel, storageManagerUtil,
+ loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory);
this.inMemoryStores = ContainerStorageManagerUtil.createInMemoryStores(
- activeTaskChangelogSystemStreams, storageEngineFactories, sideInputStoreNames,
- storeDirectoryPaths, containerModel, jobContext, containerContext,
- taskInstanceMetrics, taskInstanceCollectors, serdes, storageManagerUtil,
+ activeTaskChangelogSystemStreams, storageEngineFactories, sideInputStoreNames, containerModel, jobContext,
+ containerContext, taskInstanceMetrics, taskInstanceCollectors, serdes, storageManagerUtil,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);
// Refactor Note (prateekm): in previous version, there's a subtle difference between 'this.changelogSystemStreams'
@@ -226,8 +229,7 @@
this.sideInputsManager = new SideInputsManager(
sideInputSystemStreams, systemFactories,
changelogSystemStreams, activeTaskChangelogSystemStreams,
- storageEngineFactories, storeDirectoryPaths,
- containerModel, jobContext, containerContext,
+ storageEngineFactories, containerModel, jobContext, containerContext,
samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors,
streamMetadataCache, systemAdmins, serdeManager, serdes, storageManagerUtil,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock);
@@ -339,8 +341,7 @@
.filter(s -> !inMemoryStoreNames.contains(s)).collect(Collectors.toSet());
this.taskStores = ContainerStorageManagerUtil.createTaskStores(
storesToCreate, this.storageEngineFactories, this.sideInputStoreNames,
- this.activeTaskChangelogSystemStreams, this.storeDirectoryPaths,
- this.containerModel, this.jobContext, this.containerContext,
+ this.activeTaskChangelogSystemStreams, this.containerModel, this.jobContext, this.containerContext,
this.serdes, this.taskInstanceMetrics, this.taskInstanceCollectors, this.storageManagerUtil,
this.loggedStoreBaseDirectory, this.nonLoggedStoreBaseDirectory, this.config);
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java
index a221552..d6aa7bd 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java
@@ -70,7 +70,6 @@
Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
Set<String> sideInputStoreNames,
Map<String, SystemStream> activeTaskChangelogSystemStreams,
- Set<Path> storeDirectoryPaths,
ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
Map<String, Serde<Object>> serdes,
Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
@@ -79,7 +78,6 @@
File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory,
Config config) {
Map<TaskName, Map<String, StorageEngine>> taskStores = new HashMap<>();
- StorageConfig storageConfig = new StorageConfig(config);
// iterate over each task and each storeName
for (Map.Entry<TaskName, TaskModel> task : containerModel.getTasks().entrySet()) {
@@ -90,16 +88,8 @@
}
for (String storeName : storesToCreate) {
- List<String> storeBackupManagers = storageConfig.getStoreBackupFactories(storeName);
- // A store is considered durable if it is backed by a changelog or another backupManager factory
- boolean isDurable = activeTaskChangelogSystemStreams.containsKey(storeName) || !storeBackupManagers.isEmpty();
- boolean isSideInput = sideInputStoreNames.contains(storeName);
- // Use the logged-store-base-directory for change logged stores and sideInput stores, and non-logged-store-base-dir
- // for non logged stores
- File storeBaseDir = isDurable || isSideInput ? loggedStoreBaseDirectory : nonLoggedStoreBaseDirectory;
- File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskName,
- taskModel.getTaskMode());
- storeDirectoryPaths.add(storeDirectory.toPath());
+ File storeDirectory = getStoreDirPath(storeName, config, activeTaskChangelogSystemStreams,
+ sideInputStoreNames, taskName, taskModel, storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory);
// if taskInstanceMetrics are specified use those for store metrics,
// otherwise (in case of StorageRecovery) use a blank MetricsRegistryMap
@@ -164,7 +154,6 @@
Map<String, SystemStream> activeTaskChangelogSystemStreams,
Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
Set<String> sideInputStoreNames,
- Set<Path> storeDirectoryPaths,
ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
Map<TaskName, TaskInstanceCollector> taskInstanceCollectors,
@@ -175,8 +164,7 @@
Set<String> inMemoryStoreNames = getInMemoryStoreNames(storageEngineFactories, config);
return ContainerStorageManagerUtil.createTaskStores(
inMemoryStoreNames, storageEngineFactories, sideInputStoreNames,
- activeTaskChangelogSystemStreams, storeDirectoryPaths,
- containerModel, jobContext, containerContext, serdes,
+ activeTaskChangelogSystemStreams, containerModel, jobContext, containerContext, serdes,
taskInstanceMetrics, taskInstanceCollectors, storageManagerUtil,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);
}
@@ -412,4 +400,42 @@
}
return sideInputStores;
}
+
+ public static Set<Path> getStoreDirPaths(Config config, Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
+ Map<String, SystemStream> activeTaskChangelogSystemStreams, Set<String> sideInputStoreNames,
+ ContainerModel containerModel, StorageManagerUtil storageManagerUtil, File loggedStoreBaseDirectory,
+ File nonLoggedStoreBaseDirectory) {
+ Set<Path> storeDirectoryPaths = new HashSet<>();
+ StorageConfig storageConfig = new StorageConfig(config);
+ Set<String> storeNames = new HashSet<>();
+ // Add all side input and regular stores
+ storeNames.addAll(storageConfig.getStoreNames());
+ // Add all in-memory store names
+ storeNames.addAll(getInMemoryStoreNames(storageEngineFactories, config));
+
+ for (String storeName : storeNames) {
+ for (Map.Entry<TaskName, TaskModel> task : containerModel.getTasks().entrySet()) {
+ File storeDirPath =
+ getStoreDirPath(storeName, config, activeTaskChangelogSystemStreams, sideInputStoreNames, task.getKey(),
+ task.getValue(), storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory);
+ storeDirectoryPaths.add(storeDirPath.toPath());
+ }
+ }
+ return storeDirectoryPaths;
+ }
+ public static File getStoreDirPath(String storeName, Config config, Map<String, SystemStream> activeTaskChangelogSystemStreams,
+ Set<String> sideInputStoreNames, TaskName taskName, TaskModel taskModel, StorageManagerUtil storageManagerUtil,
+ File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory) {
+ StorageConfig storageConfig = new StorageConfig(config);
+ List<String> storeBackupManagers = storageConfig.getStoreBackupFactories(storeName);
+ // A store is considered durable if it is backed by a changelog or another backupManager factory
+ boolean isDurable = activeTaskChangelogSystemStreams.containsKey(storeName) || !storeBackupManagers.isEmpty();
+ boolean isSideInput = sideInputStoreNames.contains(storeName);
+ // Use the logged-store-base-directory for change logged stores and sideInput stores, and non-logged-store-base-dir
+ // for non logged stores
+ File storeBaseDir = isDurable || isSideInput ? loggedStoreBaseDirectory : nonLoggedStoreBaseDirectory;
+ File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskName,
+ taskModel.getTaskMode());
+ return storeDirectory;
+ }
}
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java b/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
index 128d7db..f0665e1 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
@@ -26,7 +26,6 @@
import scala.collection.JavaConversions;
import java.io.File;
-import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -119,7 +118,6 @@
Map<String, SystemStream> changelogSystemStreams,
Map<String, SystemStream> activeTaskChangelogSystemStreams,
Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
- Set<Path> storeDirectoryPaths,
ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
SamzaContainerMetrics samzaContainerMetrics,
Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
@@ -147,8 +145,7 @@
// create side input taskStores for all tasks in the containerModel and each store in storageEngineFactories
this.sideInputStores = ContainerStorageManagerUtil.createTaskStores(
sideInputStoreNames, storageEngineFactories, sideInputStoreNames,
- activeTaskChangelogSystemStreams, storeDirectoryPaths,
- containerModel, jobContext, containerContext, serdes,
+ activeTaskChangelogSystemStreams, containerModel, jobContext, containerContext, serdes,
taskInstanceMetrics, taskInstanceCollectors, storageManagerUtil,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
index fc7d21e..b8996d4 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableSet;
import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -732,6 +733,39 @@
}
@Test
+ public void testStoreDirectoriesInitialized() {
+ String sideInputStore = "sideInputStore";
+ String inMemoryStore = "inMemoryStore";
+ String regularStore = "regularStore";
+ Map<String, String> storeFactories = new HashMap<>();
+ storeFactories.put(String.format("stores.%s.side.inputs.processor.factory", sideInputStore), "sideinputfactory");
+ storeFactories.put(String.format("stores.%s.factory", regularStore), "regularstorefactory");
+ storeFactories.put(String.format("stores.%s.factory", inMemoryStore),
+ "org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory");
+ Map<String, String> configMap = new HashMap<>(storeFactories);
+ Config config = new MapConfig(configMap);
+ Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories = new HashMap<>();
+ storageEngineFactories.put(sideInputStore, (StorageEngineFactory<Object, Object>) mock(StorageEngineFactory.class));
+ storageEngineFactories.put(inMemoryStore, (StorageEngineFactory<Object, Object>) mock(StorageEngineFactory.class));
+ storageEngineFactories.put(regularStore, (StorageEngineFactory<Object, Object>) mock(StorageEngineFactory.class));
+
+ Map<String, SystemStream> activeTaskChangelogSystemStreams = new HashMap<>();
+ activeTaskChangelogSystemStreams.put(regularStore, new SystemStream("kafka", "changelog"));
+ Set<String> sideInputStoreNames = new HashSet<>();
+ sideInputStoreNames.add(sideInputStore);
+ ContainerModel containerModel = mock(ContainerModel.class);
+ when(containerModel.getTasks())
+ .thenReturn(ImmutableMap.of(new TaskName("task"),
+ new TaskModel(new TaskName("task"), Collections.emptySet(), new Partition(1))));
+
+ Set<Path> storeDirPaths = ContainerStorageManagerUtil.getStoreDirPaths(config, storageEngineFactories,
+ activeTaskChangelogSystemStreams, sideInputStoreNames, containerModel, new StorageManagerUtil(),
+ new File("/tmp"), new File("/tmp2"));
+
+ assertEquals(3, storeDirPaths.size());
+ }
+
+ @Test
public void getActiveTaskChangelogSystemStreams() {
Map<String, SystemStream> storeToChangelogSystemStreams =
ContainerStorageManagerUtil.getActiveTaskChangelogSystemStreams(testContext.storesToSystemStreams, testContext.standbyContainerModel