#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Start/stop a Flink TaskManager.
#
# Arguments:
#   $1 - action to perform: start, start-foreground, stop or stop-all
USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"

# Requested action; anything other than the four supported values is rejected
# below before any configuration is loaded.
STARTSTOP=$1

case "${STARTSTOP}" in
  start | start-foreground | stop | stop-all)
    # Supported action, carry on.
    ;;
  *)
    echo "${USAGE}"
    exit 1
    ;;
esac

# Resolve the absolute directory this script lives in.
#   - 'CDPATH=' stops cd from searching CDPATH and printing the directory it
#     picked, which would otherwise end up inside the captured path.
#   - '&&' makes sure pwd only runs after a successful cd, so a failure is
#     reported instead of silently using the caller's working directory.
bin=$(dirname "$0")
if ! bin=$(CDPATH= cd -- "${bin}" && pwd); then
  echo "[ERROR] Could not resolve the directory containing $0."
  exit 1
fi

# Load the shared configuration that sits next to this script.
. "$bin"/config.sh

# Name of the component handed to the launcher scripts; FLINK_MODE "new"
# selects the taskexecutor instead of the taskmanager.
TYPE=taskmanager

if [[ "${FLINK_MODE}" == "new" ]]; then
  TYPE=taskexecutor
fi

# Arguments forwarded to the launcher script. Reset explicitly so that a
# variable named 'args' inherited from the environment cannot leak into the
# stop/stop-all command line, where no startup parameters are set.
args=()

# JVM configuration is only needed when a TaskManager is being started.
if [[ "${STARTSTOP}" == "start" || "${STARTSTOP}" == "start-foreground" ]]; then

  # If memory pre-allocation is disabled and no other JVM options are set,
  # default to the G1 garbage collector.
  if [[ "${FLINK_TM_MEM_PRE_ALLOCATE}" == "false" && -z "${FLINK_ENV_JAVA_OPTS}" && -z "${FLINK_ENV_JAVA_OPTS_TM}" ]]; then
    export JVM_ARGS="${JVM_ARGS} -XX:+UseG1GC"
  fi

  # Reject a heap size that does not match IS_NUMBER or is negative.
  # IS_NUMBER is left unquoted so it is applied as a regular expression
  # (presumably defined by config.sh -- it is not set in this script).
  if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" -lt "0" ]]; then
    echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
    exit 1
  fi

  # A heap size of 0 leaves the JVM memory flags untouched.
  if [ "${FLINK_TM_HEAP}" -gt "0" ]; then

    TASK_MANAGER_RESOURCE=$(CalculateTaskManagerResource)

    # ${TASK_MANAGER_RESOURCE} is deliberately unquoted: word splitting joins
    # multi-line output into a single line, so each sed expression sees the
    # whole report and can pick out its 'Name:<digits>' field.
    TM_HEAP_SIZE=$(echo ${TASK_MANAGER_RESOURCE} | sed 's/.*TotalHeapMemory:\([0-9]*\).*/\1/g')
    TM_YOUNG_HEAP_SIZE=$(echo ${TASK_MANAGER_RESOURCE} | sed 's/.*YoungHeapMemory:\([0-9]*\).*/\1/g')
    TM_MAX_OFFHEAP_SIZE=$(echo ${TASK_MANAGER_RESOURCE} | sed 's/.*TotalDirectMemory:\([0-9]*\).*/\1/g')

    # When a field is missing sed passes its input through unchanged, and an
    # empty report yields an empty value. Either would produce malformed JVM
    # flags, so stop here with a clear message instead.
    for tm_size in "${TM_HEAP_SIZE}" "${TM_YOUNG_HEAP_SIZE}" "${TM_MAX_OFFHEAP_SIZE}"; do
      if [[ ! "${tm_size}" =~ ^[0-9]+$ ]]; then
        echo "[ERROR] Could not determine the TaskManager memory sizes from the calculated resource '${TASK_MANAGER_RESOURCE}'."
        exit 1
      fi
    done
    unset tm_size

    # Fixed heap (min == max), young generation and direct memory limit,
    # all passed to the JVM in megabytes.
    export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -Xmn${TM_YOUNG_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}M"

  fi

  # Add TaskManager-specific JVM options
  export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"

  # Startup parameters
  args=("--configDir" "${FLINK_CONF_DIR}")
fi

# Hand over to the launcher scripts:
#   start-foreground          -> flink-console.sh (replaces this shell)
#   FLINK_TM_COMPUTE_NUMA is
#     "false"                 -> one flink-daemon.sh invocation
#     anything else           -> one flink-daemon.sh invocation per NUMA node
if [[ "${STARTSTOP}" == "start-foreground" ]]; then
  exec "${FLINK_BIN_DIR}"/flink-console.sh "${TYPE}" "${args[@]}"
elif [[ "${FLINK_TM_COMPUTE_NUMA}" == "false" ]]; then
  # Start/stop a single TaskManager
  "${FLINK_BIN_DIR}"/flink-daemon.sh "${STARTSTOP}" "${TYPE}" "${args[@]}"
else
  # Without numactl the node list below would be empty and the script would
  # exit successfully without having launched anything.
  if ! command -v numactl > /dev/null 2>&1; then
    echo "[ERROR] NUMA mode is enabled but 'numactl' was not found on the PATH."
    exit 1
  fi

  # Example output from `numactl --show` on an AWS c4.8xlarge:
  #   policy: default
  #   preferred node: current
  #   physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
  #   cpubind: 0 1
  #   nodebind: 0 1
  #   membind: 0 1
  #
  # NODE_LIST[0] is the "nodebind:" label, the node ids start at index 1.
  read -ra NODE_LIST <<< "$(numactl --show | grep "^nodebind: ")"

  if ((${#NODE_LIST[@]} < 2)); then
    echo "[ERROR] Could not determine the NUMA nodes from 'numactl --show'."
    exit 1
  fi

  for NODE_ID in "${NODE_LIST[@]:1}"; do
    # Run the daemon script once per NUMA node, bound to that node's memory
    # and CPUs.
    numactl --membind="${NODE_ID}" --cpunodebind="${NODE_ID}" -- "${FLINK_BIN_DIR}"/flink-daemon.sh "${STARTSTOP}" "${TYPE}" "${args[@]}"
  done
fi