Deprecate use of gossip state for paxos electorate verification

Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson & Benedict Elliott
Smith for CASSANDRA-19904
diff --git a/CHANGES.txt b/CHANGES.txt
index 729e2e1..8f80e4b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Deprecate use of gossip state for paxos electorate verification (CASSANDRA-19904)
  * Update dtest-api to 0.0.17 to fix jvm17 crash in jvm-dtests (CASSANDRA-19239)
  * Add resource leak test and Update Netty to 4.1.113.Final to fix leak (CASSANDRA-19783)
  * Fix incorrect nodetool suggestion when gossip mode is running (CASSANDRA-19905)
diff --git a/src/java/org/apache/cassandra/service/paxos/Paxos.java b/src/java/org/apache/cassandra/service/paxos/Paxos.java
index 311ac51..3412add 100644
--- a/src/java/org/apache/cassandra/service/paxos/Paxos.java
+++ b/src/java/org/apache/cassandra/service/paxos/Paxos.java
@@ -95,6 +95,7 @@
 import org.apache.cassandra.service.reads.repair.NoopReadRepair;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.Dispatcher;
 import org.apache.cassandra.triggers.TriggerExecutor;
@@ -253,23 +254,22 @@
             return Iterators.concat(natural.iterator(), pending.iterator());
         }
 
-        static Electorate get(TableMetadata table, DecoratedKey key, ConsistencyLevel consistency)
+        static Electorate.Local get(TableMetadata table, DecoratedKey key, ConsistencyLevel consistency)
         {
             // MetaStrategy distributes the entire keyspace to all replicas. In addition, its tables (currently only
             // the dist log table) don't use the globally configured partitioner. For these reasons we don't lookup the
             // replicas using the supplied token as this can actually be of the incorrect type (for example when
             // performing Paxos repair).
             final Token token = table.partitioner == MetaStrategy.partitioner ? MetaStrategy.entireRange.right : key.getToken();
-            return get(consistency, forTokenWriteLiveAndDown(Keyspace.open(table.keyspace), token));
-        }
-
-        static Electorate get(ConsistencyLevel consistency, ForTokenWrite all)
-        {
-            ForTokenWrite electorate = all;
+            ClusterMetadata metadata = ClusterMetadata.current();
+            Keyspace keyspace = Keyspace.open(table.keyspace);
+            DataPlacement placement = metadata.placements.get(keyspace.getMetadata().params.replication);
+            Epoch epoch = placement.writes.forToken(token).lastModified();
+            ForTokenWrite electorate = forTokenWriteLiveAndDown(metadata, keyspace, token);
             if (consistency == LOCAL_SERIAL)
-                electorate = all.filter(InOurDc.replicas());
+                electorate = electorate.filter(InOurDc.replicas());
 
-            return new Electorate(electorate.natural().endpointList(), electorate.pending().endpointList());
+            return new Local(epoch, electorate.natural().endpointList(), electorate.pending().endpointList());
         }
 
         boolean hasPending()
@@ -285,9 +285,12 @@
         public boolean equals(Object o)
         {
             if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
+            if (o == null || !Electorate.class.isAssignableFrom(o.getClass())) return false;
             Electorate that = (Electorate) o;
-            return natural.equals(that.natural) && pending.equals(that.pending);
+            return natural.size() == that.natural.size() &&
+                   pending.size() == that.pending.size() &&
+                   natural.containsAll(that.natural)  &&
+                   pending.containsAll(that.pending);
         }
 
         public int hashCode()
@@ -321,6 +324,16 @@
                        CollectionSerializer.serializedSizeCollection(inetAddressAndPortSerializer, electorate.pending, version);
             }
         }
+
+        static class Local extends Electorate
+        {
+            final Epoch createdAt;
+            public Local(Epoch createdAt, Collection<InetAddressAndPort> natural, Collection<InetAddressAndPort> pending)
+            {
+                super(natural, pending);
+                this.createdAt = createdAt;
+            }
+        }
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosPrepare.java b/src/java/org/apache/cassandra/service/paxos/PaxosPrepare.java
index 84a18b2..3b04acd 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosPrepare.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosPrepare.java
@@ -33,7 +33,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
@@ -55,6 +54,7 @@
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.paxos.PaxosPrepare.Status.Outcome;
 import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.vint.VIntCoding;
 
@@ -459,26 +459,42 @@
         }
 
         Permitted permitted = response.permitted();
