Improve TRUNCATE performance

Patch by marcuse; reviewed by Stefania Alborghetti for CASSANDRA-13909
diff --git a/CHANGES.txt b/CHANGES.txt
index 4a45469..d6423b4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.15
+ * Improve TRUNCATE performance (CASSANDRA-13909)
  * Implement short read protection on partition boundaries (CASSANDRA-13595)
  * Fix ISE thrown by UPI.Serializer.hasNext() for some SELECT queries (CASSANDRA-13911)
  * Filter header only commit logs before recovery (CASSANDRA-13918)
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
index f9555f4..b9adc4b 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
@@ -141,6 +141,21 @@
         return accumulate;
     }
 
+    static Throwable prepareForBulkObsoletion(Iterable<SSTableReader> readers, LogTransaction txnLogs, List<LogTransaction.Obsoletion> obsoletions, Throwable accumulate)
+    {
+        try
+        {
+            for (Map.Entry<SSTableReader, LogTransaction.SSTableTidier> entry : txnLogs.bulkObsoletion(readers).entrySet())
+                obsoletions.add(new LogTransaction.Obsoletion(entry.getKey(), entry.getValue()));
+        }
+        catch (Throwable t)
+        {
+            accumulate = Throwables.merge(accumulate, t);
+        }
+
+        return accumulate;
+    }
+
     static Throwable abortObsoletion(List<LogTransaction.Obsoletion> obsoletions, Throwable accumulate)
     {
         if (obsoletions == null || obsoletions.isEmpty())
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
index da5bb39..be26163 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@ -37,6 +37,7 @@
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LogRecord.Type;
 import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.utils.Throwables;
 
@@ -284,6 +285,25 @@
             throw new IllegalStateException();
     }
 
