| #!/bin/bash |
| |
| ### |
| # #%L |
| # SAMOA |
| # %% |
| # Copyright (C) 2014 - 2015 Apache Software Foundation |
| # %% |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # #L% |
| ### |
| |
| # ===================================================================================================== # |
| # Scalable Advanced Massive Online Analysis a.k.a SAMOA # |
| # ===================================================================================================== # |
| # ----------------------------------------------------------------------------------------------------- # |
| # SAMOA is a framework for mining big data streams. It contains machine learning # |
| # algorithms that run on top of distributed stream processing engines. # |
| # # |
| # Usage: # |
| # ./samoa <platform> <jar> <task & options> # |
| # Example: # |
| # ./samoa storm /path/to/SAMOA-Storm-0.0.1.jar "ClusteringTask -s (RandomRBFGeneratorEvents -K 5 -a 2)" # |
| # # |
| # ----------------------------------------------------------------------------------------------------- # |
| |
#######################################
# Print the command-line synopsis and terminate the script.
# Arguments: none ($0 supplies the program name shown in the message)
# Outputs:   the usage line, written to stderr
# Returns:   does not return; exits with status 1
#######################################
usage() {
  # "$0" is quoted so a script path containing spaces still yields one name.
  printf 'Usage: %s <platform> <jar> <task & options>\n' "$(basename -- "$0")" >&2
  # Exit unconditionally rather than only when the print succeeded.
  exit 1
}
# At least <platform> <jar> <task> are required; otherwise show usage and stop.
[ "$#" -ge 3 ] || usage

# Trace line: show how the script was invoked.
echo "$0"
# Directory containing this script; the samoa-*.properties files are read from it.
BASE_DIR=$(dirname -- "$0")
# The platform name is compared in upper case, so any letter case is accepted.
PLATFORM=${1:-NONE}
PLATFORM=$(printf '%s\n' "$PLATFORM" | tr '[:lower:]' '[:upper:]')
# Path of the deployable jar, plus its file name and directory.
# Quoted so paths containing spaces are not split into several words.
JAR_PATH=$2
JAR_FILE=$(basename -- "$JAR_PATH")
JAR_DIR=$(dirname -- "$JAR_PATH")
# First task argument (the S4 branch URI-escapes and forwards this one).
OPTIONS=$3
| |
| |
| if [ $PLATFORM = 'S4' ]; then |
| |
| echo "Deploying to $PLATFORM" |
| if [ -z $S4_HOME ];then |
| echo "S4_HOME is not set!" |
| echo "Please set S4_HOME to point to your S4 installation" |
| exit -1 |
| fi |
| |
| # Local functions |
#######################################
# Bring up a single-node S4 environment: a ZooKeeper server, a cluster
# definition with one task, and one S4 node. Each step sources the `s4`
# launcher from $S4_HOME in a background subshell, then sleeps to give the
# step time to come up.
# Globals (read): S4_HOME, ZK_SERVER, ZK_SERVER_PORT, CLUSTER_NAME, CLUSTER_PORT
# Arguments: none
# Outputs:   progress messages on stdout
# Note: S4 processes from an earlier run are NOT stopped here; make sure to
#       kill them before re-deploying.
#######################################
deployLocal() {
  echo "Starting Zookeeper..."
  # `cd ... &&` ensures the launcher is never sourced from the wrong directory
  # when $S4_HOME cannot be entered; quoting keeps paths with spaces intact.
  (cd "$S4_HOME" && . s4 zkServer -clean -port="${ZK_SERVER_PORT}" &)
  sleep 10

  echo "Defining cluster..."
  (cd "$S4_HOME" && . s4 newCluster -c="${CLUSTER_NAME}" -nbTasks=1 -flp="${CLUSTER_PORT}" -zk="${ZK_SERVER}:${ZK_SERVER_PORT}" &)
  sleep 5

  echo "Starting node on cluster $CLUSTER_NAME..."
  (cd "$S4_HOME" && . s4 node -c="${CLUSTER_NAME}" -zk="${ZK_SERVER}:${ZK_SERVER_PORT}" &)
  sleep 5
}
| |
# Name of the S4 settings file; it is looked up inside $BASE_DIR.
SAMOA_S4_PROPERTIES="samoa-s4.properties"
# Result slot: readProperty stores the value it found here.
PROPERTY_VALUE=""

