blob: 4efa64326a0ec9612778fea9c5aa3a4066876f31 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Any, Optional
import pytest
from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import TableProperties
from pyiceberg.table.locations import LocationProvider, load_location_provider
from pyiceberg.transforms import IdentityTransform
from pyiceberg.typedef import EMPTY_DICT
from pyiceberg.types import NestedField, StringType
# Shared fixture: a single identity-transformed string partition field and a key for it,
# used by every partitioned-path test below.
PARTITION_FIELD = PartitionField(source_id=1, field_id=1002, transform=IdentityTransform(), name="string_field")
PARTITION_KEY = PartitionKey(
    schema=Schema(NestedField(field_id=1, name="string_field", field_type=StringType(), required=False)),
    partition_spec=PartitionSpec(PARTITION_FIELD),
    field_values=[PartitionFieldValue(PARTITION_FIELD, "example_string")],
)
class CustomLocationProvider(LocationProvider):
    """Minimal provider implementation used to exercise custom-provider loading."""

    def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
        # The partition key is deliberately ignored; only the file name matters here.
        return "custom_location_provider/" + data_file_name
def test_simple_location_provider_no_partition() -> None:
    """With object storage off, an un-partitioned file lands directly under <table>/data."""
    properties = {"write.object-storage.enabled": "false"}
    provider = load_location_provider(table_location="table_location", table_properties=properties)
    location = provider.new_data_location("my_file")
    assert location == "table_location/data/my_file"
def test_simple_location_provider_with_partition() -> None:
    """With object storage off, the partition directory sits between data/ and the file name."""
    properties = {"write.object-storage.enabled": "false"}
    provider = load_location_provider(table_location="table_location", table_properties=properties)
    location = provider.new_data_location("my_file", PARTITION_KEY)
    assert location == "table_location/data/string_field=example_string/my_file"
def test_custom_location_provider() -> None:
    """A provider named by its fully qualified path via write.py-location-provider.impl is loaded."""
    impl = f"{CustomLocationProvider.__module__}.{CustomLocationProvider.__name__}"
    provider = load_location_provider(
        table_location="table_location", table_properties={"write.py-location-provider.impl": impl}
    )
    assert provider.new_data_location("my_file") == "custom_location_provider/my_file"
def test_custom_location_provider_single_path() -> None:
    """A bare class name without a module path is rejected when loading a provider."""
    properties = {"write.py-location-provider.impl": "not_found"}
    with pytest.raises(ValueError, match=r"write\.py-location-provider\.impl should be full path"):
        load_location_provider(table_location="table_location", table_properties=properties)
def test_custom_location_provider_not_found(caplog: Any) -> None:
    """A provider path pointing at a missing module fails and the import error is logged."""
    properties = {"write.py-location-provider.impl": "module.not_found"}
    with pytest.raises(ValueError, match=r"Could not initialize LocationProvider"):
        load_location_provider(table_location="table_location", table_properties=properties)
    # The underlying import failure should surface in the captured log output.
    assert "ModuleNotFoundError: No module named 'module'" in caplog.text
def test_object_storage_no_partition() -> None:
    """Object storage injects binary entropy directories between data/ and the file name."""
    properties = {"write.object-storage.enabled": "true"}
    provider = load_location_provider(table_location="table_location", table_properties=properties)
    parts = provider.new_data_location("test.parquet").split("/")
    assert len(parts) == 7
    assert parts[:2] == ["table_location", "data"]
    assert parts[-1] == "test.parquet"
    # The middle segments must be non-empty strings made up solely of binary digits.
    for entropy_dir in parts[2:-1]:
        assert entropy_dir
        assert set(entropy_dir) <= {"0", "1"}
