| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.contrib.streaming.state; |
| |
| import org.rocksdb.BlockBasedTableConfig; |
| import org.rocksdb.BloomFilter; |
| import org.rocksdb.ColumnFamilyOptions; |
| import org.rocksdb.CompactionStyle; |
| import org.rocksdb.DBOptions; |
| import org.rocksdb.InfoLogLevel; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| |
| /** |
| * The {@code PredefinedOptions} are configuration settings for the {@link |
| * EmbeddedRocksDBStateBackend}. The various pre-defined choices are configurations that have been |
| * empirically determined to be beneficial for performance under different settings. |
| * |
| * <p>Some of these settings are based on experiments by the Flink community, some follow guides |
| * from the RocksDB project. |
| * |
| * <p>All of them effectively disable the RocksDB log by default because this file would grow |
| * indefinitely and will be deleted with the TM anyway. |
| */ |
| public enum PredefinedOptions { |
| |
| /** |
| * Default options for all settings, except that writes are not forced to the disk. |
| * |
| * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery, there is no need |
| * to sync data to stable storage. |
| * |
| * <p>The following options are set: |
| * |
| * <ul> |
| * <li>setUseFsync(false) |
| * <li>setInfoLogLevel(InfoLogLevel.HEADER_LEVEL) |
| * <li>setStatsDumpPeriodSec(0) |
| * </ul> |
| */ |
| DEFAULT { |
| |
| @Override |
| public DBOptions createDBOptions(Collection<AutoCloseable> handlesToClose) { |
| return new DBOptions() |
| .setUseFsync(false) |
| .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL) |
| .setStatsDumpPeriodSec(0); |
| } |
| |
| @Override |
| public ColumnFamilyOptions createColumnOptions(Collection<AutoCloseable> handlesToClose) { |
| return new ColumnFamilyOptions(); |
| } |
| }, |
| |
| /** |
| * Pre-defined options for regular spinning hard disks. |
| * |
| * <p>This constant configures RocksDB with some options that lead empirically to better |
| * performance when the machines executing the system use regular spinning hard disks. |
| * |
| * <p>The following options are set: |
| * |
| * <ul> |
| * <li>setCompactionStyle(CompactionStyle.LEVEL) |
| * <li>setLevelCompactionDynamicLevelBytes(true) |
| * <li>setIncreaseParallelism(4) |
| * <li>setUseFsync(false) |
| * <li>setDisableDataSync(true) |
| * <li>setMaxOpenFiles(-1) |
| * <li>setInfoLogLevel(InfoLogLevel.HEADER_LEVEL) |
| * <li>setStatsDumpPeriodSec(0) |
| * </ul> |
| * |
| * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery, there is no need |
| * to sync data to stable storage. |
| */ |
| SPINNING_DISK_OPTIMIZED { |
| |
| @Override |
| public DBOptions createDBOptions(Collection<AutoCloseable> handlesToClose) { |
| return new DBOptions() |
| .setIncreaseParallelism(4) |
| .setUseFsync(false) |
| .setMaxOpenFiles(-1) |
| .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL) |
| .setStatsDumpPeriodSec(0); |
| } |
| |
| @Override |
| public ColumnFamilyOptions createColumnOptions(Collection<AutoCloseable> handlesToClose) { |
| return new ColumnFamilyOptions() |
| .setCompactionStyle(CompactionStyle.LEVEL) |
| .setLevelCompactionDynamicLevelBytes(true); |
| } |
| }, |
| |
| /** |
| * Pre-defined options for better performance on regular spinning hard disks, at the cost of a |
| * higher memory consumption. |
| * |
| * <p><b>NOTE: These settings will cause RocksDB to consume a lot of memory for block caching |
| * and compactions. If you experience out-of-memory problems related to, RocksDB, consider |
| * switching back to {@link #SPINNING_DISK_OPTIMIZED}.</b> |
| * |
| * <p>The following options are set: |
| * |
| * <ul> |
| * <li>setLevelCompactionDynamicLevelBytes(true) |
| * <li>setTargetFileSizeBase(256 MBytes) |
| * <li>setMaxBytesForLevelBase(1 GByte) |
| * <li>setWriteBufferSize(64 MBytes) |
| * <li>setIncreaseParallelism(4) |
| * <li>setMinWriteBufferNumberToMerge(3) |
| * <li>setMaxWriteBufferNumber(4) |
| * <li>setUseFsync(false) |
| * <li>setMaxOpenFiles(-1) |
| * <li>setInfoLogLevel(InfoLogLevel.HEADER_LEVEL) |
| * <li>setStatsDumpPeriodSec(0) |
| * <li>BlockBasedTableConfig.setBlockCacheSize(256 MBytes) |
| * <li>BlockBasedTableConfigsetBlockSize(128 KBytes) |
| * </ul> |
| * |
| * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery, there is no need |
| * to sync data to stable storage. |
| */ |
| SPINNING_DISK_OPTIMIZED_HIGH_MEM { |
| |
| @Override |
| public DBOptions createDBOptions(Collection<AutoCloseable> handlesToClose) { |
| return new DBOptions() |
| .setIncreaseParallelism(4) |
| .setUseFsync(false) |
| .setMaxOpenFiles(-1) |
| .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL) |
| .setStatsDumpPeriodSec(0); |
| } |
| |
| @Override |
| public ColumnFamilyOptions createColumnOptions(Collection<AutoCloseable> handlesToClose) { |
| |
| final long blockCacheSize = 256 * 1024 * 1024; |
| final long blockSize = 128 * 1024; |
| final long targetFileSize = 256 * 1024 * 1024; |
| final long writeBufferSize = 64 * 1024 * 1024; |
| |
| BloomFilter bloomFilter = new BloomFilter(); |
| handlesToClose.add(bloomFilter); |
| |
| return new ColumnFamilyOptions() |
| .setCompactionStyle(CompactionStyle.LEVEL) |
| .setLevelCompactionDynamicLevelBytes(true) |
| .setTargetFileSizeBase(targetFileSize) |
| .setMaxBytesForLevelBase(4 * targetFileSize) |
| .setWriteBufferSize(writeBufferSize) |
| .setMinWriteBufferNumberToMerge(3) |
| .setMaxWriteBufferNumber(4) |
| .setTableFormatConfig( |
| new BlockBasedTableConfig() |
| .setBlockCacheSize(blockCacheSize) |
| .setBlockSize(blockSize) |
| .setFilter(bloomFilter)); |
| } |
| }, |
| |
| /** |
| * Pre-defined options for Flash SSDs. |
| * |
| * <p>This constant configures RocksDB with some options that lead empirically to better |
| * performance when the machines executing the system use SSDs. |
| * |
| * <p>The following options are set: |
| * |
| * <ul> |
| * <li>setIncreaseParallelism(4) |
| * <li>setUseFsync(false) |
| * <li>setDisableDataSync(true) |
| * <li>setMaxOpenFiles(-1) |
| * <li>setInfoLogLevel(InfoLogLevel.HEADER_LEVEL) |
| * <li>setStatsDumpPeriodSec(0) |
| * </ul> |
| * |
| * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery, there is no need |
| * to sync data to stable storage. |
| */ |
| FLASH_SSD_OPTIMIZED { |
| |
| @Override |
| public DBOptions createDBOptions(Collection<AutoCloseable> handlesToClose) { |
| return new DBOptions() |
| .setIncreaseParallelism(4) |
| .setUseFsync(false) |
| .setMaxOpenFiles(-1) |
| .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL) |
| .setStatsDumpPeriodSec(0); |
| } |
| |
| @Override |
| public ColumnFamilyOptions createColumnOptions(Collection<AutoCloseable> handlesToClose) { |
| return new ColumnFamilyOptions(); |
| } |
| }; |
| |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * Creates the {@link DBOptions}for this pre-defined setting. |
| * |
| * @param handlesToClose The collection to register newly created {@link |
| * org.rocksdb.RocksObject}s. |
| * @return The pre-defined options object. |
| */ |
| public abstract DBOptions createDBOptions(Collection<AutoCloseable> handlesToClose); |
| |
| /** |
| * @return The pre-defined options object. |
| * @deprecated use {@link #createColumnOptions(Collection)} instead. |
| */ |
| public DBOptions createDBOptions() { |
| return createDBOptions(new ArrayList<>()); |
| } |
| |
| /** |
| * Creates the {@link org.rocksdb.ColumnFamilyOptions}for this pre-defined setting. |
| * |
| * @param handlesToClose The collection to register newly created {@link |
| * org.rocksdb.RocksObject}s. |
| * @return The pre-defined options object. |
| */ |
| public abstract ColumnFamilyOptions createColumnOptions( |
| Collection<AutoCloseable> handlesToClose); |
| |
| /** |
| * @return The pre-defined options object. |
| * @deprecated use {@link #createColumnOptions(Collection)} instead. |
| */ |
| public ColumnFamilyOptions createColumnOptions() { |
| return createColumnOptions(new ArrayList<>()); |
| } |
| } |