| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| |
| import enum |
| |
| |
| from iceberg.api import (Table, |
| Transaction) |
| from iceberg.core import TableOperations |
| from iceberg.exceptions import CommitFailedException |
| |
| |
| class BaseTransaction(Transaction): |
| |
| @staticmethod |
| def replace_table_transaction(ops, start): |
| return BaseTransaction(ops, start) |
| |
| @staticmethod |
| def create_table_transaction(ops, start): |
| if ops.current() is not None: |
| raise RuntimeError("Cannot start create table transaction: table already exists") |
| |
| @staticmethod |
| def new_transaction(ops): |
| return BaseTransaction(ops, ops.refesh()) |
| |
| def __init__(self, ops, start): |
| self.ops = ops |
| self.updates = list() |
| self.intermediate_snapshot_ids = set() |
| self.base = ops.current |
| if self.base is None and start is None: |
| self.type = TransactionType.CREATE_TABLE |
| elif self.base is not None and start != self.base: |
| self.type = TransactionType.REPLACE_TABLE |
| else: |
| self.type = TransactionType.SIMPLE |
| |
| self.last_base = None |
| self.current = start |
| self.transaction_table = TransactionTable(self, self.current) |
| self.transaction_ops = TransactionTableOperations |
| |
| def table(self): |
| return self.transaction_table |
| |
| def check_last_operation_commited(self, operation): |
| if self.last_base == self.current: |
| raise RuntimeError("Cannot create new %s: last operation has not committed" % operation) |
| self.last_base = self.current |
| |
| def update_schema(self): |
| self.check_last_operation_commited("UpdateSchema") |
| |
| @staticmethod |
| def current_id(meta): |
| if meta is not None and meta.current_snapshot() is not None: |
| return meta.current_snapshot().snapshot_id |
| |
| |
| class TransactionType(enum.Enum): |
| |
| CREATE_TABLE = 0 |
| REPLACE_TABLE = 1 |
| SIMPLE = 1 |
| |
| |
| class TransactionTableOperations(TableOperations): |
| |
| def __init__(self, bt): |
| self._bt = bt |
| |
| def current(self): |
| return self._bt.current |
| |
| def refresh(self): |
| return self._bt.current |
| |
| def commit(self, base, metadata): |
| if base != self.current(): |
| raise CommitFailedException("Table metadata refresh is required") |
| |
| old_id = BaseTransaction.current_id(self._bt.current) |
| if old_id is not None and old_id not in (BaseTransaction.current_id(metadata), |
| BaseTransaction.current_id(base)): |
| self._bt.intermediate_snapshot_ids.add(old_id) |
| |
| self._bt.current = metadata |
| |
| def io(self): |
| return self._bt.ops.io() |
| |
| def metadata_file_location(self, file): |
| return self._bt.ops.metadata_file_location(file) |
| |
| def new_snapshot_id(self): |
| return self._bt.ops.new_snapshot_id() |
| |
| |
| class TransactionTable(Table): |
| def __init__(self, bt, current): |
| self.bt = bt |
| self.current = current |
| |
| def refresh(self): |
| pass |
| |
| def new_scan(self): |
| raise RuntimeError("Transaction tables do not support scans") |
| |
| def schema(self): |
| return self.current.schema |
| |
| def spec(self): |
| return self.current.spec |
| |
| def properties(self): |
| return self.current.properties |
| |
| def location(self): |
| return self.current.location |
| |
| def current_snapshot(self): |
| return self.current.current_snapshot() |
| |
| def snapshots(self): |
| return self.current.snapshots |
| |
| def update_schema(self): |
| return self.bt.update_schema() |
| |
| def update_properties(self): |
| return self.bt.update_properties() |
| |
| def update_location(self): |
| return self.bt.update_location() |
| |
| def new_append(self): |
| return self.bt.new_append() |
| |
| def new_rewrite(self): |
| return self.bt.new_rewrite() |
| |
| def new_overwrite(self): |
| return self.bt.new_overwrite() |
| |
| def new_replace_partitions(self): |
| return self.bt.new_replace_partitions() |
| |
| def new_delete(self): |
| return self.bt.new_delete() |
| |
| def expire_snapshots(self): |
| return self.bt.expire_snapshots() |
| |
| def rollback(self): |
| raise RuntimeError("Transaction tables do not support rollback") |
| |
| def new_transaction(self): |
| raise RuntimeError("Cannot create a transaction within a transaction") |