| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.cassandra.db; |
| |
| import java.nio.ByteBuffer; |
| import java.util.*; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Future; |
| |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Iterators; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| import org.apache.cassandra.SchemaLoader; |
| import org.apache.cassandra.UpdateBuilder; |
| import org.apache.cassandra.Util; |
| import org.apache.cassandra.config.ColumnDefinition; |
| import org.apache.cassandra.cql3.statements.IndexTarget; |
| import org.apache.cassandra.db.compaction.CompactionManager; |
| import org.apache.cassandra.db.filter.ColumnFilter; |
| import org.apache.cassandra.db.marshal.Int32Type; |
| import org.apache.cassandra.db.marshal.UTF8Type; |
| import org.apache.cassandra.db.partitions.*; |
| import org.apache.cassandra.db.rows.*; |
| import org.apache.cassandra.exceptions.ConfigurationException; |
| import org.apache.cassandra.index.StubIndex; |
| import org.apache.cassandra.io.sstable.format.SSTableReader; |
| import org.apache.cassandra.io.sstable.metadata.StatsMetadata; |
| import org.apache.cassandra.schema.IndexMetadata; |
| import org.apache.cassandra.schema.KeyspaceParams; |
| import org.apache.cassandra.utils.ByteBufferUtil; |
| import org.apache.cassandra.utils.FBUtilities; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| |
| public class RangeTombstoneTest |
| { |
| private static final String KSNAME = "RangeTombstoneTest"; |
| private static final String CFNAME = "StandardInteger1"; |
| |
| @BeforeClass |
| public static void defineSchema() throws ConfigurationException |
| { |
| SchemaLoader.prepareServer(); |
| SchemaLoader.createKeyspace(KSNAME, |
| KeyspaceParams.simple(1), |
| SchemaLoader.standardCFMD(KSNAME, |
| CFNAME, |
| 1, |
| UTF8Type.instance, |
| Int32Type.instance, |
| Int32Type.instance)); |
| } |
| |
| @Test |
| public void simpleQueryWithRangeTombstoneTest() throws Exception |
| { |
| Keyspace keyspace = Keyspace.open(KSNAME); |
| ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CFNAME); |
| |
| // Inserting data |
| String key = "k1"; |
| |
| UpdateBuilder builder; |
| |
| builder = UpdateBuilder.create(cfs.metadata, key).withTimestamp(0); |
| for (int i = 0; i < 40; i += 2) |
| builder.newRow(i).add("val", i); |
| builder.applyUnsafe(); |
| cfs.forceBlockingFlush(); |
| |
| new RowUpdateBuilder(cfs.metadata, 1, key).addRangeTombstone(10, 22).build().applyUnsafe(); |
| |
| builder = UpdateBuilder.create(cfs.metadata, key).withTimestamp(2); |
| for (int i = 1; i < 40; i += 2) |
| builder.newRow(i).add("val", i); |
| builder.applyUnsafe(); |
| |
| new RowUpdateBuilder(cfs.metadata, 3, key).addRangeTombstone(19, 27).build().applyUnsafe(); |
| // We don't flush to test with both a range tomsbtone in memtable and in sstable |
| |
| // Queries by name |
| int[] live = new int[]{ 4, 9, 11, 17, 28 }; |
| int[] dead = new int[]{ 12, 19, 21, 24, 27 }; |
| |
| AbstractReadCommandBuilder.SinglePartitionBuilder cmdBuilder = Util.cmd(cfs, key); |
| for (int i : live) |
| cmdBuilder.includeRow(i); |
| for (int i : dead) |
| cmdBuilder.includeRow(i); |
| |
| Partition partition = Util.getOnlyPartitionUnfiltered(cmdBuilder.build()); |
| int nowInSec = FBUtilities.nowInSeconds(); |
| |
| for (int i : live) |
| assertTrue("Row " + i + " should be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec)); |
| for (int i : dead) |
| assertFalse("Row " + i + " shouldn't be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec)); |
| |
| // Queries by slices |
| partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).fromIncl(7).toIncl(30).build()); |
| |
| for (int i : new int[]{ 7, 8, 9, 11, 13, 15, 17, 28, 29, 30 }) |
| assertTrue("Row " + i + " should be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec)); |
| for (int i : new int[]{ 10, 12, 14, 16, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27 }) |
| assertFalse("Row " + i + " shouldn't be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec)); |
| } |
| |
| @Test |
| public void rangeTombstoneFilteringTest() throws Exception |
| { |
| CompactionManager.instance.disableAutoCompaction(); |
| Keyspace keyspace = Keyspace.open(KSNAME); |
| ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CFNAME); |
| |
| // Inserting data |
| String key = "k111"; |
| |
| UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, key).withTimestamp(0); |
| for (int i = 0; i < 40; i += 2) |
| builder.newRow(i).add("val", i); |
| builder.applyUnsafe(); |
| |
| new RowUpdateBuilder(cfs.metadata, 1, key).addRangeTombstone(5, 10).build().applyUnsafe(); |
| |
| new RowUpdateBuilder(cfs.metadata, 2, key).addRangeTombstone(15, 20).build().applyUnsafe(); |
| |
| ImmutableBTreePartition partition; |
| |
| partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).fromIncl(11).toIncl(14).build()); |
| Collection<RangeTombstone> rt = rangeTombstones(partition); |
| assertEquals(0, rt.size()); |
| |
| partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).fromIncl(11).toIncl(15).build()); |
| rt = rangeTombstones(partition); |
| assertEquals(1, rt.size()); |
| |
| partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).fromIncl(20).toIncl(25).build()); |
| rt = rangeTombstones(partition); |
| assertEquals(1, rt.size()); |
| |
| partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).fromIncl(12).toIncl(25).build()); |
| rt = rangeTombstones(partition); |
| assertEquals(1, rt.size()); |
| |
| partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).fromIncl(25).toIncl(35).build()); |
| rt = rangeTombstones(partition); |
| assertEquals(0, rt.size()); |
| |
| partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).fromIncl(1).toIncl(40).build()); |
| rt = rangeTombstones(partition); |
| assertEquals(2, rt.size()); |
| |
| partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).fromIncl(7).toIncl(17).build()); |
| rt = rangeTombstones(partition); |
| assertEquals(2, rt.size()); |
| |
| partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).fromIncl(5).toIncl(20).build()); |
| rt = rangeTombstones(partition); |
| assertEquals(2, rt.size()); |
| |
| partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).fromIncl(5).toIncl(20).build()); |
| rt = rangeTombstones(partition); |
| assertEquals(2, rt.size()); |
| |
| partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).fromIncl(1).toIncl(2).build()); |
| rt = rangeTombstones(partition); |
| assertEquals(0, rt.size()); |
| |
| partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).fromIncl(1).toIncl(5).build()); |
| rt = rangeTombstones(partition); |
| assertEquals(1, rt.size()); |
| |
| partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).fromIncl(1).toIncl(10).build()); |
| rt = rangeTombstones(partition); |
| assertEquals(1, rt.size()); |
| |
| partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).fromIncl(5).toIncl(6).build()); |
| rt = rangeTombstones(partition); |
| assertEquals(1, rt.size()); |
| |
| partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).fromIncl(17).toIncl(20).build()); |
| rt = rangeTombstones(partition); |
| assertEquals(1, rt.size()); |
| |
| partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).fromIncl(17).toIncl(18).build()); |
| rt = rangeTombstones(partition); |
| assertEquals(1, rt.size()); |
| |
| Slices.Builder sb = new Slices.Builder(cfs.getComparator()); |
| sb.add(Slice.Bound.create(cfs.getComparator(), true, true, 1), Slice.Bound.create(cfs.getComparator(), false, true, 10)); |
| sb.add(Slice.Bound.create(cfs.getComparator(), true, true, 16), Slice.Bound.create(cfs.getComparator(), false, true, 20)); |
| |
| partition = Util.getOnlyPartitionUnfiltered(SinglePartitionReadCommand.create(cfs.metadata, FBUtilities.nowInSeconds(), Util.dk(key), sb.build())); |
| rt = rangeTombstones(partition); |
| assertEquals(2, rt.size()); |
| } |
| |
| private Collection<RangeTombstone> rangeTombstones(ImmutableBTreePartition partition) |
| { |
| List<RangeTombstone> tombstones = new ArrayList<RangeTombstone>(); |
| Iterators.addAll(tombstones, partition.deletionInfo().rangeIterator(false)); |
| return tombstones; |
| } |
| |
| @Test |
| public void testTrackTimesPartitionTombstone() throws ExecutionException, InterruptedException |
| { |
| Keyspace ks = Keyspace.open(KSNAME); |
| ColumnFamilyStore cfs = ks.getColumnFamilyStore(CFNAME); |
| cfs.truncateBlocking(); |
| String key = "rt_times"; |
| |
| int nowInSec = FBUtilities.nowInSeconds(); |
| new Mutation(PartitionUpdate.fullPartitionDelete(cfs.metadata, Util.dk(key), 1000, nowInSec)).apply(); |
| cfs.forceBlockingFlush(); |
| |
| SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); |
| assertTimes(sstable.getSSTableMetadata(), 1000, 1000, nowInSec); |
| cfs.forceMajorCompaction(); |
| sstable = cfs.getLiveSSTables().iterator().next(); |
| assertTimes(sstable.getSSTableMetadata(), 1000, 1000, nowInSec); |
| } |
| |
| @Test |
| public void testTrackTimesPartitionTombstoneWithData() throws ExecutionException, InterruptedException |
| { |
| Keyspace ks = Keyspace.open(KSNAME); |
| ColumnFamilyStore cfs = ks.getColumnFamilyStore(CFNAME); |
| cfs.truncateBlocking(); |
| String key = "rt_times"; |
| |
| UpdateBuilder.create(cfs.metadata, key).withTimestamp(999).newRow(5).add("val", 5).apply(); |
| |
| key = "rt_times2"; |
| int nowInSec = FBUtilities.nowInSeconds(); |
| new Mutation(PartitionUpdate.fullPartitionDelete(cfs.metadata, Util.dk(key), 1000, nowInSec)).apply(); |
| cfs.forceBlockingFlush(); |
| |
| SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); |
| assertTimes(sstable.getSSTableMetadata(), 999, 1000, Integer.MAX_VALUE); |
| cfs.forceMajorCompaction(); |
| sstable = cfs.getLiveSSTables().iterator().next(); |
| assertTimes(sstable.getSSTableMetadata(), 999, 1000, Integer.MAX_VALUE); |
| } |
| |
| @Test |
| public void testTrackTimesRangeTombstone() throws ExecutionException, InterruptedException |
| { |
| Keyspace ks = Keyspace.open(KSNAME); |
| ColumnFamilyStore cfs = ks.getColumnFamilyStore(CFNAME); |
| cfs.truncateBlocking(); |
| String key = "rt_times"; |
| |
| int nowInSec = FBUtilities.nowInSeconds(); |
| new RowUpdateBuilder(cfs.metadata, nowInSec, 1000L, key).addRangeTombstone(1, 2).build().apply(); |
| cfs.forceBlockingFlush(); |
| |
| SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); |
| assertTimes(sstable.getSSTableMetadata(), 1000, 1000, nowInSec); |
| cfs.forceMajorCompaction(); |
| sstable = cfs.getLiveSSTables().iterator().next(); |
| assertTimes(sstable.getSSTableMetadata(), 1000, 1000, nowInSec); |
| } |
| |
| @Test |
| public void testTrackTimesRangeTombstoneWithData() throws ExecutionException, InterruptedException |
| { |
| Keyspace ks = Keyspace.open(KSNAME); |
| ColumnFamilyStore cfs = ks.getColumnFamilyStore(CFNAME); |
| cfs.truncateBlocking(); |
| String key = "rt_times"; |
| |
| UpdateBuilder.create(cfs.metadata, key).withTimestamp(999).newRow(5).add("val", 5).apply(); |
| |
| key = "rt_times2"; |
| int nowInSec = FBUtilities.nowInSeconds(); |
| new Mutation(PartitionUpdate.fullPartitionDelete(cfs.metadata, Util.dk(key), 1000, nowInSec)).apply(); |
| cfs.forceBlockingFlush(); |
| |
| cfs.forceBlockingFlush(); |
| SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); |
| assertTimes(sstable.getSSTableMetadata(), 999, 1000, Integer.MAX_VALUE); |
| cfs.forceMajorCompaction(); |
| sstable = cfs.getLiveSSTables().iterator().next(); |
| assertTimes(sstable.getSSTableMetadata(), 999, 1000, Integer.MAX_VALUE); |
| } |
| |
| private void assertTimes(StatsMetadata metadata, long min, long max, int localDeletionTime) |
| { |
| assertEquals(min, metadata.minTimestamp); |
| assertEquals(max, metadata.maxTimestamp); |
| assertEquals(localDeletionTime, metadata.maxLocalDeletionTime); |
| } |
| |
| @Test |
| public void test7810() throws ExecutionException, InterruptedException |
| { |
| Keyspace ks = Keyspace.open(KSNAME); |
| ColumnFamilyStore cfs = ks.getColumnFamilyStore(CFNAME); |
| cfs.metadata.gcGraceSeconds(2); |
| |
| String key = "7810"; |
| |
| UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, key).withTimestamp(0); |
| for (int i = 10; i < 20; i ++) |
| builder.newRow(i).add("val", i); |
| builder.apply(); |
| cfs.forceBlockingFlush(); |
| |
| new RowUpdateBuilder(cfs.metadata, 1, key).addRangeTombstone(10, 11).build().apply(); |
| cfs.forceBlockingFlush(); |
| |
| Thread.sleep(5); |
| cfs.forceMajorCompaction(); |
| assertEquals(8, Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).build()).rowCount()); |
| } |
| |
| @Test |
| public void test7808_1() throws ExecutionException, InterruptedException |
| { |
| Keyspace ks = Keyspace.open(KSNAME); |
| ColumnFamilyStore cfs = ks.getColumnFamilyStore(CFNAME); |
| cfs.metadata.gcGraceSeconds(2); |
| |
| String key = "7808_1"; |
| UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, key).withTimestamp(0); |
| for (int i = 0; i < 40; i += 2) |
| builder.newRow(i).add("val", i); |
| builder.apply(); |
| cfs.forceBlockingFlush(); |
| |
| new Mutation(PartitionUpdate.fullPartitionDelete(cfs.metadata, Util.dk(key), 1, 1)).apply(); |
| cfs.forceBlockingFlush(); |
| Thread.sleep(5); |
| cfs.forceMajorCompaction(); |
| } |
| |
| @Test |
| public void test7808_2() throws ExecutionException, InterruptedException |
| { |
| Keyspace ks = Keyspace.open(KSNAME); |
| ColumnFamilyStore cfs = ks.getColumnFamilyStore(CFNAME); |
| cfs.metadata.gcGraceSeconds(2); |
| |
| String key = "7808_2"; |
| UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, key).withTimestamp(0); |
| for (int i = 10; i < 20; i ++) |
| builder.newRow(i).add("val", i); |
| builder.apply(); |
| cfs.forceBlockingFlush(); |
| |
| new Mutation(PartitionUpdate.fullPartitionDelete(cfs.metadata, Util.dk(key), 0, 0)).apply(); |
| |
| UpdateBuilder.create(cfs.metadata, key).withTimestamp(1).newRow(5).add("val", 5).apply(); |
| |
| cfs.forceBlockingFlush(); |
| Thread.sleep(5); |
| cfs.forceMajorCompaction(); |
| assertEquals(1, Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).build()).rowCount()); |
| } |
| |
| @Test |
| public void overlappingRangeTest() throws Exception |
| { |
| CompactionManager.instance.disableAutoCompaction(); |
| Keyspace keyspace = Keyspace.open(KSNAME); |
| ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CFNAME); |
| |
| // Inserting data |
| String key = "k2"; |
| |
| UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, key).withTimestamp(0); |
| for (int i = 0; i < 20; i++) |
| builder.newRow(i).add("val", i); |
| builder.applyUnsafe(); |
| cfs.forceBlockingFlush(); |
| |
| new RowUpdateBuilder(cfs.metadata, 1, key).addRangeTombstone(5, 15).build().applyUnsafe(); |
| cfs.forceBlockingFlush(); |
| |
| new RowUpdateBuilder(cfs.metadata, 1, key).addRangeTombstone(5, 10).build().applyUnsafe(); |
| cfs.forceBlockingFlush(); |
| |
| new RowUpdateBuilder(cfs.metadata, 2, key).addRangeTombstone(5, 8).build().applyUnsafe(); |
| cfs.forceBlockingFlush(); |
| |
| Partition partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).build()); |
| int nowInSec = FBUtilities.nowInSeconds(); |
| |
| for (int i = 0; i < 5; i++) |
| assertTrue("Row " + i + " should be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec)); |
| for (int i = 16; i < 20; i++) |
| assertTrue("Row " + i + " should be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec)); |
| for (int i = 5; i <= 15; i++) |
| assertFalse("Row " + i + " shouldn't be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec)); |
| |
| // Compact everything and re-test |
| CompactionManager.instance.performMaximal(cfs, false); |
| partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).build()); |
| |
| for (int i = 0; i < 5; i++) |
| assertTrue("Row " + i + " should be live", partition.getRow(new Clustering(bb(i))).hasLiveData(FBUtilities.nowInSeconds())); |
| for (int i = 16; i < 20; i++) |
| assertTrue("Row " + i + " should be live", partition.getRow(new Clustering(bb(i))).hasLiveData(FBUtilities.nowInSeconds())); |
| for (int i = 5; i <= 15; i++) |
| assertFalse("Row " + i + " shouldn't be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec)); |
| } |
| |
| @Test |
| public void reverseQueryTest() throws Exception |
| { |
| Keyspace table = Keyspace.open(KSNAME); |
| ColumnFamilyStore cfs = table.getColumnFamilyStore(CFNAME); |
| |
| // Inserting data |
| String key = "k3"; |
| |
| UpdateBuilder.create(cfs.metadata, key).withTimestamp(0).newRow(2).add("val", 2).applyUnsafe(); |
| cfs.forceBlockingFlush(); |
| |
| new RowUpdateBuilder(cfs.metadata, 1, key).addRangeTombstone(0, 10).build().applyUnsafe(); |
| UpdateBuilder.create(cfs.metadata, key).withTimestamp(2).newRow(1).add("val", 1).applyUnsafe(); |
| cfs.forceBlockingFlush(); |
| |
| // Get the last value of the row |
| FilteredPartition partition = Util.getOnlyPartition(Util.cmd(cfs, key).build()); |
| assertTrue(partition.rowCount() > 0); |
| |
| int last = i(partition.unfilteredIterator(ColumnFilter.all(cfs.metadata), Slices.ALL, true).next().clustering().get(0)); |
| assertEquals("Last column should be column 1 since column 2 has been deleted", 1, last); |
| } |
| |
| @Test |
| public void testRowWithRangeTombstonesUpdatesSecondaryIndex() throws Exception |
| { |
| Keyspace table = Keyspace.open(KSNAME); |
| ColumnFamilyStore cfs = table.getColumnFamilyStore(CFNAME); |
| ByteBuffer key = ByteBufferUtil.bytes("k5"); |
| ByteBuffer indexedColumnName = ByteBufferUtil.bytes("val"); |
| |
| cfs.truncateBlocking(); |
| cfs.disableAutoCompaction(); |
| |
| ColumnDefinition cd = cfs.metadata.getColumnDefinition(indexedColumnName).copy(); |
| IndexMetadata indexDef = |
| IndexMetadata.fromIndexTargets(cfs.metadata, |
| Collections.singletonList(new IndexTarget(cd.name, IndexTarget.Type.VALUES)), |
| "test_index", |
| IndexMetadata.Kind.CUSTOM, |
| ImmutableMap.of(IndexTarget.CUSTOM_INDEX_OPTION_NAME, |
| StubIndex.class.getName())); |
| |
| if (!cfs.metadata.getIndexes().get("test_index").isPresent()) |
| cfs.metadata.indexes(cfs.metadata.getIndexes().with(indexDef)); |
| |
| Future<?> rebuild = cfs.indexManager.addIndex(indexDef); |
| // If rebuild there is, wait for the rebuild to finish so it doesn't race with the following insertions |
| if (rebuild != null) |
| rebuild.get(); |
| |
| StubIndex index = (StubIndex)cfs.indexManager.listIndexes() |
| .stream() |
| .filter(i -> "test_index".equals(i.getIndexMetadata().name)) |
| .findFirst() |
| .orElseThrow(() -> new RuntimeException(new AssertionError("Index not found"))); |
| index.reset(); |
| |
| UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, key).withTimestamp(0); |
| for (int i = 0; i < 10; i++) |
| builder.newRow(i).add("val", i); |
| builder.applyUnsafe(); |
| cfs.forceBlockingFlush(); |
| |
| new RowUpdateBuilder(cfs.metadata, 0, key).addRangeTombstone(0, 7).build().applyUnsafe(); |
| cfs.forceBlockingFlush(); |
| |
| assertEquals(10, index.rowsInserted.size()); |
| |
| CompactionManager.instance.performMaximal(cfs, false); |
| |
| // compacted down to single sstable |
| assertEquals(1, cfs.getLiveSSTables().size()); |
| |
| assertEquals(8, index.rowsDeleted.size()); |
| } |
| |
| @Test |
| public void testRangeTombstoneCompaction() throws Exception |
| { |
| Keyspace table = Keyspace.open(KSNAME); |
| ColumnFamilyStore cfs = table.getColumnFamilyStore(CFNAME); |
| ByteBuffer key = ByteBufferUtil.bytes("k4"); |
| |
| // remove any existing sstables before starting |
| cfs.truncateBlocking(); |
| cfs.disableAutoCompaction(); |
| |
| UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, key).withTimestamp(0); |
| for (int i = 0; i < 10; i += 2) |
| builder.newRow(i).add("val", i); |
| builder.applyUnsafe(); |
| cfs.forceBlockingFlush(); |
| |
| new RowUpdateBuilder(cfs.metadata, 0, key).addRangeTombstone(0, 7).build().applyUnsafe(); |
| cfs.forceBlockingFlush(); |
| |
| // there should be 2 sstables |
| assertEquals(2, cfs.getLiveSSTables().size()); |
| |
| // compact down to single sstable |
| CompactionManager.instance.performMaximal(cfs, false); |
| assertEquals(1, cfs.getLiveSSTables().size()); |
| |
| // test the physical structure of the sstable i.e. rt & columns on disk |
| SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); |
| try (UnfilteredPartitionIterator scanner = sstable.getScanner()) |
| { |
| try (UnfilteredRowIterator iter = scanner.next()) |
| { |
| // after compaction, we should have a single RT with a single row (the row 8) |
| Unfiltered u1 = iter.next(); |
| assertTrue("Expecting open marker, got " + u1.toString(cfs.metadata), u1 instanceof RangeTombstoneMarker); |
| Unfiltered u2 = iter.next(); |
| assertTrue("Expecting close marker, got " + u2.toString(cfs.metadata), u2 instanceof RangeTombstoneMarker); |
| Unfiltered u3 = iter.next(); |
| assertTrue("Expecting row, got " + u3.toString(cfs.metadata), u3 instanceof Row); |
| } |
| } |
| } |
| |
| @Test |
| public void testOverwritesToDeletedColumns() throws Exception |
| { |
| Keyspace table = Keyspace.open(KSNAME); |
| ColumnFamilyStore cfs = table.getColumnFamilyStore(CFNAME); |
| ByteBuffer key = ByteBufferUtil.bytes("k6"); |
| ByteBuffer indexedColumnName = ByteBufferUtil.bytes("val"); |
| |
| cfs.truncateBlocking(); |
| cfs.disableAutoCompaction(); |
| |
| ColumnDefinition cd = cfs.metadata.getColumnDefinition(indexedColumnName).copy(); |
| IndexMetadata indexDef = |
| IndexMetadata.fromIndexTargets(cfs.metadata, |
| Collections.singletonList(new IndexTarget(cd.name, IndexTarget.Type.VALUES)), |
| "test_index", |
| IndexMetadata.Kind.CUSTOM, |
| ImmutableMap.of(IndexTarget.CUSTOM_INDEX_OPTION_NAME, |
| StubIndex.class.getName())); |
| |
| if (!cfs.metadata.getIndexes().get("test_index").isPresent()) |
| cfs.metadata.indexes(cfs.metadata.getIndexes().with(indexDef)); |
| |
| Future<?> rebuild = cfs.indexManager.addIndex(indexDef); |
| // If rebuild there is, wait for the rebuild to finish so it doesn't race with the following insertions |
| if (rebuild != null) |
| rebuild.get(); |
| |
| StubIndex index = (StubIndex)cfs.indexManager.getIndexByName("test_index"); |
| index.reset(); |
| |
| UpdateBuilder.create(cfs.metadata, key).withTimestamp(0).newRow(1).add("val", 1).applyUnsafe(); |
| |
| // add a RT which hides the column we just inserted |
| new RowUpdateBuilder(cfs.metadata, 1, key).addRangeTombstone(0, 1).build().applyUnsafe(); |
| |
| // now re-insert that column |
| UpdateBuilder.create(cfs.metadata, key).withTimestamp(2).newRow(1).add("val", 1).applyUnsafe(); |
| |
| cfs.forceBlockingFlush(); |
| |
| // We should have 1 insert and 1 update to the indexed "1" column |
| // CASSANDRA-6640 changed index update to just update, not insert then delete |
| assertEquals(1, index.rowsInserted.size()); |
| assertEquals(1, index.rowsUpdated.size()); |
| } |
| |
| private static ByteBuffer bb(int i) |
| { |
| return ByteBufferUtil.bytes(i); |
| } |
| |
| private static int i(ByteBuffer bb) |
| { |
| return ByteBufferUtil.toInt(bb); |
| } |
| } |