| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.raft.storage.impl; |
| |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| import static org.rocksdb.RocksDB.DEFAULT_COLUMN_FAMILY; |
| |
| import java.io.IOException; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import org.apache.ignite.internal.logger.IgniteLogger; |
| import org.apache.ignite.internal.logger.Loggers; |
| import org.apache.ignite.internal.raft.storage.LogStorageFactory; |
| import org.apache.ignite.internal.rocksdb.RocksUtils; |
| import org.apache.ignite.internal.thread.NamedThreadFactory; |
| import org.apache.ignite.raft.jraft.option.RaftOptions; |
| import org.apache.ignite.raft.jraft.storage.LogStorage; |
| import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper; |
| import org.apache.ignite.raft.jraft.util.Platform; |
| import org.jetbrains.annotations.TestOnly; |
| import org.rocksdb.ColumnFamilyDescriptor; |
| import org.rocksdb.ColumnFamilyHandle; |
| import org.rocksdb.ColumnFamilyOptions; |
| import org.rocksdb.CompactionStyle; |
| import org.rocksdb.CompressionType; |
| import org.rocksdb.DBOptions; |
| import org.rocksdb.Env; |
| import org.rocksdb.Priority; |
| import org.rocksdb.RocksDB; |
| import org.rocksdb.RocksDBException; |
| import org.rocksdb.WriteBatch; |
| import org.rocksdb.util.SizeUnit; |
| |
| /** Implementation of the {@link LogStorageFactory} that creates {@link RocksDbSharedLogStorage}s. */ |
| public class DefaultLogStorageFactory implements LogStorageFactory { |
| private static final IgniteLogger LOG = Loggers.forClass(DefaultLogStorageFactory.class); |
| |
| /** Database path. */ |
| private final Path path; |
| |
| /** Executor for shared storages. */ |
| private final ExecutorService executorService; |
| |
| /** Database instance shared across log storages. */ |
| private RocksDB db; |
| |
| /** Database options. */ |
| private DBOptions dbOptions; |
| |
| /** Configuration column family handle. */ |
| private ColumnFamilyHandle confHandle; |
| |
| /** Data column family handle. */ |
| private ColumnFamilyHandle dataHandle; |
| |
| /** |
| * Thread-local batch instance, used by {@link RocksDbSharedLogStorage#appendEntriesToBatch(List)} and |
| * {@link RocksDbSharedLogStorage#commitWriteBatch()}. |
| * <br> |
| * Shared between instances to provide more efficient way of executing batch updates. |
| */ |
| @SuppressWarnings("ThreadLocalNotStaticFinal") |
| private final ThreadLocal<WriteBatch> threadLocalWriteBatch = new ThreadLocal<>(); |
| |
| /** |
| * Constructor. |
| * |
| * @param path Path to the storage. |
| */ |
| @TestOnly |
| public DefaultLogStorageFactory(Path path) { |
| this("test", path); |
| } |
| |
| /** |
| * Constructor. |
| * |
| * @param path Path to the storage. |
| */ |
| public DefaultLogStorageFactory(String nodeName, Path path) { |
| this.path = path; |
| |
| executorService = Executors.newSingleThreadExecutor( |
| NamedThreadFactory.create(nodeName, "raft-shared-log-storage-pool", LOG) |
| ); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void start() { |
| try { |
| Files.createDirectories(path); |
| } catch (IOException e) { |
| throw new IllegalStateException("Failed to create directory: " + this.path, e); |
| } |
| |
| List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(); |
| |
| this.dbOptions = createDbOptions(); |
| |
| ColumnFamilyOptions cfOption = createColumnFamilyOptions(); |
| |
| |
| List<ColumnFamilyDescriptor> columnFamilyDescriptors = List.of( |
| // Column family to store configuration log entry. |
| new ColumnFamilyDescriptor("Configuration".getBytes(UTF_8), cfOption), |
| // Default column family to store user data log entry. |
| new ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY, cfOption) |
| ); |
| |
| try { |
| this.db = RocksDB.open(this.dbOptions, this.path.toString(), columnFamilyDescriptors, columnFamilyHandles); |
| |
| // Setup rocks thread pools to utilize all the available cores as the database is shared among |
| // all the raft groups |
| Env env = db.getEnv(); |
| // Setup background flushes pool |
| env.setBackgroundThreads(Runtime.getRuntime().availableProcessors(), Priority.HIGH); |
| // Setup background compactions pool |
| env.setBackgroundThreads(Runtime.getRuntime().availableProcessors(), Priority.LOW); |
| |
| assert (columnFamilyHandles.size() == 2); |
| this.confHandle = columnFamilyHandles.get(0); |
| this.dataHandle = columnFamilyHandles.get(1); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void close() { |
| ExecutorServiceHelper.shutdownAndAwaitTermination(executorService); |
| |
| RocksUtils.closeAll(confHandle, dataHandle, db, dbOptions); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public LogStorage createLogStorage(String groupId, RaftOptions raftOptions) { |
| return new RocksDbSharedLogStorage(this, db, confHandle, dataHandle, groupId, raftOptions, executorService); |
| } |
| |
| @Override |
| public void sync() throws RocksDBException { |
| db.syncWal(); |
| } |
| |
| /** |
| * Returns a thread-local {@link WriteBatch} instance, attached to current factory, append data from multiple storages at the same time. |
| */ |
| WriteBatch getOrCreateThreadLocalWriteBatch() { |
| WriteBatch writeBatch = threadLocalWriteBatch.get(); |
| |
| if (writeBatch == null) { |
| writeBatch = new WriteBatch(); |
| |
| threadLocalWriteBatch.set(writeBatch); |
| } |
| |
| return writeBatch; |
| } |
| |
| /** |
| * Clears {@link WriteBatch} returned by {@link #getOrCreateThreadLocalWriteBatch()}. |
| */ |
| void clearThreadLocalWriteBatch() { |
| WriteBatch writeBatch = threadLocalWriteBatch.get(); |
| |
| if (writeBatch != null) { |
| writeBatch.close(); |
| |
| threadLocalWriteBatch.set(null); |
| } |
| } |
| |
| /** |
| * Creates database options. |
| * |
| * @return Default database options. |
| */ |
| private static DBOptions createDbOptions() { |
| return new DBOptions() |
| .setMaxBackgroundJobs(Runtime.getRuntime().availableProcessors() * 2) |
| .setCreateIfMissing(true) |
| .setCreateMissingColumnFamilies(true); |
| } |
| |
| /** |
| * Creates column family options. |
| * |
| * @return Default column family options. |
| */ |
| private static ColumnFamilyOptions createColumnFamilyOptions() { |
| var opts = new ColumnFamilyOptions(); |
| |
| opts.setWriteBufferSize(64 * SizeUnit.MB); |
| opts.setMaxWriteBufferNumber(5); |
| opts.setMinWriteBufferNumberToMerge(1); |
| opts.setLevel0FileNumCompactionTrigger(50); |
| opts.setLevel0SlowdownWritesTrigger(100); |
| opts.setLevel0StopWritesTrigger(200); |
| // Size of level 0 which is (in stable state) equal to |
| // WriteBufferSize * MinWriteBufferNumberToMerge * Level0FileNumCompactionTrigger |
| opts.setMaxBytesForLevelBase(3200 * SizeUnit.MB); |
| opts.setTargetFileSizeBase(320 * SizeUnit.MB); |
| |
| if (!Platform.isWindows()) { |
| opts.setCompressionType(CompressionType.LZ4_COMPRESSION) |
| .setCompactionStyle(CompactionStyle.LEVEL) |
| .optimizeLevelStyleCompaction(); |
| } |
| |
| return opts; |
| } |
| } |