| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| from datetime import datetime, timezone, timedelta |
| import gzip |
| import os |
| import pathlib |
| import pickle |
| import sys |
| |
| import pytest |
| import weakref |
| |
| import pyarrow as pa |
| from pyarrow.tests.test_io import assert_file_not_found |
| from pyarrow.vendored.version import Version |
| |
| from pyarrow.fs import (FileType, FileInfo, FileSelector, FileSystem, |
| LocalFileSystem, SubTreeFileSystem, _MockFileSystem, |
| FileSystemHandler, PyFileSystem, FSSpecHandler) |
| |
| |
class DummyHandler(FileSystemHandler):
    """Canned FileSystemHandler used to exercise the PyFileSystem bridge.

    Each method either returns synthetic results or asserts that it
    received exactly the arguments the corresponding test passes in.
    """

    def __init__(self, value=42):
        # `value` only participates in handler equality checks below.
        self._value = value

    def __eq__(self, other):
        # Only compare against other DummyHandlers: an unrelated
        # FileSystemHandler subclass (e.g. ProxyHandler) has no `_value`
        # attribute and would raise AttributeError instead of falling
        # through to NotImplemented.
        if isinstance(other, DummyHandler):
            return self._value == other._value
        return NotImplemented

    def __ne__(self, other):
        if isinstance(other, DummyHandler):
            return self._value != other._value
        return NotImplemented

    def get_type_name(self):
        return "dummy"

    def normalize_path(self, path):
        return path

    def get_file_info(self, paths):
        """Return one FileInfo per path, keyed off substrings of the path."""
        info = []
        for path in paths:
            if "file" in path:
                info.append(FileInfo(path, FileType.File))
            elif "dir" in path:
                info.append(FileInfo(path, FileType.Directory))
            elif "notfound" in path:
                info.append(FileInfo(path, FileType.NotFound))
            elif "badtype" in path:
                # Will raise when converting
                info.append(object())
            else:
                raise IOError
        return info

    def get_file_info_selector(self, selector):
        """Return a canned listing for base_dir "somedir", else not-found."""
        if selector.base_dir != "somedir":
            if selector.allow_not_found:
                return []
            else:
                raise FileNotFoundError(selector.base_dir)
        infos = [
            FileInfo("somedir/file1", FileType.File, size=123),
            FileInfo("somedir/subdir1", FileType.Directory),
        ]
        if selector.recursive:
            infos += [
                FileInfo("somedir/subdir1/file2", FileType.File, size=456),
            ]
        return infos

    def create_dir(self, path, recursive):
        # Check that the `recursive` flag is forwarded as the path implies.
        if path == "recursive":
            assert recursive is True
        elif path == "non-recursive":
            assert recursive is False
        else:
            raise IOError

    def delete_dir(self, path):
        assert path == "delete_dir"

    def delete_dir_contents(self, path):
        # Refuse to clear the root directory through this entry point.
        if not path.strip("/"):
            raise ValueError
        assert path == "delete_dir_contents"

    def delete_root_dir_contents(self):
        pass

    def delete_file(self, path):
        assert path == "delete_file"

    def move(self, src, dest):
        assert src == "move_from"
        assert dest == "move_to"

    def copy_file(self, src, dest):
        assert src == "copy_file_from"
        assert dest == "copy_file_to"

    def open_input_stream(self, path):
        if "notfound" in path:
            raise FileNotFoundError(path)
        data = "{0}:input_stream".format(path).encode('utf8')
        return pa.BufferReader(data)

    def open_input_file(self, path):
        if "notfound" in path:
            raise FileNotFoundError(path)
        data = "{0}:input_file".format(path).encode('utf8')
        return pa.BufferReader(data)

    def open_output_stream(self, path):
        if "notfound" in path:
            raise FileNotFoundError(path)
        return pa.BufferOutputStream()

    def open_append_stream(self, path):
        if "notfound" in path:
            raise FileNotFoundError(path)
        return pa.BufferOutputStream()
| |
| |
class ProxyHandler(FileSystemHandler):
    """FileSystemHandler that forwards every call to a wrapped pyarrow
    filesystem.

    Wrapping e.g. a LocalFileSystem in PyFileSystem(ProxyHandler(...))
    lets the test suite run the same scenarios through the Python
    handler bridge.
    """

    def __init__(self, fs):
        # The wrapped pyarrow FileSystem instance.
        self._fs = fs

    def __eq__(self, other):
        # Two proxies are equal iff their wrapped filesystems are equal.
        if isinstance(other, ProxyHandler):
            return self._fs == other._fs
        return NotImplemented

    def __ne__(self, other):
        if isinstance(other, ProxyHandler):
            return self._fs != other._fs
        return NotImplemented

    def get_type_name(self):
        # Prefix the wrapped filesystem's type name, e.g. "proxy::local".
        return "proxy::" + self._fs.type_name

    def normalize_path(self, path):
        return self._fs.normalize_path(path)

    def get_file_info(self, paths):
        return self._fs.get_file_info(paths)

    def get_file_info_selector(self, selector):
        # FileSystem.get_file_info() also accepts a FileSelector directly.
        return self._fs.get_file_info(selector)

    def create_dir(self, path, recursive):
        return self._fs.create_dir(path, recursive=recursive)

    def delete_dir(self, path):
        return self._fs.delete_dir(path)

    def delete_dir_contents(self, path):
        return self._fs.delete_dir_contents(path)

    def delete_root_dir_contents(self):
        return self._fs.delete_dir_contents("", accept_root_dir=True)

    def delete_file(self, path):
        return self._fs.delete_file(path)

    def move(self, src, dest):
        return self._fs.move(src, dest)

    def copy_file(self, src, dest):
        return self._fs.copy_file(src, dest)

    def open_input_stream(self, path):
        return self._fs.open_input_stream(path)

    def open_input_file(self, path):
        return self._fs.open_input_file(path)

    def open_output_stream(self, path):
        return self._fs.open_output_stream(path)

    def open_append_stream(self, path):
        return self._fs.open_append_stream(path)
| |
| |
@pytest.fixture
def localfs(request, tempdir):
    """Plain local filesystem, rooted at the test's temporary directory."""
    return {
        'fs': LocalFileSystem(),
        'pathfn': lambda p: (tempdir / p).as_posix(),
        'allow_copy_file': True,
        'allow_move_dir': True,
        'allow_append_to_file': True,
    }
| |
| |
@pytest.fixture
def py_localfs(request, tempdir):
    """Local filesystem accessed through the Python handler bridge."""
    return {
        'fs': PyFileSystem(ProxyHandler(LocalFileSystem())),
        'pathfn': lambda p: (tempdir / p).as_posix(),
        'allow_copy_file': True,
        'allow_move_dir': True,
        'allow_append_to_file': True,
    }
| |
| |
@pytest.fixture
def mockfs(request):
    """In-memory mock filesystem; paths are used verbatim."""
    return {
        'fs': _MockFileSystem(),
        'pathfn': lambda p: p,
        'allow_copy_file': True,
        'allow_move_dir': True,
        'allow_append_to_file': True,
    }
| |
| |
@pytest.fixture
def py_mockfs(request):
    """Mock filesystem accessed through the Python handler bridge."""
    return {
        'fs': PyFileSystem(ProxyHandler(_MockFileSystem())),
        'pathfn': lambda p: p,
        'allow_copy_file': True,
        'allow_move_dir': True,
        'allow_append_to_file': True,
    }
