blob: a7f58e13837a1c96f6954d6a03cc4bc07a91901d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service.paxos;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.PaxosMetrics;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.PendingRangeCalculatorService;
import org.apache.cassandra.service.paxos.PaxosPrepare.Status.Outcome;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.vint.VIntCoding;
import static java.util.Collections.emptyMap;
import static org.apache.cassandra.exceptions.RequestFailureReason.UNKNOWN;
import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer;
import static org.apache.cassandra.net.Verb.PAXOS2_PREPARE_REQ;
import static org.apache.cassandra.net.Verb.PAXOS2_PREPARE_RSP;
import static org.apache.cassandra.service.paxos.Ballot.Flag.NONE;
import static org.apache.cassandra.service.paxos.Commit.*;
import static org.apache.cassandra.service.paxos.Commit.CompareResult.WAS_REPROPOSED_BY;
import static org.apache.cassandra.service.paxos.Paxos.*;
import static org.apache.cassandra.service.paxos.PaxosPrepare.Status.Outcome.*;
import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
import static org.apache.cassandra.service.paxos.PaxosState.*;
import static org.apache.cassandra.service.paxos.PaxosState.MaybePromise.Outcome.*;
import static org.apache.cassandra.utils.CollectionSerializer.deserializeMap;
import static org.apache.cassandra.utils.CollectionSerializer.newHashMap;
import static org.apache.cassandra.utils.CollectionSerializer.serializeMap;
import static org.apache.cassandra.utils.CollectionSerializer.serializedSizeMap;
import static org.apache.cassandra.utils.concurrent.Awaitable.SyncAwaitable.waitUntil;
/**
* Perform one paxos "prepare" attempt, with various optimisations.
*
* The prepare step entails asking for a quorum of nodes to promise to accept our later proposal. It can
* yield one of five logical answers:
*
* 1) Success - we have received a quorum of promises, and we know that a quorum of nodes
* witnessed the prior round's commit (if any)
* 2) Timeout - we have not received enough responses at all before our deadline passed
* 3) Failure - we have received too many explicit failures to succeed
* 4) Superseded - we have been informed of a later ballot that has been promised
* 5) FoundInProgress - we have been informed of an earlier promise that has been accepted
*
* Success hinges on two distinct criteria being met, as the quorum of promises may not guarantee a quorum of
* witnesses of the prior round's commit. We track this separately by recording those nodes that have witnessed
* the prior round's commit. On receiving a quorum of promises, we submit the prior round's commit to any promisers
* that had not witnessed it, while continuing to wait for responses to our original request: as soon as we hear of
* a quorum that have witnessed it, either by our refresh request or by responses to the original request, we yield Success.
*
* Success is also accompanied by a quorum of read responses, avoiding another round-trip to obtain this result.
*
* This operation may be started either with a solo Prepare command, or with a prefixed Commit command.
* If we are completing an in-progress round we previously discovered, we save another round-trip by committing and
* preparing simultaneously.
*/
public class PaxosPrepare extends PaxosRequestCallback<PaxosPrepare.Response> implements PaxosPrepareRefresh.Callbacks, Paxos.Async<PaxosPrepare.Status>
{
private static final Logger logger = LoggerFactory.getLogger(PaxosPrepare.class);
// Optional hook run (when non-null) in the finally block of checkForLinearizabilityViolation, i.e. after every
// detected linearizability violation. It is never assigned in the visible part of this file; presumably it is
// set reflectively by tests (TODO confirm).
private static Runnable onLinearizabilityViolation;
// Shared handler and serializers for PAXOS2_PREPARE requests/responses; RequestHandler::execute is also used
// to execute a prepare locally (see prepareWithBallotInternal).
public static final RequestHandler requestHandler = new RequestHandler();
public static final RequestSerializer requestSerializer = new RequestSerializer();
public static final ResponseSerializer responseSerializer = new ResponseSerializer();
/**
 * The state reached by a prepare attempt. This is a status rather than a result: without enough responses
 * the result may still be unknown, though in most cases this is the final status.
 * Each {@link Outcome} is represented by a subclass carrying the relevant detail; the typed accessors below
 * simply downcast to that subclass.
 */
static class Status
{
    enum Outcome { READ_PERMITTED, PROMISED, SUPERSEDED, FOUND_INCOMPLETE_ACCEPTED, FOUND_INCOMPLETE_COMMITTED, MAYBE_FAILURE, ELECTORATE_MISMATCH }

    final Outcome outcome;
    final Participants participants;

    Status(Outcome outcome, Participants participants)
    {
        this.participants = participants;
        this.outcome = outcome;
    }

    /**
     * @return the minimum ballot to retry with, if one is known: the superseding ballot for SUPERSEDED,
     *         the (possibly null) supersededBy recorded with READ_PERMITTED, and null for any other outcome
     */
    @Nullable
    Ballot retryWithAtLeast()
    {
        switch (outcome)
        {
            case SUPERSEDED:
                return ((Superseded) this).by;
            case READ_PERMITTED:
                return success().supersededBy;
            default:
                return null;
        }
    }

    Success success() { return (Success) this; }
    FoundIncompleteAccepted incompleteAccepted() { return (FoundIncompleteAccepted) this; }
    FoundIncompleteCommitted incompleteCommitted() { return (FoundIncompleteCommitted) this; }
    Paxos.MaybeFailure maybeFailure() { return ((MaybeFailure) this).info; }
}
static class Success extends WithRequestedBallot
{
final List<Message<ReadResponse>> responses;
final boolean isReadSafe; // read responses constitute a linearizable read (though short read protection would invalidate that)
final @Nullable
Ballot supersededBy; // if known and READ_SUCCESS
Success(Outcome outcome, Ballot ballot, Participants participants, List<Message<ReadResponse>> responses, boolean isReadSafe, @Nullable Ballot supersededBy)
{
super(outcome, participants, ballot);
this.responses = responses;
this.isReadSafe = isReadSafe;
this.supersededBy = supersededBy;
}
static Success read(Ballot ballot, Participants participants, List<Message<ReadResponse>> responses, @Nullable Ballot supersededBy)
{
return new Success(Outcome.READ_PERMITTED, ballot, participants, responses, true, supersededBy);
}
static Success readOrWrite(Ballot ballot, Participants participants, List<Message<ReadResponse>> responses, boolean isReadConsistent)
{
return new Success(Outcome.PROMISED, ballot, participants, responses, isReadConsistent, null);
}
public String toString() { return "Success(" + ballot + ", " + participants.electorate + ')'; }
}
/**
* The ballot we sought promises for has been superseded by another proposer's
*
* Note: we extend this for Success, so that supersededBy() can be called for ReadSuccess
*/
static class Superseded extends Status
{
final Ballot by;
Superseded(Ballot by, Participants participants)
{
super(SUPERSEDED, participants);
this.by = by;
}
public String toString() { return "Superseded(" + by + ')'; }
}
/** A status that also records the ballot this prepare requested promises for. */
static class WithRequestedBallot extends Status
{
    final Ballot ballot;

