Add JMX call to getSSTableCountPerTWCSBucket for TWCS

Patch by Stefan Podkowinski; reviewed by Caleb Rackliffe and Marcus Eriksson for CASSANDRA-17774

Co-authored-by: Stefan Podkowinski <s.podkowinski@gmail.com>
Co-authored-by: Josh McKenzie <jmckenzie@apache.org>
diff --git a/CHANGES.txt b/CHANGES.txt
index 7c9137f..bdeef81 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Added JMX call to getSSTableCountPerTWCSBucket for TWCS (CASSANDRA-17774)
  * When doing a host replacement, -Dcassandra.broadcast_interval_ms is used to know when to check the ring but checks that the ring wasn't changed in -Dcassandra.ring_delay_ms, changes to ring delay should not depend on when we publish load stats (CASSANDRA-17776)
  * When bootstrap fails, CassandraRoleManager may attempt to do read queries that fail with "Cannot read from a bootstrapping node", and increments unavailables counters (CASSANDRA-17754)
  * Add guardrail to disallow DROP KEYSPACE commands (CASSANDRA-17767)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 94ca180..a1a5ce4 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -22,7 +22,6 @@
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
-import java.nio.file.Path;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -609,6 +608,7 @@
         return directories;
     }
 
+    @Override
     public List<String> getDataPaths() throws IOException
     {
         List<String> dataPaths = new ArrayList<>();
@@ -1925,12 +1925,14 @@
         }
     }
 
+    @Override
     public void beginLocalSampling(String sampler, int capacity, int durationMillis)
     {
         metric.samplers.get(SamplerType.valueOf(sampler)).beginSampling(capacity, durationMillis);
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
     public List<CompositeData> finishLocalSampling(String sampler, int count) throws OpenDataException
     {
         Sampler samplerImpl = metric.samplers.get(SamplerType.valueOf(sampler));
@@ -1949,11 +1951,13 @@
         return result;
     }
 
+    @Override
     public boolean isCompactionDiskSpaceCheckEnabled()
     {
         return compactionSpaceCheck;
     }
 
+    @Override
     public void compactionDiskSpaceCheck(boolean enable)
     {
         compactionSpaceCheck = enable;
@@ -2964,21 +2968,31 @@
         return indexManager.getBuiltIndexNames();
     }
 
+    @Override
     public int getUnleveledSSTables()
     {
         return compactionStrategyManager.getUnleveledSSTables();
     }
 
+    @Override
     public int[] getSSTableCountPerLevel()
     {
         return compactionStrategyManager.getSSTableCountPerLevel();
     }
 
+    @Override
     public long[] getPerLevelSizeBytes()
     {
         return compactionStrategyManager.getPerLevelSizeBytes();
     }
 
+    @Override
+    public int[] getSSTableCountPerTWCSBucket()
+    {
+        return compactionStrategyManager.getSSTableCountPerTWCSBucket();
+    }
+
+    @Override
     public int getLevelFanoutSize()
     {
         return compactionStrategyManager.getLevelFanoutSize();
@@ -3074,6 +3088,7 @@
         }
     }
 
+    @Override
     public double getDroppableTombstoneRatio()
     {
         double allDroppable = 0;
@@ -3088,6 +3103,7 @@
         return allColumns > 0 ? allDroppable / allColumns : 0;
     }
 
+    @Override
     public long trueSnapshotsSize()
     {
         return getDirectories().trueSnapshotsSize();
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index 5b6fd16..d674011 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@ -218,6 +218,12 @@
     public long[] getPerLevelSizeBytes();
 
     /**
+     * @return sstable count for each bucket in TWCS. null unless time window compaction is used.
+     *         array index corresponds to bucket(int[0] is for most recent, ...).
+     */
+    public int[] getSSTableCountPerTWCSBucket();
+
+    /**
      * @return sstable fanout size for level compaction strategy.
      */
     public int getLevelFanoutSize();
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 65359b1..ca67ddb 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -23,14 +23,19 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.ConcurrentModificationException;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
@@ -141,6 +146,8 @@
     private volatile long maxSSTableSizeBytes;
     private volatile String name;
 
+    public static int TWCS_BUCKET_COUNT_MAX = 128;
+
     public CompactionStrategyManager(ColumnFamilyStore cfs)
     {
         this(cfs, cfs::getDiskBoundaries, cfs.getPartitioner().splitter().isPresent());
@@ -610,6 +617,32 @@
         }
     }
 