| |
| |
@pytest.fixture
def localfs_with_mmap(request, tempdir):
    """Local filesystem that memory-maps files on read."""
    return {
        'fs': LocalFileSystem(use_mmap=True),
        'pathfn': lambda p: (tempdir / p).as_posix(),
        'allow_copy_file': True,
        'allow_move_dir': True,
        'allow_append_to_file': True,
    }
| |
| |
@pytest.fixture
def subtree_localfs(request, tempdir, localfs):
    """Local filesystem re-rooted at the temp dir via SubTreeFileSystem."""
    return {
        'fs': SubTreeFileSystem(str(tempdir), localfs['fs']),
        'pathfn': lambda p: p,
        'allow_copy_file': True,
        'allow_move_dir': True,
        'allow_append_to_file': True,
    }
| |
| |
@pytest.fixture
def s3fs(request, s3_connection, s3_server):
    """S3-backed filesystem; creates a test bucket and tears it down."""
    request.config.pyarrow.requires('s3')
    from pyarrow.fs import S3FileSystem

    host, port, access_key, secret_key = s3_connection
    bucket = 'pyarrow-filesystem/'

    filesystem = S3FileSystem(access_key=access_key,
                              secret_key=secret_key,
                              endpoint_override='{}:{}'.format(host, port),
                              scheme='http')
    filesystem.create_dir(bucket)

    yield {
        'fs': filesystem,
        'pathfn': bucket.__add__,
        'allow_copy_file': True,
        'allow_move_dir': False,
        'allow_append_to_file': False,
    }
    filesystem.delete_dir(bucket)
| |
| |
@pytest.fixture
def subtree_s3fs(request, s3fs):
    """S3 filesystem scoped to a prefix inside the test bucket."""
    prefix = 'pyarrow-filesystem/prefix/'
    return {
        'fs': SubTreeFileSystem(prefix, s3fs['fs']),
        'pathfn': prefix.__add__,
        'allow_copy_file': True,
        'allow_move_dir': False,
        'allow_append_to_file': False,
    }
| |
| |
@pytest.fixture
def hdfs(request, hdfs_connection):
    """HDFS-backed filesystem; skipped when libhdfs is unavailable."""
    request.config.pyarrow.requires('hdfs')
    if not pa.have_libhdfs():
        pytest.skip('Cannot locate libhdfs')

    from pyarrow.fs import HadoopFileSystem

    host, port, user = hdfs_connection
    return {
        'fs': HadoopFileSystem(host, port=port, user=user),
        'pathfn': lambda p: p,
        'allow_copy_file': False,
        'allow_move_dir': True,
        'allow_append_to_file': True,
    }
| |
| |
@pytest.fixture
def py_fsspec_localfs(request, tempdir):
    """fsspec's local filesystem wrapped through FSSpecHandler."""
    fsspec = pytest.importorskip("fsspec")
    handler = FSSpecHandler(fsspec.filesystem('file'))
    return {
        'fs': PyFileSystem(handler),
        'pathfn': lambda p: (tempdir / p).as_posix(),
        'allow_copy_file': True,
        'allow_move_dir': True,
        'allow_append_to_file': True,
    }
| |
| |
@pytest.fixture
def py_fsspec_memoryfs(request, tempdir):
    """fsspec's in-memory filesystem wrapped through FSSpecHandler."""
    fsspec = pytest.importorskip("fsspec", minversion="0.7.5")
    if fsspec.__version__ == "0.8.5":
        # see https://issues.apache.org/jira/browse/ARROW-10934
        pytest.skip("Bug in fsspec 0.8.5 for in-memory filesystem")
    handler = FSSpecHandler(fsspec.filesystem('memory'))
    return {
        'fs': PyFileSystem(handler),
        'pathfn': lambda p: p,
        'allow_copy_file': True,
        'allow_move_dir': True,
        'allow_append_to_file': True,
    }
| |
| |
@pytest.fixture
def py_fsspec_s3fs(request, s3_connection, s3_server):
    """s3fs wrapped through FSSpecHandler; creates/tears down a bucket."""
    s3fs = pytest.importorskip("s3fs")
    if (sys.version_info < (3, 7) and
            Version(s3fs.__version__) >= Version("0.5")):
        pytest.skip("s3fs>=0.5 version is async and requires Python >= 3.7")

    host, port, access_key, secret_key = s3_connection
    bucket = 'pyarrow-filesystem/'

    raw_fs = s3fs.S3FileSystem(
        key=access_key,
        secret=secret_key,
        client_kwargs=dict(endpoint_url='http://{}:{}'.format(host, port))
    )
    wrapped = PyFileSystem(FSSpecHandler(raw_fs))
    wrapped.create_dir(bucket)

    yield {
        'fs': wrapped,
        'pathfn': bucket.__add__,
        'allow_copy_file': True,
        'allow_move_dir': False,
        'allow_append_to_file': True,
    }
    wrapped.delete_dir(bucket)
| |
| |
# Every filesystem backend exercised by the generic tests below; the id
# string is what shows up in the pytest test name.
@pytest.fixture(params=[
    pytest.param(
        pytest.lazy_fixture('localfs'),
        id='LocalFileSystem()'
    ),
    pytest.param(
        pytest.lazy_fixture('localfs_with_mmap'),
        id='LocalFileSystem(use_mmap=True)'
    ),
    pytest.param(
        pytest.lazy_fixture('subtree_localfs'),
        id='SubTreeFileSystem(LocalFileSystem())'
    ),
    pytest.param(
        pytest.lazy_fixture('s3fs'),
        id='S3FileSystem'
    ),
    pytest.param(
        pytest.lazy_fixture('hdfs'),
        id='HadoopFileSystem'
    ),
    pytest.param(
        pytest.lazy_fixture('mockfs'),
        id='_MockFileSystem()'
    ),
    pytest.param(
        pytest.lazy_fixture('py_localfs'),
        id='PyFileSystem(ProxyHandler(LocalFileSystem()))'
    ),
    pytest.param(
        pytest.lazy_fixture('py_mockfs'),
        id='PyFileSystem(ProxyHandler(_MockFileSystem()))'
    ),
    pytest.param(
        pytest.lazy_fixture('py_fsspec_localfs'),
        id='PyFileSystem(FSSpecHandler(fsspec.LocalFileSystem()))'
    ),
    pytest.param(
        pytest.lazy_fixture('py_fsspec_memoryfs'),
        id='PyFileSystem(FSSpecHandler(fsspec.filesystem("memory")))'
    ),
    pytest.param(
        pytest.lazy_fixture('py_fsspec_s3fs'),
        id='PyFileSystem(FSSpecHandler(s3fs.S3FileSystem()))'
    ),
])
def filesystem_config(request):
    """Parametrized fixture yielding one backend config dict per run."""
    return request.param
| |
| |
@pytest.fixture
def fs(request, filesystem_config):
    # The FileSystem instance under test for the current parametrization.
    return filesystem_config['fs']
| |
| |
@pytest.fixture
def pathfn(request, filesystem_config):
    # Maps a relative test path to a backend-appropriate absolute path.
    return filesystem_config['pathfn']
| |
| |
@pytest.fixture
def allow_move_dir(request, filesystem_config):
    # Whether the backend supports moving directories.
    return filesystem_config['allow_move_dir']
| |
| |
@pytest.fixture
def allow_copy_file(request, filesystem_config):
    # Whether the backend supports copying files.
    return filesystem_config['allow_copy_file']
| |
| |
@pytest.fixture
def allow_append_to_file(request, filesystem_config):
    # Whether the backend supports appending to existing files.
    return filesystem_config['allow_append_to_file']