    WithRequestedBallot(Outcome outcome, Participants participants, Ballot ballot)
    {
        super(outcome, participants);
        this.ballot = ballot;
    }
}
/** Common base for outcomes that discovered an incomplete (accepted or committed) earlier proposal. */
static class FoundIncomplete extends WithRequestedBallot
{
    private FoundIncomplete(Outcome outcome, Participants participants, Ballot promisedBallot)
    {
        super(outcome, participants, promisedBallot);
    }
}
/**
* We have been informed of a promise made by one of the replicas we contacted, that was not accepted by all replicas
* (though may have been accepted by a majority; we don't know).
* In this case we cannot readily know if we have prevented this proposal from being completed, so we attempt
* to finish it ourselves (unfortunately leaving the proposer to timeout, given the current semantics)
* TODO: we should consider waiting for more responses in case we encounter any successful commit, or a majority
* of acceptors?
*/
static class FoundIncompleteAccepted extends FoundIncomplete
{
final Accepted accepted;
private FoundIncompleteAccepted(Ballot promisedBallot, Participants participants, Accepted accepted)
{
super(FOUND_INCOMPLETE_ACCEPTED, participants, promisedBallot);
this.accepted = accepted;
}
public String toString()
{
return "FoundIncomplete" + accepted;
}
}
/**
* We have been informed of a proposal that was accepted by a majority, but we do not know has been
* committed to a majority, and we failed to read from a single natural replica that had witnessed this
* commit when we performed the read.
* Since this is an edge case, we simply start again, to keep the control flow more easily understood;
* the commit shouldld be committed to a majority as part of our re-prepare.
*/
static class FoundIncompleteCommitted extends FoundIncomplete
{
final Committed committed;
private FoundIncompleteCommitted(Ballot promisedBallot, Participants participants, Committed committed)
{
super(FOUND_INCOMPLETE_COMMITTED, participants, promisedBallot);
this.committed = committed;
}
public String toString()
{
return "FoundIncomplete" + committed;
}
}
static class MaybeFailure extends Status
{
final Paxos.MaybeFailure info;
private MaybeFailure(Paxos.MaybeFailure info, Participants participants)
{
super(MAYBE_FAILURE, participants);
this.info = info;
}
public String toString() { return info.toString(); }
}
/** Our electorate no longer matches the current view of the ring, so the prepare should be retried. */
static class ElectorateMismatch extends WithRequestedBallot
{
    private ElectorateMismatch(Participants participants, Ballot ballot)
    {
        super(Outcome.ELECTORATE_MISMATCH, participants, ballot);
    }
}
// if true (and hasProposalStability), READ_PERMITTED is signalled as soon as a refresh of stale participants
// is started, without waiting for that refresh to complete
private final boolean acceptEarlyReadPermission;
private final AbstractRequest<?> request;
private Ballot supersededBy; // cannot be promised, as a newer promise has been made
private Accepted latestAccepted; // the latest latestAcceptedButNotCommitted response we have received (which may still have been committed elsewhere)
private Committed latestCommitted; // latest actually committed proposal
private final Participants participants;
// read responses gathered before reaching a quorum of permissions, wrapped as synthetic messages by addReadResponse
private final List<Message<ReadResponse>> readResponses;
// true if some replica currently in withLatest supplied a read response
private boolean haveReadResponseWithLatest;
private boolean haveQuorumOfPermissions; // set once withLatest + needLatest reach a consensus quorum; permissions => PROMISED or READ_PERMITTED
private List<InetAddressAndPort> withLatest; // permitted (promised or read-only) and have witnessed the latest commit
private List<InetAddressAndPort> needLatest; // permitted without having witnessed latest commit, nor yet been refreshed by us; null until first needed
private int failures; // failed either on initial request or on refresh
private boolean hasProposalStability = true; // no successful modifying proposal could have raced with us and not been seen
// false once any response is a permission other than PROMISE
private boolean hasOnlyPromises = true;
// highest paxos repair low bound reported by any replica
private long maxLowBound;
// the final status; non-null once this prepare is done (see isDone)
private Status outcome;
// optional completion callback; may be null
private final Consumer<Status> onDone;
// created lazily by refreshStaleParticipants
private PaxosPrepareRefresh refreshStaleParticipants;
// ensures at most one linearizability violation is reported per prepare
private boolean linearizabilityViolationDetected = false;
PaxosPrepare(Participants participants, AbstractRequest<?> request, boolean acceptEarlyReadPermission, Consumer<Status> onDone)
{
this.acceptEarlyReadPermission = acceptEarlyReadPermission;
assert participants.sizeOfConsensusQuorum > 0;
this.participants = participants;
this.request = request;
this.readResponses = new ArrayList<>(participants.sizeOfConsensusQuorum);
this.withLatest = new ArrayList<>(participants.sizeOfConsensusQuorum);
this.latestAccepted = this.latestCommitted = Committed.none(request.partitionKey, request.table);
this.onDone = onDone;
}
/**
 * @return true if the latest accepted-but-not-committed proposal is one we must complete ourselves
 */
private boolean hasInProgressProposal()
{
    // Four conditions, evaluated in order:
    // - a no-op need not be committed: either it reached a majority, was agreed and had no effect, or it did
    //   not reach a majority, was not agreed, and so was never user visible
    // - it must be newer than latestCommitted, else there is nothing left to do
    // - it must be newer than the highest paxos repair low bound reported to us
    // - it must not merely be a re-proposal of latestCommitted
    return !latestAccepted.update.isEmpty()
           && latestAccepted.isAfter(latestCommitted)
           && latestAccepted.ballot.uuidTimestamp() > maxLowBound
           && !latestAccepted.isReproposalOf(latestCommitted);
}
/** Start a prepare, with a read, using a fresh ballot that has no required minimum. */
static PaxosPrepare prepare(Participants participants, SinglePartitionReadCommand readCommand, boolean isWrite, boolean acceptEarlyReadPermission) throws UnavailableException
{
    Ballot noMinimum = null;
    return prepare(noMinimum, participants, readCommand, isWrite, acceptEarlyReadPermission);
}
/** Start a prepare, with a read, using a new ballot derived from {@code minimumBallot} (which may be null). */
static PaxosPrepare prepare(Ballot minimumBallot, Participants participants, SinglePartitionReadCommand readCommand, boolean isWrite, boolean acceptEarlyReadPermission) throws UnavailableException
{
    Ballot ballot = newBallot(minimumBallot, participants.consistencyForConsensus);
    return prepareWithBallot(ballot, participants, readCommand, isWrite, acceptEarlyReadPermission);
}
/** Start a prepare, with a read, for the given {@code ballot}; the caller awaits the returned prepare. */
static PaxosPrepare prepareWithBallot(Ballot ballot, Participants participants, SinglePartitionReadCommand readCommand, boolean isWrite, boolean acceptEarlyReadPermission)
{
    Tracing.trace("Preparing {} with read", ballot);
    return prepareWithBallotInternal(participants,
                                     new Request(ballot, participants.electorate, readCommand, isWrite),
                                     acceptEarlyReadPermission,
                                     null);
}
/**
 * Start a prepare without a read for the given {@code ballot}, reporting the final status to {@code onDone}.
 *
 * @return {@code onDone}, for convenient chaining
 */
@SuppressWarnings("SameParameterValue")
static <T extends Consumer<Status>> T prepareWithBallot(Ballot ballot, Participants participants, DecoratedKey partitionKey, TableMetadata table, boolean isWrite, boolean acceptEarlyReadPermission, T onDone)
{
    Tracing.trace("Preparing {}", ballot);
    Request withoutReadRequest = new Request(ballot, participants.electorate, partitionKey, table, isWrite);
    prepareWithBallotInternal(participants, withoutReadRequest, acceptEarlyReadPermission, onDone);
    return onDone;
}
/** Create the prepare callback and dispatch {@code request} to all participants (including ourselves, locally). */
private static PaxosPrepare prepareWithBallotInternal(Participants participants, Request request, boolean acceptEarlyReadPermission, Consumer<Status> onDone)
{
    Message<Request> outbound = Message.out(PAXOS2_PREPARE_REQ, request);
    PaxosPrepare callback = new PaxosPrepare(participants, request, acceptEarlyReadPermission, onDone);
    start(callback, participants, outbound, RequestHandler::execute);
    return callback;
}
/**
 * Send {@code send} to every participant in the poll other than ourselves, registering {@code prepare} as the
 * callback. If we are one of the participants, the request is instead executed locally with
 * {@code selfHandler} on the verb's stage. Participants that the electorate reports as pending are sent the
 * request without its read (via withoutRead).
 */
static <R extends AbstractRequest<R>> void start(PaxosPrepare prepare, Participants participants, Message<R> send, BiFunction<R, InetAddressAndPort, Response> selfHandler)
{
    boolean includesSelf = false;
    int pollSize = participants.sizeOfPoll();
    for (int i = 0; i < pollSize; i++)
    {
        InetAddressAndPort to = participants.voter(i);
        boolean pending = participants.electorate.isPending(to);
        logger.trace("{} to {}", send.payload, to);
        if (shouldExecuteOnSelf(to))
        {
            includesSelf = true;
            continue;
        }
        MessagingService.instance().sendWithCallback(pending ? withoutRead(send) : send, to, prepare);
    }

    if (!includesSelf)
        return;

    send.verb().stage.execute(() -> prepare.executeOnSelf(send.payload, selfHandler));
}
// TODO: extend Sync?
/**
 * Wait (on this object's monitor) until the prepare completes or {@code deadline} passes, as determined by
 * waitUntil. If it has not completed by then, it is completed as MAYBE_FAILURE.
 *
 * @return the final status; if interrupted, a MaybeFailure (not recorded as this prepare's outcome) is
 *         returned and the thread's interrupt flag is restored
 */
public synchronized Status awaitUntil(long deadline)
{
    try
    {
        while (!isDone())
        {
            if (!waitUntil(this, deadline))
                break;
        }
    }
    catch (InterruptedException e)
    {
        // can only normally be interrupted if the system is shutting down; should rethrow as a write failure but propagate the interrupt
        Thread.currentThread().interrupt();
        return new MaybeFailure(new Paxos.MaybeFailure(true, participants.sizeOfPoll(), participants.sizeOfConsensusQuorum, 0, emptyMap()), participants);
    }

    if (!isDone())
        signalDone(MAYBE_FAILURE);
    return outcome;
}
/** @return true once a final status has been recorded by signalDone */
private boolean isDone()
{
    return this.outcome != null;
}
/** @return the number of permitting replicas known to have witnessed the latest commit */
private int withLatest()
{
    return this.withLatest.size();
}
/** @return the number of permitting replicas still awaiting a refresh with the latest commit (0 if none tracked) */
private int needLatest()
{
    if (needLatest == null)
        return 0;
    return needLatest.size();
}
/**
 * @return true if any non-null endpoint state in {@code gossipInfo} is for an endpoint we have no local state for,
 *         or supersedes our local state for that endpoint
 */
private static boolean needsGossipUpdate(Map<InetAddressAndPort, EndpointState> gossipInfo)
{
    if (gossipInfo.isEmpty())
        return false;

    for (Map.Entry<InetAddressAndPort, EndpointState> peer : gossipInfo.entrySet())
    {
        EndpointState theirs = peer.getValue();
        if (theirs != null)
        {
            EndpointState ours = Gossiper.instance.getEndpointStateForEndpoint(peer.getKey());
            if (ours == null || ours.isSupersededBy(theirs))
                return true;
        }
    }
    return false;
}
/**
 * Handle a prepare response from {@code from}. A rejection completes the prepare as SUPERSEDED; a permission is
 * processed directly, after verifying the electorate, or after first applying the gossip state it carries.
 */
public synchronized void onResponse(Response response, InetAddressAndPort from)
{
    if (logger.isTraceEnabled())
        logger.trace("{} for {} from {}", response, request.ballot, from);

    // a late response cannot change our outcome, but may still reveal a linearizability violation
    if (isDone())
    {
        maybeCheckForLinearizabilityViolation(response, from);
        return;
    }

    if (response.isRejected())
    {
        supersededBy = response.rejected().supersededBy;
        signalDone(SUPERSEDED);
        return;
    }

    final Permitted permitted = response.permitted();

    if (permitted.gossipInfo.isEmpty())
    {
        // we agree about the electorate, so can simply accept the promise/permission
        permitted(permitted, from);
        return;
    }

    if (!needsGossipUpdate(permitted.gossipInfo))
    {
        // our gossip is up-to-date, but our original electorate could have been built with stale gossip, so verify it
        permittedOrTerminateIfElectorateMismatch(permitted, from);
        return;
    }

    // otherwise our beliefs about the ring potentially diverge, so update gossip with the peer's information
    Stage.GOSSIP.executor().execute(() -> {
        Gossiper.instance.notifyFailureDetector(permitted.gossipInfo);
        Gossiper.instance.applyStateLocally(permitted.gossipInfo);
        // TODO: We should also wait for schema pulls/pushes, however this would be quite an involved change to MigrationManager
        //       (which currently drops some migration tasks on the floor).
        //       Note it would be fine for us to fail to complete the migration task and simply treat this response as a failure/timeout.
        // once any pending ranges have been calculated, refresh our Participants list and submit the promise
        PendingRangeCalculatorService.instance.executeWhenFinished(() -> permittedOrTerminateIfElectorateMismatch(permitted, from));
    });
}
/**
 * Process {@code permitted} only if the electorate recomputed from the current ring view still matches the one
 * we started with; otherwise complete as ELECTORATE_MISMATCH so the caller can retry.
 */
private synchronized void permittedOrTerminateIfElectorateMismatch(Permitted permitted, InetAddressAndPort from)
{
    // this may run asynchronously with respect to the response's arrival (see onResponse), so recheck
    if (isDone())
        return;

    if (Electorate.get(request.table, request.partitionKey, consistency(request.ballot)).equals(participants.electorate))
        permitted(permitted, from);
    else
        signalDone(ELECTORATE_MISMATCH); // finish, so we can retry with the updated view of the ring
}
/**
 * Incorporate a promise (or read-only permission) from {@code from}, and complete this prepare once the
 * responses received so far determine an outcome.
 *
 * Until a quorum of permissions has been received, we track the latest commit reported by any replica,
 * partitioning respondents into those that reported it ({@code withLatest}) and those that did not
 * ({@code needLatest}), and we gather read responses and the latest accepted-but-not-committed proposal.
 * Once a quorum has been received, further responses only update that partitioning; a response reporting a
 * later commit may also be checked for a linearizability violation.
 *
 * Only called from synchronized methods (onResponse, permittedOrTerminateIfElectorateMismatch).
 */
private void permitted(Permitted permitted, InetAddressAndPort from)
{
// a non-promise permission rules out PROMISED as our outcome; remember the first superseding ballot we hear of
if (permitted.outcome != PROMISE)
{
hasOnlyPromises = false;
if (supersededBy == null)
supersededBy = permitted.supersededBy;
}
// track the highest paxos repair low bound reported to us
if (permitted.lowBound > maxLowBound)
maxLowBound = permitted.lowBound;
if (!haveQuorumOfPermissions)
{
CompareResult compareLatest = permitted.latestCommitted.compareWith(latestCommitted);
switch (compareLatest)
{
default: throw new IllegalStateException();
case IS_REPROPOSAL:
// adopt the response's commit as our latestCommitted, then deliberately fall through:
// this replica counts as having witnessed the latest commit
latestCommitted = permitted.latestCommitted;
case WAS_REPROPOSED_BY:
case SAME:
withLatest.add(from);
haveReadResponseWithLatest |= permitted.readResponse != null;
break;
case BEFORE:
// this replica has not witnessed our latest commit; it will need refreshing
if (needLatest == null)
needLatest = new ArrayList<>(participants.sizeOfPoll() - withLatest.size());
needLatest.add(from);
break;
case AFTER:
// this replica reports a later commit than any seen so far: everyone previously in withLatest is now stale
// move with->need
if (!withLatest.isEmpty())
{
if (needLatest == null)
{
needLatest = withLatest;
withLatest = new ArrayList<>(Math.min(participants.sizeOfPoll() - needLatest.size(), participants.sizeOfConsensusQuorum));
}
else
{
needLatest.addAll(withLatest);
withLatest.clear();
}
}
withLatest.add(from);
// reset (rather than accumulate): only this replica is now known to have the latest commit
haveReadResponseWithLatest = permitted.readResponse != null;
latestCommitted = permitted.latestCommitted;
}
if (isAfter(permitted.latestAcceptedButNotCommitted, latestAccepted))
latestAccepted = permitted.latestAcceptedButNotCommitted;
if (permitted.readResponse != null)
{
hasProposalStability &= permitted.hadProposalStability;
addReadResponse(permitted.readResponse, from);
}
}
else
{
// quorum already reached: only maintain the with/need partitioning used by the refresh logic
switch (permitted.latestCommitted.compareWith(latestCommitted))
{
default: throw new IllegalStateException();
case SAME:
case IS_REPROPOSAL:
case WAS_REPROPOSED_BY:
withLatest.add(from);
break;
case AFTER:
// stop processing if the linearizability check failed the operation
if (maybeCheckForLinearizabilityViolation(permitted, from))
return;
// deliberate fall-through:
// witnessing future commit doesn't imply have seen prior, so add to refresh list
case BEFORE:
if (needLatest == null)
needLatest = new ArrayList<>(participants.sizeOfPoll() - withLatest.size());
needLatest.add(from);
}
}
haveQuorumOfPermissions |= withLatest() + needLatest() >= participants.sizeOfConsensusQuorum;
if (haveQuorumOfPermissions)
{
if (request.read != null && readResponses.size() < participants.sizeOfReadQuorum)
throw new IllegalStateException("Insufficient read responses: " + readResponses + "; need " + participants.sizeOfReadQuorum);
if (!hasOnlyPromises && !hasProposalStability)
signalDone(SUPERSEDED);
// We must be certain to have witnessed a quorum of responses before completing any in-progress proposal
// else we may complete a stale proposal that did not reach a quorum (and may do so in preference
// to a different in progress proposal that did reach a quorum).
// We should also be sure to return any in progress proposal in preference to any incompletely committed
// earlier commits (since, while we should encounter it next round, any commit that is incomplete in the
// presence of an incomplete proposal can be ignored, as either the proposal is a re-proposal of the same
// commit or the commit has already reached a quorum
else if (hasInProgressProposal())
signalDone(hasOnlyPromises ? FOUND_INCOMPLETE_ACCEPTED : SUPERSEDED);
else if (withLatest() >= participants.sizeOfConsensusQuorum)
signalDone(hasOnlyPromises ? PROMISED : READ_PERMITTED);
// otherwise if we have any read response with the latest commit,
// try to simply ensure it has been persisted to a consensus group
else if (haveReadResponseWithLatest)
{
refreshStaleParticipants();
// if an optimistic read is possible, and we are performing a read,
// we can safely answer immediately without waiting for the refresh
if (hasProposalStability && acceptEarlyReadPermission)
signalDone(Outcome.READ_PERMITTED);
}
// otherwise we need to run our reads again anyway,
// and the chance of receiving another response with latest may be slim.
// so we just start again
else
signalDone(FOUND_INCOMPLETE_COMMITTED);
}
}
/**
 * Run the linearizability check for a promise received after we already hold a quorum of promises (and no other
 * kind of permission), provided it reports a commit later than our latestCommitted.
 *
 * @return true if the check failed the operation, in which case the caller must stop processing the response
 */
private boolean maybeCheckForLinearizabilityViolation(Response response, InetAddressAndPort from)
{
    boolean eligible = response.isPromised() && haveQuorumOfPermissions && hasOnlyPromises;
    if (!eligible)
        return false;

    Permitted permitted = response.permitted();
    boolean reportsLaterCommit = permitted.latestCommitted.compareWith(latestCommitted) == CompareResult.AFTER;
    return reportsLaterCommit && checkForLinearizabilityViolation(permitted, from);
}
/** @return true if the configured paxos variant is v1 or v1_without_linearizable_reads_or_rejected_writes */
private static boolean isRunningLegacyPaxos()
{
    boolean legacy;
    switch (getPaxosVariant())
    {
        case v1:
        case v1_without_linearizable_reads_or_rejected_writes:
            legacy = true;
            break;
        default:
            legacy = false;
    }
    return legacy;
}
/** @return the paxos repair low bound for this request's key, or {@link Ballot#none()} if the table has no store */
private Ballot getLowBoundForKey()
{
    ColumnFamilyStore store = Schema.instance.getColumnFamilyStoreInstance(request.table.id);
    if (store == null)
        return Ballot.none();
    return store.getPaxosRepairLowBound(request.partitionKey);
}
/**
 * The linearizability check is incompatible with legacy paxos due to at least 2 issues:
 * 1. The prepare phase doesn't evaluate accepted/committed ballots when promising ballots (excluding legacy_fixed)
 * 2. Commits made at LOCAL_SERIAL are sent to all DCs
 * Both issues will trigger linearizability violations, but are fixed by paxos repair. So we shouldn't do
 * linearizability checks unless we're running v2 paxos and have had at least one paxos repair covering this
 * operation's key.
 *
 * @return true if we are not running a legacy paxos variant and this key's paxos repair low bound is not
 *         (by value) {@link Ballot#none()}
 */
private boolean isCompatibleWithLinearizabilityCheck()
{
    if (isRunningLegacyPaxos())
        return false;

    // Compare by value, not identity: a low bound equal to none() need not be the none() instance itself
    // (presumably e.g. a ballot reconstructed from persisted repair history - TODO confirm), and it must still be
    // treated as "never repaired", otherwise we would run the check (and report false violations) too early.
    return !Ballot.none().equals(getLowBoundForKey());
}
/**
 * Decide whether a permission reporting a commit later than our latestCommitted constitutes a linearizability
 * violation. If so, increment the violation metric and log, fail or ignore it according to
 * {@code DatabaseDescriptor.paxosOnLinearizabilityViolations()}.
 *
 * This may be invoked after this prepare has completed (for a late response, via onResponse) or while it is still
 * awaiting a refresh of stale participants (via permitted), so {@code outcome} may or may not be set.
 *
 * @return true only if the violation was handled by failing the operation, in which case the caller must stop
 *         processing the response
 */
private boolean checkForLinearizabilityViolation(Permitted permitted, InetAddressAndPort from)
{
    if (!isCompatibleWithLinearizabilityCheck())
        return false;

    // report at most one violation per prepare
    if (linearizabilityViolationDetected)
        return false;

    // if we witness a newer commit AND are accepted something has gone wrong, except:

    // if we have raced with an ongoing commit, having missed all of them initially
    if (permitted.latestCommitted.hasSameBallot(latestAccepted))
        return false;

    // or in the case that we have an empty proposal accepted, since that will not be committed
    // in theory in this case we could now restart refreshStaleParticipants, but this would
    // unnecessarily complicate the logic so instead we accept that we will unnecessarily re-propose
    if (latestAccepted != null && latestAccepted.update.isEmpty() && latestAccepted.isAfter(permitted.latestCommitted))
        return false;

    // or in the case that both are older than the most recent repair low bound, in which case a topology change
    // could have occurred that means not all paxos state tables know of the accept/commit, though it is persistent
    // in theory in this case we could ignore this entirely and call ourselves done
    // TODO: consider this more; is it possible we cause problems by reproposing an old accept?
    //       shouldn't be, as any newer accept that reaches a quorum will supersede
    if (permitted.latestCommitted.ballot.uuidTimestamp() <= maxLowBound)
        return false;

    // if the latest commit ballot doesn't have an encoded consistency level, it's from a legacy paxos operation.
    // Legacy paxos operations would send commits to all replicas for LOCAL_SERIAL operations, which look like
    // linearizability violations from datacenters the operation wasn't run in, so we ignore them here.
    if (permitted.latestCommitted.ballot.flag() == NONE)
        return false;

    // If we discovered an incomplete proposal, it could have since completed successfully.
    // outcome is still null when we are invoked from permitted() while a refresh is outstanding, so it must be
    // null-checked before dereferencing (otherwise this would throw NullPointerException).
    if (latestAccepted != null && outcome != null && outcome.outcome == FOUND_INCOMPLETE_ACCEPTED)
    {
        switch (permitted.latestCommitted.compareWith(latestAccepted))
        {
            case WAS_REPROPOSED_BY:
            case SAME:
                return false;
            default:
                break;
        }
    }

    long gcGraceMicros = TimeUnit.SECONDS.toMicros(permitted.latestCommitted.update.metadata().params.gcGraceSeconds);
    // paxos repair uses stale ballots, so comparing against request.ballot time will not completely prevent false
    // positives, since compaction may have removed paxos metadata on some nodes and not others. It's also possible
    // clock skew has placed the ballot to repair in the future, so we use now or the ballot, whichever is higher.
    long maxNowMicros = Math.max(currentTimeMillis() * 1000, request.ballot.unixMicros());
    long ageMicros = maxNowMicros - permitted.latestCommitted.ballot.unixMicros();

    boolean isTtlViolation = ageMicros >= gcGraceMicros;
    String modifier = "";
    if (isTtlViolation)
    {
        if (participants.hasOldParticipants())
            modifier = " (older than legacy TTL expiry with at least one legacy participant)";
        else
            modifier = " (older than legacy TTL expiry)";
    }

    String message = String.format("Linearizability violation%s: %s witnessed %s of latest %s (withLatest: %s, readResponses: %s, maxLowBound: %s, status: %s); %s promised with latest %s",
                                   modifier, request.ballot, consistency(request.ballot), latestCommitted,
                                   withLatest, readResponses
                                               .stream()
                                               .map(Message::from)
                                               .map(Object::toString)
                                               .collect(Collectors.joining(", ", "[", "]")),
                                   maxLowBound, outcome, from, permitted.latestCommitted);

    PaxosMetrics.linearizabilityViolations.inc();
    linearizabilityViolationDetected = true;
    try
    {
        switch (DatabaseDescriptor.paxosOnLinearizabilityViolations())
        {
            default: throw new AssertionError();
            case fail:
                // When invoked for a late response (from onResponse) we have already completed; signalDone would
                // then throw IllegalStateException, and there is no longer an outcome left to fail.
                if (!isDone())
                    signalDone(new MaybeFailure(new Paxos.MaybeFailure(true, "A linearizability violation was detected", participants.sizeOfPoll(), participants.sizeOfConsensusQuorum, withLatest() + needLatest(), Collections.emptyMap()), participants));
                return true;
            case log:
                if (isTtlViolation && LOG_TTL_LINEARIZABILITY_VIOLATIONS) logger.warn(message);
                else logger.error(message);
                return false;
            case ignore:
                return false;
        }
    }
    finally
    {
        Runnable run = onLinearizabilityViolation;
        if (run != null)
            run.run();
    }
}
/**
 * Record a read response received from {@code from}, wrapped as a synthetic PAXOS2_PREPARE_RSP message.
 *
 * Must be invoked while owning lock
 */
private void addReadResponse(ReadResponse response, InetAddressAndPort from)
{
    Message<ReadResponse> wrapped = Message.synthetic(from, PAXOS2_PREPARE_RSP, response);
    readResponses.add(wrapped);
}
/**
 * Record a failure (of the initial request or of a refresh) from {@code from}, completing as MAYBE_FAILURE
 * once too many participants have failed for the rest to form a consensus quorum.
 */
@Override
public synchronized void onFailure(InetAddressAndPort from, RequestFailureReason reason)
{
    if (logger.isTraceEnabled())
        logger.trace("{} {} failure from {}", request, reason, from);

    if (isDone())
        return;

    super.onFailureWithMutex(from, reason);
    failures++;

    // with this many failures, at most sizeOfConsensusQuorum - 1 participants remain
    int tolerableFailures = participants.sizeOfPoll() - participants.sizeOfConsensusQuorum;
    if (failures == tolerableFailures + 1)
        signalDone(MAYBE_FAILURE);
}
/** Complete this prepare with the status corresponding to {@code kindOfOutcome}. */
private void signalDone(Outcome kindOfOutcome)
{
    Status status = toStatus(kindOfOutcome);
    signalDone(status);
}
/**
 * Record the final status, pass it to the onDone callback (if any), and wake threads waiting in awaitUntil.
 * Must be called while owning this object's monitor (required by notifyAll).
 *
 * @throws IllegalStateException if a final status has already been recorded
 */
private void signalDone(Status status)
{
    if (outcome != null)
        throw new IllegalStateException();

    outcome = status;
    if (onDone != null)
        onDone.accept(status);
    notifyAll();
}
/** Build the Status object for {@code kind} from the state accumulated so far. */
private Status toStatus(Outcome kind)
{
    switch (kind)
    {
        case PROMISED:
            return Success.readOrWrite(request.ballot, participants, readResponses, hasProposalStability);
        case READ_PERMITTED:
            // reads are only permitted when no modifying proposal could have raced with us unseen
            if (!hasProposalStability)
                throw new IllegalStateException();
            return Success.read(request.ballot, participants, readResponses, supersededBy);
        case SUPERSEDED:
            return new Superseded(supersededBy, participants);
        case FOUND_INCOMPLETE_ACCEPTED:
            return new FoundIncompleteAccepted(request.ballot, participants, latestAccepted);
        case FOUND_INCOMPLETE_COMMITTED:
            return new FoundIncompleteCommitted(request.ballot, participants, latestCommitted);
        case ELECTORATE_MISMATCH:
            return new ElectorateMismatch(participants, request.ballot);
        case MAYBE_FAILURE:
            return new MaybeFailure(new Paxos.MaybeFailure(participants, withLatest(), failureReasonsAsMap()), participants);
        default:
            throw new IllegalStateException();
    }
}
/**
 * Ask the participants in needLatest to catch up with latestCommitted, then empty needLatest.
 * See {@link PaxosPrepareRefresh}
 *
 * Must be invoked while owning lock
 */
private void refreshStaleParticipants()
{
    PaxosPrepareRefresh refresher = refreshStaleParticipants;
    if (refresher == null)
    {
        refresher = new PaxosPrepareRefresh(request.ballot, participants, latestCommitted, this);
        refreshStaleParticipants = refresher;
    }
    refresher.refresh(needLatest);
    needLatest.clear();
}
/** A failed refresh counts towards our failures exactly like a failed prepare. */
@Override
public void onRefreshFailure(InetAddressAndPort from, RequestFailureReason reason)
{
    this.onFailure(from, reason);
}
/**
 * A refresh of {@code from} completed. If it reported a superseding ballot we finish immediately (READ_PERMITTED
 * if we still have proposal stability, otherwise SUPERSEDED). Otherwise {@code from} now has the latest commit,
 * and we finish once a consensus quorum does.
 */
public synchronized void onRefreshSuccess(Ballot isSupersededBy, InetAddressAndPort from)
{
    if (logger.isTraceEnabled())
        logger.trace("Refresh {} from {}", isSupersededBy == null ? "Success" : "SupersededBy(" + isSupersededBy + ')', from);

    if (isDone())
        return;

    if (isSupersededBy == null)
    {
        withLatest.add(from);
        if (withLatest() >= participants.sizeOfConsensusQuorum)
            signalDone(hasOnlyPromises ? Outcome.PROMISED : Outcome.READ_PERMITTED);
        return;
    }

    supersededBy = isSupersededBy;
    signalDone(hasProposalStability ? Outcome.READ_PERMITTED : Outcome.SUPERSEDED);
}
/**
 * Common state of a Paxos prepare request: the ballot being prepared, the electorate the
 * coordinator believes in, whether the prepare is for a write, and the partition it targets.
 * The request either carries a read command (from which the key and table are derived) or
 * only the key and table.
 *
 * @param <R> the concrete request type, returned by {@link #withoutRead()}
 */
static abstract class AbstractRequest<R extends AbstractRequest<R>>
{
    final Ballot ballot;
    final Electorate electorate;
    // null when the request does not carry a read
    final SinglePartitionReadCommand read;
    final boolean isForWrite;
    final DecoratedKey partitionKey;
    final TableMetadata table;

