| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # pylint: disable=broad-except,redefined-builtin,redefined-outer-name |
| from functools import wraps |
| from typing import ( |
| Any, |
| Callable, |
| Dict, |
| Literal, |
| Optional, |
| Tuple, |
| ) |
| |
| import click |
| from click import Context |
| |
| from pyiceberg import __version__ |
| from pyiceberg.catalog import Catalog, load_catalog |
| from pyiceberg.cli.output import ConsoleOutput, JsonOutput, Output |
| from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchPropertyException, NoSuchTableError |
| from pyiceberg.table import TableProperties |
| from pyiceberg.table.refs import SnapshotRef |
| from pyiceberg.utils.properties import property_as_int |
| |
| |
def catch_exception() -> Callable:  # type: ignore
    """Build a decorator that reports command failures through the configured output.

    The wrapped callable runs unchanged. If it raises, the exception is handed to
    the output object stored on the current click context and the context is
    exited with status 1.

    Returns:
        A decorator to apply to command callbacks.
    """

    def decorator(func: Callable) -> Callable:  # type: ignore
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any):  # type: ignore
            try:
                return func(*args, **kwargs)
            except click.exceptions.Exit:
                # ctx.exit() terminates by raising Exit, which is a RuntimeError
                # subclass. Let it propagate so that a deliberate exit is not
                # reported as a failure and its exit code is not replaced by 1.
                raise
            except Exception as e:
                ctx: Optional[Context] = click.get_current_context(silent=True)
                if ctx is None:
                    # silent=True yields None when no context is active; there is
                    # then no output object to report to, so re-raise unchanged.
                    raise
                _, output = _catalog_and_output(ctx)
                output.exception(e)
                ctx.exit(1)

        return wrapper

    return decorator
| |
| |
@click.group()
@click.option("--catalog")
@click.option("--verbose", type=click.BOOL)
@click.option("--output", type=click.Choice(["text", "json"]), default="text")
@click.option("--ugi")
@click.option("--uri")
@click.option("--credential")
@click.pass_context
def run(
    ctx: Context,
    catalog: Optional[str],
    verbose: bool,
    output: str,
    ugi: Optional[str],
    uri: Optional[str],
    credential: Optional[str],
) -> None:
    # Root command group: stores the output formatter and the loaded catalog on
    # ctx.obj so that the subcommands can pick them up.

    # Only the connection options that were actually supplied are forwarded.
    overrides = {
        key: value
        for key, value in (("ugi", ugi), ("uri", uri), ("credential", credential))
        if value
    }

    ctx.ensure_object(dict)
    formatter_cls = ConsoleOutput if output == "text" else JsonOutput
    ctx.obj["output"] = formatter_cls(verbose=verbose)

    try:
        ctx.obj["catalog"] = load_catalog(catalog, **overrides)
    except Exception as e:
        # Report the failure through the formatter chosen above, then stop.
        ctx.obj["output"].exception(e)
        ctx.exit(1)

    if isinstance(ctx.obj["catalog"], Catalog):
        return

    ctx.obj["output"].exception(
        ValueError("Could not determine catalog type from uri. REST (http/https) and Hive (thrift) is supported")
    )
    ctx.exit(1)
| |
| |
| def _catalog_and_output(ctx: Context) -> Tuple[Catalog, Output]: |
| """Small helper to set the types.""" |
| return ctx.obj["catalog"], ctx.obj["output"] |
| |
| |
@run.command()
@click.pass_context
@click.argument("parent", required=False)
@catch_exception()
def list(ctx: Context, parent: Optional[str]) -> None:  # pylint: disable=redefined-builtin
    """List tables or namespaces."""
    catalog, output = _catalog_and_output(ctx)

    # Tables are only looked up when a parent namespace was given.
    found = catalog.list_tables(parent) if parent else []
    if not found:
        # No tables: fall back to the namespaces below parent, or to the root
        # namespaces when no parent was given.
        found = catalog.list_namespaces(parent or ())
    output.identifiers(found)
