#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
bin="$(dirname "${BASH_SOURCE-$0}")"
bin="$(cd "${bin}">/dev/null; pwd)"
function usage() {
echo "usage) $0 -p <port> -r <intp_port> -d <interpreter dir to load> -l <local interpreter repo dir to load> -g <interpreter group name>"
}
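# Illustrative invocation with placeholder values (this script is normally launched by the
# Zeppelin server's interpreter launcher, not by hand), e.g.:
#   interpreter.sh -d "${ZEPPELIN_HOME}/interpreter/md" -c 127.0.0.1 -p 9997 -r 12345 \
#     -i md-shared_process -l "${ZEPPELIN_HOME}/local-repo/md" -g md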
function downloadInterpreterLibraries() {
mkdir -p "${LOCAL_INTERPRETER_REPO}"
IFS=' ' read -r -a JAVA_INTP_OPTS_ARRAY <<< "${JAVA_INTP_OPTS}"
ZEPPELIN_DOWNLOADER="org.apache.zeppelin.interpreter.remote.RemoteInterpreterDownloader"
INTERPRETER_DOWNLOAD_COMMAND+=("${ZEPPELIN_RUNNER}" "${JAVA_INTP_OPTS_ARRAY[@]}" "-cp" "${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}" "${ZEPPELIN_DOWNLOADER}" "${CALLBACK_HOST}" "${PORT}" "${INTERPRETER_SETTING_NAME}" "${LOCAL_INTERPRETER_REPO}")
echo "Interpreter download command: ${INTERPRETER_DOWNLOAD_COMMAND[@]}"
eval "${INTERPRETER_DOWNLOAD_COMMAND[@]}"
}
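# The generated download command has roughly this shape (classpath abbreviated):
#   ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} -cp <classpath overrides>:<interpreter classpath> \
#     org.apache.zeppelin.interpreter.remote.RemoteInterpreterDownloader \
#     <callback host> <callback port> <interpreter setting name> <local interpreter repo dir>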
# Prerequisites for checking whether we're running in a container
if [ -f /proc/self/cgroup ] && [ -n "$(command -v getent)" ]; then
# check whether we're running in a container...
if awk -F: '/cpu/ && $3 ~ /^\/$/{ c=1 } END { exit c }' /proc/self/cgroup; then
# Check whether there is a passwd entry for the container UID
myuid="$(id -u)"
mygid="$(id -g)"
# Turn off -e for getent because it returns an error code for an anonymous UID
set +e
uidentry="$(getent passwd "$myuid")"
set -e
# If there is no passwd entry for the container UID, attempt to create one
if [ -z "$uidentry" ] ; then
if [ -w /etc/passwd ] ; then
echo "zeppelin:x:$myuid:$mygid:anonymous uid:$ZEPPELIN_HOME:/bin/false" >> /etc/passwd
else
echo "Container ENTRYPOINT failed to add passwd entry for anonymous UID"
fi
fi
fi
fi
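# As an illustration, with UID 1000, GID 1000 and ZEPPELIN_HOME=/opt/zeppelin (placeholder
# values), the appended passwd entry would be:
#   zeppelin:x:1000:1000:anonymous uid:/opt/zeppelin:/bin/false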
while getopts "hc:p:r:i:d:l:v:u:g:" o; do
case ${o} in
h)
usage
exit 0
;;
d)
INTERPRETER_DIR=${OPTARG}
;;
c)
CALLBACK_HOST=${OPTARG} # This will be used as the callback host
;;
p)
PORT=${OPTARG} # This will be used for callback port
;;
r)
INTP_PORT=${OPTARG} # This will be used for interpreter process port
;;
i)
INTP_GROUP_ID=${OPTARG} # This will be used for interpreter group id
;;
l)
LOCAL_INTERPRETER_REPO=${OPTARG}
;;
v)
. "${bin}/common.sh"
getZeppelinVersion
;;
u)
ZEPPELIN_IMPERSONATE_USER="${OPTARG}"
;;
g)
INTERPRETER_SETTING_NAME=${OPTARG}
;;
esac
done
if [ -z "${PORT}" ] || [ -z "${INTERPRETER_DIR}" ]; then
usage
exit 1
fi
. "${bin}/common.sh"
check_java_version
ZEPPELIN_INTERPRETER_API_JAR=$(find "${ZEPPELIN_HOME}/interpreter" -name 'zeppelin-interpreter-shaded-*.jar')
ZEPPELIN_INTP_CLASSPATH+=":${CLASSPATH}:${ZEPPELIN_INTERPRETER_API_JAR}"
# construct classpath
if [[ -d "${ZEPPELIN_HOME}/zeppelin-interpreter/target/classes" ]]; then
ZEPPELIN_INTP_CLASSPATH+=":${ZEPPELIN_HOME}/zeppelin-interpreter/target/classes"
fi
# add test classes for unittest
if [[ -d "${ZEPPELIN_HOME}/zeppelin-zengine/target/test-classes" ]]; then
ZEPPELIN_INTP_CLASSPATH+=":${ZEPPELIN_HOME}/zeppelin-zengine/target/test-classes"
addJarInDirForIntp "${ZEPPELIN_HOME}/zeppelin-zengine/target/test-classes"
fi
addJarInDirForIntp "${ZEPPELIN_HOME}/zeppelin-interpreter-shaded/target"
HOSTNAME=$(hostname)
ZEPPELIN_SERVER=org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer
INTERPRETER_ID=$(basename "${INTERPRETER_DIR}")
if [[ "${INTERPRETER_ID}" != "flink" ]]; then
# Don't add the interpreter jar for flink; FlinkInterpreterLauncher will choose the right interpreter jar
# based on the Scala version of the current FLINK_HOME.
addJarInDirForIntp "${INTERPRETER_DIR}"
fi
ZEPPELIN_PID="${ZEPPELIN_PID_DIR}/zeppelin-interpreter-${INTP_GROUP_ID}-${ZEPPELIN_IDENT_STRING}-${HOSTNAME}-${PORT}.pid"
if [[ "${ZEPPELIN_INTERPRETER_LAUNCHER}" == "yarn" ]]; then
# LOG_DIRS is an env variable set in the YARN container that points to the container's log dirs.
# Split the log dirs into an array and use the first one.
IFS=',' read -ra LOG_DIRS_ARRAY <<< "${LOG_DIRS}"
ZEPPELIN_LOG_DIR=${LOG_DIRS_ARRAY[0]}
fi
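# For example (illustrative paths), with
#   LOG_DIRS=/mnt1/yarn/logs/application_1/container_1,/mnt2/yarn/logs/application_1/container_1
# the interpreter log goes to the first entry, /mnt1/yarn/logs/application_1/container_1.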
ZEPPELIN_LOGFILE="${ZEPPELIN_LOG_DIR}/zeppelin-interpreter-${INTERPRETER_GROUP_ID}-"
if [[ -z "$ZEPPELIN_IMPERSONATE_CMD" ]]; then
if [[ "${INTERPRETER_ID}" != "spark" || "$ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER" == "false" ]]; then
ZEPPELIN_IMPERSONATE_RUN_CMD=("ssh" "${ZEPPELIN_IMPERSONATE_USER}@localhost")
fi
else
IFS=' ' read -r -a ZEPPELIN_IMPERSONATE_RUN_CMD <<< "$(eval "echo ${ZEPPELIN_IMPERSONATE_CMD}")"
fi
if [[ -n "$ZEPPELIN_IMPERSONATE_USER" ]]; then
ZEPPELIN_LOGFILE+="${ZEPPELIN_IMPERSONATE_USER}-"
fi
ZEPPELIN_LOGFILE+="${ZEPPELIN_IDENT_STRING}-${HOSTNAME}.log"
JAVA_INTP_OPTS+=" -Dzeppelin.log.file=${ZEPPELIN_LOGFILE}"
if [[ ! -d "${ZEPPELIN_LOG_DIR}" ]]; then
echo "Log dir doesn't exist, create ${ZEPPELIN_LOG_DIR}"
mkdir -p "${ZEPPELIN_LOG_DIR}"
fi
# set spark related env variables
if [[ "${INTERPRETER_ID}" == "spark" ]]; then
# run kinit
if [[ -n "${ZEPPELIN_SERVER_KERBEROS_KEYTAB}" ]] && [[ -n "${ZEPPELIN_SERVER_KERBEROS_PRINCIPAL}" ]]; then
kinit -kt "${ZEPPELIN_SERVER_KERBEROS_KEYTAB}" "${ZEPPELIN_SERVER_KERBEROS_PRINCIPAL}"
fi
if [[ -n "${SPARK_HOME}" ]]; then
export SPARK_SUBMIT="${SPARK_HOME}/bin/spark-submit"
SPARK_APP_JAR="$(ls "${ZEPPELIN_HOME}"/interpreter/spark/spark-interpreter*.jar)"
# This will eventually pass SPARK_APP_JAR to the classpath of SparkIMain
ZEPPELIN_INTP_CLASSPATH+=":${SPARK_APP_JAR}"
py4j=("${SPARK_HOME}"/python/lib/py4j-*-src.zip)
# pick the first matching py4j zip - there should be only one
export PYTHONPATH="$SPARK_HOME/python/:$PYTHONPATH"
export PYTHONPATH="${py4j[0]}:$PYTHONPATH"
else
echo "No SPARK_HOME is specified"
exit -1
fi
if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}"
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR}
else
# autodetect HADOOP_CONF_DIR heuristically
if [[ -n "${HADOOP_HOME}" ]] && [[ -z "${HADOOP_CONF_DIR}" ]]; then
if [[ -d "${HADOOP_HOME}/etc/hadoop" ]]; then
export HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop"
elif [[ -d "/etc/hadoop/conf" ]]; then
export HADOOP_CONF_DIR="/etc/hadoop/conf"
fi
fi
fi
elif [[ "${INTERPRETER_ID}" == "hbase" ]]; then
if [[ -n "${HBASE_CONF_DIR}" ]]; then
ZEPPELIN_INTP_CLASSPATH+=":${HBASE_CONF_DIR}"
elif [[ -n "${HBASE_HOME}" ]]; then
ZEPPELIN_INTP_CLASSPATH+=":${HBASE_HOME}/conf"
else
echo "HBASE_HOME and HBASE_CONF_DIR are not set, configuration might not be loaded"
fi
elif [[ "${INTERPRETER_ID}" == "flink" ]]; then
addEachJarInDirRecursiveForIntp "${FLINK_HOME}/lib"
FLINK_PYTHON_JAR=$(find "${FLINK_HOME}/opt" -name 'flink-python_*.jar')
ZEPPELIN_INTP_CLASSPATH+=":${FLINK_PYTHON_JAR}:${FLINK_APP_JAR}"
if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}"
# Don't use `hadoop classpath` if a flink-shaded-hadoop jar is in the lib folder
flink_hadoop_shaded_jar=$(find "${FLINK_HOME}/lib" -name 'flink-shaded-hadoop-*.jar')
if [[ -n "$flink_hadoop_shaded_jar" ]]; then
echo ""
else
if [[ ! ( -x "$(command -v hadoop)" ) && ( "${ZEPPELIN_INTERPRETER_LAUNCHER}" != "yarn" ) ]]; then
echo 'Error: hadoop is not in PATH when HADOOP_CONF_DIR is specified and no flink-shaded-hadoop jar '
exit 1
fi
ZEPPELIN_INTP_CLASSPATH+=":$(hadoop classpath)"
fi
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR}
else
# autodetect HADOOP_CONF_DIR heuristically
if [[ -n "${HADOOP_HOME}" ]] && [[ -z "${HADOOP_CONF_DIR}" ]]; then
if [[ -d "${HADOOP_HOME}/etc/hadoop" ]]; then
export HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop"
elif [[ -d "/etc/hadoop/conf" ]]; then
export HADOOP_CONF_DIR="/etc/hadoop/conf"
fi
fi
fi
fi
downloadInterpreterLibraries
addJarInDirForIntp "${LOCAL_INTERPRETER_REPO}"
if [[ -n "$ZEPPELIN_IMPERSONATE_USER" ]]; then
if [[ "${INTERPRETER_ID}" != "spark" || "$ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER" == "false" ]]; then
suid="$(id -u "${ZEPPELIN_IMPERSONATE_USER}")"
if [[ -n "${suid}" || -z "${SPARK_SUBMIT}" ]]; then
INTERPRETER_RUN_COMMAND+=("${ZEPPELIN_IMPERSONATE_RUN_CMD[@]}")
if [[ -f "${ZEPPELIN_CONF_DIR}/zeppelin-env.sh" ]]; then
INTERPRETER_RUN_COMMAND+=("source" "${ZEPPELIN_CONF_DIR}/zeppelin-env.sh;")
fi
fi
fi
fi
if [[ -n "${SPARK_SUBMIT}" ]]; then
IFS=' ' read -r -a SPARK_SUBMIT_OPTIONS_ARRAY <<< "${SPARK_SUBMIT_OPTIONS}"
IFS='|' read -r -a ZEPPELIN_SPARK_CONF_ARRAY <<< "${ZEPPELIN_SPARK_CONF}"
if [[ "${ZEPPELIN_SPARK_YARN_CLUSTER}" == "true" ]]; then
INTERPRETER_RUN_COMMAND+=("${SPARK_SUBMIT}" "--class" "${ZEPPELIN_SERVER}" "--driver-java-options" "${SPARK_DRIVER_EXTRAJAVAOPTIONS_CONF} ${JAVA_INTP_OPTS}" "${SPARK_SUBMIT_OPTIONS_ARRAY[@]}" "${ZEPPELIN_SPARK_CONF_ARRAY[@]}" "${SPARK_APP_JAR}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}")
else
INTERPRETER_RUN_COMMAND+=("${SPARK_SUBMIT}" "--class" "${ZEPPELIN_SERVER}" "--driver-class-path" "${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}" "--driver-java-options" "${SPARK_DRIVER_EXTRAJAVAOPTIONS_CONF} ${JAVA_INTP_OPTS}" "${SPARK_SUBMIT_OPTIONS_ARRAY[@]}" "${ZEPPELIN_SPARK_CONF_ARRAY[@]}" "${SPARK_APP_JAR}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}")
fi
elif [[ -n "${ZEPPELIN_FLINK_APPLICATION_MODE}" ]]; then
IFS='|' read -r -a ZEPPELIN_FLINK_APPLICATION_MODE_CONF_ARRAY <<< "${ZEPPELIN_FLINK_APPLICATION_MODE_CONF}"
INTERPRETER_RUN_COMMAND+=("${FLINK_HOME}/bin/flink" "run-application" "-c" "${ZEPPELIN_SERVER}" "-t" "${ZEPPELIN_FLINK_APPLICATION_MODE}" "${ZEPPELIN_FLINK_APPLICATION_MODE_CONF_ARRAY[@]}" "${FLINK_APP_JAR}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}")
else
IFS=' ' read -r -a JAVA_INTP_OPTS_ARRAY <<< "${JAVA_INTP_OPTS}"
IFS=' ' read -r -a ZEPPELIN_INTP_MEM_ARRAY <<< "${ZEPPELIN_INTP_MEM}"
INTERPRETER_RUN_COMMAND+=("${ZEPPELIN_RUNNER}" "${JAVA_INTP_OPTS_ARRAY[@]}" "${ZEPPELIN_INTP_MEM_ARRAY[@]}" "-cp" "${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}" "${ZEPPELIN_SERVER}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}")
fi
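# For reference, in the plain-JVM case (the final else branch above) the launched command is
# roughly (placeholders, classpath abbreviated):
#   ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp <classpath overrides>:<interpreter classpath> \
#     org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer <callback host> <callback port> <intp group id> <intp port>
# The spark and flink branches launch the same RemoteInterpreterServer entry point via spark-submit
# or `flink run-application` instead.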
# Don't remove this echo; it is for diagnosis. This line of output is redirected to the Java log4j output.
# Output that starts with `[INFO]` is redirected to log4j INFO output; other output from interpreter.sh
# is redirected to log4j DEBUG output.
echo "[INFO] Interpreter launch command: ${INTERPRETER_RUN_COMMAND[@]}"
exec "${INTERPRETER_RUN_COMMAND[@]}"