    /** Builds a request carrying {@code read}; partition key and table are taken from the read. */
    AbstractRequest(Ballot ballot, Electorate electorate, SinglePartitionReadCommand read, boolean isForWrite)
    {
        this(ballot, electorate, read, read.partitionKey(), read.metadata(), isForWrite);
    }

    /** Builds a request without a read for the given partition of {@code table}. */
    AbstractRequest(Ballot ballot, Electorate electorate, DecoratedKey partitionKey, TableMetadata table, boolean isForWrite)
    {
        this(ballot, electorate, null, partitionKey, table, isForWrite);
    }

    private AbstractRequest(Ballot ballot, Electorate electorate, SinglePartitionReadCommand read,
                            DecoratedKey partitionKey, TableMetadata table, boolean isForWrite)
    {
        this.ballot = ballot;
        this.electorate = electorate;
        this.read = read;
        this.partitionKey = partitionKey;
        this.table = table;
        this.isForWrite = isForWrite;
    }

    /** @return an equivalent request that does not carry a read */
    abstract R withoutRead();

    @Override
    public String toString()
    {
        return "Prepare(" + ballot + ")";
    }
}
static class Request extends AbstractRequest<Request>
{
Request(Ballot ballot, Electorate electorate, SinglePartitionReadCommand read, boolean isWrite)
{
super(ballot, electorate, read, isWrite);
}
private Request(Ballot ballot, Electorate electorate, DecoratedKey partitionKey, TableMetadata table, boolean isWrite)
{
super(ballot, electorate, partitionKey, table, isWrite);
}
Request withoutRead()
{
return read == null ? this : new Request(ballot, electorate, partitionKey, table, isForWrite);
}
public String toString()
{
return "Prepare(" + ballot + ')';
}
}
/**
 * Base type of a replica's reply to a prepare request; {@link #outcome} distinguishes a
 * {@link Rejected} reply (REJECT) from a {@link Permitted} one (PROMISE or PERMIT_READ).
 */
static class Response
{
    final MaybePromise.Outcome outcome;

