IR may leak SSTables with pending repair when coming from streaming

patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-19182
diff --git a/CHANGES.txt b/CHANGES.txt
index 64c6391..5db267e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.13
+ * IR may leak SSTables with pending repair when coming from streaming (CASSANDRA-19182)
  * Streaming exception race creates corrupt transaction log files that prevent restart (CASSANDRA-18736)
  * Fix CQL tojson timestamp output on negative timestamp values before Gregorian calendar reform in 1582 (CASSANDRA-19566)
  * Fix few types issues and implement types compatibility tests (CASSANDRA-19479)
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
index 95fc7b8..824b22f 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
@@ -181,6 +181,7 @@
         return new GroupedSSTableContainer(this);
     }
 
+    public abstract void addSSTable(SSTableReader sstable);
     public abstract void addSSTables(GroupedSSTableContainer sstables);
 
     public abstract void removeSSTables(GroupedSSTableContainer sstables);
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
index 129ee79..a4084a3 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
@@ -139,6 +139,12 @@
     }
 
     @Override
+    public void addSSTable(SSTableReader sstable)
+    {
+        getStrategyFor(sstable).addSSTable(sstable);
+    }
+
+    @Override
     public void addSSTables(GroupedSSTableContainer sstables)
     {
         Preconditions.checkArgument(sstables.numGroups() == strategies.size());
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 99e2ce9..93105c8 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -434,6 +434,20 @@
         }
     }
 
