blob: dedb603e269e8a4312e0c557d41c55e38570395b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "iceberg/schema_internal.h"
#include <charconv>
#include <cstring>
#include <optional>
#include <string>
#include "iceberg/constants.h"
#include "iceberg/schema.h"
#include "iceberg/type.h"
#include "iceberg/util/macros.h"
namespace iceberg {
namespace {
// Constants for Arrow schema metadata
constexpr const char* kArrowExtensionName = "ARROW:extension:name";
constexpr const char* kArrowExtensionMetadata = "ARROW:extension:metadata";
constexpr const char* kArrowUuidExtensionName = "arrow.uuid";
constexpr int32_t kUnknownFieldId = -1;
// Convert an Iceberg type to Arrow schema. Return value is Nanoarrow error code.
ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view name,
std::optional<int32_t> field_id, ArrowSchema* schema) {
ArrowBuffer metadata_buffer;
NANOARROW_RETURN_NOT_OK(ArrowMetadataBuilderInit(&metadata_buffer, nullptr));
if (field_id.has_value()) {
NANOARROW_RETURN_NOT_OK(ArrowMetadataBuilderAppend(
&metadata_buffer, ArrowCharView(std::string(kParquetFieldIdKey).c_str()),
ArrowCharView(std::to_string(field_id.value()).c_str())));
}
switch (type.type_id()) {
case TypeId::kStruct: {
const auto& struct_type = static_cast<const StructType&>(type);
const auto& fields = struct_type.fields();
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeStruct(schema, fields.size()));
for (size_t i = 0; i < fields.size(); i++) {
const auto& field = fields[i];
NANOARROW_RETURN_NOT_OK(ToArrowSchema(*field.type(), field.optional(),
field.name(), field.field_id(),
schema->children[i]));
}
} break;
case TypeId::kList: {
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_LIST));
const auto& list_type = static_cast<const ListType&>(type);
const auto& elem_field = list_type.fields()[0];
NANOARROW_RETURN_NOT_OK(ToArrowSchema(*elem_field.type(), elem_field.optional(),
elem_field.name(), elem_field.field_id(),
schema->children[0]));
} break;
case TypeId::kMap: {
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_MAP));
const auto& map_type = static_cast<const MapType&>(type);
const auto& key_field = map_type.key();
const auto& value_field = map_type.value();
NANOARROW_RETURN_NOT_OK(ToArrowSchema(*key_field.type(), key_field.optional(),
key_field.name(), key_field.field_id(),
schema->children[0]->children[0]));
NANOARROW_RETURN_NOT_OK(ToArrowSchema(*value_field.type(), value_field.optional(),
value_field.name(), value_field.field_id(),
schema->children[0]->children[1]));
} break;
case TypeId::kBoolean:
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_BOOL));
break;
case TypeId::kInt:
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_INT32));
break;
case TypeId::kLong:
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_INT64));
break;
case TypeId::kFloat:
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_FLOAT));
break;
case TypeId::kDouble:
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_DOUBLE));
break;
case TypeId::kDecimal: {
const auto& decimal_type = static_cast<const DecimalType&>(type);
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeDecimal(schema, NANOARROW_TYPE_DECIMAL128,
decimal_type.precision(),
decimal_type.scale()));
} break;
case TypeId::kDate:
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_DATE32));
break;
case TypeId::kTime: {
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeDateTime(schema, NANOARROW_TYPE_TIME64,
NANOARROW_TIME_UNIT_MICRO,
/*timezone=*/nullptr));
} break;
case TypeId::kTimestamp: {
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeDateTime(schema, NANOARROW_TYPE_TIMESTAMP,
NANOARROW_TIME_UNIT_MICRO,
/*timezone=*/nullptr));
} break;
case TypeId::kTimestampTz: {
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeDateTime(
schema, NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_MICRO, "UTC"));
} break;
case TypeId::kString:
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_STRING));
break;
case TypeId::kBinary:
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_BINARY));
break;
case TypeId::kFixed: {
const auto& fixed_type = static_cast<const FixedType&>(type);
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeFixedSize(
schema, NANOARROW_TYPE_FIXED_SIZE_BINARY, fixed_type.length()));
} break;
case TypeId::kUuid: {
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeFixedSize(
schema, NANOARROW_TYPE_FIXED_SIZE_BINARY, /*fixed_size=*/16));
NANOARROW_RETURN_NOT_OK(
ArrowMetadataBuilderAppend(&metadata_buffer, ArrowCharView(kArrowExtensionName),
ArrowCharView(kArrowUuidExtensionName)));
} break;
}
if (!name.empty()) {
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetName(schema, std::string(name).c_str()));
}
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetMetadata(
schema, reinterpret_cast<const char*>(metadata_buffer.data)));
ArrowBufferReset(&metadata_buffer);
if (optional) {
schema->flags |= ARROW_FLAG_NULLABLE;
} else {
schema->flags &= ~ARROW_FLAG_NULLABLE;
}
return NANOARROW_OK;
}
} // namespace
Status ToArrowSchema(const Schema& schema, ArrowSchema* out) {
if (out == nullptr) [[unlikely]] {
return InvalidArgument("Output Arrow schema cannot be null");
}
ArrowSchemaInit(out);
if (ArrowErrorCode errorCode = ToArrowSchema(schema, /*optional=*/false, /*name=*/"",
/*field_id=*/std::nullopt, out);
errorCode != NANOARROW_OK) {
return InvalidSchema(
"Failed to convert Iceberg schema to Arrow schema, error code: {}", errorCode);
}
return {};
}
namespace {
int32_t GetFieldId(const ArrowSchema& schema) {
if (schema.metadata == nullptr) {
return kUnknownFieldId;
}
ArrowStringView field_id_key{.data = kParquetFieldIdKey.data(),
.size_bytes = kParquetFieldIdKey.size()};
ArrowStringView field_id_value;
if (ArrowMetadataGetValue(schema.metadata, field_id_key, &field_id_value) !=
NANOARROW_OK) {
return kUnknownFieldId;
}
int32_t field_id = kUnknownFieldId;
std::from_chars(field_id_value.data, field_id_value.data + field_id_value.size_bytes,
field_id);
return field_id;
}
Result<std::shared_ptr<Type>> FromArrowSchema(const ArrowSchema& schema) {
auto to_schema_field =
[](const ArrowSchema& schema) -> Result<std::unique_ptr<SchemaField>> {
ICEBERG_ASSIGN_OR_RAISE(auto field_type, FromArrowSchema(schema));
auto field_id = GetFieldId(schema);
bool is_optional = (schema.flags & ARROW_FLAG_NULLABLE) != 0;
return std::make_unique<SchemaField>(field_id, schema.name, std::move(field_type),
is_optional);
};
ArrowError arrow_error;
ArrowErrorInit(&arrow_error);
ArrowSchemaView schema_view;
if (auto error_code = ArrowSchemaViewInit(&schema_view, &schema, &arrow_error);
error_code != NANOARROW_OK) {
return InvalidSchema("Failed to read Arrow schema, code: {}, message: {}", error_code,
arrow_error.message);
}
switch (schema_view.type) {
case NANOARROW_TYPE_STRUCT: {
std::vector<SchemaField> fields;
fields.reserve(schema.n_children);
for (int i = 0; i < schema.n_children; i++) {
ICEBERG_ASSIGN_OR_RAISE(auto field, to_schema_field(*schema.children[i]));
fields.emplace_back(std::move(*field));
}
return std::make_shared<StructType>(std::move(fields));
}
case NANOARROW_TYPE_LIST: {
ICEBERG_ASSIGN_OR_RAISE(auto element_field_result,
to_schema_field(*schema.children[0]));
return std::make_shared<ListType>(std::move(*element_field_result));
}
case NANOARROW_TYPE_MAP: {
ICEBERG_ASSIGN_OR_RAISE(auto key_field,
to_schema_field(*schema.children[0]->children[0]));
ICEBERG_ASSIGN_OR_RAISE(auto value_field,
to_schema_field(*schema.children[0]->children[1]));
return std::make_shared<MapType>(std::move(*key_field), std::move(*value_field));
}
case NANOARROW_TYPE_BOOL:
return iceberg::boolean();
case NANOARROW_TYPE_INT32:
return iceberg::int32();
case NANOARROW_TYPE_INT64:
return iceberg::int64();
case NANOARROW_TYPE_FLOAT:
return iceberg::float32();
case NANOARROW_TYPE_DOUBLE:
return iceberg::float64();
case NANOARROW_TYPE_DECIMAL128:
return iceberg::decimal(schema_view.decimal_precision, schema_view.decimal_scale);
case NANOARROW_TYPE_DATE32:
return iceberg::date();
case NANOARROW_TYPE_TIME64:
if (schema_view.time_unit != NANOARROW_TIME_UNIT_MICRO) {
return InvalidSchema("Unsupported time unit for Arrow time type: {}",
static_cast<int>(schema_view.time_unit));
}
return iceberg::time();
case NANOARROW_TYPE_TIMESTAMP: {
bool with_timezone =
schema_view.timezone != nullptr && std::strlen(schema_view.timezone) > 0;
if (schema_view.time_unit != NANOARROW_TIME_UNIT_MICRO) {
return InvalidSchema("Unsupported time unit for Arrow timestamp type: {}",
static_cast<int>(schema_view.time_unit));
}
if (with_timezone) {
return iceberg::timestamp_tz();
} else {
return iceberg::timestamp();
}
}
case NANOARROW_TYPE_STRING:
return iceberg::string();
case NANOARROW_TYPE_BINARY:
return iceberg::binary();
case NANOARROW_TYPE_FIXED_SIZE_BINARY: {
if (auto extension_name = std::string_view(schema_view.extension_name.data,
schema_view.extension_name.size_bytes);
extension_name == kArrowUuidExtensionName) {
if (schema_view.fixed_size != 16) {
return InvalidSchema("UUID type must have a fixed size of 16");
}
return iceberg::uuid();
}
return iceberg::fixed(schema_view.fixed_size);
}
default:
return InvalidSchema("Unsupported Arrow type: {}",
ArrowTypeString(schema_view.type));
}
}
} // namespace
std::unique_ptr<Schema> FromStructType(StructType&& struct_type,
std::optional<int32_t> schema_id_opt) {
std::vector<SchemaField> fields;
fields.reserve(struct_type.fields().size());
for (auto& field : struct_type.fields()) {
fields.emplace_back(std::move(field));
}
auto schema_id = schema_id_opt.value_or(Schema::kInitialSchemaId);
return std::make_unique<Schema>(std::move(fields), schema_id);
}
Result<std::unique_ptr<Schema>> FromArrowSchema(const ArrowSchema& schema,
std::optional<int32_t> schema_id) {
ICEBERG_ASSIGN_OR_RAISE(auto type, FromArrowSchema(schema));
if (type->type_id() != TypeId::kStruct) {
return InvalidSchema("Arrow schema must be a struct type for Iceberg schema");
}
auto& struct_type = static_cast<StructType&>(*type);
return FromStructType(std::move(struct_type), schema_id);
}
std::unique_ptr<StructType> ToStructType(const Schema& schema) {
std::vector<SchemaField> fields(schema.fields().begin(), schema.fields().end());
return std::make_unique<StructType>(std::move(fields));
}
} // namespace iceberg