blob: 6eecf896b81cc921ee5f765551e2276dd50dfcf3 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import os
import re
import shlex
import time
from os.path import join as pjoin
from shutil import copyfile
from subprocess import run, Popen, check_output

import requests

import pio_tests.globals as globals
def srun(command):
    """ Executes the shell command `command` (a `str`) and waits for it.

    The child's stdout and stderr are sent to the streams returned by
    `globals.std_out()` and `globals.std_err()` respectively.

    Returns: the `subprocess.CompletedProcess` of the finished command
    Raises: `subprocess.CalledProcessError` when exit code != 0
    """
    out_stream = globals.std_out()
    err_stream = globals.std_err()
    return run(command, shell=True, check=True,
               stdout=out_stream, stderr=err_stream)
def srun_out(command):
    """ Executes the shell command `command` (a `str`) and captures stdout.

    stderr is redirected to the stream returned by `globals.std_err()`;
    stdout is captured and decoded as text.

    Returns: string with command's output
    Raises: `subprocess.CalledProcessError` when exit code != 0
    """
    err_stream = globals.std_err()
    captured = check_output(command, stderr=err_stream, shell=True,
                            universal_newlines=True)
    return captured
def srun_bg(command):
    """ Starts the shell command `command` (a `str`) without waiting for it.

    The child's stdout and stderr go to `globals.std_out()` and
    `globals.std_err()` respectively.

    Returns: (obj: `subprocess.Popen`) handle of the started process
    """
    process = Popen(command, shell=True,
                    stdout=globals.std_out(),
                    stderr=globals.std_err())
    return process
def repository_dirname(template):
    """ Utility function getting repository name from the link

    Returns the last path component of the link, ignoring trailing slashes.
    Example: both "https://github.com/user/SomeRepo" and
    "https://github.com/user/SomeRepo/" return "SomeRepo".
    """
    # Without stripping, a trailing '/' would yield '' and the caller would
    # treat the engines directory itself as the cloned repository.
    return template.rstrip('/').split('/')[-1]
def obtain_template(engine_dir, template):
    """Given a directory with engines and a template downloads an engine
    if necessary

    Args:
        engine_dir (str): directory where engines are stored
        template (str): either the name of an engine from the engines directory
            or an http(s) link to a repository with the engine
    Returns: str with the engine's path
    Raises:
        ValueError: when `template` is a local name that does not exist
            in `engine_dir`
        `subprocess.CalledProcessError`: when `git clone` fails
    """
    if re.match(r'^https?://', template):
        dest_dir = pjoin(engine_dir, repository_dirname(template))
        # Clone only once; an existing checkout is reused as-is.
        if not os.path.exists(dest_dir):
            # Quote both arguments: the command runs through a shell and
            # paths may contain spaces or other special characters.
            srun('git clone --depth=1 {0} {1}'.format(
                shlex.quote(template), shlex.quote(dest_dir)))
        return dest_dir

    # Local engine: it must already be present in the engines directory.
    dest_dir = pjoin(engine_dir, template)
    if not os.path.exists(dest_dir):
        raise ValueError('Engine {0} does not exist in {1}'
                         .format(template, engine_dir))
    return dest_dir
def pio_app_list():
    """Lists the applications reported by `pio app list`.

    The first and last lines of the command's output are skipped (presumably
    a header and a summary line; confirm against pio's output). Each remaining
    line is split on whitespace, and columns 2, 4, 6 and 8 are used as the
    fields below.

    Returns: a list of dicts, one per application, with the keys
        `name`, `id` (int), `access_key`, `allowed_events`
    """
    table = srun_out('pio app list').rstrip()
    apps = []
    for row in table.split('\n')[1:-1]:
        cols = row.split()
        apps.append({'name': cols[2],
                     'id': int(cols[4]),
                     'access_key': cols[6],
                     'allowed_events': cols[8]})
    return apps
def get_app_eventserver_url_json(test_context):
    """ Builds the URL of the eventserver's `events.json` endpoint.

    Args:
        test_context (obj: `TestContext`): supplies `es_ip` and `es_port`
    Returns: str of the form "http://<es_ip>:<es_port>/events.json"
    """
    return 'http://{host}:{port}/events.json'.format(
        host=test_context.es_ip, port=test_context.es_port)
