blob: 46c4c9ecf6c3f889d3c5f8798a1a0d81828030bd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.simulator.paxos;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.simulator.cluster.ClusterActionListener.TopologyChangeValidator;
import org.apache.cassandra.simulator.cluster.Topology;
import static java.util.Arrays.stream;
import static org.apache.cassandra.simulator.systems.NonInterceptible.Permit.REQUIRED;
/**
 * Verifies that Paxos state observed for a set of participating keys survives a topology change.
 * <p>
 * {@link #before} captures the latest ballots reported by the replicas of the participating keys in the old
 * topology. {@link #after} re-reads them from the replicas that own the same keys in the new topology and asserts:
 * <ul>
 *   <li>if the newest accepted proposal was accepted by a quorum of the old replicas, a quorum of the new replicas
 *       still reflects it;</li>
 *   <li>a quorum of the new replicas holds a commit at least as new as the newest commit seen before the change.</li>
 * </ul>
 * Violations are reported by throwing {@link AssertionError}.
 */
public class PaxosTopologyChangeVerifier implements TopologyChangeValidator
{
    final Cluster cluster;
    final String keyspace;
    final String table;
    /** Caller-supplied identifier, included in failure messages. */
    final Object id;

    /** Old topology restricted to the participating keys; set by {@link #before}, cleared after a successful check. */
    Topology topologyBefore;
    /** Ballots read in {@link #before}, indexed by primary key and then (presumably) by replica; cleared on success. */
    Ballots.LatestBallots[][] ballotsBefore;

    /**
     * @param cluster  the cluster whose replicas are queried for ballot state
     * @param keyspace keyspace of the Paxos table
     * @param table    name of the Paxos table
     * @param id       identifier reported in assertion messages
     */
    public PaxosTopologyChangeVerifier(Cluster cluster, String keyspace, String table, Object id)
    {
        this.cluster = cluster;
        this.keyspace = keyspace;
        this.table = table;
        this.id = id;
    }

    /**
     * Snapshots the ballots held by the replicas of {@code participatingKeys} in the old topology.
     *
     * @throws AssertionError if the number of ballot records for any key differs from {@code quorumRf}
     */
    @Override
    public void before(Topology before, int[] participatingKeys)
    {
        this.topologyBefore = before.select(participatingKeys);
        this.ballotsBefore = Ballots.read(REQUIRED, cluster, keyspace, table, topologyBefore.primaryKeys, topologyBefore.replicasForKeys, true);
        for (int i = 0; i < topologyBefore.primaryKeys.length; ++i)
        {
            if (ballotsBefore[i].length != topologyBefore.quorumRf)
                throw new AssertionError("Inconsistent ownership/ballot information");
        }
    }

    /**
     * Restricts {@code topologyAfter} to the keys captured by {@link #before} and verifies them.
     */
    @Override
    public void after(Topology topologyAfter)
    {
        afterInternal(topologyAfter.select(topologyBefore.primaryKeys));
    }

    /**
     * Reads the ballots of every primary key in {@code topologyAfter} and checks them against the snapshot taken by
     * {@link #before}. Keys are matched by position, so {@code topologyAfter.primaryKeys} is assumed to list keys in the
     * same order as the snapshot ({@link #after} selects by {@code topologyBefore.primaryKeys} for this reason).
     * On success the snapshot is released.
     *
     * @throws AssertionError if ownership/ballot information is inconsistent or Paxos state was lost
     */
    public void afterInternal(Topology topologyAfter)
    {
        int[] primaryKeys = topologyAfter.primaryKeys;
        int quorumBefore = topologyBefore.quorumRf / 2 + 1;
        int quorumAfter = topologyAfter.quorumRf / 2 + 1;
        Ballots.LatestBallots[][] allAfter = Ballots.read(REQUIRED, cluster, keyspace, table, primaryKeys, topologyAfter.replicasForKeys, true);
        for (int pki = 0; pki < primaryKeys.length; ++pki)
        {
            Ballots.LatestBallots[] before = ballotsBefore[pki];
            Ballots.LatestBallots[] after = allAfter[pki];
            if (after.length != topologyAfter.quorumRf)
                throw new AssertionError("Inconsistent ownership/ballot information");

            verifyAcceptedSurvives(primaryKeys[pki], before, after, quorumBefore, quorumAfter);
            verifyCommittedSurvives(primaryKeys[pki], before, after, quorumAfter);
        }
        // clear memory usage on success
        topologyBefore = null;
        ballotsBefore = null;
    }

    /**
     * Asserts that a proposal accepted by a quorum before the change is still reflected by a quorum afterwards.
     */
    private void verifyAcceptedSurvives(int primaryKey, Ballots.LatestBallots[] before, Ballots.LatestBallots[] after,
                                        int quorumBefore, int quorumAfter)
    {
        // if we had accepted to a quorum we should be committed to a quorum afterwards
        // note that we will not always witness something newer than the latest accepted proposal,
        // because if we don't witness it during repair, we will simply invalidate it with the low bound
        long acceptedBefore = stream(before).mapToLong(n -> n.accept).max().orElse(0L);
        long acceptedOfBefore = stream(before).filter(n -> n.accept == acceptedBefore).mapToLong(n -> n.acceptOf).findAny().orElse(0L);
        int countBefore = (int) stream(before).filter(n -> n.accept == acceptedBefore).count();
        if (countBefore < quorumBefore)
            return; // never accepted by a quorum, so nothing is required to survive the change

        // when enough old replicas accepted it to form a new quorum, require a quorum of commits at least as new as
        // the accepted proposal; otherwise require a quorum of replicas reporting any ballot at least as new
        boolean expectCommitted = countBefore >= quorumAfter;
        int countAfter = expectCommitted
                         ? (int) stream(after).filter(n -> n.permanent() >= acceptedOfBefore).count()
                         : (int) stream(after).filter(n -> n.any() >= acceptedBefore).count();
        if (countAfter < quorumAfter)
        {
            throw new AssertionError(String.format("%d: %d accepted by %d before %s but only %s on %d after (expect at least %d)",
                                                   primaryKey, acceptedBefore, countBefore, id, expectCommitted ? "committed" : "accepted", countAfter, quorumAfter));
        }
    }

    /**
     * Asserts that a quorum after the change holds a commit at least as new as the newest commit seen before it.
     */
    private void verifyCommittedSurvives(int primaryKey, Ballots.LatestBallots[] before, Ballots.LatestBallots[] after,
                                         int quorumAfter)
    {
        // we should always have at least a quorum of newer records than the most recently witnessed commit
        long committedBefore = stream(before).mapToLong(Ballots.LatestBallots::permanent).max().orElse(0L);
        int countAfter = (int) stream(after).filter(n -> n.permanent() >= committedBefore).count();
        if (countAfter < quorumAfter)
        {
            throw new AssertionError(String.format("%d: %d committed before %s but only committed on %d after (expect at least %d)",
                                                   primaryKey, committedBefore, id, countAfter, quorumAfter));
        }
    }
}