/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.storage.rocksdb.index;

import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER;
import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.ROW_ID_SIZE;
import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.deleteByPrefix;
import static org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;

import java.nio.ByteBuffer;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.index.HashIndexStorage;
import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
import org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper;
import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.HashUtils;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteBatchWithIndex;
/**
* {@link HashIndexStorage} implementation based on RocksDB.
*
* <p>This storage uses the following format for keys:
* <pre>
* Table ID - 4 bytes
* Index ID - 4 bytes
* Partition ID - 2 bytes
* Tuple hash - 4 bytes
* Tuple value - variable length
* Row ID (UUID) - 16 bytes
* </pre>
*
 * <p>We use an empty array as the value, because all of the required information can be extracted from the key.
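 *
 * <p>For illustration, a complete key can be assembled as follows (a sketch that mirrors the layout above;
 * {@code tupleBytes}, {@code indexId} and {@code rowId} are hypothetical local variables):
 * <pre>{@code
 * ByteBuffer key = ByteBuffer.allocate(FIXED_PREFIX_LENGTH + tupleBytes.remaining() + ROW_ID_SIZE)
 *         .order(KEY_BYTE_ORDER)
 *         .putInt(tableId)                        // 4 bytes
 *         .putInt(indexId)                        // 4 bytes
 *         .putShort((short) partitionId)          // 2 bytes
 *         .putInt(HashUtils.hash32(tupleBytes))   // 4 bytes
 *         .put(tupleBytes)                        // variable length
 *         .putLong(rowId.mostSignificantBits())   // 8 bytes
 *         .putLong(rowId.leastSignificantBits()); // 8 bytes
 * }</pre>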
*/
public class RocksDbHashIndexStorage extends AbstractRocksDbIndexStorage implements HashIndexStorage {
    /** Length of the fixed part of the key: Table ID + Index ID + Partition ID + Hash. */
    public static final int FIXED_PREFIX_LENGTH = PREFIX_WITH_IDS_LENGTH + Integer.BYTES;

    private final StorageHashIndexDescriptor descriptor;

    private final ColumnFamily indexCf;

    /** Constant prefix of every index key. */
    private final byte[] constantPrefix;

    /**
     * Creates a new Hash Index storage.
     *
     * @param descriptor Index descriptor.
     * @param tableId Table ID.
     * @param partitionId Partition ID.
     * @param indexCf Column family that stores the index data.
     * @param indexMetaStorage Index meta storage.
     */
    public RocksDbHashIndexStorage(
            StorageHashIndexDescriptor descriptor,
            int tableId,
            int partitionId,
            ColumnFamily indexCf,
            RocksDbMetaStorage indexMetaStorage
    ) {
        super(tableId, descriptor.id(), partitionId, indexMetaStorage, descriptor.isPk());

        this.descriptor = descriptor;
        this.indexCf = indexCf;
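
        // Pre-compute the constant prefix shared by all keys of this index: [table ID | index ID | partition ID].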
        this.constantPrefix = ByteBuffer.allocate(PREFIX_WITH_IDS_LENGTH)
                .order(KEY_BYTE_ORDER)
                .putInt(tableId)
                .putInt(descriptor.id())
                .putShort((short) partitionId)
                .array();
    }

    @Override
    public StorageHashIndexDescriptor indexDescriptor() {
        return descriptor;
    }

    @Override
    public Cursor<RowId> get(BinaryTuple key) {
        return busyDataRead(() -> {
            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);

            throwExceptionIfIndexNotBuilt();
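
            // All entries for the given key share the same [constant prefix | hash | tuple] bytes, so the lookup is a
            // scan over the half-open range [rangeStart, rangeEnd).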
            byte[] rangeStart = rocksPrefix(key);
            byte[] rangeEnd = incrementPrefix(rangeStart);

            return new UpToDatePeekCursor<RowId>(rangeEnd, indexCf, rangeStart) {
                @Override
                protected RowId map(ByteBuffer byteBuffer) {
                    // The RowId's UUID occupies the last 16 bytes of the key, immediately after the prefix.
                    long mostSignificantBits = byteBuffer.getLong(rangeStart.length);
                    long leastSignificantBits = byteBuffer.getLong(rangeStart.length + Long.BYTES);

                    return new RowId(partitionId, mostSignificantBits, leastSignificantBits);
                }
            };
        });
    }

    @Override
    public void put(IndexRow row) {
        busyNonDataRead(() -> {
            try {
                WriteBatchWithIndex writeBatch = PartitionDataHelper.requireWriteBatch();
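
                // The value is intentionally empty: everything the index needs is already encoded in the key.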
                writeBatch.put(indexCf.handle(), rocksKey(row), BYTE_EMPTY_ARRAY);

                return null;
            } catch (RocksDBException e) {
                throw new StorageException("Unable to insert data into hash index. Index ID: " + descriptor.id(), e);
            }
        });
    }

    @Override
    public void remove(IndexRow row) {
        busyNonDataRead(() -> {
            throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);

            try {
                WriteBatchWithIndex writeBatch = PartitionDataHelper.requireWriteBatch();

                writeBatch.delete(indexCf.handle(), rocksKey(row));

                return null;
            } catch (RocksDBException e) {
                throw new StorageException("Unable to remove data from hash index. Index ID: " + descriptor.id(), e);
            }
        });
    }

    private byte[] rocksPrefix(BinaryTuple prefix) {
        return rocksPrefix(prefix, 0).array();
    }

    private ByteBuffer rocksPrefix(BinaryTuple prefix, int extraLength) {
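        // Key layout: [constant prefix | hash32(tuple) | tuple bytes], with extraLength extra bytes reserved so that
        // callers such as rocksKey() can append more data (the Row ID) without another allocation.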
        ByteBuffer keyBytes = prefix.byteBuffer();

        return ByteBuffer.allocate(FIXED_PREFIX_LENGTH + keyBytes.remaining() + extraLength)
                .order(KEY_BYTE_ORDER)
                .put(constantPrefix)
                .putInt(HashUtils.hash32(keyBytes))
                .put(keyBytes);
    }

    private byte[] rocksKey(IndexRow row) {
        RowId rowId = row.rowId();

        // Only the RowId's 16-byte UUID is appended: its partition ID is already a part of the constant key prefix.
        return rocksPrefix(row.indexColumns(), ROW_ID_SIZE)
                .putLong(rowId.mostSignificantBits())
                .putLong(rowId.leastSignificantBits())
                .array();
    }

    @Override
    public void clearIndex(WriteBatch writeBatch) throws RocksDBException {
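        // Every key of this index starts with the constant [table ID | index ID | partition ID] prefix, so deleting by
        // that prefix removes all of the index's data for this partition.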
        deleteByPrefix(writeBatch, indexCf, constantPrefix);
    }
}