blob: 5d1d7dc207cef7f9ae545fbbe42dbc4c42c010c1 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gtest/gtest.h>
#include <memory>
#include <string>
#include "common/config.h"
#include "core/block/block.h"
#include "core/block/column_with_type_and_name.h"
#include "core/column/column_array.h"
#include "core/column/column_map.h"
#include "core/column/column_struct.h"
#include "core/data_type/data_type_array.h"
#include "core/data_type/data_type_factory.hpp"
#include "core/data_type/data_type_map.h"
#include "core/data_type/data_type_struct.h"
#include "core/data_type/data_type_variant.h"
#include "core/data_type/define_primitive_type.h"
#include "core/data_type_serde/data_type_serde.h"
#include "core/field.h"
#include "format/native/native_format.h"
#include "format/native/native_reader.h"
#include "format/transformer/vnative_transformer.h"
#include "io/fs/local_file_system.h"
#include "io/fs/path.h"
#include "runtime/runtime_state.h"
#include "util/jsonb_writer.h"
#include "util/uid_util.h"
namespace doris {
class NativeReaderWriterTest : public ::testing::Test {};
static void fill_primitive_columns(Block& block, size_t rows) {
DataTypePtr int_type =
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_INT, false));
DataTypePtr str_type =
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, false));
{
MutableColumnPtr col = int_type->create_column();
for (size_t i = 0; i < rows; ++i) {
if (i % 3 == 0) {
// null
col->insert_default();
} else {
// insert int value via Field to match column interface
auto field =
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i * 10));
col->insert(field);
}
}
block.insert(ColumnWithTypeAndName(std::move(col), int_type, "int_col"));
}
{
MutableColumnPtr col = str_type->create_column();
for (size_t i = 0; i < rows; ++i) {
if (i % 4 == 0) {
col->insert_default();
} else {
std::string v = "s" + std::to_string(i);
// insert varchar value via Field
auto field = Field::create_field<PrimitiveType::TYPE_VARCHAR>(v);
col->insert(field);
}
}
block.insert(ColumnWithTypeAndName(std::move(col), str_type, "str_col"));
}
}
static void fill_array_column(Block& block, size_t rows) {
// array<int>
DataTypePtr arr_nested_type = DataTypeFactory::instance().create_data_type(TYPE_INT, false);
DataTypePtr arr_type = make_nullable(std::make_shared<DataTypeArray>(arr_nested_type));
{
MutableColumnPtr col = arr_type->create_column();
auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
auto& nested = nullable_col->get_nested_column();
auto& null_map = nullable_col->get_null_map_data();
auto& array_col = assert_cast<ColumnArray&>(nested);
auto& offsets = array_col.get_offsets();
auto& data = array_col.get_data();
auto mutable_data = data.assume_mutable();
for (size_t i = 0; i < rows; ++i) {
if (i % 5 == 0) {
// null array
nullable_col->insert_default();
} else {
// non-null array with 3 elements: [i, i+1, i+2]
null_map.push_back(0);
mutable_data->insert(
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i)));
mutable_data->insert(
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i + 1)));
mutable_data->insert(
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i + 2)));
offsets.push_back(offsets.empty() ? 3 : offsets.back() + 3);
}
}
block.insert(ColumnWithTypeAndName(std::move(col), arr_type, "arr_col"));
}
}
static void fill_map_column(Block& block, size_t rows) {
// map<string, int>
DataTypePtr map_key_type = DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, false);
DataTypePtr map_value_type = DataTypeFactory::instance().create_data_type(TYPE_INT, false);
DataTypePtr map_type =
make_nullable(std::make_shared<DataTypeMap>(map_key_type, map_value_type));
// map<string, int> column
{
MutableColumnPtr col = map_type->create_column();
auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
auto& nested = nullable_col->get_nested_column();
auto& null_map = nullable_col->get_null_map_data();
for (size_t i = 0; i < rows; ++i) {
if (i % 7 == 0) {
// null map
nullable_col->insert_default();
} else {
null_map.push_back(0);
auto& offsets = assert_cast<ColumnMap&>(nested).get_offsets();
auto& keys = assert_cast<ColumnMap&>(nested).get_keys();
auto& values = assert_cast<ColumnMap&>(nested).get_values();
auto mutable_keys = keys.assume_mutable();
auto mutable_values = values.assume_mutable();
std::string k1 = "k" + std::to_string(i);
std::string k2 = "k" + std::to_string(i + 1);
mutable_keys->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(k1));
mutable_values->insert(
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i)));
mutable_keys->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(k2));
mutable_values->insert(
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i + 1)));
offsets.push_back(offsets.empty() ? 2 : offsets.back() + 2);
}
}
block.insert(ColumnWithTypeAndName(std::move(col), map_type, "map_col"));
}
}
static void fill_struct_column(Block& block, size_t rows) {
// struct<si:int, ss:string>
DataTypes struct_fields;
struct_fields.emplace_back(
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_INT, false)));
struct_fields.emplace_back(
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, false)));
DataTypePtr struct_type = make_nullable(std::make_shared<DataTypeStruct>(struct_fields));
// struct<si:int, ss:string> column
{
MutableColumnPtr col = struct_type->create_column();
auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
auto& nested = nullable_col->get_nested_column();
auto& null_map = nullable_col->get_null_map_data();
auto& struct_col = assert_cast<ColumnStruct&>(nested);
const auto& fields = struct_col.get_columns();
for (size_t i = 0; i < rows; ++i) {
if (i % 6 == 0) {
nullable_col->insert_default();
} else {
null_map.push_back(0);
auto mutable_field0 = fields[0]->assume_mutable();
auto mutable_field1 = fields[1]->assume_mutable();
// int field
mutable_field0->insert(Field::create_field<PrimitiveType::TYPE_INT>(
static_cast<int32_t>(i * 100)));
// string field
std::string vs = "ss" + std::to_string(i);
mutable_field1->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(vs));
}
}
block.insert(ColumnWithTypeAndName(std::move(col), struct_type, "struct_col"));
}
}
static void fill_variant_column(Block& block, size_t rows) {
// variant
DataTypePtr variant_type = make_nullable(std::make_shared<DataTypeVariant>());
// variant column: use JSON strings + deserialize_column_from_json_vector to populate ColumnVariant
{
MutableColumnPtr col = variant_type->create_column();
auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
auto& nested = nullable_col->get_nested_column(); // ColumnVariant
// Prepare JSON strings with variable number of keys per row
std::vector<std::string> json_rows;
json_rows.reserve(rows);
std::vector<Slice> slices;
slices.reserve(rows);
size_t key_cnt = 100;
for (size_t i = 0; i < rows; ++i) {
// key count cycles between 1, 2, 3, ... for better coverage
std::string json = "{";
for (size_t k = 0; k < key_cnt; ++k) {
if (k > 0) {
json += ",";
}
// keys: "k0", "k1", ...
json += "\"k" + std::to_string(k) + "\":";
// mix int and string values
if ((i + k) % 2 == 0) {
json += std::to_string(static_cast<int32_t>(i * 10 + k));
} else {
json += "\"v" + std::to_string(i * 10 + k) + "\"";
}
}
json += "}";
json_rows.emplace_back(std::move(json));
slices.emplace_back(json_rows.back().data(), json_rows.back().size());
}
// Use Variant SerDe to parse JSON into ColumnVariant
auto variant_type_inner = std::make_shared<DataTypeVariant>();
auto serde = variant_type_inner->get_serde();
auto* variant_serde = assert_cast<DataTypeVariantSerDe*>(serde.get());
uint64_t num_deserialized = 0;
DataTypeSerDe::FormatOptions options;
Status st = variant_serde->deserialize_column_from_json_vector(nested, slices,
&num_deserialized, options);
ASSERT_TRUE(st.ok()) << st;
ASSERT_EQ(rows, num_deserialized);
// All rows are treated as non-null here
auto& null_map = nullable_col->get_null_map_data();
null_map.clear();
null_map.resize_fill(rows, 0);
block.insert(ColumnWithTypeAndName(std::move(col), variant_type, "variant_col"));
}
}
static void fill_complex_columns(Block& block, size_t rows) {
fill_array_column(block, rows);
fill_map_column(block, rows);
fill_struct_column(block, rows);
fill_variant_column(block, rows);
}
static Block create_test_block(size_t rows) {
Block block;
// simple schema with primitive + complex types:
// int_col, str_col, arr_col, map_col, struct_col, variant_col
fill_primitive_columns(block, rows);
fill_complex_columns(block, rows);
return block;
}
TEST_F(NativeReaderWriterTest, round_trip_native_file) {
// Prepare a temporary local file path
UniqueId uid = UniqueId::gen_uid();
std::string uuid = uid.to_string();
std::string file_path = "./native_format_" + uuid + ".native";
// 1. Write block to Native file via VNativeTransformer
auto fs = io::global_local_filesystem();
io::FileWriterPtr file_writer;
Status st = fs->create_file(file_path, &file_writer);
ASSERT_TRUE(st.ok()) << st;
RuntimeState state;
VExprContextSPtrs exprs; // empty, VNativeTransformer won't use it directly
VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
st = transformer.open();
ASSERT_TRUE(st.ok()) << st;
Block src_block = create_test_block(16);
st = transformer.write(src_block);
ASSERT_TRUE(st.ok()) << st;
// VNativeTransformer::close() will also close the underlying FileWriter,
// so we don't need (and must not) close file_writer again here.
st = transformer.close();
ASSERT_TRUE(st.ok()) << st;
// 2. Read back via NativeReader using normal file scan path
TFileScanRangeParams scan_params;
scan_params.__set_file_type(TFileType::FILE_LOCAL);
TFileRangeDesc scan_range;
scan_range.__set_path(file_path);
scan_range.__set_file_type(TFileType::FILE_LOCAL);
NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state);
Block dst_block;
size_t read_rows = 0;
bool eof = false;
st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
ASSERT_TRUE(st.ok()) << st;
ASSERT_EQ(src_block.rows(), read_rows);
ASSERT_EQ(src_block.columns(), dst_block.columns());
// Compare column-wise values
for (size_t col = 0; col < src_block.columns(); ++col) {
const auto& src = src_block.get_by_position(col);
const auto& dst = dst_block.get_by_position(col);
ASSERT_EQ(src.type->get_family_name(), dst.type->get_family_name());
ASSERT_EQ(src.column->size(), dst.column->size());
for (size_t row = 0; row < src_block.rows(); ++row) {
auto src_field = (*src.column)[row];
auto dst_field = (*dst.column)[row];
ASSERT_EQ(src_field, dst_field) << "mismatch at col=" << col << " row=" << row;
}
}
// Next call should hit EOF
Block dst_block2;
read_rows = 0;
eof = false;
st = reader_impl.get_next_block(&dst_block2, &read_rows, &eof);
ASSERT_TRUE(st.ok()) << st;
ASSERT_TRUE(eof);
ASSERT_EQ(read_rows, 0);
// Clean up temp file
bool exists = false;
st = fs->exists(file_path, &exists);
if (st.ok() && exists) {
static_cast<void>(fs->delete_file(file_path));
}
}
// Edge cases: empty file and single-row file
TEST_F(NativeReaderWriterTest, round_trip_empty_and_single_row) {
auto fs = io::global_local_filesystem();
// 1. Empty native file (no data blocks)
{
UniqueId uid = UniqueId::gen_uid();
std::string uuid = uid.to_string();
std::string file_path = "./native_format_empty_" + uuid + ".native";
io::FileWriterPtr file_writer;
Status st = fs->create_file(file_path, &file_writer);
ASSERT_TRUE(st.ok()) << st;
RuntimeState state;
VExprContextSPtrs exprs;
VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
st = transformer.open();
ASSERT_TRUE(st.ok()) << st;
// Write an empty block, should not produce any data block in file.
Block empty_block;
st = transformer.write(empty_block);
ASSERT_TRUE(st.ok()) << st;
st = transformer.close();
ASSERT_TRUE(st.ok()) << st;
// Read back: should directly hit EOF with 0 rows.
TFileScanRangeParams scan_params;
scan_params.__set_file_type(TFileType::FILE_LOCAL);
TFileRangeDesc scan_range;
scan_range.__set_path(file_path);
scan_range.__set_file_type(TFileType::FILE_LOCAL);
NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state);
Block dst_block;
size_t read_rows = 0;
bool eof = false;
st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
ASSERT_TRUE(st.ok()) << st;
ASSERT_EQ(0U, read_rows);
ASSERT_TRUE(eof);
bool exists = false;
st = fs->exists(file_path, &exists);
if (st.ok() && exists) {
static_cast<void>(fs->delete_file(file_path));
}
}
// 2. Single-row native file
{
UniqueId uid = UniqueId::gen_uid();
std::string uuid = uid.to_string();
std::string file_path = "./native_format_single_row_" + uuid + ".native";
io::FileWriterPtr file_writer;
Status st = fs->create_file(file_path, &file_writer);
ASSERT_TRUE(st.ok()) << st;
RuntimeState state;
VExprContextSPtrs exprs;
VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
st = transformer.open();
ASSERT_TRUE(st.ok()) << st;
Block src_block = create_test_block(1);
st = transformer.write(src_block);
ASSERT_TRUE(st.ok()) << st;
st = transformer.close();
ASSERT_TRUE(st.ok()) << st;
TFileScanRangeParams scan_params;
scan_params.__set_file_type(TFileType::FILE_LOCAL);
TFileRangeDesc scan_range;
scan_range.__set_path(file_path);
scan_range.__set_file_type(TFileType::FILE_LOCAL);
NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state);
Block dst_block;
size_t read_rows = 0;
bool eof = false;
st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
ASSERT_TRUE(st.ok()) << st;
ASSERT_EQ(src_block.rows(), read_rows);
ASSERT_EQ(src_block.columns(), dst_block.columns());
ASSERT_FALSE(eof);
// Verify data equality for the single row.
for (size_t col = 0; col < src_block.columns(); ++col) {
const auto& src = src_block.get_by_position(col);
const auto& dst = dst_block.get_by_position(col);
ASSERT_EQ(src.type->get_family_name(), dst.type->get_family_name());
ASSERT_EQ(src.column->size(), dst.column->size());
auto src_field = (*src.column)[0];
auto dst_field = (*dst.column)[0];
ASSERT_EQ(src_field, dst_field) << "mismatch at col=" << col << " row=0";
}
// Next call should hit EOF
Block dst_block2;
read_rows = 0;
eof = false;
st = reader_impl.get_next_block(&dst_block2, &read_rows, &eof);
ASSERT_TRUE(st.ok()) << st;
ASSERT_TRUE(eof);
ASSERT_EQ(read_rows, 0U);
bool exists = false;
st = fs->exists(file_path, &exists);
if (st.ok() && exists) {
static_cast<void>(fs->delete_file(file_path));
}
}
}
// Large volume test: verify round-trip correctness with many rows.
TEST_F(NativeReaderWriterTest, round_trip_native_file_large_rows) {
UniqueId uid = UniqueId::gen_uid();
std::string uuid = uid.to_string();
std::string file_path = "./native_format_large_" + uuid + ".native";
auto fs = io::global_local_filesystem();
io::FileWriterPtr file_writer;
Status st = fs->create_file(file_path, &file_writer);
ASSERT_TRUE(st.ok()) << st;
RuntimeState state;
VExprContextSPtrs exprs;
VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
st = transformer.open();
ASSERT_TRUE(st.ok()) << st;
// Use a relatively large number of rows to test stability and performance characteristics.
const size_t kRows = 9999;
Block src_block = create_test_block(kRows);
// Split into multiple blocks and write them one by one to exercise multi-block path.
const size_t kBatchRows = 128;
for (size_t offset = 0; offset < kRows; offset += kBatchRows) {
size_t len = std::min(kBatchRows, kRows - offset);
// Create a sub block with the same schema and a row range [offset, offset+len)
Block sub_block = src_block.clone_empty();
for (size_t col = 0; col < src_block.columns(); ++col) {
const auto& src_col = *src_block.get_by_position(col).column;
const auto& dst_col_holder = *sub_block.get_by_position(col).column;
auto dst_mutable = dst_col_holder.assume_mutable();
dst_mutable->insert_range_from(src_col, offset, len);
}
st = transformer.write(sub_block);
ASSERT_TRUE(st.ok()) << st;
}
st = transformer.close();
ASSERT_TRUE(st.ok()) << st;
TFileScanRangeParams scan_params;
scan_params.__set_file_type(TFileType::FILE_LOCAL);
TFileRangeDesc scan_range;
scan_range.__set_path(file_path);
scan_range.__set_file_type(TFileType::FILE_LOCAL);
NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state);
// Read back in multiple blocks and merge into a single result block.
Block merged_block;
bool first_block = true;
size_t total_read_rows = 0;
while (true) {
Block dst_block;
size_t read_rows = 0;
bool eof = false;
st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
ASSERT_TRUE(st.ok()) << st;
if (read_rows > 0) {
if (first_block) {
merged_block = dst_block;
total_read_rows = read_rows;
first_block = false;
} else {
MutableBlock merged_mutable(&merged_block);
Status add_st = merged_mutable.add_rows(&dst_block, 0, read_rows);
ASSERT_TRUE(add_st.ok()) << add_st;
total_read_rows += read_rows;
}
}
if (eof) {
break;
}
if (read_rows == 0) {
break;
}
}
ASSERT_EQ(src_block.rows(), total_read_rows);
ASSERT_EQ(src_block.columns(), merged_block.columns());
// Compare column-wise values
for (size_t col = 0; col < src_block.columns(); ++col) {
const auto& src = src_block.get_by_position(col);
const auto& dst = merged_block.get_by_position(col);
ASSERT_EQ(src.type->get_family_name(), dst.type->get_family_name());
ASSERT_EQ(src.column->size(), dst.column->size());
for (size_t row = 0; row < src_block.rows(); row += 10) {
auto src_field = (*src.column)[row];
auto dst_field = (*dst.column)[row];
ASSERT_EQ(src_field, dst_field) << "mismatch at col=" << col << " row=" << row;
}
}
bool exists = false;
st = fs->exists(file_path, &exists);
if (st.ok() && exists) {
static_cast<void>(fs->delete_file(file_path));
}
}
// Verify that NativeReader forces all columns to nullable, even if the writer
// serialized non-nullable columns.
TEST_F(NativeReaderWriterTest, non_nullable_columns_forced_nullable) {
auto fs = io::global_local_filesystem();
UniqueId uid = UniqueId::gen_uid();
std::string uuid = uid.to_string();
std::string file_path = "./native_format_non_nullable_" + uuid + ".native";
io::FileWriterPtr file_writer;
Status st = fs->create_file(file_path, &file_writer);
ASSERT_TRUE(st.ok()) << st;
RuntimeState state;
VExprContextSPtrs exprs;
// Use default compression type.
VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
st = transformer.open();
ASSERT_TRUE(st.ok()) << st;
// Build a block with non-nullable columns.
Block src_block;
DataTypePtr int_type =
DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_INT, false);
{
MutableColumnPtr col = int_type->create_column();
for (int i = 0; i < 8; ++i) {
auto field = Field::create_field<PrimitiveType::TYPE_INT>(i * 3);
col->insert(field);
}
src_block.insert(ColumnWithTypeAndName(std::move(col), int_type, "int_nn"));
}
DataTypePtr str_type =
DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_VARCHAR, false);
{
MutableColumnPtr col = str_type->create_column();
for (int i = 0; i < 8; ++i) {
std::string v = "v" + std::to_string(i);
auto field = Field::create_field<PrimitiveType::TYPE_VARCHAR>(v);
col->insert(field);
}
src_block.insert(ColumnWithTypeAndName(std::move(col), str_type, "str_nn"));
}
st = transformer.write(src_block);
ASSERT_TRUE(st.ok()) << st;
st = transformer.close();
ASSERT_TRUE(st.ok()) << st;
// Read back via NativeReader.
TFileScanRangeParams scan_params;
scan_params.__set_file_type(TFileType::FILE_LOCAL);
TFileRangeDesc scan_range;
scan_range.__set_path(file_path);
scan_range.__set_file_type(TFileType::FILE_LOCAL);
NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state);
Block dst_block;
size_t read_rows = 0;
bool eof = false;
st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
ASSERT_TRUE(st.ok()) << st;
ASSERT_EQ(src_block.rows(), read_rows);
ASSERT_FALSE(eof);
// All columns returned by NativeReader should be nullable.
for (size_t col = 0; col < dst_block.columns(); ++col) {
const auto& dst = dst_block.get_by_position(col);
ASSERT_TRUE(dst.type->is_nullable()) << "column " << col << " should be nullable";
}
// Values should be preserved.
for (size_t col = 0; col < src_block.columns(); ++col) {
const auto& src = src_block.get_by_position(col);
const auto& dst = dst_block.get_by_position(col);
ASSERT_EQ(src.column->size(), dst.column->size());
for (size_t row = 0; row < src_block.rows(); ++row) {
auto src_field = (*src.column)[row];
auto dst_field = (*dst.column)[row];
ASSERT_EQ(src_field, dst_field) << "mismatch at col=" << col << " row=" << row;
}
}
bool exists = false;
st = fs->exists(file_path, &exists);
if (st.ok() && exists) {
static_cast<void>(fs->delete_file(file_path));
}
}
// Verify that VNativeTransformer writes the native file header and that
// NativeReader can transparently read files with this header.
TEST_F(NativeReaderWriterTest, transformer_writes_header_and_reader_handles_it) {
auto fs = io::global_local_filesystem();
UniqueId uid = UniqueId::gen_uid();
std::string uuid = uid.to_string();
std::string file_path = "./native_format_with_header_" + uuid + ".native";
// Write a small block via VNativeTransformer.
io::FileWriterPtr file_writer;
Status st = fs->create_file(file_path, &file_writer);
ASSERT_TRUE(st.ok()) << st;
RuntimeState state;
VExprContextSPtrs exprs;
VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
st = transformer.open();
ASSERT_TRUE(st.ok()) << st;
Block src_block = create_test_block(4);
st = transformer.write(src_block);
ASSERT_TRUE(st.ok()) << st;
st = transformer.close();
ASSERT_TRUE(st.ok()) << st;
// Read back raw bytes and verify the header.
{
io::FileReaderSPtr file_reader;
st = fs->open_file(file_path, &file_reader);
ASSERT_TRUE(st.ok()) << st;
size_t file_size = file_reader->size();
ASSERT_GE(file_size, sizeof(uint64_t) + 12);
char header[12];
Slice header_slice(header, sizeof(header));
size_t bytes_read = 0;
st = file_reader->read_at(0, header_slice, &bytes_read);
ASSERT_TRUE(st.ok()) << st;
ASSERT_EQ(sizeof(header), bytes_read);
ASSERT_EQ(0, memcmp(header, DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC)));
uint32_t version = 0;
memcpy(&version, header + sizeof(DORIS_NATIVE_MAGIC), sizeof(uint32_t));
ASSERT_EQ(DORIS_NATIVE_FORMAT_VERSION, version);
}
// Now read via NativeReader; it should detect the header and skip it.
TFileScanRangeParams scan_params;
scan_params.__set_file_type(TFileType::FILE_LOCAL);
TFileRangeDesc scan_range;
scan_range.__set_path(file_path);
scan_range.__set_file_type(TFileType::FILE_LOCAL);
NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state);
Block dst_block;
size_t read_rows = 0;
bool eof = false;
st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
ASSERT_TRUE(st.ok()) << st;
ASSERT_EQ(src_block.rows(), read_rows);
ASSERT_FALSE(eof);
bool exists = false;
st = fs->exists(file_path, &exists);
if (st.ok() && exists) {
static_cast<void>(fs->delete_file(file_path));
}
}
// Verify get_columns and get_parsed_schema can probe schema without scanning
// the whole file.
TEST_F(NativeReaderWriterTest, get_columns_and_parsed_schema) {
auto fs = io::global_local_filesystem();
UniqueId uid = UniqueId::gen_uid();
std::string uuid = uid.to_string();
std::string file_path = "./native_format_schema_" + uuid + ".native";
io::FileWriterPtr file_writer;
Status st = fs->create_file(file_path, &file_writer);
ASSERT_TRUE(st.ok()) << st;
RuntimeState state;
VExprContextSPtrs exprs;
VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
st = transformer.open();
ASSERT_TRUE(st.ok()) << st;
Block src_block = create_test_block(5);
st = transformer.write(src_block);
ASSERT_TRUE(st.ok()) << st;
st = transformer.close();
ASSERT_TRUE(st.ok()) << st;
TFileScanRangeParams scan_params;
scan_params.__set_file_type(TFileType::FILE_LOCAL);
TFileRangeDesc scan_range;
scan_range.__set_path(file_path);
scan_range.__set_file_type(TFileType::FILE_LOCAL);
NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state);
std::unordered_map<std::string, DataTypePtr> name_to_type;
st = reader_impl.get_columns(&name_to_type);
ASSERT_TRUE(st.ok()) << st;
ASSERT_TRUE(reader_impl.missing_cols().empty());
// All columns from src_block should appear in name_to_type.
for (size_t i = 0; i < src_block.columns(); ++i) {
const auto& col = src_block.get_by_position(i);
auto it = name_to_type.find(col.name);
ASSERT_TRUE(it != name_to_type.end()) << "missing column " << col.name;
ASSERT_TRUE(it->second->is_nullable()) << "schema type should be nullable for " << col.name;
}
std::vector<std::string> col_names;
std::vector<DataTypePtr> col_types;
st = reader_impl.get_parsed_schema(&col_names, &col_types);
ASSERT_TRUE(st.ok()) << st;
ASSERT_EQ(col_names.size(), col_types.size());
ASSERT_EQ(col_names.size(), src_block.columns());
for (size_t i = 0; i < col_names.size(); ++i) {
ASSERT_TRUE(col_types[i]->is_nullable());
}
bool exists = false;
st = fs->exists(file_path, &exists);
if (st.ok() && exists) {
static_cast<void>(fs->delete_file(file_path));
}
}
// Create a test block containing all known primitive and complex types with a single row.
// This function covers:
// - Basic integers: BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, LARGEINT
// - Floating point: FLOAT, DOUBLE
// - Date/Time: DATE, DATETIME, DATEV2, DATETIMEV2, TIMEV2
// - String types: CHAR, VARCHAR, STRING
// - Decimal types: DECIMALV2, DECIMAL32, DECIMAL64, DECIMAL128I, DECIMAL256
// - IP types: IPV4, IPV6
// - JSON: JSONB
// - Complex types: ARRAY, MAP, STRUCT, VARIANT
static Block create_all_types_test_block() {
Block block;
// 1. BOOLEAN
{
DataTypePtr type =
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_BOOLEAN, false));
MutableColumnPtr col = type->create_column();
col->insert(Field::create_field<PrimitiveType::TYPE_BOOLEAN>(1)); // true
block.insert(ColumnWithTypeAndName(std::move(col), type, "col_boolean"));
}
// 2. TINYINT
{
DataTypePtr type =
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_TINYINT, false));
MutableColumnPtr col = type->create_column();
col->insert(Field::create_field<PrimitiveType::TYPE_TINYINT>(42));
block.insert(ColumnWithTypeAndName(std::move(col), type, "col_tinyint"));
}
// 3. SMALLINT
{
DataTypePtr type =
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_SMALLINT, false));
MutableColumnPtr col = type->create_column();
col->insert(Field::create_field<PrimitiveType::TYPE_SMALLINT>(static_cast<int16_t>(1234)));
block.insert(ColumnWithTypeAndName(std::move(col), type, "col_smallint"));
}
// 4. INT
{
DataTypePtr type =
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_INT, false));
MutableColumnPtr col = type->create_column();
col->insert(Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(123456)));
block.insert(ColumnWithTypeAndName(std::move(col), type, "col_int"));
}
// 5. BIGINT
{
DataTypePtr type =
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_BIGINT, false));
MutableColumnPtr col = type->create_column();
col->insert(Field::create_field<PrimitiveType::TYPE_BIGINT>(
static_cast<int64_t>(9876543210LL)));
block.insert(ColumnWithTypeAndName(std::move(col), type, "col_bigint"));
}
// 6. LARGEINT
{
DataTypePtr type =
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_LARGEINT, false));
MutableColumnPtr col = type->create_column();
Int128 large_val = Int128(123456789012345LL) * 1000000;
col->insert(Field::create_field<PrimitiveType::TYPE_LARGEINT>(large_val));
block.insert(ColumnWithTypeAndName(std::move(col), type, "col_largeint"));
}
// 7. FLOAT
{
DataTypePtr type =
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_FLOAT, false));
MutableColumnPtr col = type->create_column();
col->insert(Field::create_field<PrimitiveType::TYPE_FLOAT>(3.14F));
block.insert(ColumnWithTypeAndName(std::move(col), type, "col_float"));
}
// 8. DOUBLE
{
DataTypePtr type =
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_DOUBLE, false));
MutableColumnPtr col = type->create_column();
col->insert(Field::create_field<PrimitiveType::TYPE_DOUBLE>(1.234567890123456789));
block.insert(ColumnWithTypeAndName(std::move(col), type, "col_double"));
}
// 9. DATE (DateV1)
{
DataTypePtr type =
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_DATE, false));
MutableColumnPtr col = type->create_column();
// DateV1 stores as int64 representing binary date
VecDateTimeValue dt;
dt.from_date_int64(20231215); // 2023-12-15
col->insert(Field::create_field<PrimitiveType::TYPE_DATE>(dt));
block.insert(ColumnWithTypeAndName(std::move(col), type, "col_date"));
}
// 10. DATETIME (DateTimeV1)
{
DataTypePtr type =
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_DATETIME, false));
MutableColumnPtr col = type->create_column();
VecDateTimeValue dt;
dt.from_date_int64(20231215103045LL); // 2023-12-15 10:30:45
col->insert(Field::create_field<PrimitiveType::TYPE_DATETIME>(dt));
block.insert(ColumnWithTypeAndName(std::move(col), type, "col_datetime"));
}
// 11. DATEV2
{
DataTypePtr type =
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_DATEV2, false));
MutableColumnPtr col = type->create_column();
DateV2Value<DateV2ValueType> dv2;
dv2.from_date_int64(20231215);
col->insert(Field::create_field<PrimitiveType::TYPE_DATEV2>(dv2));
block.insert(ColumnWithTypeAndName(std::move(col), type, "col_datev2"));
}
// 12. DATETIMEV2 (scale=0)
{
DataTypePtr type = make_nullable(
DataTypeFactory::instance().create_data_type(TYPE_DATETIMEV2, false, 0, 0));
MutableColumnPtr col = type->create_column();
DateV2Value<DateTimeV2ValueType> dtv2;
dtv2.from_date_int64(20231215103045LL);
col->insert(Field::create_field<PrimitiveType::TYPE_DATETIMEV2>(dtv2));
block.insert(ColumnWithTypeAndName(std::move(col), type, "col_datetimev2"));
}
// 13. TIMEV2 (scale=0)
{
DataTypePtr type = make_nullable(
DataTypeFactory::instance().create_data_type(TYPE_TIMEV2, false, 0, 0));
MutableColumnPtr col = type->create_column();
// TIMEV2 is stored as Float64 representing seconds
col->insert(
Field::create_field<PrimitiveType::TYPE_TIMEV2>(37845.0)); // 10:30:45 in seconds
block.insert(ColumnWithTypeAndName(std::move(col), type, "col_timev2"));
}
// 14. CHAR
{
DataTypePtr type = make_nullable(
DataTypeFactory::instance().create_data_type(TYPE_CHAR, false, 0, 0, 20));
MutableColumnPtr col = type->create_column();
col->insert(Field::create_field<PrimitiveType::TYPE_CHAR>(std::string("fixed_char_val")));
block.insert(ColumnWithTypeAndName(std::move(col), type, "col_char"));
}
// 15. VARCHAR
{
DataTypePtr type =
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, false));
MutableColumnPtr col = type->create_column();
col->insert(
Field::create_field<PrimitiveType::TYPE_VARCHAR>(std::string("variable_string")));
block.insert(ColumnWithTypeAndName(std::move(col), type, "col_varchar"));
}
// 16. STRING
{
DataTypePtr type =
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_STRING, false));
MutableColumnPtr col = type->create_column();
col->insert(Field::create_field<PrimitiveType::TYPE_STRING>(
std::string("long_text_content_here")));
block.insert(ColumnWithTypeAndName(std::move(col), type, "col_string"));
}
// 17. DECIMALV2 (precision=27, scale=9)
{
DataTypePtr type = make_nullable(
DataTypeFactory::instance().create_data_type(TYPE_DECIMALV2, false, 27, 9));
MutableColumnPtr col = type->create_column();
DecimalV2Value dec_val(static_cast<Int128>(123456789123456789LL));
col->insert(Field::create_field<PrimitiveType::TYPE_DECIMALV2>(dec_val));
block.insert(ColumnWithTypeAndName(std::move(col), type, "col_decimalv2"));
}
// 18. DECIMAL32 (precision=9, scale=2)
{
DataTypePtr type = make_nullable(
DataTypeFactory::instance().create_data_type(TYPE_DECIMAL32, false, 9, 2));
MutableColumnPtr col = type->create_column();
Decimal32 dec_val(static_cast<Int32>(12345678));
col->insert(Field::create_field<PrimitiveType::TYPE_DECIMAL32>(dec_val));
block.insert(ColumnWithTypeAndName(std::move(col), type, "col_decimal32"));
}
// 19. DECIMAL64 (precision=18, scale=4)
{
DataTypePtr type = make_nullable(
DataTypeFactory::instance().create_data_type(TYPE_DECIMAL64, false, 18, 4));
MutableColumnPtr col = type->create_column();
Decimal64 dec_val(static_cast<Int64>(123456789012345678LL));
col->insert(Field::create_field<PrimitiveType::TYPE_DECIMAL64>(dec_val));
block.insert(ColumnWithTypeAndName(std::move(col), type, "col_decimal64"));
}
// 20. DECIMAL128I (precision=38, scale=6)
{
DataTypePtr type = make_nullable(
DataTypeFactory::instance().create_data_type(TYPE_DECIMAL128I, false, 38, 6));
MutableColumnPtr col = type->create_column();
Decimal128V3 dec_val(static_cast<Int128>(123456789012345678LL) * 100);
col->insert(Field::create_field<PrimitiveType::TYPE_DECIMAL128I>(dec_val));
block.insert(ColumnWithTypeAndName(std::move(col), type, "col_decimal128"));
}
// 21. DECIMAL256 (precision=76, scale=10)
{
DataTypePtr type = make_nullable(
DataTypeFactory::instance().create_data_type(TYPE_DECIMAL256, false, 76, 10));
MutableColumnPtr col = type->create_column();
wide::Int256 wide_val = wide::Int256(123456789012345678LL) * 10000000000LL;
Decimal256 dec_val(wide_val);
col->insert(Field::create_field<PrimitiveType::TYPE_DECIMAL256>(dec_val));
block.insert(ColumnWithTypeAndName(std::move(col), type, "col_decimal256"));
}
// 22. IPV4
{
DataTypePtr type =
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_IPV4, false));
MutableColumnPtr col = type->create_column();
IPv4 ip_val = (192U << 24) | (168U << 16) | (1U << 8) | 100U; // 192.168.1.100
col->insert(Field::create_field<PrimitiveType::TYPE_IPV4>(ip_val));
block.insert(ColumnWithTypeAndName(std::move(col), type, "col_ipv4"));
}
// 23. IPV6
{
DataTypePtr type =
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_IPV6, false));
MutableColumnPtr col = type->create_column();
// ::ffff:192.168.1.100 in IPv6
IPv6 ip6_val = 0;
ip6_val = (static_cast<IPv6>(0xFFFF) << 32) |
((192U << 24) | (168U << 16) | (1U << 8) | 100U);
col->insert(Field::create_field<PrimitiveType::TYPE_IPV6>(ip6_val));
block.insert(ColumnWithTypeAndName(std::move(col), type, "col_ipv6"));
}
// 24. JSONB
{
DataTypePtr type =
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_JSONB, false));
MutableColumnPtr col = type->create_column();
// Use JsonbWriter to create JSONB binary data
JsonbWriter writer;
writer.writeStartObject();
writer.writeKey("key", static_cast<uint8_t>(3));
writer.writeStartString();
writer.writeString("value");
writer.writeEndString();
writer.writeKey("num", static_cast<uint8_t>(3));
writer.writeInt32(123);
writer.writeEndObject();
const char* jsonb_data = writer.getOutput()->getBuffer();
size_t jsonb_size = writer.getOutput()->getSize();
JsonbField field(jsonb_data, jsonb_size);
col->insert(Field::create_field<PrimitiveType::TYPE_JSONB>(std::move(field)));
block.insert(ColumnWithTypeAndName(std::move(col), type, "col_jsonb"));
}
// 25. ARRAY<INT>
{
DataTypePtr arr_nested_type = DataTypeFactory::instance().create_data_type(TYPE_INT, false);
DataTypePtr arr_type = make_nullable(std::make_shared<DataTypeArray>(arr_nested_type));
MutableColumnPtr col = arr_type->create_column();
auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
auto& nested = nullable_col->get_nested_column();
auto& null_map = nullable_col->get_null_map_data();
null_map.push_back(0); // non-null
auto& array_col = assert_cast<ColumnArray&>(nested);
auto& offsets = array_col.get_offsets();
auto& data = array_col.get_data();
auto mutable_data = data.assume_mutable();
mutable_data->insert(
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(10)));
mutable_data->insert(
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(20)));
mutable_data->insert(
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(30)));
offsets.push_back(3); // 3 elements
block.insert(ColumnWithTypeAndName(std::move(col), arr_type, "col_array"));
}
// 26. MAP<STRING, INT>
{
DataTypePtr map_key_type =
DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, false);
DataTypePtr map_value_type = DataTypeFactory::instance().create_data_type(TYPE_INT, false);
DataTypePtr map_type =
make_nullable(std::make_shared<DataTypeMap>(map_key_type, map_value_type));
MutableColumnPtr col = map_type->create_column();
auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
auto& nested = nullable_col->get_nested_column();
auto& null_map = nullable_col->get_null_map_data();
null_map.push_back(0); // non-null
auto& offsets = assert_cast<ColumnMap&>(nested).get_offsets();
auto& keys = assert_cast<ColumnMap&>(nested).get_keys();
auto& values = assert_cast<ColumnMap&>(nested).get_values();
auto mutable_keys = keys.assume_mutable();
auto mutable_values = values.assume_mutable();
mutable_keys->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(std::string("key1")));
mutable_values->insert(
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(100)));
mutable_keys->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(std::string("key2")));
mutable_values->insert(
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(200)));
offsets.push_back(2); // 2 key-value pairs
block.insert(ColumnWithTypeAndName(std::move(col), map_type, "col_map"));
}
// 27. STRUCT<si:INT, ss:VARCHAR>
{
DataTypes struct_fields;
struct_fields.emplace_back(
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_INT, false)));
struct_fields.emplace_back(
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, false)));
Strings field_names = {"si", "ss"};
DataTypePtr struct_type =
make_nullable(std::make_shared<DataTypeStruct>(struct_fields, field_names));
MutableColumnPtr col = struct_type->create_column();
auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
auto& nested = nullable_col->get_nested_column();
auto& null_map = nullable_col->get_null_map_data();
null_map.push_back(0); // non-null
auto& struct_col = assert_cast<ColumnStruct&>(nested);
const auto& fields = struct_col.get_columns();
auto mutable_field0 = fields[0]->assume_mutable();
auto mutable_field1 = fields[1]->assume_mutable();
mutable_field0->insert(
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(999)));
mutable_field1->insert(
Field::create_field<PrimitiveType::TYPE_VARCHAR>(std::string("struct_val")));
block.insert(ColumnWithTypeAndName(std::move(col), struct_type, "col_struct"));
}
// 28. VARIANT (JSON object)
{
DataTypePtr variant_type = make_nullable(std::make_shared<DataTypeVariant>());
MutableColumnPtr col = variant_type->create_column();
auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
auto& nested = nullable_col->get_nested_column();
std::string json_str = R"({"name":"test","value":12345})";
std::vector<Slice> slices = {Slice(json_str.data(), json_str.size())};
auto variant_type_inner = std::make_shared<DataTypeVariant>();
auto serde = variant_type_inner->get_serde();
auto* variant_serde = assert_cast<DataTypeVariantSerDe*>(serde.get());
uint64_t num_deserialized = 0;
DataTypeSerDe::FormatOptions options;
Status st = variant_serde->deserialize_column_from_json_vector(nested, slices,
&num_deserialized, options);
EXPECT_TRUE(st.ok()) << st;
EXPECT_EQ(1U, num_deserialized);
auto& null_map = nullable_col->get_null_map_data();
null_map.clear();
null_map.resize_fill(1, 0);
block.insert(ColumnWithTypeAndName(std::move(col), variant_type, "col_variant"));
}
return block;
}
// Pre-generated native file path for all types test.
// The file is stored in: be/test/data/vec/native/all_types_single_row.native
static std::string get_all_types_native_file_path() {
auto root_dir = std::string(getenv("ROOT"));
return root_dir + "/be/test/data/vec/native/all_types_single_row.native";
}
// Generator test: Generate native file with all types (DISABLED by default).
// Run this test manually to regenerate the test data file:
// ./run-be-ut.sh --run --filter=*generate_all_types_native_file*
// Then copy the generated file to: be/test/data/vec/native/all_types_single_row.native
TEST_F(NativeReaderWriterTest, generate_all_types_native_file) {
// Output to current directory, user needs to copy it to test data dir
std::string file_path = "./all_types_single_row.native";
auto fs = io::global_local_filesystem();
io::FileWriterPtr file_writer;
Status st = fs->create_file(file_path, &file_writer);
ASSERT_TRUE(st.ok()) << st;
RuntimeState state;
VExprContextSPtrs exprs;
VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
st = transformer.open();
ASSERT_TRUE(st.ok()) << st;
Block src_block = create_all_types_test_block();
ASSERT_EQ(1U, src_block.rows()) << "Source block should have exactly 1 row";
ASSERT_EQ(28U, src_block.columns()) << "Source block should have 28 columns for all types";
st = transformer.write(src_block);
ASSERT_TRUE(st.ok()) << st;
st = transformer.close();
ASSERT_TRUE(st.ok()) << st;
std::cout << "Generated native file: " << file_path << std::endl;
std::cout << "Please copy it to: be/test/data/vec/native/all_types_single_row.native"
<< std::endl;
}
// Test: read pre-generated native file with all types and verify data
TEST_F(NativeReaderWriterTest, read_all_types_from_pregenerated_file) {
std::string file_path = get_all_types_native_file_path();
auto fs = io::global_local_filesystem();
bool exists = false;
Status st = fs->exists(file_path, &exists);
DCHECK(exists) << "Pre-generated native file not found: " << file_path
<< ". Run generate_all_types_native_file to generate it.";
RuntimeState state;
TFileScanRangeParams scan_params;
scan_params.__set_file_type(TFileType::FILE_LOCAL);
TFileRangeDesc scan_range;
scan_range.__set_path(file_path);
scan_range.__set_file_type(TFileType::FILE_LOCAL);
NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state);
Block dst_block;
size_t read_rows = 0;
bool eof = false;
st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
ASSERT_TRUE(st.ok()) << st;
ASSERT_EQ(1U, read_rows) << "Should read exactly 1 row";
ASSERT_EQ(28U, dst_block.columns()) << "Should have 28 columns for all types";
ASSERT_FALSE(eof);
// Regenerate expected block and compare
Block expected_block = create_all_types_test_block();
// Verify data equality for the single row
for (size_t col = 0; col < expected_block.columns(); ++col) {
const auto& src = expected_block.get_by_position(col);
const auto& dst = dst_block.get_by_position(col);
// Type family should match
ASSERT_EQ(src.type->get_family_name(), dst.type->get_family_name())
<< "Type mismatch at col=" << col << " (" << src.name << ")";
ASSERT_EQ(src.column->size(), dst.column->size())
<< "Size mismatch at col=" << col << " (" << src.name << ")";
// Compare field values
auto src_field = (*src.column)[0];
auto dst_field = (*dst.column)[0];
ASSERT_EQ(src_field, dst_field)
<< "Value mismatch at col=" << col << " (" << src.name << ")";
}
// Next call should hit EOF
Block dst_block2;
read_rows = 0;
eof = false;
st = reader_impl.get_next_block(&dst_block2, &read_rows, &eof);
ASSERT_TRUE(st.ok()) << st;
ASSERT_TRUE(eof);
ASSERT_EQ(read_rows, 0U);
}
// Test: round-trip all known types with a single row (generates temp file each time)
TEST_F(NativeReaderWriterTest, round_trip_all_types_single_row) {
UniqueId uid = UniqueId::gen_uid();
std::string uuid = uid.to_string();
std::string file_path = "./native_format_all_types_" + uuid + ".native";
auto fs = io::global_local_filesystem();
io::FileWriterPtr file_writer;
Status st = fs->create_file(file_path, &file_writer);
ASSERT_TRUE(st.ok()) << st;
RuntimeState state;
VExprContextSPtrs exprs;
VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
st = transformer.open();
ASSERT_TRUE(st.ok()) << st;
Block src_block = create_all_types_test_block();
ASSERT_EQ(1U, src_block.rows()) << "Source block should have exactly 1 row";
ASSERT_EQ(28U, src_block.columns()) << "Source block should have 28 columns for all types";
st = transformer.write(src_block);
ASSERT_TRUE(st.ok()) << st;
st = transformer.close();
ASSERT_TRUE(st.ok()) << st;
// Read back via NativeReader
TFileScanRangeParams scan_params;
scan_params.__set_file_type(TFileType::FILE_LOCAL);
TFileRangeDesc scan_range;
scan_range.__set_path(file_path);
scan_range.__set_file_type(TFileType::FILE_LOCAL);
NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state);
Block dst_block;
size_t read_rows = 0;
bool eof = false;
st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
ASSERT_TRUE(st.ok()) << st;
ASSERT_EQ(src_block.rows(), read_rows);
ASSERT_EQ(src_block.columns(), dst_block.columns());
ASSERT_FALSE(eof);
// Verify data equality for the single row
for (size_t col = 0; col < src_block.columns(); ++col) {
const auto& src = src_block.get_by_position(col);
const auto& dst = dst_block.get_by_position(col);
// Type family should match
ASSERT_EQ(src.type->get_family_name(), dst.type->get_family_name())
<< "Type mismatch at col=" << col << " (" << src.name << ")";
ASSERT_EQ(src.column->size(), dst.column->size())
<< "Size mismatch at col=" << col << " (" << src.name << ")";
// Compare field values
auto src_field = (*src.column)[0];
auto dst_field = (*dst.column)[0];
ASSERT_EQ(src_field, dst_field)
<< "Value mismatch at col=" << col << " (" << src.name << ")";
}
// Next call should hit EOF
Block dst_block2;
read_rows = 0;
eof = false;
st = reader_impl.get_next_block(&dst_block2, &read_rows, &eof);
ASSERT_TRUE(st.ok()) << st;
ASSERT_TRUE(eof);
ASSERT_EQ(read_rows, 0U);
// Clean up temp file
bool exists = false;
st = fs->exists(file_path, &exists);
if (st.ok() && exists) {
static_cast<void>(fs->delete_file(file_path));
}
}
} // namespace doris