blob: 7d6c9ee25a4d826b81553f900d7c2bc01c142c71 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <algorithm>
#include <cstdint>
#include <format>
#include <regex>
#include <unordered_set>
#include <utility>
#include <nlohmann/json.hpp>
#include "iceberg/constants.h"
#include "iceberg/json_serde_internal.h"
#include "iceberg/name_mapping.h"
#include "iceberg/partition_field.h"
#include "iceberg/partition_spec.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/sort_order.h"
#include "iceberg/statistics_file.h"
#include "iceberg/table_identifier.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_properties.h"
#include "iceberg/table_requirement.h"
#include "iceberg/table_update.h"
#include "iceberg/transform.h"
#include "iceberg/type.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/formatter.h" // IWYU pragma: keep
#include "iceberg/util/json_util_internal.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/string_util.h"
#include "iceberg/util/timepoint.h"
namespace iceberg {
namespace {
// JSON key names used by the serializers and parsers in this file. Each
// constant is the literal key written to / read from a JSON object.
// Transform constants
// (kTransform and kSourceId are written for both sort fields and partition
// fields; kDirection and kNullOrder are written for sort fields.)
constexpr std::string_view kTransform = "transform";
constexpr std::string_view kSourceId = "source-id";
constexpr std::string_view kDirection = "direction";
constexpr std::string_view kNullOrder = "null-order";
// Sort order constants
constexpr std::string_view kOrderId = "order-id";
constexpr std::string_view kFields = "fields";
// Schema constants
constexpr std::string_view kSchemaId = "schema-id";
constexpr std::string_view kIdentifierFieldIds = "identifier-field-ids";
// Type constants
// (kStruct, kList and kMap are the values stored under kType for nested
// types; the remaining entries are keys of type and field objects.)
constexpr std::string_view kType = "type";
constexpr std::string_view kStruct = "struct";
constexpr std::string_view kList = "list";
constexpr std::string_view kMap = "map";
constexpr std::string_view kElement = "element";
constexpr std::string_view kKey = "key";
constexpr std::string_view kValue = "value";
constexpr std::string_view kDoc = "doc";
constexpr std::string_view kName = "name";
constexpr std::string_view kNamespace = "namespace";
constexpr std::string_view kNames = "names";
constexpr std::string_view kId = "id";
constexpr std::string_view kInitialDefault = "initial-default";
constexpr std::string_view kWriteDefault = "write-default";
constexpr std::string_view kFieldId = "field-id";
constexpr std::string_view kElementId = "element-id";
constexpr std::string_view kKeyId = "key-id";
constexpr std::string_view kValueId = "value-id";
constexpr std::string_view kRequired = "required";
constexpr std::string_view kElementRequired = "element-required";
constexpr std::string_view kValueRequired = "value-required";
// Snapshot constants
// (kSpecId is the id key of a partition spec object.)
constexpr std::string_view kSpecId = "spec-id";
constexpr std::string_view kSnapshotId = "snapshot-id";
constexpr std::string_view kParentSnapshotId = "parent-snapshot-id";
constexpr std::string_view kSequenceNumber = "sequence-number";
constexpr std::string_view kTimestampMs = "timestamp-ms";
constexpr std::string_view kManifestList = "manifest-list";
constexpr std::string_view kSummary = "summary";
// Retention keys of a snapshot reference (branch / tag).
constexpr std::string_view kMinSnapshotsToKeep = "min-snapshots-to-keep";
constexpr std::string_view kMaxSnapshotAgeMs = "max-snapshot-age-ms";
constexpr std::string_view kMaxRefAgeMs = "max-ref-age-ms";
// Set of snapshot summary keys known to this file.
// NOTE(review): in the portion of the file reviewed, the only reference to this
// set is the commented-out key check in SnapshotFromJson, so unknown summary
// keys are currently accepted when parsing. Confirm whether that is intended.
const std::unordered_set<std::string_view> kValidSnapshotSummaryFields = {
SnapshotSummaryFields::kOperation,
SnapshotSummaryFields::kAddedDataFiles,
SnapshotSummaryFields::kDeletedDataFiles,
SnapshotSummaryFields::kTotalDataFiles,
SnapshotSummaryFields::kAddedDeleteFiles,
SnapshotSummaryFields::kAddedEqDeleteFiles,
SnapshotSummaryFields::kRemovedEqDeleteFiles,
SnapshotSummaryFields::kAddedPosDeleteFiles,
SnapshotSummaryFields::kRemovedPosDeleteFiles,
SnapshotSummaryFields::kAddedDVs,
SnapshotSummaryFields::kRemovedDVs,
SnapshotSummaryFields::kRemovedDeleteFiles,
SnapshotSummaryFields::kTotalDeleteFiles,
SnapshotSummaryFields::kAddedRecords,
SnapshotSummaryFields::kDeletedRecords,
SnapshotSummaryFields::kTotalRecords,
SnapshotSummaryFields::kAddedFileSize,
SnapshotSummaryFields::kRemovedFileSize,
SnapshotSummaryFields::kTotalFileSize,
SnapshotSummaryFields::kAddedPosDeletes,
SnapshotSummaryFields::kRemovedPosDeletes,
SnapshotSummaryFields::kTotalPosDeletes,
SnapshotSummaryFields::kAddedEqDeletes,
SnapshotSummaryFields::kRemovedEqDeletes,
SnapshotSummaryFields::kTotalEqDeletes,
SnapshotSummaryFields::kDeletedDuplicatedFiles,
SnapshotSummaryFields::kChangedPartitionCountProp,
SnapshotSummaryFields::kWAPId,
SnapshotSummaryFields::kPublishedWAPId,
SnapshotSummaryFields::kSourceSnapshotId,
SnapshotSummaryFields::kEngineName,
SnapshotSummaryFields::kEngineVersion};
// Operation names accepted for the operation entry of a snapshot summary.
// SnapshotFromJson returns a parse error when the entry holds any other value.
const std::unordered_set<std::string_view> kValidDataOperation = {
DataOperation::kAppend, DataOperation::kReplace, DataOperation::kOverwrite,
DataOperation::kDelete};
// TableMetadata constants
// (top-level keys of a table metadata JSON document)
constexpr std::string_view kFormatVersion = "format-version";
constexpr std::string_view kTableUuid = "table-uuid";
constexpr std::string_view kLocation = "location";
constexpr std::string_view kLastSequenceNumber = "last-sequence-number";
constexpr std::string_view kLastUpdatedMs = "last-updated-ms";
constexpr std::string_view kLastColumnId = "last-column-id";
constexpr std::string_view kSchema = "schema";
constexpr std::string_view kSchemas = "schemas";
constexpr std::string_view kCurrentSchemaId = "current-schema-id";
constexpr std::string_view kPartitionSpec = "partition-spec";
constexpr std::string_view kPartitionSpecs = "partition-specs";
constexpr std::string_view kDefaultSpecId = "default-spec-id";
constexpr std::string_view kLastPartitionId = "last-partition-id";
constexpr std::string_view kProperties = "properties";
constexpr std::string_view kCurrentSnapshotId = "current-snapshot-id";
constexpr std::string_view kSnapshots = "snapshots";
constexpr std::string_view kSnapshotLog = "snapshot-log";
constexpr std::string_view kMetadataLog = "metadata-log";
constexpr std::string_view kSortOrders = "sort-orders";
constexpr std::string_view kDefaultSortOrderId = "default-sort-order-id";
constexpr std::string_view kRefs = "refs";
constexpr std::string_view kStatistics = "statistics";
constexpr std::string_view kPartitionStatistics = "partition-statistics";
constexpr std::string_view kNextRowId = "next-row-id";
// Key of a metadata log entry object.
constexpr std::string_view kMetadataFile = "metadata-file";
// Keys of statistics file / partition statistics file objects.
constexpr std::string_view kStatisticsPath = "statistics-path";
constexpr std::string_view kFileSizeInBytes = "file-size-in-bytes";
constexpr std::string_view kFileFooterSizeInBytes = "file-footer-size-in-bytes";
constexpr std::string_view kBlobMetadata = "blob-metadata";
// TableUpdate action constants
// (presumably the values stored under kAction; their use is not visible in
// the reviewed portion of this file)
constexpr std::string_view kAction = "action";
constexpr std::string_view kActionAssignUUID = "assign-uuid";
constexpr std::string_view kActionUpgradeFormatVersion = "upgrade-format-version";
constexpr std::string_view kActionAddSchema = "add-schema";
constexpr std::string_view kActionSetCurrentSchema = "set-current-schema";
constexpr std::string_view kActionAddPartitionSpec = "add-spec";
constexpr std::string_view kActionSetDefaultPartitionSpec = "set-default-spec";
constexpr std::string_view kActionRemovePartitionSpecs = "remove-partition-specs";
constexpr std::string_view kActionRemoveSchemas = "remove-schemas";
constexpr std::string_view kActionAddSortOrder = "add-sort-order";
constexpr std::string_view kActionSetDefaultSortOrder = "set-default-sort-order";
constexpr std::string_view kActionAddSnapshot = "add-snapshot";
constexpr std::string_view kActionRemoveSnapshots = "remove-snapshots";
constexpr std::string_view kActionRemoveSnapshotRef = "remove-snapshot-ref";
constexpr std::string_view kActionSetSnapshotRef = "set-snapshot-ref";
constexpr std::string_view kActionSetProperties = "set-properties";
constexpr std::string_view kActionRemoveProperties = "remove-properties";
constexpr std::string_view kActionSetLocation = "set-location";
constexpr std::string_view kActionSetStatistics = "set-statistics";
constexpr std::string_view kActionRemoveStatistics = "remove-statistics";
constexpr std::string_view kActionSetPartitionStatistics = "set-partition-statistics";
constexpr std::string_view kActionRemovePartitionStatistics =
"remove-partition-statistics";
// TableUpdate field constants
constexpr std::string_view kUUID = "uuid";
constexpr std::string_view kSpec = "spec";
constexpr std::string_view kSpecIds = "spec-ids";
constexpr std::string_view kSchemaIds = "schema-ids";
constexpr std::string_view kSortOrder = "sort-order";
constexpr std::string_view kSortOrderId = "sort-order-id";
constexpr std::string_view kSnapshot = "snapshot";
constexpr std::string_view kSnapshotIds = "snapshot-ids";
constexpr std::string_view kRefName = "ref-name";
constexpr std::string_view kUpdates = "updates";
constexpr std::string_view kRemovals = "removals";
// TableRequirement type constants
constexpr std::string_view kRequirementAssertDoesNotExist = "assert-create";
constexpr std::string_view kRequirementAssertUUID = "assert-table-uuid";
constexpr std::string_view kRequirementAssertRefSnapshotID = "assert-ref-snapshot-id";
constexpr std::string_view kRequirementAssertLastAssignedFieldId =
"assert-last-assigned-field-id";
constexpr std::string_view kRequirementAssertCurrentSchemaID = "assert-current-schema-id";
constexpr std::string_view kRequirementAssertLastAssignedPartitionId =
"assert-last-assigned-partition-id";
constexpr std::string_view kRequirementAssertDefaultSpecID = "assert-default-spec-id";
constexpr std::string_view kRequirementAssertDefaultSortOrderID =
"assert-default-sort-order-id";
// TableRequirement field keys (presumably; not used in the reviewed portion
// of this file)
constexpr std::string_view kLastAssignedFieldId = "last-assigned-field-id";
constexpr std::string_view kLastAssignedPartitionId = "last-assigned-partition-id";
} // namespace
// Serializes a sort field as a JSON object with its source field id,
// transform, sort direction and null ordering. The transform, direction and
// null order are stored as the text produced by std::format("{}", ...).
nlohmann::json ToJson(const SortField& sort_field) {
  nlohmann::json result;
  result[kSourceId] = sort_field.source_id();
  result[kTransform] = std::format("{}", *sort_field.transform());
  result[kNullOrder] = std::format("{}", sort_field.null_order());
  result[kDirection] = std::format("{}", sort_field.direction());
  return result;
}
// Serializes a sort order as a JSON object holding its order id and an array
// with one serialized entry per sort field (an empty array when there are no
// fields).
nlohmann::json ToJson(const SortOrder& sort_order) {
  nlohmann::json field_array = nlohmann::json::array();
  for (const auto& sort_field : sort_order.fields()) {
    field_array.push_back(ToJson(sort_field));
  }

  nlohmann::json result;
  result[kOrderId] = sort_order.order_id();
  result[kFields] = std::move(field_array);
  return result;
}
// Parses a sort field from a JSON object.
//
// Reads, in this order, the source id, the transform, the direction and the
// null order. The three textual values are converted with
// TransformFromString, SortDirectionFromString and NullOrderFromString. The
// first failing read or conversion is returned to the caller.
Result<std::unique_ptr<SortField>> SortFieldFromJson(const nlohmann::json& json) {
  ICEBERG_ASSIGN_OR_RAISE(auto source_id, GetJsonValue<int32_t>(json, kSourceId));

  ICEBERG_ASSIGN_OR_RAISE(auto transform_text,
                          GetJsonValue<std::string>(json, kTransform));
  ICEBERG_ASSIGN_OR_RAISE(auto transform,
                          TransformFromString(std::move(transform_text)));

  ICEBERG_ASSIGN_OR_RAISE(auto direction_text,
                          GetJsonValue<std::string>(json, kDirection));
  ICEBERG_ASSIGN_OR_RAISE(auto direction,
                          SortDirectionFromString(std::move(direction_text)));

  ICEBERG_ASSIGN_OR_RAISE(auto null_order_text,
                          GetJsonValue<std::string>(json, kNullOrder));
  ICEBERG_ASSIGN_OR_RAISE(auto null_order,
                          NullOrderFromString(std::move(null_order_text)));

  return std::make_unique<SortField>(source_id, std::move(transform), direction,
                                     null_order);
}
// Parses a sort order from JSON and builds it against `current_schema`.
//
// The order id and the fields value are read from `json`; every element of
// the fields value is parsed with SortFieldFromJson and the resulting fields
// are handed to SortOrder::Make together with the schema.
//
// `current_schema` is dereferenced unconditionally, so it must not be null.
Result<std::unique_ptr<SortOrder>> SortOrderFromJson(
    const nlohmann::json& json, const std::shared_ptr<Schema>& current_schema) {
  ICEBERG_ASSIGN_OR_RAISE(auto order_id, GetJsonValue<int32_t>(json, kOrderId));
  ICEBERG_ASSIGN_OR_RAISE(auto fields_json,
                          GetJsonValue<nlohmann::json>(json, kFields));

  std::vector<SortField> parsed_fields;
  for (const auto& entry : fields_json) {
    ICEBERG_ASSIGN_OR_RAISE(auto parsed, SortFieldFromJson(entry));
    parsed_fields.emplace_back(std::move(*parsed));
  }
  return SortOrder::Make(*current_schema, order_id, std::move(parsed_fields));
}
// Parses a sort order from JSON without a schema.
//
// The order id and the fields value are read from `json`; every element of
// the fields value is parsed with SortFieldFromJson and the result is built
// with the schema-less SortOrder::Make overload.
Result<std::unique_ptr<SortOrder>> SortOrderFromJson(const nlohmann::json& json) {
  ICEBERG_ASSIGN_OR_RAISE(auto order_id, GetJsonValue<int32_t>(json, kOrderId));
  ICEBERG_ASSIGN_OR_RAISE(auto fields_json,
                          GetJsonValue<nlohmann::json>(json, kFields));

  std::vector<SortField> parsed_fields;
  for (const auto& entry : fields_json) {
    ICEBERG_ASSIGN_OR_RAISE(auto parsed, SortFieldFromJson(entry));
    parsed_fields.emplace_back(std::move(*parsed));
  }
  return SortOrder::Make(order_id, std::move(parsed_fields));
}
// Serializes a schema field as a JSON object with its id, name, required flag
// (the negation of optional()) and serialized type. The doc entry is written
// only when the field's doc is non-empty.
nlohmann::json ToJson(const SchemaField& field) {
  nlohmann::json result;
  result[kId] = field.field_id();
  result[kName] = field.name();
  result[kRequired] = !field.optional();
  result[kType] = ToJson(*field.type());
  if (const auto& doc = field.doc(); !doc.empty()) {
    result[kDoc] = doc;
  }
  return result;
}
// Serializes a type to JSON.
//
// Nested types become objects tagged with a type entry:
//   - struct: type "struct" plus an array of serialized fields
//   - list:   type "list" plus element id, element-required flag and element
//   - map:    type "map" plus key id, key, value id, value-required flag and
//             value
// Primitive types become plain JSON strings; decimal and fixed carry their
// parameters as "decimal(P,S)" and "fixed[L]".
nlohmann::json ToJson(const Type& type) {
  switch (type.type_id()) {
    // --- nested types -----------------------------------------------------
    case TypeId::kStruct: {
      const auto& as_struct = internal::checked_cast<const StructType&>(type);
      nlohmann::json children = nlohmann::json::array();
      for (const auto& child : as_struct.fields()) {
        children.push_back(ToJson(child));
        // TODO(gangwu): add default values
      }
      nlohmann::json result;
      result[kType] = kStruct;
      result[kFields] = std::move(children);
      return result;
    }
    case TypeId::kList: {
      const auto& as_list = internal::checked_cast<const ListType&>(type);
      // The element is the first (front) field of the list type.
      const auto& element = as_list.fields().front();
      nlohmann::json result;
      result[kType] = kList;
      result[kElementId] = element.field_id();
      result[kElementRequired] = !element.optional();
      result[kElement] = ToJson(*element.type());
      return result;
    }
    case TypeId::kMap: {
      const auto& as_map = internal::checked_cast<const MapType&>(type);
      const auto& map_key = as_map.key();
      const auto& map_value = as_map.value();
      nlohmann::json result;
      result[kType] = kMap;
      result[kKeyId] = map_key.field_id();
      result[kKey] = ToJson(*map_key.type());
      result[kValueId] = map_value.field_id();
      result[kValueRequired] = !map_value.optional();
      result[kValue] = ToJson(*map_value.type());
      return result;
    }

    // --- parameterized primitives ------------------------------------------
    case TypeId::kDecimal: {
      const auto& as_decimal = internal::checked_cast<const DecimalType&>(type);
      return std::format("decimal({},{})", as_decimal.precision(), as_decimal.scale());
    }
    case TypeId::kFixed: {
      const auto& as_fixed = internal::checked_cast<const FixedType&>(type);
      return std::format("fixed[{}]", as_fixed.length());
    }

    // --- plain primitives ----------------------------------------------------
    case TypeId::kBoolean:
      return "boolean";
    case TypeId::kInt:
      return "int";
    case TypeId::kLong:
      return "long";
    case TypeId::kFloat:
      return "float";
    case TypeId::kDouble:
      return "double";
    case TypeId::kDate:
      return "date";
    case TypeId::kTime:
      return "time";
    case TypeId::kTimestamp:
      return "timestamp";
    case TypeId::kTimestampTz:
      return "timestamptz";
    case TypeId::kString:
      return "string";
    case TypeId::kBinary:
      return "binary";
    case TypeId::kUuid:
      return "uuid";
  }
  // Every case above returns; reaching this point means type_id() produced a
  // value outside the handled enumerators.
  std::unreachable();
}
// Serializes a schema: the schema is first serialized through the Type
// overload of ToJson, then the schema id is added. The identifier field ids
// are written only when the schema has at least one.
nlohmann::json ToJson(const Schema& schema) {
  nlohmann::json result = ToJson(internal::checked_cast<const Type&>(schema));
  result[kSchemaId] = schema.schema_id();
  if (const auto& identifier_ids = schema.IdentifierFieldIds();
      !identifier_ids.empty()) {
    result[kIdentifierFieldIds] = identifier_ids;
  }
  return result;
}
// Serializes a schema to JSON and renders that JSON with the nlohmann::json
// overload of ToJsonString, returning whatever that overload returns.
Result<std::string> ToJsonString(const Schema& schema) {
  const nlohmann::json schema_json = ToJson(schema);
  return ToJsonString(schema_json);
}
// Serializes a snapshot reference: snapshot id, reference type (as formatted
// text) and the retention settings of the active alternative. A branch may
// contribute min-snapshots-to-keep, max-snapshot-age-ms and max-ref-age-ms; a
// tag may contribute max-ref-age-ms. Each retention value is written through
// SetOptionalField.
nlohmann::json ToJson(const SnapshotRef& ref) {
  const auto ref_type = ref.type();

  nlohmann::json result;
  result[kSnapshotId] = ref.snapshot_id;
  result[kType] = std::format("{}", ref_type);

  if (ref_type == SnapshotRefType::kBranch) {
    const auto& branch_retention = std::get<SnapshotRef::Branch>(ref.retention);
    SetOptionalField(result, kMinSnapshotsToKeep,
                     branch_retention.min_snapshots_to_keep);
    SetOptionalField(result, kMaxSnapshotAgeMs, branch_retention.max_snapshot_age_ms);
    SetOptionalField(result, kMaxRefAgeMs, branch_retention.max_ref_age_ms);
    return result;
  }
  if (ref_type == SnapshotRefType::kTag) {
    const auto& tag_retention = std::get<SnapshotRef::Tag>(ref.retention);
    SetOptionalField(result, kMaxRefAgeMs, tag_retention.max_ref_age_ms);
  }
  return result;
}
// Serializes a snapshot.
//
// Always written: snapshot id, timestamp (unix milliseconds) and manifest
// list. Written conditionally:
//   - parent snapshot id and schema id, through SetOptionalField
//   - sequence number, only when greater than
//     TableMetadata::kInitialSequenceNumber
//   - summary, only when the snapshot reports an operation
nlohmann::json ToJson(const Snapshot& snapshot) {
  nlohmann::json result;
  result[kSnapshotId] = snapshot.snapshot_id;
  result[kTimestampMs] = UnixMsFromTimePointMs(snapshot.timestamp_ms);
  result[kManifestList] = snapshot.manifest_list;

  SetOptionalField(result, kParentSnapshotId, snapshot.parent_snapshot_id);
  SetOptionalField(result, kSchemaId, snapshot.schema_id);

  const bool has_explicit_sequence =
      snapshot.sequence_number > TableMetadata::kInitialSequenceNumber;
  if (has_explicit_sequence) {
    result[kSequenceNumber] = snapshot.sequence_number;
  }

  // The summary map is emitted only when an operation is available.
  if (snapshot.Operation().has_value()) {
    result[kSummary] = snapshot.summary;
  }
  return result;
}
namespace {
// Parses a struct type: reads the fields value of `json`, parses each of its
// elements with FieldFromJson and wraps the parsed fields in a StructType.
// The first failure is returned to the caller.
Result<std::unique_ptr<Type>> StructTypeFromJson(const nlohmann::json& json) {
  ICEBERG_ASSIGN_OR_RAISE(auto fields_json,
                          GetJsonValue<nlohmann::json>(json, kFields));

  std::vector<SchemaField> parsed_fields;
  for (const auto& entry : fields_json) {
    ICEBERG_ASSIGN_OR_RAISE(auto parsed, FieldFromJson(entry));
    parsed_fields.emplace_back(std::move(*parsed));
  }
  return std::make_unique<StructType>(std::move(parsed_fields));
}
// Parses a list type from its JSON object form.
//
// Reads the element type, the element id and the element-required flag, and
// builds a ListType whose single field is named ListType::kElementName and is
// optional exactly when element-required is false.
//
// The element is fetched through GetJsonValue (as MapTypeFromJson does for
// key/value) rather than `json[kElement]`: indexing a const nlohmann::json
// with a key that is not present is undefined behaviour, so a document
// without an element entry must be turned into a parse error instead.
Result<std::unique_ptr<Type>> ListTypeFromJson(const nlohmann::json& json) {
  ICEBERG_ASSIGN_OR_RAISE(
      auto element_type,
      GetJsonValue<nlohmann::json>(json, kElement).and_then(TypeFromJson));
  ICEBERG_ASSIGN_OR_RAISE(auto element_id, GetJsonValue<int32_t>(json, kElementId));
  ICEBERG_ASSIGN_OR_RAISE(auto element_required,
                          GetJsonValue<bool>(json, kElementRequired));

  return std::make_unique<ListType>(
      SchemaField(element_id, std::string(ListType::kElementName),
                  std::move(element_type), /*optional=*/!element_required));
}
// Parses a map type from its JSON object form.
//
// Reads, in this order: key type, value type, key id, value id and the
// value-required flag. The key field is never optional; the value field is
// optional exactly when value-required is false. Fields are named
// MapType::kKeyName and MapType::kValueName.
Result<std::unique_ptr<Type>> MapTypeFromJson(const nlohmann::json& json) {
  ICEBERG_ASSIGN_OR_RAISE(auto key_json, GetJsonValue<nlohmann::json>(json, kKey));
  ICEBERG_ASSIGN_OR_RAISE(auto key_type, TypeFromJson(std::move(key_json)));

  ICEBERG_ASSIGN_OR_RAISE(auto value_json, GetJsonValue<nlohmann::json>(json, kValue));
  ICEBERG_ASSIGN_OR_RAISE(auto value_type, TypeFromJson(std::move(value_json)));

  ICEBERG_ASSIGN_OR_RAISE(auto key_id, GetJsonValue<int32_t>(json, kKeyId));
  ICEBERG_ASSIGN_OR_RAISE(auto value_id, GetJsonValue<int32_t>(json, kValueId));
  ICEBERG_ASSIGN_OR_RAISE(auto value_required,
                          GetJsonValue<bool>(json, kValueRequired));

  const bool value_optional = !value_required;
  SchemaField map_key(key_id, std::string(MapType::kKeyName), std::move(key_type),
                      /*optional=*/false);
  SchemaField map_value(value_id, std::string(MapType::kValueName),
                        std::move(value_type), value_optional);
  return std::make_unique<MapType>(std::move(map_key), std::move(map_value));
}
} // namespace
// Parses a type from JSON.
//
// A JSON string is treated as a primitive type name: "boolean", "int",
// "long", "float", "double", "date", "time", "timestamp", "timestamptz",
// "string", "binary", "uuid", "fixed[L]" or "decimal(P,S)" (whitespace is
// tolerated around the numbers). Any other string is a parse error.
//
// Any other JSON value is treated as a nested type: its type entry selects
// struct, list or map parsing; other type names are a parse error.
Result<std::unique_ptr<Type>> TypeFromJson(const nlohmann::json& json) {
  if (json.is_string()) {
    const std::string type_str = json.get<std::string>();

    if (type_str == "boolean") return std::make_unique<BooleanType>();
    if (type_str == "int") return std::make_unique<IntType>();
    if (type_str == "long") return std::make_unique<LongType>();
    if (type_str == "float") return std::make_unique<FloatType>();
    if (type_str == "double") return std::make_unique<DoubleType>();
    if (type_str == "date") return std::make_unique<DateType>();
    if (type_str == "time") return std::make_unique<TimeType>();
    if (type_str == "timestamp") return std::make_unique<TimestampType>();
    if (type_str == "timestamptz") return std::make_unique<TimestampTzType>();
    if (type_str == "string") return std::make_unique<StringType>();
    if (type_str == "binary") return std::make_unique<BinaryType>();
    if (type_str == "uuid") return std::make_unique<UuidType>();

    if (type_str.starts_with("fixed")) {
      // Compiling a std::regex is expensive; build the pattern once and reuse
      // it. Matching against a const regex does not modify it.
      static const std::regex kFixedPattern(R"(fixed\[\s*(\d+)\s*\])");
      std::smatch match;
      if (std::regex_match(type_str, match, kFixedPattern)) {
        ICEBERG_ASSIGN_OR_RAISE(auto length,
                                StringUtils::ParseNumber<int32_t>(match[1].str()));
        return std::make_unique<FixedType>(length);
      }
      return JsonParseError("Invalid fixed type: {}", type_str);
    }

    if (type_str.starts_with("decimal")) {
      // Same reasoning as for the fixed pattern: compile once, reuse.
      static const std::regex kDecimalPattern(
          R"(decimal\(\s*(\d+)\s*,\s*(\d+)\s*\))");
      std::smatch match;
      if (std::regex_match(type_str, match, kDecimalPattern)) {
        ICEBERG_ASSIGN_OR_RAISE(auto precision,
                                StringUtils::ParseNumber<int32_t>(match[1].str()));
        ICEBERG_ASSIGN_OR_RAISE(auto scale,
                                StringUtils::ParseNumber<int32_t>(match[2].str()));
        return std::make_unique<DecimalType>(precision, scale);
      }
      return JsonParseError("Invalid decimal type: {}", type_str);
    }

    return JsonParseError("Unknown primitive type: {}", type_str);
  }

  // For complex types like struct, list, and map
  ICEBERG_ASSIGN_OR_RAISE(auto type_str, GetJsonValue<std::string>(json, kType));
  if (type_str == kStruct) {
    return StructTypeFromJson(json);
  }
  if (type_str == kList) {
    return ListTypeFromJson(json);
  }
  if (type_str == kMap) {
    return MapTypeFromJson(json);
  }
  return JsonParseError("Unknown complex type: {}", type_str);
}
// Parses a schema field from a JSON object.
//
// Reads, in this order: the type (parsed with TypeFromJson), the field id,
// the name, the required flag and the doc (read with GetJsonValueOrDefault,
// so it has a fallback when the entry is not supplied). The resulting field
// is optional exactly when required is false. The first failure is returned
// to the caller.
Result<std::unique_ptr<SchemaField>> FieldFromJson(const nlohmann::json& json) {
  ICEBERG_ASSIGN_OR_RAISE(
      auto type, GetJsonValue<nlohmann::json>(json, kType).and_then(TypeFromJson));
  ICEBERG_ASSIGN_OR_RAISE(auto field_id, GetJsonValue<int32_t>(json, kId));
  ICEBERG_ASSIGN_OR_RAISE(auto name, GetJsonValue<std::string>(json, kName));
  ICEBERG_ASSIGN_OR_RAISE(auto required, GetJsonValue<bool>(json, kRequired));
  ICEBERG_ASSIGN_OR_RAISE(auto doc, GetJsonValueOrDefault<std::string>(json, kDoc));

  // `doc` is a local that is not used again, so hand it over instead of
  // copying it, exactly as is done for `name` and `type`.
  return std::make_unique<SchemaField>(field_id, std::move(name), std::move(type),
                                       /*optional=*/!required, std::move(doc));
}
// Parses a schema from JSON.
//
// The optional schema id is read first, then the whole object is parsed as a
// type, which must turn out to be a struct. The struct's fields become the
// schema's fields. The identifier field ids are read with
// GetJsonValueOrDefault, and a missing schema id is replaced by
// Schema::kInitialSchemaId.
Result<std::unique_ptr<Schema>> SchemaFromJson(const nlohmann::json& json) {
  ICEBERG_ASSIGN_OR_RAISE(auto maybe_schema_id,
                          GetJsonValueOptional<int32_t>(json, kSchemaId));
  ICEBERG_ASSIGN_OR_RAISE(auto parsed_type, TypeFromJson(json));

  const bool is_struct = parsed_type->type_id() == TypeId::kStruct;
  if (!is_struct) [[unlikely]] {
    return JsonParseError("Schema must be a struct type, but got {}", SafeDumpJson(json));
  }

  // Transfer the struct's fields into a standalone vector for Schema::Make.
  auto& as_struct = internal::checked_cast<StructType&>(*parsed_type);
  std::vector<SchemaField> schema_fields;
  schema_fields.reserve(as_struct.fields().size());
  for (auto& struct_field : as_struct.fields()) {
    schema_fields.emplace_back(std::move(struct_field));
  }

  ICEBERG_ASSIGN_OR_RAISE(
      auto identifier_ids,
      GetJsonValueOrDefault<std::vector<int32_t>>(json, kIdentifierFieldIds));

  const auto schema_id = maybe_schema_id.value_or(Schema::kInitialSchemaId);
  return Schema::Make(std::move(schema_fields), schema_id, std::move(identifier_ids));
}
// Serializes a partition field as a JSON object with its source id, field id,
// name and transform (the transform is stored as its std::format text).
nlohmann::json ToJson(const PartitionField& partition_field) {
  nlohmann::json result;
  result[kName] = partition_field.name();
  result[kSourceId] = partition_field.source_id();
  result[kFieldId] = partition_field.field_id();
  result[kTransform] = std::format("{}", *partition_field.transform());
  return result;
}
// Serializes a partition spec as a JSON object holding its spec id and an
// array with one serialized entry per partition field (an empty array when
// the spec has no fields).
nlohmann::json ToJson(const PartitionSpec& partition_spec) {
  nlohmann::json field_array = nlohmann::json::array();
  for (const auto& partition_field : partition_spec.fields()) {
    field_array.push_back(ToJson(partition_field));
  }

  nlohmann::json result;
  result[kSpecId] = partition_spec.spec_id();
  result[kFields] = std::move(field_array);
  return result;
}
// Serializes a partition spec to JSON and renders that JSON with the
// nlohmann::json overload of ToJsonString, returning whatever it returns.
Result<std::string> ToJsonString(const PartitionSpec& partition_spec) {
  const nlohmann::json spec_json = ToJson(partition_spec);
  return ToJsonString(spec_json);
}
// Parses a partition field from a JSON object.
//
// Reads, in this order: source id, field id, transform (converted with
// TransformFromString) and name.
//
// `allow_field_id_missing` controls how the field id is read: when true it is
// read with a fallback of SchemaField::kInvalidFieldId, otherwise it is read
// as a mandatory value. The first failure is returned to the caller.
Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
    const nlohmann::json& json, bool allow_field_id_missing) {
  ICEBERG_ASSIGN_OR_RAISE(auto source_id, GetJsonValue<int32_t>(json, kSourceId));

  // Initialized up front so the variable never holds an indeterminate value,
  // even though both branches below assign it.
  int32_t field_id = SchemaField::kInvalidFieldId;
  if (allow_field_id_missing) {
    // Partition field id in v1 is not tracked, so we use -1 to indicate that.
    ICEBERG_ASSIGN_OR_RAISE(field_id, GetJsonValueOrDefault<int32_t>(
                                          json, kFieldId, SchemaField::kInvalidFieldId));
  } else {
    ICEBERG_ASSIGN_OR_RAISE(field_id, GetJsonValue<int32_t>(json, kFieldId));
  }

  ICEBERG_ASSIGN_OR_RAISE(
      auto transform,
      GetJsonValue<std::string>(json, kTransform).and_then(TransformFromString));
  ICEBERG_ASSIGN_OR_RAISE(auto name, GetJsonValue<std::string>(json, kName));

  // `name` is not used again, so move it rather than copying the string.
  return std::make_unique<PartitionField>(source_id, field_id, std::move(name),
                                          std::move(transform));
}
// Parses a partition spec from JSON and builds it against `schema`.
//
// The spec id and the fields value are read from `json`; every element of the
// fields value is parsed with PartitionFieldFromJson. The spec is then built
// with PartitionSpec::Make, passing allow_missing_fields = false when the
// parsed spec id equals `default_spec_id` and true otherwise.
//
// `schema` is dereferenced unconditionally, so it must not be null.
Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
    const std::shared_ptr<Schema>& schema, const nlohmann::json& json,
    int32_t default_spec_id) {
  ICEBERG_ASSIGN_OR_RAISE(auto spec_id, GetJsonValue<int32_t>(json, kSpecId));
  ICEBERG_ASSIGN_OR_RAISE(auto fields_json,
                          GetJsonValue<nlohmann::json>(json, kFields));

  std::vector<PartitionField> parsed_fields;
  for (const auto& entry : fields_json) {
    ICEBERG_ASSIGN_OR_RAISE(auto parsed, PartitionFieldFromJson(entry));
    parsed_fields.emplace_back(std::move(*parsed));
  }

  // Only the default spec is built with allow_missing_fields disabled.
  const bool allow_missing_fields = (spec_id != default_spec_id);
  std::unique_ptr<PartitionSpec> spec;
  ICEBERG_ASSIGN_OR_RAISE(
      spec, PartitionSpec::Make(*schema, spec_id, std::move(parsed_fields),
                                allow_missing_fields));
  return spec;
}
// Parses a partition spec from JSON without a schema.
//
// The spec id and the fields value are read from `json`; every element of the
// fields value is parsed with PartitionFieldFromJson and the result is built
// with the schema-less PartitionSpec::Make overload.
Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(const nlohmann::json& json) {
  ICEBERG_ASSIGN_OR_RAISE(auto spec_id, GetJsonValue<int32_t>(json, kSpecId));
  ICEBERG_ASSIGN_OR_RAISE(auto fields_json,
                          GetJsonValue<nlohmann::json>(json, kFields));

  std::vector<PartitionField> parsed_fields;
  for (const auto& entry : fields_json) {
    ICEBERG_ASSIGN_OR_RAISE(auto parsed, PartitionFieldFromJson(entry));
    parsed_fields.emplace_back(std::move(*parsed));
  }
  return PartitionSpec::Make(spec_id, std::move(parsed_fields));
}
// Parses a snapshot reference from a JSON object.
//
// The snapshot id and the reference type (converted with
// SnapshotRefTypeFromString) are mandatory. A branch additionally reads the
// optional min-snapshots-to-keep, max-snapshot-age-ms and max-ref-age-ms
// values; every other reference type is built as a tag and reads only the
// optional max-ref-age-ms.
Result<std::unique_ptr<SnapshotRef>> SnapshotRefFromJson(const nlohmann::json& json) {
  ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
  ICEBERG_ASSIGN_OR_RAISE(
      auto ref_type,
      GetJsonValue<std::string>(json, kType).and_then(SnapshotRefTypeFromString));

  if (ref_type != SnapshotRefType::kBranch) {
    // Tag: only the reference age limit applies.
    ICEBERG_ASSIGN_OR_RAISE(auto tag_max_ref_age_ms,
                            GetJsonValueOptional<int64_t>(json, kMaxRefAgeMs));
    return std::make_unique<SnapshotRef>(
        snapshot_id, SnapshotRef::Tag{.max_ref_age_ms = tag_max_ref_age_ms});
  }

  // Branch: snapshot retention settings plus the reference age limit.
  ICEBERG_ASSIGN_OR_RAISE(auto keep_count,
                          GetJsonValueOptional<int32_t>(json, kMinSnapshotsToKeep));
  ICEBERG_ASSIGN_OR_RAISE(auto snapshot_age_ms,
                          GetJsonValueOptional<int64_t>(json, kMaxSnapshotAgeMs));
  ICEBERG_ASSIGN_OR_RAISE(auto ref_age_ms,
                          GetJsonValueOptional<int64_t>(json, kMaxRefAgeMs));
  return std::make_unique<SnapshotRef>(
      snapshot_id, SnapshotRef::Branch{.min_snapshots_to_keep = keep_count,
                                       .max_snapshot_age_ms = snapshot_age_ms,
                                       .max_ref_age_ms = ref_age_ms});
}
// Parses a snapshot from a JSON object.
//
// Mandatory values: snapshot id, timestamp (unix milliseconds) and manifest
// list. Optional values: sequence number (replaced by
// TableMetadata::kInitialSequenceNumber when absent), parent snapshot id,
// summary and schema id.
//
// Summary rules:
//   - every summary value must be a JSON string, otherwise a parse error is
//     returned
//   - the operation entry, when present, must be one of kValidDataOperation
//   - when a summary is present but has no operation entry, the operation is
//     set to DataOperation::kOverwrite
//   - when no summary is present, the resulting summary map stays empty
Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const nlohmann::json& json) {
  ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
  ICEBERG_ASSIGN_OR_RAISE(auto sequence_number,
                          GetJsonValueOptional<int64_t>(json, kSequenceNumber));
  ICEBERG_ASSIGN_OR_RAISE(auto unix_ms, GetJsonValue<int64_t>(json, kTimestampMs));
  auto timestamp_ms = TimePointMsFromUnixMs(unix_ms);
  ICEBERG_ASSIGN_OR_RAISE(auto manifest_list,
                          GetJsonValue<std::string>(json, kManifestList));
  ICEBERG_ASSIGN_OR_RAISE(auto parent_snapshot_id,
                          GetJsonValueOptional<int64_t>(json, kParentSnapshotId));
  ICEBERG_ASSIGN_OR_RAISE(auto summary_json,
                          GetJsonValueOptional<nlohmann::json>(json, kSummary));

  std::unordered_map<std::string, std::string> summary;
  if (summary_json.has_value()) {
    for (const auto& [key, value] : summary_json->items()) {
      // Unknown summary keys are accepted: the check below is disabled.
      // if (!kValidSnapshotSummaryFields.contains(key)) {
      //   return JsonParseError("Invalid snapshot summary field: {}", key);
      // }
      if (!value.is_string()) {
        return JsonParseError("Invalid snapshot summary field value: {}",
                              SafeDumpJson(value));
      }
      // Convert the JSON value to a string once and reuse it for both the
      // operation check and the map entry.
      auto text = value.get<std::string>();
      if (key == SnapshotSummaryFields::kOperation &&
          !kValidDataOperation.contains(text)) {
        return JsonParseError("Invalid snapshot operation: {}", SafeDumpJson(value));
      }
      summary[key] = std::move(text);
    }
    // If summary is available but operation is missing, set operation to overwrite.
    if (!summary.contains(SnapshotSummaryFields::kOperation)) {
      summary[SnapshotSummaryFields::kOperation] = DataOperation::kOverwrite;
    }
  }

  ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValueOptional<int32_t>(json, kSchemaId));

  // `manifest_list` and `summary` are not used again, so both are moved.
  return std::make_unique<Snapshot>(
      snapshot_id, parent_snapshot_id,
      sequence_number.value_or(TableMetadata::kInitialSequenceNumber), timestamp_ms,
      std::move(manifest_list), std::move(summary), schema_id);
}
// Serializes blob metadata: type, source snapshot id, source snapshot
// sequence number and the fields list. The properties map is written only
// when it is non-empty.
nlohmann::json ToJson(const BlobMetadata& blob_metadata) {
  nlohmann::json result;
  result[kType] = blob_metadata.type;
  result[kSnapshotId] = blob_metadata.source_snapshot_id;
  result[kSequenceNumber] = blob_metadata.source_snapshot_sequence_number;
  result[kFields] = blob_metadata.fields;

  const bool has_properties = !blob_metadata.properties.empty();
  if (has_properties) {
    result[kProperties] = blob_metadata.properties;
  }
  return result;
}
// Parses blob metadata from a JSON object.
//
// Type, source snapshot id, source snapshot sequence number and the fields
// list are read as mandatory values; properties are read with
// GetJsonValueOrDefault. Values are stored directly into the result as they
// are read, and the first failure is returned to the caller.
Result<BlobMetadata> BlobMetadataFromJson(const nlohmann::json& json) {
  // The extra parentheses around the last call keep the comma inside the
  // template argument list from being seen as a macro argument separator.
  using PropertyMap = std::unordered_map<std::string, std::string>;

  BlobMetadata blob;
  ICEBERG_ASSIGN_OR_RAISE(blob.type, GetJsonValue<std::string>(json, kType));
  ICEBERG_ASSIGN_OR_RAISE(blob.source_snapshot_id,
                          GetJsonValue<int64_t>(json, kSnapshotId));
  ICEBERG_ASSIGN_OR_RAISE(blob.source_snapshot_sequence_number,
                          GetJsonValue<int64_t>(json, kSequenceNumber));
  ICEBERG_ASSIGN_OR_RAISE(blob.fields,
                          GetJsonValue<std::vector<int32_t>>(json, kFields));
  ICEBERG_ASSIGN_OR_RAISE(blob.properties,
                          (GetJsonValueOrDefault<PropertyMap>(json, kProperties)));
  return blob;
}
// Serializes a statistics file: snapshot id, path, file size, footer size and
// an array with one serialized entry per blob metadata record (an empty array
// when there are none).
nlohmann::json ToJson(const StatisticsFile& statistics_file) {
  nlohmann::json blobs = nlohmann::json::array();
  for (const auto& blob : statistics_file.blob_metadata) {
    blobs.push_back(ToJson(blob));
  }

  nlohmann::json result;
  result[kSnapshotId] = statistics_file.snapshot_id;
  result[kStatisticsPath] = statistics_file.path;
  result[kFileSizeInBytes] = statistics_file.file_size_in_bytes;
  result[kFileFooterSizeInBytes] = statistics_file.file_footer_size_in_bytes;
  result[kBlobMetadata] = std::move(blobs);
  return result;
}
// Parses a statistics file from a JSON object.
//
// Snapshot id, path, file size and footer size are read as mandatory values
// and stored directly into the result; each element of the blob metadata
// value is parsed with BlobMetadataFromJson and appended. The first failure
// is returned to the caller.
Result<std::unique_ptr<StatisticsFile>> StatisticsFileFromJson(
    const nlohmann::json& json) {
  auto result = std::make_unique<StatisticsFile>();

  ICEBERG_ASSIGN_OR_RAISE(result->snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
  ICEBERG_ASSIGN_OR_RAISE(result->path,
                          GetJsonValue<std::string>(json, kStatisticsPath));
  ICEBERG_ASSIGN_OR_RAISE(result->file_size_in_bytes,
                          GetJsonValue<int64_t>(json, kFileSizeInBytes));
  ICEBERG_ASSIGN_OR_RAISE(result->file_footer_size_in_bytes,
                          GetJsonValue<int64_t>(json, kFileFooterSizeInBytes));

  ICEBERG_ASSIGN_OR_RAISE(auto blobs_json,
                          GetJsonValue<nlohmann::json>(json, kBlobMetadata));
  for (const auto& entry : blobs_json) {
    ICEBERG_ASSIGN_OR_RAISE(auto parsed_blob, BlobMetadataFromJson(entry));
    result->blob_metadata.push_back(std::move(parsed_blob));
  }
  return result;
}
// Serializes a partition statistics file as a JSON object with its snapshot
// id, path and file size in bytes.
nlohmann::json ToJson(const PartitionStatisticsFile& partition_statistics_file) {
  const auto& file = partition_statistics_file;
  nlohmann::json result;
  result[kStatisticsPath] = file.path;
  result[kSnapshotId] = file.snapshot_id;
  result[kFileSizeInBytes] = file.file_size_in_bytes;
  return result;
}
// Parses a partition statistics file from a JSON object. Snapshot id, path
// and file size are read, in that order, as mandatory values and stored
// directly into the result; the first failure is returned to the caller.
Result<std::unique_ptr<PartitionStatisticsFile>> PartitionStatisticsFileFromJson(
    const nlohmann::json& json) {
  auto result = std::make_unique<PartitionStatisticsFile>();
  ICEBERG_ASSIGN_OR_RAISE(result->snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
  ICEBERG_ASSIGN_OR_RAISE(result->path,
                          GetJsonValue<std::string>(json, kStatisticsPath));
  ICEBERG_ASSIGN_OR_RAISE(result->file_size_in_bytes,
                          GetJsonValue<int64_t>(json, kFileSizeInBytes));
  return result;
}
/// \brief Serialize a SnapshotLogEntry into its JSON object representation.
///
/// The timestamp is converted with UnixMsFromTimePointMs before being written.
nlohmann::json ToJson(const SnapshotLogEntry& snapshot_log_entry) {
  const auto unix_ms = UnixMsFromTimePointMs(snapshot_log_entry.timestamp_ms);
  nlohmann::json out;
  out[kSnapshotId] = snapshot_log_entry.snapshot_id;
  out[kTimestampMs] = unix_ms;
  return out;
}
/// \brief Parse a SnapshotLogEntry from its JSON object representation.
///
/// \param[in] json The JSON object to parse.
/// \return The parsed entry, or the first error reported while reading the
/// timestamp or the snapshot id (in that order).
Result<SnapshotLogEntry> SnapshotLogEntryFromJson(const nlohmann::json& json) {
  ICEBERG_ASSIGN_OR_RAISE(auto timestamp_unix_ms,
                          GetJsonValue<int64_t>(json, kTimestampMs));
  ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));

  SnapshotLogEntry entry;
  entry.timestamp_ms = TimePointMsFromUnixMs(timestamp_unix_ms);
  entry.snapshot_id = snapshot_id;
  return entry;
}
/// \brief Serialize a MetadataLogEntry into its JSON object representation.
///
/// The timestamp is converted with UnixMsFromTimePointMs before being written.
nlohmann::json ToJson(const MetadataLogEntry& metadata_log_entry) {
  const auto unix_ms = UnixMsFromTimePointMs(metadata_log_entry.timestamp_ms);
  nlohmann::json out;
  out[kMetadataFile] = metadata_log_entry.metadata_file;
  out[kTimestampMs] = unix_ms;
  return out;
}
/// \brief Parse a MetadataLogEntry from its JSON object representation.
///
/// \param[in] json The JSON object to parse.
/// \return The parsed entry, or the first error reported while reading the
/// timestamp or the metadata file (in that order).
Result<MetadataLogEntry> MetadataLogEntryFromJson(const nlohmann::json& json) {
  ICEBERG_ASSIGN_OR_RAISE(auto timestamp_unix_ms,
                          GetJsonValue<int64_t>(json, kTimestampMs));
  ICEBERG_ASSIGN_OR_RAISE(auto metadata_file,
                          GetJsonValue<std::string>(json, kMetadataFile));

  MetadataLogEntry entry;
  entry.timestamp_ms = TimePointMsFromUnixMs(timestamp_unix_ms);
  entry.metadata_file = std::move(metadata_file);
  return entry;
}
/// \brief Serialize TableMetadata into its JSON object representation.
///
/// Format version 1 additionally gets the current schema and the default partition
/// spec written under their singular keys. The last sequence number is only written
/// for versions above 1, and the next row id only for version 3 and above.
nlohmann::json ToJson(const TableMetadata& table_metadata) {
  const bool is_v1 = table_metadata.format_version == 1;
  nlohmann::json out;

  out[kFormatVersion] = table_metadata.format_version;
  out[kTableUuid] = table_metadata.table_uuid;
  out[kLocation] = table_metadata.location;
  if (table_metadata.format_version > 1) {
    out[kLastSequenceNumber] = table_metadata.last_sequence_number;
  }
  out[kLastUpdatedMs] = UnixMsFromTimePointMs(table_metadata.last_updated_ms);
  out[kLastColumnId] = table_metadata.last_column_id;

  // Older readers expect the current schema under "schema". Only v1 needs this,
  // because schemas and current-schema-id are required from v2 onwards.
  if (is_v1) {
    const auto& schemas = table_metadata.schemas;
    const auto current = std::ranges::find_if(schemas, [&](const auto& candidate) -> bool {
      return candidate->schema_id() == table_metadata.current_schema_id;
    });
    if (current != schemas.end()) {
      out[kSchema] = ToJson(**current);
    }
  }

  // Current schema id plus the full schema list.
  out[kCurrentSchemaId] = table_metadata.current_schema_id;
  out[kSchemas] = ToJsonList(table_metadata.schemas);

  // Older readers expect the default spec under "partition-spec".
  if (is_v1) {
    const auto& specs = table_metadata.partition_specs;
    const auto default_spec = std::ranges::find_if(specs, [&](const auto& candidate) -> bool {
      return candidate->spec_id() == table_metadata.default_spec_id;
    });
    if (default_spec != specs.end()) {
      out[kPartitionSpec] = ToJson(**default_spec);
    }
  }

  // Default spec id plus the full spec list.
  out[kDefaultSpecId] = table_metadata.default_spec_id;
  out[kPartitionSpecs] = ToJsonList(table_metadata.partition_specs);
  out[kLastPartitionId] = table_metadata.last_partition_id;

  // Default sort order id plus the full sort order list.
  out[kDefaultSortOrderId] = table_metadata.default_sort_order_id;
  out[kSortOrders] = ToJsonList(table_metadata.sort_orders);

  // Table properties.
  out[kProperties] = table_metadata.properties.configs();

  // The current snapshot id is only written when a snapshot with that id is present
  // in the snapshot list; otherwise an explicit null is emitted.
  const bool has_current_snapshot =
      std::ranges::any_of(table_metadata.snapshots, [&](const auto& snapshot) {
        return snapshot->snapshot_id == table_metadata.current_snapshot_id;
      });
  if (has_current_snapshot) {
    out[kCurrentSnapshotId] = table_metadata.current_snapshot_id;
  } else {
    out[kCurrentSnapshotId] = nlohmann::json::value_t::null;
  }

  if (table_metadata.format_version >= 3) {
    out[kNextRowId] = table_metadata.next_row_id;
  }

  out[kRefs] = ToJsonMap(table_metadata.refs);
  out[kSnapshots] = ToJsonList(table_metadata.snapshots);
  out[kStatistics] = ToJsonList(table_metadata.statistics);
  out[kPartitionStatistics] = ToJsonList(table_metadata.partition_statistics);
  out[kSnapshotLog] = ToJsonList(table_metadata.snapshot_log);
  out[kMetadataLog] = ToJsonList(table_metadata.metadata_log);
  return out;
}
/// \brief Serialize TableMetadata to a JSON string.
///
/// Builds the JSON value with ToJson and hands it to the JSON overload of
/// ToJsonString.
Result<std::string> ToJsonString(const TableMetadata& table_metadata) {
  const nlohmann::json metadata_json = ToJson(table_metadata);
  return ToJsonString(metadata_json);
}
namespace {
/// \brief Read the schema list (or the legacy single schema) from table metadata JSON.
///
/// \param[in] json The table metadata JSON object.
/// \param[in] format_version The format version of the table.
/// \param[out] current_schema_id Receives the current schema ID.
/// \param[out] schemas Receives every parsed schema (appended).
///
/// \return The current schema, or a parse error.
Result<std::shared_ptr<Schema>> ParseSchemas(
    const nlohmann::json& json, int8_t format_version, int32_t& current_schema_id,
    std::vector<std::shared_ptr<Schema>>& schemas) {
  if (!json.contains(kSchemas)) {
    // Only v1 metadata may omit the schema list and carry a single schema instead.
    if (format_version != 1) {
      return JsonParseError("{} must exist in format v{}", kSchemas, format_version);
    }
    ICEBERG_ASSIGN_OR_RAISE(auto legacy_schema_json,
                            GetJsonValue<nlohmann::json>(json, kSchema));
    ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<Schema> legacy_schema,
                            SchemaFromJson(legacy_schema_json));
    current_schema_id = legacy_schema->schema_id();
    schemas.push_back(legacy_schema);
    return legacy_schema;
  }

