blob: 66bfa43e36453bb09595a84e215872079a825cbf [file] [log] [blame]
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
#if [ -z $1 ] || [ -z $2 ]; then
# echo "Usage: ./test_rocksdb_state_memory_control.sh "
# exit 1
#fi
source "$(dirname "$0")"/common.sh
PARALLELISM=2
CHECKPOINT_DIR="$TEST_DATA_DIR/test_rocksdb_state_memory_control-dir"
mkdir -p $CHECKPOINT_DIR
CHECKPOINT_DIR_URI="file://$CHECKPOINT_DIR"
# The managed memory is fixed at 300m which gives an allowed cache size of about 157,000,000b,
# via the compensation logic for RocksDBs memory exceeding the cache.
#
# Due to RocksDB's lenient (non-strict) memory accounting, we add num-states * arena-size MBs in extra tolerance
# (because in corner cases, especially with slow I/O, cache footprint can temporarily rise)
# which brings this to about 190,000,000 bytes, rounded up to 200,000,000 for a safety/stability margin.
#
# With unrestricted memory use, this test would use more than 400m of RocksDB memory, so we are
# well below this limit, thus testing that the memory limiting is actually active.
EXPECTED_MAX_MEMORY_USAGE=200000000
set_config_key "taskmanager.numberOfTaskSlots" "$PARALLELISM"
set_config_key "taskmanager.memory.process.size" "1184m"
set_config_key "taskmanager.memory.managed.size" "300m"
set_config_key "state.backend.rocksdb.memory.managed" "true"
set_config_key "state.backend.rocksdb.memory.write-buffer-ratio" "0.8"
set_config_key "state.backend.rocksdb.metrics.size-all-mem-tables" "true"
set_config_key "state.backend.rocksdb.metrics.cur-size-active-mem-table" "true"
set_config_key "state.backend.rocksdb.metrics.num-immutable-mem-table" "true"
set_config_key "state.backend.rocksdb.metrics.block-cache-usage" "true"
set_config_key "state.backend.rocksdb.metrics.estimate-table-readers-mem" "true"
set_config_key "metrics.fetcher.update-interval" "1000"
setup_flink_slf4j_metric_reporter
start_cluster
echo "Running RocksDB state backend memory control test"
TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-rocksdb-state-memory-control-test/target/RocksDBStateMemoryControlTestProgram.jar
function buildBaseJobCmd {
local flink_args="$1"
echo "$FLINK_DIR/bin/flink run -d ${flink_args} -p $PARALLELISM $TEST_PROGRAM_JAR \
--environment.parallelism $PARALLELISM \
--environment.checkpoint_interval 600000 \
--state_backend rocks \
--state_backend.checkpoint_directory $CHECKPOINT_DIR_URI \
--state_backend.rocks.incremental true \
--sequence_generator_source.sleep_time 1 \
--sequence_generator_source.keyspace 1000000 \
--sequence_generator_source.payload_size 50000 \
--useValueState true \
--useListState true \
--useMapState true \
--useWindow true \
--sequence_generator_source.sleep_after_elements 1"
}
function find_max_block_cache_usage() {
OPERATOR=$1
JOB_NAME="${2:-General purpose test job}"
N=$(grep ".${JOB_NAME}.$OPERATOR.rocksdb.block-cache-usage:" $FLINK_LOG_DIR/*taskexecutor*.log | sed 's/.* //g' | sort -rn | head -n 1)
if [ -z $N ]; then
N=0
fi
echo $N
}
function memory_under_limit() {
local MAX_BLOCK_CACHE_USAGE=$1
local EXPECTED_MAX_MEMORY_USAGE=$2
echo "[INFO] Current block cache usage for RocksDB instance in slot was $MAX_BLOCK_CACHE_USAGE"
if [ "$MAX_BLOCK_CACHE_USAGE" -gt "$EXPECTED_MAX_MEMORY_USAGE" ]; then
echo "[ERROR] Current block cache usage $MAX_BLOCK_CACHE_USAGE larger than expected memory limit $EXPECTED_MAX_MEMORY_USAGE"
exit 1
fi
}
JOB_CMD=`buildBaseJobCmd `
DATASTREAM_JOB=$($JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')
wait_job_running $DATASTREAM_JOB
wait_oper_metric_num_in_records TumblingWindowOperator.0 10000 'RocksDB test job'
cancel_job $DATASTREAM_JOB
MAX_BLOCK_CACHE_0_USAGE=$(find_max_block_cache_usage 'TumblingWindowOperator.0.window-contents' 'RocksDB test job')
MAX_BLOCK_CACHE_1_USAGE=$(find_max_block_cache_usage 'TumblingWindowOperator.1.window-contents' 'RocksDB test job')
memory_under_limit $MAX_BLOCK_CACHE_0_USAGE $EXPECTED_MAX_MEMORY_USAGE
memory_under_limit $MAX_BLOCK_CACHE_1_USAGE $EXPECTED_MAX_MEMORY_USAGE