# blob: 651456835fb557a689be20ada544f65271649e05 [file] [log] [blame]
from __future__ import print_function
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import unittest, mock
import jira_utils
from mock import patch, mock_open, Mock
from .jira_manager import JiraManager
from datetime import datetime
# Version strings shared by the tests in this module: they are passed to
# JiraManager.run() as the current / latest dependency versions and are
# interpolated into the expected JIRA issue description.
MOCKED_DEP_CURRENT_VERSION = '0.1.0'
MOCKED_DEP_LATEST_VERSION = '1.0.0'
class MockedJiraIssue:
    """Minimal stand-in for a JIRA issue object.

    Exposes only the attributes populated below: ``key`` and ``fields``
    (``summary``, ``description``, ``status.name``, ``fixVersions[*].name``
    and, for closed issues only, ``resolutiondate``).

    Args:
        key: Issue key, e.g. ``'BEAM-1000'``.
        summary: Issue summary text.
        description: Issue description text.
        status: Status name, e.g. ``'Open'`` or ``'Closed'``.
    """

    def __init__(self, key, summary, description, status):
        self.key = key
        self.fields = self.MockedJiraIssueFields(summary, description, status)

    class MockedJiraIssueFields:
        """Container mimicking the ``fields`` attribute of a JIRA issue."""

        def __init__(self, summary, description, status):
            self.summary = summary
            self.description = description
            self.status = self.MockedJiraIssueStatus(status)
            # Every mocked issue carries exactly one fix version ('2.8.0').
            self.fixVersions = [self.MockedJiraIssueFixVersions()]
            # Only closed issues get a resolution date; for any other status
            # the attribute is deliberately left undefined.
            if status == 'Closed':
                self.resolutiondate = '1999-01-01T00:00:00'

        class MockedJiraIssueStatus:
            """Holds the status name under ``name``."""

            def __init__(self, status):
                self.name = status

        class MockedJiraIssueFixVersions:
            """A single fix-version entry with the fixed name ``'2.8.0'``."""

            def __init__(self):
                self.name = '2.8.0'
