blob: 00ab26fe2f44d5bdf465be5ca550081e66a19032 [file] [log] [blame]
#!/usr/bin/env python
# coding=utf-8
# Copyright [2020] [Apache Software Foundation]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import json
import time
import wget
import click
import pickle
import pathlib
import subprocess
from cookiecutter.main import cookiecutter
from shutil import which
from ..communication.remote_calls import RemoteCalls
from ..utils.misc import package_folder, extract_folder, get_version
from ..utils.misc import call_logs, package_to_name, get_executor_path_or_download
from ..utils.misc import generate_timestamp, persist_process
from ..utils.git import git_init, bump_version
from ..utils.log import get_logger
from ..utils.benchmark import benchmark_thread, create_poi, make_graph
logger = get_logger('engine')


# Root click group named "engine"; every command in this module registers
# itself on it through the ``@cli.command`` decorator.  A ``#`` comment is used
# instead of a docstring because click would show a docstring as help text.
@click.group("engine")
def cli():
    ...
def _validate_project_name(name):
return ''.join(c for c in name if c.isalnum())
# Default cookiecutter template: the ``template`` folder next to this module.
TEMPLATE_FOLDER = os.path.join(pathlib.Path(__file__).parent.absolute(), 'template')


@cli.command("project-generate", help="Generate engine project")
@click.option('--name', '-n', prompt='Project name', help='Engine name')
@click.option('--description', '-d', prompt='Project description', help='Engine description')
@click.option('--url', '-u', prompt='Project URL', help='Engine URL Address', default='marvin.apache.org')
@click.option('--maintainer', '-m', prompt='Maintainer', help='Engine Maintainer')
@click.option('--email', '-e', prompt='Maintainer E-mail', help='Engine maintainer e-mail')
@click.option('--dest', '-f', envvar='MARVIN_HOME', type=click.Path(exists=True), help='Root folder path for the creation')
@click.option('--template', '-t', help='Base template for engine.', default=TEMPLATE_FOLDER, type=click.Path(exists=True))
def generate(name, description, url, maintainer, email, dest, template):
    """Render ``template`` with cookiecutter into ``dest`` and run ``git_init`` on it.

    The engine name is reduced to its alphanumeric characters. That value is
    the project name, and its lower-cased, ``marvin_``-prefixed form is the
    project package passed to the template.

    Raises:
        click.BadParameter: if ``name`` contains no letters or digits.
        click.UsageError: if neither ``--dest`` nor ``MARVIN_HOME`` is given.
    """
    processed_name = _validate_project_name(name)
    if not processed_name:
        # An empty name would make the project folder equal to ``dest`` itself
        # and the package just ``marvin_``.
        raise click.BadParameter(
            'must contain at least one letter or digit', param_hint='--name')
    if dest is None:
        # Without this check os.path.join(None, ...) fails with a TypeError.
        raise click.UsageError(
            'No destination folder: pass --dest or set MARVIN_HOME.')

    # Values forwarded to cookiecutter as extra_context for the template.
    extra_context = {
        'project_name': processed_name,
        'project_package': 'marvin_' + processed_name.lower(),
        'project_url': url,
        'project_description': description,
        'maintainer_name': maintainer,
        'maintainer_email': email
    }
    logger.debug("Using template {0}".format(template))
    # NOTE(review): assumes the template renders into a folder named after
    # project_name, as the original code did.
    init_dir = os.path.join(dest, processed_name)
    cookiecutter(template, output_dir=dest, extra_context=extra_context, no_input=True)
    git_init(init_dir)
    logger.info("Engine {0} created".format(processed_name))
# Default export folder (``$MARVIN_DATA_PATH/exports``).  Read with .get() so
# that importing this module -- and running commands that never export -- does
# not crash with KeyError when MARVIN_DATA_PATH is unset; it is None then.
_MARVIN_DATA_PATH = os.environ.get('MARVIN_DATA_PATH')
EXPORT_PATH = (os.path.join(_MARVIN_DATA_PATH, 'exports')
               if _MARVIN_DATA_PATH is not None else None)