    Response(MaybePromise.Outcome outcome)
    {
        this.outcome = outcome;
    }

    public boolean isPromised()
    {
        return PROMISE == outcome;
    }

    public boolean isRejected()
    {
        return REJECT == outcome;
    }

    /** Unchecked downcast; only valid when this response is a {@link Permitted}. */
    Permitted permitted()
    {
        return (Permitted) this;
    }

    /** Unchecked downcast; only valid when this response is a {@link Rejected}. */
    Rejected rejected()
    {
        return (Rejected) this;
    }
}
/**
 * A non-rejecting prepare reply (PROMISE or PERMIT_READ) carrying the replica's Paxos state.
 */
static class Permitted extends Response
{
    final long lowBound;
    // a proposal that has been accepted but not committed, i.e. must be null or > latestCommit
    @Nullable final Accepted latestAcceptedButNotCommitted;
    final Committed latestCommitted;
    @Nullable final ReadResponse readResponse;
    // latestAcceptedButNotCommitted and latestCommitted were the same before and after the read occurred, and no incomplete promise was witnessed
    final boolean hadProposalStability;
    final Map<InetAddressAndPort, EndpointState> gossipInfo;
    @Nullable final Ballot supersededBy;

    Permitted(MaybePromise.Outcome outcome, long lowBound, @Nullable Accepted latestAcceptedButNotCommitted, Committed latestCommitted, @Nullable ReadResponse readResponse, boolean hadProposalStability, Map<InetAddressAndPort, EndpointState> gossipInfo, @Nullable Ballot supersededBy)
    {
        super(outcome);
        this.lowBound = lowBound;
        this.latestCommitted = latestCommitted;
        this.latestAcceptedButNotCommitted = latestAcceptedButNotCommitted;
        this.readResponse = readResponse;
        this.hadProposalStability = hadProposalStability;
        this.supersededBy = supersededBy;
        this.gossipInfo = gossipInfo;
    }

