blob: 5e22eaecd2bb83d6d125a53b1c82ba5d81a13def [file] [log] [blame]
#!/bin/bash
###
# #%L
# SAMOA
# %%
# Copyright (C) 2014 - 2015 Apache Software Foundation
# %%
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# #L%
###
# ===================================================================================================== #
# Scalable Advanced Massive Online Analysis a.k.a SAMOA #
# ===================================================================================================== #
# ----------------------------------------------------------------------------------------------------- #
# SAMOA is a framework for mining big data streams. It contains machine learning #
# algorithms that run on top of distributed stream processing engines. #
# #
# Usage: #
# ./samoa <platform> <jar> <task & options> #
# Example: #
# ./samoa storm /path/to/SAMOA-Storm-0.0.1.jar "ClusteringTask -s (RandomRBFGeneratorEvents -K 5 -a 2)" #
# #
# ----------------------------------------------------------------------------------------------------- #
usage() {
echo "Usage: $(basename $0) <platform> <jar> <task & options>" && exit 1
}
[ "$#" -ge 3 ] || usage
echo $0
BASE_DIR=$(dirname $0)
PLATFORM=${1:-NONE}
PLATFORM=$(echo $PLATFORM | tr '[:lower:]' '[:upper:]')
JAR_PATH=$2
JAR_FILE=$(basename $JAR_PATH)
JAR_DIR=$(dirname $JAR_PATH)
OPTIONS=$3
#JAVA_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
if [ $PLATFORM = 'S4' ]; then
echo "Deploying to $PLATFORM"
if [ -z $S4_HOME ];then
echo "S4_HOME is not set!"
echo "Please set S4_HOME to point to your S4 installation"
exit -1
fi
# Local functions
deployLocal()
{
# Make sure to kill S4 processes before re-deploying
echo "Starting Zookeeper..."
(cd $S4_HOME ; . s4 zkServer -clean -port=${ZK_SERVER_PORT} &)
sleep 10
echo "Defining cluster..."
(cd $S4_HOME ; . s4 newCluster -c=${CLUSTER_NAME} -nbTasks=1 -flp=${CLUSTER_PORT} -zk=${ZK_SERVER}:${ZK_SERVER_PORT} & )
sleep 5
echo "Starting node on cluster $CLUSTER_NAME..."
(cd $S4_HOME ; . s4 node -c=${CLUSTER_NAME} -zk=${ZK_SERVER}:${ZK_SERVER_PORT} & )
sleep 5
}
SAMOA_S4_PROPERTIES="samoa-s4.properties"
PROPERTY_VALUE=""
readProperty() {
property=$1
PROPERTY_VALUE=`sed '/^\#/d' $BASE_DIR/$SAMOA_S4_PROPERTIES | grep $property | tail -n 1 | cut -d "=" -f2- | sed 's/^[[:space:]]*//;s/[[:space:]]*$//'`
}
# END Local functions
curr_dir=`pwd`
if [ -f /tmp/httpPID ]; then
pid=`cat /tmp/httpPID`
kill -9 $pid
rm /tmp/httpPID
fi
cd $JAR_DIR
# Starting HTTP Server
python -m SimpleHTTPServer 8000 &
echo $! > /tmp/httpPID
sleep 2
cd $curr_dir
# Reading properties file
readProperty 'http.server.ip'
APP_SERVER_IP=$PROPERTY_VALUE
if [ -z $APP_SERVER_IP ]; then
APP_SERVER_IP=$(hostname)
fi
readProperty 'http.server.port'
APP_SERVER_PORT=$PROPERTY_VALUE
if [ -z $APP_SERVER_PORT ]; then
APP_SERVER_PORT=8000
fi
readProperty 'zookeeper.server'
ZK_SERVER=$PROPERTY_VALUE
if [ -z $ZK_SERVER ]; then
ZK_SERVER=$(hostname)
fi
readProperty 'zookeeper.port'
ZK_SERVER_PORT=$PROPERTY_VALUE
if [ -z $ZK_SERVER_PORT ]; then
ZK_SERVER_PORT=2181
fi
readProperty 'cluster.name'
CLUSTER_NAME=$PROPERTY_VALUE
if [ -z $CLUSTER_NAME ]; then
CLUSTER_NAME="cluster"
fi
readProperty 'cluster.port'
CLUSTER_PORT=$PROPERTY_VALUE
if [ -z $CLUSTER_NAME ]; then
CLUSTER_PORT=12000
fi
readProperty 'samoa.deploy.mode'
DEPLOY_MODE=$PROPERTY_VALUE
if [ -z $DEPLOY_MODE ]; then
DEPLOY_MODE=local
elif [ $DEPLOY_MODE = 'local' ]; then
deployLocal
fi
# END Reading properties file
APP_NAME="SamoaS4App"
APP_CLASS="org.apache.samoa.topology.impl.S4DoTask"
NAMED_PARAMETERS=$(perl -MURI::Escape -e 'print uri_escape($ARGV[0]);' "$OPTIONS")
echo "Deploying SAMOA..."
(cd $S4_HOME ; . s4 deploy -s4r="http://${APP_SERVER_IP}:${APP_SERVER_PORT}/${JAR_FILE}" -c="${CLUSTER_NAME}" -p=evalTask="${NAMED_PARAMETERS}" -appClass="${APP_CLASS}" -appName="${APP_NAME}" -zk="${ZK_SERVER}:${ZK_SERVER_PORT}")
# wait for app to be deployed
sleep 5
# END Apache S4 deployment
elif [ $PLATFORM = 'STORM' ]; then
echo "Deploying to $PLATFORM"
if [ -z $STORM_HOME ];then
echo "STORM_HOME is not set!"
echo "Please set STORM_HOME to point to your Storm installation"
exit -1
fi
if [ ! -f $2 ];then
echo "$2 does not exist!"
echo "Please use a valid jar file for Storm execution"
exit -1
fi
STORM_EXEC="$STORM_HOME/bin/storm"
SAMOA_STORM_PROPERTIES="samoa-storm.properties"
MODE_OPTION="samoa.storm.mode"
NUM_WORKER_OPTION="samoa.storm.numworker"
VALUE=""
getvalue()
{
VALUE=`sed '/^\#/d' $BASE_DIR/$SAMOA_STORM_PROPERTIES | grep "$1" | tail -n 1 | cut -d "=" -f2- | sed 's/^[[:space:]]*//;s/[[:space:]]*$//'`
}
getvalue "$NUM_WORKER_OPTION"
NUM_WORKER="$VALUE"
getvalue "$MODE_OPTION"
MODE_ARG="$VALUE"
COMPLETE_ARG=""
COUNTER=0
for var in "$@"
do
COUNTER=`expr $COUNTER + 1`
if [ $COUNTER -gt 2 ];then
COMPLETE_ARG="$COMPLETE_ARG $var"
fi
done
DEPLOYABLE=$JAR_PATH
if [ "$MODE_ARG" = "cluster" ]; then
$STORM_EXEC jar $DEPLOYABLE org.apache.samoa.topology.impl.StormDoTask $COMPLETE_ARG $NUM_WORKER $MODE_ARG
elif [ "$MODE_ARG" = "local" ]; then
CLASSPATH="$CLASSPATH:$STORM_HOME/lib/*:$BASE_DIR/:$DEPLOYABLE"
java -cp $CLASSPATH org.apache.samoa.LocalStormDoTask $COMPLETE_ARG $NUM_WORKER
fi
elif [ $PLATFORM = 'SAMZA' ]; then
echo "Deploying to SAMZA"
SAMOA_SAMZA_PROPERTIES=samoa-samza.properties
MODE_OPTION='samoa.samza.mode'
ZOOKEEPER_HOST_OPTION='zookeeper.host'
ZOOKEEPER_PORT_OPTION='zookeeper.port'
KAFKA_BROKER_LIST_OPTION='kafka.broker.list'
KAFKA_REPLICATION_OPTION='kafka.replication.factor'
CHECKPOINT_FREQ_OPTION='checkpoint.commit.ms'
YARN_AM_MEMORY_OPTION='yarn.am.memory'
YARN_CONTAINER_MEMORY_OPTION='yarn.container.memory'
PI_PER_CONTAINER_OPTION='max.pi.per.container'
KRYO_REGISTER_OPTION='kryo.register.file'
HDFS_SAMOA_HOME_OPTION='hdfs.samoa.dir'
VALUE=""
getvalue()
{
VALUE=`sed '/^\#/d' $BASE_DIR/$SAMOA_SAMZA_PROPERTIES | grep "$1" | tail -n 1 | cut -d "=" -f2- | sed 's/^[[:space:]]*//;s/[[:space:]]*$//'`
}
getvalue "$ZOOKEEPER_HOST_OPTION"
ZOOKEEPER_HOST="$VALUE"
getvalue "$ZOOKEEPER_PORT_OPTION"
ZOOKEEPER_PORT="$VALUE"
getvalue "$KAFKA_BROKER_LIST_OPTION"
KAFKA_BROKER_LIST="$VALUE"
getvalue "$MODE_OPTION"
MODE_ARG="$VALUE"
getvalue "$KAFKA_REPLICATION_OPTION"
KAFKA_REPLICATION_FACTOR="$VALUE"
getvalue "$CHECKPOINT_FREQ_OPTION"
CHECKPOINT_FREQUENCY="$VALUE"
getvalue "$YARN_AM_MEMORY_OPTION"
YARN_AM_MEMORY="$VALUE"
getvalue "$YARN_CONTAINER_MEMORY_OPTION"
YARN_CONTAINER_MEMORY="$VALUE"
getvalue "$KRYO_REGISTER_OPTION"
KRYO_REGISTER_FILE="$VALUE"
getvalue "$PI_PER_CONTAINER_OPTION"
PI_PER_CONTAINER="$VALUE"
getvalue "$HDFS_SAMOA_HOME_OPTION"
HDFS_SAMOA_HOME="$VALUE"
COMPLETE_ARG=""
COUNTER=0
for var in "$@"
do
COUNTER=`expr $COUNTER + 1`
if [ $COUNTER -gt 2 ];then
COMPLETE_ARG="$COMPLETE_ARG $var"
fi
done
DEPLOYABLE=$JAR_PATH
#HADOOP_CLASSPATH=`hadoop classpath`
YARN_CLASSPATH=`yarn classpath`
java -Dsamza.log.dir=$BASE_DIR/logs -Dsamza.container.name=client \
-cp $YARN_CLASSPATH:$DEPLOYABLE org.apache.samoa.SamzaDoTask $COMPLETE_ARG \
--mode=$MODE_ARG --yarn_home=$YARN_HOME/conf --zookeeper=$ZOOKEEPER_HOST:$ZOOKEEPER_PORT --kafka=$KAFKA_BROKER_LIST \
--jar_package=$JAR_PATH --yarn_am_mem=$YARN_AM_MEMORY --yarn_container_mem=$YARN_CONTAINER_MEMORY \
--kafka_replication_factor=$KAFKA_REPLICATION_FACTOR --checkpoint_frequency=$CHECKPOINT_FREQUENCY \
--kryo_register=$BASE_DIR/$KRYO_REGISTER_FILE --pi_per_container=$PI_PER_CONTAINER \
--samoa_hdfs_dir=$HDFS_SAMOA_HOME
elif [ $PLATFORM = 'FLINK' ]; then
echo "Deploying to $PLATFORM"
if [ -z $FLINK_HOME ];then
echo "FLINK_HOME is not set!"
echo "Please set FLINK_HOME to point to your Flink installation"
exit -1
fi
if [ ! -f $2 ];then
echo "$2 does not exist!"
echo "Please use a valid jar file for Flink execution"
exit -1
fi
FLINK_EXEC="$FLINK_HOME/bin/flink"
SAMOA_FLINK_PROPERTIES="samoa-flink.properties"
MODE_OPTION="samoa.flink.mode"
# NUM_WORKER_OPTION="samoa.flink.numWorker"
VALUE=""
getvalue()
{
VALUE=`sed '/^\#/d' $BASE_DIR/$SAMOA_FLINK_PROPERTIES | grep "$1" | tail -n 1 | cut -d "=" -f2- | sed 's/^[[:space:]]*//;s/[[:space:]]*$//'`
}
# getvalue "$NUM_WORKER_OPTION"
# NUM_WORKER="$VALUE"
getvalue "$MODE_OPTION"
MODE_ARG="$VALUE"
COMPLETE_ARG=""
COUNTER=0
for var in "$@"
do
COUNTER=`expr $COUNTER + 1`
if [ $COUNTER -gt 2 ];then
COMPLETE_ARG="$COMPLETE_ARG $var"
fi
done
DEPLOYABLE=$JAR_PATH
echo "$DEPLOYABLE"
if [ "$MODE_ARG" = "cluster" ]; then
FLINK_MASTER_OPTION="samoa.flink.flinkMaster"
PORT_OPTION="samoa.flink.port"
getvalue "$FLINK_MASTER_OPTION"
FLINK_MASTER_OPTION="$VALUE"
getvalue "$PORT_OPTION"
PORT_OPTION="$VALUE"
$FLINK_EXEC run -m $FLINK_MASTER_OPTION:$PORT_OPTION $DEPLOYABLE $COMPLETE_ARG
elif [ "$MODE_ARG" = "local" ]; then
$FLINK_EXEC run $DEPLOYABLE $COMPLETE_ARG
fi
elif [ $PLATFORM = 'APEX' ]; then
if [ ! -f $2 ];then
echo "$2 does not exist!"
echo "Please use a valid jar file for Apex execution"
exit -1
fi
VALUE=""
getvalue()
{
VALUE=`sed '/^\#/d' $BASE_DIR/$SAMOA_APEX_PROPERTIES | grep "$1" | tail -n 1 | cut -d "=" -f2- | sed 's/^[[:space:]]*//;s/[[:space:]]*$//'`
}
SAMOA_APEX_PROPERTIES="samoa-apex.properties"
MODE_OPTION="samoa.apex.mode"
DT_DFS_DIR_OPTION="dt.dfsRootDirectory"
DT_SITE_OPTION="dt.site.path"
DEFAULT_FS_OPTION="fs.defaultFS"
RM_OPTION="yarn.resourcemanager.address"
HADOOP_HDFS_HOME_OPTION="hadoop.hdfs.home"
HADOOP_YARN_HOME_OPTION="hadoop.yarn.home"
HADOOP_COMMON_HOME_OPTION="hadoop.common.home"
HADOOP_CONF_DIR_OPTION="hadoop.conf.dir"
getvalue "$MODE_OPTION"
MODE_ARG="$VALUE"
if [[ "$MODE_ARG" = "cluster" && -z $HADOOP_HOME ]];then
echo "HADOOP_HOME is not set!"
echo "Please set HADOOP_HOME to point to your Hadoop installation"
exit -1
fi
getvalue "$DT_DFS_DIR_OPTION"
DT_DFS_DIR="$VALUE"
getvalue "$DEFAULT_FS_OPTION"
DEFAULT_FS="$VALUE"
getvalue "$DT_SITE_OPTION"
DT_SITE=`echo $(eval echo "$VALUE")`
getvalue "$RM_OPTION"
RM="$VALUE"
getvalue "$HADOOP_HDFS_HOME_OPTION"
HADOOP_HDFS_HOME=`echo $(eval echo "$VALUE")`
getvalue "$HADOOP_YARN_HOME_OPTION"
HADOOP_YARN_HOME=`echo $(eval echo "$VALUE")`
getvalue "$HADOOP_COMMON_HOME_OPTION"
HADOOP_COMMON_HOME=`echo $(eval echo "$VALUE")`
getvalue "$HADOOP_CONF_DIR_OPTION"
HADOOP_CONF_DIR=`echo $(eval echo "$VALUE")`
DEPLOYABLE=$JAR_PATH
echo "Deployable: " $DEPLOYABLE
#LIB_PREFIX=/opt/cloudera/parcels/CDH/lib/
YARN_CLASSPATH=$HADOOP_HDFS_HOME/*:$HADOOP_YARN_HOME/*:$HADOOP_COMMON_HOME/*:$HADOOP_CONF_DIR
echo "YARN CLASSPATH: " $YARN_CLASSPATH
if [ "$MODE_ARG" = "cluster" ]; then
CMD="java -cp .:$YARN_CLASSPATH:$DEPLOYABLE -D$DT_DFS_DIR_OPTION=$DT_DFS_DIR -D$DEFAULT_FS_OPTION=$DEFAULT_FS -D$DT_SITE_OPTION=$DT_SITE -D$RM_OPTION=$RM org.apache.samoa.apex.ApexDoTask ${OPTIONS}"
echo Command: $CMD
${CMD}
elif [ "$MODE_ARG" = "local" ]
then
CMD="java -cp .:$DEPLOYABLE -D$DT_SITE_OPTION=$DT_SITE org.apache.samoa.apex.LocalApexDoTask ${OPTIONS}"
echo Command: $CMD
${CMD}
else
echo "Wrong mode argument. Check property samoa.apex.mode in samoa-apex.properties file"
fi
elif [ $PLATFORM = 'THREADS' ]; then
echo "Deploying to LOCAL with MULTITHREADING."
COMPLETE_ARG=""
COUNTER=0
for var in "$@"
do
COUNTER=`expr $COUNTER + 1`
if [ $COUNTER -gt 2 ];then
COMPLETE_ARG="$COMPLETE_ARG $var"
fi
done
java $JAVA_OPTS -cp $JAR_PATH org.apache.samoa.LocalThreadsDoTask $COMPLETE_ARG
elif [ $PLATFORM = 'LOCAL' ]; then
echo "Deploying to $PLATFORM"
COMPLETE_ARG=""
COUNTER=0
for var in "$@"
do
COUNTER=`expr $COUNTER + 1`
if [ $COUNTER -gt 2 ];then
COMPLETE_ARG="$COMPLETE_ARG $var"
fi
done
java $JAVA_OPTS -cp $JAR_PATH org.apache.samoa.LocalDoTask $COMPLETE_ARG $NUM_WORKER $MODE_ARG
else
echo "Specify a valid platform."
echo "Usage: samoa <platform> <jar> <task & options>"
fi