| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.index.sai.disk.v1; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| |
| import com.google.common.base.Stopwatch; |
| import org.junit.After; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| import org.apache.cassandra.config.DatabaseDescriptor; |
| import org.apache.cassandra.db.Clustering; |
| import org.apache.cassandra.db.DecoratedKey; |
| import org.apache.cassandra.db.marshal.UTF8Type; |
| import org.apache.cassandra.db.rows.BTreeRow; |
| import org.apache.cassandra.db.rows.BufferCell; |
| import org.apache.cassandra.db.rows.Row; |
| import org.apache.cassandra.dht.Murmur3Partitioner; |
| import org.apache.cassandra.index.sai.SAITester; |
| import org.apache.cassandra.index.sai.StorageAttachedIndex; |
| import org.apache.cassandra.index.sai.disk.format.IndexComponent; |
| import org.apache.cassandra.index.sai.disk.format.IndexDescriptor; |
| import org.apache.cassandra.index.sai.utils.IndexEntry; |
| import org.apache.cassandra.index.sai.utils.IndexIdentifier; |
| import org.apache.cassandra.index.sai.disk.v1.segment.SegmentBuilder; |
| import org.apache.cassandra.index.sai.disk.v1.segment.SegmentMetadata; |
| import org.apache.cassandra.index.sai.utils.PrimaryKey; |
| import org.apache.cassandra.index.sai.utils.TermsIterator; |
| import org.apache.cassandra.io.sstable.Descriptor; |
| import org.apache.cassandra.io.sstable.SequenceBasedSSTableId; |
| import org.apache.cassandra.io.util.File; |
| import org.apache.cassandra.io.util.FileHandle; |
| import org.apache.cassandra.schema.ColumnMetadata; |
| import org.apache.cassandra.service.StorageService; |
| import org.apache.cassandra.utils.bytecomparable.ByteComparable; |
| import org.apache.cassandra.utils.bytecomparable.ByteSource; |
| |
| import static org.apache.cassandra.Util.dk; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| |
| public class SegmentFlushTest |
| { |
| private static long segmentRowIdOffset; |
| private static int posting1; |
| private static int posting2; |
| private static PrimaryKey minKey; |
| private static PrimaryKey maxKey; |
| private static ByteBuffer minTerm; |
| private static ByteBuffer maxTerm; |
| private static int numRows; |
| |
| @BeforeClass |
| public static void init() |
| { |
| DatabaseDescriptor.toolInitialization(); |
| DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); |
| StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance); |
| } |
| |
| @After |
| public void reset() |
| { |
| SegmentBuilder.updateLastValidSegmentRowId(-1); // reset |
| } |
| |
| @Test |
| public void testFlushBetweenRowIds() throws Exception |
| { |
| // exceeds max rowId per segment |
| testFlushBetweenRowIds(0, Integer.MAX_VALUE, 2); |
| testFlushBetweenRowIds(0, Long.MAX_VALUE - 1, 2); |
| testFlushBetweenRowIds(0, SegmentBuilder.LAST_VALID_SEGMENT_ROW_ID + 1, 2); |
| testFlushBetweenRowIds(Integer.MAX_VALUE - SegmentBuilder.LAST_VALID_SEGMENT_ROW_ID - 1, Integer.MAX_VALUE - 1, 1); |
| testFlushBetweenRowIds(Long.MAX_VALUE - SegmentBuilder.LAST_VALID_SEGMENT_ROW_ID - 1, Long.MAX_VALUE - 1, 1); |
| } |
| |
| @Test |
| public void testNoFlushBetweenRowIds() throws Exception |
| { |
| // not exceeds max rowId per segment |
| testFlushBetweenRowIds(0, SegmentBuilder.LAST_VALID_SEGMENT_ROW_ID, 1); |
| testFlushBetweenRowIds(Long.MAX_VALUE - SegmentBuilder.LAST_VALID_SEGMENT_ROW_ID, Long.MAX_VALUE - 1, 1); |
| } |
| |
| private void testFlushBetweenRowIds(long sstableRowId1, long sstableRowId2, int segments) throws Exception |
| { |
| Path tmpDir = Files.createTempDirectory("SegmentFlushTest"); |
| IndexDescriptor indexDescriptor = IndexDescriptor.create(new Descriptor(new File(tmpDir.toFile()), "ks", "cf", new SequenceBasedSSTableId(1)), |
| Murmur3Partitioner.instance, |
| SAITester.EMPTY_COMPARATOR); |
| |
| ColumnMetadata column = ColumnMetadata.regularColumn("sai", "internal", "column", UTF8Type.instance); |
| |
| StorageAttachedIndex index = SAITester.createMockIndex(column); |
| |
| SSTableIndexWriter writer = new SSTableIndexWriter(indexDescriptor, index, V1OnDiskFormat.SEGMENT_BUILD_MEMORY_LIMITER, () -> true); |
| |
| List<DecoratedKey> keys = Arrays.asList(dk("1"), dk("2")); |
| Collections.sort(keys); |
| |
| DecoratedKey key1 = keys.get(0); |
| ByteBuffer term1 = UTF8Type.instance.decompose("a"); |
| Row row1 = createRow(column, term1); |
| writer.addRow(SAITester.TEST_FACTORY.create(key1), row1, sstableRowId1); |
| |
| // expect a flush if exceed max rowId per segment |
| DecoratedKey key2 = keys.get(1); |
| ByteBuffer term2 = UTF8Type.instance.decompose("b"); |
| Row row2 = createRow(column, term2); |
| writer.addRow(SAITester.TEST_FACTORY.create(key2), row2, sstableRowId2); |
| |
| writer.complete(Stopwatch.createStarted()); |
| |
| MetadataSource source = MetadataSource.loadColumnMetadata(indexDescriptor, index.identifier()); |
| |
| List<SegmentMetadata> segmentMetadatas = SegmentMetadata.load(source, indexDescriptor.primaryKeyFactory); |
| assertEquals(segments, segmentMetadatas.size()); |
| |
| // verify segment metadata |
| SegmentMetadata segmentMetadata = segmentMetadatas.get(0); |
| segmentRowIdOffset = sstableRowId1; |
| posting1 = 0; |
| posting2 = segments == 1 ? (int) (sstableRowId2 - segmentRowIdOffset) : 0; |
| minKey = SAITester.TEST_FACTORY.create(key1.getToken()); |
| maxKey = segments == 1 ? SAITester.TEST_FACTORY.create(key2.getToken()) : minKey; |
| minTerm = term1; |
| maxTerm = segments == 1 ? term2 : term1; |
| numRows = segments == 1 ? 2 : 1; |
| verifySegmentMetadata(segmentMetadata); |
| verifyStringIndex(indexDescriptor, index.identifier(), segmentMetadata); |
| |
| if (segments > 1) |
| { |
| segmentRowIdOffset = sstableRowId2; |
| posting1 = 0; |
| posting2 = 0; |
| minKey = SAITester.TEST_FACTORY.create(key2.getToken()); |
| maxKey = minKey; |
| minTerm = term2; |
| maxTerm = term2; |
| numRows = 1; |
| |
| segmentMetadata = segmentMetadatas.get(1); |
| verifySegmentMetadata(segmentMetadata); |
| verifyStringIndex(indexDescriptor, index.identifier(), segmentMetadata); |
| } |
| } |
| |
| private void verifySegmentMetadata(SegmentMetadata segmentMetadata) |
| { |
| assertEquals(segmentRowIdOffset, segmentMetadata.rowIdOffset); |
| assertEquals(minKey, segmentMetadata.minKey); |
| assertEquals(maxKey, segmentMetadata.maxKey); |
| assertEquals(minTerm, segmentMetadata.minTerm); |
| assertEquals(maxTerm, segmentMetadata.maxTerm); |
| assertEquals(numRows, segmentMetadata.numRows); |
| } |
| |
| private void verifyStringIndex(IndexDescriptor indexDescriptor, IndexIdentifier indexIdentifier, SegmentMetadata segmentMetadata) throws IOException |
| { |
| FileHandle termsData = indexDescriptor.createPerIndexFileHandle(IndexComponent.TERMS_DATA, indexIdentifier, null); |
| FileHandle postingLists = indexDescriptor.createPerIndexFileHandle(IndexComponent.POSTING_LISTS, indexIdentifier, null); |
| |
| try (TermsIterator iterator = new TermsScanner(termsData, postingLists, segmentMetadata.componentMetadatas.get(IndexComponent.TERMS_DATA).root)) |
| { |
| assertEquals(minTerm, iterator.getMinTerm()); |
| assertEquals(maxTerm, iterator.getMaxTerm()); |
| |
| verifyTermPostings(iterator, minTerm, posting1, posting1); |
| |
| if (numRows > 1) |
| { |
| verifyTermPostings(iterator, maxTerm, posting2, posting2); |
| } |
| |
| assertFalse(iterator.hasNext()); |
| } |
| } |
| |
| private void verifyTermPostings(TermsIterator iterator, ByteBuffer expectedTerm, int minSegmentRowId, int maxSegmentRowId) |
| { |
| IndexEntry indexEntry = iterator.next(); |
| |
| assertEquals(0, ByteComparable.compare(indexEntry.term, v -> ByteSource.of(expectedTerm, v), ByteComparable.Version.OSS50)); |
| assertEquals(minSegmentRowId == maxSegmentRowId ? 1 : 2, indexEntry.postingList.size()); |
| } |
| |
| private Row createRow(ColumnMetadata column, ByteBuffer value) |
| { |
| Row.Builder builder1 = BTreeRow.sortedBuilder(); |
| builder1.newRow(Clustering.EMPTY); |
| builder1.addCell(BufferCell.live(column, 0, value)); |
| return builder1.build(); |
| } |
| } |