#!/bin/sh

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


""":"
# Shell bootstrap: this file is both a POSIX sh script and a Python program.
# The shell part changes into the directory holding this script and then
# re-executes the same file with a Python interpreter, forwarding all user
# arguments and appending --hod.original-dir with the directory the user
# started from.
#
# Interpreter choice, in order: $HOD_PYTHON_HOME (path to a python binary),
# /usr/bin/python, /usr/local/bin/python, then python found on PATH.
work_dir=$(dirname "$0")
base_name=$(basename "$0")
original_dir=$PWD
# Without the cd the relative $base_name below cannot be found, so give up.
cd "$work_dir" || exit 1

# ${1+"$@"} forwards the arguments unchanged and expands to nothing when
# there are none (guards against old shells that turn "$@" into "").
if [ -n "$HOD_PYTHON_HOME" ]; then
    exec "$HOD_PYTHON_HOME" -u -OO "$base_name" ${1+"$@"} --hod.original-dir "$original_dir"
elif [ -e /usr/bin/python ]; then
    exec /usr/bin/python -u -OO "$base_name" ${1+"$@"} --hod.original-dir "$original_dir"
elif [ -e /usr/local/bin/python ]; then
    exec /usr/local/bin/python -u -OO "$base_name" ${1+"$@"} --hod.original-dir "$original_dir"
else
    # Same original directory as every other branch: the directory the user
    # invoked hod from, not the directory of the script.
    exec python -u -OO "$base_name" ${1+"$@"} --hod.original-dir "$original_dir"
fi
":"""

"""The executable to be used by the user"""
import sys, os, re, pwd, threading, sys

def rootDirFromScriptPath(scriptPath):
  """Return the installation root for the resolved path of this script.

  The root is everything before the LAST "/bin/" in scriptPath, so an
  installation prefix that itself contains a "/bin/" component (for example
  /usr/local/bin/hod-0.4/bin/hod) still resolves to the directory directly
  above the script's own bin directory. A path without any "/bin/"
  component is returned unchanged.
  """
  binIndex = scriptPath.rfind("/bin/")
  if binIndex < 0:
    return scriptPath
  return scriptPath[:binIndex]

# Name this script was invoked under, without any directory part.
myName          = os.path.basename(sys.argv[0])
# Resolved (symlink-free) path of the script file itself.
binDirectory    = os.path.realpath(sys.argv[0])
rootDirectory   = rootDirFromScriptPath(binDirectory)
libDirectory    = rootDirectory

# Make the packages under the installation root importable.
sys.path.append(libDirectory)

from hodlib.Hod.hod import hodRunner
from hodlib.Common.setup import *
from hodlib.Common.descGenerator import *
from hodlib.Common.util import local_fqdn, need_to_allocate, filter_warnings,\
    get_exception_error_string, hodInterrupt, \
    HOD_INTERRUPTED_MESG, HOD_INTERRUPTED_CODE,\
    TORQUE_USER_LIMITS_COMMENT_FIELD
from hodlib.Common.tcp import tcpError, tcpSocket
from hodlib.Hod.hod import hodHelp

filter_warnings()

reVersion = re.compile(".*(\d+_\d+).*")

VERSION = None
if os.path.exists("./VERSION"):
  vFile = open("./VERSION", 'r')
  VERSION = vFile.readline()
  vFile.close()

# Always look for hodrc file here unless otherwise specified with -c:   
DEFAULT_LOC = os.path.join(rootDirectory, 'conf')
DEFAULT_HOD_DIR = os.path.join(os.environ['HOME'], ".hod")

if not os.path.isdir(DEFAULT_HOD_DIR):
  os.mkdir(DEFAULT_HOD_DIR, 0777)

