// blob: 5aeaaa58a19f382bc3ac318014cdd4ffa28d470a [file] [log] [blame]
/*
* Copyright 2024-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "paimon/testing/utils/data_generator.h"
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstdlib>
#include <map>
#include <utility>
#include "arrow/api.h"
#include "arrow/array/builder_binary.h"
#include "arrow/array/builder_nested.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/c/abi.h"
#include "arrow/c/bridge.h"
#include "fmt/format.h"
#include "paimon/common/data/binary_row_writer.h"
#include "paimon/common/data/binary_string.h"
#include "paimon/common/types/row_kind.h"
#include "paimon/common/utils/arrow/status_utils.h"
#include "paimon/core/schema/table_schema.h"
#include "paimon/core/utils/file_store_path_factory.h"
// Forward declarations of the Arrow types named in this file.
namespace arrow {
class Array;
class ArrayBuilder;
} // namespace arrow
// Forward declaration of the memory pool type held by DataGenerator.
namespace paimon {
class MemoryPool;
} // namespace paimon
namespace paimon::test {
// Creates a generator bound to `table_schema`. `memory_pool` is retained and later
// handed to the binary row writers this class creates.
DataGenerator::DataGenerator(const std::shared_ptr<TableSchema>& table_schema,
                             const std::shared_ptr<MemoryPool>& memory_pool)
    : table_schema_(table_schema), memory_pool_(memory_pool) {
    // Debug-only precondition: the generator only accepts a schema whose id is 0.
    assert(table_schema_->Id() == 0);
}
// Copies a single field from `src_row` (read at position `src_field_id`) into position
// `target_field_id` of the row being assembled by `target_row_writer`.
//
// The getter/writer pair is selected from the Arrow type id of `src_type`. Supported
// types are BOOL, INT8, INT16, INT32, INT64, FLOAT, DOUBLE and STRING; any other type
// produces Status::Invalid.
//
// NOTE(review): the source field is read without a null check - confirm callers never
// pass rows holding null values in the copied fields.
Status DataGenerator::WriteBinaryRow(const BinaryRow& src_row, int32_t src_field_id,
                                     const std::shared_ptr<arrow::DataType>& src_type,
                                     int32_t target_field_id, BinaryRowWriter* target_row_writer) {
    switch (src_type->id()) {
        case arrow::Type::BOOL:
            target_row_writer->WriteBoolean(target_field_id, src_row.GetBoolean(src_field_id));
            return Status::OK();
        case arrow::Type::INT8:
            target_row_writer->WriteByte(target_field_id, src_row.GetByte(src_field_id));
            return Status::OK();
        case arrow::Type::INT16:
            target_row_writer->WriteShort(target_field_id, src_row.GetShort(src_field_id));
            return Status::OK();
        case arrow::Type::INT32:
            target_row_writer->WriteInt(target_field_id, src_row.GetInt(src_field_id));
            return Status::OK();
        case arrow::Type::INT64:
            target_row_writer->WriteLong(target_field_id, src_row.GetLong(src_field_id));
            return Status::OK();
        case arrow::Type::FLOAT:
            target_row_writer->WriteFloat(target_field_id, src_row.GetFloat(src_field_id));
            return Status::OK();
        case arrow::Type::DOUBLE:
            target_row_writer->WriteDouble(target_field_id, src_row.GetDouble(src_field_id));
            return Status::OK();
        case arrow::Type::STRING:
            target_row_writer->WriteString(target_field_id, src_row.GetString(src_field_id));
            return Status::OK();
        default:
            break;
    }
    // Every supported type returned above; whatever reaches this point is unsupported.
    return Status::Invalid(
        fmt::format("type {} not support in write partial row", src_type->ToString()));
}
// Builds a new BinaryRow that contains only the given fields of `binary_row`, in the
// order in which they appear in `partition_fields`. Despite the parameter name, this is
// also used to project the bucket key fields.
//
// Each field's Id() is used as its position inside `binary_row`.
// NOTE(review): presumably valid because the constructor only accepts schema id 0, where
// field ids are expected to match field positions - confirm.
//
// Returns the projected row, or the error reported by WriteBinaryRow for an
// unsupported field type.
Result<BinaryRow> DataGenerator::ExtractPartialRow(const BinaryRow& binary_row,
                                                   const std::vector<DataField>& partition_fields) {
    const auto arity = static_cast<int32_t>(partition_fields.size());
    BinaryRow partial_row(arity);
    BinaryRowWriter writer(&partial_row, /*initial_size=*/0, memory_pool_.get());
    int32_t target_pos = 0;
    for (const auto& field : partition_fields) {
        PAIMON_RETURN_NOT_OK(
            WriteBinaryRow(binary_row, field.Id(), field.Type(), target_pos, &writer));
        ++target_pos;
    }
    writer.Complete();
    return partial_row;
}
Status DataGenerator::AppendValue(const BinaryRow& row, int32_t field_id,
const std::shared_ptr<arrow::DataType>& type,
arrow::StructBuilder* struct_builder) {
arrow::Type::type type_id = type->id();
switch (type_id) {
case arrow::Type::type::BOOL: {
auto builder =
static_cast<arrow::BooleanBuilder*>(struct_builder->field_builder(field_id));
PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(row.GetBoolean(field_id)));
break;
}
case arrow::Type::type::INT8: {
auto builder =
static_cast<arrow::Int8Builder*>(struct_builder->field_builder(field_id));
PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(row.GetByte(field_id)));
break;
}
case arrow::Type::type::INT16: {
auto builder =
static_cast<arrow::Int16Builder*>(struct_builder->field_builder(field_id));
PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(row.GetShort(field_id)));
break;
}
case arrow::Type::type::INT32: {
auto builder =
static_cast<arrow::Int32Builder*>(struct_builder->field_builder(field_id));
PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(row.GetInt(field_id)));
break;
}
case arrow::Type::type::INT64: {
auto builder =
static_cast<arrow::Int64Builder*>(struct_builder->field_builder(field_id));
PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(row.GetLong(field_id)));
break;
}
case arrow::Type::type::FLOAT: {
auto builder =
static_cast<arrow::FloatBuilder*>(struct_builder->field_builder(field_id));
PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(row.GetFloat(field_id)));
break;
}
case arrow::Type::type::DOUBLE: {
auto builder =
static_cast<arrow::DoubleBuilder*>(struct_builder->field_builder(field_id));
PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(row.GetDouble(field_id)));
break;
}
case arrow::Type::type::STRING: {
auto builder =
static_cast<arrow::StringBuilder*>(struct_builder->field_builder(field_id));
PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(row.GetString(field_id).ToString()));
break;
}
default:
return Status::Invalid(
fmt::format("type {} not support in append value", type->ToString()));
}
return Status::OK();
}
// Creates an Arrow StructBuilder whose children match `fields` one-to-one.
//
// One child builder is created per field, chosen by the field's Arrow type id. Supported
// types are BOOL, INT8, INT16, INT32, INT64, FLOAT, DOUBLE and STRING; the first field
// with any other type makes the function return Status::Invalid. The struct builder is
// created on Arrow's default memory pool.
Result<std::shared_ptr<arrow::StructBuilder>> DataGenerator::MakeStructBuilder(
    const std::vector<DataField>& fields) {
    auto struct_type = DataField::ConvertDataFieldsToArrowStructType(fields);
    std::vector<std::shared_ptr<arrow::ArrayBuilder>> children;
    for (const auto& field : fields) {
        std::shared_ptr<arrow::ArrayBuilder> child;
        switch (field.Type()->id()) {
            case arrow::Type::BOOL:
                child = std::make_shared<arrow::BooleanBuilder>();
                break;
            case arrow::Type::INT8:
                child = std::make_shared<arrow::Int8Builder>();
                break;
            case arrow::Type::INT16:
                child = std::make_shared<arrow::Int16Builder>();
                break;
            case arrow::Type::INT32:
                child = std::make_shared<arrow::Int32Builder>();
                break;
            case arrow::Type::INT64:
                child = std::make_shared<arrow::Int64Builder>();
                break;
            case arrow::Type::FLOAT:
                child = std::make_shared<arrow::FloatBuilder>();
                break;
            case arrow::Type::DOUBLE:
                child = std::make_shared<arrow::DoubleBuilder>();
                break;
            case arrow::Type::STRING:
                child = std::make_shared<arrow::StringBuilder>();
                break;
            default:
                return Status::Invalid(fmt::format("type {} not support in make struct builder",
                                                   field.Type()->ToString()));
        }
        children.push_back(std::move(child));
    }
    return std::make_shared<arrow::StructBuilder>(struct_type, arrow::default_memory_pool(),
                                                  children);
}
// Groups `binary_rows` by (partition values, bucket id) and turns every group into one
// RecordBatch.
//
// For each input row:
//   1. the partition key fields are projected and rendered into name/value pairs by a
//      FileStorePathFactory;
//   2. the bucket key fields are projected and the bucket id is computed as
//      abs(hash(bucket_row) % num_buckets);
//   3. all fields of the row are appended to the Arrow StructBuilder of its group, and
//      the row kind is recorded for the same group.
// Afterwards each builder is finished, exported with arrow::ExportArray and wrapped in a
// RecordBatch that carries the group's partition map, bucket id and row kinds.
//
// Returns the batches ordered by group key (std::map iteration order). Returns
// Status::Invalid when a row has to be bucketed while the schema reports 0 buckets (the
// modulo would otherwise divide by zero), and propagates every error raised by the
// helpers that are called.
Result<std::vector<std::unique_ptr<RecordBatch>>> DataGenerator::SplitArrayByPartitionAndBucket(
    const std::vector<BinaryRow>& binary_rows) {
    // A group is identified by its partition name/value map together with its bucket id.
    using GroupKey = std::pair<std::map<std::string, std::string>, int32_t>;

    auto fields = table_schema_->Fields();
    auto partition_keys = table_schema_->PartitionKeys();
    PAIMON_ASSIGN_OR_RAISE(auto partition_fields, table_schema_->GetFields(partition_keys));
    PAIMON_ASSIGN_OR_RAISE(auto bucket_fields,
                           table_schema_->GetFields(table_schema_->BucketKeys()));
    int32_t num_buckets = table_schema_->NumBuckets();

    // Both maps always hold the same set of keys: one builder and one row-kind list per group.
    std::map<GroupKey, std::shared_ptr<arrow::StructBuilder>> struct_builder_holder;
    std::map<GroupKey, std::vector<RecordBatch::RowKind>> row_kinds_holder;

    auto schema = DataField::ConvertDataFieldsToArrowSchema(fields);
    // The path factory is only used to render a partition row into name/value pairs.
    PAIMON_ASSIGN_OR_RAISE(
        auto path_factory,
        FileStorePathFactory::Create(
            /*root=*/"/tmp", schema, partition_keys,
            /*default_part_value=*/"__DEFAULT_PARTITION__",
            /*identifier=*/"orc", /*data_file_prefix=*/"data-",
            /*legacy_partition_name_enabled=*/true, /*external_paths=*/std::vector<std::string>(),
            /*global_index_external_path=*/std::nullopt,
            /*index_file_in_data_file_dir=*/false, memory_pool_));

    for (const auto& binary_row : binary_rows) {
        PAIMON_ASSIGN_OR_RAISE(BinaryRow partition_row,
                               ExtractPartialRow(binary_row, partition_fields));
        std::vector<std::pair<std::string, std::string>> part_values;
        PAIMON_ASSIGN_OR_RAISE(part_values, path_factory->GeneratePartitionVector(partition_row));
        std::map<std::string, std::string> partition_map;
        for (const auto& part_value : part_values) {
            partition_map[part_value.first] = part_value.second;
        }

        PAIMON_ASSIGN_OR_RAISE(BinaryRow bucket_row, ExtractPartialRow(binary_row, bucket_fields));
        // Guard the modulo below: dividing by zero is undefined behaviour.
        if (num_buckets == 0) {
            return Status::Invalid("cannot assign rows to buckets: number of buckets is 0");
        }
        int32_t bucket_id = std::abs(bucket_row.HashCode() % num_buckets);

        // Build the key once and reuse it for both maps.
        const GroupKey group_key(std::move(partition_map), bucket_id);

        auto struct_builder_iter = struct_builder_holder.find(group_key);
        if (struct_builder_iter == struct_builder_holder.end()) {
            // First row of this group: create its builder lazily.
            PAIMON_ASSIGN_OR_RAISE(auto new_builder, MakeStructBuilder(fields));
            struct_builder_iter =
                struct_builder_holder.emplace(group_key, std::move(new_builder)).first;
        }
        arrow::StructBuilder* struct_builder = struct_builder_iter->second.get();

        // Open a new struct slot, then fill every child builder for this row.
        PAIMON_RETURN_NOT_OK_FROM_ARROW(struct_builder->Append());
        for (size_t i = 0; i < fields.size(); i++) {
            PAIMON_RETURN_NOT_OK(AppendValue(binary_row, static_cast<int32_t>(i),
                                             fields[i].Type(), struct_builder));
        }

        PAIMON_ASSIGN_OR_RAISE(auto row_kind, binary_row.GetRowKind());
        // operator[] creates the (empty) row-kind list on the first row of a group.
        row_kinds_holder[group_key].push_back(
            static_cast<RecordBatch::RowKind>(row_kind->ToByteValue()));
    }

    std::vector<std::unique_ptr<RecordBatch>> batches;
    for (const auto& [identifier, struct_builder] : struct_builder_holder) {
        std::shared_ptr<arrow::Array> array;
        PAIMON_RETURN_NOT_OK_FROM_ARROW(struct_builder->Finish(&array));
        auto& row_kinds = row_kinds_holder[identifier];
        ArrowArray arrow_array;
        PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, &arrow_array));
        RecordBatchBuilder batch_builder(&arrow_array);
        PAIMON_ASSIGN_OR_RAISE(auto record_batch, batch_builder.SetPartition(identifier.first)
                                                      .SetBucket(identifier.second)
                                                      .SetRowKinds(row_kinds)
                                                      .Finish());
        batches.push_back(std::move(record_batch));
    }
    return batches;
}
} // namespace paimon::test