blob: 169e09d67802995372ab9d0f4f53242f10e4d80a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service.reads.repair;
import java.net.UnknownHostException;
import java.util.Random;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.Slices;
import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import static org.junit.Assert.assertEquals;
public class RepairedDataVerifierTest
{
private static final String TEST_NAME = "read_command_vh_test_";
private static final String KEYSPACE = TEST_NAME + "cql_keyspace";
private static final String TABLE = "table1";
private final Random random = new Random();
private TableMetadata metadata;
private TableMetrics metrics;
// counter to generate the last byte of peer addresses
private int addressSuffix = 10;
@BeforeClass
public static void init()
{
SchemaLoader.loadSchema();
SchemaLoader.schemaDefinition(TEST_NAME);
DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(true);
}
@Before
public void setup()
{
metadata = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
metrics = ColumnFamilyStore.metricsFor(metadata.id);
}
@Test
public void repairedDataMismatchWithSomeConclusive()
{
long confirmedCount = confirmedCount();
long unconfirmedCount = unconfirmedCount();
InetAddressAndPort peer1 = peer();
InetAddressAndPort peer2 = peer();
RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
RepairedDataTracker tracker = new RepairedDataTracker(verifier);
tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), false);
tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest2"), true);
tracker.verify();
assertEquals(confirmedCount, confirmedCount());
assertEquals(unconfirmedCount + 1 , unconfirmedCount());
}
@Test
public void repairedDataMismatchWithNoneConclusive()
{
long confirmedCount = confirmedCount();
long unconfirmedCount = unconfirmedCount();
InetAddressAndPort peer1 = peer();
InetAddressAndPort peer2 = peer();
RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
RepairedDataTracker tracker = new RepairedDataTracker(verifier);
tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), false);
tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest2"), false);
tracker.verify();
assertEquals(confirmedCount, confirmedCount());
assertEquals(unconfirmedCount + 1 , unconfirmedCount());
}
@Test
public void repairedDataMismatchWithAllConclusive()
{
long confirmedCount = confirmedCount();
long unconfirmedCount = unconfirmedCount();
InetAddressAndPort peer1 = peer();
InetAddressAndPort peer2 = peer();
RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
RepairedDataTracker tracker = new RepairedDataTracker(verifier);
tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), true);
tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest2"), true);
tracker.verify();
assertEquals(confirmedCount + 1, confirmedCount());
assertEquals(unconfirmedCount, unconfirmedCount());
}
@Test
public void repairedDataMatchesWithAllConclusive()
{
long confirmedCount = confirmedCount();
long unconfirmedCount = unconfirmedCount();
InetAddressAndPort peer1 = peer();
InetAddressAndPort peer2 = peer();
RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
RepairedDataTracker tracker = new RepairedDataTracker(verifier);
tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), true);
tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest1"), true);
tracker.verify();
assertEquals(confirmedCount, confirmedCount());
assertEquals(unconfirmedCount, unconfirmedCount());
}
@Test
public void repairedDataMatchesWithSomeConclusive()
{
long confirmedCount = confirmedCount();
long unconfirmedCount = unconfirmedCount();
InetAddressAndPort peer1 = peer();
InetAddressAndPort peer2 = peer();
RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
RepairedDataTracker tracker = new RepairedDataTracker(verifier);
tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), true);
tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest1"), false);
tracker.verify();
assertEquals(confirmedCount, confirmedCount());
assertEquals(unconfirmedCount, unconfirmedCount());
}
@Test
public void repairedDataMatchesWithNoneConclusive()
{
long confirmedCount = confirmedCount();
long unconfirmedCount = unconfirmedCount();
InetAddressAndPort peer1 = peer();
InetAddressAndPort peer2 = peer();
RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
RepairedDataTracker tracker = new RepairedDataTracker(verifier);
tracker.recordDigest(peer1, ByteBufferUtil.bytes("digest1"), false);
tracker.recordDigest(peer2, ByteBufferUtil.bytes("digest1"), false);
tracker.verify();
assertEquals(confirmedCount, confirmedCount());
assertEquals(unconfirmedCount, unconfirmedCount());
}
@Test
public void allEmptyDigestWithAllConclusive()
{
// if a read didn't touch any repaired sstables, digests will be empty
long confirmedCount = confirmedCount();
long unconfirmedCount = unconfirmedCount();
InetAddressAndPort peer1 = peer();
InetAddressAndPort peer2 = peer();
RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
RepairedDataTracker tracker = new RepairedDataTracker(verifier);
tracker.recordDigest(peer1, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
tracker.recordDigest(peer2, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
tracker.verify();
assertEquals(confirmedCount, confirmedCount());
assertEquals(unconfirmedCount, unconfirmedCount());
}
@Test
public void allEmptyDigestsWithSomeConclusive()
{
// if a read didn't touch any repaired sstables, digests will be empty
long confirmedCount = confirmedCount();
long unconfirmedCount = unconfirmedCount();
InetAddressAndPort peer1 = peer();
InetAddressAndPort peer2 = peer();
RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
RepairedDataTracker tracker = new RepairedDataTracker(verifier);
tracker.recordDigest(peer1, ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
tracker.recordDigest(peer2, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
tracker.verify();
assertEquals(confirmedCount, confirmedCount());
assertEquals(unconfirmedCount, unconfirmedCount());
}
@Test
public void allEmptyDigestsWithNoneConclusive()
{
// if a read didn't touch any repaired sstables, digests will be empty
long confirmedCount = confirmedCount();
long unconfirmedCount = unconfirmedCount();
InetAddressAndPort peer1 = peer();
InetAddressAndPort peer2 = peer();
RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
RepairedDataTracker tracker = new RepairedDataTracker(verifier);
tracker.recordDigest(peer1, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
tracker.recordDigest(peer2, ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
tracker.verify();
assertEquals(confirmedCount, confirmedCount());
assertEquals(unconfirmedCount, unconfirmedCount());
}
@Test
public void noTrackingDataRecorded()
{
// if a read didn't land on any replicas which support repaired data tracking, nothing will be recorded
long confirmedCount = confirmedCount();
long unconfirmedCount = unconfirmedCount();
RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
RepairedDataTracker tracker = new RepairedDataTracker(verifier);
tracker.verify();
assertEquals(confirmedCount, confirmedCount());
assertEquals(unconfirmedCount, unconfirmedCount());
}
private long confirmedCount()
{
return metrics.confirmedRepairedInconsistencies.table.getCount();
}
private long unconfirmedCount()
{
return metrics.unconfirmedRepairedInconsistencies.table.getCount();
}
private InetAddressAndPort peer()
{
try
{
return InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) addressSuffix++ });
}
catch (UnknownHostException e)
{
throw new RuntimeException(e);
}
}
private int key()
{
return random.nextInt();
}
private ReadCommand command(int key)
{
return new StubReadCommand(key, metadata, false);
}
private static class StubReadCommand extends SinglePartitionReadCommand
{
StubReadCommand(int key, TableMetadata metadata, boolean isDigest)
{
super(isDigest,
0,
false,
metadata,
FBUtilities.nowInSeconds(),
ColumnFilter.all(metadata),
RowFilter.NONE,
DataLimits.NONE,
metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)),
new ClusteringIndexSliceFilter(Slices.ALL, false),
null);
}
}
}