blob: 25f726f27778d76a0f873cc3a603a3ebff075511 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.storage.rocksdb.instance;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.toStringName;
import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.INDEX_ROW_ID_PREFIX;
import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.LEASE_PREFIX;
import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_CONF_PREFIX;
import static org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_META_PREFIX;
import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER;
import static org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstanceCreator.sortedIndexCfOptions;
import static org.apache.ignite.internal.util.ByteUtils.intToBytes;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
import org.apache.ignite.internal.storage.StorageClosedException;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.rocksdb.IndexIdCursor;
import org.apache.ignite.internal.storage.rocksdb.IndexIdCursor.TableAndIndexId;
import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage;
import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
/**
 * Shared RocksDB instance for multiple tables. Managed directly by the engine.
 *
 * <p>All tables served by this instance keep their data in a single RocksDB database: partition data, the GC queue and hash
 * indexes each live in a dedicated shared Column Family, while sorted indexes are grouped into Column Families by comparator
 * compatibility (see {@link SortedIndexColumnFamily}).
 */
public final class SharedRocksDbInstance {
    /** Default write options. WAL is disabled: durability is provided by explicit flushes via the {@link RocksDbFlusher}. */
    public static final WriteOptions DFLT_WRITE_OPTS = new WriteOptions().setDisableWAL(true);

    /**
     * Class that represents a Column Family for sorted indexes and all index IDs that map to this Column Family.
     *
     * <p>Sorted indexes that have the same order and type of indexed columns get mapped to the same Column Family in order to share
     * the same comparator.
     */
    private static class SortedIndexColumnFamily implements AutoCloseable {
        /** Shared Column Family instance. */
        final ColumnFamily columnFamily;

        /** Mapping from an index ID to the ID of the table that the index belongs to. */
        final Map<Integer, Integer> indexIdToTableId = new ConcurrentHashMap<>();

        SortedIndexColumnFamily(ColumnFamily columnFamily) {
            this.columnFamily = columnFamily;
        }

        SortedIndexColumnFamily(ColumnFamily columnFamily, Map<Integer, Integer> indexIdToTableId) {
            this.columnFamily = columnFamily;
            this.indexIdToTableId.putAll(indexIdToTableId);
        }

        @Override
        public void close() {
            // Releases the native handle only; the Column Family itself is not dropped from the DB.
            columnFamily.handle().close();
        }
    }

    /** RocksDB storage engine instance. */
    public final RocksDbStorageEngine engine;

    /** Path for the directory that stores the data. */
    public final Path path;

    /** RocksDB flusher instance. */
    public final RocksDbFlusher flusher;

    /** Rocks DB instance. */
    public final RocksDB db;

    /** Meta information instance that wraps {@link ColumnFamily} instance for meta column family. */
    public final RocksDbMetaStorage meta;

    /** Column Family for partition data. */
    public final ColumnFamily partitionCf;

    /** Column Family for GC queue. */
    public final ColumnFamily gcQueueCf;

    /** Column Family for Hash Index data. */
    private final ColumnFamily hashIndexCf;

    /** Column Family instances for different types of sorted indexes, identified by the column family name. */
    private final ConcurrentMap<ByteArray, SortedIndexColumnFamily> sortedIndexCfsByName = new ConcurrentHashMap<>();

    /** Busy lock to stop synchronously. */
    private final IgniteSpinBusyLock busyLock;

    /** Prevents double stopping of the component. */
    private final AtomicBoolean stopGuard = new AtomicBoolean();

    SharedRocksDbInstance(
            RocksDbStorageEngine engine,
            Path path,
            IgniteSpinBusyLock busyLock,
            RocksDbFlusher flusher,
            RocksDB db,
            RocksDbMetaStorage meta,
            ColumnFamily partitionCf,
            ColumnFamily gcQueueCf,
            ColumnFamily hashIndexCf,
            List<ColumnFamily> sortedIndexCfs
    ) {
        this.engine = engine;
        this.path = path;
        this.busyLock = busyLock;
        this.flusher = flusher;
        this.db = db;
        this.meta = meta;
        this.partitionCf = partitionCf;
        this.gcQueueCf = gcQueueCf;
        this.hashIndexCf = hashIndexCf;

        recoverExistingSortedIndexes(sortedIndexCfs);
    }

