#!/bin/bash

# Process id of this script run. It is embedded in the names of the temporary
# log files written under /tmp so they can be removed together at the end.
PID="$$"
| |
# usage
#
# Prints the help text (synopsis, option list and examples) to stdout.
# $0 is expanded inside the here-document so the examples show the name the
# script was invoked with.
function usage()
{
    cat <<EOF
This tool is for manual compact specified table(app).
USAGE: $0 -c cluster -a app-name [-t periodic|once] [-w] [-g trigger_time] [...]
Options:
 -h|--help print help message

 -c|--cluster <str> cluster meta server list, default is "127.0.0.1:34601,127.0.0.1:34602"

 -a|--app_name <str> target table(app) name

 -t|--type <str> manual compact type, should be periodic or once, default is once

 -w|--wait_only this option is only used when the type is once!
 not trigger but only wait the last once compact to finish

 -g|--trigger_time <str> this option is only used when the type is periodic!
 specify trigger time of periodic compact in 24-hour format,
 e.g. "3:00,21:00" means 3:00 and 21:00 everyday

 --target_level <num> number in range of [-1,num_levels], -1 means automatically, default is -1

 --bottommost_level_compaction <skip|force>
 skip or force, default is skip
 more details: https://github.com/facebook/rocksdb/wiki/Manual-Compaction

 --max_concurrent_running_count <num>
 max concurrent running count limit, should be positive integer.
 if not set, means no limit.

for example:

 1) Start once type manual compact with default options:

 $0 -c 127.0.0.1:34601,127.0.0.1:34602 -a temp

 2) Only wait last once type manual compact to finish:

 $0 -c 127.0.0.1:34601,127.0.0.1:34602 -a temp -w

 3) Config periodic type manual compact with specified options:

 $0 -c 127.0.0.1:34601,127.0.0.1:34602 -a temp -t periodic -g 3:00,21:00 \\
 --target_level 2 --bottommost_level_compaction force