    @Override
    public String toString()
    {
        return new StringBuilder("Promise(")
               .append(latestAcceptedButNotCommitted).append(", ")
               .append(latestCommitted).append(", ")
               .append(hadProposalStability).append(", ")
               .append(gossipInfo)
               .append(')')
               .toString();
    }
}
/**
 * A prepare reply refusing the promise because a newer ballot has been witnessed.
 */
static class Rejected extends Response
{
    /** The ballot that caused the rejection. */
    final Ballot supersededBy;

    Rejected(Ballot supersededBy)
    {
        super(REJECT);
        this.supersededBy = supersededBy;
    }

    @Override
    public String toString()
    {
        return new StringBuilder("RejectPromise(supersededBy=")
               .append(supersededBy)
               .append(')')
               .toString();
    }
}
/**
 * Replica-side handler for Paxos prepare requests: attempts to promise the request's ballot against
 * the local Paxos state, optionally performs the attached read, and replies with a {@link Permitted}
 * or {@link Rejected} response.
 */
public static class RequestHandler implements IVerbHandler<Request>
{
/**
 * Executes the prepare and replies to the sender. When execution yields no response (see
 * {@link #execute(AbstractRequest, InetAddressAndPort)}), replies with an {@code UNKNOWN} failure instead.
 */
@Override
public void doVerb(Message<Request> message)
{
Response response = execute(message.payload, message.from());
if (response == null)
MessagingService.instance().respondWithFailure(UNKNOWN, message);
else
MessagingService.instance().respond(response, message);
}
/**
 * Executes the prepare against the local Paxos state of the request's partition.
 *
 * @return the response, or {@code null} if {@code isInRangeAndShouldProcess} refuses the request
 *         (presumably the key is not owned locally or the request should not be served - confirm)
 */
static Response execute(AbstractRequest<?> request, InetAddressAndPort from)
{
if (!isInRangeAndShouldProcess(from, request.partitionKey, request.table, request.read != null))
return null;
long start = nanoTime();
try (PaxosState state = get(request.partitionKey, request.table))
{
return execute(request, state);
}
finally
{
// record prepare latency in the table's casPrepare metric, even if execution threw
Keyspace.openAndGetStore(request.table).metric.casPrepare.addNano(nanoTime() - start);
}
}
/**
 * Attempts to promise {@code request.ballot} against {@code state}.
 *
 * On PROMISE or PERMIT_READ, returns a {@link Permitted} carrying:
 * - the paxos repair low bound for the key;
 * - the accepted-but-not-committed proposal and the latest commit as of after the promise;
 * - the local read result, if the request carried a read;
 * - whether proposal stability held;
 * - gossip info when the electorates differ;
 * - for PERMIT_READ only, the ballot that supersedes this request.
 *
 * On REJECT, returns a {@link Rejected} carrying the superseding ballot.
 *
 * @throws IllegalStateException if {@code promiseIfNewer} yields any other outcome
 */
static Response execute(AbstractRequest<?> request, PaxosState state)
{
MaybePromise result = state.promiseIfNewer(request.ballot, request.isForWrite);
switch (result.outcome)
{
case PROMISE:
case PERMIT_READ:
// verify electorates; if they differ, send back gossip info for superset of two participant sets
Map<InetAddressAndPort, EndpointState> gossipInfo = verifyElectorate(request.electorate, Electorate.get(request.table, request.partitionKey, consistency(request.ballot)));
ReadResponse readResponse = null;
// Check we cannot race with a proposal, i.e. that we have not made a promise that
// could be in the process of making a proposal. If a majority of nodes have made no such promise
// then either we must have witnessed it (since it must have been committed), or the proposal
// will now be rejected by our promises.
// This is logically complicated a bit by reading from a subset of the consensus group when there are
// pending nodes, however electorate verification will cause us to retry if the pending status changes
// during execution; otherwise if the most recent commit we witnessed wasn't witnessed by a read response
// we will abort and retry, and we must witness it by the above argument.
// An accepted proposal newer than the last commit whose update is empty is used here in place of the
// commit ballot (NOTE(review): presumably an empty proposal is treated as already committed - confirm).
Ballot mostRecentCommit = result.before.accepted != null
&& result.before.accepted.ballot.compareTo(result.before.committed.ballot) > 0
&& result.before.accepted.update.isEmpty()
? result.before.accepted.ballot : result.before.committed.ballot;
// stable when that commit is at or after the latest write promise made before this request
boolean hasProposalStability = mostRecentCommit.equals(result.before.promisedWrite)
|| mostRecentCommit.compareTo(result.before.promisedWrite) > 0;
if (request.read != null)
{
try (ReadExecutionController executionController = request.read.executionController();
UnfilteredPartitionIterator iterator = request.read.executeLocally(executionController))
{
readResponse = request.read.createResponse(iterator, executionController.getRepairedDataInfo());
}
// Stability only holds if the promised write, commit and accepted state did not change while
// the read was executing. Compared by reference (NOTE(review): presumably these are immutable
// and replaced on every change - confirm).
if (hasProposalStability)
{
Snapshot now = state.current(request.ballot);
hasProposalStability = now.promisedWrite == result.after.promisedWrite
&& now.committed == result.after.committed
&& now.accepted == result.after.accepted;
}
}
// only PERMIT_READ replies carry a superseding ballot
Ballot supersededBy = result.outcome == PROMISE ? null : result.after.latestWitnessedOrLowBound();
Accepted acceptedButNotCommitted = result.after.accepted;
Committed committed = result.after.committed;
ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(request.table.id);
long lowBound = cfs.getPaxosRepairLowBound(request.partitionKey).uuidTimestamp();
return new Permitted(result.outcome, lowBound, acceptedButNotCommitted, committed, readResponse, hasProposalStability, gossipInfo, supersededBy);
case REJECT:
return new Rejected(result.supersededBy());
default:
throw new IllegalStateException();
}
}
}
/**
 * Wire format shared by prepare-style requests:
 * - the ballot;
 * - the electorate;
 * - one flag byte;
 * - then either the serialized read command, or the table id followed by the partition key.
 *
 * @param <R> the request type produced
 * @param <T> an extra, subclass-defined parameter passed through to {@code construct}
 */
static abstract class AbstractRequestSerializer<R extends AbstractRequest<R>, T> implements IVersionedSerializer<R>
{
    // flag bit: a read command follows (otherwise table id + partition key follow)
    private static final int HAS_READ = 1;
    // flag bit: the request is NOT for a write; cleared for write prepares
    private static final int NOT_FOR_WRITE = 2;

