blob: 70582b5decd310003bf3ea841487d6a7864309c1 [file] [log] [blame]
/*
* Copyright 2024-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "arrow/c/bridge.h"
#include "arrow/ipc/api.h"
#include "paimon/api.h"
#include "paimon/catalog/catalog.h"
#include "paimon/commit_context.h"
#include "paimon/common/data/blob_descriptor.h"
#include "paimon/common/data/blob_utils.h"
#include "paimon/common/utils/arrow/status_utils.h"
#include "paimon/common/utils/path_util.h"
#include "paimon/core/operation/append_only_file_store_write.h"
#include "paimon/core/operation/file_store_commit_impl.h"
#include "paimon/core/schema/schema_manager.h"
#include "paimon/core/table/sink/commit_message_impl.h"
#include "paimon/core/utils/file_store_path_factory.h"
#include "paimon/core/utils/snapshot_manager.h"
#include "paimon/data/blob.h"
#include "paimon/file_store_commit.h"
#include "paimon/file_store_write.h"
#include "paimon/fs/file_system_factory.h"
#include "paimon/record_batch.h"
#include "paimon/result.h"
#include "paimon/table/source/startup_mode.h"
#include "paimon/table/source/table_scan.h"
#include "paimon/testing/utils/read_result_collector.h"
#include "paimon/testing/utils/testharness.h"
#include "paimon/write_context.h"
namespace paimon::test {
class TestHelper {
public:
static Result<std::unique_ptr<TestHelper>> Create(
const std::string& root_path, const std::shared_ptr<arrow::Schema>& schema,
const std::vector<std::string>& partition_keys,
const std::vector<std::string>& primary_keys,
const std::map<std::string, std::string>& options, bool is_streaming_mode) {
// only for test && only check the key
auto new_options = options;
new_options["enable-object-store-catalog-in-inte-test"] = "";
PAIMON_ASSIGN_OR_RAISE(auto catalog, Catalog::Create(root_path, new_options));
PAIMON_RETURN_NOT_OK(
catalog->CreateDatabase("foo", new_options, /*ignore_if_exists=*/false));
::ArrowSchema c_schema;
PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schema, &c_schema));
PAIMON_RETURN_NOT_OK(catalog->CreateTable(Identifier("foo", "bar"), &c_schema,
partition_keys, primary_keys, new_options,
/*ignore_if_exists=*/false));
std::string table_path = PathUtil::JoinPath(root_path, "foo.db/bar");
return Create(table_path, new_options, is_streaming_mode);
}
static Result<std::unique_ptr<TestHelper>> Create(
const std::string& table_path, const std::map<std::string, std::string>& options,
bool is_streaming_mode) {
std::string file_system_identifier = "local";
auto fs_iter = options.find(Options::FILE_SYSTEM);
if (fs_iter != options.end()) {
file_system_identifier = StringUtils::ToLowerCase(fs_iter->second);
}
PAIMON_ASSIGN_OR_RAISE(auto file_system,
FileSystemFactory::Get(file_system_identifier, table_path, options));
std::string commit_user = "commit_user";
WriteContextBuilder context_builder(table_path, commit_user);
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<WriteContext> write_context,
context_builder.SetOptions(options)
.WithStreamingMode(is_streaming_mode)
.WithWriteId(12345)
.Finish());
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileStoreWrite> write,
FileStoreWrite::Create(std::move(write_context)));
std::map<std::string, std::string> new_options = options;
// only for test && only check the key
new_options["enable-pk-commit-in-inte-test"] = "";
new_options["enable-object-store-commit-in-inte-test"] = "";
CommitContextBuilder commit_context_builder(table_path, commit_user);
PAIMON_ASSIGN_OR_RAISE(
std::unique_ptr<CommitContext> commit_context,
commit_context_builder.SetOptions(new_options).IgnoreEmptyCommit(false).Finish());
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileStoreCommit> commit,
FileStoreCommit::Create(std::move(commit_context)));
return std::unique_ptr<TestHelper>(new TestHelper(std::move(file_system), std::move(write),
std::move(commit), commit_user,
table_path, options));
}
static Result<std::unique_ptr<RecordBatch>> MakeRecordBatch(
const std::shared_ptr<arrow::DataType>& data_type, const std::string& data_str,
const std::map<std::string, std::string>& partition_map, int32_t bucket,
const std::vector<RecordBatch::RowKind>& row_kinds) {
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
auto array, arrow::ipc::internal::json::ArrayFromJSON(data_type, data_str));
::ArrowArray arrow_array;
PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, &arrow_array));
RecordBatchBuilder batch_builder(&arrow_array);
return batch_builder.SetPartition(partition_map)
.SetBucket(bucket)
.SetRowKinds(row_kinds)
.Finish();
}
Result<std::vector<std::shared_ptr<CommitMessage>>> WriteAndCommit(
std::unique_ptr<RecordBatch>&& record_batch, int64_t commit_identifier,
const std::optional<std::vector<std::shared_ptr<CommitMessage>>>&
expected_commit_messages) {
std::vector<std::unique_ptr<RecordBatch>> batches;
batches.emplace_back(std::move(record_batch));
return WriteAndCommit(std::move(batches), commit_identifier, expected_commit_messages);
}
Result<std::vector<std::shared_ptr<CommitMessage>>> WriteAndCommit(
std::vector<std::unique_ptr<RecordBatch>>&& record_batches, int64_t commit_identifier,
const std::optional<std::vector<std::shared_ptr<CommitMessage>>>&
expected_commit_messages) {
for (auto& record_batch : record_batches) {
PAIMON_RETURN_NOT_OK(write_->Write(std::move(record_batch)));
}
PAIMON_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<CommitMessage>> commit_messages,
write_->PrepareCommit(/*wait_compaction=*/false, commit_identifier));
if (expected_commit_messages) {
CheckCommitMessages(expected_commit_messages.value(), commit_messages);
CheckExternalPath(commit_messages);
}
PAIMON_RETURN_NOT_OK(commit_->Commit(commit_messages, commit_identifier));
return commit_messages;
}
Result<std::vector<std::shared_ptr<Split>>> NewScan(StartupMode startup_mode,
std::optional<int64_t> snapshot_id,
bool is_streaming = true) {
ScanContextBuilder scan_context_builder(table_path_);
scan_context_builder.WithStreamingMode(is_streaming)
.SetOptions(options_)
.AddOption(Options::SCAN_MODE, startup_mode.ToString());
if (snapshot_id) {
scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID,
std::to_string(snapshot_id.value()));
}
PAIMON_ASSIGN_OR_RAISE(auto scan_context, scan_context_builder.Finish());
PAIMON_ASSIGN_OR_RAISE(scan_, TableScan::Create(std::move(scan_context)));
return Scan();
}
Result<std::vector<std::shared_ptr<Split>>> Scan() {
if (scan_ == nullptr) {
return Status::Invalid("need call NewScan first");
}
PAIMON_ASSIGN_OR_RAISE(auto result_plan, scan_->CreatePlan());
return result_plan->Splits();
}
static Result<bool> CheckBlobsEqual(const std::vector<std::shared_ptr<Blob>>& result_blobs,
const std::vector<std::shared_ptr<Blob>>& expected_blobs,
const std::shared_ptr<FileSystem>& fs) {
if (result_blobs.size() != expected_blobs.size()) {
std::cout << "[result_blobs.size]: " << result_blobs.size() << std::endl;
std::cout << "[expected_blobs.size]: " << expected_blobs.size() << std::endl;
return false;
}
for (uint32_t i = 0; i < result_blobs.size(); ++i) {
PAIMON_ASSIGN_OR_RAISE(auto result_stream, result_blobs[i]->NewInputStream(fs));
PAIMON_ASSIGN_OR_RAISE(auto expected_stream, expected_blobs[i]->NewInputStream(fs));
PAIMON_ASSIGN_OR_RAISE(uint64_t result_length, result_stream->Length());
PAIMON_ASSIGN_OR_RAISE(uint64_t expected_length, expected_stream->Length());
if (result_length != expected_length) {
auto result_descriptor_bytes = result_blobs[i]->ToDescriptor(GetDefaultPool());
auto expected_descriptor_bytes = expected_blobs[i]->ToDescriptor(GetDefaultPool());
PAIMON_ASSIGN_OR_RAISE(
auto result_descriptor,
BlobDescriptor::Deserialize(result_descriptor_bytes->data(),
result_descriptor_bytes->size()));
PAIMON_ASSIGN_OR_RAISE(
auto expected_descriptor,
BlobDescriptor::Deserialize(expected_descriptor_bytes->data(),
expected_descriptor_bytes->size()));
std::cout << "blobs[" << i << "]: " << std::endl;
std::cout << "[result_length(" << result_length << ") != expected_length("
<< expected_length << ")]" << std::endl;
std::cout << "[result_descriptor]: " << result_descriptor->ToString() << std::endl;
std::cout << "[expected_descriptor]: " << expected_descriptor->ToString()
<< std::endl;
return false;
}
std::vector<char> result_bytes(result_length, 0);
std::vector<char> expected_bytes(expected_length, 0);
PAIMON_RETURN_NOT_OK(result_stream->Read(result_bytes.data(), result_length));
PAIMON_RETURN_NOT_OK(expected_stream->Read(expected_bytes.data(), expected_length));
if (result_bytes != expected_bytes) {
std::cout << "blobs[" << i << "]: " << std::endl;
std::cout << "[result_bytes != expected_bytes]" << std::endl;
return false;
}
}
return true;
}
static Result<std::vector<std::shared_ptr<Blob>>> ToBlobs(
const std::shared_ptr<arrow::StructArray>& blob_struct_array) {
std::vector<std::shared_ptr<Blob>> result_blobs;
auto child_array = blob_struct_array->field(0);
assert(blob_struct_array->num_fields() == 1);
assert(blob_struct_array->null_count() == 0);
assert(child_array->null_count() == 0);
assert(child_array->type_id() == arrow::Type::type::LARGE_BINARY);
const auto& blob_array =
arrow::internal::checked_cast<const arrow::LargeBinaryArray&>(*child_array);
for (int64_t i = 0; i < blob_array.length(); ++i) {
std::string_view descriptor = blob_array.GetView(i);
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<Blob> blob,
Blob::FromDescriptor(descriptor.data(), descriptor.size()));
result_blobs.emplace_back(blob);
}
return result_blobs;
}
// need to reconstruct the blob array, because the array in read result do not have blob meta
Result<std::shared_ptr<arrow::Array>> ReconstructBlobArray(
const std::shared_ptr<arrow::Array>& array, const std::shared_ptr<arrow::Schema>& schema) {
::ArrowArray c_array;
PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, &c_array));
::ArrowSchema new_c_schema;
PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schema, &new_c_schema));
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(auto new_array,
arrow::ImportArray(&c_array, &new_c_schema));
return new_array;
}
Result<bool> ReadAndCheckResultForBlobTable(
const std::shared_ptr<arrow::Schema>& all_columns_schema,
const std::vector<std::shared_ptr<Split>>& splits, const std::string& main_expected_json,
const std::vector<PAIMON_UNIQUE_PTR<Bytes>>& expected_blob_descriptors) {
ReadContextBuilder read_context_builder(table_path_);
read_context_builder.SetOptions(options_);
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<ReadContext> read_context,
read_context_builder.Finish());
PAIMON_ASSIGN_OR_RAISE(auto table_read, TableRead::Create(std::move(read_context)));
PAIMON_ASSIGN_OR_RAISE(auto batch_reader, table_read->CreateReader(splits));
PAIMON_ASSIGN_OR_RAISE(auto read_result,
ReadResultCollector::CollectResult(batch_reader.get()));
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(auto concat_array,
arrow::Concatenate(read_result->chunks()));
PAIMON_ASSIGN_OR_RAISE(auto reconstruct_array,
ReconstructBlobArray(concat_array, all_columns_schema));
PAIMON_ASSIGN_OR_RAISE(
auto separated_array,
BlobUtils::SeparateBlobArray(
std::dynamic_pointer_cast<arrow::StructArray>(reconstruct_array)));
arrow::EqualOptions equal_options = arrow::EqualOptions::Defaults();
// check main columns
auto separated_schema = BlobUtils::SeparateBlobSchema(all_columns_schema);
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
auto main_expected_array,
arrow::ipc::internal::json::ArrayFromJSON(
arrow::struct_(separated_schema.main_schema->fields()), main_expected_json));
auto main_expected_chunk_array = std::make_shared<arrow::ChunkedArray>(main_expected_array);
bool main_equal = main_expected_chunk_array->Equals(
arrow::ChunkedArray(separated_array.main_array), equal_options.diff_sink(&std::cout));
if (!main_equal) {
std::cout << "[expected_data_type]" << main_expected_chunk_array->type()->ToString()
<< std::endl;
std::cout << "[actual_data_type]" << separated_array.main_array->type()->ToString()
<< std::endl;
std::cout << "[expected]:" << main_expected_chunk_array->ToString() << std::endl;
std::cout << "[actual]: " << separated_array.main_array->ToString() << std::endl;
}
// check blob column
std::vector<std::shared_ptr<Blob>> expected_blobs;
for (const auto& descriptor : expected_blob_descriptors) {
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<Blob> blob,
Blob::FromDescriptor(descriptor->data(), descriptor->size()));
expected_blobs.emplace_back(blob);
}
PAIMON_ASSIGN_OR_RAISE(auto result_blobs, ToBlobs(separated_array.blob_array));
PAIMON_ASSIGN_OR_RAISE(bool blob_equal, CheckBlobsEqual(result_blobs, expected_blobs, fs_));
table_read.reset();
return main_equal && blob_equal;
}
Result<bool> ReadAndCheckResult(const std::shared_ptr<arrow::DataType>& data_type,
const std::vector<std::shared_ptr<Split>>& splits,
const std::string& expected_result) {
ReadContextBuilder read_context_builder(table_path_);
read_context_builder.SetOptions(options_);
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<ReadContext> read_context,
read_context_builder.Finish());
PAIMON_ASSIGN_OR_RAISE(auto table_read, TableRead::Create(std::move(read_context)));
PAIMON_ASSIGN_OR_RAISE(auto batch_reader, table_read->CreateReader(splits));
PAIMON_ASSIGN_OR_RAISE(auto read_result,
ReadResultCollector::CollectResult(batch_reader.get()));
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
auto expected_array,
arrow::ipc::internal::json::ArrayFromJSON(data_type, expected_result));
auto expected_chunk_array = std::make_shared<arrow::ChunkedArray>(expected_array);
arrow::EqualOptions equal_options = arrow::EqualOptions::Defaults();
bool is_equal =
expected_chunk_array->Equals(read_result, equal_options.diff_sink(&std::cout));
if (!is_equal) {
std::cout << "[expected_data_type]" << expected_chunk_array->type()->ToString()
<< std::endl;
std::cout << "[actual_data_type]" << read_result->type()->ToString() << std::endl;
std::cout << "[expected]:" << expected_chunk_array->ToString() << std::endl;
std::cout << "[actual]: " << read_result->ToString() << std::endl;
}
table_read.reset();
return is_equal;
}
Result<std::optional<Snapshot>> LatestSnapshot() const {
auto commit_impl = dynamic_cast<FileStoreCommitImpl*>(commit_.get());
return commit_impl->snapshot_manager_->LatestSnapshotOfUser(commit_user_);
}
Result<std::optional<std::shared_ptr<TableSchema>>> LatestSchema() const {
auto commit_impl = dynamic_cast<FileStoreCommitImpl*>(commit_.get());
return commit_impl->schema_manager_->Latest();
}
Result<std::string> PartitionStr(const BinaryRow& partition) const {
auto abstract_write = dynamic_cast<AbstractFileStoreWrite*>(write_.get());
return abstract_write->file_store_path_factory_->GetPartitionString(partition);
}
static void CheckCommitMessages(std::vector<std::shared_ptr<CommitMessage>> expected,
std::vector<std::shared_ptr<CommitMessage>> actual) {
ASSERT_EQ(expected.size(), actual.size());
auto commit_message_sort_function = [](const std::shared_ptr<CommitMessage>& lhs,
const std::shared_ptr<CommitMessage>& rhs) -> bool {
auto lhs_impl = std::dynamic_pointer_cast<CommitMessageImpl>(lhs);
auto rhs_impl = std::dynamic_pointer_cast<CommitMessageImpl>(rhs);
if (lhs_impl->Partition() == rhs_impl->Partition()) {
return lhs_impl->Bucket() < rhs_impl->Bucket();
}
return lhs_impl->Partition().HashCode() < rhs_impl->Partition().HashCode();
};
std::stable_sort(expected.begin(), expected.end(), commit_message_sort_function);
std::stable_sort(actual.begin(), actual.end(), commit_message_sort_function);
for (size_t i = 0; i < actual.size(); i++) {
auto result_impl = std::dynamic_pointer_cast<CommitMessageImpl>(actual[i]);
auto expect_impl = std::dynamic_pointer_cast<CommitMessageImpl>(expected[i]);
bool is_equal = result_impl->TEST_Equal(*expect_impl);
if (!is_equal) {
std::cout << "actual_msg:" << result_impl->ToString() << std::endl;
std::cout << "expect_msg:" << expect_impl->ToString() << std::endl;
}
ASSERT_TRUE(is_equal);
}
}
private:
void CheckExternalPath(const std::vector<std::shared_ptr<CommitMessage>>& actuals) {
for (const auto& actual : actuals) {
auto msg = std::dynamic_pointer_cast<CommitMessageImpl>(actual);
auto files = msg->GetNewFilesIncrement().NewFiles();
for (auto& file : files) {
if (file->external_path) {
ASSERT_OK_AND_ASSIGN(bool file_exist, fs_->Exists(file->external_path.value()));
ASSERT_TRUE(file_exist);
}
}
}
}
TestHelper(std::unique_ptr<FileSystem> fs, std::unique_ptr<FileStoreWrite> write,
std::unique_ptr<FileStoreCommit> commit, const std::string& commit_user,
const std::string& table_path, const std::map<std::string, std::string>& options)
: fs_(std::move(fs)),
write_(std::move(write)),
commit_(std::move(commit)),
commit_user_(commit_user),
table_path_(table_path),
options_(options) {}
const std::string& CommitUser() const {
return commit_user_;
}
std::shared_ptr<FileSystem> fs_;
std::unique_ptr<FileStoreWrite> write_;
std::unique_ptr<FileStoreCommit> commit_;
std::unique_ptr<TableScan> scan_;
std::string commit_user_;
std::string table_path_;
std::map<std::string, std::string> options_;
};
} // namespace paimon::test