[STATESTORE] Cleanup local storage for statestore storage container

### Motivation

While restoring a storage container, we fetch the checkpoint from the
checkpoint store. Currently this checkpoint will never get cleaned up. Every
time we restore the storage container on pod, a new checkpoint will get added.
Over period of time the disk usage keeps going up and eventually we have to
manually delete these stale checkpoints.

### Changes

With this change, we will cleanup the local storage for a storage container
whenever we close the KVStore. This will ensure that stale checkpoints are not
left behind. It is possible that POD may restart before the cleanup can be
done. To avoid these, we will also ensure that local storage for the storage
container is cleaned up before we restore the storage container.


Reviewers: Henry Saputra <hsaputra@apache.org>

This closes #2739 from sursingh/storage-cleanup and squashes the following commits:

6bd477691 [Surinder Singh] Clean local storage for storage containers
c74683695 [Surinder Singh] Add test case for local storage cleanup
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/api/StateStoreSpec.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/api/StateStoreSpec.java
index 8fd5d5d..0ec0d7d 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/api/StateStoreSpec.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/api/StateStoreSpec.java
@@ -55,4 +55,7 @@
     private boolean checkpointChecksumEnable = true;
     @Default
     private boolean checkpointChecksumCompatible = true;
+
+    @Default
+    private boolean localStorageCleanupEnable = false;
 }
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVStore.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVStore.java
index 7f95753..56aaae5 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVStore.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVStore.java
@@ -33,6 +33,8 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.io.MoreFiles;
+import com.google.common.io.RecursiveDeleteOption;
 import com.google.common.primitives.SignedBytes;
 import java.io.File;
 import java.io.IOException;
@@ -132,6 +134,8 @@
         RocksDB.loadLibrary();
     }
 
+    private boolean cleanupLocalStoreDirEnable;
+
     public RocksdbKVStore() {
         // initialize the iterators set
         this.kvIters = Collections.synchronizedSet(Sets.newHashSet());
@@ -253,13 +257,15 @@
         checkNotNull(spec.getLocalStateStoreDir(),
             "local state store directory is not configured");
 
-
         this.name = spec.getName();
+        this.cleanupLocalStoreDirEnable = spec.isLocalStorageCleanupEnable();
 
         // initialize the coders
         this.keyCoder = (Coder<K>) spec.getKeyCoder();
         this.valCoder = (Coder<V>) spec.getValCoder();
 
+        cleanupLocalStoreDir(spec.getLocalStateStoreDir());
+
         checkpointStore = spec.getCheckpointStore();
         if (null != checkpointStore) {
             // load checkpoint from checkpoint store
@@ -410,6 +416,20 @@
         RocksUtils.close(writeOpts);
         RocksUtils.close(flushOpts);
         RocksUtils.close(cfOpts);
+
+        cleanupLocalStoreDir(dbDir);
+    }
+
+    private void cleanupLocalStoreDir(File dbDir) {
+        if (cleanupLocalStoreDirEnable) {
+            if (dbDir.exists()) {
+                try {
+                    MoreFiles.deleteRecursively(dbDir.toPath(), RecursiveDeleteOption.ALLOW_INSECURE);
+                } catch (IOException e) {
+                    log.error("Failed to cleanup localStoreDir", e);
+                }
+            }
+        }
     }
 
     protected void closeLocalDB() {
diff --git a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/kv/TestRocksdbKVStoreCheckpoint.java b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/kv/TestRocksdbKVStoreCheckpoint.java
index 61c86fa..db6dc2b 100644
--- a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/kv/TestRocksdbKVStoreCheckpoint.java
+++ b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/kv/TestRocksdbKVStoreCheckpoint.java
@@ -21,6 +21,8 @@
 package org.apache.bookkeeper.statelib.impl.kv;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import java.io.File;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.CheckpointInfo;
 import org.junit.After;
@@ -94,4 +96,47 @@
         // We should fallback to checkpoint-1
         assertEquals("transaction-1", store.get("transaction-id"));
     }
+
+    @Test
+    public void testLocalStoreCleanup() throws Exception {
+        File checkpointDir = new File(store.getLocalDir(), "checkpoints");
+
+        store.setRemoveLocal(true);
+        store.setRemoveRemote(true);
+        store.setLocalStorageCleanup(true);
+
+        String[] checkpoints = checkpointDir.list();
+        // Initially there is only one checkpoint directory that is used by the statestore
+        assertEquals(1, checkpoints.length);
+
+        store.restore();
+
+        checkpoints = checkpointDir.list();
+        // We should only have one checkpoint in the local directory.
+        assertEquals(1, checkpoints.length);
+
+        int numKvs = 100;
+        for (int i = 0; i < 3; i++) {
+            String txid = "txid-" + i;
+            store.addNumKVs(txid, numKvs, i * numKvs);
+            String checkpoint1 = store.checkpoint("checkpoint-1");
+
+            checkpoints = checkpointDir.list();
+            // Ensure the checkpoints are cleaned up
+            assertEquals(1, checkpoints.length);
+
+            store.restore();
+            assertEquals(txid, store.get("transaction-id"));
+
+            checkpoints = checkpointDir.list();
+            // We should only have one checkpoint in the local directory.
+            assertEquals(1, checkpoints.length);
+        }
+
+        store.close();
+
+        checkpoints = checkpointDir.list();
+        // We should not have any checkpoints af the store is closed.
+        assertNull(checkpoints);
+    }
 }
