| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.io.sstable.format.bti; |
| |
| import java.io.IOException; |
| |
| import org.apache.cassandra.db.ClusteringComparator; |
| import org.apache.cassandra.db.ClusteringPrefix; |
| import org.apache.cassandra.db.DeletionTime; |
| import org.apache.cassandra.io.sstable.format.bti.RowIndexReader.IndexInfo; |
| import org.apache.cassandra.io.tries.IncrementalTrieWriter; |
| import org.apache.cassandra.io.tries.Walker; |
| import org.apache.cassandra.io.util.DataOutputPlus; |
| import org.apache.cassandra.utils.bytecomparable.ByteComparable; |
| import org.apache.cassandra.utils.bytecomparable.ByteSource; |
| |
| /** |
| * Preparer / writer of row index tries that can be read by {@link RowIndexReader} and iterated by |
| * {@link RowIndexReverseIterator}. |
| * <p> |
| * Uses {@link IncrementalTrieWriter} to build a trie of index section separators of the shortest possible length such |
| * that {@code prevMax < separator <= nextMin}. |
| */ |
| class RowIndexWriter implements AutoCloseable |
| { |
| private final ClusteringComparator comparator; |
| private final IncrementalTrieWriter<IndexInfo> trie; |
| private ByteComparable prevMax = null; |
| private ByteComparable prevSep = null; |
| |
| RowIndexWriter(ClusteringComparator comparator, DataOutputPlus out) |
| { |
| this.comparator = comparator; |
| this.trie = IncrementalTrieWriter.open(RowIndexReader.trieSerializer, out); |
| } |
| |
| void reset() |
| { |
| prevMax = null; |
| prevSep = null; |
| trie.reset(); |
| } |
| |
| @Override |
| public void close() |
| { |
| trie.close(); |
| } |
| |
| void add(ClusteringPrefix<?> firstName, ClusteringPrefix<?> lastName, IndexInfo info) throws IOException |
| { |
| assert info.openDeletion != null; |
| ByteComparable sep = prevMax == null |
| ? ByteComparable.EMPTY |
| : ByteComparable.separatorGt(prevMax, comparator.asByteComparable(firstName)); |
| trie.add(sep, info); |
| prevSep = sep; |
| prevMax = comparator.asByteComparable(lastName); |
| } |
| |
| public long complete(long endPos) throws IOException |
| { |
| // Add a separator after the last section, so that greater inputs can be quickly rejected. |
| // To maximize its efficiency we add it with the length of the last added separator. |
| int i = 0; |
| ByteSource max = prevMax.asComparableBytes(Walker.BYTE_COMPARABLE_VERSION); |
| ByteSource sep = prevSep.asComparableBytes(Walker.BYTE_COMPARABLE_VERSION); |
| int c; |
| while ((c = max.next()) == sep.next() && c != ByteSource.END_OF_STREAM) |
| ++i; |
| assert c != ByteSource.END_OF_STREAM : "Corrupted row order, max=" + prevMax; |
| |
| trie.add(nudge(prevMax, i), new IndexInfo(endPos, DeletionTime.LIVE)); |
| |
| return trie.complete(); |
| } |
| |
| /** |
| * Produces a source that is slightly greater than argument with length at least nudgeAt. |
| */ |
| private ByteComparable nudge(ByteComparable value, int nudgeAt) |
| { |
| return version -> new ByteSource() |
| { |
| private final ByteSource v = value.asComparableBytes(version); |
| private int cur = 0; |
| |
| @Override |
| public int next() |
| { |
| int b = ByteSource.END_OF_STREAM; |
| if (cur <= nudgeAt) |
| { |
| b = v.next(); |
| if (cur == nudgeAt) |
| { |
| if (b < 0xFF) |
| ++b; |
| else |
| return b; // can't nudge here, increase next instead (eventually will be -1) |
| } |
| } |
| ++cur; |
| return b; |
| } |
| }; |
| } |
| } |