blob: 223c6d2f9e77004785288be7cbf6e017c9fbb609 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=redefined-outer-name,unused-argument
import base64
import os
from typing import Any, Callable, Dict, cast
from unittest import mock
import pytest
from requests.exceptions import HTTPError
from requests_mock import Mocker
import pyiceberg
from pyiceberg.catalog import PropertiesUpdateSummary, load_catalog
from pyiceberg.catalog.rest import OAUTH2_SERVER_URI, SNAPSHOT_LOADING_MODE, RestCatalog
from pyiceberg.exceptions import (
AuthorizationExpiredError,
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
NoSuchIdentifierError,
NoSuchNamespaceError,
NoSuchTableError,
NoSuchViewError,
OAuthError,
ServerError,
TableAlreadyExistsError,
)
from pyiceberg.io import load_file_io
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.table.metadata import TableMetadataV1
from pyiceberg.table.sorting import SortField, SortOrder
from pyiceberg.transforms import IdentityTransform, TruncateTransform
from pyiceberg.typedef import RecursiveDict
from pyiceberg.utils.config import Config
# Endpoints and credentials shared by the tests in this module.
TEST_URI = "https://iceberg-test-catalog/"
TEST_CREDENTIALS = "client:secret"
TEST_OAUTH2_SERVER_URI = "https://auth-endpoint/"
TEST_TOKEN = "some_jwt_token"

# Optional OAuth2 request parameters used by the token tests.
TEST_SCOPE = "openid_offline_corpds_ds_profile"
TEST_AUDIENCE = "test_audience"
TEST_RESOURCE = "test_resource"

# Request headers that the mocked catalog endpoints are registered with.
TEST_HEADERS = {
    "Content-type": "application/json",
    "User-Agent": f"PyIceberg/{pyiceberg.__version__}",
    "Authorization": f"Bearer {TEST_TOKEN}",
    "X-Iceberg-Access-Delegation": "vended-credentials",
}

