blob: a332f743dfff5fb818a20b3219b938afb182ddcd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.db;
import com.google.common.base.Charsets;
import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.cache.ChunkCache;
import org.apache.cassandra.UpdateBuilder;
import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.Verifier;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.commons.lang3.StringUtils;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.io.*;
import java.nio.file.Files;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@RunWith(OrderedJUnit4ClassRunner.class)
public class VerifyTest
{
public static final String KEYSPACE = "Keyspace1";
public static final String CF = "Standard1";
public static final String CF2 = "Standard2";
public static final String CF3 = "Standard3";
public static final String CF4 = "Standard4";
public static final String COUNTER_CF = "Counter1";
public static final String COUNTER_CF2 = "Counter2";
public static final String COUNTER_CF3 = "Counter3";
public static final String COUNTER_CF4 = "Counter4";
public static final String CORRUPT_CF = "Corrupt1";
public static final String CORRUPT_CF2 = "Corrupt2";
public static final String CORRUPTCOUNTER_CF = "CounterCorrupt1";
public static final String CORRUPTCOUNTER_CF2 = "CounterCorrupt2";
public static final String CF_UUID = "UUIDKeys";
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
CompressionParams compressionParameters = CompressionParams.snappy(32768);
SchemaLoader.loadSchema();
SchemaLoader.createKeyspace(KEYSPACE,
KeyspaceParams.simple(1),
SchemaLoader.standardCFMD(KEYSPACE, CF).compression(compressionParameters),
SchemaLoader.standardCFMD(KEYSPACE, CF2).compression(compressionParameters),
SchemaLoader.standardCFMD(KEYSPACE, CF3),
SchemaLoader.standardCFMD(KEYSPACE, CF4),
SchemaLoader.standardCFMD(KEYSPACE, CORRUPT_CF),
SchemaLoader.standardCFMD(KEYSPACE, CORRUPT_CF2),
SchemaLoader.counterCFMD(KEYSPACE, COUNTER_CF).compression(compressionParameters),
SchemaLoader.counterCFMD(KEYSPACE, COUNTER_CF2).compression(compressionParameters),
SchemaLoader.counterCFMD(KEYSPACE, COUNTER_CF3),
SchemaLoader.counterCFMD(KEYSPACE, COUNTER_CF4),
SchemaLoader.counterCFMD(KEYSPACE, CORRUPTCOUNTER_CF),
SchemaLoader.counterCFMD(KEYSPACE, CORRUPTCOUNTER_CF2),
SchemaLoader.standardCFMD(KEYSPACE, CF_UUID, 0, UUIDType.instance));
}
@Test
public void testVerifyCorrect() throws IOException
{
CompactionManager.instance.disableAutoCompaction();
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
fillCF(cfs, 2);
SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
try (Verifier verifier = new Verifier(cfs, sstable, false))
{
verifier.verify(false);
}
catch (CorruptSSTableException err)
{
fail("Unexpected CorruptSSTableException");
}
}
@Test
public void testVerifyCounterCorrect() throws IOException
{
CompactionManager.instance.disableAutoCompaction();
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(COUNTER_CF);
fillCounterCF(cfs, 2);
SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
try (Verifier verifier = new Verifier(cfs, sstable, false))
{
verifier.verify(false);
}
catch (CorruptSSTableException err)
{
fail("Unexpected CorruptSSTableException");
}
}
@Test
public void testExtendedVerifyCorrect() throws IOException
{
CompactionManager.instance.disableAutoCompaction();
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF2);
fillCF(cfs, 2);
SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
try (Verifier verifier = new Verifier(cfs, sstable, false))
{
verifier.verify(true);
}
catch (CorruptSSTableException err)
{
fail("Unexpected CorruptSSTableException");
}
}
@Test
public void testExtendedVerifyCounterCorrect() throws IOException
{
CompactionManager.instance.disableAutoCompaction();
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(COUNTER_CF2);
fillCounterCF(cfs, 2);
SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
try (Verifier verifier = new Verifier(cfs, sstable, false))
{
verifier.verify(true);
}
catch (CorruptSSTableException err)
{
fail("Unexpected CorruptSSTableException");
}
}
@Test
public void testVerifyCorrectUncompressed() throws IOException
{
CompactionManager.instance.disableAutoCompaction();
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF3);
fillCF(cfs, 2);
SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
try (Verifier verifier = new Verifier(cfs, sstable, false))
{
verifier.verify(false);
}
catch (CorruptSSTableException err)
{
fail("Unexpected CorruptSSTableException");
}
}
@Test
public void testVerifyCounterCorrectUncompressed() throws IOException
{
CompactionManager.instance.disableAutoCompaction();
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(COUNTER_CF3);
fillCounterCF(cfs, 2);
SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
try (Verifier verifier = new Verifier(cfs, sstable, false))
{
verifier.verify(false);
}
catch (CorruptSSTableException err)
{
fail("Unexpected CorruptSSTableException");
}
}
@Test
public void testExtendedVerifyCorrectUncompressed() throws IOException
{
CompactionManager.instance.disableAutoCompaction();
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF4);
fillCF(cfs, 2);
SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
try (Verifier verifier = new Verifier(cfs, sstable, false))
{
verifier.verify(true);
}
catch (CorruptSSTableException err)
{
fail("Unexpected CorruptSSTableException");
}
}
@Test
public void testExtendedVerifyCounterCorrectUncompressed() throws IOException
{
CompactionManager.instance.disableAutoCompaction();
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(COUNTER_CF4);
fillCounterCF(cfs, 2);
SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
try (Verifier verifier = new Verifier(cfs, sstable, false))
{
verifier.verify(true);
}
catch (CorruptSSTableException err)
{
fail("Unexpected CorruptSSTableException");
}
}
@Test
public void testVerifyIncorrectDigest() throws IOException, WriteTimeoutException
{
CompactionManager.instance.disableAutoCompaction();
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CORRUPT_CF);
fillCF(cfs, 2);
Util.getAll(Util.cmd(cfs).build());
SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
try (RandomAccessFile file = new RandomAccessFile(sstable.descriptor.filenameFor(sstable.descriptor.digestComponent), "rw"))
{
Long correctChecksum = Long.valueOf(file.readLine());
writeChecksum(++correctChecksum, sstable.descriptor.filenameFor(sstable.descriptor.digestComponent));
}
try (Verifier verifier = new Verifier(cfs, sstable, false))
{
verifier.verify(false);
fail("Expected a CorruptSSTableException to be thrown");
}
catch (CorruptSSTableException err) {}
}
@Test
public void testVerifyCorruptRowCorrectDigest() throws IOException, WriteTimeoutException
{
CompactionManager.instance.disableAutoCompaction();
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CORRUPT_CF2);
fillCF(cfs, 2);
Util.getAll(Util.cmd(cfs).build());
SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
// overwrite one row with garbage
long row0Start = sstable.getPosition(PartitionPosition.ForKey.get(ByteBufferUtil.bytes("0"), cfs.getPartitioner()), SSTableReader.Operator.EQ).position;
long row1Start = sstable.getPosition(PartitionPosition.ForKey.get(ByteBufferUtil.bytes("1"), cfs.getPartitioner()), SSTableReader.Operator.EQ).position;
long startPosition = row0Start < row1Start ? row0Start : row1Start;
long endPosition = row0Start < row1Start ? row1Start : row0Start;
RandomAccessFile file = new RandomAccessFile(sstable.getFilename(), "rw");
file.seek(startPosition);
file.writeBytes(StringUtils.repeat('z', (int) 2));
file.close();
if (ChunkCache.instance != null)
ChunkCache.instance.invalidateFile(sstable.getFilename());
// Update the Digest to have the right Checksum
writeChecksum(simpleFullChecksum(sstable.getFilename()), sstable.descriptor.filenameFor(sstable.descriptor.digestComponent));
try (Verifier verifier = new Verifier(cfs, sstable, false))
{
// First a simple verify checking digest, which should succeed
try
{
verifier.verify(false);
}
catch (CorruptSSTableException err)
{
fail("Simple verify should have succeeded as digest matched");
}
// Now try extended verify
try
{
verifier.verify(true);
}
catch (CorruptSSTableException err)
{
return;
}
fail("Expected a CorruptSSTableException to be thrown");
}
}
@Test(expected = CorruptSSTableException.class)
public void testVerifyBrokenSSTableMetadata() throws IOException, WriteTimeoutException
{
CompactionManager.instance.disableAutoCompaction();
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CORRUPT_CF2);
fillCF(cfs, 2);
Util.getAll(Util.cmd(cfs).build());
SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
String filenameToCorrupt = sstable.descriptor.filenameFor(Component.STATS);
RandomAccessFile file = new RandomAccessFile(filenameToCorrupt, "rw");
file.seek(0);
file.writeBytes(StringUtils.repeat('z', 2));
file.close();
try (Verifier verifier = new Verifier(cfs, sstable, false))
{
verifier.verify(false);
}
}
@Test
public void testMutateRepair() throws IOException, ExecutionException, InterruptedException
{
CompactionManager.instance.disableAutoCompaction();
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CORRUPT_CF2);
fillCF(cfs, 2);
SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, 1);
sstable.reloadSSTableMetadata();
cfs.getTracker().notifySSTableRepairedStatusChanged(Collections.singleton(sstable));
assertTrue(sstable.isRepaired());
cfs.forceMajorCompaction();
sstable = cfs.getLiveSSTables().iterator().next();
Long correctChecksum;
try (RandomAccessFile file = new RandomAccessFile(sstable.descriptor.filenameFor(sstable.descriptor.digestComponent), "rw"))
{
correctChecksum = Long.parseLong(file.readLine());
}
writeChecksum(++correctChecksum, sstable.descriptor.filenameFor(sstable.descriptor.digestComponent));
try (Verifier verifier = new Verifier(cfs, sstable, false))
{
verifier.verify(false);
fail("should be corrupt");
}
catch (CorruptSSTableException e)
{}
assertFalse(sstable.isRepaired());
}
protected void fillCF(ColumnFamilyStore cfs, int partitionsPerSSTable)
{
for (int i = 0; i < partitionsPerSSTable; i++)
{
UpdateBuilder.create(cfs.metadata, String.valueOf(i))
.newRow("c1").add("val", "1")
.newRow("c2").add("val", "2")
.apply();
}
cfs.forceBlockingFlush();
}
protected void fillCounterCF(ColumnFamilyStore cfs, int partitionsPerSSTable) throws WriteTimeoutException
{
for (int i = 0; i < partitionsPerSSTable; i++)
{
UpdateBuilder.create(cfs.metadata, String.valueOf(i))
.newRow("c1").add("val", 100L)
.apply();
}
cfs.forceBlockingFlush();
}
protected long simpleFullChecksum(String filename) throws IOException
{
try (FileInputStream inputStream = new FileInputStream(filename))
{
CRC32 checksum = new CRC32();
CheckedInputStream cinStream = new CheckedInputStream(inputStream, checksum);
byte[] b = new byte[128];
while (cinStream.read(b) >= 0) {
}
return cinStream.getChecksum().getValue();
}
}
protected void writeChecksum(long checksum, String filePath)
{
File outFile = new File(filePath);
BufferedWriter out = null;
try
{
out = Files.newBufferedWriter(outFile.toPath(), Charsets.UTF_8);
out.write(String.valueOf(checksum));
out.flush();
out.close();
}
catch (IOException e)
{
throw new FSWriteError(e, outFile);
}
finally
{
FileUtils.closeQuietly(out);
}
}
}