    /**
     * Restores the "index ID -> table ID" mappings for the sorted index Column Families that survived a restart.
     *
     * <p>Column Families that turn out to be empty (all of their indexes were destroyed earlier) are dropped right away.
     */
    private void recoverExistingSortedIndexes(List<ColumnFamily> sortedIndexCfs) {
        for (ColumnFamily sortedIndexCf : sortedIndexCfs) {
            var indexIdToTableId = new HashMap<Integer, Integer>();

            // "null" table ID filter: collect the IDs of every index stored in this CF. The cursor takes ownership of the
            // iterator and closes it.
            try (var sortedIndexIdCursor = new IndexIdCursor(sortedIndexCf.newIterator(), null)) {
                for (TableAndIndexId tableAndIndexId : sortedIndexIdCursor) {
                    indexIdToTableId.put(tableAndIndexId.indexId(), tableAndIndexId.tableId());
                }
            }

            if (indexIdToTableId.isEmpty()) {
                // No indexes live in this CF anymore, no reason to keep it.
                destroyColumnFamily(sortedIndexCf);
            } else {
                this.sortedIndexCfsByName.put(
                        new ByteArray(sortedIndexCf.nameBytes()),
                        new SortedIndexColumnFamily(sortedIndexCf, indexIdToTableId)
                );
            }
        }
    }

    /**
     * Utility method that performs range-deletion in the column family.
     *
     * @param writeBatch Write batch to register the deletion in.
     * @param columnFamily Column Family to delete from.
     * @param prefix Key prefix; every key starting with this prefix is deleted.
     * @throws RocksDBException If the underlying {@code deleteRange} call fails.
     */
    public static void deleteByPrefix(WriteBatch writeBatch, ColumnFamily columnFamily, byte[] prefix) throws RocksDBException {
        // "incrementPrefix" produces the exclusive upper bound of the key range covered by "prefix".
        byte[] upperBound = incrementPrefix(prefix);

        writeBatch.deleteRange(columnFamily.handle(), prefix, upperBound);
    }

    /**
     * Stops the instance, freeing all allocated resources. Idempotent: repeated calls are no-ops.
     */
    public void stop() {
        if (!stopGuard.compareAndSet(false, true)) {
            return;
        }

        // Blocks until all ongoing operations that hold the busy lock have finished.
        busyLock.block();

        List<AutoCloseable> resources = new ArrayList<>();

        resources.add(meta.columnFamily().handle());
        resources.add(partitionCf.handle());
        resources.add(gcQueueCf.handle());
        resources.add(hashIndexCf.handle());
        resources.addAll(sortedIndexCfsByName.values());
        resources.add(db);
        resources.add(flusher::stop);

        try {
            // After the reversal the flusher is stopped first, then the DB is closed, then the CF handles are released.
            Collections.reverse(resources);

            closeAll(resources);
        } catch (Exception e) {
            throw new StorageException("Failed to stop RocksDB storage: " + path, e);
        }
    }

    /**
     * Returns the Column Family containing all hash indexes.
     */
    public ColumnFamily hashIndexCf() {
        return hashIndexCf;
    }

    /**
     * Returns a collection of all hash index IDs that currently exist in the storage.
     *
     * @param tableId Table ID to restrict the scan to, or {@code -1} to return index IDs of all tables.
     */
    public Collection<Integer> hashIndexIds(int tableId) {
        try (
                var readOptions = new ReadOptions();
                // "null" upper bound means "scan until the end of the CF".
                var upperBound = tableId == -1 ? null : new Slice(intToBytes(tableId + 1))
        ) {
            // Using total order seek, because the cursor only uses table ID + index ID as the prefix.
            readOptions
                    .setTotalOrderSeek(true)
                    .setIterateUpperBound(upperBound);

            // The cursor takes ownership of the iterator and closes it.
            RocksIterator it = hashIndexCf.newIterator(readOptions);

            try (var hashIndexIdCursor = new IndexIdCursor(it, tableId)) {
                return hashIndexIdCursor.stream().map(TableAndIndexId::indexId).collect(toList());
            }
        }
    }