#######################################
# Look up one key in $BASE_DIR/$SAMOA_S4_PROPERTIES.
# Lines starting with '#' and lines without '=' are ignored. The text before
# the first '=' must equal the requested key exactly (surrounding whitespace
# ignored), so a key is no longer confused with longer keys that contain it
# or with values that merely mention it. When a key occurs several times the
# last occurrence wins.
# Globals:   BASE_DIR, SAMOA_S4_PROPERTIES (read); PROPERTY_VALUE (written)
# Arguments: $1 - property key
# Outputs:   nothing on stdout; PROPERTY_VALUE holds the trimmed value, or
#            the empty string when the key (or the file) is missing
#######################################
readProperty() {
  local key=$1 line name value
  PROPERTY_VALUE=""
  # `|| [ -n "$line" ]` also processes a final line without trailing newline.
  while IFS= read -r line || [ -n "$line" ]; do
    case $line in
      '#'*) continue ;;  # comment line
      *=*) ;;            # key=value candidate
      *) continue ;;     # not an assignment
    esac
    # Key: text before the first '=', with surrounding whitespace removed.
    name=${line%%=*}
    name=${name#"${name%%[![:space:]]*}"}
    name=${name%"${name##*[![:space:]]}"}
    [ "$name" = "$key" ] || continue
    # Value: everything after the first '=' (may itself contain '='), trimmed.
    value=${line#*=}
    value=${value#"${value%%[![:space:]]*}"}
    value=${value%"${value##*[![:space:]]}"}
    PROPERTY_VALUE=$value
  done < "$BASE_DIR/$SAMOA_S4_PROPERTIES"
}
| # END Local functions |
| |
# Stop the HTTP server left behind by a previous run, identified by the PID
# recorded in /tmp/httpPID.
if [ -f /tmp/httpPID ]; then
  pid=$(cat /tmp/httpPID)
  # An empty PID file would otherwise turn into a bare `kill -9`.
  if [ -n "$pid" ]; then
    kill -9 "$pid"
  fi
  rm -f /tmp/httpPID
fi

# Reading properties file (each value falls back to a default when missing)
readProperty 'http.server.ip'
APP_SERVER_IP=${PROPERTY_VALUE:-$(hostname)}

readProperty 'http.server.port'
APP_SERVER_PORT=${PROPERTY_VALUE:-8000}

readProperty 'zookeeper.server'
ZK_SERVER=${PROPERTY_VALUE:-$(hostname)}

readProperty 'zookeeper.port'
ZK_SERVER_PORT=${PROPERTY_VALUE:-2181}

readProperty 'cluster.name'
CLUSTER_NAME=${PROPERTY_VALUE:-cluster}

# The default is keyed on the port value itself (the earlier check looked at
# CLUSTER_NAME, so the 12000 default could never be applied).
readProperty 'cluster.port'
CLUSTER_PORT=${PROPERTY_VALUE:-12000}

readProperty 'samoa.deploy.mode'
DEPLOY_MODE=${PROPERTY_VALUE:-local}
# END Reading properties file

# Starting HTTP Server: serve the jar's directory so the s4r URL below can be
# fetched. It listens on the same port that is put into that URL. The server
# runs in a subshell, so the script's own working directory is untouched, and
# `exec` makes $! the PID of the server process itself.
# SimpleHTTPServer exists only on Python 2; otherwise use Python 3's http.server.
if python -c 'import SimpleHTTPServer' >/dev/null 2>&1; then
  (cd "$JAR_DIR" && exec python -m SimpleHTTPServer "$APP_SERVER_PORT") &
else
  (cd "$JAR_DIR" && exec python3 -m http.server "$APP_SERVER_PORT") &
fi
echo $! > /tmp/httpPID
sleep 2

# Local mode (also the default when the property is absent) boots ZooKeeper,
# the cluster definition and a node before the application is deployed.
if [ "$DEPLOY_MODE" = 'local' ]; then
  deployLocal
fi

APP_NAME="SamoaS4App"
APP_CLASS="org.apache.samoa.topology.impl.S4DoTask"
# The task string is URI-escaped so it can travel as a single named parameter.
NAMED_PARAMETERS=$(perl -MURI::Escape -e 'print uri_escape($ARGV[0]);' "$OPTIONS")

echo "Deploying SAMOA..."
# `cd ... &&` keeps the launcher from being sourced outside $S4_HOME.
(cd "$S4_HOME" && . s4 deploy -s4r="http://${APP_SERVER_IP}:${APP_SERVER_PORT}/${JAR_FILE}" -c="${CLUSTER_NAME}" -p=evalTask="${NAMED_PARAMETERS}" -appClass="${APP_CLASS}" -appName="${APP_NAME}" -zk="${ZK_SERVER}:${ZK_SERVER_PORT}")

# wait for app to be deployed
sleep 5
# END Apache S4 deployment
| |
| elif [ $PLATFORM = 'STORM' ]; then |
| |
| echo "Deploying to $PLATFORM" |
| if [ -z $STORM_HOME ];then |
| echo "STORM_HOME is not set!" |
| echo "Please set STORM_HOME to point to your Storm installation" |
| exit -1 |
| fi |
| |
| if [ ! -f $2 ];then |
| echo "$2 does not exist!" |
| echo "Please use a valid jar file for Storm execution" |
| exit -1 |
| fi |
| |
| STORM_EXEC="$STORM_HOME/bin/storm" |
| |
| SAMOA_STORM_PROPERTIES="samoa-storm.properties" |
| MODE_OPTION="samoa.storm.mode" |
| NUM_WORKER_OPTION="samoa.storm.numworker" |
| |
# Result slot: getvalue stores the value it found here.
VALUE=""

#######################################
# Look up one key in $BASE_DIR/$SAMOA_STORM_PROPERTIES.
# Lines starting with '#' and lines without '=' are ignored. The text before
# the first '=' must equal the requested key exactly (surrounding whitespace
# ignored), so longer keys containing it, or values mentioning it, no longer
# match. When a key occurs several times the last occurrence wins.
# Globals:   BASE_DIR, SAMOA_STORM_PROPERTIES (read); VALUE (written)
# Arguments: $1 - property key
# Outputs:   nothing on stdout; VALUE holds the trimmed value, or the empty
#            string when the key (or the file) is missing
#######################################
getvalue() {
  local key=$1 line name found
  VALUE=""
  # `|| [ -n "$line" ]` also processes a final line without trailing newline.
  while IFS= read -r line || [ -n "$line" ]; do
    case $line in
      '#'*) continue ;;  # comment line
      *=*) ;;            # key=value candidate
      *) continue ;;     # not an assignment
    esac
    # Key: text before the first '=', with surrounding whitespace removed.
    name=${line%%=*}
    name=${name#"${name%%[![:space:]]*}"}
    name=${name%"${name##*[![:space:]]}"}
    [ "$name" = "$key" ] || continue
    # Value: everything after the first '=', trimmed.
    found=${line#*=}
    found=${found#"${found%%[![:space:]]*}"}
    found=${found%"${found##*[![:space:]]}"}
    VALUE=$found
  done < "$BASE_DIR/$SAMOA_STORM_PROPERTIES"
}
| |
getvalue "$NUM_WORKER_OPTION"
NUM_WORKER="$VALUE"

getvalue "$MODE_OPTION"
MODE_ARG="$VALUE"

# Everything after <platform> <jar> forms the task description, joined with
# single spaces.
COMPLETE_ARG="${*:3}"

DEPLOYABLE=$JAR_PATH
# $COMPLETE_ARG, $NUM_WORKER and $MODE_ARG stay unquoted on purpose: the task
# string is handed to Java as separate words and an empty setting adds no
# argument. Executable and jar paths are quoted so spaces in them are safe.
if [ "$MODE_ARG" = "cluster" ]; then
  # shellcheck disable=SC2086
  "$STORM_EXEC" jar "$DEPLOYABLE" org.apache.samoa.topology.impl.StormDoTask $COMPLETE_ARG $NUM_WORKER $MODE_ARG
elif [ "$MODE_ARG" = "local" ]; then
  # The '*' is left for the JVM to expand as a classpath wildcard, hence the
  # quoting of "$CLASSPATH" below.
  CLASSPATH="$CLASSPATH:$STORM_HOME/lib/*:$DEPLOYABLE"
  # shellcheck disable=SC2086
  java -cp "$CLASSPATH" org.apache.samoa.LocalStormDoTask $COMPLETE_ARG $NUM_WORKER
else
  # A missing or misspelled mode used to end the script silently with status 0.
  echo "Unsupported $MODE_OPTION value '$MODE_ARG' in $BASE_DIR/$SAMOA_STORM_PROPERTIES (expected 'cluster' or 'local')" >&2
  exit 1
fi
| |
| elif [ $PLATFORM = 'SAMZA' ]; then |
| echo "Deploying to SAMZA" |
| |
| SAMOA_SAMZA_PROPERTIES=samoa-samza.properties |
| MODE_OPTION='samoa.samza.mode' |
| ZOOKEEPER_HOST_OPTION='zookeeper.host' |
| ZOOKEEPER_PORT_OPTION='zookeeper.port' |
| KAFKA_BROKER_LIST_OPTION='kafka.broker.list' |
| KAFKA_REPLICATION_OPTION='kafka.replication.factor' |
| CHECKPOINT_FREQ_OPTION='checkpoint.commit.ms' |
| |
| YARN_AM_MEMORY_OPTION='yarn.am.memory' |
| YARN_CONTAINER_MEMORY_OPTION='yarn.container.memory' |
| PI_PER_CONTAINER_OPTION='max.pi.per.container' |
| |
| KRYO_REGISTER_OPTION='kryo.register.file' |
| |
| HDFS_SAMOA_HOME_OPTION='hdfs.samoa.dir' |
| |
# Result slot: getvalue stores the value it found here.
VALUE=""

#######################################
# Look up one key in $BASE_DIR/$SAMOA_SAMZA_PROPERTIES.
# Lines starting with '#' and lines without '=' are ignored. The text before
# the first '=' must equal the requested key exactly (surrounding whitespace
# ignored), so longer keys containing it, or values mentioning it, no longer
# match. When a key occurs several times the last occurrence wins.
# Globals:   BASE_DIR, SAMOA_SAMZA_PROPERTIES (read); VALUE (written)
# Arguments: $1 - property key
# Outputs:   nothing on stdout; VALUE holds the trimmed value, or the empty
#            string when the key (or the file) is missing
#######################################
getvalue() {
  local key=$1 line name found
  VALUE=""
  # `|| [ -n "$line" ]` also processes a final line without trailing newline.
  while IFS= read -r line || [ -n "$line" ]; do
    case $line in
      '#'*) continue ;;  # comment line
      *=*) ;;            # key=value candidate
      *) continue ;;     # not an assignment
    esac
    # Key: text before the first '=', with surrounding whitespace removed.
    name=${line%%=*}
    name=${name#"${name%%[![:space:]]*}"}
    name=${name%"${name##*[![:space:]]}"}
    [ "$name" = "$key" ] || continue
    # Value: everything after the first '=', trimmed.
    found=${line#*=}
    found=${found#"${found%%[![:space:]]*}"}
    found=${found%"${found##*[![:space:]]}"}
    VALUE=$found
  done < "$BASE_DIR/$SAMOA_SAMZA_PROPERTIES"
}
| |
# Load every Samza setting from the properties file.
getvalue "$ZOOKEEPER_HOST_OPTION"
ZOOKEEPER_HOST="$VALUE"

getvalue "$ZOOKEEPER_PORT_OPTION"
ZOOKEEPER_PORT="$VALUE"

getvalue "$KAFKA_BROKER_LIST_OPTION"
KAFKA_BROKER_LIST="$VALUE"

getvalue "$MODE_OPTION"
MODE_ARG="$VALUE"

getvalue "$KAFKA_REPLICATION_OPTION"
KAFKA_REPLICATION_FACTOR="$VALUE"

getvalue "$CHECKPOINT_FREQ_OPTION"
CHECKPOINT_FREQUENCY="$VALUE"

getvalue "$YARN_AM_MEMORY_OPTION"
YARN_AM_MEMORY="$VALUE"

getvalue "$YARN_CONTAINER_MEMORY_OPTION"
YARN_CONTAINER_MEMORY="$VALUE"

getvalue "$KRYO_REGISTER_OPTION"
KRYO_REGISTER_FILE="$VALUE"

getvalue "$PI_PER_CONTAINER_OPTION"
PI_PER_CONTAINER="$VALUE"

getvalue "$HDFS_SAMOA_HOME_OPTION"
HDFS_SAMOA_HOME="$VALUE"

# Everything after <platform> <jar> forms the task description, joined with
# single spaces.
COMPLETE_ARG="${*:3}"

DEPLOYABLE=$JAR_PATH
# $COMPLETE_ARG stays unquoted on purpose so the task string reaches Java as
# separate words. Every other argument is quoted so that paths or values
# containing spaces remain a single argument.
# shellcheck disable=SC2086
java "-Dsamza.log.dir=$BASE_DIR/logs" -Dsamza.container.name=client -cp "$DEPLOYABLE" \
  org.apache.samoa.SamzaDoTask $COMPLETE_ARG "--mode=$MODE_ARG" \
  "--yarn_home=$YARN_HOME/conf" "--zookeeper=$ZOOKEEPER_HOST:$ZOOKEEPER_PORT" \
  "--kafka=$KAFKA_BROKER_LIST" \
  "--jar_package=$JAR_PATH" "--yarn_am_mem=$YARN_AM_MEMORY" \
  "--yarn_container_mem=$YARN_CONTAINER_MEMORY" \
  "--kafka_replication_factor=$KAFKA_REPLICATION_FACTOR" \
  "--checkpoint_frequency=$CHECKPOINT_FREQUENCY" \
  "--kryo_register=$BASE_DIR/$KRYO_REGISTER_FILE" "--pi_per_container=$PI_PER_CONTAINER" \
  "--samoa_hdfs_dir=$HDFS_SAMOA_HOME"
| |
| elif [ $PLATFORM = 'FLINK' ]; then |
| |
| echo "Deploying to $PLATFORM" |
| if [ -z $FLINK_HOME ];then |
| echo "FLINK_HOME is not set!" |
| echo "Please set FLINK_HOME to point to your Flink installation" |
| exit -1 |
| fi |
| |
| if [ ! -f $2 ];then |
| echo "$2 does not exist!" |
| echo "Please use a valid jar file for Flink execution" |
| exit -1 |
| fi |
| |
| FLINK_EXEC="$FLINK_HOME/bin/flink" |
| |
| SAMOA_FLINK_PROPERTIES="samoa-flink.properties" |
| MODE_OPTION="samoa.flink.mode" |
| # NUM_WORKER_OPTION="samoa.flink.numWorker" |
| |
# Result slot: getvalue stores the value it found here.
VALUE=""

#######################################
# Look up one key in $BASE_DIR/$SAMOA_FLINK_PROPERTIES.
# Lines starting with '#' and lines without '=' are ignored. The text before
# the first '=' must equal the requested key exactly (surrounding whitespace
# ignored), so longer keys containing it, or values mentioning it, no longer
# match. When a key occurs several times the last occurrence wins.
# Globals:   BASE_DIR, SAMOA_FLINK_PROPERTIES (read); VALUE (written)
# Arguments: $1 - property key
# Outputs:   nothing on stdout; VALUE holds the trimmed value, or the empty
#            string when the key (or the file) is missing
#######################################
getvalue() {
  local key=$1 line name found
  VALUE=""
  # `|| [ -n "$line" ]` also processes a final line without trailing newline.
  while IFS= read -r line || [ -n "$line" ]; do
    case $line in
      '#'*) continue ;;  # comment line
      *=*) ;;            # key=value candidate
      *) continue ;;     # not an assignment
    esac
    # Key: text before the first '=', with surrounding whitespace removed.
    name=${line%%=*}
    name=${name#"${name%%[![:space:]]*}"}
    name=${name%"${name##*[![:space:]]}"}
    [ "$name" = "$key" ] || continue
    # Value: everything after the first '=', trimmed.
    found=${line#*=}
    found=${found#"${found%%[![:space:]]*}"}
    found=${found%"${found##*[![:space:]]}"}
    VALUE=$found
  done < "$BASE_DIR/$SAMOA_FLINK_PROPERTIES"
}
| |
# Worker-count lookup is currently disabled:
# getvalue "$NUM_WORKER_OPTION"
# NUM_WORKER="$VALUE"

getvalue "$MODE_OPTION"
MODE_ARG="$VALUE"

# Everything after <platform> <jar> forms the task description, joined with
# single spaces.
COMPLETE_ARG="${*:3}"

DEPLOYABLE=$JAR_PATH
echo "$DEPLOYABLE"
# $COMPLETE_ARG stays unquoted on purpose so the task string reaches Flink as
# separate words; executable and jar paths are quoted so spaces are safe.
if [ "$MODE_ARG" = "cluster" ]; then
  FLINK_MASTER_OPTION="samoa.flink.flinkMaster"
  PORT_OPTION="samoa.flink.port"

  # Keep the looked-up values in their own variables instead of overwriting
  # the variables that hold the property names.
  getvalue "$FLINK_MASTER_OPTION"
  FLINK_MASTER="$VALUE"

  getvalue "$PORT_OPTION"
  FLINK_PORT="$VALUE"
  # shellcheck disable=SC2086
  "$FLINK_EXEC" run -m "$FLINK_MASTER:$FLINK_PORT" "$DEPLOYABLE" $COMPLETE_ARG

elif [ "$MODE_ARG" = "local" ]; then
  # shellcheck disable=SC2086
  "$FLINK_EXEC" run "$DEPLOYABLE" $COMPLETE_ARG
else
  # A missing or misspelled mode used to end the script silently with status 0.
  echo "Unsupported $MODE_OPTION value '$MODE_ARG' in $BASE_DIR/$SAMOA_FLINK_PROPERTIES (expected 'cluster' or 'local')" >&2
  exit 1
fi
| |
| elif [ $PLATFORM = 'THREADS' ]; then |
| |
| echo "Deploying to LOCAL with MULTITHREADING." |
| COMPLETE_ARG="" |
| COUNTER=0 |
| for var in "$@" |
| do |
| COUNTER=`expr $COUNTER + 1` |
| if [ $COUNTER -gt 2 ];then |
| COMPLETE_ARG="$COMPLETE_ARG $var" |
| fi |
| done |
| |
| java $JAVA_OPTS -cp $JAR_PATH org.apache.samoa.LocalThreadsDoTask $COMPLETE_ARG |
| |
| elif [ $PLATFORM = 'LOCAL' ]; then |
| |
| echo "Deploying to $PLATFORM" |
| COMPLETE_ARG="" |
| COUNTER=0 |
| for var in "$@" |
| do |
| COUNTER=`expr $COUNTER + 1` |
| if [ $COUNTER -gt 2 ];then |
| COMPLETE_ARG="$COMPLETE_ARG $var" |
| fi |
| done |
| |
| java $JAVA_OPTS -cp $JAR_PATH org.apache.samoa.LocalDoTask $COMPLETE_ARG $NUM_WORKER $MODE_ARG |
| else |
| echo "Specify a valid platform." |
| echo "Usage: samoa <platform> <jar> <task & options>" |
| fi |
| |