// blob: 22f064dbb76ff529e82090027afb8339aaa15c1a [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.cassandra.service.paxos;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiConsumer;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.metrics.PaxosMetrics;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.exceptions.RequestTimeoutException;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.service.paxos.uncommitted.PaxosBallotTracker;
import org.apache.cassandra.service.paxos.uncommitted.PaxosStateTracker;
import org.apache.cassandra.service.paxos.uncommitted.PaxosUncommittedTracker;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.Nemesis;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.cassandra.config.CassandraRelevantProperties.PAXOS_DISABLE_COORDINATOR_LOCKING;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
import static org.apache.cassandra.config.Config.PaxosStatePurging.gc_grace;
import static org.apache.cassandra.config.Config.PaxosStatePurging.legacy;
import static org.apache.cassandra.config.DatabaseDescriptor.paxosStatePurging;
import static org.apache.cassandra.service.paxos.Commit.*;
import static org.apache.cassandra.service.paxos.PaxosState.MaybePromise.Outcome.*;
import static org.apache.cassandra.service.paxos.Commit.Accepted.latestAccepted;
import static org.apache.cassandra.service.paxos.Commit.Committed.latestCommitted;
import static org.apache.cassandra.service.paxos.Commit.isAfter;
/**
* We save to memory the result of each operation before persisting to disk, however each operation that performs
* the update does not return a result to the coordinator until the result is fully persisted.
*/
public class PaxosState implements PaxosOperationLock
{
// Runtime toggle, seeded from the PAXOS_DISABLE_COORDINATOR_LOCKING property; when true,
// lock(DecoratedKey, ...) hands back a no-op lock instead of contending on a PaxosState.
private static volatile boolean DISABLE_COORDINATOR_LOCKING = PAXOS_DISABLE_COORDINATOR_LOCKING.getBoolean();
// States that currently have at least one referent. Entries are created/incremented inside
// ACTIVE.compute(...) by the get/lock methods and removed by close() once the count reaches zero.
public static final ConcurrentHashMap<Key, PaxosState> ACTIVE = new ConcurrentHashMap<>();
// Weight-bounded cache of snapshots stashed by close() when a state loses its last referent.
// Each entry is weighed by the unshared heap size of its accepted (if present) and committed updates,
// saturated to an int; the total weight is capped at the configured cache size (MiB shifted left by 20).
// Cache maintenance runs on the calling thread (ImmediateExecutor).
public static final Map<Key, Snapshot> RECENT = Caffeine.newBuilder()
.maximumWeight(DatabaseDescriptor.getPaxosCacheSizeInMiB() << 20)
.<Key, Snapshot>weigher((k, v) -> Ints.saturatedCast((v.accepted != null ? v.accepted.update.unsharedHeapSize() : 0L) + v.committed.update.unsharedHeapSize()))
.executor(ImmediateExecutor.INSTANCE)
.build().asMap();
/**
 * Holder for the single {@link PaxosStateTracker}. The tracker is created over
 * {@code Directories.dataDirectories} when this class is initialised; an {@link IOException}
 * raised during creation is rethrown wrapped in a {@link RuntimeException}.
 */
private static class TrackerHandle
{
    static final PaxosStateTracker tracker = createTracker();

    // performs the checked creation so the field can be assigned in its initializer
    private static PaxosStateTracker createTracker()
    {
        try
        {
            return PaxosStateTracker.create(Directories.dataDirectories);
        }
        catch (IOException e)
        {
            throw new RuntimeException(e);
        }
    }
}
/**
 * Turns coordinator-side locking off ({@code true}) or back on ({@code false}) at runtime.
 *
 * @param disable the new value of the flag consulted by {@code lock(DecoratedKey, ...)}
 */
public static void setDisableCoordinatorLocking(boolean disable)
{
    DISABLE_COORDINATOR_LOCKING = disable;
}
/**
 * @return {@code true} if coordinator-side locking is currently disabled
 */
public static boolean getDisableCoordinatorLocking()
{
    return DISABLE_COORDINATOR_LOCKING;
}
/**
 * @return the uncommitted-operation tracker owned by the shared {@code PaxosStateTracker}
 */
public static PaxosUncommittedTracker uncommittedTracker()
{
    return TrackerHandle.tracker.uncommitted();
}
/**
 * @return the ballot tracker owned by the shared {@code PaxosStateTracker}
 */
public static PaxosBallotTracker ballotTracker()
{
    return TrackerHandle.tracker.ballots();
}
/**
 * Touches {@code TrackerHandle.tracker} (forcing the holder class to initialise), verifies it is
 * non-null, and then initialises the Paxos metrics.
 *
 * @throws IllegalStateException if the tracker is null
 */
public static void initializeTrackers()
{
    boolean trackerCreated = TrackerHandle.tracker != null;
    Preconditions.checkState(trackerCreated);
    PaxosMetrics.initialize();
}
/**
 * Delegates to {@code maybeRebuild()} on the shared tracker.
 *
 * @throws IOException propagated from the tracker
 */
public static void maybeRebuildUncommittedState() throws IOException
{
    TrackerHandle.tracker.maybeRebuild();
}
/**
 * Asks the uncommitted tracker of the shared {@code PaxosStateTracker} to start its auto repairs.
 */
public static void startAutoRepairs()
{
    TrackerHandle.tracker.uncommitted().startAutoRepairs();
}
/**
 * Identifies the Paxos state of one partition of one table.
 * Equality and hashing use the partition key together with the table id ({@code metadata.id}),
 * not the {@code TableMetadata} instance itself.
 */
public static class Key
{
    final DecoratedKey partitionKey;
    final TableMetadata metadata;