diff --git a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/kv/TestStateStore.java b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/kv/TestStateStore.java
index eeae939..20e7927 100644
--- a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/kv/TestStateStore.java
+++ b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/kv/TestStateStore.java
@@ -53,8 +53,8 @@
 public class TestStateStore {
 
     private final String dbName;
-    private final boolean removeLocal;
-    private final boolean removeRemote;
+    private boolean removeLocal;
+    private boolean removeRemote;
 
     private File localDir;
     private File localCheckpointsDir;
@@ -69,6 +69,7 @@
     private boolean checkpointChecksumEnable;
     private boolean checkpointChecksumCompatible;
     private boolean enableNonChecksumCompatibility;
+    private boolean localStorageCleanup;
 
     public TestStateStore(String dbName,
                           File localDir,
@@ -82,6 +83,7 @@
         this.removeRemote = removeRemote;
         this.checkpointChecksumEnable = true;
         this.checkpointChecksumCompatible = true;
+        this.localStorageCleanup = false;
         localCheckpointsDir = new File(localDir, "checkpoints");
         remoteCheckpointsPath = Paths.get(remoteDir.getAbsolutePath(), dbName);
         enableNonChecksumCompatibility = false;
@@ -121,6 +123,18 @@
         }
     }
 
+    public void setRemoveLocal(boolean enable) {
+        removeLocal = enable;
+    }
+
+    public void setRemoveRemote(boolean enable) {
+        removeRemote = enable;
+    }
+
+    public void setLocalStorageCleanup(boolean enable) {
+        localStorageCleanup = enable;
+    }
+
     public void init() throws StateStoreException {
         checkpointStore = new FSCheckpointManager(remoteDir);
         StateStoreSpec.StateStoreSpecBuilder builder = StateStoreSpec.builder()
@@ -130,6 +144,7 @@
             .localStateStoreDir(localDir)
             .checkpointChecksumEnable(checkpointChecksumEnable)
             .checkpointChecksumCompatible(checkpointChecksumCompatible)
+            .localStorageCleanupEnable(localStorageCleanup)
             .stream(dbName);
         if (checkpointExecutor != null) {
             builder = builder.checkpointStore(checkpointStore)
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
index fd036d6..289081b 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
@@ -35,6 +35,8 @@
 
     private static final String CHECKPOINT_CHECKSUM_COMPATIBLE = "checkpoint.checksum.compatible";
 
+    private static final String LOCAL_STORAGE_CLEANUP_ENABLE = "local.storage.cleanup.enable";
+
     public StorageConfiguration(CompositeConfiguration conf) {
         super(conf, COMPONENT_PREFIX);
     }
@@ -101,4 +103,7 @@
         return getBoolean(CHECKPOINT_CHECKSUM_COMPATIBLE, true);
     }
 
+    public boolean getLocalStorageCleanupEnable() {
+        return getBoolean(LOCAL_STORAGE_CLEANUP_ENABLE, true);
+    }
 }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
index 9b3a3c2..527fb93 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
@@ -211,6 +211,7 @@
             .isReadonly(serveReadOnlyTable)
             .checkpointChecksumEnable(storageConf.getCheckpointChecksumEnable())
             .checkpointChecksumCompatible(storageConf.getCheckpointChecksumCompatible())
+            .localStorageCleanupEnable(storageConf.getLocalStorageCleanupEnable())
             .build();