#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import codecs
import os
import time
import uuid
from urllib.parse import unquote
import boto3
import botocore
import pytest
from py4j.protocol import Py4JJavaError
from iceberg_spark import IcebergSparkSession
from apache_polaris.sdk.catalog import CreateNamespaceRequest, CreateTableRequest, ModelSchema
from apache_polaris.sdk.catalog.api.iceberg_catalog_api import IcebergCatalogAPI
from apache_polaris.sdk.catalog.api.iceberg_o_auth2_api import IcebergOAuth2API
from apache_polaris.sdk.catalog.api_client import ApiClient as CatalogApiClient
from apache_polaris.sdk.catalog.configuration import Configuration
from apache_polaris.sdk.management import ApiClient as ManagementApiClient
from apache_polaris.sdk.management import PolarisDefaultApi, Principal, PrincipalRole, CatalogRole, \
CatalogGrant, CatalogPrivilege, ApiException, CreateCatalogRoleRequest, CreatePrincipalRoleRequest, \
CreatePrincipalRequest, AddGrantRequest, GrantCatalogRoleRequest, GrantPrincipalRoleRequest, UpdateCatalogRequest
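# The catalog REST calls in these tests address multi-level namespaces as a single string whose levels
# are joined by the 0x1F unit-separator character -- hence the `unquote('db1%1Fschema')` and
# `codecs.decode("1F", "hex")` expressions scattered below. The helper here is only an illustrative
# sketch of that convention; the tests keep their original inline forms.
def multilevel_namespace(*levels):
    """Join namespace levels with the 0x1F unit separator used by the Iceberg REST path encoding."""
    separator = codecs.decode("1F", "hex").decode("UTF-8")
    return separator.join(levels)
# Example (not invoked by the tests): multilevel_namespace('db1', 'schema') == unquote('db1%1Fschema')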
@pytest.fixture
def snowman(polaris_url, polaris_catalog_url, root_client, snowflake_catalog):
"""
create the snowman principal with full table/namespace privileges
:param root_client:
:param snowflake_catalog:
:return:
"""
snowman_name = f"snowman_{str(uuid.uuid4())[-10:]}"
table_writer_rolename = f"table_writer_{str(uuid.uuid4())[-10:]}"
snowflake_writer_rolename = "snowflake_writer"
try:
snowman = create_principal(polaris_url, polaris_catalog_url, root_client, snowman_name)
writer_principal_role = create_principal_role(root_client, table_writer_rolename)
writer_catalog_role = create_catalog_role(root_client, snowflake_catalog, snowflake_writer_rolename)
root_client.assign_catalog_role_to_principal_role(principal_role_name=writer_principal_role.name,
catalog_name=snowflake_catalog.name,
grant_catalog_role_request=GrantCatalogRoleRequest(
catalog_role=writer_catalog_role))
root_client.add_grant_to_catalog_role(snowflake_catalog.name, writer_catalog_role.name,
AddGrantRequest(grant=CatalogGrant(catalog_name=snowflake_catalog.name,
type='catalog',
privilege=CatalogPrivilege.TABLE_FULL_METADATA)))
root_client.add_grant_to_catalog_role(snowflake_catalog.name, writer_catalog_role.name,
AddGrantRequest(grant=CatalogGrant(catalog_name=snowflake_catalog.name,
type='catalog',
privilege=CatalogPrivilege.VIEW_FULL_METADATA)))
root_client.add_grant_to_catalog_role(snowflake_catalog.name, writer_catalog_role.name,
AddGrantRequest(grant=CatalogGrant(catalog_name=snowflake_catalog.name,
type='catalog',
privilege=CatalogPrivilege.TABLE_WRITE_DATA)))
root_client.add_grant_to_catalog_role(snowflake_catalog.name, writer_catalog_role.name,
AddGrantRequest(grant=CatalogGrant(catalog_name=snowflake_catalog.name,
type='catalog',
privilege=CatalogPrivilege.NAMESPACE_FULL_METADATA)))
root_client.assign_principal_role(snowman.principal.name,
grant_principal_role_request=GrantPrincipalRoleRequest(
principal_role=writer_principal_role))
yield snowman
finally:
root_client.delete_principal(snowman_name)
root_client.delete_principal_role(principal_role_name=table_writer_rolename)
root_client.delete_catalog_role(catalog_role_name=snowflake_writer_rolename, catalog_name=snowflake_catalog.name)
@pytest.fixture
def reader(polaris_url, polaris_catalog_url, root_client, snowflake_catalog):
"""
create the test_reader principal with table/namespace list and read privileges
:param root_client:
:param snowflake_catalog:
:return:
"""
reader_principal_name = 'test_reader'
reader_principal_role_name = "table_reader"
reader_catalog_role_name = 'snowflake_reader'
try:
reader = create_principal(polaris_url, polaris_catalog_url, root_client, reader_principal_name)
reader_principal_role = create_principal_role(root_client, reader_principal_role_name)
reader_catalog_role = create_catalog_role(root_client, snowflake_catalog, reader_catalog_role_name)
root_client.assign_catalog_role_to_principal_role(principal_role_name=reader_principal_role.name,
catalog_name=snowflake_catalog.name,
grant_catalog_role_request=GrantCatalogRoleRequest(
catalog_role=reader_catalog_role))
root_client.assign_principal_role(reader.principal.name,
grant_principal_role_request=GrantPrincipalRoleRequest(
principal_role=reader_principal_role))
root_client.add_grant_to_catalog_role(snowflake_catalog.name, reader_catalog_role.name,
AddGrantRequest(grant=CatalogGrant(catalog_name=snowflake_catalog.name,
type='catalog',
privilege=CatalogPrivilege.TABLE_READ_DATA)))
root_client.add_grant_to_catalog_role(snowflake_catalog.name, reader_catalog_role.name,
AddGrantRequest(grant=CatalogGrant(catalog_name=snowflake_catalog.name,
type='catalog',
privilege=CatalogPrivilege.TABLE_LIST)))
root_client.add_grant_to_catalog_role(snowflake_catalog.name, reader_catalog_role.name,
AddGrantRequest(grant=CatalogGrant(catalog_name=snowflake_catalog.name,
type='catalog',
privilege=CatalogPrivilege.TABLE_READ_PROPERTIES)))
root_client.add_grant_to_catalog_role(snowflake_catalog.name, reader_catalog_role.name,
AddGrantRequest(grant=CatalogGrant(catalog_name=snowflake_catalog.name,
type='catalog',
privilege=CatalogPrivilege.NAMESPACE_LIST)))
root_client.add_grant_to_catalog_role(snowflake_catalog.name, reader_catalog_role.name,
AddGrantRequest(grant=CatalogGrant(catalog_name=snowflake_catalog.name,
type='catalog',
privilege=CatalogPrivilege.NAMESPACE_READ_PROPERTIES)))
yield reader
finally:
root_client.delete_principal(reader_principal_name)
root_client.delete_principal_role(principal_role_name=reader_principal_role_name)
root_client.delete_catalog_role(catalog_role_name=reader_catalog_role_name, catalog_name=snowflake_catalog.name)
@pytest.fixture
def snowman_catalog_client(polaris_catalog_url, snowman):
"""
Create an iceberg catalog client with snowman credentials
:param polaris_catalog_url:
:param snowman:
:return:
"""
client = CatalogApiClient(Configuration(username=snowman.principal.client_id,
password=snowman.credentials.client_secret.get_secret_value(),
host=polaris_catalog_url))
oauth_api = IcebergOAuth2API(client)
token = oauth_api.get_token(scope='PRINCIPAL_ROLE:ALL', client_id=snowman.principal.client_id,
client_secret=snowman.credentials.client_secret.get_secret_value(),
grant_type='client_credentials',
_headers={'realm': 'POLARIS'})
return IcebergCatalogAPI(CatalogApiClient(Configuration(access_token=token.access_token,
host=polaris_catalog_url)))
@pytest.fixture
def creator_catalog_client(polaris_catalog_url, creator):
"""
Create an iceberg catalog client with TABLE_CREATE credentials
:param polaris_catalog_url:
:param creator:
:return:
"""
client = CatalogApiClient(Configuration(username=creator.principal.client_id,
password=creator.credentials.client_secret.get_secret_value(),
host=polaris_catalog_url))
oauth_api = IcebergOAuth2API(client)
token = oauth_api.get_token(scope='PRINCIPAL_ROLE:ALL', client_id=creator.principal.client_id,
client_secret=creator.credentials.client_secret.get_secret_value(),
grant_type='client_credentials',
_headers={'realm': 'POLARIS'})
return IcebergCatalogAPI(CatalogApiClient(Configuration(access_token=token.access_token,
host=polaris_catalog_url)))
@pytest.fixture
def creator(polaris_url, polaris_catalog_url, root_client, snowflake_catalog):
"""
create the creator principal with only TABLE_CREATE privileges
:param root_client:
:param snowflake_catalog:
:return:
"""
creator_name = "creator"
principal_role = "creator_principal_role"
catalog_role = "creator_catalog_role"
try:
creator = create_principal(polaris_url, polaris_catalog_url, root_client, creator_name)
creator_principal_role = create_principal_role(root_client, principal_role)
creator_catalog_role = create_catalog_role(root_client, snowflake_catalog, catalog_role)
root_client.assign_catalog_role_to_principal_role(principal_role_name=creator_principal_role.name,
catalog_name=snowflake_catalog.name,
grant_catalog_role_request=GrantCatalogRoleRequest(
catalog_role=creator_catalog_role))
root_client.add_grant_to_catalog_role(snowflake_catalog.name, creator_catalog_role.name,
AddGrantRequest(grant=CatalogGrant(catalog_name=snowflake_catalog.name,
type='catalog',
privilege=CatalogPrivilege.TABLE_CREATE)))
root_client.assign_principal_role(creator.principal.name,
grant_principal_role_request=GrantPrincipalRoleRequest(
principal_role=creator_principal_role))
yield creator
finally:
root_client.delete_principal(creator_name)
root_client.delete_principal_role(principal_role_name=principal_role)
root_client.delete_catalog_role(catalog_role_name=catalog_role, catalog_name=snowflake_catalog.name)
@pytest.fixture
def reader_catalog_client(polaris_catalog_url, reader):
"""
Create an iceberg catalog client with test_reader credentials
:param polaris_catalog_url:
:param reader:
:return:
"""
client = CatalogApiClient(Configuration(username=reader.principal.client_id,
password=reader.credentials.client_secret.get_secret_value(),
host=polaris_catalog_url))
oauth_api = IcebergOAuth2API(client)
token = oauth_api.get_token(scope='PRINCIPAL_ROLE:ALL', client_id=reader.principal.client_id,
client_secret=reader.credentials.client_secret.get_secret_value(),
grant_type='client_credentials',
_headers={'realm': 'POLARIS'})
return IcebergCatalogAPI(CatalogApiClient(Configuration(access_token=token.access_token,
host=polaris_catalog_url)))
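# The three *_catalog_client fixtures above repeat the same client-credentials token exchange. A
# consolidated helper along these lines (an illustrative sketch, not wired into the fixtures) could
# remove the duplication; it assumes the same SDK classes and 'POLARIS' realm used above.
def catalog_client_for(principal_with_credentials, polaris_catalog_url):
    """Exchange a principal's client credentials for a token and return an IcebergCatalogAPI client."""
    client = CatalogApiClient(Configuration(username=principal_with_credentials.principal.client_id,
                                            password=principal_with_credentials.credentials.client_secret.get_secret_value(),
                                            host=polaris_catalog_url))
    oauth_api = IcebergOAuth2API(client)
    token = oauth_api.get_token(scope='PRINCIPAL_ROLE:ALL',
                                client_id=principal_with_credentials.principal.client_id,
                                client_secret=principal_with_credentials.credentials.client_secret.get_secret_value(),
                                grant_type='client_credentials',
                                _headers={'realm': 'POLARIS'})
    return IcebergCatalogAPI(CatalogApiClient(Configuration(access_token=token.access_token,
                                                            host=polaris_catalog_url)))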
@pytest.mark.skipif(os.environ.get('AWS_TEST_ENABLED', 'False').lower() != 'true',
reason='AWS_TEST_ENABLED is not set or is false')
def test_spark_credentials(root_client, snowflake_catalog, polaris_catalog_url, snowman, reader):
"""
Basic spark test - using snowman, create namespaces and a table. Insert into the table and read records back.
Using the reader principal's credentials verify read access. Validate the reader cannot insert into the table.
:param root_client:
:param snowflake_catalog:
:param polaris_catalog_url:
:param snowman:
:param reader:
:return:
"""
with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret.get_secret_value()}',
catalog_name=snowflake_catalog.name,
polaris_url=polaris_catalog_url) as spark:
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('CREATE NAMESPACE db1')
spark.sql('CREATE NAMESPACE db1.schema')
spark.sql('SHOW NAMESPACES')
spark.sql('USE db1.schema')
spark.sql('CREATE TABLE iceberg_table (col1 int, col2 string)')
spark.sql('SHOW TABLES')
spark.sql("""INSERT INTO iceberg_table VALUES
(10, 'mystring'),
(20, 'anotherstring'),
(30, null)
""")
count = spark.sql("SELECT * FROM iceberg_table").count()
assert count == 3
# switch users to the reader. we can query, show namespaces, but we can't insert
with IcebergSparkSession(credentials=f'{reader.principal.client_id}:{reader.credentials.client_secret.get_secret_value()}',
catalog_name=snowflake_catalog.name,
polaris_url=polaris_catalog_url) as spark:
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('SHOW NAMESPACES')
spark.sql('USE db1.schema')
count = spark.sql("SELECT * FROM iceberg_table").count()
assert count == 3
try:
spark.sql("""INSERT INTO iceberg_table VALUES
(10, 'mystring'),
(20, 'anotherstring'),
(30, null)
""")
pytest.fail("Expected exception when trying to write without permission")
        except Exception as e:
            print(f"Exception caught attempting to write without permission: {e}")
# switch back to delete stuff
with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret.get_secret_value()}',
catalog_name=snowflake_catalog.name,
polaris_url=polaris_catalog_url) as spark:
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('USE db1.schema')
spark.sql('DROP TABLE iceberg_table PURGE')
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('DROP NAMESPACE db1.schema')
spark.sql('DROP NAMESPACE db1')
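# An illustrative alternative to the try/pytest.fail/except pattern used above: pytest.raises makes the
# expected failure explicit. This sketch assumes a `spark` session whose principal lacks
# TABLE_WRITE_DATA (like the reader block above) and is not invoked by the tests.
def assert_insert_is_denied(spark, table_name):
    """Assert that inserting into `table_name` raises an error for a read-only principal."""
    # Exception is deliberately broad here; the concrete type depends on how Spark surfaces the error.
    with pytest.raises(Exception):
        spark.sql(f"INSERT INTO {table_name} VALUES (1, 'should_not_be_written')")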
@pytest.mark.skipif(os.environ.get('AWS_TEST_ENABLED', 'False').lower() != 'true',
reason='AWS_TEST_ENABLED is not set or is false')
def test_spark_cannot_create_table_outside_of_namespace_dir(root_client, snowflake_catalog, polaris_catalog_url,
snowman, reader):
"""
    Basic spark test - using snowman, create a namespace and try to create a table whose location is outside of the
    namespace directory. This should fail because the location is not in the namespace's list of allowed locations.
:param root_client:
:param snowflake_catalog:
:param polaris_catalog_url:
:param snowman:
:param reader:
:return:
"""
with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret.get_secret_value()}',
catalog_name=snowflake_catalog.name,
polaris_url=polaris_catalog_url) as spark:
table_location = snowflake_catalog.properties.default_base_location + '/db1/outside_schema/table_outside_namespace'
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('CREATE NAMESPACE db1')
spark.sql('CREATE NAMESPACE db1.schema')
spark.sql('SHOW NAMESPACES')
spark.sql('USE db1.schema')
try:
spark.sql(f"CREATE TABLE iceberg_table_outside_namespace (col1 int, col2 string) LOCATION '{table_location}'")
pytest.fail("Expected to fail when creating table outside of namespace directory")
except Py4JJavaError as e:
assert "is not in the list of allowed locations" in e.java_exception.getMessage()
@pytest.mark.skipif(os.environ.get('AWS_TEST_ENABLED', 'False').lower() != 'true',
reason='AWS_TEST_ENABLED is not set or is false')
def test_spark_creates_table_in_custom_namespace_dir(root_client, snowflake_catalog, polaris_catalog_url, snowman,
reader):
"""
    Basic spark test - using snowman, create a namespace with a custom location and create a table in it. Verify the
    table's metadata is written under the custom namespace location.
:param root_client:
:param snowflake_catalog:
:param polaris_catalog_url:
:param snowman:
:param reader:
:return:
"""
with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret.get_secret_value()}',
catalog_name=snowflake_catalog.name,
polaris_url=polaris_catalog_url) as spark:
namespace_location = snowflake_catalog.properties.default_base_location + '/db1/custom_location'
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('CREATE NAMESPACE db1')
spark.sql(f"CREATE NAMESPACE db1.schema LOCATION '{namespace_location}'")
spark.sql('USE db1.schema')
spark.sql("CREATE TABLE table_in_custom_namespace_location (col1 int, col2 string)")
assert spark.sql("SELECT * FROM table_in_custom_namespace_location").count() == 0
# check the metadata and assert the custom namespace location is used
entries = spark.sql(
"SELECT file FROM db1.schema.table_in_custom_namespace_location.metadata_log_entries").collect()
assert namespace_location in entries[0][0]
try:
assert spark.sql("SELECT * FROM table_in_custom_namespace_location").count() == 0
# check the metadata and assert the custom namespace location is used
entries = spark.sql(
"SELECT file FROM db1.schema.table_in_custom_namespace_location.metadata_log_entries").collect()
assert namespace_location in entries[0][0]
finally:
spark.sql('DROP TABLE table_in_custom_namespace_location PURGE')
@pytest.mark.skipif(os.environ.get('AWS_TEST_ENABLED', 'False').lower() != 'true',
reason='AWS_TEST_ENABLED is not set or is false')
def test_spark_can_create_table_in_custom_allowed_dir(root_client, snowflake_catalog, polaris_catalog_url, snowman,
reader):
"""
    Basic spark test - using snowman, create a namespace with a custom location and create a table with an explicit
    LOCATION inside that custom namespace location. This should succeed because the table location is allowed.
:param root_client:
:param snowflake_catalog:
:param polaris_catalog_url:
:param snowman:
:param reader:
:return:
"""
with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret.get_secret_value()}',
catalog_name=snowflake_catalog.name,
polaris_url=polaris_catalog_url) as spark:
table_location = snowflake_catalog.properties.default_base_location + '/db1/custom_schema_location/table_outside_namespace'
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('CREATE NAMESPACE db1')
spark.sql(
f"CREATE NAMESPACE db1.schema LOCATION '{snowflake_catalog.properties.default_base_location}/db1/custom_schema_location'")
spark.sql('SHOW NAMESPACES')
spark.sql('USE db1.schema')
# this is supported because it is inside of the custom namespace location
spark.sql(f"CREATE TABLE iceberg_table_outside_namespace (col1 int, col2 string) LOCATION '{table_location}'")
spark.sql("drop table iceberg_table_outside_namespace PURGE")
@pytest.mark.skipif(os.environ.get('AWS_TEST_ENABLED', 'False').lower() != 'true',
reason='AWS_TEST_ENABLED is not set or is false')
def test_spark_cannot_create_view_overlapping_table(root_client, snowflake_catalog, polaris_catalog_url, snowman,
reader):
"""
    Basic spark test - using snowman, create a table and then try to create a view whose location overlaps the
    table's location. This should fail because the locations conflict.
:param root_client:
:param snowflake_catalog:
:param polaris_catalog_url:
:param snowman:
:param reader:
:return:
"""
with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret.get_secret_value()}',
catalog_name=snowflake_catalog.name,
polaris_url=polaris_catalog_url) as spark:
table_location = snowflake_catalog.properties.default_base_location + '/db1/schema/table_dir'
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('CREATE NAMESPACE db1')
spark.sql(f"CREATE NAMESPACE db1.schema LOCATION '{snowflake_catalog.properties.default_base_location}/db1/schema'")
spark.sql('SHOW NAMESPACES')
spark.sql('USE db1.schema')
spark.sql(f"CREATE TABLE my_iceberg_table (col1 int, col2 string) LOCATION '{table_location}'")
try:
spark.sql(
f"CREATE VIEW disallowed_view (int, string) TBLPROPERTIES ('location'= '{table_location}') AS SELECT * FROM my_iceberg_table")
pytest.fail("Expected to fail when creating table outside of namespace directory")
except Py4JJavaError as e:
assert "conflicts with existing table or namespace at location" in e.java_exception.getMessage()
spark.sql("drop table my_iceberg_table PURGE")
@pytest.mark.skipif(os.environ.get('AWS_TEST_ENABLED', 'False').lower() != 'true',
reason='AWS_TEST_ENABLED is not set or is false')
def test_spark_credentials_can_delete_after_purge(root_client, snowflake_catalog, polaris_catalog_url, snowman,
snowman_catalog_client, test_bucket, aws_bucket_base_location_prefix):
"""
Using snowman, create namespaces and a table. Insert into the table in multiple operations and update existing records
to generate multiple metadata.json files and manifests. Drop the table with purge=true. Poll S3 and validate all of
the files are deleted.
    :param root_client:
    :param snowflake_catalog:
    :param polaris_catalog_url:
    :param snowman:
    :param snowman_catalog_client:
    :param test_bucket:
    :return:
"""
with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret.get_secret_value()}',
catalog_name=snowflake_catalog.name,
polaris_url=polaris_catalog_url) as spark:
table_name = f'iceberg_test_table_{str(uuid.uuid4())[-10:]}'
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('CREATE NAMESPACE db1')
spark.sql('CREATE NAMESPACE db1.schema')
spark.sql('SHOW NAMESPACES')
spark.sql('USE db1.schema')
spark.sql(f'CREATE TABLE {table_name} (col1 int, col2 string)')
spark.sql('SHOW TABLES')
# several inserts and an update, which should cause earlier files to show up as deleted in the later manifests
spark.sql(f"""INSERT INTO {table_name} VALUES
(10, 'mystring'),
(20, 'anotherstring'),
(30, null)
""")
spark.sql(f"""INSERT INTO {table_name} VALUES
(40, 'mystring'),
(50, 'anotherstring'),
(60, null)
""")
spark.sql(f"""INSERT INTO {table_name} VALUES
(70, 'mystring'),
(80, 'anotherstring'),
(90, null)
""")
spark.sql(f"UPDATE {table_name} SET col2='changed string' WHERE col1 BETWEEN 20 AND 50")
count = spark.sql(f"SELECT * FROM {table_name}").count()
assert count == 9
# fetch aws credentials to examine the metadata files
response = snowman_catalog_client.load_table(snowflake_catalog.name, unquote('db1%1Fschema'), table_name,
"true")
assert response.config is not None
assert 's3.access-key-id' in response.config
assert 's3.secret-access-key' in response.config
assert 's3.session-token' in response.config
s3 = boto3.client('s3',
aws_access_key_id=response.config['s3.access-key-id'],
aws_secret_access_key=response.config['s3.secret-access-key'],
aws_session_token=response.config['s3.session-token'])
# Extract the table location from the metadata_location in the response
# metadata_location format: s3://bucket/path/to/table/metadata/v1.metadata.json
# We need to extract the base table path (everything before /metadata/)
metadata_location = response.metadata_location
assert metadata_location.startswith('s3://')
# Remove s3:// prefix and bucket name to get the path
path_without_scheme = metadata_location[5:] # Remove 's3://'
path_parts = path_without_scheme.split('/', 1) # Split bucket and path
bucket_from_metadata = path_parts[0]
full_path = path_parts[1] if len(path_parts) > 1 else ''
# Extract table base path (everything before /metadata/)
table_base_path = full_path.rsplit('/metadata/', 1)[0] if '/metadata/' in full_path else ''
objects = s3.list_objects(Bucket=test_bucket, Delimiter='/',
Prefix=f'{table_base_path}/data/')
assert objects is not None
assert 'Contents' in objects
assert len(objects['Contents']) >= 4 # it varies - at least one file for each insert and one for the update
print(f"Found {len(objects['Contents'])} data files in S3 before drop")
objects = s3.list_objects(Bucket=test_bucket, Delimiter='/',
Prefix=f'{table_base_path}/metadata/')
assert objects is not None
assert 'Contents' in objects
assert len(objects['Contents']) == 15 # 5 metadata.json files, 4 manifest lists, and 6 manifests
print(f"Found {len(objects['Contents'])} metadata files in S3 before drop")
# use the api client to ensure the purge flag is set to true
snowman_catalog_client.drop_table(snowflake_catalog.name,
codecs.decode("1F", "hex").decode("UTF-8").join(['db1', 'schema']), table_name,
purge_requested=True)
spark.sql('DROP NAMESPACE db1.schema')
spark.sql('DROP NAMESPACE db1')
print("Dropped table with purge - waiting for files to be deleted")
attempts = 0
# watch the data directory. metadata will be deleted first, so if data directory is clear, we can expect
# metadata directory to be clear also
while 'Contents' in objects and len(objects['Contents']) > 0 and attempts < 60:
time.sleep(1) # seconds, not milliseconds ;)
objects = s3.list_objects(Bucket=test_bucket, Delimiter='/',
Prefix=f'{table_base_path}/data/')
attempts = attempts + 1
        if 'Contents' in objects and len(objects['Contents']) > 0:
            pytest.fail(f"Expected all data to be deleted, but found data files {objects['Contents']}")
        objects = s3.list_objects(Bucket=test_bucket, Delimiter='/',
                                  Prefix=f'{table_base_path}/metadata/')
        if 'Contents' in objects and len(objects['Contents']) > 0:
            pytest.fail(f"Expected all metadata to be deleted, but found metadata files {objects['Contents']}")
@pytest.mark.skipif(os.environ.get('AWS_TEST_ENABLED', 'False').lower() != 'true',
reason='AWS_TEST_ENABLED is not set or is false')
def test_spark_credentials_can_write_with_random_prefix(root_client, snowflake_catalog, polaris_catalog_url, snowman,
snowman_catalog_client, test_bucket, aws_bucket_base_location_prefix):
"""
Update the catalog configuration to support unstructured table locations. Using snowman, create namespaces and a
table configured to use object-store layout in a folder under the catalog root, outside of the default table
directory. Insert into the table in multiple operations and update existing records to generate multiple metadata.json
files and manifests. Validate the data files are present under the expected subdirectory. Delete the files afterward.
    :param root_client:
    :param snowflake_catalog:
    :param polaris_catalog_url:
    :param snowman:
    :param snowman_catalog_client:
    :param test_bucket:
    :return:
"""
snowflake_catalog.properties.additional_properties['allow.unstructured.table.location'] = 'true'
root_client.update_catalog(catalog_name=snowflake_catalog.name,
update_catalog_request=UpdateCatalogRequest(
properties=snowflake_catalog.properties.to_dict(),
current_entity_version=snowflake_catalog.entity_version))
with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret.get_secret_value()}',
catalog_name=snowflake_catalog.name,
polaris_url=polaris_catalog_url) as spark:
table_name = f'iceberg_test_table_{str(uuid.uuid4())[-10:]}'
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('CREATE NAMESPACE db1')
spark.sql('CREATE NAMESPACE db1.schema')
spark.sql('SHOW NAMESPACES')
spark.sql('USE db1.schema')
spark.sql(
f"CREATE TABLE {table_name} (col1 int, col2 string) TBLPROPERTIES ('write.object-storage.enabled'='true','write.data.path'='s3://{test_bucket}/{aws_bucket_base_location_prefix}/snowflake_catalog/{table_name}data')")
spark.sql('SHOW TABLES')
# several inserts and an update, which should cause earlier files to show up as deleted in the later manifests
spark.sql(f"""INSERT INTO {table_name} VALUES
(10, 'mystring'),
(20, 'anotherstring'),
(30, null)
""")
spark.sql(f"""INSERT INTO {table_name} VALUES
(40, 'mystring'),
(50, 'anotherstring'),
(60, null)
""")
spark.sql(f"""INSERT INTO {table_name} VALUES
(70, 'mystring'),
(80, 'anotherstring'),
(90, null)
""")
spark.sql(f"UPDATE {table_name} SET col2='changed string' WHERE col1 BETWEEN 20 AND 50")
count = spark.sql(f"SELECT * FROM {table_name}").count()
assert count == 9
# fetch aws credentials to examine the metadata files
response = snowman_catalog_client.load_table(snowflake_catalog.name, unquote('db1%1Fschema'), table_name,
"true")
assert response.config is not None
assert 's3.access-key-id' in response.config
assert 's3.secret-access-key' in response.config
assert 's3.session-token' in response.config
s3 = boto3.client('s3',
aws_access_key_id=response.config['s3.access-key-id'],
aws_secret_access_key=response.config['s3.secret-access-key'],
aws_session_token=response.config['s3.session-token'])
objects = s3.list_objects(Bucket=test_bucket, Delimiter='/',
Prefix=f'{aws_bucket_base_location_prefix}/snowflake_catalog/{table_name}data/')
assert objects is not None
assert len(objects['CommonPrefixes']) >= 3
print(f"Found common prefixes in S3 {objects['CommonPrefixes']}")
objs_to_delete = []
        for prefix in objects['CommonPrefixes']:
            # Omit the delimiter so all files under this prefix (recursive) are returned
            data_objects = s3.list_objects(Bucket=test_bucket,
                                           Prefix=prefix['Prefix'])
            assert data_objects is not None
            print(data_objects)
            assert 'Contents' in data_objects
            for obj in data_objects['Contents']:
                file_path = {'Key': obj['Key']}
                assert f'/schema/{table_name}/' in file_path['Key']
                objs_to_delete.append(file_path)
objects = s3.list_objects(Bucket=test_bucket, Delimiter='/',
Prefix=f'{aws_bucket_base_location_prefix}/snowflake_catalog/db1/schema/{table_name}/metadata/')
assert objects is not None
assert 'Contents' in objects
assert len(objects['Contents']) == 15 # 5 metadata.json files, 4 manifest lists, and 6 manifests
print(f"Found {len(objects['Contents'])} metadata files in S3 before drop")
# use the api client to ensure the purge flag is set to true
snowman_catalog_client.drop_table(snowflake_catalog.name,
codecs.decode("1F", "hex").decode("UTF-8").join(['db1', 'schema']), table_name,
purge_requested=True)
spark.sql('DROP NAMESPACE db1.schema')
spark.sql('DROP NAMESPACE db1')
s3.delete_objects(Bucket=test_bucket,
Delete={'Objects': objs_to_delete})
@pytest.mark.skipif(os.environ.get('AWS_TEST_ENABLED', 'False').lower() != 'true',
reason='AWS_TEST_ENABLED is not set or is false')
def test_spark_object_store_layout_under_table_dir(root_client, snowflake_catalog, polaris_catalog_url, snowman,
snowman_catalog_client, test_bucket, aws_bucket_base_location_prefix):
"""
Using snowman, create namespaces and a table configured to use object-store layout, using a folder under the default
table directory structure. Insert into the table in multiple operations and update existing records
to generate multiple metadata.json files and manifests. Validate the data files are present under the expected
subdirectory. Delete the files afterward.
    :param root_client:
    :param snowflake_catalog:
    :param polaris_catalog_url:
    :param snowman:
    :param snowman_catalog_client:
    :param test_bucket:
    :return:
"""
with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret.get_secret_value()}',
catalog_name=snowflake_catalog.name,
polaris_url=polaris_catalog_url) as spark:
table_name = f'iceberg_test_table_{str(uuid.uuid4())[-10:]}'
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('CREATE NAMESPACE db1')
spark.sql('CREATE NAMESPACE db1.schema')
spark.sql('SHOW NAMESPACES')
spark.sql('USE db1.schema')
table_base_dir = f'{aws_bucket_base_location_prefix}/snowflake_catalog/db1/schema/{table_name}/obj_layout/'
spark.sql(
f"CREATE TABLE {table_name} (col1 int, col2 string) TBLPROPERTIES ('write.object-storage.enabled'='true','write.data.path'='s3://{test_bucket}/{table_base_dir}')")
spark.sql('SHOW TABLES')
# several inserts and an update, which should cause earlier files to show up as deleted in the later manifests
spark.sql(f"""INSERT INTO {table_name} VALUES
(10, 'mystring'),
(20, 'anotherstring'),
(30, null)
""")
spark.sql(f"""INSERT INTO {table_name} VALUES
(40, 'mystring'),
(50, 'anotherstring'),
(60, null)
""")
spark.sql(f"""INSERT INTO {table_name} VALUES
(70, 'mystring'),
(80, 'anotherstring'),
(90, null)
""")
spark.sql(f"UPDATE {table_name} SET col2='changed string' WHERE col1 BETWEEN 20 AND 50")
count = spark.sql(f"SELECT * FROM {table_name}").count()
assert count == 9
# fetch aws credentials to examine the metadata files
response = snowman_catalog_client.load_table(snowflake_catalog.name, unquote('db1%1Fschema'), table_name,
"true")
assert response.config is not None
assert 's3.access-key-id' in response.config
assert 's3.secret-access-key' in response.config
assert 's3.session-token' in response.config
s3 = boto3.client('s3',
aws_access_key_id=response.config['s3.access-key-id'],
aws_secret_access_key=response.config['s3.secret-access-key'],
aws_session_token=response.config['s3.session-token'])
objects = s3.list_objects(Bucket=test_bucket, Delimiter='/',
Prefix=table_base_dir)
assert objects is not None
assert len(objects['CommonPrefixes']) >= 3
print(f"Found common prefixes in S3 {objects['CommonPrefixes']}")
objs_to_delete = []
for prefix in objects['CommonPrefixes']:
# Files may be under further subdirectories, so omit the
# delimiter so that all files (recursive) are returned.
data_objects = s3.list_objects(Bucket=test_bucket,
Prefix=f'{prefix["Prefix"]}')
assert data_objects is not None
print(data_objects)
assert 'Contents' in data_objects
objs_to_delete.extend([{'Key': obj['Key']} for obj in data_objects['Contents']])
objects = s3.list_objects(Bucket=test_bucket, Delimiter='/',
Prefix=f'{aws_bucket_base_location_prefix}/snowflake_catalog/db1/schema/{table_name}/metadata/')
assert objects is not None
assert 'Contents' in objects
assert len(objects['Contents']) == 15 # 5 metadata.json files, 4 manifest lists, and 6 manifests
print(f"Found {len(objects['Contents'])} metadata files in S3 before drop")
# use the api client to ensure the purge flag is set to true
snowman_catalog_client.drop_table(snowflake_catalog.name,
codecs.decode("1F", "hex").decode("UTF-8").join(['db1', 'schema']), table_name,
purge_requested=True)
spark.sql('DROP NAMESPACE db1.schema')
spark.sql('DROP NAMESPACE db1')
s3.delete_objects(Bucket=test_bucket,
Delete={'Objects': objs_to_delete})
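# Both object-store-layout tests above walk CommonPrefixes and then list recursively to build the
# delete list. A shared helper along these lines (illustrative sketch, not invoked above) captures
# that pattern using only the boto3 calls already present in this file.
def collect_keys_under_prefixes(s3_client, bucket, common_prefixes):
    """Return [{'Key': ...}] for every object under each prefix, listed without a delimiter (recursive)."""
    keys = []
    for prefix in common_prefixes:
        listing = s3_client.list_objects(Bucket=bucket, Prefix=prefix['Prefix'])
        keys.extend([{'Key': obj['Key']} for obj in listing.get('Contents', [])])
    return keys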
@pytest.mark.skipif(os.environ.get('AWS_TEST_ENABLED', 'False').lower() != 'true',
reason='AWS_TEST_ENABLED is not set or is false')
# @pytest.mark.skip(reason="This test is flaky")
def test_spark_credentials_can_create_views(snowflake_catalog, polaris_catalog_url, snowman):
"""
Using snowman, create namespaces and a table. Insert into the table in multiple operations and update existing records
to generate multiple metadata.json files and manifests. Create a view on the table. Verify the state of the view
matches the state of the table.
:param snowflake_catalog:
:param polaris_catalog_url:
:param snowman:
:return:
"""
with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret.get_secret_value()}',
catalog_name=snowflake_catalog.name,
polaris_url=polaris_catalog_url) as spark:
table_name = f'iceberg_test_table_{str(uuid.uuid4())[-10:]}'
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('CREATE NAMESPACE db1')
spark.sql('CREATE NAMESPACE db1.schema')
spark.sql('SHOW NAMESPACES')
spark.sql('USE db1.schema')
spark.sql(f'CREATE TABLE {table_name} (col1 int, col2 string)')
spark.sql('SHOW TABLES')
# several inserts
spark.sql(f"""INSERT INTO {table_name} VALUES
(10, 'mystring'),
(20, 'anotherstring'),
(30, null)
""")
spark.sql(f"""INSERT INTO {table_name} VALUES
(40, 'mystring'),
(50, 'anotherstring'),
(60, null)
""")
spark.sql(f"""INSERT INTO {table_name} VALUES
(70, 'mystring'),
(80, 'anotherstring'),
(90, null)
""")
# verify the view reflects the current state of the table
spark.sql(f"CREATE VIEW {table_name}_view AS SELECT col2 FROM {table_name} where col1 > 30 ORDER BY col1 DESC")
view_records = spark.sql(f"SELECT * FROM {table_name}_view").collect()
assert len(view_records) == 6
assert len(view_records[0]) == 1
assert view_records[1][0] == 'anotherstring'
assert view_records[5][0] == 'mystring'
# Update some records. Assert the view reflects the new state
spark.sql(f"UPDATE {table_name} SET col2='changed string' WHERE col1 BETWEEN 20 AND 50")
view_records = spark.sql(f"SELECT * FROM {table_name}_view").collect()
assert len(view_records) == 6
assert view_records[5][0] == 'changed string'
spark.sql(f"drop table {table_name} PURGE")
@pytest.mark.skipif(os.environ.get('AWS_TEST_ENABLED', 'False').lower() != 'true',
reason='AWS_TEST_ENABLED is not set or is false')
def test_spark_credentials_s3_direct_with_write(root_client, snowflake_catalog, polaris_catalog_url,
snowman, snowman_catalog_client, test_bucket, aws_bucket_base_location_prefix):
"""
Create two tables using Spark. Then call the loadTable api directly with snowman token to fetch the vended credentials
for the first table.
Verify that the credentials returned to snowman can read and write to the table's directory in S3, but don't allow
reads or writes to the other table's directory
:param root_client:
:param snowflake_catalog:
:param polaris_catalog_url:
    :param snowman:
    :param snowman_catalog_client:
    :param test_bucket:
:return:
"""
with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret.get_secret_value()}',
catalog_name=snowflake_catalog.name,
polaris_url=polaris_catalog_url) as spark:
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('CREATE NAMESPACE db1')
spark.sql('CREATE NAMESPACE db1.schema')
spark.sql('USE db1.schema')
spark.sql('CREATE TABLE iceberg_table (col1 int, col2 string)')
spark.sql('CREATE TABLE iceberg_table_2 (col1 int, col2 string)')
table2_metadata = snowman_catalog_client.load_table(snowflake_catalog.name, unquote('db1%1Fschema'),
"iceberg_table_2",
"vended-credentials").metadata_location
response = snowman_catalog_client.load_table(snowflake_catalog.name, unquote('db1%1Fschema'), "iceberg_table",
"vended-credentials")
assert response.config is not None
assert 's3.access-key-id' in response.config
assert 's3.secret-access-key' in response.config
assert 's3.session-token' in response.config
s3 = boto3.client('s3',
aws_access_key_id=response.config['s3.access-key-id'],
aws_secret_access_key=response.config['s3.secret-access-key'],
aws_session_token=response.config['s3.session-token'])
objects = s3.list_objects(Bucket=test_bucket, Delimiter='/',
Prefix=f'{aws_bucket_base_location_prefix}/snowflake_catalog/db1/schema/iceberg_table/')
assert objects is not None
assert 'CommonPrefixes' in objects
assert len(objects['CommonPrefixes']) > 0
objects = s3.list_objects(Bucket=test_bucket, Delimiter='/',
Prefix=f'{aws_bucket_base_location_prefix}/snowflake_catalog/db1/schema/iceberg_table/metadata/')
assert objects is not None
assert 'Contents' in objects
assert len(objects['Contents']) > 0
metadata_file = next(f for f in objects['Contents'] if f['Key'].endswith('metadata.json'))
assert metadata_file is not None
metadata_contents = s3.get_object(Bucket=test_bucket, Key=metadata_file['Key'])
assert metadata_contents is not None
assert metadata_contents['ContentLength'] > 0
put_object = s3.put_object(Bucket=test_bucket, Key=f"{metadata_file['Key']}.bak",
Body=metadata_contents['Body'].read())
assert put_object is not None
assert 'ETag' in put_object
assert put_object['ETag'] is not None
# list files in the other table's directory. The access policy should restrict this
try:
objects = s3.list_objects(Bucket=test_bucket, Delimiter='/',
Prefix=f'{aws_bucket_base_location_prefix}/snowflake_catalog/db1/schema/iceberg_table_2/metadata/')
pytest.fail('Expected exception listing file outside of table directory')
except botocore.exceptions.ClientError as error:
print(error)
try:
metadata_contents = s3.get_object(Bucket=test_bucket, Key=table2_metadata)
pytest.fail("Expected exception reading file outside of table directory")
except botocore.exceptions.ClientError as error:
print(error)
with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret.get_secret_value()}',
catalog_name=snowflake_catalog.name,
polaris_url=polaris_catalog_url) as spark:
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('USE db1.schema')
spark.sql('DROP TABLE iceberg_table PURGE')
spark.sql('DROP TABLE iceberg_table_2 PURGE')
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('DROP NAMESPACE db1.schema')
spark.sql('DROP NAMESPACE db1')
@pytest.mark.skipif(os.environ.get('AWS_TEST_ENABLED', 'false').lower() != 'true',
reason='AWS_TEST_ENABLED is not set or is false')
def test_spark_credentials_s3_direct_without_write(root_client, snowflake_catalog, polaris_catalog_url,
snowman, reader_catalog_client, test_bucket, aws_bucket_base_location_prefix):
"""
Create two tables using Spark. Then call the loadTable api directly with test_reader token to fetch the vended
credentials for the first table.
Verify that the credentials returned to test_reader allow reads, but don't allow writes to the table's directory
    and don't allow reads or writes anywhere else on S3. This verifies that Polaris does not rely solely on preventing
    metadata updates to enforce read-only access, but also uses credential scoping to enforce restrictions at the
    storage layer.
:param root_client:
:param snowflake_catalog:
:param polaris_catalog_url:
:param reader_catalog_client:
:return:
"""
with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret.get_secret_value()}',
catalog_name=snowflake_catalog.name,
polaris_url=polaris_catalog_url) as spark:
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('CREATE NAMESPACE db1')
spark.sql('CREATE NAMESPACE db1.schema')
spark.sql('USE db1.schema')
spark.sql('CREATE TABLE iceberg_table (col1 int, col2 string)')
spark.sql('CREATE TABLE iceberg_table_2 (col1 int, col2 string)')
table2_metadata = reader_catalog_client.load_table(snowflake_catalog.name, unquote('db1%1Fschema'),
"iceberg_table_2",
"vended-credentials").metadata_location
response = reader_catalog_client.load_table(snowflake_catalog.name, unquote('db1%1Fschema'), "iceberg_table",
"vended-credentials")
assert response.config is not None
assert 's3.access-key-id' in response.config
assert 's3.secret-access-key' in response.config
assert 's3.session-token' in response.config
s3 = boto3.client('s3',
aws_access_key_id=response.config['s3.access-key-id'],
aws_secret_access_key=response.config['s3.secret-access-key'],
aws_session_token=response.config['s3.session-token'])
objects = s3.list_objects(Bucket=test_bucket, Delimiter='/',
Prefix=f'{aws_bucket_base_location_prefix}/snowflake_catalog/db1/schema/iceberg_table/metadata/')
assert objects is not None
assert 'Contents' in objects
assert len(objects['Contents']) > 0
metadata_file = next(f for f in objects['Contents'] if f['Key'].endswith('metadata.json'))
assert metadata_file is not None
metadata_contents = s3.get_object(Bucket=test_bucket, Key=metadata_file['Key'])
assert metadata_contents is not None
assert metadata_contents['ContentLength'] > 0
# try to write. Expect it to fail
try:
put_object = s3.put_object(Bucket=test_bucket, Key=f"{metadata_file['Key']}.bak",
Body=metadata_contents['Body'].read())
pytest.fail("Expect exception trying to write to table directory")
except botocore.exceptions.ClientError as error:
print(error)
# list files in the other table's directory. The access policy should restrict this
try:
objects = s3.list_objects(Bucket=test_bucket, Delimiter='/',
Prefix=f'{aws_bucket_base_location_prefix}/snowflake_catalog/db1/schema/iceberg_table_2/metadata/')
pytest.fail('Expected exception listing file outside of table directory')
except botocore.exceptions.ClientError as error:
print(error)
try:
metadata_contents = s3.get_object(Bucket=test_bucket, Key=table2_metadata)
pytest.fail("Expected exception reading file outside of table directory")
except botocore.exceptions.ClientError as error:
print(error)
with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret.get_secret_value()}',
catalog_name=snowflake_catalog.name,
polaris_url=polaris_catalog_url) as spark:
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('USE db1.schema')
spark.sql('DROP TABLE iceberg_table')
spark.sql('DROP TABLE iceberg_table_2')
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('DROP NAMESPACE db1.schema')
spark.sql('DROP NAMESPACE db1')
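# The two credential-scoping tests above repeat the same "expect ClientError" blocks. A small helper
# (illustrative sketch, not used above) makes the intent explicit; it assumes the callable performs a
# single boto3 S3 call with the scoped credentials.
def assert_s3_access_denied(s3_call, description):
    """Fail the test unless the given S3 call raises a botocore ClientError (access denied)."""
    try:
        s3_call()
        pytest.fail(f"Expected ClientError: {description}")
    except botocore.exceptions.ClientError as error:
        print(error)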
@pytest.mark.skipif(os.environ.get('AWS_TEST_ENABLED', 'false').lower() != 'true',
reason='AWS_TEST_ENABLED is not set or is false')
def test_spark_credentials_s3_direct_without_read(
snowflake_catalog, snowman_catalog_client, creator_catalog_client, test_bucket):
"""
    Create a table using `creator`, which has only TABLE_CREATE, while requesting access delegation. Expect a
    `ForbiddenException` because the principal lacks the privileges required for CREATE_TABLE_DIRECT_WITH_WRITE_DELEGATION.
"""
snowman_catalog_client.create_namespace(
prefix=snowflake_catalog.name,
create_namespace_request=CreateNamespaceRequest(
namespace=["some_schema"]
)
)
try:
creator_catalog_client.create_table(
prefix=snowflake_catalog.name,
namespace="some_schema",
x_iceberg_access_delegation="true",
create_table_request=CreateTableRequest(
name="some_table",
var_schema=ModelSchema(
type='struct',
fields=[],
)
)
)
pytest.fail("Expected exception when creating a table without TABLE_WRITE")
except Exception as e:
assert 'CREATE_TABLE_DIRECT_WITH_WRITE_DELEGATION' in str(e)
snowman_catalog_client.drop_namespace(
prefix=snowflake_catalog.name,
namespace="some_schema"
)
def create_principal(polaris_url, polaris_catalog_url, api, principal_name):
principal = Principal(name=principal_name, type="SERVICE")
principal_result = api.create_principal(CreatePrincipalRequest(principal=principal))
token_client = CatalogApiClient(Configuration(username=principal_result.principal.client_id,
password=principal_result.credentials.client_secret.get_secret_value(),
host=polaris_catalog_url))
oauth_api = IcebergOAuth2API(token_client)
token = oauth_api.get_token(scope='PRINCIPAL_ROLE:ALL', client_id=principal_result.principal.client_id,
client_secret=principal_result.credentials.client_secret.get_secret_value(),
grant_type='client_credentials',
_headers={'realm': 'POLARIS'})
rotate_client = ManagementApiClient(Configuration(access_token=token.access_token,
host=polaris_url))
rotate_api = PolarisDefaultApi(rotate_client)
rotate_credentials = rotate_api.rotate_credentials(principal_name=principal_name)
return rotate_credentials
@pytest.mark.skipif(os.environ.get('AWS_TEST_ENABLED', 'False').lower() != 'true',
reason='AWS_TEST_ENABLED is not set or is false')
def test_spark_credentials_s3_scoped_to_metadata_data_locations(root_client, snowflake_catalog, polaris_catalog_url,
snowman, snowman_catalog_client, test_bucket, aws_bucket_base_location_prefix):
"""
Create a table using Spark. Then call the loadTable api directly with snowman token to fetch the vended credentials
for the table.
    Verify that the credentials returned to snowman only work for locations ending with the metadata or data directory.
:param root_client:
:param snowflake_catalog:
:param polaris_catalog_url:
    :param snowman:
    :param snowman_catalog_client:
    :param test_bucket:
:return:
"""
with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret.get_secret_value()}',
catalog_name=snowflake_catalog.name,
polaris_url=polaris_catalog_url) as spark:
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('CREATE NAMESPACE db1')
spark.sql('CREATE NAMESPACE db1.schema')
spark.sql('USE db1.schema')
spark.sql('CREATE TABLE iceberg_table_scope_loc(col1 int, col2 string)')
spark.sql(
f'''CREATE TABLE iceberg_table_scope_loc_slashes (col1 int, col2 string) LOCATION \'s3://{test_bucket}/{aws_bucket_base_location_prefix}/snowflake_catalog/db1/schema/iceberg_table_scope_loc_slashes/path_with_slashes///////\'''')
prefix1 = f'{aws_bucket_base_location_prefix}/snowflake_catalog/db1/schema/iceberg_table_scope_loc'
prefix2 = f'{aws_bucket_base_location_prefix}/snowflake_catalog/db1/schema/iceberg_table_scope_loc_slashes/path_with_slashes'
response1 = snowman_catalog_client.load_table(snowflake_catalog.name, unquote('db1%1Fschema'),
"iceberg_table_scope_loc",
"vended-credentials")
response2 = snowman_catalog_client.load_table(snowflake_catalog.name, unquote('db1%1Fschema'),
"iceberg_table_scope_loc_slashes",
"vended-credentials")
assert response1 is not None
assert response2 is not None
assert response1.metadata_location.startswith(f"s3://{test_bucket}/{prefix1}/metadata/")
# ensure that the slashes are removed before "/metadata/"
assert response2.metadata_location.startswith(f"s3://{test_bucket}/{prefix2}/metadata/")
s3_1 = boto3.client('s3',
aws_access_key_id=response1.config['s3.access-key-id'],
aws_secret_access_key=response1.config['s3.secret-access-key'],
aws_session_token=response1.config['s3.session-token'])
s3_2 = boto3.client('s3',
aws_access_key_id=response2.config['s3.access-key-id'],
aws_secret_access_key=response2.config['s3.secret-access-key'],
aws_session_token=response2.config['s3.session-token'])
for client, prefix in [(s3_1, prefix1), (s3_2, prefix2)]:
objects = client.list_objects(Bucket=test_bucket, Delimiter='/',
Prefix=f'{prefix}/metadata/')
assert objects is not None
assert 'Contents' in objects, f'list metadata files failed in prefix: {prefix}/metadata/'
objects = client.list_objects(Bucket=test_bucket, Delimiter='/',
Prefix=f'{prefix}/data/')
assert objects is not None
# no insert executed, so should not have any data files
assert 'Contents' not in objects, f'No contents should be in prefix: {prefix}/data/'
objects = client.list_objects(Bucket=test_bucket, Delimiter='/',
Prefix=f'{prefix}/')
assert objects is not None
assert 'CommonPrefixes' in objects, f'list prefixes failed in prefix: {prefix}/'
assert len(objects['CommonPrefixes']) > 0
with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret.get_secret_value()}',
catalog_name=snowflake_catalog.name,
polaris_url=polaris_catalog_url) as spark:
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('USE db1.schema')
spark.sql('DROP TABLE iceberg_table_scope_loc PURGE')
spark.sql('DROP TABLE iceberg_table_scope_loc_slashes PURGE')
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('DROP NAMESPACE db1.schema')
spark.sql('DROP NAMESPACE db1')
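# The scoping test above runs the same three listings for each table prefix. An illustrative
# consolidation (sketch only, not invoked) that asserts a scoped client can see a table's metadata/
# directory and its top-level prefixes:
def assert_scoped_client_sees_table_dirs(s3_client, bucket, table_prefix):
    """Assert the scoped credentials can list the table's metadata/ directory and its top-level prefixes."""
    metadata = s3_client.list_objects(Bucket=bucket, Delimiter='/', Prefix=f'{table_prefix}/metadata/')
    assert 'Contents' in metadata, f'list metadata files failed in prefix: {table_prefix}/metadata/'
    top_level = s3_client.list_objects(Bucket=bucket, Delimiter='/', Prefix=f'{table_prefix}/')
    assert 'CommonPrefixes' in top_level, f'list prefixes failed in prefix: {table_prefix}/'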
@pytest.mark.skipif(os.environ.get('AWS_TEST_ENABLED', 'False').lower() != 'true',
reason='AWS_TEST_ENABLED is not set or is false')
def test_spark_ctas(snowflake_catalog, polaris_catalog_url, snowman):
"""
Create a table using CTAS and ensure that credentials are vended
    :param snowflake_catalog:
    :param polaris_catalog_url:
    :param snowman:
:return:
"""
with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret.get_secret_value()}',
catalog_name=snowflake_catalog.name,
polaris_url=polaris_catalog_url) as spark:
table_name = f'iceberg_test_table_{str(uuid.uuid4())[-10:]}'
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('CREATE NAMESPACE db1')
spark.sql('CREATE NAMESPACE db1.schema')
spark.sql('USE db1.schema')
spark.sql(f'CREATE TABLE {table_name}_t1 (col1 int)')
spark.sql('SHOW TABLES')
# Insert some data
spark.sql(f"INSERT INTO {table_name}_t1 VALUES (10)")
# Run CTAS
spark.sql(f"CREATE TABLE {table_name}_t2 AS SELECT * FROM {table_name}_t1")
spark.sql(f"drop table {table_name}_t1 PURGE")
spark.sql(f"drop table {table_name}_t2 PURGE")
@pytest.mark.skipif(os.environ.get('AWS_TEST_ENABLED', 'False').lower() != 'true',
reason='AWS_TEST_ENABLED is not set or is false')
def test_spark_credentials_s3_exception_on_metadata_file_deletion(root_client, snowflake_catalog, polaris_catalog_url,
snowman, snowman_catalog_client, test_bucket, aws_bucket_base_location_prefix):
"""
    Create a table using Spark. Then call the loadTable api directly with the snowman token to fetch the vended
    credentials for the table.
    Delete the metadata files and try to load the table again.
    It should throw a 400 (BadRequest) exception because the metadata file is missing.
    :param root_client:
    :param snowflake_catalog:
    :param polaris_catalog_url:
    :param snowman:
    :param snowman_catalog_client:
    :param test_bucket:
:return:
"""
with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret.get_secret_value()}',
catalog_name=snowflake_catalog.name,
polaris_url=polaris_catalog_url) as spark:
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('CREATE NAMESPACE db1')
spark.sql('CREATE NAMESPACE db1.schema')
spark.sql('USE db1.schema')
spark.sql('CREATE TABLE iceberg_table (col1 int, col2 string)')
response = snowman_catalog_client.load_table(snowflake_catalog.name, unquote('db1%1Fschema'),
"iceberg_table",
"vended-credentials")
assert response.config is not None
assert 's3.access-key-id' in response.config
assert 's3.secret-access-key' in response.config
assert 's3.session-token' in response.config
s3 = boto3.client('s3',
aws_access_key_id=response.config['s3.access-key-id'],
aws_secret_access_key=response.config['s3.secret-access-key'],
aws_session_token=response.config['s3.session-token'])
# Get metadata files
objects = s3.list_objects(Bucket=test_bucket, Delimiter='/',
Prefix=f'{aws_bucket_base_location_prefix}/snowflake_catalog/db1/schema/iceberg_table/metadata/')
assert objects is not None
assert 'Contents' in objects
assert len(objects['Contents']) > 0
# Verify metadata content
metadata_file = next(f for f in objects['Contents'] if f['Key'].endswith('metadata.json'))
assert metadata_file is not None
metadata_contents = s3.get_object(Bucket=test_bucket, Key=metadata_file['Key'])
assert metadata_contents is not None
assert metadata_contents['ContentLength'] > 0
# Delete metadata files
objects_to_delete = [{'Key': obj['Key']} for obj in objects['Contents']]
s3.delete_objects(Bucket=test_bucket,
Delete={'Objects': objects_to_delete})
        try:
            snowman_catalog_client.load_table(snowflake_catalog.name, unquote('db1%1Fschema'),
                                              "iceberg_table",
                                              "vended-credentials")
            pytest.fail("Expected an exception loading the table after its metadata files were deleted")
        except Exception as e:
            # 400 error (BadRequest) is thrown when the metadata file is missing
            assert '400' in str(e)
with IcebergSparkSession(credentials=f'{snowman.principal.client_id}:{snowman.credentials.client_secret.get_secret_value()}',
catalog_name=snowflake_catalog.name,
polaris_url=polaris_catalog_url) as spark:
# Spark drop table triggers load table underneath, which fails due to missing metadata file.
# Directly call drop_table api to drop the table entity.
snowman_catalog_client.drop_table(snowflake_catalog.name, unquote('db1%1Fschema'),
"iceberg_table")
spark.sql(f'USE {snowflake_catalog.name}')
spark.sql('DROP NAMESPACE db1.schema')
spark.sql('DROP NAMESPACE db1')
def create_catalog_role(api, catalog, role_name):
catalog_role = CatalogRole(name=role_name)
try:
api.create_catalog_role(catalog_name=catalog.name,
create_catalog_role_request=CreateCatalogRoleRequest(catalog_role=catalog_role))
return api.get_catalog_role(catalog_name=catalog.name, catalog_role_name=role_name)
except ApiException:
return api.get_catalog_role(catalog_name=catalog.name, catalog_role_name=role_name)
def create_principal_role(api, role_name):
principal_role = PrincipalRole(name=role_name)
try:
api.create_principal_role(CreatePrincipalRoleRequest(principal_role=principal_role))
return api.get_principal_role(principal_role_name=role_name)
except ApiException:
return api.get_principal_role(principal_role_name=role_name)