| |
| |
def check_mtime(file_info):
    """Check that *file_info* has a consistent, timezone-aware UTC mtime."""
    assert isinstance(file_info.mtime, datetime)
    assert isinstance(file_info.mtime_ns, int)
    assert file_info.mtime_ns >= 0
    # mtime (datetime) and mtime_ns (integer ns) must describe the same
    # instant, within float rounding.
    assert file_info.mtime_ns == pytest.approx(
        file_info.mtime.timestamp() * 1e9)
    # It's an aware UTC datetime
    tzinfo = file_info.mtime.tzinfo
    assert tzinfo is not None
    assert tzinfo.utcoffset(None) == timedelta(0)
| |
| |
def check_mtime_absent(file_info):
    """Check that *file_info* carries no modification time at all."""
    assert file_info.mtime is None and file_info.mtime_ns is None
| |
| |
def check_mtime_or_absent(file_info):
    """Check that the mtime is either fully present or fully absent."""
    if file_info.mtime is not None:
        check_mtime(file_info)
    else:
        check_mtime_absent(file_info)
| |
| |
def skip_fsspec_s3fs(fs):
    """Mark the current test as xfail when running against fsspec's s3fs."""
    if fs.type_name != "py::fsspec+s3":
        return
    pytest.xfail(reason="Not working with fsspec's s3fs")
| |
| |
def test_file_info_constructor():
    """FileInfo fields default sensibly and accept float/datetime mtimes."""
    dt = datetime.fromtimestamp(1568799826, timezone.utc)

    # Path only: unknown type, no size, no mtime.
    info = FileInfo("foo/bar")
    assert info.path == "foo/bar"
    assert info.base_name == "bar"
    assert info.type == FileType.Unknown
    assert info.size is None
    check_mtime_absent(info)

    # mtime given as a float timestamp in seconds.
    info = FileInfo("foo/baz.txt", type=FileType.File, size=123,
                    mtime=1568799826.5)
    assert info.path == "foo/baz.txt"
    assert info.base_name == "baz.txt"
    assert info.type == FileType.File
    assert info.size == 123
    assert info.mtime_ns == 1568799826500000000
    check_mtime(info)

    # mtime given as an aware datetime.
    info = FileInfo("foo", type=FileType.Directory, mtime=dt)
    assert info.path == "foo"
    assert info.base_name == "foo"
    assert info.type == FileType.Directory
    assert info.size is None
    assert info.mtime == dt
    assert info.mtime_ns == 1568799826000000000
    check_mtime(info)
| |
| |
def test_cannot_instantiate_base_filesystem():
    """The abstract FileSystem base class must not be constructible."""
    with pytest.raises(TypeError):
        FileSystem()
| |
| |
def test_filesystem_equals():
    """Equality semantics of the various FileSystem implementations."""
    fs0 = LocalFileSystem()
    fs1 = LocalFileSystem()
    fs2 = _MockFileSystem()

    assert fs0.equals(fs0)
    assert fs0.equals(fs1)
    with pytest.raises(TypeError):
        fs0.equals('string')
    assert fs0 == fs0 == fs1
    assert fs0 != 4

    # Mock filesystems compare by identity, not merely by type.
    assert fs2 == fs2
    assert fs2 != _MockFileSystem()

    # SubTreeFileSystem compares both the base path and the wrapped fs.
    assert SubTreeFileSystem('/base', fs0) == SubTreeFileSystem('/base', fs0)
    assert SubTreeFileSystem('/base', fs0) != SubTreeFileSystem('/base', fs2)
    assert SubTreeFileSystem('/base', fs0) != SubTreeFileSystem('/other', fs0)
| |
| |
def test_subtree_filesystem():
    """SubTreeFileSystem exposes its base path and wrapped filesystem."""
    localfs = LocalFileSystem()

    # A trailing slash is appended to the base path if missing.
    subfs = SubTreeFileSystem('/base', localfs)
    assert subfs.base_path == '/base/'
    assert subfs.base_fs == localfs

    subfs = SubTreeFileSystem('/another/base/', LocalFileSystem())
    assert subfs.base_path == '/another/base/'
    # LocalFileSystem instances compare equal, so this holds even though
    # the subtree wraps a different instance.
    assert subfs.base_fs == localfs
| |
| |
def test_filesystem_pickling(fs):
    """Every concrete filesystem should survive a pickle round-trip."""
    if fs.type_name.split('::')[-1] == 'mock':
        pytest.xfail(reason='MockFileSystem is not serializable')

    roundtripped = pickle.loads(pickle.dumps(fs))
    assert isinstance(roundtripped, FileSystem)
    assert roundtripped.equals(fs)
| |
| |
def test_filesystem_is_functional_after_pickling(fs, pathfn):
    """A pickled-and-restored filesystem can still inspect existing files."""
    if fs.type_name.split('::')[-1] == 'mock':
        pytest.xfail(reason='MockFileSystem is not serializable')
    skip_fsspec_s3fs(fs)

    aaa = pathfn('a/aa/aaa/')
    bb = pathfn('a/bb')
    c = pathfn('c.txt')

    fs.create_dir(aaa)
    with fs.open_output_stream(bb):
        pass  # touch
    with fs.open_output_stream(c) as fp:
        fp.write(b'test')

    # The restored instance must see the files created before pickling.
    restored = pickle.loads(pickle.dumps(fs))
    aaa_info, bb_info, c_info = restored.get_file_info([aaa, bb, c])
    assert aaa_info.type == FileType.Directory
    assert bb_info.type == FileType.File
    assert c_info.type == FileType.File
| |
| |
def test_type_name():
    """Each filesystem implementation reports a stable type name."""
    assert LocalFileSystem().type_name == "local"
    assert _MockFileSystem().type_name == "mock"
| |
| |
def test_normalize_path(fs):
    """normalize_path() leaves trivial names unchanged."""
    # Trivial path names (without separators) should generally be
    # already normalized. Just a sanity check.
    assert fs.normalize_path("foo") == "foo"
| |
| |
def test_non_path_like_input_raises(fs):
    """Anything that is not a proper path string must raise TypeError."""
    class Path:
        pass

    # Includes pathlib.Path, which this API does not accept either.
    for bad_path in (1, 1.1, Path(), tuple(), {}, [], lambda: 1,
                     pathlib.Path()):
        with pytest.raises(TypeError):
            fs.create_dir(bad_path)
| |
| |
def test_get_file_info(fs, pathfn):
    """get_file_info reports type, size, and mtime for dirs, files and
    missing paths, for both list and single-path arguments."""
    aaa = pathfn('a/aa/aaa/')
    bb = pathfn('a/bb')
    c = pathfn('c.txt')
    zzz = pathfn('zzz')  # never created

    fs.create_dir(aaa)
    with fs.open_output_stream(bb):
        pass  # touch
    with fs.open_output_stream(c) as fp:
        fp.write(b'test')

    aaa_info, bb_info, c_info, zzz_info = fs.get_file_info([aaa, bb, c, zzz])

    # Directory entry.
    assert aaa_info.path == aaa
    assert 'aaa' in repr(aaa_info)
    assert aaa_info.extension == ''
    if fs.type_name == "py::fsspec+s3":
        # s3fs doesn't create empty directories
        assert aaa_info.type == FileType.NotFound
    else:
        assert aaa_info.type == FileType.Directory
        assert 'FileType.Directory' in repr(aaa_info)
        assert aaa_info.size is None
        check_mtime_or_absent(aaa_info)

    # Empty file.
    assert bb_info.path == str(bb)
    assert bb_info.base_name == 'bb'
    assert bb_info.extension == ''
    assert bb_info.type == FileType.File
    assert 'FileType.File' in repr(bb_info)
    assert bb_info.size == 0
    if fs.type_name not in ["py::fsspec+memory", "py::fsspec+s3"]:
        check_mtime(bb_info)

    # Non-empty file with an extension.
    assert c_info.path == str(c)
    assert c_info.base_name == 'c.txt'
    assert c_info.extension == 'txt'
    assert c_info.type == FileType.File
    assert 'FileType.File' in repr(c_info)
    assert c_info.size == 4
    if fs.type_name not in ["py::fsspec+memory", "py::fsspec+s3"]:
        check_mtime(c_info)

    # Missing path.
    assert zzz_info.path == str(zzz)
    assert zzz_info.base_name == 'zzz'
    assert zzz_info.extension == ''
    assert zzz_info.type == FileType.NotFound
    assert zzz_info.size is None
    assert zzz_info.mtime is None
    assert 'FileType.NotFound' in repr(zzz_info)
    check_mtime_absent(zzz_info)

    # with single path
    aaa_info2 = fs.get_file_info(aaa)
    assert aaa_info.path == aaa_info2.path
    assert aaa_info.type == aaa_info2.type