-        if (permitted.gossipInfo.isEmpty())
-            // we agree about the electorate, so can simply accept the promise/permission
-            permitted(permitted, from);
-        else if (!needsGossipUpdate(permitted.gossipInfo))
-            // our gossip is up-to-date, but our original electorate could have been built with stale gossip, so verify it
-            permittedOrTerminateIfElectorateMismatch(permitted, from);
-        else
-            // otherwise our beliefs about the ring potentially diverge, so update gossip with the peer's information
-            Stage.GOSSIP.executor().execute(() -> {
-                Gossiper.instance.notifyFailureDetector(permitted.gossipInfo);
-                Gossiper.instance.applyStateLocally(permitted.gossipInfo);
-                // TODO: We should also wait for schema pulls/pushes, however this would be quite an involved change to MigrationManager
-                //       (which currently drops some migration tasks on the floor).
-                //       Note it would be fine for us to fail to complete the migration task and simply treat this response as a failure/timeout.
 
-                // once any pending ranges have been calculated, refresh our Participants list and submit the promise
-                // todo: verify that this is correct, we no longer have any pending ranges, just call this immediately
-                // PendingRangeCalculatorService.instance.executeWhenFinished(() -> permittedOrTerminateIfElectorateMismatch(permitted, from));
-                permittedOrTerminateIfElectorateMismatch(permitted, from);
-            });
+        // If the peer's local electorate disagreed with ours it will be signalled in the permitted response.
+        // Pre 5.1 this used gossip state to assess the relative currency of either peer's view of the ring/placements
+        // from which the electorate is derived. Post 5.1, this is driven by cluster metadata rather than gossip but we
+        // preserve the signalling via gossip state for continuity during upgrades
+        Epoch remoteElectorateEpoch = permitted.electorateEpoch;
+
+        if (remoteElectorateEpoch.is(Epoch.EMPTY) && permitted.gossipInfo.isEmpty())
+        {
+            // we agree about the electorate, so can simply accept the promise/permission
+            // TODO: once 5.1 is the minimum supported version, we can stop sending and checking gossipInfo and just
+            //       use the electorateEpoch
+            permitted(permitted, from);
+        }
+        else if (remoteElectorateEpoch.isAfter(Epoch.EMPTY))
+        {
+            // The remote peer sent back an epoch for its local electorate, implying that it did not match our original.
+            // That epoch may be after the one we built the original from, so catch up if we need to and haven't
+            // already. Either way, verify the electorate is still valid according to the current topology.
+            ClusterMetadataService.instance().fetchLogFromPeerOrCMS(ClusterMetadata.current(), from, remoteElectorateEpoch);
+            permittedOrTerminateIfElectorateMismatch(permitted, from);
+        }
+        else
+        {
+            // The remote peer indicated a mismatch, but is either still running a pre-5.1 version or we have not yet
+            // initialized the CMS following upgrade to 5.1. Topology changes while in this state are not supported,
+            // failed nodes must be DOWN during upgrade and should be replaced after the CMS has been initialized.
+            if (needsGossipUpdate(permitted.gossipInfo))
+            {
+                // Our gossip state is lagging behind that of our peer, however topology changes are no longer driven
+                // by gossip. We can notify the FD using the peer's gossip state and re-assert that the electorate
+                // is still valid.
+                Gossiper.runInGossipStageBlocking(() -> Gossiper.instance.notifyFailureDetector(permitted.gossipInfo));
+            }
+            permittedOrTerminateIfElectorateMismatch(permitted, from);
+        }
     }
 
     private synchronized void permittedOrTerminateIfElectorateMismatch(Permitted permitted, InetAddressAndPort from)
@@ -981,8 +997,9 @@
         // it would be great if we could get rid of this, but probably we need to preserve for migration purposes
         final Map<InetAddressAndPort, EndpointState> gossipInfo;
         @Nullable final Ballot supersededBy;
+        final Epoch electorateEpoch;
 
-        Permitted(MaybePromise.Outcome outcome, long lowBound, @Nullable Accepted latestAcceptedButNotCommitted, Committed latestCommitted, @Nullable ReadResponse readResponse, boolean hadProposalStability, Map<InetAddressAndPort, EndpointState> gossipInfo, @Nullable Ballot supersededBy)
+        Permitted(MaybePromise.Outcome outcome, long lowBound, @Nullable Accepted latestAcceptedButNotCommitted, Committed latestCommitted, @Nullable ReadResponse readResponse, boolean hadProposalStability, Map<InetAddressAndPort, EndpointState> gossipInfo, Epoch electorateEpoch, @Nullable Ballot supersededBy)
         {
             super(outcome);
             this.lowBound = lowBound;
@@ -991,13 +1008,14 @@
             this.hadProposalStability = hadProposalStability;
             this.readResponse = readResponse;
             this.gossipInfo = gossipInfo;
+            this.electorateEpoch = electorateEpoch;
             this.supersededBy = supersededBy;
         }
 
         @Override
         public String toString()
         {
-            return "Promise(" + latestAcceptedButNotCommitted + ", " + latestCommitted + ", " + hadProposalStability + ", " + gossipInfo + ')';
+            return "Promise(" + latestAcceptedButNotCommitted + ", " + latestCommitted + ", " + hadProposalStability + ", " + gossipInfo + ", " + electorateEpoch.getEpoch() + ')';
         }
     }
 
@@ -1055,8 +1073,15 @@
             {
                 case PROMISE:
                 case PERMIT_READ:
-                    // verify electorates; if they differ, send back gossip info for superset of two participant sets
-                    Map<InetAddressAndPort, EndpointState> gossipInfo = verifyElectorate(request.electorate, Electorate.get(request.table, request.partitionKey, consistency(request.ballot)));
+                    // verify electorates; if they differ, send back indication of the mismatch. For use during an
+                    // upgrade this includes gossip info for the superset of thes two participant sets. For ongoing
+                    // usage we just include the epoch of the data placements used to construct the local electorate.
+                    Electorate.Local localElectorate = Electorate.get(request.table,
+                                                                      request.partitionKey,
+                                                                      consistency(request.ballot));
+                    Map<InetAddressAndPort, EndpointState> gossipInfo = verifyElectorate(request.electorate, localElectorate);
+                    // TODO when 5.1 is the minimum supported version we can modify verifyElectorate to just return this epoch
+                    Epoch electorateEpoch = gossipInfo.isEmpty() ? Epoch.EMPTY : localElectorate.createdAt;
                     ReadResponse readResponse = null;
 
                     // Check we cannot race with a proposal, i.e. that we have not made a promise that
@@ -1100,7 +1125,7 @@
 
                     ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(request.table.id);
                     long lowBound = cfs.getPaxosRepairLowBound(request.partitionKey).uuidTimestamp();
-                    return new Permitted(result.outcome, lowBound, acceptedButNotCommitted, committed, readResponse, hasProposalStability, gossipInfo, supersededBy);
+                    return new Permitted(result.outcome, lowBound, acceptedButNotCommitted, committed, readResponse, hasProposalStability, gossipInfo, electorateEpoch, supersededBy);
 
                 case REJECT:
                     return new Rejected(result.supersededBy());
@@ -1208,6 +1233,8 @@
                 if (promised.readResponse != null)
                     ReadResponse.serializer.serialize(promised.readResponse, out, version);
                 serializeMap(inetAddressAndPortSerializer, EndpointState.nullableSerializer, promised.gossipInfo, out, version);
+                if (version >= MessagingService.VERSION_51)
+                    Epoch.messageSerializer.serialize(promised.electorateEpoch, out, version);
                 if (promised.outcome == PERMIT_READ)
                     promised.supersededBy.serialize(out);
             }
