CASSANDRASC-56: Create staging directory if it doesn't exists

During SSTable upload, the upload will fail if the configured staging directory does not
exist. When this occurs an operator must manually create the directory, which increases
the configuration toil.

In this commit, we automatically create the staging directory if it doesn't exists during
SSTable upload. This improves the overall operational experience when running the Sidecar.

patch by Francisco Guerrero; reviewed by Dinesh Joshi, Yifan Cai for CASSANDRASC-56
diff --git a/CHANGES.txt b/CHANGES.txt
index 86f8bf0..7131a8b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * Create staging directory if it doesn't exists (CASSANDRASC-56)
  * Remove RESTEasy (CASSANDRASC-57)
  * Use in-jvm dtest framework for integration tests (CASSANDRASC-51)
  * Sidecar returns own version in node settings (CASSANDRASC-52)
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableCleanupHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableCleanupHandler.java
index 849fd3d..bffb9a5 100644
--- a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableCleanupHandler.java
+++ b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableCleanupHandler.java
@@ -66,7 +66,7 @@
                                SocketAddress remoteAddress,
                                String uploadId)
     {
-        uploadPathBuilder.resolveStagingDirectory(host, uploadId)
+        uploadPathBuilder.resolveUploadIdDirectory(host, uploadId)
                          .compose(uploadPathBuilder::isValidDirectory)
                          .compose(stagingDirectory -> context.vertx()
                                                              .fileSystem()
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java
index c5d5695..75f8049 100644
--- a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java
+++ b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandler.java
@@ -109,7 +109,7 @@
                              }
                              else if (importResult.failed())
                              {
-                                 context.fail(importResult.cause());
+                                 processFailure(importResult.cause(), context, host, remoteAddress, request);
                              }
                              else
                              {
@@ -121,29 +121,33 @@
                                               request, remoteAddress, host);
                              }
                          })
-                         .onFailure(cause -> {
-                             if (cause instanceof NoSuchFileException)
-                             {
-                                 logger.error("Upload directory not found for request={}, remoteAddress={}, " +
-                                              "instance={}", request, remoteAddress, host, cause);
-                                 context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND, cause.getMessage()));
-                             }
-                             else if (cause instanceof IllegalArgumentException)
-                             {
-                                 context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST, cause.getMessage(),
-                                                                cause));
-                             }
-                             else if (cause instanceof HttpException)
-                             {
-                                 context.fail(cause);
-                             }
-                             else
-                             {
-                                 logger.error("Unexpected error during import SSTables for request={}, " +
-                                              "remoteAddress={}, instance={}", request, remoteAddress, host, cause);
-                                 context.fail(HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
-                             }
-                         });
+                         .onFailure(cause -> processFailure(cause, context, host, remoteAddress, request));
+    }
+
+    @Override
+    protected void processFailure(Throwable cause,
+                                  RoutingContext context,
+                                  String host,
+                                  SocketAddress remoteAddress,
+                                  SSTableImportRequest request)
+    {
+        if (cause instanceof NoSuchFileException)
+        {
+            logger.error("Upload directory not found for request={}, remoteAddress={}, " +
+                         "instance={}", request, remoteAddress, host, cause);
+            context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND, cause.getMessage()));
+        }
+        else if (cause instanceof IllegalArgumentException)
+        {
+            context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST, cause.getMessage(),
+                                           cause));
+        }
+        else if (cause instanceof HttpException)
+        {
+            context.fail(cause);
+        }
+
+        super.processFailure(cause, context, host, remoteAddress, request);
     }
 
     @Override
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
index 7952d5d..99babe1 100644
--- a/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
+++ b/src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java
@@ -32,7 +32,6 @@
 import io.vertx.core.net.SocketAddress;
 import io.vertx.ext.web.RoutingContext;
 import org.apache.cassandra.sidecar.Configuration;
-import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
 import org.apache.cassandra.sidecar.common.data.SSTableUploadResponse;
 import org.apache.cassandra.sidecar.common.utils.CassandraInputValidator;
@@ -103,8 +102,6 @@
         // accept the upload.
         httpRequest.pause();
 
