Merge branch 'cassandra-3.0' into cassandra-3.11
diff --git a/CHANGES.txt b/CHANGES.txt
index 264887b..aca219e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,7 @@
* Duplicate the buffer before passing it to analyser in SASI operation (CASSANDRA-13512)
* Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
Merged from 3.0:
+ * Improve TRUNCATE performance (CASSANDRA-13909)
* Implement short read protection on partition boundaries (CASSANDRA-13595)
* Fix ISE thrown by UPI.Serializer.hasNext() for some SELECT queries (CASSANDRA-13911)
* Filter header only commit logs before recovery (CASSANDRA-13918)
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
index f9555f4..b9adc4b 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
@@ -141,6 +141,21 @@
return accumulate;
}
+ static Throwable prepareForBulkObsoletion(Iterable<SSTableReader> readers, LogTransaction txnLogs, List<LogTransaction.Obsoletion> obsoletions, Throwable accumulate)
+ {
+ try
+ {
+ for (Map.Entry<SSTableReader, LogTransaction.SSTableTidier> entry : txnLogs.bulkObsoletion(readers).entrySet())
+ obsoletions.add(new LogTransaction.Obsoletion(entry.getKey(), entry.getValue()));
+ }
+ catch (Throwable t)
+ {
+ accumulate = Throwables.merge(accumulate, t);
+ }
+
+ return accumulate;
+ }
+
static Throwable abortObsoletion(List<LogTransaction.Obsoletion> obsoletions, Throwable accumulate)
{
if (obsoletions == null || obsoletions.isEmpty())
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
index 9691ee9..123dd8a 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@ -37,6 +37,7 @@
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LogRecord.Type;
import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.big.BigFormat;
import org.apache.cassandra.utils.Throwables;
@@ -285,6 +286,26 @@
throw new IllegalStateException();
}
+ public void addAll(Type type, Iterable<SSTableReader> toBulkAdd)
+ {
+ for (LogRecord record : makeRecords(type, toBulkAdd))
+ if (!addRecord(record))
+ throw new IllegalStateException();
+ }
+
+ private Collection<LogRecord> makeRecords(Type type, Iterable<SSTableReader> tables)
+ {
+ assert type == Type.ADD || type == Type.REMOVE;
+
+ for (SSTableReader sstable : tables)
+ {
+ File directory = sstable.descriptor.directory;
+ String fileName = StringUtils.join(directory, File.separator, getFileName());
+ replicas.maybeCreateReplica(directory, fileName, records);
+ }
+ return LogRecord.make(type, tables);
+ }
+
private LogRecord makeRecord(Type type, SSTable table)
{
assert type == Type.ADD || type == Type.REMOVE;
@@ -428,4 +449,9 @@
id.toString(),
LogFile.EXT);
}
+
+ public boolean isEmpty()
+ {
+ return records.isEmpty();
+ }
}
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
index eb8400d..dd3fcde 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
@@ -21,6 +21,7 @@
package org.apache.cassandra.db.lifecycle;
import java.io.File;
+import java.io.FilenameFilter;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
@@ -30,7 +31,9 @@
import java.util.zip.CRC32;
import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.FBUtilities;
@@ -153,10 +156,35 @@
// there is no separator after the generation number, and this would cause files of sstables with
// a higher generation number that starts with the same number, to be incorrectly classified as files
// of this record sstable
- String absoluteTablePath = FileUtils.getCanonicalPath(table.descriptor.baseFilename() + Component.separator);
+ String absoluteTablePath = absolutePath(table.descriptor.baseFilename());
return make(type, getExistingFiles(absoluteTablePath), table.getAllFilePaths().size(), absoluteTablePath);
}
+ public static Collection<LogRecord> make(Type type, Iterable<SSTableReader> tables)
+ {
+ // contains a mapping from sstable absolute path (everything up until the 'Data'/'Index'/etc part of the filename) to the sstable
+ Map<String, SSTable> absolutePaths = new HashMap<>();
+ for (SSTableReader table : tables)
+ absolutePaths.put(absolutePath(table.descriptor.baseFilename()), table);
+
+ // maps sstable base file name to the actual files on disk
+ Map<String, List<File>> existingFiles = getExistingFiles(absolutePaths.keySet());
+ List<LogRecord> records = new ArrayList<>(existingFiles.size());
+ for (Map.Entry<String, List<File>> entry : existingFiles.entrySet())
+ {
+ List<File> filesOnDisk = entry.getValue();
+ String baseFileName = entry.getKey();
+ SSTable sstable = absolutePaths.get(baseFileName);
+ records.add(make(type, filesOnDisk, sstable.getAllFilePaths().size(), baseFileName));
+ }
+ return records;
+ }
+
+ private static String absolutePath(String baseFilename)
+ {
+ return FileUtils.getCanonicalPath(baseFilename + Component.separator);
+ }
+
public LogRecord withExistingFiles()
{
return make(type, getExistingFiles(), 0, absolutePath.get());
@@ -272,6 +300,41 @@
return files == null ? Collections.emptyList() : Arrays.asList(files);
}
+ /**
+ * absoluteFilePaths contains full file parts up to the component name
+ *
+ * this method finds all files on disk beginning with any of the paths in absoluteFilePaths
+ * @return a map from absoluteFilePath to actual file on disk.
+ */
+ public static Map<String, List<File>> getExistingFiles(Set<String> absoluteFilePaths)
+ {
+ Set<File> uniqueDirectories = absoluteFilePaths.stream().map(path -> Paths.get(path).getParent().toFile()).collect(Collectors.toSet());
+ Map<String, List<File>> fileMap = new HashMap<>();
+ FilenameFilter ff = (dir, name) -> {
+ Descriptor descriptor = null;
+ try
+ {
+ descriptor = Descriptor.fromFilename(dir, name).left;
+ }
+ catch (Throwable t)
+ {// ignored - if we can't parse the filename, just skip the file
+ }
+
+ String absolutePath = descriptor != null ? absolutePath(descriptor.baseFilename()) : null;
+ if (absolutePath != null && absoluteFilePaths.contains(absolutePath))
+ fileMap.computeIfAbsent(absolutePath, k -> new ArrayList<>()).add(new File(dir, name));
+
+ return false;
+ };
+
+ // populate the file map:
+ for (File f : uniqueDirectories)
+ f.listFiles(ff);
+
+ return fileMap;
+ }
+
+
public boolean isFinal()
{
return type.isFinal();
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
index 75e82f3..f797784 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
@@ -165,6 +165,22 @@
return new SSTableTidier(reader, false, this);
}
+ Map<SSTableReader, SSTableTidier> bulkObsoletion(Iterable<SSTableReader> sstables)
+ {
+ if (!txnFile.isEmpty())
+ throw new IllegalStateException("Bad state when doing bulk obsoletions");
+
+ txnFile.addAll(Type.REMOVE, sstables);
+ Map<SSTableReader, SSTableTidier> tidiers = new HashMap<>();
+ for (SSTableReader sstable : sstables)
+ {
+ if (tracker != null)
+ tracker.notifyDeleting(sstable);
+ tidiers.put(sstable, new SSTableTidier(sstable, false, this));
+ }
+ return tidiers;
+ }
+
OperationType type()
{
return txnFile.type();
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index e2fcb06..6136f79 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -245,7 +245,7 @@
// It is important that any method accepting/returning a Throwable never throws an exception, and does its best
// to complete the instructions given to it
List<LogTransaction.Obsoletion> obsoletions = new ArrayList<>();
- accumulate = prepareForObsoletion(removed, txnLogs, obsoletions, accumulate);
+ accumulate = prepareForBulkObsoletion(removed, txnLogs, obsoletions, accumulate);
try
{
txnLogs.finish();
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
index 1db7944..95168e3 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
@@ -18,14 +18,21 @@
*/
package org.apache.cassandra.db.lifecycle;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -160,12 +167,23 @@
@Test
public void testMarkObsolete()
{
+ testMarkObsoleteHelper(false);
+ }
+ @Test
+ public void testBulkMarkObsolete()
+ {
+ testMarkObsoleteHelper(true);
+ }
+
+ public void testMarkObsoleteHelper(boolean bulk)
+ {
ColumnFamilyStore cfs = MockSchema.newCFS();
LogTransaction txnLogs = new LogTransaction(OperationType.UNKNOWN);
Iterable<SSTableReader> readers = Lists.newArrayList(MockSchema.sstable(1, cfs), MockSchema.sstable(2, cfs));
+ Iterable<SSTableReader> readersToKeep = Lists.newArrayList(MockSchema.sstable(3, cfs), MockSchema.sstable(4, cfs));
List<LogTransaction.Obsoletion> obsoletions = new ArrayList<>();
- Assert.assertNull(Helpers.prepareForObsoletion(readers, txnLogs, obsoletions, null));
+ Assert.assertNull(bulk ? Helpers.prepareForBulkObsoletion(readers, txnLogs, obsoletions, null) : Helpers.prepareForObsoletion(readers, txnLogs, obsoletions, null));
assertNotNull(obsoletions);
assertEquals(2, obsoletions.size());
@@ -174,9 +192,47 @@
for (SSTableReader reader : readers)
Assert.assertTrue(reader.isMarkedCompacted());
+ for (SSTableReader reader : readersToKeep)
+ Assert.assertFalse(reader.isMarkedCompacted());
+
accumulate = Helpers.markObsolete(obsoletions, null);
assertNotNull(accumulate);
txnLogs.finish();
}
+
+ @Test
+ public void compareBulkAndNormalObsolete() throws IOException
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS();
+ LogTransaction txnLogs = new LogTransaction(OperationType.UNKNOWN);
+ LogTransaction txnLogs2 = new LogTransaction(OperationType.UNKNOWN);
+
+ Collection<SSTableReader> readers = Lists.newArrayList(MockSchema.sstable(1, cfs), MockSchema.sstable(2, cfs));
+ // add a few readers that should not be removed:
+ Lists.newArrayList(MockSchema.sstable(3, cfs), MockSchema.sstable(4, cfs));
+
+ List<LogTransaction.Obsoletion> normalObsoletions = new ArrayList<>();
+ List<LogTransaction.Obsoletion> bulkObsoletions = new ArrayList<>();
+
+        Assert.assertNull(Helpers.prepareForBulkObsoletion(readers, txnLogs, bulkObsoletions, null));
+        Assert.assertNull(Helpers.prepareForObsoletion(readers, txnLogs2, normalObsoletions, null));
+
+ assertEquals(Sets.newHashSet(readers), normalObsoletions.stream().map(obs -> obs.reader).collect(Collectors.toSet()));
+ assertEquals(Sets.newHashSet(readers), bulkObsoletions.stream().map(obs -> obs.reader).collect(Collectors.toSet()));
+
+ Set<String> normalLogRecords = new HashSet<>();
+ Set<String> bulkLogRecords = new HashSet<>();
+
+ for (File f : txnLogs.logFiles())
+ Files.lines(f.toPath()).forEach(bulkLogRecords::add);
+ for (File f : txnLogs2.logFiles())
+ Files.lines(f.toPath()).forEach(normalLogRecords::add);
+
+ Assert.assertEquals(readers.size(), normalLogRecords.size());
+ Assert.assertEquals(bulkLogRecords, normalLogRecords);
+
+ txnLogs.finish();
+ txnLogs2.finish();
+ }
}