@@ -1228,12 +1255,13 @@
                 Committed committed = Committed.serializer.deserialize(in, version);
                 ReadResponse readResponse = (flags & 4) != 0 ? ReadResponse.serializer.deserialize(in, version) : null;
                 Map<InetAddressAndPort, EndpointState> gossipInfo = deserializeMap(inetAddressAndPortSerializer, EndpointState.nullableSerializer, newHashMap(), in, version);
+                Epoch electorateEpoch = version >= MessagingService.VERSION_51 ? Epoch.messageSerializer.deserialize(in, version) : Epoch.EMPTY;
                 MaybePromise.Outcome outcome = (flags & 16) != 0 ? PERMIT_READ : PROMISE;
                 boolean hasProposalStability = (flags & 8) != 0;
                 Ballot supersededBy = null;
                 if (outcome == PERMIT_READ)
                     supersededBy = Ballot.deserialize(in);
-                return new Permitted(outcome, lowBound, acceptedNotCommitted, committed, readResponse, hasProposalStability, gossipInfo, supersededBy);
+                return new Permitted(outcome, lowBound, acceptedNotCommitted, committed, readResponse, hasProposalStability, gossipInfo, electorateEpoch, supersededBy);
             }
         }
 
@@ -1252,6 +1280,7 @@
                         + Committed.serializer.serializedSize(permitted.latestCommitted, version)
                         + (permitted.readResponse == null ? 0 : ReadResponse.serializer.serializedSize(permitted.readResponse, version))
                         + serializedSizeMap(inetAddressAndPortSerializer, EndpointState.nullableSerializer, permitted.gossipInfo, version)
+                        + (version >= MessagingService.VERSION_51 ? Epoch.messageSerializer.serializedSize(permitted.electorateEpoch, version) : 0)
                         + (permitted.outcome == PERMIT_READ ? Ballot.sizeInBytes() : 0);
             }
         }
diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java
index ac9da8e..a375481 100644
--- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java
+++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java
@@ -31,13 +31,14 @@
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.gms.EndpointState;
-import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.HeartBeatState;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.RequestCallbackWithFailure;
 import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.schema.Schema;
@@ -74,9 +75,11 @@
     }
 
     /**
-     * We run paxos repair as part of topology changes, so we include the local endpoint state in the paxos repair
-     * prepare message to prevent racing with gossip dissemination and guarantee that every repair participant is aware
-     * of the pending ring change during repair.
+     * We run paxos repair as part of topology changes, so prior to 5.1 we would include the local endpoint state in
+     * the paxos repair prepare message to prevent racing with gossip dissemination and guarantee that every repair
+     * participant is aware of the pending ring change during repair. This is now deprecated as topology changes are no
+     * longer driven by gossip state. We continue to include the state in internode messages temporarily for
+     * compatibility during upgrades.
      */
     public static PaxosStartPrepareCleanup prepare(SharedContext ctx, TableId tableId, Collection<InetAddressAndPort> endpoints, EndpointState localEpState, Collection<Range<Token>> ranges, boolean isUrgent)
     {
@@ -113,27 +116,10 @@
             trySuccess(new PaxosCleanupHistory(table, maxBallot, history));
     }
 
