CASSANDRASC-125: Import Queue pendingImports metrics is reporting an … (#117)
The pending imports metric does not aggregate across all keyspaces/tables, in this commit
we aggregate the queue sizes and report on a per host basis.
Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRASC-125
diff --git a/CHANGES.txt b/CHANGES.txt
index 2ec3980..d80f7b5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
1.0.0
-----
+ * Import Queue pendingImports metrics is reporting an incorrect value (CASSANDRASC-125)
* Add missing method to retrieve the InetSocketAddress to DriverUtils (CASSANDRASC-123)
* Reduce filesystem calls while streaming SSTables (CASSANDRASC-94)
* Record existing and additional metrics with dropwizard (CASSANDRASC-117)
diff --git a/src/main/dist/conf/sidecar.yaml b/src/main/dist/conf/sidecar.yaml
index 0118c81..734dcc4 100644
--- a/src/main/dist/conf/sidecar.yaml
+++ b/src/main/dist/conf/sidecar.yaml
@@ -154,7 +154,7 @@
jmx_domain_name: sidecar.vertx.jmx_domain
include: # empty include list means include all
- type: "regex" # possible filter types are "regex" and "equals"
- value: "sidecar.*"
+ value: "Sidecar.*"
- type: "regex"
value: "vertx.*"
exclude: # empty exclude list means exclude nothing
diff --git a/src/main/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilder.java b/src/main/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilder.java
index 0e69e77..701082b 100644
--- a/src/main/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilder.java
+++ b/src/main/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilder.java
@@ -128,7 +128,7 @@
}
String snapshotFileName = snapshotFile.subpath(snapshotDirNameCount, snapshotFile.getNameCount()).toString();
- return new SnapshotFile(snapshotFileName, snapshotFile, attrs.size(), dataDirectoryIndex, tableId);
+ return new SnapshotFile(snapshotFileName, attrs.size(), dataDirectoryIndex, tableId);
}
catch (IOException e)
{
@@ -256,25 +256,25 @@
public static class SnapshotFile
{
public final String name;
- public final Path path;
public final long size;
public final int dataDirectoryIndex;
public final String tableId;
+ private final int hashCode;
@VisibleForTesting
SnapshotFile(Path path, long size, int dataDirectoryIndex, String tableId)
{
this(Objects.requireNonNull(path.getFileName(), "path.getFileName() cannot be null").toString(),
- path, size, dataDirectoryIndex, tableId);
+ size, dataDirectoryIndex, tableId);
}
- SnapshotFile(String name, Path path, long size, int dataDirectoryIndex, String tableId)
+ public SnapshotFile(String name, long size, int dataDirectoryIndex, String tableId)
{
this.name = name;
- this.path = path;
this.size = size;
this.dataDirectoryIndex = dataDirectoryIndex;
this.tableId = tableId;
+ this.hashCode = Objects.hash(name, size, dataDirectoryIndex, tableId);
}
@Override
@@ -282,7 +282,6 @@
{
return "SnapshotFile{" +
"name='" + name + '\'' +
- ", path=" + path +
", size=" + size +
", dataDirectoryIndex=" + dataDirectoryIndex +
", tableId='" + tableId + '\'' +
@@ -298,14 +297,13 @@
return size == that.size
&& dataDirectoryIndex == that.dataDirectoryIndex
&& Objects.equals(name, that.name)
- && Objects.equals(path, that.path)
&& Objects.equals(tableId, that.tableId);
}
@Override
public int hashCode()
{
- return Objects.hash(name, path, size, dataDirectoryIndex, tableId);
+ return hashCode;
}
}
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
index 132c38e..32eb732 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.sidecar.utils;
import java.util.AbstractMap;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -67,7 +68,7 @@
private final InstanceMetadataFetcher metadataFetcher;
private final SSTableUploadsPathBuilder uploadPathBuilder;
@VisibleForTesting
- final Map<String, ImportQueue> importQueuePerHost;
+ final Map<ImportId, ImportQueue> importQueuePerHost;
/**
* Constructs a new instance of the SSTableImporter class
@@ -131,25 +132,23 @@
}
/**
- * Returns a key for the queues for the given {@code options}. Classes extending from {@link SSTableImporter}
- * can override the {@link #key(ImportOptions)} method and provide a different key for the queue.
+ * Returns a key for the queues for the given {@code options}.
*
* @param options the import options
* @return a key for the queues for the given {@code options}
*/
- protected String key(ImportOptions options)
+ private ImportId key(ImportOptions options)
{
- return options.host + "$" + options.keyspace + "$" + options.tableName;
+ return new ImportId(options.host, options.keyspace, options.tableName);
}
/**
- * Returns a new queue for the given {@code key}. Classes extending from {@link SSTableImporter} can override
- * this method and provide a different implementation for the queue.
+ * Returns a new queue for the given {@code key}.
*
* @param key the key for the map
* @return a new queue for the given {@code key}
*/
- protected ImportQueue initializeQueue(String key)
+ private ImportQueue initializeQueue(ImportId key)
{
return new ImportQueue();
}
@@ -161,12 +160,16 @@
*/
private void processPendingImports(Long timerId)
{
+ reportPendingImportPerHost();
for (ImportQueue queue : importQueuePerHost.values())
{
if (!queue.isEmpty())
{
executorPools.internal()
- .executeBlocking(p -> maybeDrainImportQueue(queue));
+ .executeBlocking(() -> {
+ maybeDrainImportQueue(queue);
+ return null;
+ });
}
}
}
@@ -177,7 +180,8 @@
*
* @param queue a queue of import tasks
*/
- private void maybeDrainImportQueue(ImportQueue queue)
+ @VisibleForTesting
+ void maybeDrainImportQueue(ImportQueue queue)
{
if (queue.tryLock())
{
@@ -201,12 +205,11 @@
private void drainImportQueue(ImportQueue queue)
{
int successCount = 0, failureCount = 0;
- boolean recorded = false;
InstanceMetrics instanceMetrics = null;
- while (!queue.isEmpty())
+ AbstractMap.SimpleEntry<Promise<Void>, ImportOptions> pair;
+ while ((pair = queue.poll()) != null)
{
LOGGER.info("Starting SSTable import session");
- AbstractMap.SimpleEntry<Promise<Void>, ImportOptions> pair = queue.poll();
Promise<Void> promise = pair.getKey();
ImportOptions options = pair.getValue();
@@ -224,13 +227,6 @@
continue;
}
- if (!recorded)
- {
- // +1 offset added to consider already polled entry
- instance.metrics().sstableImport().pendingImports.metric.setValue(queue.size() + 1);
- recorded = true;
- }
-
TableOperations tableOperations = delegate.tableOperations();
if (tableOperations == null)
{
@@ -311,6 +307,31 @@
}
/**
+ * Aggregates pending imports per host from multiple keyspaces and tables
+ */
+ private void reportPendingImportPerHost()
+ {
+ Map<String, Integer> aggregates = new HashMap<>();
+ for (Map.Entry<ImportId, ImportQueue> entry : importQueuePerHost.entrySet())
+ {
+ aggregates.compute(entry.getKey().host, (k, v) -> entry.getValue().size() + (v == null ? 0 : v));
+ }
+
+ aggregates.forEach((host, count) -> {
+ try
+ {
+ // Report aggregate metrics for the queues
+ InstanceMetadata instance = metadataFetcher.instance(host);
+ instance.metrics().sstableImport().pendingImports.metric.setValue(count);
+ }
+ catch (Exception e)
+ {
+ LOGGER.warn("Unable to report metrics for host={} pendingImports={}", host, count, e);
+ }
+ });
+ }
+
+ /**
* A {@link ConcurrentLinkedQueue} that allows for locking the queue while operating on it. The queue
* must be unlocked once the operations are complete.
*/
@@ -623,4 +644,36 @@
}
}
}
+
+ /**
+ * Key used for the map of queues
+ */
+ @VisibleForTesting
+ static class ImportId
+ {
+ private final String host;
+ private final int hashCode;
+
+ public ImportId(String host, String keyspace, String table)
+ {
+ this.host = host;
+ this.hashCode = Objects.hash(host, keyspace, table);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ImportId importId = (ImportId) o;
+ return hashCode == importId.hashCode
+ && Objects.equals(host, importId.host);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return hashCode;
+ }
+ }
}
diff --git a/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java b/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java
index 7ff87aa..86a3150 100644
--- a/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java
@@ -18,13 +18,17 @@
package org.apache.cassandra.sidecar.utils;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import com.codahale.metrics.SharedMetricRegistries;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
@@ -58,7 +62,7 @@
private TableOperations mockTableOperations1;
private ExecutorPools executorPools;
private SSTableUploadsPathBuilder mockUploadPathBuilder;
- private SSTableImporter importer;
+ private TestSSTableImporter importer;
private ServiceConfiguration serviceConfiguration;
@BeforeEach
@@ -113,17 +117,14 @@
when(mockUploadPathBuilder.resolveUploadIdDirectory(anyString(), anyString()))
.thenReturn(Future.failedFuture("fake-path"));
when(mockUploadPathBuilder.isValidDirectory("fake-path")).thenReturn(Future.failedFuture("skip cleanup"));
- importer = new SSTableImporter(vertx, mockMetadataFetcher, serviceConfiguration, executorPools,
- mockUploadPathBuilder);
+ importer = new TestSSTableImporter(vertx, mockMetadataFetcher, serviceConfiguration, executorPools,
+ mockUploadPathBuilder);
}
@AfterEach
void clear()
{
- registry().removeMatching((name, metric) -> true);
- registry(1).removeMatching((name, metric) -> true);
- registry(2).removeMatching((name, metric) -> true);
- registry(3).removeMatching((name, metric) -> true);
+ SharedMetricRegistries.clear();
}
@Test
@@ -136,9 +137,16 @@
.directory("/dir")
.uploadId("0000-0000")
.build());
+
+ loopAssert(1, () -> {
+ // ensure that one element is reported in the import queue
+ assertThat(instanceMetrics(1).sstableImport().pendingImports.metric.getValue()).isOne();
+ importer.latch.countDown();
+ });
+
importFuture.onComplete(context.succeeding(v -> {
assertThat(importer.importQueuePerHost).isNotEmpty();
- assertThat(importer.importQueuePerHost).containsKey("localhost$ks$tbl");
+ assertThat(importer.importQueuePerHost).containsKey(new SSTableImporter.ImportId("localhost", "ks", "tbl"));
for (SSTableImporter.ImportQueue queue : importer.importQueuePerHost.values())
{
assertThat(queue).isEmpty();
@@ -146,7 +154,8 @@
verify(mockTableOperations1, times(1))
.importNewSSTables("ks", "tbl", "/dir", true, true, true, true, true, true, false);
vertx.setTimer(100, handle -> {
- assertThat(instanceMetrics(1).sstableImport().pendingImports.metric.getValue()).isOne();
+ // after successful import, the queue must be drained
+ assertThat(instanceMetrics(1).sstableImport().pendingImports.metric.getValue()).isZero();
assertThat(instanceMetrics(1).sstableImport().successfulImports.metric.getValue()).isOne();
context.completeNow();
});
@@ -163,6 +172,13 @@
.directory("/dir3")
.uploadId("0000-0000")
.build());
+
+ loopAssert(1, () -> {
+ // ensure that one element is reported in the import queue
+ assertThat(instanceMetrics(3).sstableImport().pendingImports.metric.getValue()).isOne();
+ importer.latch.countDown();
+ });
+
importFuture.onComplete(context.failing(p -> {
assertThat(p).isInstanceOf(HttpException.class);
HttpException exception = (HttpException) p;
@@ -170,13 +186,14 @@
assertThat(exception.getPayload()).isEqualTo("Cassandra service is unavailable");
assertThat(importer.importQueuePerHost).isNotEmpty();
- assertThat(importer.importQueuePerHost).containsKey("127.0.0.3$ks$tbl");
+ assertThat(importer.importQueuePerHost).containsKey(new SSTableImporter.ImportId("127.0.0.3", "ks", "tbl"));
for (SSTableImporter.ImportQueue queue : importer.importQueuePerHost.values())
{
assertThat(queue).isEmpty();
}
loopAssert(1, () -> {
- assertThat(instanceMetrics(3).sstableImport().pendingImports.metric.getValue()).isOne();
+ // import queue must be drained even in the case of failure, and pendingImports metric should reflect that
+ assertThat(instanceMetrics(3).sstableImport().pendingImports.metric.getValue()).isZero();
context.completeNow();
});
}));
@@ -193,6 +210,12 @@
.uploadId("0000-0000")
.build());
+ loopAssert(1, () -> {
+ // ensure that one element is reported in the import queue
+ assertThat(instanceMetrics(1).sstableImport().pendingImports.metric.getValue()).isOne();
+ importer.latch.countDown();
+ });
+
importFuture.onComplete(context.failing(p -> {
assertThat(p).isInstanceOf(HttpException.class);
HttpException exception = (HttpException) p;
@@ -200,13 +223,13 @@
assertThat(exception.getPayload()).isEqualTo("Failed to import from directories: [/failed-dir]");
assertThat(importer.importQueuePerHost).isNotEmpty();
- assertThat(importer.importQueuePerHost).containsKey("localhost$ks$tbl");
+ assertThat(importer.importQueuePerHost).containsKey(new SSTableImporter.ImportId("localhost", "ks", "tbl"));
for (SSTableImporter.ImportQueue queue : importer.importQueuePerHost.values())
{
assertThat(queue).isEmpty();
}
vertx.setTimer(100, v -> {
- assertThat(instanceMetrics(1).sstableImport().pendingImports.metric.getValue()).isOne();
+ assertThat(instanceMetrics(1).sstableImport().pendingImports.metric.getValue()).isZero();
assertThat(instanceMetrics(1).sstableImport().failedImports.metric.getValue()).isOne();
context.completeNow();
});
@@ -224,19 +247,25 @@
.uploadId("0000-0000")
.build());
+ loopAssert(1, () -> {
+ // ensure that one element is reported in the import queue
+ assertThat(instanceMetrics(2).sstableImport().pendingImports.metric.getValue()).isOne();
+ importer.latch.countDown();
+ });
+
importFuture.onComplete(context.failing(p -> {
assertThat(p).isInstanceOf(RuntimeException.class);
RuntimeException exception = (RuntimeException) p;
assertThat(exception.getMessage()).isEqualTo("Exception during import");
assertThat(importer.importQueuePerHost).isNotEmpty();
- assertThat(importer.importQueuePerHost).containsKey("127.0.0.2$ks$tbl");
+ assertThat(importer.importQueuePerHost).containsKey(new SSTableImporter.ImportId("127.0.0.2", "ks", "tbl"));
for (SSTableImporter.ImportQueue queue : importer.importQueuePerHost.values())
{
assertThat(queue).isEmpty();
}
vertx.setTimer(100, v -> {
- assertThat(instanceMetrics(2).sstableImport().pendingImports.metric.getValue()).isOne();
+ assertThat(instanceMetrics(2).sstableImport().pendingImports.metric.getValue()).isZero();
assertThat(instanceMetrics(2).sstableImport().failedImports.metric.getValue()).isOne();
context.completeNow();
});
@@ -276,14 +305,99 @@
.uploadId("0000-0000")
.build();
Future<Void> importFuture = importer.scheduleImport(options);
+
+ loopAssert(1, () -> {
+ // ensure that one element is reported in the import queue
+ assertThat(instanceMetrics(1).sstableImport().pendingImports.metric.getValue()).isOne();
+ importer.latch.countDown();
+ });
+
importFuture.onComplete(context.succeeding(v -> {
assertThat(importer.cancelImport(options)).isFalse();
context.completeNow();
}));
}
- public InstanceMetrics instanceMetrics(int id)
+ @Test
+ void testAggregatesMetricsForTheSameHost(VertxTestContext context)
+ {
+ List<Future<Void>> futures = new ArrayList<>();
+ futures.add(importer.scheduleImport(new SSTableImporter.ImportOptions.Builder()
+ .host("localhost")
+ .keyspace("ks")
+ .tableName("tbl")
+ .directory("/dir")
+ .uploadId("0000-0000")
+ .build()));
+ futures.add(importer.scheduleImport(new SSTableImporter.ImportOptions.Builder()
+ .host("localhost")
+ .keyspace("ks2")
+ .tableName("tbl")
+ .directory("/dir")
+ .uploadId("0000-0001")
+ .build()));
+
+ loopAssert(1, () -> {
+ // ensure that one element is reported in the import queue
+ assertThat(instanceMetrics(1).sstableImport().pendingImports.metric.getValue()).isEqualTo(2);
+ importer.latch.countDown();
+ });
+
+ Future.all(futures)
+ .onComplete(context.succeeding(v -> {
+ assertThat(importer.importQueuePerHost).isNotEmpty();
+ assertThat(importer.importQueuePerHost).containsKey(new SSTableImporter.ImportId("localhost", "ks", "tbl"));
+ assertThat(importer.importQueuePerHost).containsKey(new SSTableImporter.ImportId("localhost", "ks2", "tbl"));
+ for (SSTableImporter.ImportQueue queue : importer.importQueuePerHost.values())
+ {
+ assertThat(queue).isEmpty();
+ }
+ verify(mockTableOperations1, times(1))
+ .importNewSSTables("ks", "tbl", "/dir", true, true, true, true, true, true, false);
+ verify(mockTableOperations1, times(1))
+ .importNewSSTables("ks2", "tbl", "/dir", true, true, true, true, true, true, false);
+ vertx.setTimer(100, handle -> {
+ // after successful import, the queue must be drained
+ assertThat(instanceMetrics(1).sstableImport().pendingImports.metric.getValue()).isZero();
+ assertThat(instanceMetrics(1).sstableImport().successfulImports.metric.getValue()).isEqualTo(2);
+ context.completeNow();
+ });
+ }));
+ }
+
+ InstanceMetrics instanceMetrics(int id)
{
return new InstanceMetricsImpl(registry(id));
}
+
+ /**
+ * Injects into the maybeDrainImportQueue method to better test the class behavior
+ */
+ static class TestSSTableImporter extends SSTableImporter
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ TestSSTableImporter(Vertx vertx,
+ InstanceMetadataFetcher metadataFetcher,
+ ServiceConfiguration configuration,
+ ExecutorPools executorPools,
+ SSTableUploadsPathBuilder uploadPathBuilder)
+ {
+ super(vertx, metadataFetcher, configuration, executorPools, uploadPathBuilder);
+ }
+
+ @Override
+ void maybeDrainImportQueue(ImportQueue queue)
+ {
+ try
+ {
+ latch.await();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ super.maybeDrainImportQueue(queue);
+ }
+ }
}