/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.storage.rocksdb;

import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
import org.apache.ignite.internal.storage.engine.StorageEngine;
import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbProfileView;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
import org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance;
import org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstanceCreator;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteUtils;
import org.rocksdb.RocksDB;

/**
* Storage engine implementation based on RocksDB.
*/
public class RocksDbStorageEngine implements StorageEngine {
/** Engine name. */
// TODO: Decide whether the engine name should be "rocksDb" or "rocksdb".
public static final String ENGINE_NAME = "rocksDb";
private static final IgniteLogger LOG = Loggers.forClass(RocksDbStorageEngine.class);
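/**
* Pairs a data region with the shared RocksDB instance that serves it.
*/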
private static class RocksDbStorage implements ManuallyCloseable {
final RocksDbDataRegion dataRegion;
final SharedRocksDbInstance rocksDbInstance;
RocksDbStorage(RocksDbDataRegion dataRegion, SharedRocksDbInstance rocksDbInstance) {
this.dataRegion = dataRegion;
this.rocksDbInstance = rocksDbInstance;
}
@Override
public void close() throws Exception {
IgniteUtils.closeAllManually(rocksDbInstance::stop, dataRegion::stop);
}
}
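// Load the native RocksDB library once, before any RocksDB API is used.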
static {
RocksDB.loadLibrary();
}
private final RocksDbStorageEngineConfiguration engineConfig;
private final StorageConfiguration storageConfiguration;
private final Path storagePath;
private final ExecutorService threadPool;
private final ScheduledExecutorService scheduledPool;
/**
* Mapping from the data region name to the shared RocksDB instance. Most likely, the association of shared
* instances with regions will be removed/revisited in the future.
*/
// TODO IGNITE-19762 Think of proper way to use regions and storages.
private final Map<String, RocksDbStorage> storageByRegionName = new ConcurrentHashMap<>();
private final LogSyncer logSyncer;
/**
* Constructor.
*
* @param nodeName Node name.
* @param engineConfig RocksDB storage engine configuration.
* @param storageConfiguration Storage configuration.
* @param storagePath Storage path.
* @param logSyncer Write-ahead log synchronizer.
*/
public RocksDbStorageEngine(
String nodeName,
RocksDbStorageEngineConfiguration engineConfig,
StorageConfiguration storageConfiguration,
Path storagePath,
LogSyncer logSyncer
) {
this.engineConfig = engineConfig;
this.storageConfiguration = storageConfiguration;
this.storagePath = storagePath;
this.logSyncer = logSyncer;
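// Common pool for asynchronous storage operations, sized to the number of available processors.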
threadPool = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(),
NamedThreadFactory.create(nodeName, "rocksdb-storage-engine-pool", LOG)
);
scheduledPool = Executors.newSingleThreadScheduledExecutor(
NamedThreadFactory.create(nodeName, "rocksdb-storage-engine-scheduled-pool", LOG)
);
}
/**
* Returns the RocksDB storage engine configuration.
*/
public RocksDbStorageEngineConfiguration configuration() {
return engineConfig;
}
/**
* Returns the common thread pool for asynchronous storage operations.
*/
public ExecutorService threadPool() {
return threadPool;
}
/**
* Returns the scheduled thread pool for asynchronous storage operations.
*/
public ScheduledExecutorService scheduledPool() {
return scheduledPool;
}
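/**
* Returns the write-ahead log synchronizer.
*/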
public LogSyncer logSyncer() {
return logSyncer;
}
@Override
public String name() {
return ENGINE_NAME;
}
@Override
public void start() throws StorageException {
// TODO: IGNITE-17066 Add handling deleting/updating data regions configuration
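// Create a data region and a shared RocksDB instance for every configured RocksDB storage profile.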
storageConfiguration.profiles().value().stream()
.filter(RocksDbProfileView.class::isInstance)
.map(RocksDbProfileView.class::cast)
.forEach(this::registerDataRegion);
}
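/**
* Starts a data region for the given profile and creates the shared RocksDB instance backing it.
*/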
private void registerDataRegion(RocksDbProfileView dataRegionView) {
String regionName = dataRegionView.name();
var region = new RocksDbDataRegion(dataRegionView);
region.start();
SharedRocksDbInstance rocksDbInstance = newRocksDbInstance(regionName, region);
RocksDbStorage previousStorage = storageByRegionName.put(regionName, new RocksDbStorage(region, rocksDbInstance));
assert previousStorage == null : regionName;
}
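/**
* Creates a shared RocksDB instance for the given data region in a dedicated "rocksdb-&lt;regionName&gt;" subdirectory of the storage path.
*/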
private SharedRocksDbInstance newRocksDbInstance(String regionName, RocksDbDataRegion region) {
try {
return new SharedRocksDbInstanceCreator().create(this, region, storagePath.resolve("rocksdb-" + regionName));
} catch (Exception e) {
throw new StorageException("Failed to create new RocksDB instance", e);
}
}
@Override
public void stop() throws StorageException {
try {
IgniteUtils.closeAllManually(storageByRegionName.values());
} catch (Exception e) {
throw new StorageException("Error when stopping the rocksdb engine", e);
}
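// Shut down the thread pools only after all storages have been closed, waiting up to 10 seconds for each pool to terminate.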
IgniteUtils.shutdownAndAwaitTermination(threadPool, 10, TimeUnit.SECONDS);
IgniteUtils.shutdownAndAwaitTermination(scheduledPool, 10, TimeUnit.SECONDS);
}
@Override
public boolean isVolatile() {
return false;
}
@Override
public RocksDbTableStorage createMvTable(
StorageTableDescriptor tableDescriptor,
StorageIndexDescriptorSupplier indexDescriptorSupplier
) throws StorageException {
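// The storage profile name doubles as the data region name (see registerDataRegion).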
String regionName = tableDescriptor.getStorageProfile();
RocksDbStorage storage = storageByRegionName.get(regionName);
assert storage != null :
String.format("RocksDB instance has not yet been created for [tableId=%d, region=%s]", tableDescriptor.getId(), regionName);
var tableStorage = new RocksDbTableStorage(storage.rocksDbInstance, tableDescriptor, indexDescriptorSupplier);
tableStorage.start();
return tableStorage;
}
@Override
public void dropMvTable(int tableId) {
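// The region owning the table is not tracked here, so the destroy request is forwarded to every shared RocksDB instance.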
for (RocksDbStorage rocksDbStorage : storageByRegionName.values()) {
rocksDbStorage.rocksDbInstance.destroyTable(tableId);
}
}
}