#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
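# Integration tests for the Livy Python API: creates a pyspark session over the
# Livy REST interface, submits cloudpickle-serialized jobs, and exercises the
# add-file/add-pyfile and upload-file/upload-pyfile endpoints.
# Note: this module targets Python 2 (urlparse, httplib).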
import os
import base64
import json
import time
from urlparse import urlparse
import requests
import cloudpickle
import pytest
import httplib
from flaky import flaky
session_id = None
job_id = None
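# The Livy endpoint and the test resource locations are supplied by the test
# harness through environment variables.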
livy_end_point = os.environ.get("LIVY_END_POINT")
add_file_url = os.environ.get("ADD_FILE_URL")
add_pyfile_url = os.environ.get("ADD_PYFILE_URL")
upload_file_url = os.environ.get("UPLOAD_FILE_URL")
upload_pyfile_url = os.environ.get("UPLOAD_PYFILE_URL")
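# Module-scoped, autouse fixture: registers stop_session so the Livy session is
# deleted once every test in this module has run.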
@pytest.fixture(scope="module", autouse=True)
def after_all(request):
request.addfinalizer(stop_session)
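# Serialize the given job function with cloudpickle, submit it to the session's
# submit-job endpoint, and poll the job with exponentially increasing sleeps
# (1s, 2s, 4s, ... capped at 30s) until it leaves the STARTED state. On success
# the result is base64-decoded twice and unpickled before being compared with
# expected_result; for error jobs the reported error message is checked instead.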
def process_job(job, expected_result, is_error_job=False):
global job_id
pickled_job = cloudpickle.dumps(job)
base64_pickled_job = base64.b64encode(pickled_job).decode('utf-8')
base64_pickled_job_json = json.dumps({'job': base64_pickled_job, 'jobType': 'pyspark'})
request_url = livy_end_point + "/sessions/" + str(session_id) + "/submit-job"
header = {'Content-Type': 'application/json', 'X-Requested-By': 'livy'}
response = requests.request('POST', request_url, headers=header, data=base64_pickled_job_json)
assert response.status_code == httplib.CREATED
job_id = response.json()['id']
poll_time = 1
max_poll_time = 30
poll_response = None
while (poll_response is None or poll_response.json()['state'] == 'STARTED') and poll_time < \
max_poll_time:
time.sleep(poll_time)
poll_request_uri = livy_end_point + "/sessions/" + str(session_id) + \
"/jobs/" + str(job_id)
poll_header = {'X-Requested-By': 'livy'}
poll_response = requests.request('GET', poll_request_uri, headers=poll_header)
poll_time *= 2
assert poll_response.json()['id'] == job_id
assert poll_response.status_code == httplib.OK
if not is_error_job:
assert poll_response.json()['error'] is None
result = poll_response.json()['result']
b64_decoded = base64.b64decode(result)
b64_decoded_decoded = base64.b64decode(b64_decoded)
deserialized_object = cloudpickle.loads(b64_decoded_decoded)
assert deserialized_object == expected_result
else:
error = poll_response.json()['error']
assert expected_result in error
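# Rerun filter for @flaky: wait 10 seconds before each retry.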
def delay_rerun(*args):
time.sleep(10)
return True
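# Finalizer registered by the after_all fixture: deletes the Livy session.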
def stop_session():
global session_id
request_url = livy_end_point + "/sessions/" + str(session_id)
headers = {'X-Requested-By': 'livy'}
response = requests.request('DELETE', request_url, headers=headers)
assert response.status_code == httplib.OK
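# Create a new pyspark session; its id is reused by all subsequent tests.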
def test_create_session():
global session_id
request_url = livy_end_point + "/sessions"
uri = urlparse(request_url)
header = {'Content-Type': 'application/json', 'X-Requested-By': 'livy'}
json_data = json.dumps({'kind': 'pyspark', 'conf': {'livy.uri': uri.geturl()}})
response = requests.request('POST', request_url, headers=header, data=json_data)
assert response.status_code == httplib.CREATED
session_id = response.json()['id']
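# Session start-up can be slow, so retry (up to 6 runs, 10s apart) until the
# session reports the 'idle' state.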
@flaky(max_runs=6, rerun_filter=delay_rerun)
def test_wait_for_session_to_become_idle():
request_url = livy_end_point + "/sessions/" + str(session_id)
header = {'X-Requested-By': 'livy'}
response = requests.request('GET', request_url, headers=header)
assert response.status_code == httplib.OK
session_state = response.json()['state']
assert session_state == 'idle'
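# Run a trivial RDD job on the cluster and verify the count it returns.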
def test_spark_job():
def simple_spark_job(context):
elements = [10, 20, 30]
sc = context.sc
return sc.parallelize(elements, 2).count()
process_job(simple_spark_job, 3)
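# Submit a job that raises a TypeError and verify the error message is surfaced.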
def test_error_job():
def error_job(context):
return "hello" + 1
process_job(error_job,
"TypeError: cannot concatenate 'str' and 'int' objects", True)
def test_reconnect():
global session_id
request_url = livy_end_point + "/sessions/" + str(session_id) + "/connect"
header = {'Content-Type': 'application/json', 'X-Requested-By': 'livy'}
response = requests.request('POST', request_url, headers=header)
assert response.status_code == httplib.OK
assert session_id == response.json()['id']
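# Register a file by URI with add-file, then read it back on the cluster
# through SparkFiles.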
def test_add_file():
add_file_name = os.path.basename(add_file_url)
json_data = json.dumps({'uri': add_file_url})
request_url = livy_end_point + "/sessions/" + str(session_id) + "/add-file"
header = {'Content-Type': 'application/json', 'X-Requested-By': 'livy'}
response = requests.request('POST', request_url, headers=header, data=json_data)
assert response.status_code == httplib.OK
def add_file_job(context):
from pyspark import SparkFiles
with open(SparkFiles.get(add_file_name)) as testFile:
file_val = testFile.readline()
return file_val
process_job(add_file_job, "hello from addfile")
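# Register a Python module by URI with add-pyfile, then import and call it from
# within a job.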
def test_add_pyfile():
add_pyfile_name_with_ext = os.path.basename(add_pyfile_url)
add_pyfile_name = add_pyfile_name_with_ext.rsplit('.', 1)[0]
json_data = json.dumps({'uri': add_pyfile_url})
request_url = livy_end_point + "/sessions/" + str(session_id) + "/add-pyfile"
header = {'Content-Type': 'application/json', 'X-Requested-By': 'livy'}
response_add_pyfile = requests.request('POST', request_url, headers=header, data=json_data)
assert response_add_pyfile.status_code == httplib.OK
def add_pyfile_job(context):
pyfile_module = __import__ (add_pyfile_name)
return pyfile_module.test_add_pyfile()
process_job(add_pyfile_job, "hello from addpyfile")
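# Upload a local file with upload-file, then read it back through SparkFiles.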
def test_upload_file():
upload_file = open(upload_file_url)
upload_file_name = os.path.basename(upload_file.name)
request_url = livy_end_point + "/sessions/" + str(session_id) + "/upload-file"
files = {'file': upload_file}
header = {'X-Requested-By': 'livy'}
response = requests.request('POST', request_url, headers=header, files=files)
assert response.status_code == httplib.OK
def upload_file_job(context):
from pyspark import SparkFiles
with open(SparkFiles.get(upload_file_name)) as testFile:
file_val = testFile.readline()
return file_val
process_job(upload_file_job, "hello from uploadfile")
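# Upload a local Python module with upload-pyfile, then import and call it from
# within a job.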
def test_upload_pyfile():
upload_pyfile = open(upload_pyfile_url)
upload_pyfile_name_with_ext = os.path.basename(upload_pyfile.name)
upload_pyfile_name = upload_pyfile_name_with_ext.rsplit('.', 1)[0]
request_url = livy_end_point + "/sessions/" + str(session_id) + "/upload-pyfile"
files = {'file': upload_pyfile}
header = {'X-Requested-By': 'livy'}
response = requests.request('POST', request_url, headers=header, files=files)
assert response.status_code == httplib.OK
def upload_pyfile_job(context):
pyfile_module = __import__ (upload_pyfile_name)
return pyfile_module.test_upload_pyfile()
process_job(upload_pyfile_job, "hello from uploadpyfile")
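# Allow running this file directly: execute the tests in this directory and
# raise if any of them fail.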
if __name__ == '__main__':
value = pytest.main([os.path.dirname(__file__)])
if value != 0:
raise Exception("One or more test cases have failed.")