-        InstanceMetadata instanceMetadata = metadataFetcher.instance(host);
-
         long startTimeInNanos = System.nanoTime();
         if (!limiter.tryAcquire())
         {
@@ -116,7 +113,8 @@
         context.addEndHandler(v -> limiter.releasePermit());
 
         validateKeyspaceAndTable(host, request)
-        .compose(validRequest -> ensureSufficientSpaceAvailable(instanceMetadata))
+        .compose(validRequest -> uploadPathBuilder.resolveStagingDirectory(host))
+        .compose(this::ensureSufficientSpaceAvailable)
         .compose(v -> uploadPathBuilder.build(host, request))
         .compose(uploadDirectory -> uploader.uploadComponent(httpRequest, uploadDirectory, request.component(),
                                                              request.expectedChecksum()))
@@ -187,17 +185,17 @@
      * Ensures there is sufficient space available as per configured in the
      * {@link Configuration#getMinSpacePercentRequiredForUpload()}.
      *
-     * @param instanceMetadata instance meta data
+     * @param uploadDirectory the directory where the SSTables are uploaded
      * @return a succeeded future if there is sufficient space available, or failed future otherwise
      */
-    private Future<Void> ensureSufficientSpaceAvailable(InstanceMetadata instanceMetadata)
+    private Future<String> ensureSufficientSpaceAvailable(String uploadDirectory)
     {
         float minimumPercentageRequired = configuration.getMinSpacePercentRequiredForUpload();
         if (minimumPercentageRequired == 0)
         {
-            return Future.succeededFuture();
+            return Future.succeededFuture(uploadDirectory);
         }
-        return fs.fsProps(instanceMetadata.stagingDir())
+        return fs.fsProps(uploadDirectory)
                  .compose(fsProps -> {
                      // calculate available disk space percentage
                      long totalSpace = fsProps.totalSpace();
@@ -213,12 +211,12 @@
                      if (availableDiskSpacePercentage < minimumPercentageRequired)
                      {
                          logger.warn("Insufficient space available for upload in stagingDir={}, available={}%, " +
-                                     "required={}%", instanceMetadata.stagingDir(),
+                                     "required={}%", uploadDirectory,
                                      availableDiskSpacePercentage, minimumPercentageRequired);
                          return Future.failedFuture(wrapHttpException(HttpResponseStatus.INSUFFICIENT_STORAGE,
                                                                       "Insufficient space available for upload"));
                      }
-                     return Future.succeededFuture();
+                     return Future.succeededFuture(uploadDirectory);
                  });
     }
 }
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/BaseFileSystem.java b/src/main/java/org/apache/cassandra/sidecar/utils/BaseFileSystem.java
index f7f7c56..fc0b532 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/BaseFileSystem.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/BaseFileSystem.java
@@ -97,6 +97,17 @@
     }
 
     /**
+     * Creates the directory if it doesn't exist, and then validates that {@code path} is a valid directory.
+     *
+     * @param path the path to the directory
+     * @return a future of the validated {@code path}, a failed future otherwise
+     */
+    public Future<String> ensureDirectoryExists(String path)
+    {
+        return fs.mkdirs(path).compose(v -> Future.succeededFuture(path));
+    }
+
+    /**
      * @param filename  the path
      * @param predicate a predicate that evaluates based on {@link FileProps}
      * @return a future of the {@code filename} if it exists and {@code predicate} evaluates to true,
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
index af5f5dc..b6f502c 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
@@ -252,7 +252,7 @@
      */
     private void cleanup(ImportOptions options)
     {
-        uploadPathBuilder.resolveStagingDirectory(options.host, options.uploadId)
+        uploadPathBuilder.resolveUploadIdDirectory(options.host, options.uploadId)
                          .compose(uploadPathBuilder::isValidDirectory)
                          .compose(stagingDirectory -> vertx.fileSystem()
                                                            .deleteRecursive(stagingDirectory, true))
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploadsPathBuilder.java b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploadsPathBuilder.java
index cbf0bc3..9a26a54 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploadsPathBuilder.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploadsPathBuilder.java
@@ -74,25 +74,36 @@
     public <T extends SSTableUploads> Future<String> build(String host, T request)
     {
         return validate(request)
-               .compose(validRequest -> resolveStagingDirectory(host, request.uploadId()))
+               .compose(validRequest -> resolveUploadIdDirectory(host, request.uploadId()))
                .compose(stagingDirectory ->
                         resolveUploadDirectory(stagingDirectory, request.keyspace(), request.tableName()));
     }
 
     /**
+     * Builds the path to the configured staging directory for the given {@code host}. Attempt to create the
+     * staging directory if it doesn't exist.
+     *
+     * @param host the name of the host
+     * @return a future to the created and validated staging directory
+     */
+    public Future<String> resolveStagingDirectory(String host)
+    {
+        InstanceMetadata instanceMeta = instancesConfig.instanceFromHost(host);
+        return ensureDirectoryExists(StringUtils.removeEnd(instanceMeta.stagingDir(), File.separator));
+    }
+
+    /**
      * Builds the path to the {@code uploadId} staging directory inside the specified {@code host}.
      *
      * @param host     the name of the host
      * @param uploadId an identifier for the upload ID
      * @return the absolute path of the {@code uploadId} staging directory
      */
-    public Future<String> resolveStagingDirectory(String host, String uploadId)
+    public Future<String> resolveUploadIdDirectory(String host, String uploadId)
     {
         return validateUploadId(uploadId)
-               .compose(validUploadId -> {
-                   InstanceMetadata instanceMeta = instancesConfig.instanceFromHost(host);
-                   return isValidDirectory(StringUtils.removeEnd(instanceMeta.stagingDir(), File.separator));
-               })
+               .compose(validUploadId -> resolveStagingDirectory(host))
+               .compose(this::isValidDirectory)
                .compose(directory -> Future.succeededFuture(directory + File.separatorChar + uploadId));
     }
 
diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerTest.java b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerTest.java
index 5b0a51c..e73fa53 100644
--- a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerTest.java
@@ -103,12 +103,17 @@
     }
 
     @Test
