Add separate thread pool for Secondary Index building so it doesn't block compactions
Patch by Chris Lohfink; reviewed by Caleb Rackliffe, Josh McKenzie, Sam Tunnicliffe, and Marcus Eriksson for CASSANDRA-17781
Co-authored-by: Chris Lohfink <clohfink@apple.com>
Co-authored-by: Josh McKenzie <jmckenzie@apache.org>
diff --git a/CHANGES.txt b/CHANGES.txt
index 105522a..4dda88e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.2
+ * Add separate thread pool for Secondary Index building so it doesn't block compactions (CASSANDRA-17781)
* Added JMX call to getSSTableCountPerTWCSBucket for TWCS (CASSANDRA-17774)
* When doing a host replacement, -Dcassandra.broadcast_interval_ms is used to know when to check the ring but checks that the ring wasn't changed in -Dcassandra.ring_delay_ms, changes to ring delay should not depend on when we publish load stats (CASSANDRA-17776)
* When bootstrap fails, CassandraRoleManager may attempt to do read queries that fail with "Cannot read from a bootstrapping node", and increments unavailables counters (CASSANDRA-17754)
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index a17c359..f7eabff 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -327,6 +327,9 @@
public volatile int concurrent_materialized_view_builders = 1;
public volatile int reject_repair_compaction_threshold = Integer.MAX_VALUE;
+ // The number of executors to use for building secondary indexes
+ public int concurrent_index_builders = 2;
+
/**
* @deprecated retry support removed on CASSANDRA-10992
*/
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index b60321e..0af1ef8 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2033,6 +2033,11 @@
return conf.concurrent_validations;
}
+ public static int getConcurrentIndexBuilders()
+ {
+ return conf.concurrent_index_builders;
+ }
+
public static void setConcurrentValidations(int value)
{
value = value > 0 ? value : Integer.MAX_VALUE;
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 925d900..49b999e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -129,7 +129,13 @@
private final CompactionExecutor cacheCleanupExecutor = new CacheCleanupExecutor();
private final CompactionExecutor viewBuildExecutor = new ViewBuildExecutor();
- private final CompactionMetrics metrics = new CompactionMetrics(executor, validationExecutor, viewBuildExecutor);
+ // We can't house 2i builds in SecondaryIndexManagement because it could cause deadlocks with itself, and can cause
+ // massive to indefinite pauses if prioritized either before or after normal compactions so we instead put it in its
+ // own pool to prevent either scenario.
+ private final SecondaryIndexExecutor secondaryIndexExecutor = new SecondaryIndexExecutor();
+
+ private final CompactionMetrics metrics = new CompactionMetrics(executor, validationExecutor, viewBuildExecutor, secondaryIndexExecutor);
+
@VisibleForTesting
final Multiset<ColumnFamilyStore> compactingCF = ConcurrentHashMultiset.create();
@@ -244,6 +250,7 @@
validationExecutor.shutdown();
viewBuildExecutor.shutdown();
cacheCleanupExecutor.shutdown();
+ secondaryIndexExecutor.shutdown();
// interrupt compactions and validations
for (Holder compactionHolder : active.getCompactions())
@@ -254,7 +261,8 @@
// wait for tasks to terminate
// compaction tasks are interrupted above, so it shuold be fairy quick
// until not interrupted tasks to complete.
- for (ExecutorService exec : Arrays.asList(executor, validationExecutor, viewBuildExecutor, cacheCleanupExecutor))
+ for (ExecutorService exec : Arrays.asList(executor, validationExecutor, viewBuildExecutor,
+ cacheCleanupExecutor, secondaryIndexExecutor))
{
try
{
@@ -1772,7 +1780,7 @@
}
};
- return executor.submitIfRunning(runnable, "index build");
+ return secondaryIndexExecutor.submitIfRunning(runnable, "index build");
}
/**
@@ -2015,6 +2023,13 @@
metrics.sstablesDropppedFromCompactions.inc(num);
}
+ private static class SecondaryIndexExecutor extends CompactionExecutor
+ {
+ public SecondaryIndexExecutor()
+ {
+ super(DatabaseDescriptor.getConcurrentIndexBuilders(), "SecondaryIndexExecutor", Integer.MAX_VALUE);
+ }
+ }
public List<Map<String, String>> getCompactions()
{