CASSANDRASC-125: Import Queue pendingImports metrics is reporting an … (#117)

The pending imports metric does not aggregate across all keyspaces/tables, in this commit
we aggregate the queue sizes and report on a per host basis.

Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRASC-125
diff --git a/CHANGES.txt b/CHANGES.txt
index 2ec3980..d80f7b5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * Import Queue pendingImports metrics is reporting an incorrect value (CASSANDRASC-125)
  * Add missing method to retrieve the InetSocketAddress to DriverUtils (CASSANDRASC-123)
  * Reduce filesystem calls while streaming SSTables (CASSANDRASC-94)
  * Record existing and additional metrics with dropwizard (CASSANDRASC-117)
diff --git a/src/main/dist/conf/sidecar.yaml b/src/main/dist/conf/sidecar.yaml
index 0118c81..734dcc4 100644
--- a/src/main/dist/conf/sidecar.yaml
+++ b/src/main/dist/conf/sidecar.yaml
@@ -154,7 +154,7 @@
     jmx_domain_name: sidecar.vertx.jmx_domain
   include:                                    # empty include list means include all
     - type: "regex"                           # possible filter types are "regex" and "equals"
-      value: "sidecar.*"
+      value: "Sidecar.*"
     - type: "regex"
       value: "vertx.*"
   exclude:                                    # empty exclude list means exclude nothing
diff --git a/src/main/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilder.java b/src/main/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilder.java
index 0e69e77..701082b 100644
--- a/src/main/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilder.java
+++ b/src/main/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilder.java
@@ -128,7 +128,7 @@
                                 }
 
                                 String snapshotFileName = snapshotFile.subpath(snapshotDirNameCount, snapshotFile.getNameCount()).toString();
-                                return new SnapshotFile(snapshotFileName, snapshotFile, attrs.size(), dataDirectoryIndex, tableId);
+                                return new SnapshotFile(snapshotFileName, attrs.size(), dataDirectoryIndex, tableId);
                             }
                             catch (IOException e)
                             {
@@ -256,25 +256,25 @@
     public static class SnapshotFile
     {
         public final String name;
-        public final Path path;
         public final long size;
         public final int dataDirectoryIndex;
         public final String tableId;
+        private final int hashCode;
 
         @VisibleForTesting
         SnapshotFile(Path path, long size, int dataDirectoryIndex, String tableId)
         {
             this(Objects.requireNonNull(path.getFileName(), "path.getFileName() cannot be null").toString(),
-                 path, size, dataDirectoryIndex, tableId);
+                 size, dataDirectoryIndex, tableId);
         }
 
-        SnapshotFile(String name, Path path, long size, int dataDirectoryIndex, String tableId)
+        public SnapshotFile(String name, long size, int dataDirectoryIndex, String tableId)
         {
             this.name = name;
-            this.path = path;
             this.size = size;
             this.dataDirectoryIndex = dataDirectoryIndex;
             this.tableId = tableId;
+            this.hashCode = Objects.hash(name, size, dataDirectoryIndex, tableId);
         }
 
         @Override
@@ -282,7 +282,6 @@
         {
             return "SnapshotFile{" +
                    "name='" + name + '\'' +
-                   ", path=" + path +
                    ", size=" + size +
                    ", dataDirectoryIndex=" + dataDirectoryIndex +
                    ", tableId='" + tableId + '\'' +
@@ -298,14 +297,13 @@
             return size == that.size
                    && dataDirectoryIndex == that.dataDirectoryIndex
                    && Objects.equals(name, that.name)
-                   && Objects.equals(path, that.path)
                    && Objects.equals(tableId, that.tableId);
         }
 
         @Override
         public int hashCode()
         {
-            return Objects.hash(name, path, size, dataDirectoryIndex, tableId);
+            return hashCode;
         }
     }
 
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
index 132c38e..32eb732 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.sidecar.utils;
 
 import java.util.AbstractMap;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -67,7 +68,7 @@
     private final InstanceMetadataFetcher metadataFetcher;
     private final SSTableUploadsPathBuilder uploadPathBuilder;
     @VisibleForTesting