@patch('jira_utils.jira_manager.JiraClient')
class JiraManagerTest(unittest.TestCase):
    """Tests for `jira_manager.py`.

    ``jira_utils.jira_manager.JiraClient`` is patched for every test method by
    the class decorator; the injected mock is received through ``*args``.
    The tests then assert on the mock methods reachable via ``manager.jira``.

    Each test serves an owners YAML document through a patched
    ``builtins.open``. The same YAML text is also passed as the fourth
    ``JiraManager`` constructor argument.
    """

    def setUp(self):
        """Print the name of the test that is about to run."""
        print("\n\nTest : " + self._testMethodName)

    @patch('jira_utils.jira_manager.datetime',
           Mock(today=Mock(return_value=datetime.strptime('2000-01-01', '%Y-%m-%d'))))
    def test_run_with_creating_new_issue(self, *args):
        """
        Test JiraManager.run on creating a new issue.
        Expect: jira.create_issue is called once with certain parameters.
        """
        # The nesting deps -> <dependency> -> owners is significant YAML
        # structure. The trailing comma and uneven spacing in the owners value
        # are part of the fixture; the expected owners below are the three
        # clean names.
        owners_yaml = """
            deps:
              dep0:
                owners: owner0, owner1 , owner2,
            """
        with patch('builtins.open', mock_open(read_data=owners_yaml)):
            manager = JiraManager('url', 'username', 'password', owners_yaml)
            manager.run('dep0',
                        MOCKED_DEP_CURRENT_VERSION,
                        MOCKED_DEP_LATEST_VERSION,
                        'Python')
        manager.jira.create_issue.assert_called_once_with(
            self._get_expected_summary('dep0'),
            ['dependencies'],
            self._get_expected_description(
                'dep0', MOCKED_DEP_LATEST_VERSION, ['owner0', 'owner1', 'owner2']),
        )

    @patch('jira_utils.jira_manager.datetime',
           Mock(today=Mock(return_value=datetime.strptime('2000-01-01', '%Y-%m-%d'))))
    def test_run_with_updating_existing_task(self, *args):
        """
        Test JiraManager.run on updating an existing issue.
        Expect: jira.update_issue is called once.
        """
        dep_name = 'dep0'
        owners_yaml = """
            deps:
              dep0:
                owners:
            """
        summary = self._get_expected_summary(dep_name)
        description = self._get_expected_description(
            dep_name, MOCKED_DEP_LATEST_VERSION, [])
        # The search is stubbed to report one matching issue that is still open.
        existing_issues = [MockedJiraIssue('BEAM-1000', summary, description, 'Open')]
        with patch('builtins.open', mock_open(read_data=owners_yaml)), \
                patch('jira_utils.jira_manager.JiraManager._search_issues',
                      return_value=existing_issues):
            manager = JiraManager('url', 'username', 'password', owners_yaml)
            manager.run(dep_name,
                        MOCKED_DEP_CURRENT_VERSION,
                        MOCKED_DEP_LATEST_VERSION,
                        'Python')
        manager.jira.update_issue.assert_called_once()

    @patch('jira_utils.jira_manager.datetime',
           Mock(today=Mock(return_value=datetime.strptime('2000-01-01', '%Y-%m-%d'))))
    def test_run_with_creating_new_subtask(self, *args):
        """
        Test JiraManager.run on creating a new sub-task.
        Expect: jira.create_issue is called once with certain parameters.
        """
        dep_name = 'group0:artifact0'
        owners_yaml = """
            deps:
              group0:artifact0:
                group: group0
                artifact: artifact0
                owners: owner0
            """
        summary = self._get_expected_summary('group0')
        description = self._get_expected_description(
            dep_name, MOCKED_DEP_LATEST_VERSION, [])
        # First _search_issues call returns the open issue for 'group0', the
        # second returns nothing (presumably the parent lookup followed by the
        # sub-task lookup -- confirm against JiraManager.run).
        search_results = [
            [MockedJiraIssue('BEAM-1000', summary, description, 'Open')],
            [],
        ]
        with patch('builtins.open', mock_open(read_data=owners_yaml)), \
                patch('jira_utils.jira_manager.JiraManager._search_issues',
                      side_effect=search_results):
            manager = JiraManager('url', 'username', 'password', owners_yaml)
            manager.run(dep_name,
                        MOCKED_DEP_CURRENT_VERSION,
                        MOCKED_DEP_LATEST_VERSION,
                        'Java',
                        group_id='group0')
        manager.jira.create_issue.assert_called_once_with(
            self._get_expected_summary(dep_name),
            ['dependencies'],
            self._get_expected_description(
                dep_name, MOCKED_DEP_LATEST_VERSION, ['owner0']),
            parent_key='BEAM-1000',
        )

    @patch('jira_utils.jira_manager.datetime',
           Mock(today=Mock(return_value=datetime.strptime('2000-01-01', '%Y-%m-%d'))))
    @patch('jira_utils.jira_manager.JiraClient.create_issue',
           side_effect=[MockedJiraIssue('BEAM-2000', 'summary', 'description', 'Open')])
    def test_run_with_reopening_existing_parent_issue(self, *args):
        """
        Test JiraManager.run on reopening a parent issue.
        Expect: jira.reopen_issue is called once.
        """
        dep_name = 'group0:artifact0'
        owners_yaml = """
            deps:
              group0:artifact0:
                group: group0
                artifact: artifact0
                owners: owner0
            """
        summary = self._get_expected_summary('group0')
        description = self._get_expected_description(
            dep_name, MOCKED_DEP_LATEST_VERSION, [])
        # First _search_issues call returns a *closed* issue for 'group0', the
        # second returns nothing.
        search_results = [
            [MockedJiraIssue('BEAM-1000', summary, description, 'Closed')],
            [],
        ]
        with patch('builtins.open', mock_open(read_data=owners_yaml)), \
                patch('jira_utils.jira_manager.JiraManager._search_issues',
                      side_effect=search_results):
            manager = JiraManager('url', 'username', 'password', owners_yaml)
            manager.run(dep_name,
                        MOCKED_DEP_CURRENT_VERSION,
                        MOCKED_DEP_LATEST_VERSION,
                        sdk_type='Java',
                        group_id='group0')
        manager.jira.reopen_issue.assert_called_once()

    @patch('jira_utils.jira_manager.datetime',
           Mock(today=Mock(return_value=datetime.strptime('2000-01-01', '%Y-%m-%d'))))
    @patch.object(jira_utils.jira_manager.JiraManager,
                  '_get_next_release_version', side_effect=['2.8.0.dev'])
    def test_run_with_reopening_issue_with_fixversions(self, *args):
        """
        Test JiraManager.run on reopening an issue when JIRA fixVersions hits
        the release version.
        Expect: jira.reopen_issue is called once.
        """
        dep_name = 'dep0'
        owners_yaml = """
            deps:
              dep0:
                owners:
            """
        summary = self._get_expected_summary(dep_name)
        description = self._get_expected_description(
            dep_name, MOCKED_DEP_LATEST_VERSION, [])
        # The mocked closed issue has fixVersions '2.8.0' (see MockedJiraIssue)
        # while the next release version is stubbed to '2.8.0.dev'.
        search_results = [
            [MockedJiraIssue('BEAM-1000', summary, description, 'Closed')],
            [],
        ]
        with patch('builtins.open', mock_open(read_data=owners_yaml)), \
                patch('jira_utils.jira_manager.JiraManager._search_issues',
                      side_effect=search_results):
            manager = JiraManager('url', 'username', 'password', owners_yaml)
            manager.run(dep_name,
                        MOCKED_DEP_CURRENT_VERSION,
                        MOCKED_DEP_LATEST_VERSION,
                        'Python')
        manager.jira.reopen_issue.assert_called_once()

    @patch.object(jira_utils.jira_manager.JiraManager,
                  '_get_next_release_version', side_effect=['2.9.0.dev'])
    def test_run_with_reopening_issue_with_new_release_available(self, *args):
        """
        Test JiraManager.run on reopening a closed issue when a newer
        dependency version is available: the issue was closed for version 1.0
        and the latest version is now 1.3.
        Expect: jira.reopen_issue is called once.

        The original author's note reads "once 3 versions releases after 6
        months since previous closure"; the exact rule lives in JiraManager
        and should be confirmed there. Unlike the other tests,
        `jira_utils.jira_manager.datetime` is not patched here.
        """
        dep_name = 'dep0'
        issue_closed_version = '1.0'
        dep_latest_version = '1.3'
        owners_yaml = """
            deps:
              dep0:
                owners:
            """
        summary = self._get_expected_summary(dep_name)
        description = self._get_expected_description(
            dep_name, issue_closed_version, [])
        search_results = [
            [MockedJiraIssue('BEAM-1000', summary, description, 'Closed')],
            [],
        ]
        with patch('builtins.open', mock_open(read_data=owners_yaml)), \
                patch('jira_utils.jira_manager.JiraManager._search_issues',
                      side_effect=search_results):
            manager = JiraManager('url', 'username', 'password', owners_yaml)
            manager.run(dep_name,
                        MOCKED_DEP_CURRENT_VERSION,
                        dep_latest_version,
                        'Python')
        manager.jira.reopen_issue.assert_called_once()

    def _get_expected_summary(self, dep_name):
        """Return the issue summary expected for dependency `dep_name`."""
        return 'Beam Dependency Update Request: ' + dep_name

    # Backward-compatible alias for the original (misspelled) helper name.
    _get_experct_summary = _get_expected_summary

    def _get_expected_description(self, dep_name, dep_latest_version, owners):
        """Return the issue description expected for a dependency.

        Args:
            dep_name: Dependency name interpolated into the text.
            dep_latest_version: Version reported as the latest one.
            owners: Iterable of owner ids; each is rendered as ``[~owner], ``
                after ``cc: ``.

        The date in the header is the fixed ``2000-01-01 00:00:00`` that the
        datetime patches in this class return from ``today()``; the current
        version is always MOCKED_DEP_CURRENT_VERSION.
        """
        # NOTE(review): the exact-argument assertions compare this text with
        # what JiraManager generates, so whitespace matters. Please verify the
        # whitespace at the start of each line against jira_manager.
        header = (
            "\n\n ------------------------- {0} -------------------------\n"
            "\n"
            "Please consider upgrading the dependency {1}. \n"
            "\n"
            "The current version is {2}. The latest version is {3} \n"
            "\n"
            "cc: "
        ).format(
            "2000-01-01 00:00:00",
            dep_name,
            MOCKED_DEP_CURRENT_VERSION,
            dep_latest_version,
        )
        mentions = "".join("[~{0}], ".format(owner) for owner in owners)
        footer = ("\n Please refer to "
                  "[Beam Dependency Guide |https://beam.apache.org/contribute/dependencies/]"
                  "for more information. \n"
                  "Do Not Modify The Description Above. \n")
        return header + mentions + footer
if __name__ == '__main__':
    # Run every test in this module when the file is executed as a script.
    unittest.main()