blob: ad0b2469e861b87652a2c2bba0d83f4297eb0228 [file] [log] [blame]
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from datetime import datetime
from mock import MagicMock, patch
from dateutil.tz import tzlocal
from airflow import configuration, AirflowException
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
DESCRIBE_JOB_STEP_RUNNING_RETURN = {
'ResponseMetadata': {
'HTTPStatusCode': 200,
'RequestId': '8dee8db2-3719-11e6-9e20-35b2f861a2a6'
},
'Step': {
'ActionOnFailure': 'CONTINUE',
'Config': {
'Args': [
'/usr/lib/spark/bin/run-example',
'SparkPi',
'10'
],
'Jar': 'command-runner.jar',
'Properties': {}
},
'Id': 's-VK57YR1Z9Z5N',
'Name': 'calculate_pi',
'Status': {
'State': 'RUNNING',
'StateChangeReason': {},
'Timeline': {
'CreationDateTime': datetime(2016, 6, 20, 19, 0, 18, tzinfo=tzlocal()),
'StartDateTime': datetime(2016, 6, 20, 19, 2, 34, tzinfo=tzlocal())
}
}
}
}
DESCRIBE_JOB_STEP_CANCELLED_RETURN = {
'ResponseMetadata': {
'HTTPStatusCode': 200,
'RequestId': '8dee8db2-3719-11e6-9e20-35b2f861a2a6'
},
'Step': {
'ActionOnFailure': 'CONTINUE',
'Config': {
'Args': [
'/usr/lib/spark/bin/run-example',
'SparkPi',
'10'
],
'Jar': 'command-runner.jar',
'Properties': {}
},
'Id': 's-VK57YR1Z9Z5N',
'Name': 'calculate_pi',
'Status': {
'State': 'CANCELLED',
'StateChangeReason': {},
'Timeline': {
'CreationDateTime': datetime(2016, 6, 20, 19, 0, 18, tzinfo=tzlocal()),
'StartDateTime': datetime(2016, 6, 20, 19, 2, 34, tzinfo=tzlocal())
}
}
}
}
DESCRIBE_JOB_STEP_INTERRUPTED_RETURN = {
'ResponseMetadata': {
'HTTPStatusCode': 200,
'RequestId': '8dee8db2-3719-11e6-9e20-35b2f861a2a6'
},
'Step': {
'ActionOnFailure': 'CONTINUE',
'Config': {
'Args': [
'/usr/lib/spark/bin/run-example',
'SparkPi',
'10'
],
'Jar': 'command-runner.jar',
'Properties': {}
},
'Id': 's-VK57YR1Z9Z5N',
'Name': 'calculate_pi',
'Status': {
'State': 'INTERRUPTED',
'StateChangeReason': {},
'Timeline': {
'CreationDateTime': datetime(2016, 6, 20, 19, 0, 18, tzinfo=tzlocal()),
'StartDateTime': datetime(2016, 6, 20, 19, 2, 34, tzinfo=tzlocal())
}
}
}
}
DESCRIBE_JOB_STEP_COMPLETED_RETURN = {
'ResponseMetadata': {
'HTTPStatusCode': 200,
'RequestId': '8dee8db2-3719-11e6-9e20-35b2f861a2a6'
},
'Step': {
'ActionOnFailure': 'CONTINUE',
'Config': {
'Args': [
'/usr/lib/spark/bin/run-example',
'SparkPi',
'10'
],
'Jar': 'command-runner.jar',
'Properties': {}
},
'Id': 's-VK57YR1Z9Z5N',
'Name': 'calculate_pi',
'Status': {
'State': 'COMPLETED',
'StateChangeReason': {},
'Timeline': {
'CreationDateTime': datetime(2016, 6, 20, 19, 0, 18, tzinfo=tzlocal()),
'StartDateTime': datetime(2016, 6, 20, 19, 2, 34, tzinfo=tzlocal())
}
}
}
}
class TestEmrStepSensor(unittest.TestCase):
def setUp(self):
configuration.load_test_config()
self.emr_client_mock = MagicMock()
self.sensor = EmrStepSensor(
task_id='test_task',
poke_interval=1,
job_flow_id='j-8989898989',
step_id='s-VK57YR1Z9Z5N',
aws_conn_id='aws_default',
)
mock_emr_session = MagicMock()
mock_emr_session.client.return_value = self.emr_client_mock
# Mock out the emr_client creator
self.boto3_session_mock = MagicMock(return_value=mock_emr_session)
def test_step_completed(self):
self.emr_client_mock.describe_step.side_effect = [
DESCRIBE_JOB_STEP_RUNNING_RETURN,
DESCRIBE_JOB_STEP_COMPLETED_RETURN
]
with patch('boto3.session.Session', self.boto3_session_mock):
self.sensor.execute(None)
self.assertEqual(self.emr_client_mock.describe_step.call_count, 2)
self.emr_client_mock.describe_step.assert_called_with(
ClusterId='j-8989898989',
StepId='s-VK57YR1Z9Z5N'
)
def test_step_cancelled(self):
self.emr_client_mock.describe_step.side_effect = [
DESCRIBE_JOB_STEP_RUNNING_RETURN,
DESCRIBE_JOB_STEP_CANCELLED_RETURN
]
self.boto3_client_mock = MagicMock(return_value=self.emr_client_mock)
with patch('boto3.session.Session', self.boto3_session_mock):
self.assertRaises(AirflowException, self.sensor.execute, None)
def test_step_interrupted(self):
self.emr_client_mock.describe_step.side_effect = [
DESCRIBE_JOB_STEP_RUNNING_RETURN,
DESCRIBE_JOB_STEP_INTERRUPTED_RETURN
]
self.boto3_client_mock = MagicMock(return_value=self.emr_client_mock)
with patch('boto3.session.Session', self.boto3_session_mock):
self.assertRaises(AirflowException, self.sensor.execute, None)
if __name__ == '__main__':
unittest.main()