-    private static void maybeUpdateTopology(SharedContext ctx, InetAddressAndPort endpoint, EndpointState remote)
-    {
-        EndpointState local = ctx.gossiper().getEndpointStateForEndpoint(endpoint);
-        if (local == null || local.isSupersededBy(remote))
-        {
-            logger.trace("updating endpoint info for {} with {}", endpoint, remote);
-            Map<InetAddressAndPort, EndpointState> states = Collections.singletonMap(endpoint, remote);
-
-            Gossiper.runInGossipStageBlocking(() -> {
-                ctx.gossiper().notifyFailureDetector(states);
-                ctx.gossiper().applyStateLocally(states);
-            });
-            // TODO: We should also wait for schema pulls/pushes, however this would be quite an involved change to MigrationManager
-            //       (which currently drops some migration tasks on the floor).
-            //       Note it would be fine for us to fail to complete the migration task and simply treat this response as a failure/timeout.
-        }
-    }
-
     public static class Request
     {
         final TableId tableId;
+        @Deprecated(since = "5.1")
         final EndpointState epState;
         final Collection<Range<Token>> ranges;
 
@@ -150,7 +136,9 @@
         public void serialize(Request request, DataOutputPlus out, int version) throws IOException
         {
             request.tableId.serialize(out);
-            EndpointState.serializer.serialize(request.epState, out, version);
+            // Post-5.1 topology is not driven by gossip state
+            if (version < MessagingService.VERSION_51)
+                EndpointState.serializer.serialize(request.epState, out, version);
             out.writeInt(request.ranges.size());
             for (Range<Token> rt : request.ranges)
                 AbstractBounds.tokenSerializer.serialize(rt, out, version);
@@ -159,7 +147,10 @@
         public Request deserialize(DataInputPlus in, int version) throws IOException
         {
             TableId tableId = TableId.deserialize(in);
-            EndpointState epState = EndpointState.serializer.deserialize(in, version);
+            EndpointState epState = version < MessagingService.VERSION_51
+                                    ? EndpointState.serializer.deserialize(in, version)
+                                    : new EndpointState(HeartBeatState.empty());
+
             TableMetadata table = Schema.instance.getTableMetadata(tableId);
             IPartitioner partitioner = table != null ? table.partitioner : IPartitioner.global();
             int numRanges = in.readInt();
@@ -175,7 +166,8 @@
         public long serializedSize(Request request, int version)
         {
             long size = request.tableId.serializedSize();
-            size += EndpointState.serializer.serializedSize(request.epState, version);
+            if (version < MessagingService.VERSION_51)
+                size += EndpointState.serializer.serializedSize(request.epState, version);
             size += TypeSizes.sizeof(request.ranges.size());
             for (Range<Token> range : request.ranges)
                 size += AbstractBounds.tokenSerializer.serializedSize(range, version);
@@ -187,7 +179,8 @@
     {
         return in -> {
             ColumnFamilyStore table = Schema.instance.getColumnFamilyStoreInstance(in.payload.tableId);
-            maybeUpdateTopology(ctx, in.from(), in.payload.epState);
+            // Note: pre-5.1 we would use gossip state included in the request payload to update topology
+            // prior to cleanup. Topology is no longer derived from gossip state, so this has been removed.
             Ballot highBound = newBallot(ballotTracker().getHighBound(), ConsistencyLevel.SERIAL);
             PaxosRepairHistory history = table.getPaxosRepairHistoryForRanges(in.payload.ranges);
             Message<PaxosCleanupHistory> out = in.responseWith(new PaxosCleanupHistory(table.metadata.id, highBound, history));
diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
index 90b489e..36b6e41 100644
--- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java
+++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
@@ -32,6 +32,7 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -257,6 +258,9 @@
     protected final Set<ChangeListener.Async> asyncChangeListeners;
     protected final LogSpec spec;
 
+    // for testing - used to inject filters which cause entries to be dropped before appending
+    protected final List<Predicate<Entry>> entryFilters;
+
     private LocalLog(LogSpec logSpec)
     {
         this.spec = logSpec;
@@ -272,6 +276,7 @@
         listeners = Sets.newConcurrentHashSet();
         changeListeners = Sets.newConcurrentHashSet();
         asyncChangeListeners = Sets.newConcurrentHashSet();
+        entryFilters = Lists.newCopyOnWriteArrayList();
     }
 
     public void bootstrap(InetAddressAndPort addr)
@@ -351,18 +356,24 @@
     {
         if (!entries.isEmpty())
         {
-            if (logger.isDebugEnabled())
-                logger.debug("Appending entries to the pending buffer: {}", entries.stream().map(e -> e.epoch).collect(Collectors.toList()));
-            pending.addAll(entries);
+            if (!entryFilters.isEmpty())
+            {
+                logger.debug("Appending batch of entries to the pending buffer individually due to presence of entry filters");
+                entries.forEach(this::maybeAppend);
+            }
+            else
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("Appending entries to the pending buffer: {}", entries.stream().map(e -> e.epoch).collect(Collectors.toList()));
+                pending.addAll(entries);
+            }
             processPending();
         }
     }
 
     public void append(Entry entry)
     {
-        logger.debug("Appending entry to the pending buffer: {}", entry.epoch);
-        pending.add(entry);
-        processPending();
+        maybeAppend(entry);
     }
 
     /**
@@ -383,17 +394,32 @@
             // Create a synthetic "force snapshot" transformation to instruct the log to pick up given metadata
             ForceSnapshot transformation = new ForceSnapshot(logState.baseState);
             Entry newEntry = new Entry(Entry.Id.NONE, epoch, transformation);
-            pending.add(newEntry);
+            maybeAppend(newEntry);
         }
 
         // Finally, append any additional transformations in the snapshot. Some or all of these could be earlier than the
         // currently enacted epoch (if we'd already moved on beyond the epoch of the base state for instance, or if newer
         // entries have been received via normal replication), but this is fine as entries will be put in the reorder
         // log, and duplicates will be dropped.
-        pending.addAll(logState.entries);
+        append(logState.entries);
         processPending();
     }
 
+    private void maybeAppend(Entry entry)
+    {
+        for(Predicate<Entry> filter :  entryFilters)
+        {
+            if (filter.test(entry))
+            {
+                logger.debug("Not appending entry to the pending buffer due to configured filters: {}", entry.epoch);
+                return;
+            }
+        }
+
+        logger.debug("Appending entry to the pending buffer: {}", entry.epoch);
+        pending.add(entry);
+    }
+
     public abstract ClusterMetadata awaitAtLeast(Epoch epoch) throws InterruptedException, TimeoutException;
 
     /**
@@ -600,6 +626,18 @@
             ScheduledExecutors.optionalTasks.submit(() -> listener.notifyPostCommit(before, after, fromSnapshot));
     }
 
+    public void addFilter(Predicate<Entry> filter)
+    {
+        logger.debug("Adding filter to pending entry buffer");
+        entryFilters.add(filter);
+    }
+
+    public void clearFilters()
+    {
+        logger.debug("Clearing filters from pending entry buffer");
+        entryFilters.removeAll(entryFilters);
+    }
+
     /**
      * Essentially same as `ready` but throws an unchecked exception
      */
diff --git a/src/java/org/apache/cassandra/tcm/sequences/LockedRanges.java b/src/java/org/apache/cassandra/tcm/sequences/LockedRanges.java
index 8693486..cb49723 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/LockedRanges.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/LockedRanges.java
@@ -334,6 +334,9 @@
             {
                 for (Range<Token> otherRange : e.getValue())
                 {
+                    if (!map.containsKey(e.getKey()))
+                        continue;
+
                     for (Range<Token> thisRange : map.get(e.getKey()))
                     {
                         if (thisRange.intersects(otherRange))
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index cbfa687..e7ffed2 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -1032,18 +1032,24 @@
             // Start any instances with auto_bootstrap enabled first, and in series to avoid issues
             // with multiple nodes bootstrapping with consistent range movement enabled,
             // and then start any instances with it disabled in parallel.
+            // Whichever instance starts first will be the initial CMS for the cluster.
             List<I> startSequentially = new ArrayList<>();
             List<I> startParallel = new ArrayList<>();
             for (int i = 0; i < instances.size(); i++)
             {
                 I instance = instances.get(i);
 
-                if (i == 0 || (boolean) instance.config().get("auto_bootstrap"))
+                if ((boolean) instance.config().get("auto_bootstrap"))
                     startSequentially.add(instance);
                 else
                     startParallel.add(instance);
             }
 
+            // If no instances have auto_bootstrap enabled, start the first in the list
+            // so it can become the initial CMS member.
+            if (startSequentially.isEmpty())
+                startSequentially.add(startParallel.remove(0));
+
             forEach(startSequentially, i -> {
                 i.startup(this);
             });
diff --git a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
index f37d634..4aeb630 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@ -77,6 +77,7 @@
 import org.apache.cassandra.tcm.Commit;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.log.Entry;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.ownership.ReplicaGroups;
 import org.apache.cassandra.utils.Isolated;
@@ -437,6 +438,16 @@
         ClusterUtils.waitForCMSToQuiesce(cluster, getClusterMetadataVersion(leader), ignored);
     }
 
+    public static void dropAllEntriesBeginningAt(IInvokableInstance instance, Epoch epoch)
+    {
+        instance.runOnInstance(() -> ClusterMetadataService.instance().log().addFilter(e -> e.epoch.isEqualOrAfter(epoch)));
+    }
+
+    public static void clearEntryFilters(IInvokableInstance instance)
+    {
+        instance.runOnInstance(() -> ClusterMetadataService.instance().log().clearFilters());
+    }
+
     public static Callable<Void> pauseBeforeEnacting(IInvokableInstance instance, Epoch epoch)
     {
         return pauseBeforeEnacting(instance, epoch, 10, TimeUnit.SECONDS);
diff --git a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
index e5b1108..80da569 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.distributed.test;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
@@ -31,13 +32,25 @@
 import org.junit.Test;
 
 import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInstanceConfig.ParameterizedClass;
 import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
 import org.apache.cassandra.exceptions.CasWriteTimeoutException;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.SimpleSeedProvider;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.sequences.BootstrapAndJoin;
+import org.apache.cassandra.tcm.sequences.InProgressSequences;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.config.CassandraRelevantProperties.PAXOS_USE_SELF_EXECUTION;
@@ -56,6 +69,7 @@
 import static org.apache.cassandra.net.Verb.PAXOS_PREPARE_REQ;
 import static org.apache.cassandra.net.Verb.PAXOS_PROPOSE_REQ;
 import static org.apache.cassandra.net.Verb.READ_REQ;
+import static org.apache.cassandra.service.paxos.MessageHelper.electorateMismatchChecker;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -85,11 +99,22 @@
     {
         PAXOS_USE_SELF_EXECUTION.setBoolean(false);
         TestBaseImpl.beforeClass();
+        // At times during these tests, node1 is going to be blocked from appending entries to its local metadata
+        // log in order to induce divergent views of cluster topology between instances. This precludes it from
+        // acting as the sole CMS member for the cluster. Instead, we make node2 the single CMS member by we
+        // configuring it with auto_bootstrap: true to ensure that it is the first instance to start up. This
+        // also means that it needs to be the seed for the other nodes so that they can discover it when they
+        // startup.
+        ParameterizedClass seeds = new ParameterizedClass(SimpleSeedProvider.class.getName(),
+                                                          Collections.singletonMap("seeds", "127.0.0.2"));
+
         Consumer<IInstanceConfig> conf = config -> config
                                                    .set("paxos_variant", "v2")
                                                    .set("write_request_timeout", REQUEST_TIMEOUT)
                                                    .set("cas_contention_timeout", CONTENTION_TIMEOUT)
-                                                   .set("request_timeout", REQUEST_TIMEOUT);
+                                                   .set("request_timeout", REQUEST_TIMEOUT)
+                                                   .set("seed_provider", seeds)
+                                                   .set("auto_bootstrap", config.num() == 2);
         // TODO: fails with vnode enabled
         THREE_NODES = init(Cluster.build(3).withConfig(conf).withoutVNodes().start());
         FOUR_NODES = init(Cluster.build(4).withConfig(conf).withoutVNodes().start(), 3);
@@ -389,22 +414,26 @@
         FOUR_NODES.schemaChange("CREATE TABLE " + KEYSPACE + "." + tableName + " (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
 
         // make it so {1} is unaware (yet) that {4} is an owner of the token
-        FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::removeFromRing).accept(FOUR_NODES.get(4));
+        removeFromRing(FOUR_NODES.get(4));
+        // This is the epoch of the START_JOIN transform. We'll make {1} ignore any entry from here on
+        Epoch targetEpoch = ClusterUtils.getCurrentEpoch(FOUR_NODES.get(2)).nextEpoch().nextEpoch(); // +prepare +start
+        ClusterUtils.dropAllEntriesBeginningAt(FOUR_NODES.get(1), targetEpoch);
+        joinFully(FOUR_NODES, 4);
 
         int pk = pk(FOUR_NODES, 1, 2);
 
         // {1} promises and accepts on !{3} => {1, 2}; commits on !{2,3} => {1}
         drop(FOUR_NODES, 1, to(3), to(3), to(2, 3));
         FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::assertNotVisibleInRing).accept(FOUR_NODES.get(4));
+        // Allow {1} to append entries into its log again
+        ClusterUtils.clearEntryFilters(FOUR_NODES.get(1));
+
         assertRows(executeWithRetry(FOUR_NODES.coordinator(1), "INSERT INTO " + KEYSPACE + "." + tableName + " (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ONE, pk),
                    row(true));
         FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::assertVisibleInRing).accept(FOUR_NODES.get(4));
 
         for (int i = 1; i <= 3; ++i)
-        {
-            FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::addToRingNormal).accept(FOUR_NODES.get(4));
             FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::assertVisibleInRing).accept(FOUR_NODES.get(4));
-        }
 
         // {4} reads from !{2} => {3, 4}
         drop(FOUR_NODES, 4, to(2), to(2), to());
@@ -426,8 +455,11 @@
         FOUR_NODES.schemaChange("CREATE TABLE " + KEYSPACE + "." + tableName + " (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
 
         // make it so {1} is unaware (yet) that {4} is an owner of the token
-        FOUR_NODES.get(1).acceptOnInstance(CASTestBase::removeFromRing, FOUR_NODES.get(4));
-        FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::assertNotVisibleInRing).accept(FOUR_NODES.get(4));
+        removeFromRing(FOUR_NODES.get(4));
+        // This is the epoch of the START_JOIN transform. We'll make {1} ignore any entry from here on
+        Epoch targetEpoch = ClusterUtils.getCurrentEpoch(FOUR_NODES.get(2)).nextEpoch().nextEpoch(); // +prepare +start
+        ClusterUtils.dropAllEntriesBeginningAt(FOUR_NODES.get(1), targetEpoch);
+        joinFully(FOUR_NODES, 4);
 
         // {4} promises, accepts and commits on !{2} => {3, 4}
         int pk = pk(FOUR_NODES, 1, 2);
@@ -436,6 +468,9 @@
                    row(true));
 
         FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::assertNotVisibleInRing).accept(FOUR_NODES.get(4));
+        // Allow {1} to append entries into its log again
+        ClusterUtils.clearEntryFilters(FOUR_NODES.get(1));
+
         // {1} promises, accepts and commmits on !{3} => {1, 2}
         drop(FOUR_NODES, 1, to(3), to(3), to(3));
         assertRows(executeWithRetry(FOUR_NODES.coordinator(1), "INSERT INTO " + KEYSPACE + "." + tableName + " (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ONE, pk),
@@ -460,20 +495,23 @@
         FOUR_NODES.schemaChange("CREATE TABLE " + KEYSPACE + "." + tableName + " (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
 
         // make it so {4} is bootstrapping, and this has propagated to only a quorum of other nodes
-        for (int i = 1 ; i <= 4 ; ++i)
-        {
-            FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::removeFromRing).accept(FOUR_NODES.get(4));
-            FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::assertNotVisibleInRing).accept(FOUR_NODES.get(4));
-        }
-        for (int i = 2 ; i <= 4 ; ++i)
-        {
-            FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::addToRingBootstrapping).accept(FOUR_NODES.get(4));
+        removeFromRing(FOUR_NODES.get(4));
+        // This is the epoch of the START_JOIN transform. We'll make {1} ignore any entry from here on
+        Epoch targetEpoch = ClusterUtils.getCurrentEpoch(FOUR_NODES.get(2)).nextEpoch().nextEpoch(); // +prepare +start
+        ClusterUtils.dropAllEntriesBeginningAt(FOUR_NODES.get(1), targetEpoch);
+        // Now bring {4} to the mid join point
+        joinPartially(FOUR_NODES, 4);
+
+        for (int i = 2; i <= 4; ++i)
             FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::assertVisibleInRing).accept(FOUR_NODES.get(4));
-        }
 
         int pk = pk(FOUR_NODES, 1, 2);
 
         FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::assertNotVisibleInRing).accept(FOUR_NODES.get(4));
+
+        // Allow {1} to append entries into its log again
+        ClusterUtils.clearEntryFilters(FOUR_NODES.get(1));
+
         // {1} promises and accepts on !{3} => {1, 2}; commmits on !{2, 3} => {1}
         drop(FOUR_NODES, 1, to(3), to(3), to(2, 3));
         assertRows(executeWithRetry(FOUR_NODES.coordinator(1), "INSERT INTO " + KEYSPACE + "." + tableName + " (pk, ck, v) VALUES (?, 1, 1) IF NOT EXISTS", ONE, pk),
@@ -481,8 +519,7 @@
         FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::assertVisibleInRing).accept(FOUR_NODES.get(4));
 
         // finish topology change
-        for (int i = 1 ; i <= 4 ; ++i)
-            FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::addToRingNormal).accept(FOUR_NODES.get(4));
+        finishJoin(FOUR_NODES, 4);
 
         // {3} reads from !{2} => {3, 4}
         drop(FOUR_NODES, 3, to(2), to(), to());
@@ -504,20 +541,23 @@
         FOUR_NODES.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
 
         // make it so {4} is bootstrapping, and this has propagated to only a quorum of other nodes
-        for (int i = 1 ; i <= 4 ; ++i)
-        {
-            FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::removeFromRing).accept(FOUR_NODES.get(4));
-            FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::assertNotVisibleInRing).accept(FOUR_NODES.get(4));
-        }
+        removeFromRing(FOUR_NODES.get(4));
+        // This is the epoch of the START_JOIN transform. We'll make {1} ignore any entry from here on
+        Epoch targetEpoch = ClusterUtils.getCurrentEpoch(FOUR_NODES.get(2)).nextEpoch().nextEpoch(); // +prepare +start
+        ClusterUtils.dropAllEntriesBeginningAt(FOUR_NODES.get(1), targetEpoch);
+        // Now bring {4} to the mid join point
+        joinPartially(FOUR_NODES, 4);
+
         for (int i = 2 ; i <= 4 ; ++i)
