blob: b9b0685eb62e87954b25a312f0083b0986bed2d3 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "paimon/common/utils/binary_row_partition_computer.h"
#include <algorithm>
#include <cstddef>
#include "arrow/type.h"
#include "fmt/format.h"
#include "fmt/ranges.h"
#include "paimon/common/data/binary_row.h"
#include "paimon/common/data/binary_row_writer.h"
#include "paimon/common/utils/string_utils.h"
#include "paimon/macros.h"
#include "paimon/result.h"
#include "paimon/status.h"
namespace paimon {
// Forward declaration: within this file MemoryPool is only held through
// std::shared_ptr and handed on as a raw pointer; none of its members are used.
class MemoryPool;
// Stores the partition configuration and the already-built per-key converters.
//
// The constructor itself performs no validation; Create() checks `schema` and
// `memory_pool` for null before it constructs an instance.
//
//   partition_keys        names of the partition columns
//   schema                arrow schema the partition columns were looked up in
//   default_part_value    string that represents a null partition field
//   partition_converters  converter entries, indexed by partition field position
//   memory_pool           pool whose raw pointer is given to the row writer in ToBinaryRow()
BinaryRowPartitionComputer::BinaryRowPartitionComputer(
    const std::vector<std::string>& partition_keys,
    const std::shared_ptr<arrow::Schema>& schema,
    const std::string& default_part_value,
    const std::vector<PartitionConverter>& partition_converters,
    const std::shared_ptr<MemoryPool>& memory_pool)
    : memory_pool_(memory_pool),
      partition_keys_(partition_keys),
      schema_(schema),
      default_part_value_(default_part_value),
      partition_converters_(partition_converters) {
}
// Factory for BinaryRowPartitionComputer.
//
// For every entry of `partition_keys` (in order) this resolves the arrow type id of
// the column in `schema` and builds two converters for it: one that writes a string
// value into a binary row, and one that renders a binary row field back to a string.
//
// Returns Status::Invalid when `schema` or `memory_pool` is null. Any failure from
// GetTypeFromArrowSchema() or from the DataConverterUtils factories is propagated
// unchanged.
//
//   legacy_partition_name_enabled  forwarded as-is to
//                                  DataConverterUtils::CreateBinaryRowFieldToStringConverter
Result<std::unique_ptr<BinaryRowPartitionComputer>> BinaryRowPartitionComputer::Create(
    const std::vector<std::string>& partition_keys, const std::shared_ptr<arrow::Schema>& schema,
    const std::string& default_part_value, bool legacy_partition_name_enabled,
    const std::shared_ptr<MemoryPool>& memory_pool) {
    if (PAIMON_UNLIKELY(schema == nullptr)) {
        return Status::Invalid(
            "create binary row partition computer failed, schema is null pointer.");
    }
    if (PAIMON_UNLIKELY(memory_pool == nullptr)) {
        return Status::Invalid(
            "create binary row partition computer failed, memory pool is null pointer.");
    }
    std::vector<PartitionConverter> partition_converters;
    // Exactly one entry is appended per partition key, so allocate once up front.
    partition_converters.reserve(partition_keys.size());
    for (const auto& partition_key : partition_keys) {
        PAIMON_ASSIGN_OR_RAISE(arrow::Type::type type_id,
                               GetTypeFromArrowSchema(schema, partition_key));
        PAIMON_ASSIGN_OR_RAISE(
            DataConverterUtils::StrToBinaryRowConverter converter,
            DataConverterUtils::CreateDataToBinaryRowConverter(type_id, memory_pool.get()));
        PAIMON_ASSIGN_OR_RAISE(DataConverterUtils::BinaryRowFieldToStrConverter reconverter,
                               DataConverterUtils::CreateBinaryRowFieldToStringConverter(
                                   type_id, legacy_partition_name_enabled));
        partition_converters.emplace_back(partition_key, std::move(converter),
                                          std::move(reconverter));
    }
    // NOTE(review): constructed with `new` rather than std::make_unique, presumably
    // because the constructor is not publicly accessible -- confirm against the header.
    return std::unique_ptr<BinaryRowPartitionComputer>(new BinaryRowPartitionComputer(
        partition_keys, schema, default_part_value, partition_converters, memory_pool));
}
// Encodes a partition given as {partition key -> string value} into a BinaryRow.
//
// Fields are written in the order of partition_converters_. Every partition key must
// be present in `partition`; otherwise Status::Invalid is returned. A value equal to
// default_part_value_ is written as null, any other value goes through the key's
// string-to-binary-row converter, whose failure is returned to the caller.
Result<BinaryRow> BinaryRowPartitionComputer::ToBinaryRow(
    const std::map<std::string, std::string>& partition) const {
    BinaryRow row(partition_converters_.size());
    BinaryRowWriter row_writer(&row, /*initial_size=*/0, memory_pool_.get());

    // Position of the field being written; advanced in step with the converter list.
    size_t slot = 0;
    for (const auto& entry : partition_converters_) {
        const auto found = partition.find(entry.partition_key);
        if (found == partition.end()) {
            return Status::Invalid(
                fmt::format("can not find partition key '{}' in input partition '{}'",
                            entry.partition_key, partition));
        }
        if (found->second == default_part_value_) {
            // TODO(yonghao.fyh): when support decimal/ timestamp in partition, use
            // WriteTimestamp(null) for non compact precision
            row_writer.SetNullAt(slot);
        } else {
            PAIMON_RETURN_NOT_OK(entry.converter(found->second, slot, &row_writer));
        }
        ++slot;
    }

    row_writer.Complete();
    return row;
}
// Renders a partition BinaryRow as an ordered list of (partition key, string value).
//
// Returns Status::Invalid when the row's field count differs from the number of
// partition converters. For each field, in converter order:
//   - a null field yields default_part_value_;
//   - otherwise the field is converted to a string, and a string for which
//     StringUtils::IsNullOrWhitespaceOnly() is true is replaced by default_part_value_.
// A failure of the field-to-string converter is returned to the caller.
Result<std::vector<std::pair<std::string, std::string>>>
BinaryRowPartitionComputer::GeneratePartitionVector(const BinaryRow& partition) const {
    if (static_cast<size_t>(partition.GetFieldCount()) != partition_converters_.size()) {
        return Status::Invalid(fmt::format(
            "partition binary row field count {} not match with partition converter size {}",
            partition.GetFieldCount(), partition_converters_.size()));
    }
    std::vector<std::pair<std::string, std::string>> result;
    // One pair is produced per converter, so the final size is known up front.
    result.reserve(partition_converters_.size());
    for (size_t field_idx = 0; field_idx < partition_converters_.size(); field_idx++) {
        const auto& partition_extractor = partition_converters_[field_idx];
        const auto& partition_key = partition_extractor.partition_key;
        const auto& to_str = partition_extractor.reconverter;
        if (partition.IsNullAt(field_idx)) {
            result.emplace_back(partition_key, default_part_value_);
        } else {
            PAIMON_ASSIGN_OR_RAISE(std::string partition_field_str, to_str(partition, field_idx));
            if (StringUtils::IsNullOrWhitespaceOnly(partition_field_str)) {
                partition_field_str = default_part_value_;
            }
            // The local string is not used again; move it instead of copying.
            result.emplace_back(partition_key, std::move(partition_field_str));
        }
    }
    return result;
}
// Looks up `field_name` in `schema` and returns the arrow type id of that field.
//
// Returns Status::Invalid, with the schema printed in the message, when
// Schema::GetFieldByName() yields no field for the name. `schema` is dereferenced
// without a null check, so callers must pass a non-null schema.
Result<arrow::Type::type> BinaryRowPartitionComputer::GetTypeFromArrowSchema(
    const std::shared_ptr<arrow::Schema>& schema, const std::string& field_name) {
    const auto matched = schema->GetFieldByName(field_name);
    if (matched != nullptr) {
        return matched->type()->id();
    }
    return Status::Invalid(
        fmt::format("field {} not in schema {}", field_name, schema->ToString()));
}
// Renders a partition row as one string: the per-field strings joined with
// `delimiter`, truncated to at most `max_length` characters of the joined result.
//
// Field strings are produced by the converters that
// DataConverterUtils::CreateBinaryRowFieldToStringConverter returns for each field
// type of `partition_type` (always requested with legacy_partition_name_enabled=true).
// A null field is rendered as the literal "null".
//
// Errors (Status::Invalid):
//   - `partition_type` is null;
//   - `partition` has fewer fields than `partition_type`, since the fields read below
//     would lie outside the row;
// plus any failure propagated from converter creation or field conversion.
//
// Notes on truncation: it is done with std::string::substr, i.e. it counts bytes and
// may cut through a multi-byte character. A negative `max_length` converts to a very
// large unsigned count, so in that case nothing is truncated.
Result<std::string> BinaryRowPartitionComputer::PartToSimpleString(
    const std::shared_ptr<arrow::Schema>& partition_type, const BinaryRow& partition,
    const std::string& delimiter, int32_t max_length) {
    if (PAIMON_UNLIKELY(partition_type == nullptr)) {
        return Status::Invalid(
            "convert partition to simple string failed, partition type is null pointer.");
    }
    std::vector<DataConverterUtils::BinaryRowFieldToStrConverter> partition_converters;
    partition_converters.reserve(partition_type->num_fields());
    for (const auto& field : partition_type->fields()) {
        PAIMON_ASSIGN_OR_RAISE(DataConverterUtils::BinaryRowFieldToStrConverter converter,
                               DataConverterUtils::CreateBinaryRowFieldToStringConverter(
                                   field->type()->id(), /*legacy_partition_name_enabled=*/true));
        partition_converters.emplace_back(std::move(converter));
    }
    // Checked after converter creation so that inputs which already failed there keep
    // reporting the same error; only rows too short for the schema are newly rejected.
    if (static_cast<size_t>(partition.GetFieldCount()) < partition_converters.size()) {
        return Status::Invalid(fmt::format(
            "partition binary row field count {} less than partition type field count {}",
            partition.GetFieldCount(), partition_converters.size()));
    }
    std::vector<std::string> partition_vec;
    partition_vec.reserve(partition_converters.size());
    for (size_t field_idx = 0; field_idx < partition_converters.size(); field_idx++) {
        const auto& to_str = partition_converters[field_idx];
        if (partition.IsNullAt(field_idx)) {
            partition_vec.emplace_back("null");
        } else {
            PAIMON_ASSIGN_OR_RAISE(std::string partition_field_str, to_str(partition, field_idx));
            partition_vec.push_back(std::move(partition_field_str));
        }
    }
    return fmt::format("{}", fmt::join(partition_vec, delimiter)).substr(0, max_length);
}
} // namespace paimon