blob: 1f813b4cc482652471453fddad9ff87aef345804 [file] [log] [blame]
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Unit tests for LocalFileSystem."""
from __future__ import absolute_import
import filecmp
import logging
import os
import shutil
import tempfile
import unittest
import mock
from parameterized import param
from parameterized import parameterized
from apache_beam.io import localfilesystem
from apache_beam.io.filesystem import BeamIOError
from apache_beam.options.pipeline_options import PipelineOptions
def _gen_fake_join(separator):
"""Returns a callable that joins paths with the given separator."""
def _join(first_path, *paths):
return separator.join((first_path.rstrip(separator),) + paths)
return _join
def _gen_fake_split(separator):
"""Returns a callable that splits a with the given separator."""
def _split(path):
sep_index = path.rfind(separator)
if sep_index >= 0:
return (path[:sep_index], path[sep_index + 1:])
else:
return (path, '')
return _split
class LocalFileSystemTest(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
pipeline_options = PipelineOptions()
self.fs = localfilesystem.LocalFileSystem(pipeline_options)
def tearDown(self):
shutil.rmtree(self.tmpdir)
def test_scheme(self):
self.assertIsNone(self.fs.scheme())
self.assertIsNone(localfilesystem.LocalFileSystem.scheme())
@mock.patch('apache_beam.io.localfilesystem.os')
def test_unix_path_join(self, *unused_mocks):
# Test joining of Unix paths.
localfilesystem.os.path.join.side_effect = _gen_fake_join('/')
self.assertEqual('/tmp/path/to/file',
self.fs.join('/tmp/path', 'to', 'file'))
self.assertEqual('/tmp/path/to/file',
self.fs.join('/tmp/path', 'to/file'))
@mock.patch('apache_beam.io.localfilesystem.os')
def test_windows_path_join(self, *unused_mocks):
# Test joining of Windows paths.
localfilesystem.os.path.join.side_effect = _gen_fake_join('\\')
self.assertEqual(r'C:\tmp\path\to\file',
self.fs.join(r'C:\tmp\path', 'to', 'file'))
self.assertEqual(r'C:\tmp\path\to\file',
self.fs.join(r'C:\tmp\path', r'to\file'))
@mock.patch('apache_beam.io.localfilesystem.os')
def test_unix_path_split(self, os_mock):
os_mock.path.abspath.side_effect = lambda a: a
os_mock.path.split.side_effect = _gen_fake_split('/')
self.assertEqual(('/tmp/path/to', 'file'),
self.fs.split('/tmp/path/to/file'))
# Actual os.path.split will split following to '/' and 'tmp' when run in
# Unix.
self.assertEqual(('', 'tmp'),
self.fs.split('/tmp'))
@mock.patch('apache_beam.io.localfilesystem.os')
def test_windows_path_split(self, os_mock):
os_mock.path.abspath = lambda a: a
os_mock.path.split.side_effect = _gen_fake_split('\\')
self.assertEqual((r'C:\tmp\path\to', 'file'),
self.fs.split(r'C:\tmp\path\to\file'))
# Actual os.path.split will split following to 'C:\' and 'tmp' when run in
# Windows.
self.assertEqual((r'C:', 'tmp'),
self.fs.split(r'C:\tmp'))
def test_mkdirs(self):
path = os.path.join(self.tmpdir, 't1/t2')
self.fs.mkdirs(path)
self.assertTrue(os.path.isdir(path))
def test_mkdirs_failed(self):
path = os.path.join(self.tmpdir, 't1/t2')
self.fs.mkdirs(path)
# Check IOError if existing directory is created
with self.assertRaises(IOError):
self.fs.mkdirs(path)
with self.assertRaises(IOError):
self.fs.mkdirs(os.path.join(self.tmpdir, 't1'))
def test_match_file(self):
path = os.path.join(self.tmpdir, 'f1')
open(path, 'a').close()
# Match files in the temp directory
result = self.fs.match([path])[0]
files = [f.path for f in result.metadata_list]
self.assertEqual(files, [path])
def test_match_file_empty(self):
path = os.path.join(self.tmpdir, 'f2') # Does not exist
# Match files in the temp directory
result = self.fs.match([path])[0]
files = [f.path for f in result.metadata_list]
self.assertEqual(files, [])
def test_match_file_exception(self):
# Match files with None so that it throws an exception
with self.assertRaisesRegexp(BeamIOError,
r'^Match operation failed') as error:
self.fs.match([None])
self.assertEqual(list(error.exception.exception_details.keys()), [None])
@parameterized.expand([
param('*',
files=['a', 'b', 'c/x'],
expected=['a', 'b']),
param('**',
files=['a', 'b/x', 'c/x'],
expected=['a', 'b/x', 'c/x']),
param('*/*',
files=['a', 'b/x', 'c/x', 'd/x/y'],
expected=['b/x', 'c/x']),
param('**/*',
files=['a', 'b/x', 'c/x', 'd/x/y'],
expected=['b/x', 'c/x', 'd/x/y']),
])
def test_match_glob(self, pattern, files, expected):
for filename in files:
full_path = os.path.join(self.tmpdir, filename)
dirname = os.path.dirname(full_path)
if not dirname == full_path:
# Make sure we don't go outside the tmpdir
assert os.path.commonprefix([self.tmpdir, full_path]) == self.tmpdir
try:
self.fs.mkdirs(dirname)
except IOError:
# Directory exists
pass
open(full_path, 'a').close() # create empty file
# Match both the files in the directory
full_pattern = os.path.join(self.tmpdir, pattern)
result = self.fs.match([full_pattern])[0]
files = [os.path.relpath(f.path, self.tmpdir) for f in result.metadata_list]
self.assertItemsEqual(files, expected)
def test_match_directory(self):
result = self.fs.match([self.tmpdir])[0]
files = [f.path for f in result.metadata_list]
self.assertEqual(files, [self.tmpdir])
def test_match_directory_contents(self):
path1 = os.path.join(self.tmpdir, 'f1')
path2 = os.path.join(self.tmpdir, 'f2')
open(path1, 'a').close()
open(path2, 'a').close()
result = self.fs.match([self.tmpdir + '/'])[0]
files = [f.path for f in result.metadata_list]
self.assertItemsEqual(files, [path1, path2])
def test_copy(self):
path1 = os.path.join(self.tmpdir, 'f1')
path2 = os.path.join(self.tmpdir, 'f2')
with open(path1, 'a') as f:
f.write('Hello')
self.fs.copy([path1], [path2])
self.assertTrue(filecmp.cmp(path1, path2))
def test_copy_error(self):
path1 = os.path.join(self.tmpdir, 'f1')
path2 = os.path.join(self.tmpdir, 'f2')
with self.assertRaisesRegexp(BeamIOError,
r'^Copy operation failed') as error:
self.fs.copy([path1], [path2])
self.assertEqual(list(error.exception.exception_details.keys()),
[(path1, path2)])
def test_copy_directory(self):
path_t1 = os.path.join(self.tmpdir, 't1')
path_t2 = os.path.join(self.tmpdir, 't2')
self.fs.mkdirs(path_t1)
self.fs.mkdirs(path_t2)
path1 = os.path.join(path_t1, 'f1')
path2 = os.path.join(path_t2, 'f1')
with open(path1, 'a') as f:
f.write('Hello')
self.fs.copy([path_t1], [path_t2])
self.assertTrue(filecmp.cmp(path1, path2))
def test_rename(self):
path1 = os.path.join(self.tmpdir, 'f1')
path2 = os.path.join(self.tmpdir, 'f2')
with open(path1, 'a') as f:
f.write('Hello')
self.fs.rename([path1], [path2])
self.assertTrue(self.fs.exists(path2))
self.assertFalse(self.fs.exists(path1))
def test_rename_error(self):
path1 = os.path.join(self.tmpdir, 'f1')
path2 = os.path.join(self.tmpdir, 'f2')
with self.assertRaisesRegexp(BeamIOError,
r'^Rename operation failed') as error:
self.fs.rename([path1], [path2])
self.assertEqual(list(error.exception.exception_details.keys()),
[(path1, path2)])
def test_rename_directory(self):
path_t1 = os.path.join(self.tmpdir, 't1')
path_t2 = os.path.join(self.tmpdir, 't2')
self.fs.mkdirs(path_t1)
path1 = os.path.join(path_t1, 'f1')
path2 = os.path.join(path_t2, 'f1')
with open(path1, 'a') as f:
f.write('Hello')
self.fs.rename([path_t1], [path_t2])
self.assertTrue(self.fs.exists(path_t2))
self.assertFalse(self.fs.exists(path_t1))
self.assertTrue(self.fs.exists(path2))
self.assertFalse(self.fs.exists(path1))
def test_exists(self):
path1 = os.path.join(self.tmpdir, 'f1')
path2 = os.path.join(self.tmpdir, 'f2')
with open(path1, 'a') as f:
f.write('Hello')
self.assertTrue(self.fs.exists(path1))
self.assertFalse(self.fs.exists(path2))
def test_checksum(self):
path1 = os.path.join(self.tmpdir, 'f1')
path2 = os.path.join(self.tmpdir, 'f2')
with open(path1, 'a') as f:
f.write('Hello')
with open(path2, 'a') as f:
f.write('foo')
self.assertEquals(self.fs.checksum(path1), str(5))
self.assertEquals(self.fs.checksum(path2), str(3))
def make_tree(self, path, value, expected_leaf_count=None):
"""Create a file+directory structure from a simple dict-based DSL
:param path: root path to create directories+files under
:param value: a specification of what ``path`` should contain: ``None`` to
make it an empty directory, a string literal to make it a file with those
contents, and a ``dict`` to make it a non-empty directory and recurse
:param expected_leaf_count: only be set at the top of a recursive call
stack; after the whole tree has been created, verify the presence and
number of all files+directories, as a sanity check
"""
if value is None:
# empty directory
os.makedirs(path)
elif isinstance(value, str):
# file with string-literal contents
dir = os.path.dirname(path)
if not os.path.exists(dir):
os.makedirs(dir)
with open(path, 'a') as f:
f.write(value)
elif isinstance(value, dict):
# recurse to create a subdirectory tree
for basename, v in value.items():
self.make_tree(
os.path.join(path, basename),
v
)
else:
raise Exception(
'Unexpected value in tempdir tree: %s' % value
)
if expected_leaf_count != None:
self.assertEqual(
self.check_tree(path, value),
expected_leaf_count
)
def check_tree(self, path, value, expected_leaf_count=None):
"""Verify a directory+file structure according to the rules described in
``make_tree``
:param path: path to check under
:param value: DSL-representation of expected files+directories under
``path``
:return: number of leaf files/directories that were verified
"""
actual_leaf_count = None
if value is None:
# empty directory
self.assertTrue(os.path.exists(path), msg=path)
self.assertEqual(os.listdir(path), [])
actual_leaf_count = 1
elif isinstance(value, str):
# file with string-literal contents
with open(path, 'r') as f:
self.assertEqual(f.read(), value, msg=path)
actual_leaf_count = 1
elif isinstance(value, dict):
# recurse to check subdirectory tree
actual_leaf_count = sum(
[
self.check_tree(
os.path.join(path, basename),
v
)
for basename, v in value.items()
]
)
else:
raise Exception(
'Unexpected value in tempdir tree: %s' % value
)
if expected_leaf_count != None:
self.assertEqual(actual_leaf_count, expected_leaf_count)
return actual_leaf_count
_test_tree = {
'path1': '111',
'path2': {
'2': '222',
'emptydir': None
},
'aaa': {
'b1': 'b1',
'b2': None,
'bbb': {
'ccc': {
'ddd': 'DDD'
}
},
'c': None
}
}
def test_delete_globs(self):
dir = os.path.join(self.tmpdir, 'dir')
self.make_tree(dir, self._test_tree, expected_leaf_count=7)
self.fs.delete([
os.path.join(dir, 'path*'),
os.path.join(dir, 'aaa/b*')
])
# One empty nested directory is left
self.check_tree(
dir,
{
'aaa': {
'c': None
}
},
expected_leaf_count=1
)
def test_recursive_delete(self):
dir = os.path.join(self.tmpdir, 'dir')
self.make_tree(dir, self._test_tree, expected_leaf_count=7)
self.fs.delete([dir])
self.check_tree(
self.tmpdir,
{'': None},
expected_leaf_count=1
)
def test_delete_glob_errors(self):
dir = os.path.join(self.tmpdir, 'dir')
self.make_tree(dir, self._test_tree, expected_leaf_count=7)
with self.assertRaisesRegexp(BeamIOError,
r'^Delete operation failed') as error:
self.fs.delete([
os.path.join(dir, 'path*'),
os.path.join(dir, 'aaa/b*'),
os.path.join(dir, 'aaa/d*') # doesn't match anything, will raise
])
self.check_tree(
dir,
{
'aaa': {
'c': None
}
},
expected_leaf_count=1
)
self.assertEqual(
list(
error
.exception
.exception_details
.keys()
),
[os.path.join(dir, 'aaa/d*')]
)
with self.assertRaisesRegexp(BeamIOError,
r'^Delete operation failed') as error:
self.fs.delete([
os.path.join(dir, 'path*') # doesn't match anything, will raise
])
self.check_tree(
dir,
{
'aaa': {
'c': None
}
},
expected_leaf_count=1
)
self.assertEqual(
list(
error
.exception
.exception_details
.keys()
),
[os.path.join(dir, 'path*')]
)
def test_delete(self):
path1 = os.path.join(self.tmpdir, 'f1')
with open(path1, 'a') as f:
f.write('Hello')
self.assertTrue(self.fs.exists(path1))
self.fs.delete([path1])
self.assertFalse(self.fs.exists(path1))
def test_delete_error(self):
path1 = os.path.join(self.tmpdir, 'f1')
with self.assertRaisesRegexp(BeamIOError,
r'^Delete operation failed') as error:
self.fs.delete([path1])
self.assertEqual(list(error.exception.exception_details.keys()), [path1])
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()