# blob: 56df9fe091ad395c44af5c16c04d75e5efa4c66c [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Unittest for testing utilities,"""
from __future__ import absolute_import

import logging
import os
import shutil
import tempfile
import unittest

import mock

from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystems import FileSystems
from apache_beam.testing import test_utils as utils
class TestUtilsTest(unittest.TestCase):
  """Tests for the helpers exposed by ``apache_beam.testing.test_utils``."""

  def setUp(self):
    """Apply ``utils.patch_retry`` and create a per-test scratch directory."""
    utils.patch_retry(self, utils)
    self.tmpdir = tempfile.mkdtemp()
    # mkdtemp() leaves removal to the caller; register the cleanup right away
    # so the directory is deleted even when the test body fails.
    self.addCleanup(shutil.rmtree, self.tmpdir, ignore_errors=True)

  def test_delete_files_succeeds(self):
    """delete_files removes a file that exists."""
    path = os.path.join(self.tmpdir, 'f1')

    with open(path, 'a') as f:
      f.write('test')

    # unittest assertions (not bare ``assert``) so the checks survive ``-O``.
    self.assertTrue(FileSystems.exists(path))
    utils.delete_files([path])
    self.assertFalse(FileSystems.exists(path))

  def test_delete_files_fails_with_io_error(self):
    """delete_files raises BeamIOError, keyed by path, for a missing file."""
    path = os.path.join(self.tmpdir, 'f2')

    with self.assertRaises(BeamIOError) as error:
      utils.delete_files([path])

    # These checks must sit outside the ``with`` block: statements following
    # the raising call inside it would never execute.
    self.assertTrue(
        error.exception.args[0].startswith('Delete operation failed'))
    self.assertEqual(list(error.exception.exception_details.keys()), [path])

  def test_delete_files_fails_with_invalid_arg(self):
    """delete_files raises RuntimeError when given no paths."""
    with self.assertRaises(RuntimeError):
      utils.delete_files([])

  def test_temp_dir_removes_files(self):
    """TempDir and the files created in it are gone once the context exits."""
    with utils.TempDir() as tempdir:
      dir_path = tempdir.get_path()
      file_path = tempdir.create_temp_file()
      self.assertTrue(os.path.exists(dir_path))
      self.assertTrue(os.path.exists(file_path))

    self.assertFalse(os.path.exists(dir_path))
    self.assertFalse(os.path.exists(file_path))

  def test_temp_file_field_correct(self):
    """create_temp_file honours ``suffix`` and writes ``lines`` in order."""
    with utils.TempDir() as tempdir:
      # NOTE(review): the lines are passed as native ``str``; confirm that
      # create_temp_file accepts text (rather than bytes) on Python 3.
      filename = tempdir.create_temp_file(
          suffix='.txt',
          lines=['line1\n', 'line2\n', 'line3\n'])
      self.assertTrue(filename.endswith('.txt'))

      # The file is read in binary mode, so readline() yields bytes. Compare
      # against bytes literals: equal to ``str`` on Python 2 and the only
      # comparison that can succeed on Python 3.
      with open(filename, 'rb') as f:
        self.assertEqual(f.readline(), b'line1\n')
        self.assertEqual(f.readline(), b'line2\n')
        self.assertEqual(f.readline(), b'line3\n')

  @mock.patch('time.sleep', return_value=None)
  def test_wait_for_subscriptions_created_fails(self, patched_time_sleep):
    """Waiting times out with RuntimeError if a subscription never exists."""
    sub1 = mock.MagicMock()
    sub1.exists.return_value = True
    sub2 = mock.MagicMock()
    sub2.exists.return_value = False

    with self.assertRaises(RuntimeError) as error:
      utils.wait_for_subscriptions_created([sub1, sub2], timeout=0.1)

    self.assertTrue(sub1.exists.called)
    self.assertTrue(sub2.exists.called)
    self.assertTrue(error.exception.args[0].startswith('Timeout after'))

  @mock.patch('time.sleep', return_value=None)
  def test_wait_for_topics_created_fails(self, patched_time_sleep):
    """Waiting times out with RuntimeError if a topic never exists."""
    topic1 = mock.MagicMock()
    topic1.exists.return_value = True
    topic2 = mock.MagicMock()
    topic2.exists.return_value = False

    with self.assertRaises(RuntimeError) as error:
      # Exercise the topic helper itself, not its subscription counterpart.
      utils.wait_for_topics_created([topic1, topic2], timeout=0.1)

    self.assertTrue(topic1.exists.called)
    self.assertTrue(topic2.exists.called)
    self.assertTrue(error.exception.args[0].startswith('Timeout after'))

  @mock.patch('time.sleep', return_value=None)
  def test_wait_for_subscriptions_created_succeeds(self, patched_time_sleep):
    """Waiting returns a truthy value when every subscription exists."""
    sub1 = mock.MagicMock()
    sub1.exists.return_value = True

    self.assertTrue(
        utils.wait_for_subscriptions_created([sub1], timeout=0.1))
    self.assertTrue(sub1.exists.called)

  @mock.patch('time.sleep', return_value=None)
  def test_wait_for_topics_created_succeeds(self, patched_time_sleep):
    """Waiting returns a truthy value when every topic exists."""
    topic1 = mock.MagicMock()
    topic1.exists.return_value = True

    self.assertTrue(
        utils.wait_for_topics_created([topic1], timeout=0.1))
    self.assertTrue(topic1.exists.called)

  def test_cleanup_subscriptions(self):
    """cleanup_subscriptions deletes a subscription that exists."""
    mock_sub = mock.MagicMock()
    # ``exists`` is the attribute the other tests in this class configure and
    # assert on; any other attribute name would leave this setup ineffective.
    mock_sub.exists.return_value = True

    utils.cleanup_subscriptions([mock_sub])
    self.assertTrue(mock_sub.delete.called)

  def test_cleanup_topics(self):
    """cleanup_topics deletes a topic that exists."""
    mock_topic = mock.MagicMock()
    mock_topic.exists.return_value = True

    utils.cleanup_topics([mock_topic])
    self.assertTrue(mock_topic.delete.called)
if __name__ == '__main__':
  # Set the root logger to INFO before handing control to the test runner.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  unittest.main()