blob: 19616f3b7bde6a8add76a23339d17a740d171550 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import enum
from iceberg.api import (Table,
Transaction)
from iceberg.core import TableOperations
from iceberg.exceptions import CommitFailedException
class BaseTransaction(Transaction):
    """Transaction that stages metadata changes on top of a ``TableOperations``.

    The transaction records the metadata the underlying operations reported
    at construction time (``base``), the metadata it started from and is
    currently building on (``current``), and exposes a ``TransactionTable``
    plus a ``TransactionTableOperations`` wired back to this instance.
    """

    @staticmethod
    def replace_table_transaction(ops, start):
        """Return a transaction over ``ops`` that starts from ``start``."""
        return BaseTransaction(ops, start)

    @staticmethod
    def create_table_transaction(ops, start):
        """Return a transaction over ``ops`` for a table that must not exist yet.

        Raises:
            RuntimeError: if ``ops.current()`` already reports metadata.
        """
        if ops.current() is not None:
            raise RuntimeError("Cannot start create table transaction: table already exists")

        # Hand the transaction back to the caller; without this return the
        # factory only validated and yielded None.
        return BaseTransaction(ops, start)

    @staticmethod
    def new_transaction(ops):
        """Return a transaction that starts from freshly refreshed metadata."""
        return BaseTransaction(ops, ops.refresh())

    def __init__(self, ops, start):
        """Initialise the transaction state.

        Args:
            ops: the underlying table operations; ``ops.current()`` is read
                once here to capture the base metadata.
            start: the metadata this transaction begins from (may be None).
        """
        self.ops = ops
        self.updates = list()
        self.intermediate_snapshot_ids = set()

        # current is a method on the operations object: it has to be called,
        # otherwise base would hold a bound method and never compare as None.
        self.base = ops.current()

        if self.base is None and start is None:
            self.type = TransactionType.CREATE_TABLE
        elif self.base is not None and start != self.base:
            self.type = TransactionType.REPLACE_TABLE
        else:
            self.type = TransactionType.SIMPLE

        self.last_base = None
        self.current = start
        self.transaction_table = TransactionTable(self, self.current)
        # The operations wrapper needs an instance bound to this transaction,
        # not the class object itself.
        self.transaction_ops = TransactionTableOperations(self)

    def table(self):
        """Return the ``TransactionTable`` created for this transaction."""
        return self.transaction_table

    def check_last_operation_commited(self, operation):
        """Guard against starting a new operation before the last one committed.

        ``last_base`` remembers the value of ``current`` when the previous
        operation was started; if ``current`` is still equal to it, nothing
        has been committed since.

        Args:
            operation: name of the operation, used only in the error message.

        Raises:
            RuntimeError: if ``last_base`` equals ``current``.
        """
        if self.last_base == self.current:
            raise RuntimeError("Cannot create new %s: last operation has not committed" % operation)

        self.last_base = self.current

    def update_schema(self):
        """Run the last-operation guard for a schema update.

        NOTE(review): this currently only performs the guard check and
        returns None; no schema-update object is created or tracked here.
        """
        self.check_last_operation_commited("UpdateSchema")

    @staticmethod
    def current_id(meta):
        """Return the current snapshot id of ``meta``.

        Returns None when ``meta`` is None or it has no current snapshot.
        """
        if meta is not None and meta.current_snapshot() is not None:
            return meta.current_snapshot().snapshot_id
@enum.unique
class TransactionType(enum.Enum):
    """Kinds of transaction a ``BaseTransaction`` can represent.

    Each member needs its own value: ``enum`` treats a repeated value as an
    alias of the first member carrying it, which would make two kinds
    indistinguishable. ``enum.unique`` rejects such duplicates at class
    creation time.
    """

    CREATE_TABLE = 0
    REPLACE_TABLE = 1
    SIMPLE = 2
