| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| from dataclasses import dataclass |
| from typing import Dict, List |
| |
| from pydantic import StrictStr, SecretStr |
| |
| from cli.command import Command |
| from cli.constants import StorageType, CatalogType, CatalogConnectionType, Subcommands, Arguments, AuthenticationType, \ |
| ServiceIdentityType |
| from cli.options.option_tree import Argument |
| from polaris.management import PolarisDefaultApi, CreateCatalogRequest, UpdateCatalogRequest, \ |
| StorageConfigInfo, ExternalCatalog, AwsStorageConfigInfo, AzureStorageConfigInfo, GcpStorageConfigInfo, \ |
| PolarisCatalog, CatalogProperties, BearerAuthenticationParameters, ImplicitAuthenticationParameters, \ |
| OAuthClientCredentialsParameters, SigV4AuthenticationParameters, HadoopConnectionConfigInfo, \ |
| IcebergRestConnectionConfigInfo, AwsIamServiceIdentityInfo |
| |
| |
@dataclass
class CatalogsCommand(Command):
    """
    A Command implementation to represent `polaris catalogs`. The instance attributes correspond to parameters
    that can be provided to various subcommands, except `catalogs_subcommand` which represents the subcommand
    itself.

    Example commands:
        * ./polaris catalogs create cat_name --storage-type s3 --default-base-location s3://bucket/path --role-arn ...
        * ./polaris catalogs update cat_name --default-base-location s3://new-bucket/new-location
        * ./polaris catalogs list
    """

    catalogs_subcommand: str
    catalog_type: str
    default_base_location: str
    # Storage configuration options (which ones apply depends on storage_type).
    storage_type: str
    allowed_locations: List[str]
    role_arn: str
    external_id: str
    user_arn: str
    region: str
    tenant_id: str
    multi_tenant_app_name: str
    consent_url: str
    service_account: str
    catalog_name: str
    properties: Dict[str, StrictStr]
    set_properties: Dict[str, StrictStr]
    remove_properties: List[str]
    # Connection options, used only when catalog_type is external.
    hadoop_warehouse: str
    iceberg_remote_catalog_name: str
    endpoint: str
    endpoint_internal: str
    sts_endpoint: str
    sts_unavailable: bool
    path_style_access: bool
    catalog_connection_type: str
    catalog_authentication_type: str
    catalog_service_identity_type: str
    catalog_service_identity_iam_arn: str
    catalog_uri: str
    catalog_token_uri: str
    catalog_client_id: str
    catalog_client_secret: str
    catalog_client_scopes: List[str]
    catalog_bearer_token: str
    catalog_role_arn: str
    catalog_role_session_name: str
    catalog_external_id: str
    catalog_signing_region: str
    catalog_signing_name: str

    def validate(self):
        """
        Check that the supplied arguments are complete and mutually consistent.

        For `create`, internal catalogs require a storage type and default base location, while
        external catalogs require the arguments implied by their authentication, connection and
        service-identity types. For every subcommand, options belonging to a storage type other
        than the selected `storage_type` are rejected.

        Raises:
            Exception: if a required argument is missing or an option conflicts with the
                selected storage type.
        """
        if self.catalogs_subcommand == Subcommands.CREATE:
            if self.catalog_type != CatalogType.EXTERNAL.value:
                if not self.storage_type:
                    raise Exception(f'Missing required argument:'
                                    f' {Argument.to_flag_name(Arguments.STORAGE_TYPE)}')
                if not self.default_base_location:
                    raise Exception(f'Missing required argument:'
                                    f' {Argument.to_flag_name(Arguments.DEFAULT_BASE_LOCATION)}')
            else:
                if self.catalog_authentication_type == AuthenticationType.OAUTH.value:
                    # `not` (rather than len() == 0) also covers scopes being None.
                    if not self.catalog_token_uri or not self.catalog_client_id \
                            or not self.catalog_client_secret or not self.catalog_client_scopes:
                        raise Exception(f"Authentication type 'OAUTH' requires"
                                        f" {Argument.to_flag_name(Arguments.CATALOG_TOKEN_URI)},"
                                        f" {Argument.to_flag_name(Arguments.CATALOG_CLIENT_ID)},"
                                        f" {Argument.to_flag_name(Arguments.CATALOG_CLIENT_SECRET)},"
                                        f" and at least one {Argument.to_flag_name(Arguments.CATALOG_CLIENT_SCOPE)}.")
                elif self.catalog_authentication_type == AuthenticationType.BEARER.value:
                    if not self.catalog_bearer_token:
                        raise Exception(f"Missing required argument for authentication type 'BEARER':"
                                        f" {Argument.to_flag_name(Arguments.CATALOG_BEARER_TOKEN)}")
                elif self.catalog_authentication_type == AuthenticationType.SIGV4.value:
                    if not self.catalog_role_arn or not self.catalog_signing_region:
                        raise Exception(f"Authentication type 'SIGV4' requires"
                                        f" {Argument.to_flag_name(Arguments.CATALOG_ROLE_ARN)}"
                                        f" and {Argument.to_flag_name(Arguments.CATALOG_SIGNING_REGION)}")
                if self.catalog_connection_type == CatalogConnectionType.HADOOP.value:
                    if not self.hadoop_warehouse or not self.catalog_uri:
                        raise Exception(f"Missing required argument for connection type 'HADOOP':"
                                        f" {Argument.to_flag_name(Arguments.HADOOP_WAREHOUSE)}"
                                        f" and {Argument.to_flag_name(Arguments.CATALOG_URI)}")
                if self.catalog_service_identity_type == ServiceIdentityType.AWS_IAM.value:
                    if not self.catalog_service_identity_iam_arn:
                        raise Exception(f"Missing required argument for service identity type 'AWS_IAM':"
                                        f" {Argument.to_flag_name(Arguments.CATALOG_SERVICE_IDENTITY_IAM_ARN)}")

        if self.storage_type == StorageType.S3.value:
            if self._has_azure_storage_info() or self._has_gcs_storage_info():
                raise Exception(
                    f"Storage type 's3' supports the options"
                    f" {Argument.to_flag_name(Arguments.ROLE_ARN)},"
                    f" {Argument.to_flag_name(Arguments.REGION)},"
                    f" {Argument.to_flag_name(Arguments.EXTERNAL_ID)},"
                    f" {Argument.to_flag_name(Arguments.USER_ARN)},"
                    f" {Argument.to_flag_name(Arguments.ENDPOINT)},"
                    f" {Argument.to_flag_name(Arguments.ENDPOINT_INTERNAL)},"
                    f" {Argument.to_flag_name(Arguments.STS_ENDPOINT)},"
                    f" {Argument.to_flag_name(Arguments.STS_UNAVAILABLE)}, and"
                    f" {Argument.to_flag_name(Arguments.PATH_STYLE_ACCESS)}"
                )
        elif self.storage_type == StorageType.AZURE.value:
            if not self.tenant_id:
                raise Exception(
                    "Missing required argument for storage type 'azure':"
                    f" {Argument.to_flag_name(Arguments.TENANT_ID)}"
                )
            if self._has_aws_storage_info() or self._has_gcs_storage_info():
                raise Exception(
                    "Storage type 'azure' supports the options"
                    f" {Argument.to_flag_name(Arguments.TENANT_ID)},"
                    f" {Argument.to_flag_name(Arguments.MULTI_TENANT_APP_NAME)}, and"
                    f" {Argument.to_flag_name(Arguments.CONSENT_URL)}"
                )
        elif self.storage_type == StorageType.GCS.value:
            if self._has_aws_storage_info() or self._has_azure_storage_info():
                raise Exception(
                    "Storage type 'gcs' supports the storage credential"
                    f" {Argument.to_flag_name(Arguments.SERVICE_ACCOUNT)}"
                )
        elif self.storage_type == StorageType.FILE.value:
            if (
                self._has_aws_storage_info()
                or self._has_azure_storage_info()
                or self._has_gcs_storage_info()
            ):
                raise Exception(
                    "Storage type 'file' does not support any additional options"
                )

    def _has_aws_storage_info(self):
        """Return a truthy value if any S3-specific storage option was supplied."""
        return (
            self.role_arn
            or self.external_id
            or self.user_arn
            or self.region
            or self.endpoint
            or self.endpoint_internal
            or self.sts_endpoint
            # Listed as an S3 option in validate()'s error message, so it counts as S3 info too.
            or self.sts_unavailable
            or self.path_style_access
        )

    def _has_azure_storage_info(self):
        """Return a truthy value if any Azure-specific storage option was supplied."""
        return self.tenant_id or self.multi_tenant_app_name or self.consent_url

    def _has_gcs_storage_info(self):
        """Return a truthy value if the GCS service account option was supplied."""
        return self.service_account

    def _build_storage_config_info(self):
        """
        Build the storage config model that matches `storage_type`.

        Returns:
            An AwsStorageConfigInfo, AzureStorageConfigInfo, GcpStorageConfigInfo or plain
            StorageConfigInfo (for 'file'), or None if `storage_type` is unset or unrecognised.
        """
        # The server expects upper-case storage type names; the CLI constants are lower-case.
        if self.storage_type == StorageType.S3.value:
            return AwsStorageConfigInfo(
                storage_type=self.storage_type.upper(),
                allowed_locations=self.allowed_locations,
                role_arn=self.role_arn,
                external_id=self.external_id,
                user_arn=self.user_arn,
                region=self.region,
                endpoint=self.endpoint,
                endpoint_internal=self.endpoint_internal,
                sts_endpoint=self.sts_endpoint,
                sts_unavailable=self.sts_unavailable,
                path_style_access=self.path_style_access,
            )
        if self.storage_type == StorageType.AZURE.value:
            return AzureStorageConfigInfo(
                storage_type=self.storage_type.upper(),
                allowed_locations=self.allowed_locations,
                tenant_id=self.tenant_id,
                multi_tenant_app_name=self.multi_tenant_app_name,
                consent_url=self.consent_url,
            )
        if self.storage_type == StorageType.GCS.value:
            return GcpStorageConfigInfo(
                storage_type=self.storage_type.upper(),
                allowed_locations=self.allowed_locations,
                gcs_service_account=self.service_account,
            )
        if self.storage_type == StorageType.FILE.value:
            return StorageConfigInfo(
                storage_type=self.storage_type.upper(),
                allowed_locations=self.allowed_locations,
            )
        return None

    def _build_connection_config_info(self):
        """
        Build the connection config for an external catalog.

        Returns:
            A HadoopConnectionConfigInfo or IcebergRestConnectionConfigInfo, or None when the
            catalog is not external or no connection type was given.

        Raises:
            Exception: if the authentication, service identity or connection type is unknown.
        """
        if self.catalog_type != CatalogType.EXTERNAL.value:
            return None

        auth_params = None
        if self.catalog_authentication_type == AuthenticationType.OAUTH.value:
            auth_params = OAuthClientCredentialsParameters(
                authentication_type=self.catalog_authentication_type.upper(),
                token_uri=self.catalog_token_uri,
                client_id=self.catalog_client_id,
                client_secret=SecretStr(self.catalog_client_secret),
                scopes=self.catalog_client_scopes
            )
        elif self.catalog_authentication_type == AuthenticationType.BEARER.value:
            auth_params = BearerAuthenticationParameters(
                authentication_type=self.catalog_authentication_type.upper(),
                bearer_token=SecretStr(self.catalog_bearer_token)
            )
        elif self.catalog_authentication_type == AuthenticationType.SIGV4.value:
            auth_params = SigV4AuthenticationParameters(
                authentication_type=self.catalog_authentication_type.upper(),
                role_arn=self.catalog_role_arn,
                role_session_name=self.catalog_role_session_name,
                external_id=self.catalog_external_id,
                signing_region=self.catalog_signing_region,
                signing_name=self.catalog_signing_name,
            )
        elif self.catalog_authentication_type == AuthenticationType.IMPLICIT.value:
            auth_params = ImplicitAuthenticationParameters(
                authentication_type=self.catalog_authentication_type.upper()
            )
        elif self.catalog_authentication_type is not None:
            raise Exception(f"Unknown authentication type: {self.catalog_authentication_type}")

        service_identity = None
        # Compare against `.value`: the CLI option is a plain string, and a string never
        # compares equal to an Enum member.
        if self.catalog_service_identity_type == ServiceIdentityType.AWS_IAM.value:
            service_identity = AwsIamServiceIdentityInfo(
                identity_type=self.catalog_service_identity_type.upper(),
                iam_arn=self.catalog_service_identity_iam_arn
            )
        elif self.catalog_service_identity_type is not None:
            raise Exception(f"Unknown service identity type: {self.catalog_service_identity_type}")

        config = None
        if self.catalog_connection_type == CatalogConnectionType.HADOOP.value:
            config = HadoopConnectionConfigInfo(
                connection_type=self.catalog_connection_type.upper(),
                uri=self.catalog_uri,
                authentication_parameters=auth_params,
                service_identity=service_identity,
                warehouse=self.hadoop_warehouse
            )
        elif self.catalog_connection_type == CatalogConnectionType.ICEBERG.value:
            config = IcebergRestConnectionConfigInfo(
                # Map the dashed CLI name to the server's underscore form.
                connection_type=self.catalog_connection_type.upper().replace('-', '_'),
                uri=self.catalog_uri,
                authentication_parameters=auth_params,
                service_identity=service_identity,
                remote_catalog_name=self.iceberg_remote_catalog_name
            )
        elif self.catalog_connection_type is not None:
            raise Exception(f"Unknown catalog connection type: {self.catalog_connection_type}")
        return config

    def execute(self, api: PolarisDefaultApi) -> None:
        """
        Run the selected `catalogs` subcommand against the Polaris management API.

        `get` and `list` print each catalog as JSON to stdout.

        Raises:
            Exception: if the subcommand is not handled by this command.
        """
        if self.catalogs_subcommand == Subcommands.CREATE:
            self._create_catalog(api)
        elif self.catalogs_subcommand == Subcommands.DELETE:
            api.delete_catalog(self.catalog_name)
        elif self.catalogs_subcommand == Subcommands.GET:
            print(api.get_catalog(self.catalog_name).to_json())
        elif self.catalogs_subcommand == Subcommands.LIST:
            for catalog in api.list_catalogs().catalogs:
                print(catalog.to_json())
        elif self.catalogs_subcommand == Subcommands.UPDATE:
            self._update_catalog(api)
        else:
            raise Exception(f"{self.catalogs_subcommand} is not supported in the CLI")

    def _create_catalog(self, api: PolarisDefaultApi) -> None:
        """Build a CreateCatalogRequest from the supplied options and submit it."""
        storage_config = self._build_storage_config_info()
        connection_config = self._build_connection_config_info()
        # External and internal catalogs take identical fields; only the model class differs.
        catalog_cls = ExternalCatalog if self.catalog_type == CatalogType.EXTERNAL.value else PolarisCatalog
        request = CreateCatalogRequest(
            catalog=catalog_cls(
                type=self.catalog_type.upper(),
                name=self.catalog_name,
                storage_config_info=storage_config,
                properties=CatalogProperties(
                    default_base_location=self.default_base_location,
                    additional_properties=self.properties
                ),
                connection_config_info=connection_config
            )
        )
        api.create_catalog(request)

    def _update_catalog(self, api: PolarisDefaultApi) -> None:
        """Fetch the catalog, apply the requested changes client-side, and submit an update."""
        catalog = api.get_catalog(self.catalog_name)

        if (
            self.default_base_location
            or self.set_properties
            or self.remove_properties
        ):
            new_default_base_location = (
                self.default_base_location
                or catalog.properties.default_base_location
            )
            # Work on a copy so removals do not mutate the fetched model's dict in place.
            new_additional_properties = dict(
                catalog.properties.additional_properties or {}
            )

            # Add or update all entries specified in set_properties
            if self.set_properties:
                new_additional_properties.update(self.set_properties)

            # Remove all keys specified in remove_properties
            for to_remove in self.remove_properties or []:
                new_additional_properties.pop(to_remove, None)

            catalog.properties = CatalogProperties(
                default_base_location=new_default_base_location,
                additional_properties=new_additional_properties,
            )

        if (
            self._has_aws_storage_info()
            or self._has_azure_storage_info()
            or self._has_gcs_storage_info()
            or self.allowed_locations
        ):
            # We must first reconstitute local storage-config related settings from the existing
            # catalog to properly construct the complete updated storage-config
            updated_storage_info = catalog.storage_config_info

            # In order to apply mutations client-side, we can't just use the base
            # _build_storage_config_info helper; instead, each allowed updatable field defined
            # in option_tree.py should be applied individually against the existing
            # storage_config_info here.
            if self.allowed_locations:
                # The existing config may carry no allowed locations at all.
                if updated_storage_info.allowed_locations is None:
                    updated_storage_info.allowed_locations = []
                updated_storage_info.allowed_locations.extend(
                    self.allowed_locations
                )

            if self.region:
                # Note: We have to lowercase the returned value because the server enum
                # is uppercase but we defined the StorageType enums as lowercase.
                storage_type = updated_storage_info.storage_type
                if storage_type.lower() != StorageType.S3.value:
                    raise Exception(
                        f"--region requires S3 storage_type, got: {storage_type}"
                    )
                updated_storage_info.region = self.region

            request = UpdateCatalogRequest(
                current_entity_version=catalog.entity_version,
                properties=catalog.properties.to_dict(),
                storage_config_info=updated_storage_info,
            )
        else:
            request = UpdateCatalogRequest(
                current_entity_version=catalog.entity_version,
                properties=catalog.properties.to_dict(),
            )

        api.update_catalog(self.catalog_name, request)