#!/bin/bash

###
# #%L
# SAMOA
# %%
# Copyright (C) 2014 - 2015 Apache Software Foundation
# %%
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# 
#      http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# #L%
###

# =====================================================================================================	#
# 			Scalable Advanced Massive Online Analysis a.k.a SAMOA				#
# =====================================================================================================	#
# -----------------------------------------------------------------------------------------------------	#
# SAMOA is a framework for mining big data streams. It contains machine learning			#
# algorithms that run on top of distributed stream processing engines.					#
# 													#
# Usage:												#
# ./samoa <platform> <jar> <task & options>								#
# Example:												#
# ./samoa storm /path/to/SAMOA-Storm-0.0.1.jar "ClusteringTask -s (RandomRBFGeneratorEvents -K 5 -a 2)"	#
#													#
# -----------------------------------------------------------------------------------------------------	#

usage() {
echo "Usage: $(basename $0) <platform> <jar> <task & options>" && exit 1
}
[ "$#" -ge 3 ] || usage

echo $0
BASE_DIR=$(dirname $0)
PLATFORM=${1:-NONE}
PLATFORM=$(echo $PLATFORM | tr '[:lower:]' '[:upper:]')
JAR_PATH=$2
JAR_FILE=$(basename $JAR_PATH)
JAR_DIR=$(dirname $JAR_PATH)
OPTIONS=$3


if [ $PLATFORM = 'S4' ]; then

	echo "Deploying to $PLATFORM"
        if [ -z $S4_HOME ];then
            echo "S4_HOME is not set!"
            echo "Please set S4_HOME to point to your S4 installation"
            exit -1
        fi

	# Local functions
	deployLocal() 
	{
		# Make sure to kill S4 processes before re-deploying
		echo "Starting Zookeeper..."
		(cd $S4_HOME ; . s4 zkServer -clean -port=${ZK_SERVER_PORT} &)
		sleep 10

		echo "Defining cluster..."
		(cd $S4_HOME ; . s4 newCluster -c=${CLUSTER_NAME} -nbTasks=1 -flp=${CLUSTER_PORT} -zk=${ZK_SERVER}:${ZK_SERVER_PORT} & )
		sleep 5

		echo "Starting node on cluster $CLUSTER_NAME..."
		(cd $S4_HOME ; . s4 node -c=${CLUSTER_NAME} -zk=${ZK_SERVER}:${ZK_SERVER_PORT} & )
		sleep 5
	}

	SAMOA_S4_PROPERTIES="samoa-s4.properties"
	PROPERTY_VALUE=""
	readProperty() {
	    property=$1
	    PROPERTY_VALUE=`sed '/^\#/d' $BASE_DIR/$SAMOA_S4_PROPERTIES | grep $property  | tail -n 1 | cut -d "=" -f2- | sed 's/^[[:space:]]*//;s/[[:space:]]*$//'`
	}
	# END Local functions

	curr_dir=`pwd`
	if [ -f /tmp/httpPID ]; then
		pid=`cat /tmp/httpPID`
		kill -9 $pid
		rm /tmp/httpPID
	fi

	cd $JAR_DIR
	# Starting HTTP Server
	python -m SimpleHTTPServer 8000 &
	echo $! > /tmp/httpPID
	sleep 2
	cd $curr_dir
	
	# Reading properties file
	readProperty 'http.server.ip'
	APP_SERVER_IP=$PROPERTY_VALUE
	if [ -z $APP_SERVER_IP ]; then
		APP_SERVER_IP=$(hostname)
	fi
	
	readProperty 'http.server.port'
	APP_SERVER_PORT=$PROPERTY_VALUE
	if [ -z $APP_SERVER_PORT ]; then
		APP_SERVER_PORT=8000
	fi
	
	readProperty 'zookeeper.server'
	ZK_SERVER=$PROPERTY_VALUE
	if [ -z $ZK_SERVER ]; then
		ZK_SERVER=$(hostname)
	fi
	
	readProperty 'zookeeper.port'
	ZK_SERVER_PORT=$PROPERTY_VALUE
	if [ -z $ZK_SERVER_PORT ]; then
		ZK_SERVER_PORT=2181
	fi
	
	readProperty 'cluster.name'
	CLUSTER_NAME=$PROPERTY_VALUE
	if [ -z $CLUSTER_NAME ]; then
		CLUSTER_NAME="cluster"
	fi
	
	readProperty 'cluster.port'
	CLUSTER_PORT=$PROPERTY_VALUE
	if [ -z $CLUSTER_NAME ]; then
		CLUSTER_PORT=12000
	fi

	readProperty 'samoa.deploy.mode'
	DEPLOY_MODE=$PROPERTY_VALUE
	if [ -z $DEPLOY_MODE ]; then
	  DEPLOY_MODE=local
	elif [ $DEPLOY_MODE = 'local' ]; then
	  deployLocal
	fi
	# END Reading properties file

	APP_NAME="SamoaS4App"
	APP_CLASS="org.apache.samoa.topology.impl.S4DoTask"
	NAMED_PARAMETERS=$(perl -MURI::Escape -e 'print uri_escape($ARGV[0]);' "$OPTIONS")

	echo "Deploying SAMOA..."
	(cd $S4_HOME ; . s4 deploy -s4r="http://${APP_SERVER_IP}:${APP_SERVER_PORT}/${JAR_FILE}" -c="${CLUSTER_NAME}" -p=evalTask="${NAMED_PARAMETERS}" -appClass="${APP_CLASS}" -appName="${APP_NAME}" -zk="${ZK_SERVER}:${ZK_SERVER_PORT}")	

	# wait for app to be deployed
	sleep 5
	# END Apache S4 deployment
	