  ICEBERG_ASSIGN_OR_RAISE(auto schemas_json,
                          GetJsonValue<nlohmann::json>(json, kSchemas));
  if (!schemas_json.is_array()) {
    return JsonParseError("Cannot parse schemas from non-array: {}",
                          SafeDumpJson(schemas_json));
  }
  ICEBERG_ASSIGN_OR_RAISE(current_schema_id,
                          GetJsonValue<int32_t>(json, kCurrentSchemaId));

  // Keep every schema and remember the one whose id matches the current schema id.
  std::shared_ptr<Schema> matched;
  for (const auto& item : schemas_json) {
    ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<Schema> parsed, SchemaFromJson(item));
    if (parsed->schema_id() == current_schema_id) {
      matched = parsed;
    }
    schemas.push_back(std::move(parsed));
  }
  if (matched == nullptr) {
    return JsonParseError("Cannot find schema with {}={} from {}", kCurrentSchemaId,
                          current_schema_id, SafeDumpJson(schemas_json));
  }
  return matched;
}
/// \brief Read the partition spec list (or the legacy single spec) from table
/// metadata JSON.
///
/// \param[in] json The table metadata JSON object.
/// \param[in] format_version The format version of the table.
/// \param[in] current_schema The current schema.
/// \param[out] default_spec_id Receives the default partition spec ID.
/// \param[out] partition_specs Receives every parsed partition spec (appended).
Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
                           const std::shared_ptr<Schema>& current_schema,
                           int32_t& default_spec_id,
                           std::vector<std::shared_ptr<PartitionSpec>>& partition_specs) {
  if (json.contains(kPartitionSpecs)) {
    ICEBERG_ASSIGN_OR_RAISE(auto specs_json,
                            GetJsonValue<nlohmann::json>(json, kPartitionSpecs));
    if (!specs_json.is_array()) {
      return JsonParseError("Cannot parse partition specs from non-array: {}",
                            SafeDumpJson(specs_json));
    }
    ICEBERG_ASSIGN_OR_RAISE(default_spec_id, GetJsonValue<int32_t>(json, kDefaultSpecId));
    for (const auto& item : specs_json) {
      ICEBERG_ASSIGN_OR_RAISE(
          auto parsed_spec, PartitionSpecFromJson(current_schema, item, default_spec_id));
      partition_specs.push_back(std::move(parsed_spec));
    }
    return {};
  }

  // Only v1 metadata may omit the spec list and carry a bare array of fields instead.
  if (format_version != 1) {
    return JsonParseError("{} must exist in format v{}", kPartitionSpecs,
                          format_version);
  }
  ICEBERG_ASSIGN_OR_RAISE(auto legacy_spec_json,
                          GetJsonValue<nlohmann::json>(json, kPartitionSpec));
  if (!legacy_spec_json.is_array()) {
    return JsonParseError("Cannot parse v1 partition spec from non-array: {}",
                          SafeDumpJson(legacy_spec_json));
  }

  int32_t next_field_id = PartitionSpec::kLegacyPartitionDataIdStart;
  std::vector<PartitionField> legacy_fields;
  for (const auto& item : legacy_spec_json) {
    ICEBERG_ASSIGN_OR_RAISE(
        auto parsed_field,
        PartitionFieldFromJson(item, /*allow_field_id_missing=*/format_version == 1));
    int32_t assigned_id = parsed_field->field_id();
    if (assigned_id == SchemaField::kInvalidFieldId) {
      // No field ID was provided, so hand out the next legacy partition field ID.
      assigned_id = next_field_id++;
    }
    legacy_fields.emplace_back(parsed_field->source_id(), assigned_id,
                               std::string(parsed_field->name()),
                               std::move(parsed_field->transform()));
  }

  // Build the spec against the current schema so that the fields get validated.
  ICEBERG_ASSIGN_OR_RAISE(
      auto legacy_spec,
      PartitionSpec::Make(*current_schema, PartitionSpec::kInitialSpecId,
                          std::move(legacy_fields), /*allow_missing_fields=*/false));
  default_spec_id = legacy_spec->spec_id();
  partition_specs.push_back(std::move(legacy_spec));
  return {};
}
/// \brief Parse the sort orders from the JSON object.
///
/// When the sort order list is absent, format v1 falls back to a single unsorted
/// order; any later format version reports a parse error.
///
/// \param[in] json The JSON object to parse.
/// \param[in] format_version The format version of the table.
/// \param[in] current_schema The current schema.
/// \param[out] default_sort_order_id The default sort order ID.
/// \param[out] sort_orders The list of sort orders.
Status ParseSortOrders(const nlohmann::json& json, int8_t format_version,
                       const std::shared_ptr<Schema>& current_schema,
                       int32_t& default_sort_order_id,
                       std::vector<std::shared_ptr<SortOrder>>& sort_orders) {
  if (json.contains(kSortOrders)) {
    ICEBERG_ASSIGN_OR_RAISE(default_sort_order_id,
                            GetJsonValue<int32_t>(json, kDefaultSortOrderId));
    ICEBERG_ASSIGN_OR_RAISE(auto sort_order_array,
                            GetJsonValue<nlohmann::json>(json, kSortOrders));
    // Same guard as for schemas and partition specs: range-iterating a non-array
    // JSON value would not fail by itself, so reject it explicitly.
    if (!sort_order_array.is_array()) {
      return JsonParseError("Cannot parse sort orders from non-array: {}",
                            SafeDumpJson(sort_order_array));
    }
    for (const auto& sort_order_json : sort_order_array) {
      ICEBERG_ASSIGN_OR_RAISE(auto sort_order,
                              SortOrderFromJson(sort_order_json, current_schema));
      sort_orders.push_back(std::move(sort_order));
    }
  } else {
    if (format_version > 1) {
      return JsonParseError("{} must exist in format v{}", kSortOrders, format_version);
    }
    auto sort_order = SortOrder::Unsorted();
    default_sort_order_id = sort_order->order_id();
    sort_orders.push_back(std::move(sort_order));
  }
  return {};
}
} // namespace
/// \brief Parse TableMetadata from its JSON object representation.
///
/// \param[in] json The JSON value to parse; anything other than an object is rejected.
/// \return The parsed table metadata, or a parse error when the format version is
/// outside [1, TableMetadata::kSupportedTableFormatVersion] or any field lookup or
/// nested parse fails.
Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::json& json) {
  if (!json.is_object()) {
    return JsonParseError("Cannot parse metadata from a non-object: {}",
                          SafeDumpJson(json));
  }
  auto table_metadata = std::make_unique<TableMetadata>();
  ICEBERG_ASSIGN_OR_RAISE(table_metadata->format_version,
                          GetJsonValue<int8_t>(json, kFormatVersion));
  if (table_metadata->format_version < 1 ||
      table_metadata->format_version > TableMetadata::kSupportedTableFormatVersion) {
    return JsonParseError("Cannot read unsupported version: {}",
                          table_metadata->format_version);
  }
  ICEBERG_ASSIGN_OR_RAISE(table_metadata->table_uuid,
                          GetJsonValueOrDefault<std::string>(json, kTableUuid));
  ICEBERG_ASSIGN_OR_RAISE(table_metadata->location,
                          GetJsonValue<std::string>(json, kLocation));
  // The last sequence number is only read for versions above 1; v1 uses the
  // initial sequence number.
  if (table_metadata->format_version > 1) {
    ICEBERG_ASSIGN_OR_RAISE(table_metadata->last_sequence_number,
                            GetJsonValue<int64_t>(json, kLastSequenceNumber));
  } else {
    table_metadata->last_sequence_number = TableMetadata::kInitialSequenceNumber;
  }
  ICEBERG_ASSIGN_OR_RAISE(table_metadata->last_column_id,
                          GetJsonValue<int32_t>(json, kLastColumnId));
  ICEBERG_ASSIGN_OR_RAISE(
      auto current_schema,
      ParseSchemas(json, table_metadata->format_version,
                   table_metadata->current_schema_id, table_metadata->schemas));
  ICEBERG_RETURN_UNEXPECTED(ParsePartitionSpecs(
      json, table_metadata->format_version, current_schema,
      table_metadata->default_spec_id, table_metadata->partition_specs));
  if (json.contains(kLastPartitionId)) {
    ICEBERG_ASSIGN_OR_RAISE(table_metadata->last_partition_id,
                            GetJsonValue<int32_t>(json, kLastPartitionId));
  } else {
    if (table_metadata->format_version > 1) {
      return JsonParseError("{} must exist in format v{}", kLastPartitionId,
                            table_metadata->format_version);
    }
    // v1 without an explicit value: derive it from the parsed specs by taking the
    // largest last-assigned field id (or the unpartitioned spec's when there are none).
    if (table_metadata->partition_specs.empty()) {
      table_metadata->last_partition_id =
          PartitionSpec::Unpartitioned()->last_assigned_field_id();
    } else {
      table_metadata->last_partition_id =
          std::ranges::max(table_metadata->partition_specs, {}, [](const auto& spec) {
            return spec->last_assigned_field_id();
          })->last_assigned_field_id();
    }
  }
  ICEBERG_RETURN_UNEXPECTED(ParseSortOrders(
      json, table_metadata->format_version, current_schema,
      table_metadata->default_sort_order_id, table_metadata->sort_orders));
  if (json.contains(kProperties)) {
    ICEBERG_ASSIGN_OR_RAISE(auto properties, FromJsonMap(json, kProperties));
    table_metadata->properties = TableProperties::FromMap(std::move(properties));
  }
  // This field is optional, but internally we set this to -1 when not set
  ICEBERG_ASSIGN_OR_RAISE(
      table_metadata->current_snapshot_id,
      GetJsonValueOrDefault<int64_t>(json, kCurrentSnapshotId, kInvalidSnapshotId));
  // The next row id is only read for version 3 and above.
  if (table_metadata->format_version >= 3) {
    ICEBERG_ASSIGN_OR_RAISE(table_metadata->next_row_id,
                            GetJsonValue<int64_t>(json, kNextRowId));
  } else {
    table_metadata->next_row_id = TableMetadata::kInitialRowId;
  }
  ICEBERG_ASSIGN_OR_RAISE(auto last_updated_ms,
                          GetJsonValue<int64_t>(json, kLastUpdatedMs));
  // Use the same conversion helper as the log entry parsers in this file.
  table_metadata->last_updated_ms = TimePointMsFromUnixMs(last_updated_ms);
  if (json.contains(kRefs)) {
    ICEBERG_ASSIGN_OR_RAISE(
        table_metadata->refs,
        FromJsonMap<std::shared_ptr<SnapshotRef>>(json, kRefs, SnapshotRefFromJson));
  } else if (table_metadata->current_snapshot_id != kInvalidSnapshotId) {
    // No refs in the JSON but a current snapshot is set: synthesize a "main" branch
    // that points at it.
    table_metadata->refs["main"] = std::make_shared<SnapshotRef>(SnapshotRef{
        .snapshot_id = table_metadata->current_snapshot_id,
        .retention = SnapshotRef::Branch{},
    });
  }
  ICEBERG_ASSIGN_OR_RAISE(table_metadata->snapshots,
                          FromJsonList<Snapshot>(json, kSnapshots, SnapshotFromJson));
  ICEBERG_ASSIGN_OR_RAISE(
      table_metadata->statistics,
      FromJsonList<StatisticsFile>(json, kStatistics, StatisticsFileFromJson));
  ICEBERG_ASSIGN_OR_RAISE(
      table_metadata->partition_statistics,
      FromJsonList<PartitionStatisticsFile>(json, kPartitionStatistics,
                                            PartitionStatisticsFileFromJson));
  ICEBERG_ASSIGN_OR_RAISE(
      table_metadata->snapshot_log,
      FromJsonList<SnapshotLogEntry>(json, kSnapshotLog, SnapshotLogEntryFromJson));
  ICEBERG_ASSIGN_OR_RAISE(
      table_metadata->metadata_log,
      FromJsonList<MetadataLogEntry>(json, kMetadataLog, MetadataLogEntryFromJson));
  return table_metadata;
}
/// \brief Parse a JSON document from a string without throwing.
///
/// \param[in] json_string The text to parse.
/// \return The parsed JSON value, or a parse error (containing the input text) when
/// the parser discards the input.
Result<nlohmann::json> FromJsonString(const std::string& json_string) {
  nlohmann::json parsed =
      nlohmann::json::parse(json_string, /*cb=*/nullptr, /*allow_exceptions=*/false);
  if (!parsed.is_discarded()) [[likely]] {
    return parsed;
  }
  return JsonParseError("Failed to parse JSON string: {}", json_string);
}
/// \brief Dump a JSON value to its string form.
///
/// \param[in] json The JSON value to serialize.
/// \return The serialized text, or an error carrying the exception message when
/// dumping throws a std::exception.
Result<std::string> ToJsonString(const nlohmann::json& json) {
  try {
    std::string serialized = json.dump();
    return serialized;
  } catch (const std::exception& error) {
    return JsonParseError("Failed to serialize to JSON string: {}", error.what());
  }
}
/// \brief Serialize a MappedField into its JSON object representation.
///
/// The field id and the nested mapping are only written when they are set; the
/// names are always written as an array.
nlohmann::json ToJson(const MappedField& field) {
  nlohmann::json name_list = nlohmann::json::array();
  for (const auto& entry : field.names) {
    name_list.push_back(entry);
  }

  nlohmann::json out;
  if (field.field_id) {
    out[kFieldId] = *field.field_id;
  }
  out[kNames] = name_list;
  if (field.nested_mapping) {
    out[kFields] = ToJson(*field.nested_mapping);
  }
  return out;
}
/// \brief Parse a MappedField from its JSON object representation.
///
/// The field id, the names and the nested fields are all optional: names default to
/// empty and the nested mapping stays null when the corresponding key is absent.
///
/// \param[in] json The JSON value to parse; anything other than an object is rejected.
/// \return The parsed mapped field or a parse error.
Result<MappedField> MappedFieldFromJson(const nlohmann::json& json) {
  if (!json.is_object()) [[unlikely]] {
    return JsonParseError("Cannot parse non-object mapping field: {}",
                          SafeDumpJson(json));
  }
  ICEBERG_ASSIGN_OR_RAISE(std::optional<int32_t> field_id,
                          GetJsonValueOptional<int32_t>(json, kFieldId));
  std::vector<std::string> names;
  if (json.contains(kNames)) {
    ICEBERG_ASSIGN_OR_RAISE(names, GetJsonValue<std::vector<std::string>>(json, kNames));
  }
  std::unique_ptr<MappedFields> nested_mapping;
  if (json.contains(kFields)) {
    ICEBERG_ASSIGN_OR_RAISE(auto fields_json,
                            GetJsonValue<nlohmann::json>(json, kFields));
    ICEBERG_ASSIGN_OR_RAISE(nested_mapping, MappedFieldsFromJson(fields_json));
  }
  // `names` is not used after this point, so move the strings into the result
  // instead of copying each one.
  return MappedField{.names = {std::make_move_iterator(names.begin()),
                               std::make_move_iterator(names.end())},
                     .field_id = field_id,
                     .nested_mapping = std::move(nested_mapping)};
}
/// \brief Serialize MappedFields into a JSON array with one element per field.
nlohmann::json ToJson(const MappedFields& mapped_fields) {
  nlohmann::json out = nlohmann::json::array();
  for (const auto& mapped_field : mapped_fields.fields()) {
    out.push_back(ToJson(mapped_field));
  }
  return out;
}
/// \brief Parse MappedFields from a JSON array of mapped field objects.
///
/// \param[in] json The JSON value to parse; anything other than an array is rejected.
/// \return The result of MappedFields::Make over the parsed fields, or the first
/// error reported while parsing an element.
Result<std::unique_ptr<MappedFields>> MappedFieldsFromJson(const nlohmann::json& json) {
  if (!json.is_array()) [[unlikely]] {
    return JsonParseError("Cannot parse non-array mapping fields: {}",
                          SafeDumpJson(json));
  }

  std::vector<MappedField> parsed_fields;
  for (const auto& element : json) {
    ICEBERG_ASSIGN_OR_RAISE(auto parsed, MappedFieldFromJson(element));
    parsed_fields.emplace_back(std::move(parsed));
  }
  return MappedFields::Make(std::move(parsed_fields));
}
/// \brief Serialize a NameMapping by serializing its mapped fields.
nlohmann::json ToJson(const NameMapping& name_mapping) {
  const auto& root_fields = name_mapping.AsMappedFields();
  return ToJson(root_fields);
}
/// \brief Parse a NameMapping from a JSON array of mapped fields.
///
/// \param[in] json The JSON value to parse.
/// \return The result of NameMapping::Make over the parsed fields, or the error
/// reported by MappedFieldsFromJson.
Result<std::unique_ptr<NameMapping>> NameMappingFromJson(const nlohmann::json& json) {
  ICEBERG_ASSIGN_OR_RAISE(auto root_fields, MappedFieldsFromJson(json));
  return NameMapping::Make(std::move(root_fields));
}
/// \brief Serialize a TableIdentifier into a JSON object holding the namespace
/// levels and the table name.
nlohmann::json ToJson(const TableIdentifier& identifier) {
  nlohmann::json out;
  out[kName] = identifier.name;
  out[kNamespace] = identifier.ns.levels;
  return out;
}
/// \brief Parse a TableIdentifier from its JSON object representation.
///
/// The namespace levels are read with GetJsonValueOrDefault, the name with
/// GetJsonValue.
///
/// \param[in] json The JSON object to parse.
/// \return The parsed identifier, or the first error reported by either lookup.
Result<TableIdentifier> TableIdentifierFromJson(const nlohmann::json& json) {
  ICEBERG_ASSIGN_OR_RAISE(
      auto levels, GetJsonValueOrDefault<std::vector<std::string>>(json, kNamespace));
  ICEBERG_ASSIGN_OR_RAISE(auto table_name, GetJsonValue<std::string>(json, kName));

  TableIdentifier parsed;
  parsed.ns.levels = std::move(levels);
  parsed.name = std::move(table_name);
  return parsed;
}
/// \brief Serialize a Namespace as the JSON form of its levels.
nlohmann::json ToJson(const Namespace& ns) {
  nlohmann::json levels_json = ns.levels;
  return levels_json;
}
/// \brief Parse a Namespace from a JSON array of level names.
///
/// \param[in] json The JSON value to parse; anything other than an array is rejected.
/// \return The parsed namespace or a parse error.
Result<Namespace> NamespaceFromJson(const nlohmann::json& json) {
  if (!json.is_array()) [[unlikely]] {
    return JsonParseError("Cannot parse namespace from non-array:{}", SafeDumpJson(json));
  }
  ICEBERG_ASSIGN_OR_RAISE(auto levels,
                          GetTypedJsonValue<std::vector<std::string>>(json));

  Namespace parsed;
  parsed.levels = std::move(levels);
  return parsed;
}
/// \brief Serialize a TableUpdate into its JSON object representation.
///
/// The kAction entry identifies the update kind; the remaining entries carry the
/// payload of that particular update. Payload pointers that are unset are written
/// as JSON null.
nlohmann::json ToJson(const TableUpdate& update) {
  using Kind = TableUpdate::Kind;
  // Value written for payload pointers that are not set.
  const nlohmann::json null_value(nlohmann::json::value_t::null);

  nlohmann::json out;
  switch (update.kind()) {
    case Kind::kAssignUUID: {
      const auto& assign_uuid =
          internal::checked_cast<const table::AssignUUID&>(update);
      out[kAction] = kActionAssignUUID;
      out[kUUID] = assign_uuid.uuid();
      break;
    }
    case Kind::kUpgradeFormatVersion: {
      const auto& upgrade =
          internal::checked_cast<const table::UpgradeFormatVersion&>(update);
      out[kAction] = kActionUpgradeFormatVersion;
      out[kFormatVersion] = upgrade.format_version();
      break;
    }
    case Kind::kAddSchema: {
      const auto& add_schema = internal::checked_cast<const table::AddSchema&>(update);
      out[kAction] = kActionAddSchema;
      out[kSchema] = add_schema.schema() ? ToJson(*add_schema.schema()) : null_value;
      out[kLastColumnId] = add_schema.last_column_id();
      break;
    }
    case Kind::kSetCurrentSchema: {
      const auto& set_schema =
          internal::checked_cast<const table::SetCurrentSchema&>(update);
      out[kAction] = kActionSetCurrentSchema;
      out[kSchemaId] = set_schema.schema_id();
      break;
    }
    case Kind::kAddPartitionSpec: {
      const auto& add_spec =
          internal::checked_cast<const table::AddPartitionSpec&>(update);
      out[kAction] = kActionAddPartitionSpec;
      out[kSpec] = add_spec.spec() ? ToJson(*add_spec.spec()) : null_value;
      break;
    }
    case Kind::kSetDefaultPartitionSpec: {
      const auto& set_spec =
          internal::checked_cast<const table::SetDefaultPartitionSpec&>(update);
      out[kAction] = kActionSetDefaultPartitionSpec;
      out[kSpecId] = set_spec.spec_id();
      break;
    }
    case Kind::kRemovePartitionSpecs: {
      const auto& remove_specs =
          internal::checked_cast<const table::RemovePartitionSpecs&>(update);
      out[kAction] = kActionRemovePartitionSpecs;
      out[kSpecIds] = remove_specs.spec_ids();
      break;
    }
    case Kind::kRemoveSchemas: {
      const auto& remove_schemas =
          internal::checked_cast<const table::RemoveSchemas&>(update);
      out[kAction] = kActionRemoveSchemas;
      out[kSchemaIds] = remove_schemas.schema_ids();
      break;
    }
    case Kind::kAddSortOrder: {
      const auto& add_order = internal::checked_cast<const table::AddSortOrder&>(update);
      out[kAction] = kActionAddSortOrder;
      out[kSortOrder] =
          add_order.sort_order() ? ToJson(*add_order.sort_order()) : null_value;
      break;
    }
    case Kind::kSetDefaultSortOrder: {
      const auto& set_order =
          internal::checked_cast<const table::SetDefaultSortOrder&>(update);
      out[kAction] = kActionSetDefaultSortOrder;
      out[kSortOrderId] = set_order.sort_order_id();
      break;
    }
    case Kind::kAddSnapshot: {
      const auto& add_snapshot =
          internal::checked_cast<const table::AddSnapshot&>(update);
      out[kAction] = kActionAddSnapshot;
      out[kSnapshot] =
          add_snapshot.snapshot() ? ToJson(*add_snapshot.snapshot()) : null_value;
      break;
    }
    case Kind::kRemoveSnapshots: {
      const auto& remove_snapshots =
          internal::checked_cast<const table::RemoveSnapshots&>(update);
      out[kAction] = kActionRemoveSnapshots;
      out[kSnapshotIds] = remove_snapshots.snapshot_ids();
      break;
    }
    case Kind::kRemoveSnapshotRef: {
      const auto& remove_ref =
          internal::checked_cast<const table::RemoveSnapshotRef&>(update);
      out[kAction] = kActionRemoveSnapshotRef;
      out[kRefName] = remove_ref.ref_name();
      break;
    }
    case Kind::kSetSnapshotRef: {
      const auto& set_ref = internal::checked_cast<const table::SetSnapshotRef&>(update);
      out[kAction] = kActionSetSnapshotRef;
      out[kRefName] = set_ref.ref_name();
      out[kSnapshotId] = set_ref.snapshot_id();
      out[kType] = ToString(set_ref.type());
      SetOptionalField(out, kMinSnapshotsToKeep, set_ref.min_snapshots_to_keep());
      SetOptionalField(out, kMaxSnapshotAgeMs, set_ref.max_snapshot_age_ms());
      SetOptionalField(out, kMaxRefAgeMs, set_ref.max_ref_age_ms());
      break;
    }
    case Kind::kSetProperties: {
      const auto& set_props =
          internal::checked_cast<const table::SetProperties&>(update);
      out[kAction] = kActionSetProperties;
      out[kUpdates] = set_props.updated();
      break;
    }
    case Kind::kRemoveProperties: {
      const auto& remove_props =
          internal::checked_cast<const table::RemoveProperties&>(update);
      const auto& removed_keys = remove_props.removed();
      out[kAction] = kActionRemoveProperties;
      out[kRemovals] = std::vector<std::string>(removed_keys.begin(), removed_keys.end());
      break;
    }
    case Kind::kSetLocation: {
      const auto& set_location =
          internal::checked_cast<const table::SetLocation&>(update);
      out[kAction] = kActionSetLocation;
      out[kLocation] = set_location.location();
      break;
    }
    case Kind::kSetStatistics: {
      const auto& set_stats =
          internal::checked_cast<const table::SetStatistics&>(update);
      out[kAction] = kActionSetStatistics;
      out[kStatistics] =
          set_stats.statistics_file() ? ToJson(*set_stats.statistics_file()) : null_value;
      break;
    }
    case Kind::kRemoveStatistics: {
      const auto& remove_stats =
          internal::checked_cast<const table::RemoveStatistics&>(update);
      out[kAction] = kActionRemoveStatistics;
      out[kSnapshotId] = remove_stats.snapshot_id();
      break;
    }
    case Kind::kSetPartitionStatistics: {
      const auto& set_partition_stats =
          internal::checked_cast<const table::SetPartitionStatistics&>(update);
      out[kAction] = kActionSetPartitionStatistics;
      out[kPartitionStatistics] =
          set_partition_stats.partition_statistics_file()
              ? ToJson(*set_partition_stats.partition_statistics_file())
              : null_value;
      break;
    }
    case Kind::kRemovePartitionStatistics: {
      const auto& remove_partition_stats =
          internal::checked_cast<const table::RemovePartitionStatistics&>(update);
      out[kAction] = kActionRemovePartitionStatistics;
      out[kSnapshotId] = remove_partition_stats.snapshot_id();
      break;
    }
  }
  return out;
}
/// \brief Serialize a TableRequirement into its JSON object representation.
///
/// The kType entry identifies the requirement kind; the remaining entries carry the
/// value that requirement asserts. An unset snapshot id on a ref assertion is
/// written as JSON null.
nlohmann::json ToJson(const TableRequirement& requirement) {
  using Kind = TableRequirement::Kind;

  nlohmann::json out;
  switch (requirement.kind()) {
    case Kind::kAssertDoesNotExist:
      out[kType] = kRequirementAssertDoesNotExist;
      break;
    case Kind::kAssertUUID: {
      const auto& assert_uuid =
          internal::checked_cast<const table::AssertUUID&>(requirement);
      out[kType] = kRequirementAssertUUID;
      out[kUUID] = assert_uuid.uuid();
      break;
    }
    case Kind::kAssertRefSnapshotID: {
      const auto& assert_ref =
          internal::checked_cast<const table::AssertRefSnapshotID&>(requirement);
      out[kType] = kRequirementAssertRefSnapshotID;
      out[kRefName] = assert_ref.ref_name();
      const auto& expected_snapshot_id = assert_ref.snapshot_id();
      if (expected_snapshot_id.has_value()) {
        out[kSnapshotId] = *expected_snapshot_id;
      } else {
        out[kSnapshotId] = nlohmann::json::value_t::null;
      }
      break;
    }
    case Kind::kAssertLastAssignedFieldId: {
      const auto& assert_field_id =
          internal::checked_cast<const table::AssertLastAssignedFieldId&>(requirement);
      out[kType] = kRequirementAssertLastAssignedFieldId;
      out[kLastAssignedFieldId] = assert_field_id.last_assigned_field_id();
      break;
    }
    case Kind::kAssertCurrentSchemaID: {
      const auto& assert_schema =
          internal::checked_cast<const table::AssertCurrentSchemaID&>(requirement);
      out[kType] = kRequirementAssertCurrentSchemaID;
      out[kCurrentSchemaId] = assert_schema.schema_id();
      break;
    }
    case Kind::kAssertLastAssignedPartitionId: {
      const auto& assert_partition_id =
          internal::checked_cast<const table::AssertLastAssignedPartitionId&>(
              requirement);
      out[kType] = kRequirementAssertLastAssignedPartitionId;
      out[kLastAssignedPartitionId] = assert_partition_id.last_assigned_partition_id();
      break;
    }
    case Kind::kAssertDefaultSpecID: {
      const auto& assert_spec =
          internal::checked_cast<const table::AssertDefaultSpecID&>(requirement);
      out[kType] = kRequirementAssertDefaultSpecID;
      out[kDefaultSpecId] = assert_spec.spec_id();
      break;
    }
    case Kind::kAssertDefaultSortOrderID: {
      const auto& assert_order =
          internal::checked_cast<const table::AssertDefaultSortOrderID&>(requirement);
      out[kType] = kRequirementAssertDefaultSortOrderID;
      out[kDefaultSortOrderId] = assert_order.sort_order_id();
      break;
    }
  }
  return out;
}
/// \brief Parse a TableUpdate from its JSON object representation.
///
/// The kAction entry selects which concrete update is built; the payload entries
/// required by that update are then read from the same object.
///
/// \param[in] json The JSON object to parse.
/// \return The parsed update, the first error reported while reading a payload
/// entry, or a parse error when the action is not recognized.
Result<std::unique_ptr<TableUpdate>> TableUpdateFromJson(const nlohmann::json& json) {
  ICEBERG_ASSIGN_OR_RAISE(auto action, GetJsonValue<std::string>(json, kAction));

  if (action == kActionAssignUUID) {
    ICEBERG_ASSIGN_OR_RAISE(auto table_uuid, GetJsonValue<std::string>(json, kUUID));
    return std::make_unique<table::AssignUUID>(std::move(table_uuid));
  }

  if (action == kActionUpgradeFormatVersion) {
    ICEBERG_ASSIGN_OR_RAISE(auto target_version,
                            GetJsonValue<int8_t>(json, kFormatVersion));
    return std::make_unique<table::UpgradeFormatVersion>(target_version);
  }

  if (action == kActionAddSchema) {
    ICEBERG_ASSIGN_OR_RAISE(auto schema_node,
                            GetJsonValue<nlohmann::json>(json, kSchema));
    ICEBERG_ASSIGN_OR_RAISE(auto new_schema, SchemaFromJson(schema_node));
    ICEBERG_ASSIGN_OR_RAISE(auto highest_column_id,
                            GetJsonValue<int32_t>(json, kLastColumnId));
    return std::make_unique<table::AddSchema>(std::move(new_schema), highest_column_id);
  }

  if (action == kActionSetCurrentSchema) {
    ICEBERG_ASSIGN_OR_RAISE(auto target_schema_id,
                            GetJsonValue<int32_t>(json, kSchemaId));
    return std::make_unique<table::SetCurrentSchema>(target_schema_id);
  }

  if (action == kActionAddPartitionSpec) {
    ICEBERG_ASSIGN_OR_RAISE(auto spec_node, GetJsonValue<nlohmann::json>(json, kSpec));
    ICEBERG_ASSIGN_OR_RAISE(auto new_spec, PartitionSpecFromJson(spec_node));
    return std::make_unique<table::AddPartitionSpec>(std::move(new_spec));
  }

  if (action == kActionSetDefaultPartitionSpec) {
    ICEBERG_ASSIGN_OR_RAISE(auto target_spec_id, GetJsonValue<int32_t>(json, kSpecId));
    return std::make_unique<table::SetDefaultPartitionSpec>(target_spec_id);
  }

  if (action == kActionRemovePartitionSpecs) {
    ICEBERG_ASSIGN_OR_RAISE(auto removed_spec_ids,
                            GetJsonValue<std::vector<int32_t>>(json, kSpecIds));
    return std::make_unique<table::RemovePartitionSpecs>(std::move(removed_spec_ids));
  }

  if (action == kActionRemoveSchemas) {
    ICEBERG_ASSIGN_OR_RAISE(auto id_list,
                            GetJsonValue<std::vector<int32_t>>(json, kSchemaIds));
    // The update stores the ids as a set, so duplicates in the JSON list collapse.
    std::unordered_set<int32_t> removed_schema_ids(id_list.begin(), id_list.end());
    return std::make_unique<table::RemoveSchemas>(std::move(removed_schema_ids));
  }

  if (action == kActionAddSortOrder) {
    ICEBERG_ASSIGN_OR_RAISE(auto order_node,
                            GetJsonValue<nlohmann::json>(json, kSortOrder));
    ICEBERG_ASSIGN_OR_RAISE(auto new_order, SortOrderFromJson(order_node));
    return std::make_unique<table::AddSortOrder>(std::move(new_order));
  }

  if (action == kActionSetDefaultSortOrder) {
    ICEBERG_ASSIGN_OR_RAISE(auto target_order_id,
                            GetJsonValue<int32_t>(json, kSortOrderId));
    return std::make_unique<table::SetDefaultSortOrder>(target_order_id);
  }

  if (action == kActionAddSnapshot) {
    ICEBERG_ASSIGN_OR_RAISE(auto snapshot_node,
                            GetJsonValue<nlohmann::json>(json, kSnapshot));
    ICEBERG_ASSIGN_OR_RAISE(auto new_snapshot, SnapshotFromJson(snapshot_node));
    return std::make_unique<table::AddSnapshot>(std::move(new_snapshot));
  }

  if (action == kActionRemoveSnapshots) {
    ICEBERG_ASSIGN_OR_RAISE(auto removed_snapshot_ids,
                            GetJsonValue<std::vector<int64_t>>(json, kSnapshotIds));
    return std::make_unique<table::RemoveSnapshots>(std::move(removed_snapshot_ids));
  }

  if (action == kActionRemoveSnapshotRef) {
    ICEBERG_ASSIGN_OR_RAISE(auto removed_ref, GetJsonValue<std::string>(json, kRefName));
    return std::make_unique<table::RemoveSnapshotRef>(std::move(removed_ref));
  }

  if (action == kActionSetSnapshotRef) {
    ICEBERG_ASSIGN_OR_RAISE(auto name, GetJsonValue<std::string>(json, kRefName));
    ICEBERG_ASSIGN_OR_RAISE(auto target_snapshot_id,
                            GetJsonValue<int64_t>(json, kSnapshotId));
    ICEBERG_ASSIGN_OR_RAISE(
        auto ref_type,
        GetJsonValue<std::string>(json, kType).and_then(SnapshotRefTypeFromString));
    ICEBERG_ASSIGN_OR_RAISE(auto min_to_keep,
                            GetJsonValueOptional<int32_t>(json, kMinSnapshotsToKeep));
    ICEBERG_ASSIGN_OR_RAISE(auto max_snapshot_age_ms,
                            GetJsonValueOptional<int64_t>(json, kMaxSnapshotAgeMs));
    ICEBERG_ASSIGN_OR_RAISE(auto max_ref_age_ms,
                            GetJsonValueOptional<int64_t>(json, kMaxRefAgeMs));

    if (ref_type == SnapshotRefType::kTag) {
      ICEBERG_ASSIGN_OR_RAISE(auto tag_ref,
                              SnapshotRef::MakeTag(target_snapshot_id, max_ref_age_ms));
      return std::make_unique<table::SetSnapshotRef>(std::move(name), *tag_ref);
    }

    // Anything that is not a tag has to be a branch.
    ICEBERG_CHECK(ref_type == SnapshotRefType::kBranch,
                  "Expected branch type for snapshot ref");
    ICEBERG_ASSIGN_OR_RAISE(
        auto branch_ref,
        SnapshotRef::MakeBranch(target_snapshot_id, min_to_keep, max_snapshot_age_ms,
                                max_ref_age_ms));
    return std::make_unique<table::SetSnapshotRef>(std::move(name), *branch_ref);
  }

  if (action == kActionSetProperties) {
    using PropertyMap = std::unordered_map<std::string, std::string>;
    ICEBERG_ASSIGN_OR_RAISE(auto changed, GetJsonValue<PropertyMap>(json, kUpdates));
    return std::make_unique<table::SetProperties>(std::move(changed));
  }

  if (action == kActionRemoveProperties) {
    ICEBERG_ASSIGN_OR_RAISE(auto key_list,
                            GetJsonValue<std::vector<std::string>>(json, kRemovals));
    // Move the parsed keys into the set that the update stores.
    std::unordered_set<std::string> removed_keys(
        std::make_move_iterator(key_list.begin()),
        std::make_move_iterator(key_list.end()));
    return std::make_unique<table::RemoveProperties>(std::move(removed_keys));
  }

  if (action == kActionSetLocation) {
    ICEBERG_ASSIGN_OR_RAISE(auto new_location,
                            GetJsonValue<std::string>(json, kLocation));
    return std::make_unique<table::SetLocation>(std::move(new_location));
  }

  if (action == kActionSetStatistics) {
    ICEBERG_ASSIGN_OR_RAISE(auto stats_node,
                            GetJsonValue<nlohmann::json>(json, kStatistics));
    ICEBERG_ASSIGN_OR_RAISE(auto stats_file, StatisticsFileFromJson(stats_node));
    return std::make_unique<table::SetStatistics>(std::move(stats_file));
  }

  if (action == kActionRemoveStatistics) {
    ICEBERG_ASSIGN_OR_RAISE(auto stats_snapshot_id,
                            GetJsonValue<int64_t>(json, kSnapshotId));
    return std::make_unique<table::RemoveStatistics>(stats_snapshot_id);
  }

  if (action == kActionSetPartitionStatistics) {
    ICEBERG_ASSIGN_OR_RAISE(auto partition_stats_node,
                            GetJsonValue<nlohmann::json>(json, kPartitionStatistics));
    ICEBERG_ASSIGN_OR_RAISE(auto partition_stats_file,
                            PartitionStatisticsFileFromJson(partition_stats_node));
    return std::make_unique<table::SetPartitionStatistics>(
        std::move(partition_stats_file));
  }

  if (action == kActionRemovePartitionStatistics) {
    ICEBERG_ASSIGN_OR_RAISE(auto stats_snapshot_id,
                            GetJsonValue<int64_t>(json, kSnapshotId));
    return std::make_unique<table::RemovePartitionStatistics>(stats_snapshot_id);
  }

  return JsonParseError("Unknown table update action: {}", action);
}
/// \brief Parse a TableRequirement from its JSON object representation.
///
/// The kType entry selects which concrete requirement is built; the value asserted
/// by that requirement is then read from the same object.
///
/// \param[in] json The JSON object to parse.
/// \return The parsed requirement, the first error reported while reading an entry,
/// or a parse error when the type is not recognized.
Result<std::unique_ptr<TableRequirement>> TableRequirementFromJson(
    const nlohmann::json& json) {
  ICEBERG_ASSIGN_OR_RAISE(auto type, GetJsonValue<std::string>(json, kType));

  if (type == kRequirementAssertDoesNotExist) {
    return std::make_unique<table::AssertDoesNotExist>();
  }

  if (type == kRequirementAssertUUID) {
    ICEBERG_ASSIGN_OR_RAISE(auto expected_uuid, GetJsonValue<std::string>(json, kUUID));
    return std::make_unique<table::AssertUUID>(std::move(expected_uuid));
  }

  if (type == kRequirementAssertRefSnapshotID) {
    ICEBERG_ASSIGN_OR_RAISE(auto name, GetJsonValue<std::string>(json, kRefName));
    ICEBERG_ASSIGN_OR_RAISE(auto expected_snapshot_id,
                            GetJsonValueOptional<int64_t>(json, kSnapshotId));
    return std::make_unique<table::AssertRefSnapshotID>(std::move(name),
                                                        expected_snapshot_id);
  }

  if (type == kRequirementAssertLastAssignedFieldId) {
    ICEBERG_ASSIGN_OR_RAISE(auto expected_field_id,
                            GetJsonValue<int32_t>(json, kLastAssignedFieldId));
    return std::make_unique<table::AssertLastAssignedFieldId>(expected_field_id);
  }

  if (type == kRequirementAssertCurrentSchemaID) {
    ICEBERG_ASSIGN_OR_RAISE(auto expected_schema_id,
                            GetJsonValue<int32_t>(json, kCurrentSchemaId));
    return std::make_unique<table::AssertCurrentSchemaID>(expected_schema_id);
  }

  if (type == kRequirementAssertLastAssignedPartitionId) {
    ICEBERG_ASSIGN_OR_RAISE(auto expected_partition_id,
                            GetJsonValue<int32_t>(json, kLastAssignedPartitionId));
    return std::make_unique<table::AssertLastAssignedPartitionId>(
        expected_partition_id);
  }

  if (type == kRequirementAssertDefaultSpecID) {
    ICEBERG_ASSIGN_OR_RAISE(auto expected_spec_id,
                            GetJsonValue<int32_t>(json, kDefaultSpecId));
    return std::make_unique<table::AssertDefaultSpecID>(expected_spec_id);
  }

  if (type == kRequirementAssertDefaultSortOrderID) {
    ICEBERG_ASSIGN_OR_RAISE(auto expected_order_id,
                            GetJsonValue<int32_t>(json, kDefaultSortOrderId));
    return std::make_unique<table::AssertDefaultSortOrderID>(expected_order_id);
  }

  return JsonParseError("Unknown table requirement type: {}", type);
}
} // namespace iceberg