Deprecate use of gossip state for paxos electorate verification
Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson & Benedict Elliott
Smith for CASSANDRA-19904
diff --git a/CHANGES.txt b/CHANGES.txt
index 729e2e1..8f80e4b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Deprecate use of gossip state for paxos electorate verification (CASSANDRA-19904)
* Update dtest-api to 0.0.17 to fix jvm17 crash in jvm-dtests (CASSANDRA-19239)
* Add resource leak test and Update Netty to 4.1.113.Final to fix leak (CASSANDRA-19783)
* Fix incorrect nodetool suggestion when gossip mode is running (CASSANDRA-19905)
diff --git a/src/java/org/apache/cassandra/service/paxos/Paxos.java b/src/java/org/apache/cassandra/service/paxos/Paxos.java
index 311ac51..3412add 100644
--- a/src/java/org/apache/cassandra/service/paxos/Paxos.java
+++ b/src/java/org/apache/cassandra/service/paxos/Paxos.java
@@ -95,6 +95,7 @@
import org.apache.cassandra.service.reads.repair.NoopReadRepair;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.Dispatcher;
import org.apache.cassandra.triggers.TriggerExecutor;
@@ -253,23 +254,22 @@
return Iterators.concat(natural.iterator(), pending.iterator());
}
- static Electorate get(TableMetadata table, DecoratedKey key, ConsistencyLevel consistency)
+ static Electorate.Local get(TableMetadata table, DecoratedKey key, ConsistencyLevel consistency)
{
// MetaStrategy distributes the entire keyspace to all replicas. In addition, its tables (currently only
// the dist log table) don't use the globally configured partitioner. For these reasons we don't lookup the
// replicas using the supplied token as this can actually be of the incorrect type (for example when
// performing Paxos repair).
final Token token = table.partitioner == MetaStrategy.partitioner ? MetaStrategy.entireRange.right : key.getToken();
- return get(consistency, forTokenWriteLiveAndDown(Keyspace.open(table.keyspace), token));
- }
-
- static Electorate get(ConsistencyLevel consistency, ForTokenWrite all)
- {
- ForTokenWrite electorate = all;
+ ClusterMetadata metadata = ClusterMetadata.current();
+ Keyspace keyspace = Keyspace.open(table.keyspace);
+ DataPlacement placement = metadata.placements.get(keyspace.getMetadata().params.replication);
+ Epoch epoch = placement.writes.forToken(token).lastModified();
+ ForTokenWrite electorate = forTokenWriteLiveAndDown(metadata, keyspace, token);
if (consistency == LOCAL_SERIAL)
- electorate = all.filter(InOurDc.replicas());
+ electorate = electorate.filter(InOurDc.replicas());
- return new Electorate(electorate.natural().endpointList(), electorate.pending().endpointList());
+ return new Local(epoch, electorate.natural().endpointList(), electorate.pending().endpointList());
}
boolean hasPending()
@@ -285,9 +285,12 @@
public boolean equals(Object o)
{
if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (o == null || !Electorate.class.isAssignableFrom(o.getClass())) return false;
Electorate that = (Electorate) o;
- return natural.equals(that.natural) && pending.equals(that.pending);
+ return natural.size() == that.natural.size() &&
+ pending.size() == that.pending.size() &&
+ natural.containsAll(that.natural) &&
+ pending.containsAll(that.pending);
}
public int hashCode()
@@ -321,6 +324,16 @@
CollectionSerializer.serializedSizeCollection(inetAddressAndPortSerializer, electorate.pending, version);
}
}
+
+ static class Local extends Electorate
+ {
+ final Epoch createdAt;
+ public Local(Epoch createdAt, Collection<InetAddressAndPort> natural, Collection<InetAddressAndPort> pending)
+ {
+ super(natural, pending);
+ this.createdAt = createdAt;
+ }
+ }
}
/**
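
To illustrate the Electorate changes above (the relaxed equals() and the new epoch-carrying Local subclass), here is a minimal, self-contained sketch. The names and types are illustrative stand-ins, not the real Cassandra classes:

import java.util.Set;

// Sketch: a plain Electorate deserialized from a coordinator must compare equal to
// the Electorate.Local built on the acceptor, so equals() uses isAssignableFrom plus
// an order-insensitive membership comparison instead of a strict getClass() check.
final class ElectorateSketch
{
    static class Electorate
    {
        final Set<String> natural;
        final Set<String> pending;

        Electorate(Set<String> natural, Set<String> pending)
        {
            this.natural = natural;
            this.pending = pending;
        }

        @Override
        public boolean equals(Object o)
        {
            if (this == o) return true;
            // A strict getClass() comparison would make Local never equal a plain Electorate
            if (o == null || !Electorate.class.isAssignableFrom(o.getClass())) return false;
            Electorate that = (Electorate) o;
            return natural.size() == that.natural.size()
                && pending.size() == that.pending.size()
                && natural.containsAll(that.natural)
                && pending.containsAll(that.pending);
        }

        @Override
        public int hashCode()
        {
            return 31 * natural.hashCode() + pending.hashCode();
        }
    }

    // Local additionally records the epoch of the data placements it was derived from
    static class Local extends Electorate
    {
        final long createdAt; // a long standing in for org.apache.cassandra.tcm.Epoch

        Local(long createdAt, Set<String> natural, Set<String> pending)
        {
            super(natural, pending);
            this.createdAt = createdAt;
        }
    }

    public static void main(String[] args)
    {
        Electorate remote = new Electorate(Set.of("n1", "n2", "n3"), Set.of());
        Local local = new Local(42L, Set.of("n3", "n2", "n1"), Set.of());
        System.out.println(remote.equals(local)); // true: same members, epoch ignored
        System.out.println(local.equals(remote)); // true: symmetric despite subclassing
    }
}
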
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosPrepare.java b/src/java/org/apache/cassandra/service/paxos/PaxosPrepare.java
index 84a18b2..3b04acd 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosPrepare.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosPrepare.java
@@ -33,7 +33,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
@@ -55,6 +54,7 @@
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.paxos.PaxosPrepare.Status.Outcome;
import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.vint.VIntCoding;
@@ -459,26 +459,42 @@
}
Permitted permitted = response.permitted();
- if (permitted.gossipInfo.isEmpty())
- // we agree about the electorate, so can simply accept the promise/permission
- permitted(permitted, from);
- else if (!needsGossipUpdate(permitted.gossipInfo))
- // our gossip is up-to-date, but our original electorate could have been built with stale gossip, so verify it
- permittedOrTerminateIfElectorateMismatch(permitted, from);
- else
- // otherwise our beliefs about the ring potentially diverge, so update gossip with the peer's information
- Stage.GOSSIP.executor().execute(() -> {
- Gossiper.instance.notifyFailureDetector(permitted.gossipInfo);
- Gossiper.instance.applyStateLocally(permitted.gossipInfo);
- // TODO: We should also wait for schema pulls/pushes, however this would be quite an involved change to MigrationManager
- // (which currently drops some migration tasks on the floor).
- // Note it would be fine for us to fail to complete the migration task and simply treat this response as a failure/timeout.
- // once any pending ranges have been calculated, refresh our Participants list and submit the promise
- // todo: verify that this is correct, we no longer have any pending ranges, just call this immediately
- // PendingRangeCalculatorService.instance.executeWhenFinished(() -> permittedOrTerminateIfElectorateMismatch(permitted, from));
- permittedOrTerminateIfElectorateMismatch(permitted, from);
- });
+ // If the peer's local electorate disagreed with ours, it will be signalled in the permitted response.
+ // Pre-5.1 this used gossip state to assess the relative currency of either peer's view of the ring/placements
+ // from which the electorate is derived. Post-5.1, this is driven by cluster metadata rather than gossip, but we
+ // preserve the signalling via gossip state for continuity during upgrades.
+ Epoch remoteElectorateEpoch = permitted.electorateEpoch;
+
+ if (remoteElectorateEpoch.is(Epoch.EMPTY) && permitted.gossipInfo.isEmpty())
+ {
+ // we agree about the electorate, so can simply accept the promise/permission
+ // TODO: once 5.1 is the minimum supported version, we can stop sending and checking gossipInfo and just
+ // use the electorateEpoch
+ permitted(permitted, from);
+ }
+ else if (remoteElectorateEpoch.isAfter(Epoch.EMPTY))
+ {
+ // The remote peer sent back an epoch for its local electorate, implying that it did not match our original.
+ // That epoch may be after the one we built the original from, so catch up if we need to and haven't
+ // already. Either way, verify the electorate is still valid according to the current topology.
+ ClusterMetadataService.instance().fetchLogFromPeerOrCMS(ClusterMetadata.current(), from, remoteElectorateEpoch);
+ permittedOrTerminateIfElectorateMismatch(permitted, from);
+ }
+ else
+ {
+ // The remote peer indicated a mismatch, but is either still running a pre-5.1 version or we have not yet
+ // initialized the CMS following upgrade to 5.1. Topology changes while in this state are not supported;
+ // failed nodes must be DOWN during upgrade and should be replaced after the CMS has been initialized.
+ if (needsGossipUpdate(permitted.gossipInfo))
+ {
+ // Our gossip state is lagging behind that of our peer; however, topology changes are no longer driven
+ // by gossip. We can notify the FD using the peer's gossip state and re-assert that the electorate
+ // is still valid.
+ Gossiper.runInGossipStageBlocking(() -> Gossiper.instance.notifyFailureDetector(permitted.gossipInfo));
+ }
+ permittedOrTerminateIfElectorateMismatch(permitted, from);
+ }
}
private synchronized void permittedOrTerminateIfElectorateMismatch(Permitted permitted, InetAddressAndPort from)
@@ -981,8 +997,9 @@
// it would be great if we could get rid of this, but probably we need to preserve for migration purposes
final Map<InetAddressAndPort, EndpointState> gossipInfo;
@Nullable final Ballot supersededBy;
+ final Epoch electorateEpoch;
- Permitted(MaybePromise.Outcome outcome, long lowBound, @Nullable Accepted latestAcceptedButNotCommitted, Committed latestCommitted, @Nullable ReadResponse readResponse, boolean hadProposalStability, Map<InetAddressAndPort, EndpointState> gossipInfo, @Nullable Ballot supersededBy)
+ Permitted(MaybePromise.Outcome outcome, long lowBound, @Nullable Accepted latestAcceptedButNotCommitted, Committed latestCommitted, @Nullable ReadResponse readResponse, boolean hadProposalStability, Map<InetAddressAndPort, EndpointState> gossipInfo, Epoch electorateEpoch, @Nullable Ballot supersededBy)
{
super(outcome);
this.lowBound = lowBound;
@@ -991,13 +1008,14 @@
this.hadProposalStability = hadProposalStability;
this.readResponse = readResponse;
this.gossipInfo = gossipInfo;
+ this.electorateEpoch = electorateEpoch;
this.supersededBy = supersededBy;
}
@Override
public String toString()
{
- return "Promise(" + latestAcceptedButNotCommitted + ", " + latestCommitted + ", " + hadProposalStability + ", " + gossipInfo + ')';
+ return "Promise(" + latestAcceptedButNotCommitted + ", " + latestCommitted + ", " + hadProposalStability + ", " + gossipInfo + ", " + electorateEpoch.getEpoch() + ')';
}
}
@@ -1055,8 +1073,15 @@
{
case PROMISE:
case PERMIT_READ:
- // verify electorates; if they differ, send back gossip info for superset of two participant sets
- Map<InetAddressAndPort, EndpointState> gossipInfo = verifyElectorate(request.electorate, Electorate.get(request.table, request.partitionKey, consistency(request.ballot)));
+ // verify electorates; if they differ, send back an indication of the mismatch. For use during an
+ // upgrade this includes gossip info for the superset of the two participant sets. For ongoing
+ // usage we just include the epoch of the data placements used to construct the local electorate.
+ Electorate.Local localElectorate = Electorate.get(request.table,
+ request.partitionKey,
+ consistency(request.ballot));
+ Map<InetAddressAndPort, EndpointState> gossipInfo = verifyElectorate(request.electorate, localElectorate);
+ // TODO when 5.1 is the minimum supported version we can modify verifyElectorate to just return this epoch
+ Epoch electorateEpoch = gossipInfo.isEmpty() ? Epoch.EMPTY : localElectorate.createdAt;
ReadResponse readResponse = null;
// Check we cannot race with a proposal, i.e. that we have not made a promise that
@@ -1100,7 +1125,7 @@
ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(request.table.id);
long lowBound = cfs.getPaxosRepairLowBound(request.partitionKey).uuidTimestamp();
- return new Permitted(result.outcome, lowBound, acceptedButNotCommitted, committed, readResponse, hasProposalStability, gossipInfo, supersededBy);
+ return new Permitted(result.outcome, lowBound, acceptedButNotCommitted, committed, readResponse, hasProposalStability, gossipInfo, electorateEpoch, supersededBy);
case REJECT:
return new Rejected(result.supersededBy());
@@ -1208,6 +1233,8 @@
if (promised.readResponse != null)
ReadResponse.serializer.serialize(promised.readResponse, out, version);
serializeMap(inetAddressAndPortSerializer, EndpointState.nullableSerializer, promised.gossipInfo, out, version);
+ if (version >= MessagingService.VERSION_51)
+ Epoch.messageSerializer.serialize(promised.electorateEpoch, out, version);
if (promised.outcome == PERMIT_READ)
promised.supersededBy.serialize(out);
}
@@ -1228,12 +1255,13 @@
Committed committed = Committed.serializer.deserialize(in, version);
ReadResponse readResponse = (flags & 4) != 0 ? ReadResponse.serializer.deserialize(in, version) : null;
Map<InetAddressAndPort, EndpointState> gossipInfo = deserializeMap(inetAddressAndPortSerializer, EndpointState.nullableSerializer, newHashMap(), in, version);
+ Epoch electorateEpoch = version >= MessagingService.VERSION_51 ? Epoch.messageSerializer.deserialize(in, version) : Epoch.EMPTY;
MaybePromise.Outcome outcome = (flags & 16) != 0 ? PERMIT_READ : PROMISE;
boolean hasProposalStability = (flags & 8) != 0;
Ballot supersededBy = null;
if (outcome == PERMIT_READ)
supersededBy = Ballot.deserialize(in);
- return new Permitted(outcome, lowBound, acceptedNotCommitted, committed, readResponse, hasProposalStability, gossipInfo, supersededBy);
+ return new Permitted(outcome, lowBound, acceptedNotCommitted, committed, readResponse, hasProposalStability, gossipInfo, electorateEpoch, supersededBy);
}
}
@@ -1252,6 +1280,7 @@
+ Committed.serializer.serializedSize(permitted.latestCommitted, version)
+ (permitted.readResponse == null ? 0 : ReadResponse.serializer.serializedSize(permitted.readResponse, version))
+ serializedSizeMap(inetAddressAndPortSerializer, EndpointState.nullableSerializer, permitted.gossipInfo, version)
+ + (version >= MessagingService.VERSION_51 ? Epoch.messageSerializer.serializedSize(permitted.electorateEpoch, version) : 0)
+ (permitted.outcome == PERMIT_READ ? Ballot.sizeInBytes() : 0);
}
}
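
The Permitted serializer changes above follow a common backward-compatibility pattern: write the new field only when the peer speaks the new messaging version, and substitute a sentinel when deserializing from an older peer. A minimal sketch of the pattern, with assumed constants standing in for MessagingService.VERSION_51 and Epoch.EMPTY:

import java.io.*;

// Sketch: version-gated serialization of a new field. Only 5.1+ peers send or expect
// the electorate epoch; older peers fall back to a sentinel that routes the receiver
// onto the legacy, gossip-based code path.
final class VersionedFieldSketch
{
    static final int VERSION_50 = 13;    // assumed value, stands in for the pre-5.1 version
    static final int VERSION_51 = 14;    // assumed value, stands in for MessagingService.VERSION_51
    static final long EMPTY_EPOCH = 0L;  // stands in for Epoch.EMPTY

    static void serialize(long electorateEpoch, DataOutput out, int version) throws IOException
    {
        // ... pre-existing fields would be written here unconditionally ...
        if (version >= VERSION_51)
            out.writeLong(electorateEpoch); // only peers on the new version understand this field
    }

    static long deserialize(DataInput in, int version) throws IOException
    {
        // Older senders never wrote the field, so default to the sentinel
        return version >= VERSION_51 ? in.readLong() : EMPTY_EPOCH;
    }

    public static void main(String[] args) throws IOException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        serialize(42L, new DataOutputStream(bytes), VERSION_51);
        DataInput newPeer = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println(deserialize(newPeer, VERSION_51)); // 42

        DataInput oldPeer = new DataInputStream(new ByteArrayInputStream(new byte[0]));
        System.out.println(deserialize(oldPeer, VERSION_50)); // 0, i.e. the EMPTY sentinel
    }
}
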
diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java
index ac9da8e..a375481 100644
--- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java
+++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java
@@ -31,13 +31,14 @@
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.gms.EndpointState;
-import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.HeartBeatState;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.RequestCallbackWithFailure;
import org.apache.cassandra.repair.SharedContext;
import org.apache.cassandra.schema.Schema;
@@ -74,9 +75,11 @@
}
/**
- * We run paxos repair as part of topology changes, so we include the local endpoint state in the paxos repair
- * prepare message to prevent racing with gossip dissemination and guarantee that every repair participant is aware
- * of the pending ring change during repair.
+ * We run paxos repair as part of topology changes, so prior to 5.1 we would include the local endpoint state in
+ * the paxos repair prepare message to prevent racing with gossip dissemination and guarantee that every repair
+ * participant is aware of the pending ring change during repair. This is now deprecated, as topology changes are no
+ * longer driven by gossip state. We continue to include the state in messages to pre-5.1 peers for compatibility
+ * during upgrades.
*/
public static PaxosStartPrepareCleanup prepare(SharedContext ctx, TableId tableId, Collection<InetAddressAndPort> endpoints, EndpointState localEpState, Collection<Range<Token>> ranges, boolean isUrgent)
{
@@ -113,27 +116,10 @@
trySuccess(new PaxosCleanupHistory(table, maxBallot, history));
}
- private static void maybeUpdateTopology(SharedContext ctx, InetAddressAndPort endpoint, EndpointState remote)
- {
- EndpointState local = ctx.gossiper().getEndpointStateForEndpoint(endpoint);
- if (local == null || local.isSupersededBy(remote))
- {
- logger.trace("updating endpoint info for {} with {}", endpoint, remote);
- Map<InetAddressAndPort, EndpointState> states = Collections.singletonMap(endpoint, remote);
-
- Gossiper.runInGossipStageBlocking(() -> {
- ctx.gossiper().notifyFailureDetector(states);
- ctx.gossiper().applyStateLocally(states);
- });
- // TODO: We should also wait for schema pulls/pushes, however this would be quite an involved change to MigrationManager
- // (which currently drops some migration tasks on the floor).
- // Note it would be fine for us to fail to complete the migration task and simply treat this response as a failure/timeout.
- }
- }
-
public static class Request
{
final TableId tableId;
+ @Deprecated(since = "5.1")
final EndpointState epState;
final Collection<Range<Token>> ranges;
@@ -150,7 +136,9 @@
public void serialize(Request request, DataOutputPlus out, int version) throws IOException
{
request.tableId.serialize(out);
- EndpointState.serializer.serialize(request.epState, out, version);
+ // Post-5.1 topology is not driven by gossip state
+ if (version < MessagingService.VERSION_51)
+ EndpointState.serializer.serialize(request.epState, out, version);
out.writeInt(request.ranges.size());
for (Range<Token> rt : request.ranges)
AbstractBounds.tokenSerializer.serialize(rt, out, version);
@@ -159,7 +147,10 @@
public Request deserialize(DataInputPlus in, int version) throws IOException
{
TableId tableId = TableId.deserialize(in);
- EndpointState epState = EndpointState.serializer.deserialize(in, version);
+ EndpointState epState = version < MessagingService.VERSION_51
+ ? EndpointState.serializer.deserialize(in, version)
+ : new EndpointState(HeartBeatState.empty());
+
TableMetadata table = Schema.instance.getTableMetadata(tableId);
IPartitioner partitioner = table != null ? table.partitioner : IPartitioner.global();
int numRanges = in.readInt();
@@ -175,7 +166,8 @@
public long serializedSize(Request request, int version)
{
long size = request.tableId.serializedSize();
- size += EndpointState.serializer.serializedSize(request.epState, version);
+ if (version < MessagingService.VERSION_51)
+ size += EndpointState.serializer.serializedSize(request.epState, version);
size += TypeSizes.sizeof(request.ranges.size());
for (Range<Token> range : request.ranges)
size += AbstractBounds.tokenSerializer.serializedSize(range, version);
@@ -187,7 +179,8 @@
{
return in -> {
ColumnFamilyStore table = Schema.instance.getColumnFamilyStoreInstance(in.payload.tableId);
- maybeUpdateTopology(ctx, in.from(), in.payload.epState);
+ // Note: pre-5.1 we would use gossip state included in the request payload to update topology
+ // prior to cleanup. Topology is no longer derived from gossip state, so this has been removed.
Ballot highBound = newBallot(ballotTracker().getHighBound(), ConsistencyLevel.SERIAL);
PaxosRepairHistory history = table.getPaxosRepairHistoryForRanges(in.payload.ranges);
Message<PaxosCleanupHistory> out = in.responseWith(new PaxosCleanupHistory(table.metadata.id, highBound, history));
diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
index 90b489e..36b6e41 100644
--- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java
+++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
@@ -32,6 +32,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
@@ -257,6 +258,9 @@
protected final Set<ChangeListener.Async> asyncChangeListeners;
protected final LogSpec spec;
+ // for testing - used to inject filters which cause entries to be dropped before appending
+ protected final List<Predicate<Entry>> entryFilters;
+
private LocalLog(LogSpec logSpec)
{
this.spec = logSpec;
@@ -272,6 +276,7 @@
listeners = Sets.newConcurrentHashSet();
changeListeners = Sets.newConcurrentHashSet();
asyncChangeListeners = Sets.newConcurrentHashSet();
+ entryFilters = Lists.newCopyOnWriteArrayList();
}
public void bootstrap(InetAddressAndPort addr)
@@ -351,18 +356,24 @@
{
if (!entries.isEmpty())
{
- if (logger.isDebugEnabled())
- logger.debug("Appending entries to the pending buffer: {}", entries.stream().map(e -> e.epoch).collect(Collectors.toList()));
- pending.addAll(entries);
+ if (!entryFilters.isEmpty())
+ {
+ logger.debug("Appending batch of entries to the pending buffer individually due to presence of entry filters");
+ entries.forEach(this::maybeAppend);
+ }
+ else
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Appending entries to the pending buffer: {}", entries.stream().map(e -> e.epoch).collect(Collectors.toList()));
+ pending.addAll(entries);
+ }
processPending();
}
}
public void append(Entry entry)
{
- logger.debug("Appending entry to the pending buffer: {}", entry.epoch);
- pending.add(entry);
- processPending();
+ maybeAppend(entry);
}
/**
@@ -383,17 +394,32 @@
// Create a synthetic "force snapshot" transformation to instruct the log to pick up given metadata
ForceSnapshot transformation = new ForceSnapshot(logState.baseState);
Entry newEntry = new Entry(Entry.Id.NONE, epoch, transformation);
- pending.add(newEntry);
+ maybeAppend(newEntry);
}
// Finally, append any additional transformations in the snapshot. Some or all of these could be earlier than the
// currently enacted epoch (if we'd already moved on beyond the epoch of the base state for instance, or if newer
// entries have been received via normal replication), but this is fine as entries will be put in the reorder
// log, and duplicates will be dropped.
- pending.addAll(logState.entries);
+ append(logState.entries);
processPending();
}
+ private void maybeAppend(Entry entry)
+ {
+ for (Predicate<Entry> filter : entryFilters)
+ {
+ if (filter.test(entry))
+ {
+ logger.debug("Not appending entry to the pending buffer due to configured filters: {}", entry.epoch);
+ return;
+ }
+ }
+
+ logger.debug("Appending entry to the pending buffer: {}", entry.epoch);
+ pending.add(entry);
+ }
+
public abstract ClusterMetadata awaitAtLeast(Epoch epoch) throws InterruptedException, TimeoutException;
/**
@@ -600,6 +626,18 @@
ScheduledExecutors.optionalTasks.submit(() -> listener.notifyPostCommit(before, after, fromSnapshot));
}
+ public void addFilter(Predicate<Entry> filter)
+ {
+ logger.debug("Adding filter to pending entry buffer");
+ entryFilters.add(filter);
+ }
+
+ public void clearFilters()
+ {
+ logger.debug("Clearing filters from pending entry buffer");
+ entryFilters.clear();
+ }
+
/**
* Essentially same as `ready` but throws an unchecked exception
*/
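
The entry filters added to LocalLog above are a test-only hook: predicates consulted before an entry reaches the pending buffer, letting tests stall a node's view of cluster metadata at a chosen epoch. A toy model of the mechanism, with a simplified Entry standing in for the real log entry type:

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;

// Sketch: entries matching any registered filter are dropped before appending,
// mirroring LocalLog.maybeAppend in the patch above.
final class EntryFilterSketch
{
    record Entry(long epoch) {}

    final List<Predicate<Entry>> entryFilters = new CopyOnWriteArrayList<>();
    final List<Entry> pending = new CopyOnWriteArrayList<>();

    void maybeAppend(Entry entry)
    {
        for (Predicate<Entry> filter : entryFilters)
            if (filter.test(entry))
                return; // dropped before it reaches the pending buffer
        pending.add(entry);
    }

    public static void main(String[] args)
    {
        EntryFilterSketch log = new EntryFilterSketch();
        long targetEpoch = 10;
        // Analogous to ClusterUtils.dropAllEntriesBeginningAt(instance, epoch)
        log.entryFilters.add(e -> e.epoch() >= targetEpoch);
        log.maybeAppend(new Entry(9));   // kept
        log.maybeAppend(new Entry(10));  // dropped
        log.entryFilters.clear();        // analogous to ClusterUtils.clearEntryFilters
        log.maybeAppend(new Entry(11));  // kept again
        System.out.println(log.pending); // [Entry[epoch=9], Entry[epoch=11]]
    }
}
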
diff --git a/src/java/org/apache/cassandra/tcm/sequences/LockedRanges.java b/src/java/org/apache/cassandra/tcm/sequences/LockedRanges.java
index 8693486..cb49723 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/LockedRanges.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/LockedRanges.java
@@ -334,6 +334,9 @@
{
for (Range<Token> otherRange : e.getValue())
{
+ if (!map.containsKey(e.getKey()))
+ continue;
+
for (Range<Token> thisRange : map.get(e.getKey()))
{
if (thisRange.intersects(otherRange))
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index cbfa687..e7ffed2 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -1032,18 +1032,24 @@
// Start any instances with auto_bootstrap enabled first, and in series to avoid issues
// with multiple nodes bootstrapping with consistent range movement enabled,
// and then start any instances with it disabled in parallel.
+ // Whichever instance starts first will be the initial CMS for the cluster.
List<I> startSequentially = new ArrayList<>();
List<I> startParallel = new ArrayList<>();
for (int i = 0; i < instances.size(); i++)
{
I instance = instances.get(i);
- if (i == 0 || (boolean) instance.config().get("auto_bootstrap"))
+ if ((boolean) instance.config().get("auto_bootstrap"))
startSequentially.add(instance);
else
startParallel.add(instance);
}
+ // If no instances have auto_bootstrap enabled, start the first in the list
+ // so it can become the initial CMS member.
+ if (startSequentially.isEmpty())
+ startSequentially.add(startParallel.remove(0));
+
forEach(startSequentially, i -> {
i.startup(this);
});
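
The AbstractCluster change above makes CMS seeding deterministic: instances with auto_bootstrap enabled start first and in series, and if none are bootstrap-enabled the first instance is promoted to the sequential group so it can form the initial CMS. A sketch of the ordering rule under simplified types:

import java.util.ArrayList;
import java.util.List;

// Sketch: partition instances into sequential (bootstrap-enabled) and parallel
// groups; guarantee the sequential group is non-empty so some instance starts
// first and becomes the initial CMS member.
final class StartupOrderSketch
{
    record Instance(int num, boolean autoBootstrap) {}

    static List<List<Instance>> order(List<Instance> instances)
    {
        List<Instance> startSequentially = new ArrayList<>();
        List<Instance> startParallel = new ArrayList<>();
        for (Instance i : instances)
            (i.autoBootstrap() ? startSequentially : startParallel).add(i);
        if (startSequentially.isEmpty())
            startSequentially.add(startParallel.remove(0));
        return List.of(startSequentially, startParallel);
    }

    public static void main(String[] args)
    {
        // With node2 configured as in CASTest below, it starts first and forms the CMS
        System.out.println(order(List.of(new Instance(1, false),
                                         new Instance(2, true),
                                         new Instance(3, false))));
        // [[Instance[num=2, autoBootstrap=true]], [Instance[num=1, ...], Instance[num=3, ...]]]
    }
}
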
diff --git a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
index f37d634..4aeb630 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@ -77,6 +77,7 @@
import org.apache.cassandra.tcm.Commit;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.log.Entry;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.ownership.ReplicaGroups;
import org.apache.cassandra.utils.Isolated;
@@ -437,6 +438,16 @@
ClusterUtils.waitForCMSToQuiesce(cluster, getClusterMetadataVersion(leader), ignored);
}
+ public static void dropAllEntriesBeginningAt(IInvokableInstance instance, Epoch epoch)
+ {
+ instance.runOnInstance(() -> ClusterMetadataService.instance().log().addFilter(e -> e.epoch.isEqualOrAfter(epoch)));
+ }
+
+ public static void clearEntryFilters(IInvokableInstance instance)
+ {
+ instance.runOnInstance(() -> ClusterMetadataService.instance().log().clearFilters());
+ }
+
public static Callable<Void> pauseBeforeEnacting(IInvokableInstance instance, Epoch epoch)
{
return pauseBeforeEnacting(instance, epoch, 10, TimeUnit.SECONDS);
diff --git a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
index e5b1108..80da569 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.distributed.test;
import java.io.IOException;
+import java.util.Collections;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@@ -31,13 +32,25 @@
import org.junit.Test;
import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInstanceConfig.ParameterizedClass;
import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
import org.apache.cassandra.exceptions.CasWriteTimeoutException;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.SimpleSeedProvider;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.sequences.BootstrapAndJoin;
+import org.apache.cassandra.tcm.sequences.InProgressSequences;
import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.config.CassandraRelevantProperties.PAXOS_USE_SELF_EXECUTION;
@@ -56,6 +69,7 @@
import static org.apache.cassandra.net.Verb.PAXOS_PREPARE_REQ;
import static org.apache.cassandra.net.Verb.PAXOS_PROPOSE_REQ;
import static org.apache.cassandra.net.Verb.READ_REQ;
+import static org.apache.cassandra.service.paxos.MessageHelper.electorateMismatchChecker;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -85,11 +99,22 @@
{
PAXOS_USE_SELF_EXECUTION.setBoolean(false);
TestBaseImpl.beforeClass();
+ // At times during these tests, node1 is going to be blocked from appending entries to its local metadata
+ // log in order to induce divergent views of cluster topology between instances. This precludes it from
+ // acting as the sole CMS member for the cluster. Instead, we make node2 the single CMS member by
+ // configuring it with auto_bootstrap: true, ensuring that it is the first instance to start up. This
+ // also means that it needs to be the seed for the other nodes so that they can discover it when they
+ // start up.
+ ParameterizedClass seeds = new ParameterizedClass(SimpleSeedProvider.class.getName(),
+ Collections.singletonMap("seeds", "127.0.0.2"));
+
Consumer<IInstanceConfig> conf = config -> config
.set("paxos_variant", "v2")
.set("write_request_timeout", REQUEST_TIMEOUT)
.set("cas_contention_timeout", CONTENTION_TIMEOUT)
- .set("request_timeout", REQUEST_TIMEOUT);
+ .set("request_timeout", REQUEST_TIMEOUT)
+ .set("seed_provider", seeds)
+ .set("auto_bootstrap", config.num() == 2);
// TODO: fails with vnode enabled
THREE_NODES = init(Cluster.build(3).withConfig(conf).withoutVNodes().start());
FOUR_NODES = init(Cluster.build(4).withConfig(conf).withoutVNodes().start(), 3);
@@ -389,22 +414,26 @@
FOUR_NODES.schemaChange("CREATE TABLE " + KEYSPACE + "." + tableName + " (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
// make it so {1} is unaware (yet) that {4} is an owner of the token
- FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::removeFromRing).accept(FOUR_NODES.get(4));
+ removeFromRing(FOUR_NODES.get(4));
+ // This is the epoch of the START_JOIN transform. We'll make {1} ignore any entry from here on
+ Epoch targetEpoch = ClusterUtils.getCurrentEpoch(FOUR_NODES.get(2)).nextEpoch().nextEpoch(); // +prepare +start
+ ClusterUtils.dropAllEntriesBeginningAt(FOUR_NODES.get(1), targetEpoch);
+ joinFully(FOUR_NODES, 4);
int pk = pk(FOUR_NODES, 1, 2);
// {1} promises and accepts on !{3} => {1, 2}; commits on !{2,3} => {1}
drop(FOUR_NODES, 1, to(3), to(3), to(2, 3));
FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::assertNotVisibleInRing).accept(FOUR_NODES.get(4));
+ // Allow {1} to append entries into its log again
+ ClusterUtils.clearEntryFilters(FOUR_NODES.get(1));
+
assertRows(executeWithRetry(FOUR_NODES.coordinator(1), "INSERT INTO " + KEYSPACE + "." + tableName + " (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ONE, pk),
row(true));
FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::assertVisibleInRing).accept(FOUR_NODES.get(4));
for (int i = 1; i <= 3; ++i)
- {
- FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::addToRingNormal).accept(FOUR_NODES.get(4));
FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::assertVisibleInRing).accept(FOUR_NODES.get(4));
- }
// {4} reads from !{2} => {3, 4}
drop(FOUR_NODES, 4, to(2), to(2), to());
@@ -426,8 +455,11 @@
FOUR_NODES.schemaChange("CREATE TABLE " + KEYSPACE + "." + tableName + " (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
// make it so {1} is unaware (yet) that {4} is an owner of the token
- FOUR_NODES.get(1).acceptOnInstance(CASTestBase::removeFromRing, FOUR_NODES.get(4));
- FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::assertNotVisibleInRing).accept(FOUR_NODES.get(4));
+ removeFromRing(FOUR_NODES.get(4));
+ // This is the epoch of the START_JOIN transform. We'll make {1} ignore any entry from here on
+ Epoch targetEpoch = ClusterUtils.getCurrentEpoch(FOUR_NODES.get(2)).nextEpoch().nextEpoch(); // +prepare +start
+ ClusterUtils.dropAllEntriesBeginningAt(FOUR_NODES.get(1), targetEpoch);
+ joinFully(FOUR_NODES, 4);
// {4} promises, accepts and commits on !{2} => {3, 4}
int pk = pk(FOUR_NODES, 1, 2);
@@ -436,6 +468,9 @@
row(true));
FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::assertNotVisibleInRing).accept(FOUR_NODES.get(4));
+ // Allow {1} to append entries into its log again
+ ClusterUtils.clearEntryFilters(FOUR_NODES.get(1));
+
// {1} promises, accepts and commmits on !{3} => {1, 2}
drop(FOUR_NODES, 1, to(3), to(3), to(3));
assertRows(executeWithRetry(FOUR_NODES.coordinator(1), "INSERT INTO " + KEYSPACE + "." + tableName + " (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ONE, pk),
@@ -460,20 +495,23 @@
FOUR_NODES.schemaChange("CREATE TABLE " + KEYSPACE + "." + tableName + " (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
// make it so {4} is bootstrapping, and this has propagated to only a quorum of other nodes
- for (int i = 1 ; i <= 4 ; ++i)
- {
- FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::removeFromRing).accept(FOUR_NODES.get(4));
- FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::assertNotVisibleInRing).accept(FOUR_NODES.get(4));
- }
- for (int i = 2 ; i <= 4 ; ++i)
- {
- FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::addToRingBootstrapping).accept(FOUR_NODES.get(4));
+ removeFromRing(FOUR_NODES.get(4));
+ // This is the epoch of the START_JOIN transform. We'll make {1} ignore any entry from here on
+ Epoch targetEpoch = ClusterUtils.getCurrentEpoch(FOUR_NODES.get(2)).nextEpoch().nextEpoch(); // +prepare +start
+ ClusterUtils.dropAllEntriesBeginningAt(FOUR_NODES.get(1), targetEpoch);
+ // Now bring {4} to the mid join point
+ joinPartially(FOUR_NODES, 4);
+
+ for (int i = 2; i <= 4; ++i)
FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::assertVisibleInRing).accept(FOUR_NODES.get(4));
- }
int pk = pk(FOUR_NODES, 1, 2);
FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::assertNotVisibleInRing).accept(FOUR_NODES.get(4));
+
+ // Allow {1} to append entries into its log again
+ ClusterUtils.clearEntryFilters(FOUR_NODES.get(1));
+
// {1} promises and accepts on !{3} => {1, 2}; commmits on !{2, 3} => {1}
drop(FOUR_NODES, 1, to(3), to(3), to(2, 3));
assertRows(executeWithRetry(FOUR_NODES.coordinator(1), "INSERT INTO " + KEYSPACE + "." + tableName + " (pk, ck, v) VALUES (?, 1, 1) IF NOT EXISTS", ONE, pk),
@@ -481,8 +519,7 @@
FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::assertVisibleInRing).accept(FOUR_NODES.get(4));
// finish topology change
- for (int i = 1 ; i <= 4 ; ++i)
- FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::addToRingNormal).accept(FOUR_NODES.get(4));
+ finishJoin(FOUR_NODES, 4);
// {3} reads from !{2} => {3, 4}
drop(FOUR_NODES, 3, to(2), to(), to());
@@ -504,20 +541,23 @@
FOUR_NODES.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
// make it so {4} is bootstrapping, and this has propagated to only a quorum of other nodes
- for (int i = 1 ; i <= 4 ; ++i)
- {
- FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::removeFromRing).accept(FOUR_NODES.get(4));
- FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::assertNotVisibleInRing).accept(FOUR_NODES.get(4));
- }
+ removeFromRing(FOUR_NODES.get(4));
+ // This is the epoch of the START_JOIN transform. We'll make {1} ignore any entry from here on
+ Epoch targetEpoch = ClusterUtils.getCurrentEpoch(FOUR_NODES.get(2)).nextEpoch().nextEpoch(); // +prepare +start
+ ClusterUtils.dropAllEntriesBeginningAt(FOUR_NODES.get(1), targetEpoch);
+ // Now bring {4} to the mid join point
+ joinPartially(FOUR_NODES, 4);
+
for (int i = 2 ; i <= 4 ; ++i)
- {
- FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::addToRingBootstrapping).accept(FOUR_NODES.get(4));
FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::assertVisibleInRing).accept(FOUR_NODES.get(4));
- }
int pk = pk(FOUR_NODES, 1, 2);
FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::assertNotVisibleInRing).accept(FOUR_NODES.get(4));
+
+ // Allow {1} to append entries into its log again
+ ClusterUtils.clearEntryFilters(FOUR_NODES.get(1));
+
// {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1}
drop(FOUR_NODES, 1, to(3), to(3), to(2, 3));
assertRows(executeWithRetry(FOUR_NODES.coordinator(1), "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ONE, pk),
@@ -525,8 +565,7 @@
FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::assertVisibleInRing).accept(FOUR_NODES.get(4));
// finish topology change
- for (int i = 1 ; i <= 4 ; ++i)
- FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::addToRingNormal).accept(FOUR_NODES.get(4));
+ finishJoin(FOUR_NODES, 4);
// {3} reads from !{2} => {3, 4}
drop(FOUR_NODES, 3, to(2), to(), to());
@@ -557,16 +596,15 @@
FOUR_NODES.schemaChange("CREATE TABLE " + KEYSPACE + "." + tableName + " (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
// make it so {4} is bootstrapping, and this has propagated to only a quorum of other nodes
- for (int i = 1 ; i <= 4 ; ++i)
- {
- FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::removeFromRing).accept(FOUR_NODES.get(4));
- FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::assertNotVisibleInRing).accept(FOUR_NODES.get(4));
- }
- for (int i = 2 ; i <= 4 ; ++i)
- {
- FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::addToRingBootstrapping).accept(FOUR_NODES.get(4));
+ removeFromRing(FOUR_NODES.get(4));
+ // This is the epoch of the START_JOIN transform. We'll make {1} ignore any entry from here on
+ Epoch targetEpoch = ClusterUtils.getCurrentEpoch(FOUR_NODES.get(2)).nextEpoch().nextEpoch(); // +prepare +start
+ ClusterUtils.dropAllEntriesBeginningAt(FOUR_NODES.get(1), targetEpoch);
+ // Now bring {4} to the mid join point
+ joinPartially(FOUR_NODES, 4);
+
+ for (int i = 2; i <= 4; ++i)
FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::assertVisibleInRing).accept(FOUR_NODES.get(4));
- }
int pk = pk(FOUR_NODES, 1, 2);
@@ -586,6 +624,9 @@
drop(FOUR_NODES, 1, to(3), to(3), to(2, 3));
// two options: either we can invalidate the previous operation and succeed, or we can complete the previous operation
FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::assertNotVisibleInRing).accept(FOUR_NODES.get(4));
+
+ // Allow {1} to append entries into its log again
+ ClusterUtils.clearEntryFilters(FOUR_NODES.get(1));
Object[][] result = executeWithRetry(FOUR_NODES.coordinator(1), "INSERT INTO " + KEYSPACE + "." + tableName + " (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ONE, pk);
Object[] expectRow;
if (result[0].length == 1)
@@ -601,8 +642,7 @@
FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::assertVisibleInRing).accept(FOUR_NODES.get(4));
// finish topology change
- for (int i = 1 ; i <= 4 ; ++i)
- FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::addToRingNormal).accept(FOUR_NODES.get(4));
+ finishJoin(FOUR_NODES, 4);
// {3} reads from !{2} => {3, 4}
drop(FOUR_NODES, 3, to(2), to(2), to());
@@ -631,16 +671,15 @@
FOUR_NODES.schemaChange("CREATE TABLE " + KEYSPACE + "." + tableName + " (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
// make it so {4} is bootstrapping, and this has propagated to only a quorum of other nodes
- for (int i = 1; i <= 4; ++i)
- {
- FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::removeFromRing).accept(FOUR_NODES.get(4));
- FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::assertNotVisibleInRing).accept(FOUR_NODES.get(4));
- }
+ removeFromRing(FOUR_NODES.get(4));
+ // This is the epoch of the START_JOIN transform. We'll make {1} ignore any entry from here on
+ Epoch targetEpoch = ClusterUtils.getCurrentEpoch(FOUR_NODES.get(2)).nextEpoch().nextEpoch(); // +prepare +start
+ ClusterUtils.dropAllEntriesBeginningAt(FOUR_NODES.get(1), targetEpoch);
+ // Now bring {4} to the mid join point
+ joinPartially(FOUR_NODES, 4);
+
for (int i = 2; i <= 4; ++i)
- {
- FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::addToRingBootstrapping).accept(FOUR_NODES.get(4));
FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::assertVisibleInRing).accept(FOUR_NODES.get(4));
- }
int pk = pk(FOUR_NODES, 1, 2);
@@ -661,6 +700,9 @@
drop(FOUR_NODES, 1, to(3), to(3), to(2, 3));
// two options: either we can invalidate the previous operation and succeed, or we can complete the previous operation
FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::assertNotVisibleInRing).accept(FOUR_NODES.get(4));
+
+ // Allow {1} to append entries into its log again
+ ClusterUtils.clearEntryFilters(FOUR_NODES.get(1));
Object[][] result = executeWithRetry(FOUR_NODES.coordinator(1), "INSERT INTO " + KEYSPACE + "." + tableName + " (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ONE, pk);
Object[] expectRow;
if (result[0].length == 1)
@@ -676,14 +718,14 @@
FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::assertVisibleInRing).accept(FOUR_NODES.get(4));
// finish topology change
- for (int i = 1; i <= 4; ++i)
- FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::addToRingNormal).accept(FOUR_NODES.get(4));
+ finishJoin(FOUR_NODES, 4);
// {3} reads from !{2} => {3, 4}
FOUR_NODES.filters().verbs(PAXOS2_PREPARE_REQ.id, PAXOS_PREPARE_REQ.id, READ_REQ.id).from(3).to(2).drop();
FOUR_NODES.filters().verbs(PAXOS2_PROPOSE_REQ.id, PAXOS_PROPOSE_REQ.id).from(3).to(2).drop();
assertRows(FOUR_NODES.coordinator(3).execute("INSERT INTO " + KEYSPACE + "." + tableName + " (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ONE, pk),
expectRow);
}
// TODO: RF changes
@@ -787,4 +829,44 @@
}
+ private void joinFully(Cluster cluster, int node)
+ {
+ IInstanceConfig config = cluster.get(node).config();
+ InetAddressAndPort address = InetAddressAndPort.getByAddress(config.broadcastAddress());
+ IPartitioner partitioner = FBUtilities.newPartitioner(config.getString("partitioner"));
+ Token token = partitioner.getTokenFactory().fromString(config.getString("initial_token"));
+ cluster.get(node).runOnInstance(() -> ClusterMetadataTestHelper.join(address, token));
+ }
+
+ private void joinPartially(Cluster cluster, int node)
+ {
+ IInstanceConfig config = cluster.get(node).config();
+ InetAddressAndPort address = InetAddressAndPort.getByAddress(config.broadcastAddress());
+ IPartitioner partitioner = FBUtilities.newPartitioner(config.getString("partitioner"));
+ Token token = partitioner.getTokenFactory().fromString(config.getString("initial_token"));
+ cluster.get(node).runOnInstance(() -> ClusterMetadataTestHelper.joinPartially(address, token));
+ }
+
+ private void finishJoin(Cluster cluster, int node)
+ {
+ cluster.get(node).runOnInstance(() -> {
+ BootstrapAndJoin plan = ClusterMetadataTestHelper.getBootstrapPlan(ClusterMetadata.current().myNodeId());
+ InProgressSequences.resume(plan);
+ });
+ }
+
+ @Test
+ public void testMatchingElectorates()
+ {
+ // Verify that when the local and remote electorates have not diverged, we don't include redundant
+ // information in the Permitted responses
+ String tableName = tableName("tbl");
+ FOUR_NODES.schemaChange("CREATE TABLE " + KEYSPACE + "." + tableName + " (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+ int pk = pk(FOUR_NODES, 1, 2);
+ IMessageFilters.Matcher matcher = electorateMismatchChecker(FOUR_NODES);
+ FOUR_NODES.filters().verbs(Verb.PAXOS2_PREPARE_RSP.id).from(2, 3, 4).to(1).messagesMatching(matcher).drop();
+ assertRows(executeWithRetry(FOUR_NODES.coordinator(1), "INSERT INTO " + KEYSPACE + "." + tableName + " (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ONE, pk),
+ row(true));
+ }
+
}
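
The repeated targetEpoch computation in these tests relies on the join sequence enacting two transformations ahead of the one being dropped: a prepare step and then START_JOIN, each bumping the epoch by one (hence the "+prepare +start" comments). A toy illustration of the arithmetic and the filter it feeds, with a simplified Epoch standing in for the real one:

// Sketch: current epoch + 2 is the epoch of the START_JOIN transform, so a filter
// on isEqualOrAfter(target) lets node1 observe the prepare step but nothing after it.
final class EpochSketch
{
    record Epoch(long value)
    {
        Epoch nextEpoch() { return new Epoch(value + 1); }
        boolean isEqualOrAfter(Epoch other) { return value >= other.value; }
    }

    public static void main(String[] args)
    {
        Epoch current = new Epoch(100);                 // epoch before the join begins
        Epoch target = current.nextEpoch().nextEpoch(); // +prepare +start => 102
        System.out.println(new Epoch(101).isEqualOrAfter(target)); // false: prepare entry still applied
        System.out.println(new Epoch(102).isEqualOrAfter(target)); // true: START_JOIN entry dropped
    }
}
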
diff --git a/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java
index d0819d1..58b47cc 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java
@@ -35,11 +35,11 @@
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.HeartBeatState;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
@@ -188,28 +188,9 @@
Assert.assertTrue(Gossiper.instance.isAlive(endpoint));
}
- // reset gossip state so we know of the node being alive only
- public static void removeFromRing(IInstance peer)
+ public static void removeFromRing(IInvokableInstance peer)
{
- try
- {
- IInstanceConfig config = peer.config();
- IPartitioner partitioner = FBUtilities.newPartitioner(config.getString("partitioner"));
- Token token = partitioner.getTokenFactory().fromString(config.getString("initial_token"));
- InetAddressAndPort address = InetAddressAndPort.getByAddress(config.broadcastAddress());
-
- Gossiper.runInGossipStageBlocking(() -> {
- StorageService.instance.onChange(address,
- ApplicationState.STATUS,
- new VersionedValue.VersionedValueFactory(partitioner).left(Collections.singleton(token), 0L, 0));
- Gossiper.instance.unsafeAnnulEndpoint(address);
- Gossiper.instance.realMarkAlive(address, new EndpointState(new HeartBeatState(0, 0)));
- });
- }
- catch (Throwable e) // UnknownHostException
- {
- throw new RuntimeException(e);
- }
+ peer.runOnInstance(() -> ClusterMetadataTestHelper.leave(FBUtilities.getBroadcastAddressAndPort()));
}
public static void assertNotVisibleInRing(IInstance peer)
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
index 99cc316..b3dbdf7 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
@@ -252,7 +252,12 @@
public static NodeId nodeId(int nodeIdx)
{
- return ClusterMetadata.current().directory.peerId(addr(nodeIdx));
+ return nodeId(addr(nodeIdx));
+ }
+
+ public static NodeId nodeId(InetAddressAndPort addr)
+ {
+ return ClusterMetadata.current().directory.peerId(addr);
}
public static InetAddressAndPort addr(int nodeIdx)
@@ -365,10 +370,15 @@
public static void leave(int nodeIdx)
{
+ leave(addr(nodeIdx));
+ }
+
+ public static void leave(InetAddressAndPort endpoint)
+ {
try
{
- NodeId nodeId = ClusterMetadata.current().directory.peerId(InetAddressAndPort.getByName("127.0.0." + nodeIdx));
- LeaveProcess process = lazyLeave(nodeIdx, false);
+ NodeId nodeId = nodeId(endpoint);
+ LeaveProcess process = lazyLeave(endpoint, false);
process.prepareLeave()
.startLeave()
.midLeave()
diff --git a/test/distributed/org/apache/cassandra/service/paxos/MessageHelper.java b/test/distributed/org/apache/cassandra/service/paxos/MessageHelper.java
new file mode 100644
index 0000000..496f35d
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/service/paxos/MessageHelper.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.paxos;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.tcm.Epoch;
+
+import static org.apache.cassandra.distributed.impl.Instance.deserializeMessage;
+import static org.junit.Assert.assertTrue;
+
+public class MessageHelper
+{
+ public static IMessageFilters.Matcher electorateMismatchChecker(final Cluster cluster)
+ {
+ return (from, to, msg) -> {
+ cluster.get(to).runOnInstance(() -> {
+ Message<?> message = deserializeMessage(msg);
+ if (message.payload instanceof PaxosPrepare.Response)
+ {
+ PaxosPrepare.Permitted permitted = ((PaxosPrepare.Response)message.payload).permitted();
+ assertTrue(permitted.gossipInfo.isEmpty());
+ assertTrue(permitted.electorateEpoch.is(Epoch.EMPTY));
+ }
+ });
+ return false;
+ };
+ }
+}
diff --git a/test/simulator/main/org/apache/cassandra/simulator/cluster/OnClusterReplace.java b/test/simulator/main/org/apache/cassandra/simulator/cluster/OnClusterReplace.java
index 19204e4..e564266 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/cluster/OnClusterReplace.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/cluster/OnClusterReplace.java
@@ -104,6 +104,7 @@
).config().num()
).toArray();
+ local.add(new OnClusterMarkDown(actions, leaving));
local.add(new OnClusterRepairRanges(actions, others, true, false, repairRanges));
local.add(new ExecuteNextStep(actions, joining, Transformation.Kind.START_REPLACE));
local.addAll(Quiesce.all(actions));