| assert aaa_info.type == aaa_info2.type |
| |
| |
def test_get_file_info_with_selector(fs, pathfn):
    """get_file_info with a FileSelector lists directory contents,
    recursively or not."""
    base_dir = pathfn('selector-dir/')
    file_a = pathfn('selector-dir/test_file_a')
    file_b = pathfn('selector-dir/test_file_b')
    dir_a = pathfn('selector-dir/test_dir_a')
    file_c = pathfn('selector-dir/test_dir_a/test_file_c')
    dir_b = pathfn('selector-dir/test_dir_b')

    try:
        fs.create_dir(base_dir)
        with fs.open_output_stream(file_a):
            pass
        with fs.open_output_stream(file_b):
            pass
        fs.create_dir(dir_a)
        with fs.open_output_stream(file_c):
            pass
        fs.create_dir(dir_b)

        # recursive selector
        selector = FileSelector(base_dir, allow_not_found=False,
                                recursive=True)
        assert selector.base_dir == base_dir

        infos = fs.get_file_info(selector)
        if fs.type_name == "py::fsspec+s3":
            # s3fs only lists directories if they are not empty, but depending
            # on the s3fs/fsspec version combo, it includes the base_dir
            # (https://github.com/dask/s3fs/issues/393)
            assert (len(infos) == 4) or (len(infos) == 5)
        else:
            assert len(infos) == 5

        # Each returned entry must be one of the paths created above.
        for info in infos:
            if (info.path.endswith(file_a) or info.path.endswith(file_b) or
                    info.path.endswith(file_c)):
                assert info.type == FileType.File
            elif (info.path.rstrip("/").endswith(dir_a) or
                  info.path.rstrip("/").endswith(dir_b)):
                assert info.type == FileType.Directory
            elif (fs.type_name == "py::fsspec+s3" and
                  info.path.rstrip("/").endswith("selector-dir")):
                # s3fs can include base dir, see above
                assert info.type == FileType.Directory
            else:
                raise ValueError('unexpected path {}'.format(info.path))
            check_mtime_or_absent(info)

        # non-recursive selector -> not selecting the nested file_c
        selector = FileSelector(base_dir, recursive=False)

        infos = fs.get_file_info(selector)
        if fs.type_name == "py::fsspec+s3":
            # s3fs only lists directories if they are not empty
            # + for s3fs 0.5.2 all directories are dropped because of buggy
            # side-effect of previous find() call
            # (https://github.com/dask/s3fs/issues/410)
            assert (len(infos) == 3) or (len(infos) == 2)
        else:
            assert len(infos) == 4

    finally:
        fs.delete_dir(base_dir)
| |
| |
def test_create_dir(fs, pathfn):
    """create_dir/delete_dir round-trips, with and without recursion."""
    # s3fs fails deleting a dir if it is empty
    # (https://github.com/dask/s3fs/issues/317)
    skip_fsspec_s3fs(fs)
    d = pathfn('test-directory/')

    # Deleting a non-existent directory is an error.
    with pytest.raises(pa.ArrowIOError):
        fs.delete_dir(d)

    fs.create_dir(d)
    fs.delete_dir(d)

    # Intermediate directories are only created with recursive=True.
    d = pathfn('deeply/nested/test-directory/')
    fs.create_dir(d, recursive=True)
    fs.delete_dir(d)
| |
| |
def test_delete_dir(fs, pathfn):
    """delete_dir removes a directory tree; deleting again raises."""
    skip_fsspec_s3fs(fs)

    outer = pathfn('directory/')
    inner = pathfn('directory/nested/')

    fs.create_dir(inner)
    fs.delete_dir(outer)

    # Both the nested directory and its parent are gone now.
    with pytest.raises(pa.ArrowIOError):
        fs.delete_dir(inner)
    with pytest.raises(pa.ArrowIOError):
        fs.delete_dir(outer)
| |
| |
def test_delete_dir_contents(fs, pathfn):
    """delete_dir_contents empties a directory but keeps the dir itself."""
    skip_fsspec_s3fs(fs)

    outer = pathfn('directory/')
    inner = pathfn('directory/nested/')

    fs.create_dir(inner)
    fs.delete_dir_contents(outer)
    # The nested directory was removed...
    with pytest.raises(pa.ArrowIOError):
        fs.delete_dir(inner)
    # ...but the outer one still exists and can be deleted exactly once.
    fs.delete_dir(outer)
    with pytest.raises(pa.ArrowIOError):
        fs.delete_dir(outer)
| |
| |
def _check_root_dir_contents(config):
    """Verify delete_dir_contents guards the root dir behind a flag."""
    fs = config['fs']
    pathfn = config['pathfn']

    d = pathfn('directory/')
    nd = pathfn('directory/nested/')

    fs.create_dir(nd)
    # Any spelling of the root must be refused without accept_root_dir=True.
    with pytest.raises(pa.ArrowInvalid):
        fs.delete_dir_contents("")
    with pytest.raises(pa.ArrowInvalid):
        fs.delete_dir_contents("/")
    with pytest.raises(pa.ArrowInvalid):
        fs.delete_dir_contents("//")

    fs.delete_dir_contents("", accept_root_dir=True)
    fs.delete_dir_contents("/", accept_root_dir=True)
    fs.delete_dir_contents("//", accept_root_dir=True)
    # Everything under the root is now gone.
    with pytest.raises(pa.ArrowIOError):
        fs.delete_dir(d)
| |
| |
def test_delete_root_dir_contents(mockfs, py_mockfs):
    """Exercise root-dir deletion on native and Python-wrapped mock fs."""
    for config in (mockfs, py_mockfs):
        _check_root_dir_contents(config)
| |
| |
def test_copy_file(fs, pathfn, allow_copy_file):
    """copy_file duplicates a file, or raises when the backend can't."""
    source = pathfn('test-copy-source-file')
    target = pathfn('test-copy-target-file')

    with fs.open_output_stream(source):
        pass  # create an empty source file

    if not allow_copy_file:
        with pytest.raises(pa.ArrowNotImplementedError):
            fs.copy_file(source, target)
        return

    fs.copy_file(source, target)
    fs.delete_file(source)
    fs.delete_file(target)
