/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.distributed.test;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import org.apache.cassandra.distributed.shared.WithProperties;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.ReadQuery;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.memtable.Memtable;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.BTreeRow;
import org.apache.cassandra.db.rows.BufferCell;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.shared.ClusterUtils;
import org.apache.cassandra.exceptions.CasWriteTimeoutException;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.repair.RepairParallelism;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.paxos.Ballot;
import org.apache.cassandra.service.paxos.Commit;
import org.apache.cassandra.service.paxos.Paxos;
import org.apache.cassandra.service.paxos.PaxosState;
import org.apache.cassandra.service.paxos.uncommitted.PaxosKeyState;
import org.apache.cassandra.service.paxos.uncommitted.PaxosRows;
import org.apache.cassandra.service.paxos.uncommitted.PaxosUncommittedTracker;
import org.apache.cassandra.service.paxos.uncommitted.PaxosUncommittedTracker.UpdateSupplier;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.cassandra.config.CassandraRelevantProperties.AUTO_REPAIR_FREQUENCY_SECONDS;
import static org.apache.cassandra.config.CassandraRelevantProperties.DISABLE_PAXOS_AUTO_REPAIRS;
import static org.apache.cassandra.schema.SchemaConstants.SYSTEM_KEYSPACE_NAME;
import static org.apache.cassandra.service.paxos.Ballot.Flag.GLOBAL;
import static org.apache.cassandra.service.paxos.BallotGenerator.Global.staleBallot;

// quick workaround for Metaspace OOMs; will properly reuse clusters later
public class PaxosRepair2Test extends TestBaseImpl
{
private static final Logger logger = LoggerFactory.getLogger(PaxosRepair2Test.class);
private static final String TABLE = "tbl";
public static final String OFFSETTABLE_CLOCK_NAME = OffsettableClock.class.getName();
static
{
CassandraRelevantProperties.PAXOS_USE_SELF_EXECUTION.setBoolean(false);
DatabaseDescriptor.daemonInitialization();
}
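
// Count the uncommitted paxos instances the tracker reports for the given table
// on this instance; shut-down instances are treated as having none.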
private static int getUncommitted(IInvokableInstance instance, String keyspace, String table)
{
if (instance.isShutdown())
return 0;
int uncommitted = instance.callsOnInstance(() -> {
TableMetadata cfm = Schema.instance.getTableMetadata(keyspace, table);
return Iterators.size(PaxosState.uncommittedTracker().uncommittedKeyIterator(cfm.id, null));
}).call();
logger.info("{} has {} uncommitted instances", instance, uncommitted);
return uncommitted;
}
private static void assertUncommitted(IInvokableInstance instance, String ks, String table, int expected)
{
Assert.assertEquals(expected, getUncommitted(instance, ks, table));
}
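
// Run a blocking paxos-only repair (REPAIR_PAXOS_KEY + PAXOS_ONLY_KEY) from node 1,
// polling the parent repair status every 500ms until it completes or fails.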
private static void repair(Cluster cluster, String keyspace, String table, boolean force)
{
Map<String, String> options = new HashMap<>();
options.put(RepairOption.PARALLELISM_KEY, RepairParallelism.SEQUENTIAL.getName());
options.put(RepairOption.PRIMARY_RANGE_KEY, Boolean.toString(false));
options.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(false));
options.put(RepairOption.JOB_THREADS_KEY, Integer.toString(1));
options.put(RepairOption.TRACE_KEY, Boolean.toString(false));
options.put(RepairOption.COLUMNFAMILIES_KEY, "");
options.put(RepairOption.PULL_REPAIR_KEY, Boolean.toString(false));
options.put(RepairOption.FORCE_REPAIR_KEY, Boolean.toString(force));
options.put(RepairOption.PREVIEW, PreviewKind.NONE.toString());
options.put(RepairOption.IGNORE_UNREPLICATED_KS, Boolean.toString(false));
options.put(RepairOption.REPAIR_PAXOS_KEY, Boolean.toString(true));
options.put(RepairOption.PAXOS_ONLY_KEY, Boolean.toString(true));
cluster.get(1).runOnInstance(() -> {
int cmd = StorageService.instance.repairAsync(keyspace, options);
while (true)
{
try
{
Thread.sleep(500);
}
catch (InterruptedException e)
{
throw new AssertionError(e);
}
Pair<ActiveRepairService.ParentRepairStatus, List<String>> status = ActiveRepairService.instance.getRepairStatus(cmd);
if (status == null)
continue;
switch (status.left)
{
case IN_PROGRESS:
continue;
case COMPLETED:
return;
default:
throw new AssertionError("Repair failed with errors: " + status.right);
}
}
});
}
private static void repair(Cluster cluster, String keyspace, String table)
{
repair(cluster, keyspace, table, false);
}
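
// A completed paxos repair should advance the per-key repair low bound so that a
// stale in-flight proposal from before the repair is ignored rather than re-proposed.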
@Test
public void paxosRepairPreventsStaleReproposal() throws Throwable
{
Ballot staleBallot = Paxos.newBallot(Ballot.none(), org.apache.cassandra.db.ConsistencyLevel.SERIAL);
try (Cluster cluster = init(Cluster.create(3, cfg -> cfg
.set("paxos_variant", "v2")
.set("paxos_purge_grace_period", "0s")
.set("truncate_request_timeout_in_ms", 1000L)))
)
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + '.' + TABLE + " (k int primary key, v int)");
repair(cluster, KEYSPACE, TABLE);
// stop and start node 2 to test loading paxos repair history from disk
cluster.get(2).shutdown().get();
cluster.get(2).startup();
for (int i=0; i<cluster.size(); i++)
{
cluster.get(i+1).runOnInstance(() -> {
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
DecoratedKey key = cfs.decorateKey(ByteBufferUtil.bytes(1));
Assert.assertFalse(FBUtilities.getBroadcastAddressAndPort().toString(), Commit.isAfter(staleBallot, cfs.getPaxosRepairLowBound(key)));
});
}
// add in the stale proposal
cluster.get(1).runOnInstance(() -> {
TableMetadata cfm = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.bytes(1));
ColumnMetadata cdef = cfm.getColumn(new ColumnIdentifier("v", false));
Cell cell = BufferCell.live(cdef, staleBallot.unixMicros(), ByteBufferUtil.bytes(1));
Row row = BTreeRow.singleCellRow(Clustering.EMPTY, cell);
PartitionUpdate update = PartitionUpdate.singleRowUpdate(cfm, key, row);
Commit.Proposal proposal = new Commit.Proposal(staleBallot, update);
SystemKeyspace.savePaxosProposal(proposal);
});
// shut down node 3 so we're guaranteed to see the stale proposal
cluster.get(3).shutdown().get();
// the stale inflight proposal should be ignored and the query should succeed
String query = "INSERT INTO " + KEYSPACE + '.' + TABLE + " (k, v) VALUES (1, 2) IF NOT EXISTS";
Object[][] result = cluster.coordinator(1).execute(query, ConsistencyLevel.QUORUM);
Assert.assertEquals(new Object[][]{new Object[]{ true }}, result);
assertLowBoundPurged(cluster.get(1));
assertLowBoundPurged(cluster.get(2));
}
}
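
// A forced repair run while node 3 is down must not advance the paxos repair
// history: the stale ballot should still compare after each node's low bound.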
@Test
public void paxosRepairHistoryIsntUpdatedInForcedRepair() throws Throwable
{
Ballot staleBallot = staleBallot(System.currentTimeMillis() - 1000000, System.currentTimeMillis() - 100000, GLOBAL);
try (Cluster cluster = init(Cluster.create(3, cfg -> cfg.with(Feature.GOSSIP, Feature.NETWORK)
.set("paxos_variant", "v2")
.set("truncate_request_timeout_in_ms", 1000L)))
)
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + '.' + TABLE + " (k int primary key, v int)");
ClusterUtils.stopUnchecked(cluster.get(3));
InetAddressAndPort node3 = InetAddressAndPort.getByAddress(cluster.get(3).broadcastAddress());
// make sure node 1 knows node 3 is down
Awaitility.waitAtMost(1, TimeUnit.MINUTES).until(
() -> !cluster.get(1).callOnInstance(() -> FailureDetector.instance.isAlive(node3)));
repair(cluster, KEYSPACE, TABLE, true);
for (int i = 0; i < cluster.size() - 1; i++)
{
cluster.get(i + 1).runOnInstance(() -> {
Assert.assertFalse(CassandraRelevantProperties.CLOCK_GLOBAL.isPresent());
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
DecoratedKey key = cfs.decorateKey(ByteBufferUtil.bytes(1));
Assert.assertTrue(FBUtilities.getBroadcastAddressAndPort().toString(), Commit.isAfter(staleBallot, cfs.getPaxosRepairLowBound(key)));
});
}
}
}
private static class PaxosRow
{
final DecoratedKey key;
final Row row;
PaxosRow(DecoratedKey key, Row row)
{
this.key = key;
this.row = row;
}
public String toString()
{
TableMetadata cfm = Schema.instance.getTableMetadata(SYSTEM_KEYSPACE_NAME, SystemKeyspace.PAXOS);
return ByteBufferUtil.bytesToHex(key.getKey()) + " -> " + row.toString(cfm, true);
}
}
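
// Flush and major-compact system.paxos so purgeable state is actually dropped from sstables.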
private static void compactPaxos()
{
ColumnFamilyStore paxos = Keyspace.open(SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.PAXOS);
FBUtilities.waitOnFuture(paxos.forceFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS));
FBUtilities.waitOnFutures(CompactionManager.instance.submitMaximal(paxos, 0, false));
}
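
// Read system.paxos via an internal SELECT and index its rows by int partition key.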
private static Map<Integer, PaxosRow> getPaxosRows()
{
Map<Integer, PaxosRow> rows = new HashMap<>();
String queryStr = "SELECT * FROM " + SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PAXOS;
SelectStatement stmt = (SelectStatement) QueryProcessor.parseStatement(queryStr).prepare(ClientState.forInternalCalls());
ReadQuery query = stmt.getQuery(QueryOptions.DEFAULT, FBUtilities.nowInSeconds());
try (ReadExecutionController controller = query.executionController(); PartitionIterator partitions = query.executeInternal(controller))
{
while (partitions.hasNext())
{
RowIterator partition = partitions.next();
while (partition.hasNext())
{
rows.put(Int32Type.instance.compose(partition.partitionKey().getKey()),
new PaxosRow(partition.partitionKey(), partition.next()));
}
}
}
return rows;
}
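
// Assert that every key's paxos repair low bound has advanced past Ballot.none()
// and that no paxos state at or before that bound is still present in the row.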
private static void assertLowBoundPurged(Collection<PaxosRow> rows)
{
Assert.assertEquals(0, DatabaseDescriptor.getPaxosPurgeGrace(SECONDS));
String ip = FBUtilities.getBroadcastAddressAndPort().toString();
for (PaxosRow row : rows)
{
Ballot keyLowBound = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE).getPaxosRepairLowBound(row.key);
Assert.assertTrue(ip, Commit.isAfter(keyLowBound, Ballot.none()));
Assert.assertFalse(ip, PaxosRows.hasBallotBeforeOrEqualTo(row.row, keyLowBound));
}
}
private static void assertLowBoundPurged(IInvokableInstance instance)
{
instance.runOnInstance(() -> assertLowBoundPurged(getPaxosRows().values()));
}
private static void assertLowBoundPurged(Cluster cluster)
{
cluster.forEach(PaxosRepair2Test::assertLowBoundPurged);
}
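
// Strand an uncommitted paxos instance on nodes 1 and 2 by dropping PAXOS_COMMIT_REQ,
// then re-enable the auto-repair task and wait for it to repair the instance away.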
@Test
public void paxosAutoRepair() throws Throwable
{
try (WithProperties properties = new WithProperties().set(AUTO_REPAIR_FREQUENCY_SECONDS, 1).set(DISABLE_PAXOS_AUTO_REPAIRS, true);
Cluster cluster = init(Cluster.create(3, cfg -> cfg
.set("paxos_variant", "v2")
.set("paxos_repair_enabled", true)
.set("truncate_request_timeout_in_ms", 1000L)));
)
{
cluster.forEach(i -> {
Assert.assertFalse(CassandraRelevantProperties.CLOCK_GLOBAL.isPresent());
Assert.assertEquals(1, CassandraRelevantProperties.AUTO_REPAIR_FREQUENCY_SECONDS.getInt());
Assert.assertTrue(CassandraRelevantProperties.DISABLE_PAXOS_AUTO_REPAIRS.getBoolean());
});
cluster.schemaChange("CREATE TABLE " + KEYSPACE + '.' + TABLE + " (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
ClusterUtils.stopUnchecked(cluster.get(3));
cluster.verbs(Verb.PAXOS_COMMIT_REQ).drop();
try
{
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + '.' + TABLE + " (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
Assert.fail("expected write timeout");
}
catch (AssertionError e)
{
throw e; // don't let the catch below swallow the Assert.fail above
}
catch (Throwable t)
{
// expected: the dropped PAXOS_COMMIT_REQ messages make the CAS write time out
}
assertUncommitted(cluster.get(1), KEYSPACE, TABLE, 1);
assertUncommitted(cluster.get(2), KEYSPACE, TABLE, 1);
cluster.filters().reset();
// paxos table needs at least 1 flush to be picked up by auto-repairs
cluster.get(1).flush("system");
cluster.get(2).flush("system");
// re-enable repairs
cluster.get(1).runOnInstance(() -> StorageService.instance.setPaxosAutoRepairsEnabled(true));
cluster.get(2).runOnInstance(() -> StorageService.instance.setPaxosAutoRepairsEnabled(true));
Thread.sleep(2000);
for (int i=0; i<20; i++)
{
if (!cluster.get(1).callsOnInstance(() -> PaxosState.uncommittedTracker().hasInflightAutoRepairs()).call()
&& !cluster.get(2).callsOnInstance(() -> PaxosState.uncommittedTracker().hasInflightAutoRepairs()).call())
break;
logger.info("Waiting for auto repairs to finish...");
Thread.sleep(1000);
}
assertUncommitted(cluster.get(1), KEYSPACE, TABLE, 0);
assertUncommitted(cluster.get(2), KEYSPACE, TABLE, 0);
}
}
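
// With paxos_state_purging=repaired, repaired paxos rows must survive compaction until
// paxos_purge_grace_period has elapsed; only a repair after that window purges them.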
@Test
public void paxosPurgeGraceSeconds() throws Exception
{
int graceSeconds = 5;
try (Cluster cluster = init(Cluster.create(3, cfg -> cfg
.set("paxos_variant", "v2")
.set("paxos_purge_grace_period", graceSeconds + "s")
.set("paxos_state_purging", Config.PaxosStatePurging.repaired.toString())
.set("truncate_request_timeout_in_ms", 1000L)))
)
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + '.' + TABLE + " (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + '.' + TABLE + " (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
repair(cluster, KEYSPACE, TABLE);
cluster.forEach(i -> i.runOnInstance(() -> {
Assert.assertFalse(CassandraRelevantProperties.CLOCK_GLOBAL.isPresent());
compactPaxos();
Map<Integer, PaxosRow> rows = getPaxosRows();
Assert.assertEquals(Sets.newHashSet(1), rows.keySet());
}));
// wait for the grace period to pass and repair again; the rows should then be removed
Thread.sleep((graceSeconds + 1) * 1000);
repair(cluster, KEYSPACE, TABLE);
cluster.forEach(i -> i.runOnInstance(() -> {
compactPaxos();
Map<Integer, PaxosRow> rows = getPaxosRows();
Assert.assertEquals(Sets.newHashSet(), rows.keySet());
}));
}
}
static void assertTimeout(Runnable runnable)
{
try
{
runnable.run();
Assert.fail("timeout expected");
}
catch (RuntimeException e)
{
Assert.assertEquals(CasWriteTimeoutException.class.getName(), e.getClass().getName());
}
}
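
// The local deletion time a legacy TTL'd commit gets: the ballot timestamp in seconds
// plus the legacy paxos TTL for the table.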
private static int ballotDeletion(Commit commit)
{
return (int) TimeUnit.MICROSECONDS.toSeconds(commit.ballot.unixMicros()) + SystemKeyspace.legacyPaxosTtlSec(commit.update.metadata());
}
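
// Shift the client timestamp floor and OffsettableClock the given number of seconds
// into the past; backdateTimestamps(0) restores real time.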
private static void backdateTimestamps(int seconds)
{
long offsetMillis = SECONDS.toMillis(seconds);
ClientState.resetLastTimestamp(System.currentTimeMillis() - offsetMillis);
OffsettableClock.offsetMillis = -offsetMillis;
}
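
// Clock installed via CLOCK_GLOBAL that offsets currentTimeMillis() by a settable amount.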
public static class OffsettableClock implements Clock
{
private static volatile long offsetMillis = 0;
public long nanoTime()
{
return System.nanoTime(); // checkstyle: permit system clock
}
public long currentTimeMillis()
{
return System.currentTimeMillis() + offsetMillis; // checkstyle: permit system clock
}
}
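
// Exercises the legacy purging edge case: a TTL-expired commit alongside a stale promise
// on node 3 is repaired via a single-key update supplier, and all nodes must end up with
// no uncommitted paxos instances.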
@Test
public void legacyPurgeRepairLoop() throws Exception
{
try
{
CassandraRelevantProperties.CLOCK_GLOBAL.setString(OFFSETTABLE_CLOCK_NAME);
try (Cluster cluster = init(Cluster.create(3, cfg -> cfg
.set("paxos_variant", "v2")
.set("paxos_state_purging", "legacy")
.set("paxos_purge_grace_period", "0s")
.set("truncate_request_timeout_in_ms", 1000L)))
)
{
cluster.forEach(i -> Assert.assertEquals(OFFSETTABLE_CLOCK_NAME, CassandraRelevantProperties.CLOCK_GLOBAL.getString()));
int ttl = 3 * 3600;
cluster.schemaChange("CREATE TABLE " + KEYSPACE + '.' + TABLE + " (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH gc_grace_seconds=" + ttl);
// prepare an operation ttl + 1 hour into the past on a single node
cluster.forEach(instance -> {
instance.runOnInstance(() -> {
Assert.assertEquals(OFFSETTABLE_CLOCK_NAME, CassandraRelevantProperties.CLOCK_GLOBAL.getString());
backdateTimestamps(ttl + 3600);
});
});
cluster.filters().inbound().to(1, 2).drop();
assertTimeout(() -> cluster.coordinator(3).execute("INSERT INTO " + KEYSPACE + '.' + TABLE + " (pk, ck, v) VALUES (400, 2, 2) IF NOT EXISTS", ConsistencyLevel.QUORUM));
Ballot oldBallot = Ballot.fromUuid(cluster.get(3).callOnInstance(() -> {
TableMetadata cfm = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
DecoratedKey dk = cfm.partitioner.decorateKey(ByteBufferUtil.bytes(400));
try (PaxosState state = PaxosState.get(dk, cfm))
{
return state.currentSnapshot().promised.asUUID();
}
}));
assertUncommitted(cluster.get(1), KEYSPACE, TABLE, 0);
assertUncommitted(cluster.get(2), KEYSPACE, TABLE, 0);
assertUncommitted(cluster.get(3), KEYSPACE, TABLE, 1);
// commit an operation just over ttl in the past on the other nodes
cluster.filters().reset();
cluster.filters().inbound().to(2).drop();
cluster.forEach(instance -> {
instance.runOnInstance(() -> {
backdateTimestamps(ttl + 2);
});
});
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + '.' + TABLE + " (pk, ck, v) VALUES (400, 2, 2) IF NOT EXISTS", ConsistencyLevel.QUORUM);
// expire the cache entries
int nowInSec = FBUtilities.nowInSeconds();
cluster.get(1).runOnInstance(() -> {
TableMetadata table = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
DecoratedKey dk = table.partitioner.decorateKey(ByteBufferUtil.bytes(400));
try (PaxosState state = PaxosState.get(dk, table))
{
state.updateStateUnsafe(s -> {
Assert.assertNull(s.accepted);
Assert.assertTrue(Commit.isAfter(s.committed.ballot, oldBallot));
Commit.CommittedWithTTL committed = new Commit.CommittedWithTTL(s.committed.ballot,
s.committed.update,
ballotDeletion(s.committed));
Assert.assertTrue(committed.localDeletionTime < nowInSec);
return new PaxosState.Snapshot(Ballot.none(), Ballot.none(), null, committed);
});
}
});
cluster.get(3).runOnInstance(() -> {
TableMetadata table = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
DecoratedKey dk = table.partitioner.decorateKey(ByteBufferUtil.bytes(400));
try (PaxosState state = PaxosState.get(dk, table))
{
state.updateStateUnsafe(s -> {
Assert.assertNull(s.accepted);
Assert.assertTrue(Commit.isAfter(s.committed.ballot, oldBallot));
Commit.CommittedWithTTL committed = new Commit.CommittedWithTTL(s.committed.ballot,
s.committed.update,
ballotDeletion(s.committed));
Assert.assertTrue(committed.localDeletionTime < nowInSec);
return new PaxosState.Snapshot(oldBallot, oldBallot, null, committed);
});
}
});
cluster.forEach(instance -> {
instance.runOnInstance(() -> {
backdateTimestamps(0);
});
});
cluster.filters().reset();
cluster.filters().inbound().to(2).drop();
cluster.get(3).runOnInstance(() -> {
TableMetadata table = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
DecoratedKey dk = table.partitioner.decorateKey(ByteBufferUtil.bytes(400));
UpdateSupplier supplier = PaxosState.uncommittedTracker().unsafGetUpdateSupplier();
try
{
PaxosUncommittedTracker.unsafSetUpdateSupplier(new SingleUpdateSupplier(table, dk, oldBallot));
StorageService.instance.autoRepairPaxos(table.id).get();
}
catch (Exception e)
{
throw new RuntimeException(e);
}
finally
{
PaxosUncommittedTracker.unsafSetUpdateSupplier(supplier);
}
});
assertUncommitted(cluster.get(1), KEYSPACE, TABLE, 0);
assertUncommitted(cluster.get(2), KEYSPACE, TABLE, 0);
assertUncommitted(cluster.get(3), KEYSPACE, TABLE, 0);
}
}
finally
{
CassandraRelevantProperties.CLOCK_GLOBAL.reset();
}
}
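
// UpdateSupplier stub reporting exactly one uncommitted key/ballot for the target table,
// used to point autoRepairPaxos at a specific key.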
private static class SingleUpdateSupplier implements UpdateSupplier
{
private final TableMetadata cfm;
private final DecoratedKey dk;
private final Ballot ballot;
public SingleUpdateSupplier(TableMetadata cfm, DecoratedKey dk, Ballot ballot)
{
this.cfm = cfm;
this.dk = dk;
this.ballot = ballot;
}
public CloseableIterator<PaxosKeyState> repairIterator(TableId cfId, Collection<Range<Token>> ranges)
{
if (!cfId.equals(cfm.id))
return CloseableIterator.empty();
return CloseableIterator.wrap(Collections.singleton(new PaxosKeyState(cfId, dk, ballot, false)).iterator());
}
public CloseableIterator<PaxosKeyState> flushIterator(Memtable paxos)
{
throw new UnsupportedOperationException();
}
}
}