# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import pickle
import tempfile
import threading
import uuid
from unittest import mock
import pytest
from botocore.awsrequest import AWSRequest
from fsspec.implementations.local import LocalFileSystem
from fsspec.spec import AbstractFileSystem
from requests_mock import Mocker
from pyiceberg.catalog.rest.auth import AUTH_MANAGER
from pyiceberg.exceptions import SignError
from pyiceberg.io import fsspec
from pyiceberg.io.fsspec import FsspecFileIO, S3V4RestSigner
from pyiceberg.io.pyarrow import PyArrowFileIO
from pyiceberg.typedef import Properties
from tests.conftest import UNIFIED_AWS_SESSION_PROPERTIES
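
# A minimal usage sketch of the API under test (the endpoint and bucket below are
# illustrative assumptions, not fixtures from this suite):
#
#     file_io = FsspecFileIO(properties={"s3.endpoint": "http://localhost:9000"})
#     output_file = file_io.new_output("s3://warehouse/example.txt")
#     with output_file.create() as f:
#         f.write(b"foo")
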
def test_fsspec_infer_local_fs_from_path(fsspec_fileio: FsspecFileIO) -> None:
"""Test path with `file` scheme and no scheme both use LocalFileSystem"""
assert isinstance(fsspec_fileio.new_output("file://tmp/warehouse")._fs, LocalFileSystem)
assert isinstance(fsspec_fileio.new_output("/tmp/warehouse")._fs, LocalFileSystem)
def test_fsspec_local_fs_can_create_path_without_parent_dir(fsspec_fileio: FsspecFileIO) -> None:
"""Test LocalFileSystem can create path without first creating the parent directories"""
with tempfile.TemporaryDirectory() as tmpdirname:
file_path = f"{tmpdirname}/foo/bar/baz.txt"
output_file = fsspec_fileio.new_output(file_path)
parent_path = os.path.dirname(file_path)
assert output_file._fs.exists(parent_path) is False
try:
with output_file.create() as f:
f.write(b"foo")
except Exception:
pytest.fail("Failed to write to file without parent directory")
def test_fsspec_get_fs_instance_per_thread_caching(fsspec_fileio: FsspecFileIO) -> None:
"""Test that filesystem instances are cached per-thread by `FsspecFileIO.get_fs`"""
fs_instances: list[AbstractFileSystem] = []
start_work_events: list[threading.Event] = [threading.Event() for _ in range(2)]
def get_fs(start_work_event: threading.Event) -> None:
# Wait to be told to actually start getting the filesystem instances
start_work_event.wait()
# Call twice to ensure caching within the same thread
for _ in range(2):
fs_instances.append(fsspec_fileio.get_fs("file"))
threads = [threading.Thread(target=get_fs, args=[start_work_event]) for start_work_event in start_work_events]
    # Start both threads (which will immediately block on their `Event`s), as we want to ensure distinct
    # `threading.get_ident()` values, which are used in the `fsspec.spec.AbstractFileSystem` cache keys.
for thread in threads:
thread.start()
# Get the filesystem instances in the first thread and wait for completion
start_work_events[0].set()
threads[0].join()
# Get the filesystem instances in the second thread and wait for completion
start_work_events[1].set()
threads[1].join()
# Same thread, same instance
assert fs_instances[0] is fs_instances[1]
assert fs_instances[2] is fs_instances[3]
# Different threads, different instances
assert fs_instances[0] is not fs_instances[2]
@pytest.mark.s3
def test_fsspec_new_input_file(fsspec_fileio: FsspecFileIO) -> None:
"""Test creating a new input file from a fsspec file-io"""
filename = str(uuid.uuid4())
input_file = fsspec_fileio.new_input(f"s3://warehouse/{filename}")
assert isinstance(input_file, fsspec.FsspecInputFile)
assert input_file.location == f"s3://warehouse/{filename}"
@pytest.mark.s3
def test_fsspec_new_s3_output_file(fsspec_fileio: FsspecFileIO) -> None:
"""Test creating a new output file from an fsspec file-io"""
filename = str(uuid.uuid4())
output_file = fsspec_fileio.new_output(f"s3://warehouse/{filename}")
assert isinstance(output_file, fsspec.FsspecOutputFile)
assert output_file.location == f"s3://warehouse/{filename}"
@pytest.mark.s3
def test_fsspec_write_and_read_file(fsspec_fileio: FsspecFileIO) -> None:
"""Test writing and reading a file using FsspecInputFile and FsspecOutputFile"""
filename = str(uuid.uuid4())
output_file = fsspec_fileio.new_output(location=f"s3://warehouse/{filename}")
with output_file.create() as f:
f.write(b"foo")
input_file = fsspec_fileio.new_input(f"s3://warehouse/{filename}")
assert input_file.open().read() == b"foo"
fsspec_fileio.delete(input_file)
@pytest.mark.s3
def test_fsspec_getting_length_of_file(fsspec_fileio: FsspecFileIO) -> None:
"""Test getting the length of an FsspecInputFile and FsspecOutputFile"""
filename = str(uuid.uuid4())
output_file = fsspec_fileio.new_output(location=f"s3://warehouse/{filename}")
with output_file.create() as f:
f.write(b"foobar")
assert len(output_file) == 6
input_file = fsspec_fileio.new_input(location=f"s3://warehouse/{filename}")
assert len(input_file) == 6
fsspec_fileio.delete(output_file)
@pytest.mark.s3
def test_fsspec_file_tell(fsspec_fileio: FsspecFileIO) -> None:
"""Test finding cursor position for an fsspec file-io file"""
filename = str(uuid.uuid4())
output_file = fsspec_fileio.new_output(location=f"s3://warehouse/{filename}")
with output_file.create() as write_file:
write_file.write(b"foobar")
input_file = fsspec_fileio.new_input(location=f"s3://warehouse/{filename}")
f = input_file.open()
f.seek(0)
assert f.tell() == 0
f.seek(1)
assert f.tell() == 1
f.seek(3)
assert f.tell() == 3
f.seek(0)
assert f.tell() == 0
@pytest.mark.s3
def test_fsspec_read_specified_bytes_for_file(fsspec_fileio: FsspecFileIO) -> None:
"""Test reading a specified number of bytes from an fsspec file-io file"""
filename = str(uuid.uuid4())
output_file = fsspec_fileio.new_output(location=f"s3://warehouse/{filename}")
with output_file.create() as write_file:
write_file.write(b"foo")
input_file = fsspec_fileio.new_input(location=f"s3://warehouse/{filename}")
f = input_file.open()
f.seek(0)
assert b"f" == f.read(1)
f.seek(0)
assert b"fo" == f.read(2)
f.seek(1)
assert b"o" == f.read(1)
f.seek(1)
assert b"oo" == f.read(2)
f.seek(0)
assert b"foo" == f.read(999) # test reading amount larger than entire content length
fsspec_fileio.delete(input_file)
@pytest.mark.s3
def test_fsspec_raise_on_opening_file_not_found(fsspec_fileio: FsspecFileIO) -> None:
"""Test that an fsspec input file raises appropriately when the s3 file is not found"""
filename = str(uuid.uuid4())
input_file = fsspec_fileio.new_input(location=f"s3://warehouse/{filename}")
with pytest.raises(FileNotFoundError) as exc_info:
input_file.open().read()
assert filename in str(exc_info.value)
@pytest.mark.s3
def test_checking_if_a_file_exists(fsspec_fileio: FsspecFileIO) -> None:
"""Test checking if a file exists"""
non_existent_file = fsspec_fileio.new_input(location="s3://warehouse/does-not-exist.txt")
assert not non_existent_file.exists()
filename = str(uuid.uuid4())
output_file = fsspec_fileio.new_output(location=f"s3://warehouse/{filename}")
assert not output_file.exists()
with output_file.create() as f:
f.write(b"foo")
existing_input_file = fsspec_fileio.new_input(location=f"s3://warehouse/{filename}")
assert existing_input_file.exists()
existing_output_file = fsspec_fileio.new_output(location=f"s3://warehouse/{filename}")
assert existing_output_file.exists()
fsspec_fileio.delete(existing_output_file)
@pytest.mark.s3
def test_closing_a_file(fsspec_fileio: FsspecFileIO) -> None:
"""Test closing an output file and input file"""
filename = str(uuid.uuid4())
output_file = fsspec_fileio.new_output(location=f"s3://warehouse/{filename}")
with output_file.create() as write_file:
write_file.write(b"foo")
assert not write_file.closed # type: ignore
assert write_file.closed # type: ignore
input_file = fsspec_fileio.new_input(location=f"s3://warehouse/{filename}")
f = input_file.open()
assert not f.closed # type: ignore
f.close()
assert f.closed # type: ignore
fsspec_fileio.delete(f"s3://warehouse/{filename}")
@pytest.mark.s3
def test_fsspec_converting_an_outputfile_to_an_inputfile(fsspec_fileio: FsspecFileIO) -> None:
"""Test converting an output file to an input file"""
filename = str(uuid.uuid4())
output_file = fsspec_fileio.new_output(location=f"s3://warehouse/{filename}")
input_file = output_file.to_input_file()
assert input_file.location == output_file.location
@pytest.mark.s3
def test_writing_avro_file(generated_manifest_entry_file: str, fsspec_fileio: FsspecFileIO) -> None:
"""Test that bytes match when reading a local avro file, writing it using fsspec file-io, and then reading it again"""
filename = str(uuid.uuid4())
with PyArrowFileIO().new_input(location=generated_manifest_entry_file).open() as f:
b1 = f.read()
with fsspec_fileio.new_output(location=f"s3://warehouse/{filename}").create() as out_f:
out_f.write(b1)
with fsspec_fileio.new_input(location=f"s3://warehouse/{filename}").open() as in_f:
b2 = in_f.read()
    assert b1 == b2  # Check that the bytes read from the local avro file match the bytes written to s3
fsspec_fileio.delete(f"s3://warehouse/{filename}")
@pytest.mark.s3
def test_fsspec_pickle_round_trip_s3(fsspec_fileio: FsspecFileIO) -> None:
_test_fsspec_pickle_round_trip(fsspec_fileio, "s3://warehouse/foo.txt")
def test_fsspec_s3_session_properties() -> None:
session_properties: Properties = {
"s3.endpoint": "http://localhost:9000",
"s3.access-key-id": "admin",
"s3.secret-access-key": "password",
"s3.region": "us-east-1",
"s3.session-token": "s3.session-token",
**UNIFIED_AWS_SESSION_PROPERTIES,
}
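    # The s3.*-prefixed properties above are expected to take precedence over the
    # unified client.* properties merged in via UNIFIED_AWS_SESSION_PROPERTIES,
    # which is what the asserted client_kwargs below reflect.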
with mock.patch("s3fs.S3FileSystem") as mock_s3fs:
s3_fileio = FsspecFileIO(properties=session_properties)
filename = str(uuid.uuid4())
s3_fileio.new_input(location=f"s3://warehouse/{filename}")
mock_s3fs.assert_called_with(
anon=False,
client_kwargs={
"endpoint_url": "http://localhost:9000",
"aws_access_key_id": "admin",
"aws_secret_access_key": "password",
"region_name": "us-east-1",
"aws_session_token": "s3.session-token",
},
config_kwargs={},
)
def test_fsspec_s3_session_properties_force_virtual_addressing() -> None:
session_properties: Properties = {
"s3.force-virtual-addressing": True,
"s3.endpoint": "http://localhost:9000",
"s3.access-key-id": "admin",
"s3.secret-access-key": "password",
"s3.region": "us-east-1",
"s3.session-token": "s3.session-token",
**UNIFIED_AWS_SESSION_PROPERTIES,
}
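    # s3.force-virtual-addressing is expected to surface as botocore's
    # virtual-hosted addressing style in config_kwargs, as asserted below.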
with mock.patch("s3fs.S3FileSystem") as mock_s3fs:
s3_fileio = FsspecFileIO(properties=session_properties)
filename = str(uuid.uuid4())
s3_fileio.new_input(location=f"s3://warehouse/{filename}")
mock_s3fs.assert_called_with(
anon=False,
client_kwargs={
"endpoint_url": "http://localhost:9000",
"aws_access_key_id": "admin",
"aws_secret_access_key": "password",
"region_name": "us-east-1",
"aws_session_token": "s3.session-token",
},
config_kwargs={"s3": {"addressing_style": "virtual"}},
)
def test_fsspec_s3_session_properties_with_anonymous() -> None:
session_properties: Properties = {
"s3.anonymous": "true",
"s3.endpoint": "http://localhost:9000",
"s3.access-key-id": "admin",
"s3.secret-access-key": "password",
"s3.region": "us-east-1",
"s3.session-token": "s3.session-token",
**UNIFIED_AWS_SESSION_PROPERTIES,
}
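    # "s3.anonymous": "true" is expected to map to anon=True, even though explicit
    # credentials are also supplied above.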
with mock.patch("s3fs.S3FileSystem") as mock_s3fs:
s3_fileio = FsspecFileIO(properties=session_properties)
filename = str(uuid.uuid4())
s3_fileio.new_input(location=f"s3://warehouse/{filename}")
mock_s3fs.assert_called_with(
anon=True,
client_kwargs={
"endpoint_url": "http://localhost:9000",
"aws_access_key_id": "admin",
"aws_secret_access_key": "password",
"region_name": "us-east-1",
"aws_session_token": "s3.session-token",
},
config_kwargs={},
)
def test_fsspec_unified_session_properties() -> None:
session_properties: Properties = {
"s3.endpoint": "http://localhost:9000",
**UNIFIED_AWS_SESSION_PROPERTIES,
}
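    # With no s3.*-specific credentials set, the unified client.* values are
    # expected to flow through to s3fs unchanged.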
with mock.patch("s3fs.S3FileSystem") as mock_s3fs:
s3_fileio = FsspecFileIO(properties=session_properties)
filename = str(uuid.uuid4())
s3_fileio.new_input(location=f"s3://warehouse/{filename}")
mock_s3fs.assert_called_with(
anon=False,
client_kwargs={
"endpoint_url": "http://localhost:9000",
"aws_access_key_id": "client.access-key-id",
"aws_secret_access_key": "client.secret-access-key",
"region_name": "client.region",
"aws_session_token": "client.session-token",
},
config_kwargs={},
)
@pytest.mark.adls
def test_fsspec_new_input_file_adls(adls_fsspec_fileio: FsspecFileIO) -> None:
"""Test creating a new input file from an fsspec file-io"""
filename = str(uuid.uuid4())
input_file = adls_fsspec_fileio.new_input(f"abfss://tests/{filename}")
assert isinstance(input_file, fsspec.FsspecInputFile)
assert input_file.location == f"abfss://tests/{filename}"
@pytest.mark.adls
def test_fsspec_new_abfss_output_file_adls(adls_fsspec_fileio: FsspecFileIO) -> None:
"""Test creating a new output file from an fsspec file-io"""
filename = str(uuid.uuid4())
output_file = adls_fsspec_fileio.new_output(f"abfss://tests/{filename}")
assert isinstance(output_file, fsspec.FsspecOutputFile)
assert output_file.location == f"abfss://tests/{filename}"
@pytest.mark.adls
def test_fsspec_write_and_read_file_adls(adls_fsspec_fileio: FsspecFileIO) -> None:
"""Test writing and reading a file using FsspecInputFile and FsspecOutputFile"""
filename = str(uuid.uuid4())
output_file = adls_fsspec_fileio.new_output(location=f"abfss://tests/{filename}")
with output_file.create() as f:
f.write(b"foo")
input_file = adls_fsspec_fileio.new_input(f"abfss://tests/{filename}")
assert input_file.open().read() == b"foo"
adls_fsspec_fileio.delete(input_file)
@pytest.mark.adls
def test_fsspec_getting_length_of_file_adls(adls_fsspec_fileio: FsspecFileIO) -> None:
"""Test getting the length of an FsspecInputFile and FsspecOutputFile"""
filename = str(uuid.uuid4())
output_file = adls_fsspec_fileio.new_output(location=f"abfss://tests/{filename}")
with output_file.create() as f:
f.write(b"foobar")
assert len(output_file) == 6
input_file = adls_fsspec_fileio.new_input(location=f"abfss://tests/{filename}")
assert len(input_file) == 6
adls_fsspec_fileio.delete(output_file)
@pytest.mark.adls
def test_fsspec_file_tell_adls(adls_fsspec_fileio: FsspecFileIO) -> None:
"""Test finding cursor position for an fsspec file-io file"""
filename = str(uuid.uuid4())
output_file = adls_fsspec_fileio.new_output(location=f"abfss://tests/{filename}")
with output_file.create() as write_file:
write_file.write(b"foobar")
input_file = adls_fsspec_fileio.new_input(location=f"abfss://tests/{filename}")
f = input_file.open()
f.seek(0)
assert f.tell() == 0
f.seek(1)
assert f.tell() == 1
f.seek(3)
assert f.tell() == 3
f.seek(0)
assert f.tell() == 0
adls_fsspec_fileio.delete(f"abfss://tests/{filename}")
@pytest.mark.adls
def test_fsspec_read_specified_bytes_for_file_adls(adls_fsspec_fileio: FsspecFileIO) -> None:
"""Test reading a specified number of bytes from an fsspec file-io file"""
filename = str(uuid.uuid4())
output_file = adls_fsspec_fileio.new_output(location=f"abfss://tests/{filename}")
with output_file.create() as write_file:
write_file.write(b"foo")
input_file = adls_fsspec_fileio.new_input(location=f"abfss://tests/{filename}")
f = input_file.open()
f.seek(0)
assert b"f" == f.read(1)
f.seek(0)
assert b"fo" == f.read(2)
f.seek(1)
assert b"o" == f.read(1)
f.seek(1)
assert b"oo" == f.read(2)
f.seek(0)
assert b"foo" == f.read(999) # test reading amount larger than entire content length
adls_fsspec_fileio.delete(input_file)
@pytest.mark.adls
def test_fsspec_raise_on_opening_file_not_found_adls(adls_fsspec_fileio: FsspecFileIO) -> None:
"""Test that an fsspec input file raises appropriately when the adls file is not found"""
filename = str(uuid.uuid4())
input_file = adls_fsspec_fileio.new_input(location=f"abfss://tests/{filename}")
with pytest.raises(FileNotFoundError) as exc_info:
input_file.open().read()
assert filename in str(exc_info.value)
@pytest.mark.adls
def test_checking_if_a_file_exists_adls(adls_fsspec_fileio: FsspecFileIO) -> None:
"""Test checking if a file exists"""
non_existent_file = adls_fsspec_fileio.new_input(location="abfss://tests/does-not-exist.txt")
assert not non_existent_file.exists()
filename = str(uuid.uuid4())
output_file = adls_fsspec_fileio.new_output(location=f"abfss://tests/{filename}")
assert not output_file.exists()
with output_file.create() as f:
f.write(b"foo")
existing_input_file = adls_fsspec_fileio.new_input(location=f"abfss://tests/{filename}")
assert existing_input_file.exists()
existing_output_file = adls_fsspec_fileio.new_output(location=f"abfss://tests/{filename}")
assert existing_output_file.exists()
adls_fsspec_fileio.delete(existing_output_file)
@pytest.mark.adls
def test_closing_a_file_adls(adls_fsspec_fileio: FsspecFileIO) -> None:
"""Test closing an output file and input file"""
filename = str(uuid.uuid4())
output_file = adls_fsspec_fileio.new_output(location=f"abfss://tests/{filename}")
with output_file.create() as write_file:
write_file.write(b"foo")
assert not write_file.closed # type: ignore
assert write_file.closed # type: ignore
input_file = adls_fsspec_fileio.new_input(location=f"abfss://tests/{filename}")
f = input_file.open()
assert not f.closed # type: ignore
f.close()
assert f.closed # type: ignore
adls_fsspec_fileio.delete(f"abfss://tests/{filename}")
@pytest.mark.adls
def test_fsspec_converting_an_outputfile_to_an_inputfile_adls(adls_fsspec_fileio: FsspecFileIO) -> None:
"""Test converting an output file to an input file"""
filename = str(uuid.uuid4())
output_file = adls_fsspec_fileio.new_output(location=f"abfss://tests/{filename}")
input_file = output_file.to_input_file()
assert input_file.location == output_file.location
@pytest.mark.adls
def test_writing_avro_file_adls(generated_manifest_entry_file: str, adls_fsspec_fileio: FsspecFileIO) -> None:
"""Test that bytes match when reading a local avro file, writing it using fsspec file-io, and then reading it again"""
filename = str(uuid.uuid4())
with PyArrowFileIO().new_input(location=generated_manifest_entry_file).open() as f:
b1 = f.read()
with adls_fsspec_fileio.new_output(location=f"abfss://tests/{filename}").create() as out_f:
out_f.write(b1)
with adls_fsspec_fileio.new_input(location=f"abfss://tests/{filename}").open() as in_f:
b2 = in_f.read()
    assert b1 == b2  # Check that the bytes read from the local avro file match the bytes written to adls
adls_fsspec_fileio.delete(f"abfss://tests/{filename}")
@pytest.mark.adls
def test_fsspec_pickle_round_trip_adls(adls_fsspec_fileio: FsspecFileIO) -> None:
_test_fsspec_pickle_round_trip(adls_fsspec_fileio, "abfss://tests/foo.txt")
@pytest.mark.adls
def test_adls_account_name_sas_token_extraction() -> None:
session_properties: Properties = {
"adls.tenant-id": "test-tenant-id",
"adls.account-host": "testaccount.dfs.core.windows.net",
"adls.sas-token.testaccount.dfs.core.windows.net": "test-sas-token",
"adls.sas-token-expires-at-ms.testaccount.dfs.core.windows.net": "1757597218121",
}
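    # The account name is expected to be parsed out of adls.account-host, and the
    # matching per-account sas-token property selected, as asserted below.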
with mock.patch("adlfs.AzureBlobFileSystem") as mock_adlfs:
adls_fileio = FsspecFileIO(properties=session_properties)
filename = str(uuid.uuid4())
adls_fileio.new_input(location=f"abfss://tests/{filename}")
mock_adlfs.assert_called_with(
connection_string=None,
credential=None,
account_name="testaccount",
account_key=None,
sas_token="test-sas-token",
tenant_id="test-tenant-id",
client_id=None,
client_secret=None,
account_host="testaccount.dfs.core.windows.net",
anon=None,
)
@pytest.mark.gcs
def test_fsspec_new_input_file_gcs(fsspec_fileio_gcs: FsspecFileIO) -> None:
"""Test creating a new input file from a fsspec file-io"""
location = f"gs://warehouse/{uuid.uuid4()}.txt"
input_file = fsspec_fileio_gcs.new_input(location=location)
assert isinstance(input_file, fsspec.FsspecInputFile)
assert input_file.location == location
@pytest.mark.gcs
def test_fsspec_new_output_file_gcs(fsspec_fileio_gcs: FsspecFileIO) -> None:
"""Test creating a new output file from an fsspec file-io"""
location = f"gs://warehouse/{uuid.uuid4()}.txt"
output_file = fsspec_fileio_gcs.new_output(location=location)
assert isinstance(output_file, fsspec.FsspecOutputFile)
assert output_file.location == location
@pytest.mark.gcs
def test_fsspec_write_and_read_file_gcs(fsspec_fileio_gcs: FsspecFileIO) -> None:
"""Test writing and reading a file using FsspecInputFile and FsspecOutputFile"""
location = f"gs://warehouse/{uuid.uuid4()}.txt"
output_file = fsspec_fileio_gcs.new_output(location=location)
with output_file.create() as f:
f.write(b"foo")
input_file = fsspec_fileio_gcs.new_input(location)
with input_file.open() as f:
assert f.read() == b"foo"
fsspec_fileio_gcs.delete(input_file)
@pytest.mark.gcs
def test_fsspec_getting_length_of_file_gcs(fsspec_fileio_gcs: FsspecFileIO) -> None:
"""Test getting the length of an FsspecInputFile and FsspecOutputFile"""
location = f"gs://warehouse/{uuid.uuid4()}.txt"
output_file = fsspec_fileio_gcs.new_output(location=location)
with output_file.create() as f:
f.write(b"foobar")
assert len(output_file) == 6
input_file = fsspec_fileio_gcs.new_input(location=location)
assert len(input_file) == 6
fsspec_fileio_gcs.delete(output_file)
@pytest.mark.gcs
def test_fsspec_file_tell_gcs(fsspec_fileio_gcs: FsspecFileIO) -> None:
"""Test finding cursor position for an fsspec file-io file"""
location = f"gs://warehouse/{uuid.uuid4()}.txt"
output_file = fsspec_fileio_gcs.new_output(location=location)
with output_file.create() as write_file:
write_file.write(b"foobar")
input_file = fsspec_fileio_gcs.new_input(location=location)
with input_file.open() as f:
f.seek(0)
assert f.tell() == 0
f.seek(1)
assert f.tell() == 1
f.seek(3)
assert f.tell() == 3
f.seek(0)
assert f.tell() == 0
@pytest.mark.gcs
def test_fsspec_read_specified_bytes_for_file_gcs(fsspec_fileio_gcs: FsspecFileIO) -> None:
"""Test reading a specified number of bytes from a fsspec file-io file"""
location = f"gs://warehouse/{uuid.uuid4()}.txt"
output_file = fsspec_fileio_gcs.new_output(location=location)
with output_file.create() as write_file:
write_file.write(b"foo")
input_file = fsspec_fileio_gcs.new_input(location=location)
with input_file.open() as f:
f.seek(0)
assert b"f" == f.read(1)
f.seek(0)
assert b"fo" == f.read(2)
f.seek(1)
assert b"o" == f.read(1)
f.seek(1)
assert b"oo" == f.read(2)
f.seek(0)
assert b"foo" == f.read(999) # test reading amount larger than entire content length
fsspec_fileio_gcs.delete(input_file)
@pytest.mark.gcs
def test_fsspec_raise_on_opening_file_not_found_gcs(fsspec_fileio_gcs: FsspecFileIO) -> None:
"""Test that a fsspec input file raises appropriately when the gcs file is not found"""
location = f"gs://warehouse/{uuid.uuid4()}.txt"
input_file = fsspec_fileio_gcs.new_input(location=location)
with pytest.raises(FileNotFoundError) as exc_info:
input_file.open().read()
assert location in str(exc_info.value)
@pytest.mark.gcs
def test_checking_if_a_file_exists_gcs(fsspec_fileio_gcs: FsspecFileIO) -> None:
"""Test checking if a file exists"""
non_existent_file = fsspec_fileio_gcs.new_input(location="gs://warehouse/does-not-exist.txt")
assert not non_existent_file.exists()
location = f"gs://warehouse/{uuid.uuid4()}.txt"
output_file = fsspec_fileio_gcs.new_output(location=location)
assert not output_file.exists()
with output_file.create() as f:
f.write(b"foo")
existing_input_file = fsspec_fileio_gcs.new_input(location=location)
assert existing_input_file.exists()
existing_output_file = fsspec_fileio_gcs.new_output(location=location)
assert existing_output_file.exists()
fsspec_fileio_gcs.delete(existing_output_file)
@pytest.mark.gcs
def test_closing_a_file_gcs(fsspec_fileio_gcs: FsspecFileIO) -> None:
"""Test closing an output file and input file"""
location = f"gs://warehouse/{uuid.uuid4()}.txt"
output_file = fsspec_fileio_gcs.new_output(location=location)
with output_file.create() as write_file:
write_file.write(b"foo")
assert not write_file.closed # type: ignore
assert write_file.closed # type: ignore
input_file = fsspec_fileio_gcs.new_input(location=location)
f = input_file.open()
assert not f.closed # type: ignore
f.close()
assert f.closed # type: ignore
fsspec_fileio_gcs.delete(location=location)
@pytest.mark.gcs
def test_fsspec_converting_an_outputfile_to_an_inputfile_gcs(fsspec_fileio_gcs: FsspecFileIO) -> None:
"""Test converting an output file to an input file"""
filename = str(uuid.uuid4())
output_file = fsspec_fileio_gcs.new_output(location=f"gs://warehouse/{filename}")
input_file = output_file.to_input_file()
assert input_file.location == output_file.location
@pytest.mark.gcs
def test_writing_avro_file_gcs(generated_manifest_entry_file: str, fsspec_fileio_gcs: FsspecFileIO) -> None:
"""Test that bytes match when reading a local avro file, writing it using fsspec file-io, and then reading it again"""
filename = str(uuid.uuid4())
with PyArrowFileIO().new_input(location=generated_manifest_entry_file).open() as f:
b1 = f.read()
with fsspec_fileio_gcs.new_output(location=f"gs://warehouse/{filename}").create() as out_f:
out_f.write(b1)
with fsspec_fileio_gcs.new_input(location=f"gs://warehouse/{filename}").open() as in_f:
b2 = in_f.read()
    assert b1 == b2  # Check that the bytes read from the local avro file match the bytes written to gcs
fsspec_fileio_gcs.delete(f"gs://warehouse/{filename}")
@pytest.mark.gcs
def test_fsspec_pickle_round_trip_gcs(fsspec_fileio_gcs: FsspecFileIO) -> None:
_test_fsspec_pickle_round_trip(fsspec_fileio_gcs, "gs://warehouse/foo.txt")
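# Shared round-trip helper: a FileIO should survive pickling (e.g. so it can be
# shipped to worker processes) and still write, read, and delete afterwards.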
def _test_fsspec_pickle_round_trip(fsspec_fileio: FsspecFileIO, location: str) -> None:
serialized_file_io = pickle.dumps(fsspec_fileio)
deserialized_file_io = pickle.loads(serialized_file_io)
output_file = deserialized_file_io.new_output(location)
with output_file.create() as f:
f.write(b"foo")
input_file = deserialized_file_io.new_input(location)
with input_file.open() as f:
data = f.read()
assert data == b"foo"
assert len(input_file) == 3
deserialized_file_io.delete(location)
TEST_URI = "https://iceberg-test-signer"
def test_s3v4_rest_signer(requests_mock: Mocker) -> None:
new_uri = "https://other-bucket/metadata/snap-8048355899640248710-1-a5c8ea2d-aa1f-48e8-89f4-1fa69db8c742.avro"
requests_mock.post(
f"{TEST_URI}/v1/aws/s3/sign",
json={
"uri": new_uri,
"headers": {
"Authorization": [
"AWS4-HMAC-SHA256 Credential=ASIAQPRZZYGHUT57DL3I/20221017/us-west-2/s3/aws4_request, "
"SignedHeaders=host;x-amz-content-sha256;x-amz-date;x-amz-security-token, "
"Signature=430582a17d61ab02c272896fa59195f277af4bdf2121c441685e589f044bbe02"
],
"Host": ["bucket.s3.us-west-2.amazonaws.com"],
"User-Agent": ["Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0"],
"x-amz-content-sha256": ["UNSIGNED-PAYLOAD"],
"X-Amz-Date": ["20221017T102940Z"],
"X-Amz-Security-Token": [
"YQoJb3JpZ2luX2VjEDoaCXVzLXdlc3QtMiJGMEQCID/fFxZP5oaEgQmcwP6XhZa0xSq9lmLSx8ffaWbySfUPAiAesa7sjd/WV4uwRTO0S03y/MWVtgpH+/NyZQ4bZgLVriqrAggTEAEaDDAzMzQwNzIyMjE1OSIMOeFOWhZIurMmAqjsKogCxMCqxX8ZjK0gacAkcDqBCyA7qTSLhdfKQIH/w7WpLBU1km+cRUWWCudan6gZsAq867DBaKEP7qI05DAWr9MChAkgUgyI8/G3Z23ET0gAedf3GsJbakB0F1kklx8jPmj4BPCht9RcTiXiJ5DxTS/cRCcalIQXmPFbaJSqpBusVG2EkWnm1v7VQrNPE2Os2b2P293vpbhwkyCEQiGRVva4Sw9D1sKvqSsK10QCRG+os6dFEOu1kARaXi6pStvR4OVmj7OYeAYjzaFchn7nz2CSae0M4IluiYQ01eQAywbfRo9DpKSmDM/DnPZWJnD/woLhaaaCrCxSSEaFsvGOHFhLd3Rknw1v0jADMILUtJoGOp4BpqKqyMz0CY3kpKL0jfR3ykTf/ge9wWVE0Alr7wRIkGCIURkhslGHqSyFRGoTqIXaxU+oPbwlw/0w/nYO7qQ6bTANOWye/wgw4h/NmJ6vU7wnZTXwREf1r6MF72++bE/fMk19LfVb8jN/qrUqAUXTc8gBAUxL5pgy8+oT/JnI2BkVrrLS4ilxEXP9Ahm+6GDUYXV4fBpqpZwdkzQ/5Gw="
],
"X-Custom-Header": ["value"],
},
"extensions": {},
},
status_code=200,
)
request = AWSRequest(
method="HEAD",
url="https://bucket/metadata/snap-8048355899640248710-1-a5c8ea2d-aa1f-48e8-89f4-1fa69db8c742.avro",
headers={"User-Agent": "Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0"},
data=b"",
params={},
auth_path="/metadata/snap-8048355899640248710-1-a5c8ea2d-aa1f-48e8-89f4-1fa69db8c742.avro",
)
request.context = {
"client_region": "us-west-2",
"has_streaming_input": False,
"auth_type": None,
"signing": {"bucket": "bucket"},
"retries": {"attempt": 1, "invocation-id": "75d143fb-0219-439b-872c-18213d1c8d54"},
}
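    # The signer is expected to POST the request details to {uri}/v1/aws/s3/sign,
    # forward any header.* properties (here X-Custom-Header), and rewrite the
    # request's URL and headers from the mocked response above.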
signer = S3V4RestSigner(properties={"token": "abc", "uri": TEST_URI, "header.X-Custom-Header": "value"})
signer(request)
assert request.url == new_uri
assert dict(request.headers) == {
"Authorization": (
"AWS4-HMAC-SHA256 Credential=ASIAQPRZZYGHUT57DL3I/20221017/us-west-2/s3/aws4_request, "
"SignedHeaders=host;x-amz-content-sha256;x-amz-date;x-amz-security-token, "
"Signature=430582a17d61ab02c272896fa59195f277af4bdf2121c441685e589f044bbe02"
),
"Host": "bucket.s3.us-west-2.amazonaws.com",
"User-Agent": "Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0",
"X-Amz-Date": "20221017T102940Z",
"X-Amz-Security-Token": (
"YQoJb3JpZ2luX2VjEDoaCXVzLXdlc3QtMiJGMEQCID/fFxZP5oaEgQmcwP6XhZa0xSq9lmLSx8ffaWbySfUPAiAesa7sjd"
"/WV4uwRTO0S03y/MWVtgpH+/NyZQ4bZgLVriqrAggTEAEaDDAzMzQwNzIyMjE1OSIMOeFOWhZIurMmAqjsKogCxMCqxX8Z"
"jK0gacAkcDqBCyA7qTSLhdfKQIH/w7WpLBU1km+cRUWWCudan6gZsAq867DBaKEP7qI05DAWr9MChAkgUgyI8/G3Z23ET0"
"gAedf3GsJbakB0F1kklx8jPmj4BPCht9RcTiXiJ5DxTS/cRCcalIQXmPFbaJSqpBusVG2EkWnm1v7VQrNPE2Os2b2P293v"
"pbhwkyCEQiGRVva4Sw9D1sKvqSsK10QCRG+os6dFEOu1kARaXi6pStvR4OVmj7OYeAYjzaFchn7nz2CSae0M4IluiYQ01e"
"QAywbfRo9DpKSmDM/DnPZWJnD/woLhaaaCrCxSSEaFsvGOHFhLd3Rknw1v0jADMILUtJoGOp4BpqKqyMz0CY3kpKL0jfR3"
"ykTf/ge9wWVE0Alr7wRIkGCIURkhslGHqSyFRGoTqIXaxU+oPbwlw/0w/nYO7qQ6bTANOWye/wgw4h/NmJ6vU7wnZTXwRE"
"f1r6MF72++bE/fMk19LfVb8jN/qrUqAUXTc8gBAUxL5pgy8+oT/JnI2BkVrrLS4ilxEXP9Ahm+6GDUYXV4fBpqpZwdkzQ/"
"5Gw="
),
"x-amz-content-sha256": "UNSIGNED-PAYLOAD",
"X-Custom-Header": "value",
}
def test_s3v4_rest_signer_endpoint(requests_mock: Mocker) -> None:
new_uri = "https://other-bucket/metadata/snap-8048355899640248710-1-a5c8ea2d-aa1f-48e8-89f4-1fa69db8c742.avro"
endpoint = "v1/main/s3-sign/foo.bar?e=e&b=b&k=k=k&s=s&w=w"
requests_mock.post(
f"{TEST_URI}/{endpoint}",
json={
"uri": new_uri,
"headers": {
"Authorization": [
"AWS4-HMAC-SHA256 Credential=ASIAQPRZZYGHUT57DL3I/20221017/us-west-2/s3/aws4_request, "
"SignedHeaders=host;x-amz-content-sha256;x-amz-date;x-amz-security-token, "
"Signature=430582a17d61ab02c272896fa59195f277af4bdf2121c441685e589f044bbe02"
],
"Host": ["bucket.s3.us-west-2.amazonaws.com"],
"User-Agent": ["Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0"],
"x-amz-content-sha256": ["UNSIGNED-PAYLOAD"],
"X-Amz-Date": ["20221017T102940Z"],
"X-Amz-Security-Token": [
"YQoJb3JpZ2luX2VjEDoaCXVzLXdlc3QtMiJGMEQCID/fFxZP5oaEgQmcwP6XhZa0xSq9lmLSx8ffaWbySfUPAiAesa7sjd/WV4uwRTO0S03y/MWVtgpH+/NyZQ4bZgLVriqrAggTEAEaDDAzMzQwNzIyMjE1OSIMOeFOWhZIurMmAqjsKogCxMCqxX8ZjK0gacAkcDqBCyA7qTSLhdfKQIH/w7WpLBU1km+cRUWWCudan6gZsAq867DBaKEP7qI05DAWr9MChAkgUgyI8/G3Z23ET0gAedf3GsJbakB0F1kklx8jPmj4BPCht9RcTiXiJ5DxTS/cRCcalIQXmPFbaJSqpBusVG2EkWnm1v7VQrNPE2Os2b2P293vpbhwkyCEQiGRVva4Sw9D1sKvqSsK10QCRG+os6dFEOu1kARaXi6pStvR4OVmj7OYeAYjzaFchn7nz2CSae0M4IluiYQ01eQAywbfRo9DpKSmDM/DnPZWJnD/woLhaaaCrCxSSEaFsvGOHFhLd3Rknw1v0jADMILUtJoGOp4BpqKqyMz0CY3kpKL0jfR3ykTf/ge9wWVE0Alr7wRIkGCIURkhslGHqSyFRGoTqIXaxU+oPbwlw/0w/nYO7qQ6bTANOWye/wgw4h/NmJ6vU7wnZTXwREf1r6MF72++bE/fMk19LfVb8jN/qrUqAUXTc8gBAUxL5pgy8+oT/JnI2BkVrrLS4ilxEXP9Ahm+6GDUYXV4fBpqpZwdkzQ/5Gw="
],
},
"extensions": {},
},
status_code=200,
)
request = AWSRequest(
method="HEAD",
url="https://bucket/metadata/snap-8048355899640248710-1-a5c8ea2d-aa1f-48e8-89f4-1fa69db8c742.avro",
headers={"User-Agent": "Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0"},
data=b"",
params={},
auth_path="/metadata/snap-8048355899640248710-1-a5c8ea2d-aa1f-48e8-89f4-1fa69db8c742.avro",
)
request.context = {
"client_region": "us-west-2",
"has_streaming_input": False,
"auth_type": None,
"signing": {"bucket": "bucket"},
"retries": {"attempt": 1, "invocation-id": "75d143fb-0219-439b-872c-18213d1c8d54"},
}
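    # "s3.signer.endpoint" is expected to override the default /v1/aws/s3/sign
    # signing path, including any query string it carries.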
signer = S3V4RestSigner(properties={"token": "abc", "uri": TEST_URI, "s3.signer.endpoint": endpoint})
signer(request)
assert request.url == new_uri
assert dict(request.headers) == {
"Authorization": (
"AWS4-HMAC-SHA256 Credential=ASIAQPRZZYGHUT57DL3I/20221017/us-west-2/s3/aws4_request, "
"SignedHeaders=host;x-amz-content-sha256;x-amz-date;x-amz-security-token, "
"Signature=430582a17d61ab02c272896fa59195f277af4bdf2121c441685e589f044bbe02"
),
"Host": "bucket.s3.us-west-2.amazonaws.com",
"User-Agent": "Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0",
"X-Amz-Date": "20221017T102940Z",
"X-Amz-Security-Token": (
"YQoJb3JpZ2luX2VjEDoaCXVzLXdlc3QtMiJGMEQCID/fFxZP5oaEgQmcwP6XhZa0xSq9lmLSx8ffaWbySfUPAiAesa7sjd"
"/WV4uwRTO0S03y/MWVtgpH+/NyZQ4bZgLVriqrAggTEAEaDDAzMzQwNzIyMjE1OSIMOeFOWhZIurMmAqjsKogCxMCqxX8Z"
"jK0gacAkcDqBCyA7qTSLhdfKQIH/w7WpLBU1km+cRUWWCudan6gZsAq867DBaKEP7qI05DAWr9MChAkgUgyI8/G3Z23ET0"
"gAedf3GsJbakB0F1kklx8jPmj4BPCht9RcTiXiJ5DxTS/cRCcalIQXmPFbaJSqpBusVG2EkWnm1v7VQrNPE2Os2b2P293v"
"pbhwkyCEQiGRVva4Sw9D1sKvqSsK10QCRG+os6dFEOu1kARaXi6pStvR4OVmj7OYeAYjzaFchn7nz2CSae0M4IluiYQ01e"
"QAywbfRo9DpKSmDM/DnPZWJnD/woLhaaaCrCxSSEaFsvGOHFhLd3Rknw1v0jADMILUtJoGOp4BpqKqyMz0CY3kpKL0jfR3"
"ykTf/ge9wWVE0Alr7wRIkGCIURkhslGHqSyFRGoTqIXaxU+oPbwlw/0w/nYO7qQ6bTANOWye/wgw4h/NmJ6vU7wnZTXwRE"
"f1r6MF72++bE/fMk19LfVb8jN/qrUqAUXTc8gBAUxL5pgy8+oT/JnI2BkVrrLS4ilxEXP9Ahm+6GDUYXV4fBpqpZwdkzQ/"
"5Gw="
),
"x-amz-content-sha256": "UNSIGNED-PAYLOAD",
}
def test_s3v4_rest_signer_forbidden(requests_mock: Mocker) -> None:
requests_mock.post(
f"{TEST_URI}/v1/aws/s3/sign",
json={
"method": "HEAD",
"region": "us-west-2",
"uri": "https://bucket/metadata/snap-8048355899640248710-1-a5c8ea2d-aa1f-48e8-89f4-1fa69db8c742.avro",
"headers": {"User-Agent": ["Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0"]},
},
status_code=401,
)
request = AWSRequest(
method="HEAD",
url="https://bucket/metadata/snap-8048355899640248710-1-a5c8ea2d-aa1f-48e8-89f4-1fa69db8c742.avro",
headers={"User-Agent": "Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0"},
data=b"",
params={},
auth_path="/metadata/snap-8048355899640248710-1-a5c8ea2d-aa1f-48e8-89f4-1fa69db8c742.avro",
)
request.context = {
"client_region": "us-west-2",
"has_streaming_input": False,
"auth_type": None,
"signing": {"bucket": "bucket"},
"retries": {"attempt": 1, "invocation-id": "75d143fb-0219-439b-872c-18213d1c8d54"},
}
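    # A non-2xx response from the signing endpoint is expected to surface as a
    # SignError whose message includes the response body.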
signer = S3V4RestSigner(properties={"token": "abc", "uri": TEST_URI})
with pytest.raises(SignError) as exc_info:
signer(request)
assert (
"Failed to sign request 401: {'method': 'HEAD', 'region': 'us-west-2', "
"'uri': 'https://bucket/metadata/snap-8048355899640248710-1-a5c8ea2d-aa1f-48e8-89f4-1fa69db8c742.avro', "
"'headers': {'User-Agent': ['Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0']}}" in str(exc_info.value)
)
def test_s3v4_rest_signer_uses_auth_manager(requests_mock: Mocker) -> None:
new_uri = "https://bucket/metadata/snap-signed.avro"
requests_mock.post(
f"{TEST_URI}/v1/aws/s3/sign",
json={
"uri": new_uri,
"headers": {
"Authorization": ["AWS4-HMAC-SHA256 Credential=ASIA.../s3/aws4_request, SignedHeaders=host, Signature=abc"],
"Host": ["bucket.s3.us-west-2.amazonaws.com"],
},
"extensions": {},
},
status_code=200,
)
request = AWSRequest(
method="HEAD",
url="https://bucket/metadata/snap-foo.avro",
headers={"User-Agent": "Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0"},
data=b"",
params={},
auth_path="/metadata/snap-foo.avro",
)
request.context = {
"client_region": "us-west-2",
"has_streaming_input": False,
"auth_type": None,
"signing": {"bucket": "bucket"},
"retries": {"attempt": 1, "invocation-id": "75d143fb-0219-439b-872c-18213d1c8d54"},
}
class DummyAuthManager:
def __init__(self) -> None:
self.calls = 0
def auth_header(self) -> str:
self.calls += 1
return "Bearer via-manager"
auth_manager = DummyAuthManager()
signer = S3V4RestSigner(properties={AUTH_MANAGER: auth_manager, "uri": TEST_URI})
signer(request)
assert auth_manager.calls == 1
assert requests_mock.last_request is not None
assert requests_mock.last_request.headers["Authorization"] == "Bearer via-manager"
assert request.url == new_uri