| |
| |
def test_move_directory(fs, pathfn, allow_move_dir):
    """move() on a directory works unless the backend disallows it (S3)."""
    # move directory (doesn't work with S3)
    s = pathfn('source-dir/')
    t = pathfn('target-dir/')

    fs.create_dir(s)

    if allow_move_dir:
        fs.move(s, t)
        # The source is gone after a successful move.
        with pytest.raises(pa.ArrowIOError):
            fs.delete_dir(s)
        fs.delete_dir(t)
    else:
        with pytest.raises(pa.ArrowIOError):
            fs.move(s, t)
| |
| |
def test_move_file(fs, pathfn):
    """move() renames a file, leaving nothing behind at the source."""
    # s3fs moving a file with recursive=True on latest 0.5 version
    # (https://github.com/dask/s3fs/issues/394)
    skip_fsspec_s3fs(fs)

    source = pathfn('test-move-source-file')
    target = pathfn('test-move-target-file')

    with fs.open_output_stream(source):
        pass  # create an empty file

    fs.move(source, target)
    # The source no longer exists...
    with pytest.raises(pa.ArrowIOError):
        fs.delete_file(source)
    # ...but the target does.
    fs.delete_file(target)
| |
| |
def test_delete_file(fs, pathfn):
    """delete_file removes files; deleting twice raises."""
    p = pathfn('test-delete-target-file')
    with fs.open_output_stream(p):
        pass

    fs.delete_file(p)
    with pytest.raises(pa.ArrowIOError):
        fs.delete_file(p)

    # Deleting a directory also removes files nested inside it.
    d = pathfn('test-delete-nested')
    fs.create_dir(d)
    f = pathfn('test-delete-nested/target-file')
    with fs.open_output_stream(f) as s:
        s.write(b'data')

    fs.delete_dir(d)
| |
| |
def identity(v):
    """Return *v* unchanged (no-op "compressor" for the tests below)."""
    return v
| |
| |
@pytest.mark.parametrize(
    ('compression', 'buffer_size', 'compressor'),
    [
        (None, None, identity),
        (None, 64, identity),
        ('gzip', None, gzip.compress),
        ('gzip', 256, gzip.compress),
    ]
)
def test_open_input_stream(fs, pathfn, compression, buffer_size, compressor):
    """Reading back a (possibly compressed) file yields the original data."""
    p = pathfn('open-input-stream')

    data = b'some data for reading\n' * 512
    with fs.open_output_stream(p) as s:
        s.write(compressor(data))

    # open_input_stream transparently decompresses per `compression`.
    with fs.open_input_stream(p, compression, buffer_size) as s:
        result = s.read()

    assert result == data
| |
| |
def test_open_input_file(fs, pathfn):
    """Random-access reads through open_input_file honour seek()."""
    p = pathfn('open-input-file')

    data = b'some data' * 1024
    with fs.open_output_stream(p) as stream:
        stream.write(data)

    offset = len(b'some data') * 512
    with fs.open_input_file(p) as f:
        f.seek(offset)
        tail = f.read()

    assert tail == data[offset:]
| |
| |
@pytest.mark.parametrize(
    ('compression', 'buffer_size', 'decompressor'),
    [
        (None, None, identity),
        (None, 64, identity),
        ('gzip', None, gzip.decompress),
        ('gzip', 256, gzip.decompress),
    ]
)
def test_open_output_stream(fs, pathfn, compression, buffer_size,
                            decompressor):
    """Data written with compression reads back identically."""
    p = pathfn('open-output-stream')

    data = b'some data for writing' * 1024
    with fs.open_output_stream(p, compression, buffer_size) as f:
        f.write(data)

    with fs.open_input_stream(p, compression, buffer_size) as f:
        assert f.read(len(data)) == data
| |
| |
@pytest.mark.parametrize(
    ('compression', 'buffer_size', 'compressor', 'decompressor'),
    [
        (None, None, identity, identity),
        (None, 64, identity, identity),
        ('gzip', None, gzip.compress, gzip.decompress),
        ('gzip', 256, gzip.compress, gzip.decompress),
    ]
)
def test_open_append_stream(fs, pathfn, compression, buffer_size, compressor,
                            decompressor, allow_append_to_file):
    """Appending extends an existing file, or raises if unsupported."""
    p = pathfn('open-append-stream')

    initial = compressor(b'already existing')
    with fs.open_output_stream(p) as s:
        s.write(initial)

    if allow_append_to_file:
        with fs.open_append_stream(p, compression=compression,
                                   buffer_size=buffer_size) as f:
            f.write(b'\nnewly added')

        with fs.open_input_stream(p) as f:
            result = f.read()

        # Decompress the whole file to verify both chunks are present.
        result = decompressor(result)
        assert result == b'already existing\nnewly added'
    else:
        with pytest.raises(pa.ArrowNotImplementedError):
            fs.open_append_stream(p, compression=compression,
                                  buffer_size=buffer_size)
| |
| |
def test_localfs_options():
    """LocalFileSystem accepts known keywords and rejects unknown ones."""
    # Known option: must construct without error.
    LocalFileSystem(use_mmap=False)

    # Unknown option: must raise TypeError.
    with pytest.raises(TypeError):
        LocalFileSystem(xxx=False)
| |
| |
def test_localfs_errors(localfs):
    """Missing paths surface as FileNotFoundError across all operations."""
    # Local filesystem errors should raise the right Python exceptions
    # (e.g. FileNotFoundError)
    fs = localfs['fs']
    with assert_file_not_found():
        fs.open_input_stream('/non/existent/file')
    with assert_file_not_found():
        fs.open_output_stream('/non/existent/file')
    with assert_file_not_found():
        fs.create_dir('/non/existent/dir', recursive=False)
    with assert_file_not_found():
        fs.delete_dir('/non/existent/dir')
    with assert_file_not_found():
        fs.delete_file('/non/existent/dir')
    with assert_file_not_found():
        fs.move('/non/existent', '/xxx')
    with assert_file_not_found():
        fs.copy_file('/non/existent', '/xxx')
| |
| |
def test_localfs_file_info(localfs):
    """get_file_info agrees with os.stat for a real file and directory."""
    fs = localfs['fs']

    # Use this very test module and its directory as known-good fixtures.
    file_path = pathlib.Path(__file__)
    dir_path = file_path.parent
    [file_info, dir_info] = fs.get_file_info([file_path.as_posix(),
                                              dir_path.as_posix()])
    assert file_info.size == file_path.stat().st_size
    assert file_info.mtime_ns == file_path.stat().st_mtime_ns
    check_mtime(file_info)
    assert dir_info.mtime_ns == dir_path.stat().st_mtime_ns
    check_mtime(dir_info)
| |
| |
def test_mockfs_mtime_roundtrip(mockfs):
    """A mock fs created with a fixed mtime reports it back unchanged."""
    # NOTE(review): the `mockfs` fixture is requested but unused — the test
    # builds its own _MockFileSystem with a fixed timestamp.
    dt = datetime.fromtimestamp(1568799826, timezone.utc)
    fs = _MockFileSystem(dt)

    with fs.open_output_stream('foo'):
        pass
    [info] = fs.get_file_info(['foo'])
    assert info.mtime == dt