-        {
-            FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::addToRingBootstrapping).accept(FOUR_NODES.get(4));
             FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::assertVisibleInRing).accept(FOUR_NODES.get(4));
-        }
 
         int pk = pk(FOUR_NODES, 1, 2);
 
         FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::assertNotVisibleInRing).accept(FOUR_NODES.get(4));
+
+        // Allow {1} to append entries into its log again
+        ClusterUtils.clearEntryFilters(FOUR_NODES.get(1));
+
         // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1}
         drop(FOUR_NODES, 1, to(3), to(3), to(2, 3));
         assertRows(executeWithRetry(FOUR_NODES.coordinator(1), "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ONE, pk),
@@ -525,8 +565,7 @@
         FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::assertVisibleInRing).accept(FOUR_NODES.get(4));
 
         // finish topology change
-        for (int i = 1 ; i <= 4 ; ++i)
-            FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::addToRingNormal).accept(FOUR_NODES.get(4));
+        finishJoin(FOUR_NODES, 4);
 
         // {3} reads from !{2} => {3, 4}
         drop(FOUR_NODES, 3, to(2), to(), to());
@@ -557,16 +596,15 @@
         FOUR_NODES.schemaChange("CREATE TABLE " + KEYSPACE + "." + tableName + " (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
 
         // make it so {4} is bootstrapping, and this has propagated to only a quorum of other nodes
-        for (int i = 1 ; i <= 4 ; ++i)
-        {
-            FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::removeFromRing).accept(FOUR_NODES.get(4));
-            FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::assertNotVisibleInRing).accept(FOUR_NODES.get(4));
-        }
-        for (int i = 2 ; i <= 4 ; ++i)
-        {
-            FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::addToRingBootstrapping).accept(FOUR_NODES.get(4));
+        removeFromRing(FOUR_NODES.get(4));
+        // This is the epoch of the START_JOIN transform. We'll make {1} ignore any entry from here on
+        Epoch targetEpoch = ClusterUtils.getCurrentEpoch(FOUR_NODES.get(2)).nextEpoch().nextEpoch(); // +prepare +start
+        ClusterUtils.dropAllEntriesBeginningAt(FOUR_NODES.get(1), targetEpoch);
+        // Now bring {4} to the mid join point
+        joinPartially(FOUR_NODES, 4);
+
+        for (int i = 2; i <= 4; ++i)
             FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::assertVisibleInRing).accept(FOUR_NODES.get(4));
