| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # pylint:disable=redefined-outer-name |
| from typing import List, Optional, Union |
| |
| import pyarrow as pa |
| |
| from pyiceberg.catalog import Catalog |
| from pyiceberg.exceptions import NoSuchTableError |
| from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec |
| from pyiceberg.schema import Schema |
| from pyiceberg.table import Table |
| from pyiceberg.typedef import EMPTY_DICT, Properties |
| from pyiceberg.types import ( |
| BinaryType, |
| BooleanType, |
| DateType, |
| DoubleType, |
| FixedType, |
| FloatType, |
| IntegerType, |
| LongType, |
| NestedField, |
| StringType, |
| TimestampType, |
| TimestamptzType, |
| ) |
| |
# Default schema used by the table-creation helper: one optional (nullable)
# column per primitive type, with explicitly assigned, sequential field ids.
# A time column is left out because Spark does not support time fields; a
# uuid column is likewise not part of this schema.
TABLE_SCHEMA = Schema(
    *(
        NestedField(field_id=column_id, name=column_name, field_type=column_type, required=False)
        for column_id, column_name, column_type in (
            (1, "bool", BooleanType()),
            (2, "string", StringType()),
            (3, "string_long", StringType()),
            (4, "int", IntegerType()),
            (5, "long", LongType()),
            (6, "float", FloatType()),
            (7, "double", DoubleType()),
            (8, "timestamp", TimestampType()),
            (9, "timestamptz", TimestamptzType()),
            (10, "date", DateType()),
            (11, "binary", BinaryType()),
            (12, "fixed", FixedType(16)),
        )
    )
)
| |
| |
def _create_table(
    session_catalog: Catalog,
    identifier: str,
    properties: Properties = EMPTY_DICT,
    data: Optional[List[pa.Table]] = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    schema: Union[Schema, "pa.Schema"] = TABLE_SCHEMA,
) -> Table:
    """Recreate the table ``identifier`` in ``session_catalog`` and optionally fill it.

    Any existing table with the same identifier is dropped first, so the
    returned table is always freshly created.

    Args:
        session_catalog: Catalog in which the table is dropped and created.
        identifier: Identifier of the table to (re)create.
        properties: Table properties forwarded to ``create_table``.
        data: Optional list of Arrow tables; each one is appended to the new
            table in list order. ``None`` means nothing is appended.
        partition_spec: Partition spec forwarded to ``create_table``.
        schema: Schema forwarded to ``create_table``.

    Returns:
        The newly created table, after all of ``data`` has been appended.
    """
    # Best-effort cleanup: a missing table simply means there is nothing to drop.
    try:
        session_catalog.drop_table(identifier=identifier)
    except NoSuchTableError:
        pass

    table = session_catalog.create_table(
        identifier=identifier,
        schema=schema,
        properties=properties,
        partition_spec=partition_spec,
    )

    if data is None:
        return table

    for arrow_table in data:
        table.append(arrow_table)
    return table