DEFAULT_CONFIG = os.path.join(DEFAULT_HOD_DIR, 'hodrc')
if not os.path.exists(DEFAULT_CONFIG):
  if os.environ.has_key('HOD_CONF_DIR') and os.environ['HOD_CONF_DIR'] is not None:
    DEFAULT_CONFIG = os.path.join(os.environ['HOD_CONF_DIR'], 'hodrc')

# Option definitions, keyed by configuration section name.
#
# Each definition tuple is of the form:
#  (name, type, description, help?, default value, required?, validate?,
#   short option)
#
# The trailing short option is optional: definitions that have no
# single-letter command line flag are seven-element tuples.
defList = {
            # ---- section: hod ----
            'hod' : (
             ('original-dir', 'directory', 'hod original start directory',
              False, None, True, True, 'r'),

             ('clusterdir', 'directory',
              'Directory where cluster state information and hadoop-site.xml' +
              ' will be stored.',
              True, None, False, False, 'd'),

             ('syslog-address', 'address', 'Syslog address.',
              False, None, False, True, 'y'),

             ('java-home', 'directory', 'Java home directory.',
              True, None, True, True, 'j'),

             ('debug', 'pos_int', 'Debugging level, 0-4.',
              True, 3, True, True, 'b'),

             ('stream', 'bool', 'Output to stderr.',
              False, True, False, True),

             ('nodecount', 'pos_int',
              'Number of nodes to allocate at startup. ',
              True, None, False, True, 'n'),

             ('script', 'file', 'Hadoop script to execute.',
              True, None, False, False, 's'),

             ('userid', 'user_account',
              'User ID the hod shell is running under.',
              False, pwd.getpwuid(os.getuid())[0], False, True, 'u'),

             ('allocate-wait-time', 'pos_int',
              'Time to wait for cluster allocation.',
              False, 300, True, True, 'e'),

             ('operation', 'string',
              'Initiate a hod operation. (help, allocate, deallocate ...)',
              False, None, False, True, 'o'),

             ('cluster-factor', 'pos_float',
              'The number of grid slots per machines', False, 1.9, False, True,
              'x'),

             ('cluster', 'string', 'Name of cluster being used.',
              False, None, True, True, 'w'),

             ('proxy-xrs-address', 'address',
              'Address to Allocation Manager XML RPC proxy.',
              False, None, False, True, 'p'),

             ('xrs-port-range', 'range', 'XML-RPC port range n-m.',
              False, None, True, True),

             ('client-params', 'keyval', 'Hadoop client xml key/value list',
              True, None, False, True, 'C'),

             ('hadoop-ui-log-dir', 'directory', 'Directory to store Web UI Logs of Hadoop',
              True, None, False, True),

             ('temp-dir', 'directory', 'HOD temporary directories.',
              False, None, True, False),

             ('update-worker-info', 'bool', 'Specifies whether to update Worker Info after allocation',
              False, False, False, True),

             ('job-feasibility-attr', 'string', 'Specifies whether to check job feasibility - resource manager and/or scheduler limits, also gives the attribute value',
              False, None, False, True),

             ('title', 'string', 'Title for the current HOD allocation.',
              True, "HOD", False, True, 'N'),

             ('walltime', 'pos_int', 'Walltime in seconds for the current HOD allocation',
              True, None, False, True, 'l'),

             ('script-wait-time', 'pos_int', 'Specifies the time to wait before running the script. Used with the hod.script option.',
              True, 10, False, True, 'W'),

             ('log-rollover-count', 'pos_int', 'Specifies the number of rolled-over log files of HOD client. A zero value disables rollover.',
              True, 5, False, True, 'L'),

             ('job-status-query-interval', 'pos_int', 'Specifies the time between checking for job status',
              False, 30, False, True),

             ('job-command-failure-interval', 'pos_int', 'Specifies the time between checking for failed job status or submission commands',
              False, 10, False, True),

             ('job-status-query-failure-retries', 'pos_int', 'Specifies the number of times job status failure queries are retried',
              False, 3, False, True),

             ('job-submission-failure-retries', 'pos_int', 'Specifies the number of times job submission failure queries are retried',
              False, 3, False, True)),

            # ---- section: resource_manager ----
            'resource_manager' : (
             ('id', 'string', 'Batch scheduler ID: torque|condor.',
              False, None, True, True),

             ('pbs-user', 'user_account', 'User ID jobs are submitted under.',
              False, None, False, True),

             ('pbs-account', 'string', 'User Account jobs are submitted under.',
              True, None, False, False, 'A'),

             ('queue', 'string', 'Queue of the batch scheduler to query.',
              True, 'batch', False, True, 'Q'),

             ('batch-home', 'directory', 'Scheduler installation directory.',
              False, None, True, True),

             ('options', 'keyval', 'Options to pass to the scheduler.',
              False, None, False, True),

             ('env-vars', 'keyval', 'Environment variables to pass to the submitted jobs.',
              False, None, False, True)),

            # ---- section: ringmaster ----
            'ringmaster' : (
             ('work-dirs', 'list', 'hod work directories',
              False, None, True, False),

             ('temp-dir', 'directory', 'Ringmaster temporary directory.',
              False, None, True, False),

             ('log-dir', 'directory', 'hod logging directory.',
              False, os.path.join(rootDirectory, 'logs'), False, False),

             ('syslog-address', 'address', 'Syslog address.',
              False, None, False, True),

             ('xrs-port-range', 'range', 'XML-RPC port range n-m.',
              False, None, True, True),

             ('http-port-range', 'range', 'HTTP port range n-m.',
              False, None, True, True),

             ('debug', 'pos_int', 'Debugging level, 0-4.',
              False, 4, True, True),

             ('register', 'bool', 'Register with service registry?',
              False, True, True, True),

             ('stream', 'bool', 'Output to stderr.',
              False, False, False, True),

             ('userid', 'user_account',
              'User ID the hod shell is running under.',
              False, pwd.getpwuid(os.getuid())[0], False, True),

             ('svcrgy-addr', 'address', 'Download HTTP address.',
              False, None, False, False),

             ('hadoop-tar-ball', 'uri', 'hadoop program tar ball.',
              True, None, False, False, 't'),

             ('max-connect','pos_int','max connections allowed for a single tarball server',
              False, 30, False, True),

             ('jt-poll-interval', 'pos_int', 'How often to poll the Job tracker for idleness',
              False, 120, False, True),

             ('idleness-limit', 'pos_int', 'Limit after which to deallocate the cluster',
              False, 3600, False, True),

             ('max-master-failures', 'pos_int',
              'Defines how many times a master can fail before'
              ' failing cluster allocation', False, 5, True, True),

             ('workers_per_ring', 'pos_int', 'Defines number of workers per service per hodring',
              False, 1, False, True)),

            # ---- section: gridservice-mapred ----
            'gridservice-mapred' : (
             ('external', 'bool', "Connect to an already running MapRed?",
              False, False, True, True),

             ('host', 'hostname', 'Mapred hostname.',
              False, 'localhost', False, False),

             ('info_port', 'pos_int', 'Mapred info port.',
              False, None, False, False),

             ('tracker_port', 'pos_int', 'Mapred job tracker port.',
              False, None, False, False),

             ('cmdline-params', 'keyval', 'Hadoop cmdline key/value list.',
              False, None, False, False),

             ('server-params', 'keyval', 'Hadoop xml key/value list',
              True, None, False, True, 'M'),

             ('envs', 'keyval', 'environment to run this package in',
              False, None, False, True),

             ('final-server-params', 'keyval', 'Hadoop final xml key/val list',
              False, None, False, True, 'F'),

             ('pkgs', 'directory', "directory where the package is installed",
              False, None, False, False)),

            # ---- section: gridservice-hdfs ----
            'gridservice-hdfs' : (
             ('external', 'bool', "Connect to an already running HDFS?",
              False, False, True, True),

             ('host', 'hostname', 'HDFS hostname.',
              False, 'localhost', False, False),

             ('fs_port', 'pos_int', 'HDFS port.',
              False, None, False, False),

             ('info_port', 'pos_int', 'HDFS info port.',
              False, None, False, False),

             ('cmdline-params', 'keyval', 'Hadoop cmdline key/value list.',
              False, None, False, False),

             ('server-params', 'keyval', 'Hadoop xml key/value list',
              False, None, False, True, 'H'),

             ('final-server-params', 'keyval', 'Hadoop final xml key/value list',
              False, None, False, True, 'S'),

             ('envs', 'keyval', 'Environment in which to run this package.',
              False, None, False, True),

             ('pkgs', 'directory', "directory where the package is installed",
              False, None, False, False)),

            # ---- section: hodring ----
            'hodring' : (
             ('temp-dir', 'list', 'hodring temporary directory.',
              False, None, True, False),

             ('log-dir', 'directory', 'hod logging directory.',
              False, os.path.join(rootDirectory, 'logs'), False, False),

             # The start-up check in this script only accepts file:// and
             # hdfs:// values for this option, so the help text names those.
             ('log-destination-uri', 'string',
              'URI to store logs to, file://some_path or '
              + 'hdfs://host:port/some_path',
              False, None, False, True),

             ('pkgs', 'directory', 'Path to Hadoop to use in case of uploading to HDFS',
              False, None, False, False),

             ('syslog-address', 'address', 'Syslog address.',
              False, None, False, True),

             ('java-home', 'directory', 'Java home directory.',
              False, None, True, False),

             ('debug', 'pos_int', 'Debugging level, 0-4.',
              False, 3, True, True),

             ('register', 'bool', 'Register with service registry?',
              False, True, True, True),

             ('stream', 'bool', 'Output to stderr.',
              False, False, False, True),

             ('userid', 'user_account',
              'User ID the hod shell is running under.',
              False, pwd.getpwuid(os.getuid())[0], False, True),

             ('command', 'string', 'Command for hodring to run.',
              False, None, False, True),

             ('xrs-port-range', 'range', 'XML-RPC port range n-m.',
              False, None, True, True),

             ('http-port-range', 'range', 'HTTP port range n-m.',
              False, None, True, True),

             ('hadoop-port-range', 'range', 'Hadoop port range n-m.',
              False, None, True, True),

             ('service-id', 'string', 'Service ID.',
              False, None, False, True),

             ('download-addr', 'string', 'Download HTTP address.',
              False, None, False, True),

             ('svcrgy-addr', 'address', 'Download HTTP address.',
              False, None, False, True),

             ('ringmaster-xrs-addr', 'address', 'Ringmaster XML-RPC address.',
              False, None, False, True),

             ('tarball-retry-initial-time', 'pos_float','Initial Retry time for tarball download',
              False, 1, False, True),

             ('tarball-retry-interval', 'pos_float','interval to spread retries for tarball download',
              False, 3, False, True),

             ('cmd-retry-initial-time', 'pos_float','Initial retry time for getting commands',
              False, 2, False, True),

             ('cmd-retry-interval', 'pos_float','interval to spread retries for getting commands',
              False, 2, False, True),

             ('mapred-system-dir-root', 'string', 'Root under which mapreduce system directory names are generated by HOD.',
              False, '/mapredsystem', False, False))
              }

