blob: 60ebf47781ffa707400d2f65e98fc1590efd2dfb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.simulator.paxos;
import java.util.Iterator;
import java.util.List;
import com.google.common.collect.ImmutableList;
import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.Slice;
import org.apache.cassandra.db.Slices;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.db.memtable.Memtable;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.io.sstable.SSTableReadsListener;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.paxos.Commit;
import org.apache.cassandra.service.paxos.PaxosState;
import org.apache.cassandra.simulator.systems.NonInterceptible;
import org.apache.cassandra.simulator.systems.NonInterceptible.Permit;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Shared;
import org.apache.cassandra.utils.TimeUUID;
import static java.lang.Long.max;
import static java.util.Arrays.stream;
import static org.apache.cassandra.db.SystemKeyspace.loadPaxosState;
import static org.apache.cassandra.service.paxos.Commit.latest;
import static org.apache.cassandra.service.paxos.PaxosState.unsafeGetIfPresent;
import static org.apache.cassandra.simulator.systems.NonInterceptible.Permit.OPTIONAL;
import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
/**
 * Simulator helpers for inspecting the paxos ballots a replica has recorded for a partition key.
 * They compare three sources:
 * <ul>
 *   <li>the in-memory {@link PaxosState} snapshot (if present);</li>
 *   <li>the paxos state persisted in {@code system.paxos};</li>
 *   <li>the write timestamps found in the base table itself.</li>
 * </ul>
 * {@link #read} and {@link #paxosDebugInfo} perform their reads via {@link NonInterceptible#apply}.
 */
public class Ballots
{
    // Regular columns of system.paxos whose cells are inspected in paxos memtable rows.
    private static final ColumnMetadata PROMISE = paxosUUIDColumn("in_progress_ballot");
    private static final ColumnMetadata PROPOSAL = paxosUUIDColumn("proposal_ballot");
    private static final ColumnMetadata COMMIT = paxosUUIDColumn("most_recent_commit_at");

    /**
     * The latest ballots/timestamps observed on one replica for one key.
     * As populated by {@link #read}, every value is a microsecond timestamp, with 0L meaning "nothing found".
     */
    @Shared(scope = SIMULATION)
    public static class LatestBallots
    {
        public final long promise;   // latest promised ballot
        public final long accept;    // the ballot actually accepted
        public final long acceptOf;  // the original ballot (i.e. if a reproposal accept != acceptOf)
        public final long commit;    // latest timestamp within the most recent committed update
        public final long persisted; // latest write timestamp in the base-table partition

        public LatestBallots(long promise, long accept, long acceptOf, long commit, long persisted)
        {
            this.promise = promise;
            this.accept = accept;
            this.acceptOf = acceptOf;
            this.commit = commit;
            this.persisted = persisted;
        }

        /**
         * @return the largest of {@code promise}, {@code accept}, {@code commit} and {@code persisted}
         *         ({@code acceptOf} is not considered)
         */
        public long any()
        {
            return max(max(max(promise, accept), commit), persisted);
        }

        /**
         * @return the larger of {@code commit} and {@code persisted}
         */
        public long permanent()
        {
            return max(commit, persisted);
        }

        /**
         * Formats as {@code [promise,accept,commit,persisted]}; {@code acceptOf} is not included.
         */
        @Override
        public String toString()
        {
            return "[" + promise + ',' + accept + ',' + commit + ',' + persisted + ']';
        }
    }

    /**
     * Read the latest ballots for {@code key} on this node.
     * The in-memory paxos snapshot (if any) is merged with the state loaded from system.paxos, taking the
     * latest of each. The latest write timestamp in the base-table partition is also read.
     *
     * @param permit                permit handed to {@link NonInterceptible#apply}
     * @param key                   the partition key
     * @param metadata              the base table
     * @param nowInSec              the "now" used when loading the persisted paxos state
     * @param includeEmptyProposals NOTE(review): currently unused. An accepted proposal with an empty update
     *                              is always reported as 0L for both {@code accept} and {@code acceptOf};
     *                              confirm whether this flag was meant to change that.
     * @return the latest ballots observed
     */
    public static LatestBallots read(Permit permit, DecoratedKey key, TableMetadata metadata, int nowInSec, boolean includeEmptyProposals)
    {
        return NonInterceptible.apply(permit, () -> {
            PaxosState.Snapshot state = unsafeGetIfPresent(key, metadata);
            PaxosState.Snapshot persisted = loadPaxosState(key, metadata, nowInSec);
            TimeUUID promised = latest(persisted.promised, state == null ? null : state.promised);
            Commit.Accepted accepted = latest(persisted.accepted, state == null ? null : state.accepted);
            Commit.Committed committed = latest(persisted.committed, state == null ? null : state.committed);
            long baseTable = latestBallotFromBaseTable(key, metadata);

            // A missing proposal, or one whose update is empty, is reported as 0 for both accept fields.
            boolean hasProposal = accepted != null && !accepted.update.isEmpty();
            return new LatestBallots(
                promised.unixMicros(),
                hasProposal ? accepted.ballot.unixMicros() : 0L,
                hasProposal ? accepted.update.stats().minTimestamp : 0L,
                latestBallot(committed.update.iterator()),
                baseTable
            );
        });
    }