# Request headers that the mocked OAuth token endpoints are registered with.
OAUTH_TEST_HEADERS = {"Content-type": "application/x-www-form-urlencoded"}
@pytest.fixture
def example_table_metadata_with_snapshot_v1_rest_json(example_table_metadata_with_snapshot_v1: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap the v1 snapshot metadata fixture in a REST-style table payload.

    The payload carries a metadata location, the metadata itself and a config section.
    """
    catalog_config = {
        "client.factory": "io.tabular.iceberg.catalog.TabularAwsClientFactory",
        "region": "us-west-2",
    }
    payload: Dict[str, Any] = {
        "metadata-location": "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
        "metadata": example_table_metadata_with_snapshot_v1,
        "config": catalog_config,
    }
    return payload
@pytest.fixture
def example_table_metadata_with_no_location(example_table_metadata_with_snapshot_v1: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap the v1 snapshot metadata fixture in a REST-style payload that has no ``metadata-location`` key."""
    catalog_config = {
        "client.factory": "io.tabular.iceberg.catalog.TabularAwsClientFactory",
        "region": "us-west-2",
    }
    payload: Dict[str, Any] = {
        "metadata": example_table_metadata_with_snapshot_v1,
        "config": catalog_config,
    }
    return payload
@pytest.fixture
def example_table_metadata_no_snapshot_v1_rest_json(example_table_metadata_no_snapshot_v1: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap the snapshot-less v1 metadata fixture in a REST-style table payload."""
    catalog_config = {
        "client.factory": "io.tabular.iceberg.catalog.TabularAwsClientFactory",
        "region": "us-west-2",
    }
    payload: Dict[str, Any] = {
        "metadata-location": "s3://warehouse/database/table/metadata.json",
        "metadata": example_table_metadata_no_snapshot_v1,
        "config": catalog_config,
    }
    return payload
@pytest.fixture
def rest_mock(requests_mock: Mocker) -> Mocker:
    """Return the default ``requests_mock`` with the config endpoint registered.

    The config endpoint is called when the rest catalog is initialized, so
    tests using this fixture do not have to register it themselves.
    """
    config_endpoint = f"{TEST_URI}v1/config"
    empty_config = {"defaults": {}, "overrides": {}}
    requests_mock.get(config_endpoint, json=empty_config, status_code=200)
    return requests_mock
def test_no_uri_supplied() -> None:
    """Constructing a catalog without a ``uri`` property raises ``KeyError``."""
    catalog_name = "production"
    with pytest.raises(KeyError):
        RestCatalog(catalog_name)
@pytest.mark.filterwarnings(
    "ignore:Deprecated in 0.8.0, will be removed in 1.0.0. Iceberg REST client is missing the OAuth2 server URI:DeprecationWarning"
)
def test_token_200(rest_mock: Mocker) -> None:
    """A complete token response ends up as a bearer ``Authorization`` session header."""
    token_response = {
        "access_token": TEST_TOKEN,
        "token_type": "Bearer",
        "expires_in": 86400,
        "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
        "scope": "openid offline",
        "refresh_token": "refresh_token",
    }
    rest_mock.post(
        f"{TEST_URI}v1/oauth/tokens",
        json=token_response,
        status_code=200,
        request_headers=OAUTH_TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, credential=TEST_CREDENTIALS)
    session_headers = catalog._session.headers  # pylint: disable=W0212
    assert session_headers["Authorization"] == f"Bearer {TEST_TOKEN}"
@pytest.mark.filterwarnings(
    "ignore:Deprecated in 0.8.0, will be removed in 1.0.0. Iceberg REST client is missing the OAuth2 server URI:DeprecationWarning"
)
def test_token_200_without_optional_fields(rest_mock: Mocker) -> None:
    """A token response with only ``access_token`` and ``token_type`` is accepted."""
    minimal_token_response = {
        "access_token": TEST_TOKEN,
        "token_type": "Bearer",
    }
    rest_mock.post(
        f"{TEST_URI}v1/oauth/tokens",
        json=minimal_token_response,
        status_code=200,
        request_headers=OAUTH_TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, credential=TEST_CREDENTIALS)
    session_headers = catalog._session.headers  # pylint: disable=W0212
    assert session_headers["Authorization"] == f"Bearer {TEST_TOKEN}"
@pytest.mark.filterwarnings(
    "ignore:Deprecated in 0.8.0, will be removed in 1.0.0. Iceberg REST client is missing the OAuth2 server URI:DeprecationWarning"
)
def test_token_with_optional_oauth_params(rest_mock: Mocker) -> None:
    """The ``audience`` and ``resource`` properties appear in the token request body."""
    token_response = {
        "access_token": TEST_TOKEN,
        "token_type": "Bearer",
        "expires_in": 86400,
        "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
    }
    mock_request = rest_mock.post(
        f"{TEST_URI}v1/oauth/tokens",
        json=token_response,
        status_code=200,
        request_headers=OAUTH_TEST_HEADERS,
    )

    catalog = RestCatalog(
        "rest",
        uri=TEST_URI,
        credential=TEST_CREDENTIALS,
        audience=TEST_AUDIENCE,
        resource=TEST_RESOURCE,
    )
    assert catalog._session.headers["Authorization"] == f"Bearer {TEST_TOKEN}"

    # Both optional parameters must have been sent to the token endpoint.
    sent_body = mock_request.last_request.text
    assert TEST_AUDIENCE in sent_body
    assert TEST_RESOURCE in sent_body
@pytest.mark.filterwarnings(
    "ignore:Deprecated in 0.8.0, will be removed in 1.0.0. Iceberg REST client is missing the OAuth2 server URI:DeprecationWarning"
)
def test_token_with_optional_oauth_params_as_empty(rest_mock: Mocker) -> None:
    """With empty ``audience``/``resource`` properties, the test values are absent from the token request body."""
    token_response = {
        "access_token": TEST_TOKEN,
        "token_type": "Bearer",
        "expires_in": 86400,
        "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
    }
    mock_request = rest_mock.post(
        f"{TEST_URI}v1/oauth/tokens",
        json=token_response,
        status_code=200,
        request_headers=OAUTH_TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, credential=TEST_CREDENTIALS, audience="", resource="")
    assert catalog._session.headers["Authorization"] == f"Bearer {TEST_TOKEN}"

    sent_body = mock_request.last_request.text
    assert TEST_AUDIENCE not in sent_body
    assert TEST_RESOURCE not in sent_body
@pytest.mark.filterwarnings(
    "ignore:Deprecated in 0.8.0, will be removed in 1.0.0. Iceberg REST client is missing the OAuth2 server URI:DeprecationWarning"
)
def test_token_with_default_scope(rest_mock: Mocker) -> None:
    """Without an explicit ``scope`` the token request body contains ``catalog``."""
    token_response = {
        "access_token": TEST_TOKEN,
        "token_type": "Bearer",
        "expires_in": 86400,
        "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
    }
    mock_request = rest_mock.post(
        f"{TEST_URI}v1/oauth/tokens",
        json=token_response,
        status_code=200,
        request_headers=OAUTH_TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, credential=TEST_CREDENTIALS)
    assert catalog._session.headers["Authorization"] == f"Bearer {TEST_TOKEN}"
    assert "catalog" in mock_request.last_request.text
@pytest.mark.filterwarnings(
    "ignore:Deprecated in 0.8.0, will be removed in 1.0.0. Iceberg REST client is missing the OAuth2 server URI:DeprecationWarning"
)
def test_token_with_custom_scope(rest_mock: Mocker) -> None:
    """A custom ``scope`` property is sent in the token request body."""
    token_response = {
        "access_token": TEST_TOKEN,
        "token_type": "Bearer",
        "expires_in": 86400,
        "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
    }
    mock_request = rest_mock.post(
        f"{TEST_URI}v1/oauth/tokens",
        json=token_response,
        status_code=200,
        request_headers=OAUTH_TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, credential=TEST_CREDENTIALS, scope=TEST_SCOPE)
    assert catalog._session.headers["Authorization"] == f"Bearer {TEST_TOKEN}"
    assert TEST_SCOPE in mock_request.last_request.text
@pytest.mark.filterwarnings(
    "ignore:Deprecated in 0.8.0, will be removed in 1.0.0. Iceberg REST client is missing the OAuth2 server URI:DeprecationWarning"
)
def test_token_200_w_oauth2_server_uri(rest_mock: Mocker) -> None:
    """The token is requested from the explicitly configured OAuth2 server URI.

    The token endpoint is mocked at ``TEST_OAUTH2_SERVER_URI``, so the catalog
    must be configured with that same URI under the ``OAUTH2_SERVER_URI``
    property key for the mocked endpoint to be the one that is hit.
    """
    rest_mock.post(
        TEST_OAUTH2_SERVER_URI,
        json={
            "access_token": TEST_TOKEN,
            "token_type": "Bearer",
            "expires_in": 86400,
            "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
        },
        status_code=200,
        request_headers=OAUTH_TEST_HEADERS,
    )

    # The property value must be the server URI itself; previously the property
    # key constant was passed as its own value.
    catalog = RestCatalog(
        "rest",
        uri=TEST_URI,
        credential=TEST_CREDENTIALS,
        **{OAUTH2_SERVER_URI: TEST_OAUTH2_SERVER_URI},
    )
    session_headers = catalog._session.headers  # pylint: disable=W0212
    assert session_headers["Authorization"] == f"Bearer {TEST_TOKEN}"
@pytest.mark.filterwarnings(
    "ignore:Deprecated in 0.8.0, will be removed in 1.0.0. Iceberg REST client is missing the OAuth2 server URI:DeprecationWarning"
)
def test_config_200(requests_mock: Mocker) -> None:
    """The ``warehouse`` property is passed as a query parameter on the config request."""
    requests_mock.get(
        f"{TEST_URI}v1/config",
        json={"defaults": {}, "overrides": {}},
        status_code=200,
    )
    token_response = {
        "access_token": TEST_TOKEN,
        "token_type": "Bearer",
        "expires_in": 86400,
        "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
    }
    requests_mock.post(
        f"{TEST_URI}v1/oauth/tokens",
        json=token_response,
        status_code=200,
        request_headers=OAUTH_TEST_HEADERS,
    )

    RestCatalog("rest", uri=TEST_URI, credential=TEST_CREDENTIALS, warehouse="s3://some-bucket")

    assert requests_mock.called
    assert requests_mock.call_count == 2

    # The second recorded request is the config lookup.
    config_request = requests_mock.request_history[1]
    assert config_request.method == "GET"
    assert config_request.url == "https://iceberg-test-catalog/v1/config?warehouse=s3%3A%2F%2Fsome-bucket"
def test_properties_sets_headers(requests_mock: Mocker) -> None:
    """``header.*`` catalog properties add session headers but leave the default content type alone."""
    requests_mock.get(
        f"{TEST_URI}v1/config",
        json={"defaults": {}, "overrides": {}},
        status_code=200,
    )
    header_properties = {
        "header.Content-Type": "application/vnd.api+json",
        "header.Customized-Header": "some/value",
    }

    catalog = RestCatalog("rest", uri=TEST_URI, warehouse="s3://some-bucket", **header_properties)
    session_headers = catalog._session.headers

    assert session_headers.get("Content-type") == "application/json", (
        "Expected 'Content-Type' default header not to be overwritten"
    )
    assert requests_mock.last_request.headers["Content-type"] == "application/json", (
        "Config request did not include expected 'Content-Type' header"
    )

    assert session_headers.get("Customized-Header") == "some/value", (
        "Expected 'Customized-Header' header to be 'some/value'"
    )
    assert requests_mock.last_request.headers["Customized-Header"] == "some/value", (
        "Config request did not include expected 'Customized-Header' header"
    )
def test_config_sets_headers(requests_mock: Mocker) -> None:
    """``header.*`` defaults from the config endpoint are applied to later requests."""
    namespace = "leden"
    server_defaults = {
        "header.Content-Type": "application/vnd.api+json",
        "header.Customized-Header": "some/value",
    }
    requests_mock.get(
        f"{TEST_URI}v1/config",
        json={"defaults": server_defaults, "overrides": {}},
        status_code=200,
    )
    requests_mock.post(
        f"{TEST_URI}v1/namespaces",
        json={"namespace": [namespace], "properties": {}},
        status_code=200,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, warehouse="s3://some-bucket")
    catalog.create_namespace(namespace)
    session_headers = catalog._session.headers

    assert session_headers.get("Content-type") == "application/json", (
        "Expected 'Content-Type' default header not to be overwritten"
    )
    assert requests_mock.last_request.headers["Content-type"] == "application/json", (
        "Create namespace request did not include expected 'Content-Type' header"
    )

    assert session_headers.get("Customized-Header") == "some/value", (
        "Expected 'Customized-Header' header to be 'some/value'"
    )
    assert requests_mock.last_request.headers["Customized-Header"] == "some/value", (
        "Create namespace request did not include expected 'Customized-Header' header"
    )
@pytest.mark.filterwarnings(
    "ignore:Deprecated in 0.8.0, will be removed in 1.0.0. Iceberg REST client is missing the OAuth2 server URI:DeprecationWarning"
)
def test_token_400(rest_mock: Mocker) -> None:
    """A 400 from the token endpoint raises ``OAuthError`` carrying the error and its description."""
    error_response = {
        "error": "invalid_client",
        "error_description": "Credentials for key invalid_key do not match",
    }
    rest_mock.post(
        f"{TEST_URI}v1/oauth/tokens",
        json=error_response,
        status_code=400,
        request_headers=OAUTH_TEST_HEADERS,
    )

    # The token is requested while the catalog is being constructed.
    with pytest.raises(OAuthError) as exc_info:
        RestCatalog("rest", uri=TEST_URI, credential=TEST_CREDENTIALS)
    assert str(exc_info.value) == "invalid_client: Credentials for key invalid_key do not match"
@pytest.mark.filterwarnings(
    "ignore:Deprecated in 0.8.0, will be removed in 1.0.0. Iceberg REST client is missing the OAuth2 server URI:DeprecationWarning"
)
def test_token_401(rest_mock: Mocker) -> None:
    """A 401 from the token endpoint raises ``OAuthError`` mentioning the error code."""
    expected_fragment = "invalid_client"
    error_response = {
        "error": "invalid_client",
        "error_description": "Unknown or invalid client",
    }
    rest_mock.post(
        f"{TEST_URI}v1/oauth/tokens",
        json=error_response,
        status_code=401,
        request_headers=OAUTH_TEST_HEADERS,
    )

    with pytest.raises(OAuthError) as exc_info:
        RestCatalog("rest", uri=TEST_URI, credential=TEST_CREDENTIALS)
    assert expected_fragment in str(exc_info.value)
def test_list_tables_200(rest_mock: Mocker) -> None:
    """Listing tables returns the identifiers from the mocked response as tuples."""
    namespace = "examples"
    listing = {"identifiers": [{"namespace": ["examples"], "name": "fooshare"}]}
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/{namespace}/tables",
        json=listing,
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    assert catalog.list_tables(namespace) == [("examples", "fooshare")]
def test_list_tables_200_sigv4(rest_mock: Mocker) -> None:
    """Listing tables still works when ``rest.sigv4-enabled`` is set to ``"true"``."""
    namespace = "examples"
    listing = {"identifiers": [{"namespace": ["examples"], "name": "fooshare"}]}
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/{namespace}/tables",
        json=listing,
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    catalog_properties = {"uri": TEST_URI, "token": TEST_TOKEN, "rest.sigv4-enabled": "true"}
    catalog = RestCatalog("rest", **catalog_properties)
    assert catalog.list_tables(namespace) == [("examples", "fooshare")]
    assert rest_mock.called
def test_list_tables_404(rest_mock: Mocker) -> None:
    """A 404 while listing tables is raised as ``NoSuchNamespaceError``."""
    namespace = "examples"
    error_body = {
        "error": {
            "message": "Namespace does not exist: personal in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
            "type": "NoSuchNamespaceException",
            "code": 404,
        }
    }
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/{namespace}/tables",
        json=error_body,
        status_code=404,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    with pytest.raises(NoSuchNamespaceError) as exc_info:
        catalog.list_tables(namespace)
    assert "Namespace does not exist" in str(exc_info.value)
def test_list_views_200(rest_mock: Mocker) -> None:
    """Listing views returns the identifiers from the mocked response as tuples."""
    namespace = "examples"
    listing = {"identifiers": [{"namespace": ["examples"], "name": "fooshare"}]}
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/{namespace}/views",
        json=listing,
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    assert catalog.list_views(namespace) == [("examples", "fooshare")]
def test_list_views_200_sigv4(rest_mock: Mocker) -> None:
    """Listing views still works when ``rest.sigv4-enabled`` is set to ``"true"``."""
    namespace = "examples"
    listing = {"identifiers": [{"namespace": ["examples"], "name": "fooshare"}]}
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/{namespace}/views",
        json=listing,
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    catalog_properties = {"uri": TEST_URI, "token": TEST_TOKEN, "rest.sigv4-enabled": "true"}
    catalog = RestCatalog("rest", **catalog_properties)
    assert catalog.list_views(namespace) == [("examples", "fooshare")]
    assert rest_mock.called
def test_list_views_404(rest_mock: Mocker) -> None:
    """A 404 while listing views is raised as ``NoSuchNamespaceError``."""
    namespace = "examples"
    error_body = {
        "error": {
            "message": "Namespace does not exist: personal in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
            "type": "NoSuchNamespaceException",
            "code": 404,
        }
    }
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/{namespace}/views",
        json=error_body,
        status_code=404,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    with pytest.raises(NoSuchNamespaceError) as exc_info:
        catalog.list_views(namespace)
    assert "Namespace does not exist" in str(exc_info.value)
def test_view_exists_204(rest_mock: Mocker) -> None:
    """A 204 on the view HEAD request means the view exists."""
    namespace, view = "examples", "some_view"
    rest_mock.head(
        f"{TEST_URI}v1/namespaces/{namespace}/views/{view}",
        status_code=204,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    exists = catalog.view_exists((namespace, view))
    assert exists
def test_view_exists_404(rest_mock: Mocker) -> None:
    """A 404 on the view HEAD request means the view does not exist."""
    namespace, view = "examples", "some_view"
    rest_mock.head(
        f"{TEST_URI}v1/namespaces/{namespace}/views/{view}",
        status_code=404,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    exists = catalog.view_exists((namespace, view))
    assert not exists
def test_view_exists_multilevel_namespace_404(rest_mock: Mocker) -> None:
    """A 404 on the view HEAD request for a dotted namespace name means the view does not exist."""
    multilevel_namespace, view = "core.examples.some_namespace", "some_view"
    rest_mock.head(
        f"{TEST_URI}v1/namespaces/{multilevel_namespace}/views/{view}",
        status_code=404,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    exists = catalog.view_exists((multilevel_namespace, view))
    assert not exists
def test_list_namespaces_200(rest_mock: Mocker) -> None:
    """Listing namespaces returns each namespace from the response as a tuple."""
    rest_mock.get(
        f"{TEST_URI}v1/namespaces",
        json={"namespaces": [["default"], ["examples"], ["fokko"], ["system"]]},
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    expected = [("default",), ("examples",), ("fokko",), ("system",)]
    assert catalog.list_namespaces() == expected
def test_list_namespace_with_parent_200(rest_mock: Mocker) -> None:
    """Listing namespaces under a parent queries with ``parent=`` and returns the nested namespaces."""
    rest_mock.get(
        f"{TEST_URI}v1/namespaces?parent=accounting",
        json={"namespaces": [["accounting", "tax"]]},
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    children = catalog.list_namespaces(("accounting",))
    assert children == [("accounting", "tax")]
def test_list_namespace_with_parent_404(rest_mock: Mocker) -> None:
    """A 404 for an unknown parent namespace is raised as ``NoSuchNamespaceError``."""
    error_body = {
        "error": {
            "message": "Namespace provided in the `parent` query parameter is not found",
            "type": "NoSuchNamespaceException",
            "code": 404,
        }
    }
    rest_mock.get(
        f"{TEST_URI}v1/namespaces?parent=some_namespace",
        json=error_body,
        status_code=404,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    with pytest.raises(NoSuchNamespaceError):
        catalog.list_namespaces(("some_namespace",))
@pytest.mark.filterwarnings(
    "ignore:Deprecated in 0.8.0, will be removed in 1.0.0. Iceberg REST client is missing the OAuth2 server URI:DeprecationWarning"
)
@pytest.mark.parametrize("status_code", [401, 419])
def test_list_namespaces_token_expired_success_on_retries(rest_mock: Mocker, status_code: int) -> None:
    """An expired-authorization response triggers one token refresh and a successful retry.

    The namespaces endpoint answers first with ``status_code`` and then with
    two successful listings; request counts are checked after each call.
    """
    new_token = "new_jwt_token"
    new_header = {**TEST_HEADERS, "Authorization": f"Bearer {new_token}"}

    expired_response = {
        "status_code": status_code,
        "json": {
            "error": {
                "message": "Authorization expired.",
                "type": "AuthorizationExpiredError",
                "code": status_code,
            }
        },
        "headers": TEST_HEADERS,
    }
    success_response = {
        "status_code": 200,
        "json": {"namespaces": [["default"], ["examples"], ["fokko"], ["system"]]},
        "headers": new_header,
    }
    namespaces = rest_mock.register_uri(
        "GET",
        f"{TEST_URI}v1/namespaces",
        [expired_response, success_response, success_response],
    )
    tokens = rest_mock.post(
        f"{TEST_URI}v1/oauth/tokens",
        json={
            "access_token": new_token,
            "token_type": "Bearer",
            "expires_in": 86400,
            "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
        },
        status_code=200,
    )
    expected_namespaces = [("default",), ("examples",), ("fokko",), ("system",)]

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, credential=TEST_CREDENTIALS)
    # LegacyOAuth2AuthManager is created twice through `_create_session()`
    # which results in the token being refreshed twice when the RestCatalog is initialized.
    assert tokens.call_count == 2

    # First listing: one expired response, one token refresh, one successful retry.
    assert catalog.list_namespaces() == expected_namespaces
    assert namespaces.call_count == 2
    assert tokens.call_count == 3

    # Second listing succeeds directly, without another token request.
    assert catalog.list_namespaces() == expected_namespaces
    assert namespaces.call_count == 3
    assert tokens.call_count == 3
def test_create_namespace_200(rest_mock: Mocker) -> None:
    """Creating a namespace against a 200 response completes without raising."""
    namespace = "leden"
    created = {"namespace": [namespace], "properties": {}}
    rest_mock.post(
        f"{TEST_URI}v1/namespaces",
        json=created,
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    catalog.create_namespace(namespace)
def test_create_namespace_if_exists_409(rest_mock: Mocker) -> None:
    """``create_namespace_if_not_exists`` does not raise when the server answers 409."""
    namespace = "examples"
    error_body = {
        "error": {
            "message": "Namespace already exists: fokko in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
            "type": "AlreadyExistsException",
            "code": 409,
        }
    }
    rest_mock.post(
        f"{TEST_URI}v1/namespaces",
        json=error_body,
        status_code=409,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    catalog.create_namespace_if_not_exists(namespace)
def test_create_namespace_409(rest_mock: Mocker) -> None:
    """A 409 while creating a namespace is raised as ``NamespaceAlreadyExistsError``."""
    namespace = "examples"
    error_body = {
        "error": {
            "message": "Namespace already exists: fokko in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
            "type": "AlreadyExistsException",
            "code": 409,
        }
    }
    rest_mock.post(
        f"{TEST_URI}v1/namespaces",
        json=error_body,
        status_code=409,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    with pytest.raises(NamespaceAlreadyExistsError) as exc_info:
        catalog.create_namespace(namespace)
    assert "Namespace already exists" in str(exc_info.value)
def test_drop_namespace_404(rest_mock: Mocker) -> None:
    """A 404 while dropping a namespace is raised as ``NoSuchNamespaceError``."""
    namespace = "examples"
    error_body = {
        "error": {
            "message": "Namespace does not exist: leden in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
            "type": "NoSuchNamespaceException",
            "code": 404,
        }
    }
    rest_mock.delete(
        f"{TEST_URI}v1/namespaces/{namespace}",
        json=error_body,
        status_code=404,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    with pytest.raises(NoSuchNamespaceError) as exc_info:
        catalog.drop_namespace(namespace)
    assert "Namespace does not exist" in str(exc_info.value)
def test_drop_namespace_409(rest_mock: Mocker) -> None:
    """A 409 while dropping a namespace is raised as ``NamespaceNotEmptyError``."""
    namespace = "examples"
    error_body = {
        "error": {
            "message": "Namespace is not empty: leden in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
            "type": "NamespaceNotEmptyError",
            "code": 409,
        }
    }
    rest_mock.delete(
        f"{TEST_URI}v1/namespaces/{namespace}",
        json=error_body,
        status_code=409,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    with pytest.raises(NamespaceNotEmptyError) as exc_info:
        catalog.drop_namespace(namespace)
    assert "Namespace is not empty" in str(exc_info.value)
def test_load_namespace_properties_200(rest_mock: Mocker) -> None:
    """Loading namespace properties returns the ``properties`` mapping from a 200 response."""
    namespace = "leden"
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/{namespace}",
        json={"namespace": ["fokko"], "properties": {"prop": "yes"}},
        # Use 200 to match the test name: the response carries a JSON body,
        # which a 204 (No Content) response should not have.
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    assert catalog.load_namespace_properties(namespace) == {"prop": "yes"}
def test_load_namespace_properties_404(rest_mock: Mocker) -> None:
    """A 404 while loading namespace properties is raised as ``NoSuchNamespaceError``."""
    namespace = "leden"
    error_body = {
        "error": {
            "message": "Namespace does not exist: fokko22 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
            "type": "NoSuchNamespaceException",
            "code": 404,
        }
    }
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/{namespace}",
        json=error_body,
        status_code=404,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    with pytest.raises(NoSuchNamespaceError) as exc_info:
        catalog.load_namespace_properties(namespace)
    assert "Namespace does not exist" in str(exc_info.value)
def test_update_namespace_properties_200(rest_mock: Mocker) -> None:
    """The update response is returned as a ``PropertiesUpdateSummary``."""
    rest_mock.post(
        f"{TEST_URI}v1/namespaces/fokko/properties",
        json={"removed": [], "updated": ["prop"], "missing": ["abc"]},
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    summary = catalog.update_namespace_properties(("fokko",), {"abc"}, {"prop": "yes"})
    expected = PropertiesUpdateSummary(removed=[], updated=["prop"], missing=["abc"])
    assert summary == expected
def test_namespace_exists_200(rest_mock: Mocker) -> None:
    """A 200 on the namespace HEAD request means the namespace exists."""
    namespace_endpoint = f"{TEST_URI}v1/namespaces/fokko"
    rest_mock.head(namespace_endpoint, status_code=200, request_headers=TEST_HEADERS)

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    exists = catalog.namespace_exists("fokko")
    assert exists
def test_namespace_exists_204(rest_mock: Mocker) -> None:
    """A 204 on the namespace HEAD request means the namespace exists."""
    namespace_endpoint = f"{TEST_URI}v1/namespaces/fokko"
    rest_mock.head(namespace_endpoint, status_code=204, request_headers=TEST_HEADERS)

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    exists = catalog.namespace_exists("fokko")
    assert exists
def test_namespace_exists_404(rest_mock: Mocker) -> None:
    """A 404 on the namespace HEAD request means the namespace does not exist."""
    namespace_endpoint = f"{TEST_URI}v1/namespaces/fokko"
    rest_mock.head(namespace_endpoint, status_code=404, request_headers=TEST_HEADERS)

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    exists = catalog.namespace_exists("fokko")
    assert not exists
def test_namespace_exists_500(rest_mock: Mocker) -> None:
    """A 500 on the namespace HEAD request is raised as ``ServerError``."""
    namespace_endpoint = f"{TEST_URI}v1/namespaces/fokko"
    rest_mock.head(namespace_endpoint, status_code=500, request_headers=TEST_HEADERS)

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    with pytest.raises(ServerError):
        catalog.namespace_exists("fokko")
def test_update_namespace_properties_404(rest_mock: Mocker) -> None:
    """A 404 while updating namespace properties is raised as ``NoSuchNamespaceError``."""
    error_body = {
        "error": {
            "message": "Namespace does not exist: does_not_exists in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
            "type": "NoSuchNamespaceException",
            "code": 404,
        }
    }
    rest_mock.post(
        f"{TEST_URI}v1/namespaces/fokko/properties",
        json=error_body,
        status_code=404,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    with pytest.raises(NoSuchNamespaceError) as exc_info:
        catalog.update_namespace_properties(("fokko",), {"abc"}, {"prop": "yes"})
    assert "Namespace does not exist" in str(exc_info.value)
def test_load_table_200(rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]) -> None:
    """Loading a table yields a ``Table`` equal to one built from the mocked payload."""
    payload = example_table_metadata_with_snapshot_v1_rest_json
    table_identifier = ("fokko", "table")
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/fokko/tables/table",
        json=payload,
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    actual = catalog.load_table(table_identifier)
    expected = Table(
        identifier=table_identifier,
        metadata_location=payload["metadata-location"],
        metadata=TableMetadataV1(**payload["metadata"]),
        io=load_file_io(),
        catalog=catalog,
    )

    # Compare the metadata dumps first, then the table objects themselves.
    assert actual.metadata.model_dump() == expected.metadata.model_dump()
    assert actual == expected
def test_load_table_200_loading_mode(
    rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]
) -> None:
    """With the snapshot loading mode set to ``refs``, the load request carries ``snapshots=refs``."""
    payload = example_table_metadata_with_snapshot_v1_rest_json
    table_identifier = ("fokko", "table")
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/fokko/tables/table?snapshots=refs",
        json=payload,
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    loading_mode = {SNAPSHOT_LOADING_MODE: "refs"}
    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **loading_mode)
    actual = catalog.load_table(table_identifier)
    expected = Table(
        identifier=table_identifier,
        metadata_location=payload["metadata-location"],
        metadata=TableMetadataV1(**payload["metadata"]),
        io=load_file_io(),
        catalog=catalog,
    )

    # Compare the metadata dumps first, then the table objects themselves.
    assert actual.metadata.model_dump() == expected.metadata.model_dump()
    assert actual == expected
def test_load_table_honor_access_delegation(
    rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]
) -> None:
    """A ``header.X-Iceberg-Access-Delegation`` property replaces the default delegation header.

    The mocked endpoint is registered with ``remote-signing`` as the
    delegation header value.
    """
    payload = example_table_metadata_with_snapshot_v1_rest_json
    table_identifier = ("fokko", "table")
    test_headers_with_remote_signing = {**TEST_HEADERS, "X-Iceberg-Access-Delegation": "remote-signing"}
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/fokko/tables/table",
        json=payload,
        status_code=200,
        request_headers=test_headers_with_remote_signing,
    )

    catalog_properties = {
        "uri": TEST_URI,
        "token": TEST_TOKEN,
        "header.X-Iceberg-Access-Delegation": "remote-signing",
    }
    catalog = RestCatalog("rest", **catalog_properties)
    actual = catalog.load_table(table_identifier)
    expected = Table(
        identifier=table_identifier,
        metadata_location=payload["metadata-location"],
        metadata=TableMetadataV1(**payload["metadata"]),
        io=load_file_io(),
        catalog=catalog,
    )

    # Compare the metadata dumps first, then the table objects themselves.
    assert actual.metadata.model_dump() == expected.metadata.model_dump()
    assert actual == expected
def test_load_table_from_self_identifier_200(
    rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]
) -> None:
    """A table can be reloaded using the identifier returned by ``Table.name()``."""
    payload = example_table_metadata_with_snapshot_v1_rest_json
    table_identifier = ("pdames", "table")
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/pdames/tables/table",
        json=payload,
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    # Load once by explicit identifier, then again by the table's own name.
    table = catalog.load_table(table_identifier)
    actual = catalog.load_table(table.name())
    expected = Table(
        identifier=table_identifier,
        metadata_location=payload["metadata-location"],
        metadata=TableMetadataV1(**payload["metadata"]),
        io=load_file_io(),
        catalog=catalog,
    )

    assert actual.metadata.model_dump() == expected.metadata.model_dump()
    assert actual == expected
def test_load_table_404(rest_mock: Mocker) -> None:
    """A 404 while loading a table is raised as ``NoSuchTableError``."""
    error_body = {
        "error": {
            "message": "Table does not exist: examples.does_not_exists in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
            "type": "NoSuchNamespaceErrorException",
            "code": 404,
        }
    }
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/fokko/tables/does_not_exists",
        json=error_body,
        status_code=404,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    with pytest.raises(NoSuchTableError) as exc_info:
        catalog.load_table(("fokko", "does_not_exists"))
    assert "Table does not exist" in str(exc_info.value)
def test_table_exists_200(rest_mock: Mocker) -> None:
    """A 200 on the table HEAD request means the table exists."""
    table_endpoint = f"{TEST_URI}v1/namespaces/fokko/tables/table"
    rest_mock.head(table_endpoint, status_code=200, request_headers=TEST_HEADERS)

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    exists = catalog.table_exists(("fokko", "table"))
    assert exists
def test_table_exists_204(rest_mock: Mocker) -> None:
    """A 204 on the table HEAD request means the table exists."""
    table_endpoint = f"{TEST_URI}v1/namespaces/fokko/tables/table"
    rest_mock.head(table_endpoint, status_code=204, request_headers=TEST_HEADERS)

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    exists = catalog.table_exists(("fokko", "table"))
    assert exists
def test_table_exists_404(rest_mock: Mocker) -> None:
    """A 404 on the table HEAD request means the table does not exist."""
    table_endpoint = f"{TEST_URI}v1/namespaces/fokko/tables/table"
    rest_mock.head(table_endpoint, status_code=404, request_headers=TEST_HEADERS)

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    exists = catalog.table_exists(("fokko", "table"))
    assert not exists
def test_table_exists_500(rest_mock: Mocker) -> None:
    """A 500 on the table HEAD request is raised as ``ServerError``."""
    table_endpoint = f"{TEST_URI}v1/namespaces/fokko/tables/table"
    rest_mock.head(table_endpoint, status_code=500, request_headers=TEST_HEADERS)

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    with pytest.raises(ServerError):
        catalog.table_exists(("fokko", "table"))
def test_drop_table_404(rest_mock: Mocker) -> None:
    """A 404 while dropping a table is raised as ``NoSuchTableError``."""
    error_body = {
        "error": {
            "message": "Table does not exist: fokko.does_not_exists in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
            "type": "NoSuchNamespaceErrorException",
            "code": 404,
        }
    }
    rest_mock.delete(
        f"{TEST_URI}v1/namespaces/fokko/tables/does_not_exists",
        json=error_body,
        status_code=404,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    with pytest.raises(NoSuchTableError) as exc_info:
        catalog.drop_table(("fokko", "does_not_exists"))
    assert "Table does not exist" in str(exc_info.value)
def test_create_table_200(
    rest_mock: Mocker, table_schema_simple: Schema, example_table_metadata_no_snapshot_v1_rest_json: Dict[str, Any]
) -> None:
    """Creating a table yields a ``Table`` equal to one built from the mocked response."""
    payload = example_table_metadata_no_snapshot_v1_rest_json
    table_identifier = ("fokko", "fokko2")
    rest_mock.post(
        f"{TEST_URI}v1/namespaces/fokko/tables",
        json=payload,
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    truncated_id = PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=3), name="id")
    actual = catalog.create_table(
        identifier=table_identifier,
        schema=table_schema_simple,
        location=None,
        partition_spec=PartitionSpec(truncated_id, spec_id=1),
        sort_order=SortOrder(SortField(source_id=2, transform=IdentityTransform())),
        properties={"owner": "fokko"},
    )
    expected = Table(
        identifier=table_identifier,
        metadata_location=payload["metadata-location"],
        metadata=TableMetadataV1(**payload["metadata"]),
        io=load_file_io(),
        catalog=catalog,
    )
    assert actual == expected
def test_create_table_with_given_location_removes_trailing_slash_200(
    rest_mock: Mocker, table_schema_simple: Schema, example_table_metadata_no_snapshot_v1_rest_json: Dict[str, Any]
) -> None:
    """A location passed with a trailing slash is sent to the server without it."""
    rest_mock.post(
        f"{TEST_URI}v1/namespaces/fokko/tables",
        json=example_table_metadata_no_snapshot_v1_rest_json,
        status_code=200,
        request_headers=TEST_HEADERS,
    )
    rest_catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    location = "s3://warehouse/database/table-custom-location"
    location_with_slash = f"{location}/"

    truncated_id = PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=3), name="id")
    rest_catalog.create_table(
        identifier=("fokko", "fokko2"),
        schema=table_schema_simple,
        location=location_with_slash,
        partition_spec=PartitionSpec(truncated_id, spec_id=1),
        sort_order=SortOrder(SortField(source_id=2, transform=IdentityTransform())),
        properties={"owner": "fokko"},
    )

    # Inspect the body of the most recent request recorded by the mock.
    assert rest_mock.last_request
    sent_body = rest_mock.last_request.json()
    assert sent_body["location"] == location
def test_create_staged_table_200(
    rest_mock: Mocker,
    table_schema_simple: Schema,
    example_table_metadata_with_no_location: Dict[str, Any],
    example_table_metadata_no_snapshot_v1_rest_json: Dict[str, Any],
) -> None:
    """Committing a create-table transaction sends the expected commit request body."""
    # POST on the namespace's table collection.
    rest_mock.post(
        f"{TEST_URI}v1/namespaces/fokko/tables",
        json=example_table_metadata_with_no_location,
        status_code=200,
        request_headers=TEST_HEADERS,
    )
    # POST on the table itself; this is the request inspected at the end.
    rest_mock.post(
        f"{TEST_URI}v1/namespaces/fokko/tables/fokko2",
        json=example_table_metadata_no_snapshot_v1_rest_json,
        status_code=200,
        request_headers=TEST_HEADERS,
    )
    identifier = ("fokko", "fokko2")
    rest_catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)

    truncated_id = PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=3), name="id")
    transaction = rest_catalog.create_table_transaction(
        identifier=identifier,
        schema=table_schema_simple,
        location=None,
        partition_spec=PartitionSpec(truncated_id, spec_id=1),
        sort_order=SortOrder(SortField(source_id=2, transform=IdentityTransform())),
        properties={"owner": "fokko"},
    )
    transaction.commit_transaction()

    sent_body = rest_mock.last_request.json()

    expected_schema = {
        "type": "struct",
        "fields": [
            {"id": 1, "name": "id", "type": "int", "required": False},
            {"id": 2, "name": "data", "type": "string", "required": False},
        ],
        "schema-id": 0,
        "identifier-field-ids": [],
    }
    expected_updates = [
        {"action": "assign-uuid", "uuid": "b55d9dda-6561-423a-8bfc-787980ce421f"},
        {"action": "upgrade-format-version", "format-version": 1},
        {"action": "add-schema", "schema": expected_schema},
        {"action": "set-current-schema", "schema-id": -1},
        {"action": "add-spec", "spec": {"spec-id": 0, "fields": []}},
        {"action": "set-default-spec", "spec-id": -1},
        {"action": "add-sort-order", "sort-order": {"order-id": 0, "fields": []}},
        {"action": "set-default-sort-order", "sort-order-id": -1},
        {"action": "set-location", "location": "s3://warehouse/database/table"},
        {"action": "set-properties", "updates": {"owner": "bryan", "write.metadata.compression-codec": "gzip"}},
    ]
    assert sent_body == {
        "identifier": {"namespace": ["fokko"], "name": "fokko2"},
        "requirements": [{"type": "assert-create"}],
        "updates": expected_updates,
    }
def test_create_table_409(rest_mock: Mocker, table_schema_simple: Schema) -> None:
    """A 409 answer to the create-table POST raises ``TableAlreadyExistsError``."""
    conflict_body = {
        "error": {
            "message": "Table already exists: fokko.already_exists in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
            "type": "AlreadyExistsException",
            "code": 409,
        }
    }
    rest_mock.post(
        f"{TEST_URI}v1/namespaces/fokko/tables",
        json=conflict_body,
        status_code=409,
        request_headers=TEST_HEADERS,
    )
    rest_catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    truncated_id = PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=3), name="id")

    with pytest.raises(TableAlreadyExistsError) as exc_info:
        rest_catalog.create_table(
            identifier=("fokko", "fokko2"),
            schema=table_schema_simple,
            location=None,
            partition_spec=PartitionSpec(truncated_id),
            sort_order=SortOrder(SortField(source_id=2, transform=IdentityTransform())),
            properties={"owner": "fokko"},
        )

    assert "Table already exists" in str(exc_info.value)
def test_create_table_if_not_exists_200(
    rest_mock: Mocker, table_schema_simple: Schema, example_table_metadata_no_snapshot_v1_rest_json: Dict[str, Any]
) -> None:
    """``create_table_if_not_exists`` returns the existing table when the create POST answers 409."""
    already_exists_body = {
        "error": {
            "message": "Table already exists: fokko.already_exists in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
            "type": "AlreadyExistsException",
            "code": 409,
        }
    }
    post_calls = {"count": 0}

    def respond_to_create(request: Any, context: Any) -> Dict[str, Any]:
        """Serve the table payload on the first POST and a 409 conflict on every later one."""
        post_calls["count"] += 1
        if post_calls["count"] > 1:
            context.status_code = 409
            return already_exists_body
        context.status_code = 200
        return example_table_metadata_no_snapshot_v1_rest_json

    rest_mock.post(
        f"{TEST_URI}v1/namespaces/fokko/tables",
        json=respond_to_create,
        request_headers=TEST_HEADERS,
    )
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/fokko/tables/fokko2",
        json=example_table_metadata_no_snapshot_v1_rest_json,
        status_code=200,
        request_headers=TEST_HEADERS,
    )
    rest_catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    table_identifier = ("fokko", "fokko2")

    # First creation hits the 200 branch of the callback.
    first_table = rest_catalog.create_table(
        identifier=table_identifier,
        schema=table_schema_simple,
        location=None,
        partition_spec=PartitionSpec(
            PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=3), name="id"), spec_id=1
        ),
        sort_order=SortOrder(SortField(source_id=2, transform=IdentityTransform())),
        properties={"owner": "fokko"},
    )
    # Second creation hits the 409 branch of the callback.
    second_table = rest_catalog.create_table_if_not_exists(
        identifier=table_identifier,
        schema=table_schema_simple,
        location=None,
        partition_spec=PartitionSpec(
            PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=3), name="id")
        ),
        sort_order=SortOrder(SortField(source_id=2, transform=IdentityTransform())),
        properties={"owner": "fokko"},
    )

    assert first_table == second_table
def test_create_table_419(rest_mock: Mocker, table_schema_simple: Schema) -> None:
    """A 419 answer to the create-table POST raises ``AuthorizationExpiredError``.

    The mock is also expected to have recorded exactly three requests by the end.
    """
    expired_body = {
        "error": {
            "message": "Authorization expired.",
            "type": "AuthorizationExpiredError",
            "code": 419,
        }
    }
    rest_mock.post(
        f"{TEST_URI}v1/namespaces/fokko/tables",
        json=expired_body,
        status_code=419,
        request_headers=TEST_HEADERS,
    )
    rest_catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    truncated_id = PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=3), name="id")

    with pytest.raises(AuthorizationExpiredError) as exc_info:
        rest_catalog.create_table(
            identifier=("fokko", "fokko2"),
            schema=table_schema_simple,
            location=None,
            partition_spec=PartitionSpec(truncated_id),
            sort_order=SortOrder(SortField(source_id=2, transform=IdentityTransform())),
            properties={"owner": "fokko"},
        )

    assert "Authorization expired" in str(exc_info.value)
    # NOTE(review): presumably catalog initialisation plus a retried POST — confirm.
    assert rest_mock.call_count == 3
def test_register_table_200(
    rest_mock: Mocker, table_schema_simple: Schema, example_table_metadata_no_snapshot_v1_rest_json: Dict[str, Any]
) -> None:
    """Registering a table yields metadata, metadata location and name matching the mocked response."""
    response_json = example_table_metadata_no_snapshot_v1_rest_json
    table_identifier = ("default", "registered_table")
    rest_mock.post(
        f"{TEST_URI}v1/namespaces/default/register",
        json=response_json,
        status_code=200,
        request_headers=TEST_HEADERS,
    )
    rest_catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)

    registered = rest_catalog.register_table(
        identifier=table_identifier,
        metadata_location="s3://warehouse/database/table/metadata.json",
    )

    reference = Table(
        identifier=table_identifier,
        metadata_location=response_json["metadata-location"],
        metadata=TableMetadataV1(**response_json["metadata"]),
        io=load_file_io(),
        catalog=rest_catalog,
    )
    # Compared attribute by attribute rather than through Table equality.
    assert registered.metadata.model_dump() == reference.metadata.model_dump()
    assert registered.metadata_location == reference.metadata_location
    assert registered.name() == reference.name()
def test_register_table_409(rest_mock: Mocker, table_schema_simple: Schema) -> None:
    """A 409 answer to the register POST raises ``TableAlreadyExistsError``."""
    conflict_body = {
        "error": {
            "message": "Table already exists: fokko.fokko2 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
            "type": "AlreadyExistsException",
            "code": 409,
        }
    }
    rest_mock.post(
        f"{TEST_URI}v1/namespaces/default/register",
        json=conflict_body,
        status_code=409,
        request_headers=TEST_HEADERS,
    )
    rest_catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)

    with pytest.raises(TableAlreadyExistsError) as exc_info:
        rest_catalog.register_table(
            identifier=("default", "registered_table"),
            metadata_location="s3://warehouse/database/table/metadata.json",
        )

    assert "Table already exists" in str(exc_info.value)
def test_delete_namespace_204(rest_mock: Mocker) -> None:
    """Dropping a namespace completes without raising when the server answers 204."""
    namespace = "example"
    namespace_url = f"{TEST_URI}v1/namespaces/{namespace}"
    rest_mock.delete(namespace_url, json={}, status_code=204, request_headers=TEST_HEADERS)

    rest_catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    rest_catalog.drop_namespace(namespace)
def test_delete_table_204(rest_mock: Mocker) -> None:
    """Dropping a table completes without raising when the server answers 204."""
    table_url = f"{TEST_URI}v1/namespaces/example/tables/fokko"
    rest_mock.delete(table_url, json={}, status_code=204, request_headers=TEST_HEADERS)

    rest_catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    rest_catalog.drop_table(("example", "fokko"))
def test_delete_table_from_self_identifier_204(
    rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]
) -> None:
    """A table can be dropped using the identifier returned by its own ``name()``."""
    table_url = f"{TEST_URI}v1/namespaces/pdames/tables/table"
    rest_mock.get(
        table_url,
        json=example_table_metadata_with_snapshot_v1_rest_json,
        status_code=200,
        request_headers=TEST_HEADERS,
    )
    rest_catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    loaded_table = rest_catalog.load_table(("pdames", "table"))

    rest_mock.delete(table_url, json={}, status_code=204, request_headers=TEST_HEADERS)

    # Feed the identifier reported by the loaded table back into the catalog.
    rest_catalog.drop_table(loaded_table.name())
def test_rename_table_200(rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]) -> None:
    """Renaming a table returns a ``Table`` for the destination identifier."""
    response_json = example_table_metadata_with_snapshot_v1_rest_json
    source_identifier = ("pdames", "source")
    destination_identifier = ("pdames", "destination")

    rename_response = {
        "source": {"namespace": ("pdames",), "name": "source"},
        "destination": {"namespace": ("pdames",), "name": "destination"},
    }
    rest_mock.post(
        f"{TEST_URI}v1/tables/rename",
        json=rename_response,
        status_code=200,
        request_headers=TEST_HEADERS,
    )
    # GET for the destination table is mocked alongside the rename endpoint.
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/pdames/tables/destination",
        json=response_json,
        status_code=200,
        request_headers=TEST_HEADERS,
    )
    rest_catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)

    renamed = rest_catalog.rename_table(source_identifier, destination_identifier)

    reference = Table(
        identifier=("pdames", "destination"),
        metadata_location=response_json["metadata-location"],
        metadata=TableMetadataV1(**response_json["metadata"]),
        io=load_file_io(),
        catalog=rest_catalog,
    )
    assert renamed.metadata.model_dump() == reference.metadata.model_dump()
    assert renamed == reference
def test_rename_table_from_self_identifier_200(
    rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]
) -> None:
    """A table can be renamed using the identifier returned by its own ``name()``."""
    response_json = example_table_metadata_with_snapshot_v1_rest_json
    source_identifier = ("pdames", "source")
    destination_identifier = ("pdames", "destination")

    rest_mock.get(
        f"{TEST_URI}v1/namespaces/pdames/tables/source",
        json=response_json,
        status_code=200,
        request_headers=TEST_HEADERS,
    )
    rest_catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    source_table = rest_catalog.load_table(source_identifier)

    rename_response = {
        "source": {"namespace": ("pdames",), "name": "source"},
        "destination": {"namespace": ("pdames",), "name": "destination"},
    }
    rest_mock.post(
        f"{TEST_URI}v1/tables/rename",
        json=rename_response,
        status_code=200,
        request_headers=TEST_HEADERS,
    )
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/pdames/tables/destination",
        json=response_json,
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    # Feed the identifier reported by the loaded table back into the catalog.
    renamed = rest_catalog.rename_table(source_table.name(), destination_identifier)

    reference = Table(
        identifier=("pdames", "destination"),
        metadata_location=response_json["metadata-location"],
        metadata=TableMetadataV1(**response_json["metadata"]),
        io=load_file_io(),
        catalog=rest_catalog,
    )
    assert renamed.metadata.model_dump() == reference.metadata.model_dump()
    assert renamed == reference
def test_delete_table_404(rest_mock: Mocker) -> None:
    """A 404 answer to the table DELETE raises ``NoSuchTableError``."""
    not_found_body = {
        "error": {
            "message": "Table does not exist: fokko.fokko2 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
            "type": "NoSuchTableException",
            "code": 404,
        }
    }
    rest_mock.delete(
        f"{TEST_URI}v1/namespaces/example/tables/fokko",
        json=not_found_body,
        status_code=404,
        request_headers=TEST_HEADERS,
    )
    rest_catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)

    with pytest.raises(NoSuchTableError) as exc_info:
        rest_catalog.drop_table(("example", "fokko"))

    assert "Table does not exist" in str(exc_info.value)
def test_create_table_missing_namespace(rest_mock: Mocker, table_schema_simple: Schema) -> None:
    """``create_table`` rejects an identifier that carries no namespace."""
    table = "table"
    rest_catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)

    # The bare table name has no namespace component.
    with pytest.raises(NoSuchIdentifierError) as exc_info:
        rest_catalog.create_table(table, table_schema_simple)

    expected_message = f"Missing namespace or invalid identifier: {table}"
    assert expected_message in str(exc_info.value)
def test_load_table_invalid_namespace(rest_mock: Mocker) -> None:
    """``load_table`` rejects an identifier that carries no namespace."""
    table = "table"
    rest_catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)

    # The bare table name has no namespace component.
    with pytest.raises(NoSuchIdentifierError) as exc_info:
        rest_catalog.load_table(table)

    expected_message = f"Missing namespace or invalid identifier: {table}"
    assert expected_message in str(exc_info.value)
def test_drop_table_invalid_namespace(rest_mock: Mocker) -> None:
    """``drop_table`` rejects an identifier that carries no namespace."""
    table = "table"
    rest_catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)

    # The bare table name has no namespace component.
    with pytest.raises(NoSuchIdentifierError) as exc_info:
        rest_catalog.drop_table(table)

    expected_message = f"Missing namespace or invalid identifier: {table}"
    assert expected_message in str(exc_info.value)
def test_purge_table_invalid_namespace(rest_mock: Mocker) -> None:
    """``purge_table`` rejects an identifier that carries no namespace."""
    table = "table"
    rest_catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)

    # The bare table name has no namespace component.
    with pytest.raises(NoSuchIdentifierError) as exc_info:
        rest_catalog.purge_table(table)

    expected_message = f"Missing namespace or invalid identifier: {table}"
    assert expected_message in str(exc_info.value)
def test_create_namespace_invalid_namespace(rest_mock: Mocker) -> None:
    """``create_namespace`` rejects an empty namespace tuple."""
    empty_namespace = ()
    rest_catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)

    with pytest.raises(NoSuchNamespaceError) as exc_info:
        rest_catalog.create_namespace(empty_namespace)

    assert "Empty namespace identifier" in str(exc_info.value)
def test_drop_namespace_invalid_namespace(rest_mock: Mocker) -> None:
    """``drop_namespace`` rejects an empty namespace tuple."""
    empty_namespace = ()
    rest_catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)

    with pytest.raises(NoSuchNamespaceError) as exc_info:
        rest_catalog.drop_namespace(empty_namespace)

    assert "Empty namespace identifier" in str(exc_info.value)
def test_load_namespace_properties_invalid_namespace(rest_mock: Mocker) -> None:
    """``load_namespace_properties`` rejects an empty namespace tuple."""
    empty_namespace = ()
    rest_catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)

    with pytest.raises(NoSuchNamespaceError) as exc_info:
        rest_catalog.load_namespace_properties(empty_namespace)

    assert "Empty namespace identifier" in str(exc_info.value)
def test_update_namespace_properties_invalid_namespace(rest_mock: Mocker) -> None:
    """``update_namespace_properties`` rejects an empty namespace tuple."""
    empty_namespace = ()
    rest_catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)

    with pytest.raises(NoSuchNamespaceError) as exc_info:
        rest_catalog.update_namespace_properties(empty_namespace)

    assert "Empty namespace identifier" in str(exc_info.value)
def test_request_session_with_ssl_ca_bundle(monkeypatch: pytest.MonkeyPatch) -> None:
    """Constructing a catalog with a non-existent ``ssl.cabundle`` path raises ``OSError``."""
    # Given
    catalog_properties = {
        "uri": TEST_URI,
        "token": TEST_TOKEN,
        "ssl": {
            "cabundle": "path_to_ca_bundle",
        },
    }
    # Drop any CA-bundle environment variables before building the catalog
    # (presumably so they cannot take precedence over the configured path).
    # This setup is kept outside ``pytest.raises`` so that only the catalog
    # construction can satisfy the expected exception.
    monkeypatch.delenv("REQUESTS_CA_BUNDLE", raising=False)
    monkeypatch.delenv("CURL_CA_BUNDLE", raising=False)

    with pytest.raises(OSError) as e:
        # The configured CA bundle path does not exist.
        RestCatalog("rest", **catalog_properties)  # type: ignore

    assert "Could not find a suitable TLS CA certificate bundle, invalid path: path_to_ca_bundle" in str(e.value)
def test_request_session_with_ssl_client_cert() -> None:
    """Constructing a catalog with a non-existent client certificate path raises ``OSError``."""
    # Given
    client_settings = {
        "cert": "path_to_client_cert",
        "key": "path_to_client_key",
    }
    catalog_properties = {
        "uri": TEST_URI,
        "token": TEST_TOKEN,
        "ssl": {"client": client_settings},
    }

    with pytest.raises(OSError) as exc_info:
        # The configured client certificate path does not exist.
        RestCatalog("rest", **catalog_properties)  # type: ignore

    expected_message = "Could not find the TLS certificate file, invalid path: path_to_client_cert"
    assert expected_message in str(exc_info.value)
def test_rest_catalog_with_basic_auth_type(rest_mock: Mocker) -> None:
    """With ``auth.type`` set to ``basic`` the request carries a Basic ``Authorization`` header."""
    # Given: a mocked config endpoint
    rest_mock.get(
        f"{TEST_URI}v1/config",
        json={"defaults": {}, "overrides": {}},
        status_code=200,
    )
    # Given: basic-auth catalog properties
    basic_credentials = {
        "username": "one",
        "password": "two",
    }
    catalog_properties = {
        "uri": TEST_URI,
        "auth": {
            "type": "basic",
            "basic": basic_credentials,
        },
    }

    rest_catalog = RestCatalog("rest", **catalog_properties)  # type: ignore
    assert rest_catalog.uri == TEST_URI

    # Header value is "Basic " followed by base64("one:two").
    token = base64.b64encode(b"one:two").decode()
    sent_headers = rest_mock.last_request.headers
    assert sent_headers["Authorization"] == f"Basic {token}"
def test_rest_catalog_with_custom_auth_type() -> None:
    """A custom ``auth.impl`` that cannot be loaded makes catalog construction raise ``ValueError``."""
    # Given
    custom_settings = {
        "property1": "one",
        "property2": "two",
    }
    catalog_properties = {
        "uri": TEST_URI,
        "auth": {
            "type": "custom",
            "impl": "dummy.nonexistent.package",
            "custom": custom_settings,
        },
    }

    with pytest.raises(ValueError) as exc_info:
        # The configured implementation path does not resolve to a class.
        RestCatalog("rest", **catalog_properties)  # type: ignore

    expected_message = "Could not load AuthManager class for 'dummy.nonexistent.package'"
    assert expected_message in str(exc_info.value)
def test_rest_catalog_with_custom_basic_auth_type(rest_mock: Mocker) -> None:
    """A custom ``auth.impl`` pointing at ``BasicAuthManager`` produces a Basic ``Authorization`` header."""
    # Given
    custom_settings = {
        "username": "one",
        "password": "two",
    }
    catalog_properties = {
        "uri": TEST_URI,
        "auth": {
            "type": "custom",
            "impl": "pyiceberg.catalog.rest.auth.BasicAuthManager",
            "custom": custom_settings,
        },
    }
    rest_mock.get(
        f"{TEST_URI}v1/config",
        json={"defaults": {}, "overrides": {}},
        status_code=200,
    )

    rest_catalog = RestCatalog("rest", **catalog_properties)  # type: ignore
    assert rest_catalog.uri == TEST_URI

    # Header value is "Basic " followed by base64("one:two").
    token = base64.b64encode(b"one:two").decode()
    sent_headers = rest_mock.last_request.headers
    assert sent_headers["Authorization"] == f"Basic {token}"
def test_rest_catalog_with_custom_auth_type_no_impl() -> None:
    """``auth.type`` of ``custom`` without ``auth.impl`` makes catalog construction raise ``ValueError``."""
    # Given
    custom_settings = {
        "property1": "one",
        "property2": "two",
    }
    catalog_properties = {
        "uri": TEST_URI,
        "auth": {
            "type": "custom",
            "custom": custom_settings,
        },
    }

    with pytest.raises(ValueError) as exc_info:
        # No "impl" key is provided for the custom auth type.
        RestCatalog("rest", **catalog_properties)  # type: ignore

    expected_message = "auth.impl must be specified when using custom auth.type"
    assert expected_message in str(exc_info.value)
def test_rest_catalog_with_non_custom_auth_type_impl() -> None:
    """Supplying ``auth.impl`` with a non-custom ``auth.type`` makes catalog construction raise ``ValueError``."""
    # Given
    basic_credentials = {
        "username": "one",
        "password": "two",
    }
    catalog_properties = {
        "uri": TEST_URI,
        "auth": {
            "type": "basic",
            "impl": "basic.package",
            "basic": basic_credentials,
        },
    }

    with pytest.raises(ValueError) as exc_info:
        # "impl" is given although the auth type is "basic".
        RestCatalog("rest", **catalog_properties)  # type: ignore

    expected_message = "auth.impl can only be specified when using custom auth.type"
    assert expected_message in str(exc_info.value)
def test_rest_catalog_with_unsupported_auth_type() -> None:
    """An unrecognised ``auth.type`` makes catalog construction raise ``ValueError``."""
    # Given
    unsupported_settings = {
        "property1": "one",
        "property2": "two",
    }
    catalog_properties = {
        "uri": TEST_URI,
        "auth": {
            "type": "unsupported",
            "unsupported": unsupported_settings,
        },
    }

    with pytest.raises(ValueError) as exc_info:
        # "unsupported" is not an auth type the catalog can load.
        RestCatalog("rest", **catalog_properties)  # type: ignore

    expected_message = "Could not load AuthManager class for 'unsupported'"
    assert expected_message in str(exc_info.value)
def test_rest_catalog_with_oauth2_auth_type(requests_mock: Mocker) -> None:
    """A catalog configured with ``auth.type`` of ``oauth2`` can be constructed against mocked endpoints."""
    token_url = f"{TEST_URI}oauth2/token"
    token_response = {
        "access_token": "MTQ0NjJkZmQ5OTM2NDE1ZTZjNGZmZjI3",
        "token_type": "Bearer",
        "expires_in": 3600,
        "refresh_token": "IwOGYzYTlmM2YxOTQ5MGE3YmNmMDFkNTVk",
        "scope": "read",
    }
    requests_mock.post(token_url, json=token_response, status_code=200)
    requests_mock.get(
        f"{TEST_URI}v1/config",
        json={"defaults": {}, "overrides": {}},
        status_code=200,
    )
    # Given
    oauth2_settings = {
        "client_id": "some_client_id",
        "client_secret": "some_client_secret",
        "token_url": f"{TEST_URI}oauth2/token",
        "scope": "read",
    }
    catalog_properties = {
        "uri": TEST_URI,
        "auth": {
            "type": "oauth2",
            "oauth2": oauth2_settings,
        },
    }

    rest_catalog = RestCatalog("rest", **catalog_properties)  # type: ignore

    assert rest_catalog.uri == TEST_URI
def test_rest_catalog_oauth2_non_200_token_response(requests_mock: Mocker) -> None:
    """A 401 answer from the OAuth2 token endpoint makes catalog construction raise ``HTTPError``."""
    token_url = f"{TEST_URI}oauth2/token"
    requests_mock.post(token_url, json={"error": "invalid_client"}, status_code=401)

    oauth2_settings = {
        "client_id": "bad_client_id",
        "client_secret": "bad_client_secret",
        "token_url": f"{TEST_URI}oauth2/token",
        "scope": "read",
    }
    catalog_properties = {
        "uri": TEST_URI,
        "auth": {
            "type": "oauth2",
            "oauth2": oauth2_settings,
        },
    }

    with pytest.raises(HTTPError):
        RestCatalog("rest", **catalog_properties)  # type: ignore
# Environment variables patched into ``os.environ`` by the tests below; the
# single entry supplies the URI that those tests read back for the catalog
# named "production".
EXAMPLE_ENV = {"PYICEBERG_CATALOG__PRODUCTION__URI": TEST_URI}
@mock.patch.dict(os.environ, EXAMPLE_ENV)
@mock.patch("pyiceberg.catalog.Config.get_catalog_config")
def test_catalog_from_environment_variables(catalog_config_mock: mock.Mock, rest_mock: Mocker) -> None:
    """A catalog loaded by name takes its URI from the patched environment configuration."""
    parsed_env: RecursiveDict = Config._from_environment_variables({})
    catalog_section = cast(RecursiveDict, parsed_env.get("catalog"))
    # The patched lookup hands back the "production" entry parsed from the environment.
    catalog_config_mock.return_value = catalog_section.get("production")

    production_catalog = cast(RestCatalog, load_catalog("production"))

    assert production_catalog.uri == TEST_URI
@mock.patch.dict(os.environ, EXAMPLE_ENV)
@mock.patch("pyiceberg.catalog._ENV_CONFIG.get_catalog_config")
def test_catalog_from_environment_variables_override(catalog_config_mock: mock.Mock, rest_mock: Mocker) -> None:
    """A ``uri`` passed to ``load_catalog`` wins over the one from the environment configuration."""
    rest_mock.get(
        "https://other-service.io/api/v1/config",
        json={"defaults": {}, "overrides": {}},
        status_code=200,
    )
    parsed_env: RecursiveDict = Config._from_environment_variables({})
    catalog_section = cast(RecursiveDict, parsed_env.get("catalog"))
    catalog_config_mock.return_value = catalog_section.get("production")

    overridden_catalog = cast(RestCatalog, load_catalog("production", uri="https://other-service.io/api"))

    assert overridden_catalog.uri == "https://other-service.io/api"
def test_catalog_from_parameters_empty_env(rest_mock: Mocker) -> None:
    """A catalog can be loaded purely from keyword parameters and exposes the given URI."""
    config_response = {"defaults": {}, "overrides": {}}
    rest_mock.get("https://other-service.io/api/v1/config", json=config_response, status_code=200)

    loaded_catalog = cast(RestCatalog, load_catalog("production", uri="https://other-service.io/api"))

    assert loaded_catalog.uri == "https://other-service.io/api"
def test_table_identifier_in_commit_table_request(
    rest_mock: Mocker, table_schema_simple: Schema, example_table_metadata_v2: Dict[str, Any]
) -> None:
    """The commit request body carries the table's namespace and name as its identifier."""
    metadata_location = "s3://some_bucket/metadata.json"
    commit_response = {
        "metadata": example_table_metadata_v2,
        "metadata-location": metadata_location,
    }
    rest_mock.post(
        url=f"{TEST_URI}v1/namespaces/namespace/tables/table_name",
        json=commit_response,
        status_code=200,
        request_headers=TEST_HEADERS,
    )
    rest_catalog = RestCatalog("catalog_name", uri=TEST_URI, token=TEST_TOKEN)
    # Metadata and IO are deliberately left as None for this test.
    bare_table = Table(
        identifier=("namespace", "table_name"),
        metadata=None,  # type: ignore
        metadata_location=metadata_location,
        io=None,  # type: ignore
        catalog=rest_catalog,
    )

    rest_catalog.commit_table(bare_table, (), ())

    sent_text = rest_mock.last_request.text
    assert sent_text == """{"identifier":{"namespace":["namespace"],"name":"table_name"},"requirements":[],"updates":[]}"""
def test_drop_view_invalid_namespace(rest_mock: Mocker) -> None:
    """``drop_view`` rejects an identifier that carries no namespace."""
    view = "view"
    rest_catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)

    # The bare view name has no namespace component.
    with pytest.raises(NoSuchIdentifierError) as exc_info:
        rest_catalog.drop_view(view)

    expected_message = f"Missing namespace or invalid identifier: {view}"
    assert expected_message in str(exc_info.value)
def test_drop_view_404(rest_mock: Mocker) -> None:
    """A 404 answer to the view DELETE raises ``NoSuchViewError``."""
    not_found_body = {
        "error": {
            "message": "The given view does not exist",
            "type": "NoSuchViewException",
            "code": 404,
        }
    }
    rest_mock.delete(
        f"{TEST_URI}v1/namespaces/some_namespace/views/does_not_exists",
        json=not_found_body,
        status_code=404,
        request_headers=TEST_HEADERS,
    )
    rest_catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)

    with pytest.raises(NoSuchViewError) as exc_info:
        rest_catalog.drop_view(("some_namespace", "does_not_exists"))

    assert "The given view does not exist" in str(exc_info.value)
