| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.cassandra.db.commitlog; |
| |
| import java.io.*; |
| import java.nio.ByteBuffer; |
| import java.util.*; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutionException; |
| import java.util.function.BiConsumer; |
| import java.util.stream.Collectors; |
| import java.util.zip.CRC32; |
| import java.util.zip.Checksum; |
| |
| import com.google.common.collect.Iterables; |
| |
| import org.junit.*; |
| import com.google.common.io.Files; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| import org.junit.runners.Parameterized.Parameters; |
| |
| import org.apache.cassandra.SchemaLoader; |
| import org.apache.cassandra.Util; |
| import org.apache.cassandra.config.CFMetaData; |
| import org.apache.cassandra.config.DatabaseDescriptor; |
| import org.apache.cassandra.config.ParameterizedClass; |
| import org.apache.cassandra.config.Config.DiskFailurePolicy; |
| import org.apache.cassandra.db.*; |
| import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException; |
| import org.apache.cassandra.db.compaction.CompactionManager; |
| import org.apache.cassandra.db.marshal.AsciiType; |
| import org.apache.cassandra.db.marshal.BytesType; |
| import org.apache.cassandra.db.partitions.PartitionUpdate; |
| import org.apache.cassandra.db.rows.Row; |
| import org.apache.cassandra.exceptions.ConfigurationException; |
| import org.apache.cassandra.io.FSWriteError; |
| import org.apache.cassandra.io.compress.DeflateCompressor; |
| import org.apache.cassandra.io.compress.LZ4Compressor; |
| import org.apache.cassandra.io.compress.SnappyCompressor; |
| import org.apache.cassandra.io.sstable.format.SSTableReader; |
| import org.apache.cassandra.net.MessagingService; |
| import org.apache.cassandra.schema.KeyspaceParams; |
| import org.apache.cassandra.security.EncryptionContext; |
| import org.apache.cassandra.security.EncryptionContextGenerator; |
| import org.apache.cassandra.utils.Hex; |
| import org.apache.cassandra.utils.JVMStabilityInspector; |
| import org.apache.cassandra.utils.KillerForTests; |
| import org.apache.cassandra.utils.Pair; |
| import org.apache.cassandra.utils.vint.VIntCoding; |
| |
| import org.junit.After; |
| import static org.apache.cassandra.utils.ByteBufferUtil.bytes; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| |
| @RunWith(Parameterized.class) |
| public class CommitLogTest |
| { |
| private static final String KEYSPACE1 = "CommitLogTest"; |
| private static final String KEYSPACE2 = "CommitLogTestNonDurable"; |
| private static final String STANDARD1 = "Standard1"; |
| private static final String STANDARD2 = "Standard2"; |
| private static final String CUSTOM1 = "Custom1"; |
| |
| private static JVMStabilityInspector.Killer oldKiller; |
| private static KillerForTests testKiller; |
| |
| public CommitLogTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext) |
| { |
| DatabaseDescriptor.setCommitLogCompression(commitLogCompression); |
| DatabaseDescriptor.setEncryptionContext(encryptionContext); |
| } |
| |
| @Parameters() |
| public static Collection<Object[]> generateData() |
| { |
| return Arrays.asList(new Object[][]{ |
| {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption |
| {null, EncryptionContextGenerator.createContext(true)}, // Encryption |
| {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}, |
| {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}, |
| {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}}); |
| } |
| |
| @BeforeClass |
| public static void beforeClass() throws ConfigurationException |
| { |
| // Disable durable writes for system keyspaces to prevent system mutations, e.g. sstable_activity, |
| // to end up in CL segments and cause unexpected results in this test wrt counting CL segments, |
| // see CASSANDRA-12854 |
| KeyspaceParams.DEFAULT_LOCAL_DURABLE_WRITES = false; |
| |
| SchemaLoader.prepareServer(); |
| |
| CFMetaData custom = CFMetaData.compile(String.format("CREATE TABLE \"%s\" (" + |
| "k int," + |
| "c1 frozen<map<text, text>>," + |
| "c2 frozen<set<text>>," + |
| "s int static," + |
| "PRIMARY KEY (k, c1, c2)" + |
| ");", CUSTOM1), KEYSPACE1); |
| |
| SchemaLoader.createKeyspace(KEYSPACE1, |
| KeyspaceParams.simple(1), |
| SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance), |
| SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance), |
| custom); |
| SchemaLoader.createKeyspace(KEYSPACE2, |
| KeyspaceParams.simpleTransient(1), |
| SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance), |
| SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance)); |
| CompactionManager.instance.disableAutoCompaction(); |
| |
| testKiller = new KillerForTests(); |
| |
| // While we don't want the JVM to be nuked from under us on a test failure, we DO want some indication of |
| // an error. If we hit a "Kill the JVM" condition while working with the CL when we don't expect it, an aggressive |
| // KillerForTests will assertion out on us. |
| oldKiller = JVMStabilityInspector.replaceKiller(testKiller); |
| } |
| |
| @AfterClass |
| public static void afterClass() |
| { |
| JVMStabilityInspector.replaceKiller(oldKiller); |
| } |
| |
| @Before |
| public void beforeTest() throws IOException |
| { |
| CommitLog.instance.resetUnsafe(true); |
| } |
| |
| @After |
| public void afterTest() |
| { |
| testKiller.reset(); |
| } |
| |
| @Test |
| public void testRecoveryWithEmptyLog() throws Exception |
| { |
| runExpecting(() -> { |
| CommitLog.instance.recoverFiles(new File[]{ |
| tmpFile(CommitLogDescriptor.current_version), |
| tmpFile(CommitLogDescriptor.current_version) |
| }); |
| return null; |
| }, CommitLogReplayException.class); |
| } |
| |
| @Test |
| public void testRecoveryWithFinalEmptyLog() throws Exception |
| { |
| // Even though it's empty, it's the last commitlog segment, so allowTruncation=true should allow it to pass |
| CommitLog.instance.recoverFiles(new File[]{tmpFile(CommitLogDescriptor.current_version)}); |
| } |
| |
| /** |
| * Since commit log segments can be allocated before they're needed, the commit log file with the highest |
| * id isn't neccesarily the last log that we wrote to. We should remove header only logs on recover so we |
| * can tolerate truncated logs |
| */ |
| @Test |
| public void testHeaderOnlyFileFiltering() throws Exception |
| { |
| File directory = Files.createTempDir(); |
| |
| CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, DatabaseDescriptor.getEncryptionContext()); |
| CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 2, null, DatabaseDescriptor.getEncryptionContext()); |
| |
| ByteBuffer buffer; |
| |
| // this has a header and malformed data |
| File file1 = new File(directory, desc1.fileName()); |
| buffer = ByteBuffer.allocate(1024); |
| CommitLogDescriptor.writeHeader(buffer, desc1); |
| int pos = buffer.position(); |
| CommitLogSegment.writeSyncMarker(desc1.id, buffer, buffer.position(), buffer.position(), buffer.position() + 128); |
| buffer.position(pos + 8); |
| buffer.putInt(5); |
| buffer.putInt(6); |
| |
| try (OutputStream lout = new FileOutputStream(file1)) |
| { |
| lout.write(buffer.array()); |
| } |
| |
| // this has only a header |
| File file2 = new File(directory, desc2.fileName()); |
| buffer = ByteBuffer.allocate(1024); |
| CommitLogDescriptor.writeHeader(buffer, desc2); |
| try (OutputStream lout = new FileOutputStream(file2)) |
| { |
| lout.write(buffer.array()); |
| } |
| |
| // one corrupt file and one header only file should be ok |
| runExpecting(() -> { |
| CommitLog.instance.recoverFiles(file1, file2); |
| return null; |
| }, null); |
| |
| // 2 corrupt files and one header only file should fail |
| runExpecting(() -> { |
| CommitLog.instance.recoverFiles(file1, file1, file2); |
| return null; |
| }, CommitLogReplayException.class); |
| } |
| |
| @Test |
| public void testRecoveryWithEmptyLog20() throws Exception |
| { |
| CommitLog.instance.recoverFiles(tmpFile(CommitLogDescriptor.VERSION_20)); |
| } |
| |
| @Test |
| public void testRecoveryWithZeroLog() throws Exception |
| { |
| testRecovery(new byte[10], null); |
| } |
| |
| @Test |
| public void testRecoveryWithShortLog() throws Exception |
| { |
| // force EOF while reading log |
| testRecoveryWithBadSizeArgument(100, 10); |
| } |
| |
| @Test |
| public void testRecoveryWithShortPadding() throws Exception |
| { |
| // If we have 0-3 bytes remaining, commitlog replayer |
| // should pass, because there's insufficient room |
| // left in the segment for the legacy size marker. |
| testRecovery(new byte[1], null); |
| testRecovery(new byte[2], null); |
| testRecovery(new byte[3], null); |
| } |
| |
| @Test |
| public void testRecoveryWithShortSize() throws Exception |
| { |
| byte[] data = new byte[5]; |
| data[3] = 1; // Not a legacy marker, give it a fake (short) size |
| runExpecting(() -> { |
| testRecovery(data, CommitLogDescriptor.VERSION_20); |
| return null; |
| }, CommitLogReplayException.class); |
| } |
| |
| @Test |
| public void testRecoveryWithShortMutationSize() throws Exception |
| { |
| testRecoveryWithBadSizeArgument(9, 10); |
| } |
| |
| private void testRecoveryWithGarbageLog() throws Exception |
| { |
| byte[] garbage = new byte[100]; |
| (new java.util.Random()).nextBytes(garbage); |
| testRecovery(garbage, CommitLogDescriptor.current_version); |
| } |
| |
| @Test |
| public void testRecoveryWithGarbageLog_fail() throws Exception |
| { |
| runExpecting(() -> { |
| testRecoveryWithGarbageLog(); |
| return null; |
| }, CommitLogReplayException.class); |
| } |
| |
| @Test |
| public void testRecoveryWithGarbageLog_ignoredByProperty() throws Exception |
| { |
| try { |
| System.setProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY, "true"); |
| testRecoveryWithGarbageLog(); |
| } finally { |
| System.clearProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY); |
| } |
| } |
| |
| @Test |
| public void testRecoveryWithBadSizeChecksum() throws Exception |
| { |
| Checksum checksum = new CRC32(); |
| checksum.update(100); |
| testRecoveryWithBadSizeArgument(100, 100, ~checksum.getValue()); |
| } |
| |
| @Test |
| public void testRecoveryWithNegativeSizeArgument() throws Exception |
| { |
| // garbage from a partial/bad flush could be read as a negative size even if there is no EOF |
| testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF |
| } |
| |
| @Test |
| public void testDontDeleteIfDirty() throws Exception |
| { |
| Keyspace ks = Keyspace.open(KEYSPACE1); |
| ColumnFamilyStore cfs1 = ks.getColumnFamilyStore(STANDARD1); |
| ColumnFamilyStore cfs2 = ks.getColumnFamilyStore(STANDARD2); |
| |
| // Roughly 32 MB mutation |
| Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k") |
| .clustering("bytes") |
| .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4)) |
| .build(); |
| |
| // Adding it 5 times |
| CommitLog.instance.add(m); |
| CommitLog.instance.add(m); |
| CommitLog.instance.add(m); |
| CommitLog.instance.add(m); |
| CommitLog.instance.add(m); |
| |
| // Adding new mutation on another CF |
| Mutation m2 = new RowUpdateBuilder(cfs2.metadata, 0, "k") |
| .clustering("bytes") |
| .add("val", ByteBuffer.allocate(4)) |
| .build(); |
| CommitLog.instance.add(m2); |
| |
| assertEquals(2, CommitLog.instance.segmentManager.getActiveSegments().size()); |
| |
| UUID cfid2 = m2.getColumnFamilyIds().iterator().next(); |
| CommitLog.instance.discardCompletedSegments(cfid2, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition()); |
| |
| // Assert we still have both our segments |
| assertEquals(2, CommitLog.instance.segmentManager.getActiveSegments().size()); |
| } |
| |
| @Test |
| public void testDeleteIfNotDirty() throws Exception |
| { |
| Keyspace ks = Keyspace.open(KEYSPACE1); |
| ColumnFamilyStore cfs1 = ks.getColumnFamilyStore(STANDARD1); |
| ColumnFamilyStore cfs2 = ks.getColumnFamilyStore(STANDARD2); |
| |
| // Roughly 32 MB mutation |
| Mutation rm = new RowUpdateBuilder(cfs1.metadata, 0, "k") |
| .clustering("bytes") |
| .add("val", ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1)) |
| .build(); |
| |
| // Adding it twice (won't change segment) |
| CommitLog.instance.add(rm); |
| CommitLog.instance.add(rm); |
| |
| assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size()); |
| |
| // "Flush": this won't delete anything |
| UUID cfid1 = rm.getColumnFamilyIds().iterator().next(); |
| CommitLog.instance.sync(true); |
| CommitLog.instance.discardCompletedSegments(cfid1, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition()); |
| |
| assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size()); |
| |
| // Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created |
| Mutation rm2 = new RowUpdateBuilder(cfs2.metadata, 0, "k") |
| .clustering("bytes") |
| .add("val", ByteBuffer.allocate(DatabaseDescriptor.getMaxMutationSize() - 200)) |
| .build(); |
| CommitLog.instance.add(rm2); |
| // also forces a new segment, since each entry-with-overhead is just under half the CL size |
| CommitLog.instance.add(rm2); |
| CommitLog.instance.add(rm2); |
| |
| Collection<CommitLogSegment> segments = CommitLog.instance.segmentManager.getActiveSegments(); |
| |
| assertEquals(String.format("Expected 3 segments but got %d (%s)", segments.size(), getDirtyCFIds(segments)), |
| 3, |
| segments.size()); |
| |
| // "Flush" second cf: The first segment should be deleted since we |
| // didn't write anything on cf1 since last flush (and we flush cf2) |
| |
| UUID cfid2 = rm2.getColumnFamilyIds().iterator().next(); |
| CommitLog.instance.discardCompletedSegments(cfid2, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition()); |
| |
| segments = CommitLog.instance.segmentManager.getActiveSegments(); |
| |
| // Assert we still have both our segment |
| assertEquals(String.format("Expected 1 segment but got %d (%s)", segments.size(), getDirtyCFIds(segments)), |
| 1, |
| segments.size()); |
| } |
| |
| private String getDirtyCFIds(Collection<CommitLogSegment> segments) |
| { |
| return "Dirty cfIds: <" |
| + String.join(", ", segments.stream() |
| .map(CommitLogSegment::getDirtyCFIDs) |
| .flatMap(uuids -> uuids.stream()) |
| .distinct() |
| .map(uuid -> uuid.toString()).collect(Collectors.toList())) |
| + ">"; |
| } |
| |
| private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String cfName, String colName) |
| { |
| ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(cfName); |
| // We don't want to allocate a size of 0 as this is optimized under the hood and our computation would |
| // break testEqualRecordLimit |
| int allocSize = 1; |
| Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, key) |
| .clustering(colName) |
| .add("val", ByteBuffer.allocate(allocSize)).build(); |
| |
| int max = DatabaseDescriptor.getMaxMutationSize(); |
| max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead |
| |
| // Note that the size of the value if vint encoded. So we first compute the ovehead of the mutation without the value and it's size |
| int mutationOverhead = (int)Mutation.serializer.serializedSize(rm, MessagingService.current_version) - (VIntCoding.computeVIntSize(allocSize) + allocSize); |
| max -= mutationOverhead; |
| |
| // Now, max is the max for both the value and it's size. But we want to know how much we can allocate, i.e. the size of the value. |
| int sizeOfMax = VIntCoding.computeVIntSize(max); |
| max -= sizeOfMax; |
| assert VIntCoding.computeVIntSize(max) == sizeOfMax; // sanity check that we're still encoded with the size we though we would |
| return max; |
| } |
| |
| private static int getMaxRecordDataSize() |
| { |
| return getMaxRecordDataSize(KEYSPACE1, bytes("k"), STANDARD1, "bytes"); |
| } |
| |
| // CASSANDRA-3615 |
| @Test |
| public void testEqualRecordLimit() throws Exception |
| { |
| ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); |
| Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k") |
| .clustering("bytes") |
| .add("val", ByteBuffer.allocate(getMaxRecordDataSize())) |
| .build(); |
| CommitLog.instance.add(rm); |
| } |
| |
| @Test(expected = IllegalArgumentException.class) |
| public void testExceedRecordLimit() throws Exception |
| { |
| Keyspace ks = Keyspace.open(KEYSPACE1); |
| ColumnFamilyStore cfs = ks.getColumnFamilyStore(STANDARD1); |
| Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k") |
| .clustering("bytes") |
| .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize())) |
| .build(); |
| CommitLog.instance.add(rm); |
| throw new AssertionError("mutation larger than limit was accepted"); |
| } |
| |
| protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception |
| { |
| Checksum checksum = new CRC32(); |
| checksum.update(size); |
| testRecoveryWithBadSizeArgument(size, dataSize, checksum.getValue()); |
| } |
| |
| protected void testRecoveryWithBadSizeArgument(int size, int dataSize, long checksum) throws Exception |
| { |
| ByteArrayOutputStream out = new ByteArrayOutputStream(); |
| DataOutputStream dout = new DataOutputStream(out); |
| dout.writeInt(size); |
| dout.writeLong(checksum); |
| dout.write(new byte[dataSize]); |
| dout.close(); |
| testRecovery(out.toByteArray(), CommitLogReplayException.class); |
| } |
| |
| /** |
| * Create a temporary commit log file with an appropriate descriptor at the head. |
| * |
| * @return the commit log file reference and the first position after the descriptor in the file |
| * (so that subsequent writes happen at the correct file location). |
| */ |
| protected Pair<File, Integer> tmpFile() throws IOException |
| { |
| EncryptionContext encryptionContext = DatabaseDescriptor.getEncryptionContext(); |
| CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.current_version, |
| CommitLogSegment.getNextId(), |
| DatabaseDescriptor.getCommitLogCompression(), |
| encryptionContext); |
| |
| |
| ByteBuffer buf = ByteBuffer.allocate(1024); |
| CommitLogDescriptor.writeHeader(buf, desc, getAdditionalHeaders(encryptionContext)); |
| buf.flip(); |
| int positionAfterHeader = buf.limit() + 1; |
| |
| File logFile = new File(DatabaseDescriptor.getCommitLogLocation(), desc.fileName()); |
| |
| try (OutputStream lout = new FileOutputStream(logFile)) |
| { |
| lout.write(buf.array(), 0, buf.limit()); |
| } |
| |
| return Pair.create(logFile, positionAfterHeader); |
| } |
| |
| private Map<String, String> getAdditionalHeaders(EncryptionContext encryptionContext) |
| { |
| if (!encryptionContext.isEnabled()) |
| return Collections.emptyMap(); |
| |
| // if we're testing encryption, we need to write out a cipher IV to the descriptor headers |
| byte[] buf = new byte[16]; |
| new Random().nextBytes(buf); |
| return Collections.singletonMap(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(buf)); |
| } |
| |
| protected File tmpFile(int version) throws IOException |
| { |
| File logFile = File.createTempFile("CommitLog-" + version + "-", ".log"); |
| assert logFile.length() == 0; |
| return logFile; |
| } |
| |
| protected Void testRecovery(byte[] logData, int version) throws Exception |
| { |
| File logFile = tmpFile(version); |
| try (OutputStream lout = new FileOutputStream(logFile)) |
| { |
| lout.write(logData); |
| //statics make it annoying to test things correctly |
| CommitLog.instance.recover(logFile.getPath()); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/ |
| } |
| return null; |
| } |
| |
| protected Void testRecovery(CommitLogDescriptor desc, byte[] logData) throws Exception |
| { |
| File logFile = tmpFile(desc.version); |
| CommitLogDescriptor fromFile = CommitLogDescriptor.fromFileName(logFile.getName()); |
| // Change id to match file. |
| desc = new CommitLogDescriptor(desc.version, fromFile.id, desc.compression, desc.getEncryptionContext()); |
| ByteBuffer buf = ByteBuffer.allocate(1024); |
| CommitLogDescriptor.writeHeader(buf, desc, getAdditionalHeaders(desc.getEncryptionContext())); |
| try (OutputStream lout = new FileOutputStream(logFile)) |
| { |
| lout.write(buf.array(), 0, buf.position()); |
| lout.write(logData); |
| //statics make it annoying to test things correctly |
| CommitLog.instance.recover(logFile.getPath()); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/ |
| } |
| return null; |
| } |
| |
| @Test |
| public void testRecoveryWithIdMismatch() throws Exception |
| { |
| CommitLogDescriptor desc = new CommitLogDescriptor(4, null, EncryptionContextGenerator.createDisabledContext()); |
| File logFile = tmpFile(desc.version); |
| ByteBuffer buf = ByteBuffer.allocate(1024); |
| CommitLogDescriptor.writeHeader(buf, desc); |
| try (OutputStream lout = new FileOutputStream(logFile)) |
| { |
| lout.write(buf.array(), 0, buf.position()); |
| |
| runExpecting(() -> { |
| CommitLog.instance.recover(logFile.getPath()); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/ |
| return null; |
| }, CommitLogReplayException.class); |
| } |
| } |
| |
| @Test |
| public void testRecoveryWithBadCompressor() throws Exception |
| { |
| CommitLogDescriptor desc = new CommitLogDescriptor(4, new ParameterizedClass("UnknownCompressor", null), EncryptionContextGenerator.createDisabledContext()); |
| runExpecting(() -> { |
| testRecovery(desc, new byte[0]); |
| return null; |
| }, CommitLogReplayException.class); |
| } |
| |
| protected void runExpecting(Callable<Void> r, Class<?> expected) |
| { |
| Throwable caught = null; |
| try |
| { |
| r.call(); |
| } |
| catch (Throwable t) |
| { |
| if (expected != t.getClass()) |
| throw new AssertionError("Expected exception " + expected + ", got " + t, t); |
| caught = t; |
| } |
| if (expected != null && caught == null) |
| Assert.fail("Expected exception " + expected + " but call completed successfully."); |
| |
| assertEquals("JVM kill state doesn't match expectation.", expected != null, testKiller.wasKilled()); |
| } |
| |
| protected void testRecovery(final byte[] logData, Class<?> expected) throws Exception |
| { |
| ParameterizedClass commitLogCompression = DatabaseDescriptor.getCommitLogCompression(); |
| EncryptionContext encryptionContext = DatabaseDescriptor.getEncryptionContext(); |
| runExpecting(() -> testRecovery(logData, CommitLogDescriptor.VERSION_20), expected); |
| runExpecting(() -> testRecovery(new CommitLogDescriptor(4, commitLogCompression, encryptionContext), logData), expected); |
| } |
| |
| @Test |
| public void testTruncateWithoutSnapshot() throws ExecutionException, InterruptedException, IOException |
| { |
| boolean originalState = DatabaseDescriptor.isAutoSnapshot(); |
| try |
| { |
| boolean prev = DatabaseDescriptor.isAutoSnapshot(); |
| DatabaseDescriptor.setAutoSnapshot(false); |
| Keyspace ks = Keyspace.open(KEYSPACE1); |
| ColumnFamilyStore cfs1 = ks.getColumnFamilyStore(STANDARD1); |
| ColumnFamilyStore cfs2 = ks.getColumnFamilyStore(STANDARD2); |
| |
| new RowUpdateBuilder(cfs1.metadata, 0, "k").clustering("bytes").add("val", ByteBuffer.allocate(100)).build().applyUnsafe(); |
| cfs1.truncateBlocking(); |
| DatabaseDescriptor.setAutoSnapshot(prev); |
| Mutation m2 = new RowUpdateBuilder(cfs2.metadata, 0, "k") |
| .clustering("bytes") |
| .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4)) |
| .build(); |
| |
| for (int i = 0 ; i < 5 ; i++) |
| CommitLog.instance.add(m2); |
| |
| assertEquals(2, CommitLog.instance.segmentManager.getActiveSegments().size()); |
| CommitLogPosition position = CommitLog.instance.getCurrentPosition(); |
| for (Keyspace keyspace : Keyspace.system()) |
| for (ColumnFamilyStore syscfs : keyspace.getColumnFamilyStores()) |
| CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, CommitLogPosition.NONE, position); |
| CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, CommitLogPosition.NONE, position); |
| assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size()); |
| } |
| finally |
| { |
| DatabaseDescriptor.setAutoSnapshot(originalState); |
| } |
| } |
| |
| @Test |
| public void testTruncateWithoutSnapshotNonDurable() throws IOException |
| { |
| boolean originalState = DatabaseDescriptor.getAutoSnapshot(); |
| try |
| { |
| DatabaseDescriptor.setAutoSnapshot(false); |
| Keyspace notDurableKs = Keyspace.open(KEYSPACE2); |
| Assert.assertFalse(notDurableKs.getMetadata().params.durableWrites); |
| |
| ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1"); |
| new RowUpdateBuilder(cfs.metadata, 0, "key1") |
| .clustering("bytes").add("val", bytes("abcd")) |
| .build() |
| .applyUnsafe(); |
| |
| assertTrue(Util.getOnlyRow(Util.cmd(cfs).columns("val").build()) |
| .cells().iterator().next().value().equals(bytes("abcd"))); |
| |
| cfs.truncateBlocking(); |
| |
| Util.assertEmpty(Util.cmd(cfs).columns("val").build()); |
| } |
| finally |
| { |
| DatabaseDescriptor.setAutoSnapshot(originalState); |
| } |
| } |
| |
| @Test |
| public void replaySimple() throws IOException |
| { |
| int cellCount = 0; |
| ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); |
| final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k1") |
| .clustering("bytes") |
| .add("val", bytes("this is a string")) |
| .build(); |
| cellCount += 1; |
| CommitLog.instance.add(rm1); |
| |
| final Mutation rm2 = new RowUpdateBuilder(cfs.metadata, 0, "k2") |
| .clustering("bytes") |
| .add("val", bytes("this is a string")) |
| .build(); |
| cellCount += 1; |
| CommitLog.instance.add(rm2); |
| |
| CommitLog.instance.sync(true); |
| |
| SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata); |
| List<String> activeSegments = CommitLog.instance.getActiveSegmentNames(); |
| Assert.assertFalse(activeSegments.isEmpty()); |
| |
| File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name)); |
| replayer.replayFiles(files); |
| |
| assertEquals(cellCount, replayer.cells); |
| } |
| |
| @Test |
| public void replayWithDiscard() throws IOException |
| { |
| int cellCount = 0; |
| int max = 1024; |
| int discardPosition = (int)(max * .8); // an arbitrary number of entries that we'll skip on the replay |
| CommitLogPosition commitLogPosition = null; |
| ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); |
| |
| for (int i = 0; i < max; i++) |
| { |
| final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k" + 1) |
| .clustering("bytes") |
| .add("val", bytes("this is a string")) |
| .build(); |
| CommitLogPosition position = CommitLog.instance.add(rm1); |
| |
| if (i == discardPosition) |
| commitLogPosition = position; |
| if (i > discardPosition) |
| { |
| cellCount += 1; |
| } |
| } |
| |
| CommitLog.instance.sync(true); |
| |
| SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, commitLogPosition, cfs.metadata); |
| List<String> activeSegments = CommitLog.instance.getActiveSegmentNames(); |
| Assert.assertFalse(activeSegments.isEmpty()); |
| |
| File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name)); |
| replayer.replayFiles(files); |
| |
| assertEquals(cellCount, replayer.cells); |
| } |
| |
| class SimpleCountingReplayer extends CommitLogReplayer |
| { |
| private final CommitLogPosition filterPosition; |
| private final CFMetaData metadata; |
| int cells; |
| int skipped; |
| |
| SimpleCountingReplayer(CommitLog commitLog, CommitLogPosition filterPosition, CFMetaData cfm) |
| { |
| super(commitLog, filterPosition, Collections.emptyMap(), ReplayFilter.create()); |
| this.filterPosition = filterPosition; |
| this.metadata = cfm; |
| } |
| |
| @SuppressWarnings("resource") |
| @Override |
| public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc) |
| { |
| // Filter out system writes that could flake the test. |
| if (!KEYSPACE1.equals(m.getKeyspaceName())) |
| return; |
| |
| if (entryLocation <= filterPosition.position) |
| { |
| // Skip over this mutation. |
| skipped++; |
| return; |
| } |
| for (PartitionUpdate partitionUpdate : m.getPartitionUpdates()) |
| { |
| // Only process mutations for the CF's we're testing against, since we can't deterministically predict |
| // whether or not system keyspaces will be mutated during a test. |
| if (partitionUpdate.metadata().cfName.equals(metadata.cfName)) |
| { |
| for (Row row : partitionUpdate) |
| cells += Iterables.size(row.cells()); |
| } |
| } |
| } |
| } |
| |
| public void testUnwriteableFlushRecovery() throws ExecutionException, InterruptedException, IOException |
| { |
| CommitLog.instance.resetUnsafe(true); |
| |
| ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); |
| |
| DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy(); |
| try |
| { |
| DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.ignore); |
| |
| for (int i = 0 ; i < 5 ; i++) |
| { |
| new RowUpdateBuilder(cfs.metadata, 0, "k") |
| .clustering("c" + i).add("val", ByteBuffer.allocate(100)) |
| .build() |
| .apply(); |
| |
| if (i == 2) |
| { |
| try (Closeable c = Util.markDirectoriesUnwriteable(cfs)) |
| { |
| cfs.forceBlockingFlush(); |
| } |
| catch (Throwable t) |
| { |
| // expected. Cause (after some wrappings) should be a write error |
| while (!(t instanceof FSWriteError)) |
| t = t.getCause(); |
| } |
| } |
| else |
| cfs.forceBlockingFlush(); |
| } |
| } |
| finally |
| { |
| DatabaseDescriptor.setDiskFailurePolicy(oldPolicy); |
| } |
| |
| CommitLog.instance.sync(true); |
| System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1); |
| // Currently we don't attempt to re-flush a memtable that failed, thus make sure data is replayed by commitlog. |
| // If retries work subsequent flushes should clear up error and this should change to expect 0. |
| Assert.assertEquals(1, CommitLog.instance.resetUnsafe(false)); |
| System.clearProperty("cassandra.replayList"); |
| } |
| |
| public void testOutOfOrderFlushRecovery(BiConsumer<ColumnFamilyStore, Memtable> flushAction, boolean performCompaction) |
| throws ExecutionException, InterruptedException, IOException |
| { |
| CommitLog.instance.resetUnsafe(true); |
| |
| ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); |
| |
| for (int i = 0 ; i < 5 ; i++) |
| { |
| new RowUpdateBuilder(cfs.metadata, 0, "k") |
| .clustering("c" + i).add("val", ByteBuffer.allocate(100)) |
| .build() |
| .apply(); |
| |
| Memtable current = cfs.getTracker().getView().getCurrentMemtable(); |
| if (i == 2) |
| current.makeUnflushable(); |
| |
| flushAction.accept(cfs, current); |
| } |
| if (performCompaction) |
| cfs.forceMajorCompaction(); |
| // Make sure metadata saves and reads fine |
| for (SSTableReader reader : cfs.getLiveSSTables()) |
| reader.reloadSSTableMetadata(); |
| |
| CommitLog.instance.sync(true); |
| System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1); |
| // In the absence of error, this should be 0 because forceBlockingFlush/forceRecycleAllSegments would have |
| // persisted all data in the commit log. Because we know there was an error, there must be something left to |
| // replay. |
| Assert.assertEquals(1, CommitLog.instance.resetUnsafe(false)); |
| System.clearProperty("cassandra.replayList"); |
| } |
| |
| BiConsumer<ColumnFamilyStore, Memtable> flush = (cfs, current) -> |
| { |
| try |
| { |
| cfs.forceBlockingFlush(); |
| } |
| catch (Throwable t) |
| { |
| // expected after makeUnflushable. Cause (after some wrappings) should be a write error |
| while (!(t instanceof FSWriteError)) |
| t = t.getCause(); |
| // Wait for started flushes to complete. |
| cfs.switchMemtableIfCurrent(current); |
| } |
| }; |
| |
| BiConsumer<ColumnFamilyStore, Memtable> recycleSegments = (cfs, current) -> |
| { |
| // Move to new commit log segment and try to flush all data. Also delete segments that no longer contain |
| // flushed data. |
| // This does not stop on errors and should retain segments for which flushing failed. |
| CommitLog.instance.forceRecycleAllSegments(); |
| |
| // Wait for started flushes to complete. |
| cfs.switchMemtableIfCurrent(current); |
| }; |
| |
| @Test |
| public void testOutOfOrderFlushRecovery() throws ExecutionException, InterruptedException, IOException |
| { |
| testOutOfOrderFlushRecovery(flush, false); |
| } |
| |
| @Test |
| public void testOutOfOrderLogDiscard() throws ExecutionException, InterruptedException, IOException |
| { |
| testOutOfOrderFlushRecovery(recycleSegments, false); |
| } |
| |
| @Test |
| public void testOutOfOrderFlushRecoveryWithCompaction() throws ExecutionException, InterruptedException, IOException |
| { |
| testOutOfOrderFlushRecovery(flush, true); |
| } |
| |
| @Test |
| public void testOutOfOrderLogDiscardWithCompaction() throws ExecutionException, InterruptedException, IOException |
| { |
| testOutOfOrderFlushRecovery(recycleSegments, true); |
| } |
| |
| @Test |
| public void testRecoveryWithCollectionClusteringKeysStatic() throws Exception |
| { |
| |
| ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CUSTOM1); |
| RowUpdateBuilder rb = new RowUpdateBuilder(cfs.metadata, 0, 1); |
| |
| rb.add("s", 2); |
| |
| Mutation rm = rb.build(); |
| CommitLog.instance.add(rm); |
| |
| int replayed = 0; |
| |
| try |
| { |
| System.setProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY, "true"); |
| replayed = CommitLog.instance.resetUnsafe(false); |
| } |
| finally |
| { |
| System.clearProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY); |
| } |
| |
| Assert.assertEquals(replayed, 1); |
| |
| } |
| } |
| |