| |
| |
@pytest.mark.s3
def test_s3_options():
    """S3FileSystem option handling, pickling, and invalid combinations."""
    from pyarrow.fs import S3FileSystem

    # Explicit credentials with session token.
    fs = S3FileSystem(access_key='access', secret_key='secret',
                      session_token='token', region='us-east-2',
                      scheme='https', endpoint_override='localhost:8999')
    assert isinstance(fs, S3FileSystem)
    assert fs.region == 'us-east-2'
    assert pickle.loads(pickle.dumps(fs)) == fs

    # Assumed-role credentials.
    fs = S3FileSystem(role_arn='role', session_name='session',
                      external_id='id', load_frequency=100)
    assert isinstance(fs, S3FileSystem)
    assert pickle.loads(pickle.dumps(fs)) == fs

    # Incomplete or conflicting credential combinations are rejected.
    with pytest.raises(ValueError):
        S3FileSystem(access_key='access')
    with pytest.raises(ValueError):
        S3FileSystem(secret_key='secret')
    with pytest.raises(ValueError):
        S3FileSystem(access_key='access', session_token='token')
    with pytest.raises(ValueError):
        S3FileSystem(secret_key='secret', session_token='token')
    with pytest.raises(ValueError):
        S3FileSystem(
            access_key='access', secret_key='secret', role_arn='arn'
        )
| |
| |
| @pytest.mark.s3 |
| def test_s3_proxy_options(monkeypatch): |
| from pyarrow.fs import S3FileSystem |
| |
| # The following two are equivalent: |
| proxy_opts_1_dict = {'scheme': 'http', 'host': 'localhost', 'port': 8999} |
| proxy_opts_1_str = 'http://localhost:8999' |
| # The following two are equivalent: |
| proxy_opts_2_dict = {'scheme': 'https', 'host': 'localhost', 'port': 8080} |
| proxy_opts_2_str = 'https://localhost:8080' |
| |
| # Check dict case for 'proxy_options' |
| fs = S3FileSystem(proxy_options=proxy_opts_1_dict) |
| assert isinstance(fs, S3FileSystem) |
| assert pickle.loads(pickle.dumps(fs)) == fs |
| |
| fs = S3FileSystem(proxy_options=proxy_opts_2_dict) |
| assert isinstance(fs, S3FileSystem) |
| assert pickle.loads(pickle.dumps(fs)) == fs |
| |
| # Check str case for 'proxy_options' |
| fs = S3FileSystem(proxy_options=proxy_opts_1_str) |
| assert isinstance(fs, S3FileSystem) |
| assert pickle.loads(pickle.dumps(fs)) == fs |
| |
| fs = S3FileSystem(proxy_options=proxy_opts_2_str) |
| assert isinstance(fs, S3FileSystem) |
| assert pickle.loads(pickle.dumps(fs)) == fs |
| |
| # Check that two FSs using the same proxy_options dict are equal |
| fs1 = S3FileSystem(proxy_options=proxy_opts_1_dict) |
| fs2 = S3FileSystem(proxy_options=proxy_opts_1_dict) |
| assert fs1 == fs2 |
| assert pickle.loads(pickle.dumps(fs1)) == fs2 |
| assert pickle.loads(pickle.dumps(fs2)) == fs1 |
| |
| fs1 = S3FileSystem(proxy_options=proxy_opts_2_dict) |
| fs2 = S3FileSystem(proxy_options=proxy_opts_2_dict) |
| assert fs1 == fs2 |
| assert pickle.loads(pickle.dumps(fs1)) == fs2 |
| assert pickle.loads(pickle.dumps(fs2)) == fs1 |
| |
| # Check that two FSs using the same proxy_options str are equal |
| fs1 = S3FileSystem(proxy_options=proxy_opts_1_str) |
| fs2 = S3FileSystem(proxy_options=proxy_opts_1_str) |
| assert fs1 == fs2 |
| assert pickle.loads(pickle.dumps(fs1)) == fs2 |
| assert pickle.loads(pickle.dumps(fs2)) == fs1 |
| |
| fs1 = S3FileSystem(proxy_options=proxy_opts_2_str) |
| fs2 = S3FileSystem(proxy_options=proxy_opts_2_str) |
| assert fs1 == fs2 |
| assert pickle.loads(pickle.dumps(fs1)) == fs2 |
| assert pickle.loads(pickle.dumps(fs2)) == fs1 |
| |
| # Check that two FSs using equivalent proxy_options |
| # (one dict, one str) are equal |
| fs1 = S3FileSystem(proxy_options=proxy_opts_1_dict) |
| fs2 = S3FileSystem(proxy_options=proxy_opts_1_str) |
| assert fs1 == fs2 |
| assert pickle.loads(pickle.dumps(fs1)) == fs2 |
| assert pickle.loads(pickle.dumps(fs2)) == fs1 |
| |
| fs1 = S3FileSystem(proxy_options=proxy_opts_2_dict) |
| fs2 = S3FileSystem(proxy_options=proxy_opts_2_str) |
| assert fs1 == fs2 |
| assert pickle.loads(pickle.dumps(fs1)) == fs2 |
| assert pickle.loads(pickle.dumps(fs2)) == fs1 |
| |
| # Check that two FSs using nonequivalent proxy_options are not equal |
| fs1 = S3FileSystem(proxy_options=proxy_opts_1_dict) |
| fs2 = S3FileSystem(proxy_options=proxy_opts_2_dict) |
| assert fs1 != fs2 |
| assert pickle.loads(pickle.dumps(fs1)) != fs2 |
| assert pickle.loads(pickle.dumps(fs2)) != fs1 |
| |
| fs1 = S3FileSystem(proxy_options=proxy_opts_1_dict) |
| fs2 = S3FileSystem(proxy_options=proxy_opts_2_str) |
| assert fs1 != fs2 |
| assert pickle.loads(pickle.dumps(fs1)) != fs2 |
| assert pickle.loads(pickle.dumps(fs2)) != fs1 |
| |
| fs1 = S3FileSystem(proxy_options=proxy_opts_1_str) |
| fs2 = S3FileSystem(proxy_options=proxy_opts_2_dict) |
| assert fs1 != fs2 |
| assert pickle.loads(pickle.dumps(fs1)) != fs2 |
| assert pickle.loads(pickle.dumps(fs2)) != fs1 |
| |
| fs1 = S3FileSystem(proxy_options=proxy_opts_1_str) |
| fs2 = S3FileSystem(proxy_options=proxy_opts_2_str) |
| assert fs1 != fs2 |
| assert pickle.loads(pickle.dumps(fs1)) != fs2 |
| assert pickle.loads(pickle.dumps(fs2)) != fs1 |
| |
| # Check that two FSs (one using proxy_options and the other not) |
| # are not equal |
| fs1 = S3FileSystem(proxy_options=proxy_opts_1_dict) |
| fs2 = S3FileSystem() |
| assert fs1 != fs2 |
| assert pickle.loads(pickle.dumps(fs1)) != fs2 |
| assert pickle.loads(pickle.dumps(fs2)) != fs1 |
| |
| fs1 = S3FileSystem(proxy_options=proxy_opts_1_str) |
| fs2 = S3FileSystem() |
| assert fs1 != fs2 |
| assert pickle.loads(pickle.dumps(fs1)) != fs2 |
| assert pickle.loads(pickle.dumps(fs2)) != fs1 |
| |
| fs1 = S3FileSystem(proxy_options=proxy_opts_2_dict) |
| fs2 = S3FileSystem() |
| assert fs1 != fs2 |
| assert pickle.loads(pickle.dumps(fs1)) != fs2 |
| assert pickle.loads(pickle.dumps(fs2)) != fs1 |
| |
| fs1 = S3FileSystem(proxy_options=proxy_opts_2_str) |
| fs2 = S3FileSystem() |
| assert fs1 != fs2 |
| assert pickle.loads(pickle.dumps(fs1)) != fs2 |
| assert pickle.loads(pickle.dumps(fs2)) != fs1 |
| |
| # Only dict and str are supported |
| with pytest.raises(TypeError): |
| S3FileSystem(proxy_options=('http', 'localhost', 9090)) |
| # Missing scheme |
| with pytest.raises(KeyError): |
| S3FileSystem(proxy_options={'host': 'localhost', 'port': 9090}) |
| # Missing host |
| with pytest.raises(KeyError): |
| S3FileSystem(proxy_options={'scheme': 'https', 'port': 9090}) |
| # Missing port |
| with pytest.raises(KeyError): |
| S3FileSystem(proxy_options={'scheme': 'http', 'host': 'localhost'}) |
| # Invalid proxy URI (invalid scheme htttps) |
| with pytest.raises(pa.ArrowInvalid): |
| S3FileSystem(proxy_options='htttps://localhost:9000') |
| # Invalid proxy_options dict (invalid scheme htttps) |
| with pytest.raises(pa.ArrowInvalid): |
| S3FileSystem(proxy_options={'scheme': 'htttp', 'host': 'localhost', |
| 'port': 8999}) |
| |
| |
@pytest.mark.hdfs
def test_hdfs_options(hdfs_connection):
    """HadoopFileSystem construction, equality and pickling.

    Verifies that keyword construction and ``from_uri`` yield equal
    filesystems, that any differing option breaks equality, that every
    variant survives a pickle round-trip, and finally performs a live
    listing of '/' against the test cluster.
    """
    from pyarrow.fs import HadoopFileSystem
    if not pa.have_libhdfs():
        pytest.skip('Cannot locate libhdfs')

    host, port, user = hdfs_connection

    replication = 2
    buffer_size = 64*1024
    default_block_size = 128*1024**2
    uri = ('hdfs://{}:{}/?user={}&replication={}&buffer_size={}'
           '&default_block_size={}')

    hdfs1 = HadoopFileSystem(host, port, user='libhdfs',
                             replication=replication, buffer_size=buffer_size,
                             default_block_size=default_block_size)
    hdfs2 = HadoopFileSystem.from_uri(uri.format(
        host, port, 'libhdfs', replication, buffer_size, default_block_size
    ))
    hdfs3 = HadoopFileSystem.from_uri(uri.format(
        host, port, 'me', replication, buffer_size, default_block_size
    ))
    hdfs4 = HadoopFileSystem.from_uri(uri.format(
        host, port, 'me', replication + 1, buffer_size, default_block_size
    ))
    hdfs5 = HadoopFileSystem(host, port)
    hdfs6 = HadoopFileSystem.from_uri('hdfs://{}:{}'.format(host, port))
    hdfs7 = HadoopFileSystem(host, port, user='localuser')
    hdfs8 = HadoopFileSystem(host, port, user='localuser',
                             kerb_ticket="cache_path")
    hdfs9 = HadoopFileSystem(host, port, user='localuser',
                             kerb_ticket=pathlib.Path("cache_path"))
    hdfs10 = HadoopFileSystem(host, port, user='localuser',
                              kerb_ticket="cache_path2")
    hdfs11 = HadoopFileSystem(host, port, user='localuser',
                              kerb_ticket="cache_path",
                              extra_conf={'hdfs_token': 'abcd'})

    assert hdfs1 == hdfs2
    assert hdfs5 == hdfs6
    assert hdfs6 != hdfs7
    assert hdfs2 != hdfs3
    assert hdfs3 != hdfs4
    assert hdfs7 != hdfs5
    assert hdfs7 != hdfs8
    # str and pathlib.Path kerb_ticket values are equivalent
    assert hdfs8 == hdfs9
    assert hdfs10 != hdfs9
    assert hdfs11 != hdfs8

    with pytest.raises(TypeError):
        HadoopFileSystem()
    with pytest.raises(TypeError):
        HadoopFileSystem.from_uri(3)

    # Every variant must survive a pickle round-trip unchanged
    for fs in [hdfs1, hdfs2, hdfs3, hdfs4, hdfs5, hdfs6, hdfs7, hdfs8,
               hdfs9, hdfs10, hdfs11]:
        assert pickle.loads(pickle.dumps(fs)) == fs

    # Live smoke test against the cluster from the fixture
    hdfs = HadoopFileSystem(host, port, user=user)
    assert hdfs.get_file_info(FileSelector('/'))

    hdfs = HadoopFileSystem.from_uri(
        "hdfs://{}:{}/?user={}".format(host, port, user)
    )
    assert hdfs.get_file_info(FileSelector('/'))