def test_drop_view_204(rest_mock: Mocker) -> None:
    """Dropping a view completes without raising when the server answers 204."""
    view_url = f"{TEST_URI}v1/namespaces/some_namespace/views/some_view"
    rest_mock.delete(view_url, json={}, status_code=204, request_headers=TEST_HEADERS)

    rest_catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    rest_catalog.drop_view(("some_namespace", "some_view"))
@mock.patch("google.auth.transport.requests.Request")
@mock.patch("google.auth.load_credentials_from_file")
def test_rest_catalog_with_google_credentials_path(
    mock_load_creds: mock.MagicMock, mock_google_request: mock.MagicMock, rest_mock: Mocker
) -> None:
    """With ``auth.type`` of ``google`` and a credentials path, the loaded token is sent as a Bearer header."""
    # Given: patched Google credential loading that yields a fixed token
    fake_credentials = mock.MagicMock()
    fake_credentials.token = "file_token"
    mock_load_creds.return_value = (fake_credentials, "test_project_file")

    # Given: a mocked config endpoint
    rest_mock.get(
        f"{TEST_URI}v1/config",
        json={"defaults": {}, "overrides": {}},
        status_code=200,
    )

    # Given: catalog properties pointing at a credentials file
    google_settings = {
        "credentials_path": "/fake/path.json",
    }
    catalog_properties = {
        "uri": TEST_URI,
        "auth": {
            "type": "google",
            "google": google_settings,
        },
    }

    rest_catalog = RestCatalog("rest", **catalog_properties)  # type: ignore
    assert rest_catalog.uri == TEST_URI

    expected_auth_header = "Bearer file_token"
    assert rest_mock.last_request.headers["Authorization"] == expected_auth_header

    # The credentials come from the configured file and are refreshed exactly once.
    mock_load_creds.assert_called_with("/fake/path.json", scopes=None)
    fake_credentials.refresh.assert_called_once_with(mock_google_request.return_value)

    # Exactly one request was recorded, and it carried the Bearer header.
    recorded_requests = rest_mock.request_history
    assert len(recorded_requests) == 1
    first_request_headers = recorded_requests[0].headers
    assert first_request_headers["Authorization"] == expected_auth_header
