blob: 48fa17924d07a99bb6b7cb2f31b73bd9bf289f4d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.raft.storage.impl;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.rocksdb.RocksDB.DEFAULT_COLUMN_FAMILY;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.raft.configuration.LogStorageBudgetView;
import org.apache.ignite.internal.raft.storage.LogStorageFactory;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.Platform;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.Env;
import org.rocksdb.Options;
import org.rocksdb.Priority;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.util.SizeUnit;
/**
 * {@link LogStorageFactoryCreator} for volatile log storage.
 *
 * <p>All factories produced by this creator share a single spill-out RocksDB instance (one column family) and a single
 * executor for spill-out tasks. The spill-out database is destroyed and recreated on every {@link #startAsync()}, so its
 * content never survives a restart.
 */
public class VolatileLogStorageFactoryCreator implements LogStorageFactoryCreator, IgniteComponent {
    private static final IgniteLogger LOG = Loggers.forClass(VolatileLogStorageFactoryCreator.class);

    /** Database path. */
    private final Path spillOutPath;

    /** Database options. */
    private DBOptions dbOptions;

    /** Options of the shared data column family; kept in a field so that they can be closed together with the database. */
    private ColumnFamilyOptions cfOptions;

    /** Shared db instance. */
    private RocksDB db;

    /** Shared data column family handle. */
    private ColumnFamilyHandle columnFamily;

    /** Executor for spill-out RocksDB tasks. */
    private final ExecutorService executorService;