# Section order handed to add_defs() together with defList.
defOrder = [ 'hod', 'ringmaster', 'hodring', 'resource_manager',
             'gridservice-mapred', 'gridservice-hdfs' ]

def printErrors(msgs):
  for msg in msgs:
    print msg

def op_requires_pkgs(config):
  """Tell whether the requested action needs Hadoop packages.

  With an 'operation' entry in config['hod'] the answer is True only for
  operations whose text starts with 'allocate'. Without one, the answer is
  True exactly when a 'script' entry is present.
  """
  hodSection = config['hod']
  if not hodSection.has_key('operation'):
    return hodSection.has_key('script')
  return hodSection['operation'].startswith('allocate')

if __name__ == '__main__':  
  try:
    confDef = definition()
    confDef.add_defs(defList, defOrder)
    hodhelp = hodHelp()
    usage = hodhelp.help()
            
    hodOptions = options(confDef, usage,
                      VERSION, withConfig=True, defaultConfig=DEFAULT_CONFIG,
                      name=myName )
    # hodConfig is a dict like object, hodConfig[section][name]
    try:
      hodConfig = config(hodOptions['config'], configDef=confDef, 
                       originalDir=hodOptions['hod']['original-dir'],
                       options=hodOptions) 
    except IOError, e:
      print >>sys.stderr,"error: %s not found. Specify the path to the HOD configuration file, or define the environment variable %s under which a file named hodrc can be found." % (hodOptions['config'], 'HOD_CONF_DIR')
      sys.exit(1)
  
    # Conditional validation
    statusMsgs = []

    if hodConfig.normalizeValue('gridservice-hdfs', 'external'):
      # For external HDFS
      statusMsgs.extend(hodConfig.validateValue('gridservice-hdfs',
                                                'fs_port'))
      statusMsgs.extend(hodConfig.validateValue('gridservice-hdfs',
                                                'info_port'))
      statusMsgs.extend(hodConfig.validateValue('gridservice-hdfs',
                                                'host'))
    else:
      hodConfig['gridservice-hdfs']['fs_port'] = 0 # Dummy
      hodConfig['gridservice-hdfs']['info_port'] = 0 # Not used at all

    if hodConfig.normalizeValue('gridservice-mapred', 'external'):
      statusMsgs.extend(hodConfig.validateValue('gridservice-mapred',
                                                'tracker_port'))
      statusMsgs.extend(hodConfig.validateValue('gridservice-mapred',
                                                'info_port'))
      statusMsgs.extend(hodConfig.validateValue('gridservice-mapred',
                                                'host'))
    else:
      hodConfig['gridservice-mapred']['tracker_port'] = 0 # Dummy
      hodConfig['gridservice-mapred']['info_port'] = 0 # Not used at all

    if len(statusMsgs) != 0:
      for msg in statusMsgs:
        print >>sys.stderr, msg
      sys.exit(1)
    # End of conditional validation

    status = True
    statusMsgs = []
  
    (status,statusMsgs) = hodConfig.verify()
    if not status:
      print >>sys.stderr,"error: bin/hod failed to start."
      for msg in statusMsgs:
        print >>sys.stderr,"%s" % (msg)
      sys.exit(1)
  
    ## TODO : should move the dependency verification to hodConfig.verify
    if hodConfig['hod'].has_key('operation') and \
      hodConfig['hod'].has_key('script'):
      print "Script operation is mutually exclusive with other HOD operations"
      hodOptions.print_help(sys.stderr)
      sys.exit(1)
    
    if 'operation' not in hodConfig['hod'] and 'script' not in hodConfig['hod']:
      print "HOD requires at least a script or operation be specified."
      hodOptions.print_help(sys.stderr)
      sys.exit(1)    
    
    if hodConfig['gridservice-hdfs']['external']:
      hdfsAddress = "%s:%s" % (hodConfig['gridservice-hdfs']['host'], 
                               hodConfig['gridservice-hdfs']['fs_port'])
  
      hdfsSocket = tcpSocket(hdfsAddress)
        
      try:
        hdfsSocket.open()
        hdfsSocket.close()
      except tcpError:
        printErrors(hodConfig.var_error('hod', 'gridservice-hdfs', 
          "Failed to open a connection to external hdfs address: %s." % 
          hdfsAddress))
        sys.exit(1)
    else:
      hodConfig['gridservice-hdfs']['host'] = 'localhost'
  
    if hodConfig['gridservice-mapred']['external']:
      mapredAddress = "%s:%s" % (hodConfig['gridservice-mapred']['host'], 
                                 hodConfig['gridservice-mapred']['tracker_port'])
  
      mapredSocket = tcpSocket(mapredAddress)
        
      try:
        mapredSocket.open()
        mapredSocket.close()
      except tcpError:
        printErrors(hodConfig.var_error('hod', 'gridservice-mapred', 
          "Failed to open a connection to external mapred address: %s." % 
          mapredAddress))
        sys.exit(1)
    else:
      hodConfig['gridservice-mapred']['host'] = 'localhost'
  
    if not hodConfig['ringmaster'].has_key('hadoop-tar-ball') and \
      not hodConfig['gridservice-hdfs'].has_key('pkgs') and \
      op_requires_pkgs(hodConfig):
      printErrors(hodConfig.var_error('gridservice-hdfs', 'pkgs', 
        "gridservice-hdfs.pkgs must be defined if ringmaster.hadoop-tar-ball "
        + "is not defined."))
      sys.exit(1)
  
    if not hodConfig['ringmaster'].has_key('hadoop-tar-ball') and \
      not hodConfig['gridservice-mapred'].has_key('pkgs') and \
      op_requires_pkgs(hodConfig):
      printErrors(hodConfig.var_error('gridservice-mapred', 'pkgs', 
        "gridservice-mapred.pkgs must be defined if ringmaster.hadoop-tar-ball "
        + "is not defined."))
      sys.exit(1)
  
    if hodConfig['hodring'].has_key('log-destination-uri'):
      if hodConfig['hodring']['log-destination-uri'].startswith('file://'):
        pass
      elif hodConfig['hodring']['log-destination-uri'].startswith('hdfs://'):
        hostPort = hodConfig['hodring']['log-destination-uri'][7:].split("/")
        hostPort = hostPort[0]
        socket = tcpSocket(hostPort)
        try:
          socket.open()
          socket.close()
        except:
          printErrors(hodConfig.var_error('hodring', 'log-destination-uri', 
          "Unable to contact host/port specified in log destination uri: %s" % 
          hodConfig['hodring']['log-destination-uri']))
          sys.exit(1)
      else:
        printErrors(hodConfig.var_error('hodring', 'log-destination-uri', 
          "The log destiniation uri must be of type local:// or hdfs://."))
        sys.exit(1)
  
    if hodConfig['ringmaster']['workers_per_ring'] < 1:
      printErrors(hodConfig.var_error('ringmaster', 'workers_per_ring',
                "ringmaster.workers_per_ring must be a positive integer " +
                "greater than or equal to 1"))
      sys.exit(1)
                        
    ## TODO : end of should move the dependency verification to hodConfig.verif
      
    hodConfig['hod']['base-dir'] = rootDirectory
    hodConfig['hod']['user_state'] = DEFAULT_HOD_DIR
  
    dGen = DescGenerator(hodConfig)
    hodConfig = dGen.initializeDesc()
    
    os.environ['JAVA_HOME'] = hodConfig['hod']['java-home']
    
    if hodConfig['hod']['debug'] == 4:
      print ""
      print "Using Python: %s" % sys.version
      print ""
   
    hod = hodRunner(hodConfig)
  
    # Initiate signal handling
    hodInterrupt.set_log(hod.get_logger())
    hodInterrupt.init_signals()
    # Interrupts set up. Now on we handle signals only when we wish to.
  except KeyboardInterrupt:
    print HOD_INTERRUPTED_MESG
    sys.exit(HOD_INTERRUPTED_CODE)
  
  opCode = 0
  try:
    if hodConfig['hod'].has_key('script'):
      opCode = hod.script()
    else:  
      opCode = hod.operation()
  except Exception, e:
    print "Uncaught Exception : %s" % e
  finally:
    sys.exit(opCode)