-    void testNonExistentUploadDirectory(VertxTestContext context)
+    void testNonExistentUploadDirectory(VertxTestContext context) throws InterruptedException
     {
         UUID uploadId = UUID.randomUUID();
-        client.put(config.getPort(), "localhost", "/api/v1/uploads/" + uploadId + "/keyspaces/ks/tables/table/import")
-              .expect(ResponsePredicate.SC_NOT_FOUND)
-              .send(context.succeedingThenComplete());
+
+        TableOperations mockCFOperations = mock(TableOperations.class);
+        when(mockDelegate.tableOperations()).thenReturn(mockCFOperations);
+
+        String requestURI = "/api/v1/uploads/" + uploadId + "/keyspaces/ks/tables/table/import";
+        clientRequest(context, requestURI,
+                      response -> assertThat(response.statusCode())
+                                  .isEqualTo(HttpResponseStatus.NOT_FOUND.code()));
     }
 
     @Test
diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
index 6888f3d..e29f055 100644
--- a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandlerTest.java
@@ -57,7 +57,6 @@
     void testUploadWithoutMd5_expectSuccessfulUpload(VertxTestContext context) throws IOException
     {
         UUID uploadId = UUID.randomUUID();
-        ensureStagingDirectoryExists();
         sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-md5.db", "",
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.OK.code(), false);
     }
@@ -66,7 +65,6 @@
     void testUploadWithCorrectMd5_expectSuccessfulUpload(VertxTestContext context) throws IOException
     {
         UUID uploadId = UUID.randomUUID();
-        ensureStagingDirectoryExists();
         sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-correct-md5.db", "jXd/OF09/siBXSD3SWAm3A==",
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.OK.code(), false);
     }
@@ -75,7 +73,6 @@
     void testUploadWithIncorrectMd5_expectErrorCode(VertxTestContext context) throws IOException
     {
         UUID uploadId = UUID.randomUUID();
-        ensureStagingDirectoryExists();
         sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-incorrect-md5.db", "incorrectMd5",
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.BAD_REQUEST.code(),
                                    false);
@@ -85,7 +82,6 @@
     void testInvalidFileName_expectErrorCode(VertxTestContext context) throws IOException
     {
         UUID uploadId = UUID.randomUUID();
-        ensureStagingDirectoryExists();
         sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "ks$tbl-me-4-big-Data.db", "",
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.BAD_REQUEST.code(),
                                    false);
