blob: 9f1f6df0435a856992cbe0155721d304a3417edc [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint:disable=redefined-outer-name
from typing import List, Optional, Union
import pyarrow as pa
from pyiceberg.catalog import Catalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.typedef import EMPTY_DICT, Properties
from pyiceberg.types import (
BinaryType,
BooleanType,
DateType,
DoubleType,
FixedType,
FloatType,
IntegerType,
LongType,
NestedField,
StringType,
TimestampType,
TimestamptzType,
)
TABLE_SCHEMA = Schema(
NestedField(field_id=1, name="bool", field_type=BooleanType(), required=False),
NestedField(field_id=2, name="string", field_type=StringType(), required=False),
NestedField(field_id=3, name="string_long", field_type=StringType(), required=False),
NestedField(field_id=4, name="int", field_type=IntegerType(), required=False),
NestedField(field_id=5, name="long", field_type=LongType(), required=False),
NestedField(field_id=6, name="float", field_type=FloatType(), required=False),
NestedField(field_id=7, name="double", field_type=DoubleType(), required=False),
# NestedField(field_id=8, name="time", field_type=TimeType(), required=False), # Spark does not support time fields
NestedField(field_id=8, name="timestamp", field_type=TimestampType(), required=False),
NestedField(field_id=9, name="timestamptz", field_type=TimestamptzType(), required=False),
NestedField(field_id=10, name="date", field_type=DateType(), required=False),
# NestedField(field_id=11, name="time", field_type=TimeType(), required=False),
# NestedField(field_id=12, name="uuid", field_type=UuidType(), required=False),
NestedField(field_id=11, name="binary", field_type=BinaryType(), required=False),
NestedField(field_id=12, name="fixed", field_type=FixedType(16), required=False),
)
def _create_table(
session_catalog: Catalog,
identifier: str,
properties: Properties = EMPTY_DICT,
data: Optional[List[pa.Table]] = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
schema: Union[Schema, "pa.Schema"] = TABLE_SCHEMA,
) -> Table:
try:
session_catalog.drop_table(identifier=identifier)
except NoSuchTableError:
pass
tbl = session_catalog.create_table(identifier=identifier, schema=schema, properties=properties, partition_spec=partition_spec)
if data is not None:
for d in data:
tbl.append(d)
return tbl