| |
| |
@run.command()
@click.option("--entity", type=click.Choice(["any", "namespace", "table"]), default="any")
@click.argument("identifier")
@click.pass_context
@catch_exception()
def describe(ctx: Context, entity: Literal["any", "namespace", "table"], identifier: str) -> None:
    """Describe a namespace or a table."""
    # entity restricts the lookup to namespaces or tables; "any" tries both.
    # Raises NoSuchNamespaceError / NoSuchTableError when the requested entity is
    # missing, and NoSuchTableError when nothing matched at all.
    catalog, output = _catalog_and_output(ctx)
    identifier_tuple = Catalog.identifier_to_tuple(identifier)

    is_namespace = False
    if entity in {"namespace", "any"} and len(identifier_tuple) > 0:
        try:
            namespace_properties = catalog.load_namespace_properties(identifier_tuple)
            output.describe_properties(namespace_properties)
            is_namespace = True
        except NoSuchNamespaceError:
            # With "any" and a multi-part identifier the table lookup below still
            # gets a chance; a single-part identifier is never tried as a table,
            # so the namespace error is final.
            if entity != "any" or len(identifier_tuple) == 1:
                raise

    is_table = False
    if entity in {"table", "any"} and len(identifier_tuple) > 1:
        try:
            catalog_table = catalog.load_table(identifier)
            output.describe_table(catalog_table)
            is_table = True
        except NoSuchTableError:
            # With "any" a missing table is acceptable here; the final check
            # below decides whether anything was described.
            if entity != "any":
                raise

    if not is_namespace and not is_table:
        raise NoSuchTableError(f"Table or namespace does not exist: {identifier}")
| |
| |
@run.command()
@click.argument("identifier")
@click.option("--history", is_flag=True)
@click.pass_context
@catch_exception()
def files(ctx: Context, identifier: str, history: bool) -> None:
    """List all the files of the table."""
    catalog, output = _catalog_and_output(ctx)
    # The loaded table and the --history flag are passed on to the formatter as-is.
    output.files(catalog.load_table(identifier), history)
| |
| |
@run.command()
@click.argument("identifier")
@click.pass_context
@catch_exception()
def schema(ctx: Context, identifier: str) -> None:
    """Get the schema of the table."""
    catalog, output = _catalog_and_output(ctx)
    # Load the table, then render its schema through the formatter.
    table_schema = catalog.load_table(identifier).schema()
    output.schema(table_schema)
| |
| |
@run.command()
@click.argument("identifier")
@click.pass_context
@catch_exception()
def spec(ctx: Context, identifier: str) -> None:
    """Return the partition spec of the table."""
    catalog, output = _catalog_and_output(ctx)
    # Load the table, then render its spec through the formatter.
    partition_spec = catalog.load_table(identifier).spec()
    output.spec(partition_spec)
| |
| |
@run.command()
@click.argument("identifier")
@click.pass_context
@catch_exception()
def uuid(ctx: Context, identifier: str) -> None:
    """Return the UUID of the table."""
    catalog, output = _catalog_and_output(ctx)
    # The UUID is read from the metadata of the loaded table.
    loaded = catalog.load_table(identifier)
    output.uuid(loaded.metadata.table_uuid)
| |
| |
@run.command()
@click.argument("identifier")
@click.pass_context
@catch_exception()
def location(ctx: Context, identifier: str) -> None:
    """Return the location of the table."""
    catalog, output = _catalog_and_output(ctx)
    # The location is emitted as plain text.
    table_location = catalog.load_table(identifier).location()
    output.text(table_location)
| |
| |
@run.command()
@click.pass_context
@catch_exception()
def version(ctx: Context) -> None:
    """Print pyiceberg version."""
    # Only the output formatter is read from the context here.
    formatter = ctx.obj["output"]
    formatter.version(__version__)
| |
| |
@run.group()
def create() -> None:
    """Operation to create a namespace."""
    # Intentionally empty: the group only hosts subcommands registered with @create.command().
| |
| |
@create.command()
@click.argument("identifier")
@click.pass_context
@catch_exception()
def namespace(ctx: Context, identifier: str) -> None:
    """Create a namespace."""
    catalog, output = _catalog_and_output(ctx)

    catalog.create_namespace(identifier)
    # Confirmation is only emitted when the catalog call did not raise.
    confirmation = f"Created namespace: {identifier}"
    output.text(confirmation)
| |
| |
@run.group()
def drop() -> None:
    """Operations to drop a namespace or table."""
    # Intentionally empty: the group only hosts subcommands registered with @drop.command().
| |
| |
@drop.command()
@click.argument("identifier")
@click.pass_context
@catch_exception()
def table(ctx: Context, identifier: str) -> None:  # noqa: F811
    """Drop a table."""
    catalog, output = _catalog_and_output(ctx)

    catalog.drop_table(identifier)
    # Confirmation is only emitted when the catalog call did not raise.
    confirmation = f"Dropped table: {identifier}"
    output.text(confirmation)
