| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.io.sstable; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.*; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import com.google.common.collect.Sets; |
| import org.junit.Test; |
| |
| import org.apache.cassandra.SchemaLoader; |
| import org.apache.cassandra.Util; |
| import org.apache.cassandra.config.DatabaseDescriptor; |
| import org.apache.cassandra.db.*; |
| import org.apache.cassandra.db.compaction.AbstractCompactedRow; |
| import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; |
| import org.apache.cassandra.db.compaction.CompactionController; |
| import org.apache.cassandra.db.compaction.LazilyCompactedRow; |
| import org.apache.cassandra.db.compaction.OperationType; |
| import org.apache.cassandra.db.compaction.SSTableSplitter; |
| import org.apache.cassandra.dht.Range; |
| import org.apache.cassandra.dht.Token; |
| import org.apache.cassandra.io.sstable.metadata.MetadataCollector; |
| import org.apache.cassandra.metrics.StorageMetrics; |
| import org.apache.cassandra.notifications.INotification; |
| import org.apache.cassandra.notifications.INotificationConsumer; |
| import org.apache.cassandra.notifications.SSTableListChangedNotification; |
| import org.apache.cassandra.service.StorageService; |
| import org.apache.cassandra.streaming.StreamSession; |
| import org.apache.cassandra.utils.ByteBufferUtil; |
| import org.apache.cassandra.utils.Pair; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| |
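| /** |
| * Tests for SSTableRewriter: incremental replacement of sstables during |
| * compaction, early opening of readers, aborting mid-rewrite and cleanup of |
| * temporary files. |
| */ |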
| public class SSTableRewriterTest extends SchemaLoader |
| { |
| private static final String KEYSPACE = "Keyspace1"; |
| private static final String CF = "Standard1"; |
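| |
| /** |
| * Inserts and flushes 100 rows, rewrites the single resulting sstable in |
| * full and verifies that exactly one live data file remains afterwards. |
| */ |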
| @Test |
| public void basicTest() throws InterruptedException |
| { |
| Keyspace keyspace = Keyspace.open(KEYSPACE); |
| ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); |
| truncate(cfs); |
| |
| for (int j = 0; j < 100; j++) |
| { |
| ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j)); |
| Mutation rm = new Mutation(KEYSPACE, key); |
| rm.add(CF, Util.cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j); |
| rm.apply(); |
| } |
| cfs.forceBlockingFlush(); |
| Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables()); |
| assertEquals(1, sstables.size()); |
| SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false); |
| try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables)) |
| { |
| ISSTableScanner scanner = scanners.scanners.get(0); |
| CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis())); |
| writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory)); |
| while (scanner.hasNext()) |
| { |
| AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next())); |
| writer.append(row); |
| } |
| } |
| Collection<SSTableReader> newsstables = writer.finish(); |
| cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION); |
| Thread.sleep(100); |
| validateCFS(cfs); |
| int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0); |
| assertEquals(1, filecounts); |
| } |
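| |
| /** |
| * Same flow as basicTest, but on a larger, directly written sstable and |
| * with an explicit preemptive open interval on the rewriter. |
| */ |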
| @Test |
| public void basicTest2() throws InterruptedException |
| { |
| Keyspace keyspace = Keyspace.open(KEYSPACE); |
| ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); |
| truncate(cfs); |
| |
| SSTableReader s = writeFile(cfs, 1000); |
| cfs.addSSTable(s); |
| Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables()); |
| assertEquals(1, sstables.size()); |
| SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, 10000000); |
| try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables)) |
| { |
| ISSTableScanner scanner = scanners.scanners.get(0); |
| CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis())); |
| writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory)); |
| while (scanner.hasNext()) |
| { |
| AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next())); |
| writer.append(row); |
| } |
| } |
| Collection<SSTableReader> newsstables = writer.finish(); |
| cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION); |
| Thread.sleep(100); |
| validateCFS(cfs); |
| int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0); |
| assertEquals(1, filecounts); |
| } |
| |
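| /** |
| * Verifies that, mid-rewrite, an early-opened reader and the original |
| * compacting sstable cover adjacent, non-overlapping position ranges that |
| * together span the whole file. |
| */ |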
| @Test |
| public void getPositionsTest() throws InterruptedException |
| { |
| Keyspace keyspace = Keyspace.open(KEYSPACE); |
| ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); |
| truncate(cfs); |
| |
| SSTableReader s = writeFile(cfs, 1000); |
| cfs.addSSTable(s); |
| Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables()); |
| assertEquals(1, sstables.size()); |
| SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, 10000000); |
| boolean checked = false; |
| try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables)) |
| { |
| ISSTableScanner scanner = scanners.scanners.get(0); |
| CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis())); |
| writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory)); |
| while (scanner.hasNext()) |
| { |
| AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next())); |
| writer.append(row); |
| if (!checked && writer.currentWriter().getFilePointer() > 15000000) |
| { |
| checked = true; |
| for (SSTableReader sstable : cfs.getSSTables()) |
| { |
| if (sstable.openReason == SSTableReader.OpenReason.EARLY) |
| { |
| SSTableReader c = sstables.iterator().next(); |
| Collection<Range<Token>> r = Arrays.asList(new Range<>(cfs.partitioner.getMinimumToken(), cfs.partitioner.getMinimumToken())); |
| List<Pair<Long, Long>> tmplinkPositions = sstable.getPositionsForRanges(r); |
| List<Pair<Long, Long>> compactingPositions = c.getPositionsForRanges(r); |
| assertEquals(1, tmplinkPositions.size()); |
| assertEquals(1, compactingPositions.size()); |
| assertEquals(0, tmplinkPositions.get(0).left.longValue()); |
| // make sure we have no overlap between the early opened file and the compacting one: |
| assertEquals(tmplinkPositions.get(0).right.longValue(), compactingPositions.get(0).left.longValue()); |
| assertEquals(c.uncompressedLength(), compactingPositions.get(0).right.longValue()); |
| } |
| } |
| } |
| } |
| } |
| assertTrue(checked); |
| Collection<SSTableReader> newsstables = writer.finish(); |
| cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION); |
| Thread.sleep(100); |
| validateCFS(cfs); |
| int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0); |
| assertEquals(1, filecounts); |
| truncate(cfs); |
| } |
| |
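| /** |
| * Opens two early readers from an in-progress writer, then checks that |
| * marking the first obsolete, releasing both references and aborting the |
| * writer leaves no tmplink, tmp or data files behind. |
| */ |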
| @Test |
| public void testFileRemoval() throws InterruptedException |
| { |
| Keyspace keyspace = Keyspace.open(KEYSPACE); |
| ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); |
| truncate(cfs); |
| |
| ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata); |
| for (int i = 0; i < 1000; i++) |
| cf.addColumn(Util.column(String.valueOf(i), "a", 1)); |
| File dir = cfs.directories.getDirectoryForNewSSTables(); |
| SSTableWriter writer = getWriter(cfs, dir); |
| try |
| { |
| for (int i = 0; i < 1000; i++) |
| writer.append(StorageService.getPartitioner().decorateKey(random(i, 10)), cf); |
| SSTableReader s = writer.openEarly(1000); |
| assertFileCounts(dir.list(), 2, 2); |
| for (int i = 1000; i < 2000; i++) |
| writer.append(StorageService.getPartitioner().decorateKey(random(i, 10)), cf); |
| SSTableReader s2 = writer.openEarly(1000); |
| assertTrue(s.last.compareTo(s2.last) < 0); |
| assertFileCounts(dir.list(), 2, 2); |
| s.markObsolete(cfs.getDataTracker()); |
| s.selfRef().release(); |
| s2.selfRef().release(); |
| Thread.sleep(1000); |
| assertFileCounts(dir.list(), 0, 2); |
| writer.abort(); |
| Thread.sleep(1000); |
| int datafiles = assertFileCounts(dir.list(), 0, 0); |
| assertEquals(datafiles, 0); |
| validateCFS(cfs); |
| } |
| catch (Throwable t) |
| { |
| writer.abort(); |
| throw t; |
| } |
| } |
| |
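| /** |
| * Switches writers during the rewrite and verifies that the sstable count, |
| * the live/total disk space metrics and the storage load stay consistent. |
| */ |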
| @Test |
| public void testNumberOfFilesAndSizes() throws Exception |
| { |
| Keyspace keyspace = Keyspace.open(KEYSPACE); |
| ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); |
| truncate(cfs); |
| |
| SSTableReader s = writeFile(cfs, 1000); |
| cfs.addSSTable(s); |
| long startStorageMetricsLoad = StorageMetrics.load.count(); |
| Set<SSTableReader> compacting = Sets.newHashSet(s); |
| SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000); |
| rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); |
| |
| int files = 1; |
| try (ISSTableScanner scanner = s.getScanner(); |
| CompactionController controller = new CompactionController(cfs, compacting, 0)) |
| { |
| while (scanner.hasNext()) |
| { |
| rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); |
| if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000) |
| { |
| rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); |
| files++; |
| assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. |
| assertEquals(s.bytesOnDisk(), cfs.metric.liveDiskSpaceUsed.count()); |
| assertEquals(s.bytesOnDisk(), cfs.metric.totalDiskSpaceUsed.count()); |
| } |
| } |
| } |
| List<SSTableReader> sstables = rewriter.finish(); |
| cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION); |
| long sum = 0; |
| for (SSTableReader x : cfs.getSSTables()) |
| sum += x.bytesOnDisk(); |
| assertEquals(sum, cfs.metric.liveDiskSpaceUsed.count()); |
| assertEquals(startStorageMetricsLoad - s.bytesOnDisk() + sum, StorageMetrics.load.count()); |
| assertEquals(files, sstables.size()); |
| assertEquals(files, cfs.getSSTables().size()); |
| Thread.sleep(1000); |
| // tmplink and tmp files should be gone: |
| assertEquals(sum, cfs.metric.totalDiskSpaceUsed.count()); |
| assertFileCounts(s.descriptor.directory.list(), 0, 0); |
| validateCFS(cfs); |
| } |
| |
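| /** |
| * Verifies that the original reader stays shadowed until the replaced |
| * sstables are marked compacted, and is only released afterwards. |
| */ |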
| @Test |
| public void testNumberOfFiles_dont_clean_readers() throws Exception |
| { |
| Keyspace keyspace = Keyspace.open(KEYSPACE); |
| ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); |
| truncate(cfs); |
| |
| SSTableReader s = writeFile(cfs, 1000); |
| cfs.addSSTable(s); |
| |
| Set<SSTableReader> compacting = Sets.newHashSet(s); |
| SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000); |
| rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); |
| |
| int files = 1; |
| try (ISSTableScanner scanner = s.getScanner(); |
| CompactionController controller = new CompactionController(cfs, compacting, 0)) |
| { |
| while (scanner.hasNext()) |
| { |
| rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); |
| if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000) |
| { |
| rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); |
| files++; |
| assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. |
| } |
| } |
| |
| List<SSTableReader> sstables = rewriter.finish(); |
| assertEquals(files, sstables.size()); |
| assertEquals(files, cfs.getSSTables().size()); |
| assertEquals(1, cfs.getDataTracker().getView().shadowed.size()); |
| cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION); |
| assertEquals(files, cfs.getSSTables().size()); |
| assertEquals(0, cfs.getDataTracker().getView().shadowed.size()); |
| Thread.sleep(1000); |
| assertFileCounts(s.descriptor.directory.list(), 0, 0); |
| validateCFS(cfs); |
| } |
| catch (Throwable t) |
| { |
| rewriter.abort(); |
| throw t; |
| } |
| } |
| |
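| /** |
| * Aborts after consuming the whole source; the abort2/abort3 variants abort |
| * at earlier points. All three delegate to testNumberOfFiles_abort(RewriterTest), |
| * which asserts that the original state is fully restored. |
| */ |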
| @Test |
| public void testNumberOfFiles_abort() throws Exception |
| { |
| testNumberOfFiles_abort(new RewriterTest() |
| { |
| public void run(ISSTableScanner scanner, CompactionController controller, SSTableReader sstable, ColumnFamilyStore cfs, SSTableRewriter rewriter) |
| { |
| int files = 1; |
| while (scanner.hasNext()) |
| { |
| rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); |
| if (rewriter.currentWriter().getFilePointer() > 25000000) |
| { |
| rewriter.switchWriter(getWriter(cfs, sstable.descriptor.directory)); |
| files++; |
| assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. |
| } |
| } |
| rewriter.abort(); |
| } |
| }); |
| } |
| |
| @Test |
| public void testNumberOfFiles_abort2() throws Exception |
| { |
| testNumberOfFiles_abort(new RewriterTest() |
| { |
| public void run(ISSTableScanner scanner, CompactionController controller, SSTableReader sstable, ColumnFamilyStore cfs, SSTableRewriter rewriter) |
| { |
| int files = 1; |
| while (scanner.hasNext()) |
| { |
| rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); |
| if (rewriter.currentWriter().getFilePointer() > 25000000) |
| { |
| rewriter.switchWriter(getWriter(cfs, sstable.descriptor.directory)); |
| files++; |
| assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. |
| } |
| if (files == 3) |
| { |
| // test aborting when nothing has been written to the new file |
| rewriter.abort(); |
| break; |
| } |
| } |
| } |
| }); |
| } |
| |
| @Test |
| public void testNumberOfFiles_abort3() throws Exception |
| { |
| testNumberOfFiles_abort(new RewriterTest() |
| { |
| public void run(ISSTableScanner scanner, CompactionController controller, SSTableReader sstable, ColumnFamilyStore cfs, SSTableRewriter rewriter) |
| { |
| int files = 1; |
| while (scanner.hasNext()) |
| { |
| rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); |
| if (files == 1 && rewriter.currentWriter().getFilePointer() > 10000000) |
| { |
| rewriter.switchWriter(getWriter(cfs, sstable.descriptor.directory)); |
| files++; |
| assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. |
| } |
| } |
| rewriter.abort(); |
| } |
| }); |
| } |
| |
| private interface RewriterTest |
| { |
| void run(ISSTableScanner scanner, CompactionController controller, SSTableReader sstable, ColumnFamilyStore cfs, SSTableRewriter rewriter); |
| } |
| |
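| /** |
| * Runs a rewrite scenario that is expected to abort and asserts that the |
| * original sstable, its first/last keys and the disk space metrics are |
| * restored afterwards. |
| */ |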
| private void testNumberOfFiles_abort(RewriterTest test) throws Exception |
| { |
| Keyspace keyspace = Keyspace.open(KEYSPACE); |
| ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); |
| truncate(cfs); |
| |
| SSTableReader s = writeFile(cfs, 1000); |
| cfs.addSSTable(s); |
| |
| DecoratedKey origFirst = s.first; |
| DecoratedKey origLast = s.last; |
| long startSize = cfs.metric.liveDiskSpaceUsed.count(); |
| Set<SSTableReader> compacting = Sets.newHashSet(s); |
| SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000); |
| rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); |
| |
| try (ISSTableScanner scanner = s.getScanner(); |
| CompactionController controller = new CompactionController(cfs, compacting, 0)) |
| { |
| test.run(scanner, controller, s, cfs, rewriter); |
| } |
| catch (Throwable t) |
| { |
| rewriter.abort(); |
| throw t; |
| } |
| |
| Thread.sleep(1000); |
| assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.count()); |
| assertEquals(1, cfs.getSSTables().size()); |
| assertFileCounts(s.descriptor.directory.list(), 0, 0); |
| assertEquals(cfs.getSSTables().iterator().next().first, origFirst); |
| assertEquals(cfs.getSSTables().iterator().next().last, origLast); |
| validateCFS(cfs); |
| } |
| |
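| /** |
| * Finishes the rewrite while the most recently switched-to writer is still |
| * empty and verifies that the empty writer does not produce an sstable. |
| */ |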
| @Test |
| public void testNumberOfFiles_finish_empty_new_writer() throws Exception |
| { |
| Keyspace keyspace = Keyspace.open(KEYSPACE); |
| ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); |
| truncate(cfs); |
| |
| SSTableReader s = writeFile(cfs, 1000); |
| cfs.addSSTable(s); |
| |
| Set<SSTableReader> compacting = Sets.newHashSet(s); |
| SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000); |
| rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); |
| |
| int files = 1; |
| try (ISSTableScanner scanner = s.getScanner(); |
| CompactionController controller = new CompactionController(cfs, compacting, 0)) |
| { |
| while (scanner.hasNext()) |
| { |
| rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); |
| if (rewriter.currentWriter().getFilePointer() > 25000000) |
| { |
| rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); |
| files++; |
| assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. |
| } |
| if (files == 3) |
| { |
| // test finishing when nothing has been written to the new file |
| List<SSTableReader> sstables = rewriter.finish(); |
| cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION); |
| break; |
| } |
| } |
| |
| Thread.sleep(1000); |
| assertEquals(files - 1, cfs.getSSTables().size()); // we never wrote anything to the last file |
| assertFileCounts(s.descriptor.directory.list(), 0, 0); |
| validateCFS(cfs); |
| } |
| catch (Throwable t) |
| { |
| rewriter.abort(); |
| throw t; |
| } |
| } |
| |
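| /** |
| * Rewrites across several writers and truncates afterwards, verifying that |
| * no tmp or tmplink files survive. |
| */ |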
| @Test |
| public void testNumberOfFiles_truncate() throws Exception |
| { |
| Keyspace keyspace = Keyspace.open(KEYSPACE); |
| ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); |
| truncate(cfs); |
| cfs.disableAutoCompaction(); |
| |
| SSTableReader s = writeFile(cfs, 1000); |
| cfs.addSSTable(s); |
| Set<SSTableReader> compacting = Sets.newHashSet(s); |
| SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000); |
| rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); |
| |
| int files = 1; |
| try (ISSTableScanner scanner = s.getScanner(); |
| CompactionController controller = new CompactionController(cfs, compacting, 0)) |
| { |
| while (scanner.hasNext()) |
| { |
| rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); |
| if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000) |
| { |
| rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); |
| files++; |
| assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out. |
| } |
| } |
| |
| List<SSTableReader> sstables = rewriter.finish(); |
| cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION); |
| Thread.sleep(1000); |
| assertFileCounts(s.descriptor.directory.list(), 0, 0); |
| truncate(cfs); |
| } |
| catch (Throwable t) |
| { |
| rewriter.abort(); |
| throw t; |
| } |
| } |
| |
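| /** |
| * Uses a small preemptive open interval so that every switched-out writer |
| * has been opened early by the time the next switch happens. |
| */ |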
| @Test |
| public void testSmallFiles() throws Exception |
| { |
| Keyspace keyspace = Keyspace.open(KEYSPACE); |
| ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); |
| truncate(cfs); |
| cfs.disableAutoCompaction(); |
| |
| SSTableReader s = writeFile(cfs, 400); |
| cfs.addSSTable(s); |
| Set<SSTableReader> compacting = Sets.newHashSet(s); |
| SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 1000000); |
| rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); |
| |
| int files = 1; |
| try (ISSTableScanner scanner = s.getScanner(); |
| CompactionController controller = new CompactionController(cfs, compacting, 0)) |
| { |
| while (scanner.hasNext()) |
| { |
| rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); |
| if (rewriter.currentWriter().getOnDiskFilePointer() > 2500000) |
| { |
| assertEquals(files, cfs.getSSTables().size()); // all files are now opened early |
| rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); |
| files++; |
| } |
| } |
| |
| List<SSTableReader> sstables = rewriter.finish(); |
| cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION); |
| assertEquals(files, sstables.size()); |
| assertEquals(files, cfs.getSSTables().size()); |
| Thread.sleep(1000); |
| assertFileCounts(s.descriptor.directory.list(), 0, 0); |
| validateCFS(cfs); |
| } |
| catch (Throwable t) |
| { |
| rewriter.abort(); |
| throw t; |
| } |
| } |
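| |
| /** |
| * Splits one sstable into several via SSTableSplitter and checks that no |
| * tmp or tmplink files are left behind. |
| */ |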
| @Test |
| public void testSSTableSplit() throws InterruptedException |
| { |
| Keyspace keyspace = Keyspace.open(KEYSPACE); |
| ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); |
| truncate(cfs); |
| cfs.disableAutoCompaction(); |
| SSTableReader s = writeFile(cfs, 1000); |
| cfs.getDataTracker().markCompacting(Arrays.asList(s), true, false); |
| SSTableSplitter splitter = new SSTableSplitter(cfs, s, 10); |
| splitter.split(); |
| Thread.sleep(1000); |
| assertFileCounts(s.descriptor.directory.list(), 0, 0); |
| s.selfRef().release(); |
| for (File f : s.descriptor.directory.listFiles()) |
| { |
| // we need to clear out the data dir, otherwise tests running after this one break |
| f.delete(); |
| } |
| truncate(cfs); |
| } |
| |
| @Test |
| public void testOfflineAbort() throws Exception |
| { |
| testAbortHelper(true, true); |
| } |
| |
| @Test |
| public void testOfflineAbort2() throws Exception |
| { |
| testAbortHelper(false, true); |
| } |
| |
| @Test |
| public void testAbort() throws Exception |
| { |
| testAbortHelper(false, false); |
| } |
| |
| @Test |
| public void testAbort2() throws Exception |
| { |
| testAbortHelper(true, false); |
| } |
| |
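| /** |
| * Drives a full rewrite, makes finishing throw (early or late), aborts, and |
| * verifies cleanup for both online and offline (never added to the CFS) |
| * rewrites. |
| */ |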
| private void testAbortHelper(boolean earlyException, boolean offline) throws Exception |
| { |
| Keyspace keyspace = Keyspace.open(KEYSPACE); |
| ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); |
| truncate(cfs); |
| SSTableReader s = writeFile(cfs, 1000); |
| if (!offline) |
| cfs.addSSTable(s); |
| Set<SSTableReader> compacting = Sets.newHashSet(s); |
| cfs.getDataTracker().markCompacting(compacting); |
| SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, offline, 10000000); |
| SSTableWriter w = getWriter(cfs, s.descriptor.directory); |
| rewriter.switchWriter(w); |
| try (ISSTableScanner scanner = compacting.iterator().next().getScanner(); |
| CompactionController controller = new CompactionController(cfs, compacting, 0)) |
| { |
| while (scanner.hasNext()) |
| { |
| rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); |
| if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000) |
| { |
| rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); |
| } |
| } |
| try |
| { |
| rewriter.finishAndThrow(earlyException); |
| } |
| catch (Throwable t) |
| { |
| rewriter.abort(); |
| } |
| } |
| finally |
| { |
| cfs.getDataTracker().unmarkCompacting(compacting); |
| } |
| Thread.sleep(1000); |
| int filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0); |
| assertEquals(filecount, 1); |
| if (!offline) |
| { |
| assertEquals(1, cfs.getSSTables().size()); |
| validateCFS(cfs); |
| truncate(cfs); |
| } |
| else |
| { |
| assertEquals(0, cfs.getSSTables().size()); |
| cfs.truncateBlocking(); |
| } |
| filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0); |
| if (offline) |
| { |
| s.selfRef().release(); |
| // the file was never added to the CFS, therefore the truncation above did not remove it |
| assertEquals(1, filecount); |
| for (File f : s.descriptor.directory.listFiles()) |
| { |
| f.delete(); |
| } |
| filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0); |
| } |
| |
| assertEquals(0, filecount); |
| truncate(cfs); |
| } |
| |
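| /** |
| * Switches writers every tenth partition and validates after every append |
| * that all keys remain readable throughout the rewrite. |
| */ |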
| @Test |
| public void testAllKeysReadable() throws Exception |
| { |
| Keyspace keyspace = Keyspace.open(KEYSPACE); |
| ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); |
| truncate(cfs); |
| for (int i = 0; i < 100; i++) |
| { |
| DecoratedKey key = Util.dk(Integer.toString(i)); |
| Mutation rm = new Mutation(KEYSPACE, key.getKey()); |
| for (int j = 0; j < 10; j++) |
| rm.add(CF, Util.cellname(Integer.toString(j)), ByteBufferUtil.EMPTY_BYTE_BUFFER, 100); |
| rm.apply(); |
| } |
| cfs.forceBlockingFlush(); |
| cfs.forceMajorCompaction(); |
| validateKeys(keyspace); |
| |
| assertEquals(1, cfs.getSSTables().size()); |
| SSTableReader s = cfs.getSSTables().iterator().next(); |
| Set<SSTableReader> compacting = new HashSet<>(); |
| compacting.add(s); |
| cfs.getDataTracker().markCompacting(compacting); |
| |
| SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 1); |
| SSTableWriter w = getWriter(cfs, s.descriptor.directory); |
| rewriter.switchWriter(w); |
| int keyCount = 0; |
| try (ISSTableScanner scanner = compacting.iterator().next().getScanner(); |
| CompactionController controller = new CompactionController(cfs, compacting, 0)) |
| { |
| while (scanner.hasNext()) |
| { |
| rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); |
| if (keyCount % 10 == 0) |
| { |
| rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); |
| } |
| keyCount++; |
| validateKeys(keyspace); |
| } |
| try |
| { |
| cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, rewriter.finish(), OperationType.COMPACTION); |
| cfs.getDataTracker().unmarkCompacting(compacting); |
| } |
| catch (Throwable t) |
| { |
| rewriter.abort(); |
| } |
| } |
| validateKeys(keyspace); |
| Thread.sleep(1000); |
| validateCFS(cfs); |
| truncate(cfs); |
| } |
| |
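| /** |
| * Verifies that the canonical sstable view never exposes an early-opened |
| * reader while a rewrite is in progress. |
| */ |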
| @Test |
| public void testCanonicalView() throws IOException |
| { |
| Keyspace keyspace = Keyspace.open(KEYSPACE); |
| ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); |
| truncate(cfs); |
| |
| SSTableReader s = writeFile(cfs, 1000); |
| cfs.addSSTable(s); |
| Set<SSTableReader> sstables = Sets.newHashSet(cfs.markAllCompacting()); |
| assertEquals(1, sstables.size()); |
| SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, 10000000); |
| boolean checked = false; |
| try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables)) |
| { |
| ISSTableScanner scanner = scanners.scanners.get(0); |
| CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis())); |
| writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory)); |
| while (scanner.hasNext()) |
| { |
| AbstractCompactedRow row = new LazilyCompactedRow(controller, Collections.singletonList(scanner.next())); |
| writer.append(row); |
| if (!checked && writer.currentWriter().getFilePointer() > 15000000) |
| { |
| checked = true; |
| ColumnFamilyStore.ViewFragment viewFragment = cfs.select(ColumnFamilyStore.CANONICAL_SSTABLES); |
| // canonical view should have only one SSTable which is not opened early. |
| assertEquals(1, viewFragment.sstables.size()); |
| SSTableReader sstable = viewFragment.sstables.get(0); |
| assertEquals(s.descriptor, sstable.descriptor); |
| assertTrue("Found early opened SSTable in canonical view: " + sstable.getFilename(), sstable.openReason != SSTableReader.OpenReason.EARLY); |
| } |
| } |
| } |
| writer.abort(); |
| cfs.getDataTracker().unmarkCompacting(sstables); |
| truncate(cfs); |
| } |
| |
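| /** |
| * Verifies that streaming can obtain sections for ranges at every point |
| * while sstables are incrementally reopened during a major compaction. |
| */ |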
| @Test |
| public void testSSTableSectionsForRanges() throws IOException, InterruptedException, ExecutionException |
| { |
| Keyspace keyspace = Keyspace.open(KEYSPACE); |
| final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); |
| truncate(cfs); |
| |
| cfs.addSSTable(writeFile(cfs, 1000)); |
| |
| Collection<SSTableReader> allSSTables = cfs.getSSTables(); |
| assertEquals(1, allSSTables.size()); |
| final Token firstToken = allSSTables.iterator().next().first.getToken(); |
| DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1); |
| |
| List<StreamSession.SSTableStreamingSections> sectionsBeforeRewrite = StreamSession.getSSTableSectionsForRanges( |
| Collections.singleton(new Range<Token>(firstToken, firstToken)), |
| Collections.singleton(cfs), 0L, false); |
| assertEquals(1, sectionsBeforeRewrite.size()); |
| for (StreamSession.SSTableStreamingSections section : sectionsBeforeRewrite) |
| section.ref.release(); |
| final AtomicInteger checkCount = new AtomicInteger(); |
| // needed since we are also notified when compaction finishes - we cannot get sections for ranges for obsoleted sstables |
| INotificationConsumer consumer = new INotificationConsumer() |
| { |
| public void handleNotification(INotification notification, Object sender) |
| { |
| if (notification instanceof SSTableListChangedNotification) |
| { |
| Collection<SSTableReader> added = ((SSTableListChangedNotification) notification).added; |
| Collection<SSTableReader> removed = ((SSTableListChangedNotification) notification).removed; |
| // note that we need to check added.equals(removed): once the compaction is done the old sstable has |
| // selfRef().globalCount() == 0 and we can't get the sections for its ranges anymore. During incremental opening we always |
| // add and remove the same sstable (the sstables are x.equals(y) but not x == y, since the new one is a new instance with a |
| // moved starting point). In that case we must avoid calling getSSTableSectionsForRanges, since we are inside the |
| // notification method and referencing an sstable with globalCount() == 0 puts it into a loop, which blocks the tracker |
| // from removing the unreferenced sstable. |
| if (added.isEmpty() || !added.iterator().next().getColumnFamilyName().equals(cfs.getColumnFamilyName()) || !added.equals(removed)) |
| return; |
| |
| // the rewrite process must at no point hide sections returned by getSSTableSectionsForRanges |
| Set<Range<Token>> range = Collections.singleton(new Range<Token>(firstToken, firstToken)); |
| List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), 0L, false); |
| assertEquals(1, sections.size()); |
| for (StreamSession.SSTableStreamingSections section : sections) |
| section.ref.release(); |
| checkCount.incrementAndGet(); |
| } |
| } |
| }; |
| cfs.getDataTracker().subscribe(consumer); |
| try |
| { |
| cfs.forceMajorCompaction(); |
| } |
| finally |
| { |
| // reset the preemptive open interval to its default |
| DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(50); |
| cfs.getDataTracker().unsubscribe(consumer); |
| } |
| assertTrue(checkCount.get() >= 2); |
| truncate(cfs); |
| } |
| |
| /** |
| * Emulates anticompaction: writing from one source sstable to two new sstables. |
| * |
| * @throws IOException |
| */ |
| @Test |
| public void testTwoWriters() throws IOException |
| { |
| Keyspace keyspace = Keyspace.open(KEYSPACE); |
| ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); |
| truncate(cfs); |
| |
| SSTableReader s = writeFile(cfs, 1000); |
| cfs.addSSTable(s); |
| Set<SSTableReader> sstables = Sets.newHashSet(cfs.markAllCompacting()); |
| assertEquals(1, sstables.size()); |
| SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, false); |
| SSTableRewriter writer2 = new SSTableRewriter(cfs, sstables, 1000, false, false); |
| try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables)) |
| { |
| ISSTableScanner scanner = scanners.scanners.get(0); |
| CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis())); |
| writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory)); |
| writer2.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory)); |
| while (scanner.hasNext()) |
| { |
| AbstractCompactedRow row = new LazilyCompactedRow(controller, Collections.singletonList(scanner.next())); |
| |
| if (writer.currentWriter().getFilePointer() < 15000000) |
| writer.append(row); |
| else |
| writer2.append(row); |
| } |
| for (int i = 0; i < 5000; i++) |
| { |
| DecoratedKey key = Util.dk(ByteBufferUtil.bytes(i)); |
| ColumnFamily cf = Util.getColumnFamily(keyspace, key, CF); |
| assertNotNull(cf); |
| } |
| } |
| writer.abort(); |
| writer2.abort(); |
| cfs.getDataTracker().unmarkCompacting(sstables); |
| truncate(cfs); |
| } |
| |
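| /** Asserts that the 100 keys written by testAllKeysReadable can all be read. */ |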
| private void validateKeys(Keyspace ks) |
| { |
| for (int i = 0; i < 100; i++) |
| { |
| DecoratedKey key = Util.dk(Integer.toString(i)); |
| ColumnFamily cf = Util.getColumnFamily(ks, key, CF); |
| assertNotNull(cf); |
| } |
| } |
| |
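| /** Writes an sstable with count * 5 partitions directly to disk and opens a reader on it. */ |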
| private SSTableReader writeFile(ColumnFamilyStore cfs, int count) |
| { |
| ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata); |
| for (int i = 0; i < count / 100; i++) |
| cf.addColumn(Util.cellname(i), random(0, 1000), 1); |
| File dir = cfs.directories.getDirectoryForNewSSTables(); |
| String filename = cfs.getTempSSTablePath(dir); |
| |
| SSTableWriter writer = new SSTableWriter(filename, |
| 0, |
| 0, |
| cfs.metadata, |
| StorageService.getPartitioner(), |
| new MetadataCollector(cfs.metadata.comparator)); |
| |
| for (int i = 0; i < count * 5; i++) |
| writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf); |
| return writer.closeAndOpenReader(); |
| } |
| |
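| /** |
| * Sanity-checks the CFS: no live sstable is marked compacted, each holds |
| * exactly one reference, every data file on disk belongs to a live sstable |
| * and nothing is left marked as compacting. |
| */ |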
| private void validateCFS(ColumnFamilyStore cfs) |
| { |
| Set<Integer> liveDescriptors = new HashSet<>(); |
| for (SSTableReader sstable : cfs.getSSTables()) |
| { |
| assertFalse(sstable.isMarkedCompacted()); |
| assertEquals(1, sstable.selfRef().globalCount()); |
| liveDescriptors.add(sstable.descriptor.generation); |
| } |
| for (File dir : cfs.directories.getCFDirectories()) |
| { |
| for (String f : dir.list()) |
| { |
| if (f.contains("Data")) |
| { |
| Descriptor d = Descriptor.fromFilename(f); |
| assertTrue(d.toString(), liveDescriptors.contains(d.generation)); |
| } |
| } |
| } |
| assertTrue(cfs.getDataTracker().getCompacting().isEmpty()); |
| assertTrue("" + cfs.getTotalDiskSpaceUsed(), cfs.getTotalDiskSpaceUsed() >= 0); |
| } |
| |
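| /** Truncates the CFS, waits for pending file deletions and checks that no disk space is used. */ |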
| private void truncate(ColumnFamilyStore cfs) |
| { |
| cfs.truncateBlocking(); |
| SSTableDeletingTask.waitForDeletions(); |
| validateCFS(cfs); |
| assertEquals(0, cfs.getTotalDiskSpaceUsed()); |
| } |
| |
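| /** |
| * Asserts the expected number of tmplink and tmp files among the given file |
| * names and returns the number of data files found. |
| */ |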
| private int assertFileCounts(String [] files, int expectedtmplinkCount, int expectedtmpCount) |
| { |
| int tmplinkcount = 0; |
| int tmpcount = 0; |
| int datacount = 0; |
| for (String f : files) |
| { |
| if (f.endsWith("-CRC.db")) |
| continue; |
| if (f.contains("-tmplink-")) |
| tmplinkcount++; |
| else if (f.contains("-tmp-")) |
| tmpcount++; |
| else if (f.contains("Data")) |
| datacount++; |
| } |
| assertEquals(expectedtmplinkCount, tmplinkcount); |
| assertEquals(expectedtmpCount, tmpcount); |
| return datacount; |
| } |
| |
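| /** Creates a new SSTableWriter on a temporary sstable path in the given directory. */ |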
| private SSTableWriter getWriter(ColumnFamilyStore cfs, File directory) |
| { |
| String filename = cfs.getTempSSTablePath(directory); |
| return new SSTableWriter(filename, |
| 0, |
| 0, |
| cfs.metadata, |
| StorageService.getPartitioner(), |
| new MetadataCollector(cfs.metadata.comparator)); |
| } |
| |
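| /** Returns a buffer of size + 4 random bytes whose first four bytes encode i. */ |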
| private ByteBuffer random(int i, int size) |
| { |
| byte[] bytes = new byte[size + 4]; |
| ThreadLocalRandom.current().nextBytes(bytes); |
| ByteBuffer r = ByteBuffer.wrap(bytes); |
| r.putInt(0, i); |
| return r; |
| } |
| |
| } |