EOF
}
| |
# get_env cluster app_name key
#
# Runs `get_app_envs` for table `app_name` through ./run.sh shell and prints
# the value of env `key` to stdout (prints nothing when the key is absent).
# The full shell output is kept in a per-run log file under /tmp.
#
# On failure the error message goes to stderr, not stdout, because callers
# read the value through command substitution and must not receive the error
# text as if it were the value. The function then exits with status 1 (this
# only leaves the subshell when called inside $(...), so callers should check
# the status).
function get_env()
{
    local cluster=$1
    local app_name=$2
    local key=$3
    local log_file="/tmp/$UID.$PID.pegasus.get_app_envs.${app_name}"

    printf 'use %s\n get_app_envs\n' "${app_name}" \
        | ./run.sh shell --cluster "${cluster}" &>"${log_file}"

    # any occurrence of the failure marker counts, not exactly one
    if grep -q 'get app env failed' "${log_file}"; then
        echo "ERROR: get app envs failed, refer to ${log_file}" >&2
        exit 1
    fi

    # literal prefix match on "<key> =" so dots in the key are not treated as
    # regex wildcards
    awk -v key="${key}" 'index($0, key " =") == 1 {print $3}' "${log_file}"
}
| |
# set_env cluster app_name key value
#
# Sets env `key` to `value` on table `app_name` by feeding `set_app_envs` to
# ./run.sh shell. Echoes the assignment being made, keeps the shell output in
# a per-run log file under /tmp, and exits the script with status 1 when the
# output contains the failure marker.
function set_env()
{
    local cluster=$1
    local app_name=$2
    local key=$3
    local value=$4
    local log_file="/tmp/$UID.$PID.pegasus.set_app_envs.${app_name}"

    echo "set_app_envs ${key}=${value}"

    # printf '%s' passes key/value through verbatim (no backslash processing)
    printf 'use %s\n set_app_envs %s %s\n' "${app_name}" "${key}" "${value}" \
        | ./run.sh shell --cluster "${cluster}" &>"${log_file}"

    # any occurrence of the failure marker counts, not exactly one
    if grep -q 'set app env failed' "${log_file}"; then
        echo "ERROR: set app envs failed, refer to ${log_file}"
        exit 1
    fi
}
| |
# wait_manual_compact app_id trigger_time total_replica_count
#
# Polls `replica.query-compact <app_id>` through ./run.sh shell every 5
# seconds and prints a progress line each round, until no replica is queued
# or running and the number of replicas whose "last finish at" timestamp is
# not earlier than `trigger_time` equals `total_replica_count`.
#
#   app_id               id of the table to query
#   trigger_time         trigger time as epoch seconds (passed to `date -d @`)
#   total_replica_count  number of replicas expected to finish
#
# Reads the global `cluster`. All working variables are local so the caller's
# `app_id` / `trigger_time` are left untouched. Exits with status 1 when
# `trigger_time` cannot be converted to a timestamp: an empty threshold would
# make every previously finished replica count as finished.
function wait_manual_compact()
{
    local app_id=$1
    local trigger_time=$2
    local total_replica_count=$3

    local query_cmd="remote_command -t replica-server replica.query-compact ${app_id}"
    local query_log_file="/tmp/$UID.$PID.pegasus.query_compact.${app_id}"
    local earliest_finish_time_ms
    if ! earliest_finish_time_ms=$(date -d "@${trigger_time}" +"%Y-%m-%d %H:%M:%S.000"); then
        echo "ERROR: invalid trigger_time: ${trigger_time}"
        exit 1
    fi
    echo "Checking once compact progress since [$trigger_time] [$earliest_finish_time_ms]..."

    local slept=0
    local queue_count running_count processing_count
    local finish_count not_finish_count left_time
    while true; do
        echo "${query_cmd}" | ./run.sh shell --cluster "${cluster}" &>"${query_log_file}"

        # queued: enqueued but not yet started; running: started
        queue_count=$(grep 'recent enqueue at' "${query_log_file}" | grep -vc 'recent start at')
        running_count=$(grep -c 'recent start at' "${query_log_file}")
        processing_count=$((queue_count + running_count))

        # finished: idle replicas whose "last finish at [<timestamp>]" is a
        # 23-character timestamp that compares (as a string) >= the trigger time
        finish_count=$(grep "last finish at" "${query_log_file}" \
            | grep -v "recent enqueue at" \
            | grep -v "recent start at" \
            | grep -o 'last finish at [^,]*' \
            | sed 's/\[/,/;s/\]//' \
            | awk -F"," -v date="$earliest_finish_time_ms" \
                'BEGIN{count=0}{if(length($2)==23 && $2>=date){count++;}}END{print count}')
        not_finish_count=$((total_replica_count - finish_count))

        if [ "${processing_count}" -eq 0 ] && [ "${finish_count}" -eq "${total_replica_count}" ]; then
            echo "[${slept}s] $finish_count finished, $not_finish_count not finished ($queue_count in queue, $running_count in running), estimate remaining 0 seconds."
            echo "All finished, total $total_replica_count replicas."
            break
        fi

        # linear extrapolation from the replicas finished so far
        left_time="unknown"
        if [ "${finish_count}" -gt 0 ]; then
            left_time=$((slept * not_finish_count / finish_count))
        fi
        echo "[${slept}s] $finish_count finished, $not_finish_count not finished ($queue_count in queue, $running_count in running), estimate remaining $left_time seconds."
        sleep 5
        slept=$((slept + 5))
    done
    echo
}
| |
# create_checkpoint cluster app_id
#
# Sends `replica.trigger-checkpoint <app_id>` to the replica servers through
# ./run.sh shell, then prints how many output lines reported "triggered",
# "ignored" and "not found". The total in the summary comes from the global
# `partition_count`, which the caller is expected to have set.
function create_checkpoint()
{
    local cluster=$1
    local app_id=$2
    local chkpt_log_file="/tmp/$UID.$PID.pegasus.trigger_checkpoint.${app_id}"
    local not_found_count triggered_count ignored_count

    echo "Start to create checkpoint..."
    echo "remote_command -t replica-server replica.trigger-checkpoint ${app_id}" \
        | ./run.sh shell --cluster "${cluster}" &>"${chkpt_log_file}"

    not_found_count=$(grep -c '^ .*not found' "${chkpt_log_file}")
    triggered_count=$(grep -c '^ .*triggered' "${chkpt_log_file}")
    ignored_count=$(grep -c '^ .*ignored' "${chkpt_log_file}")
    echo "Result: total $partition_count partitions, $triggered_count triggered, $ignored_count ignored, $not_found_count not found."
    echo
}
| |
# called without any argument: just show the help text
if [ $# -eq 0 ]; then
    usage
    exit 0
fi

# parse parameters
# defaults; each may be overridden by the matching command line option
cluster=""
app_name=""
type="once"
trigger_time=""
wait_only="false"
target_level="-1"
bottommost_level_compaction="skip"
max_concurrent_running_count=""
# numeric test on the argument count ([[ a > b ]] would compare strings)
while (( $# > 0 )); do
    option_key="$1"
    case "${option_key}" in
        -c|--cluster)
            cluster="$2"
            shift
            ;;
        -t|--type)
            type="$2"
            shift
            ;;
        -g|--trigger_time)
            trigger_time="$2"
            shift
            ;;
        -a|--app_name)
            app_name="$2"
            shift
            ;;
        -w|--wait_only)
            wait_only="true"
            ;;
        --target_level)
            target_level="$2"
            shift
            ;;
        --bottommost_level_compaction)
            bottommost_level_compaction="$2"
            shift
            ;;
        --max_concurrent_running_count)
            max_concurrent_running_count="$2"
            shift
            ;;
        -h|--help)
            usage
            exit 0
            ;;
        *)
            # still skipped as before, but no longer silently: a mistyped
            # option would otherwise fall back to its default unnoticed
            echo "WARNING: ignore unknown option: ${option_key}" >&2
            ;;
    esac
    shift