| |
| |
@drop.command()  # type: ignore
@click.argument("identifier")
@click.pass_context
@catch_exception()
def namespace(ctx: Context, identifier: str) -> None:  # noqa: F811
    """Drop a namespace."""
    catalog, output = _catalog_and_output(ctx)

    catalog.drop_namespace(identifier)
    # Confirmation is only emitted when the catalog call did not raise.
    confirmation = f"Dropped namespace: {identifier}"
    output.text(confirmation)
| |
| |
@run.command()
@click.argument("from_identifier")
@click.argument("to_identifier")
@click.pass_context
@catch_exception()
def rename(ctx: Context, from_identifier: str, to_identifier: str) -> None:
    """Rename a table."""
    catalog, output = _catalog_and_output(ctx)

    catalog.rename_table(from_identifier, to_identifier)
    # Confirmation is only emitted when the catalog call did not raise.
    confirmation = f"Renamed table from {from_identifier} to {to_identifier}"
    output.text(confirmation)
| |
| |
@run.group()
def properties() -> None:
    """Properties on tables/namespaces."""
    # Intentionally empty: the group only hosts the sub-groups registered with @properties.group().
| |
| |
@properties.group()
def get() -> None:
    """Fetch properties on tables/namespaces."""
    # Intentionally empty: the group only hosts subcommands registered with @get.command(...).
| |
| |
@get.command("namespace")
@click.argument("identifier")
@click.argument("property_name", required=False)
@click.pass_context
@catch_exception()
def get_namespace(ctx: Context, identifier: str, property_name: Optional[str]) -> None:
    """Fetch properties on a namespace."""
    # Without property_name all properties are described; with it, only that
    # property's value is printed. Raises NoSuchPropertyException when the
    # requested property is not set on the namespace.
    catalog, output = _catalog_and_output(ctx)
    identifier_tuple = Catalog.identifier_to_tuple(identifier)

    # A namespace without properties is valid, so an empty (or missing) result is
    # normalised to an empty mapping instead of being rejected with an assert.
    namespace_properties = catalog.load_namespace_properties(identifier_tuple) or {}

    if not property_name:
        output.describe_properties(namespace_properties)
        return

    # Membership test rather than truthiness: a property whose value is an empty
    # string exists and must not be reported as missing.
    if property_name not in namespace_properties:
        raise NoSuchPropertyException(f"Could not find property {property_name} on namespace {identifier}")
    output.text(namespace_properties[property_name])
| |
| |
@get.command("table")
@click.argument("identifier")
@click.argument("property_name", required=False)
@click.pass_context
@catch_exception()
def get_table(ctx: Context, identifier: str, property_name: Optional[str]) -> None:
    """Fetch properties on a table."""
    # Without property_name all properties are described; with it, only that
    # property's value is printed. Raises NoSuchPropertyException when the
    # requested property is not set on the table.
    catalog, output = _catalog_and_output(ctx)
    identifier_tuple = Catalog.identifier_to_tuple(identifier)

    # No assert on the metadata: asserts are stripped under -O and are not a
    # validation mechanism; a failed load surfaces as an exception instead.
    table_properties = catalog.load_table(identifier_tuple).metadata.properties

    if not property_name:
        output.describe_properties(table_properties)
        return

    # Membership test rather than truthiness: a property whose value is an empty
    # string exists and must not be reported as missing.
    if property_name not in table_properties:
        raise NoSuchPropertyException(f"Could not find property {property_name} on table {identifier}")
    output.text(table_properties[property_name])
| |
| |
@properties.group()
def set() -> None:
    """Set a property on tables/namespaces."""
    # Intentionally empty: the group only hosts subcommands registered with @set.command().
| |
| |
@set.command()  # type: ignore
@click.argument("identifier")
@click.argument("property_name")
@click.argument("property_value")
@click.pass_context
@catch_exception()
def namespace(ctx: Context, identifier: str, property_name: str, property_value: str) -> None:  # noqa: F811
    """Set a property on a namespace."""
    catalog, output = _catalog_and_output(ctx)

    # A single-entry update mapping: the named property gets the given value.
    changes = {property_name: property_value}
    catalog.update_namespace_properties(identifier, updates=changes)
    output.text(f"Updated {property_name} on {identifier}")
| |
| |
@set.command()  # type: ignore
@click.argument("identifier")
@click.argument("property_name")
@click.argument("property_value")
@click.pass_context
@catch_exception()
def table(ctx: Context, identifier: str, property_name: str, property_value: str) -> None:  # noqa: F811
    """Set a property on a table."""
    catalog, output = _catalog_and_output(ctx)

    # The table is loaded only for its side effect of failing on an unknown
    # identifier; the result itself is not used.
    catalog.load_table(Catalog.identifier_to_tuple(identifier))
    output.text(f"Setting {property_name}={property_value} on {identifier}")
    # Writing table properties is not implemented yet.
    raise NotImplementedError("Writing is WIP")