def test_object_storage_with_partition() -> None:
    """Both partition values and hash entropy appear in an object-storage data path."""
    provider = load_location_provider(
        table_location="table_location",
        table_properties={"write.object-storage.enabled": "true"},
    )
    location = provider.new_data_location("test.parquet", PARTITION_KEY)
    # The hash input is the partition key plus the data file name, so the entropy here differs
    # from the un-partitioned tests. The expected value matches what the Java implementation
    # produces for the same input.
    expected = "table_location/data/0001/0010/1001/00000011/string_field=example_string/test.parquet"
    assert location == expected
# A None partition key is exercised too: disabling partitioned paths replaces the final "/"
# with "-" even for un-partitioned files, mirroring the Java implementation.
@pytest.mark.parametrize("partition_key", [PARTITION_KEY, None])
def test_object_storage_partitioned_paths_disabled(partition_key: Optional[PartitionKey]) -> None:
    properties = {
        "write.object-storage.enabled": "true",
        "write.object-storage.partitioned-paths": "false",
    }
    provider = load_location_provider(table_location="table_location", table_properties=properties)
    location = provider.new_data_location("test.parquet", partition_key)
    # Partition values are omitted and the last entropy segment joins the file name with "-".
    assert location == "table_location/data/0110/1010/0011/11101000-test.parquet"
@pytest.mark.parametrize(
    ["data_file_name", "expected_hash"],
    [
        ("a", "0101/0110/1001/10110010"),
        ("b", "1110/0111/1110/00000011"),
        ("c", "0010/1101/0110/01011111"),
        ("d", "1001/0001/0100/01110011"),
    ],
)
def test_hash_injection(data_file_name: str, expected_hash: str) -> None:
    """The injected entropy directories are a deterministic function of the file name."""
    properties = {"write.object-storage.enabled": "true"}
    provider = load_location_provider(table_location="table_location", table_properties=properties)
    expected = f"table_location/data/{expected_hash}/{data_file_name}"
    assert provider.new_data_location(data_file_name) == expected
def test_object_location_provider_write_data_path() -> None:
    """write.data.path replaces <table>/data as the base of object-storage locations."""
    properties = {
        "write.object-storage.enabled": "true",
        TableProperties.WRITE_DATA_PATH: "s3://table-location/custom/data/path",
    }
    provider = load_location_provider(table_location="s3://table-location/table", table_properties=properties)
    location = provider.new_data_location("file.parquet")
    assert location == "s3://table-location/custom/data/path/0010/1111/0101/11011101/file.parquet"
def test_simple_location_provider_write_data_path() -> None:
    """write.data.path replaces <table>/data for the simple (non-object-storage) provider too."""
    properties = {
        TableProperties.WRITE_DATA_PATH: "s3://table-location/custom/data/path",
        "write.object-storage.enabled": "false",
    }
    provider = load_location_provider(table_location="table_location", table_properties=properties)
    assert provider.new_data_location("file.parquet") == "s3://table-location/custom/data/path/file.parquet"
def test_location_provider_metadata_default_location() -> None:
    """Metadata files default to <table>/metadata when no override is configured."""
    provider = load_location_provider(table_location="table_location", table_properties=EMPTY_DICT)
    location = provider.new_metadata_location("manifest.avro")
    assert location == "table_location/metadata/manifest.avro"
def test_location_provider_metadata_location_with_custom_path() -> None:
    """write.metadata.path redirects metadata files to the configured base location."""
    properties = {TableProperties.WRITE_METADATA_PATH: "s3://table-location/custom/path"}
    provider = load_location_provider(table_location="table_location", table_properties=properties)
    assert provider.new_metadata_location("metadata.json") == "s3://table-location/custom/path/metadata.json"
def test_metadata_location_with_trailing_slash() -> None:
    """A trailing slash on write.metadata.path does not produce a double slash in locations."""
    properties = {TableProperties.WRITE_METADATA_PATH: "s3://table-location/custom/path/"}
    provider = load_location_provider(table_location="table_location", table_properties=properties)
    assert provider.new_metadata_location("metadata.json") == "s3://table-location/custom/path/metadata.json"