@cli.command("project-export", help="Export engine project to a archive file.")
@click.option('--dest', '-d', default=EXPORT_PATH, type=click.Path(exists=True), help='Output folder.')
@click.pass_context
def export(ctx, dest):
    """Package the current directory as ``<dest>/<package>-<version>.tar.gz``.

    Raises:
        click.UsageError: if no ``--dest`` was given and MARVIN_DATA_PATH is unset.
    """
    if dest is None:
        raise click.UsageError(
            'No output folder: pass --dest or set MARVIN_DATA_PATH.')
    package_name = ctx.obj['package_name']
    archive_name = '{0}-{1}.tar.gz'.format(package_name, get_version(package_name))
    package_folder(os.getcwd(), os.path.join(dest, archive_name))
@cli.command("project-import", help="Import engine project from archive file.")
@click.option('--file', '-f', required=True, type=click.Path(exists=True), help='Compressed Engine file.')
@click.option('--dest', '-d', envvar='MARVIN_HOME', type=click.Path(exists=True), help='Path to extract')
def import_project(file, dest):
    """Extract the archive ``file`` into ``dest`` using ``extract_folder``.

    ``--file`` is required: the command cannot do anything without an archive,
    and leaving it optional would hand ``None`` to ``extract_folder``.
    """
    extract_folder(file, dest)
@cli.command("engine-dryrun", help="Run engines in a standalone way.")
@click.option('--grpchost', '-gh', help='gRPC Host Address', default='localhost')
@click.option('--grpcport', '-gp', help='gRPC Port', default='50057')
@click.option('--action', '-a', default='all',
              type=click.Choice(['all', 'acquisitor', 'tpreparator', 'trainer',
                                 'evaluator', 'ppreparator', 'predictor', 'feedback']),
              help='Marvin engine action name')
@click.option('--profiling', '-p', default=False, is_flag=True, help='Deterministic profiling of user code.')
def dryrun(grpchost, grpcport, action, profiling):
    """Build a RemoteCalls client for grpchost:grpcport and dry-run ``action``.

    ``profiling`` is forwarded unchanged to ``RemoteCalls.run_dryrun``.
    """
    RemoteCalls(grpchost, grpcport).run_dryrun(action, profiling)
@cli.command("engine-grpcserver", help="Run gRPC of given actions.")
@click.option('--grpchost', '-gh', help='gRPC Host Address', default='localhost')
@click.option('--grpcport', '-gp', help='gRPC Port', default='50057')
@click.option('--action', '-a', default='all',
              type=click.Choice(['all', 'acquisitor', 'tpreparator', 'trainer',
                                 'evaluator', 'ppreparator', 'predictor', 'feedback']),
              help='Marvin engine action name')
@click.option('--max-workers', '-w', help='Max Workers', default=None)
@click.option('--max-rpc-workers', '-rw', help='Max gRPC Workers', default=None)
def grpc(grpchost, grpcport, action, max_workers, max_rpc_workers):
    """Start the gRPC server for ``action`` and block until Ctrl-C.

    On KeyboardInterrupt the server is stopped through ``stop_grpc`` and a
    confirmation is logged.
    """
    remote = RemoteCalls(grpchost, grpcport)
    remote.run_grpc(action, max_workers, max_rpc_workers)
    try:
        # run_grpc has returned; idle here so the command keeps running.
        while True:
            time.sleep(100)
    except KeyboardInterrupt:
        remote.stop_grpc()
        logger.info("gRPC server terminated!")
@cli.command("engine-logs", help="Show daemon execution.")
@click.option('--follow', '-f', is_flag=True)
@click.option('--tail', '-t', default=True, is_flag=True)
@click.option('--buffer', '-b', default=20)
@click.pass_context
def docker_logs(ctx, follow, tail, buffer):
    """Show the engine logs via ``call_logs``.

    With ``--follow``, the process returned by ``call_logs`` is handed to
    ``persist_process``. Without it, the command returns right after the call.

    NOTE(review): ``tail`` is accepted but never read. Because it is a flag
    defaulting to True, it also cannot be switched off from the command line.
    """
    log_process = call_logs(ctx.obj['package_name'], follow, buffer)
    if not follow:
        return
    persist_process(log_process)