    abstract R construct(T param, Ballot ballot, Electorate electorate, SinglePartitionReadCommand read, boolean isWrite);
    abstract R construct(T param, Ballot ballot, Electorate electorate, DecoratedKey partitionKey, TableMetadata table, boolean isWrite);

    @Override
    public void serialize(R request, DataOutputPlus out, int version) throws IOException
    {
        request.ballot.serialize(out);
        Electorate.serializer.serialize(request.electorate, out, version);
        out.writeByte((request.read != null ? HAS_READ : 0) | (request.isForWrite ? 0 : NOT_FOR_WRITE));
        if (request.read != null)
        {
            ReadCommand.serializer.serialize(request.read, out, version);
        }
        else
        {
            request.table.id.serialize(out);
            DecoratedKey.serializer.serialize(request.partitionKey, out, version);
        }
    }

    /**
     * Reads a request written by {@link #serialize}.
     *
     * The write flag must be decoded identically whether or not a read is attached; previously the
     * no-read branch tested the bit with the opposite polarity, inverting {@code isForWrite} for
     * requests sent without a read.
     */
    public R deserialize(T param, DataInputPlus in, int version) throws IOException
    {
        Ballot ballot = Ballot.deserialize(in);
        Electorate electorate = Electorate.serializer.deserialize(in, version);
        byte flag = in.readByte();
        boolean isWrite = (flag & NOT_FOR_WRITE) == 0;
        if ((flag & HAS_READ) != 0)
        {
            SinglePartitionReadCommand readCommand = (SinglePartitionReadCommand) ReadCommand.serializer.deserialize(in, version);
            return construct(param, ballot, electorate, readCommand, isWrite);
        }

        TableMetadata table = Schema.instance.getExistingTableMetadata(TableId.deserialize(in));
        DecoratedKey partitionKey = (DecoratedKey) DecoratedKey.serializer.deserialize(in, table.partitioner, version);
        return construct(param, ballot, electorate, partitionKey, table, isWrite);
    }