elif [ $PLATFORM = 'STORM' ]; then

	echo "Deploying to $PLATFORM"
	if [ -z $STORM_HOME ];then
	    echo "STORM_HOME is not set!"
	    echo "Please set STORM_HOME to point to your Storm installation"
	    exit -1
	fi

	if [ ! -f $2 ];then
	    echo "$2 does not exist!"
	    echo "Please use a valid jar file for Storm execution"
	    exit -1
	fi

	STORM_EXEC="$STORM_HOME/bin/storm"

	SAMOA_STORM_PROPERTIES="samoa-storm.properties"
	MODE_OPTION="samoa.storm.mode"
	NUM_WORKER_OPTION="samoa.storm.numworker"

	VALUE=""
	getvalue()
        {
	    VALUE=`sed '/^\#/d' $BASE_DIR/$SAMOA_STORM_PROPERTIES | grep "$1" | tail -n 1 | cut -d "=" -f2- | sed 's/^[[:space:]]*//;s/[[:space:]]*$//'`
        }

	getvalue "$NUM_WORKER_OPTION"
	NUM_WORKER="$VALUE"
	
	getvalue "$MODE_OPTION"
	MODE_ARG="$VALUE"

	COMPLETE_ARG=""
	COUNTER=0
	for var in "$@"
	do
	    COUNTER=`expr $COUNTER + 1`
	    if [ $COUNTER -gt 2 ];then
	        COMPLETE_ARG="$COMPLETE_ARG $var"
	    fi
	done
	
	DEPLOYABLE=$JAR_PATH
	if [ "$MODE_ARG" = "cluster" ]; then
		$STORM_EXEC jar $DEPLOYABLE org.apache.samoa.topology.impl.StormDoTask $COMPLETE_ARG $NUM_WORKER $MODE_ARG
	elif [ "$MODE_ARG" = "local" ]; then
		CLASSPATH="$CLASSPATH:$STORM_HOME/lib/*:$BASE_DIR/:$DEPLOYABLE"
		java -cp $CLASSPATH org.apache.samoa.LocalStormDoTask $COMPLETE_ARG $NUM_WORKER
	fi

elif [ $PLATFORM = 'SAMZA' ]; then
	echo "Deploying to SAMZA"

	SAMOA_SAMZA_PROPERTIES=samoa-samza.properties
	MODE_OPTION='samoa.samza.mode'
        ZOOKEEPER_HOST_OPTION='zookeeper.host'
        ZOOKEEPER_PORT_OPTION='zookeeper.port'
        KAFKA_BROKER_LIST_OPTION='kafka.broker.list'
	KAFKA_REPLICATION_OPTION='kafka.replication.factor'
	CHECKPOINT_FREQ_OPTION='checkpoint.commit.ms'

        YARN_AM_MEMORY_OPTION='yarn.am.memory'
        YARN_CONTAINER_MEMORY_OPTION='yarn.container.memory'
        PI_PER_CONTAINER_OPTION='max.pi.per.container'

	KRYO_REGISTER_OPTION='kryo.register.file'

	HDFS_SAMOA_HOME_OPTION='hdfs.samoa.dir'

        VALUE=""
        getvalue()
        {
            VALUE=`sed '/^\#/d' $BASE_DIR/$SAMOA_SAMZA_PROPERTIES | grep "$1" | tail -n 1 | cut -d "=" -f2- | sed 's/^[[:space:]]*//;s/[[:space:]]*$//'`
        }

        getvalue "$ZOOKEEPER_HOST_OPTION"
        ZOOKEEPER_HOST="$VALUE"

        getvalue "$ZOOKEEPER_PORT_OPTION"
        ZOOKEEPER_PORT="$VALUE"

        getvalue "$KAFKA_BROKER_LIST_OPTION"
        KAFKA_BROKER_LIST="$VALUE"

        getvalue "$MODE_OPTION"
        MODE_ARG="$VALUE"

        getvalue "$KAFKA_REPLICATION_OPTION"
        KAFKA_REPLICATION_FACTOR="$VALUE"

	getvalue "$CHECKPOINT_FREQ_OPTION"
        CHECKPOINT_FREQUENCY="$VALUE"

        getvalue "$YARN_AM_MEMORY_OPTION"
        YARN_AM_MEMORY="$VALUE"

        getvalue "$YARN_CONTAINER_MEMORY_OPTION"
        YARN_CONTAINER_MEMORY="$VALUE"

        getvalue "$KRYO_REGISTER_OPTION"
        KRYO_REGISTER_FILE="$VALUE"

	getvalue "$PI_PER_CONTAINER_OPTION"
        PI_PER_CONTAINER="$VALUE"

	getvalue "$HDFS_SAMOA_HOME_OPTION"
	HDFS_SAMOA_HOME="$VALUE"

        COMPLETE_ARG=""
        COUNTER=0
        for var in "$@"
        do
            COUNTER=`expr $COUNTER + 1`
            if [ $COUNTER -gt 2 ];then
                COMPLETE_ARG="$COMPLETE_ARG $var"
            fi
        done

        DEPLOYABLE=$JAR_PATH
        java -Dsamza.log.dir=$BASE_DIR/logs -Dsamza.container.name=client -cp $DEPLOYABLE org.apache.samoa.SamzaDoTask $COMPLETE_ARG --mode=$MODE_ARG \
               --yarn_home=$YARN_HOME/conf --zookeeper=$ZOOKEEPER_HOST:$ZOOKEEPER_PORT --kafka=$KAFKA_BROKER_LIST \
               --jar_package=$JAR_PATH --yarn_am_mem=$YARN_AM_MEMORY --yarn_container_mem=$YARN_CONTAINER_MEMORY \
               --kafka_replication_factor=$KAFKA_REPLICATION_FACTOR --checkpoint_frequency=$CHECKPOINT_FREQUENCY \
               --kryo_register=$BASE_DIR/$KRYO_REGISTER_FILE --pi_per_container=$PI_PER_CONTAINER \
	       --samoa_hdfs_dir=$HDFS_SAMOA_HOME