def _build_executor_command(executor_path, host, port, protocol, extra_params):
    """Return the argv list that launches the engine executor JAR with java.

    ``extra_params`` is one string of extra JVM options. It is split with
    shell-like rules, so that "-Xmx1g -Xms512m" becomes two argv entries
    instead of one malformed argument.
    """
    import shlex

    command = [
        'java',
        '-DmarvinConfig.engineHome={}'.format(os.getcwd()),
        '-DmarvinConfig.ipAddress={}'.format(host),
        '-DmarvinConfig.port={}'.format(port),
        '-DmarvinConfig.protocol={}'.format(protocol),
    ]
    if extra_params:
        command.extend(shlex.split(extra_params))
    command.extend(['-jar', executor_path])
    return command


def _terminate_process(process, timeout=10):
    """Ask ``process`` to terminate and reap it, killing it after ``timeout`` s."""
    process.terminate()
    try:
        process.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        process.kill()
        process.wait()


@cli.command("engine-httpserver", help="Run executor HTTP server.")
@click.option('--grpchost', '-gh', help='gRPC Host Address', default='localhost')
@click.option('--grpcport', '-gp', help='gRPC Port', default='50057')
@click.option('--host', '-h', prompt='API host', help='REST API Host', default='localhost')
@click.option('--port', '-p', prompt='API port', help='REST API Port', default='8000')
@click.option('--protocol', '-pr', help='Marvin protocol to be loaded during initialization.', default='')
@click.option(
    '--action',
    '-a',
    default='all',
    type=click.Choice(['all', 'acquisitor', 'tpreparator', 'trainer', 'evaluator', 'ppreparator', 'predictor', 'feedback']),
    help='Marvin engine action name')
@click.option('--max-workers', '-w', help='Max Workers', default=None)
@click.option('--max-rpc-workers', '-rw', help='Max gRPC Workers', default=None)
@click.option('--executor-path', '-e', help='Marvin engine executor jar path', type=click.Path(exists=True))
@click.option('--extra-executor-parameters', '-jvm', help='Use to send extra JVM parameters to engine executor process')
@click.option('--benchmark', '-b', default=False, is_flag=True, help='Run benchmark.')
@click.pass_context
def http(ctx, grpchost, grpcport, host, port, protocol, action, max_workers,
         max_rpc_workers, executor_path, extra_executor_parameters, benchmark):
    """Start the gRPC server, launch the executor JAR, and block until Ctrl-C.

    If no ``--executor-path`` is given, the JAR location comes from
    ``get_executor_path_or_download(ctx.obj['executor_url'])``. With
    ``--benchmark`` a benchmark thread runs alongside the executor.

    On startup failure, everything already started is stopped and the process
    exits with status 1. On Ctrl-C, everything is stopped and it exits with 0.
    """
    rc = RemoteCalls(grpchost, grpcport)
    try:
        rc.run_grpc(action, max_workers, max_rpc_workers)
        time.sleep(3)
    except (Exception, KeyboardInterrupt):
        logger.exception("Could not start grpc server!")
        sys.exit(1)

    bench_thread = None
    try:
        if not executor_path:
            executor_path = get_executor_path_or_download(ctx.obj['executor_url'])
        command_list = _build_executor_command(
            executor_path, host, port, protocol, extra_executor_parameters)
        if benchmark:
            logger.info("Init benchmark...")
            timestamp = generate_timestamp()
            bench_thread = benchmark_thread(ctx.obj['package_name'], timestamp)
            bench_thread.start()
        httpserver = subprocess.Popen(command_list)
    except (Exception, KeyboardInterrupt):
        # KeyboardInterrupt is included so that a Ctrl-C during executor
        # download still stops the gRPC server.
        logger.exception("Could not start http server!")
        rc.stop_grpc()
        # bench_thread is still None if the failure happened before it was
        # created (e.g. while resolving the executor path).
        if bench_thread is not None:
            bench_thread.terminate()
        sys.exit(1)

    try:
        while True:
            time.sleep(100)
    except KeyboardInterrupt:
        logger.info("Terminating http and grpc servers...")
        rc.stop_grpc()
        _terminate_process(httpserver)
        logger.info("Http and grpc servers terminated!")
        if bench_thread is not None:
            bench_thread.terminate()
            logger.info("Benchmark terminated!")
        sys.exit(0)
@cli.command('engine-bumpversion', help='Bump, commit and tag engine version.')
@click.option('--verbose', is_flag=True)
@click.option('--dry-run', is_flag=True)
@click.option('--part', '-p', default='patch',
              type=click.Choice(['major', 'minor', 'patch']),
              help='The part of the version to increase.')