    /**
     * Read the latest ballots for each of {@code primaryKeys} from each of its replicas.
     * Each node's read runs on the calling thread via {@code unsafeApplyOnThisThread}.
     *
     * @param replicasForKeys {@code replicasForKeys[i]} lists the node ids to query for {@code primaryKeys[i]}
     * @return {@code result[i][j]} holds the ballots for {@code primaryKeys[i]} read from node {@code replicasForKeys[i][j]}
     */
    static LatestBallots[][] read(Permit permit, Cluster cluster, String keyspace, String table, int[] primaryKeys, int[][] replicasForKeys, boolean includeEmptyProposals)
    {
        return NonInterceptible.apply(permit, () -> {
            LatestBallots[][] result = new LatestBallots[primaryKeys.length][];
            for (int i = 0 ; i < primaryKeys.length ; ++i)
            {
                int primaryKey = primaryKeys[i];
                result[i] = stream(replicasForKeys[i])
                            .mapToObj(cluster::get)
                            .map(node -> node.unsafeApplyOnThisThread((p, ks, tbl, pk, ie) -> {
                                TableMetadata metadata = Keyspace.open(ks).getColumnFamilyStore(tbl).metadata.get();
                                DecoratedKey key = metadata.partitioner.decorateKey(Int32Type.instance.decompose(pk));
                                return read(p, key, metadata, FBUtilities.nowInSeconds(), ie);
                            }, permit, keyspace, table, primaryKey, includeEmptyProposals))
                            .toArray(LatestBallots[]::new);
            }
            return result;
        });
    }

    /**
     * Build a human-readable summary of the paxos state for {@code key} on this node.
     * The format is {@code "promise, accept, commit, base"}. Each entry is rendered as:
     * <ul>
     *   <li>the cached value, suffixed with {@code *} if it matches a non-zero memtable timestamp;</li>
     *   <li>followed by the persisted value in parentheses when that differs from the cached value.</li>
     * </ul>
     * The cache falls back to the persisted snapshot when no in-memory state exists. For the base entry,
     * the "cache" is the base memtables and the "persisted" value is a full base-table read.
     */
    public static String paxosDebugInfo(DecoratedKey key, TableMetadata metadata, int nowInSec)
    {
        return NonInterceptible.apply(OPTIONAL, () -> {
            PaxosState.Snapshot state = unsafeGetIfPresent(key, metadata);
            PaxosState.Snapshot persisted = loadPaxosState(key, metadata, nowInSec);
            long[] memtable = latestBallotsFromPaxosMemtable(key, metadata);
            PaxosState.Snapshot cache = state == null ? persisted : state;
            long baseTable = latestBallotFromBaseTable(key, metadata);
            long baseMemtable = latestBallotFromBaseMemtable(key, metadata);
            return debugBallot(cache.promised, memtable[0], persisted.promised) + ", "
                   + debugBallot(cache.accepted, memtable[1], persisted.accepted) + ", "
                   + debugBallot(cache.committed, memtable[2], persisted.committed) + ", "
                   + debugBallot(baseMemtable, 0L, baseTable);
        });
    }

    /**
     * @return metadata for the TimeUUID regular column {@code name} of system.paxos
     */
    private static ColumnMetadata paxosUUIDColumn(String name)
    {
        return ColumnMetadata.regularColumn(SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.PAXOS, name, TimeUUIDType.instance);
    }

