/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.db;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import com.googlecode.concurrenttrees.common.Iterables;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.UpdateBuilder;
import org.apache.cassandra.Util;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.ColumnFamilyStore.FlushReason;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.memtable.AbstractMemtable;
import org.apache.cassandra.db.memtable.Memtable;
import org.apache.cassandra.db.partitions.FilteredPartition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableReadsListener;
import org.apache.cassandra.io.sstable.ScrubTest;
import org.apache.cassandra.io.sstable.format.SSTableFormat.Components;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.ClearableHistogram;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
import org.apache.cassandra.service.snapshot.SnapshotManifest;
import org.apache.cassandra.service.snapshot.TableSnapshot;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.utils.concurrent.OpOrder.Barrier;
import org.apache.cassandra.utils.concurrent.OpOrder.Group;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class ColumnFamilyStoreTest
{
public static final String KEYSPACE1 = "ColumnFamilyStoreTest1";
public static final String KEYSPACE2 = "ColumnFamilyStoreTest2";
public static final String KEYSPACE3 = "ColumnFamilyStoreTest3";
public static final String CF_STANDARD1 = "Standard1";
public static final String CF_STANDARD2 = "Standard2";
public static final String CF_INDEX1 = "Indexed1";
public static final String CF_SPEC_RETRY1 = "SpeculativeRetryTest1";
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
SchemaLoader.prepareServer();
SchemaLoader.createKeyspace(KEYSPACE1,
KeyspaceParams.simple(1),
SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1),
SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2),
SchemaLoader.keysIndexCFMD(KEYSPACE1, CF_INDEX1, true));
SchemaLoader.createKeyspace(KEYSPACE2,
KeyspaceParams.simple(1),
SchemaLoader.standardCFMD(KEYSPACE2, CF_STANDARD1));
SchemaLoader.createKeyspace(KEYSPACE3,
KeyspaceParams.simple(1),
SchemaLoader.standardCFMD(KEYSPACE3, CF_SPEC_RETRY1)
.speculativeRetry(SpeculativeRetryPolicy.fromString("50PERCENTILE"))
.additionalWritePolicy(SpeculativeRetryPolicy.fromString("75PERCENTILE")));
}
@Before
public void truncateCFS()
{
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).truncateBlocking();
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2).truncateBlocking();
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_INDEX1).truncateBlocking();
Keyspace.open(KEYSPACE2).getColumnFamilyStore(CF_STANDARD1).truncateBlocking();
}
@Test
public void testMemtableTimestamp() throws Throwable
{
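// A memtable whose stats carry EncodingStats.NO_STATS.minTimestamp must report Memtable.NO_MIN_TIMESTAMP,
// i.e. the encoding-stats sentinel value is not mistaken for a real minimum timestamp.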
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
assertEquals(Memtable.NO_MIN_TIMESTAMP, fakeMemTableWithMinTS(cfs, EncodingStats.NO_STATS.minTimestamp).getMinTimestamp());
}
// create two sstables, and verify that we only deserialize data from the most recent one
@Test
public void testTimeSortedQuery()
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
new RowUpdateBuilder(cfs.metadata(), 0, "key1")
.clustering("Column1")
.add("val", "asdf")
.build()
.applyUnsafe();
Util.flush(cfs);
new RowUpdateBuilder(cfs.metadata(), 1, "key1")
.clustering("Column1")
.add("val", "asdf")
.build()
.applyUnsafe();
Util.flush(cfs);
((ClearableHistogram)cfs.metric.sstablesPerReadHistogram.cf).clear(); // resets counts
Util.getAll(Util.cmd(cfs, "key1").includeRow("c1").build());
assertEquals(1, cfs.metric.sstablesPerReadHistogram.cf.getCount());
}
@Test
public void testGetColumnWithWrongBF()
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
keyspace.getColumnFamilyStores().forEach(ColumnFamilyStore::truncateBlocking);
List<Mutation> rms = new LinkedList<>();
rms.add(new RowUpdateBuilder(cfs.metadata(), 0, "key1")
.clustering("Column1")
.add("val", "asdf")
.build());
Util.writeColumnFamily(rms);
List<SSTableReader> ssTables = keyspace.getAllSSTables(SSTableSet.LIVE);
assertEquals(1, ssTables.size());
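// With the bloom filter disabled, a lookup can no longer be rejected up front;
// the read for the never-written key2 must still come back empty via the sstable index.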
Util.disableBloomFilter(cfs);
Util.assertEmpty(Util.cmd(cfs, "key2").build());
}
@Test
public void testEmptyRow() throws Exception
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD2);
RowUpdateBuilder.deleteRow(cfs.metadata(), FBUtilities.timestampMicros(), "key1", "Column1").applyUnsafe();
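// The row deletion leaves a tombstone: an unfiltered read still returns the row, but it must carry no cells.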
Runnable r = new WrappedRunnable()
{
public void runMayThrow() throws IOException
{
Row toCheck = Util.getOnlyRowUnfiltered(Util.cmd(cfs, "key1").build());
Iterator<Cell<?>> iter = toCheck.cells().iterator();
assertEquals(0, Iterators.size(iter));
}
};
reTest(cfs, r);
}
@Test
public void testDeleteStandardRowSticksAfterFlush()
{
// test to make sure flushing after a delete doesn't resurrect deleted columns.
String keyspaceName = KEYSPACE1;
String cfName = CF_STANDARD1;
Keyspace keyspace = Keyspace.open(keyspaceName);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
ByteBuffer col = ByteBufferUtil.bytes("val");
ByteBuffer val = ByteBufferUtil.bytes("val1");
// insert
Mutation.SimpleBuilder builder = Mutation.simpleBuilder(keyspaceName, cfs.metadata().partitioner.decorateKey(ByteBufferUtil.bytes("val2")));
builder.update(cfName).row("Column1").add("val", "val1").build();
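// (note: this builder is constructed but never applied; the RowUpdateBuilder mutations below supply the test data)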
new RowUpdateBuilder(cfs.metadata(), 0, "key1").clustering("Column1").add("val", "val1").build().applyUnsafe();
new RowUpdateBuilder(cfs.metadata(), 0, "key2").clustering("Column1").add("val", "val1").build().applyUnsafe();
assertRangeCount(cfs, col, val, 2);
// flush.
Util.flush(cfs);
// insert, don't flush
new RowUpdateBuilder(cfs.metadata(), 1, "key3").clustering("Column1").add("val", "val1").build().applyUnsafe();
new RowUpdateBuilder(cfs.metadata(), 1, "key4").clustering("Column1").add("val", "val1").build().applyUnsafe();
assertRangeCount(cfs, col, val, 4);
// delete (from sstable and memtable)
RowUpdateBuilder.deleteRow(cfs.metadata(), 5, "key1", "Column1").applyUnsafe();
RowUpdateBuilder.deleteRow(cfs.metadata(), 5, "key3", "Column1").applyUnsafe();
// verify delete
assertRangeCount(cfs, col, val, 2);
// flush
Util.flush(cfs);
// re-verify the delete survives the flush (the first breakage from CASSANDRA-1837 showed up here).
assertRangeCount(cfs, col, val, 2);
// simulate a 'late' insertion that arrives after the deletion; the write is accepted but must lose to the tombstone on read.
new RowUpdateBuilder(cfs.metadata(), 2, "key1").clustering("Column1").add("val", "val1").build().applyUnsafe();
new RowUpdateBuilder(cfs.metadata(), 2, "key3").clustering("Column1").add("val", "val1").build().applyUnsafe();
// should still be nothing there because we deleted this row (second breakage, previously undetected because of CASSANDRA-1837).
assertRangeCount(cfs, col, val, 2);
// make sure that new writes are recognized.
new RowUpdateBuilder(cfs.metadata(), 10, "key5").clustering("Column1").add("val", "val1").build().applyUnsafe();
new RowUpdateBuilder(cfs.metadata(), 10, "key6").clustering("Column1").add("val", "val1").build().applyUnsafe();
assertRangeCount(cfs, col, val, 4);
// and it remains so after flush. (this wasn't failing before, but it's good to check.)
Util.flush(cfs);
assertRangeCount(cfs, col, val, 4);
}
@Test
public void testClearEphemeralSnapshots()
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_INDEX1);
// clean up any garbage left over from previous tests
cfs.clearSnapshot("");
int numRows = 1000;
long[] colValues = new long[numRows * 2]; // each row has two columns
for (int i = 0; i < colValues.length; i += 2)
{
colValues[i] = (i % 4 == 0 ? 1L : 2L); // index column
colValues[i + 1] = 3L; // other column
}
ScrubTest.fillIndexCF(cfs, false, colValues);
cfs.snapshot("nonEphemeralSnapshot", null, false, false);
cfs.snapshot("ephemeralSnapshot", null, true, false);
Map<String, TableSnapshot> snapshotDetails = cfs.listSnapshots();
assertEquals(2, snapshotDetails.size());
assertTrue(snapshotDetails.containsKey("ephemeralSnapshot"));
assertTrue(snapshotDetails.containsKey("nonEphemeralSnapshot"));
ColumnFamilyStore.clearEphemeralSnapshots(cfs.getDirectories());
snapshotDetails = cfs.listSnapshots();
assertEquals(1, snapshotDetails.size());
assertTrue(snapshotDetails.containsKey("nonEphemeralSnapshot"));
//test cleanup
cfs.clearSnapshot("");
}
@Test
public void testSnapshotSize() throws IOException
{
// clean up any garbage left over from previous tests
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
cfs.clearSnapshot("");
// Add row
new RowUpdateBuilder(cfs.metadata(), 0, "key1")
.clustering("Column1")
.add("val", "asdf")
.build()
.applyUnsafe();
Util.flush(cfs);
// snapshot
cfs.snapshot("basic", null, false, false);
// check snapshot was created
Map<String, TableSnapshot> snapshotDetails = cfs.listSnapshots();
assertThat(snapshotDetails).hasSize(1);
assertThat(snapshotDetails).containsKey("basic");
// check that sizeOnDisk > trueSize: while the sstables are still hardlinked to the live table,
// trueSize covers only the snapshot's own manifest and schema files
TableSnapshot details = snapshotDetails.get("basic");
assertThat(details.computeSizeOnDiskBytes()).isGreaterThan(details.computeTrueSizeBytes());
assertThat(details.computeTrueSizeBytes()).isEqualTo(getSnapshotManifestAndSchemaFileSizes(details));
// compact base table to make trueSize > 0
cfs.forceMajorCompaction();
LifecycleTransaction.waitForDeletions();
// after compaction deletes the original sstables, the snapshot's hardlinks become the only
// remaining copies, so trueSize now includes them and matches sizeOnDisk
snapshotDetails = cfs.listSnapshots();
details = snapshotDetails.get("basic");
assertThat(details.computeSizeOnDiskBytes()).isEqualTo(details.computeTrueSizeBytes());
}
@Test
public void testBackupAfterFlush() throws Throwable
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE2).getColumnFamilyStore(CF_STANDARD1);
new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key1")).clustering("Column1").add("val", "asdf").build().applyUnsafe();
Util.flush(cfs);
new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key2")).clustering("Column1").add("val", "asdf").build().applyUnsafe();
Util.flush(cfs);
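// Incremental backups hardlink every flushed sstable into the table's backups directory;
// each component of each live sstable should therefore have a backed-up counterpart.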
for (SSTableReader liveSSTable : cfs.getLiveSSTables())
{
Descriptor existing = liveSSTable.descriptor;
Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing),
KEYSPACE2,
CF_STANDARD1,
liveSSTable.descriptor.id,
liveSSTable.descriptor.version.format);
for (Component c : liveSSTable.getComponents())
assertTrue("Cannot find backed-up file:" + desc.fileFor(c), desc.fileFor(c).exists());
}
}
@Test
public void speculationThreshold()
{
// CF_SPEC_RETRY1 configured to use the 50th percentile for read and 75th percentile for write
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE3).getColumnFamilyStore(CF_SPEC_RETRY1);
cfs.sampleReadLatencyMicros = 123000;
cfs.additionalWriteLatencyMicros = 234000;
// test updating before any stats are present
cfs.updateSpeculationThreshold();
assertThat(cfs.sampleReadLatencyMicros).isEqualTo(123000);
assertThat(cfs.additionalWriteLatencyMicros).isEqualTo(234000);
// Seed the column family with some latency data.
final int count = 10000;
for (int millis = 0; millis < count; millis++)
{
cfs.metric.coordinatorReadLatency.update(millis, TimeUnit.MILLISECONDS);
cfs.metric.coordinatorWriteLatency.update(millis, TimeUnit.MILLISECONDS);
}
// Sanity check the metrics - 50th percentile of a linear 0-10000ms distribution.
// Remember, latencies are only an estimate - the ~1.2 factor between histogram buckets can put values off by up to 20%.
assertThat(cfs.metric.coordinatorReadLatency.getCount()).isEqualTo(count);
assertThat(cfs.metric.coordinatorReadLatency.getSnapshot().getValue(0.5))
.isBetween((double) TimeUnit.MILLISECONDS.toMicros(5839),
(double) TimeUnit.MILLISECONDS.toMicros(5840));
// Sanity check the metrics - 75th percentile of a linear 0-10000ms distribution
assertThat(cfs.metric.coordinatorWriteLatency.getCount()).isEqualTo(count);
assertThat(cfs.metric.coordinatorWriteLatency.getSnapshot().getValue(0.75))
.isBetween((double) TimeUnit.MILLISECONDS.toMicros(8409),
(double) TimeUnit.MILLISECONDS.toMicros(8410));
// CF_SPEC_RETRY1 configured to use the 50th percentile for speculation
cfs.updateSpeculationThreshold();
assertThat(cfs.sampleReadLatencyMicros).isBetween(TimeUnit.MILLISECONDS.toMicros(5839), TimeUnit.MILLISECONDS.toMicros(5840));
assertThat(cfs.additionalWriteLatencyMicros).isBetween(TimeUnit.MILLISECONDS.toMicros(8409), TimeUnit.MILLISECONDS.toMicros(8410));
}
// TODO: Fix once supercolumns work again after the CASSANDRA-8099 storage engine rewrite
// // CASSANDRA-3467. the key here is that supercolumn and subcolumn comparators are different
// @Test
// public void testSliceByNamesCommandOnUUIDTypeSCF() throws Throwable
// {
// String keyspaceName = KEYSPACE1;
// String cfName = CF_SUPER6;
// ByteBuffer superColName = LexicalUUIDType.instance.fromString("a4ed3562-0e8e-4b41-bdfd-c45a2774682d");
// Keyspace keyspace = Keyspace.open(keyspaceName);
// ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
// DecoratedKey key = Util.dk("slice-get-uuid-type");
//
// // Insert a row with one supercolumn and multiple subcolumns
// putColsSuper(cfs, key, superColName, new BufferCell(cellname("a"), ByteBufferUtil.bytes("A"), 1),
// new BufferCell(cellname("b"), ByteBufferUtil.bytes("B"), 1));
//
// // Get the entire supercolumn like normal
// ColumnFamily cfGet = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
// assertEquals(ByteBufferUtil.bytes("A"), cfGet.getColumn(CellNames.compositeDense(superColName, ByteBufferUtil.bytes("a"))).value());
// assertEquals(ByteBufferUtil.bytes("B"), cfGet.getColumn(CellNames.compositeDense(superColName, ByteBufferUtil.bytes("b"))).value());
//
// // Now do the SliceByNamesCommand on the supercolumn, passing both subcolumns in as columns to get
// SortedSet<CellName> sliceColNames = new TreeSet<CellName>(cfs.metadata.comparator);
// sliceColNames.add(CellNames.compositeDense(superColName, ByteBufferUtil.bytes("a")));
// sliceColNames.add(CellNames.compositeDense(superColName, ByteBufferUtil.bytes("b")));
// SliceByNamesReadCommand cmd = new SliceByNamesReadCommand(keyspaceName, key.getKey(), cfName, System.currentTimeMillis(), new NamesQueryFilter(sliceColNames));
// ColumnFamily cfSliced = cmd.getRow(keyspace).cf;
//
// // Make sure the slice returns the same as the straight get
// assertEquals(ByteBufferUtil.bytes("A"), cfSliced.getColumn(CellNames.compositeDense(superColName, ByteBufferUtil.bytes("a"))).value());
// assertEquals(ByteBufferUtil.bytes("B"), cfSliced.getColumn(CellNames.compositeDense(superColName, ByteBufferUtil.bytes("b"))).value());
// }
// TODO: Fix once SSTableSimpleWriter's back in
// @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-6086">CASSANDRA-6086</a>
// @Test
// public void testLoadNewSSTablesAvoidsOverwrites() throws Throwable
// {
// String ks = KEYSPACE1;
// String cf = CF_STANDARD1;
// ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
// SSTableDeletingTask.waitForDeletions();
//
// final CFMetaData cfmeta = Schema.instance.getTableMetadataRef(ks, cf);
// Directories dir = new Directories(cfs.metadata);
//
// // clear old SSTables (probably left by CFS.clearUnsafe() calls in other tests)
// for (Map.Entry<Descriptor, Set<Component>> entry : dir.sstableLister().list().entrySet())
// {
// for (Component component : entry.getValue())
// {
// FileUtils.delete(entry.getKey().filenameFor(component));
// }
// }
//
// // sanity check
// int existingSSTables = dir.sstableLister().list().keySet().size();
// assert existingSSTables == 0 : String.format("%d SSTables unexpectedly exist", existingSSTables);
//
// ByteBuffer key = bytes("key");
//
// SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
// cfmeta, StorageService.getPartitioner())
// {
// @Override
// protected SSTableWriter getWriter()
// {
// // hack for reset generation
// generation.set(0);
// return super.getWriter();
// }
// };
// writer.newRow(key);
// writer.addColumn(bytes("col"), bytes("val"), 1);
// writer.close();
//
// writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
// cfmeta, StorageService.getPartitioner());
// writer.newRow(key);
// writer.addColumn(bytes("col"), bytes("val"), 1);
// writer.close();
//
// Set<Integer> generations = new HashSet<>();
// for (Descriptor descriptor : dir.sstableLister().list().keySet())
// generations.add(descriptor.generation);
//
// // we should have two generations: [1, 2]
// assertEquals(2, generations.size());
// assertTrue(generations.contains(1));
// assertTrue(generations.contains(2));
//
// assertEquals(0, cfs.getLiveSSTables().size());
//
// // start the generation counter at 1 again (other tests have incremented it already)
// cfs.resetFileIndexGenerator();
//
// boolean incrementalBackupsEnabled = DatabaseDescriptor.isIncrementalBackupsEnabled();
// try
// {
// // avoid duplicate hardlinks to incremental backups
// DatabaseDescriptor.setIncrementalBackupsEnabled(false);
// cfs.loadNewSSTables();
// }
// finally
// {
// DatabaseDescriptor.setIncrementalBackupsEnabled(incrementalBackupsEnabled);
// }
//
// assertEquals(2, cfs.getLiveSSTables().size());
// generations = new HashSet<>();
// for (Descriptor descriptor : dir.sstableLister().list().keySet())
// generations.add(descriptor.generation);
//
// // normally they would get renamed to generations 1 and 2, but since those filenames already exist,
// // they get skipped and we end up with generations 3 and 4
// assertEquals(2, generations.size());
// assertTrue(generations.contains(3));
// assertTrue(generations.contains(4));
// }
public void reTest(ColumnFamilyStore cfs, Runnable verify) throws Exception
{
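// Run the verification twice: first against the memtable, then again after the data has been flushed to sstables.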
verify.run();
Util.flush(cfs);
verify.run();
}
private void assertRangeCount(ColumnFamilyStore cfs, ByteBuffer col, ByteBuffer val, int count)
{
assertRangeCount(cfs, cfs.metadata().getColumn(col), val, count);
}
private void assertRangeCount(ColumnFamilyStore cfs, ColumnMetadata col, ByteBuffer val, int count)
{
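// Count, across all partitions matching col = val, the rows whose cell value really equals val,
// and check the total against the expectation.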
int found = 0;
if (count != 0)
{
for (FilteredPartition partition : Util.getAll(Util.cmd(cfs).filterOn(col.name.toString(), Operator.EQ, val).build()))
{
for (Row r : partition)
{
if (r.getCell(col).buffer().equals(val))
++found;
}
}
}
assertEquals(count, found);
}
@Test
public void testSnapshotWithoutFlushWithSecondaryIndexes() throws Exception
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_INDEX1);
cfs.truncateBlocking();
UpdateBuilder builder = UpdateBuilder.create(cfs.metadata.get(), "key")
.newRow()
.add("birthdate", 1L)
.add("notbirthdate", 2L);
new Mutation(builder.build()).applyUnsafe();
Util.flush(cfs);
String snapshotName = "newSnapshot";
cfs.snapshotWithoutMemtable(snapshotName);
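// snapshotWithoutMemtable captures only the sstables already on disk, for the base table and its secondary indexes alike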
File snapshotManifestFile = cfs.getDirectories().getSnapshotManifestFile(snapshotName);
SnapshotManifest manifest = SnapshotManifest.deserializeFromJsonFile(snapshotManifestFile);
// Keyspace1-Indexed1 and the corresponding index
assertThat(manifest.getFiles()).hasSize(2);
// Snapshot of the secondary index is stored in the subfolder with the same file name
String baseTableFile = manifest.getFiles().get(0);
String indexTableFile = manifest.getFiles().get(1);
assertThat(baseTableFile).isNotEqualTo(indexTableFile);
assertThat(Directories.isSecondaryIndexFolder(new File(indexTableFile).parent())).isTrue();
Set<String> originalFiles = new HashSet<>();
Iterables.toList(cfs.concatWithIndexes()).stream()
.flatMap(c -> c.getLiveSSTables().stream().map(t -> t.descriptor.fileFor(Components.DATA)))
.forEach(e -> originalFiles.add(e.toString()));
assertThat(originalFiles.stream().anyMatch(f -> f.endsWith(indexTableFile))).isTrue();
assertThat(originalFiles.stream().anyMatch(f -> f.endsWith(baseTableFile))).isTrue();
}
private void createSnapshotAndDelete(String ks, String table, boolean writeData)
{
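// Shared helper: snapshot a table (optionally with data), verify it is listed,
// then delete its directories and verify it disappears from the listing.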
ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(table);
if (writeData)
{
writeData(cfs);
}
TableSnapshot snapshot = cfs.snapshot("basic");
assertThat(snapshot.exists()).isTrue();
assertThat(cfs.listSnapshots().containsKey("basic")).isTrue();
assertThat(cfs.listSnapshots().get("basic")).isEqualTo(snapshot);
snapshot.getDirectories().forEach(FileUtils::deleteRecursive);
assertThat(snapshot.exists()).isFalse();
assertFalse(cfs.listSnapshots().containsKey("basic"));
}
private void writeData(ColumnFamilyStore cfs)
{
if (cfs.name.equals(CF_INDEX1))
{
new RowUpdateBuilder(cfs.metadata(), 2, "key").add("birthdate", 1L).add("notbirthdate", 2L).build().applyUnsafe();
Util.flush(cfs);
}
else
{
new RowUpdateBuilder(cfs.metadata(), 2, "key").clustering("name").add("val", "2").build().applyUnsafe();
Util.flush(cfs);
}
}
@Test
public void testSnapshotCreationAndDeleteEmptyTable()
{
createSnapshotAndDelete(KEYSPACE1, CF_INDEX1, false);
createSnapshotAndDelete(KEYSPACE1, CF_STANDARD1, false);
createSnapshotAndDelete(KEYSPACE1, CF_STANDARD2, false);
createSnapshotAndDelete(KEYSPACE2, CF_STANDARD1, false);
createSnapshotAndDelete(SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.TRANSFERRED_RANGES_V2, false);
}
@Test
public void testSnapshotCreationAndDeletePopulatedTable()
{
createSnapshotAndDelete(KEYSPACE1, CF_INDEX1, true);
createSnapshotAndDelete(KEYSPACE1, CF_STANDARD1, true);
createSnapshotAndDelete(KEYSPACE1, CF_STANDARD2, true);
createSnapshotAndDelete(KEYSPACE2, CF_STANDARD1, true);
}
@Test
public void testDataDirectoriesOfColumnFamily() throws Exception
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
List<String> dataPaths = cfs.getDataPaths();
Assert.assertFalse(dataPaths.isEmpty());
Path path = Paths.get(dataPaths.get(0));
String keyspace = path.getParent().getFileName().toString();
String table = path.getFileName().toString().split("-")[0];
Assert.assertEquals(cfs.getTableName(), table);
Assert.assertEquals(KEYSPACE1, keyspace);
}
@Test
public void testScrubDataDirectories() throws Throwable
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
ColumnFamilyStore.scrubDataDirectories(cfs.metadata());
new RowUpdateBuilder(cfs.metadata(), 2, "key").clustering("name").add("val", "2").build().applyUnsafe();
Util.flush(cfs);
// Move the data component to its tmp name, simulating an sstable left behind by an interrupted write
Collection<SSTableReader> ssTables = cfs.getLiveSSTables();
assertEquals(1, ssTables.size());
SSTableReader ssTable = ssTables.iterator().next();
File dataFile = ssTable.descriptor.fileFor(Components.DATA);
File tmpDataFile = ssTable.descriptor.tmpFileFor(Components.DATA);
dataFile.tryMove(tmpDataFile);
ssTable.selfRef().release();
ColumnFamilyStore.scrubDataDirectories(cfs.metadata());
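// scrubDataDirectories should have removed the orphaned tmp file, leaving no sstable files behind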
List<File> ssTableFiles = new Directories(cfs.metadata()).sstableLister(Directories.OnTxnErr.THROW).listFiles();
assertNotNull(ssTableFiles);
assertEquals(0, ssTableFiles.size());
cfs.clearUnsafe();
}
@VisibleForTesting
public static long getSnapshotManifestAndSchemaFileSizes(TableSnapshot snapshot) throws IOException
{
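// The manifest and schema files are the only snapshot components counted here; sum their sizes if present.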
Optional<File> schemaFile = snapshot.getSchemaFile();
Optional<File> manifestFile = snapshot.getManifestFile();
long schemaAndManifestFileSizes = 0;
schemaAndManifestFileSizes += schemaFile.isPresent() ? schemaFile.get().length() : 0;
schemaAndManifestFileSizes += manifestFile.isPresent() ? manifestFile.get().length() : 0;
return schemaAndManifestFileSizes;
}
private Memtable fakeMemTableWithMinTS(ColumnFamilyStore cfs, long minTS)
{
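// Minimal stub memtable: only the min-timestamp bookkeeping inherited from AbstractMemtable matters
// for testMemtableTimestamp; every other operation is a no-op or returns a default.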
return new AbstractMemtable(cfs.metadata, minTS)
{
@Override
public long put(PartitionUpdate update, UpdateTransaction indexer, Group opGroup)
{
return 0;
}
@Override
public long partitionCount()
{
return 0;
}
@Override
public long getLiveDataSize()
{
return 0;
}
@Override
public void addMemoryUsageTo(MemoryUsage usage)
{
}
@Override
public void markExtraOnHeapUsed(long additionalSpace, Group opGroup)
{
}
@Override
public void markExtraOffHeapUsed(long additionalSpace, Group opGroup)
{
}
@Override
public FlushablePartitionSet<?> getFlushSet(PartitionPosition from, PartitionPosition to)
{
return null;
}
@Override
public void switchOut(Barrier writeBarrier, AtomicReference<CommitLogPosition> commitLogUpperBound)
{
}
@Override
public void discard()
{
}
@Override
public boolean accepts(Group opGroup, CommitLogPosition commitLogPosition)
{
return false;
}
@Override
public CommitLogPosition getApproximateCommitLogLowerBound()
{
return null;
}
@Override
public CommitLogPosition getCommitLogLowerBound()
{
return null;
}
@Override
public LastCommitLogPosition getFinalCommitLogUpperBound()
{
return null;
}
@Override
public boolean mayContainDataBefore(CommitLogPosition position)
{
return false;
}
@Override
public boolean isClean()
{
return false;
}
@Override
public boolean shouldSwitch(FlushReason reason)
{
return false;
}
@Override
public void metadataUpdated()
{
}
@Override
public void localRangesUpdated()
{
}
@Override
public void performSnapshot(String snapshotName)
{
}
@Override
public UnfilteredRowIterator rowIterator(DecoratedKey key,
Slices slices,
ColumnFilter columnFilter,
boolean reversed,
SSTableReadsListener listener)
{
return null;
}
@Override
public UnfilteredPartitionIterator
partitionIterator(ColumnFilter columnFilter, DataRange dataRange, SSTableReadsListener listener)
{
return null;
}
};
}
}