| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """Unit tests for the test pipeline verifiers""" |
| |
| # pytype: skip-file |
| |
| import logging |
| import os |
| import tempfile |
| import unittest |
| |
| from hamcrest import assert_that as hc_assert_that |
| from mock import Mock |
| from mock import patch |
| |
| from apache_beam.io.localfilesystem import LocalFileSystem |
| from apache_beam.runners.runner import PipelineResult |
| from apache_beam.runners.runner import PipelineState |
| from apache_beam.testing import pipeline_verifiers as verifiers |
| from apache_beam.testing.test_utils import patch_retry |
| |
| try: |
| # pylint: disable=wrong-import-order, wrong-import-position |
| # pylint: disable=ungrouped-imports |
| from apitools.base.py.exceptions import HttpError |
| from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem |
| except ImportError: |
| HttpError = None |
| GCSFileSystem = None # type: ignore |
| |
| |
class PipelineVerifiersTest(unittest.TestCase):
    """Unit tests for PipelineStateMatcher and FileChecksumMatcher."""

    def setUp(self):
        # A bare Mock stands in for the pipeline result that is handed to
        # the file-based matchers.
        self._mock_result = Mock()
        # NOTE(review): presumably patches the retry machinery used by
        # `verifiers` so retried calls do not really back off -- confirm
        # against apache_beam.testing.test_utils.patch_retry.
        patch_retry(self, verifiers)

    def test_pipeline_state_matcher_success(self):
        """Test PipelineStateMatcher succeeds when using default expected
        state and job actually finished in DONE
        """
        pipeline_result = PipelineResult(PipelineState.DONE)
        hc_assert_that(pipeline_result, verifiers.PipelineStateMatcher())

    def test_pipeline_state_matcher_given_state(self):
        """Test PipelineStateMatcher succeeds when matches given state"""
        pipeline_result = PipelineResult(PipelineState.FAILED)
        hc_assert_that(
            pipeline_result,
            verifiers.PipelineStateMatcher(PipelineState.FAILED))

    def test_pipeline_state_matcher_fails(self):
        """Test PipelineStateMatcher fails when using default expected state
        and job actually finished in CANCELLED/DRAINED/FAILED/UNKNOWN
        """
        failed_state = [
            PipelineState.CANCELLED,
            PipelineState.DRAINED,
            PipelineState.FAILED,
            PipelineState.UNKNOWN,
        ]

        for state in failed_state:
            # subTest makes a failure report name the offending state
            # instead of only the line number inside the loop.
            with self.subTest(state=state):
                pipeline_result = PipelineResult(state)
                with self.assertRaises(AssertionError):
                    hc_assert_that(
                        pipeline_result, verifiers.PipelineStateMatcher())

    # Table of FileChecksumMatcher inputs: `content` is written to
    # `num_files` separate files, and `expected_checksum` is the value the
    # matcher is expected to accept for that set of files.
    test_cases = [
        {
            'content': 'Test FileChecksumMatcher with single file',
            'num_files': 1,
            'expected_checksum': 'ebe16840cc1d0b4fe1cf71743e9d772fa31683b8'
        },
        {
            'content': 'Test FileChecksumMatcher with multiple files',
            'num_files': 3,
            'expected_checksum': '58b3d3636de3891ac61afb8ace3b5025c3c37d44'
        },
        {
            'content': '',
            'num_files': 1,
            'expected_checksum': 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
        },
    ]

    def create_temp_file(self, content, directory=None):
        """Write `content` (UTF-8 encoded) to a new temp file.

        The file is created with ``delete=False`` so that it still exists
        after it has been closed; removing it is the caller's job (the tests
        in this class create it inside a directory they clean up).

        Args:
          content: text to write to the file.
          directory: directory to create the file in; ``None`` lets
            `tempfile` choose its default location.

        Returns:
          The path of the created file.
        """
        with tempfile.NamedTemporaryFile(delete=False, dir=directory) as f:
            f.write(content.encode('utf-8'))
            return f.name

    def test_file_checksum_matcher_success(self):
        """Test FileChecksumMatcher accepts files with the expected checksum
        for every entry in `test_cases`
        """
        for case in self.test_cases:
            with self.subTest(
                    content=case['content'], num_files=case['num_files']):
                # TemporaryDirectory removes the directory and the files
                # written into it on exit, even if the assertion fails, so
                # the test no longer leaves files behind.
                with tempfile.TemporaryDirectory() as temp_dir:
                    for _ in range(case['num_files']):
                        self.create_temp_file(case['content'], temp_dir)
                    matcher = verifiers.FileChecksumMatcher(
                        os.path.join(temp_dir, '*'),
                        case['expected_checksum'])
                    hc_assert_that(self._mock_result, matcher)

    @patch.object(LocalFileSystem, 'match')
    def test_file_checksum_matcher_read_failed(self, mock_match):
        """Test FileChecksumMatcher propagates IOError from the local
        filesystem after `match` was attempted MAX_RETRIES + 1 times
        """
        mock_match.side_effect = IOError('No file found.')
        matcher = verifiers.FileChecksumMatcher(
            os.path.join('dummy', 'path'), Mock())
        with self.assertRaises(IOError):
            hc_assert_that(self._mock_result, matcher)
        self.assertTrue(mock_match.called)
        self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count)

    # skipIf is applied first (innermost) so the test is skipped when the
    # optional GCP dependencies could not be imported.
    @patch.object(GCSFileSystem, 'match')
    @unittest.skipIf(HttpError is None, 'google-apitools is not installed')
    def test_file_checksum_matcher_service_error(self, mock_match):
        """Test FileChecksumMatcher propagates HttpError from the GCS
        filesystem after `match` was attempted MAX_RETRIES + 1 times
        """
        mock_match.side_effect = HttpError(
            response={'status': '404'},
            url='',
            content='Not Found',
        )
        matcher = verifiers.FileChecksumMatcher('gs://dummy/path', Mock())
        with self.assertRaises(HttpError):
            hc_assert_that(self._mock_result, matcher)
        self.assertTrue(mock_match.called)
        self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count)

    def test_file_checksum_matchcer_invalid_sleep_time(self):
        """Test FileChecksumMatcher rejects a non-int sleep time with a
        ValueError carrying the expected message
        """
        with self.assertRaises(ValueError) as cm:
            verifiers.FileChecksumMatcher(
                'file_path', 'expected_checksum', 'invalid_sleep_time')
        self.assertEqual(
            cm.exception.args[0],
            'Sleep seconds, if received, must be int. '
            'But received: \'invalid_sleep_time\', '
            '{}'.format(str))

    @patch('time.sleep', return_value=None)
    def test_file_checksum_matcher_sleep_before_verify(self, mocked_sleep):
        """Test FileChecksumMatcher calls time.sleep when a sleep time is
        given (time.sleep is mocked, so the test does not actually wait)
        """
        case = self.test_cases[0]
        # The directory and the file created in it are removed on exit.
        with tempfile.TemporaryDirectory() as temp_dir:
            self.create_temp_file(case['content'], temp_dir)
            matcher = verifiers.FileChecksumMatcher(
                os.path.join(temp_dir, '*'), case['expected_checksum'], 10)
            hc_assert_that(self._mock_result, matcher)
        self.assertTrue(mocked_sleep.called)
| |
| |
if __name__ == '__main__':
    # When run directly, raise the root logger to INFO before handing
    # control to the unittest runner.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    unittest.main()