blob: 3bfcc03441e76136349e96fff6ec2c65d278d85c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.io.Serializable;
import static org.apache.flink.util.Preconditions.checkArgument;
/** The settings regarding RocksDBs memory usage. */
public final class RocksDBMemoryConfiguration implements Serializable {
private static final long serialVersionUID = 1L;
/** Flag whether to use the managed memory budget for RocksDB. Null is not set. */
@Nullable private Boolean useManagedMemory;
/** The total memory for all RocksDB instances at this slot. Null is not set. */
@Nullable private MemorySize fixedMemoryPerSlot;
/**
* The maximum fraction of the total shared memory consumed by the write buffers. Null if not
* set.
*/
@Nullable private Double writeBufferRatio;
/**
* The high priority pool ratio in the shared cache, used for index & filter blocks. Null if not
* set.
*/
@Nullable private Double highPriorityPoolRatio;
/** Flag whether to use partition index/filters. Null if not set. */
@Nullable private Boolean usePartitionedIndexFilters;
// ------------------------------------------------------------------------
/**
* Configures RocksDB to use the managed memory of a slot. See {@link
* RocksDBOptions#USE_MANAGED_MEMORY} for details.
*/
public void setUseManagedMemory(boolean useManagedMemory) {
this.useManagedMemory = useManagedMemory;
}
/**
* Configures RocksDB to use a fixed amount of memory shared between all instances (operators)
* in a slot. See {@link RocksDBOptions#FIX_PER_SLOT_MEMORY_SIZE} for details.
*/
public void setFixedMemoryPerSlot(MemorySize fixedMemoryPerSlot) {
checkArgument(
fixedMemoryPerSlot == null || fixedMemoryPerSlot.getBytes() > 0,
"Total memory per slot must be > 0");
this.fixedMemoryPerSlot = fixedMemoryPerSlot;
}
/**
* Configures RocksDB to use a fixed amount of memory shared between all instances (operators)
* in a slot. See {@link #setFixedMemoryPerSlot(MemorySize)} for details.
*/
public void setFixedMemoryPerSlot(String totalMemoryPerSlotStr) {
setFixedMemoryPerSlot(MemorySize.parse(totalMemoryPerSlotStr));
}
/**
* Sets the fraction of the total memory to be used for write buffers. This only has an effect
* is either {@link #setUseManagedMemory(boolean)} or {@link #setFixedMemoryPerSlot(MemorySize)}
* are set.
*
* <p>See {@link RocksDBOptions#WRITE_BUFFER_RATIO} for details.
*/
public void setWriteBufferRatio(double writeBufferRatio) {
Preconditions.checkArgument(
writeBufferRatio > 0 && writeBufferRatio < 1.0,
"Write Buffer ratio %s must be in (0, 1)",
writeBufferRatio);
this.writeBufferRatio = writeBufferRatio;
}
/**
* Sets the fraction of the total memory to be used for high priority blocks like indexes,
* dictionaries, etc. This only has an effect is either {@link #setUseManagedMemory(boolean)} or
* {@link #setFixedMemoryPerSlot(MemorySize)} are set.
*
* <p>See {@link RocksDBOptions#HIGH_PRIORITY_POOL_RATIO} for details.
*/
public void setHighPriorityPoolRatio(double highPriorityPoolRatio) {
Preconditions.checkArgument(
highPriorityPoolRatio > 0 && highPriorityPoolRatio < 1.0,
"High priority pool ratio %s must be in (0, 1)",
highPriorityPoolRatio);
this.highPriorityPoolRatio = highPriorityPoolRatio;
}
/**
* Gets whether the state backend is configured to use the managed memory of a slot for RocksDB.
* See {@link RocksDBOptions#USE_MANAGED_MEMORY} for details.
*/
public boolean isUsingManagedMemory() {
return useManagedMemory != null
? useManagedMemory
: RocksDBOptions.USE_MANAGED_MEMORY.defaultValue();
}
/**
* Gets whether the state backend is configured to use a fixed amount of memory shared between
* all RocksDB instances (in all tasks and operators) of a slot. See {@link
* RocksDBOptions#USE_MANAGED_MEMORY} for details.
*/
public boolean isUsingFixedMemoryPerSlot() {
return fixedMemoryPerSlot != null;
}
/**
* Gets the fixed amount of memory to be shared between all RocksDB instances (in all tasks and
* operators) of a slot. Null is not configured. See {@link RocksDBOptions#USE_MANAGED_MEMORY}
* for details.
*/
@Nullable
public MemorySize getFixedMemoryPerSlot() {
return fixedMemoryPerSlot;
}
/**
* Gets the fraction of the total memory to be used for write buffers. This only has an effect
* is either {@link #setUseManagedMemory(boolean)} or {@link #setFixedMemoryPerSlot(MemorySize)}
* are set.
*
* <p>See {@link RocksDBOptions#WRITE_BUFFER_RATIO} for details.
*/
public double getWriteBufferRatio() {
return writeBufferRatio != null
? writeBufferRatio
: RocksDBOptions.WRITE_BUFFER_RATIO.defaultValue();
}
/**
* Gets the fraction of the total memory to be used for high priority blocks like indexes,
* dictionaries, etc. This only has an effect is either {@link #setUseManagedMemory(boolean)} or
* {@link #setFixedMemoryPerSlot(MemorySize)} are set.
*
* <p>See {@link RocksDBOptions#HIGH_PRIORITY_POOL_RATIO} for details.
*/
public double getHighPriorityPoolRatio() {
return highPriorityPoolRatio != null
? highPriorityPoolRatio
: RocksDBOptions.HIGH_PRIORITY_POOL_RATIO.defaultValue();
}
/**
* Gets whether the state backend is configured to use partitioned index/filters for RocksDB.
*
* <p>See {@link RocksDBOptions#USE_PARTITIONED_INDEX_FILTERS} for details.
*/
public Boolean isUsingPartitionedIndexFilters() {
return usePartitionedIndexFilters != null
? usePartitionedIndexFilters
: RocksDBOptions.USE_PARTITIONED_INDEX_FILTERS.defaultValue();
}
// ------------------------------------------------------------------------
/** Validates if the configured options are valid with respect to one another. */
public void validate() {
// As FLINK-15512 introduce a new mechanism to calculate the cache capacity,
// the relationship of write_buffer_manager_capacity and cache_capacity has changed to:
// write_buffer_manager_capacity / cache_capacity = 2 * writeBufferRatio / (3 -
// writeBufferRatio)
// we should ensure the sum of write buffer manager capacity and high priority pool less
// than cache capacity.
// TODO change the formula once FLINK-15532 resolved.
if (writeBufferRatio != null
&& highPriorityPoolRatio != null
&& 2 * writeBufferRatio / (3 - writeBufferRatio) + highPriorityPoolRatio >= 1.0) {
throw new IllegalArgumentException(
String.format(
"Invalid configuration: writeBufferRatio %s with highPriPoolRatio %s",
writeBufferRatio, highPriorityPoolRatio));
}
}
// ------------------------------------------------------------------------
/**
* Derives a RocksDBMemoryConfiguration from another object and a configuration. The values set
* on the other object take precedence, and the values from the configuration are used if no
* values are set on the other config object.
*/
public static RocksDBMemoryConfiguration fromOtherAndConfiguration(
RocksDBMemoryConfiguration other, ReadableConfig config) {
final RocksDBMemoryConfiguration newConfig = new RocksDBMemoryConfiguration();
newConfig.useManagedMemory =
other.useManagedMemory != null
? other.useManagedMemory
: config.get(RocksDBOptions.USE_MANAGED_MEMORY);
newConfig.fixedMemoryPerSlot =
other.fixedMemoryPerSlot != null
? other.fixedMemoryPerSlot
: config.get(RocksDBOptions.FIX_PER_SLOT_MEMORY_SIZE);
newConfig.writeBufferRatio =
other.writeBufferRatio != null
? other.writeBufferRatio
: config.get(RocksDBOptions.WRITE_BUFFER_RATIO);
newConfig.highPriorityPoolRatio =
other.highPriorityPoolRatio != null
? other.highPriorityPoolRatio
: config.get(RocksDBOptions.HIGH_PRIORITY_POOL_RATIO);
newConfig.usePartitionedIndexFilters =
other.usePartitionedIndexFilters != null
? other.usePartitionedIndexFilters
: config.get(RocksDBOptions.USE_PARTITIONED_INDEX_FILTERS);
return newConfig;
}
}