    /**
     * Returns an "index ID - Column Family" mapping for all sorted indexes that currently exist in the storage.
     *
     * @param targetTableId Table ID whose sorted indexes should be returned.
     */
    public List<IndexColumnFamily> sortedIndexes(int targetTableId) {
        var result = new ArrayList<IndexColumnFamily>();

        for (SortedIndexColumnFamily indexCf : sortedIndexCfsByName.values()) {
            indexCf.indexIdToTableId.forEach((indexId, tableId) -> {
                if (tableId == targetTableId) {
                    result.add(new IndexColumnFamily(indexId, indexCf.columnFamily));
                }
            });
        }

        return result;
    }

    /**
     * Returns Column Family instance with the desired name. Creates it if it doesn't exist. Tracks every created index by its
     * {@code indexId}.
     */
    public ColumnFamily getOrCreateSortedIndexCf(byte[] cfName, int indexId, int tableId) {
        if (!busyLock.enterBusy()) {
            throw new StorageClosedException();
        }

        try {
            // "compute" is atomic per key, so CF creation and index registration are observed together by other threads.
            SortedIndexColumnFamily result = sortedIndexCfsByName.compute(new ByteArray(cfName), (unused, sortedIndexCf) -> {
                if (sortedIndexCf == null) {
                    sortedIndexCf = new SortedIndexColumnFamily(createSortedIndexCf(cfName));
                }

                sortedIndexCf.indexIdToTableId.put(indexId, tableId);

                return sortedIndexCf;
            });

            return result.columnFamily;
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Removes the given sorted index from this instance. This prevents this index to be returned by {@link #sortedIndexes} call.
     */
    public void removeSortedIndex(int indexId, ColumnFamily cf) {
        var cfNameBytes = new ByteArray(cf.nameBytes());

        sortedIndexCfsByName.computeIfPresent(cfNameBytes, (unused, indexCf) -> {
            indexCf.indexIdToTableId.remove(indexId);

            // The CF mapping itself is kept; dropping it is handled by "destroySortedIndexCfIfNeeded".
            return indexCf;
        });
    }

    /**
     * Schedules a drop of a column family after destroying an index, if it was the last index managed by that CF.
     *
     * @param columnFamilies Sorted index Column Families that may have become empty; must not be empty.
     * @return Future that completes when the check (and possible drop) has been performed.
     */
    public CompletableFuture<Void> scheduleIndexCfsDestroyIfNeeded(List<ColumnFamily> columnFamilies) {
        assert !columnFamilies.isEmpty();

        // The drop is performed asynchronously on the engine's thread pool, after the pending flush completes.
        return flusher.awaitFlush(false)
                .thenRunAsync(() -> {
                    if (!busyLock.enterBusy()) {
                        throw new StorageClosedException();
                    }

                    try {
                        columnFamilies.forEach(this::destroySortedIndexCfIfNeeded);
                    } finally {
                        busyLock.leaveBusy();
                    }
                }, engine.threadPool());
    }

    /**
     * Drops the given sorted index Column Family if no indexes are mapped to it anymore.
     */
    void destroySortedIndexCfIfNeeded(ColumnFamily columnFamily) {
        var cfNameBytes = new ByteArray(columnFamily.nameBytes());

        sortedIndexCfsByName.computeIfPresent(cfNameBytes, (unused, indexCf) -> {
            if (!indexCf.indexIdToTableId.isEmpty()) {
                // The CF is still used by other indexes, keep it.
                return indexCf;
            }

            destroyColumnFamily(indexCf.columnFamily);

            // Returning "null" removes the mapping.
            return null;
        });
    }

    /**
     * Removes all data associated with the given table ID in this storage.
     */
    public void destroyTable(int targetTableId) {
        try (WriteBatch writeBatch = new WriteBatch()) {
            byte[] tableIdBytes = ByteBuffer.allocate(Integer.BYTES)
                    .order(KEY_BYTE_ORDER)
                    .putInt(targetTableId)
                    .array();

            deleteByPrefix(writeBatch, partitionCf, tableIdBytes);
            deleteByPrefix(writeBatch, gcQueueCf, tableIdBytes);
            deleteByPrefix(writeBatch, hashIndexCf, tableIdBytes);

            deleteByPrefix(writeBatch, meta.columnFamily(), metaPrefix(PARTITION_META_PREFIX, tableIdBytes));
            deleteByPrefix(writeBatch, meta.columnFamily(), metaPrefix(PARTITION_CONF_PREFIX, tableIdBytes));
            deleteByPrefix(writeBatch, meta.columnFamily(), metaPrefix(INDEX_ROW_ID_PREFIX, tableIdBytes));
            deleteByPrefix(writeBatch, meta.columnFamily(), metaPrefix(LEASE_PREFIX, tableIdBytes));

            var cfsToRemove = new ArrayList<ColumnFamily>();

            for (SortedIndexColumnFamily indexCf : sortedIndexCfsByName.values()) {
                boolean tableHadIndexesInCf = false;

                Iterator<Integer> it = indexCf.indexIdToTableId.values().iterator();

                while (it.hasNext()) {
                    if (targetTableId == it.next()) {
                        it.remove();

                        tableHadIndexesInCf = true;
                    }
                }

                if (tableHadIndexesInCf) {
                    // A single range deletion covers all indexes of the table in this CF, and the CF is scheduled for
                    // destruction exactly once, even if multiple indexes of the table were mapped to it.
                    deleteByPrefix(writeBatch, indexCf.columnFamily, tableIdBytes);

                    cfsToRemove.add(indexCf.columnFamily);
                }
            }

            db.write(DFLT_WRITE_OPTS, writeBatch);

            if (!cfsToRemove.isEmpty()) {
                // Fire-and-forget: emptied CFs are dropped asynchronously after the next flush.
                scheduleIndexCfsDestroyIfNeeded(cfsToRemove);
            }
        } catch (RocksDBException e) {
            throw new StorageException("Failed to destroy table data. [tableId={}]", e, targetTableId);
        }
    }

    /**
     * Builds a meta Column Family key prefix: {@code metaPrefix} followed by the serialized table ID.
     */
    private static byte[] metaPrefix(byte[] metaPrefix, byte[] tableIdBytes) {
        return ByteBuffer.allocate(metaPrefix.length + tableIdBytes.length)
                .order(KEY_BYTE_ORDER)
                .put(metaPrefix)
                .put(tableIdBytes)
                .array();
    }

    /**
     * Creates a new sorted index Column Family and registers it in the flusher.
     *
     * @throws StorageException If the Column Family could not be created.
     */
    private ColumnFamily createSortedIndexCf(byte[] cfName) {
        // The CF name encodes the comparator configuration, hence the name-dependent options.
        ColumnFamilyDescriptor cfDescriptor = new ColumnFamilyDescriptor(cfName, sortedIndexCfOptions(cfName));

        ColumnFamily columnFamily;
        try {
            columnFamily = ColumnFamily.create(db, cfDescriptor);
        } catch (RocksDBException e) {
            throw new StorageException("Failed to create new RocksDB column family: " + toStringName(cfName), e);
        }

        flusher.addColumnFamily(columnFamily.handle());

        return columnFamily;
    }

    /**
     * Destroys a Column Family, unregistering it from the flusher first.
     *
     * @throws StorageException If the Column Family could not be destroyed.
     */
    private void destroyColumnFamily(ColumnFamily columnFamily) {
        // Unregister before destroying so the flusher never touches a dropped CF handle.
        flusher.removeColumnFamily(columnFamily.handle());

        try {
            columnFamily.destroy();
        } catch (RocksDBException e) {
            throw new StorageException(
                    "Failed to destroy RocksDB Column Family. [cfName={}, path={}]",
                    e, columnFamily.name(), path
            );
        }
    }
}