elif [ $PLATFORM = 'FLINK' ]; then

        echo "Deploying to $PLATFORM"
        if [ -z $FLINK_HOME ];then
            echo "FLINK_HOME is not set!"
            echo "Please set FLINK_HOME to point to your Flink installation"
            exit -1
        fi

        if [ ! -f $2 ];then
            echo "$2 does not exist!"
            echo "Please use a valid jar file for Flink execution"
            exit -1
        fi

        FLINK_EXEC="$FLINK_HOME/bin/flink"

        SAMOA_FLINK_PROPERTIES="samoa-flink.properties"
        MODE_OPTION="samoa.flink.mode"
#        NUM_WORKER_OPTION="samoa.flink.numWorker"

        VALUE=""
        getvalue()
        {
            VALUE=`sed '/^\#/d' $BASE_DIR/$SAMOA_FLINK_PROPERTIES | grep "$1" | tail -n 1 | cut -d "=" -f2- | sed 's/^[[:space:]]*//;s/[[:space:]]*$//'`
        }
        
#        getvalue "$NUM_WORKER_OPTION"
#        NUM_WORKER="$VALUE"

        getvalue "$MODE_OPTION"
        MODE_ARG="$VALUE"

        COMPLETE_ARG=""
        COUNTER=0
        for var in "$@"
        do
            COUNTER=`expr $COUNTER + 1`
            if [ $COUNTER -gt 2 ];then
                COMPLETE_ARG="$COMPLETE_ARG $var"
            fi
        done

        DEPLOYABLE=$JAR_PATH
        echo "$DEPLOYABLE"
        if [ "$MODE_ARG" = "cluster" ]; then
				FLINK_MASTER_OPTION="samoa.flink.flinkMaster"
				PORT_OPTION="samoa.flink.port"

				getvalue "$FLINK_MASTER_OPTION"
				FLINK_MASTER_OPTION="$VALUE"

				getvalue "$PORT_OPTION"
				PORT_OPTION="$VALUE"
                $FLINK_EXEC run -m $FLINK_MASTER_OPTION:$PORT_OPTION $DEPLOYABLE  $COMPLETE_ARG

        elif [ "$MODE_ARG" = "local" ]; then
                $FLINK_EXEC run $DEPLOYABLE $COMPLETE_ARG
        fi                                                    

elif [ $PLATFORM = 'THREADS' ]; then
	
	echo "Deploying to LOCAL with MULTITHREADING."
	COMPLETE_ARG=""
        COUNTER=0
        for var in "$@"
        do
            COUNTER=`expr $COUNTER + 1`
            if [ $COUNTER -gt 2 ];then
                COMPLETE_ARG="$COMPLETE_ARG $var"
            fi
        done
        
	java $JAVA_OPTS -cp $JAR_PATH org.apache.samoa.LocalThreadsDoTask $COMPLETE_ARG

elif [ $PLATFORM = 'LOCAL' ]; then

	echo "Deploying to $PLATFORM"
	COMPLETE_ARG=""
	COUNTER=0
	for var in "$@"
	do
	    COUNTER=`expr $COUNTER + 1`
	    if [ $COUNTER -gt 2 ];then
	        COMPLETE_ARG="$COMPLETE_ARG $var"
	    fi
	done

	java $JAVA_OPTS -cp $JAR_PATH org.apache.samoa.LocalDoTask $COMPLETE_ARG $NUM_WORKER $MODE_ARG
else
	echo "Specify a valid platform."
	echo "Usage: samoa <platform> <jar> <task & options>"
fi