    @Override
    public long serializedSize(R request, int version)
    {
        return Ballot.sizeInBytes()
               + Electorate.serializer.serializedSize(request.electorate, version)
               + 1 // flag byte
               + (request.read != null
                  ? ReadCommand.serializer.serializedSize(request.read, version)
                  : request.table.id.serializedSize()
                    + DecoratedKey.serializer.serializedSize(request.partitionKey, version));
    }
}
public static class RequestSerializer extends AbstractRequestSerializer<Request, Object>
{
Request construct(Object ignore, Ballot ballot, Electorate electorate, SinglePartitionReadCommand read, boolean isWrite)
{
return new Request(ballot, electorate, read, isWrite);
}
Request construct(Object ignore, Ballot ballot, Electorate electorate, DecoratedKey partitionKey, TableMetadata table, boolean isWrite)
{
return new Request(ballot, electorate, partitionKey, table, isWrite);
}
public Request deserialize(DataInputPlus in, int version) throws IOException
{
return deserialize(null, in, version);
}
}
/**
 * Wire format for prepare responses.
 *
 * A leading byte of 0 marks a {@link Rejected} reply, followed only by the superseding ballot.
 * Any other leading byte is a bit set describing a {@link Permitted} reply, followed by these fields
 * in order:
 * - the low bound;
 * - the optional accepted proposal;
 * - the commit;
 * - the optional read response;
 * - the gossip map;
 * - for PERMIT_READ only, the superseding ballot.
 */
public static class ResponseSerializer implements IVersionedSerializer<Response>
{
    private static final int REJECTED = 0;
    private static final int IS_PERMITTED = 1;          // always set for Permitted, so its byte is never 0
    private static final int HAS_ACCEPTED = 2;
    private static final int HAS_READ_RESPONSE = 4;
    private static final int HAD_PROPOSAL_STABILITY = 8;
    private static final int IS_PERMIT_READ = 16;

