blob: 85bb44d58466f9bb142501b0582e207311690364 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
* Enable which RocksDB metrics to forward to Flink's metrics reporter. All metrics report at the
* column family level and return unsigned long values.
*
* <p>Properties and doc comments are taken from RocksDB documentation. See <a
* href="https://github.com/facebook/rocksdb/blob/64324e329eb0a9b4e77241a425a1615ff524c7f1/include/rocksdb/db.h#L429">
* db.h</a> for more information.
*/
public class RocksDBNativeMetricOptions implements Serializable {
private static final long serialVersionUID = 1L;
public static final String METRICS_COLUMN_FAMILY_AS_VARIABLE_KEY =
"state.backend.rocksdb.metrics" + ".column-family-as-variable";
public static final ConfigOption<Boolean> MONITOR_NUM_IMMUTABLE_MEM_TABLES =
ConfigOptions.key(RocksDBProperty.NumImmutableMemTable.getConfigKey())
.defaultValue(false)
.withDescription("Monitor the number of immutable memtables in RocksDB.");
public static final ConfigOption<Boolean> MONITOR_MEM_TABLE_FLUSH_PENDING =
ConfigOptions.key(RocksDBProperty.MemTableFlushPending.getConfigKey())
.defaultValue(false)
.withDescription("Monitor the number of pending memtable flushes in RocksDB.");
public static final ConfigOption<Boolean> TRACK_COMPACTION_PENDING =
ConfigOptions.key(RocksDBProperty.CompactionPending.getConfigKey())
.defaultValue(false)
.withDescription(
"Track pending compactions in RocksDB. Returns 1 if a compaction is pending, 0 otherwise.");
public static final ConfigOption<Boolean> MONITOR_BACKGROUND_ERRORS =
ConfigOptions.key(RocksDBProperty.BackgroundErrors.getConfigKey())
.defaultValue(false)
.withDescription("Monitor the number of background errors in RocksDB.");
public static final ConfigOption<Boolean> MONITOR_CUR_SIZE_ACTIVE_MEM_TABLE =
ConfigOptions.key(RocksDBProperty.CurSizeActiveMemTable.getConfigKey())
.defaultValue(false)
.withDescription(
"Monitor the approximate size of the active memtable in bytes.");
public static final ConfigOption<Boolean> MONITOR_CUR_SIZE_ALL_MEM_TABLE =
ConfigOptions.key(RocksDBProperty.CurSizeAllMemTables.getConfigKey())
.defaultValue(false)
.withDescription(
"Monitor the approximate size of the active and unflushed immutable memtables"
+ " in bytes.");
public static final ConfigOption<Boolean> MONITOR_SIZE_ALL_MEM_TABLES =
ConfigOptions.key(RocksDBProperty.SizeAllMemTables.getConfigKey())
.defaultValue(false)
.withDescription(
"Monitor the approximate size of the active, unflushed immutable, "
+ "and pinned immutable memtables in bytes.");
public static final ConfigOption<Boolean> MONITOR_NUM_ENTRIES_ACTIVE_MEM_TABLE =
ConfigOptions.key(RocksDBProperty.NumEntriesActiveMemTable.getConfigKey())
.defaultValue(false)
.withDescription("Monitor the total number of entries in the active memtable.");
public static final ConfigOption<Boolean> MONITOR_NUM_ENTRIES_IMM_MEM_TABLES =
ConfigOptions.key(RocksDBProperty.NumEntriesImmMemTables.getConfigKey())
.defaultValue(false)
.withDescription(
"Monitor the total number of entries in the unflushed immutable memtables.");
public static final ConfigOption<Boolean> MONITOR_NUM_DELETES_ACTIVE_MEM_TABLE =
ConfigOptions.key(RocksDBProperty.NumDeletesActiveMemTable.getConfigKey())
.defaultValue(false)
.withDescription(
"Monitor the total number of delete entries in the active memtable.");
public static final ConfigOption<Boolean> MONITOR_NUM_DELETES_IMM_MEM_TABLE =
ConfigOptions.key(RocksDBProperty.NumDeletesImmMemTables.getConfigKey())
.defaultValue(false)
.withDescription(
"Monitor the total number of delete entries in the unflushed immutable memtables.");
public static final ConfigOption<Boolean> ESTIMATE_NUM_KEYS =
ConfigOptions.key(RocksDBProperty.EstimateNumKeys.getConfigKey())
.defaultValue(false)
.withDescription("Estimate the number of keys in RocksDB.");
public static final ConfigOption<Boolean> ESTIMATE_TABLE_READERS_MEM =
ConfigOptions.key(RocksDBProperty.EstimateTableReadersMem.getConfigKey())
.defaultValue(false)
.withDescription(
"Estimate the memory used for reading SST tables, excluding memory"
+ " used in block cache (e.g.,filter and index blocks) in bytes.");
public static final ConfigOption<Boolean> MONITOR_NUM_SNAPSHOTS =
ConfigOptions.key(RocksDBProperty.NumSnapshots.getConfigKey())
.defaultValue(false)
.withDescription("Monitor the number of unreleased snapshots of the database.");
public static final ConfigOption<Boolean> MONITOR_NUM_LIVE_VERSIONS =
ConfigOptions.key(RocksDBProperty.NumLiveVersions.getConfigKey())
.defaultValue(false)
.withDescription(
"Monitor number of live versions. Version is an internal data structure. "
+ "See RocksDB file version_set.h for details. More live versions often mean more SST files are held "
+ "from being deleted, by iterators or unfinished compactions.");
public static final ConfigOption<Boolean> ESTIMATE_LIVE_DATA_SIZE =
ConfigOptions.key(RocksDBProperty.EstimateLiveDataSize.getConfigKey())
.defaultValue(false)
.withDescription(
"Estimate of the amount of live data in bytes (usually smaller than sst files size due to space amplification).");
public static final ConfigOption<Boolean> MONITOR_TOTAL_SST_FILES_SIZE =
ConfigOptions.key(RocksDBProperty.TotalSstFilesSize.getConfigKey())
.defaultValue(false)
.withDescription(
"Monitor the total size (bytes) of all SST files of all versions."
+ "WARNING: may slow down online queries if there are too many files.");
public static final ConfigOption<Boolean> MONITOR_LIVE_SST_FILES_SIZE =
ConfigOptions.key(RocksDBProperty.LiveSstFilesSize.getConfigKey())
.booleanType()
.defaultValue(false)
.withDescription(
"Monitor the total size (bytes) of all SST files belonging to the latest version."
+ "WARNING: may slow down online queries if there are too many files.");
public static final ConfigOption<Boolean> ESTIMATE_PENDING_COMPACTION_BYTES =
ConfigOptions.key(RocksDBProperty.EstimatePendingCompactionBytes.getConfigKey())
.defaultValue(false)
.withDescription(
"Estimated total number of bytes compaction needs to rewrite to get all levels "
+ "down to under target size. Not valid for other compactions than level-based.");
public static final ConfigOption<Boolean> MONITOR_NUM_RUNNING_COMPACTIONS =
ConfigOptions.key(RocksDBProperty.NumRunningCompactions.getConfigKey())
.defaultValue(false)
.withDescription("Monitor the number of currently running compactions.");
public static final ConfigOption<Boolean> MONITOR_NUM_RUNNING_FLUSHES =
ConfigOptions.key(RocksDBProperty.NumRunningFlushes.getConfigKey())
.defaultValue(false)
.withDescription("Monitor the number of currently running flushes.");
public static final ConfigOption<Boolean> MONITOR_ACTUAL_DELAYED_WRITE_RATE =
ConfigOptions.key(RocksDBProperty.ActualDelayedWriteRate.getConfigKey())
.defaultValue(false)
.withDescription(
"Monitor the current actual delayed write rate. 0 means no delay.");
public static final ConfigOption<Boolean> IS_WRITE_STOPPED =
ConfigOptions.key(RocksDBProperty.IsWriteStopped.getConfigKey())
.booleanType()
.defaultValue(false)
.withDescription(
"Track whether write has been stopped in RocksDB. Returns 1 if write has been stopped, 0 otherwise.");
public static final ConfigOption<Boolean> BLOCK_CACHE_CAPACITY =
ConfigOptions.key(RocksDBProperty.BlockCacheCapacity.getConfigKey())
.booleanType()
.defaultValue(false)
.withDescription("Monitor block cache capacity.");
public static final ConfigOption<Boolean> BLOCK_CACHE_USAGE =
ConfigOptions.key(RocksDBProperty.BlockCacheUsage.getConfigKey())
.booleanType()
.defaultValue(false)
.withDescription(
"Monitor the memory size for the entries residing in block cache.");
public static final ConfigOption<Boolean> BLOCK_CACHE_PINNED_USAGE =
ConfigOptions.key(RocksDBProperty.BlockCachePinnedUsage.getConfigKey())
.booleanType()
.defaultValue(false)
.withDescription(
"Monitor the memory size for the entries being pinned in block cache.");
public static final ConfigOption<Boolean> COLUMN_FAMILY_AS_VARIABLE =
ConfigOptions.key(METRICS_COLUMN_FAMILY_AS_VARIABLE_KEY)
.defaultValue(false)
.withDescription("Whether to expose the column family as a variable.");
/** Creates a {@link RocksDBNativeMetricOptions} based on an external configuration. */
public static RocksDBNativeMetricOptions fromConfig(ReadableConfig config) {
RocksDBNativeMetricOptions options = new RocksDBNativeMetricOptions();
if (config.get(MONITOR_NUM_IMMUTABLE_MEM_TABLES)) {
options.enableNumImmutableMemTable();
}
if (config.get(MONITOR_MEM_TABLE_FLUSH_PENDING)) {
options.enableMemTableFlushPending();
}
if (config.get(TRACK_COMPACTION_PENDING)) {
options.enableCompactionPending();
}
if (config.get(MONITOR_BACKGROUND_ERRORS)) {
options.enableBackgroundErrors();
}
if (config.get(MONITOR_CUR_SIZE_ACTIVE_MEM_TABLE)) {
options.enableCurSizeActiveMemTable();
}
if (config.get(MONITOR_CUR_SIZE_ALL_MEM_TABLE)) {
options.enableCurSizeAllMemTables();
}
if (config.get(MONITOR_SIZE_ALL_MEM_TABLES)) {
options.enableSizeAllMemTables();
}
if (config.get(MONITOR_NUM_ENTRIES_ACTIVE_MEM_TABLE)) {
options.enableNumEntriesActiveMemTable();
}
if (config.get(MONITOR_NUM_ENTRIES_IMM_MEM_TABLES)) {
options.enableNumEntriesImmMemTables();
}
if (config.get(MONITOR_NUM_DELETES_ACTIVE_MEM_TABLE)) {
options.enableNumDeletesActiveMemTable();
}
if (config.get(MONITOR_NUM_DELETES_IMM_MEM_TABLE)) {
options.enableNumDeletesImmMemTables();
}
if (config.get(ESTIMATE_NUM_KEYS)) {
options.enableEstimateNumKeys();
}
if (config.get(ESTIMATE_TABLE_READERS_MEM)) {
options.enableEstimateTableReadersMem();
}
if (config.get(MONITOR_NUM_SNAPSHOTS)) {
options.enableNumSnapshots();
}
if (config.get(MONITOR_NUM_LIVE_VERSIONS)) {
options.enableNumLiveVersions();
}
if (config.get(ESTIMATE_LIVE_DATA_SIZE)) {
options.enableEstimateLiveDataSize();
}
if (config.get(MONITOR_TOTAL_SST_FILES_SIZE)) {
options.enableTotalSstFilesSize();
}
if (config.get(MONITOR_LIVE_SST_FILES_SIZE)) {
options.enableLiveSstFilesSize();
}
if (config.get(ESTIMATE_PENDING_COMPACTION_BYTES)) {
options.enableEstimatePendingCompactionBytes();
}
if (config.get(MONITOR_NUM_RUNNING_COMPACTIONS)) {
options.enableNumRunningCompactions();
}
if (config.get(MONITOR_NUM_RUNNING_FLUSHES)) {
options.enableNumRunningFlushes();
}
if (config.get(MONITOR_ACTUAL_DELAYED_WRITE_RATE)) {
options.enableActualDelayedWriteRate();
}
if (config.get(IS_WRITE_STOPPED)) {
options.enableIsWriteStopped();
}
if (config.get(BLOCK_CACHE_CAPACITY)) {
options.enableBlockCacheCapacity();
}
if (config.get(BLOCK_CACHE_USAGE)) {
options.enableBlockCacheUsage();
}
if (config.get(BLOCK_CACHE_PINNED_USAGE)) {
options.enableBlockCachePinnedUsage();
}
options.setColumnFamilyAsVariable(config.get(COLUMN_FAMILY_AS_VARIABLE));
return options;
}
private final Set<String> properties;
private boolean columnFamilyAsVariable = COLUMN_FAMILY_AS_VARIABLE.defaultValue();
public RocksDBNativeMetricOptions() {
this.properties = new HashSet<>();
}
/** Returns number of immutable memtables that have not yet been flushed. */
public void enableNumImmutableMemTable() {
this.properties.add(RocksDBProperty.NumImmutableMemTable.getRocksDBProperty());
}
/** Returns 1 if a memtable flush is pending; otherwise, returns 0. */
public void enableMemTableFlushPending() {
this.properties.add(RocksDBProperty.MemTableFlushPending.getRocksDBProperty());
}
/** Returns 1 if at least one compaction is pending; otherwise, returns 0. */
public void enableCompactionPending() {
this.properties.add(RocksDBProperty.CompactionPending.getRocksDBProperty());
}
/** Returns accumulated number of background errors. */
public void enableBackgroundErrors() {
this.properties.add(RocksDBProperty.BackgroundErrors.getRocksDBProperty());
}
/** Returns approximate size of active memtable (bytes). */
public void enableCurSizeActiveMemTable() {
this.properties.add(RocksDBProperty.CurSizeActiveMemTable.getRocksDBProperty());
}
/** Returns approximate size of active and unflushed immutable memtables (bytes). */
public void enableCurSizeAllMemTables() {
this.properties.add(RocksDBProperty.CurSizeAllMemTables.getRocksDBProperty());
}
/**
* Returns approximate size of active, unflushed immutable, and pinned immutable memtables
* (bytes).
*/
public void enableSizeAllMemTables() {
this.properties.add(RocksDBProperty.SizeAllMemTables.getRocksDBProperty());
}
/** Returns total number of entries in the active memtable. */
public void enableNumEntriesActiveMemTable() {
this.properties.add(RocksDBProperty.NumEntriesActiveMemTable.getRocksDBProperty());
}
/** Returns total number of entries in the unflushed immutable memtables. */
public void enableNumEntriesImmMemTables() {
this.properties.add(RocksDBProperty.NumEntriesImmMemTables.getRocksDBProperty());
}
/** Returns total number of delete entries in the active memtable. */
public void enableNumDeletesActiveMemTable() {
this.properties.add(RocksDBProperty.NumDeletesActiveMemTable.getRocksDBProperty());
}
/** Returns total number of delete entries in the unflushed immutable memtables. */
public void enableNumDeletesImmMemTables() {
this.properties.add(RocksDBProperty.NumDeletesImmMemTables.getRocksDBProperty());
}
/**
* Returns estimated number of total keys in the active and unflushed immutable memtables and
* storage.
*/
public void enableEstimateNumKeys() {
this.properties.add(RocksDBProperty.EstimateNumKeys.getRocksDBProperty());
}
/**
* Returns estimated memory used for reading SST tables, excluding memory used in block cache
* (e.g.,filter and index blocks).
*/
public void enableEstimateTableReadersMem() {
this.properties.add(RocksDBProperty.EstimateTableReadersMem.getRocksDBProperty());
}
/** Returns number of unreleased snapshots of the database. */
public void enableNumSnapshots() {
this.properties.add(RocksDBProperty.NumSnapshots.getRocksDBProperty());
}
/**
* Returns number of live versions. `Version` is an internal data structure. See version_set.h
* for details. More live versions often mean more SST files are held from being deleted, by
* iterators or unfinished compactions.
*/
public void enableNumLiveVersions() {
this.properties.add(RocksDBProperty.NumLiveVersions.getRocksDBProperty());
}
/** Returns an estimate of the amount of live data in bytes. */
public void enableEstimateLiveDataSize() {
this.properties.add(RocksDBProperty.EstimateLiveDataSize.getRocksDBProperty());
}
/**
* Returns total size (bytes) of all SST files. <strong>WARNING</strong>: may slow down online
* queries if there are too many files.
*/
public void enableTotalSstFilesSize() {
this.properties.add(RocksDBProperty.TotalSstFilesSize.getRocksDBProperty());
}
public void enableLiveSstFilesSize() {
this.properties.add(RocksDBProperty.LiveSstFilesSize.getRocksDBProperty());
}
/**
* Returns estimated total number of bytes compaction needs to rewrite to get all levels down to
* under target size. Not valid for other compactions than level-based.
*/
public void enableEstimatePendingCompactionBytes() {
this.properties.add(RocksDBProperty.EstimatePendingCompactionBytes.getRocksDBProperty());
}
/** Returns the number of currently running compactions. */
public void enableNumRunningCompactions() {
this.properties.add(RocksDBProperty.NumRunningCompactions.getRocksDBProperty());
}
/** Returns the number of currently running flushes. */
public void enableNumRunningFlushes() {
this.properties.add(RocksDBProperty.NumRunningFlushes.getRocksDBProperty());
}
/** Returns the current actual delayed write rate. 0 means no delay. */
public void enableActualDelayedWriteRate() {
this.properties.add(RocksDBProperty.ActualDelayedWriteRate.getRocksDBProperty());
}
/** Returns 1 if write has been stopped. */
public void enableIsWriteStopped() {
this.properties.add(RocksDBProperty.IsWriteStopped.getRocksDBProperty());
}
/** Returns block cache capacity. */
public void enableBlockCacheCapacity() {
this.properties.add(RocksDBProperty.BlockCacheCapacity.getRocksDBProperty());
}
/** Returns the memory size for the entries residing in block cache. */
public void enableBlockCacheUsage() {
this.properties.add(RocksDBProperty.BlockCacheUsage.getRocksDBProperty());
}
/** Returns the memory size for the entries being pinned in block cache. */
public void enableBlockCachePinnedUsage() {
this.properties.add(RocksDBProperty.BlockCachePinnedUsage.getRocksDBProperty());
}
/** Returns the column family as variable. */
public void setColumnFamilyAsVariable(boolean columnFamilyAsVariable) {
this.columnFamilyAsVariable = columnFamilyAsVariable;
}
/** @return the enabled RocksDB metrics */
public Collection<String> getProperties() {
return Collections.unmodifiableCollection(properties);
}
/**
* {{@link RocksDBNativeMetricMonitor}} is enabled is any property is set.
*
* @return true if {{RocksDBNativeMetricMonitor}} should be enabled, false otherwise.
*/
public boolean isEnabled() {
return !properties.isEmpty();
}
/**
* {{@link RocksDBNativeMetricMonitor}} Whether to expose the column family as a variable..
*
* @return true is column family to expose variable, false otherwise.
*/
public boolean isColumnFamilyAsVariable() {
return this.columnFamilyAsVariable;
}
}