+    @VisibleForTesting
+    public boolean hasPendingRepairSSTable(UUID sessionID, SSTableReader sstable)
+    {
+        readLock.lock();
+        try
+        {
+            return pendingRepairs.hasPendingRepairSSTable(sessionID, sstable) || transientRepairs.hasPendingRepairSSTable(sessionID, sstable);
+        }
+        finally
+        {
+            readLock.unlock();
+        }
+    }
+
     public void shutdown()
     {
         writeLock.lock();
@@ -608,7 +622,7 @@
     private void handleFlushNotification(Iterable<SSTableReader> added)
     {
         for (SSTableReader sstable : added)
-            compactionStrategyFor(sstable).addSSTable(sstable);
+            getHolder(sstable).addSSTable(sstable);
     }
 
     private int getHolderIndex(SSTableReader sstable)
@@ -1147,6 +1161,8 @@
       */
     public void mutateRepaired(Collection<SSTableReader> sstables, long repairedAt, UUID pendingRepair, boolean isTransient) throws IOException
     {
+        if (sstables.isEmpty())
+            return;
         Set<SSTableReader> changed = new HashSet<>();
 
         writeLock.lock();
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
index 03d4111..d8a561b 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
@@ -30,7 +30,6 @@
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.index.Index;
@@ -149,6 +148,13 @@
         return tasks;
     }
 
+    @Override
+    public void addSSTable(SSTableReader sstable)
+    {
+        Preconditions.checkArgument(managesSSTable(sstable), "Attempting to add sstable from wrong holder");
+        managers.get(router.getIndexForSSTable(sstable)).addSSTable(sstable);
+    }
+
     AbstractCompactionTask getNextRepairFinishedTask()
     {
         List<TaskSupplier> repairFinishedSuppliers = getRepairFinishedTaskSuppliers();
@@ -282,4 +288,9 @@
     {
         return Iterables.any(managers, prm -> prm.containsSSTable(sstable));
     }
+
+    public boolean hasPendingRepairSSTable(UUID sessionID, SSTableReader sstable)
+    {
+        return Iterables.any(managers, prm -> prm.hasPendingRepairSSTable(sessionID, sstable));
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
index aefa40b..bbc7198 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
@@ -464,7 +464,7 @@
 
     public synchronized boolean hasDataForSession(UUID sessionID)
     {
-        return strategies.keySet().contains(sessionID);
+        return strategies.containsKey(sessionID);
     }
 
     boolean containsSSTable(SSTableReader sstable)
@@ -482,6 +482,15 @@
         return group.entrySet().stream().map(g -> strategies.get(g.getKey()).getUserDefinedTask(g.getValue(), gcBefore)).collect(Collectors.toList());
     }
 
+    @VisibleForTesting
+    public synchronized boolean hasPendingRepairSSTable(UUID sessionID, SSTableReader sstable)
+    {
+        AbstractCompactionStrategy strat = strategies.get(sessionID);
+        if (strat == null)
+            return false;
+        return strat.getSSTables().contains(sstable);
+    }
+
     /**
      * promotes/demotes sstables involved in a consistent repair that has been finalized, or failed
      */
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index 3d72a11..15cec5d 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -460,6 +460,8 @@
 
     public void notifySSTableRepairedStatusChanged(Collection<SSTableReader> repairStatusesChanged)
     {
+        if (repairStatusesChanged.isEmpty())
+            return;
         INotification notification = new SSTableRepairStatusChanged(repairStatusesChanged);
         for (INotificationConsumer subscriber : subscribers)
             subscriber.handleNotification(notification, this);
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
index fce67b5..5526d97 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
@@ -20,12 +20,15 @@
 
 import java.util.Collections;
 import java.util.Set;
+import java.util.UUID;
 import java.util.function.Consumer;
 
 import com.google.common.collect.ImmutableSet;
 import org.apache.commons.lang3.ArrayUtils;
 import org.junit.Assert;
 
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
@@ -33,7 +36,10 @@
 import org.apache.cassandra.distributed.api.QueryResult;
 import org.apache.cassandra.distributed.api.Row;
 import org.apache.cassandra.distributed.impl.AbstractCluster;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.repair.consistent.LocalSession;
+import org.apache.cassandra.service.ActiveRepairService;
 
 import static org.apache.cassandra.utils.Retry.retryWithBackoffBlocking;
 
@@ -167,6 +173,34 @@
         Assert.assertFalse("Only one repair expected, but found more than one", rs.hasNext());
     }
 
+    public static void assertNoSSTableLeak(ICluster<IInvokableInstance> cluster, String ks, String table)
+    {
+        cluster.forEach(i -> {
+            String name = "node" + i.config().num();
+            i.forceCompact(ks, table); // cleanup happens in compaction, so run before checking
+            i.runOnInstance(() -> {
+                ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(table);
+                for (SSTableReader sstable : cfs.getTracker().getView().liveSSTables())
+                {
+                    UUID pendingRepair = sstable.getSSTableMetadata().pendingRepair;
+                    if (pendingRepair == null)
+                        continue;
+                    LocalSession session = ActiveRepairService.instance.consistent.local.getSession(pendingRepair);
+                    // repair maybe async, so some participates may still think the repair is active, which means the sstable SHOULD link to it
+                    if (session != null && !session.isCompleted())
+                        continue;
+                    // The session is complete, yet the sstable is not updated... is this still pending in compaction?
+                    if (cfs.getCompactionStrategyManager().hasPendingRepairSSTable(pendingRepair, sstable))
+                        continue;
+                    // compaction does not know about the pending repair... race condition since this check started?
+                    if (sstable.getSSTableMetadata().pendingRepair == null)
+                        continue; // yep, race condition... ignore
+                    throw new AssertionError(String.format("%s had leak detected on sstable %s", name, sstable.descriptor));
+                }
+            });
+        });
+    }
+
     public enum RepairType {
         FULL {
             public String[] append(String... args)
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
index 0e156da..5516fc8 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
@@ -40,6 +40,7 @@
 
 import static java.lang.String.format;
 import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher.of;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertNoSSTableLeak;
 import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairFailedWithMessageContains;
 import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist;
 import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairSuccess;
@@ -82,6 +83,7 @@
             }
 
             Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+            assertNoSSTableLeak(CLUSTER, KEYSPACE, table);
         });
     }
 
@@ -398,6 +400,7 @@
                 {
                     assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
                 }
+                assertNoSSTableLeak(CLUSTER, KEYSPACE, table);
             }
             finally
             {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java
index 4228806..590c65a 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java
@@ -40,6 +40,7 @@
 
 import static java.lang.String.format;
 import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher.of;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertNoSSTableLeak;
 import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairFailedWithMessageContains;
 import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist;
 import static org.apache.cassandra.distributed.test.DistributedRepairUtils.getRepairExceptions;
@@ -125,6 +126,7 @@
             {
                 assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
             }
+            assertNoSSTableLeak(CLUSTER, KEYSPACE, table);
         });
     }
 
@@ -184,6 +186,7 @@
             {
                 assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
             }
+            assertNoSSTableLeak(CLUSTER, KEYSPACE, table);
         });
     }
 }