| # -*- coding: utf-8 -*- |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """Unit tests for LocalFileSystem.""" |
| |
| # pytype: skip-file |
| |
| import filecmp |
| import logging |
| import os |
| import shutil |
| import tempfile |
| import unittest |
| |
| import mock |
| from parameterized import param |
| from parameterized import parameterized |
| |
| from apache_beam.io import localfilesystem |
| from apache_beam.io.filesystem import BeamIOError |
| from apache_beam.options.pipeline_options import PipelineOptions |
| |
| |
| def _gen_fake_join(separator): |
| """Returns a callable that joins paths with the given separator.""" |
| def _join(first_path, *paths): |
| return separator.join((first_path.rstrip(separator), ) + paths) |
| |
| return _join |
| |
| |
| def _gen_fake_split(separator): |
| """Returns a callable that splits a with the given separator.""" |
| def _split(path): |
| sep_index = path.rfind(separator) |
| if sep_index >= 0: |
| return (path[:sep_index], path[sep_index + 1:]) |
| else: |
| return (path, '') |
| |
| return _split |
| |
| |
class LocalFileSystemTest(unittest.TestCase):
  """Tests ``LocalFileSystem`` against a per-test temporary directory.

  The path join/split tests patch the ``os`` module referenced by
  ``apache_beam.io.localfilesystem`` so that both Unix-style and
  Windows-style separators can be exercised on any host. All other tests
  work on real files and directories created under ``self.tmpdir``.
  """
  def setUp(self):
    # Each test gets its own scratch directory; tearDown removes it.
    self.tmpdir = tempfile.mkdtemp()
    pipeline_options = PipelineOptions()
    self.fs = localfilesystem.LocalFileSystem(pipeline_options)

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def test_scheme(self):
    # scheme() is checked both on an instance and on the class itself.
    self.assertIsNone(self.fs.scheme())
    self.assertIsNone(localfilesystem.LocalFileSystem.scheme())

  @mock.patch('apache_beam.io.localfilesystem.os')
  def test_unix_path_join(self, os_mock):
    # Test joining of Unix paths.
    os_mock.path.join.side_effect = _gen_fake_join('/')
    self.assertEqual(
        '/tmp/path/to/file', self.fs.join('/tmp/path', 'to', 'file'))
    self.assertEqual('/tmp/path/to/file', self.fs.join('/tmp/path', 'to/file'))

  @mock.patch('apache_beam.io.localfilesystem.os')
  def test_windows_path_join(self, os_mock):
    # Test joining of Windows paths.
    os_mock.path.join.side_effect = _gen_fake_join('\\')
    self.assertEqual(
        r'C:\tmp\path\to\file', self.fs.join(r'C:\tmp\path', 'to', 'file'))
    self.assertEqual(
        r'C:\tmp\path\to\file', self.fs.join(r'C:\tmp\path', r'to\file'))

  @mock.patch('apache_beam.io.localfilesystem.os')
  def test_unix_path_split(self, os_mock):
    os_mock.path.abspath.side_effect = lambda a: a
    os_mock.path.split.side_effect = _gen_fake_split('/')
    self.assertEqual(('/tmp/path/to', 'file'),
                     self.fs.split('/tmp/path/to/file'))
    # The real os.path.split would return ('/', 'tmp') here on Unix; the fake
    # split used by this test drops the separator from the head instead.
    self.assertEqual(('', 'tmp'), self.fs.split('/tmp'))

  @mock.patch('apache_beam.io.localfilesystem.os')
  def test_windows_path_split(self, os_mock):
    os_mock.path.abspath.side_effect = lambda a: a
    os_mock.path.split.side_effect = _gen_fake_split('\\')
    self.assertEqual((r'C:\tmp\path\to', 'file'),
                     self.fs.split(r'C:\tmp\path\to\file'))
    # The real os.path.split would return ('C:\', 'tmp') here on Windows; the
    # fake split used by this test drops the separator from the head instead.
    self.assertEqual((r'C:', 'tmp'), self.fs.split(r'C:\tmp'))

  def test_mkdirs(self):
    path = os.path.join(self.tmpdir, 't1/t2')
    self.fs.mkdirs(path)
    self.assertTrue(os.path.isdir(path))

  def test_mkdirs_failed(self):
    path = os.path.join(self.tmpdir, 't1/t2')
    self.fs.mkdirs(path)

    # Creating a directory that already exists must raise IOError ...
    with self.assertRaises(IOError):
      self.fs.mkdirs(path)

    # ... and so must re-creating its already existing parent.
    with self.assertRaises(IOError):
      self.fs.mkdirs(os.path.join(self.tmpdir, 't1'))

  def test_match_file(self):
    path = os.path.join(self.tmpdir, 'f1')
    open(path, 'a').close()

    # Match a single existing file by its exact path
    result = self.fs.match([path])[0]
    files = [f.path for f in result.metadata_list]
    self.assertEqual(files, [path])

  def test_match_file_empty(self):
    path = os.path.join(self.tmpdir, 'f2')  # Does not exist

    # Matching a non-existent path yields an empty metadata list
    result = self.fs.match([path])[0]
    files = [f.path for f in result.metadata_list]
    self.assertEqual(files, [])

  def test_match_file_exception(self):
    # Match files with None so that it throws an exception
    with self.assertRaisesRegex(BeamIOError,
                                r'^Match operation failed') as error:
      self.fs.match([None])
    self.assertEqual(list(error.exception.exception_details.keys()), [None])

  @parameterized.expand([
      param('*', files=['a', 'b', os.path.join('c', 'x')], expected=['a', 'b']),
      param(
          '**',
          files=['a', os.path.join('b', 'x'), os.path.join('c', 'x')],
          expected=['a', os.path.join('b', 'x'), os.path.join('c', 'x')]),
      param(
          os.path.join('*', '*'),
          files=[
              'a',
              os.path.join('b', 'x'),
              os.path.join('c', 'x'),
              os.path.join('d', 'x', 'y')
          ],
          expected=[os.path.join('b', 'x'), os.path.join('c', 'x')]),
      param(
          os.path.join('**', '*'),
          files=[
              'a',
              os.path.join('b', 'x'),
              os.path.join('c', 'x'),
              os.path.join('d', 'x', 'y')
          ],
          expected=[
              os.path.join('b', 'x'),
              os.path.join('c', 'x'),
              os.path.join('d', 'x', 'y')
          ]),
  ])
  def test_match_glob(self, pattern, files, expected):
    """Create ``files`` under tmpdir and check what ``pattern`` matches.

    :param pattern: glob pattern, relative to the temp directory
    :param files: relative paths of the empty files to create beforehand
    :param expected: relative paths the match is expected to return, in any
      order
    """
    for filename in files:
      full_path = os.path.join(self.tmpdir, filename)
      dirname = os.path.dirname(full_path)
      if dirname != full_path:
        # Make sure we don't go outside the tmpdir. A unittest assertion is
        # used so the guard is not stripped when running under ``-O``.
        self.assertEqual(
            os.path.commonprefix([self.tmpdir, full_path]), self.tmpdir)
        try:
          self.fs.mkdirs(dirname)
        except IOError:
          # Directory exists already (mkdirs raises for existing
          # directories); that is fine for fixture setup.
          pass

      open(full_path, 'a').close()  # create empty file

    # Match the pattern relative to the temp directory and compare the
    # results as tmpdir-relative paths, ignoring order.
    full_pattern = os.path.join(self.tmpdir, pattern)
    result = self.fs.match([full_pattern])[0]
    matched = [
        os.path.relpath(f.path, self.tmpdir) for f in result.metadata_list
    ]
    self.assertCountEqual(matched, expected)

  def test_match_directory(self):
    result = self.fs.match([self.tmpdir])[0]
    files = [f.path for f in result.metadata_list]
    self.assertEqual(files, [self.tmpdir])

  def test_match_directory_contents(self):
    path1 = os.path.join(self.tmpdir, 'f1')
    path2 = os.path.join(self.tmpdir, 'f2')
    open(path1, 'a').close()
    open(path2, 'a').close()

    result = self.fs.match([os.path.join(self.tmpdir, '*')])[0]
    files = [f.path for f in result.metadata_list]
    self.assertCountEqual(files, [path1, path2])

  def test_copy(self):
    path1 = os.path.join(self.tmpdir, 'f1')
    path2 = os.path.join(self.tmpdir, 'f2')
    with open(path1, 'a') as f:
      f.write('Hello')

    self.fs.copy([path1], [path2])
    self.assertTrue(filecmp.cmp(path1, path2))

  def test_copy_error(self):
    # Neither file exists, so the copy must fail and report the pair.
    path1 = os.path.join(self.tmpdir, 'f1')
    path2 = os.path.join(self.tmpdir, 'f2')
    with self.assertRaisesRegex(BeamIOError,
                                r'^Copy operation failed') as error:
      self.fs.copy([path1], [path2])
    self.assertEqual(
        list(error.exception.exception_details.keys()), [(path1, path2)])

  def test_copy_directory(self):
    path_t1 = os.path.join(self.tmpdir, 't1')
    path_t2 = os.path.join(self.tmpdir, 't2')
    self.fs.mkdirs(path_t1)
    self.fs.mkdirs(path_t2)

    path1 = os.path.join(path_t1, 'f1')
    path2 = os.path.join(path_t2, 'f1')
    with open(path1, 'a') as f:
      f.write('Hello')

    self.fs.copy([path_t1], [path_t2])
    self.assertTrue(filecmp.cmp(path1, path2))

  def test_rename(self):
    path1 = os.path.join(self.tmpdir, 'f1')
    path2 = os.path.join(self.tmpdir, 'f2')
    with open(path1, 'a') as f:
      f.write('Hello')

    self.fs.rename([path1], [path2])
    self.assertTrue(self.fs.exists(path2))
    self.assertFalse(self.fs.exists(path1))

  def test_rename_error(self):
    # Neither file exists, so the rename must fail and report the pair.
    path1 = os.path.join(self.tmpdir, 'f1')
    path2 = os.path.join(self.tmpdir, 'f2')
    with self.assertRaisesRegex(BeamIOError,
                                r'^Rename operation failed') as error:
      self.fs.rename([path1], [path2])
    self.assertEqual(
        list(error.exception.exception_details.keys()), [(path1, path2)])

  def test_rename_directory(self):
    path_t1 = os.path.join(self.tmpdir, 't1')
    path_t2 = os.path.join(self.tmpdir, 't2')
    self.fs.mkdirs(path_t1)

    path1 = os.path.join(path_t1, 'f1')
    path2 = os.path.join(path_t2, 'f1')
    with open(path1, 'a') as f:
      f.write('Hello')

    self.fs.rename([path_t1], [path_t2])
    self.assertTrue(self.fs.exists(path_t2))
    self.assertFalse(self.fs.exists(path_t1))
    self.assertTrue(self.fs.exists(path2))
    self.assertFalse(self.fs.exists(path1))

  def test_exists(self):
    path1 = os.path.join(self.tmpdir, 'f1')
    path2 = os.path.join(self.tmpdir, 'f2')
    with open(path1, 'a') as f:
      f.write('Hello')
    self.assertTrue(self.fs.exists(path1))
    self.assertFalse(self.fs.exists(path2))

  def test_checksum(self):
    path1 = os.path.join(self.tmpdir, 'f1')
    path2 = os.path.join(self.tmpdir, 'f2')
    with open(path1, 'a') as f:
      f.write('Hello')
    with open(path2, 'a') as f:
      f.write('foo')
    # The expected checksums equal the number of characters written.
    self.assertEqual(self.fs.checksum(path1), str(5))
    self.assertEqual(self.fs.checksum(path2), str(3))

  def make_tree(self, path, value, expected_leaf_count=None):
    """Create a file+directory structure from a simple dict-based DSL

    :param path: root path to create directories+files under
    :param value: a specification of what ``path`` should contain: ``None`` to
      make it an empty directory, a string literal to make it a file with those
      contents, and a ``dict`` to make it a non-empty directory and recurse
    :param expected_leaf_count: only be set at the top of a recursive call
      stack; after the whole tree has been created, verify the presence and
      number of all files+directories, as a sanity check
    :raises TypeError: if ``value`` is not ``None``, a ``str`` or a ``dict``
    """
    if value is None:
      # empty directory
      os.makedirs(path)
    elif isinstance(value, str):
      # file with string-literal contents
      parent = os.path.dirname(path)
      if not os.path.exists(parent):
        os.makedirs(parent)
      with open(path, 'a') as f:
        f.write(value)
    elif isinstance(value, dict):
      # recurse to create a subdirectory tree
      for basename, v in value.items():
        self.make_tree(os.path.join(path, basename), v)
    else:
      # Wrap in a 1-tuple so that a tuple ``value`` is formatted as a whole
      # instead of being unpacked as %-format arguments.
      raise TypeError('Unexpected value in tempdir tree: %s' % (value, ))

    if expected_leaf_count is not None:
      self.assertEqual(self.check_tree(path, value), expected_leaf_count)

  def check_tree(self, path, value, expected_leaf_count=None):
    """Verify a directory+file structure according to the rules described in
    ``make_tree``

    :param path: path to check under
    :param value: DSL-representation of expected files+directories under
      ``path``
    :param expected_leaf_count: if given, the number of verified leaves must
      equal this value
    :return: number of leaf files/directories that were verified
    :raises TypeError: if ``value`` is not ``None``, a ``str`` or a ``dict``
    """
    actual_leaf_count = None
    if value is None:
      # empty directory
      self.assertTrue(os.path.exists(path), msg=path)
      self.assertEqual(os.listdir(path), [])
      actual_leaf_count = 1
    elif isinstance(value, str):
      # file with string-literal contents
      with open(path, 'r') as f:
        self.assertEqual(f.read(), value, msg=path)

      actual_leaf_count = 1
    elif isinstance(value, dict):
      # recurse to check subdirectory tree
      actual_leaf_count = sum(
          self.check_tree(os.path.join(path, basename), v) for basename,
          v in value.items())
    else:
      # Wrap in a 1-tuple so that a tuple ``value`` is formatted as a whole
      # instead of being unpacked as %-format arguments.
      raise TypeError('Unexpected value in tempdir tree: %s' % (value, ))

    if expected_leaf_count is not None:
      self.assertEqual(actual_leaf_count, expected_leaf_count)

    return actual_leaf_count

  # Fixture for the delete tests, in the DSL understood by ``make_tree``:
  # 4 files plus 3 empty directories, i.e. 7 leaves in total.
  _test_tree = {
      'path1': '111',
      'path2': {
          '2': '222',
          'emptydir': None,
      },
      'aaa': {
          'b1': 'b1',
          'b2': None,
          'bbb': {
              'ccc': {
                  'ddd': 'DDD',
              },
          },
          'c': None,
      },
  }

  def test_delete_globs(self):
    root = os.path.join(self.tmpdir, 'dir')
    self.make_tree(root, self._test_tree, expected_leaf_count=7)

    self.fs.delete(
        [os.path.join(root, 'path*'), os.path.join(root, 'aaa', 'b*')])

    # One empty nested directory is left
    self.check_tree(root, {'aaa': {'c': None}}, expected_leaf_count=1)

  def test_recursive_delete(self):
    root = os.path.join(self.tmpdir, 'dir')
    self.make_tree(root, self._test_tree, expected_leaf_count=7)

    self.fs.delete([root])

    # Joining with '' checks tmpdir itself: it must now be an empty directory.
    self.check_tree(self.tmpdir, {'': None}, expected_leaf_count=1)

  def test_delete_glob_errors(self):
    root = os.path.join(self.tmpdir, 'dir')
    self.make_tree(root, self._test_tree, expected_leaf_count=7)

    with self.assertRaisesRegex(BeamIOError,
                                r'^Delete operation failed') as error:
      self.fs.delete([
          os.path.join(root, 'path*'),
          os.path.join(root, 'aaa', 'b*'),
          os.path.join(root, 'aaa', 'd*')  # doesn't match anything, will raise
      ])

    # The patterns that did match were still deleted despite the failure.
    self.check_tree(root, {'aaa': {'c': None}}, expected_leaf_count=1)

    self.assertEqual(
        list(error.exception.exception_details.keys()),
        [os.path.join(root, 'aaa', 'd*')])

    with self.assertRaisesRegex(BeamIOError,
                                r'^Delete operation failed') as error:
      self.fs.delete([
          # Already deleted above, so this no longer matches and will raise
          os.path.join(root, 'path*')
      ])

    self.check_tree(root, {'aaa': {'c': None}}, expected_leaf_count=1)

    self.assertEqual(
        list(error.exception.exception_details.keys()),
        [os.path.join(root, 'path*')])

  def test_delete(self):
    path1 = os.path.join(self.tmpdir, 'f1')

    with open(path1, 'a') as f:
      f.write('Hello')

    self.assertTrue(self.fs.exists(path1))
    self.fs.delete([path1])
    self.assertFalse(self.fs.exists(path1))

  def test_delete_error(self):
    # The file was never created, so the delete must fail and report it.
    path1 = os.path.join(self.tmpdir, 'f1')
    with self.assertRaisesRegex(BeamIOError,
                                r'^Delete operation failed') as error:
      self.fs.delete([path1])
    self.assertEqual(list(error.exception.exception_details.keys()), [path1])
| |
| |
if __name__ == '__main__':
  # When run as a script, raise the root logger to INFO before handing
  # control to the unittest runner.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  unittest.main()