done
| |
# cd to shell dir
# The shell dir is the parent of the directory holding this script; every
# later command is invoked as ./run.sh relative to it, so stop right here if
# it cannot be resolved or entered.
pwd="$(cd "$(dirname "$0")" && pwd)"
shell_dir="$(cd "${pwd}/.." && pwd)"
if [ -z "${shell_dir}" ] || ! cd "${shell_dir}"; then
    echo "ERROR: can not cd to shell dir: ${shell_dir}"
    exit 1
fi
| |
# check cluster
if [[ -z "${cluster}" ]]; then
    echo "ERROR: invalid cluster: ${cluster}"
    exit 1
fi

# check app_name
if [[ -z "${app_name}" ]]; then
    echo "ERROR: invalid app_name: ${app_name}"
    exit 1
fi

# check type
if [[ "${type}" != "periodic" && "${type}" != "once" ]]; then
    echo "ERROR: invalid type: ${type}"
    exit 1
fi

# check wait_only
if [[ "${wait_only}" == "true" && "${type}" != "once" ]]; then
    echo "ERROR: can not specify wait_only when type is ${type}"
    exit 1
fi

# check trigger_time
if [[ "${type}" == "once" ]]; then
    if [[ -n "${trigger_time}" ]]; then
        echo "ERROR: can not specify trigger_time when type is ${type}"
        exit 1
    fi
    if [[ "${wait_only}" == "true" ]]; then
        # wait only: reuse the trigger time stored by the previous once compact.
        # get_env runs in a subshell here, so its `exit 1` does not stop this
        # script; check the status explicitly and relay whatever it printed.
        if ! trigger_time=$(get_env "${cluster}" "${app_name}" "manual_compact.once.trigger_time"); then
            if [[ -n "${trigger_time}" ]]; then
                echo "${trigger_time}"
            fi
            exit 1
        fi
        if [[ -z "${trigger_time}" ]]; then
            echo "No once compact triggered previously, nothing to wait"
            exit 1
        fi
    else
        trigger_time=$(date +%s)
    fi
else # type == periodic
    if [[ -z "${trigger_time}" ]]; then
        echo "ERROR: should specify trigger_time when type is ${type}"
        exit 1
    fi
fi

# check target_level
# Must be an integer >= -1. A regex is used instead of `expr N + 0` because
# expr exits with status 1 when the result is 0, which rejected the valid
# level 0.
if ! [[ "${target_level}" =~ ^-?[0-9]+$ ]] || [ "${target_level}" -lt -1 ]; then
    echo "ERROR: invalid target_level: ${target_level}"
    exit 1
