blob: c7d0fbbc96ff2438b45db7ffb3601db65ab98a47 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.fuzz;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import harry.core.Run;
import harry.ddl.SchemaSpec;
import harry.model.OpSelectors;
import harry.operations.Relation;
import harry.operations.Query;
import harry.operations.QueryGenerator;
import harry.util.BitSet;
import org.apache.cassandra.Util;
import org.apache.cassandra.cql3.AbstractMarker;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.SingleColumnRelation;
import org.apache.cassandra.cql3.VariableSpecifications;
import org.apache.cassandra.cql3.WhereClause;
import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
import org.apache.cassandra.cql3.statements.Bound;
import org.apache.cassandra.cql3.statements.DeleteStatement;
import org.apache.cassandra.cql3.statements.StatementType;
import org.apache.cassandra.db.ClusteringBound;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.RangeTombstone;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.Slices;
import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import static harry.generators.DataGenerators.UNSET_VALUE;
public class SSTableGenerator
{
protected final SchemaSpec schema;
protected final OpSelectors.DescriptorSelector descriptorSelector;
protected final OpSelectors.MonotonicClock clock;
protected final ColumnFamilyStore store;
protected final TableMetadata metadata;
protected final QueryGenerator rangeSelector;
private final Set<SSTableReader> sstables = new HashSet<>();
private long lts = 0;
public SSTableGenerator(Run run,
ColumnFamilyStore store)
{
this.schema = run.schemaSpec;
this.descriptorSelector = run.descriptorSelector;
this.clock = run.clock;
this.store = store;
// We assume metadata can not change over the lifetime of sstable generator
this.metadata = store.metadata.get();
this.rangeSelector = new QueryGenerator(schema, run.pdSelector, descriptorSelector, run.rng);
store.disableAutoCompaction();
}
public Collection<SSTableReader> gen(int rows)
{
mark();
for (int i = 0; i < rows; i++)
{
long current = lts++;
write(current, current, current, current, true).applyUnsafe();
if (schema.staticColumns != null)
writeStatic(current, current, current, current, true).applyUnsafe();
}
return flush();
}
public void mark()
{
sstables.clear();
sstables.addAll(store.getLiveSSTables());
}
public Collection<SSTableReader> flush()
{
Util.flush(store);
sstables.removeAll(store.getLiveSSTables());
Set<SSTableReader> ret = new HashSet<>(sstables);
mark();
return ret;
}
public Mutation write(long lts, long pd, long cd, long opId, boolean withRowMarker)
{
long[] vds = descriptorSelector.vds(pd, cd, lts, opId, OpSelectors.OperationKind.INSERT, schema);
Object[] partitionKey = schema.inflatePartitionKey(pd);
Object[] clusteringKey = schema.inflateClusteringKey(cd);
Object[] regularColumns = schema.inflateRegularColumns(vds);
RowUpdateBuilder builder = new RowUpdateBuilder(metadata,
FBUtilities.nowInSeconds(),
clock.rts(lts),
metadata.params.defaultTimeToLive,
serializePartitionKey(store, partitionKey))
.clustering(clusteringKey);
if (!withRowMarker)
builder.noRowMarker();
for (int i = 0; i < regularColumns.length; i++)
{
Object value = regularColumns[i];
if (value == UNSET_VALUE)
continue;
ColumnMetadata def = metadata.getColumn(new ColumnIdentifier(schema.regularColumns.get(i).name, false));
builder.add(def, value);
}
return builder.build();
}
public Mutation writeStatic(long lts, long pd, long cd, long opId, boolean withRowMarker)
{
long[] sds = descriptorSelector.sds(pd, cd, lts, opId, OpSelectors.OperationKind.INSERT_WITH_STATICS, schema);
Object[] partitionKey = schema.inflatePartitionKey(pd);
Object[] staticColumns = schema.inflateStaticColumns(sds);
RowUpdateBuilder builder = new RowUpdateBuilder(metadata,
FBUtilities.nowInSeconds(),
clock.rts(lts),
metadata.params.defaultTimeToLive,
serializePartitionKey(store, partitionKey));
if (!withRowMarker)
builder.noRowMarker();
for (int i = 0; i < staticColumns.length; i++)
{
Object value = staticColumns[i];
if (value == UNSET_VALUE)
continue;
ColumnMetadata def = metadata.getColumn(new ColumnIdentifier(schema.staticColumns.get(i).name, false));
builder.add(def, staticColumns[i]);
}
return builder.build();
}
public Mutation deleteColumn(long lts, long pd, long cd, long opId)
{
Object[] partitionKey = schema.inflatePartitionKey(pd);
Object[] clusteringKey = schema.inflateClusteringKey(cd);
RowUpdateBuilder builder = new RowUpdateBuilder(metadata,
FBUtilities.nowInSeconds(),
clock.rts(lts),
metadata.params.defaultTimeToLive,
serializePartitionKey(store, partitionKey))
.noRowMarker()
.clustering(clusteringKey);
BitSet columns = descriptorSelector.columnMask(pd, lts, opId, OpSelectors.OperationKind.DELETE_COLUMN);
BitSet mask = schema.regularColumnsMask();
if (columns == null || columns.allUnset(mask))
throw new IllegalArgumentException("Can't have a delete column query with no columns set. Column mask: " + columns);
columns.eachSetBit((idx) -> {
if (idx < schema.regularColumnsOffset)
throw new RuntimeException("Can't delete parts of partition or clustering key");
if (idx > schema.allColumns.size())
throw new IndexOutOfBoundsException(String.format("Index %d is out of bounds. Max index: %d", idx, schema.allColumns.size()));
builder.delete(schema.allColumns.get(idx).name);
}, mask);
return builder.build();
}
public Mutation deleteStatic(long lts, long pd, long opId)
{
Object[] partitionKey = schema.inflatePartitionKey(pd);
RowUpdateBuilder builder = new RowUpdateBuilder(metadata,
FBUtilities.nowInSeconds(),
clock.rts(lts),
metadata.params.defaultTimeToLive,
serializePartitionKey(store, partitionKey))
.noRowMarker();
BitSet columns = descriptorSelector.columnMask(pd, lts, opId, OpSelectors.OperationKind.DELETE_COLUMN_WITH_STATICS);
BitSet mask = schema.staticColumnsMask();
if (columns == null || columns.allUnset(mask))
throw new IllegalArgumentException("Can't have a delete column query with no columns set. Column mask: " + columns);
columns.eachSetBit((idx) -> {
if (idx < schema.staticColumnsOffset)
throw new RuntimeException(String.format("Can't delete parts of partition or clustering key %d (%s)",
idx, schema.allColumns.get(idx)));
if (idx > schema.allColumns.size())
throw new IndexOutOfBoundsException(String.format("Index %d is out of bounds. Max index: %d", idx, schema.allColumns.size()));
builder.delete(schema.allColumns.get(idx).name);
}, mask);
return builder.build();
}
public Mutation deletePartition(long lts, long pd)
{
Object[] partitionKey = schema.inflatePartitionKey(pd);
PartitionUpdate update = PartitionUpdate.fullPartitionDelete(metadata,
serializePartitionKey(store, partitionKey),
clock.rts(lts),
FBUtilities.nowInSeconds());
return new Mutation(update);
}
public Mutation deleteRow(long lts, long pd, long cd)
{
Object[] partitionKey = schema.inflatePartitionKey(pd);
Object[] clusteringKey = schema.inflateClusteringKey(cd);
return RowUpdateBuilder.deleteRow(metadata,
clock.rts(lts),
serializePartitionKey(store, partitionKey),
clusteringKey);
}
public Mutation deleteSlice(long lts, long pd, long opId)
{
return delete(lts, pd, rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_SLICE));
}
public Mutation deleteRange(long lts, long pd, long opId)
{
return delete(lts, pd, rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_RANGE));
}
Mutation delete(long lts, long pd, Query query)
{
Object[] partitionKey = schema.inflatePartitionKey(pd);
WhereClause.Builder builder = new WhereClause.Builder();
List<ColumnIdentifier> variableNames = new ArrayList<>();
List<ByteBuffer> values = new ArrayList<>();
for (int i = 0; i < partitionKey.length; i++)
{
String name = schema.partitionKeys.get(i).name;
ColumnMetadata columnDef = metadata.getColumn(ByteBufferUtil.bytes(name));
variableNames.add(columnDef.name);
values.add(ByteBufferUtil.objectToBytes(partitionKey[i]));
builder.add(new SingleColumnRelation(ColumnIdentifier.getInterned(name, true),
toOperator(Relation.RelationKind.EQ),
new AbstractMarker.Raw(values.size() - 1)));
}
for (Relation relation : query.relations)
{
String name = relation.column();
ColumnMetadata columnDef = metadata.getColumn(ByteBufferUtil.bytes(relation.column()));
variableNames.add(columnDef.name);
values.add(ByteBufferUtil.objectToBytes(relation.value()));
builder.add(new SingleColumnRelation(ColumnIdentifier.getInterned(name, false),
toOperator(relation.kind),
new AbstractMarker.Raw(values.size() - 1)));
}
StatementRestrictions restrictions = new StatementRestrictions(null,
StatementType.DELETE,
metadata,
builder.build(),
new VariableSpecifications(variableNames),
false,
false,
false,
false);
QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.QUORUM, values);
SortedSet<ClusteringBound<?>> startBounds = restrictions.getClusteringColumnsBounds(Bound.START, options);
SortedSet<ClusteringBound<?>> endBounds = restrictions.getClusteringColumnsBounds(Bound.END, options);
Slices slices = DeleteStatement.toSlices(metadata, startBounds, endBounds);
assert slices.size() == 1;
int deletionTime = FBUtilities.nowInSeconds();
long rts = clock.rts(lts);
return new RowUpdateBuilder(metadata,
deletionTime,
rts,
metadata.params.defaultTimeToLive,
serializePartitionKey(store, partitionKey))
.noRowMarker()
.addRangeTombstone(new RangeTombstone(slices.get(0), new DeletionTime(rts, deletionTime)))
.build();
}
public static Operator toOperator(Relation.RelationKind kind)
{
switch (kind)
{
case LT: return Operator.LT;
case GT: return Operator.GT;
case LTE: return Operator.LTE;
case GTE: return Operator.GTE;
case EQ: return Operator.EQ;
default: throw new IllegalArgumentException("Unsupported " + kind);
}
}
public static DecoratedKey serializePartitionKey(ColumnFamilyStore store, Object... pk)
{
if (pk.length == 1)
return store.getPartitioner().decorateKey(ByteBufferUtil.objectToBytes(pk[0]));
ByteBuffer[] values = new ByteBuffer[pk.length];
for (int i = 0; i < pk.length; i++)
values[i] = ByteBufferUtil.objectToBytes(pk[i]);
return store.getPartitioner().decorateKey(CompositeType.build(ByteBufferAccessor.instance, values));
}
}