    /**
     * Create a new instance.
     *
     * @param nodeName Name of the node, used to name the spill-out executor threads.
     * @param spillOutPath Path at which to put spill-out data.
     */
    public VolatileLogStorageFactoryCreator(String nodeName, Path spillOutPath) {
        this.spillOutPath = Objects.requireNonNull(spillOutPath);

        executorService = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors() * 2,
                NamedThreadFactory.create(nodeName, "raft-volatile-log-rocksdb-spillout-pool", LOG)
        );
    }

    /**
     * Creates the spill-out directory, wipes any previous spill-out database and opens a fresh one.
     *
     * <p>Failures are thrown synchronously rather than reported through the returned future. If opening the database
     * fails, all native resources allocated so far are released before the exception is thrown.
     *
     * @return Completed future.
     */
    @Override
    public CompletableFuture<Void> startAsync() {
        try {
            Files.createDirectories(spillOutPath);
        } catch (IOException e) {
            throw new IllegalStateException("Failed to create directory: " + this.spillOutPath, e);
        }

        // Spill-out data is not meant to outlive the node process, so always start from an empty database.
        wipeOutDb();

        dbOptions = createDbOptions();
        cfOptions = createColumnFamilyOptions();

        List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
        List<ColumnFamilyDescriptor> columnFamilyDescriptors = List.of(
                new ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY, cfOptions)
        );

        try {
            db = RocksDB.open(this.dbOptions, this.spillOutPath.toString(), columnFamilyDescriptors, columnFamilyHandles);

            // Setup rocks thread pools to utilize all the available cores as the database is shared among
            // all the raft groups
            Env env = db.getEnv();
            // Setup background flushes pool
            env.setBackgroundThreads(Runtime.getRuntime().availableProcessors(), Priority.HIGH);
            // Setup background compactions pool
            env.setBackgroundThreads(Runtime.getRuntime().availableProcessors(), Priority.LOW);

            assert columnFamilyHandles.size() == 1 : columnFamilyHandles.size();

            this.columnFamily = columnFamilyHandles.get(0);
        } catch (Exception e) {
            // Without this, the native options (and a possibly opened db) would leak on a failed start.
            closeAfterFailedStart(columnFamilyHandles, e);

            throw new RuntimeException("Failed to open spill-out RocksDB at " + spillOutPath, e);
        }

        return nullCompletedFuture();
    }

    /**
     * Releases the native resources allocated by a failed {@link #startAsync()} and resets the corresponding fields.
     *
     * <p>Resources are closed in dependency order: column family handles first, then the database, then the options the
     * database was opened with. Errors raised while closing are attached to {@code cause} as suppressed exceptions.
     *
     * @param columnFamilyHandles Column family handles filled in by {@code RocksDB.open}, possibly empty.
     * @param cause The start failure.
     */
    private void closeAfterFailedStart(List<ColumnFamilyHandle> columnFamilyHandles, Exception cause) {
        List<AutoCloseable> resources = new ArrayList<>(columnFamilyHandles);
        resources.add(db);
        resources.add(dbOptions);
        resources.add(cfOptions);

        try {
            closeAll(resources.toArray(AutoCloseable[]::new));
        } catch (Exception closeFailure) {
            cause.addSuppressed(closeFailure);
        }

        columnFamily = null;
        db = null;
        dbOptions = null;
        cfOptions = null;
    }

    /**
     * Destroys the RocksDB database located at {@link #spillOutPath}, if any.
     *
     * @throws IgniteInternalException If RocksDB fails to destroy the database.
     */
    private void wipeOutDb() {
        try (var options = new Options()) {
            RocksDB.destroyDB(spillOutPath.toString(), options);
        } catch (RocksDBException e) {
            throw new IgniteInternalException("Cannot destroy spill-out RocksDB at " + spillOutPath, e);
        }
    }

    /**
     * Creates database options.
     *
     * @return Default database options.
     */
    private static DBOptions createDbOptions() {
        return new DBOptions()
                .setMaxBackgroundJobs(Runtime.getRuntime().availableProcessors() * 2)
                .setCreateIfMissing(true)
                .setCreateMissingColumnFamilies(true);
    }

    /**
     * Creates column family options.
     *
     * @return Default column family options. The caller owns the returned native object and must close it.
     */
    private static ColumnFamilyOptions createColumnFamilyOptions() {
        var opts = new ColumnFamilyOptions();

        // TODO: IGNITE-17560 - parameterize via configuration
        opts.setWriteBufferSize(64 * SizeUnit.MB);
        opts.setMaxWriteBufferNumber(5);
        opts.setMinWriteBufferNumberToMerge(1);
        opts.setLevel0FileNumCompactionTrigger(50);
        opts.setLevel0SlowdownWritesTrigger(100);
        opts.setLevel0StopWritesTrigger(200);
        // Size of level 0 which is (in stable state) equal to
        // WriteBufferSize * MinWriteBufferNumberToMerge * Level0FileNumCompactionTrigger
        opts.setMaxBytesForLevelBase(3200 * SizeUnit.MB);
        opts.setTargetFileSizeBase(320 * SizeUnit.MB);

        if (!Platform.isWindows()) {
            // NOTE(review): in RocksDB, optimizeLevelStyleCompaction() itself assigns write buffer size/number,
            // min buffers to merge, the L0 compaction trigger, target file size base, max bytes for level base and
            // per-level compression. On non-Windows platforms it therefore presumably overrides several explicit
            // values above. Confirm whether that is intended.
            opts.setCompressionType(CompressionType.LZ4_COMPRESSION)
                    .setCompactionStyle(CompactionStyle.LEVEL)
                    .optimizeLevelStyleCompaction();
        }

        return opts;
    }

    /**
     * Stops the spill-out executor and closes the shared database together with its handles and options.
     *
     * @return Completed future, or a failed future if closing any of the native resources failed.
     */
    @Override
    public CompletableFuture<Void> stopAsync() {
        ExecutorServiceHelper.shutdownAndAwaitTermination(executorService);

        try {
            // Order matters: handle before db, db before the options it was opened with.
            closeAll(columnFamily, db, dbOptions, cfOptions);
        } catch (Exception e) {
            return failedFuture(e);
        }

        return nullCompletedFuture();
    }

    @Override
    public LogStorageFactory factory(LogStorageBudgetView budgetView) {
        return new VolatileLogStorageFactory(budgetView, db, columnFamily, executorService);
    }
}