    /**
     * Scan every system.paxos memtable for the paxos row belonging to {@code metadata}'s table under {@code key}.
     *
     * @return a three-element array holding the cell timestamps of {promise, proposal, commit}.
     *         Each element is the maximum seen across all memtables, or 0 if that cell was never found.
     */
    private static long[] latestBallotsFromPaxosMemtable(DecoratedKey key, TableMetadata metadata)
    {
        ColumnFamilyStore paxos = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.PAXOS);
        long[] result = new long[3];
        List<Memtable> memtables = ImmutableList.copyOf(paxos.getTracker().getView().getAllMemtables());
        for (Memtable memtable : memtables)
        {
            Row row = getRow(key, metadata, paxos, memtable);
            if (row == null)
                continue;

            // Cells reconcile by highest timestamp, so keep the max rather than depending on memtable iteration order.
            Cell promise = row.getCell(PROMISE);
            if (promise != null && promise.value() != null)
                result[0] = max(result[0], promise.timestamp());
            Cell proposal = row.getCell(PROPOSAL);
            if (proposal != null && proposal.value() != null)
                result[1] = max(result[1], proposal.timestamp());
            Cell commit = row.getCell(COMMIT);
            if (commit != null && commit.value() != null)
                result[2] = max(result[2], commit.timestamp());
        }
        return result;
    }

    /**
     * Look up, in one system.paxos memtable, the row for {@code metadata}'s table under {@code key}.
     *
     * @return the first row in the slice, or null if the memtable has no such row
     */
    private static Row getRow(DecoratedKey key, TableMetadata metadata, ColumnFamilyStore paxos, Memtable memtable)
    {
        TableMetadata paxosMetadata = paxos.metadata.get();
        ClusteringComparator comparator = paxosMetadata.comparator;
        // Restrict to the single clustering built from the target table's id.
        Slices slices = Slices.with(comparator, Slice.make(comparator.make(metadata.id)));
        // Select every column explicitly. ColumnFilter.NONE fetches no regular columns and may strip the ballot cells.
        try (UnfilteredRowIterator iter = memtable.rowIterator(key, slices, ColumnFilter.all(paxosMetadata), false, SSTableReadsListener.NOOP_LISTENER))
        {
            if (iter == null)
                return null;
            while (iter.hasNext())
            {
                Unfiltered unfiltered = iter.next();
                // Skip range tombstone markers rather than failing the cast.
                if (unfiltered.isRow())
                    return (Row) unfiltered;
            }
            return null;
        }
    }

    /**
     * Read the whole partition {@code key} from the base table (with nowInSec 0).
     *
     * @return the largest cell timestamp found, or 0 if the partition is absent or has no rows
     */
    public static long latestBallotFromBaseTable(DecoratedKey key, TableMetadata metadata)
    {
        SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(metadata, 0, key, Slice.ALL);
        try (ReadExecutionController controller = cmd.executionController(); UnfilteredPartitionIterator partitions = cmd.executeLocally(controller))
        {
            if (!partitions.hasNext())
                return 0L;

            try (UnfilteredRowIterator rows = partitions.next())
            {
                return latestBallot(rows);
            }
        }
    }

    /**
     * @return the largest cell timestamp for partition {@code key} across all of the base table's memtables,
     *         or 0 if none contain it
     */
    private static long latestBallotFromBaseMemtable(DecoratedKey key, TableMetadata metadata)
    {
        ColumnFamilyStore table = Keyspace.openAndGetStore(metadata);
        long timestamp = 0;
        List<Memtable> memtables = ImmutableList.copyOf(table.getTracker().getView().getAllMemtables());
        for (Memtable memtable : memtables)
        {
            try (UnfilteredRowIterator partition = memtable.rowIterator(key))
            {
                if (partition == null)
                    continue;
                timestamp = max(timestamp, latestBallot(partition));
            }
        }
        return timestamp;
    }

    /**
     * Drain {@code partition}, considering rows only (other unfiltereds are skipped).
     *
     * @return the largest {@code maxTimestamp()} of any column data in those rows, or 0 if there is none
     */
    private static long latestBallot(Iterator<? extends Unfiltered> partition)
    {
        long timestamp = 0L;
        while (partition.hasNext())
        {
            Unfiltered unfiltered = partition.next();
            if (!unfiltered.isRow())
                continue;
            timestamp = ((Row) unfiltered).accumulate((cd, v) -> max(v, cd.maxTimestamp()), timestamp);
        }
        return timestamp;
    }

    // Commit overload: compares the commits' ballots (null commit => 0).
    private static String debugBallot(Commit cache, long memtable, Commit persisted)
    {
        return debugBallot(cache == null ? null : cache.ballot, memtable, persisted == null ? null : persisted.ballot);
    }

    // TimeUUID overload: compares ballots as unix micros (null => 0).
    private static String debugBallot(TimeUUID cache, long memtable, TimeUUID persisted)
    {
        return debugBallot(timestamp(cache), memtable, timestamp(persisted));
    }

    // Renders the cached value, then the persisted value in parentheses only when it differs from the cached one.
    private static String debugBallot(long cache, long memtable, long persisted)
    {
        return debugBallotVsMemtable(cache, memtable)
               + (cache == persisted ? "" : '(' + debugBallotVsMemtable(persisted, memtable) + ')');
    }

    // Appends '*' when value equals a non-zero memtable timestamp.
    private static String debugBallotVsMemtable(long value, long memtable)
    {
        return value + (memtable == value && memtable != 0 ? "*" : "");
    }

    private static long timestamp(TimeUUID a)
    {
        return a == null ? 0L : a.unixMicros();
    }
}