| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| """ |
| Schema conversion utilities between Apache Arrow and Fory schema types. |
| |
| This module provides bidirectional conversion between Arrow schema types |
| and Fory's internal schema representation for the row format. |
| """ |
| |
| import pyarrow as pa |
| from pyarrow import types as pa_types |
| |
| |
| def arrow_type_to_fory_type_id(arrow_type): |
| """ |
| Convert an Arrow data type to a Fory TypeId value. |
| |
| Args: |
| arrow_type: A PyArrow DataType instance. |
| |
| Returns: |
| int: The corresponding Fory TypeId value. |
| |
| Raises: |
| NotImplementedError: If the Arrow type is not supported. |
| """ |
| # Boolean |
| if pa_types.is_boolean(arrow_type): |
| return 1 # BOOL |
| |
| # Integer types |
| if pa_types.is_int8(arrow_type): |
| return 2 # INT8 |
| if pa_types.is_int16(arrow_type): |
| return 3 # INT16 |
| if pa_types.is_int32(arrow_type): |
| return 4 # INT32 |
| if pa_types.is_int64(arrow_type): |
| return 6 # INT64 |
| |
| # Floating point types |
| if pa_types.is_float16(arrow_type): |
| return 17 # FLOAT16 |
| if pa_types.is_float32(arrow_type): |
| return 19 # FLOAT32 |
| if pa_types.is_float64(arrow_type): |
| return 20 # FLOAT64 |
| |
| # String and binary |
| if pa_types.is_string(arrow_type) or pa_types.is_large_string(arrow_type): |
| return 21 # STRING |
| if pa_types.is_binary(arrow_type) or pa_types.is_large_binary(arrow_type): |
| return 41 # BINARY |
| |
| # Date/time types |
| if pa_types.is_date32(arrow_type): |
| return 39 # DATE |
| if pa_types.is_timestamp(arrow_type): |
| return 38 # TIMESTAMP |
| if pa_types.is_duration(arrow_type): |
| return 37 # DURATION |
| |
| # Decimal |
| if pa_types.is_decimal(arrow_type): |
| return 40 # DECIMAL |
| |
| # Complex types |
| if pa_types.is_list(arrow_type) or pa_types.is_large_list(arrow_type): |
| return 22 # LIST |
| if pa_types.is_map(arrow_type): |
| return 24 # MAP |
| if pa_types.is_struct(arrow_type): |
| return 27 # STRUCT |
| |
| raise NotImplementedError(f"Unsupported Arrow type: {arrow_type}") |
| |
| |
| def fory_type_id_to_arrow_type(type_id, precision=None, scale=None, list_type=None, map_key_type=None, map_value_type=None, struct_fields=None): |
| """ |
| Convert a Fory TypeId value to an Arrow data type. |
| |
| Args: |
| type_id: The Fory TypeId value (int). |
| precision: Precision for decimal types. |
| scale: Scale for decimal types. |
| list_type: Value type for list types (Arrow type). |
| map_key_type: Key type for map types (Arrow type). |
| map_value_type: Value type for map types (Arrow type). |
| struct_fields: List of (name, type) tuples for struct types. |
| |
| Returns: |
| A PyArrow DataType instance. |
| |
| Raises: |
| NotImplementedError: If the Fory type is not supported. |
| """ |
| type_map = { |
| 1: pa.bool_(), # BOOL |
| 2: pa.int8(), # INT8 |
| 3: pa.int16(), # INT16 |
| 4: pa.int32(), # INT32 |
| 6: pa.int64(), # INT64 |
| 17: pa.float16(), # FLOAT16 |
| 19: pa.float32(), # FLOAT32 |
| 20: pa.float64(), # FLOAT64 |
| 21: pa.utf8(), # STRING |
| 37: pa.duration("ns"), # DURATION |
| 38: pa.timestamp("us"), # TIMESTAMP |
| 39: pa.date32(), # DATE |
| 41: pa.binary(), # BINARY |
| } |
| |
| if type_id in type_map: |
| return type_map[type_id] |
| |
| # Decimal |
| if type_id == 40: # DECIMAL |
| return pa.decimal128(precision or 38, scale or 18) |
| |
| # List |
| if type_id == 22: # LIST |
| if list_type is None: |
| raise ValueError("list_type must be provided for LIST type") |
| return pa.list_(list_type) |
| |
| # Map |
| if type_id == 24: # MAP |
| if map_key_type is None or map_value_type is None: |
| raise ValueError("map_key_type and map_value_type must be provided for MAP type") |
| return pa.map_(map_key_type, map_value_type) |
| |
| # Struct |
| if type_id == 27: # STRUCT |
| if struct_fields is None: |
| raise ValueError("struct_fields must be provided for STRUCT type") |
| return pa.struct(struct_fields) |
| |
| raise NotImplementedError(f"Unsupported Fory type ID: {type_id}") |
| |
| |
| def arrow_schema_to_fory_field_list(arrow_schema): |
| """ |
| Convert an Arrow schema to a list of field specifications for Fory. |
| |
| Args: |
| arrow_schema: A PyArrow Schema instance. |
| |
| Returns: |
| list: A list of dictionaries with field specifications. |
| """ |
| fields = [] |
| for i in range(len(arrow_schema)): |
| field = arrow_schema.field(i) |
| field_spec = { |
| "name": field.name, |
| "type_id": arrow_type_to_fory_type_id(field.type), |
| "nullable": field.nullable, |
| "arrow_type": field.type, |
| } |
| # Handle nested types |
| if pa_types.is_list(field.type): |
| field_spec["value_type"] = field.type.value_type |
| elif pa_types.is_map(field.type): |
| field_spec["key_type"] = field.type.key_type |
| field_spec["item_type"] = field.type.item_type |
| elif pa_types.is_struct(field.type): |
| field_spec["struct_fields"] = [(field.type.field(j).name, field.type.field(j).type) for j in range(field.type.num_fields)] |
| elif pa_types.is_decimal(field.type): |
| field_spec["precision"] = field.type.precision |
| field_spec["scale"] = field.type.scale |
| fields.append(field_spec) |
| return fields |
| |
| |
| def fory_field_list_to_arrow_schema(field_list): |
| """ |
| Convert a list of Fory field specifications to an Arrow schema. |
| |
| Args: |
| field_list: A list of dictionaries with field specifications. |
| |
| Returns: |
| A PyArrow Schema instance. |
| """ |
| arrow_fields = [] |
| for field_spec in field_list: |
| name = field_spec["name"] |
| type_id = field_spec["type_id"] |
| nullable = field_spec.get("nullable", True) |
| |
| # Handle nested types |
| if type_id == 21: # LIST |
| value_type = field_spec.get("value_type") |
| arrow_type = pa.list_(value_type) |
| elif type_id == 23: # MAP |
| key_type = field_spec.get("key_type") |
| item_type = field_spec.get("item_type") |
| arrow_type = pa.map_(key_type, item_type) |
| elif type_id == 15: # STRUCT |
| struct_fields = field_spec.get("struct_fields", []) |
| arrow_type = pa.struct(struct_fields) |
| elif type_id == 27: # DECIMAL |
| precision = field_spec.get("precision", 38) |
| scale = field_spec.get("scale", 18) |
| arrow_type = pa.decimal128(precision, scale) |
| else: |
| arrow_type = fory_type_id_to_arrow_type(type_id) |
| |
| arrow_fields.append(pa.field(name, arrow_type, nullable=nullable)) |
| |
| return pa.schema(arrow_fields) |
| |
| |
| def convert_arrow_type_recursive(arrow_type): |
| """ |
| Recursively convert an Arrow type to a serializable specification. |
| |
| Args: |
| arrow_type: A PyArrow DataType instance. |
| |
| Returns: |
| dict: A dictionary specification for the type. |
| """ |
| type_id = arrow_type_to_fory_type_id(arrow_type) |
| spec = {"type_id": type_id} |
| |
| if pa_types.is_list(arrow_type): |
| spec["value_type"] = convert_arrow_type_recursive(arrow_type.value_type) |
| elif pa_types.is_map(arrow_type): |
| spec["key_type"] = convert_arrow_type_recursive(arrow_type.key_type) |
| spec["item_type"] = convert_arrow_type_recursive(arrow_type.item_type) |
| elif pa_types.is_struct(arrow_type): |
| spec["fields"] = [] |
| for i in range(arrow_type.num_fields): |
| field = arrow_type.field(i) |
| spec["fields"].append( |
| { |
| "name": field.name, |
| "type": convert_arrow_type_recursive(field.type), |
| "nullable": field.nullable, |
| } |
| ) |
| elif pa_types.is_decimal(arrow_type): |
| spec["precision"] = arrow_type.precision |
| spec["scale"] = arrow_type.scale |
| |
| return spec |
| |
| |
| def reconstruct_arrow_type(spec): |
| """ |
| Reconstruct an Arrow type from a type specification. |
| |
| Args: |
| spec: A dictionary specification for the type. |
| |
| Returns: |
| A PyArrow DataType instance. |
| """ |
| type_id = spec["type_id"] |
| |
| if type_id == 21: # LIST |
| value_type = reconstruct_arrow_type(spec["value_type"]) |
| return pa.list_(value_type) |
| elif type_id == 23: # MAP |
| key_type = reconstruct_arrow_type(spec["key_type"]) |
| item_type = reconstruct_arrow_type(spec["item_type"]) |
| return pa.map_(key_type, item_type) |
| elif type_id == 15: # STRUCT |
| fields = [] |
| for field_spec in spec["fields"]: |
| field_type = reconstruct_arrow_type(field_spec["type"]) |
| fields.append(pa.field(field_spec["name"], field_type, nullable=field_spec.get("nullable", True))) |
| return pa.struct(fields) |
| elif type_id == 27: # DECIMAL |
| return pa.decimal128(spec.get("precision", 38), spec.get("scale", 18)) |
| else: |
| return fory_type_id_to_arrow_type(type_id) |