blob: b6426ae10b691808d0b0f81d9aa9e2f967e79736 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import pytest
import responses
import socket
import tempfile
import threading
import zipfile
from configparser import NoOptionError
from livy.client import HttpClient
session_id = 0
job_id = 1
# Make sure host name is lower case. See LIVY-582
base_uri = 'http://{0}:{1}'.format(socket.gethostname().lower(), 8998)
client_test = None
invoked_queued_callback = False
invoked_running_callback = False
@responses.activate
def mock_and_validate_create_new_session(defaults):
global client_test
create_session_request_mock_uri = base_uri + "/sessions/"
app_name = 'Test App'
conf_dict = {'spark.app.name': app_name}
json_data = {
u'kind': u'pyspark', u'log': [], u'proxyUser': None,
u'state': u'starting', u'owner': None, u'id': session_id
}
responses.add(responses.POST, create_session_request_mock_uri,
json=json_data, status=201, content_type='application/json')
client_test = HttpClient(base_uri, conf_dict=conf_dict,
load_defaults=defaults)
assert client_test._config.get(client_test._CONFIG_SECTION,
'spark.app.name') == app_name
if defaults:
assert client_test._config.has_option(client_test._CONFIG_SECTION,
'spark.config')
assert client_test._config.get(client_test._CONFIG_SECTION,
'spark.config') == 'override'
def mock_submit_job_and_poll_result(
job,
job_state,
result=None,
error=None
):
submit_request_mock_uri = base_uri + "/sessions/" + str(session_id) \
+ "/submit-job"
poll_request_mock_uri = base_uri + "/sessions/" + str(session_id) \
+ "/jobs/" + str(job_id)
post_json_data = {
u'state': u'SENT', u'error': None, u'id': job_id, u'result': None
}
responses.add(responses.POST, submit_request_mock_uri, status=201,
json=post_json_data, content_type='application/json')
get_json_data = {
u'state': job_state, u'error': error, u'id': job_id, u'result': result
}
responses.add(responses.GET, poll_request_mock_uri, status=200,
json=get_json_data, content_type='application/json')
submit_job_future = client_test.submit(job)
return submit_job_future
def mock_file_apis(job_command, job_func, job_func_arg):
request_uri = base_uri + "/sessions/" + str(session_id) + \
"/" + job_command
responses.add(responses.POST, request_uri, status=201, body='',
content_type='application/json')
test_file_api_future = job_func(job_func_arg)
return test_file_api_future
def simple_spark_job(context):
elements = [10, 20, 30]
sc = context.sc
return sc.parallelize(elements, 2).count()
def failure_job(context):
return "hello" + 1
def test_create_new_session_without_default_config():
mock_and_validate_create_new_session(False)
def test_create_new_session_with_default_config():
os.environ["LIVY_CLIENT_CONF_DIR"] = \
os.path.dirname(os.path.abspath(__file__)) + "/resources"
mock_and_validate_create_new_session(True)
def test_connect_to_existing_session():
reconnect_mock_request_uri = base_uri + "/sessions/" + str(session_id) + \
"/connect"
reconnect_session_uri = base_uri + "/sessions/" + str(session_id)
json_data = {
u'kind': u'pyspark', u'log': [], u'proxyUser': None,
u'state': u'starting', u'owner': None, u'id': session_id
}
with responses.RequestsMock() as rsps:
rsps.add(responses.POST, reconnect_mock_request_uri, json=json_data,
status=201, content_type='application/json')
client_reconnect = HttpClient(reconnect_session_uri,
load_defaults=False)
assert isinstance(client_reconnect, HttpClient)
with pytest.raises(NoOptionError):
client_reconnect._config.get(client_reconnect._CONFIG_SECTION,
'spark.app.name') == 'Test App'
def create_test_archive(ext):
(fd, path) = tempfile.mkstemp(suffix=ext)
os.close(fd)
zipfile.ZipFile(path, mode='w').close()
return path
@responses.activate
def test_submit_job_verify_running_state():
submit_job_future = mock_submit_job_and_poll_result(simple_spark_job,
u'STARTED')
lock = threading.Event()
def handle_job_running_callback(f):
global invoked_running_callback
invoked_running_callback = f.running()
lock.set()
submit_job_future.add_running_callback(handle_job_running_callback)
lock.wait(15)
assert invoked_running_callback
@responses.activate
def test_submit_job_verify_queued_state():
submit_job_future = mock_submit_job_and_poll_result(simple_spark_job,
u'QUEUED')
lock = threading.Event()
def handle_job_queued_callback(f):
global invoked_queued_callback
invoked_queued_callback = f.queued()
lock.set()
submit_job_future.add_queued_callback(handle_job_queued_callback)
lock.wait(15)
assert invoked_queued_callback
@responses.activate
def test_submit_job_verify_succeeded_state():
submit_job_future = mock_submit_job_and_poll_result(simple_spark_job,
u'SUCCEEDED',
result='Z0FKVkZGc3hNREFzSURJd01Dd2dNekF3TENBME1EQmRjUUF1')
result = submit_job_future.result(15)
assert result == '[100, 200, 300, 400]'
@responses.activate
def test_submit_job_verify_failed_state():
submit_job_future = mock_submit_job_and_poll_result(failure_job, u'FAILED',
error='Error job')
exception = submit_job_future.exception(15)
assert isinstance(exception, Exception)
@responses.activate
def test_add_file():
file_uri = "file://" + os.path.dirname(os.path.abspath(__file__)) + \
"/resources/text_file.txt"
add_file_future = mock_file_apis('add-file', client_test.add_file,
file_uri)
add_file_future.result(15)
assert add_file_future.done()
@responses.activate
def test_upload_file():
file_path = os.path.dirname(os.path.abspath(__file__)) + \
"/resources/text_file.txt"
upload_file_future = mock_file_apis('upload-file', client_test.upload_file,
file_path)
upload_file_future.result(15)
assert upload_file_future.done()
@responses.activate
def test_add_pyfile():
file_uri = "file://" + os.path.dirname(os.path.abspath(__file__)) + \
"/resources/zip_file.zip"
add_file_future = mock_file_apis('add-pyfile', client_test.add_pyfile,
file_uri)
add_file_future.result(15)
assert add_file_future.done()
@responses.activate
def test_upload_pyfile():
file_path = create_test_archive('.zip')
pyfile_future = mock_file_apis('upload-pyfile', client_test.upload_pyfile,
file_path)
pyfile_future.result(15)
assert pyfile_future.done()
@responses.activate
def test_add_jar():
file_uri = 'file:' + create_test_archive('.jar')
add_file_future = mock_file_apis('add-jar', client_test.add_jar, file_uri)
add_file_future.result(15)
assert add_file_future.done()