-        }
 
         int pk = pk(FOUR_NODES, 1, 2);
 
@@ -586,6 +624,9 @@
         drop(FOUR_NODES, 1, to(3), to(3), to(2, 3));
         // two options: either we can invalidate the previous operation and succeed, or we can complete the previous operation
         FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::assertNotVisibleInRing).accept(FOUR_NODES.get(4));
+
+        // Allow {1} to append entries into its log again
+        ClusterUtils.clearEntryFilters(FOUR_NODES.get(1));
         Object[][] result = executeWithRetry(FOUR_NODES.coordinator(1), "INSERT INTO " + KEYSPACE + "." + tableName + " (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ONE, pk);
         Object[] expectRow;
         if (result[0].length == 1)
@@ -601,8 +642,7 @@
         FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::assertVisibleInRing).accept(FOUR_NODES.get(4));
 
         // finish topology change
-        for (int i = 1 ; i <= 4 ; ++i)
-            FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::addToRingNormal).accept(FOUR_NODES.get(4));
+        finishJoin(FOUR_NODES, 4);
 
         // {3} reads from !{2} => {3, 4}
         drop(FOUR_NODES, 3, to(2), to(2), to());
@@ -631,16 +671,15 @@
         FOUR_NODES.schemaChange("CREATE TABLE " + KEYSPACE + "." + tableName + " (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
 
         // make it so {4} is bootstrapping, and this has propagated to only a quorum of other nodes
-        for (int i = 1; i <= 4; ++i)
-        {
-            FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::removeFromRing).accept(FOUR_NODES.get(4));
-            FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::assertNotVisibleInRing).accept(FOUR_NODES.get(4));
-        }
+        removeFromRing(FOUR_NODES.get(4));
+        // This is the epoch of the START_JOIN transform. We'll make {1} ignore any entry from here on
+        Epoch targetEpoch = ClusterUtils.getCurrentEpoch(FOUR_NODES.get(2)).nextEpoch().nextEpoch(); // +prepare +start
+        ClusterUtils.dropAllEntriesBeginningAt(FOUR_NODES.get(1), targetEpoch);
+        // Now bring {4} to the mid join point
+        joinPartially(FOUR_NODES, 4);
+
         for (int i = 2; i <= 4; ++i)