+    public int[] getSSTableCountPerTWCSBucket()
+    {
+        readLock.lock();
+        try
+        {
+            List<Map<Long, Integer>> countsByBucket = Stream.concat(
+                                                                StreamSupport.stream(repaired.allStrategies().spliterator(), false),
+                                                                StreamSupport.stream(unrepaired.allStrategies().spliterator(), false))
+                                                            .filter((TimeWindowCompactionStrategy.class)::isInstance)
+                                                            .map(s -> ((TimeWindowCompactionStrategy)s).getSSTableCountByBuckets())
+                                                            .collect(Collectors.toList());
+            return countsByBucket.isEmpty() ? null : sumCountsByBucket(countsByBucket, TWCS_BUCKET_COUNT_MAX);
+        }
+        finally
+        {
+            readLock.unlock();
+        }
+    }
+
+    static int[] sumCountsByBucket(List<Map<Long, Integer>> countsByBucket, int max)
+    {
+        TreeMap<Long, Integer> merged = new TreeMap<>(Comparator.reverseOrder());
+        countsByBucket.stream().flatMap(e -> e.entrySet().stream()).forEach(e -> merged.merge(e.getKey(), e.getValue(), Integer::sum));
+        return merged.values().stream().limit(max).mapToInt(i -> i).toArray();
+    }
+
     static int[] sumArrays(int[] a, int[] b)
     {
         int[] res = new int[Math.max(a.length, b.length)];
diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
index d3b3021..9b9b82c 100644
--- a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@ -30,6 +30,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.*;
@@ -57,6 +59,9 @@
     private long lastExpiredCheck;
     private long highestWindowSeen;
 
+    // This is accessed in both the threading context of compaction / repair and also JMX
+    private volatile Map<Long, Integer> sstableCountByBuckets = Collections.emptyMap();
+
     public TimeWindowCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
     {
         super(cfs, options);
@@ -179,6 +184,7 @@
                 this.highestWindowSeen);
 
         this.estimatedRemainingTasks = mostInteresting.estimatedRemainingTasks;
+        this.sstableCountByBuckets = buckets.left.keySet().stream().collect(Collectors.toMap(Function.identity(), k -> buckets.left.get(k).size()));
         if (!mostInteresting.sstables.isEmpty())
             return mostInteresting.sstables;
         return null;
@@ -412,6 +418,10 @@
         return Long.MAX_VALUE;
     }
 
+    public Map<Long, Integer> getSSTableCountByBuckets()
+    {
+        return sstableCountByBuckets;
+    }
 
     public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
     {
diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java b/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java
index 8b5090b..48b795b 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java
@@ -71,6 +71,7 @@
     public String droppedMutations;
     public List<String> sstablesInEachLevel = new ArrayList<>();
     public List<String> sstableBytesInEachLevel = new ArrayList<>();
+    public int[] sstableCountPerTWCSBucket = null;
     public Boolean isInCorrectLocation = null; // null: option not active
     public double droppableTombstoneRatio;
     public Map<String, String> topSizePartitions;
diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java b/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java
index c6b2301..fc6ef14 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java
@@ -241,6 +241,7 @@
                         statsTable.sstablesInEachLevel.add(count + ((count > maxCount) ? "/" + maxCount : ""));
                     }
                 }
+                statsTable.sstableCountPerTWCSBucket = table.getSSTableCountPerTWCSBucket();
 
                 long[] leveledSSTablesBytes = table.getPerLevelSizeBytes();
                 if (leveledSSTablesBytes != null)
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
index 9960e8e..8a9f2fb 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
@@ -28,11 +28,14 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -395,6 +398,41 @@
         }
     }
 
+    @Test
+    public void testCountsByBuckets()
+    {
+        Assert.assertArrayEquals(
+            new int[] {2, 2, 4},
+            CompactionStrategyManager.sumCountsByBucket(ImmutableList.of(
+                ImmutableMap.of(60000L, 1, 0L, 2, 180000L, 1),
+                ImmutableMap.of(60000L, 1, 0L, 2, 180000L, 1)), CompactionStrategyManager.TWCS_BUCKET_COUNT_MAX));
+        Assert.assertArrayEquals(
+            new int[] {1, 1, 3},
+            CompactionStrategyManager.sumCountsByBucket(ImmutableList.of(
+                ImmutableMap.of(60000L, 1, 0L, 1),
+                ImmutableMap.of(0L, 2, 180000L, 1)), CompactionStrategyManager.TWCS_BUCKET_COUNT_MAX));
+        Assert.assertArrayEquals(
+            new int[] {1, 1},
+            CompactionStrategyManager.sumCountsByBucket(ImmutableList.of(
+                ImmutableMap.of(60000L, 1, 0L, 1),
+                ImmutableMap.of()), CompactionStrategyManager.TWCS_BUCKET_COUNT_MAX));
+        Assert.assertArrayEquals(
+            new int[] {8, 4},
+            CompactionStrategyManager.sumCountsByBucket(ImmutableList.of(
+                ImmutableMap.of(60000L, 2, 0L, 1, 180000L, 4),
+                ImmutableMap.of(60000L, 2, 0L, 1, 180000L, 4)), 2));
+        Assert.assertArrayEquals(
+            new int[] {1, 1, 2},
+            CompactionStrategyManager.sumCountsByBucket(ImmutableList.of(
+                Collections.emptyMap(),
+                ImmutableMap.of(60000L, 1, 0L, 2, 180000L, 1)), CompactionStrategyManager.TWCS_BUCKET_COUNT_MAX));
+        Assert.assertArrayEquals(
+            new int[] {},
+            CompactionStrategyManager.sumCountsByBucket(ImmutableList.of(
+                Collections.emptyMap(),
+                Collections.emptyMap()), CompactionStrategyManager.TWCS_BUCKET_COUNT_MAX));
+    }
+
     private MockCFS createJBODMockCFS(int disks)
     {
         // Create #disks data directories to simulate JBOD
@@ -464,8 +502,6 @@
         return index;
     }
 
-
-
     class MockBoundaryManager
     {
         private final ColumnFamilyStore cfs;