    public Key(DecoratedKey partitionKey, TableMetadata metadata)
    {
        this.partitionKey = partitionKey;
        this.metadata = metadata;
    }

    @Override
    public int hashCode()
    {
        return partitionKey.hashCode() * 31 + metadata.id.hashCode();
    }

    @Override
    public boolean equals(Object that)
    {
        return that instanceof Key && equals((Key) that);
    }

    /**
     * Typed overload of {@link #equals(Object)}.
     *
     * @param that the key to compare with; {@code null} compares unequal rather than throwing
     * @return true if both keys refer to the same partition key and table id
     */
    public boolean equals(Key that)
    {
        if (this == that)
            return true;
        return that != null
               && this.partitionKey.equals(that.partitionKey)
               && this.metadata.id.equals(that.metadata.id);
    }
}
/**
 * Immutable view of the Paxos state for one key: the latest promised ballot, the latest promised
 * write ballot, the accepted-but-uncommitted proposal (if any) and the latest commit.
 */
public static class Snapshot
{
    public final @Nonnull Ballot promised;
    public final @Nonnull Ballot promisedWrite; // <= promised
    public final @Nullable Accepted accepted; // if already committed, this will be null
    public final @Nonnull Committed committed;

    public Snapshot(@Nonnull Ballot promised, @Nonnull Ballot promisedWrite, @Nullable Accepted accepted, @Nonnull Committed committed)
    {
        // invariants (checked only when assertions are enabled)
        assert isAfter(promised, promisedWrite) || promised == promisedWrite;
        assert accepted == null || accepted.update.partitionKey().equals(committed.update.partitionKey());
        assert accepted == null || accepted.update.metadata().id.equals(committed.update.metadata().id);
        assert accepted == null || committed.isBefore(accepted.ballot);

        this.promised = promised;
        this.promisedWrite = promisedWrite;
        this.accepted = accepted;
        this.committed = committed;
    }

    /**
     * @param latestWriteOrLowBound a value previously obtained from {@link #latestWriteOrLowBound()}
     * @return {@code latestWriteOrLowBound} itself when the read and write promises are the same
     *         instance, otherwise the later of {@code promised} and {@code latestWriteOrLowBound}
     */
    public @Nonnull Ballot latestWitnessedOrLowBound(Ballot latestWriteOrLowBound)
    {
        if (promised == promisedWrite)
            return latestWriteOrLowBound;
        return latest(promised, latestWriteOrLowBound);
    }

    /**
     * @return the latest ballot among accepted, committed, promised and the ballot tracker's low bound
     */
    public @Nonnull Ballot latestWitnessedOrLowBound()
    {
        // warn: if proposal has same timestamp as promised, we should prefer accepted
        // since (if different) it reached a quorum of promises; this means providing it as first argument
        Ballot newest = latest(accepted, committed).ballot;
        newest = latest(newest, promised);
        return latest(newest, ballotTracker().getLowBound());
    }

    /**
     * @return the latest ballot among a non-empty accepted proposal, the commit, the write promise
     *         and the ballot tracker's low bound
     */
    public @Nonnull Ballot latestWriteOrLowBound()
    {
        // warn: if proposal has same timestamp as promised, we should prefer accepted
        // since (if different) it reached a quorum of promises; this means providing it as first argument
        Ballot newest = null;
        if (accepted != null && !accepted.update.isEmpty())
            newest = accepted.ballot;
        newest = latest(newest, committed.ballot);
        newest = latest(newest, promisedWrite);
        return latest(newest, ballotTracker().getLowBound());
    }

    /**
     * Combines two snapshots of the same key, keeping the latest commit and, where both are
     * full snapshots, the latest promises and the latest accepted proposal still newer than that commit.
     * A null argument yields the other argument unchanged.
     */
    public static Snapshot merge(Snapshot a, Snapshot b)
    {
        if (a == null)
            return b;
        if (b == null)
            return a;

        Committed committed = latestCommitted(a.committed, b.committed);
        boolean aIsUnsafe = a instanceof UnsafeSnapshot;
        boolean bIsUnsafe = b instanceof UnsafeSnapshot;

        if (aIsUnsafe && bIsUnsafe)
            return new UnsafeSnapshot(committed);

        if (!aIsUnsafe && !bIsUnsafe)
        {
            Accepted newestAccepted = latestAccepted(a.accepted, b.accepted);
            Accepted accepted = isAfter(newestAccepted, committed) ? newestAccepted : null;
            Ballot promised = latest(a.promised, b.promised);
            Ballot promisedWrite = latest(a.promisedWrite, b.promisedWrite);
            return new Snapshot(promised, promisedWrite, accepted, committed);
        }

        // exactly one side is a full snapshot; the winning Committed is already chosen above,
        // so everything else is taken from the full side
        Snapshot full = aIsUnsafe ? b : a;
        if (committed == full.committed)
            return full;

        Accepted accepted = isAfter(full.accepted, committed) ? full.accepted : null;
        return new Snapshot(full.promised, full.promisedWrite, accepted, committed);
    }