| |
| |
@pytest.mark.parametrize(('uri', 'expected_klass', 'expected_path'), [
    # leading slashes are removed intentionally, because MockFileSystem doesn't
    # have a distinction between relative and absolute paths
    ('mock:', _MockFileSystem, ''),
    ('mock:foo/bar', _MockFileSystem, 'foo/bar'),
    ('mock:/foo/bar', _MockFileSystem, 'foo/bar'),
    ('mock:///foo/bar', _MockFileSystem, 'foo/bar'),
    ('file:/', LocalFileSystem, '/'),
    ('file:///', LocalFileSystem, '/'),
    ('file:/foo/bar', LocalFileSystem, '/foo/bar'),
    ('file:///foo/bar', LocalFileSystem, '/foo/bar'),
    ('/', LocalFileSystem, '/'),
    ('/foo/bar', LocalFileSystem, '/foo/bar'),
])
def test_filesystem_from_uri(uri, expected_klass, expected_path):
    """Each URI resolves to the expected filesystem class and inner path."""
    filesystem, resolved = FileSystem.from_uri(uri)
    assert isinstance(filesystem, expected_klass)
    assert resolved == expected_path
| |
| |
@pytest.mark.parametrize(
    'path',
    ['', '/', 'foo/bar', '/foo/bar', __file__]
)
def test_filesystem_from_path_object(path):
    """A pathlib.Path resolves to LocalFileSystem plus an absolute POSIX path."""
    path_obj = pathlib.Path(path)
    filesystem, resolved = FileSystem.from_uri(path_obj)
    assert isinstance(filesystem, LocalFileSystem)
    assert resolved == path_obj.resolve().absolute().as_posix()
| |
| |
@pytest.mark.s3
def test_filesystem_from_uri_s3(s3_connection, s3_server):
    """An s3:// URI with credentials and endpoint yields a usable filesystem."""
    from pyarrow.fs import S3FileSystem

    host, port, access_key, secret_key = s3_connection

    uri = "s3://{}:{}@mybucket/foo/bar?scheme=http&endpoint_override={}:{}" \
        .format(access_key, secret_key, host, port)

    fs, path = FileSystem.from_uri(uri)
    assert path == "mybucket/foo/bar"
    assert isinstance(fs, S3FileSystem)

    # The returned filesystem actually works: create the dir, stat it back
    fs.create_dir(path)
    info, = fs.get_file_info([path])
    assert info.type == FileType.Directory
    assert info.path == path
| |
| |
def test_py_filesystem():
    """PyFileSystem wraps a handler and rejects non-handler arguments."""
    handler = DummyHandler()
    wrapped = PyFileSystem(handler)
    assert isinstance(wrapped, PyFileSystem)
    assert wrapped.handler is handler
    assert wrapped.type_name == "py::dummy"

    # Anything that is not a FileSystemHandler is refused
    with pytest.raises(TypeError):
        PyFileSystem(None)