    public void serialize(Response response, DataOutputPlus out, int version) throws IOException
    {
        if (!response.isRejected())
        {
            serializePermitted((Permitted) response, out, version);
            return;
        }

        out.writeByte(REJECTED);
        ((Rejected) response).supersededBy.serialize(out);
    }

    private static void serializePermitted(Permitted permitted, DataOutputPlus out, int version) throws IOException
    {
        boolean isPermitRead = permitted.outcome == PERMIT_READ;

        int flags = IS_PERMITTED;
        if (permitted.latestAcceptedButNotCommitted != null)
            flags |= HAS_ACCEPTED;
        if (permitted.readResponse != null)
            flags |= HAS_READ_RESPONSE;
        if (permitted.hadProposalStability)
            flags |= HAD_PROPOSAL_STABILITY;
        if (isPermitRead)
            flags |= IS_PERMIT_READ;
        out.writeByte(flags);

        out.writeUnsignedVInt(permitted.lowBound);
        if (permitted.latestAcceptedButNotCommitted != null)
            Accepted.serializer.serialize(permitted.latestAcceptedButNotCommitted, out, version);
        Committed.serializer.serialize(permitted.latestCommitted, out, version);
        if (permitted.readResponse != null)
            ReadResponse.serializer.serialize(permitted.readResponse, out, version);
        serializeMap(inetAddressAndPortSerializer, EndpointState.nullableSerializer, permitted.gossipInfo, out, version);
        if (isPermitRead)
            permitted.supersededBy.serialize(out);
    }

    public Response deserialize(DataInputPlus in, int version) throws IOException
    {
        byte flags = in.readByte();
        if (flags == REJECTED)
            return new Rejected(Ballot.deserialize(in));

        // fields must be read in exactly the order serializePermitted writes them
        long lowBound = in.readUnsignedVInt();
        Accepted acceptedNotCommitted = null;
        if ((flags & HAS_ACCEPTED) != 0)
            acceptedNotCommitted = Accepted.serializer.deserialize(in, version);
        Committed committed = Committed.serializer.deserialize(in, version);
        ReadResponse readResponse = null;
        if ((flags & HAS_READ_RESPONSE) != 0)
            readResponse = ReadResponse.serializer.deserialize(in, version);
        Map<InetAddressAndPort, EndpointState> gossipInfo = deserializeMap(inetAddressAndPortSerializer, EndpointState.nullableSerializer, newHashMap(), in, version);

        boolean isPermitRead = (flags & IS_PERMIT_READ) != 0;
        Ballot supersededBy = isPermitRead ? Ballot.deserialize(in) : null;
        boolean hasProposalStability = (flags & HAD_PROPOSAL_STABILITY) != 0;
        MaybePromise.Outcome outcome = isPermitRead ? PERMIT_READ : PROMISE;
        return new Permitted(outcome, lowBound, acceptedNotCommitted, committed, readResponse, hasProposalStability, gossipInfo, supersededBy);
    }

    public long serializedSize(Response response, int version)
    {
        if (response.isRejected())
            return 1 + Ballot.sizeInBytes();

        Permitted permitted = (Permitted) response;
        long size = 1 + VIntCoding.computeUnsignedVIntSize(permitted.lowBound);
        if (permitted.latestAcceptedButNotCommitted != null)
            size += Accepted.serializer.serializedSize(permitted.latestAcceptedButNotCommitted, version);
        size += Committed.serializer.serializedSize(permitted.latestCommitted, version);
        if (permitted.readResponse != null)
            size += ReadResponse.serializer.serializedSize(permitted.readResponse, version);
        size += serializedSizeMap(inetAddressAndPortSerializer, EndpointState.nullableSerializer, permitted.gossipInfo, version);
        if (permitted.outcome == PERMIT_READ)
            size += Ballot.sizeInBytes();
        return size;
    }
}
/**
 * Returns {@code send} unchanged if its payload carries no read; otherwise returns a copy of the
 * message whose payload is the read-less form of the request.
 */
static <R extends AbstractRequest<R>> Message<R> withoutRead(Message<R> send)
{
    R payload = send.payload;
    return payload.read == null ? send : send.withPayload(payload.withoutRead());
}
/**
 * Installs (or, with {@code null}, clears) the hook stored in {@code onLinearizabilityViolation}.
 * With assertions enabled, installing a hook while another one is already set fails the assertion.
 */
public static void setOnLinearizabilityViolation(Runnable runnable)
{
    assert runnable == null || onLinearizabilityViolation == null;
    onLinearizabilityViolation = runnable;
}
}