    /**
     * Returns this snapshot with any expired accepted/committed entries dropped: an expired accepted
     * proposal becomes null and an expired commit is replaced by {@code Committed.none(...)}.
     * Under {@code gc_grace} purging, entries whose ballot is older than {@code nowInSec} minus the
     * table's gc grace seconds are also treated as expired. Returns {@code this} if nothing expired.
     */
    Snapshot removeExpired(int nowInSec)
    {
        boolean dropAccepted = accepted != null && accepted.isExpired(nowInSec);
        boolean dropCommitted = committed.isExpired(nowInSec);

        if (paxosStatePurging() == gc_grace)
        {
            long expireOlderThan = SECONDS.toMicros(nowInSec - committed.update.metadata().params.gcGraceSeconds);
            if (accepted != null && accepted.ballot.unixMicros() < expireOlderThan)
                dropAccepted = true;
            if (committed.ballot.unixMicros() < expireOlderThan)
                dropCommitted = true;
        }

        if (!(dropAccepted || dropCommitted))
            return this;

        Accepted keptAccepted = dropAccepted ? null : accepted;
        Committed keptCommitted = dropCommitted
                                  ? Committed.none(committed.update.partitionKey(), committed.update.metadata())
                                  : committed;
        return new Snapshot(promised, promisedWrite, keptAccepted, keptCommitted);
    }
}
/**
 * Snapshot carrying only a commit, with empty promises ({@code Ballot.none()}) and no accepted
 * proposal. Used to permit recording Committed outcomes without waiting for the initial read.
 */
public static class UnsafeSnapshot extends Snapshot
{
    public UnsafeSnapshot(@Nonnull Committed committed)
    {
        super(Ballot.none(), Ballot.none(), null, committed);
    }

    /** Wraps the ballot and update of an arbitrary {@code Commit} in a new {@code Committed}. */
    public UnsafeSnapshot(@Nonnull Commit committed)
    {
        this(new Committed(committed.ballot, committed.update));
    }
}
/**
 * Result of a promise attempt: the outcome, the snapshot observed before the attempt, the snapshot
 * after it, and (for non-promise outcomes) the ballot that superseded the request.
 */
@VisibleForTesting
public static class MaybePromise
{
    public enum Outcome { REJECT, PERMIT_READ, PROMISE }

    final Snapshot before;
    final Snapshot after;
    final Ballot supersededBy;
    final Outcome outcome;

    MaybePromise(Snapshot before, Snapshot after, Ballot supersededBy, Outcome outcome)
    {
        this.before = before;
        this.after = after;
        this.supersededBy = supersededBy;
        this.outcome = outcome;
    }

    public Outcome outcome()
    {
        return outcome;
    }

    public Ballot supersededBy()
    {
        return supersededBy;
    }

    /** A granted promise; there is no superseding ballot. */
    static MaybePromise promise(Snapshot before, Snapshot after)
    {
        return new MaybePromise(before, after, null, Outcome.PROMISE);
    }

    /** Only a read is permitted; the state is unchanged, so before and after are the same snapshot. */
    static MaybePromise permitRead(Snapshot before, Ballot supersededBy)
    {
        return new MaybePromise(before, before, supersededBy, Outcome.PERMIT_READ);
    }

