#!/usr/bin/env python
# coding=utf-8
# Copyright [2017] [B2W Digital]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import click
import json
import os
import sys
import time
import os.path
import re
import shutil
import subprocess
import jinja2
import six
from unidecode import unidecode
import multiprocessing
from marvin_python_toolbox.common.profiling import profiling
from marvin_python_toolbox.common.data import MarvinData
from marvin_python_toolbox.common.config import Config
from .._compatibility import iteritems
from .._logging import get_logger
logger = get_logger('management.engine')
@click.group('engine')
def cli():
pass
@cli.command('engine-dryrun', help='Marvin Dryrun Utility - Run marvin engines in a standalone way')
@click.option(
'--action',
'-a',
default='all',
type=click.Choice(['all', 'acquisitor', 'tpreparator', 'trainer', 'evaluator', 'ppreparator', 'predictor', 'feedback']),
help='Marvin engine action name')
@click.option('--initial-dataset', '-id', help='Initial dataset file path', type=click.Path(exists=True))
@click.option('--dataset', '-d', help='Dataset file path', type=click.Path(exists=True))
@click.option('--model', '-m', help='Engine model file path', type=click.Path(exists=True))
@click.option('--metrics', '-me', help='Engine Metrics file path', type=click.Path(exists=True))
@click.option('--params-file', '-pf', default='engine.params', help='Marvin engine params file path', type=click.Path(exists=True))
@click.option('--messages-file', '-mf', default='engine.messages', help='Marvin engine predictor input messages file path', type=click.Path(exists=True))
@click.option('--feedback-file', '-ff', default='feedback.messages', help='Marvin engine feedback input messages file path', type=click.Path(exists=True))
@click.option('--response', '-r', default=True, is_flag=True, help='If enabled, print responses from engine online actions (ppreparator and predictor)')
@click.option('--profiling', default=False, is_flag=True, help='Enable execute method profiling')
@click.option('--spark-conf', '-c', envvar='SPARK_CONF_DIR', type=click.Path(exists=True), help='Spark configuration folder path to be used in this session')
@click.pass_context
def dryrun_cli(ctx, action, params_file, messages_file, feedback_file, initial_dataset, dataset, model, metrics, response, spark_conf, profiling):
dryrun(ctx, action, params_file, messages_file, feedback_file, initial_dataset, dataset, model, metrics, response, spark_conf, profiling)
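# Illustrative usage (file names are the defaults declared above; the artifact path is a
# placeholder, not a documented value):
#   marvin engine-dryrun                               # full pipeline ('all')
#   marvin engine-dryrun -a predictor -m <model-file>  # single action with a model artifact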
def dryrun(ctx, action, params_file, messages_file, feedback_file, initial_dataset, dataset, model, metrics, response, spark_conf, profiling):
    print(chr(27) + "[2J")  # ANSI escape sequence that clears the terminal screen
# setting spark configuration directory
os.environ["SPARK_CONF_DIR"] = spark_conf if spark_conf else os.path.join(os.environ["SPARK_HOME"], "conf")
os.environ["YARN_CONF_DIR"] = os.environ["SPARK_CONF_DIR"]
params = read_file(params_file)
messages_file = read_file(messages_file)
feedback_file = read_file(feedback_file)
if action in ['all', 'ppreparator', 'predictor'] and not messages_file:
        print('Please set the input messages to be used by the dry run process. Use the --messages-file flag to provide them as valid JSON.')
        sys.exit("Stopping process!")
if action in ['all', 'feedback'] and not feedback_file:
        print('Please set the feedback input messages to be used by the dry run process. Use the --feedback-file flag to provide them as valid JSON.')
        sys.exit("Stopping process!")
if action == 'all':
pipeline = ['acquisitor', 'tpreparator', 'trainer', 'evaluator', 'ppreparator', 'predictor', 'feedback']
else:
pipeline = [action]
_dryrun = MarvinDryRun(ctx=ctx, messages=[messages_file, feedback_file], print_response=response)
initial_start_time = time.time()
for step in pipeline:
_dryrun.execute(clazz=CLAZZES[step], params=params, initial_dataset=initial_dataset, dataset=dataset, model=model, metrics=metrics,
profiling_enabled=profiling)
print("Total Time : {:.2f}s".format(time.time() - initial_start_time))
print("\n")
CLAZZES = {
"acquisitor": "AcquisitorAndCleaner",
"tpreparator": "TrainingPreparator",
"trainer": "Trainer",
"evaluator": "MetricsEvaluator",
"ppreparator": "PredictionPreparator",
"predictor": "Predictor",
"feedback": "Feedback"
}
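# Each dry run step resolves its engine class dynamically as "<package_name>.<class name>"
# (see dynamic_import below), so this mapping is expected to match the classes exported by
# the generated engine package.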
class MarvinDryRun(object):
def __init__(self, ctx, messages, print_response):
self.predictor_messages = messages[0]
self.feedback_messages = messages[1]
self.pmessages = []
self.package_name = ctx.obj['package_name']
self.kwargs = None
self.print_response = print_response
def execute(self, clazz, params, initial_dataset, dataset, model, metrics, profiling_enabled=False):
self.print_start_step(clazz)
_Step = dynamic_import("{}.{}".format(self.package_name, clazz))
if not self.kwargs:
self.kwargs = generate_kwargs(_Step, params, initial_dataset, dataset, model, metrics)
step = _Step(**self.kwargs)
def call_online_actions(step, msg, msg_idx):
def print_message(result):
try:
print(json.dumps(result, indent=4, sort_keys=True))
except TypeError:
print("Unable to serialize the object returned!")
if self.print_response:
print("\nMessage {} :\n".format(msg_idx))
print_message(msg)
if profiling_enabled:
with profiling(output_path=".profiling", uid=clazz) as prof:
result = step.execute(input_message=msg, params=params)
                prof.disable()
print("\nProfile images created in {}\n".format(prof.image_path))
else:
result = step.execute(input_message=msg, params=params)
if self.print_response:
print("\nResult for Message {} :\n".format(msg_idx))
print_message(result)
return result
if clazz == 'PredictionPreparator':
for idx, msg in enumerate(self.predictor_messages):
self.pmessages.append(call_online_actions(step, msg, idx))
elif clazz == 'Feedback':
for idx, msg in enumerate(self.feedback_messages):
self.pmessages.append(call_online_actions(step, msg, idx))
elif clazz == 'Predictor':
self.execute("PredictionPreparator", params, initial_dataset, dataset, model, metrics)
            self.pmessages = self.predictor_messages if not self.pmessages else self.pmessages
for idx, msg in enumerate(self.pmessages):
call_online_actions(step, msg, idx)
else:
if profiling_enabled:
with profiling(output_path=".profiling", uid=clazz) as prof:
step.execute(params=params)
                prof.disable()
print("\nProfile images created in {}\n".format(prof.image_path))
else:
step.execute(params=params)
self.print_finish_step()
def print_finish_step(self):
print("\n STEP TAKES {:.4f} (seconds) ".format((time.time() - self.start_time)))
def print_start_step(self, name):
print("\n------------------------------------------------------------------------------")
print("MARVIN DRYRUN - STEP [{}]".format(name))
print("------------------------------------------------------------------------------\n")
self.start_time = time.time()
def dynamic_import(clazz):
components = clazz.split('.')
mod = __import__(components[0])
for comp in components[1:]:
mod = getattr(mod, comp)
return mod
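# e.g. dynamic_import("my_engine.Trainer") walks the attribute chain and returns the
# Trainer class of the (illustrative) my_engine package.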
def read_file(filename):
fname = os.path.join("", filename)
if os.path.exists(fname):
print("Engine file {} loaded!".format(filename))
with open(fname, 'r') as fp:
return json.load(fp)
else:
print("Engine file {} doesn't exists...".format(filename))
return {}
def generate_kwargs(clazz, params=None, initial_dataset=None, dataset=None, model=None, metrics=None):
kwargs = {}
if params:
kwargs["params"] = params
if dataset:
kwargs["dataset"] = clazz.retrieve_obj(dataset)
if initial_dataset:
kwargs["initial_dataset"] = clazz.retrieve_obj(initial_dataset)
if model:
kwargs["model"] = clazz.retrieve_obj(model)
if metrics:
kwargs["metrics"] = clazz.retrieve_obj(metrics)
kwargs["persistence_mode"] = 'local'
kwargs["default_root_path"] = os.path.join(os.getenv('MARVIN_DATA_PATH'), '.artifacts')
kwargs["is_remote_calling"] = True
return kwargs
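# Note: artifacts supplied on the command line are loaded through the engine class'
# retrieve_obj(), and default_root_path points the 'local' persistence mode at
# $MARVIN_DATA_PATH/.artifacts.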
class MarvinEngineServer(object):
@classmethod
    def create(cls, ctx, action, port, workers, rpc_workers, params, initial_dataset, dataset, model, metrics, pipeline):
package_name = ctx.obj['package_name']
def create_object(act):
clazz = CLAZZES[act]
_Action = dynamic_import("{}.{}".format(package_name, clazz))
kwargs = generate_kwargs(_Action, params, initial_dataset, dataset, model, metrics)
return _Action(**kwargs)
root_obj = create_object(action)
previous_object = root_obj
if pipeline:
for step in list(reversed(pipeline)):
previous_object._previous_step = create_object(step)
previous_object = previous_object._previous_step
server = root_obj._prepare_remote_server(port=port, workers=workers, rpc_workers=rpc_workers)
print("Starting GRPC server [{}] for {} Action".format(port, action))
server.start()
return server
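# create() chains any extra pipeline steps behind the root action through _previous_step
# (in reverse order) and exposes the whole chain on a single gRPC server per action.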
@cli.command('engine-grpcserver', help='Starts the Marvin gRPC engine action server')
@click.option(
'--action',
'-a',
default='all',
type=click.Choice(['all', 'acquisitor', 'tpreparator', 'trainer', 'evaluator', 'predictor', 'feedback']),
help='Marvin engine action name')
@click.option('--initial-dataset', '-id', help='Initial dataset file path', type=click.Path(exists=True))
@click.option('--dataset', '-d', help='Dataset file path', type=click.Path(exists=True))
@click.option('--model', '-m', help='Engine model file path', type=click.Path(exists=True))
@click.option('--metrics', '-me', help='Engine Metrics file path', type=click.Path(exists=True))
@click.option('--params-file', '-pf', default='engine.params', help='Marvin engine params file path', type=click.Path(exists=True))
@click.option('--metadata-file', '-mf', default='engine.metadata', help='Marvin engine metadata file path', type=click.Path(exists=True))
@click.option('--spark-conf', '-c', envvar='SPARK_CONF_DIR', type=click.Path(exists=True), help='Spark configuration path to be used')
@click.option('--max-workers', '-w', default=multiprocessing.cpu_count(), help='Max number of gRPC thread workers per action')
@click.option('--max-rpc-workers', '-rw', default=multiprocessing.cpu_count(), help='Max number of gRPC workers per action')
@click.pass_context
def engine_server(ctx, action, params_file, metadata_file, initial_dataset, dataset, model, metrics, spark_conf, max_workers, max_rpc_workers):
print("Starting server ...")
# setting spark configuration directory
os.environ["SPARK_CONF_DIR"] = spark_conf if spark_conf else os.path.join(os.environ["SPARK_HOME"], "conf")
os.environ["YARN_CONF_DIR"] = os.environ["SPARK_CONF_DIR"]
params = read_file(params_file)
metadata = read_file(metadata_file)
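    # engine.metadata is expected to describe the gRPC topology: an "actions" list whose
    # entries carry at least "name", "port" and "pipeline" keys. Illustrative shape only:
    #   {"actions": [{"name": "predictor", "port": 50001, "pipeline": ["ppreparator"]}]}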
default_actions = {action['name']: action for action in metadata['actions']}
if action == 'all':
action = default_actions
else:
action = {action: default_actions[action]}
servers = []
for action_name in action.keys():
# initializing server configuration
engine_server = MarvinEngineServer.create(
ctx=ctx,
action=action_name,
port=action[action_name]["port"],
workers=max_workers,
rpc_workers=max_rpc_workers,
params=params,
initial_dataset=initial_dataset,
dataset=dataset,
model=model,
metrics=metrics,
pipeline=action[action_name]["pipeline"]
)
servers.append(engine_server)
try:
while True:
time.sleep(100)
except KeyboardInterrupt:
print("Terminating server ...")
for server in servers:
server.stop(0)
TEMPLATE_BASES = {
'python-engine': os.path.join(os.path.dirname(__file__), 'templates', 'python-engine')
}
RENAME_DIRS = [
('project_package', '{{project.package}}'),
]
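# The scaffold ships a generic "project_package" folder; _rename_dirs later renders the
# jinja2 expression above and renames it to the generated package name.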
IGNORE_DIRS = [
# Ignore service internal templates
'templates'
]
_orig_type = type
@cli.command('engine-generateenv', help='Generate a new marvin engine environment and install default requirements.')
@click.argument('engine-path', type=click.Path(exists=True))
@click.option('--python', '-p', default='python', help='The Python interpreter to use to create the new environment')
def generate_env(engine_path, python):
dir_ = os.path.basename(os.path.abspath(engine_path))
venv_name = _create_virtual_env(dir_, engine_path, python)
_call_make_env(venv_name)
print('\nDone!!!!')
    print('To work on the new engine project, use: workon {}'.format(venv_name))
@cli.command('engine-generate', help='Generate a new marvin engine project and install default requirements.')
@click.option('--name', '-n', prompt='Project name', help='Project name')
@click.option('--description', '-d', prompt='Short description', default='Marvin engine', help='Library short description')
@click.option('--mantainer', '-m', prompt='Maintainer name', default='Marvin AI Community', help='Maintainer name')
@click.option('--email', '-e', prompt='Maintainer email', default='marvin-ai@googlegroups.com', help='Maintainer email')
@click.option('--package', '-p', default='', help='Package name')
@click.option('--dest', '-d', envvar='MARVIN_HOME', type=click.Path(exists=True), help='Root folder path for the creation')
@click.option('--no-env', is_flag=True, default=False, help='Don\'t create the virtual environment')
@click.option('--no-git', is_flag=True, default=False, help='Don\'t initialize the git repository')
@click.option('--python', '-py', default='python', help='The Python interpreter to use to create the new environment')
def generate(name, description, mantainer, email, package, dest, no_env, no_git, python):
type_ = 'python-engine'
type = _orig_type
# Process package name
package = _slugify(package or name)
# Make sure package name starts with "marvin"
if not package.startswith('marvin'):
package = 'marvin_{}'.format(package)
# Remove "lib" prefix from package name
if type_ == 'lib' and package.endswith('lib'):
package = package[:-3]
# Custom strip to remove underscores
package = package.strip('_')
# Append project type to services
if type_ == 'python-engine' and not package.endswith('engine'):
package = '{}_engine'.format(package)
# Process directory/virtualenv name
# Directory name should use '-' instead of '_'
dir_ = package.replace('_', '-')
# Remove "marvin" prefix from directory
if dir_.startswith('marvin'):
dir_ = dir_[6:]
dir_ = dir_.strip('-')
# Append "lib" to directory name if creating a lib
if type_ == 'lib' and not dir_.endswith('lib'):
dir_ = '{}-lib'.format(dir_)
dest = os.path.join(dest, dir_)
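    # Worked example (hypothetical input): --name "Sample" becomes package
    # "marvin_sample_engine", project directory "sample-engine" and, when the virtualenv
    # is created, environment "sample-engine-env".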
if type_ not in TEMPLATE_BASES:
        print('[ERROR] Could not find template files for "{type}".'.format(type=type_))
sys.exit(1)
project = {
'name': _slugify(name),
'description': description,
'package': package,
'toolbox_version': os.getenv('TOOLBOX_VERSION'),
'type': type_
}
mantainer = {
'name': mantainer,
'email': email,
}
context = {
'project': project,
'mantainer': mantainer,
}
folder_created = False
try:
_copy_scaffold_structure(TEMPLATE_BASES[type_], dest)
folder_created = True
_copy_processed_files(TEMPLATE_BASES[type_], dest, context)
_rename_dirs(dest, RENAME_DIRS, context)
_make_data_link(dest)
venv_name = None
if not no_env:
venv_name = _create_virtual_env(dir_, dest, python)
_call_make_env(venv_name)
if not no_git:
_call_git_init(dest)
print('\nDone!!!!')
if not no_env:
        print('To work on the new engine project, use: workon {}'.format(venv_name))
except Exception as e:
logger.info(e)
# remove project if created
if os.path.exists(dest) and folder_created:
shutil.rmtree(dest)
_punct_re = re.compile(r'[\t !"#$%&\'()*\-/<=>?@\[\\\]^_`{|},.]+')
def _slugify(text, delim='_'):
result = []
for word in _punct_re.split(text.lower()):
result.extend(unidecode(word).split())
return six.u(delim.join(result))
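# e.g. _slugify('My Engine!') returns u'my_engine'.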
def _copy_scaffold_structure(src, dest):
os.mkdir(dest)
for root, dirs, files in os.walk(src):
for dir_ in dirs:
dirname = os.path.join(root, dir_)
dirname = '{dest}{dirname}'.format(dest=dest, dirname=dirname.replace(src, '')) # get dirname without source path
os.mkdir(dirname)
def _copy_processed_files(src, dest, context):
env = jinja2.Environment(loader=jinja2.FileSystemLoader(src))
print('Processing template files...')
for root, dirs, files in os.walk(src):
dirname = root.replace(src, '')[1:] # get dirname without source path
to_dirname = os.path.join(dest, dirname)
        # Copy files inside ignored (engine-internal template) folders verbatim, without jinja2 processing
        should_process = not any(dir_ in dirname.split(os.sep) for dir_ in IGNORE_DIRS)
for file in files:
# Ignore trash
if file == '.DS_Store' or file.endswith('.pyc'):
continue
from_ = os.path.join(dirname, file)
to_ = os.path.join(to_dirname, file)
print('Copying "{0}" to "{1}"...'.format(from_, to_))
if not should_process:
shutil.copy(os.path.join(src, from_), to_)
else:
template = env.get_template(from_)
output = template.render(**context)
with open(to_, 'w') as file:
file.write(output)
def _rename_dirs(base, dirs, context):
for dir_ in dirs:
dirname, template = dir_
oldname = os.path.join(base, dirname)
processed = jinja2.Template(template).render(**context)
newname = os.path.join(base, processed)
shutil.move(oldname, newname)
print('Renaming {0} as {1}'.format(oldname, newname))
def _create_virtual_env(name, dest, python):
venv_name = '{}-env'.format(name).replace('_', '-')
print('Creating virtualenv: {0}...'.format(venv_name))
command = ['bash', '-c', '. virtualenvwrapper.sh; mkvirtualenv -p {0} -a {1} {2};'.format(python, dest, venv_name)]
try:
result = subprocess.Popen(command, env=os.environ).wait()
if result > 0:
sys.exit(1)
    except Exception:
logger.exception('Could not create the virtualenv!')
sys.exit(1)
return venv_name
def _call_make_env(venv_name):
command = ['bash', '-c', '. virtualenvwrapper.sh; workon {}; make marvin'.format(venv_name)]
try:
subprocess.Popen(command, env=os.environ).wait()
    except Exception:
logger.exception('Could not call make marvin!')
sys.exit(1)
def _call_git_init(dest):
command = ['bash', '-c', '/usr/bin/git init {0}'.format(dest)]
print('Initializing git repository...')
try:
subprocess.Popen(command, env=os.environ).wait()
except OSError:
print('WARNING: Could not initialize repository!')
def _make_data_link(dest):
data_path = os.environ['MARVIN_DATA_PATH']
data_link = os.path.join(dest, 'notebooks/data')
os.symlink(data_path, data_link)
@cli.command('engine-httpserver', help='Starts the Marvin HTTP API server')
@click.option(
'--action',
'-a',
default='all',
type=click.Choice(['all', 'acquisitor', 'tpreparator', 'trainer', 'evaluator', 'ppreparator', 'predictor', 'feedback']),
help='Marvin engine action name')
@click.option('--initial-dataset', '-id', help='Initial dataset file path', type=click.Path(exists=True))
@click.option('--dataset', '-d', help='Dataset file path', type=click.Path(exists=True))
@click.option('--model', '-m', help='Engine model file path', type=click.Path(exists=True))
@click.option('--metrics', '-me', help='Engine Metrics file path', type=click.Path(exists=True))
@click.option('--protocol', '-pr', default='', help='Marvin protocol to be loaded during initialization.')
@click.option('--params-file', '-pf', default='engine.params', help='Marvin engine params file path', type=click.Path(exists=True))
@click.option('--spark-conf', '-c', envvar='SPARK_CONF_DIR', type=click.Path(exists=True), help='Spark configuration folder path to be used in this session')
@click.option('--http-host', '-h', default='localhost', help='Engine executor http bind host')
@click.option('--http-port', '-p', default=8000, help='Engine executor http port')
@click.option('--executor-path', '-e', help='Marvin engine executor jar path', type=click.Path(exists=True))
@click.option('--max-workers', '-w', default=multiprocessing.cpu_count(), help='Max number of gRPC thread workers per action')
@click.option('--max-rpc-workers', '-rw', default=multiprocessing.cpu_count(), help='Max number of gRPC workers per action')
@click.option('--extra-executor-parameters', '-jvm', help='Use to send extra JVM parameters to engine executor process')
@click.pass_context
def engine_httpserver_cli(ctx, action, params_file, initial_dataset, dataset,
model, metrics, protocol, spark_conf, http_host, http_port,
executor_path, max_workers, max_rpc_workers, extra_executor_parameters):
engine_httpserver(
ctx, action, params_file, initial_dataset, dataset,
model, metrics, protocol, spark_conf, http_host, http_port,
executor_path, max_workers, max_rpc_workers, extra_executor_parameters
)
def engine_httpserver(ctx, action, params_file, initial_dataset, dataset, model, metrics, protocol, spark_conf, http_host,
http_port, executor_path, max_workers, max_rpc_workers, extra_executor_parameters):
logger.info("Starting http and grpc servers ...")
grpcserver = None
httpserver = None
def _params(**kwargs):
params = []
if kwargs is not None:
for key, value in iteritems(kwargs):
if value is not None:
params.append("-{0}".format(str(key)))
params.append(str(value))
return params
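    # e.g. _params(pf='engine.params', d=None) yields ['-pf', 'engine.params']:
    # None-valued keyword arguments are skipped and each remaining key becomes a "-<key>" flag.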
try:
optional_args = _params(id=initial_dataset, d=dataset, m=model, me=metrics, pf=params_file, c=spark_conf)
grpcserver = subprocess.Popen(['marvin', 'engine-grpcserver', '-a', action, '-w', str(max_workers), '-rw', str(max_rpc_workers)] + optional_args)
time.sleep(3)
    except Exception:
logger.exception("Could not start grpc server!")
sys.exit(1)
try:
if not (executor_path and os.path.exists(executor_path)):
executor_url = Config.get("executor_url", section="marvin")
executor_path = MarvinData.download_file(executor_url, force=False)
command_list = ['java']
command_list.append('-DmarvinConfig.engineHome={}'.format(ctx.obj['config']['inidir']))
command_list.append('-DmarvinConfig.ipAddress={}'.format(http_host))
command_list.append('-DmarvinConfig.port={}'.format(http_port))
command_list.append('-DmarvinConfig.protocol={}'.format(protocol))
if extra_executor_parameters:
command_list.append(extra_executor_parameters)
command_list.append('-jar')
command_list.append(executor_path)
httpserver = subprocess.Popen(command_list)
    except Exception:
        logger.exception("Could not start http server!")
        if grpcserver:
            grpcserver.terminate()
sys.exit(1)
try:
while True:
time.sleep(100)
except KeyboardInterrupt:
logger.info("Terminating http and grpc servers...")
        if grpcserver:
            grpcserver.terminate()
        if httpserver:
            httpserver.terminate()
logger.info("Http and grpc servers terminated!")
sys.exit(0)
@cli.command('engine-deploy', help='Engine provisioning and deployment command')
@click.option('--provision', is_flag=True, default=False, help='Forces provisioning')
@click.option('--package', is_flag=True, default=False, help='Creates engine package')
@click.option('--skip-clean', is_flag=True, default=False, help='Skips make clean')
def engine_deploy(provision, package, skip_clean):
TOOLBOX_VERSION = os.getenv('TOOLBOX_VERSION')
if provision:
subprocess.Popen([
"fab",
"provision",
], env=os.environ).wait()
subprocess.Popen([
"fab",
"deploy:version={version}".format(version=TOOLBOX_VERSION),
], env=os.environ).wait()
elif package:
subprocess.Popen([
"fab",
"package:version={version}".format(version=TOOLBOX_VERSION),
], env=os.environ).wait()
elif skip_clean:
subprocess.Popen([
"fab",
"deploy:version={version},skip_clean=True".format(version=TOOLBOX_VERSION),
], env=os.environ).wait()
else:
subprocess.Popen([
"fab",
"deploy:version={version}".format(version=TOOLBOX_VERSION),
], env=os.environ).wait()
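# Illustrative usage (TOOLBOX_VERSION is read from the environment; a Fabric fabfile with
# the provision/package/deploy tasks is assumed to exist in the engine project):
#   marvin engine-deploy --package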
@cli.command('engine-httpserver-remote', help='Remote HTTP server control command')
@click.option('--http_host', '-h', default='0.0.0.0', help='Engine executor http bind host')
@click.option('--http_port', '-p', default=8000, help='Engine executor http port')
@click.argument('command', type=click.Choice(['start', 'stop', 'status']))
def engine_httpserver_remote(command, http_host, http_port):
if command == "start":
subprocess.Popen([
"fab",
"engine_start:{host},{port}".format(host=http_host, port=http_port)
], env=os.environ).wait()
elif command == "stop":
subprocess.Popen([
"fab",
"engine_stop",
], env=os.environ).wait()
elif command == "status":
subprocess.Popen([
"fab",
"engine_status",
], env=os.environ).wait()
else:
print("Usage: marvin engine-httpserver-remote [ start | stop | status ]")