/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service.reads;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.UUID;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;
import org.apache.cassandra.Util;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.ClusteringBound;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DeletionInfo;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.EmptyIterators;
import org.apache.cassandra.db.MutableDeletionInfo;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.RangeTombstone;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.Slice;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.BTreeRow;
import org.apache.cassandra.db.rows.BufferCell;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.CellPath;
import org.apache.cassandra.db.rows.ComplexColumnData;
import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
import org.apache.cassandra.db.rows.RangeTombstoneBoundaryMarker;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.ReplicaUtils;
import org.apache.cassandra.net.*;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.reads.repair.ReadRepair;
import org.apache.cassandra.service.reads.repair.RepairedDataTracker;
import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
import org.apache.cassandra.service.reads.repair.TestableReadRepair;
import org.apache.cassandra.utils.ByteBufferUtil;
import static org.apache.cassandra.Util.assertClustering;
import static org.apache.cassandra.Util.assertColumn;
import static org.apache.cassandra.Util.assertColumns;
import static org.apache.cassandra.db.ClusteringBound.Kind;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
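/**
 * Tests for {@link DataResolver}: reconciling divergent replica responses, checking the read
 * repair mutations generated for each stale replica, and (in the later tests) repaired data tracking.
 */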
public class DataResolverTest extends AbstractReadResponseTest
{
private ReadCommand command;
private TestableReadRepair readRepair;
private Keyspace ks;
private ColumnFamilyStore cfs;
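/**
 * Registers {@code num} replicas (127.0.0.1 ... 127.0.0.num) in token metadata and gossip, and
 * points {@code ks}/{@code cfs} at the fixture keyspace from AbstractReadResponseTest whose
 * replication factor matches; only 2 and 4 replicas are supported (see the switch below).
 */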
private EndpointsForRange makeReplicas(int num)
{
StorageService.instance.getTokenMetadata().clearUnsafe();
switch (num)
{
case 2:
ks = AbstractReadResponseTest.ks;
cfs = AbstractReadResponseTest.cfs;
break;
case 4:
ks = AbstractReadResponseTest.ks3;
cfs = AbstractReadResponseTest.cfs3;
break;
default:
throw new IllegalStateException("This test needs refactoring to cleanly support different replication factors");
}
command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build();
readRepair = new TestableReadRepair(command);
Token token = Murmur3Partitioner.instance.getMinimumToken();
EndpointsForRange.Builder replicas = EndpointsForRange.builder(ReplicaUtils.FULL_RANGE, num);
for (int i = 0; i < num; i++)
{
try
{
InetAddressAndPort endpoint = InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) (i + 1) });
replicas.add(ReplicaUtils.full(endpoint));
StorageService.instance.getTokenMetadata().updateNormalToken(token = token.increaseSlightly(), endpoint);
Gossiper.instance.initializeNodeUnsafe(endpoint, UUID.randomUUID(), 1);
}
catch (UnknownHostException e)
{
throw new AssertionError(e);
}
}
return replicas.build();
}
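// Both replicas return the same row; the cell with the higher timestamp should win, and the
// stale replica should be repaired with the winning cell.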
@Test
public void testResolveNewerSingleRow()
{
EndpointsForRange replicas = makeReplicas(2);
DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
InetAddressAndPort peer1 = replicas.get(0).endpoint();
resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
.add("c1", "v1")
.buildUpdate()), false));
InetAddressAndPort peer2 = replicas.get(1).endpoint();
resolver.preprocess(response(command, peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
.add("c1", "v2")
.buildUpdate()), false));
try (PartitionIterator data = resolver.resolve())
{
try (RowIterator rows = Iterators.getOnlyElement(data))
{
Row row = Iterators.getOnlyElement(rows);
assertColumns(row, "c1");
assertColumn(cfm, row, "c1", "v2", 1);
}
}
assertEquals(1, readRepair.sent.size());
// peer 1 just needs to repair with the row from peer 2
Mutation mutation = readRepair.getForEndpoint(peer1);
assertRepairMetadata(mutation);
assertRepairContainsNoDeletions(mutation);
assertRepairContainsColumn(mutation, "1", "c1", "v2", 1);
}
@Test
public void testResolveDisjointSingleRow()
{
EndpointsForRange replicas = makeReplicas(2);
DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
InetAddressAndPort peer1 = replicas.get(0).endpoint();
resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
.add("c1", "v1")
.buildUpdate())));
InetAddressAndPort peer2 = replicas.get(1).endpoint();
resolver.preprocess(response(command, peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
.add("c2", "v2")
.buildUpdate())));
try (PartitionIterator data = resolver.resolve())
{
try (RowIterator rows = Iterators.getOnlyElement(data))
{
Row row = Iterators.getOnlyElement(rows);
assertColumns(row, "c1", "c2");
assertColumn(cfm, row, "c1", "v1", 0);
assertColumn(cfm, row, "c2", "v2", 1);
}
}
assertEquals(2, readRepair.sent.size());
// each peer needs to repair with each other's column
Mutation mutation = readRepair.getForEndpoint(peer1);
assertRepairMetadata(mutation);
assertRepairContainsColumn(mutation, "1", "c2", "v2", 1);
mutation = readRepair.getForEndpoint(peer2);
assertRepairMetadata(mutation);
assertRepairContainsColumn(mutation, "1", "c1", "v1", 0);
}
@Test
public void testResolveDisjointMultipleRows() throws UnknownHostException
{
EndpointsForRange replicas = makeReplicas(2);
DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
InetAddressAndPort peer1 = replicas.get(0).endpoint();
resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
.add("c1", "v1")
.buildUpdate())));
InetAddressAndPort peer2 = replicas.get(1).endpoint();
resolver.preprocess(response(command, peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("2")
.add("c2", "v2")
.buildUpdate())));
try (PartitionIterator data = resolver.resolve())
{
try (RowIterator rows = data.next())
{
// We expect the resolved superset to contain both rows
Row row = rows.next();
assertClustering(cfm, row, "1");
assertColumns(row, "c1");
assertColumn(cfm, row, "c1", "v1", 0);
row = rows.next();
assertClustering(cfm, row, "2");
assertColumns(row, "c2");
assertColumn(cfm, row, "c2", "v2", 1);
assertFalse(rows.hasNext());
assertFalse(data.hasNext());
}
}
assertEquals(2, readRepair.sent.size());
// each peer needs to repair the row from the other
Mutation mutation = readRepair.getForEndpoint(peer1);
assertRepairMetadata(mutation);
assertRepairContainsNoDeletions(mutation);
assertRepairContainsColumn(mutation, "2", "c2", "v2", 1);
mutation = readRepair.getForEndpoint(peer2);
assertRepairMetadata(mutation);
assertRepairContainsNoDeletions(mutation);
assertRepairContainsColumn(mutation, "1", "c1", "v1", 0);
}
@Test
public void testResolveDisjointMultipleRowsWithRangeTombstones()
{
EndpointsForRange replicas = makeReplicas(4);
DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
RangeTombstone tombstone1 = tombstone("1", "11", 1, nowInSec);
RangeTombstone tombstone2 = tombstone("3", "31", 1, nowInSec);
PartitionUpdate update = new RowUpdateBuilder(cfm3, nowInSec, 1L, dk).addRangeTombstone(tombstone1)
.addRangeTombstone(tombstone2)
.buildUpdate();
InetAddressAndPort peer1 = replicas.get(0).endpoint();
UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm3, nowInSec, 1L, dk).addRangeTombstone(tombstone1)
.addRangeTombstone(tombstone2)
.buildUpdate());
resolver.preprocess(response(command, peer1, iter1));
// not covered by any range tombstone
InetAddressAndPort peer2 = replicas.get(1).endpoint();
UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm3, nowInSec, 0L, dk).clustering("0")
.add("c1", "v0")
.buildUpdate());
resolver.preprocess(response(command, peer2, iter2));
// covered by a range tombstone
InetAddressAndPort peer3 = replicas.get(2).endpoint();
UnfilteredPartitionIterator iter3 = iter(new RowUpdateBuilder(cfm3, nowInSec, 0L, dk).clustering("10")
.add("c2", "v1")
.buildUpdate());
resolver.preprocess(response(command, peer3, iter3));
// covered by a range tombstone, but newer than it
InetAddressAndPort peer4 = replicas.get(3).endpoint();
UnfilteredPartitionIterator iter4 = iter(new RowUpdateBuilder(cfm3, nowInSec, 2L, dk).clustering("3")
.add("one", "A")
.buildUpdate());
resolver.preprocess(response(command, peer4, iter4));
try (PartitionIterator data = resolver.resolve())
{
try (RowIterator rows = data.next())
{
Row row = rows.next();
assertClustering(cfm, row, "0");
assertColumns(row, "c1");
assertColumn(cfm, row, "c1", "v0", 0);
row = rows.next();
assertClustering(cfm, row, "3");
assertColumns(row, "one");
assertColumn(cfm, row, "one", "A", 2);
assertFalse(rows.hasNext());
}
}
assertEquals(4, readRepair.sent.size());
// peer1 needs the rows from peers 2 and 4
Mutation mutation = readRepair.getForEndpoint(peer1);
assertRepairMetadata(mutation);
assertRepairContainsNoDeletions(mutation);
assertRepairContainsColumn(mutation, "0", "c1", "v0", 0);
assertRepairContainsColumn(mutation, "3", "one", "A", 2);
// peer2 needs to get the row from peer4 and the RTs
mutation = readRepair.getForEndpoint(peer2);
assertRepairMetadata(mutation);
assertRepairContainsDeletions(mutation, null, tombstone1, tombstone2);
assertRepairContainsColumn(mutation, "3", "one", "A", 2);
// peer 3 needs both rows and the RTs
mutation = readRepair.getForEndpoint(peer3);
assertRepairMetadata(mutation);
assertRepairContainsDeletions(mutation, null, tombstone1, tombstone2);
assertRepairContainsColumn(mutation, "0", "c1", "v0", 0);
assertRepairContainsColumn(mutation, "3", "one", "A", 2);
// peer4 needs the row from peer2 and the RTs
mutation = readRepair.getForEndpoint(peer4);
assertRepairMetadata(mutation);
assertRepairContainsDeletions(mutation, null, tombstone1, tombstone2);
assertRepairContainsColumn(mutation, "0", "c1", "v0", 0);
}
@Test
public void testResolveWithOneEmpty()
{
EndpointsForRange replicas = makeReplicas(2);
DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
InetAddressAndPort peer1 = replicas.get(0).endpoint();
resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
.add("c2", "v2")
.buildUpdate())));
InetAddressAndPort peer2 = replicas.get(1).endpoint();
resolver.preprocess(response(command, peer2, EmptyIterators.unfilteredPartition(cfm)));
try (PartitionIterator data = resolver.resolve())
{
try (RowIterator rows = Iterators.getOnlyElement(data))
{
Row row = Iterators.getOnlyElement(rows);
assertColumns(row, "c2");
assertColumn(cfm, row, "c2", "v2", 1);
}
}
assertEquals(1, readRepair.sent.size());
// peer 2 needs the row from peer 1
Mutation mutation = readRepair.getForEndpoint(peer2);
assertRepairMetadata(mutation);
assertRepairContainsNoDeletions(mutation);
assertRepairContainsColumn(mutation, "1", "c2", "v2", 1);
}
@Test
public void testResolveWithBothEmpty()
{
EndpointsForRange replicas = makeReplicas(2);
TestableReadRepair readRepair = new TestableReadRepair(command);
DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
resolver.preprocess(response(command, replicas.get(0).endpoint(), EmptyIterators.unfilteredPartition(cfm)));
resolver.preprocess(response(command, replicas.get(1).endpoint(), EmptyIterators.unfilteredPartition(cfm)));
try (PartitionIterator data = resolver.resolve())
{
assertFalse(data.hasNext());
}
assertTrue(readRepair.sent.isEmpty());
}
@Test
public void testResolveDeleted()
{
EndpointsForRange replicas = makeReplicas(2);
DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
// one response with columns timestamped before a delete in another response
InetAddressAndPort peer1 = replicas.get(0).endpoint();
resolver.preprocess(response(command, peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
.add("one", "A")
.buildUpdate())));
InetAddressAndPort peer2 = replicas.get(1).endpoint();
resolver.preprocess(response(command, peer2, fullPartitionDelete(cfm, dk, 1, nowInSec)));
try (PartitionIterator data = resolver.resolve())
{
assertFalse(data.hasNext());
}
// peer1 should get the deletion from peer2
assertEquals(1, readRepair.sent.size());
Mutation mutation = readRepair.getForEndpoint(peer1);
assertRepairMetadata(mutation);
assertRepairContainsDeletions(mutation, new DeletionTime(1, nowInSec));
assertRepairContainsNoColumns(mutation);
}
@Test
public void testResolveMultipleDeleted()
{
EndpointsForRange replicas = makeReplicas(4);
DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
// deletes and columns with interleaved timestamps, returned out of order
InetAddressAndPort peer1 = replicas.get(0).endpoint();
resolver.preprocess(response(command, peer1, fullPartitionDelete(cfm, dk, 0, nowInSec)));
// these columns were created after the previous deletion
InetAddressAndPort peer2 = replicas.get(1).endpoint();
resolver.preprocess(response(command, peer2, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
.add("one", "A")
.add("two", "A")
.buildUpdate())));
// this column was created after the next delete
InetAddressAndPort peer3 = replicas.get(2).endpoint();
resolver.preprocess(response(command, peer3, iter(new RowUpdateBuilder(cfm, nowInSec, 3L, dk).clustering("1")
.add("two", "B")
.buildUpdate())));
InetAddressAndPort peer4 = replicas.get(3).endpoint();
resolver.preprocess(response(command, peer4, fullPartitionDelete(cfm, dk, 2, nowInSec)));
try (PartitionIterator data = resolver.resolve())
{
try (RowIterator rows = Iterators.getOnlyElement(data))
{
Row row = Iterators.getOnlyElement(rows);
assertColumns(row, "two");
assertColumn(cfm, row, "two", "B", 3);
}
}
// peer 1 needs to get the partition delete from peer 4 and the row from peer 3
assertEquals(4, readRepair.sent.size());
Mutation mutation = readRepair.getForEndpoint(peer1);
assertRepairMetadata(mutation);
assertRepairContainsDeletions(mutation, new DeletionTime(2, nowInSec));
assertRepairContainsColumn(mutation, "1", "two", "B", 3);
// peer 2 needs the deletion from peer 4 and the row from peer 3
mutation = readRepair.getForEndpoint(peer2);
assertRepairMetadata(mutation);
assertRepairContainsDeletions(mutation, new DeletionTime(2, nowInSec));
assertRepairContainsColumn(mutation, "1", "two", "B", 3);
// peer 3 needs just the deletion from peer 4
mutation = readRepair.getForEndpoint(peer3);
assertRepairMetadata(mutation);
assertRepairContainsDeletions(mutation, new DeletionTime(2, nowInSec));
assertRepairContainsNoColumns(mutation);
// peer 4 needs just the row from peer 3
mutation = readRepair.getForEndpoint(peer4);
assertRepairMetadata(mutation);
assertRepairContainsNoDeletions(mutation);
assertRepairContainsColumn(mutation, "1", "two", "B", 3);
}
@Test
public void testResolveRangeTombstonesOnBoundaryRightWins() throws UnknownHostException
{
resolveRangeTombstonesOnBoundary(1, 2);
}
@Test
public void testResolveRangeTombstonesOnBoundaryLeftWins() throws UnknownHostException
{
resolveRangeTombstonesOnBoundary(2, 1);
}
@Test
public void testResolveRangeTombstonesOnBoundarySameTimestamp() throws UnknownHostException
{
resolveRangeTombstonesOnBoundary(1, 1);
}
/*
 * We want responses to merge on tombstone boundaries. So we'll merge 2 "streams":
 *   1: [1, 2)(3, 4](5, 6]
 *   2:       [2, 3][4, 5)
 * which tests all combinations of open/close boundaries (open/close, close/open, open/open, close/close).
 *
 * Note that, because DataResolver returns a "filtered" iterator, it should resolve into an empty iterator.
 * However, what should be sent to each source depends on the exact timestamps of each tombstone, and we
 * test a few combinations.
 */
private void resolveRangeTombstonesOnBoundary(long timestamp1, long timestamp2)
{
EndpointsForRange replicas = makeReplicas(2);
DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
InetAddressAndPort peer1 = replicas.get(0).endpoint();
InetAddressAndPort peer2 = replicas.get(1).endpoint();
// 1st "stream"
RangeTombstone one_two    = tombstone("1", true,  "2", false, timestamp1, nowInSec);
RangeTombstone three_four = tombstone("3", false, "4", true,  timestamp1, nowInSec);
RangeTombstone five_six   = tombstone("5", false, "6", true,  timestamp1, nowInSec);
UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(one_two)
.addRangeTombstone(three_four)
.addRangeTombstone(five_six)
.buildUpdate());
// 2nd "stream"
RangeTombstone two_three = tombstone("2", true, "3", true,  timestamp2, nowInSec);
RangeTombstone four_five = tombstone("4", true, "5", false, timestamp2, nowInSec);
UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(two_three)
.addRangeTombstone(four_five)
.buildUpdate());
resolver.preprocess(response(command, peer1, iter1));
resolver.preprocess(response(command, peer2, iter2));
// No results, we've only reconciled tombstones.
try (PartitionIterator data = resolver.resolve())
{
assertFalse(data.hasNext());
}
assertEquals(2, readRepair.sent.size());
Mutation msg1 = readRepair.getForEndpoint(peer1);
assertRepairMetadata(msg1);
assertRepairContainsNoColumns(msg1);
Mutation msg2 = readRepair.getForEndpoint(peer2);
assertRepairMetadata(msg2);
assertRepairContainsNoColumns(msg2);
// Both streams are mostly complementary, so each will roughly get the ranges of the other stream. One subtlety
// is around the value "4", however, as it's included by both streams.
// So for a given stream, unless the other stream has a strictly higher timestamp, the value 4 will be excluded
// from whatever range it receives as repair, since the stream already covers it.
// Message to peer1 contains peer2 ranges
assertRepairContainsDeletions(msg1, null, two_three, withExclusiveStartIf(four_five, timestamp1 >= timestamp2));
// Message to peer2 contains peer1 ranges
assertRepairContainsDeletions(msg2, null, one_two, withExclusiveEndIf(three_four, timestamp2 >= timestamp1), five_six);
}
/**
 * Test cases where a boundary of a source is covered by another source's deletion and the timestamps on one or
 * both sides of the boundary are equal to the "merged" deletion.
 * This is a test for CASSANDRA-13237 to make sure we handle this case properly.
 */
@Test
public void testRepairRangeTombstoneBoundary() throws UnknownHostException
{
testRepairRangeTombstoneBoundary(1, 0, 1);
readRepair.sent.clear();
testRepairRangeTombstoneBoundary(1, 1, 0);
readRepair.sent.clear();
testRepairRangeTombstoneBoundary(1, 1, 1);
}
/**
 * Test for CASSANDRA-13237, checking we don't fail (and handle correctly) the case where an RT boundary has the
 * same deletion on both sides (which is useless, but could be created by legacy code pre-CASSANDRA-13237 and could
 * thus still be sent).
 */
private void testRepairRangeTombstoneBoundary(int timestamp1, int timestamp2, int timestamp3) throws UnknownHostException
{
EndpointsForRange replicas = makeReplicas(2);
DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
InetAddressAndPort peer1 = replicas.get(0).endpoint();
InetAddressAndPort peer2 = replicas.get(1).endpoint();
// 1st "stream"
RangeTombstone zero_nine = tombstone("0", true, "9", true, timestamp1, nowInSec);
UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
.addRangeTombstone(zero_nine)
.buildUpdate());
// 2nd "stream" (build more manually to ensure we have the boundary we want)
RangeTombstoneBoundMarker open_one = marker("0", true, true, timestamp2, nowInSec);
RangeTombstoneBoundaryMarker boundary_five = boundary("5", false, timestamp2, nowInSec, timestamp3, nowInSec);
RangeTombstoneBoundMarker close_nine = marker("9", false, true, timestamp3, nowInSec);
UnfilteredPartitionIterator iter2 = iter(dk, open_one, boundary_five, close_nine);
resolver.preprocess(response(command, peer1, iter1));
resolver.preprocess(response(command, peer2, iter2));
boolean shouldHaveRepair = timestamp1 != timestamp2 || timestamp1 != timestamp3;
// No results, we've only reconciled tombstones.
try (PartitionIterator data = resolver.resolve())
{
assertFalse(data.hasNext());
}
assertEquals(shouldHaveRepair ? 1 : 0, readRepair.sent.size());
if (!shouldHaveRepair)
return;
Mutation mutation = readRepair.getForEndpoint(peer2);
assertRepairMetadata(mutation);
assertRepairContainsNoColumns(mutation);
RangeTombstone expected = timestamp1 != timestamp2
// We've repaired the 1st part
? tombstone("0", true, "5", false, timestamp1, nowInSec)
// We've repaired the 2nd part
: tombstone("5", true, "9", true, timestamp1, nowInSec);
assertRepairContainsDeletions(mutation, null, expected);
}
/**
* Test for CASSANDRA-13719: tests that having a partition deletion shadow a range tombstone on another source
* doesn't trigger an assertion error.
*/
@Test
public void testRepairRangeTombstoneWithPartitionDeletion()
{
EndpointsForRange replicas = makeReplicas(2);
DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
InetAddressAndPort peer1 = replicas.get(0).endpoint();
InetAddressAndPort peer2 = replicas.get(1).endpoint();
// 1st "stream": just a partition deletion
UnfilteredPartitionIterator iter1 = iter(PartitionUpdate.fullPartitionDelete(cfm, dk, 10, nowInSec));
// 2nd "stream": a range tombstone that is covered by the 1st stream
RangeTombstone rt = tombstone("0", true, "10", true, 5, nowInSec);
UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
.addRangeTombstone(rt)
.buildUpdate());
resolver.preprocess(response(command, peer1, iter1));
resolver.preprocess(response(command, peer2, iter2));
// No results, we've only reconciled tombstones.
try (PartitionIterator data = resolver.resolve())
{
assertFalse(data.hasNext());
// 2nd stream should get repaired
}
assertEquals(1, readRepair.sent.size());
Mutation mutation = readRepair.getForEndpoint(peer2);
assertRepairMetadata(mutation);
assertRepairContainsNoColumns(mutation);
assertRepairContainsDeletions(mutation, new DeletionTime(10, nowInSec));
}
/**
* Additional test for CASSANDRA-13719: tests the case where a partition deletion doesn't shadow a range tombstone.
*/
@Test
public void testRepairRangeTombstoneWithPartitionDeletion2()
{
EndpointsForRange replicas = makeReplicas(2);
DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
InetAddressAndPort peer1 = replicas.get(0).endpoint();
InetAddressAndPort peer2 = replicas.get(1).endpoint();
// 1st "stream": a partition deletion and a range tombstone
RangeTombstone rt1 = tombstone("0", true, "9", true, 11, nowInSec);
PartitionUpdate upd1 = new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
.addRangeTombstone(rt1)
.buildUpdate();
((MutableDeletionInfo)upd1.deletionInfo()).add(new DeletionTime(10, nowInSec));
UnfilteredPartitionIterator iter1 = iter(upd1);
// 2nd "stream": a range tombstone that is covered by the other stream rt
RangeTombstone rt2 = tombstone("2", true, "3", true, 11, nowInSec);
RangeTombstone rt3 = tombstone("4", true, "5", true, 10, nowInSec);
UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk)
.addRangeTombstone(rt2)
.addRangeTombstone(rt3)
.buildUpdate());
resolver.preprocess(response(command, peer1, iter1));
resolver.preprocess(response(command, peer2, iter2));
// No results, we've only reconciled tombstones.
try (PartitionIterator data = resolver.resolve())
{
assertFalse(data.hasNext());
// 2nd stream should get repaired
}
assertEquals(1, readRepair.sent.size());
Mutation mutation = readRepair.getForEndpoint(peer2);
assertRepairMetadata(mutation);
assertRepairContainsNoColumns(mutation);
// 2nd stream should get both the partition deletion, as well as the part of the 1st stream RT that it misses
assertRepairContainsDeletions(mutation, new DeletionTime(10, nowInSec),
tombstone("0", true, "2", false, 11, nowInSec),
tombstone("3", false, "9", true, 11, nowInSec));
}
// Forces the start to be exclusive if the condition holds
private static RangeTombstone withExclusiveStartIf(RangeTombstone rt, boolean condition)
{
if (!condition)
return rt;
Slice slice = rt.deletedSlice();
ClusteringBound<?> newStart = ClusteringBound.create(Kind.EXCL_START_BOUND, slice.start());
return new RangeTombstone(Slice.make(newStart, slice.end()), rt.deletionTime());
}
// Forces the end to be exclusive if the condition holds
private static RangeTombstone withExclusiveEndIf(RangeTombstone rt, boolean condition)
{
if (!condition)
return rt;
Slice slice = rt.deletedSlice();
ClusteringBound<?> newEnd = ClusteringBound.create(Kind.EXCL_END_BOUND, slice.end());
return new RangeTombstone(Slice.make(slice.start(), newEnd), rt.deletionTime());
}
private static ByteBuffer bb(int b)
{
return ByteBufferUtil.bytes(b);
}
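// Builds a live cell for key k -> value v within the complex map column "m"
// (the column definition comes from the AbstractReadResponseTest fixture).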
private Cell<?> mapCell(int k, int v, long ts)
{
return BufferCell.live(m, ts, bb(v), CellPath.create(bb(k)));
}
@Test
public void testResolveComplexDelete()
{
EndpointsForRange replicas = makeReplicas(2);
ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
TestableReadRepair readRepair = new TestableReadRepair(cmd);
DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
long[] ts = {100, 200};
Row.Builder builder = BTreeRow.unsortedBuilder();
builder.newRow(Clustering.EMPTY);
builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
builder.addCell(mapCell(0, 0, ts[0]));
InetAddressAndPort peer1 = replicas.get(0).endpoint();
resolver.preprocess(response(cmd, peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
builder.newRow(Clustering.EMPTY);
DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
builder.addComplexDeletion(m, expectedCmplxDelete);
Cell<?> expectedCell = mapCell(1, 1, ts[1]);
builder.addCell(expectedCell);
InetAddressAndPort peer2 = replicas.get(1).endpoint();
resolver.preprocess(response(cmd, peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
try (PartitionIterator data = resolver.resolve())
{
try (RowIterator rows = Iterators.getOnlyElement(data))
{
Row row = Iterators.getOnlyElement(rows);
assertColumns(row, "m");
Assert.assertNull(row.getCell(m, CellPath.create(bb(0))));
Assert.assertNotNull(row.getCell(m, CellPath.create(bb(1))));
}
}
Mutation mutation = readRepair.getForEndpoint(peer1);
Iterator<Row> rowIter = mutation.getPartitionUpdate(cfm2).iterator();
assertTrue(rowIter.hasNext());
Row row = rowIter.next();
assertFalse(rowIter.hasNext());
ComplexColumnData cd = row.getComplexColumnData(m);
assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
assertEquals(expectedCmplxDelete, cd.complexDeletion());
Assert.assertNull(readRepair.sent.get(peer2));
}
@Test
public void testResolveDeletedCollection()
{
EndpointsForRange replicas = makeReplicas(2);
ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
TestableReadRepair readRepair = new TestableReadRepair(cmd);
DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
long[] ts = {100, 200};
Row.Builder builder = BTreeRow.unsortedBuilder();
builder.newRow(Clustering.EMPTY);
builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
builder.addCell(mapCell(0, 0, ts[0]));
InetAddressAndPort peer1 = replicas.get(0).endpoint();
resolver.preprocess(response(cmd, peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
builder.newRow(Clustering.EMPTY);
DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
builder.addComplexDeletion(m, expectedCmplxDelete);
InetAddressAndPort peer2 = replicas.get(1).endpoint();
resolver.preprocess(response(cmd, peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
try (PartitionIterator data = resolver.resolve())
{
assertFalse(data.hasNext());
}
Mutation mutation = readRepair.getForEndpoint(peer1);
Iterator<Row> rowIter = mutation.getPartitionUpdate(cfm2).iterator();
assertTrue(rowIter.hasNext());
Row row = rowIter.next();
assertFalse(rowIter.hasNext());
ComplexColumnData cd = row.getComplexColumnData(m);
assertEquals(Collections.emptySet(), Sets.newHashSet(cd));
assertEquals(expectedCmplxDelete, cd.complexDeletion());
Assert.assertNull(readRepair.sent.get(peer2));
}
@Test
public void testResolveNewCollection()
{
EndpointsForRange replicas = makeReplicas(2);
ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
TestableReadRepair readRepair = new TestableReadRepair(cmd);
DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
long[] ts = {100, 200};
// map column
Row.Builder builder = BTreeRow.unsortedBuilder();
builder.newRow(Clustering.EMPTY);
DeletionTime expectedCmplxDelete = new DeletionTime(ts[0] - 1, nowInSec);
builder.addComplexDeletion(m, expectedCmplxDelete);
Cell<?> expectedCell = mapCell(0, 0, ts[0]);
builder.addCell(expectedCell);
// empty map column
InetAddressAndPort peer1 = replicas.get(0).endpoint();
resolver.preprocess(response(cmd, peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
InetAddressAndPort peer2 = replicas.get(1).endpoint();
resolver.preprocess(response(cmd, peer2, iter(PartitionUpdate.emptyUpdate(cfm2, dk))));
try (PartitionIterator data = resolver.resolve())
{
try (RowIterator rows = Iterators.getOnlyElement(data))
{
Row row = Iterators.getOnlyElement(rows);
assertColumns(row, "m");
ComplexColumnData cd = row.getComplexColumnData(m);
assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
}
}
Assert.assertNull(readRepair.sent.get(peer1));
Mutation mutation = readRepair.getForEndpoint(peer2);
Iterator<Row> rowIter = mutation.getPartitionUpdate(cfm2).iterator();
assertTrue(rowIter.hasNext());
Row row = rowIter.next();
assertFalse(rowIter.hasNext());
ComplexColumnData cd = row.getComplexColumnData(m);
assertEquals(Sets.newHashSet(expectedCell), Sets.newHashSet(cd));
assertEquals(expectedCmplxDelete, cd.complexDeletion());
}
@Test
public void testResolveNewCollectionOverwritingDeleted()
{
EndpointsForRange replicas = makeReplicas(2);
ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
TestableReadRepair readRepair = new TestableReadRepair(cmd);
DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
long[] ts = {100, 200};
// cleared map column
Row.Builder builder = BTreeRow.unsortedBuilder();
builder.newRow(Clustering.EMPTY);
builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
InetAddressAndPort peer1 = replicas.get(0).endpoint();
resolver.preprocess(response(cmd, peer1, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
// newer, overwritten map column
builder.newRow(Clustering.EMPTY);
DeletionTime expectedCmplxDelete = new DeletionTime(ts[1] - 1, nowInSec);
builder.addComplexDeletion(m, expectedCmplxDelete);
Cell<?> expectedCell = mapCell(1, 1, ts[1]);
builder.addCell(expectedCell);
InetAddressAndPort peer2 = replicas.get(1).endpoint();
resolver.preprocess(response(cmd, peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build()))));
try (PartitionIterator data = resolver.resolve())
{
try (RowIterator rows = Iterators.getOnlyElement(data))
{
Row row = Iterators.getOnlyElement(rows);
assertColumns(row, "m");
ComplexColumnData cd = row.getComplexColumnData(m);
assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
}
}
Row row = Iterators.getOnlyElement(readRepair.getForEndpoint(peer1).getPartitionUpdate(cfm2).iterator());
ComplexColumnData cd = row.getComplexColumnData(m);
assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd));
assertEquals(expectedCmplxDelete, cd.complexDeletion());
Assert.assertNull(readRepair.sent.get(peer2));
}
/** Tests for repaired data tracking */
@Test
public void trackMatchingEmptyDigestsWithAllConclusive()
{
EndpointsForRange replicas = makeReplicas(2);
ByteBuffer digest1 = ByteBufferUtil.EMPTY_BYTE_BUFFER;
InetAddressAndPort peer1 = replicas.get(0).endpoint();
InetAddressAndPort peer2 = replicas.get(1).endpoint();
TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
verifier.expectDigest(peer1, digest1, true);
verifier.expectDigest(peer2, digest1, true);
DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
resolveAndConsume(resolver);
assertTrue(verifier.verified);
}
@Test
public void trackMatchingEmptyDigestsWithSomeConclusive()
{
ByteBuffer digest1 = ByteBufferUtil.EMPTY_BYTE_BUFFER;
EndpointsForRange replicas = makeReplicas(2);
InetAddressAndPort peer1 = replicas.get(0).endpoint();
InetAddressAndPort peer2 = replicas.get(1).endpoint();
TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
verifier.expectDigest(peer1, digest1, false);
verifier.expectDigest(peer2, digest1, true);
DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
resolveAndConsume(resolver);
assertTrue(verifier.verified);
}
@Test
public void trackMatchingEmptyDigestsWithNoneConclusive()
{
ByteBuffer digest1 = ByteBufferUtil.EMPTY_BYTE_BUFFER;
EndpointsForRange replicas = makeReplicas(2);
InetAddressAndPort peer1 = replicas.get(0).endpoint();
InetAddressAndPort peer2 = replicas.get(1).endpoint();
TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
verifier.expectDigest(peer1, digest1, false);
verifier.expectDigest(peer2, digest1, false);
DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
resolveAndConsume(resolver);
assertTrue(verifier.verified);
}
@Test
public void trackMatchingDigestsWithAllConclusive()
{
ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
EndpointsForRange replicas = makeReplicas(2);
InetAddressAndPort peer1 = replicas.get(0).endpoint();
InetAddressAndPort peer2 = replicas.get(1).endpoint();
TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
verifier.expectDigest(peer1, digest1, true);
verifier.expectDigest(peer2, digest1, true);
DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
resolveAndConsume(resolver);
assertTrue(verifier.verified);
}
@Test
public void trackMatchingDigestsWithSomeConclusive()
{
ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
EndpointsForRange replicas = makeReplicas(2);
InetAddressAndPort peer1 = replicas.get(0).endpoint();
InetAddressAndPort peer2 = replicas.get(1).endpoint();
TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
verifier.expectDigest(peer1, digest1, true);
verifier.expectDigest(peer2, digest1, false);
DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
resolveAndConsume(resolver);
assertTrue(verifier.verified);
}
@Test
public void trackMatchingDigestsWithNoneConclusive()
{
EndpointsForRange replicas = makeReplicas(2);
ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
InetAddressAndPort peer1 = replicas.get(0).endpoint();
InetAddressAndPort peer2 = replicas.get(1).endpoint();
TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
verifier.expectDigest(peer1, digest1, false);
verifier.expectDigest(peer2, digest1, false);
DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
resolveAndConsume(resolver);
assertTrue(verifier.verified);
}
@Test
public void trackMatchingRepairedDigestsWithDifferentData()
{
// As far as repaired data tracking is concerned, the actual data in the response is not relevant
EndpointsForRange replicas = makeReplicas(2);
ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
InetAddressAndPort peer1 = replicas.get(0).endpoint();
InetAddressAndPort peer2 = replicas.get(1).endpoint();
TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
verifier.expectDigest(peer1, digest1, true);
verifier.expectDigest(peer2, digest1, true);
DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
resolver.preprocess(response(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1").buildUpdate()), digest1, true, command));
resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
resolveAndConsume(resolver);
assertTrue(verifier.verified);
}
@Test
public void trackMismatchingRepairedDigestsWithAllConclusive()
{
EndpointsForRange replicas = makeReplicas(2);
ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
ByteBuffer digest2 = ByteBufferUtil.bytes("digest2");
InetAddressAndPort peer1 = replicas.get(0).endpoint();
InetAddressAndPort peer2 = replicas.get(1).endpoint();
TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
verifier.expectDigest(peer1, digest1, true);
verifier.expectDigest(peer2, digest2, true);
DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest2, true, command));
resolveAndConsume(resolver);
assertTrue(verifier.verified);
}
@Test
public void trackMismatchingRepairedDigestsWithSomeConclusive()
{
ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
ByteBuffer digest2 = ByteBufferUtil.bytes("digest2");
EndpointsForRange replicas = makeReplicas(2);
InetAddressAndPort peer1 = replicas.get(0).endpoint();
InetAddressAndPort peer2 = replicas.get(1).endpoint();
TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
verifier.expectDigest(peer1, digest1, false);
verifier.expectDigest(peer2, digest2, true);
DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest2, true, command));
resolveAndConsume(resolver);
assertTrue(verifier.verified);
}
@Test
public void trackMismatchingRepairedDigestsWithNoneConclusive()
{
ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
ByteBuffer digest2 = ByteBufferUtil.bytes("digest2");
EndpointsForRange replicas = makeReplicas(2);
InetAddressAndPort peer1 = replicas.get(0).endpoint();
InetAddressAndPort peer2 = replicas.get(1).endpoint();
TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
verifier.expectDigest(peer1, digest1, false);
verifier.expectDigest(peer2, digest2, false);
DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, false, command));
resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest2, false, command));
resolveAndConsume(resolver);
assertTrue(verifier.verified);
}
@Test
public void trackMismatchingRepairedDigestsWithDifferentData()
{
ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
ByteBuffer digest2 = ByteBufferUtil.bytes("digest2");
EndpointsForRange replicas = makeReplicas(2);
InetAddressAndPort peer1 = replicas.get(0).endpoint();
InetAddressAndPort peer2 = replicas.get(1).endpoint();
TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
verifier.expectDigest(peer1, digest1, true);
verifier.expectDigest(peer2, digest2, true);
DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
resolver.preprocess(response(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1").buildUpdate()), digest1, true, command));
resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest2, true, command));
resolveAndConsume(resolver);
assertTrue(verifier.verified);
}
@Test
public void noVerificationForSingletonResponse()
{
// for CL <= 1 a coordinator shouldn't request repaired data tracking, but we
// can easily assert that the verification isn't attempted even if it does
EndpointsForRange replicas = makeReplicas(2);
ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
InetAddressAndPort peer1 = replicas.get(0).endpoint();
TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
verifier.expectDigest(peer1, digest1, true);
DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
resolveAndConsume(resolver);
assertFalse(verifier.verified);
}
@Test
public void responsesFromOlderVersionsAreNotTracked()
{
// In a mixed version cluster, responses from replicas running older versions won't include
// tracking info, so the digest and pending session status are defaulted. To make sure these
// default values don't result in false positives, we make sure not to consider them when
// processing in DataResolver.
EndpointsForRange replicas = makeReplicas(2);
TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
InetAddressAndPort peer1 = replicas.get(0).endpoint();
InetAddressAndPort peer2 = replicas.get(1).endpoint();
verifier.expectDigest(peer1, digest1, true);
DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
// peer2 is advertising an older version, so when we deserialize its response there are two things to note:
// i) the actual serialized response cannot contain any tracking info so deserialization will use defaults of
// an empty digest and pending sessions = false
// ii) under normal circumstances, this would cause a mismatch with peer1, but because of the older version,
// here it will not
resolver.preprocess(response(command, peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)),
false, MessagingService.VERSION_30,
ByteBufferUtil.EMPTY_BYTE_BUFFER, false));
resolveAndConsume(resolver);
assertTrue(verifier.verified);
}
@Test
public void responsesFromTransientReplicasAreNotTracked()
{
EndpointsForRange replicas = makeReplicas(2);
EndpointsForRange.Builder mutable = replicas.newBuilder(2);
mutable.add(replicas.get(0));
mutable.add(Replica.transientReplica(replicas.get(1).endpoint(), replicas.range()));
replicas = mutable.build();
TestRepairedDataVerifier verifier = new TestRepairedDataVerifier();
ByteBuffer digest1 = ByteBufferUtil.bytes("digest1");
ByteBuffer digest2 = ByteBufferUtil.bytes("digest2");
InetAddressAndPort peer1 = replicas.get(0).endpoint();
InetAddressAndPort peer2 = replicas.get(1).endpoint();
verifier.expectDigest(peer1, digest1, true);
DataResolver resolver = resolverWithVerifier(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime(), verifier);
resolver.preprocess(response(peer1, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest1, true, command));
resolver.preprocess(response(peer2, iter(PartitionUpdate.emptyUpdate(cfm, dk)), digest2, true, command));
resolveAndConsume(resolver);
assertTrue(verifier.verified);
}
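/**
 * Records the digest each peer is expected to report; {@code verified} becomes true only if the
 * tracker handed to {@link #verify} matches those expectations exactly.
 */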
private static class TestRepairedDataVerifier implements RepairedDataVerifier
{
private final RepairedDataTracker expected = new RepairedDataTracker(null);
private boolean verified = false;
private void expectDigest(InetAddressAndPort from, ByteBuffer digest, boolean conclusive)
{
expected.recordDigest(from, digest, conclusive);
}
@Override
public void verify(RepairedDataTracker tracker)
{
verified = expected.equals(tracker);
}
}
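/**
 * Builds a DataResolver with repaired data tracking enabled (the trailing constructor flag) whose
 * verifier is swapped for the supplied test verifier, so tests can assert on what gets verified.
 */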
private DataResolver resolverWithVerifier(final ReadCommand command,
final ReplicaPlan.SharedForRangeRead plan,
final ReadRepair readRepair,
final long queryStartNanoTime,
final RepairedDataVerifier verifier)
{
class TestableDataResolver extends DataResolver
{
public TestableDataResolver(ReadCommand command, ReplicaPlan.SharedForRangeRead plan, ReadRepair readRepair, long queryStartNanoTime)
{
super(command, plan, readRepair, queryStartNanoTime, true);
}
@Override
protected RepairedDataVerifier getRepairedDataVerifier(ReadCommand command)
{
return verifier;
}
}
return new TestableDataResolver(command, plan, readRepair, queryStartNanoTime);
}
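// Asserts that the repair mutation carries the given partition-level deletion (when non-null)
// and exactly the given range tombstones, in comparator order.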
private void assertRepairContainsDeletions(Mutation mutation,
DeletionTime deletionTime,
RangeTombstone... rangeTombstones)
{
PartitionUpdate update = mutation.getPartitionUpdates().iterator().next();
DeletionInfo deletionInfo = update.deletionInfo();
if (deletionTime != null)
assertEquals(deletionTime, deletionInfo.getPartitionDeletion());
assertEquals(rangeTombstones.length, deletionInfo.rangeCount());
Iterator<RangeTombstone> ranges = deletionInfo.rangeIterator(false);
int i = 0;
while (ranges.hasNext())
{
RangeTombstone expected = rangeTombstones[i++];
RangeTombstone actual = ranges.next();
String msg = String.format("Expected %s, but got %s", expected.toString(cfm.comparator), actual.toString(cfm.comparator));
assertEquals(msg, expected, actual);
}
}
private void assertRepairContainsNoDeletions(Mutation mutation)
{
PartitionUpdate update = mutation.getPartitionUpdates().iterator().next();
assertTrue(update.deletionInfo().isLive());
}
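// Asserts that the repair mutation contains the given cell at the given clustering.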
private void assertRepairContainsColumn(Mutation mutation,
String clustering,
String columnName,
String value,
long timestamp)
{
PartitionUpdate update = mutation.getPartitionUpdates().iterator().next();
Row row = update.getRow(update.metadata().comparator.make(clustering));
assertNotNull(row);
assertColumn(cfm, row, columnName, value, timestamp);
}
private void assertRepairContainsNoColumns(Mutation mutation)
{
PartitionUpdate update = mutation.getPartitionUpdates().iterator().next();
assertFalse(update.iterator().hasNext());
}
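// Asserts that the repair mutation targets the keyspace and table under test.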
private void assertRepairMetadata(Mutation mutation)
{
PartitionUpdate update = mutation.getPartitionUpdates().iterator().next();
assertEquals(ks.getName(), update.metadata().keyspace);
assertEquals(cfm.name, update.metadata().name);
}
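// Wraps the replicas in a full-range read plan at the given consistency level.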
private ReplicaPlan.SharedForRangeRead plan(EndpointsForRange replicas, ConsistencyLevel consistencyLevel)
{
return ReplicaPlan.shared(new ReplicaPlan.ForRangeRead(ks, ks.getReplicationStrategy(), consistencyLevel, ReplicaUtils.FULL_BOUNDS, replicas, replicas, 1));
}
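// Fully drains and closes the resolved iterator, mimicking a coordinator consuming the whole
// result so that any deferred work (e.g. repaired data verification) runs.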
private static void resolveAndConsume(DataResolver resolver)
{
try (PartitionIterator iterator = resolver.resolve())
{
while (iterator.hasNext())
{
try (RowIterator partition = iterator.next())
{
while (partition.hasNext())
partition.next();
}
}
}
}
}