    /** The request is rejected; the state is unchanged, so before and after are the same snapshot. */
    static MaybePromise reject(Snapshot snapshot, Ballot supersededBy)
    {
        return new MaybePromise(snapshot, snapshot, supersededBy, Outcome.REJECT);
    }
}
// atomic (CAS / accumulate) access to the volatile "current" field below
@Nemesis private static final AtomicReferenceFieldUpdater<PaxosState, Snapshot> currentUpdater = AtomicReferenceFieldUpdater.newUpdater(PaxosState.class, Snapshot.class, "current");
// the partition/table this state belongs to; also the key under which it is registered in ACTIVE
final Key key;
// only read and written inside ACTIVE.compute(...) lambdas in this class
private int active; // current number of active referents (once drops to zero, we remove the global entry)
// latest in-memory snapshot; may be null (nothing cached in RECENT at creation) or an UnsafeSnapshot
// until maybeLoad() has merged in the state read from the system keyspace
@Nemesis private volatile Snapshot current;
// thread currently holding the coordinator lock (see lock(long) / maybeUnlock()), or null if unlocked
@Nemesis private volatile Thread lockedBy;
// number of threads currently in the slow path of lock(long); modified only while holding this object's monitor
@Nemesis private volatile int waiting;
// atomic (CAS) access to the volatile "lockedBy" field above
private static final AtomicReferenceFieldUpdater<PaxosState, Thread> lockedByUpdater = AtomicReferenceFieldUpdater.newUpdater(PaxosState.class, Thread.class, "lockedBy");
/**
 * @param key     the partition/table this state represents
 * @param current the initial snapshot; callers in this class pass whatever was stashed in
 *                {@code RECENT}, which may be null
 */
private PaxosState(Key key, Snapshot current)
{
    this.current = current;
    this.key = key;
}
/**
 * Convenience overload: looks up the state for the partition key and table of {@code commit}'s update.
 */
@VisibleForTesting
public static PaxosState get(Commit commit)
{
    return get(commit.update.partitionKey(), commit.update.metadata());
}
/**
 * Obtains a reference to the state for the given partition and table and makes sure it has been
 * loaded before returning it. The returned state holds a reference that {@code close()} releases.
 */
public static PaxosState get(DecoratedKey partitionKey, TableMetadata table)
{
    // TODO would be nice to refactor verb handlers to support re-submitting to executor if waiting for another thread to read state
    PaxosState state = getUnsafe(partitionKey, table);
    return state.maybeLoad();
}
/**
 * Returns a referenced state only if one is already active or a snapshot is stashed in
 * {@code RECENT}; otherwise returns null without creating anything. No load from disk is performed.
 * When a state is returned its {@code active} count has been incremented.
 */
// does not increment total number of accessors, since we would accept null (so only access if others are, not for own benefit)
private static PaxosState tryGetUnsafe(DecoratedKey partitionKey, TableMetadata metadata)
{
    return ACTIVE.compute(new Key(partitionKey, metadata), (key, existing) -> {
        PaxosState state = existing;
        if (state == null)
        {
            Snapshot stashed = RECENT.remove(key);
            if (stashed == null)
                return null; // nothing active and nothing cached: leave the map untouched

            //noinspection resource
            state = new PaxosState(key, stashed);
        }
        ++state.active;
        return state;
    });
}
/**
 * Returns the active state for the key, creating it if necessary from whatever snapshot is stashed
 * in {@code RECENT} (possibly none), and increments its {@code active} count.
 * No load from disk is performed here.
 */
private static PaxosState getUnsafe(DecoratedKey partitionKey, TableMetadata metadata)
{
    Key lookup = new Key(partitionKey, metadata);
    return ACTIVE.compute(lookup, (key, existing) -> {
        //noinspection resource
        PaxosState state = existing != null ? existing : new PaxosState(key, RECENT.remove(key));
        ++state.active;
        return state;
    });
}
/**
 * Convenience overload using the partition key and table of {@code commit}'s update.
 */
private static PaxosState getUnsafe(Commit commit)
{
    return getUnsafe(commit.update.partitionKey(), commit.update.metadata());
}
/**
 * Acquires the coordinator lock for the given partition and table.
 * Returns a no-op lock when coordinator locking is disabled. Otherwise the state is obtained (or
 * created) in {@code ACTIVE} with its {@code active} count incremented, and this thread attempts to
 * lock it until {@code deadline}. If anything fails, the reference is released via {@code close()}.
 *
 * @param deadline compared against {@code nanoTime()} by the underlying lock attempt
 * @throws RequestTimeoutException a write or read timeout (depending on {@code isWrite}) if the
 *         lock could not be acquired
 */
// NOTE(review): the original remark here read "don't increment the total count, as we are only using this
// for locking purposes when coordinating", yet the code below does increment `active` — confirm which count was meant
@VisibleForTesting
public static PaxosOperationLock lock(DecoratedKey partitionKey, TableMetadata metadata, long deadline, ConsistencyLevel consistencyForConsensus, boolean isWrite) throws RequestTimeoutException
{
    if (DISABLE_COORDINATOR_LOCKING)
        return PaxosOperationLock.noOp();

    PaxosState state = ACTIVE.compute(new Key(partitionKey, metadata), (key, existing) -> {
        PaxosState target = existing == null ? new PaxosState(key, RECENT.remove(key)) : existing;
        ++target.active;
        return target;
    });

    try
    {
        if (state.lock(deadline))
            return state;
        throw throwTimeout(metadata, consistencyForConsensus, isWrite);
    }
    catch (Throwable t)
    {
        state.close();
        throw t;
    }
}
/**
 * Always throws: a {@code WriteTimeoutException} (write type CAS) when {@code isWrite}, otherwise a
 * {@code ReadTimeoutException}, both reporting zero responses received out of the number the
 * consistency level blocks for. The declared return type only lets callers write {@code throw throwTimeout(...)}.
 */
private static RequestTimeoutException throwTimeout(TableMetadata metadata, ConsistencyLevel consistencyForConsensus, boolean isWrite)
{
    int blockFor = consistencyForConsensus.blockFor(Keyspace.open(metadata.keyspace).getReplicationStrategy());
    if (isWrite)
        throw new WriteTimeoutException(WriteType.CAS, consistencyForConsensus, 0, blockFor);
    throw new ReadTimeoutException(consistencyForConsensus, 0, blockFor, false);
}
/**
 * Ensures {@code current} is a full snapshot. If it is null or an {@code UnsafeSnapshot}, the state
 * is read via {@code SystemKeyspace.loadPaxosState} under this object's monitor and merged into
 * {@code current}. On any failure this state's reference is released with {@code close()} before
 * the failure is rethrown (a failure of {@code close()} itself is attached as suppressed).
 *
 * @return this
 */
private PaxosState maybeLoad()
{
    try
    {
        Snapshot seen = this.current;
        if (seen != null && !(seen instanceof UnsafeSnapshot))
            return this;

        synchronized (this)
        {
            // look again now that we hold the monitor; another thread may have loaded meanwhile
            seen = this.current;
            if (seen != null && !(seen instanceof UnsafeSnapshot))
                return this;

            Snapshot loaded = SystemKeyspace.loadPaxosState(key.partitionKey, key.metadata, 0);
            currentUpdater.accumulateAndGet(this, loaded, Snapshot::merge);
        }
        return this;
    }
    catch (Throwable t)
    {
        try
        {
            close();
        }
        catch (Throwable t2)
        {
            t.addSuppressed(t2);
        }
        throw t;
    }
}
/**
 * Tries to make the calling thread the owner recorded in {@code lockedBy}.
 *
 * @param deadline instant, on the same scale as {@code nanoTime()}, after which the attempt gives up
 * @return true if the lock was acquired; false if the deadline passed while it was held by another
 *         thread, or if the calling thread was interrupted while waiting (the interrupt flag is restored)
 */
private boolean lock(long deadline)
{
try
{
Thread thread = Thread.currentThread();
// fast path: uncontended acquisition without taking the monitor
if (lockedByUpdater.compareAndSet(this, null, thread))
return true;
synchronized (this)
{
// advertise ourselves so maybeUnlock() knows it has to notify
waiting++;
try
{
while (true)
{
if (lockedByUpdater.compareAndSet(this, null, thread))
return true;
while (lockedBy != null)
{
long now = nanoTime();
if (now >= deadline)
return false;
// remaining nanoseconds converted to milliseconds, rounded up so the timeout is never zero
wait(1 + ((deadline - now) - 1) / 1000000);
}
}
}
finally
{
waiting--;
}
}
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
return false;
}
}
/**
 * Releases the lock if, and only if, the calling thread is the one recorded in {@code lockedBy};
 * otherwise does nothing. After releasing, wakes one waiter if any thread has registered in {@code waiting}.
 */
private void maybeUnlock()
{
// no visibility requirements, as if we hold the lock it was last updated by us
if (lockedBy == null)
return;
Thread thread = Thread.currentThread();
if (lockedBy == thread)
{
lockedBy = null;
// only pay for the monitor when lock(long) reports that somebody is waiting
if (waiting > 0)
{
synchronized (this)
{
notify();
}
}
}
}
/**
 * Releases one reference to this state: unlocks it if the calling thread holds the lock, then
 * decrements the {@code active} count of the entry registered in {@code ACTIVE}. When the count
 * reaches zero the entry is removed, and its current snapshot is stashed in {@code RECENT}
 * provided it is a plain {@code Snapshot} (not null and not a subclass).
 */
public void close()
{
    maybeUnlock();
    ACTIVE.compute(key, (k, registered) -> {
        assert registered != null;
        if (--registered.active > 0)
            return registered;

        Snapshot stash = registered.current;
        boolean isFullSnapshot = stash != null && stash.getClass() == Snapshot.class;
        if (isFullSnapshot)
            RECENT.put(k, stash);
        return null;
    });
}
/**
 * Same as {@code current(int)}, using the ballot's unix time in seconds as "now".
 */
Snapshot current(Ballot ballot)
{
    int nowInSec = (int) ballot.unix(SECONDS);
    return current(nowInSec);
}
/**
 * Returns the current snapshot with entries expired as of {@code nowInSec} removed.
 *
 * @param nowInSec the time, in seconds, against which expiry is evaluated
 * @return the current snapshot, or a trimmed copy if anything had expired
 * @throws IllegalStateException if the state has not been loaded, i.e. {@code current} is null or is
 *         not a plain {@code Snapshot} (for example an {@code UnsafeSnapshot})
 */
Snapshot current(int nowInSec)
{
    // CASSANDRA-12043 is not an issue for v2, as we perform Commit+Prepare and PrepareRefresh
    // which are able to make progress whether or not the old commit is shadowed by the TTL (since they
    // depend only on the write being successful, not the data being read again later).
    // However, we still use nowInSec to guard reads to ensure we do not log any linearizability violations
    // due to discrepancies in gc grace handling
    Snapshot current = this.current;
    if (current == null || current.getClass() != Snapshot.class)
        throw new IllegalStateException("Paxos state has not been loaded: current snapshot is "
                                        + (current == null ? "null" : current.getClass().getSimpleName()));
    return current.removeExpired(nowInSec);
}
/**
 * @return the raw {@code current} field, without any loaded-state check or expiry trimming
 */
@VisibleForTesting
public Snapshot currentSnapshot()
{
    return this.current;
}
/**
 * Replaces {@code current} with the result of applying {@code f} to it. This is a plain
 * read-then-write of the field, not a compare-and-set.
 */
@VisibleForTesting
public void updateStateUnsafe(Function<Snapshot, Snapshot> f)
{
    Snapshot replaced = f.apply(current);
    current = replaced;
}
/**
 * Record the requested ballot as promised if it is newer than our current promise; otherwise do nothing.
 * <p>
 * Expired accepted/committed entries (as of the ballot's own timestamp) are ignored when deciding,
 * but are retained in the stored state: the snapshot installed by the compare-and-set is derived
 * from the untrimmed state, while the {@code before}/{@code after} snapshots reported to the caller
 * are the trimmed views.
 *
 * @param ballot  the ballot requesting a promise
 * @param isWrite whether the promise should also cover writes (updates {@code promisedWrite})
 * @return a {@link MaybePromise} holding the outcome together with the before and after state for
 *         this operation; for PERMIT_READ and REJECT it also carries the superseding ballot
 */
public MaybePromise promiseIfNewer(Ballot ballot, boolean isWrite)
{
    int nowInSec = (int) ballot.unix(SECONDS);
    Snapshot before, after;
    while (true)
    {
        Snapshot realBefore = current;
        before = realBefore.removeExpired(nowInSec);
        Ballot latestWriteOrLowBound = before.latestWriteOrLowBound();
        Ballot latest = before.latestWitnessedOrLowBound(latestWriteOrLowBound);

        if (!isAfter(ballot, latest))
        {
            if (isAfter(ballot, latestWriteOrLowBound))
            {
                Tracing.trace("Permitting only read by ballot {}", ballot);
                return MaybePromise.permitRead(before, latest);
            }
            Tracing.trace("Promise rejected; {} older than {}", ballot, latest);
            return MaybePromise.reject(before, latest);
        }

        // Always swap against the snapshot actually read from the field: `before` is a private copy
        // whenever expiry trimmed something, so a compare-and-set against it could never succeed.
        Snapshot realAfter = new Snapshot(ballot, isWrite ? ballot : realBefore.promisedWrite, realBefore.accepted, realBefore.committed);
        after = before == realBefore
                ? realAfter
                : new Snapshot(ballot, realAfter.promisedWrite, before.accepted, before.committed);
        if (currentUpdater.compareAndSet(this, realBefore, realAfter))
            break;
    }

    // It doesn't matter if a later operation witnesses this before it's persisted,
    // as it can only lead to rejecting a promise which leaves no persistent state
    // (and it's anyway safe to arbitrarily reject promises)
    if (isWrite)
    {
        Tracing.trace("Promising read/write ballot {}", ballot);
        SystemKeyspace.savePaxosWritePromise(key.partitionKey, key.metadata, ballot);
    }
    else
    {
        Tracing.trace("Promising read ballot {}", ballot);
        SystemKeyspace.savePaxosReadPromise(key.partitionKey, key.metadata, ballot);
    }
    return MaybePromise.promise(before, after);
}
/**
 * Record an acceptance of the proposal if there is no newer promise; otherwise inform the caller of the newer ballot
 *
 * @param proposal the proposal to accept; under legacy purging it is first wrapped with the default TTL
 * @return null if the proposal was accepted (or carries the same ballot as the latest commit, in which
 *         case nothing is saved); otherwise the newer ballot that caused the rejection
 */
public Ballot acceptIfLatest(Proposal proposal)
{
if (paxosStatePurging() == legacy && !(proposal instanceof AcceptedWithTTL))
proposal = AcceptedWithTTL.withDefaultTTL(proposal);
// NOTE(review): Snapshot.promised is declared @Nonnull, so the "can be null" remark below looks stale — confirm
// state.promised can be null, because it is invalidated by committed;
// we may also have accepted a newer proposal than we promised, so we confirm that we are the absolute newest
// (or that we have the exact same ballot as our promise, which is the typical case)
Snapshot before, after;
while (true)
{
Snapshot realBefore = current;
// decide against the expiry-trimmed view, but swap against (and build from) the snapshot actually read
before = realBefore.removeExpired((int)proposal.ballot.unix(SECONDS));
Ballot latest = before.latestWitnessedOrLowBound();
if (!proposal.isSameOrAfter(latest))
{
Tracing.trace("Rejecting proposal {}; latest is now {}", proposal.ballot, latest);
return latest;
}
if (proposal.hasSameBallot(before.committed)) // TODO: consider not answering
return null; // no need to save anything, or indeed answer at all
after = new Snapshot(realBefore.promised, realBefore.promisedWrite, proposal.accepted(), realBefore.committed);
if (currentUpdater.compareAndSet(this, realBefore, after))
break;
}
// It is more worrisome to permit witnessing an accepted proposal before we have persisted it
// because this has more tangible effects on the recipient, but again it is safe: either it is
// - witnessed to reject (which is always safe, as it prevents rather than creates an outcome); or
// - witnessed as an in progress proposal
// in the latter case, for there to be any effect on the state the proposal must be re-proposed, or not,
// on its own terms, and must be persisted by the re-proposer, and so it remains a non-issue
Tracing.trace("Accepting proposal {}", proposal);
SystemKeyspace.savePaxosProposal(proposal);
return null;
}
/**
 * Applies and persists the agreed commit, then merges it into this state's in-memory snapshot.
 */
public void commit(Agreed commit)
{
    applyCommit(commit, this, (apply, to) -> {
        UnsafeSnapshot recorded = new UnsafeSnapshot(apply);
        currentUpdater.accumulateAndGet(to, recorded, Snapshot::merge);
    });
}
/**
 * Applies and persists the commit without requiring a loaded state. The in-memory snapshot is
 * updated only if a state for the key is already active or cached (see {@code tryGetUnsafe});
 * any reference obtained for that purpose is released again before returning.
 */
public static void commitDirect(Commit commit)
{
    applyCommit(commit, null, (apply, ignore) -> {
        // a null resource is permitted here: try-with-resources simply skips closing it
        try (PaxosState existing = tryGetUnsafe(apply.update.partitionKey(), apply.update.metadata()))
        {
            if (existing == null)
                return;
            currentUpdater.accumulateAndGet(existing, new UnsafeSnapshot(apply), Snapshot::merge);
        }
    });
}
/**
 * Applies the commit's mutation to the base table (unless its ballot predates the table's last
 * truncation), saves the commit to the system keyspace, and finally invokes {@code postCommit}.
 * The elapsed time is recorded in the table's {@code casCommit} metric whether or not this succeeds.
 *
 * @param commit     the commit to apply; under legacy purging it is first wrapped with the default TTL
 * @param state      passed through unchanged as the second argument of {@code postCommit}; may be null
 * @param postCommit callback run with the (possibly wrapped) commit after it has been saved
 */
private static void applyCommit(Commit commit, PaxosState state, BiConsumer<Commit, PaxosState> postCommit)
{
    boolean needsDefaultTTL = paxosStatePurging() == legacy && !(commit instanceof CommittedWithTTL);
    if (needsDefaultTTL)
        commit = CommittedWithTTL.withDefaultTTL(commit);

    long start = nanoTime();
    try
    {
        // TODO: run Paxos Repair before truncate so we can excise this
        // The table may have been truncated since the proposal was initiated. In that case, we
        // don't want to perform the mutation and potentially resurrect truncated data
        boolean predatesTruncation = commit.ballot.unixMicros() < SystemKeyspace.getTruncatedAt(commit.update.metadata().id);
        if (predatesTruncation)
        {
            Tracing.trace("Not committing proposal {} as ballot timestamp predates last truncation time", commit);
        }
        else
        {
            Tracing.trace("Committing proposal {}", commit);
            Mutation mutation = commit.makeMutation();
            Keyspace.open(mutation.getKeyspaceName()).apply(mutation, true);
        }

        // for commits we save to disk first, because we can; even here though it is safe to permit later events to
        // witness the state before it is persisted. The only tricky situation is that we use the witnessing of
        // a quorum of nodes having witnessed the latest commit to decide if we need to disseminate a commit
        // again before proceeding with any new operation, but in this case we have already persisted the relevant
        // information, namely the base table mutation. So this fact is persistent, even if knowledge of this fact
        // is not (and if this is lost, it may only lead to a future operation unnecessarily committing again)
        SystemKeyspace.savePaxosCommit(commit);
        postCommit.accept(commit, state);
    }
    finally
    {
        long elapsedNanos = nanoTime() - start;
        Keyspace.openAndGetStore(commit.update.metadata()).metric.casCommit.addNano(elapsedNanos);
    }
}
/**
 * Legacy prepare: promises {@code toPrepare}'s ballot (for both reads and writes) if it is newer
 * than everything witnessed for the key, persisting the promise before responding.
 * <p>
 * Runs under the monitor of the state's key. The elapsed time is recorded in the table's
 * {@code casPrepare} metric.
 *
 * @param toPrepare the commit whose ballot is requesting a promise
 * @return a positive response carrying the accepted proposal (or {@code Accepted.none}) and latest
 *         commit if promised; otherwise a negative response carrying the currently promised ballot
 */
public static PrepareResponse legacyPrepare(Commit toPrepare)
{
    long start = nanoTime();
    try
    {
        PaxosState unsafeState = getUnsafe(toPrepare);
        // maybeLoad() releases our reference itself when it fails, so we only become responsible
        // for close() once it has returned; closing again would decrement the count twice
        boolean loaded = false;
        Throwable failure = null;
        try
        {
            synchronized (unsafeState.key)
            {
                unsafeState.maybeLoad();
                loaded = true;
                assert unsafeState.current != null;
                while (true)
                {
                    // ignore nowInSec when merging as this can only be an issue during the transition period, so the unbounded
                    // problem of CASSANDRA-12043 is not an issue
                    Snapshot realBefore = unsafeState.current;
                    Snapshot before = realBefore.removeExpired((int)toPrepare.ballot.unix(SECONDS));
                    Ballot latest = before.latestWitnessedOrLowBound();
                    if (toPrepare.isAfter(latest))
                    {
                        Snapshot after = new Snapshot(toPrepare.ballot, toPrepare.ballot, realBefore.accepted, realBefore.committed);
                        if (currentUpdater.compareAndSet(unsafeState, realBefore, after))
                        {
                            Tracing.trace("Promising ballot {}", toPrepare.ballot);
                            DecoratedKey partitionKey = toPrepare.update.partitionKey();
                            TableMetadata metadata = toPrepare.update.metadata();
                            SystemKeyspace.savePaxosWritePromise(partitionKey, metadata, toPrepare.ballot);
                            return new PrepareResponse(true, before.accepted == null ? Accepted.none(partitionKey, metadata) : before.accepted, before.committed);
                        }
                        // lost the race to update the state: re-read and try again
                    }
                    else
                    {
                        Tracing.trace("Promise rejected; {} is not sufficiently newer than {}", toPrepare, before.promised);
                        // return the currently promised ballot (not the last accepted one) so the coordinator can make sure it uses newer ballot next time (#5667)
                        return new PrepareResponse(false, new Commit(before.promised, toPrepare.update), before.committed);
                    }
                }
            }
        }
        catch (Throwable t)
        {
            failure = t;
            throw t;
        }
        finally
        {
            if (loaded)
            {
                if (failure == null)
                {
                    unsafeState.close();
                }
                else
                {
                    // keep the original failure primary, as try-with-resources would
                    try { unsafeState.close(); } catch (Throwable t2) { failure.addSuppressed(t2); }
                }
            }
        }
    }
    finally
    {
        Keyspace.openAndGetStore(toPrepare.update.metadata()).metric.casPrepare.addNano(nanoTime() - start);
    }
}
/**
 * Legacy propose: accepts {@code proposal} if its ballot is the same as or newer than everything
 * witnessed for the key, persisting it before responding.
 * <p>
 * Runs under the monitor of the state's key. The elapsed time is recorded in the table's
 * {@code casPropose} metric.
 *
 * @param proposal the proposal to accept; under legacy purging it is first wrapped with the default TTL
 * @return true if the proposal was accepted and saved, false if it was rejected
 */
public static Boolean legacyPropose(Commit proposal)
{
    if (paxosStatePurging() == legacy && !(proposal instanceof AcceptedWithTTL))
        proposal = AcceptedWithTTL.withDefaultTTL(proposal);
    long start = nanoTime();
    try
    {
        PaxosState unsafeState = getUnsafe(proposal);
        // maybeLoad() releases our reference itself when it fails, so we only become responsible
        // for close() once it has returned; closing again would decrement the count twice
        boolean loaded = false;
        Throwable failure = null;
        try
        {
            synchronized (unsafeState.key)
            {
                unsafeState.maybeLoad();
                loaded = true;
                assert unsafeState.current != null;
                while (true)
                {
                    Snapshot realBefore = unsafeState.current;
                    Snapshot before = realBefore.removeExpired((int)proposal.ballot.unix(SECONDS));
                    boolean accept = proposal.isSameOrAfter(before.latestWitnessedOrLowBound());
                    if (accept)
                    {
                        // a proposal matching the latest commit needs no state change, but is still saved
                        if (proposal.hasSameBallot(before.committed) ||
                            currentUpdater.compareAndSet(unsafeState, realBefore,
                                                         new Snapshot(realBefore.promised, realBefore.promisedWrite,
                                                                      new Accepted(proposal), realBefore.committed)))
                        {
                            Tracing.trace("Accepting proposal {}", proposal);
                            SystemKeyspace.savePaxosProposal(proposal);
                            return true;
                        }
                        // lost the race to update the state: re-read and try again
                    }
                    else
                    {
                        Tracing.trace("Rejecting proposal for {} because inProgress is now {}", proposal, before.promised);
                        return false;
                    }
                }
            }
        }
        catch (Throwable t)
        {
            failure = t;
            throw t;
        }
        finally
        {
            if (loaded)
            {
                if (failure == null)
                {
                    unsafeState.close();
                }
                else
                {
                    // keep the original failure primary, as try-with-resources would
                    try { unsafeState.close(); } catch (Throwable t2) { failure.addSuppressed(t2); }
                }
            }
        }
    }
    finally
    {
        Keyspace.openAndGetStore(proposal.update.metadata()).metric.casPropose.addNano(nanoTime() - start);
    }
}
/**
 * Discards all in-memory Paxos state: empties {@code ACTIVE} and {@code RECENT} and truncates the
 * ballot tracker. No reference counts are consulted, hence "unsafe".
 */
public static void unsafeReset()
{
    ACTIVE.clear();
    RECENT.clear();
    ballotTracker().truncate();
}
/**
 * Peeks at the in-memory snapshot for a key without taking a reference or loading from disk.
 *
 * @return the current snapshot of the active state if there is one (which may itself be null),
 *         otherwise whatever is cached in {@code RECENT}, or null if neither exists
 */
@SuppressWarnings("resource")
public static Snapshot unsafeGetIfPresent(DecoratedKey partitionKey, TableMetadata metadata)
{
    Key key = new Key(partitionKey, metadata);
    PaxosState active = ACTIVE.get(key);
    return active != null ? active.current : RECENT.get(key);
}
}