IR may leak SSTables with pending repair when coming from streaming
patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-19182
diff --git a/CHANGES.txt b/CHANGES.txt
index 64c6391..5db267e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0.13
+ * IR may leak SSTables with pending repair when coming from streaming (CASSANDRA-19182)
* Streaming exception race creates corrupt transaction log files that prevent restart (CASSANDRA-18736)
* Fix CQL tojson timestamp output on negative timestamp values before Gregorian calendar reform in 1582 (CASSANDRA-19566)
* Fix few types issues and implement types compatibility tests (CASSANDRA-19479)
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
index 95fc7b8..824b22f 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
@@ -181,6 +181,7 @@
return new GroupedSSTableContainer(this);
}
+ public abstract void addSSTable(SSTableReader sstable);
public abstract void addSSTables(GroupedSSTableContainer sstables);
public abstract void removeSSTables(GroupedSSTableContainer sstables);
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
index 129ee79..a4084a3 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
@@ -139,6 +139,12 @@
}
@Override
+ public void addSSTable(SSTableReader sstable)
+ {
+ getStrategyFor(sstable).addSSTable(sstable);
+ }
+
+ @Override
public void addSSTables(GroupedSSTableContainer sstables)
{
Preconditions.checkArgument(sstables.numGroups() == strategies.size());
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 99e2ce9..93105c8 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -434,6 +434,20 @@
}
}
+ @VisibleForTesting
+ public boolean hasPendingRepairSSTable(UUID sessionID, SSTableReader sstable)
+ {
+ readLock.lock();
+ try
+ {
+ return pendingRepairs.hasPendingRepairSSTable(sessionID, sstable) || transientRepairs.hasPendingRepairSSTable(sessionID, sstable);
+ }
+ finally
+ {
+ readLock.unlock();
+ }
+ }
+
public void shutdown()
{
writeLock.lock();
@@ -608,7 +622,7 @@
private void handleFlushNotification(Iterable<SSTableReader> added)
{
for (SSTableReader sstable : added)
- compactionStrategyFor(sstable).addSSTable(sstable);
+ getHolder(sstable).addSSTable(sstable);
}
private int getHolderIndex(SSTableReader sstable)
@@ -1147,6 +1161,8 @@
*/
public void mutateRepaired(Collection<SSTableReader> sstables, long repairedAt, UUID pendingRepair, boolean isTransient) throws IOException
{
+ if (sstables.isEmpty())
+ return;
Set<SSTableReader> changed = new HashSet<>();
writeLock.lock();
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
index 03d4111..d8a561b 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
@@ -30,7 +30,6 @@
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.index.Index;
@@ -149,6 +148,13 @@
return tasks;
}
+ @Override
+ public void addSSTable(SSTableReader sstable)
+ {
+ Preconditions.checkArgument(managesSSTable(sstable), "Attempting to add sstable from wrong holder");
+ managers.get(router.getIndexForSSTable(sstable)).addSSTable(sstable);
+ }
+
AbstractCompactionTask getNextRepairFinishedTask()
{
List<TaskSupplier> repairFinishedSuppliers = getRepairFinishedTaskSuppliers();
@@ -282,4 +288,9 @@
{
return Iterables.any(managers, prm -> prm.containsSSTable(sstable));
}
+
+ public boolean hasPendingRepairSSTable(UUID sessionID, SSTableReader sstable)
+ {
+ return Iterables.any(managers, prm -> prm.hasPendingRepairSSTable(sessionID, sstable));
+ }
}
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
index aefa40b..bbc7198 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
@@ -464,7 +464,7 @@
public synchronized boolean hasDataForSession(UUID sessionID)
{
- return strategies.keySet().contains(sessionID);
+ return strategies.containsKey(sessionID);
}
boolean containsSSTable(SSTableReader sstable)
@@ -482,6 +482,15 @@
return group.entrySet().stream().map(g -> strategies.get(g.getKey()).getUserDefinedTask(g.getValue(), gcBefore)).collect(Collectors.toList());
}
+ @VisibleForTesting
+ public synchronized boolean hasPendingRepairSSTable(UUID sessionID, SSTableReader sstable)
+ {
+ AbstractCompactionStrategy strat = strategies.get(sessionID);
+ if (strat == null)
+ return false;
+ return strat.getSSTables().contains(sstable);
+ }
+
/**
* promotes/demotes sstables involved in a consistent repair that has been finalized, or failed
*/
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index 3d72a11..15cec5d 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -460,6 +460,8 @@
public void notifySSTableRepairedStatusChanged(Collection<SSTableReader> repairStatusesChanged)
{
+ if (repairStatusesChanged.isEmpty())
+ return;
INotification notification = new SSTableRepairStatusChanged(repairStatusesChanged);
for (INotificationConsumer subscriber : subscribers)
subscriber.handleNotification(notification, this);
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
index fce67b5..5526d97 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
@@ -20,12 +20,15 @@
import java.util.Collections;
import java.util.Set;
+import java.util.UUID;
import java.util.function.Consumer;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang3.ArrayUtils;
import org.junit.Assert;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInvokableInstance;
@@ -33,7 +36,10 @@
import org.apache.cassandra.distributed.api.QueryResult;
import org.apache.cassandra.distributed.api.Row;
import org.apache.cassandra.distributed.impl.AbstractCluster;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.repair.consistent.LocalSession;
+import org.apache.cassandra.service.ActiveRepairService;
import static org.apache.cassandra.utils.Retry.retryWithBackoffBlocking;
@@ -167,6 +173,34 @@
Assert.assertFalse("Only one repair expected, but found more than one", rs.hasNext());
}
+ public static void assertNoSSTableLeak(ICluster<IInvokableInstance> cluster, String ks, String table)
+ {
+ cluster.forEach(i -> {
+ String name = "node" + i.config().num();
+ i.forceCompact(ks, table); // cleanup happens in compaction, so run before checking
+ i.runOnInstance(() -> {
+ ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(table);
+ for (SSTableReader sstable : cfs.getTracker().getView().liveSSTables())
+ {
+ UUID pendingRepair = sstable.getSSTableMetadata().pendingRepair;
+ if (pendingRepair == null)
+ continue;
+ LocalSession session = ActiveRepairService.instance.consistent.local.getSession(pendingRepair);
+ // repair maybe async, so some participates may still think the repair is active, which means the sstable SHOULD link to it
+ if (session != null && !session.isCompleted())
+ continue;
+ // The session is complete, yet the sstable is not updated... is this still pending in compaction?
+ if (cfs.getCompactionStrategyManager().hasPendingRepairSSTable(pendingRepair, sstable))
+ continue;
+ // compaction does not know about the pending repair... race condition since this check started?
+ if (sstable.getSSTableMetadata().pendingRepair == null)
+ continue; // yep, race condition... ignore
+ throw new AssertionError(String.format("%s had leak detected on sstable %s", name, sstable.descriptor));
+ }
+ });
+ });
+ }
+
public enum RepairType {
FULL {
public String[] append(String... args)
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
index 0e156da..5516fc8 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
@@ -40,6 +40,7 @@
import static java.lang.String.format;
import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher.of;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertNoSSTableLeak;
import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairFailedWithMessageContains;
import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist;
import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairSuccess;
@@ -82,6 +83,7 @@
}
Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+ assertNoSSTableLeak(CLUSTER, KEYSPACE, table);
});
}
@@ -398,6 +400,7 @@
{
assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
}
+ assertNoSSTableLeak(CLUSTER, KEYSPACE, table);
}
finally
{
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java
index 4228806..590c65a 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java
@@ -40,6 +40,7 @@
import static java.lang.String.format;
import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher.of;
+import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertNoSSTableLeak;
import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairFailedWithMessageContains;
import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist;
import static org.apache.cassandra.distributed.test.DistributedRepairUtils.getRepairExceptions;
@@ -125,6 +126,7 @@
{
assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
}
+ assertNoSSTableLeak(CLUSTER, KEYSPACE, table);
});
}
@@ -184,6 +186,7 @@
{
assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
}
+ assertNoSSTableLeak(CLUSTER, KEYSPACE, table);
});
}
}