def get_engine_url_json(engine_ip, engine_port):
    """ Builds the URL of a deployed engine's `queries.json` endpoint.

    Args:
        engine_ip (str): host of the deployed engine
        engine_port (int): port of the deployed engine
    Returns: str of the form "http://<engine_ip>:<engine_port>/queries.json"
    """
    base = 'http://{ip}:{port}'.format(ip=engine_ip, port=engine_port)
    return base + '/queries.json'
def send_event(event, test_context, access_key, channel=None):
    """ POSTs a single event to the eventserver's `events.json` endpoint.

    Args:
        event: json-like dictionary describing an event (sent as JSON body)
        test_context (obj: `TestContext`): supplies the eventserver address
        access_key: application's access key (sent as `accessKey` param)
        channel (str): custom channel for storing the event; only sent
            when truthy
    Returns: `requests.Response`
    """
    endpoint = get_app_eventserver_url_json(test_context)
    query = {'accessKey': access_key}
    if channel:
        query['channel'] = channel
    return requests.post(endpoint, params=query, json=event)
def send_events_batch(events, test_context, access_key, channel=None):
    """ POSTs a list of events to the eventserver's batch endpoint.

    Args:
        events: a list of json-like dictionaries for events (sent as JSON body)
        test_context (obj: `TestContext`): supplies `es_ip` and `es_port`
        access_key: application's access key (sent as `accessKey` param)
        channel (str): custom channel for storing events; only sent when truthy
    Returns: `requests.Response`
    Requires: Events length must not exceed length of 50 (not checked here)
        http://predictionio.apache.org/datacollection/eventmodel/#3.-batch-events-to-the-eventserver
    """
    endpoint = 'http://{ip}:{port}/batch/events.json'.format(
        ip=test_context.es_ip, port=test_context.es_port)
    query = {'accessKey': access_key}
    if channel:
        query['channel'] = channel
    return requests.post(endpoint, params=query, json=events)
def import_events_batch(events, test_context, appid, channel=None):
    """ Imports events in batch from file with `pio import`

    The events are written to a temporary file `events.json.tmp` in
    `test_context.data_directory`, which is removed afterwards.

    Args:
        events: a list of json-like dictionaries for events
        test_context (obj: `TestContext`): supplies `data_directory`
        appid (int): application's id
        channel (str): custom channel for storing events; only passed
            when truthy
    Raises: `subprocess.CalledProcessError` when `pio import` fails
    """
    # `pio import` requires each line of input file to be a JSON string
    # representing an event. Empty lines are not allowed, so the lines are
    # joined with '\n' and no trailing newline is written.
    contents = '\n'.join(json.dumps(ev) for ev in events)
    file_path = pjoin(test_context.data_directory, 'events.json.tmp')
    try:
        with open(file_path, 'w') as f:
            f.write(contents)
        srun('pio import --appid {} --input {} {} -- {}'.format(
            appid,
            file_path,
            '--channel {}'.format(channel) if channel else '',
            '--conf spark.sql.warehouse.dir=file:///tmp/spark-warehouse'))
    finally:
        # The file may not exist if opening it failed; don't let the cleanup
        # raise FileNotFoundError and mask the original error.
        if os.path.exists(file_path):
            os.remove(file_path)
def get_events(test_context, access_key, params=None):
    """ Gets events for some application

    Args:
        test_context (obj: `TestContext`): supplies the eventserver address
        access_key (str): application's access key (sent as `accessKey`)
        params (dict): optional special parameters for eventserver's GET,
            e.g: 'limit', 'reversed', 'event'. See the docs. Entries here
            take precedence over `accessKey`.
    Returns: `requests.Response`
    """
    url = get_app_eventserver_url_json(test_context)
    # Build a fresh dict per call instead of relying on a shared mutable
    # default argument.
    query = {'accessKey': access_key}
    if params:
        query.update(params)
    return requests.get(url, params=query)