+    public void addAll(Type type, Iterable<SSTableReader> toBulkAdd)
+    {
+        for (LogRecord record : makeRecords(type, toBulkAdd))
+            if (!addRecord(record))
+                throw new IllegalStateException();
+    }
+
+    private Collection<LogRecord> makeRecords(Type type, Iterable<SSTableReader> tables)
+    {
+        assert type == Type.ADD || type == Type.REMOVE;
+
+        for (SSTableReader sstable : tables)
+        {
+            File folder = sstable.descriptor.directory;
+            replicas.maybeCreateReplica(folder, getFileName(folder), records);
+        }
+        return LogRecord.make(type, tables);
+    }
+
     private LogRecord makeRecord(Type type, SSTable table)
     {
         assert type == Type.ADD || type == Type.REMOVE;
@@ -414,4 +434,9 @@
     {
         return records;
     }
+
+    public boolean isEmpty()
+    {
+        return records.isEmpty();
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
index ac6d6d0..a322ea1 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
@@ -21,6 +21,7 @@
 package org.apache.cassandra.db.lifecycle;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.*;
@@ -30,7 +31,9 @@
 import java.util.zip.CRC32;
 
 import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -151,10 +154,35 @@
         // there is no separator after the generation number, and this would cause files of sstables with
         // a higher generation number that starts with the same number, to be incorrectly classified as files
         // of this record sstable
-        String absoluteTablePath = FileUtils.getCanonicalPath(table.descriptor.baseFilename() + Component.separator);
+        String absoluteTablePath = absolutePath(table.descriptor.baseFilename());
         return make(type, getExistingFiles(absoluteTablePath), table.getAllFilePaths().size(), absoluteTablePath);
     }
 
+    public static Collection<LogRecord> make(Type type, Iterable<SSTableReader> tables)
+    {
+        // contains a mapping from sstable absolute path (everything up until the 'Data'/'Index'/etc part of the filename) to the sstable
+        Map<String, SSTable> absolutePaths = new HashMap<>();
+        for (SSTableReader table : tables)
+            absolutePaths.put(absolutePath(table.descriptor.baseFilename()), table);
+
+        // maps each sstable's canonical absolute base path to the actual files found on disk
+        Map<String, List<File>> existingFiles = getExistingFiles(absolutePaths.keySet());
+        List<LogRecord> records = new ArrayList<>(existingFiles.size());
+        for (Map.Entry<String, List<File>> entry : existingFiles.entrySet())
+        {
+            List<File> filesOnDisk = entry.getValue();
+            String baseFileName = entry.getKey();
+            SSTable sstable = absolutePaths.get(baseFileName);
+            records.add(make(type, filesOnDisk, sstable.getAllFilePaths().size(), baseFileName));
+        }
+        return records;
+    }
+
+    private static String absolutePath(String baseFilename)
+    {
+        return FileUtils.getCanonicalPath(baseFilename + Component.separator);
+    }
+
     public LogRecord withExistingFiles()
     {
         return make(type, getExistingFiles(), 0, absolutePath.get());
@@ -275,6 +303,41 @@
         return files == null ? Collections.emptyList() : Arrays.asList(files);
     }
 
+    /**
+     * Finds all files on disk whose paths begin with any of the given absolute file paths.
+     *
+     * @param absoluteFilePaths absolute file paths up to and including the component separator
+     * @return a map from each absolute file path to the matching files found on disk
+     */
+    public static Map<String, List<File>> getExistingFiles(Set<String> absoluteFilePaths)
+    {
+        Set<File> uniqueDirectories = absoluteFilePaths.stream().map(path -> Paths.get(path).getParent().toFile()).collect(Collectors.toSet());
+        Map<String, List<File>> fileMap = new HashMap<>();
+        FilenameFilter ff = (dir, name) -> {
+            Descriptor descriptor = null;
+            try
+            {
+                descriptor = Descriptor.fromFilename(dir, name).left;
+            }
+            catch (Throwable t)
+            {// ignored - if we can't parse the filename, just skip the file
+            }
+
+            String absolutePath = descriptor != null ? absolutePath(descriptor.baseFilename()) : null;
+            if (absolutePath != null && absoluteFilePaths.contains(absolutePath))
+                fileMap.computeIfAbsent(absolutePath, k -> new ArrayList<>()).add(new File(dir, name));
+
+            return false;
+        };
+
+        // populate the file map:
+        for (File f : uniqueDirectories)
+            f.listFiles(ff);
+
+        return fileMap;
+    }
+
+
     public boolean isFinal()
     {
         return type.isFinal();
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
index 350477c..6599142 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
@@ -163,6 +163,22 @@
         return new SSTableTidier(reader, false, this);
     }
 
+    Map<SSTableReader, SSTableTidier> bulkObsoletion(Iterable<SSTableReader> sstables)
+    {
+        if (!txnFile.isEmpty())
+            throw new IllegalStateException("Bad state when doing bulk obsoletions");
+
+        txnFile.addAll(Type.REMOVE, sstables);
+        Map<SSTableReader, SSTableTidier> tidiers = new HashMap<>();
+        for (SSTableReader sstable : sstables)
+        {
+            if (tracker != null)
+                tracker.notifyDeleting(sstable);
+            tidiers.put(sstable, new SSTableTidier(sstable, false, this));
+        }
+        return tidiers;
+    }
+
     OperationType type()
     {
         return txnFile.type();
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index 9feaa3e..d281278 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -245,7 +245,7 @@
             // It is important that any method accepting/returning a Throwable never throws an exception, and does its best
             // to complete the instructions given to it
             List<LogTransaction.Obsoletion> obsoletions = new ArrayList<>();
-            accumulate = prepareForObsoletion(removed, txnLogs, obsoletions, accumulate);
+            accumulate = prepareForBulkObsoletion(removed, txnLogs, obsoletions, accumulate);
             try
             {
                 txnLogs.finish();
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
index 3549523..1b8e265 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
@@ -18,14 +18,21 @@
 */
 package org.apache.cassandra.db.lifecycle;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -158,12 +165,23 @@
     @Test
     public void testMarkObsolete()
     {
+        testMarkObsoleteHelper(false);
+    }
+    @Test
+    public void testBulkMarkObsolete()
+    {
+        testMarkObsoleteHelper(true);
+    }
+
+    public void testMarkObsoleteHelper(boolean bulk)
+    {
         ColumnFamilyStore cfs = MockSchema.newCFS();
         LogTransaction txnLogs = new LogTransaction(OperationType.UNKNOWN);
         Iterable<SSTableReader> readers = Lists.newArrayList(MockSchema.sstable(1, cfs), MockSchema.sstable(2, cfs));
+        Iterable<SSTableReader> readersToKeep = Lists.newArrayList(MockSchema.sstable(3, cfs), MockSchema.sstable(4, cfs));
 
         List<LogTransaction.Obsoletion> obsoletions = new ArrayList<>();
-        Assert.assertNull(Helpers.prepareForObsoletion(readers, txnLogs, obsoletions, null));
+        Assert.assertNull(bulk ? Helpers.prepareForBulkObsoletion(readers, txnLogs, obsoletions, null) : Helpers.prepareForObsoletion(readers, txnLogs, obsoletions, null));
         assertNotNull(obsoletions);
         assertEquals(2, obsoletions.size());
 
@@ -172,9 +190,47 @@
         for (SSTableReader reader : readers)
             Assert.assertTrue(reader.isMarkedCompacted());
 
+        for (SSTableReader reader : readersToKeep)
+            Assert.assertFalse(reader.isMarkedCompacted());
+
         accumulate = Helpers.markObsolete(obsoletions, null);
         assertNotNull(accumulate);
 
         txnLogs.finish();
     }
+
+    @Test
+    public void compareBulkAndNormalObsolete() throws IOException
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS();
+        LogTransaction txnLogs = new LogTransaction(OperationType.UNKNOWN);
+        LogTransaction txnLogs2 = new LogTransaction(OperationType.UNKNOWN);
+
+        Collection<SSTableReader> readers = Lists.newArrayList(MockSchema.sstable(1, cfs), MockSchema.sstable(2, cfs));
+        // create sstable files on disk (intentionally untracked) that must NOT be picked up by the bulk directory scan:
+        Lists.newArrayList(MockSchema.sstable(3, cfs), MockSchema.sstable(4, cfs));
+
+        List<LogTransaction.Obsoletion> normalObsoletions = new ArrayList<>();
+        List<LogTransaction.Obsoletion> bulkObsoletions = new ArrayList<>();
+
+        Assert.assertNull(Helpers.prepareForBulkObsoletion(readers, txnLogs, bulkObsoletions, null));
+        Assert.assertNull(Helpers.prepareForObsoletion(readers, txnLogs2, normalObsoletions, null));
+
+        assertEquals(Sets.newHashSet(readers), normalObsoletions.stream().map(obs -> obs.reader).collect(Collectors.toSet()));
+        assertEquals(Sets.newHashSet(readers), bulkObsoletions.stream().map(obs -> obs.reader).collect(Collectors.toSet()));
+
+        Set<String> normalLogRecords = new HashSet<>();
+        Set<String> bulkLogRecords = new HashSet<>();
+
+        for (File f : txnLogs.logFiles())
+            Files.lines(f.toPath()).forEach(bulkLogRecords::add);
+        for (File f : txnLogs2.logFiles())
+            Files.lines(f.toPath()).forEach(normalLogRecords::add);
+
+        Assert.assertEquals(readers.size(), normalLogRecords.size());
+        Assert.assertEquals(bulkLogRecords, normalLogRecords);
+
+        txnLogs.finish();
+        txnLogs2.finish();
+    }
 }