-        {
-            FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::addToRingBootstrapping).accept(FOUR_NODES.get(4));
             FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::assertVisibleInRing).accept(FOUR_NODES.get(4));
-        }
 
         int pk = pk(FOUR_NODES, 1, 2);
 
@@ -661,6 +700,9 @@
         drop(FOUR_NODES, 1, to(3), to(3), to(2, 3));
         // two options: either we can invalidate the previous operation and succeed, or we can complete the previous operation
         FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::assertNotVisibleInRing).accept(FOUR_NODES.get(4));
+
+        // Allow {1} to append entries into its log again
+        ClusterUtils.clearEntryFilters(FOUR_NODES.get(1));
         Object[][] result = executeWithRetry(FOUR_NODES.coordinator(1), "INSERT INTO " + KEYSPACE + "." + tableName + " (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ONE, pk);
         Object[] expectRow;
         if (result[0].length == 1)
@@ -676,14 +718,14 @@
         FOUR_NODES.get(1).acceptsOnInstance(CASTestBase::assertVisibleInRing).accept(FOUR_NODES.get(4));
 
         // finish topology change
-        for (int i = 1; i <= 4; ++i)
-            FOUR_NODES.get(i).acceptsOnInstance(CASTestBase::addToRingNormal).accept(FOUR_NODES.get(4));
+        finishJoin(FOUR_NODES, 4);
 
         // {3} reads from !{2} => {3, 4}
         FOUR_NODES.filters().verbs(PAXOS2_PREPARE_REQ.id, PAXOS_PREPARE_REQ.id, READ_REQ.id).from(3).to(2).drop();
         FOUR_NODES.filters().verbs(PAXOS2_PROPOSE_REQ.id, PAXOS_PROPOSE_REQ.id).from(3).to(2).drop();
         assertRows(FOUR_NODES.coordinator(3).execute("INSERT INTO " + KEYSPACE + "." + tableName + " (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ONE, pk),
                    expectRow);
+
     }
 
     // TODO: RF changes
@@ -787,4 +829,44 @@
 
     }
 
