/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service.paxos;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Meter;
import org.apache.cassandra.exceptions.CasWriteTimeoutException;
import org.apache.cassandra.exceptions.ExceptionCode;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.InOurDc;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.ReplicaLayout.ForTokenWrite;
import org.apache.cassandra.locator.ReplicaPlan.ForRead;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.db.partitions.FilteredPartition;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.PartitionIterators;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.IsBootstrappingException;
import org.apache.cassandra.exceptions.ReadFailureException;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.RequestFailureException;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.exceptions.RequestTimeoutException;
import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.exceptions.WriteFailureException;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.ClientRequestMetrics;
import org.apache.cassandra.service.CASRequest;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.FailureRecordingCallback.AsMap;
import org.apache.cassandra.service.paxos.Commit.Proposal;
import org.apache.cassandra.service.reads.DataResolver;
import org.apache.cassandra.service.reads.repair.NoopReadRepair;
import org.apache.cassandra.service.paxos.cleanup.PaxosTableRepairs;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.triggers.TriggerExecutor;
import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.CollectionSerializer;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.service.paxos.PaxosPrepare.FoundIncompleteAccepted;
import org.apache.cassandra.service.paxos.PaxosPrepare.FoundIncompleteCommitted;
import org.apache.cassandra.utils.NoSpamLogger;
import static java.util.Collections.emptyMap;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.cassandra.config.Config.PaxosVariant.v2_without_linearizable_reads_or_rejected_writes;
import static org.apache.cassandra.db.Keyspace.openAndGetStore;
import static org.apache.cassandra.exceptions.RequestFailureReason.TIMEOUT;
import static org.apache.cassandra.gms.ApplicationState.RELEASE_VERSION;
import static org.apache.cassandra.config.DatabaseDescriptor.*;
import static org.apache.cassandra.db.ConsistencyLevel.*;
import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer;
import static org.apache.cassandra.locator.ReplicaLayout.forTokenWriteLiveAndDown;
import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casReadMetrics;
import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casWriteMetrics;
import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.readMetrics;
import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.readMetricsMap;
import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.writeMetricsMap;
import static org.apache.cassandra.service.paxos.Ballot.Flag.GLOBAL;
import static org.apache.cassandra.service.paxos.Ballot.Flag.LOCAL;
import static org.apache.cassandra.service.paxos.BallotGenerator.Global.nextBallot;
import static org.apache.cassandra.service.paxos.BallotGenerator.Global.staleBallot;
import static org.apache.cassandra.service.paxos.ContentionStrategy.*;
import static org.apache.cassandra.service.paxos.ContentionStrategy.Type.READ;
import static org.apache.cassandra.service.paxos.ContentionStrategy.Type.WRITE;
import static org.apache.cassandra.service.paxos.PaxosCommit.commit;
import static org.apache.cassandra.service.paxos.PaxosCommitAndPrepare.commitAndPrepare;
import static org.apache.cassandra.service.paxos.PaxosPrepare.prepare;
import static org.apache.cassandra.service.paxos.PaxosPropose.propose;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
import static org.apache.cassandra.utils.CollectionSerializer.newHashSet;
import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
import static org.apache.cassandra.utils.NoSpamLogger.Level.WARN;
/**
* <p>This class serves as an entry-point to Cassandra's implementation of Paxos Consensus.
* Note that Cassandra does not utilise the distinguished proposer (Multi Paxos) optimisation;
* each operation executes its own instance of Paxos Consensus. Instead, Cassandra employs
* various optimisations to reduce the overhead of operations. These may yield higher throughput
* and lower-overhead read operations, at the expense of contention during mixed or write-heavy workloads.
*
* Firstly, note that we do not follow Lamport's formulation, instead following the more common approach in
* the literature (see e.g. Dr. Heidi Howard's dissertation) of permitting any acceptor to vote on a proposal,
* not only those who issued a promise.
*
* <h2>No Commit of Empty Proposals</h2>
* <p>If a proposal is empty, there can be no effect on the state, so once this empty proposal has poisoned any earlier
* proposal it is safe to stop processing. An empty proposal effectively scrubs the instance of consensus being
* performed once it has reached a quorum, as no earlier incomplete proposal (that may perhaps have reached a minority)
* may now be completed.
*
* <h2>Fast Read / Failed Write</h2>
* <p>This optimisation relies on every voter having no incomplete promises, i.e. their commit register must be greater
* than or equal to their promise and proposal registers (or there must be such an empty proposal).
* Since the operation we are performing must invalidate any nascent operation that has reached a minority, and will
* itself be invalidated by any newer write it might race with, we are only concerned about operations that might be
* in-flight and incomplete. If we reach a quorum without any incomplete proposal, we prevent any incomplete proposal
* that might have come before us from being committed, and so are correctly ordered.
*
* <p>NOTE: we could likely weaken this further, permitting a fast operation if we witness a stale incomplete operation
* on one or more of the replicas, so long as we witness _some_ response that had knowledge of that operation's decision.
* However, we might then waste more time performing optimistic reads (which we skip if we witness any in-progress promise).
*
* <h2>Read Commutativity Optimisation</h2>
* <p>We separate read and write promises into distinct registers. Since reads are commutative they do not need to be
* ordered with respect to each other, so read promises consult only the write promise register to find competing
* operations, whereas writes consult both read and write registers. This permits better utilisation of the Fast Read
* optimisation, permitting arbitrarily many fast reads to execute concurrently.
*
* <p>A read will use its promise to finish any in progress write it encounters, but note that it is safe for multiple
* reads to attempt this simultaneously. If a write operation has not reached a quorum of promises then it has no effect,
* so while some read operations may attempt to complete it and others may not, the operation will only be invalidated
* and these actions will be equivalent. If the write had reached a quorum of promises then every read will attempt
* to complete the write. At the accept phase, only the most recent read promise will be accepted, so whether the write
* proposal had reached a quorum or not, a consistent outcome will result.
*
* <h2>Reproposal Avoidance</h2>
* <p>It can occur that two (or more) commands begin competing to re-propose the same incomplete command even after it
* has already committed - this can occur when an in progress command that has reached the commit condition (but not yet
* committed) is encountered by a promise, so that it is re-proposed. If the original coordinator does not fail this
* original command will be committed normally, but the re-proposal can take on a life of its own, and become contended
* and re-proposed indefinitely. By having reproposals use the original proposal ballot's timestamp we spot this situation
* and consider re-proposals of a command we have seen committed to be (in effect) empty proposals.
*
* <h2>Durability of Asynchronous Commit</h2>
* To permit asynchronous commit (and also because we should) we ensure commits are durable once a proposal has been
* accepted by a majority.
*
* Replicas track commands that have *locally* been witnessed but not committed. They may clear this log by performing
* a round of Paxos Repair for each key in the log (which is simply a round of Paxos that tries not to interfere with
* future rounds of Paxos, while aiming to complete any earlier incomplete round).
*
* By selecting some quorum of replicas for a range to perform this operation on, once successful we guarantee that
* any transaction that had previously been accepted by a majority has been committed, and any transaction that had been
* previously witnessed by a majority has been either committed or invalidated.
*
* To ensure durability across range movements, once a joining node becomes pending, such a coordinated paxos repair
* is performed prior to bootstrap, so that commands initiated before joining will either be bootstrapped
* or completed by paxos repair so as to be committed to a majority that includes the new node in its calculations,
* while commands initiated after joining will do so anyway, by virtue of the node being pending.
*
* Finally, for greater guarantees across range movements despite the uncertainty of gossip, paxos operations validate
* ring information with each other while seeking a quorum of promises. Any inconsistency is resolved by synchronising
* gossip state between the coordinator and the peers in question.
*
* <h2>Clearing of Paxos State</h2>
* Coordinated paxos repairs as described above are preceded by a preparation step that determines a ballot below
* which we agree to reject new promises. By deciding and disseminating this point prior to performing a coordinated
* paxos repair, once complete we have ensured that all commands with a lower ballot are either committed or invalidated,
* and so we are then able to disseminate this ballot as a bound below which we may expunge all data for the range.
*
* For consistency of execution, coordinators seek this latter ballot bound from each replica and, using the maximum of
* these, ignore all received data associated with ballots lower than this bound.
*/
public class Paxos
{
private static final Logger logger = LoggerFactory.getLogger(Paxos.class);
private static volatile Config.PaxosVariant PAXOS_VARIANT = DatabaseDescriptor.getPaxosVariant();
private static final CassandraVersion MODERN_PAXOS_RELEASE = new CassandraVersion(System.getProperty("cassandra.paxos.modern_release", "4.1"));
static final boolean LOG_TTL_LINEARIZABILITY_VIOLATIONS = Boolean.parseBoolean(System.getProperty("cassandra.paxos.log_ttl_linearizability_violations", "true"));
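/**
 * The set of nodes eligible to vote in a Paxos round for a given token: the natural and pending replicas
 * (restricted to the local DC when the consensus consistency is LOCAL_SERIAL).
 */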
static class Electorate implements Iterable<InetAddressAndPort>
{
static final Serializer serializer = new Serializer();
// all replicas, including pending, but without those in a remote DC if consistency is local
final Collection<InetAddressAndPort> natural;
// pending subset of electorate
final Collection<InetAddressAndPort> pending;
public Electorate(Collection<InetAddressAndPort> natural, Collection<InetAddressAndPort> pending)
{
this.natural = natural;
this.pending = pending;
}
public int size()
{
return natural.size() + pending.size();
}
@Override
public Iterator<InetAddressAndPort> iterator()
{
return Iterators.concat(natural.iterator(), pending.iterator());
}
static Electorate get(TableMetadata table, DecoratedKey key, ConsistencyLevel consistency)
{
return get(consistency, forTokenWriteLiveAndDown(Keyspace.open(table.keyspace), key.getToken()));
}
static Electorate get(ConsistencyLevel consistency, ForTokenWrite all)
{
ForTokenWrite electorate = all;
if (consistency == LOCAL_SERIAL)
electorate = all.filter(InOurDc.replicas());
return new Electorate(electorate.natural().endpointList(), electorate.pending().endpointList());
}
boolean hasPending()
{
return !pending.isEmpty();
}
boolean isPending(InetAddressAndPort endpoint)
{
return hasPending() && pending.contains(endpoint);
}
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Electorate that = (Electorate) o;
return natural.equals(that.natural) && pending.equals(that.pending);
}
public int hashCode()
{
return Objects.hash(natural, pending);
}
public String toString()
{
return "{" + natural + ", " + pending + '}';
}
static class Serializer implements IVersionedSerializer<Electorate>
{
public void serialize(Electorate electorate, DataOutputPlus out, int version) throws IOException
{
CollectionSerializer.serializeCollection(inetAddressAndPortSerializer, electorate.natural, out, version);
CollectionSerializer.serializeCollection(inetAddressAndPortSerializer, electorate.pending, out, version);
}
public Electorate deserialize(DataInputPlus in, int version) throws IOException
{
Set<InetAddressAndPort> endpoints = CollectionSerializer.deserializeCollection(inetAddressAndPortSerializer, newHashSet(), in, version);
Set<InetAddressAndPort> pending = CollectionSerializer.deserializeCollection(inetAddressAndPortSerializer, newHashSet(), in, version);
return new Electorate(endpoints, pending);
}
public long serializedSize(Electorate electorate, int version)
{
return CollectionSerializer.serializedSizeCollection(inetAddressAndPortSerializer, electorate.natural, version) +
CollectionSerializer.serializedSizeCollection(inetAddressAndPortSerializer, electorate.pending, version);
}
}
}
/**
* Encapsulates the peers we will talk to for this operation.
*/
static class Participants implements ForRead<EndpointsForToken, Participants>, Supplier<Participants>
{
final Keyspace keyspace;
final AbstractReplicationStrategy replicationStrategy;
/**
* SERIAL or LOCAL_SERIAL
*/
final ConsistencyLevel consistencyForConsensus;
/**
* Those members that vote for {@link #consistencyForConsensus}
*/
final Electorate electorate;
/**
* Those members of {@link #electorate} that we will 'poll' for their vote
* i.e. {@link #electorate} with down nodes removed
*/
private final EndpointsForToken electorateNatural;
final EndpointsForToken electorateLive;
final EndpointsForToken all;
final EndpointsForToken allLive;
final EndpointsForToken allDown;
final EndpointsForToken pending;
/**
* The number of responses we require to reach desired consistency from members of {@code contact}
*/
final int sizeOfConsensusQuorum;
/**
* The number of read responses we require to reach desired consistency from members of {@code contact}
* Note that this should always be met if {@link #sizeOfConsensusQuorum} is met, but we supply it separately
* for corroboration.
*/
final int sizeOfReadQuorum;
Participants(Keyspace keyspace, ConsistencyLevel consistencyForConsensus, ReplicaLayout.ForTokenWrite all, ReplicaLayout.ForTokenWrite electorate, EndpointsForToken live)
{
this.keyspace = keyspace;
this.replicationStrategy = all.replicationStrategy();
this.consistencyForConsensus = consistencyForConsensus;
this.all = all.all();
this.pending = all.pending();
this.allDown = all.all() == live ? EndpointsForToken.empty(all.token()) : all.all().without(live.endpoints());
this.electorate = new Electorate(electorate.natural().endpointList(), electorate.pending().endpointList());
this.electorateNatural = electorate.natural();
this.electorateLive = electorate.all() == live ? live : electorate.all().keep(live.endpoints());
this.allLive = live;
this.sizeOfReadQuorum = electorate.natural().size() / 2 + 1;
this.sizeOfConsensusQuorum = sizeOfReadQuorum + electorate.pending().size();
}
@Override
public int readQuorum()
{
return sizeOfReadQuorum;
}
@Override
public EndpointsForToken readCandidates()
{
// Note: we could probably return electorateLive here and save a reference, but it's not strictly correct
return electorateNatural;
}
static Participants get(TableMetadata table, Token token, ConsistencyLevel consistencyForConsensus)
{
Keyspace keyspace = Keyspace.open(table.keyspace);
ReplicaLayout.ForTokenWrite all = forTokenWriteLiveAndDown(keyspace, token);
ReplicaLayout.ForTokenWrite electorate = consistencyForConsensus.isDatacenterLocal()
? all.filter(InOurDc.replicas()) : all;
EndpointsForToken live = all.all().filter(FailureDetector.isReplicaAlive);
return new Participants(keyspace, consistencyForConsensus, all, electorate, live);
}
static Participants get(TableMetadata cfm, DecoratedKey key, ConsistencyLevel consistency)
{
return get(cfm, key.getToken(), consistency);
}
int sizeOfPoll()
{
return electorateLive.size();
}
InetAddressAndPort voter(int i)
{
return electorateLive.endpoint(i);
}
void assureSufficientLiveNodes(boolean isWrite) throws UnavailableException
{
if (sizeOfConsensusQuorum > sizeOfPoll())
{
mark(isWrite, m -> m.unavailables, consistencyForConsensus);
throw new UnavailableException("Cannot achieve consistency level " + consistencyForConsensus, consistencyForConsensus, sizeOfConsensusQuorum, sizeOfPoll());
}
}
void assureSufficientLiveNodesForRepair() throws UnavailableException
{
if (sizeOfConsensusQuorum > sizeOfPoll())
{
throw UnavailableException.create(consistencyForConsensus, sizeOfConsensusQuorum, sizeOfPoll());
}
}
int requiredFor(ConsistencyLevel consistency)
{
if (consistency == Paxos.nonSerial(consistencyForConsensus))
return sizeOfConsensusQuorum;
return consistency.blockForWrite(replicationStrategy(), pending);
}
public boolean hasOldParticipants()
{
return electorateLive.anyMatch(Paxos::isOldParticipant);
}
@Override
public Participants get()
{
return this;
}
@Override
public Keyspace keyspace()
{
return keyspace;
}
@Override
public AbstractReplicationStrategy replicationStrategy()
{
return replicationStrategy;
}
@Override
public ConsistencyLevel consistencyLevel()
{
return nonSerial(consistencyForConsensus);
}
@Override
public EndpointsForToken contacts()
{
return electorateLive;
}
@Override
public Replica lookup(InetAddressAndPort endpoint)
{
return all.lookup(endpoint);
}
@Override
public Participants withContacts(EndpointsForToken newContacts)
{
throw new UnsupportedOperationException();
}
}
/**
* Encapsulates information about a failure to reach Success, either because of explicit failure responses
* or insufficient responses (in which case the status is not final)
*/
static class MaybeFailure
{
final boolean isFailure;
final String serverError;
final int contacted;
final int required;
final int successes;
final Map<InetAddressAndPort, RequestFailureReason> failures;
static MaybeFailure noResponses(Participants contacted)
{
return new MaybeFailure(false, contacted.sizeOfPoll(), contacted.sizeOfConsensusQuorum, 0, emptyMap());
}
MaybeFailure(Participants contacted, int successes, AsMap failures)
{
this(contacted.sizeOfPoll() - failures.failureCount() < contacted.sizeOfConsensusQuorum, contacted.sizeOfPoll(), contacted.sizeOfConsensusQuorum, successes, failures);
}
MaybeFailure(int contacted, int required, int successes, AsMap failures)
{
this(contacted - failures.failureCount() < required, contacted, required, successes, failures);
}
MaybeFailure(boolean isFailure, int contacted, int required, int successes, Map<InetAddressAndPort, RequestFailureReason> failures)
{
this(isFailure, null, contacted, required, successes, failures);
}
MaybeFailure(boolean isFailure, String serverError, int contacted, int required, int successes, Map<InetAddressAndPort, RequestFailureReason> failures)
{
this.isFailure = isFailure;
this.serverError = serverError;
this.contacted = contacted;
this.required = required;
this.successes = successes;
this.failures = failures;
}
private static int failureCount(Map<InetAddressAndPort, RequestFailureReason> failures)
{
int count = 0;
for (RequestFailureReason reason : failures.values())
count += reason != TIMEOUT ? 1 : 0;
return count;
}
/**
* update relevant counters and throw the relevant exception
*/
RequestExecutionException markAndThrowAsTimeoutOrFailure(boolean isWrite, ConsistencyLevel consistency, int failedAttemptsDueToContention)
{
if (isFailure)
{
mark(isWrite, m -> m.failures, consistency);
throw serverError != null ? new RequestFailureException(ExceptionCode.SERVER_ERROR, serverError, consistency, successes, required, failures)
: isWrite
? new WriteFailureException(consistency, successes, required, WriteType.CAS, failures)
: new ReadFailureException(consistency, successes, required, false, failures);
}
else
{
mark(isWrite, m -> m.timeouts, consistency);
throw isWrite
? new CasWriteTimeoutException(WriteType.CAS, consistency, successes, required, failedAttemptsDueToContention)
: new ReadTimeoutException(consistency, successes, required, false);
}
}
public String toString()
{
return (isFailure ? "Failure(" : "Timeout(") + successes + ',' + failures + ')';
}
}
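/**
 * A deferred result of an asynchronous Paxos phase that may be awaited until a deadline,
 * expressed in the same nanoTime units used for the propose/commit deadlines in this class.
 */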
public interface Async<Result>
{
Result awaitUntil(long until);
}
/**
* Apply the updates supplied by {@code request} if and only if the current values in the row for {@code key}
* match the conditions it specifies. The algorithm is "raw" Paxos: that is, Paxos
* minus leader election -- any node in the cluster may propose changes for any partition.
*
* The Paxos electorate consists only of the replicas for the partition key.
* We expect performance to be reasonable, but CAS is still intended to be used
* "when you really need it," not for all your updates.
*
* There are three phases to Paxos:
* 1. Prepare: the coordinator generates a ballot (Ballot in our case) and asks replicas to
* - promise not to accept updates from older ballots and
* - tell us about the latest ballots it has already _promised_, _accepted_, or _committed_
* - reads the necessary data to evaluate our CAS condition
*
* 2. Propose: if a majority of replicas reply, the coordinator asks replicas to accept the value of the
* highest proposal ballot it heard about, or a new value if no in-progress proposals were reported.
* 3. Commit (Learn): if a majority of replicas acknowledge the accept request, we can commit the new
* value.
*
* Commit procedure is not covered in "Paxos Made Simple," and only briefly mentioned in "Paxos Made Live,"
* so here is our approach:
* 3a. The coordinator sends a commit message to all replicas with the ballot and value.
* 3b. Because of 1-2, this will be the highest-seen commit ballot. The replicas will note that,
* and send it with subsequent promise replies. This allows us to discard acceptance records
* for successfully committed replicas, without allowing incomplete proposals to commit erroneously
* later on.
*
* Note that since we are performing a CAS rather than a simple update, when nodes respond positively to
* Prepare, they include a read response of committed values that will be reconciled on the coordinator
* and checked against the CAS precondition between the prepare and accept phases. This gives us a slightly
* longer window for another coordinator to come along and trump our own promise with a newer one but
* is otherwise safe.
*
* Any successful prepare phase yielding a read that rejects the condition must be followed by the proposal of
* an empty update, to ensure the evaluation of the condition is linearized with respect to other reads and writes.
*
* @param key the row key for the row to CAS
* @param request the conditions for the CAS to apply as well as the update to perform if the conditions hold.
* @param consistencyForConsensus the consistency for the paxos prepare and propose round. This can only be either SERIAL or LOCAL_SERIAL.
* @param consistencyForCommit the consistency for write done during the commit phase. This can be anything, except SERIAL or LOCAL_SERIAL.
*
* @return null if the operation succeeds in updating the row, or the current values corresponding to the conditions
* (since, if the CAS doesn't succeed, it means the current values do not match the conditions).
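*
* <p>A minimal usage sketch, assuming a {@code DecoratedKey key}, {@code CASRequest request} and
* {@code ClientState state} are already in scope (their construction belongs to the CQL layer and is not shown here):
* <pre>{@code
* RowIterator current = Paxos.cas(key, request, ConsistencyLevel.SERIAL, ConsistencyLevel.QUORUM, state);
* boolean applied = (current == null); // null means the update was applied; otherwise 'current' holds the existing values
* }</pre>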
*/
public static RowIterator cas(DecoratedKey key,
CASRequest request,
ConsistencyLevel consistencyForConsensus,
ConsistencyLevel consistencyForCommit,
ClientState clientState)
throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException
{
final long start = nanoTime();
final long proposeDeadline = start + getCasContentionTimeout(NANOSECONDS);
final long commitDeadline = Math.max(proposeDeadline, start + getWriteRpcTimeout(NANOSECONDS));
return cas(key, request, consistencyForConsensus, consistencyForCommit, clientState, start, proposeDeadline, commitDeadline);
}
public static RowIterator cas(DecoratedKey key,
CASRequest request,
ConsistencyLevel consistencyForConsensus,
ConsistencyLevel consistencyForCommit,
ClientState clientState,
long proposeDeadline,
long commitDeadline
)
throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException
{
return cas(key, request, consistencyForConsensus, consistencyForCommit, clientState, nanoTime(), proposeDeadline, commitDeadline);
}
private static RowIterator cas(DecoratedKey partitionKey,
CASRequest request,
ConsistencyLevel consistencyForConsensus,
ConsistencyLevel consistencyForCommit,
ClientState clientState,
long start,
long proposeDeadline,
long commitDeadline
)
throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException
{
SinglePartitionReadCommand readCommand = request.readCommand(FBUtilities.nowInSeconds());
TableMetadata metadata = readCommand.metadata();
consistencyForConsensus.validateForCas();
consistencyForCommit.validateForCasCommit(Keyspace.open(metadata.keyspace).getReplicationStrategy());
Ballot minimumBallot = null;
int failedAttemptsDueToContention = 0;
try (PaxosOperationLock lock = PaxosState.lock(partitionKey, metadata, proposeDeadline, consistencyForConsensus, true))
{
Paxos.Async<PaxosCommit.Status> commit = null;
done: while (true)
{
// read the current values and check they validate the conditions
Tracing.trace("Reading existing values for CAS precondition");
BeginResult begin = begin(proposeDeadline, readCommand, consistencyForConsensus,
true, minimumBallot, failedAttemptsDueToContention);
Ballot ballot = begin.ballot;
Participants participants = begin.participants;
failedAttemptsDueToContention = begin.failedAttemptsDueToContention;
FilteredPartition current;
try (RowIterator iter = PartitionIterators.getOnlyElement(begin.readResponse, readCommand))
{
current = FilteredPartition.create(iter);
}
Proposal proposal;
boolean conditionMet = request.appliesTo(current);
if (!conditionMet)
{
if (getPaxosVariant() == v2_without_linearizable_reads_or_rejected_writes)
{
Tracing.trace("CAS precondition rejected", current);
casWriteMetrics.conditionNotMet.inc();
return current.rowIterator();
}
// If we failed to meet our condition, it does not mean we can do nothing: if we do not propose
// anything that is accepted by a quorum, it is possible for our !conditionMet state
// to not be serialized wrt other operations.
// If a later read encounters an "in progress" write that did not reach a majority,
// but that would have permitted conditionMet had it done so (and hence we evidently did not witness),
// that operation will complete the in-progress proposal before continuing, so that this and future
// reads will perceive conditionMet without any intervening modification from the time at which we
// assured a conditional write that !conditionMet.
// So our evaluation is only serialized if we invalidate any in progress operations by proposing an empty update
// See also CASSANDRA-12126
if (begin.isLinearizableRead)
{
Tracing.trace("CAS precondition does not match current values {}; read is already linearizable; aborting", current);
return conditionNotMet(current);
}
Tracing.trace("CAS precondition does not match current values {}; proposing empty update", current);
proposal = Proposal.empty(ballot, partitionKey, metadata);
}
else if (begin.isPromised)
{
// finish the paxos round w/ the desired updates
// TODO "turn null updates into delete?" - what does this TODO even mean?
PartitionUpdate updates = request.makeUpdates(current, clientState, begin.ballot);
// Apply triggers to cas updates. A consideration here is that
// triggers emit Mutations, and so a given trigger implementation
// may generate mutations for partitions other than the one this
// paxos round is scoped for. In this case, TriggerExecutor will
// validate that the generated mutations are targeted at the same
// partition as the initial updates and reject (via an
// InvalidRequestException) any which aren't.
updates = TriggerExecutor.instance.execute(updates);
proposal = Proposal.of(ballot, updates);
Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot);
}
else
{
// must retry, as only achieved read success in begin
Tracing.trace("CAS precondition is met, but ballot stale for proposal; retrying", current);
continue;
}
PaxosPropose.Status propose = propose(proposal, participants, conditionMet).awaitUntil(proposeDeadline);
switch (propose.outcome)
{
default: throw new IllegalStateException();
case MAYBE_FAILURE:
throw propose.maybeFailure().markAndThrowAsTimeoutOrFailure(true, consistencyForConsensus, failedAttemptsDueToContention);
case SUCCESS:
{
if (!conditionMet)
return conditionNotMet(current);
// no need to commit a no-op; either it
// 1) reached a majority, in which case it was agreed, had no effect and we can do nothing; or
// 2) did not reach a majority, was not agreed, and was not user visible as a result so we can ignore it
if (!proposal.update.isEmpty())
commit = commit(proposal.agreed(), participants, consistencyForConsensus, consistencyForCommit, true);
break done;
}
case SUPERSEDED:
{
switch (propose.superseded().hadSideEffects)
{
default: throw new IllegalStateException();
case MAYBE:
// We don't know if our update has been applied, as the competing ballot may have completed
// our proposal. We yield our uncertainty to the caller via timeout exception.
// TODO: should return more useful result to client, and should also avoid this situation where possible
throw new MaybeFailure(false, participants.sizeOfPoll(), participants.sizeOfConsensusQuorum, 0, emptyMap())
.markAndThrowAsTimeoutOrFailure(true, consistencyForConsensus, failedAttemptsDueToContention);
case NO:
minimumBallot = propose.superseded().by;
// We have been superseded without our proposal being accepted by anyone, so we can safely retry
Tracing.trace("Paxos proposal not accepted (pre-empted by a higher ballot)");
if (!waitForContention(proposeDeadline, ++failedAttemptsDueToContention, metadata, partitionKey, consistencyForConsensus, WRITE))
throw MaybeFailure.noResponses(participants).markAndThrowAsTimeoutOrFailure(true, consistencyForConsensus, failedAttemptsDueToContention);
}
}
}
// continue to retry
}
if (commit != null)
{
PaxosCommit.Status result = commit.awaitUntil(commitDeadline);
if (!result.isSuccess())
throw result.maybeFailure().markAndThrowAsTimeoutOrFailure(true, consistencyForCommit, failedAttemptsDueToContention);
}
Tracing.trace("CAS successful");
return null;
}
finally
{
final long latency = nanoTime() - start;
if (failedAttemptsDueToContention > 0)
{
casWriteMetrics.contention.update(failedAttemptsDueToContention);
openAndGetStore(metadata).metric.topCasPartitionContention.addSample(partitionKey.getKey(), failedAttemptsDueToContention);
}
casWriteMetrics.addNano(latency);
writeMetricsMap.get(consistencyForConsensus).addNano(latency);
}
}
private static RowIterator conditionNotMet(FilteredPartition read)
{
Tracing.trace("CAS precondition rejected", read);
casWriteMetrics.conditionNotMet.inc();
return read.rowIterator();
}
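/**
 * Perform a linearizable (SERIAL or LOCAL_SERIAL) read of a single partition. Any in-progress Paxos round
 * witnessed during the prepare phase is completed first; depending on the configured {@link Config.PaxosVariant},
 * an empty proposal may additionally be required to make the read itself linearizable.
 */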
public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyForConsensus)
throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException
{
long start = nanoTime();
long deadline = start + DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS);
return read(group, consistencyForConsensus, start, deadline);
}
public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyForConsensus, long deadline)
throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException
{
return read(group, consistencyForConsensus, nanoTime(), deadline);
}
private static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyForConsensus, long start, long deadline)
throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException
{
if (group.queries.size() > 1)
throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency may only be requested for one partition at a time");
int failedAttemptsDueToContention = 0;
Ballot minimumBallot = null;
SinglePartitionReadCommand read = group.queries.get(0);
try (PaxosOperationLock lock = PaxosState.lock(read.partitionKey(), read.metadata(), deadline, consistencyForConsensus, false))
{
while (true)
{
// does the work of applying in-progress writes; throws UAE or timeout if it can't
final BeginResult begin = begin(deadline, read, consistencyForConsensus, false, minimumBallot, failedAttemptsDueToContention);
failedAttemptsDueToContention = begin.failedAttemptsDueToContention;
switch (PAXOS_VARIANT)
{
default: throw new AssertionError();
case v2_without_linearizable_reads_or_rejected_writes:
case v2_without_linearizable_reads:
return begin.readResponse;
case v2:
// no need to submit an empty proposal, as the promise will be treated as complete for future optimistic reads
if (begin.isLinearizableRead)
return begin.readResponse;
}
Proposal proposal = Proposal.empty(begin.ballot, read.partitionKey(), read.metadata());
PaxosPropose.Status propose = propose(proposal, begin.participants, false).awaitUntil(deadline);
switch (propose.outcome)
{
default: throw new IllegalStateException();
case MAYBE_FAILURE:
throw propose.maybeFailure().markAndThrowAsTimeoutOrFailure(false, consistencyForConsensus, failedAttemptsDueToContention);
case SUCCESS:
return begin.readResponse;
case SUPERSEDED:
switch (propose.superseded().hadSideEffects)
{
default: throw new IllegalStateException();
case MAYBE:
// We don't know if our update has been applied, as the competing ballot may have completed
// our proposal. We yield our uncertainty to the caller via timeout exception.
// TODO: should return more useful result to client, and should also avoid this situation where possible
throw new MaybeFailure(false, begin.participants.sizeOfPoll(), begin.participants.sizeOfConsensusQuorum, 0, emptyMap())
.markAndThrowAsTimeoutOrFailure(true, consistencyForConsensus, failedAttemptsDueToContention);
case NO:
minimumBallot = propose.superseded().by;
// We have been superseded without our proposal being accepted by anyone, so we can safely retry
Tracing.trace("Paxos proposal not accepted (pre-empted by a higher ballot)");
if (!waitForContention(deadline, ++failedAttemptsDueToContention, group.metadata(), group.queries.get(0).partitionKey(), consistencyForConsensus, READ))
throw MaybeFailure.noResponses(begin.participants).markAndThrowAsTimeoutOrFailure(true, consistencyForConsensus, failedAttemptsDueToContention);
}
}
}
}
finally
{
long latency = nanoTime() - start;
readMetrics.addNano(latency);
casReadMetrics.addNano(latency);
readMetricsMap.get(consistencyForConsensus).addNano(latency);
TableMetadata table = read.metadata();
Keyspace.open(table.keyspace).getColumnFamilyStore(table.name).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS);
if (failedAttemptsDueToContention > 0)
casReadMetrics.contention.update(failedAttemptsDueToContention);
}
}
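/**
 * The result of {@link #begin}: the promised ballot and participants, the resolved read response, and
 * whether that response is already linearizable and/or was obtained with a full promise (as opposed to
 * only a read permission).
 */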
static class BeginResult
{
final Ballot ballot;
final Participants participants;
final int failedAttemptsDueToContention;
final PartitionIterator readResponse;
final boolean isLinearizableRead;
final boolean isPromised;
final Ballot retryWithAtLeast;
public BeginResult(Ballot ballot, Participants participants, int failedAttemptsDueToContention, PartitionIterator readResponse, boolean isLinearizableRead, boolean isPromised, Ballot retryWithAtLeast)
{
assert isPromised || isLinearizableRead;
this.ballot = ballot;
this.participants = participants;
this.failedAttemptsDueToContention = failedAttemptsDueToContention;
this.readResponse = readResponse;
this.isLinearizableRead = isLinearizableRead;
this.isPromised = isPromised;
this.retryWithAtLeast = retryWithAtLeast;
}
}
/**
* Begin a Paxos operation by seeking promises from our electorate to be completed with proposals by our caller; and:
*
* - Completing any in-progress proposals witnessed, that are not known to have reached the commit phase
* - Completing any in-progress commits witnessed, that are not known to have reached a quorum of the electorate
* - Retrying and backing-off under contention
* - Detecting electorate mismatches with our peers and retrying to avoid non-overlapping
* electorates agreeing operations
* - Returning a resolved read response, and knowledge of whether it is linearizable to read without proposing an empty update
*
* Optimisations:
* - If the promises report an incomplete commit (but have been able to witness it in a read response)
* we will submit the commit to those nodes that have not witnessed while waiting for those that have,
* returning as soon as a quorum is known to have witnessed the commit
* - If we witness an in-progress commit that needs completing, we batch the commit together with a new prepare,
* restarting our operation.
* - If we witness an in-progress proposal that needs completing, after successfully proposing it we batch its
* commit together with a new prepare, restarting our operation.
*
* @return a {@link BeginResult} describing the promised ballot, the participants contacted, and the resolved
* read response, along with whether that response is already linearizable without proposing an empty update.
*/
@SuppressWarnings("resource")
private static BeginResult begin(long deadline,
SinglePartitionReadCommand query,
ConsistencyLevel consistencyForConsensus,
final boolean isWrite,
Ballot minimumBallot,
int failedAttemptsDueToContention)
throws WriteTimeoutException, WriteFailureException, ReadTimeoutException, ReadFailureException
{
boolean acceptEarlyReadPermission = !isWrite; // if we're reading, begin by assuming a read permission is sufficient
Participants initialParticipants = Participants.get(query.metadata(), query.partitionKey(), consistencyForConsensus);
initialParticipants.assureSufficientLiveNodes(isWrite);
PaxosPrepare preparing = prepare(minimumBallot, initialParticipants, query, isWrite, acceptEarlyReadPermission);
while (true)
{
// prepare
PaxosPrepare retry = null;
PaxosPrepare.Status prepare = preparing.awaitUntil(deadline);
boolean isPromised = false;
retry: switch (prepare.outcome)
{
default: throw new IllegalStateException();
case FOUND_INCOMPLETE_COMMITTED:
{
FoundIncompleteCommitted incomplete = prepare.incompleteCommitted();
Tracing.trace("Repairing replicas that missed the most recent commit");
retry = commitAndPrepare(incomplete.committed, incomplete.participants, query, isWrite, acceptEarlyReadPermission);
break;
}
case FOUND_INCOMPLETE_ACCEPTED:
{
FoundIncompleteAccepted inProgress = prepare.incompleteAccepted();
Tracing.trace("Finishing incomplete paxos round {}", inProgress.accepted);
if (isWrite)
casWriteMetrics.unfinishedCommit.inc();
else
casReadMetrics.unfinishedCommit.inc();
// we DO NOT need to change the timestamp of this commit - either we or somebody else will finish it
// and the original timestamp is correctly linearised. By not updating the timestamp we leave enough
// information for nodes to avoid competing re-proposing the same proposal; if an in progress accept
// is equal to the latest commit (even if the ballots aren't) we're done and can abort earlier,
// and in fact it's possible for a CAS to sometimes determine if side effects occurred by reading
// the underlying data and not witnessing the timestamp of its ballot (or any newer for the relevant data).
Proposal repropose = new Proposal(inProgress.ballot, inProgress.accepted.update);
PaxosPropose.Status proposeResult = propose(repropose, inProgress.participants, false).awaitUntil(deadline);
switch (proposeResult.outcome)
{
default: throw new IllegalStateException();
case MAYBE_FAILURE:
throw proposeResult.maybeFailure().markAndThrowAsTimeoutOrFailure(isWrite, consistencyForConsensus, failedAttemptsDueToContention);
case SUCCESS:
retry = commitAndPrepare(repropose.agreed(), inProgress.participants, query, isWrite, acceptEarlyReadPermission);
break retry;
case SUPERSEDED:
// since we are proposing a previous value that was maybe superseded by us before completion
// we don't need to test the side effects, as we just want to start again, and fall through
// to the superseded section below
prepare = new PaxosPrepare.Superseded(proposeResult.superseded().by, inProgress.participants);
}
}
case SUPERSEDED:
{
Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting");
// sleep a random amount to give the other proposer a chance to finish
if (!waitForContention(deadline, ++failedAttemptsDueToContention, query.metadata(), query.partitionKey(), consistencyForConsensus, isWrite ? WRITE : READ))
throw MaybeFailure.noResponses(prepare.participants).markAndThrowAsTimeoutOrFailure(true, consistencyForConsensus, failedAttemptsDueToContention);
retry = prepare(prepare.retryWithAtLeast(), prepare.participants, query, isWrite, acceptEarlyReadPermission);
break;
}
case PROMISED: isPromised = true;
case READ_PERMITTED:
{
// We have received a quorum of promises (or read permissions) that have all witnessed the commit of the prior paxos
// round's proposal (if any).
PaxosPrepare.Success success = prepare.success();
DataResolver<?, ?> resolver = new DataResolver(query, success.participants, NoopReadRepair.instance, query.creationTimeNanos());
for (int i = 0 ; i < success.responses.size() ; ++i)
resolver.preprocess(success.responses.get(i));
class WasRun implements Runnable { boolean v; public void run() { v = true; } }
WasRun hadShortRead = new WasRun();
PartitionIterator result = resolver.resolve(hadShortRead);
if (!isPromised && hadShortRead.v)
{
// we need to propose an empty update to linearize our short read, but we only obtained read success;
// since we may continue to perform short reads, we ask our prepare not to accept an early
// read permission when a promise may yet be obtained
// TODO: increase read size each time this happens?
acceptEarlyReadPermission = false;
break;
}
return new BeginResult(success.ballot, success.participants, failedAttemptsDueToContention, result, !hadShortRead.v && success.isReadSafe, isPromised, success.supersededBy);
}
case MAYBE_FAILURE:
throw prepare.maybeFailure().markAndThrowAsTimeoutOrFailure(isWrite, consistencyForConsensus, failedAttemptsDueToContention);
case ELECTORATE_MISMATCH:
Participants participants = Participants.get(query.metadata(), query.partitionKey(), consistencyForConsensus);
participants.assureSufficientLiveNodes(isWrite);
retry = prepare(participants, query, isWrite, acceptEarlyReadPermission);
break;
}
if (retry == null)
{
Tracing.trace("Some replicas have already promised a higher ballot than ours; retrying");
// sleep a random amount to give the other proposer a chance to finish
if (!waitForContention(deadline, ++failedAttemptsDueToContention, query.metadata(), query.partitionKey(), consistencyForConsensus, isWrite ? WRITE : READ))
throw MaybeFailure.noResponses(prepare.participants).markAndThrowAsTimeoutOrFailure(true, consistencyForConsensus, failedAttemptsDueToContention);
retry = prepare(prepare.retryWithAtLeast(), prepare.participants, query, isWrite, acceptEarlyReadPermission);
}
preparing = retry;
}
}
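/**
 * Returns true if this node is a replica for {@code key} and so should process the operation: the natural
 * replicas are consulted if the operation includes a read, otherwise the full (natural and pending) write
 * replica set is used.
 */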
public static boolean isInRangeAndShouldProcess(InetAddressAndPort from, DecoratedKey key, TableMetadata table, boolean includesRead)
{
Keyspace keyspace = Keyspace.open(table.keyspace);
return (includesRead ? EndpointsForToken.natural(keyspace, key.getToken())
: ReplicaLayout.forTokenWriteLiveAndDown(keyspace, key.getToken()).all()
).contains(getBroadcastAddressAndPort());
}
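/** Map a serial consistency level to its non-serial counterpart: SERIAL to QUORUM, LOCAL_SERIAL to LOCAL_QUORUM. */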
static ConsistencyLevel nonSerial(ConsistencyLevel serial)
{
switch (serial)
{
default: throw new IllegalStateException();
case SERIAL: return QUORUM;
case LOCAL_SERIAL: return LOCAL_QUORUM;
}
}
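/** Mark the selected meter (e.g. unavailables, failures, timeouts) on the CAS read or write metrics for the given consistency level. */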
private static void mark(boolean isWrite, Function<ClientRequestMetrics, Meter> toMark, ConsistencyLevel consistency)
{
if (isWrite)
{
toMark.apply(casWriteMetrics).mark();
toMark.apply(writeMetricsMap.get(consistency)).mark();
}
else
{
toMark.apply(casReadMetrics).mark();
toMark.apply(readMetricsMap.get(consistency)).mark();
}
}
public static Ballot newBallot(@Nullable Ballot minimumBallot, ConsistencyLevel consistency)
{
// We want a timestamp that is guaranteed to be unique for that node (so that the ballot is globally unique), but if we've got a prepare rejected
// already we also want to make sure we pick a timestamp that has a chance to be promised, i.e. one that is greater than the most recently known
// in progress (#5667). Lastly, we don't want to use a timestamp that is older than the last one assigned by ClientState or operations may appear
// out-of-order (#7801).
long minTimestampMicros = minimumBallot == null ? Long.MIN_VALUE : 1 + minimumBallot.unixMicros();
// Note that ballotMicros is not guaranteed to be unique if two proposals are being handled concurrently by the same coordinator. But we still
// need ballots to be unique for each proposal so we have to use getRandomTimeUUIDFromMicros.
return nextBallot(minTimestampMicros, flag(consistency));
}
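/**
 * Generate a ballot strictly newer than {@code than}, but with a timestamp kept behind the current time
 * by up to five seconds (or half the interval since {@code than}, if that is smaller), i.e. deliberately stale.
 */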
static Ballot staleBallotNewerThan(Ballot than, ConsistencyLevel consistency)
{
long minTimestampMicros = 1 + than.unixMicros();
long maxTimestampMicros = BallotGenerator.Global.prevUnixMicros();
maxTimestampMicros -= Math.min((maxTimestampMicros - minTimestampMicros) / 2, SECONDS.toMicros(5L));
if (maxTimestampMicros <= minTimestampMicros)
return nextBallot(minTimestampMicros, flag(consistency));
return staleBallot(minTimestampMicros, maxTimestampMicros, flag(consistency));
}
/**
* Create a ballot uuid with the consistency level encoded in the timestamp.
*
* UUIDGen.getRandomTimeUUIDFromMicros timestamps are always a multiple of 10, so we add a 1 or 2 to indicate
* the consistency level of the operation. This should have no effect in practice (except preferring a serial
* operation over a local serial if there's a timestamp collision), but lets us avoid adding CL to the paxos
* table and messages, which should make backcompat easier if a different solution is committed upstream.
*/
public static Ballot ballotForConsistency(long whenInMicros, ConsistencyLevel consistency)
{
Preconditions.checkArgument(consistency.isSerialConsistency());
return nextBallot(whenInMicros, flag(consistency));
}
private static Ballot.Flag flag(ConsistencyLevel consistency)
{
return consistency == SERIAL ? GLOBAL : LOCAL;
}
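/** Recover the consistency level encoded in the ballot's flag, or null if no serial consistency was encoded. */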
public static ConsistencyLevel consistency(Ballot ballot)
{
switch (ballot.flag())
{
default: return null;
case LOCAL: return LOCAL_SERIAL;
case GLOBAL: return SERIAL;
}
}
static Map<InetAddressAndPort, EndpointState> verifyElectorate(Electorate remoteElectorate, Electorate localElectorate)
{
// verify electorates; if they differ, send back gossip info for superset of two participant sets
if (remoteElectorate.equals(localElectorate))
return emptyMap();
Map<InetAddressAndPort, EndpointState> endpoints = Maps.newHashMapWithExpectedSize(remoteElectorate.size() + localElectorate.size());
for (InetAddressAndPort host : remoteElectorate)
{
EndpointState endpoint = Gossiper.instance.copyEndpointStateForEndpoint(host);
if (endpoint == null)
{
NoSpamLogger.log(logger, WARN, 1, TimeUnit.MINUTES, "Remote electorate {} could not be found in Gossip", host);
continue;
}
endpoints.put(host, endpoint);
}
for (InetAddressAndPort host : localElectorate)
{
EndpointState endpoint = Gossiper.instance.copyEndpointStateForEndpoint(host);
if (endpoint == null)
{
NoSpamLogger.log(logger, WARN, 1, TimeUnit.MINUTES, "Local electorate {} could not be found in Gossip", host);
continue;
}
endpoints.putIfAbsent(host, endpoint);
}
return endpoints;
}
public static boolean useV2()
{
switch (PAXOS_VARIANT)
{
case v2_without_linearizable_reads_or_rejected_writes:
case v2_without_linearizable_reads:
case v2:
return true;
case v1:
case v1_without_linearizable_reads_or_rejected_writes:
return false;
default:
throw new AssertionError();
}
}
public static boolean isLinearizable()
{
switch (PAXOS_VARIANT)
{
case v2:
case v1:
return true;
case v2_without_linearizable_reads_or_rejected_writes:
case v2_without_linearizable_reads:
case v1_without_linearizable_reads_or_rejected_writes:
return false;
default:
throw new AssertionError();
}
}
public static void setPaxosVariant(Config.PaxosVariant paxosVariant)
{
Preconditions.checkNotNull(paxosVariant);
PAXOS_VARIANT = paxosVariant;
DatabaseDescriptor.setPaxosVariant(paxosVariant);
}
public static Config.PaxosVariant getPaxosVariant()
{
return PAXOS_VARIANT;
}
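/**
 * Returns true if the replica's gossiped release version is known and is older than
 * {@link #MODERN_PAXOS_RELEASE} (4.1 by default), i.e. the peer may not yet support this Paxos implementation.
 */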
static boolean isOldParticipant(Replica replica)
{
String version = Gossiper.instance.getForEndpoint(replica.endpoint(), RELEASE_VERSION);
if (version == null)
return false;
try
{
return new CassandraVersion(version).compareTo(MODERN_PAXOS_RELEASE) < 0;
}
catch (Throwable t)
{
return false;
}
}
public static void evictHungRepairs()
{
PaxosTableRepairs.evictHungRepairs();
}
}