@@ -95,7 +91,6 @@
     void testUploadWithoutContentLength_expectSuccessfulUpload(VertxTestContext context) throws IOException
     {
         UUID uploadId = UUID.randomUUID();
-        ensureStagingDirectoryExists();
         sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-content-length.db",
                                    "jXd/OF09/siBXSD3SWAm3A==", 0, HttpResponseStatus.OK.code(), false);
     }
@@ -106,7 +101,6 @@
         // if we send more than actual length, vertx goes hung, probably looking for more data than exists in the file,
         // we should see timeout error in this case
         UUID uploadId = UUID.randomUUID();
-        ensureStagingDirectoryExists();
         sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-higher-content-length.db", "", 1000, -1, true);
     }
 
@@ -114,7 +108,6 @@
     void testUploadWithLesserContentLength_expectSuccessfulUpload(VertxTestContext context) throws IOException
     {
         UUID uploadId = UUID.randomUUID();
-        ensureStagingDirectoryExists();
         sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-lesser-content-length.db",
                                    "", Files.size(Paths.get(FILE_TO_BE_UPLOADED)) - 2, HttpResponseStatus.OK.code(),
                                    false);
@@ -124,7 +117,6 @@
     public void testInvalidKeyspace(VertxTestContext context) throws IOException
     {
         UUID uploadId = UUID.randomUUID();
-        ensureStagingDirectoryExists();
         sendUploadRequestAndVerify(context, uploadId, "invalidKeyspace", "tbl", "with-lesser-content-length.db", "",
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.BAD_REQUEST.code(),
                                    false);
@@ -134,7 +126,6 @@
     public void testInvalidTable(VertxTestContext context) throws IOException
     {
         UUID uploadId = UUID.randomUUID();
-        ensureStagingDirectoryExists();
         sendUploadRequestAndVerify(context, uploadId, "ks", "invalidTableName", "with-lesser-content-length.db", "",
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.BAD_REQUEST.code(),
                                    false);
@@ -146,7 +137,6 @@
         when(mockConfiguration.getMinSpacePercentRequiredForUpload()).thenReturn(100F);
 
         UUID uploadId = UUID.randomUUID();
-        ensureStagingDirectoryExists();
         sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-md5.db", "",
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
                                    HttpResponseStatus.INSUFFICIENT_STORAGE.code(), false);
@@ -158,7 +148,6 @@
         when(mockConfiguration.getConcurrentUploadsLimit()).thenReturn(0);
 
         UUID uploadId = UUID.randomUUID();
-        ensureStagingDirectoryExists();
         sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "without-md5.db", "",
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
                                    HttpResponseStatus.TOO_MANY_REQUESTS.code(), false);
@@ -170,7 +159,6 @@
         when(mockConfiguration.getConcurrentUploadsLimit()).thenReturn(1);
 
         UUID uploadId = UUID.randomUUID();
-        ensureStagingDirectoryExists();
         CountDownLatch latch = new CountDownLatch(1);
         sendUploadRequestAndVerify(latch, context, uploadId, "invalidKeyspace", "tbl", "without-md5.db", "",
                                    Files.size(Paths.get(FILE_TO_BE_UPLOADED)), HttpResponseStatus.BAD_REQUEST.code(),
@@ -251,9 +239,4 @@
             client.close();
         });
     }
-
-    private void ensureStagingDirectoryExists() throws IOException
-    {
-        Files.createDirectories(Paths.get(SnapshotUtils.makeStagingDir(temporaryFolder.getAbsolutePath())));
-    }
 }
diff --git a/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java b/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java
index eb716a6..a20ca22 100644
--- a/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java
@@ -92,7 +92,7 @@
         // get NullPointerExceptions because the mock is not wired up, and we need to prevent vertx from actually
         // doing a vertx.filesystem().deleteRecursive(). So we return a failed future with a fake path when checking
         // if the directory exists.
-        when(mockUploadPathBuilder.resolveStagingDirectory(anyString(), anyString()))
+        when(mockUploadPathBuilder.resolveUploadIdDirectory(anyString(), anyString()))
         .thenReturn(Future.failedFuture("fake-path"));
         when(mockUploadPathBuilder.isValidDirectory("fake-path")).thenReturn(Future.failedFuture("skip cleanup"));
         importer = new SSTableImporter(vertx, mockMetadataFetcher, mockConfiguration, executorPools,