| |
| |
def test_py_filesystem_equality():
    """PyFileSystem equality delegates to the wrapped handlers' equality."""
    handler_a = DummyHandler(1)
    handler_b = DummyHandler(2)
    handler_c = DummyHandler(2)
    fs1 = PyFileSystem(handler_a)
    fs2 = PyFileSystem(handler_a)
    fs3 = PyFileSystem(handler_b)
    fs4 = PyFileSystem(handler_c)

    # Distinct wrapper objects...
    assert fs1 is not fs2
    assert fs2 is not fs3
    assert fs3 is not fs4
    # ...compare through their handlers
    assert fs1 == fs2  # same handler object
    assert fs2 != fs3  # handlers with different values
    assert fs3 == fs4  # distinct handlers with equal values

    # Never equal to unrelated filesystems or arbitrary objects
    assert fs1 != LocalFileSystem()
    assert fs1 != object()
| |
| |
def test_py_filesystem_pickling():
    """A PyFileSystem round-trips through pickle with its handler intact."""
    handler = DummyHandler()
    fs = PyFileSystem(handler)

    roundtripped = pickle.loads(pickle.dumps(fs))
    assert isinstance(roundtripped, FileSystem)
    assert roundtripped == fs
    assert roundtripped.handler == handler
    assert roundtripped.type_name == "py::dummy"
| |
| |
def test_py_filesystem_lifetime():
    """PyFileSystem keeps its handler alive exactly as long as itself.

    Observes handler collection through a weakref: dropping the last
    direct reference to the handler must not free it while the wrapping
    filesystem exists; dropping the filesystem must then free it.
    """
    handler = DummyHandler()
    fs = PyFileSystem(handler)
    assert isinstance(fs, PyFileSystem)
    wr = weakref.ref(handler)
    handler = None
    # The filesystem still holds a strong reference to the handler
    assert wr() is not None
    fs = None
    # ...and releasing the filesystem releases the handler too
    assert wr() is None

    # Taking the .handler attribute doesn't wreck reference counts
    handler = DummyHandler()
    fs = PyFileSystem(handler)
    wr = weakref.ref(handler)
    handler = None
    # .handler returns the very same object the weakref tracks
    assert wr() is fs.handler
    assert wr() is not None
    fs = None
    assert wr() is None
| |
| |
def test_py_filesystem_get_file_info():
    """get_file_info on a PyFileSystem forwards each path to the handler."""
    fs = PyFileSystem(DummyHandler())

    # The dummy handler classifies paths by substring
    for target, expected_type in [
        ('some/dir', FileType.Directory),
        ('some/file', FileType.File),
        ('notfound', FileType.NotFound),
    ]:
        info, = fs.get_file_info([target])
        assert info.path == target
        assert info.type == expected_type

    # Errors raised by the handler propagate unchanged
    with pytest.raises(TypeError):
        fs.get_file_info(['badtype'])

    with pytest.raises(IOError):
        fs.get_file_info(['xxx'])
| |
| |
def test_py_filesystem_get_file_info_selector():
    """FileSelector queries are forwarded to the handler correctly."""
    fs = PyFileSystem(DummyHandler())

    # Non-recursive listing: direct children only
    results = fs.get_file_info(FileSelector(base_dir="somedir"))
    assert len(results) == 2
    file_info, dir_info = results
    assert file_info.path == "somedir/file1"
    assert file_info.type == FileType.File
    assert file_info.size == 123
    assert dir_info.path == "somedir/subdir1"
    assert dir_info.type == FileType.Directory
    assert dir_info.size is None

    # Recursive listing descends into subdirectories
    results = fs.get_file_info(
        FileSelector(base_dir="somedir", recursive=True))
    assert [r.path for r in results] == [
        "somedir/file1", "somedir/subdir1", "somedir/subdir1/file2"
    ]

    # A missing base dir raises unless allow_not_found is set
    with pytest.raises(FileNotFoundError):
        fs.get_file_info(FileSelector(base_dir="notfound"))

    selector = FileSelector(base_dir="notfound", allow_not_found=True)
    assert fs.get_file_info(selector) == []
| |
| |
def test_py_filesystem_ops():
    """Directory and file manipulation calls are forwarded to the handler."""
    fs = PyFileSystem(DummyHandler())

    fs.create_dir("recursive", recursive=True)
    fs.create_dir("non-recursive", recursive=False)
    # The dummy handler raises for any other create_dir target
    with pytest.raises(IOError):
        fs.create_dir("foobar")

    fs.delete_dir("delete_dir")
    fs.delete_dir_contents("delete_dir_contents")

    # Wiping the root is refused unless explicitly accepted
    for root in ("", "/", "//"):
        with pytest.raises(ValueError):
            fs.delete_dir_contents(root)
        fs.delete_dir_contents(root, accept_root_dir=True)

    fs.delete_file("delete_file")
    fs.move("move_from", "move_to")
    fs.copy_file("copy_file_from", "copy_file_to")
| |
| |
def test_py_open_input_stream():
    """open_input_stream returns the handler's stream; missing paths raise."""
    fs = PyFileSystem(DummyHandler())

    with fs.open_input_stream("somefile") as stream:
        assert stream.read() == b"somefile:input_stream"

    with pytest.raises(FileNotFoundError):
        fs.open_input_stream("notfound")
| |
| |
def test_py_open_input_file():
    """open_input_file returns the handler's file; missing paths raise."""
    fs = PyFileSystem(DummyHandler())

    with fs.open_input_file("somefile") as handle:
        assert handle.read() == b"somefile:input_file"

    with pytest.raises(FileNotFoundError):
        fs.open_input_file("notfound")
| |
| |
def test_py_open_output_stream():
    """Writing through open_output_stream completes without error."""
    fs = PyFileSystem(DummyHandler())

    with fs.open_output_stream("somefile") as sink:
        sink.write(b"data")
| |
| |
def test_py_open_append_stream():
    """Writing through open_append_stream completes without error."""
    fs = PyFileSystem(DummyHandler())

    with fs.open_append_stream("somefile") as sink:
        sink.write(b"data")
| |
| |
@pytest.mark.s3
def test_s3_real_aws():
    # Exercise connection code with an AWS-backed S3 bucket.
    # This is a minimal integration check for ARROW-9261 and similar issues.
    from pyarrow.fs import S3FileSystem

    # The default region may be overridden via the environment for CI
    expected_region = os.environ.get('PYARROW_TEST_S3_REGION') or 'us-east-1'
    fs = S3FileSystem(anonymous=True)
    assert fs.region == expected_region

    fs = S3FileSystem(anonymous=True, region='us-east-2')
    listing = fs.get_file_info(FileSelector('ursa-labs-taxi-data'))
    assert len(listing) > 0
| |
| |
@pytest.mark.s3
def test_s3_real_aws_region_selection():
    """Bucket region auto-detection and explicit override in s3:// URIs."""
    # Taken from a registry of open S3-hosted datasets
    # at https://github.com/awslabs/open-data-registry
    fs, path = FileSystem.from_uri('s3://mf-nwp-models/README.txt')
    assert fs.region == 'eu-west-1'
    with fs.open_input_stream(path) as f:
        assert b"Meteo-France Atmospheric models on AWS" in f.read(50)

    # Passing an explicit region disables auto-selection
    fs, path = FileSystem.from_uri(
        's3://mf-nwp-models/README.txt?region=us-east-2')
    assert fs.region == 'us-east-2'
    # Reading from the wrong region may still work for public buckets...

    # Non-existent bucket (hopefully, otherwise need to fix this test)
    with pytest.raises(IOError, match="Bucket '.*' not found"):
        FileSystem.from_uri('s3://x-arrow-non-existent-bucket')
    # ...but supplying an explicit region skips the lookup entirely
    fs, path = FileSystem.from_uri(
        's3://x-arrow-non-existent-bucket?region=us-east-3')
    assert fs.region == 'us-east-3'