Add JMX call to getSSTableCountPerTWCSBucket for TWCS
Patch by Stefan Podkowinski; reviewed by Caleb Rackliffe and Marcus Eriksson for CASSANDRA-17774
Co-authored-by: Stefan Podkowinski <s.podkowinski@gmail.com>
Co-authored-by: Josh McKenzie <jmckenzie@apache.org>
diff --git a/CHANGES.txt b/CHANGES.txt
index 7c9137f..bdeef81 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.2
+ * Added JMX call to getSSTableCountPerTWCSBucket for TWCS (CASSANDRA-17774)
* When doing a host replacement, -Dcassandra.broadcast_interval_ms is used to know when to check the ring but checks that the ring wasn't changed in -Dcassandra.ring_delay_ms, changes to ring delay should not depend on when we publish load stats (CASSANDRA-17776)
* When bootstrap fails, CassandraRoleManager may attempt to do read queries that fail with "Cannot read from a bootstrapping node", and increments unavailables counters (CASSANDRA-17754)
* Add guardrail to disallow DROP KEYSPACE commands (CASSANDRA-17767)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 94ca180..a1a5ce4 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -22,7 +22,6 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
-import java.nio.file.Path;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
@@ -609,6 +608,7 @@
return directories;
}
+ @Override
public List<String> getDataPaths() throws IOException
{
List<String> dataPaths = new ArrayList<>();
@@ -1925,12 +1925,14 @@
}
}
+ @Override
public void beginLocalSampling(String sampler, int capacity, int durationMillis)
{
metric.samplers.get(SamplerType.valueOf(sampler)).beginSampling(capacity, durationMillis);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
public List<CompositeData> finishLocalSampling(String sampler, int count) throws OpenDataException
{
Sampler samplerImpl = metric.samplers.get(SamplerType.valueOf(sampler));
@@ -1949,11 +1951,13 @@
return result;
}
+ @Override
public boolean isCompactionDiskSpaceCheckEnabled()
{
return compactionSpaceCheck;
}
+ @Override
public void compactionDiskSpaceCheck(boolean enable)
{
compactionSpaceCheck = enable;
@@ -2964,21 +2968,31 @@
return indexManager.getBuiltIndexNames();
}
+ @Override
public int getUnleveledSSTables()
{
return compactionStrategyManager.getUnleveledSSTables();
}
+ @Override
public int[] getSSTableCountPerLevel()
{
return compactionStrategyManager.getSSTableCountPerLevel();
}
+ @Override
public long[] getPerLevelSizeBytes()
{
return compactionStrategyManager.getPerLevelSizeBytes();
}
+ @Override
+ public int[] getSSTableCountPerTWCSBucket()
+ {
+ return compactionStrategyManager.getSSTableCountPerTWCSBucket();
+ }
+
+ @Override
public int getLevelFanoutSize()
{
return compactionStrategyManager.getLevelFanoutSize();
@@ -3074,6 +3088,7 @@
}
}
+ @Override
public double getDroppableTombstoneRatio()
{
double allDroppable = 0;
@@ -3088,6 +3103,7 @@
return allColumns > 0 ? allDroppable / allColumns : 0;
}
+ @Override
public long trueSnapshotsSize()
{
return getDirectories().trueSnapshotsSize();
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index 5b6fd16..d674011 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@ -218,6 +218,12 @@
public long[] getPerLevelSizeBytes();
/**
+ * @return sstable count for each bucket in TWCS. null unless time window compaction is used.
+ * array index corresponds to bucket(int[0] is for most recent, ...).
+ */
+ public int[] getSSTableCountPerTWCSBucket();
+
+ /**
* @return sstable fanout size for level compaction strategy.
*/
public int getLevelFanoutSize();
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 65359b1..ca67ddb 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -23,14 +23,19 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
@@ -141,6 +146,8 @@
private volatile long maxSSTableSizeBytes;
private volatile String name;
+ public static int TWCS_BUCKET_COUNT_MAX = 128;
+
public CompactionStrategyManager(ColumnFamilyStore cfs)
{
this(cfs, cfs::getDiskBoundaries, cfs.getPartitioner().splitter().isPresent());
@@ -610,6 +617,32 @@
}
}
+ public int[] getSSTableCountPerTWCSBucket()
+ {
+ readLock.lock();
+ try
+ {
+ List<Map<Long, Integer>> countsByBucket = Stream.concat(
+ StreamSupport.stream(repaired.allStrategies().spliterator(), false),
+ StreamSupport.stream(unrepaired.allStrategies().spliterator(), false))
+ .filter((TimeWindowCompactionStrategy.class)::isInstance)
+ .map(s -> ((TimeWindowCompactionStrategy)s).getSSTableCountByBuckets())
+ .collect(Collectors.toList());
+ return countsByBucket.isEmpty() ? null : sumCountsByBucket(countsByBucket, TWCS_BUCKET_COUNT_MAX);
+ }
+ finally
+ {
+ readLock.unlock();
+ }
+ }
+
+ static int[] sumCountsByBucket(List<Map<Long, Integer>> countsByBucket, int max)
+ {
+ TreeMap<Long, Integer> merged = new TreeMap<>(Comparator.reverseOrder());
+ countsByBucket.stream().flatMap(e -> e.entrySet().stream()).forEach(e -> merged.merge(e.getKey(), e.getValue(), Integer::sum));
+ return merged.values().stream().limit(max).mapToInt(i -> i).toArray();
+ }
+
static int[] sumArrays(int[] a, int[] b)
{
int[] res = new int[Math.max(a.length, b.length)];
diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
index d3b3021..9b9b82c 100644
--- a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@ -30,6 +30,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.*;
@@ -57,6 +59,9 @@
private long lastExpiredCheck;
private long highestWindowSeen;
+ // This is accessed in both the threading context of compaction / repair and also JMX
+ private volatile Map<Long, Integer> sstableCountByBuckets = Collections.emptyMap();
+
public TimeWindowCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
{
super(cfs, options);
@@ -179,6 +184,7 @@
this.highestWindowSeen);
this.estimatedRemainingTasks = mostInteresting.estimatedRemainingTasks;
+ this.sstableCountByBuckets = buckets.left.keySet().stream().collect(Collectors.toMap(Function.identity(), k -> buckets.left.get(k).size()));
if (!mostInteresting.sstables.isEmpty())
return mostInteresting.sstables;
return null;
@@ -412,6 +418,10 @@
return Long.MAX_VALUE;
}
+ public Map<Long, Integer> getSSTableCountByBuckets()
+ {
+ return sstableCountByBuckets;
+ }
public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
{
diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java b/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java
index 8b5090b..48b795b 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java
@@ -71,6 +71,7 @@
public String droppedMutations;
public List<String> sstablesInEachLevel = new ArrayList<>();
public List<String> sstableBytesInEachLevel = new ArrayList<>();
+ public int[] sstableCountPerTWCSBucket = null;
public Boolean isInCorrectLocation = null; // null: option not active
public double droppableTombstoneRatio;
public Map<String, String> topSizePartitions;
diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java b/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java
index c6b2301..fc6ef14 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java
@@ -241,6 +241,7 @@
statsTable.sstablesInEachLevel.add(count + ((count > maxCount) ? "/" + maxCount : ""));
}
}
+ statsTable.sstableCountPerTWCSBucket = table.getSSTableCountPerTWCSBucket();
long[] leveledSSTablesBytes = table.getPerLevelSizeBytes();
if (leveledSSTablesBytes != null)
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
index 9960e8e..8a9f2fb 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
@@ -28,11 +28,14 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -395,6 +398,41 @@
}
}
+ @Test
+ public void testCountsByBuckets()
+ {
+ Assert.assertArrayEquals(
+ new int[] {2, 2, 4},
+ CompactionStrategyManager.sumCountsByBucket(ImmutableList.of(
+ ImmutableMap.of(60000L, 1, 0L, 2, 180000L, 1),
+ ImmutableMap.of(60000L, 1, 0L, 2, 180000L, 1)), CompactionStrategyManager.TWCS_BUCKET_COUNT_MAX));
+ Assert.assertArrayEquals(
+ new int[] {1, 1, 3},
+ CompactionStrategyManager.sumCountsByBucket(ImmutableList.of(
+ ImmutableMap.of(60000L, 1, 0L, 1),
+ ImmutableMap.of(0L, 2, 180000L, 1)), CompactionStrategyManager.TWCS_BUCKET_COUNT_MAX));
+ Assert.assertArrayEquals(
+ new int[] {1, 1},
+ CompactionStrategyManager.sumCountsByBucket(ImmutableList.of(
+ ImmutableMap.of(60000L, 1, 0L, 1),
+ ImmutableMap.of()), CompactionStrategyManager.TWCS_BUCKET_COUNT_MAX));
+ Assert.assertArrayEquals(
+ new int[] {8, 4},
+ CompactionStrategyManager.sumCountsByBucket(ImmutableList.of(
+ ImmutableMap.of(60000L, 2, 0L, 1, 180000L, 4),
+ ImmutableMap.of(60000L, 2, 0L, 1, 180000L, 4)), 2));
+ Assert.assertArrayEquals(
+ new int[] {1, 1, 2},
+ CompactionStrategyManager.sumCountsByBucket(ImmutableList.of(
+ Collections.emptyMap(),
+ ImmutableMap.of(60000L, 1, 0L, 2, 180000L, 1)), CompactionStrategyManager.TWCS_BUCKET_COUNT_MAX));
+ Assert.assertArrayEquals(
+ new int[] {},
+ CompactionStrategyManager.sumCountsByBucket(ImmutableList.of(
+ Collections.emptyMap(),
+ Collections.emptyMap()), CompactionStrategyManager.TWCS_BUCKET_COUNT_MAX));
+ }
+
private MockCFS createJBODMockCFS(int disks)
{
// Create #disks data directories to simulate JBOD
@@ -464,8 +502,6 @@
return index;
}
-
-
class MockBoundaryManager
{
private final ColumnFamilyStore cfs;