+    private void joinFully(Cluster cluster, int node)
+    {
+        IInstanceConfig config = cluster.get(node).config();
+        InetAddressAndPort address = InetAddressAndPort.getByAddress(config.broadcastAddress());
+        IPartitioner partitioner = FBUtilities.newPartitioner(config.getString("partitioner"));
+        Token token = partitioner.getTokenFactory().fromString(config.getString("initial_token"));
+        cluster.get(node).runOnInstance(() -> ClusterMetadataTestHelper.join(address, token));
+    }
+
+    private void joinPartially(Cluster cluster, int node)
+    {
+        IInstanceConfig config = cluster.get(node).config();
+        InetAddressAndPort address = InetAddressAndPort.getByAddress(config.broadcastAddress());
+        IPartitioner partitioner = FBUtilities.newPartitioner(config.getString("partitioner"));
+        Token token = partitioner.getTokenFactory().fromString(config.getString("initial_token"));
+        cluster.get(node).runOnInstance(() -> ClusterMetadataTestHelper.joinPartially(address, token));
+    }
+
+    private void finishJoin(Cluster cluster, int node)
+    {
+        cluster.get(node).runOnInstance(() -> {
+            BootstrapAndJoin plan = ClusterMetadataTestHelper.getBootstrapPlan(ClusterMetadata.current().myNodeId());
+            InProgressSequences.resume(plan);
+        });
+    }
+
+    @Test
+    public void testMatchingElectorates()
+    {
+        // Verify that when the local and remote electorates have not diverged, we don't include redundant
+        // information in the Permitted responses
+        String tableName = tableName("tbl");
+        FOUR_NODES.schemaChange("CREATE TABLE " + KEYSPACE + "." + tableName + " (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+        int pk = pk(FOUR_NODES, 1, 2);
+        IMessageFilters.Matcher matcher = electorateMismatchChecker(FOUR_NODES);
+        FOUR_NODES.filters().verbs(Verb.PAXOS2_PREPARE_RSP.id).from(2, 3, 4).to(1).messagesMatching(matcher).drop();
+        assertRows(executeWithRetry(FOUR_NODES.coordinator(1), "INSERT INTO " + KEYSPACE + "." + tableName + " (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ONE, pk),
+                   row(true));
+    }
+
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java
index d0819d1..58b47cc 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java
@@ -35,11 +35,11 @@
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
 import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.HeartBeatState;
 import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService;
@@ -188,28 +188,9 @@
         Assert.assertTrue(Gossiper.instance.isAlive(endpoint));
     }
 
-    // reset gossip state so we know of the node being alive only
-    public static void removeFromRing(IInstance peer)
+    public static void removeFromRing(IInvokableInstance peer)
     {
-        try
-        {
-            IInstanceConfig config = peer.config();
-            IPartitioner partitioner = FBUtilities.newPartitioner(config.getString("partitioner"));
-            Token token = partitioner.getTokenFactory().fromString(config.getString("initial_token"));
-            InetAddressAndPort address = InetAddressAndPort.getByAddress(config.broadcastAddress());
-
-            Gossiper.runInGossipStageBlocking(() -> {
-                StorageService.instance.onChange(address,
-                                                 ApplicationState.STATUS,
-                                                 new VersionedValue.VersionedValueFactory(partitioner).left(Collections.singleton(token), 0L, 0));
-                Gossiper.instance.unsafeAnnulEndpoint(address);
-                Gossiper.instance.realMarkAlive(address, new EndpointState(new HeartBeatState(0, 0)));
-            });
-        }
-        catch (Throwable e) // UnknownHostException
-        {
-            throw new RuntimeException(e);
-        }
+        peer.runOnInstance(() -> ClusterMetadataTestHelper.leave(FBUtilities.getBroadcastAddressAndPort()));
     }
 
     public static void assertNotVisibleInRing(IInstance peer)
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
index 99cc316..b3dbdf7 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
@@ -252,7 +252,12 @@
 
     public static NodeId nodeId(int nodeIdx)
     {
-        return ClusterMetadata.current().directory.peerId(addr(nodeIdx));
+        return nodeId(addr(nodeIdx));
+    }
+
+    public static NodeId nodeId(InetAddressAndPort addr)
+    {
+        return ClusterMetadata.current().directory.peerId(addr);
     }
 
     public static InetAddressAndPort addr(int nodeIdx)
@@ -365,10 +370,15 @@
 
     public static void leave(int nodeIdx)
     {
+            leave(addr(nodeIdx));
+    }
+
+    public static void leave(InetAddressAndPort endpoint)
+    {
         try
         {
-            NodeId nodeId = ClusterMetadata.current().directory.peerId(InetAddressAndPort.getByName("127.0.0." + nodeIdx));
-            LeaveProcess process = lazyLeave(nodeIdx, false);
+            NodeId nodeId = nodeId(endpoint);
+            LeaveProcess process = lazyLeave(endpoint, false);
             process.prepareLeave()
                    .startLeave()
                    .midLeave()
diff --git a/test/distributed/org/apache/cassandra/service/paxos/MessageHelper.java b/test/distributed/org/apache/cassandra/service/paxos/MessageHelper.java
new file mode 100644
index 0000000..496f35d
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/service/paxos/MessageHelper.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.paxos;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.tcm.Epoch;
+
+import static org.apache.cassandra.distributed.impl.Instance.deserializeMessage;
+import static org.junit.Assert.assertTrue;
+
+public class MessageHelper
+{
+    public static IMessageFilters.Matcher electorateMismatchChecker(final Cluster cluster)
+    {
+        return (from, to, msg) -> {
+            cluster.get(to).runOnInstance(() -> {
+                Message<?> message = deserializeMessage(msg);
+                if (message.payload instanceof PaxosPrepare.Response)
+                {
+                    PaxosPrepare.Permitted permitted = ((PaxosPrepare.Response)message.payload).permitted();
+                    assertTrue(permitted.gossipInfo.isEmpty());
+                    assertTrue(permitted.electorateEpoch.is(Epoch.EMPTY));
+                }
+            });
+            return false;
+        };
+    }
+}
diff --git a/test/simulator/main/org/apache/cassandra/simulator/cluster/OnClusterReplace.java b/test/simulator/main/org/apache/cassandra/simulator/cluster/OnClusterReplace.java
index 19204e4..e564266 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/cluster/OnClusterReplace.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/cluster/OnClusterReplace.java
@@ -104,6 +104,7 @@
             ).config().num()
             ).toArray();
 
+            local.add(new OnClusterMarkDown(actions, leaving));
             local.add(new OnClusterRepairRanges(actions, others, true, false, repairRanges));
             local.add(new ExecuteNextStep(actions, joining, Transformation.Kind.START_REPLACE));
             local.addAll(Quiesce.all(actions));