blob: 78ada1768a52806b081598127d40ebd1a184b932 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import os
import json
from dataclasses import dataclass
from typing import Optional, Dict
from cli.command import Command
from cli.command.utils import get_catalog_api_client
from cli.constants import Subcommands, Arguments, UNIT_SEPARATOR
from cli.options.option_tree import Argument
from polaris.management import PolarisDefaultApi
from polaris.catalog.api.policy_api import PolicyAPI
from polaris.catalog.models.create_policy_request import CreatePolicyRequest
from polaris.catalog.models.update_policy_request import UpdatePolicyRequest
from polaris.catalog.models.policy_attachment_target import PolicyAttachmentTarget
from polaris.catalog.models.attach_policy_request import AttachPolicyRequest
from polaris.catalog.models.detach_policy_request import DetachPolicyRequest
@dataclass
class PoliciesCommand(Command):
"""
A Command implementation to represent `polaris policies`.
"""
policies_subcommand: str
catalog_name: str
namespace: str
policy_name: str
policy_file: str
policy_type: Optional[str]
policy_description: Optional[str]
target_name: Optional[str]
parameters: Optional[Dict[str, str]]
detach_all: Optional[bool]
applicable: Optional[bool]
attachment_type: Optional[str]
attachment_path: Optional[str]
def validate(self):
if not self.catalog_name:
raise Exception(f"Missing required argument: {Argument.to_flag_name(Arguments.CATALOG)}")
if self.policies_subcommand in [Subcommands.CREATE, Subcommands.UPDATE]:
if not self.policy_file:
raise Exception(f"Missing required argument: {Argument.to_flag_name(Arguments.POLICY_FILE)}")
if self.policies_subcommand in [Subcommands.ATTACH, Subcommands.DETACH]:
if not self.attachment_type:
raise Exception(f"Missing required argument: {Argument.to_flag_name(Arguments.ATTACHMENT_TYPE)}")
if self.attachment_type != 'catalog' and not self.attachment_path:
raise Exception(f"'{Argument.to_flag_name(Arguments.ATTACHMENT_PATH)}' is required when attachment type is not 'catalog'")
if self.policies_subcommand == Subcommands.LIST and self.applicable and self.target_name:
if not self.namespace:
raise Exception(
f"Missing required argument: {Argument.to_flag_name(Arguments.NAMESPACE)}"
f" when {Argument.to_flag_name(Arguments.TARGET_NAME)} is set."
)
if self.policies_subcommand == Subcommands.LIST and not self.applicable:
if not self.namespace:
raise Exception(
f"Missing required argument: {Argument.to_flag_name(Arguments.NAMESPACE)}"
f" when listing policies without {Argument.to_flag_name(Arguments.APPLICABLE)} flag."
)
def execute(self, api: PolarisDefaultApi) -> None:
catalog_api_client = get_catalog_api_client(api)
policy_api = PolicyAPI(catalog_api_client)
namespace_str = self.namespace.replace('.', UNIT_SEPARATOR) if self.namespace else ""
if self.policies_subcommand == Subcommands.CREATE:
with open(self.policy_file, "r") as f:
policy = json.load(f)
policy_api.create_policy(
prefix=self.catalog_name,
namespace=namespace_str,
create_policy_request=CreatePolicyRequest(
name=self.policy_name,
type=self.policy_type,
description=self.policy_description,
content=json.dumps(policy)
)
)
elif self.policies_subcommand == Subcommands.DELETE:
policy_api.drop_policy(
prefix=self.catalog_name,
namespace=namespace_str,
policy_name=self.policy_name,
detach_all=self.detach_all
)
elif self.policies_subcommand == Subcommands.GET:
print(policy_api.load_policy(
prefix=self.catalog_name,
namespace=namespace_str,
policy_name=self.policy_name
).to_json())
elif self.policies_subcommand == Subcommands.LIST:
if self.applicable:
applicable_policies_list = []
if self.target_name:
# Table-like level policies
applicable_policies_list = policy_api.get_applicable_policies(
prefix=self.catalog_name,
namespace=namespace_str,
target_name=self.target_name,
policy_type=self.policy_type
).applicable_policies
elif self.namespace:
# Namespace level policies
applicable_policies_list = policy_api.get_applicable_policies(
prefix=self.catalog_name,
namespace=namespace_str,
policy_type=self.policy_type
).applicable_policies
else:
# Catalog level policies
applicable_policies_list = policy_api.get_applicable_policies(
prefix=self.catalog_name,
policy_type=self.policy_type
).applicable_policies
for policy in applicable_policies_list:
print(policy.to_json())
else:
# List all policy identifiers in the namespace
policies_response = policy_api.list_policies(
prefix=self.catalog_name,
namespace=namespace_str,
policy_type=self.policy_type
).to_json()
print(policies_response)
elif self.policies_subcommand == Subcommands.UPDATE:
with open(self.policy_file, "r") as f:
policy_document = json.load(f)
# Fetch the current policy to get its version
loaded_policy_response = policy_api.load_policy(
prefix=self.catalog_name,
namespace=namespace_str,
policy_name=self.policy_name
)
if loaded_policy_response and loaded_policy_response.policy:
current_policy_version = loaded_policy_response.policy.version
else:
raise Exception(f"Could not retrieve current policy version for {self.policy_name}")
policy_api.update_policy(
prefix=self.catalog_name,
namespace=namespace_str,
policy_name=self.policy_name,
update_policy_request=UpdatePolicyRequest(
description=self.policy_description,
content=json.dumps(policy_document),
current_policy_version=current_policy_version
)
)
elif self.policies_subcommand == Subcommands.ATTACH:
attachment_path_list = [] if self.attachment_type == "catalog" else self.attachment_path.split('.')
policy_api.attach_policy(
prefix=self.catalog_name,
namespace=namespace_str,
policy_name=self.policy_name,
attach_policy_request=AttachPolicyRequest(
target=PolicyAttachmentTarget(
type=self.attachment_type,
path=attachment_path_list
),
parameters=self.parameters if isinstance(self.parameters, dict) else None
)
)
elif self.policies_subcommand == Subcommands.DETACH:
attachment_path_list = [] if self.attachment_type == "catalog" else self.attachment_path.split('.')
policy_api.detach_policy(
prefix=self.catalog_name,
namespace=namespace_str,
policy_name=self.policy_name,
detach_policy_request=DetachPolicyRequest(
target=PolicyAttachmentTarget(
type=self.attachment_type,
path=attachment_path_list
),
parameters=self.parameters if isinstance(self.parameters, dict) else None
)
)
else:
raise Exception(f"{self.policies_subcommand} is not supported in the CLI")