#!/usr/bin/env python
# coding=utf-8
# Copyright [2020] [Apache Software Foundation]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import json
import os
import sys
import time
import os.path
import subprocess
import multiprocessing
from ..common.profiling import profiling
from ..common.data import MarvinData
from ..common.log import get_logger
from ..common.config import Config, load_conf_from_file
logger = get_logger('management.engine')
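
# Maps the action names used by the toolbox CLI to the engine class that
# implements each step of the pipeline.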
CLAZZES = {
"acquisitor": "AcquisitorAndCleaner",
"tpreparator": "TrainingPreparator",
"trainer": "Trainer",
"evaluator": "MetricsEvaluator",
"ppreparator": "PredictionPreparator",
"predictor": "Predictor",
"feedback": "Feedback"
}
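
# Artifacts each action class must load before running; keys match the class
# names in CLAZZES, values are the artifact names persisted on disk.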
ARTIFACTS = {
"AcquisitorAndCleaner": [],
"TrainingPreparator": ["initialdataset"],
"Trainer": ["dataset"],
"MetricsEvaluator": ["model"],
"PredictionPreparator": ["model", "metrics"],
"Predictor": ["model", "metrics"],
"Feedback": []
}
def dryrun(config, action, profiling):
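    """Run a single engine action locally, or the full pipeline when
    action == 'all', loading params and sample messages from the engine
    files in the current directory."""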
# setting spark configuration directory
os.environ["SPARK_CONF_DIR"] = os.path.join(
os.environ["SPARK_HOME"], "conf")
os.environ["YARN_CONF_DIR"] = os.environ["SPARK_CONF_DIR"]
params = read_file('engine.params')
messages_file = read_file('engine.messages')
feedback_file = read_file('feedback.messages')
if action == 'all':
pipeline = ['acquisitor', 'tpreparator', 'trainer',
'evaluator', 'ppreparator', 'predictor', 'feedback']
else:
pipeline = [action]
_dryrun = MarvinDryRun(config=config, messages=[
messages_file, feedback_file])
initial_start_time = time.time()
for step in pipeline:
_dryrun.execute(clazz=CLAZZES[step],
params=params, profiling_enabled=profiling)
logger.info("Total Time : {:.2f}s".format(
time.time() - initial_start_time))
class MarvinDryRun(object):
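    """Runs the engine's action classes in-process, feeding the online actions
    the sample messages read from engine.messages and feedback.messages."""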
def __init__(self, config, messages):
self.predictor_messages = messages[0]
self.feedback_messages = messages[1]
self.pmessages = []
self.package_name = config['marvin_package']
self.kwargs = None
def execute(self, clazz, params, profiling_enabled=False):
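        """Dynamically import the given action class, build its kwargs once,
        instantiate it and run it, optionally under the profiling context
        manager."""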
self.print_start_step(clazz)
_Step = dynamic_import("{}.{}".format(self.package_name, clazz))
if not self.kwargs:
self.kwargs = generate_kwargs(self.package_name, _Step, params)
step = _Step(**self.kwargs)
def call_online_actions(step, msg, msg_idx):
if profiling_enabled:
with profiling(output_path=".profiling", uid=clazz) as prof:
result = step.execute(input_message=msg, params=params)
logger.info(
"\nProfile images created in {}\n".format(prof.image_path))
else:
result = step.execute(input_message=msg, params=params)
return result
if clazz == 'PredictionPreparator':
for idx, msg in enumerate(self.predictor_messages):
self.pmessages.append(call_online_actions(step, msg, idx))
elif clazz == 'Feedback':
for idx, msg in enumerate(self.feedback_messages):
self.pmessages.append(call_online_actions(step, msg, idx))
elif clazz == 'Predictor':
self.execute("PredictionPreparator", params)
            self.pmessages = self.predictor_messages if not self.pmessages else self.pmessages
for idx, msg in enumerate(self.pmessages):
call_online_actions(step, msg, idx)
else:
if profiling_enabled:
with profiling(output_path=".profiling", uid=clazz) as prof:
step.execute(params=params)
logger.info(
"\nProfile images created in {}\n".format(prof.image_path))
else:
step.execute(params=params)
self.print_finish_step()
def print_finish_step(self):
logger.info("STEP TAKES {:.4f} (seconds) ".format(
(time.time() - self.start_time)))
def print_start_step(self, name):
logger.info("MARVIN DRYRUN - STEP [{}]".format(name))
self.start_time = time.time()
def dynamic_import(clazz):
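    """Import and return a class from its fully qualified name,
    e.g. 'my_engine.Predictor'."""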
components = clazz.split('.')
mod = __import__(components[0])
for comp in components[1:]:
mod = getattr(mod, comp)
return mod
def read_file(filename):
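    """Load a JSON engine file from the current directory, returning an empty
    dict when the file does not exist."""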
fname = os.path.join("", filename)
if os.path.exists(fname):
logger.info("Engine file {} loaded!".format(filename))
with open(fname, 'r') as fp:
return json.load(fp)
else:
logger.info("Engine file {} doesn't exists...".format(filename))
return {}
def generate_kwargs(package_name, clazz, params=None, initial_dataset='initialdataset', dataset='dataset', model='model', metrics='metrics'):
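    """Build the constructor kwargs for an action class, loading from the
    local artifact store only the artifacts that the class declares in
    ARTIFACTS."""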
kwargs = {}
kwargs["persistence_mode"] = 'local'
kwargs["default_root_path"] = os.path.join(
os.getenv('MARVIN_DATA_PATH'), '.artifacts')
kwargs["is_remote_calling"] = True
_artifact_folder = package_name.replace(
'marvin_', '').replace('_engine', '')
_artifacts_to_load = ARTIFACTS[clazz.__name__]
if params:
kwargs["params"] = params
if dataset in _artifacts_to_load:
kwargs["dataset"] = clazz.retrieve_obj(os.path.join(kwargs["default_root_path"],
_artifact_folder, dataset))
if initial_dataset in _artifacts_to_load:
kwargs["initial_dataset"] = clazz.retrieve_obj(os.path.join(kwargs["default_root_path"],
_artifact_folder, initial_dataset))
if model in _artifacts_to_load:
kwargs["model"] = clazz.retrieve_obj(os.path.join(kwargs["default_root_path"],
_artifact_folder, model))
if metrics in _artifacts_to_load:
kwargs["metrics"] = clazz.retrieve_obj(os.path.join(kwargs["default_root_path"],
_artifact_folder, metrics))
return kwargs
class MarvinEngineServer(object):
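    """Factory for the gRPC servers that expose the engine actions remotely."""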
@classmethod
    def create(cls, config, action, port, workers, rpc_workers, params, pipeline):
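        """Instantiate the action object (plus the previous steps of its
        pipeline, chained through _previous_step) and return a started gRPC
        server bound to the given port."""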
package_name = config['marvin_package']
def create_object(act):
clazz = CLAZZES[act]
_Action = dynamic_import("{}.{}".format(package_name, clazz))
kwargs = generate_kwargs(package_name, _Action, params)
return _Action(**kwargs)
root_obj = create_object(action)
previous_object = root_obj
if pipeline:
for step in list(reversed(pipeline)):
previous_object._previous_step = create_object(step)
previous_object = previous_object._previous_step
server = root_obj._prepare_remote_server(
port=port, workers=workers, rpc_workers=rpc_workers)
logger.info(
"Starting GRPC server [{}] for {} Action".format(port, action))
server.start()
return server
def engine_server(config, action, max_workers, max_rpc_workers):
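    """Start one gRPC server per requested action (or for every action when
    action == 'all'), using the ports and pipelines declared in
    engine.metadata."""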
logger.info("Starting server ...")
# setting spark configuration directory
os.environ["SPARK_CONF_DIR"] = os.path.join(
os.environ["SPARK_HOME"], "conf")
os.environ["YARN_CONF_DIR"] = os.environ["SPARK_CONF_DIR"]
params = read_file('engine.params')
metadata = read_file('engine.metadata')
    default_actions = {action['name']: action
                       for action in metadata['actions']}
if action == 'all':
action = default_actions
else:
action = {action: default_actions[action]}
servers = []
for action_name in action.keys():
# initializing server configuration
engine_server = MarvinEngineServer.create(
config=config,
action=action_name,
port=action[action_name]["port"],
workers=max_workers,
rpc_workers=max_rpc_workers,
params=params,
pipeline=action[action_name]["pipeline"]
)
servers.append(engine_server)
return servers
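
# Illustrative usage only (the real entry points live in the toolbox CLI):
# given an engine config dict containing 'marvin_package', a full local dry
# run and the action servers could be started roughly like this, assuming
# load_conf_from_file accepts a path to the engine's config file:
#
#   config = load_conf_from_file('engine.conf')
#   dryrun(config, action='all', profiling=False)
#   servers = engine_server(config, 'all', max_workers=4, max_rpc_workers=4)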