class TransactionTableOperations(TableOperations):
    """``TableOperations`` view backed by a transaction's in-progress state.

    Reads and commits go against the wrapped transaction's ``current``
    metadata; io, metadata file locations and snapshot ids are forwarded to
    the transaction's underlying ``ops``.
    """

    def __init__(self, bt):
        """Wrap the transaction ``bt``."""
        self._bt = bt

    def current(self):
        """Return the transaction's ``current`` metadata."""
        return self._bt.current

    def refresh(self):
        """Return the transaction's ``current`` metadata; nothing is reloaded."""
        return self._bt.current

    def commit(self, base, metadata):
        """Make ``metadata`` the transaction's ``current`` metadata.

        Args:
            base: the metadata the caller built ``metadata`` from; it must
                compare equal to ``self.current()``.
            metadata: the new metadata to install.

        Raises:
            CommitFailedException: if ``base`` differs from ``self.current()``.
        """
        txn = self._bt

        if base != self.current():
            raise CommitFailedException("Table metadata refresh is required")

        # Remember the snapshot id being replaced when it matches neither the
        # incoming metadata nor the supplied base.
        # NOTE(review): the guard above already requires base to equal the
        # current metadata, so the base id presumably always matches the
        # replaced id here - confirm which base this was meant to compare to.
        replaced_id = BaseTransaction.current_id(txn.current)
        if replaced_id is not None:
            retained_ids = (BaseTransaction.current_id(metadata),
                            BaseTransaction.current_id(base))
            if replaced_id not in retained_ids:
                txn.intermediate_snapshot_ids.add(replaced_id)

        txn.current = metadata

    def io(self):
        """Forward to ``io()`` on the transaction's underlying ``ops``."""
        underlying = self._bt.ops
        return underlying.io()

    def metadata_file_location(self, file):
        """Forward to ``metadata_file_location(file)`` on the underlying ``ops``."""
        underlying = self._bt.ops
        return underlying.metadata_file_location(file)

    def new_snapshot_id(self):
        """Forward to ``new_snapshot_id()`` on the underlying ``ops``."""
        underlying = self._bt.ops
        return underlying.new_snapshot_id()
class TransactionTable(Table):
    """``Table`` facade handed out by a transaction.

    Metadata accessors read from the ``current`` object supplied at
    construction; update builders are delegated to the owning transaction
    ``bt``. Scans, rollback and nested transactions are rejected.

    NOTE(review): ``current`` is bound once in ``__init__`` and is not
    rebound anywhere in this class - confirm whether the accessors are
    expected to follow the transaction's latest metadata.
    """

    def __init__(self, bt, current):
        """Store the owning transaction ``bt`` and the metadata ``current``."""
        self.bt = bt
        self.current = current

    def refresh(self):
        """Do nothing; this table has no refresh behaviour."""
        pass

    def new_scan(self):
        """Always raise: scans are not available on a transaction table."""
        raise RuntimeError("Transaction tables do not support scans")

    def schema(self):
        """Return the ``schema`` attribute of the stored metadata."""
        metadata = self.current
        return metadata.schema

    def spec(self):
        """Return the ``spec`` attribute of the stored metadata."""
        metadata = self.current
        return metadata.spec

    def properties(self):
        """Return the ``properties`` attribute of the stored metadata."""
        metadata = self.current
        return metadata.properties

    def location(self):
        """Return the ``location`` attribute of the stored metadata."""
        metadata = self.current
        return metadata.location

    def current_snapshot(self):
        """Return the result of ``current_snapshot()`` on the stored metadata."""
        metadata = self.current
        return metadata.current_snapshot()

    def snapshots(self):
        """Return the ``snapshots`` attribute of the stored metadata."""
        metadata = self.current
        return metadata.snapshots

    def update_schema(self):
        """Delegate to ``update_schema()`` on the owning transaction."""
        owner = self.bt
        return owner.update_schema()

    def update_properties(self):
        """Delegate to ``update_properties()`` on the owning transaction."""
        owner = self.bt
        return owner.update_properties()

    def update_location(self):
        """Delegate to ``update_location()`` on the owning transaction."""
        owner = self.bt
        return owner.update_location()

    def new_append(self):
        """Delegate to ``new_append()`` on the owning transaction."""
        owner = self.bt
        return owner.new_append()

    def new_rewrite(self):
        """Delegate to ``new_rewrite()`` on the owning transaction."""
        owner = self.bt
        return owner.new_rewrite()

    def new_overwrite(self):
        """Delegate to ``new_overwrite()`` on the owning transaction."""
        owner = self.bt
        return owner.new_overwrite()

    def new_replace_partitions(self):
        """Delegate to ``new_replace_partitions()`` on the owning transaction."""
        owner = self.bt
        return owner.new_replace_partitions()

    def new_delete(self):
        """Delegate to ``new_delete()`` on the owning transaction."""
        owner = self.bt
        return owner.new_delete()

    def expire_snapshots(self):
        """Delegate to ``expire_snapshots()`` on the owning transaction."""
        owner = self.bt
        return owner.expire_snapshots()

    def rollback(self):
        """Always raise: rollback is not available on a transaction table."""
        raise RuntimeError("Transaction tables do not support rollback")

    def new_transaction(self):
        """Always raise: a transaction table cannot start another transaction."""
        raise RuntimeError("Cannot create a transaction within a transaction")