blob: 0459cb3b3bed8b150f75661034ea9c44df97c960 [file] [log] [blame]
package org.apache.cassandra;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOError;
import java.io.IOException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.AbstractReadCommandBuilder;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Directories.DataDirectory;
import org.apache.cassandra.db.DisallowedDirectories;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.PartitionRangeReadCommand;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.compaction.AbstractCompactionTask;
import org.apache.cassandra.db.compaction.ActiveCompactionsTracker;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.CompactionTasks;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.partitions.FilteredPartition;
import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator;
import org.apache.cassandra.db.rows.BTreeRow;
import org.apache.cassandra.db.rows.BufferCell;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.Cells;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.rows.Rows;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.view.TableViews;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableId;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.io.sstable.SequenceBasedSSTableId;
import org.apache.cassandra.io.sstable.UUIDBasedSSTableId;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaCollection;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.pager.PagingState;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.CounterId;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.FilterFactory;
import org.apache.cassandra.utils.OutputHandler;
import org.apache.cassandra.utils.Throwables;
import org.awaitility.Awaitility;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class Util
{
private static final Logger logger = LoggerFactory.getLogger(Util.class);
private static List<UUID> hostIdPool = new ArrayList<>();
public static IPartitioner testPartitioner()
{
return DatabaseDescriptor.getPartitioner();
}
public static DecoratedKey dk(String key)
{
return testPartitioner().decorateKey(ByteBufferUtil.bytes(key));
}
public static DecoratedKey dk(String key, AbstractType<?> type)
{
return testPartitioner().decorateKey(type.fromString(key));
}
public static DecoratedKey dk(ByteBuffer key)
{
return testPartitioner().decorateKey(key);
}
public static PartitionPosition rp(String key)
{
return rp(key, testPartitioner());
}
public static PartitionPosition rp(String key, IPartitioner partitioner)
{
return PartitionPosition.ForKey.get(ByteBufferUtil.bytes(key), partitioner);
}
public static Clustering<?> clustering(ClusteringComparator comparator, Object... o)
{
return comparator.make(o);
}
public static Token token(int key)
{
return testPartitioner().getToken(ByteBufferUtil.bytes(key));
}
public static Token token(String key)
{
return testPartitioner().getToken(ByteBufferUtil.bytes(key));
}
public static Range<PartitionPosition> range(String left, String right)
{
return new Range<>(rp(left), rp(right));
}
public static Range<PartitionPosition> range(IPartitioner p, String left, String right)
{
return new Range<>(rp(left, p), rp(right, p));
}
//Test helper to make an iterator iterable once
public static <T> Iterable<T> once(final Iterator<T> source)
{
return new Iterable<T>()
{
private AtomicBoolean exhausted = new AtomicBoolean();
public Iterator<T> iterator()
{
Preconditions.checkState(!exhausted.getAndSet(true));
return source;
}
};
}
public static ByteBuffer getBytes(long v)
{
byte[] bytes = new byte[8];
ByteBuffer bb = ByteBuffer.wrap(bytes);
bb.putLong(v);
bb.rewind();
return bb;
}
public static ByteBuffer getBytes(int v)
{
byte[] bytes = new byte[4];
ByteBuffer bb = ByteBuffer.wrap(bytes);
bb.putInt(v);
bb.rewind();
return bb;
}
/**
* Writes out a bunch of mutations for a single column family.
*
* @param mutations A group of Mutations for the same keyspace and column family.
* @return The ColumnFamilyStore that was used.
*/
public static ColumnFamilyStore writeColumnFamily(List<Mutation> mutations)
{
IMutation first = mutations.get(0);
String keyspaceName = first.getKeyspaceName();
TableId tableId = first.getTableIds().iterator().next();
for (Mutation rm : mutations)
rm.applyUnsafe();
ColumnFamilyStore store = Keyspace.open(keyspaceName).getColumnFamilyStore(tableId);
Util.flush(store);
return store;
}
public static boolean equalsCounterId(CounterId n, ByteBuffer context, int offset)
{
return CounterId.wrap(context, context.position() + offset).equals(n);
}
/**
* Creates initial set of nodes and tokens. Nodes are added to StorageService as 'normal'
*/
public static void createInitialRing(StorageService ss, IPartitioner partitioner, List<Token> endpointTokens,
List<Token> keyTokens, List<InetAddressAndPort> hosts, List<UUID> hostIds, int howMany)
throws UnknownHostException
{
// Expand pool of host IDs as necessary
for (int i = hostIdPool.size(); i < howMany; i++)
hostIdPool.add(UUID.randomUUID());
boolean endpointTokenPrefilled = endpointTokens != null && !endpointTokens.isEmpty();
for (int i=0; i<howMany; i++)
{
if(!endpointTokenPrefilled)
endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i)));
keyTokens.add(new BigIntegerToken(String.valueOf(10 * i + 5)));
hostIds.add(hostIdPool.get(i));
}
for (int i=0; i<endpointTokens.size(); i++)
{
InetAddressAndPort ep = InetAddressAndPort.getByName("127.0.0." + String.valueOf(i + 1));
Gossiper.instance.initializeNodeUnsafe(ep, hostIds.get(i), MessagingService.current_version, 1);
Gossiper.instance.injectApplicationState(ep, ApplicationState.TOKENS, new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(endpointTokens.get(i))));
ss.onChange(ep,
ApplicationState.STATUS_WITH_PORT,
new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(endpointTokens.get(i))));
ss.onChange(ep,
ApplicationState.STATUS,
new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(endpointTokens.get(i))));
hosts.add(ep);
}
// check that all nodes are in token metadata
for (int i=0; i<endpointTokens.size(); ++i)
assertTrue(ss.getTokenMetadata().isMember(hosts.get(i)));
}
public static Future<?> compactAll(ColumnFamilyStore cfs, int gcBefore)
{
List<Descriptor> descriptors = new ArrayList<>();
for (SSTableReader sstable : cfs.getLiveSSTables())
descriptors.add(sstable.descriptor);
return CompactionManager.instance.submitUserDefined(cfs, descriptors, gcBefore);
}
public static void compact(ColumnFamilyStore cfs, Collection<SSTableReader> sstables)
{
int gcBefore = cfs.gcBefore(FBUtilities.nowInSeconds());
try (CompactionTasks tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore))
{
for (AbstractCompactionTask task : tasks)
task.execute(ActiveCompactionsTracker.NOOP);
}
}
public static void expectEOF(Callable<?> callable)
{
expectException(callable, EOFException.class);
}
public static void expectException(Callable<?> callable, Class<?> exception)
{
boolean thrown = false;
try
{
callable.call();
}
catch (Throwable e)
{
assert e.getClass().equals(exception) : e.getClass().getName() + " is not " + exception.getName();
thrown = true;
}
assert thrown : exception.getName() + " not received";
}
public static AbstractReadCommandBuilder.SinglePartitionBuilder cmd(ColumnFamilyStore cfs, Object... partitionKey)
{
return new AbstractReadCommandBuilder.SinglePartitionBuilder(cfs, makeKey(cfs.metadata(), partitionKey));
}
public static AbstractReadCommandBuilder.PartitionRangeBuilder cmd(ColumnFamilyStore cfs)
{
return new AbstractReadCommandBuilder.PartitionRangeBuilder(cfs);
}
static DecoratedKey makeKey(TableMetadata metadata, Object... partitionKey)
{
if (partitionKey.length == 1 && partitionKey[0] instanceof DecoratedKey)
return (DecoratedKey)partitionKey[0];
ByteBuffer key = metadata.partitionKeyAsClusteringComparator().make(partitionKey).serializeAsPartitionKey();
return metadata.partitioner.decorateKey(key);
}
public static void assertEmptyUnfiltered(ReadCommand command)
{
try (ReadExecutionController executionController = command.executionController();
UnfilteredPartitionIterator iterator = command.executeLocally(executionController))
{
if (iterator.hasNext())
{
try (UnfilteredRowIterator partition = iterator.next())
{
throw new AssertionError("Expected no results for query " + command.toCQLString() + " but got key " + command.metadata().partitionKeyType.getString(partition.partitionKey().getKey()));
}
}
}
}
public static void assertEmpty(ReadCommand command)
{
try (ReadExecutionController executionController = command.executionController();
PartitionIterator iterator = command.executeInternal(executionController))
{
if (iterator.hasNext())
{
try (RowIterator partition = iterator.next())
{
throw new AssertionError("Expected no results for query " + command.toCQLString() + " but got key " + command.metadata().partitionKeyType.getString(partition.partitionKey().getKey()));
}
}
}
}
public static List<ImmutableBTreePartition> getAllUnfiltered(ReadCommand command)
{
try (ReadExecutionController controller = command.executionController())
{
return getAllUnfiltered(command, controller);
}
}
public static List<ImmutableBTreePartition> getAllUnfiltered(ReadCommand command, ReadExecutionController controller)
{
List<ImmutableBTreePartition> results = new ArrayList<>();
try (UnfilteredPartitionIterator iterator = command.executeLocally(controller))
{
while (iterator.hasNext())
{
try (UnfilteredRowIterator partition = iterator.next())
{
results.add(ImmutableBTreePartition.create(partition));
}
}
}
return results;
}
public static List<FilteredPartition> getAll(ReadCommand command)
{
try (ReadExecutionController controller = command.executionController())
{
return getAll(command, controller);
}
}
public static List<FilteredPartition> getAll(ReadCommand command, ReadExecutionController controller)
{
List<FilteredPartition> results = new ArrayList<>();
try (PartitionIterator iterator = command.executeInternal(controller))
{
while (iterator.hasNext())
{
try (RowIterator partition = iterator.next())
{
results.add(FilteredPartition.create(partition));
}
}
}
return results;
}
public static Row getOnlyRowUnfiltered(ReadCommand cmd)
{
try (ReadExecutionController executionController = cmd.executionController();
UnfilteredPartitionIterator iterator = cmd.executeLocally(executionController))
{
assert iterator.hasNext() : "Expecting one row in one partition but got nothing";
try (UnfilteredRowIterator partition = iterator.next())
{
assert !iterator.hasNext() : "Expecting a single partition but got more";
assert partition.hasNext() : "Expecting one row in one partition but got an empty partition";
Row row = ((Row)partition.next());
assert !partition.hasNext() : "Expecting a single row but got more";
return row;
}
}
}
public static Row getOnlyRow(ReadCommand cmd)
{
try (ReadExecutionController executionController = cmd.executionController();
PartitionIterator iterator = cmd.executeInternal(executionController))
{
assert iterator.hasNext() : "Expecting one row in one partition but got nothing";
try (RowIterator partition = iterator.next())
{
assert !iterator.hasNext() : "Expecting a single partition but got more";
assert partition.hasNext() : "Expecting one row in one partition but got an empty partition";
Row row = partition.next();
assert !partition.hasNext() : "Expecting a single row but got more";
return row;
}
}
}
public static ImmutableBTreePartition getOnlyPartitionUnfiltered(ReadCommand cmd)
{
try (ReadExecutionController controller = cmd.executionController())
{
return getOnlyPartitionUnfiltered(cmd, controller);
}
}
public static ImmutableBTreePartition getOnlyPartitionUnfiltered(ReadCommand cmd, ReadExecutionController controller)
{
try (UnfilteredPartitionIterator iterator = cmd.executeLocally(controller))
{
assert iterator.hasNext() : "Expecting a single partition but got nothing";
try (UnfilteredRowIterator partition = iterator.next())
{
assert !iterator.hasNext() : "Expecting a single partition but got more";
return ImmutableBTreePartition.create(partition);
}
}
}
public static FilteredPartition getOnlyPartition(ReadCommand cmd)
{
return getOnlyPartition(cmd, false);
}
public static FilteredPartition getOnlyPartition(ReadCommand cmd, boolean trackRepairedStatus)
{
try (ReadExecutionController executionController = cmd.executionController(trackRepairedStatus);
PartitionIterator iterator = cmd.executeInternal(executionController))
{
assert iterator.hasNext() : "Expecting a single partition but got nothing";
try (RowIterator partition = iterator.next())
{
assert !iterator.hasNext() : "Expecting a single partition but got more";
return FilteredPartition.create(partition);
}
}
}
public static UnfilteredRowIterator apply(Mutation mutation)
{
mutation.apply();
assert mutation.getPartitionUpdates().size() == 1;
return mutation.getPartitionUpdates().iterator().next().unfilteredIterator();
}
public static Cell<?> cell(ColumnFamilyStore cfs, Row row, String columnName)
{
ColumnMetadata def = cfs.metadata().getColumn(ByteBufferUtil.bytes(columnName));
assert def != null;
return row.getCell(def);
}
public static Row row(Partition partition, Object... clustering)
{
return partition.getRow(partition.metadata().comparator.make(clustering));
}
public static void assertCellValue(Object value, ColumnFamilyStore cfs, Row row, String columnName)
{
Cell<?> cell = cell(cfs, row, columnName);
assert cell != null : "Row " + row.toString(cfs.metadata()) + " has no cell for " + columnName;
assertEquals(value, Cells.composeValue(cell, cell.column().type));
}
public static void consume(UnfilteredRowIterator iter)
{
try (UnfilteredRowIterator iterator = iter)
{
while (iter.hasNext())
iter.next();
}
}
public static void consume(UnfilteredPartitionIterator iterator)
{
while (iterator.hasNext())
{
consume(iterator.next());
}
}
public static int size(PartitionIterator iter)
{
int size = 0;
while (iter.hasNext())
{
++size;
iter.next().close();
}
return size;
}
public static boolean equal(UnfilteredRowIterator a, UnfilteredRowIterator b)
{
return Objects.equals(a.columns(), b.columns())
&& Objects.equals(a.stats(), b.stats())
&& sameContent(a, b);
}
// Test equality of the iterators, but without caring too much about the "metadata" of said iterator. This is often
// what we want in tests. In particular, the columns() reported by the iterators will sometimes differ because they
// are a superset of what the iterator actually contains, and depending on the method used to get each iterator
// tested, one may include a defined column the other don't while there is not actual content for that column.
public static boolean sameContent(UnfilteredRowIterator a, UnfilteredRowIterator b)
{
return Objects.equals(a.metadata(), b.metadata())
&& Objects.equals(a.isReverseOrder(), b.isReverseOrder())
&& Objects.equals(a.partitionKey(), b.partitionKey())
&& Objects.equals(a.partitionLevelDeletion(), b.partitionLevelDeletion())
&& Objects.equals(a.staticRow(), b.staticRow())
&& Iterators.elementsEqual(a, b);
}
public static boolean sameContent(RowIterator a, RowIterator b)
{
return Objects.equals(a.metadata(), b.metadata())
&& Objects.equals(a.isReverseOrder(), b.isReverseOrder())
&& Objects.equals(a.partitionKey(), b.partitionKey())
&& Objects.equals(a.staticRow(), b.staticRow())
&& Iterators.elementsEqual(a, b);
}
public static boolean sameContent(Mutation a, Mutation b)
{
if (!a.key().equals(b.key()) || !a.getTableIds().equals(b.getTableIds()))
return false;
for (PartitionUpdate update : a.getPartitionUpdates())
{
if (!sameContent(update.unfilteredIterator(), b.getPartitionUpdate(update.metadata()).unfilteredIterator()))
return false;
}
return true;
}
// moved & refactored from KeyspaceTest in < 3.0
public static void assertColumns(Row row, String... expectedColumnNames)
{
Iterator<Cell<?>> cells = row == null ? Collections.emptyIterator() : row.cells().iterator();
String[] actual = Iterators.toArray(Iterators.transform(cells, new Function<Cell<?>, String>()
{
public String apply(Cell<?> cell)
{
return cell.column().name.toString();
}
}), String.class);
assert Arrays.equals(actual, expectedColumnNames)
: String.format("Columns [%s])] is not expected [%s]",
((row == null) ? "" : row.columns().toString()),
StringUtils.join(expectedColumnNames, ","));
}
public static void assertColumn(TableMetadata cfm, Row row, String name, String value, long timestamp)
{
Cell<?> cell = row.getCell(cfm.getColumn(new ColumnIdentifier(name, true)));
assertColumn(cell, value, timestamp);
}
public static void assertColumn(Cell<?> cell, String value, long timestamp)
{
assertNotNull(cell);
assertEquals(0, ByteBufferUtil.compareUnsigned(cell.buffer(), ByteBufferUtil.bytes(value)));
assertEquals(timestamp, cell.timestamp());
}
public static void assertClustering(TableMetadata cfm, Row row, Object... clusteringValue)
{
assertEquals(row.clustering().size(), clusteringValue.length);
assertEquals(0, cfm.comparator.compare(row.clustering(), cfm.comparator.make(clusteringValue)));
}
public static PartitionerSwitcher switchPartitioner(IPartitioner p)
{
return new PartitionerSwitcher(p);
}
public static class PartitionerSwitcher implements AutoCloseable
{
final IPartitioner oldP;
final IPartitioner newP;
public PartitionerSwitcher(IPartitioner partitioner)
{
newP = partitioner;
oldP = StorageService.instance.setPartitionerUnsafe(partitioner);
}
public void close()
{
IPartitioner p = StorageService.instance.setPartitionerUnsafe(oldP);
assert p == newP;
}
}
public static void spinAssertEquals(Object expected, Supplier<Object> actualSupplier, int timeoutInSeconds)
{
spinAssertEquals(null, expected, actualSupplier, timeoutInSeconds, TimeUnit.SECONDS);
}
public static <T> void spinAssertEquals(String message, T expected, Supplier<? extends T> actualSupplier, long timeout, TimeUnit timeUnit)
{
Awaitility.await()
.pollInterval(Duration.ofMillis(100))
.pollDelay(0, TimeUnit.MILLISECONDS)
.atMost(timeout, timeUnit)
.untilAsserted(() -> assertThat(message, actualSupplier.get(), equalTo(expected)));
}
public static void joinThread(Thread thread) throws InterruptedException
{
thread.join(10000);
}
public static AssertionError runCatchingAssertionError(Runnable test)
{
try
{
test.run();
return null;
}
catch (AssertionError e)
{
return e;
}
}
/**
* Wrapper function used to run a test that can sometimes flake for uncontrollable reasons.
*
* If the given test fails on the first run, it is executed the given number of times again, expecting all secondary
* runs to succeed. If they do, the failure is understood as a flake and the test is treated as passing.
*
* Do not use this if the test is deterministic and its success is not influenced by external factors (such as time,
* selection of random seed, network failures, etc.). If the test can be made independent of such factors, it is
* probably preferable to do so rather than use this method.
*
* @param test The test to run.
* @param rerunsOnFailure How many times to re-run it if it fails. All reruns must pass.
* @param message Message to send to System.err on initial failure.
*/
public static void flakyTest(Runnable test, int rerunsOnFailure, String message)
{
AssertionError e = runCatchingAssertionError(test);
if (e == null)
return; // success
logger.info("Test failed. {}", message, e);
logger.info("Re-running {} times to verify it isn't failing more often than it should.", rerunsOnFailure);
int rerunsFailed = 0;
for (int i = 0; i < rerunsOnFailure; ++i)
{
AssertionError t = runCatchingAssertionError(test);
if (t != null)
{
++rerunsFailed;
e.addSuppressed(t);
logger.debug("Test failed again, total num failures: {}", rerunsFailed, t);
}
}
if (rerunsFailed > 0)
{
logger.error("Test failed in {} of the {} reruns.", rerunsFailed, rerunsOnFailure);
throw e;
}
logger.info("All reruns succeeded. Failure treated as flake.");
}
// for use with Optional in tests, can be used as an argument to orElseThrow
public static Supplier<AssertionError> throwAssert(final String message)
{
return () -> new AssertionError(message);
}
public static class UnfilteredSource extends AbstractUnfilteredRowIterator implements UnfilteredRowIterator
{
Iterator<Unfiltered> content;
public UnfilteredSource(TableMetadata metadata, DecoratedKey partitionKey, Row staticRow, Iterator<Unfiltered> content)
{
super(metadata,
partitionKey,
DeletionTime.LIVE,
metadata.regularAndStaticColumns(),
staticRow != null ? staticRow : Rows.EMPTY_STATIC_ROW,
false,
EncodingStats.NO_STATS);
this.content = content;
}
@Override
protected Unfiltered computeNext()
{
return content.hasNext() ? content.next() : endOfData();
}
}
public static UnfilteredPartitionIterator executeLocally(PartitionRangeReadCommand command,
ColumnFamilyStore cfs,
ReadExecutionController controller)
{
return command.queryStorage(cfs, controller);
}
public static Closeable markDirectoriesUnwriteable(ColumnFamilyStore cfs)
{
try
{
for ( ; ; )
{
DataDirectory dir = cfs.getDirectories().getWriteableLocation(1);
DisallowedDirectories.maybeMarkUnwritable(cfs.getDirectories().getLocationForDisk(dir));
}
}
catch (IOError e)
{
// Expected -- marked all directories as unwritable
}
return () -> DisallowedDirectories.clearUnwritableUnsafe();
}
public static PagingState makeSomePagingState(ProtocolVersion protocolVersion)
{
return makeSomePagingState(protocolVersion, Integer.MAX_VALUE);
}
public static PagingState makeSomePagingState(ProtocolVersion protocolVersion, int remainingInPartition)
{
TableMetadata metadata =
TableMetadata.builder("ks", "tbl")
.addPartitionKeyColumn("k", AsciiType.instance)
.addClusteringColumn("c1", AsciiType.instance)
.addClusteringColumn("c2", Int32Type.instance)
.addRegularColumn("myCol", AsciiType.instance)
.build();
ByteBuffer pk = ByteBufferUtil.bytes("someKey");
ColumnMetadata def = metadata.getColumn(new ColumnIdentifier("myCol", false));
Clustering<?> c = Clustering.make(ByteBufferUtil.bytes("c1"), ByteBufferUtil.bytes(42));
Row row = BTreeRow.singleCellRow(c, BufferCell.live(def, 0, ByteBufferUtil.EMPTY_BYTE_BUFFER));
PagingState.RowMark mark = PagingState.RowMark.create(metadata, row, protocolVersion);
return new PagingState(pk, mark, 10, remainingInPartition);
}
public static void assertRCEquals(ReplicaCollection<?> a, ReplicaCollection<?> b)
{
assertTrue(a + " not equal to " + b, Iterables.elementsEqual(a, b));
}
public static void assertNotRCEquals(ReplicaCollection<?> a, ReplicaCollection<?> b)
{
assertFalse(a + " equal to " + b, Iterables.elementsEqual(a, b));
}
/**
* Makes sure that the sstables on disk are the same ones as the cfs live sstables (that they have the same generation)
*/
public static void assertOnDiskState(ColumnFamilyStore cfs, int expectedSSTableCount)
{
LifecycleTransaction.waitForDeletions();
assertEquals(expectedSSTableCount, cfs.getLiveSSTables().size());
Set<SSTableId> liveIdentifiers = cfs.getLiveSSTables().stream()
.map(sstable -> sstable.descriptor.id)
.collect(Collectors.toSet());
int fileCount = 0;
for (File f : cfs.getDirectories().getCFDirectories())
{
for (File sst : f.tryList())
{
if (sst.name().contains("Data"))
{
Descriptor d = Descriptor.fromFilename(sst.absolutePath());
assertTrue(liveIdentifiers.contains(d.id));
fileCount++;
}
}
}
assertEquals(expectedSSTableCount, fileCount);
}
/**
* Disable bloom filter on all sstables of given table
*/
public static void disableBloomFilter(ColumnFamilyStore cfs)
{
Collection<SSTableReader> sstables = cfs.getLiveSSTables();
try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
{
for (SSTableReader sstable : sstables)
{
sstable = sstable.cloneAndReplace(FilterFactory.AlwaysPresent);
txn.update(sstable, true);
txn.checkpoint();
}
txn.finish();
}
for (SSTableReader reader : cfs.getLiveSSTables())
assertEquals(FilterFactory.AlwaysPresent, reader.getBloomFilter());
}
/**
* Setups Gossiper to mimic the upgrade behaviour when {@link Gossiper#isUpgradingFromVersionLowerThan(CassandraVersion)}
* or {@link Gossiper#hasMajorVersion3Nodes()} is called.
*/
public static void setUpgradeFromVersion(String version)
{
int v = Optional.ofNullable(Gossiper.instance.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddressAndPort()))
.map(ep -> ep.getApplicationState(ApplicationState.RELEASE_VERSION))
.map(rv -> rv.version)
.orElse(0);
Gossiper.instance.addLocalApplicationState(ApplicationState.RELEASE_VERSION,
VersionedValue.unsafeMakeVersionedValue(version, v + 1));
try
{
// add dummy host to avoid returning early in Gossiper.instance.upgradeFromVersionSupplier
Gossiper.instance.initializeNodeUnsafe(InetAddressAndPort.getByName("127.0.0.2"), UUID.randomUUID(), 1);
}
catch (UnknownHostException e)
{
throw new RuntimeException(e);
}
Gossiper.instance.expireUpgradeFromVersion();
}
/**
* Sets the length of the file to given size. File will be created if not exist.
*
* @param file file for which length needs to be set
* @param size new szie
* @throws IOException on any I/O error.
*/
public static void setFileLength(File file, long size) throws IOException
{
try (FileChannel fileChannel = file.newReadWriteChannel())
{
if (file.length() >= size)
{
fileChannel.truncate(size);
}
else
{
fileChannel.position(size - 1);
fileChannel.write(ByteBuffer.wrap(new byte[1]));
}
}
}
public static Supplier<SequenceBasedSSTableId> newSeqGen(int ... existing)
{
return SequenceBasedSSTableId.Builder.instance.generator(IntStream.of(existing).mapToObj(SequenceBasedSSTableId::new));
}
public static Supplier<UUIDBasedSSTableId> newUUIDGen()
{
return UUIDBasedSSTableId.Builder.instance.generator(Stream.empty());
}
public static Set<Descriptor> getSSTables(String ks, String tableName)
{
return Keyspace.open(ks)
.getColumnFamilyStore(tableName)
.getLiveSSTables()
.stream()
.map(sstr -> sstr.descriptor)
.collect(Collectors.toSet());
}
public static Set<Descriptor> getSnapshots(String ks, String tableName, String snapshotTag)
{
try
{
return Keyspace.open(ks)
.getColumnFamilyStore(tableName)
.getSnapshotSSTableReaders(snapshotTag)
.stream()
.map(sstr -> sstr.descriptor)
.collect(Collectors.toSet());
}
catch (IOException e)
{
throw Throwables.unchecked(e);
}
}
public static Set<Descriptor> getBackups(String ks, String tableName)
{
return Keyspace.open(ks)
.getColumnFamilyStore(tableName)
.getDirectories()
.sstableLister(Directories.OnTxnErr.THROW)
.onlyBackups(true)
.list()
.keySet();
}
public static StreamState bulkLoadSSTables(File dir, String targetKeyspace)
{
SSTableLoader.Client client = new SSTableLoader.Client()
{
private String keyspace;
public void init(String keyspace)
{
this.keyspace = keyspace;
for (Replica replica : StorageService.instance.getLocalReplicas(keyspace))
addRangeForEndpoint(replica.range(), FBUtilities.getBroadcastAddressAndPort());
}
public TableMetadataRef getTableMetadata(String tableName)
{
return Schema.instance.getTableMetadataRef(keyspace, tableName);
}
};
SSTableLoader loader = new SSTableLoader(dir, client, new OutputHandler.LogOutput(), 1, targetKeyspace);
StreamResultFuture result = loader.stream();
return FBUtilities.waitOnFuture(result);
}
public static File relativizePath(File targetBasePath, File path, int components)
{
Preconditions.checkArgument(components > 0);
Preconditions.checkArgument(path.toPath().getNameCount() >= components);
Path relative = path.toPath().subpath(path.toPath().getNameCount() - components, path.toPath().getNameCount());
return new File(targetBasePath.toPath().resolve(relative));
}
public static void flush(ColumnFamilyStore cfs)
{
cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
}
public static void flushTable(Keyspace keyspace, String table)
{
flush(keyspace.getColumnFamilyStore(table));
}
public static void flushTable(Keyspace keyspace, TableId table)
{
flush(keyspace.getColumnFamilyStore(table));
}
public static void flushTable(String keyspace, String table)
{
flushTable(Keyspace.open(keyspace), table);
}
public static void flush(Keyspace keyspace)
{
FBUtilities.waitOnFutures(keyspace.flush(ColumnFamilyStore.FlushReason.UNIT_TESTS));
}
public static void flushKeyspace(String keyspaceName)
{
flush(Keyspace.open(keyspaceName));
}
public static void flush(TableViews view)
{
view.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
}
}