-    final Map<String, ImportQueue> importQueuePerHost;
+    final Map<ImportId, ImportQueue> importQueuePerHost;
 
     /**
      * Constructs a new instance of the SSTableImporter class
@@ -131,25 +132,23 @@
     }
 
     /**
-     * Returns a key for the queues for the given {@code options}. Classes extending from {@link SSTableImporter}
-     * can override the {@link #key(ImportOptions)} method and provide a different key for the queue.
+     * Returns a key for the queues for the given {@code options}.
      *
      * @param options the import options
      * @return a key for the queues for the given {@code options}
      */
-    protected String key(ImportOptions options)
+    private ImportId key(ImportOptions options)
     {
-        return options.host + "$" + options.keyspace + "$" + options.tableName;
+        return new ImportId(options.host, options.keyspace, options.tableName);
     }
 
     /**
-     * Returns a new queue for the given {@code key}. Classes extending from {@link SSTableImporter} can override
-     * this method and provide a different implementation for the queue.
+     * Returns a new queue for the given {@code key}.
      *
      * @param key the key for the map
      * @return a new queue for the given {@code key}
      */
-    protected ImportQueue initializeQueue(String key)
+    private ImportQueue initializeQueue(ImportId key)
     {
         return new ImportQueue();
     }
@@ -161,12 +160,16 @@
      */
     private void processPendingImports(Long timerId)
     {
+        reportPendingImportPerHost();
         for (ImportQueue queue : importQueuePerHost.values())
         {
             if (!queue.isEmpty())
             {
                 executorPools.internal()
-                             .executeBlocking(p -> maybeDrainImportQueue(queue));
+                             .executeBlocking(() -> {
+                                 maybeDrainImportQueue(queue);
+                                 return null;
+                             });
             }
         }
     }
@@ -177,7 +180,8 @@
      *
      * @param queue a queue of import tasks
      */
-    private void maybeDrainImportQueue(ImportQueue queue)
+    @VisibleForTesting
+    void maybeDrainImportQueue(ImportQueue queue)
     {
         if (queue.tryLock())
         {
@@ -201,12 +205,11 @@
     private void drainImportQueue(ImportQueue queue)
     {
         int successCount = 0, failureCount = 0;
-        boolean recorded = false;
         InstanceMetrics instanceMetrics = null;
-        while (!queue.isEmpty())
+        AbstractMap.SimpleEntry<Promise<Void>, ImportOptions> pair;
+        while ((pair = queue.poll()) != null)
         {
             LOGGER.info("Starting SSTable import session");
-            AbstractMap.SimpleEntry<Promise<Void>, ImportOptions> pair = queue.poll();
             Promise<Void> promise = pair.getKey();
             ImportOptions options = pair.getValue();
 
@@ -224,13 +227,6 @@
                 continue;
             }
 
-            if (!recorded)
-            {
-                // +1 offset added to consider already polled entry
-                instance.metrics().sstableImport().pendingImports.metric.setValue(queue.size() + 1);
-                recorded = true;
-            }
-
             TableOperations tableOperations = delegate.tableOperations();
             if (tableOperations == null)
             {
@@ -311,6 +307,31 @@
     }
 
     /**
+     * Aggregates pending imports per host from multiple keyspaces and tables
+     */
+    private void reportPendingImportPerHost()
+    {
+        Map<String, Integer> aggregates = new HashMap<>();
+        for (Map.Entry<ImportId, ImportQueue> entry : importQueuePerHost.entrySet())
+        {
+            aggregates.compute(entry.getKey().host, (k, v) -> entry.getValue().size() + (v == null ? 0 : v));
+        }
+
+        aggregates.forEach((host, count) -> {
+            try
+            {
+                // Report aggregate metrics for the queues
+                InstanceMetadata instance = metadataFetcher.instance(host);
+                instance.metrics().sstableImport().pendingImports.metric.setValue(count);
+            }
+            catch (Exception e)
+            {
+                LOGGER.warn("Unable to report metrics for host={} pendingImports={}", host, count, e);
+            }
+        });
+    }
+
+    /**
      * A {@link ConcurrentLinkedQueue} that allows for locking the queue while operating on it. The queue
      * must be unlocked once the operations are complete.
      */
@@ -623,4 +644,36 @@
             }
         }
     }