def bumpversion(verbose, dry_run, part):
    """Forward the selected version ``part`` and both flags to ``bump_version``."""
    bump_args = (part, verbose, dry_run)
    bump_version(*bump_args)
# Short label per benchmarked action, used to name the points of interest
# recorded around each action in the ``benchmark`` command.
POI_LABELS = dict(
    acquisitor='ac',
    tpreparator='tp',
    trainer='t',
    evaluator='e',
)
def _sleep(sec):
    """Log and then sleep for ``sec`` seconds.

    The argument is now honoured; previously the function always slept 5 s
    regardless of ``sec``, so the logged duration could be wrong.
    """
    logger.info("Sleeping for {0} seconds...".format(sec))
    time.sleep(sec)
@cli.command("benchmark", help="Collect engine benchmark stats.")
@click.option('--grpchost', '-gh', help='gRPC Host Address', default='localhost')
@click.option('--grpcport', '-gp', help='gRPC Port', default='50057')
@click.option('--profiling', '-p', default=False, is_flag=True, help='Deterministic profiling of user code.')
@click.option('--delay', '-d', default=False, is_flag=True, help='Delay the benchmark for 5 seconds.')
@click.pass_context
def btest(ctx, grpchost, grpcport, profiling, delay):
    """Dry-run the acquisitor, tpreparator, trainer and evaluator actions in order.

    A benchmark thread runs for the whole sequence. Around each action, two
    points of interest, ``<label>-i`` and ``<label>-e``, are recorded with
    ``create_poi``. The label comes from POI_LABELS; the value is the seconds
    elapsed since the start, tagged with this run's timestamp.

    With ``--delay``, there is a 5 s pause before the first and after the last
    action. The benchmark thread is terminated even if an action raises or
    the run is interrupted.
    """
    timestamp = generate_timestamp()
    b_thread = benchmark_thread(ctx.obj['package_name'], timestamp)
    actions = ['acquisitor', 'tpreparator', 'trainer', 'evaluator']
    rc = RemoteCalls(grpchost, grpcport)
    initial_time = time.time()
    b_thread.start()
    try:
        if delay:
            _sleep(5)
        for action in actions:
            logger.info("Executing {0} action...".format(action))
            create_poi('{0}-i'.format(POI_LABELS[action]),
                       time.time() - initial_time,
                       timestamp)
            rc.run_dryrun(action, profiling)
            create_poi('{0}-e'.format(POI_LABELS[action]),
                       time.time() - initial_time,
                       timestamp)
        if delay:
            _sleep(5)
    finally:
        # Without this, a failing or interrupted action would leave the
        # benchmark thread running.
        b_thread.terminate()
    sys.exit(0)
# Benchmark metric key -> human-readable label, used as the plot label and
# as the menu text in ``benchmark-plot``.
PLOTS = dict(
    cpu='CPU Usage',
    memory='Memory Usage',
    r_disk='Disk read',
    w_disk='Disk write',
    r_net='Network received',
    t_net='Network transfered',
)
@cli.command("benchmark-plot", help="Plot engine benchmark stats.")
@click.option('--protocol', '-p', prompt='Protocol',
              help='Unique protocol from poi and benchmark files.')
def plot(protocol):
    """Interactively choose a benchmark metric and plot it for ``protocol``.

    The menu is generated from ``options``, so every metric gets one unique
    number and each number plots exactly the metric shown next to it. The
    loop repeats until Ctrl-C (or end of input), which exits with status 0.
    An out-of-range or non-numeric choice logs an error and exits with 1.
    """
    options = (
        'cpu',
        'memory',
        'r_disk',
        'w_disk',
        'r_net',
        't_net'
    )
    try:
        while True:
            for number, op_name in enumerate(options, start=1):
                print('{0} - {1}'.format(number, PLOTS[op_name]))
            print('Press Ctrl-c to end.')
            try:
                option = int(input('Option:'))
            except ValueError:
                option = None  # non-numeric input: reported as invalid below
            if option is None or not 1 <= option <= len(options):
                logger.error('Option not available.')
                sys.exit(1)
            op_name = options[option - 1]
            make_graph(op_name, PLOTS[op_name], protocol)
    except (KeyboardInterrupt, EOFError):
        sys.exit(0)