def query_engine(data, engine_ip='localhost', engine_port=8000):
    """ Send a query to deployed engine

    The query is POSTed as a JSON body to the engine's `queries.json`
    endpoint.

    Args:
        data (dict): json-like dictionary being an input to an engine
        engine_ip (str): ip of deployed engine
        engine_port (int): port of deployed engine
    Returns: `requests.Response`
    """
    url = get_engine_url_json(engine_ip, engine_port)
    return requests.post(url, json=data)
class AppEngine:
    """ This is a utility class simplifying all app related interactions.
    Basically it is just a wrapper on other utility functions and shell
    commands (`pio app ...`, `pio build/train/deploy`, eventserver REST calls).
    """

    def __init__(self, test_context, app_context, already_created=False):
        """ Args:
            test_context (obj: `TestContext`): read for `engine_directory`
                here and passed to the event helpers later
            app_context (obj: `AppContext`): read for `template`, `name`
                and `engine_json_path`
            already_created (bool): True if the given app has been already
                added; its id, access key and description are then read
                with `pio app show`
        """
        self.test_context = test_context
        self.app_context = app_context
        self.engine_path = obtain_template(
            self.test_context.engine_directory, app_context.template)
        self.deployed_process = None
        if already_created:
            self.__init_info()
        else:
            self.id = None
            self.access_key = None
            self.description = None
        if self.app_context.engine_json_path:
            self.__copy_engine_json()

    def __copy_engine_json(self):
        """ Copies `app_context.engine_json_path` to <engine_path>/engine.json """
        to_path = pjoin(self.engine_path, 'engine.json')
        copyfile(self.app_context.engine_json_path, to_path)

    def __init_info(self):
        """ Fills `id`, `access_key` and `description` from `show()` """
        info = self.show()
        self.id = info['id']
        self.access_key = info['access_key']
        self.description = info['description']

    def new(self, id=None, description=None, access_key=None):
        """ Creates a new application with given parameters, then reads its
        info back with `show()` """
        srun('pio app new {} {} {} {}'.format(
            '--id {}'.format(id) if id else '',
            '--description \"{}\"'.format(description) if description else '',
            '--access-key {}'.format(access_key) if access_key else '',
            self.app_context.name))
        self.__init_info()

    def show(self):
        """ Returns: application info in dictionary with the keys:
            `name`: str, `id`: int, `description`: str,
            `access_key`: str, `allowed_events`: str
        Parsed by whitespace-splitting the lines of `pio app show`.
        """
        output = srun_out('pio app show {}'.format(self.app_context.name)).rstrip()
        lines = [x.split() for x in output.split('\n')]
        return {'name': lines[0][3],
                'id': int(lines[1][4]),
                # Descriptions may contain spaces (`new` quotes them), so keep
                # every token after the label; '' when no description is set.
                'description': ' '.join(lines[2][3:]),
                'access_key': lines[3][4],
                'allowed_events': lines[3][5]}

    def delete(self):
        """ Deletes this app from pio (`pio app delete ... --force`) """
        srun('pio app delete {0} --force'.format(self.app_context.name))

    def build(self, sbt_extra=None, clean=False, no_asm=True, engine_dir=None):
        """ Runs `pio build` inside the engine directory """
        srun('cd {0}; pio build {1} {2} {3} {4}'.format(
            self.engine_path,
            '--sbt-extra {}'.format(sbt_extra) if sbt_extra else '',
            '--clean' if clean else '',
            '--no-asm' if no_asm else '',
            '--engine-dir {}'.format(engine_dir) if engine_dir else ''))

    def train(self, batch=None, skip_sanity_check=False, stop_after_read=False,
              stop_after_prepare=False, engine_factory=None,
              engine_params_key=None, scratch_uri=None, engine_dir=None):
        """ Runs `pio train` inside the engine directory """
        srun('cd {0}; pio train {1} {2} {3} {4} {5} {6} {7} {8}'.format(
            self.engine_path,
            '--batch {}'.format(batch) if batch else '',
            '--skip-sanity-check' if skip_sanity_check else '',
            '--stop-after-read' if stop_after_read else '',
            '--stop-after-prepare' if stop_after_prepare else '',
            # pio's CLI options are hyphenated (`--engine-factory`).
            '--engine-factory {}'.format(engine_factory) if engine_factory else '',
            '--engine-params-key {}'.format(engine_params_key) if engine_params_key else '',
            '--scratch-uri {}'.format(scratch_uri) if scratch_uri else '',
            '--engine-dir {}'.format(engine_dir) if engine_dir else ''))

    def deploy(self, wait_time=0, ip=None, port=None, engine_instance_id=None,
               feedback=False, accesskey=None, event_server_ip=None,
               event_server_port=None, batch=None, scratch_uri=None,
               engine_dir=None):
        """ Starts `pio deploy` in the background from the engine directory.

        Args:
            wait_time: seconds to sleep before checking the process is alive
            (remaining args map to the corresponding `pio deploy` options)
        Raises: Exception if the deploy process has exited after `wait_time`
        Sets `self.ip` / `self.port` (default 'localhost' / 8000) for `query`.
        """
        command = 'cd {0}; pio deploy {1} {2} {3} {4} {5} {6} {7} {8} {9} {10}'.format(
            self.engine_path,
            '--ip {}'.format(ip) if ip else '',
            '--port {}'.format(port) if port else '',
            '--engine-instance-id {}'.format(engine_instance_id) if engine_instance_id else '',
            '--feedback' if feedback else '',
            '--accesskey {}'.format(accesskey) if accesskey else '',
            '--event-server-ip {}'.format(event_server_ip) if event_server_ip else '',
            '--event-server-port {}'.format(event_server_port) if event_server_port else '',
            '--batch {}'.format(batch) if batch else '',
            '--scratch-uri {}'.format(scratch_uri) if scratch_uri else '',
            '--engine-dir {}'.format(engine_dir) if engine_dir else '')
        self.deployed_process = srun_bg(command)
        time.sleep(wait_time)
        if self.deployed_process.poll() is not None:
            raise Exception('Application engine terminated')
        self.ip = ip if ip else 'localhost'
        self.port = port if port else 8000

    def stop(self):
        """ Kills the deployed engine process (if any) and reaps it """
        if self.deployed_process:
            # NOTE(review): the engine is started through a shell (`srun_bg`
            # with 'cd ...; pio deploy ...'), so this signals that shell
            # process; processes it spawned may outlive it — confirm.
            self.deployed_process.kill()
            # Reap the killed process so it does not linger as a zombie.
            self.deployed_process.wait()

    def new_channel(self, channel):
        """ Creates a new channel named `channel` for this app """
        # `pio app channel-new` takes the app name followed by the channel.
        srun('pio app channel-new {0} {1}'.format(self.app_context.name, channel))

    def delete_channel(self, channel):
        """ Deletes the channel named `channel` of this app """
        # `pio app channel-delete` takes the app name followed by the channel.
        srun('pio app channel-delete {0} {1} --force'.format(
            self.app_context.name, channel))

    def send_event(self, event, channel=None):
        """ Sends one event for this app; see module-level `send_event` """
        return send_event(event, self.test_context, self.access_key, channel)

    def send_events_batch(self, events, channel=None):
        """ Sends events in batch; see module-level `send_events_batch` """
        return send_events_batch(events, self.test_context, self.access_key,
                                 channel)

    def import_events_batch(self, events, channel=None):
        """ Imports events with `pio import`; see module-level
        `import_events_batch` """
        return import_events_batch(events, self.test_context, self.id, channel)

    def get_events(self, params=None):
        """ Gets this app's events; see module-level `get_events` """
        # Pass a fresh dict rather than sharing a mutable default argument.
        return get_events(self.test_context, self.access_key,
                          {} if params is None else params)

    def delete_data(self, delete_all=True, channel=None):
        """ Deletes this app's data with `pio app data-delete ... --force` """
        srun('pio app data-delete {0} {1} {2} --force'
             .format(
                 self.app_context.name,
                 '--all' if delete_all else '',
                 '--channel ' + channel if channel is not None else ''))

    def query(self, data):
        """ Queries the deployed engine; requires a prior `deploy()`, which
        sets `self.ip` and `self.port` """
        return query_engine(data, self.ip, self.port)