class TestRestCatalogClose:
    """Tests RestCatalog close functionality"""

    # Number of adapters expected on the catalog session after close.
    # NOTE(review): presumably the two default adapters of a requests session
    # (http/https), with one more mounted when SigV4 is enabled — confirm.
    EXPECTED_ADAPTERS = 2
    EXPECTED_ADAPTERS_SIGV4 = 3

    def test_catalog_close(self, rest_mock: Mocker) -> None:
        """Closing keeps the session and its adapters in place; closing twice does not raise."""
        rest_mock.get(
            f"{TEST_URI}v1/config",
            json={"defaults": {}, "overrides": {}},
            status_code=200,
        )

        catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
        catalog.close()
        # Verify the session object still exists after the catalog has been closed
        assert hasattr(catalog, "_session")
        assert len(catalog._session.adapters) == self.EXPECTED_ADAPTERS
        # Second close should not raise any exception
        catalog.close()

    def test_rest_catalog_close_sigv4(self, rest_mock: Mocker) -> None:
        """Closing a SigV4-enabled catalog keeps the session and the expected adapter count."""
        rest_mock.get(
            f"{TEST_URI}v1/config",
            json={"defaults": {}, "overrides": {}},
            status_code=200,
        )

        catalog = RestCatalog("rest", **{"uri": TEST_URI, "token": TEST_TOKEN, "rest.sigv4-enabled": "true"})
        catalog.close()
        assert hasattr(catalog, "_session")
        assert len(catalog._session.adapters) == self.EXPECTED_ADAPTERS_SIGV4

    def test_rest_catalog_context_manager_with_exception(self, rest_mock: Mocker) -> None:
        """Test RestCatalog context manager properly closes with exceptions."""
        # Stays None if the context manager never yields a catalog.
        catalog = None
        rest_mock.get(
            f"{TEST_URI}v1/config",
            json={"defaults": {}, "overrides": {}},
            status_code=200,
        )

        try:
            with RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) as cat:
                catalog = cat
                raise ValueError("Test exception")
        except ValueError:
            # Expected: the exception raised inside the block propagates out of it.
            pass

        assert catalog is not None and hasattr(catalog, "_session")
        assert len(catalog._session.adapters) == self.EXPECTED_ADAPTERS

    def test_rest_catalog_context_manager_with_exception_sigv4(self, rest_mock: Mocker) -> None:
        """Test SigV4-enabled RestCatalog context manager properly closes with exceptions."""
        # Stays None if the context manager never yields a catalog.
        catalog = None
        rest_mock.get(
            f"{TEST_URI}v1/config",
            json={"defaults": {}, "overrides": {}},
            status_code=200,
        )

        try:
            with RestCatalog("rest", **{"uri": TEST_URI, "token": TEST_TOKEN, "rest.sigv4-enabled": "true"}) as cat:
                catalog = cat
                raise ValueError("Test exception")
        except ValueError:
            # Expected: the exception raised inside the block propagates out of it.
            pass

        assert catalog is not None and hasattr(catalog, "_session")
        assert len(catalog._session.adapters) == self.EXPECTED_ADAPTERS_SIGV4