fi

# check bottommost_level_compaction
if [[ "${bottommost_level_compaction}" != "skip" && "${bottommost_level_compaction}" != "force" ]]; then
    echo "ERROR: invalid bottommost_level_compaction: ${bottommost_level_compaction}"
    exit 1
fi

# check max_concurrent_running_count
# Optional; when given it must be a positive integer (0 stays rejected, as
# the usage text requires).
if [[ -n "${max_concurrent_running_count}" ]]; then
    if ! [[ "${max_concurrent_running_count}" =~ ^[0-9]+$ ]] || [ "${max_concurrent_running_count}" -le 0 ]; then
        echo "ERROR: invalid max_concurrent_running_count: ${max_concurrent_running_count}"
        exit 1
    fi
fi
| |
# record start time
all_start_time=$(date +%s)
echo "UID: $UID"
echo "PID: $PID"
echo "cluster: $cluster"
echo "app_name: $app_name"
echo "type: $type"
echo "Start time: $(date -d "@${all_start_time}" +"%Y-%m-%d %H:%M:%S")"
echo

# Trigger step: runs for periodic, and for once unless only waiting.
# All user supplied values are quoted so they reach run.sh / set_env as
# single words, without word splitting or glob expansion.
if [[ "${type}" == "periodic" ]] || [[ "${type}" == "once" && "${wait_only}" == "false" ]]; then
    # set steady
    echo "set_meta_level steady" \
        | ./run.sh shell --cluster "${cluster}" &>"/tmp/$UID.$PID.pegasus.set_meta_level"

    # set manual compact envs
    if [[ -n "${target_level}" ]]; then
        set_env "${cluster}" "${app_name}" "manual_compact.${type}.target_level" "${target_level}"
    fi
    if [[ -n "${bottommost_level_compaction}" ]]; then
        set_env "${cluster}" "${app_name}" "manual_compact.${type}.bottommost_level_compaction" "${bottommost_level_compaction}"
    fi
    if [[ -n "${max_concurrent_running_count}" ]]; then
        set_env "${cluster}" "${app_name}" "manual_compact.max_concurrent_running_count" "${max_concurrent_running_count}"
    fi
    # trigger_time is set last, after the other envs are in place
    set_env "${cluster}" "${app_name}" "manual_compact.${type}.trigger_time" "${trigger_time}"
    echo
fi
| |
# only `once` manual compact will check progress
if [[ "${type}" != "once" ]]; then
    rm -f "/tmp/$UID.$PID.pegasus."* &>/dev/null
    exit 0
fi

# list all tables and locate the target one
ls_log_file="/tmp/$UID.$PID.pegasus.ls"
echo ls | ./run.sh shell --cluster "${cluster}" &>"${ls_log_file}"

# Columns read from each line of the `ls` output: 1=app_id 2=status
# 3=app name 5=partition_count 6=replica_count (column 4 and anything after
# column 6 are ignored). `read -r` splits the line directly, without
# glob-expanding it and without one awk process per column.
app_found="false"
while read -r app_id status app _ partition_count replica_count _; do
    if [[ "${app_name}" != "${app}" ]]; then
        continue
    fi
    app_found="true"

    if [[ "${status}" != "AVAILABLE" ]]; then
        echo "app ${app_name} is not available now, try to query result later"
        exit 1
    fi

    wait_manual_compact "${app_id}" "${trigger_time}" "$((partition_count * replica_count))"

    #create_checkpoint ${cluster} ${app_id}
done <"${ls_log_file}"

# Nothing was waited for if the table never showed up in the listing; do not
# report success in that case. The log files are kept for inspection.
if [[ "${app_found}" != "true" ]]; then
    echo "ERROR: app ${app_name} not found, refer to ${ls_log_file}"
    exit 1
fi

# record finish time
all_finish_time=$(date +%s)
echo "Finish time: $(date -d "@${all_finish_time}" +"%Y-%m-%d %H:%M:%S")"
echo "Manual compact done, elapsed time is $((all_finish_time - all_start_time)) seconds."

rm -f "/tmp/$UID.$PID.pegasus."* &>/dev/null