/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service.paxos;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.RequestCallbackWithFailure;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MonotonicClock;
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
import static org.apache.cassandra.config.CassandraRelevantProperties.PAXOS_REPAIR_RETRY_TIMEOUT_IN_MS;
import static org.apache.cassandra.exceptions.RequestFailureReason.UNKNOWN;
import static org.apache.cassandra.net.Verb.PAXOS2_REPAIR_REQ;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.cassandra.service.paxos.Commit.*;
import static org.apache.cassandra.service.paxos.ContentionStrategy.Type.REPAIR;
import static org.apache.cassandra.service.paxos.ContentionStrategy.waitUntilForContention;
import static org.apache.cassandra.service.paxos.Paxos.*;
import static org.apache.cassandra.service.paxos.PaxosPrepare.*;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
/**
* Facility to finish any in-progress paxos transaction, and ensure that a quorum of nodes agree on the most recent operation.
* Semantically, we simply ensure that any side effects that were "decided" before repair was initiated have been committed
* to a quorum of nodes.
* This means:
* - any prepare that has _possibly_ reached a quorum of nodes will be invalidated
* - any proposal that has been accepted by at least one node, but not known to be committed to any, will be proposed again
* - any proposal that has been committed to at least one node, but not committed to all, will be committed to a quorum
*
* Note that once started, this continues to try to repair any ongoing operations for the partition until it
* succeeds or the retry timeout (PAXOS_REPAIR_RETRY_TIMEOUT_IN_MS) elapses. In a functioning cluster this should
* always succeed, but during a network partition the repair may time out and fail.
*
* Requirements for correctness:
* - If performed during a range movement, we depend on a quorum (of the new topology) having been informed of the new
* topology _prior_ to initiating this repair, and on this node having been a member of a quorum of nodes verifying
* their topology is up-to-date.
* - If a quorum of nodes is unaware of the new topology prior to initiating repair, an operation could simply occur
* after repair completes that permits a linearization failure, such as with CASSANDRA-15745.
* - Paxos prepare rounds must also verify the topology being used with their peers
* - If prepare rounds do not verify their topology, a node that is not a member of the quorum who have agreed
* the latest topology could still perform an operation without being aware of the topology change, and permit a
* linearization failure, such as with CASSANDRA-15745.
*
* With these issues addressed elsewhere, our algorithm is fairly simple.
* In brief:
* 1) Query all replicas for any promises or proposals they have witnessed that have not been committed,
* and their most recent commit. Wait for a quorum of responses.
* 2) If this is the first time we have queried other nodes, we take note of the most recent ballot we see;
* if this is not the first time, and we have witnessed a commit newer than the ballot we previously
* recorded, we terminate (somebody else has done the work for us).
* 3) If we see an in-progress operation that is very recent, we wait for it to complete and try again
* 4) If we see a previously accepted operation, we attempt to complete it, or
* if we see a prepare with no proposal, we propose an empty update to invalidate it;
* otherwise we have nothing to do, as there is no operation that can have produced a side-effect before we began.
* 5) We prepare a paxos round to agree the new commit using a higher ballot than the one witnessed,
* but a lower one than we would use to propose a new operation. This permits newer operations to "beat" us so
* that we do not interfere with normal paxos operations.
* 6) If we are "beaten" we start again (without delay, as (2) manages delays where necessary)
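*
* A minimal illustrative invocation (a hypothetical caller; {@code start()} is assumed to be inherited from
* {@link AbstractPaxosRepair} — real entry points live in the paxos read/write and repair paths):
* <pre>{@code
* PaxosRepair repair = PaxosRepair.create(ConsistencyLevel.SERIAL, key, lowBound, metadata);
* repair.start(); // asynchronous; completion is reported via the repair's state
* }</pre>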
*/
public class PaxosRepair extends AbstractPaxosRepair
{
private static final Logger logger = LoggerFactory.getLogger(PaxosRepair.class);
public static final RequestSerializer requestSerializer = new RequestSerializer();
public static final ResponseSerializer responseSerializer = new ResponseSerializer();
public static final RequestHandler requestHandler = new RequestHandler();
private static final long RETRY_TIMEOUT_NANOS = getRetryTimeoutNanos();
private static final ScheduledExecutorPlus RETRIES = executorFactory().scheduled("PaxosRepairRetries");
private static long getRetryTimeoutNanos()
{
long retryMillis = PAXOS_REPAIR_RETRY_TIMEOUT_IN_MS.getLong();
return TimeUnit.MILLISECONDS.toNanos(retryMillis);
}
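// 'participants' is refreshed on every (re)start; 'successCriteria' is the ballot from our first query round
// whose commit proves our work is done (see step 2 of the class javadoc); 'prevSupersededBy' records ballots
// that superseded our proposals, so retries pick a newer stale ballot; 'attempts' drives contention back-off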
private final TableMetadata table;
private final ConsistencyLevel paxosConsistency;
private Participants participants;
private Ballot successCriteria;
private Ballot prevSupersededBy;
private int attempts;
public String toString()
{
return "PaxosRepair{" +
"key=" + partitionKey() +
", table=" + table.toString() +
", consistency=" + paxosConsistency +
", participants=" + participants.electorate +
", state=" + state() +
", startedMillis=" + MonotonicClock.Global.approxTime.translate().toMillisSinceEpoch(startedNanos()) +
", started=" + isStarted() +
'}';
}
/**
* Waiting for responses to PAXOS_REPAIR messages.
*
* This state may be entered multiple times; every time we fail for any reason, we restart from this state
*/
private class Querying extends State implements RequestCallbackWithFailure<Response>, Runnable
{
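// Aggregated across the quorum's responses: the newest ballot witnessed, the newest accepted-but-uncommitted
// proposal, the newest commit, the oldest commit's ballot (to detect replicas missing the latest commit),
// and any promise whose timestamp clashes with the latest accepted proposal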
private int successes;
private int failures;
private Ballot latestWitnessed;
private @Nullable Accepted latestAccepted;
private Committed latestCommitted;
private Ballot oldestCommitted;
private Ballot clashingPromise;
@Override
public void onFailure(InetAddressAndPort from, RequestFailureReason reason)
{
updateState(this, null, (i1, i2) -> i1.onFailure());
}
@Override
public void onResponse(Message<Response> msg)
{
logger.trace("PaxosRepair {} from {}", msg.payload, msg.from());
updateState(this, msg, Querying::onResponseInternal);
}
private State onFailure()
{
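// if enough requests have failed that the outstanding responses can no longer reach a consensus quorum,
// give up on this round and retry with back-off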
if (++failures + participants.sizeOfConsensusQuorum > participants.sizeOfPoll())
return retry(this);
return this;
}
private State onResponseInternal(Message<Response> msg)
{
latestWitnessed = latest(latestWitnessed, msg.payload.latestWitnessedOrLowBound);
latestAccepted = latest(latestAccepted, msg.payload.acceptedButNotCommitted);
latestCommitted = latest(latestCommitted, msg.payload.committed);
if (oldestCommitted == null || isAfter(oldestCommitted, msg.payload.committed))
oldestCommitted = msg.payload.committed.ballot;
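// track any promise whose timestamp clashes with the latest accepted proposal: re-proposing the accepted
// ballot might then be rejected, so execute() must poison with a fresh ballot instead (reproposalMayBeRejected)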
if (isAfter(latestWitnessed, clashingPromise))
clashingPromise = null;
if (timestampsClash(latestAccepted, msg.payload.latestWitnessedOrLowBound))
clashingPromise = msg.payload.latestWitnessedOrLowBound;
if (timestampsClash(latestAccepted, latestWitnessed))
clashingPromise = latestWitnessed;
// once we receive the requisite number, we can simply proceed, and ignore future responses
if (++successes == participants.sizeOfConsensusQuorum)
return execute();
return this;
}
private State execute()
{
// if we have a timestamp clash, always prefer the accepted ballot
latestWitnessed = latest(latestAccepted, latestWitnessed);
Ballot latestPreviouslyWitnessed = latest(successCriteria, prevSupersededBy);
// Save as success criteria the latest promise seen by our first round; if we ever see anything newer
// than this committed, we know at least one paxos round has completed since we started, which is all we
// need. To avoid looping indefinitely in competition with others, we store this ballot across retries
// so we can terminate based on other proposers' work.
if (successCriteria == null || timestampsClash(successCriteria, latestWitnessed))
{
if (logger.isTraceEnabled())
logger.trace("PaxosRepair of {} setting success criteria to {}", partitionKey(), Ballot.toString(latestWitnessed));
successCriteria = latestWitnessed;
}
boolean hasCommittedSuccessCriteria = isAfter(latestCommitted, successCriteria) || latestCommitted.hasBallot(successCriteria);
boolean isPromisedButNotAccepted = isAfter(latestWitnessed, latestAccepted); // not necessarily promised - may be lowBound
boolean isAcceptedButNotCommitted = isAfter(latestAccepted, latestCommitted);
boolean reproposalMayBeRejected = clashingPromise != null || !isAfter(latestWitnessed, latestPreviouslyWitnessed);
if (hasCommittedSuccessCriteria)
{
if (logger.isTraceEnabled())
logger.trace("PaxosRepair witnessed {} newer than success criteria {} (oldest: {})", latestCommitted, Ballot.toString(successCriteria), Ballot.toString(oldestCommitted));
// we have a new enough commit, but it might not have reached enough participants; make sure it has before terminating
// note: we could send to only those we know haven't witnessed it, but this is a rare operation so a small amount of redundant work is fine
return oldestCommitted.equals(latestCommitted.ballot)
? DONE
: PaxosCommit.commit(latestCommitted, participants, paxosConsistency, commitConsistency(), true,
new CommittingRepair());
}
else if (isAcceptedButNotCommitted && !isPromisedButNotAccepted && !reproposalMayBeRejected)
{
if (logger.isTraceEnabled())
logger.trace("PaxosRepair of {} completing {}", partitionKey(), latestAccepted);
// We need to complete this in-progress accepted proposal, which may not have been seen by a majority
// However, since we have not sought any promises, we can simply complete the existing proposal
// since this is an idempotent operation - both us and the original proposer (and others) can
// all do it at the same time without incident
// If ballots with same timestamp have been both accepted and rejected by different nodes,
// to avoid a livelock we simply try to poison, knowing we will fail but use a new ballot
// (note there are alternative approaches but this is conservative)
return PaxosPropose.propose(latestAccepted, participants, false,
new ProposingRepair(latestAccepted));
}
else if (isAcceptedButNotCommitted || isPromisedButNotAccepted || latestWitnessed.compareTo(latestPreviouslyWitnessed) < 0)
{
Ballot ballot = staleBallotNewerThan(latest(latestWitnessed, latestPreviouslyWitnessed), paxosConsistency);
// We need to propose a no-op > latestPromised, to ensure we don't later discover
// that latestPromised had already been accepted (by a minority) and repair it
// This means starting a new ballot, but we choose one that is likely to lose a contention battle, since
// this operation is not urgent and we can piggy-back on other paxos operations
if (logger.isTraceEnabled())
logger.trace("PaxosRepair of {} found incomplete promise or proposal; preparing stale ballot {}", partitionKey(), Ballot.toString(ballot));
return prepareWithBallot(ballot, participants, partitionKey(), table, false, false,
new PoisonProposals());
}
else
{
logger.error("PaxosRepair illegal state latestWitnessed={}, latestAcceptedButNotCommitted={}, latestCommitted={}, oldestCommitted={}", latestWitnessed, latestAccepted, latestCommitted, oldestCommitted);
throw new IllegalStateException(); // should be logically impossible
}
}
public void run()
{
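// poll every participant; we act as soon as a consensus quorum of responses arrives (see onResponseInternal)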
Message<Request> message = Message.out(PAXOS2_REPAIR_REQ, new Request(partitionKey(), table));
for (int i = 0, size = participants.sizeOfPoll(); i < size ; ++i)
MessagingService.instance().sendWithCallback(message, participants.voter(i), this);
}
}
/**
* We found an incomplete promise or proposal, so we have started a new paxos round to complete or invalidate it
*/
private class PoisonProposals extends ConsumerState<Status>
{
@Override
public State execute(Status input) throws Throwable
{
switch (input.outcome)
{
case MAYBE_FAILURE:
return retry(this);
case READ_PERMITTED:
case SUPERSEDED:
prevSupersededBy = latest(prevSupersededBy, input.retryWithAtLeast());
return retry(this);
case FOUND_INCOMPLETE_ACCEPTED:
{
// finish the in-progress proposal
// cannot simply restart, as our latest promise is newer than the proposal
// so we require a promise before we decide which proposal to complete
// (else an "earlier" operation can sneak in and invalidate us while we're proposing
// with a newer ballot)
FoundIncompleteAccepted incomplete = input.incompleteAccepted();
Proposal propose = new Proposal(incomplete.ballot, incomplete.accepted.update);
logger.trace("PaxosRepair of {} found incomplete {}", partitionKey(), incomplete.accepted);
return PaxosPropose.propose(propose, participants, false,
new ProposingRepair(propose)); // we don't know if we're done, so we must restart
}
case FOUND_INCOMPLETE_COMMITTED:
{
// finish the in-progress commit
FoundIncompleteCommitted incomplete = input.incompleteCommitted();
logger.trace("PaxosRepair of {} found in progress {}", partitionKey(), incomplete.committed);
return PaxosCommit.commit(incomplete.committed, participants, paxosConsistency, commitConsistency(), true,
new CommitAndRestart()); // we don't know if we're done, so we must restart
}
case PROMISED:
{
// propose the empty ballot
logger.trace("PaxosRepair of {} submitting empty proposal", partitionKey());
Proposal proposal = Proposal.empty(input.success().ballot, partitionKey(), table);
return PaxosPropose.propose(proposal, participants, false,
new ProposingRepair(proposal));
}
default:
throw new IllegalStateException();
}
}
}
private class ProposingRepair extends ConsumerState<PaxosPropose.Status>
{
final Proposal proposal;
private ProposingRepair(Proposal proposal)
{
this.proposal = proposal;
}
@Override
public State execute(PaxosPropose.Status input)
{
switch (input.outcome)
{
case MAYBE_FAILURE:
return retry(this);
case SUPERSEDED:
if (isAfter(input.superseded().by, prevSupersededBy))
prevSupersededBy = input.superseded().by;
return retry(this);
case SUCCESS:
if (proposal.update.isEmpty())
{
logger.trace("PaxosRepair of {} complete after successful empty proposal", partitionKey());
return DONE;
}
logger.trace("PaxosRepair of {} committing successful proposal {}", partitionKey(), proposal);
return PaxosCommit.commit(proposal.agreed(), participants, paxosConsistency, commitConsistency(), true,
new CommittingRepair());
default:
throw new IllegalStateException();
}
}
}
private class CommittingRepair extends ConsumerState<PaxosCommit.Status>
{
@Override
public State execute(PaxosCommit.Status input)
{
logger.trace("PaxosRepair of {} {}", partitionKey(), input);
return input.isSuccess() ? DONE : retry(this);
}
}
private class CommitAndRestart extends ConsumerState<PaxosCommit.Status>
{
@Override
public State execute(PaxosCommit.Status input)
{
return restart(this);
}
}
private PaxosRepair(DecoratedKey partitionKey, Ballot incompleteBallot, TableMetadata table, ConsistencyLevel paxosConsistency)
{
super(partitionKey, incompleteBallot);
// TODO: move precondition into super ctor
Preconditions.checkArgument(paxosConsistency.isSerialConsistency());
this.table = table;
this.paxosConsistency = paxosConsistency;
this.successCriteria = incompleteBallot;
}
public static PaxosRepair create(ConsistencyLevel consistency, DecoratedKey partitionKey, Ballot incompleteBallot, TableMetadata table)
{
return new PaxosRepair(partitionKey, incompleteBallot, table, consistency);
}
private State retry(State state)
{
Preconditions.checkState(isStarted());
if (isResult(state))
return state;
return restart(state, waitUntilForContention(++attempts, table, partitionKey(), paxosConsistency, REPAIR));
}
@Override
public State restart(State state, long waitUntil)
{
if (isResult(state))
return state;
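// re-fetch the electorate, in case ring membership has changed since the last attempt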
participants = Participants.get(table, partitionKey(), paxosConsistency);
if (waitUntil > Long.MIN_VALUE && waitUntil - startedNanos() > RETRY_TIMEOUT_NANOS)
return new Failure(null);
try
{
participants.assureSufficientLiveNodesForRepair();
}
catch (UnavailableException e)
{
return new Failure(e);
}
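// Long.MIN_VALUE means no back-off was requested: query immediately; otherwise schedule the next
// round on the retry executor for the contention deadline computed by retry()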
Querying querying = new Querying();
long now;
if (waitUntil == Long.MIN_VALUE || waitUntil - (now = nanoTime()) < 0) querying.run();
else RETRIES.schedule(querying, waitUntil - now, NANOSECONDS);
return querying;
}
private ConsistencyLevel commitConsistency()
{
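// map the serial consistency to its corresponding commit consistency: LOCAL_SERIAL -> LOCAL_QUORUM, SERIAL -> QUORUM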
Preconditions.checkState(paxosConsistency.isSerialConsistency());
return paxosConsistency.isDatacenterLocal() ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM;
}
static class Request
{
final DecoratedKey partitionKey;
final TableMetadata table;
Request(DecoratedKey partitionKey, TableMetadata table)
{
this.partitionKey = partitionKey;
this.table = table;
}
}
/**
* The response to a repair request: the replica's view of its paxos state for the partition, i.e. the latest
* ballot it has witnessed (or its low bound), any proposal accepted but not committed, and its most recent commit
*/
static class Response
{
@Nonnull final Ballot latestWitnessedOrLowBound;
@Nullable final Accepted acceptedButNotCommitted;
@Nonnull final Committed committed;
Response(Ballot latestWitnessedOrLowBound, @Nullable Accepted acceptedButNotCommitted, Committed committed)
{
this.latestWitnessedOrLowBound = latestWitnessedOrLowBound;
this.acceptedButNotCommitted = acceptedButNotCommitted;
this.committed = committed;
}
public String toString()
{
return String.format("Response(%s, %s, %s", latestWitnessedOrLowBound, acceptedButNotCommitted, committed);
}
}
private static Map<String, Set<InetAddressAndPort>> mapToDc(Collection<InetAddressAndPort> endpoints, Function<InetAddressAndPort, String> dcFunc)
{
Map<String, Set<InetAddressAndPort>> map = new HashMap<>();
endpoints.forEach(e -> map.computeIfAbsent(dcFunc.apply(e), k -> new HashSet<>()).add(e));
return map;
}
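/**
* True if the live endpoints form a strict majority of all endpoints or, when {@code requireQuorum} is false,
* if at most one endpoint is dead
*/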
private static boolean hasQuorumOrSingleDead(Collection<InetAddressAndPort> all, Collection<InetAddressAndPort> live, boolean requireQuorum)
{
Preconditions.checkArgument(all.size() >= live.size());
return live.size() >= (all.size() / 2) + 1 || (!requireQuorum && live.size() >= all.size() - 1);
}
@VisibleForTesting
static boolean hasSufficientLiveNodesForTopologyChange(Collection<InetAddressAndPort> allEndpoints, Collection<InetAddressAndPort> liveEndpoints, Function<InetAddressAndPort, String> dcFunc, boolean onlyQuorumRequired, boolean strictQuorum)
{
Map<String, Set<InetAddressAndPort>> allDcMap = mapToDc(allEndpoints, dcFunc);
Map<String, Set<InetAddressAndPort>> liveDcMap = mapToDc(liveEndpoints, dcFunc);
if (!hasQuorumOrSingleDead(allEndpoints, liveEndpoints, strictQuorum))
return false;
if (onlyQuorumRequired)
return true;
for (Map.Entry<String, Set<InetAddressAndPort>> entry : allDcMap.entrySet())
{
Set<InetAddressAndPort> all = entry.getValue();
Set<InetAddressAndPort> live = liveDcMap.getOrDefault(entry.getKey(), Collections.emptySet());
if (!hasQuorumOrSingleDead(all, live, strictQuorum))
return false;
}
return true;
}
/**
* Checks whether we have enough live nodes to perform a paxos repair for a topology change. Generally, this means we
* need enough live participants to reach EACH_QUORUM, with a few exceptions. The EACH_QUORUM requirement is meant to
* support workloads using either SERIAL or LOCAL_SERIAL.
*
* If paxos_topology_repair_strict_each_quorum is set to false (the default), we will accept either a quorum or n-1
* live nodes in the cluster and per dc. If paxos_topology_repair_no_dc_checks is true, we only check the live nodes
* in the cluster, and do not do any per-dc checks.
*/
public static boolean hasSufficientLiveNodesForTopologyChange(Keyspace keyspace, Range<Token> range, Collection<InetAddressAndPort> liveEndpoints)
{
return hasSufficientLiveNodesForTopologyChange(keyspace.getReplicationStrategy().getNaturalReplicasForToken(range.right).endpoints(),
liveEndpoints,
DatabaseDescriptor.getEndpointSnitch()::getDatacenter,
DatabaseDescriptor.paxoTopologyRepairNoDcChecks(),
DatabaseDescriptor.paxoTopologyRepairStrictEachQuorum());
}
/**
* The repair request handler: receives a repair request from a peer and responds with the local paxos state
* for the partition (latest witnessed ballot, any accepted but uncommitted proposal, and the most recent commit)
*/
public static class RequestHandler implements IVerbHandler<PaxosRepair.Request>
{
@Override
public void doVerb(Message<PaxosRepair.Request> message)
{
PaxosRepair.Request request = message.payload;
if (!isInRangeAndShouldProcess(message.from(), request.partitionKey, request.table, false))
{
MessagingService.instance().respondWithFailure(UNKNOWN, message);
return;
}
Ballot latestWitnessed;
Accepted acceptedButNotCommitted;
Committed committed;
int nowInSec = FBUtilities.nowInSeconds();
try (PaxosState state = PaxosState.get(request.partitionKey, request.table))
{
PaxosState.Snapshot snapshot = state.current(nowInSec);
latestWitnessed = snapshot.latestWitnessedOrLowBound();
acceptedButNotCommitted = snapshot.accepted;
committed = snapshot.committed;
}
Response response = new Response(latestWitnessed, acceptedButNotCommitted, committed);
MessagingService.instance().respond(response, message);
}
}
public static class RequestSerializer implements IVersionedSerializer<Request>
{
@Override
public void serialize(Request request, DataOutputPlus out, int version) throws IOException
{
request.table.id.serialize(out);
DecoratedKey.serializer.serialize(request.partitionKey, out, version);
}
@Override
public Request deserialize(DataInputPlus in, int version) throws IOException
{
TableMetadata table = Schema.instance.getExistingTableMetadata(TableId.deserialize(in));
DecoratedKey partitionKey = (DecoratedKey) DecoratedKey.serializer.deserialize(in, table.partitioner, version);
return new Request(partitionKey, table);
}
@Override
public long serializedSize(Request request, int version)
{
return request.table.id.serializedSize()
+ DecoratedKey.serializer.serializedSize(request.partitionKey, version);
}
}
public static class ResponseSerializer implements IVersionedSerializer<Response>
{
public void serialize(Response response, DataOutputPlus out, int version) throws IOException
{
response.latestWitnessedOrLowBound.serialize(out);
serializeNullable(Accepted.serializer, response.acceptedButNotCommitted, out, version);
Committed.serializer.serialize(response.committed, out, version);
}
public Response deserialize(DataInputPlus in, int version) throws IOException
{
Ballot latestWitnessed = Ballot.deserialize(in);
Accepted acceptedButNotCommitted = deserializeNullable(Accepted.serializer, in, version);
Committed committed = Committed.serializer.deserialize(in, version);
return new Response(latestWitnessed, acceptedButNotCommitted, committed);
}
public long serializedSize(Response response, int version)
{
return Ballot.sizeInBytes()
+ serializedSizeNullable(Accepted.serializer, response.acceptedButNotCommitted, version)
+ Committed.serializer.serializedSize(response.committed, version);
}
}
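// set with -Dcassandra.skip_paxos_repair_version_validation=true to bypass the peer release-version check below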
private static volatile boolean SKIP_VERSION_VALIDATION = Boolean.getBoolean("cassandra.skip_paxos_repair_version_validation");
public static void setSkipPaxosRepairCompatibilityCheck(boolean v)
{
SKIP_VERSION_VALIDATION = v;
}
public static boolean getSkipPaxosRepairCompatibilityCheck()
{
return SKIP_VERSION_VALIDATION;
}
static boolean validateVersionCompatibility(CassandraVersion version)
{
if (SKIP_VERSION_VALIDATION)
return true;
if (version == null)
return false;
// require 4.1 or newer; 4.0 does not support paxos repair
return (version.major == 4 && version.minor > 0) || version.major > 4;
}
static String getPeerVersion(InetAddressAndPort peer)
{
EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(peer);
if (epState == null)
return null;
VersionedValue value = epState.getApplicationState(ApplicationState.RELEASE_VERSION);
if (value == null)
return null;
try
{
return value.value;
}
catch (IllegalArgumentException e)
{
return null;
}
}
static boolean validatePeerCompatibility(Replica peer)
{
String versionString = getPeerVersion(peer.endpoint());
CassandraVersion version = versionString != null ? new CassandraVersion(versionString) : null;
boolean result = validateVersionCompatibility(version);
if (!result)
logger.info("PaxosRepair isn't supported by {} on version {}", peer, versionString);
return result;
}
static boolean validatePeerCompatibility(TableMetadata table, Range<Token> range)
{
Participants participants = Participants.get(table, range.right, ConsistencyLevel.SERIAL);
return Iterables.all(participants.all, PaxosRepair::validatePeerCompatibility);
}
public static boolean validatePeerCompatibility(TableMetadata table, Collection<Range<Token>> ranges)
{
return Iterables.all(ranges, range -> validatePeerCompatibility(table, range));
}
public static void shutdownAndWait(long timeout, TimeUnit units) throws InterruptedException, TimeoutException
{
ExecutorUtils.shutdownAndWait(timeout, units, RETRIES);
}
}