blob: e3bf193f563f9d96520a02080ef21d19672ab549 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import json
from dataclasses import dataclass
from typing import Dict, Optional, List
from pydantic import StrictStr
from cli.command import Command
from cli.command.utils import get_catalog_api_client
from cli.constants import Subcommands, Arguments, UNIT_SEPARATOR
from cli.options.option_tree import Argument
from polaris.catalog import IcebergCatalogAPI, CreateNamespaceRequest
from polaris.management import PolarisDefaultApi
@dataclass
class NamespacesCommand(Command):
"""
A Command implementation to represent `polaris namespaces`. The instance attributes correspond to parameters
that can be provided to various subcommands
Example commands:
* ./polaris namespaces create --catalog my_schema my_namespace
* ./polaris namespaces list --catalog my_catalog
* ./polaris namespaces delete --catalog my_catalog my_namespace.inner
"""
namespaces_subcommand: str
catalog: str
namespace: List[StrictStr]
parent: List[StrictStr]
location: str
properties: Optional[Dict[str, StrictStr]]
def validate(self):
if not self.catalog:
raise Exception(
f"Missing required argument: {Argument.to_flag_name(Arguments.CATALOG)}"
)
def execute(self, api: PolarisDefaultApi) -> None:
catalog_api_client = get_catalog_api_client(api)
catalog_api = IcebergCatalogAPI(catalog_api_client)
if self.namespaces_subcommand == Subcommands.CREATE:
req_properties = self.properties or {}
if self.location:
req_properties = {**req_properties, Arguments.LOCATION: self.location}
request = CreateNamespaceRequest(
namespace=self.namespace, properties=req_properties
)
catalog_api.create_namespace(
prefix=self.catalog, create_namespace_request=request
)
elif self.namespaces_subcommand == Subcommands.LIST:
if self.parent is not None:
result = catalog_api.list_namespaces(
prefix=self.catalog, parent=UNIT_SEPARATOR.join(self.parent)
)
else:
result = catalog_api.list_namespaces(prefix=self.catalog)
for namespace in result.namespaces:
print(json.dumps({"namespace": ".".join(namespace)}))
elif self.namespaces_subcommand == Subcommands.DELETE:
catalog_api.drop_namespace(
prefix=self.catalog, namespace=UNIT_SEPARATOR.join(self.namespace)
)
elif self.namespaces_subcommand == Subcommands.GET:
print(
catalog_api.load_namespace_metadata(
prefix=self.catalog, namespace=UNIT_SEPARATOR.join(self.namespace)
).to_json()
)
else:
raise Exception(f"{self.namespaces_subcommand} is not supported in the CLI")