| |
| |
@properties.group()
def remove() -> None:
    """Remove a property from tables/namespaces."""
    # Intentionally empty: the group only hosts subcommands registered with @remove.command().
| |
| |
@remove.command()  # type: ignore
@click.argument("identifier")
@click.argument("property_name")
@click.pass_context
@catch_exception()
def namespace(ctx: Context, identifier: str, property_name: str) -> None:  # noqa: F811
    """Remove a property from a namespace."""
    catalog, output = _catalog_and_output(ctx)

    summary = catalog.update_namespace_properties(identifier, removals={property_name})

    # Anything other than exactly this one property being reported as removed is
    # treated as "the property was not there".
    if summary.removed != [property_name]:
        raise NoSuchPropertyException(f"Property {property_name} does not exist on {identifier}")
    output.text(f"Property {property_name} removed from {identifier}")
| |
| |
@remove.command()  # type: ignore
@click.argument("identifier")
@click.argument("property_name")
@click.pass_context
@catch_exception()
def table(ctx: Context, identifier: str, property_name: str) -> None:  # noqa: F811
    """Remove a property from a table."""
    # Raises NoSuchPropertyException when the property is not set on the table,
    # and NotImplementedError otherwise because writing is not implemented yet.
    catalog, _ = _catalog_and_output(ctx)
    loaded_table = catalog.load_table(identifier)

    if property_name not in loaded_table.metadata.properties:
        raise NoSuchPropertyException(f"Property {property_name} does not exist on {identifier}")

    # Raise instead of calling output.exception() followed by ctx.exit(1): the
    # catch_exception wrapper already reports the error and exits with status 1,
    # whereas the exception raised by ctx.exit() would be caught by that wrapper
    # and reported a second time.
    raise NotImplementedError("Writing is WIP")
| |
| |
@run.command()
@click.argument("identifier")
@click.option("--type", required=False)
@click.option("--verbose", type=click.BOOL)
@click.pass_context
@catch_exception()
def list_refs(ctx: Context, identifier: str, type: str, verbose: bool) -> None:
    """List all the refs in the provided table."""
    catalog, output = _catalog_and_output(ctx)
    catalog_table = catalog.load_table(identifier)
    all_refs = catalog_table.refs()

    # The filter is case-insensitive; no filter means every ref is listed.
    wanted_type = type.lower() if type else None
    if wanted_type is not None and wanted_type not in {"branch", "tag"}:
        raise ValueError(f"Type must be either branch or tag, got: {wanted_type}")

    relevant_refs = []
    for ref_name, ref in all_refs.items():
        if wanted_type is None or ref.snapshot_ref_type == wanted_type:
            retention = _retention_properties(ref, catalog_table.properties)
            relevant_refs.append((ref_name, ref.snapshot_ref_type, retention))

    output.describe_refs(relevant_refs)
| |
| |
| def _retention_properties(ref: SnapshotRef, table_properties: Dict[str, str]) -> Dict[str, str]: |
| retention_properties = {} |
| if ref.snapshot_ref_type == "branch": |
| default_min_snapshots_to_keep = property_as_int( |
| table_properties, |
| TableProperties.MIN_SNAPSHOTS_TO_KEEP, |
| TableProperties.MIN_SNAPSHOTS_TO_KEEP_DEFAULT, |
| ) |
| |
| default_max_snapshot_age_ms = property_as_int( |
| table_properties, |
| TableProperties.MAX_SNAPSHOT_AGE_MS, |
| TableProperties.MAX_SNAPSHOT_AGE_MS_DEFAULT, |
| ) |
| |
| retention_properties["min_snapshots_to_keep"] = ( |
| str(ref.min_snapshots_to_keep) if ref.min_snapshots_to_keep else str(default_min_snapshots_to_keep) |
| ) |
| retention_properties["max_snapshot_age_ms"] = ( |
| str(ref.max_snapshot_age_ms) if ref.max_snapshot_age_ms else str(default_max_snapshot_age_ms) |
| ) |
| else: |
| retention_properties["min_snapshots_to_keep"] = "N/A" |
| retention_properties["max_snapshot_age_ms"] = "N/A" |
| |
| retention_properties["max_ref_age_ms"] = str(ref.max_ref_age_ms) if ref.max_ref_age_ms else "forever" |
| |
| return retention_properties |