blob: ffa3d6a75e9cc06468ce2882f64c05ff0e0f7358 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.cassandra.index.sai;
import java.io.DataInput;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.management.AttributeNotFoundException;
import javax.management.ObjectName;
import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestRule;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.carrotsearch.randomizedtesting.generators.RandomInts;
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import com.datastax.driver.core.QueryTrace;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.statements.schema.IndexTarget;
import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.sai.disk.SSTableIndex;
import org.apache.cassandra.index.sai.disk.format.IndexComponent;
import org.apache.cassandra.index.sai.disk.format.IndexDescriptor;
import org.apache.cassandra.index.sai.utils.IndexIdentifier;
import org.apache.cassandra.index.sai.disk.format.OnDiskFormat;
import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat;
import org.apache.cassandra.index.sai.disk.v1.segment.SegmentBuilder;
import org.apache.cassandra.index.sai.utils.IndexTermType;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.index.sai.utils.ResourceLeakDetector;
import org.apache.cassandra.inject.Injection;
import org.apache.cassandra.inject.Injections;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.TOCComponent;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.schema.CachingParams;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.schema.MockSchema;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.snapshot.TableSnapshot;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import org.apache.lucene.codecs.CodecUtil;
import static org.apache.cassandra.config.CassandraRelevantProperties.TEST_RANDOM_SEED;
import static org.apache.cassandra.inject.ActionBuilder.newActionBuilder;
import static org.apache.cassandra.inject.Expression.quote;
import static org.apache.cassandra.inject.InvokePointBuilder.newInvokePoint;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public abstract class SAITester extends CQLTester
{
protected static final Logger logger = LoggerFactory.getLogger(SAITester.class);
protected static final String CREATE_KEYSPACE_TEMPLATE = "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = " +
"{'class': 'SimpleStrategy', 'replication_factor': '1'}";
protected static final String CREATE_TABLE_TEMPLATE = "CREATE TABLE %s (id1 TEXT PRIMARY KEY, v1 INT, v2 TEXT) WITH compaction = " +
"{'class' : 'SizeTieredCompactionStrategy', 'enabled' : false }";
protected static final String CREATE_INDEX_TEMPLATE = "CREATE INDEX IF NOT EXISTS ON %%s(%s) USING 'sai'";
protected static final ColumnIdentifier V1_COLUMN_IDENTIFIER = ColumnIdentifier.getInterned("v1", true);
protected static final ColumnIdentifier V2_COLUMN_IDENTIFIER = ColumnIdentifier.getInterned("v2", true);
protected static final Injections.Counter indexBuildCounter = Injections.newCounter("IndexBuildCounter")
.add(newInvokePoint().onClass(CompactionManager.class)
.onMethod("submitIndexBuild",
"SecondaryIndexBuilder",
"ActiveCompactionsTracker"))
.build();
protected static final Injections.Counter perSSTableValidationCounter = Injections.newCounter("PerSSTableValidationCounter")
.add(newInvokePoint().onClass(OnDiskFormat.class)
.onMethod("validatePerSSTableIndexComponents"))
.build();
protected static final Injections.Counter perColumnValidationCounter = Injections.newCounter("PerColumnValidationCounter")
.add(newInvokePoint().onClass(OnDiskFormat.class)
.onMethod("validatePerColumnIndexComponents"))
.build();
private static Randomization random;
public static final ClusteringComparator EMPTY_COMPARATOR = new ClusteringComparator();
public static final PrimaryKey.Factory TEST_FACTORY = new PrimaryKey.Factory(Murmur3Partitioner.instance, EMPTY_COMPARATOR);
@BeforeClass
public static void setUpClass()
{
CQLTester.setUpClass();
// Ensure that the on-disk format statics are loaded before the test run
Version.LATEST.onDiskFormat();
}
@Rule
public TestRule testRules = new ResourceLeakDetector();
@Rule
public FailureWatcher failureRule = new FailureWatcher();
@After
public void removeAllInjections()
{
Injections.deleteAll();
CassandraRelevantProperties.SAI_MINIMUM_POSTINGS_LEAVES.reset();
CassandraRelevantProperties.SAI_POSTINGS_SKIP.reset();
V1OnDiskFormat.SEGMENT_BUILD_MEMORY_LIMITER.setLimitBytes(V1OnDiskFormat.SEGMENT_BUILD_MEMORY_LIMIT);
}
public static Randomization getRandom()
{
if (random == null)
random = new Randomization();
return random;
}
public enum CorruptionType
{
REMOVED
{
@Override
public void corrupt(File file) throws IOException
{
if (!file.tryDelete())
throw new IOException("Unable to delete file: " + file);
}
},
EMPTY_FILE
{
@Override
public void corrupt(File file) throws IOException
{
try (FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.WRITE))
{
channel.truncate(0);
}
}
},
TRUNCATED_HEADER
{
@Override
public void corrupt(File file) throws IOException
{
try (FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.WRITE))
{
channel.truncate(2);
}
}
},
TRUNCATED_DATA
{
@Override
public void corrupt(File file) throws IOException
{
// header length is not fixed, use footer length to navigate a given data position
try (FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.WRITE))
{
channel.truncate(file.length() - CodecUtil.footerLength() - 2);
}
}
},
TRUNCATED_FOOTER
{
@Override
public void corrupt(File file) throws IOException
{
try (FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.WRITE))
{
channel.truncate(file.length() - CodecUtil.footerLength() + 2);
}
}
},
APPENDED_DATA
{
@Override
public void corrupt(File file) throws IOException
{
try (RandomAccessFile raf = new RandomAccessFile(file.toJavaIOFile(), "rw"))
{
raf.seek(file.length());
byte[] corruptedData = new byte[100];
new Random().nextBytes(corruptedData);
raf.write(corruptedData);
}
}
};
public abstract void corrupt(File file) throws IOException;
}
public static StorageAttachedIndex createMockIndex(ColumnMetadata column)
{
TableMetadata table = TableMetadata.builder(column.ksName, column.cfName)
.addPartitionKeyColumn("pk", Int32Type.instance)
.addRegularColumn(column.name, column.type)
.partitioner(Murmur3Partitioner.instance)
.caching(CachingParams.CACHE_NOTHING)
.build();
Map<String, String> options = new HashMap<>();
options.put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, StorageAttachedIndex.class.getCanonicalName());
options.put("target", column.name.toString());
IndexMetadata indexMetadata = IndexMetadata.fromSchemaMetadata(column.name.toString(), IndexMetadata.Kind.CUSTOM, options);
ColumnFamilyStore cfs = MockSchema.newCFS(table);
return new StorageAttachedIndex(cfs, indexMetadata);
}
public static StorageAttachedIndex createMockIndex(AbstractType<?> cellType)
{
TableMetadata table = TableMetadata.builder("test", "test")
.addPartitionKeyColumn("pk", Int32Type.instance)
.addRegularColumn("val", cellType)
.partitioner(Murmur3Partitioner.instance)
.caching(CachingParams.CACHE_NOTHING)
.build();
Map<String, String> options = new HashMap<>();
options.put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, StorageAttachedIndex.class.getCanonicalName());
options.put("target", "val");
IndexMetadata indexMetadata = IndexMetadata.fromSchemaMetadata("val", IndexMetadata.Kind.CUSTOM, options);
ColumnFamilyStore cfs = MockSchema.newCFS(table);
return new StorageAttachedIndex(cfs, indexMetadata);
}
public static IndexTermType createIndexTermType(AbstractType<?> cellType)
{
return IndexTermType.create(ColumnMetadata.regularColumn("sai", "internal", "val", cellType), Collections.emptyList(), IndexTarget.Type.SIMPLE);
}
public IndexIdentifier createIndexIdentifier(String indexName)
{
return createIndexIdentifier(keyspace(), currentTable(), indexName);
}
public static IndexIdentifier createIndexIdentifier(String keyspaceName, String tableName, String indexName)
{
return new IndexIdentifier(keyspaceName, tableName, indexName);
}
protected StorageAttachedIndexGroup getCurrentIndexGroup()
{
return StorageAttachedIndexGroup.getIndexGroup(getCurrentColumnFamilyStore());
}
protected void dropIndex(IndexIdentifier indexIdentifier) throws Throwable
{
dropIndex("DROP INDEX %s." + indexIdentifier.indexName);
}
protected void simulateNodeRestart()
{
simulateNodeRestart(true);
}
protected void simulateNodeRestart(boolean wait)
{
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
cfs.indexManager.listIndexes().forEach(index -> ((StorageAttachedIndexGroup)cfs.indexManager.getIndexGroup(index)).reset());
cfs.indexManager.listIndexes().forEach(cfs.indexManager::buildIndex);
cfs.indexManager.executePreJoinTasksBlocking(true);
if (wait)
{
waitForTableIndexesQueryable();
}
}
protected void corruptIndexComponent(IndexComponent indexComponent, CorruptionType corruptionType) throws Exception
{
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
for (SSTableReader sstable : cfs.getLiveSSTables())
{
File file = IndexDescriptor.create(sstable).fileFor(indexComponent);
corruptionType.corrupt(file);
}
}
protected void corruptIndexComponent(IndexComponent indexComponent, IndexIdentifier indexIdentifier, CorruptionType corruptionType) throws Exception
{
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
for (SSTableReader sstable : cfs.getLiveSSTables())
{
File file = IndexDescriptor.create(sstable).fileFor(indexComponent, indexIdentifier);
corruptionType.corrupt(file);
}
}
protected boolean indexNeedsFullRebuild(String index)
{
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
return cfs.indexManager.needsFullRebuild(index);
}
protected void verifyInitialIndexFailed(String indexName)
{
// Verify that the initial index build fails...
waitForAssert(() -> assertTrue(indexNeedsFullRebuild(indexName)));
}
protected boolean verifyChecksum(IndexTermType indexContext, IndexIdentifier indexIdentifier)
{
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
for (SSTableReader sstable : cfs.getLiveSSTables())
{
IndexDescriptor indexDescriptor = IndexDescriptor.create(sstable);
if (!indexDescriptor.validatePerSSTableComponents(IndexValidation.CHECKSUM, true, false)
|| !indexDescriptor.validatePerIndexComponents(indexContext, indexIdentifier, IndexValidation.CHECKSUM, true, false))
return false;
}
return true;
}
protected Object getMBeanAttribute(ObjectName name, String attribute) throws Exception
{
return jmxConnection.getAttribute(name, attribute);
}
protected Object getMetricValue(ObjectName metricObjectName)
{
// lets workaround the fact that gauges have Value, but counters have Count
Object metricValue;
try
{
try
{
metricValue = getMBeanAttribute(metricObjectName, "Value");
}
catch (AttributeNotFoundException ignored)
{
metricValue = getMBeanAttribute(metricObjectName, "Count");
}
}
catch (Exception e)
{
throw new RuntimeException(e);
}
return metricValue;
}
public void waitForCompactions()
{
waitForAssert(() -> assertFalse(CompactionManager.instance.isCompacting(ColumnFamilyStore.all(), ssTableReader -> true)), 10, TimeUnit.SECONDS);
}
protected void waitForCompactionsFinished()
{
waitForAssert(() -> assertEquals(0, getCompactionTasks()), 10, TimeUnit.SECONDS);
}
protected void waitForEquals(ObjectName name, long value)
{
waitForAssert(() -> assertEquals(value, ((Number) getMetricValue(name)).longValue()), 10, TimeUnit.SECONDS);
}
protected ObjectName objectName(String name, String keyspace, String table, String index, String type)
{
try
{
return new ObjectName(String.format("org.apache.cassandra.metrics:type=StorageAttachedIndex," +
"keyspace=%s,table=%s,index=%s,scope=%s,name=%s",
keyspace, table, index, type, name));
}
catch (Throwable ex)
{
throw Throwables.unchecked(ex);
}
}
protected ObjectName objectNameNoIndex(String name, String keyspace, String table, String type)
{
try
{
return new ObjectName(String.format("org.apache.cassandra.metrics:type=StorageAttachedIndex," +
"keyspace=%s,table=%s,scope=%s,name=%s",
keyspace, table, type, name));
}
catch (Throwable ex)
{
throw Throwables.unchecked(ex);
}
}
protected long getSegmentBufferSpaceLimit()
{
return V1OnDiskFormat.SEGMENT_BUILD_MEMORY_LIMITER.limitBytes();
}
protected long getSegmentBufferUsedBytes()
{
return V1OnDiskFormat.SEGMENT_BUILD_MEMORY_LIMITER.currentBytesUsed();
}
protected int getColumnIndexBuildsInProgress()
{
return SegmentBuilder.getActiveBuilderCount();
}
protected void upgradeSSTables()
{
try
{
StorageService.instance.upgradeSSTables(KEYSPACE, false, currentTable());
}
catch (Throwable e)
{
throw new RuntimeException(e);
}
}
protected long totalDiskSpaceUsed()
{
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
return cfs.metric.totalDiskSpaceUsed.getCount();
}
protected long indexDiskSpaceUse()
{
return getCurrentIndexGroup().totalDiskUsage();
}
protected int getOpenIndexFiles()
{
return getCurrentIndexGroup().openIndexFiles();
}
protected long getDiskUsage()
{
return getCurrentIndexGroup().diskUsage();
}
protected void verifyNoIndexFiles()
{
assertTrue(indexFiles().isEmpty());
}
protected void verifyIndexFiles(IndexTermType indexTermType,
IndexIdentifier indexIdentifier,
int indexFiles)
{
verifyIndexFiles(indexTermType, indexIdentifier, indexFiles, indexFiles, indexFiles);
}
protected void verifyIndexFiles(IndexTermType indexTermType,
IndexIdentifier indexIdentifier,
int perSSTableFiles,
int perColumnFiles,
int completionMarkers)
{
Set<File> indexFiles = indexFiles();
for (IndexComponent indexComponent : Version.LATEST.onDiskFormat().perSSTableIndexComponents(false))
{
Component component = SSTableFormat.Components.Types.CUSTOM.createComponent(Version.LATEST.fileNameFormatter().format(indexComponent, null));
Set<File> tableFiles = componentFiles(indexFiles, component);
assertEquals(tableFiles.toString(), perSSTableFiles, tableFiles.size());
}
for (IndexComponent indexComponent : Version.LATEST.onDiskFormat().perColumnIndexComponents(indexTermType))
{
String componentName = Version.LATEST.fileNameFormatter().format(indexComponent, indexIdentifier);
Component component = SSTableFormat.Components.Types.CUSTOM.createComponent(componentName);
Set<File> stringIndexFiles = componentFiles(indexFiles, component);
if (isBuildCompletionMarker(indexComponent))
assertEquals(completionMarkers, stringIndexFiles.size());
else
assertEquals(stringIndexFiles.toString(), perColumnFiles, stringIndexFiles.size());
}
}
protected void verifySSTableIndexes(IndexIdentifier indexIdentifier, int count)
{
try
{
verifySSTableIndexes(indexIdentifier, count, count);
}
catch (Exception e)
{
throw Throwables.unchecked(e);
}
}
protected void verifySSTableIndexes(IndexIdentifier indexIdentifier, int sstableContextCount, int sstableIndexCount)
{
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
StorageAttachedIndexGroup indexGroup = getCurrentIndexGroup();
int contextCount = indexGroup.sstableContextManager().size();
assertEquals("Expected " + sstableContextCount +" SSTableContexts, but got " + contextCount, sstableContextCount, contextCount);
StorageAttachedIndex sai = (StorageAttachedIndex) cfs.indexManager.getIndexByName(indexIdentifier.indexName);
Collection<SSTableIndex> sstableIndexes = sai == null ? Collections.emptyList() : sai.view().getIndexes();
assertEquals("Expected " + sstableIndexCount +" SSTableIndexes, but got " + sstableIndexes.toString(), sstableIndexCount, sstableIndexes.size());
}
protected boolean isBuildCompletionMarker(IndexComponent indexComponent)
{
return (indexComponent == IndexComponent.GROUP_COMPLETION_MARKER) ||
(indexComponent == IndexComponent.COLUMN_COMPLETION_MARKER);
}
protected Set<File> indexFiles()
{
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
Set<Component> components = cfs.indexManager.listIndexGroups()
.stream()
.filter(g -> g instanceof StorageAttachedIndexGroup)
.map(Index.Group::getComponents)
.flatMap(Set::stream)
.collect(Collectors.toSet());
Set<File> indexFiles = new HashSet<>();
for (Component component : components)
{
List<File> files = cfs.getDirectories().getCFDirectories()
.stream()
.flatMap(dir -> Arrays.stream(dir.tryList()))
.filter(File::isFile)
.filter(f -> f.name().endsWith(component.name))
.collect(Collectors.toList());
indexFiles.addAll(files);
}
return indexFiles;
}
protected Set<File> componentFiles(Collection<File> indexFiles, Component component)
{
return indexFiles.stream().filter(c -> c.name().endsWith(component.name)).collect(Collectors.toSet());
}
public String createTable(String query)
{
return createTable(KEYSPACE, query);
}
@Override
public UntypedResultSet execute(String query, Object... values)
{
return executeFormattedQuery(formatQuery(query), values);
}
@Override
public Session sessionNet()
{
return sessionNet(getDefaultVersion());
}
public void flush(String keyspace, String table)
{
ColumnFamilyStore store = Keyspace.open(keyspace).getColumnFamilyStore(table);
if (store != null)
store.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
}
public void compact(String keyspace, String table)
{
ColumnFamilyStore store = Keyspace.open(keyspace).getColumnFamilyStore(table);
if (store != null)
store.forceMajorCompaction();
}
protected void truncate(boolean snapshot)
{
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
if (snapshot)
cfs.truncateBlocking();
else
cfs.truncateBlockingWithoutSnapshot();
}
protected void rebuildIndexes(String... indexes)
{
ColumnFamilyStore.rebuildSecondaryIndex(KEYSPACE, currentTable(), indexes);
}
protected void reloadSSTableIndex()
{
getCurrentIndexGroup().unsafeReload();
}
protected void runInitializationTask() throws Exception
{
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
for (Index i : cfs.indexManager.listIndexes())
{
assert i instanceof StorageAttachedIndex;
cfs.indexManager.makeIndexNonQueryable(i, Index.Status.BUILD_FAILED);
cfs.indexManager.buildIndex(i).get();
}
}
protected int getCompactionTasks()
{
return CompactionManager.instance.getActiveCompactions() + CompactionManager.instance.getPendingTasks();
}
protected int snapshot(String snapshotName)
{
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
TableSnapshot snapshot = cfs.snapshot(snapshotName);
return snapshot.getDirectories().size();
}
protected void restoreSnapshot(String snapshot)
{
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.IGNORE).snapshots(snapshot);
restore(cfs, lister);
}
protected void restore(ColumnFamilyStore cfs, Directories.SSTableLister lister)
{
File dataDirectory = cfs.getDirectories().getDirectoryForNewSSTables();
for (File file : lister.listFiles())
{
file.tryMove(new File(dataDirectory.absolutePath() + File.pathSeparator() + file.name()));
}
cfs.loadNewSSTables();
}
public static void copyTo(DataInput in, OutputStream out, int length) throws IOException
{
byte[] buffer = new byte[64 * 1024];
int copiedBytes = 0;
while (copiedBytes + buffer.length < length)
{
in.readFully(buffer);
out.write(buffer);
copiedBytes += buffer.length;
}
if (copiedBytes < length)
{
int left = length - copiedBytes;
in.readFully(buffer, 0, left);
out.write(buffer, 0, left);
}
}
protected void assertNumRows(int expected, String query, Object... args)
{
ResultSet rs = executeNet(String.format(query, args));
assertEquals(expected, rs.all().size());
}
protected static Injection newFailureOnEntry(String name, Class<?> invokeClass, String method, Class<? extends Throwable> exception)
{
return Injections.newCustom(name)
.add(newInvokePoint().onClass(invokeClass).onMethod(method))
.add(newActionBuilder().actions().doThrow(exception, quote("Injected failure!")))
.build();
}
protected void assertValidationCount(int perSSTable, int perColumn)
{
Assert.assertEquals(perSSTable, perSSTableValidationCounter.get());
Assert.assertEquals(perColumn, perColumnValidationCounter.get());
}
protected void resetValidationCount()
{
perSSTableValidationCounter.reset();
perColumnValidationCounter.reset();
}
protected long indexFilesLastModified()
{
return indexFiles().stream().map(File::lastModified).max(Long::compare).orElse(0L);
}
protected void verifyIndexComponentsIncludedInSSTable() throws Exception
{
verifySSTableComponents(currentTable(), true);
}
protected void verifyIndexComponentsNotIncludedInSSTable() throws Exception
{
verifySSTableComponents(currentTable(), false);
}
private void verifySSTableComponents(String table, boolean indexComponentsExist) throws Exception
{
ColumnFamilyStore cfs = Objects.requireNonNull(Schema.instance.getKeyspaceInstance(KEYSPACE)).getColumnFamilyStore(table);
for (SSTable sstable : cfs.getLiveSSTables())
{
Set<Component> components = sstable.getComponents();
StorageAttachedIndexGroup group = StorageAttachedIndexGroup.getIndexGroup(cfs);
Set<Component> ndiComponents = group == null ? Collections.emptySet() : group.getComponents();
Set<Component> diff = Sets.difference(ndiComponents, components);
if (indexComponentsExist)
assertTrue("Expect all index components are tracked by SSTable, but " + diff + " are not included.",
!ndiComponents.isEmpty() && diff.isEmpty());
else
assertFalse("Expect no index components, but got " + components, components.toString().contains("SAI"));
Set<Component> tocContents = TOCComponent.loadTOC(sstable.descriptor);
assertEquals(components, tocContents);
}
}
protected static void setBDKPostingsWriterSizing(int minimumPostingsLeaves, int postingsSkip)
{
CassandraRelevantProperties.SAI_MINIMUM_POSTINGS_LEAVES.setString(Integer.toString(minimumPostingsLeaves));
CassandraRelevantProperties.SAI_POSTINGS_SKIP.setString(Integer.toString(postingsSkip));
}
protected static void setSegmentWriteBufferSpace(final int segmentSize)
{
V1OnDiskFormat.SEGMENT_BUILD_MEMORY_LIMITER.setLimitBytes(segmentSize);
}
protected String getSingleTraceStatement(Session session, String query, String contains)
{
query = String.format(query, KEYSPACE + '.' + currentTable());
QueryTrace trace = session.execute(session.prepare(query).bind().enableTracing()).getExecutionInfo().getQueryTrace();
waitForTracingEvents();
for (QueryTrace.Event event : trace.getEvents())
{
if (event.getDescription().contains(contains))
return event.getDescription();
}
return null;
}
protected ByteComparable integerToByteComparable(int value)
{
return v -> Int32Type.instance.asComparableBytes(Int32Type.instance.decompose(value), v);
}
/**
* Because the tracing executor is single threaded, submitting an empty event should ensure
* that all tracing events mutations have been applied.
*/
protected void waitForTracingEvents()
{
try
{
Stage.TRACING.executor().submit(() -> {}).get();
}
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
logger.error("Failed to wait for tracing events", t);
}
}
public static class Randomization
{
private final long seed;
private final Random random;
Randomization()
{
seed = TEST_RANDOM_SEED.getLong(System.nanoTime());
random = new Random(seed);
}
public void printSeedOnFailure()
{
logger.error("Randomized test failed. To rerun test use -D{}={}", TEST_RANDOM_SEED.getKey(), seed);
}
public int nextInt()
{
return random.nextInt();
}
public int nextInt(int max)
{
return RandomInts.randomInt(random, max);
}
public int nextIntBetween(int minValue, int maxValue)
{
return RandomInts.randomIntBetween(random, minValue, maxValue);
}
public long nextLong()
{
return random.nextLong();
}
public short nextShort()
{
return (short)random.nextInt(Short.MAX_VALUE + 1);
}
public byte nextByte()
{
return (byte)random.nextInt(Byte.MAX_VALUE + 1);
}
public BigInteger nextBigInteger(int maxNumBits)
{
return new BigInteger(RandomInts.randomInt(random, maxNumBits), random);
}
public BigInteger nextBigInteger(int minNumBits, int maxNumBits)
{
return new BigInteger(RandomInts.randomIntBetween(random, minNumBits, maxNumBits), random);
}
public BigDecimal nextBigDecimal(int minUnscaledValue, int maxUnscaledValue, int minScale, int maxScale)
{
return BigDecimal.valueOf(RandomInts.randomIntBetween(random, minUnscaledValue, maxUnscaledValue),
RandomInts.randomIntBetween(random, minScale, maxScale));
}
public float nextFloat()
{
return random.nextFloat();
}
public double nextDouble()
{
return random.nextDouble();
}
public String nextAsciiString(int minLength, int maxLength)
{
return RandomStrings.randomAsciiOfLengthBetween(random, minLength, maxLength);
}
public String nextTextString(int minLength, int maxLength)
{
return RandomStrings.randomRealisticUnicodeOfLengthBetween(random, minLength, maxLength);
}
public boolean nextBoolean()
{
return random.nextBoolean();
}
public void nextBytes(byte[] bytes)
{
random.nextBytes(bytes);
}
}
public static class FailureWatcher extends TestWatcher
{
@Override
protected void failed(Throwable e, Description description)
{
if (random != null)
random.printSeedOnFailure();
}
}
/**
* Run repeated verification task concurrently with target test
*/
protected static class TestWithConcurrentVerification
{
private static final int verificationMaxInMs = 300_000; // 300s
private final Runnable verificationTask;
private final CountDownLatch verificationStarted = new CountDownLatch(1);
private final Runnable targetTask;
private final CountDownLatch taskCompleted = new CountDownLatch(1);
private final int verificationIntervalInMs;
public TestWithConcurrentVerification(Runnable verificationTask, Runnable targetTask)
{
this(verificationTask, targetTask, 10);
}
/**
* @param verificationTask to be run concurrently with target task
* @param targetTask task to be performed once
* @param verificationIntervalInMs interval between each verification task, -1 to run verification task once
*/
public TestWithConcurrentVerification(Runnable verificationTask, Runnable targetTask, int verificationIntervalInMs)
{
this.verificationTask = verificationTask;
this.targetTask = targetTask;
this.verificationIntervalInMs = verificationIntervalInMs;
}
public void start()
{
Thread verificationThread = new Thread(() -> {
verificationStarted.countDown();
while (true)
{
try
{
verificationTask.run();
if (verificationIntervalInMs < 0 || taskCompleted.await(verificationIntervalInMs, TimeUnit.MILLISECONDS))
break;
}
catch (Throwable e)
{
throw Throwables.unchecked(e);
}
}
});
try
{
verificationThread.start();
verificationStarted.await();
targetTask.run();
taskCompleted.countDown();
verificationThread.join(verificationMaxInMs);
}
catch (InterruptedException e)
{
throw Throwables.unchecked(e);
}
}
}
}