+
+    /**
+     * Key used for the map of queues
+     */
+    @VisibleForTesting
+    static class ImportId
+    {
+        private final String host;
+        private final int hashCode;
+
+        public ImportId(String host, String keyspace, String table)
+        {
+            this.host = host;
+            this.hashCode = Objects.hash(host, keyspace, table);
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            ImportId importId = (ImportId) o;
+            return hashCode == importId.hashCode
+                   && Objects.equals(host, importId.host);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return hashCode;
+        }
+    }
 }
diff --git a/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java b/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java
index 7ff87aa..86a3150 100644
--- a/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java
@@ -18,13 +18,17 @@
 
 package org.apache.cassandra.sidecar.utils;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import com.codahale.metrics.SharedMetricRegistries;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.vertx.core.Future;
 import io.vertx.core.Vertx;
@@ -58,7 +62,7 @@
     private TableOperations mockTableOperations1;
     private ExecutorPools executorPools;
     private SSTableUploadsPathBuilder mockUploadPathBuilder;
-    private SSTableImporter importer;
+    private TestSSTableImporter importer;
     private ServiceConfiguration serviceConfiguration;
 
     @BeforeEach
@@ -113,17 +117,14 @@
         when(mockUploadPathBuilder.resolveUploadIdDirectory(anyString(), anyString()))
         .thenReturn(Future.failedFuture("fake-path"));
         when(mockUploadPathBuilder.isValidDirectory("fake-path")).thenReturn(Future.failedFuture("skip cleanup"));
-        importer = new SSTableImporter(vertx, mockMetadataFetcher, serviceConfiguration, executorPools,
-                                       mockUploadPathBuilder);
+        importer = new TestSSTableImporter(vertx, mockMetadataFetcher, serviceConfiguration, executorPools,
+                                           mockUploadPathBuilder);
     }
 
     @AfterEach
     void clear()
     {
-        registry().removeMatching((name, metric) -> true);
-        registry(1).removeMatching((name, metric) -> true);
-        registry(2).removeMatching((name, metric) -> true);
-        registry(3).removeMatching((name, metric) -> true);
+        SharedMetricRegistries.clear();
     }
 
     @Test
@@ -136,9 +137,16 @@
                                                             .directory("/dir")
                                                             .uploadId("0000-0000")
                                                             .build());
