#!/usr/bin/env python
# -------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# -------------------------------------------------------------
import os
import glob
from os.path import join
import platform
import argparse
from utils import get_env_systemml_home, get_env_spark_home, find_dml_file, log4j_path, config_path


def default_jars(systemml_home):
"""
return: String
Location of systemml and jcuda jars
"""
build_dir = join(systemml_home, 'target')
lib_dir = join(build_dir, 'lib')
systemml_jar = build_dir + os.sep + "SystemML.jar"
jcuda_jars = glob.glob(lib_dir + os.sep + "jcu*.jar")
target_jars = ','.join(jcuda_jars)
    return target_jars, systemml_jar


def spark_submit_entry(master, deploy_mode, driver_memory, num_executors,
executor_memory, executor_cores, conf,
nvargs, args, config, explain, debug, stats, gpu, f):
"""
    Builds the spark-submit command line for hybrid_spark mode and runs the
    given DML script with the supplied arguments via os.system
"""
spark_home = get_env_spark_home()
systemml_home = get_env_systemml_home()
spark_path = join(spark_home, 'bin', 'spark-submit')
script_file = find_dml_file(systemml_home, f)
# Jars
cuda_jars, systemml_jars = default_jars(systemml_home)
# Log4j
log4j = log4j_path(systemml_home)
log4j_properties_path = 'spark.driver.extraJavaOptions=-Dlog4j.configuration=file:{}'.format(log4j)
if conf is None:
default_conf = log4j_properties_path
else:
default_conf = ' --conf '.join(conf + [log4j_properties_path])
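    # Illustrative example (the property below is hypothetical, not required by
    # this script): with conf = ['spark.driver.maxResultSize=2g'], default_conf
    # becomes 'spark.driver.maxResultSize=2g --conf spark.driver.extraJavaOptions=...',
    # so every property ends up behind its own --conf flag once the final
    # command string is joined and executed below.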
# Config XML
if config is None:
default_config = config_path(systemml_home)
else:
default_config = ' -config '.join([config] + [config_path(systemml_home)])
if platform.system() == 'Windows':
default_conf = default_conf.replace('\\', '//')
# optional arguments
ml_options = []
if nvargs is not None:
ml_options.append('-nvargs')
ml_options.append(' '.join(nvargs))
if args is not None:
ml_options.append('-args')
ml_options.append(' '.join(args))
if explain is not None:
ml_options.append('-explain')
ml_options.append(explain)
    if debug:
ml_options.append('-debug')
if stats is not None:
ml_options.append('-stats')
ml_options.append(stats)
if gpu is not None:
ml_options.append('-gpu')
        if gpu != 'no_option':
ml_options.append(gpu)
if len(ml_options) < 1:
ml_options = ''
# stats, explain, target_jars
cmd_spark = [spark_path, '--class', 'org.apache.sysml.api.DMLScript',
'--master', master, '--deploy-mode', deploy_mode,
'--driver-memory', driver_memory,
'--conf', default_conf,
'--jars', cuda_jars, systemml_jars]
    # num_executors, executor_memory and executor_cores come from argparse with
    # nargs=1, i.e. as single-element lists, so they are concatenated rather than
    # appended; nesting a list into cmd_spark would break ' '.join(cmd) below
    if num_executors is not None:
        cmd_spark = cmd_spark + ['--num-executors'] + num_executors
    if executor_memory is not None:
        cmd_spark = cmd_spark + ['--executor-memory'] + executor_memory
    if executor_cores is not None:
        cmd_spark = cmd_spark + ['--executor-cores'] + executor_cores
cmd_system_ml = ['-config', default_config,
'-exec', 'hybrid_spark', '-f', script_file, ' '.join(ml_options)]
cmd = cmd_spark + cmd_system_ml
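    # Sketch of the assembled command (paths and values are illustrative only):
    #   spark-submit --class org.apache.sysml.api.DMLScript \
    #     --master local[*] --deploy-mode client --driver-memory 8G \
    #     --conf <default_conf> --jars <jcuda jars> <SystemML.jar> \
    #     -config <SystemML-config.xml> -exec hybrid_spark -f <script.dml> <ml_options>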
# Debug
print(' '.join(cmd))
return_code = os.system(' '.join(cmd))
    return return_code


if __name__ == '__main__':
cparser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter,
description='System-ML Spark Submit Script')
# SPARK-SUBMIT Options
cparser.add_argument('--master', default='local[*]', help='local, yarn', metavar='')
cparser.add_argument('--deploy-mode', help='client, cluster', default='client', metavar='')
cparser.add_argument('--driver-memory', default='8G', help='Memory for driver (e.g. 512M, 1G)', metavar='')
cparser.add_argument('--num-executors', nargs=1, help='Number of executors to launch', metavar='')
cparser.add_argument('--executor-memory', nargs=1, help='Memory per executor', metavar='')
cparser.add_argument('--executor-cores', nargs=1, help='Number of executor cores', metavar='')
    cparser.add_argument('--conf', help='Spark configuration properties (PROP=VALUE pairs)', nargs='+', metavar='')
# SYSTEM-ML Options
cparser.add_argument('-nvargs', help='List of attributeName-attributeValue pairs', nargs='+', metavar='')
cparser.add_argument('-args', help='List of positional argument values', metavar='', nargs='+')
cparser.add_argument('-config', help='System-ML configuration file (e.g SystemML-config.xml)', metavar='')
    cparser.add_argument('-explain', help='explain the plan; level can be hops, runtime, '
                         'recompile_hops or recompile_runtime', nargs='?', const='runtime', metavar='')
cparser.add_argument('-debug', help='runs in debug mode', action='store_true')
cparser.add_argument('-stats', help='Monitor and report caching/recompilation statistics, '
'heavy hitter <count> is 10 unless overridden', nargs='?', const='10',
metavar='')
cparser.add_argument('-gpu', help='uses CUDA instructions when reasonable, '
'set <force> option to skip conservative memory estimates '
'and use GPU wherever possible', nargs='?', const='no_option')
cparser.add_argument('-f', required=True, help='specifies dml/pydml file to execute; '
'path can be local/hdfs/gpfs', metavar='')
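    # Example invocation (DML file and options below are hypothetical):
    #   <this script> -f hello.dml --driver-memory 4G -stats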
args = cparser.parse_args()
arg_dict = vars(args)
return_code = spark_submit_entry(**arg_dict)
if return_code != 0:
        print('Failed to run SystemML. Exit code: ' + str(return_code))