blob: 348c18a4aeacb017877b380ccf7b1a6a0d5ce101 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <gtest/gtest.h>
#include <cstring>
#include "common/tablet.h"
#include "common/tsblock/tsblock.h"
#include "cwrapper/tsfile_cwrapper.h"
#include "utils/db_utils.h"
// Forward declarations for arrow namespace (functions are defined in
// arrow_c.cc)
namespace arrow {
// Type aliases for Arrow types (defined in tsfile_cwrapper.h)
using ArrowArray = ::ArrowArray;
using ArrowSchema = ::ArrowSchema;
#define ARROW_FLAG_DICTIONARY_ORDERED 1
#define ARROW_FLAG_NULLABLE 2
#define ARROW_FLAG_MAP_KEYS_SORTED 4
// Function declarations (defined in arrow_c.cc)
int TsBlockToArrowStruct(common::TsBlock& tsblock, ArrowArray* out_array,
ArrowSchema* out_schema);
int ArrowStructToTablet(const char* table_name, const ArrowArray* in_array,
const ArrowSchema* in_schema,
const storage::TableSchema* reg_schema,
storage::Tablet** out_tablet, int time_col_index);
} // namespace arrow
static void VerifyArrowSchema(
const ::arrow::ArrowSchema* schema,
const std::vector<std::string>& expected_names,
const std::vector<const char*>& expected_formats) {
ASSERT_NE(schema, nullptr);
EXPECT_STREQ(schema->format, "+s");
EXPECT_EQ(schema->n_children, expected_names.size());
ASSERT_NE(schema->children, nullptr);
for (size_t i = 0; i < expected_names.size(); ++i) {
const arrow::ArrowSchema* child = schema->children[i];
ASSERT_NE(child, nullptr);
EXPECT_STREQ(child->name, expected_names[i].c_str());
EXPECT_STREQ(child->format, expected_formats[i]);
EXPECT_EQ(child->flags, ARROW_FLAG_NULLABLE);
}
}
static void VerifyArrowArrayData(const arrow::ArrowArray* array,
uint32_t expected_length) {
ASSERT_NE(array, nullptr);
EXPECT_EQ(array->length, expected_length);
EXPECT_EQ(array->n_children, 3);
ASSERT_NE(array->children, nullptr);
}
TEST(ArrowTsBlockTest, NormalTsBlock_NoNulls) {
common::TupleDesc tuple_desc;
common::ColumnSchema col1("int_col", common::INT32, common::SNAPPY,
common::RLE);
common::ColumnSchema col2("double_col", common::DOUBLE, common::SNAPPY,
common::RLE);
common::ColumnSchema col3("string_col", common::STRING, common::SNAPPY,
common::RLE);
tuple_desc.push_back(col1);
tuple_desc.push_back(col2);
tuple_desc.push_back(col3);
common::TsBlock tsblock(&tuple_desc, 10);
ASSERT_EQ(tsblock.init(), common::E_OK);
common::RowAppender row_appender(&tsblock);
for (int i = 0; i < 5; ++i) {
ASSERT_TRUE(row_appender.add_row());
int32_t int_val = 100 + i;
row_appender.append(0, reinterpret_cast<const char*>(&int_val),
sizeof(int32_t));
double double_val = 3.14 + i;
row_appender.append(1, reinterpret_cast<const char*>(&double_val),
sizeof(double));
std::string str_val = "test" + std::to_string(i);
row_appender.append(2, str_val.c_str(), str_val.length());
}
EXPECT_EQ(tsblock.get_row_count(), 5);
arrow::ArrowArray array;
arrow::ArrowSchema schema;
int ret = arrow::TsBlockToArrowStruct(tsblock, &array, &schema);
ASSERT_EQ(ret, common::E_OK);
std::vector<std::string> expected_names = {"int_col", "double_col",
"string_col"};
std::vector<const char*> expected_formats = {"i", "g", "u"};
VerifyArrowSchema(&schema, expected_names, expected_formats);
VerifyArrowArrayData(&array, 5);
ASSERT_NE(array.children, nullptr);
ASSERT_NE(array.children[0], nullptr);
ASSERT_NE(array.children[1], nullptr);
ASSERT_NE(array.children[2], nullptr);
const ArrowArray* int_array = array.children[0];
EXPECT_EQ(int_array->length, 5);
EXPECT_EQ(int_array->null_count, 0);
ASSERT_NE(int_array->buffers, nullptr);
const int32_t* int_data = reinterpret_cast<const int32_t*>(
int_array->buffers[int_array->n_buffers - 1]);
for (int i = 0; i < 5; ++i) {
EXPECT_EQ(int_data[i], 100 + i);
}
const arrow::ArrowArray* double_array = array.children[1];
EXPECT_EQ(double_array->length, 5);
EXPECT_EQ(double_array->null_count, 0);
const double* double_data = reinterpret_cast<const double*>(
double_array->buffers[double_array->n_buffers - 1]);
for (int i = 0; i < 5; ++i) {
EXPECT_DOUBLE_EQ(double_data[i], 3.14 + i);
}
const arrow::ArrowArray* string_array = array.children[2];
EXPECT_EQ(string_array->length, 5);
EXPECT_EQ(string_array->null_count, 0);
ASSERT_NE(string_array->buffers, nullptr);
const int32_t* offsets =
reinterpret_cast<const int32_t*>(string_array->buffers[1]);
const char* string_data =
reinterpret_cast<const char*>(string_array->buffers[2]);
for (int i = 0; i < 5; ++i) {
int32_t start = offsets[i];
int32_t end = offsets[i + 1];
std::string expected_str = "test" + std::to_string(i);
std::string actual_str(string_data + start, end - start);
EXPECT_EQ(actual_str, expected_str);
}
if (array.release != nullptr) {
array.release(&array);
}
if (schema.release != nullptr) {
schema.release(&schema);
}
}
TEST(ArrowTsBlockTest, TsBlock_WithNulls) {
common::TupleDesc tuple_desc;
common::ColumnSchema col1("int_col", common::INT32, common::SNAPPY,
common::RLE);
common::ColumnSchema col2("double_col", common::DOUBLE, common::SNAPPY,
common::RLE);
common::ColumnSchema col3("string_col", common::STRING, common::SNAPPY,
common::RLE);
tuple_desc.push_back(col1);
tuple_desc.push_back(col2);
tuple_desc.push_back(col3);
common::TsBlock tsblock(&tuple_desc, 10);
ASSERT_EQ(tsblock.init(), common::E_OK);
common::RowAppender row_appender(&tsblock);
for (int i = 0; i < 5; ++i) {
ASSERT_TRUE(row_appender.add_row());
if (i == 1) {
row_appender.append_null(0);
row_appender.append_null(1);
row_appender.append_null(2);
} else if (i == 3) {
row_appender.append_null(0);
double double_val = 3.14 + i;
row_appender.append(1, reinterpret_cast<const char*>(&double_val),
sizeof(double));
std::string str_val = "test" + std::to_string(i);
row_appender.append(2, str_val.c_str(), str_val.length());
} else {
int32_t int_val = 100 + i;
row_appender.append(0, reinterpret_cast<const char*>(&int_val),
sizeof(int32_t));
double double_val = 3.14 + i;
row_appender.append(1, reinterpret_cast<const char*>(&double_val),
sizeof(double));
std::string str_val = "test" + std::to_string(i);
row_appender.append(2, str_val.c_str(), str_val.length());
}
}
EXPECT_EQ(tsblock.get_row_count(), 5);
arrow::ArrowArray array;
arrow::ArrowSchema schema;
int ret = arrow::TsBlockToArrowStruct(tsblock, &array, &schema);
ASSERT_EQ(ret, common::E_OK);
std::vector<std::string> expected_names = {"int_col", "double_col",
"string_col"};
std::vector<const char*> expected_formats = {"i", "g", "u"};
VerifyArrowSchema(&schema, expected_names, expected_formats);
VerifyArrowArrayData(&array, 5);
const arrow::ArrowArray* int_array = array.children[0];
EXPECT_EQ(int_array->null_count, 2);
const arrow::ArrowArray* double_array = array.children[1];
EXPECT_EQ(double_array->null_count, 1);
const arrow::ArrowArray* string_array = array.children[2];
EXPECT_EQ(string_array->null_count, 1);
ASSERT_NE(int_array->buffers[0], nullptr);
const uint8_t* null_bitmap =
reinterpret_cast<const uint8_t*>(int_array->buffers[0]);
EXPECT_FALSE(null_bitmap[0] & (1 << 1));
EXPECT_FALSE(null_bitmap[0] & (1 << 3));
EXPECT_TRUE(null_bitmap[0] & (1 << 0));
EXPECT_TRUE(null_bitmap[0] & (1 << 2));
EXPECT_TRUE(null_bitmap[0] & (1 << 4));
const int32_t* int_data = reinterpret_cast<const int32_t*>(
int_array->buffers[int_array->n_buffers - 1]);
EXPECT_NE(int_data, nullptr);
if (array.release != nullptr) {
array.release(&array);
}
if (schema.release != nullptr) {
schema.release(&schema);
}
}
TEST(ArrowTsBlockTest, TsBlock_EdgeCases) {
{
common::TupleDesc tuple_desc;
common::ColumnSchema col1("single_col", common::INT64, common::SNAPPY,
common::RLE);
tuple_desc.push_back(col1);
common::TsBlock tsblock(&tuple_desc, 5);
ASSERT_EQ(tsblock.init(), common::E_OK);
common::RowAppender row_appender(&tsblock);
for (int i = 0; i < 3; ++i) {
ASSERT_TRUE(row_appender.add_row());
int64_t val = 1000 + i;
row_appender.append(0, reinterpret_cast<const char*>(&val),
sizeof(int64_t));
}
arrow::ArrowArray array;
arrow::ArrowSchema schema;
int ret = arrow::TsBlockToArrowStruct(tsblock, &array, &schema);
ASSERT_EQ(ret, common::E_OK);
EXPECT_STREQ(schema.format, "+s");
EXPECT_EQ(schema.n_children, 1);
EXPECT_STREQ(schema.children[0]->name, "single_col");
EXPECT_STREQ(schema.children[0]->format, "l");
EXPECT_EQ(array.length, 3);
EXPECT_EQ(array.n_children, 1);
if (array.release != nullptr) {
array.release(&array);
}
if (schema.release != nullptr) {
schema.release(&schema);
}
}
{
common::TupleDesc tuple_desc;
common::ColumnSchema col1("int_col", common::INT32, common::SNAPPY,
common::RLE);
common::ColumnSchema col2("double_col", common::DOUBLE, common::SNAPPY,
common::RLE);
tuple_desc.push_back(col1);
tuple_desc.push_back(col2);
const int row_count = 1000;
common::TsBlock tsblock(&tuple_desc, row_count);
ASSERT_EQ(tsblock.init(), common::E_OK);
common::RowAppender row_appender(&tsblock);
for (int i = 0; i < row_count; ++i) {
ASSERT_TRUE(row_appender.add_row());
int32_t int_val = i;
row_appender.append(0, reinterpret_cast<const char*>(&int_val),
sizeof(int32_t));
double double_val = i * 0.5;
row_appender.append(1, reinterpret_cast<const char*>(&double_val),
sizeof(double));
}
arrow::ArrowArray array;
arrow::ArrowSchema schema;
int ret = arrow::TsBlockToArrowStruct(tsblock, &array, &schema);
ASSERT_EQ(ret, common::E_OK);
EXPECT_EQ(array.length, row_count);
EXPECT_EQ(array.n_children, 2);
const arrow::ArrowArray* int_array = array.children[0];
const int32_t* int_data =
reinterpret_cast<const int32_t*>(int_array->buffers[1]);
EXPECT_EQ(int_data[0], 0);
EXPECT_EQ(int_data[row_count - 1], row_count - 1);
const arrow::ArrowArray* double_array = array.children[1];
const double* double_data =
reinterpret_cast<const double*>(double_array->buffers[1]);
EXPECT_DOUBLE_EQ(double_data[0], 0.0);
EXPECT_DOUBLE_EQ(double_data[row_count - 1], (row_count - 1) * 0.5);
if (array.release != nullptr) {
array.release(&array);
}
if (schema.release != nullptr) {
schema.release(&schema);
}
}
}
// Test ArrowStructToTablet with sliced Arrow arrays (offset > 0).
// Full arrays have 5 rows; offset=2 on every child means only rows [2..4]
// (3 rows) are consumed. Row index 3 in the full array (local index 1 in the
// slice) carries a null in the INT32 column.
TEST(ArrowStructToTabletTest, SlicedArray_WithOffset) {
// --- timestamps (int64, no nulls) ---
int64_t ts_data[5] = {1000, 1001, 1002, 1003, 1004};
const void* ts_bufs[2] = {nullptr, ts_data};
ArrowArray ts_arr = {};
ts_arr.length = 3;
ts_arr.offset = 2;
ts_arr.null_count = 0;
ts_arr.n_buffers = 2;
ts_arr.buffers = ts_bufs;
ArrowSchema ts_schema = {};
ts_schema.format = "l";
ts_schema.name = "time";
ts_schema.flags = ARROW_FLAG_NULLABLE;
// --- INT32 column: values [100..104], row 3 (global) = local row 1 null
// Arrow validity bitmap: bit=1 means valid.
// bits 0,1,2,4=valid, bit 3=null → byte 0 = 0b00010111 = 0x17
int32_t int_data[5] = {100, 101, 102, 103, 104};
uint8_t int_validity[1] = {0x17};
const void* int_bufs[2] = {int_validity, int_data};
ArrowArray int_arr = {};
int_arr.length = 3;
int_arr.offset = 2;
int_arr.null_count = 1;
int_arr.n_buffers = 2;
int_arr.buffers = int_bufs;
ArrowSchema int_schema = {};
int_schema.format = "i";
int_schema.name = "int_col";
int_schema.flags = ARROW_FLAG_NULLABLE;
// --- DOUBLE column: values [10.0..14.0], no nulls ---
double dbl_data[5] = {10.0, 11.0, 12.0, 13.0, 14.0};
const void* dbl_bufs[2] = {nullptr, dbl_data};
ArrowArray dbl_arr = {};
dbl_arr.length = 3;
dbl_arr.offset = 2;
dbl_arr.null_count = 0;
dbl_arr.n_buffers = 2;
dbl_arr.buffers = dbl_bufs;
ArrowSchema dbl_schema = {};
dbl_schema.format = "g";
dbl_schema.name = "dbl_col";
dbl_schema.flags = ARROW_FLAG_NULLABLE;
// --- UTF-8 string column: "str0".."str4", no nulls ---
// With offset=2, the slice covers "str2","str3","str4".
const char str_chars[] = "str0str1str2str3str4";
int32_t str_offs[6] = {0, 4, 8, 12, 16, 20};
const void* str_bufs[3] = {nullptr, str_offs, str_chars};
ArrowArray str_arr = {};
str_arr.length = 3;
str_arr.offset = 2;
str_arr.null_count = 0;
str_arr.n_buffers = 3;
str_arr.buffers = str_bufs;
ArrowSchema str_schema = {};
str_schema.format = "u";
str_schema.name = "str_col";
str_schema.flags = ARROW_FLAG_NULLABLE;
// --- parent struct array ---
ArrowArray* children[4] = {&ts_arr, &int_arr, &dbl_arr, &str_arr};
ArrowArray parent = {};
parent.length = 3;
parent.n_buffers = 0;
parent.n_children = 4;
parent.children = children;
ArrowSchema* child_schemas[4] = {&ts_schema, &int_schema, &dbl_schema,
&str_schema};
ArrowSchema parent_schema = {};
parent_schema.format = "+s";
parent_schema.n_children = 4;
parent_schema.children = child_schemas;
storage::Tablet* tablet = nullptr;
// time_col_index=0 → timestamp from ts_arr; data cols are int, dbl, str
int ret = arrow::ArrowStructToTablet("test_table", &parent, &parent_schema,
nullptr, &tablet, 0);
ASSERT_EQ(ret, common::E_OK);
ASSERT_NE(tablet, nullptr);
EXPECT_EQ(tablet->get_cur_row_size(), 3u);
common::TSDataType dtype;
void* v;
// INT32 col (schema_index=0): local rows 0,1,2 → 102, null, 104
v = tablet->get_value(0, 0, dtype);
ASSERT_NE(v, nullptr);
EXPECT_EQ(*static_cast<int32_t*>(v), 102);
v = tablet->get_value(1, 0, dtype);
EXPECT_EQ(v, nullptr); // row 3 in original data is null
v = tablet->get_value(2, 0, dtype);
ASSERT_NE(v, nullptr);
EXPECT_EQ(*static_cast<int32_t*>(v), 104);
// DOUBLE col (schema_index=1): local rows 0,1,2 → 12.0, 13.0, 14.0
v = tablet->get_value(0, 1, dtype);
ASSERT_NE(v, nullptr);
EXPECT_DOUBLE_EQ(*static_cast<double*>(v), 12.0);
v = tablet->get_value(1, 1, dtype);
ASSERT_NE(v, nullptr);
EXPECT_DOUBLE_EQ(*static_cast<double*>(v), 13.0);
v = tablet->get_value(2, 1, dtype);
ASSERT_NE(v, nullptr);
EXPECT_DOUBLE_EQ(*static_cast<double*>(v), 14.0);
// STRING col (schema_index=2): local rows 0,1,2 → "str2","str3","str4"
// Arrow "u" maps to common::TEXT; offset normalization in arrow_c.cc
// ensures offsets[0]==0 before calling set_column_string_values.
v = tablet->get_value(0, 2, dtype);
ASSERT_NE(v, nullptr);
{
common::String* s = static_cast<common::String*>(v);
EXPECT_EQ(std::string(s->buf_, s->len_), "str2");
}
v = tablet->get_value(1, 2, dtype);
ASSERT_NE(v, nullptr);
{
common::String* s = static_cast<common::String*>(v);
EXPECT_EQ(std::string(s->buf_, s->len_), "str3");
}
v = tablet->get_value(2, 2, dtype);
ASSERT_NE(v, nullptr);
{
common::String* s = static_cast<common::String*>(v);
EXPECT_EQ(std::string(s->buf_, s->len_), "str4");
}
delete tablet;
}