+
+        loopAssert(1, () -> {
+            // ensure that one element is reported in the import queue
+            assertThat(instanceMetrics(1).sstableImport().pendingImports.metric.getValue()).isOne();
+            importer.latch.countDown();
+        });
+
         importFuture.onComplete(context.succeeding(v -> {
             assertThat(importer.importQueuePerHost).isNotEmpty();
-            assertThat(importer.importQueuePerHost).containsKey("localhost$ks$tbl");
+            assertThat(importer.importQueuePerHost).containsKey(new SSTableImporter.ImportId("localhost", "ks", "tbl"));
             for (SSTableImporter.ImportQueue queue : importer.importQueuePerHost.values())
             {
                 assertThat(queue).isEmpty();
@@ -146,7 +154,8 @@
             verify(mockTableOperations1, times(1))
             .importNewSSTables("ks", "tbl", "/dir", true, true, true, true, true, true, false);
             vertx.setTimer(100, handle -> {
-                assertThat(instanceMetrics(1).sstableImport().pendingImports.metric.getValue()).isOne();
+                // after successful import, the queue must be drained
+                assertThat(instanceMetrics(1).sstableImport().pendingImports.metric.getValue()).isZero();
                 assertThat(instanceMetrics(1).sstableImport().successfulImports.metric.getValue()).isOne();
                 context.completeNow();
             });
@@ -163,6 +172,13 @@
                                                             .directory("/dir3")
                                                             .uploadId("0000-0000")
                                                             .build());
+
+        loopAssert(1, () -> {
+            // ensure that one element is reported in the import queue
+            assertThat(instanceMetrics(3).sstableImport().pendingImports.metric.getValue()).isOne();
+            importer.latch.countDown();
+        });
+
         importFuture.onComplete(context.failing(p -> {
             assertThat(p).isInstanceOf(HttpException.class);
             HttpException exception = (HttpException) p;
@@ -170,13 +186,14 @@
             assertThat(exception.getPayload()).isEqualTo("Cassandra service is unavailable");
 
             assertThat(importer.importQueuePerHost).isNotEmpty();
-            assertThat(importer.importQueuePerHost).containsKey("127.0.0.3$ks$tbl");
+            assertThat(importer.importQueuePerHost).containsKey(new SSTableImporter.ImportId("127.0.0.3", "ks", "tbl"));
             for (SSTableImporter.ImportQueue queue : importer.importQueuePerHost.values())
             {
                 assertThat(queue).isEmpty();
             }
             loopAssert(1, () -> {
-                assertThat(instanceMetrics(3).sstableImport().pendingImports.metric.getValue()).isOne();
+                // import queue must be drained even in the case of failure, and pendingImports metric should reflect that
+                assertThat(instanceMetrics(3).sstableImport().pendingImports.metric.getValue()).isZero();
                 context.completeNow();
             });
         }));
@@ -193,6 +210,12 @@
                                                             .uploadId("0000-0000")
                                                             .build());
 
+        loopAssert(1, () -> {
+            // ensure that one element is reported in the import queue
+            assertThat(instanceMetrics(1).sstableImport().pendingImports.metric.getValue()).isOne();
+            importer.latch.countDown();
+        });
+
         importFuture.onComplete(context.failing(p -> {
             assertThat(p).isInstanceOf(HttpException.class);
             HttpException exception = (HttpException) p;
@@ -200,13 +223,13 @@
             assertThat(exception.getPayload()).isEqualTo("Failed to import from directories: [/failed-dir]");
 
             assertThat(importer.importQueuePerHost).isNotEmpty();
-            assertThat(importer.importQueuePerHost).containsKey("localhost$ks$tbl");
+            assertThat(importer.importQueuePerHost).containsKey(new SSTableImporter.ImportId("localhost", "ks", "tbl"));
             for (SSTableImporter.ImportQueue queue : importer.importQueuePerHost.values())
             {
                 assertThat(queue).isEmpty();
             }
             vertx.setTimer(100, v -> {
-                assertThat(instanceMetrics(1).sstableImport().pendingImports.metric.getValue()).isOne();
+                assertThat(instanceMetrics(1).sstableImport().pendingImports.metric.getValue()).isZero();
                 assertThat(instanceMetrics(1).sstableImport().failedImports.metric.getValue()).isOne();
                 context.completeNow();
             });
@@ -224,19 +247,25 @@
                                                             .uploadId("0000-0000")
                                                             .build());
 
+        loopAssert(1, () -> {
+            // ensure that one element is reported in the import queue
+            assertThat(instanceMetrics(2).sstableImport().pendingImports.metric.getValue()).isOne();
+            importer.latch.countDown();
+        });
+
         importFuture.onComplete(context.failing(p -> {
             assertThat(p).isInstanceOf(RuntimeException.class);
             RuntimeException exception = (RuntimeException) p;
             assertThat(exception.getMessage()).isEqualTo("Exception during import");
 
             assertThat(importer.importQueuePerHost).isNotEmpty();
-            assertThat(importer.importQueuePerHost).containsKey("127.0.0.2$ks$tbl");
+            assertThat(importer.importQueuePerHost).containsKey(new SSTableImporter.ImportId("127.0.0.2", "ks", "tbl"));
             for (SSTableImporter.ImportQueue queue : importer.importQueuePerHost.values())
             {
                 assertThat(queue).isEmpty();
             }
             vertx.setTimer(100, v -> {
-                assertThat(instanceMetrics(2).sstableImport().pendingImports.metric.getValue()).isOne();
+                assertThat(instanceMetrics(2).sstableImport().pendingImports.metric.getValue()).isZero();
                 assertThat(instanceMetrics(2).sstableImport().failedImports.metric.getValue()).isOne();
                 context.completeNow();
             });
@@ -276,14 +305,99 @@
                                                 .uploadId("0000-0000")
                                                 .build();
         Future<Void> importFuture = importer.scheduleImport(options);
+
+        loopAssert(1, () -> {
+            // ensure that one element is reported in the import queue
+            assertThat(instanceMetrics(1).sstableImport().pendingImports.metric.getValue()).isOne();
+            importer.latch.countDown();
+        });
+
         importFuture.onComplete(context.succeeding(v -> {
             assertThat(importer.cancelImport(options)).isFalse();
             context.completeNow();
         }));
     }
 
-    public InstanceMetrics instanceMetrics(int id)
+    @Test
+    void testAggregatesMetricsForTheSameHost(VertxTestContext context)
+    {
+        List<Future<Void>> futures = new ArrayList<>();
+        futures.add(importer.scheduleImport(new SSTableImporter.ImportOptions.Builder()
+                                            .host("localhost")
+                                            .keyspace("ks")
+                                            .tableName("tbl")
+                                            .directory("/dir")
+                                            .uploadId("0000-0000")
+                                            .build()));
+        futures.add(importer.scheduleImport(new SSTableImporter.ImportOptions.Builder()
+                                            .host("localhost")
+                                            .keyspace("ks2")
+                                            .tableName("tbl")
+                                            .directory("/dir")
+                                            .uploadId("0000-0001")
+                                            .build()));
+
+        loopAssert(1, () -> {
+            // ensure that one element is reported in the import queue
+            assertThat(instanceMetrics(1).sstableImport().pendingImports.metric.getValue()).isEqualTo(2);
+            importer.latch.countDown();
+        });
+
+        Future.all(futures)
+              .onComplete(context.succeeding(v -> {
+                  assertThat(importer.importQueuePerHost).isNotEmpty();
+                  assertThat(importer.importQueuePerHost).containsKey(new SSTableImporter.ImportId("localhost", "ks", "tbl"));
+                  assertThat(importer.importQueuePerHost).containsKey(new SSTableImporter.ImportId("localhost", "ks2", "tbl"));
+                  for (SSTableImporter.ImportQueue queue : importer.importQueuePerHost.values())
+                  {
+                      assertThat(queue).isEmpty();
+                  }
+                  verify(mockTableOperations1, times(1))
+                  .importNewSSTables("ks", "tbl", "/dir", true, true, true, true, true, true, false);
+                  verify(mockTableOperations1, times(1))
+                  .importNewSSTables("ks2", "tbl", "/dir", true, true, true, true, true, true, false);
+                  vertx.setTimer(100, handle -> {
+                      // after successful import, the queue must be drained
+                      assertThat(instanceMetrics(1).sstableImport().pendingImports.metric.getValue()).isZero();
+                      assertThat(instanceMetrics(1).sstableImport().successfulImports.metric.getValue()).isEqualTo(2);
+                      context.completeNow();
+                  });
+              }));
+    }
+
+    InstanceMetrics instanceMetrics(int id)
     {
         return new InstanceMetricsImpl(registry(id));
     }
+
+    /**
+     * Injects into the maybeDrainImportQueue method to better test the class behavior
+     */
+    static class TestSSTableImporter extends SSTableImporter
+    {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        TestSSTableImporter(Vertx vertx,
+                            InstanceMetadataFetcher metadataFetcher,
+                            ServiceConfiguration configuration,
+                            ExecutorPools executorPools,
+                            SSTableUploadsPathBuilder uploadPathBuilder)
+        {
+            super(vertx, metadataFetcher, configuration, executorPools, uploadPathBuilder);
+        }
+
+        @Override
+        void maybeDrainImportQueue(ImportQueue queue)
+        {
+            try
+            {
+                latch.await();
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+            super.maybeDrainImportQueue(queue);
+        }
+    }
 }