blob: 99c9ddb109f99e01c6f013e35fe0d08b96d112ad [file] [log] [blame]
#!/bin/bash
#--------------------------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#--------------------------------------------------------------------------------
#
# run Apache MRQL in Apache Flink mode
#
#--------------------------------------------------------------------------------
MRQL_HOME="$(cd `dirname $0`/..; pwd -P)"
. "$MRQL_HOME/conf/mrql-env.sh"
GEN_JAR=`ls "$MRQL_HOME"/lib/mrql-gen-*.jar`
CORE_JAR=`ls "$MRQL_HOME"/lib/mrql-core-*.jar`
MRQL_JAR=`ls "$MRQL_HOME"/lib/mrql-flink-*.jar`
FULL_JAR="/tmp/${USER}_mrql_flink.jar"
CLASS_DIR="/tmp/${USER}_mrql_classes"
export FLINK_HOME FLINK_JARS FS_DEFAULT_NAME
if [[ ($MRQL_JAR -nt $FULL_JAR) ]]; then
rm -rf $CLASS_DIR
mkdir -p $CLASS_DIR
pushd $CLASS_DIR > /dev/null
$JAVA_HOME/bin/jar xf $CUP_JAR
$JAVA_HOME/bin/jar xf $JLINE_JAR
$JAVA_HOME/bin/jar xf $GEN_JAR
$JAVA_HOME/bin/jar xf $CORE_JAR
$JAVA_HOME/bin/jar xf $MRQL_JAR
cd ..
$JAVA_HOME/bin/jar cf $FULL_JAR -C $CLASS_DIR .
popd > /dev/null
fi
if [ "$1" == "-dist" ]; then
args=" -flink $*"
args=${args// /\!}
if ([ -a /tmp/.yarn-properties ]); then
. /tmp/.yarn-properties
host=`echo $jobManager | cut -d : -f 1`
port=`echo $jobManager | cut -d : -f 2`
fi
if ! type nc > /dev/null; then
jobManager=
elif [ "$host" == "" ] || [ "$port" == "" ] || [ "`nc -z -w5 $host $port; echo $?`" == "1" ]; then
# there is no Flink cluster running (host:port is closed)
jobManager=
fi
export FLINK_MASTER=$jobManager
if [ "$jobManager" == "" ]; then
# there is no Flink cluster running on YARN, so run this as a single job on YARN
yn=$FLINK_SLOTS
quiet="-q"
ARGS=($*)
for (( i = 0; i < $#; i++ )); do
if [ "${ARGS[i]}" = "-nodes" ]; then
yn=$(( (${ARGS[i+1]} + $FLINK_SLOTS -1) / $FLINK_SLOTS ))
fi
if [ "${ARGS[i]}" = "-info" ]; then
quiet=
fi
done
$FLINK_HOME/bin/flink run -m yarn-cluster -yn $yn -ys $FLINK_SLOTS -ytm $FLINK_TASK_MANAGER_MEMORY $quiet -c org.apache.mrql.Main $FULL_JAR args $args
else
# a long-running Flink cluster has already been started on YARN
# (using $FLINK_HOME/bin/yarn-session.sh on a separate window)
$FLINK_HOME/bin/flink run -c org.apache.mrql.Main $FULL_JAR args $args
fi
else
$JAVA_HOME/bin/java -classpath $FULL_JAR:$FLINK_JARS org.apache.mrql.Main -flink $*
fi