| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.service.reads.repair; |
| |
| import java.nio.ByteBuffer; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Consumer; |
| |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Sets; |
| import com.google.common.primitives.Ints; |
| |
| import org.apache.cassandra.dht.ByteOrderedPartitioner; |
| import org.apache.cassandra.dht.Token; |
| import org.apache.cassandra.gms.Gossiper; |
| import org.apache.cassandra.locator.EndpointsForToken; |
| import org.apache.cassandra.locator.ReplicaPlan; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| |
| import org.apache.cassandra.SchemaLoader; |
| import org.apache.cassandra.Util; |
| import org.apache.cassandra.config.DatabaseDescriptor; |
| import org.apache.cassandra.cql3.ColumnIdentifier; |
| import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; |
| import org.apache.cassandra.db.Clustering; |
| import org.apache.cassandra.db.ColumnFamilyStore; |
| import org.apache.cassandra.db.ConsistencyLevel; |
| import org.apache.cassandra.db.DecoratedKey; |
| import org.apache.cassandra.db.Keyspace; |
| import org.apache.cassandra.db.Mutation; |
| import org.apache.cassandra.db.ReadCommand; |
| import org.apache.cassandra.db.ReadResponse; |
| import org.apache.cassandra.db.partitions.PartitionIterator; |
| import org.apache.cassandra.db.partitions.PartitionUpdate; |
| import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator; |
| import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; |
| import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; |
| import org.apache.cassandra.db.rows.BTreeRow; |
| import org.apache.cassandra.db.rows.BufferCell; |
| import org.apache.cassandra.db.rows.Cell; |
| import org.apache.cassandra.db.rows.Row; |
| import org.apache.cassandra.db.rows.RowIterator; |
| import org.apache.cassandra.locator.EndpointsForRange; |
| import org.apache.cassandra.locator.InetAddressAndPort; |
| import org.apache.cassandra.locator.Replica; |
| import org.apache.cassandra.locator.ReplicaPlans; |
| import org.apache.cassandra.locator.ReplicaUtils; |
| import org.apache.cassandra.net.Message; |
| import org.apache.cassandra.schema.KeyspaceMetadata; |
| import org.apache.cassandra.schema.KeyspaceParams; |
| import org.apache.cassandra.schema.MigrationManager; |
| import org.apache.cassandra.schema.TableMetadata; |
| import org.apache.cassandra.schema.Tables; |
| import org.apache.cassandra.service.StorageService; |
| import org.apache.cassandra.utils.ByteBufferUtil; |
| |
| import static org.apache.cassandra.locator.Replica.fullReplica; |
| import static org.apache.cassandra.locator.ReplicaUtils.FULL_RANGE; |
| import static org.apache.cassandra.net.Verb.INTERNAL_RSP; |
| |
| @Ignore |
| public abstract class AbstractReadRepairTest |
| { |
| static Keyspace ks; |
| static ColumnFamilyStore cfs; |
| static TableMetadata cfm; |
| static InetAddressAndPort target1; |
| static InetAddressAndPort target2; |
| static InetAddressAndPort target3; |
| static List<InetAddressAndPort> targets; |
| |
| static Replica replica1; |
| static Replica replica2; |
| static Replica replica3; |
| static EndpointsForRange replicas; |
| static ReplicaPlan.ForRead<?> replicaPlan; |
| |
| static long now = TimeUnit.NANOSECONDS.toMicros(System.nanoTime()); |
| static DecoratedKey key; |
| static Cell<?> cell1; |
| static Cell<?> cell2; |
| static Cell<?> cell3; |
| static Mutation resolved; |
| |
| static ReadCommand command; |
| |
| static void assertRowsEqual(Row expected, Row actual) |
| { |
| try |
| { |
| Assert.assertEquals(expected == null, actual == null); |
| if (expected == null) |
| return; |
| Assert.assertEquals(expected.clustering(), actual.clustering()); |
| Assert.assertEquals(expected.deletion(), actual.deletion()); |
| Assert.assertArrayEquals(Iterables.toArray(expected.cells(), Cell.class), Iterables.toArray(expected.cells(), Cell.class)); |
| } catch (Throwable t) |
| { |
| throw new AssertionError(String.format("Row comparison failed, expected %s got %s", expected, actual), t); |
| } |
| } |
| |
| static void assertRowsEqual(RowIterator expected, RowIterator actual) |
| { |
| assertRowsEqual(expected.staticRow(), actual.staticRow()); |
| while (expected.hasNext()) |
| { |
| assert actual.hasNext(); |
| assertRowsEqual(expected.next(), actual.next()); |
| } |
| assert !actual.hasNext(); |
| } |
| |
| static void assertPartitionsEqual(PartitionIterator expected, PartitionIterator actual) |
| { |
| while (expected.hasNext()) |
| { |
| assert actual.hasNext(); |
| assertRowsEqual(expected.next(), actual.next()); |
| } |
| |
| assert !actual.hasNext(); |
| } |
| |
| static void assertMutationEqual(Mutation expected, Mutation actual) |
| { |
| Assert.assertEquals(expected.getKeyspaceName(), actual.getKeyspaceName()); |
| Assert.assertEquals(expected.key(), actual.key()); |
| Assert.assertEquals(expected.key(), actual.key()); |
| PartitionUpdate expectedUpdate = Iterables.getOnlyElement(expected.getPartitionUpdates()); |
| PartitionUpdate actualUpdate = Iterables.getOnlyElement(actual.getPartitionUpdates()); |
| assertRowsEqual(Iterables.getOnlyElement(expectedUpdate), Iterables.getOnlyElement(actualUpdate)); |
| } |
| |
| static DecoratedKey dk(int v) |
| { |
| return DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.bytes(v)); |
| } |
| |
| static Cell<?> cell(String name, String value, long timestamp) |
| { |
| return BufferCell.live(cfm.getColumn(ColumnIdentifier.getInterned(name, false)), timestamp, ByteBufferUtil.bytes(value)); |
| } |
| |
| static PartitionUpdate update(Cell<?>... cells) |
| { |
| Row.Builder builder = BTreeRow.unsortedBuilder(); |
| builder.newRow(Clustering.EMPTY); |
| for (Cell<?> cell: cells) |
| { |
| builder.addCell(cell); |
| } |
| return PartitionUpdate.singleRowUpdate(cfm, key, builder.build()); |
| } |
| |
| static PartitionIterator partition(Cell<?>... cells) |
| { |
| UnfilteredPartitionIterator iter = new SingletonUnfilteredPartitionIterator(update(cells).unfilteredIterator()); |
| return UnfilteredPartitionIterators.filter(iter, Ints.checkedCast(TimeUnit.MICROSECONDS.toSeconds(now))); |
| } |
| |
| static Mutation mutation(Cell<?>... cells) |
| { |
| return new Mutation(update(cells)); |
| } |
| |
| @SuppressWarnings("resource") |
| static Message<ReadResponse> msg(InetAddressAndPort from, Cell<?>... cells) |
| { |
| UnfilteredPartitionIterator iter = new SingletonUnfilteredPartitionIterator(update(cells).unfilteredIterator()); |
| return Message.builder(INTERNAL_RSP, ReadResponse.createDataResponse(iter, command, command.executionController().getRepairedDataInfo())) |
| .from(from) |
| .build(); |
| } |
| |
| static class ResultConsumer implements Consumer<PartitionIterator> |
| { |
| |
| PartitionIterator result = null; |
| |
| @Override |
| public void accept(PartitionIterator partitionIterator) |
| { |
| Assert.assertNotNull(partitionIterator); |
| result = partitionIterator; |
| } |
| } |
| |
| private static boolean configured = false; |
| |
| static void configureClass(ReadRepairStrategy repairStrategy) throws Throwable |
| { |
| SchemaLoader.loadSchema(); |
| String ksName = "ks"; |
| |
| String ddl = String.format("CREATE TABLE tbl (k int primary key, v text) WITH read_repair='%s'", |
| repairStrategy.toString().toLowerCase()); |
| |
| cfm = CreateTableStatement.parse(ddl, ksName).build(); |
| assert cfm.params.readRepair == repairStrategy; |
| KeyspaceMetadata ksm = KeyspaceMetadata.create(ksName, KeyspaceParams.simple(3), Tables.of(cfm)); |
| MigrationManager.announceNewKeyspace(ksm, false); |
| |
| ks = Keyspace.open(ksName); |
| cfs = ks.getColumnFamilyStore("tbl"); |
| |
| cfs.sampleReadLatencyNanos = 0; |
| cfs.additionalWriteLatencyNanos = 0; |
| |
| target1 = InetAddressAndPort.getByName("127.0.0.255"); |
| target2 = InetAddressAndPort.getByName("127.0.0.254"); |
| target3 = InetAddressAndPort.getByName("127.0.0.253"); |
| |
| targets = ImmutableList.of(target1, target2, target3); |
| |
| replica1 = fullReplica(target1, FULL_RANGE); |
| replica2 = fullReplica(target2, FULL_RANGE); |
| replica3 = fullReplica(target3, FULL_RANGE); |
| replicas = EndpointsForRange.of(replica1, replica2, replica3); |
| |
| replicaPlan = replicaPlan(ConsistencyLevel.QUORUM, replicas); |
| |
| StorageService.instance.getTokenMetadata().clearUnsafe(); |
| StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 0 })), replica1.endpoint()); |
| StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 1 })), replica2.endpoint()); |
| StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 2 })), replica3.endpoint()); |
| Gossiper.instance.initializeNodeUnsafe(replica1.endpoint(), UUID.randomUUID(), 1); |
| Gossiper.instance.initializeNodeUnsafe(replica2.endpoint(), UUID.randomUUID(), 1); |
| Gossiper.instance.initializeNodeUnsafe(replica3.endpoint(), UUID.randomUUID(), 1); |
| |
| // default test values |
| key = dk(5); |
| cell1 = cell("v", "val1", now); |
| cell2 = cell("v", "val2", now); |
| cell3 = cell("v", "val3", now); |
| resolved = mutation(cell1, cell2); |
| |
| command = Util.cmd(cfs, 1).build(); |
| |
| configured = true; |
| } |
| |
| static Set<InetAddressAndPort> epSet(InetAddressAndPort... eps) |
| { |
| return Sets.newHashSet(eps); |
| } |
| |
| @Before |
| public void setUp() |
| { |
| assert configured : "configureClass must be called in a @BeforeClass method"; |
| |
| cfs.sampleReadLatencyNanos = 0; |
| cfs.additionalWriteLatencyNanos = 0; |
| } |
| |
| static ReplicaPlan.ForRangeRead replicaPlan(ConsistencyLevel consistencyLevel, EndpointsForRange replicas) |
| { |
| return replicaPlan(ks, consistencyLevel, replicas, replicas); |
| } |
| |
| static ReplicaPlan.ForTokenWrite repairPlan(ReplicaPlan.ForRangeRead readPlan) |
| { |
| return repairPlan(readPlan, readPlan.candidates()); |
| } |
| |
| static ReplicaPlan.ForTokenWrite repairPlan(EndpointsForRange liveAndDown, EndpointsForRange targets) |
| { |
| return repairPlan(replicaPlan(liveAndDown, targets), liveAndDown); |
| } |
| |
| static ReplicaPlan.ForTokenWrite repairPlan(ReplicaPlan.ForRangeRead readPlan, EndpointsForRange liveAndDown) |
| { |
| Token token = readPlan.range().left.getToken(); |
| EndpointsForToken pending = EndpointsForToken.empty(token); |
| return ReplicaPlans.forWrite(readPlan.keyspace(), |
| ConsistencyLevel.TWO, |
| liveAndDown.forToken(token), |
| pending, |
| replica -> true, |
| ReplicaPlans.writeReadRepair(readPlan)); |
| } |
| static ReplicaPlan.ForRangeRead replicaPlan(EndpointsForRange replicas, EndpointsForRange targets) |
| { |
| return replicaPlan(ks, ConsistencyLevel.QUORUM, replicas, targets); |
| } |
| static ReplicaPlan.ForRangeRead replicaPlan(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForRange replicas) |
| { |
| return replicaPlan(keyspace, consistencyLevel, replicas, replicas); |
| } |
| static ReplicaPlan.ForRangeRead replicaPlan(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForRange replicas, EndpointsForRange targets) |
| { |
| return new ReplicaPlan.ForRangeRead(keyspace, keyspace.getReplicationStrategy(), consistencyLevel, ReplicaUtils.FULL_BOUNDS, replicas, targets, 1); |
| } |
| |
| public abstract InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaPlan.Shared<?, ?> replicaPlan, long queryStartNanoTime); |
| |
| public InstrumentedReadRepair createInstrumentedReadRepair(ReplicaPlan.Shared<?, ?> replicaPlan) |
| { |
| return createInstrumentedReadRepair(command, replicaPlan, System.nanoTime()); |
| |
| } |
| |
| /** |
| * If we haven't received enough full data responses by the time the speculation |
| * timeout occurs, we should send read requests to additional replicas |
| */ |
| @Test |
| public void readSpeculationCycle() |
| { |
| InstrumentedReadRepair repair = createInstrumentedReadRepair(ReplicaPlan.shared(replicaPlan(replicas, EndpointsForRange.of(replica1, replica2)))); |
| ResultConsumer consumer = new ResultConsumer(); |
| |
| Assert.assertEquals(epSet(), repair.getReadRecipients()); |
| repair.startRepair(null, consumer); |
| |
| Assert.assertEquals(epSet(target1, target2), repair.getReadRecipients()); |
| repair.maybeSendAdditionalReads(); |
| Assert.assertEquals(epSet(target1, target2, target3), repair.getReadRecipients()); |
| Assert.assertNull(consumer.result); |
| } |
| |
| /** |
| * If we receive enough data responses by the before the speculation timeout |
| * passes, we shouldn't send additional read requests |
| */ |
| @Test |
| public void noSpeculationRequired() |
| { |
| InstrumentedReadRepair repair = createInstrumentedReadRepair(ReplicaPlan.shared(replicaPlan(replicas, EndpointsForRange.of(replica1, replica2)))); |
| ResultConsumer consumer = new ResultConsumer(); |
| |
| Assert.assertEquals(epSet(), repair.getReadRecipients()); |
| repair.startRepair(null, consumer); |
| |
| Assert.assertEquals(epSet(target1, target2), repair.getReadRecipients()); |
| repair.getReadCallback().onResponse(msg(target1, cell1)); |
| repair.getReadCallback().onResponse(msg(target2, cell1)); |
| |
| repair.maybeSendAdditionalReads(); |
| Assert.assertEquals(epSet(target1, target2), repair.getReadRecipients()); |
| |
| repair.awaitReads(); |
| |
| assertPartitionsEqual(partition(cell1), consumer.result); |
| } |
| } |