blob: e0dc7376d36c5a8b9ae1da99421737ca914f129a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.storage.index.impl;
import static java.util.Collections.emptyNavigableSet;
import static java.util.Comparator.comparing;
import static org.apache.ignite.internal.storage.RowId.highestRowId;
import static org.apache.ignite.internal.storage.RowId.lowestRowId;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Collectors;
import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.IndexRowImpl;
import org.apache.ignite.internal.storage.index.PeekCursor;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
import org.apache.ignite.internal.util.TransformingIterator;
import org.jetbrains.annotations.Nullable;
/**
* Test implementation of MV sorted index storage.
*/
public class TestSortedIndexStorage extends AbstractTestIndexStorage implements SortedIndexStorage {
private final NavigableSet<IndexRow> index;
private final StorageSortedIndexDescriptor descriptor;
/** Constructor. */
public TestSortedIndexStorage(int partitionId, StorageSortedIndexDescriptor descriptor) {
super(partitionId, descriptor);
BinaryTupleComparator binaryTupleComparator = new BinaryTupleComparator(descriptor.columns());
this.descriptor = descriptor;
this.index = new ConcurrentSkipListSet<>(
comparing((IndexRow indexRow) -> indexRow.indexColumns().byteBuffer(), binaryTupleComparator)
.thenComparing(IndexRow::rowId)
);
}
@Override
public StorageSortedIndexDescriptor indexDescriptor() {
return descriptor;
}
@Override
Iterator<RowId> getRowIdIteratorForGetByBinaryTuple(BinaryTuple key) {
// These must be two different instances, because "scan" call messes up headers.
BinaryTuplePrefix lowerBound = BinaryTuplePrefix.fromBinaryTuple(key);
BinaryTuplePrefix higherBound = BinaryTuplePrefix.fromBinaryTuple(key);
PeekCursor<IndexRow> peekCursor = scan(lowerBound, higherBound, GREATER_OR_EQUAL | LESS_OR_EQUAL);
return new TransformingIterator<>(peekCursor, IndexRow::rowId);
}
@Override
public void put(IndexRow row) {
checkStorageClosed(false);
index.add(row);
}
@Override
public void remove(IndexRow row) {
checkStorageClosedOrInProcessOfRebalance(false);
index.remove(row);
}
@Override
public PeekCursor<IndexRow> scan(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) {
return scanInternal(lowerBound, upperBound, flags, true);
}
@Override
public PeekCursor<IndexRow> tolerantScan(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) {
return scanInternal(lowerBound, upperBound, flags, false);
}
private IndexRowImpl prefixToIndexRow(BinaryTuplePrefix prefix, RowId rowId) {
var binaryTuple = new BinaryTuple(descriptor.binaryTupleSchema().elementCount(), prefix.byteBuffer());
return new IndexRowImpl(binaryTuple, rowId);
}
private static void setEqualityFlag(BinaryTuplePrefix prefix) {
ByteBuffer buffer = prefix.byteBuffer();
byte flags = buffer.get(0);
buffer.put(0, (byte) (flags | BinaryTupleCommon.EQUALITY_FLAG));
}
@Override
void clear0() {
index.clear();
}
private static final IndexRow NO_PEEKED_ROW = new IndexRowImpl(null, null);
private class ScanCursor implements PeekCursor<IndexRow> {
private final NavigableSet<IndexRow> indexSet;
@Nullable
private Boolean hasNext;
@Nullable
private IndexRow currentRow;
@Nullable
private IndexRow peekedRow = NO_PEEKED_ROW;
private ScanCursor(NavigableSet<IndexRow> indexSet) {
this.indexSet = indexSet;
}
@Override
public void close() {
pendingCursors.decrementAndGet();
}
@Override
public boolean hasNext() {
checkStorageClosedOrInProcessOfRebalance(true);
if (hasNext != null) {
return hasNext;
}
currentRow = peekedRow == NO_PEEKED_ROW ? peek() : peekedRow;
peekedRow = NO_PEEKED_ROW;
hasNext = currentRow != null;
return hasNext;
}
@Override
public IndexRow next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
this.hasNext = null;
return currentRow;
}
@Override
public @Nullable IndexRow peek() {
checkStorageClosedOrInProcessOfRebalance(true);
if (hasNext != null) {
return currentRow;
}
if (currentRow == null) {
try {
peekedRow = indexSet.first();
} catch (NoSuchElementException e) {
peekedRow = null;
}
} else {
peekedRow = indexSet.higher(this.currentRow);
}
return peekedRow;
}
}
/**
* Returns all indexed row ids.
*/
public Set<RowId> allRowsIds() {
return index.stream().map(IndexRow::rowId).collect(Collectors.toSet());
}
private PeekCursor<IndexRow> scanInternal(
@Nullable BinaryTuplePrefix lowerBound,
@Nullable BinaryTuplePrefix upperBound,
int flags,
boolean onlyBuiltIndex
) {
checkStorageClosedOrInProcessOfRebalance(true);
if (onlyBuiltIndex) {
throwExceptionIfIndexIsNotBuilt();
}
boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
if (!includeLower && lowerBound != null) {
setEqualityFlag(lowerBound);
}
if (includeUpper && upperBound != null) {
setEqualityFlag(upperBound);
}
NavigableSet<IndexRow> navigableSet;
if (lowerBound == null && upperBound == null) {
navigableSet = index;
} else if (lowerBound == null) {
navigableSet = index.headSet(prefixToIndexRow(upperBound, highestRowId(partitionId)), true);
} else if (upperBound == null) {
navigableSet = index.tailSet(prefixToIndexRow(lowerBound, lowestRowId(partitionId)), true);
} else {
try {
navigableSet = index.subSet(
prefixToIndexRow(lowerBound, lowestRowId(partitionId)),
true,
prefixToIndexRow(upperBound, highestRowId(partitionId)),
true
);
} catch (IllegalArgumentException e) {
// Upper bound is below the lower bound.
navigableSet = emptyNavigableSet();
}
}
pendingCursors.incrementAndGet();
return new ScanCursor(navigableSet);
}
}