blob: 1d08012f9ec32ab865f41d0470643cb9984569ba [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "mosaic.hpp"
#include <arrow/api.h>
#include <arrow/c/bridge.h>
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <string>
#include <vector>
// Minimal assertion helpers for this standalone test binary.
//
// Unlike assert() from <cassert>, these are never compiled out by NDEBUG, so
// they are safe to wrap around expressions with side effects. Each argument is
// evaluated exactly once. On failure the location and the stringified
// expression(s) are printed to stderr and the process aborts.
//
// stdout is flushed first so that " PASS ..." lines already printed by earlier
// tests are not lost when stdout is block-buffered (e.g. piped to a file).
#define ASSERT_EQ(a, b) do { if ((a) != (b)) { \
    std::fflush(stdout); \
    std::fprintf(stderr, "FAIL %s:%d: %s != %s\n", __FILE__, __LINE__, #a, #b); \
    std::abort(); } } while(0)
#define ASSERT_TRUE(x) do { if (!(x)) { \
    std::fflush(stdout); \
    std::fprintf(stderr, "FAIL %s:%d: %s\n", __FILE__, __LINE__, #x); \
    std::abort(); } } while(0)
// Growable in-memory byte store that the tests use in place of a real file.
struct MemBuffer {
    // Bytes held by the buffer.
    std::vector<uint8_t> data;
    // Byte counter; starts at zero. The output callbacks in this file advance
    // it by the length of every write and report it as the current position.
    size_t pos{0};
};
// Builds a mosaic::OutputFile whose callbacks append to `buf`.
//
// The returned callbacks hold a reference to `buf`, so `buf` must outlive the
// OutputFile and anything it is handed to.
//
// - write_fn appends `len` bytes to buf.data, advances buf.pos, returns 0.
// - flush_fn always returns 0.
// - get_pos_fn reports buf.pos.
static mosaic::OutputFile make_output(MemBuffer& buf) {
    mosaic::OutputFile out;
    out.write_fn = [&buf](const uint8_t* data, size_t len) -> int {
        buf.data.insert(buf.data.end(), data, data + len);
        buf.pos += len;
        return 0;
    };
    // Every write lands directly in buf.data, so there is nothing to flush and
    // no need to capture `buf` here (avoids -Wunused-lambda-capture).
    out.flush_fn = []() -> int { return 0; };
    out.get_pos_fn = [&buf]() -> int64_t { return static_cast<int64_t>(buf.pos); };
    return out;
}
// Builds a mosaic::InputFile that serves reads out of `buf`.
//
// The read callback holds a reference to `buf`, so `buf` must outlive the
// returned InputFile. file_length is the size of buf.data at the time of the
// call.
//
// read_at_fn copies `len` bytes starting at `offset` into `dst` and returns 0,
// or returns -1 without touching `dst` when the requested range does not lie
// entirely inside the buffer.
static mosaic::InputFile make_input(const MemBuffer& buf) {
    mosaic::InputFile in;
    in.read_at_fn = [&buf](uint64_t offset, uint8_t* dst, size_t len) -> int {
        const uint64_t size = buf.data.size();
        // Written as two comparisons so a huge offset/len cannot wrap around
        // and slip past the bounds check (offset + len could overflow).
        if (offset > size || len > size - offset) return -1;
        // Skip the copy for empty reads: data() may be null for an empty
        // vector, and memcpy with a null pointer is undefined even for 0 bytes.
        if (len != 0) {
            memcpy(dst, buf.data.data() + static_cast<size_t>(offset), len);
        }
        return 0;
    };
    in.file_length = buf.data.size();
    return in;
}
// Writes a single record batch with a mosaic::Writer into an in-memory buffer
// and returns the bytes produced.
//
// schema: Arrow schema exported through the C data interface and passed to the
//         writer's constructor.
// batch:  record batch exported through the C data interface and passed to
//         Writer::write().
// opts:   writer options; defaults to a value-initialised WriterOptions.
//
// Aborts the test binary (via ASSERT_TRUE) if either Arrow export fails.
static std::vector<uint8_t> write_and_get(
    const std::shared_ptr<arrow::Schema>& schema,
    const std::shared_ptr<arrow::RecordBatch>& batch,
    mosaic::WriterOptions opts = {})
{
    MemBuffer buf;
    struct ArrowSchema c_schema;
    auto st = arrow::ExportSchema(*schema, &c_schema);
    // ASSERT_TRUE rather than assert(): the check must survive NDEBUG builds,
    // otherwise a failed export would hand an uninitialised struct to the writer.
    ASSERT_TRUE(st.ok());
    mosaic::Writer writer(make_output(buf), &c_schema, opts);
    struct ArrowArray c_array;
    struct ArrowSchema c_batch_schema;
    st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
    ASSERT_TRUE(st.ok());
    writer.write(&c_array, &c_batch_schema);
    writer.close();
    return buf.data;
}
// Reads row group `rg` from `reader` through the Arrow C data interface and
// imports it as an arrow::RecordBatch.
//
// Aborts the test binary (via ASSERT_TRUE) if the import fails.
static std::shared_ptr<arrow::RecordBatch> read_row_group(
    mosaic::Reader& reader, uint32_t rg)
{
    struct ArrowArray c_array;
    struct ArrowSchema c_schema;
    reader.read_row_group(rg, &c_array, &c_schema);
    auto result = arrow::ImportRecordBatch(&c_array, &c_schema);
    // Must be checked in every build type: ValueUnsafe() below does not
    // validate the Result, and assert() would vanish under NDEBUG.
    ASSERT_TRUE(result.ok());
    return result.ValueUnsafe();
}
// ======================== Tests ========================
// Round-trips 50 rows over three columns (non-nullable int32 "id", utf8
// "name", float64 "score") with num_buckets = 2 and verifies every value read
// back from row group 0.
static void test_basic_roundtrip() {
    auto schema = arrow::schema({
        arrow::field("id", arrow::int32(), false),
        arrow::field("name", arrow::utf8()),
        arrow::field("score", arrow::float64()),
    });
    arrow::Int32Builder id_b;
    arrow::StringBuilder name_b;
    arrow::DoubleBuilder score_b;
    for (int i = 0; i < 50; i++) {
        // ASSERT_TRUE, not assert(): the appends are side effects that must
        // still run when the tests are compiled with NDEBUG.
        ASSERT_TRUE(id_b.Append(i).ok());
        ASSERT_TRUE(name_b.Append("user_" + std::to_string(i)).ok());
        ASSERT_TRUE(score_b.Append(i * 1.5).ok());
    }
    auto batch = arrow::RecordBatch::Make(schema, 50, {
        id_b.Finish().ValueUnsafe(),
        name_b.Finish().ValueUnsafe(),
        score_b.Finish().ValueUnsafe(),
    });
    mosaic::WriterOptions opts;
    opts.num_buckets = 2;
    MemBuffer buf;
    buf.data = write_and_get(schema, batch, opts);
    // Sanity check that the writer produced a non-trivial amount of output.
    ASSERT_TRUE(buf.data.size() > 32);
    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
    ASSERT_TRUE(reader.num_row_groups() >= 1);
    auto rb = read_row_group(reader, 0);
    ASSERT_EQ(rb->num_rows(), 50);
    ASSERT_EQ(rb->num_columns(), 3);
    auto ids = std::static_pointer_cast<arrow::Int32Array>(rb->GetColumnByName("id"));
    auto names = std::static_pointer_cast<arrow::StringArray>(rb->GetColumnByName("name"));
    auto scores = std::static_pointer_cast<arrow::DoubleArray>(rb->GetColumnByName("score"));
    for (int i = 0; i < 50; i++) {
        ASSERT_EQ(ids->Value(i), i);
        ASSERT_EQ(names->GetString(i), "user_" + std::to_string(i));
        ASSERT_TRUE(std::abs(scores->Value(i) - i * 1.5) < 1e-9);
    }
    printf(" PASS test_basic_roundtrip\n");
}
// Writes three rows where the middle "name" value is null and checks that
// validity and the surrounding string values survive the round trip.
static void test_null_values() {
    auto schema = arrow::schema({
        arrow::field("id", arrow::int32()),
        arrow::field("name", arrow::utf8()),
    });
    // ASSERT_TRUE, not assert(): the appends must still run under NDEBUG.
    arrow::Int32Builder id_b;
    ASSERT_TRUE(id_b.Append(1).ok());
    ASSERT_TRUE(id_b.Append(2).ok());
    ASSERT_TRUE(id_b.Append(3).ok());
    arrow::StringBuilder name_b;
    ASSERT_TRUE(name_b.Append("hello").ok());
    ASSERT_TRUE(name_b.AppendNull().ok());
    ASSERT_TRUE(name_b.Append("world").ok());
    auto batch = arrow::RecordBatch::Make(schema, 3, {
        id_b.Finish().ValueUnsafe(), name_b.Finish().ValueUnsafe()});
    MemBuffer buf;
    buf.data = write_and_get(schema, batch);
    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
    auto rb = read_row_group(reader, 0);
    ASSERT_EQ(rb->num_rows(), 3);
    auto names = std::static_pointer_cast<arrow::StringArray>(rb->GetColumnByName("name"));
    ASSERT_TRUE(!names->IsNull(0));
    ASSERT_EQ(names->GetString(0), "hello");
    ASSERT_TRUE(names->IsNull(1));
    ASSERT_TRUE(!names->IsNull(2));
    ASSERT_EQ(names->GetString(2), "world");
    printf(" PASS test_null_values\n");
}
// Round-trips one row containing a value of each of these types: bool, int8,
// int16, int32, int64, float32, float64, utf8 and binary, then checks each
// column of the row read back.
static void test_all_types() {
    auto schema = arrow::schema({
        arrow::field("f_bool", arrow::boolean()),
        arrow::field("f_int8", arrow::int8()),
        arrow::field("f_int16", arrow::int16()),
        arrow::field("f_int32", arrow::int32()),
        arrow::field("f_int64", arrow::int64()),
        arrow::field("f_float32", arrow::float32()),
        arrow::field("f_float64", arrow::float64()),
        arrow::field("f_utf8", arrow::utf8()),
        arrow::field("f_binary", arrow::binary()),
    });
    // ASSERT_TRUE, not assert(): the appends must still run under NDEBUG.
    arrow::BooleanBuilder bool_b;
    ASSERT_TRUE(bool_b.Append(true).ok());
    arrow::Int8Builder i8_b;
    ASSERT_TRUE(i8_b.Append(42).ok());
    arrow::Int16Builder i16_b;
    ASSERT_TRUE(i16_b.Append(1234).ok());
    arrow::Int32Builder i32_b;
    ASSERT_TRUE(i32_b.Append(100000).ok());
    arrow::Int64Builder i64_b;
    ASSERT_TRUE(i64_b.Append(9999999999LL).ok());
    arrow::FloatBuilder f32_b;
    ASSERT_TRUE(f32_b.Append(3.14f).ok());
    arrow::DoubleBuilder f64_b;
    ASSERT_TRUE(f64_b.Append(2.718281828).ok());
    arrow::StringBuilder utf8_b;
    ASSERT_TRUE(utf8_b.Append("hello").ok());
    arrow::BinaryBuilder bin_b;
    const uint8_t bin_data[] = {0x01, 0x02};
    ASSERT_TRUE(bin_b.Append(bin_data, 2).ok());
    auto batch = arrow::RecordBatch::Make(schema, 1, {
        bool_b.Finish().ValueUnsafe(),
        i8_b.Finish().ValueUnsafe(),
        i16_b.Finish().ValueUnsafe(),
        i32_b.Finish().ValueUnsafe(),
        i64_b.Finish().ValueUnsafe(),
        f32_b.Finish().ValueUnsafe(),
        f64_b.Finish().ValueUnsafe(),
        utf8_b.Finish().ValueUnsafe(),
        bin_b.Finish().ValueUnsafe(),
    });
    MemBuffer buf;
    buf.data = write_and_get(schema, batch);
    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
    auto rb = read_row_group(reader, 0);
    ASSERT_EQ(rb->num_rows(), 1);
    ASSERT_EQ(rb->num_columns(), 9);
    ASSERT_TRUE(std::static_pointer_cast<arrow::BooleanArray>(rb->GetColumnByName("f_bool"))->Value(0));
    ASSERT_EQ(std::static_pointer_cast<arrow::Int8Array>(rb->GetColumnByName("f_int8"))->Value(0), 42);
    ASSERT_EQ(std::static_pointer_cast<arrow::Int16Array>(rb->GetColumnByName("f_int16"))->Value(0), 1234);
    ASSERT_EQ(std::static_pointer_cast<arrow::Int32Array>(rb->GetColumnByName("f_int32"))->Value(0), 100000);
    ASSERT_EQ(std::static_pointer_cast<arrow::Int64Array>(rb->GetColumnByName("f_int64"))->Value(0), 9999999999LL);
    ASSERT_TRUE(std::abs(std::static_pointer_cast<arrow::FloatArray>(rb->GetColumnByName("f_float32"))->Value(0) - 3.14f) < 1e-5f);
    ASSERT_TRUE(std::abs(std::static_pointer_cast<arrow::DoubleArray>(rb->GetColumnByName("f_float64"))->Value(0) - 2.718281828) < 1e-9);
    ASSERT_EQ(std::static_pointer_cast<arrow::StringArray>(rb->GetColumnByName("f_utf8"))->GetString(0), "hello");
    // The binary column was previously written but never verified.
    auto bins = std::static_pointer_cast<arrow::BinaryArray>(rb->GetColumnByName("f_binary"));
    ASSERT_TRUE(bins != nullptr);
    ASSERT_EQ(bins->GetString(0), std::string("\x01\x02", 2));
    printf(" PASS test_all_types\n");
}
// Round-trips nanosecond timestamps, with and without a time zone
// ("Asia/Shanghai"), and checks that the column types, the null in the middle
// row and the raw int64 values are preserved.
static void test_timestamp_ns_roundtrip() {
    auto ts_ns_type = arrow::timestamp(arrow::TimeUnit::NANO);
    auto ts_ns_tz_type = arrow::timestamp(arrow::TimeUnit::NANO, "Asia/Shanghai");
    auto schema = arrow::schema({
        arrow::field("ts_ns", ts_ns_type),
        arrow::field("ts_ns_tz", ts_ns_tz_type),
    });
    const int64_t values[] = {1700000000000000123LL, -1LL};
    // ASSERT_TRUE, not assert(): the appends must still run under NDEBUG.
    arrow::TimestampBuilder ts_ns_b(ts_ns_type, arrow::default_memory_pool());
    ASSERT_TRUE(ts_ns_b.Append(values[0]).ok());
    ASSERT_TRUE(ts_ns_b.AppendNull().ok());
    ASSERT_TRUE(ts_ns_b.Append(values[1]).ok());
    arrow::TimestampBuilder ts_ns_tz_b(ts_ns_tz_type, arrow::default_memory_pool());
    ASSERT_TRUE(ts_ns_tz_b.Append(values[0]).ok());
    ASSERT_TRUE(ts_ns_tz_b.AppendNull().ok());
    ASSERT_TRUE(ts_ns_tz_b.Append(values[1]).ok());
    auto batch = arrow::RecordBatch::Make(schema, 3, {
        ts_ns_b.Finish().ValueUnsafe(),
        ts_ns_tz_b.Finish().ValueUnsafe(),
    });
    MemBuffer buf;
    buf.data = write_and_get(schema, batch);
    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
    auto rb = read_row_group(reader, 0);
    ASSERT_TRUE(rb->schema()->field(0)->type()->Equals(ts_ns_type));
    ASSERT_TRUE(rb->schema()->field(1)->type()->Equals(ts_ns_tz_type));
    auto ts_ns = std::static_pointer_cast<arrow::TimestampArray>(rb->column(0));
    auto ts_ns_tz = std::static_pointer_cast<arrow::TimestampArray>(rb->column(1));
    ASSERT_EQ(ts_ns->Value(0), values[0]);
    ASSERT_TRUE(ts_ns->IsNull(1));
    ASSERT_EQ(ts_ns->Value(2), values[1]);
    ASSERT_EQ(ts_ns_tz->Value(0), values[0]);
    ASSERT_TRUE(ts_ns_tz->IsNull(1));
    ASSERT_EQ(ts_ns_tz->Value(2), values[1]);
    printf(" PASS test_timestamp_ns_roundtrip\n");
}
static void test_projection() {
auto schema = arrow::schema({
arrow::field("a", arrow::int32()),
arrow::field("b", arrow::utf8()),
arrow::field("c", arrow::float64()),
arrow::field("d", arrow::utf8()),
});
arrow::Int32Builder ab;
arrow::StringBuilder bb, db;
arrow::DoubleBuilder cb;
for (int i = 0; i < 20; i++) {
assert(ab.Append(i).ok());
assert(bb.Append("val_" + std::to_string(i)).ok());
assert(cb.Append(static_cast<double>(i)).ok());
assert(db.Append("extra_" + std::to_string(i)).ok());
}
auto batch = arrow::RecordBatch::Make(schema, 20, {
ab.Finish().ValueUnsafe(), bb.Finish().ValueUnsafe(),
cb.Finish().ValueUnsafe(), db.Finish().ValueUnsafe(),
});
mosaic::WriterOptions opts;
opts.num_buckets = 2;
auto data_vec = write_and_get(schema, batch, opts);
MemBuffer buf;
buf.data = data_vec;
auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
const char* cols[] = {"c", "a", "b"};
reader.set_projection(cols, 3);
auto rb = read_row_group(reader, 0);
ASSERT_EQ(rb->num_columns(), 3);
ASSERT_EQ(rb->num_rows(), 20);
ASSERT_EQ(rb->schema()->field(0)->name(), "c");
ASSERT_EQ(rb->schema()->field(1)->name(), "a");
ASSERT_EQ(rb->schema()->field(2)->name(), "b");
printf(" PASS test_projection\n");
}
static void test_projection_empty() {
auto schema = arrow::schema({
arrow::field("a", arrow::int32()),
arrow::field("b", arrow::utf8()),
});
arrow::Int32Builder ab;
arrow::StringBuilder bb;
for (int i = 0; i < 5; i++) {
assert(ab.Append(i).ok());
assert(bb.Append("v" + std::to_string(i)).ok());
}
auto batch = arrow::RecordBatch::Make(schema, 5, {
ab.Finish().ValueUnsafe(), bb.Finish().ValueUnsafe(),
});
auto data_vec = write_and_get(schema, batch);
MemBuffer buf;
buf.data = data_vec;
auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
reader.set_projection(nullptr, 0);
auto rb = read_row_group(reader, 0);
ASSERT_EQ(rb->num_columns(), 0);
ASSERT_EQ(rb->num_rows(), 5);
printf(" PASS test_projection_empty\n");
}
// Requests statistics for "id" and "score" through WriterOptions and checks
// that every entry the reader reports for row group 0 belongs to one of those
// columns, has a null count of 0 and carries min/max values.
static void test_statistics() {
    auto schema = arrow::schema({
        arrow::field("id", arrow::int32()),
        arrow::field("name", arrow::utf8()),
        arrow::field("score", arrow::float64()),
    });
    arrow::Int32Builder id_b;
    arrow::StringBuilder name_b;
    arrow::DoubleBuilder score_b;
    for (int i = 0; i < 10; i++) {
        // ASSERT_TRUE, not assert(): the appends must still run under NDEBUG.
        ASSERT_TRUE(id_b.Append(i * 10).ok());
        ASSERT_TRUE(name_b.Append("item_" + std::to_string(i)).ok());
        ASSERT_TRUE(score_b.Append(i * 1.1).ok());
    }
    auto batch = arrow::RecordBatch::Make(schema, 10, {
        id_b.Finish().ValueUnsafe(), name_b.Finish().ValueUnsafe(),
        score_b.Finish().ValueUnsafe(),
    });
    mosaic::WriterOptions opts;
    const char* stats_cols[] = {"id", "score"};
    opts.stats_columns = stats_cols;
    opts.num_stats_columns = 2;
    MemBuffer buf;
    buf.data = write_and_get(schema, batch, opts);
    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
    auto stats = reader.get_row_group_statistics(0);
    ASSERT_TRUE(stats.size() > 0);
    for (auto& s : stats) {
        ASSERT_TRUE(s.column_name == "id" || s.column_name == "score");
        ASSERT_EQ(s.null_count, 0u);
        ASSERT_TRUE(s.has_min_max());
    }
    printf(" PASS test_statistics\n");
}
static void test_compression_zstd() {
auto schema = arrow::schema({
arrow::field("x", arrow::int32()),
arrow::field("y", arrow::utf8()),
});
arrow::Int32Builder xb;
arrow::StringBuilder yb;
for (int i = 0; i < 100; i++) {
assert(xb.Append(i).ok());
assert(yb.Append("v_" + std::to_string(i)).ok());
}
auto batch = arrow::RecordBatch::Make(schema, 100, {
xb.Finish().ValueUnsafe(), yb.Finish().ValueUnsafe(),
});
mosaic::WriterOptions opts;
opts.compression = 1;
opts.zstd_level = 3;
auto data_vec = write_and_get(schema, batch, opts);
MemBuffer buf;
buf.data = data_vec;
auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
auto rb = read_row_group(reader, 0);
ASSERT_EQ(rb->num_rows(), 100);
auto xs = std::static_pointer_cast<arrow::Int32Array>(rb->GetColumnByName("x"));
for (int i = 0; i < 100; i++) {
ASSERT_EQ(xs->Value(i), i);
}
printf(" PASS test_compression_zstd\n");
}
// Writes a one-row file and checks that the schema exported by the reader
// keeps the field count, the field names in order, and the nullability of the
// first two fields.
static void test_schema_roundtrip() {
    auto schema = arrow::schema({
        arrow::field("name", arrow::utf8(), true),
        arrow::field("id", arrow::int32(), false),
        arrow::field("score", arrow::float64(), true),
    });
    // ASSERT_TRUE, not assert(): the appends must still run under NDEBUG.
    arrow::StringBuilder sr_name_b;
    ASSERT_TRUE(sr_name_b.Append("x").ok());
    arrow::Int32Builder sr_id_b;
    ASSERT_TRUE(sr_id_b.Append(1).ok());
    arrow::DoubleBuilder sr_score_b;
    ASSERT_TRUE(sr_score_b.Append(1.0).ok());
    auto batch = arrow::RecordBatch::Make(schema, 1, {
        sr_name_b.Finish().ValueUnsafe(),
        sr_id_b.Finish().ValueUnsafe(),
        sr_score_b.Finish().ValueUnsafe(),
    });
    MemBuffer buf;
    buf.data = write_and_get(schema, batch);
    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
    struct ArrowSchema c_schema;
    reader.export_schema(&c_schema);
    auto imported = arrow::ImportSchema(&c_schema);
    // Checked in every build type; ValueUnsafe() below does not validate.
    ASSERT_TRUE(imported.ok());
    auto read_schema = imported.ValueUnsafe();
    ASSERT_EQ(read_schema->num_fields(), 3);
    ASSERT_EQ(read_schema->field(0)->name(), "name");
    ASSERT_EQ(read_schema->field(1)->name(), "id");
    ASSERT_EQ(read_schema->field(2)->name(), "score");
    ASSERT_TRUE(read_schema->field(0)->nullable());
    ASSERT_TRUE(!read_schema->field(1)->nullable());
    printf(" PASS test_schema_roundtrip\n");
}
// Writes 500 rows in batches of 50 with row_group_max_size = 200, then checks
// that the file contains more than one row group and that reading the groups
// in order yields every row exactly once, in the order written.
static void test_multiple_row_groups() {
    auto schema = arrow::schema({
        arrow::field("id", arrow::int32()),
        arrow::field("data", arrow::int64()),
    });
    mosaic::WriterOptions opts;
    opts.compression = 0;
    opts.num_buckets = 1;
    opts.row_group_max_size = 200;
    MemBuffer write_buf;
    struct ArrowSchema c_schema;
    auto st = arrow::ExportSchema(*schema, &c_schema);
    // ASSERT_TRUE, not assert(): checks and appends must survive NDEBUG builds.
    ASSERT_TRUE(st.ok());
    mosaic::Writer writer(make_output(write_buf), &c_schema, opts);
    const int total_rows = 500;
    const int batch_size = 50;
    for (int start = 0; start < total_rows; start += batch_size) {
        const int end = std::min(start + batch_size, total_rows);
        const int n = end - start;
        arrow::Int32Builder id_b;
        arrow::Int64Builder data_b;
        for (int i = start; i < end; i++) {
            ASSERT_TRUE(id_b.Append(i).ok());
            ASSERT_TRUE(data_b.Append(static_cast<int64_t>(i) * 3).ok());
        }
        auto batch = arrow::RecordBatch::Make(schema, n, {
            id_b.Finish().ValueUnsafe(), data_b.Finish().ValueUnsafe(),
        });
        struct ArrowArray c_array;
        struct ArrowSchema c_batch_schema;
        st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
        ASSERT_TRUE(st.ok());
        writer.write(&c_array, &c_batch_schema);
    }
    writer.close();
    // Read from a separate copy of the written bytes.
    MemBuffer buf;
    buf.data = write_buf.data;
    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
    ASSERT_TRUE(reader.num_row_groups() > 1);
    // Number of rows consumed so far; doubles as the expected id of the next row.
    int offset = 0;
    for (uint32_t rg = 0; rg < reader.num_row_groups(); rg++) {
        auto rb = read_row_group(reader, rg);
        auto ids = std::static_pointer_cast<arrow::Int32Array>(rb->GetColumnByName("id"));
        auto datas = std::static_pointer_cast<arrow::Int64Array>(rb->GetColumnByName("data"));
        for (int64_t i = 0; i < rb->num_rows(); i++) {
            ASSERT_EQ(ids->Value(i), offset + static_cast<int>(i));
            ASSERT_EQ(datas->Value(i), static_cast<int64_t>(offset + i) * 3);
        }
        offset += static_cast<int>(rb->num_rows());
    }
    // Compare against the constant used for writing rather than a literal.
    ASSERT_EQ(offset, total_rows);
    printf(" PASS test_multiple_row_groups\n");
}
// Checks the statistics exposed by the Writer itself after close(): one row
// group, entries only for the requested columns ("id", "score"), no nulls,
// min/max present, and 4-byte min/max for "id" that decode to 0 and 90.
static void test_writer_stats() {
    auto schema = arrow::schema({
        arrow::field("id", arrow::int32()),
        arrow::field("name", arrow::utf8()),
        arrow::field("score", arrow::float64()),
    });
    arrow::Int32Builder id_b;
    arrow::StringBuilder name_b;
    arrow::DoubleBuilder score_b;
    for (int i = 0; i < 10; i++) {
        // ASSERT_TRUE, not assert(): the appends must still run under NDEBUG.
        ASSERT_TRUE(id_b.Append(i * 10).ok());
        ASSERT_TRUE(name_b.Append("item_" + std::to_string(i)).ok());
        ASSERT_TRUE(score_b.Append(i * 1.1).ok());
    }
    auto batch = arrow::RecordBatch::Make(schema, 10, {
        id_b.Finish().ValueUnsafe(), name_b.Finish().ValueUnsafe(),
        score_b.Finish().ValueUnsafe(),
    });
    mosaic::WriterOptions opts;
    const char* stats_cols[] = {"id", "score"};
    opts.stats_columns = stats_cols;
    opts.num_stats_columns = 2;
    MemBuffer write_buf;
    struct ArrowSchema c_schema;
    auto st = arrow::ExportSchema(*schema, &c_schema);
    ASSERT_TRUE(st.ok());
    mosaic::Writer writer(make_output(write_buf), &c_schema, opts);
    struct ArrowArray c_array;
    struct ArrowSchema c_batch_schema;
    st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
    ASSERT_TRUE(st.ok());
    writer.write(&c_array, &c_batch_schema);
    writer.close();
    ASSERT_EQ(writer.num_row_groups(), 1u);
    auto stats = writer.get_row_group_statistics(0);
    ASSERT_TRUE(stats.size() > 0);
    for (auto& s : stats) {
        ASSERT_TRUE(s.column_name == "id" || s.column_name == "score");
        ASSERT_EQ(s.null_count, 0u);
        ASSERT_TRUE(s.has_min_max());
    }
    auto id_stat = std::find_if(stats.begin(), stats.end(),
        [](const mosaic::ColumnStatistics& s) { return s.column_name == "id"; });
    ASSERT_TRUE(id_stat != stats.end());
    ASSERT_EQ(id_stat->min_value.size(), 4u);
    ASSERT_EQ(id_stat->max_value.size(), 4u);
    // Decodes 4 bytes as a big-endian 32-bit integer. This yields the same
    // value as the previous memcpy + __builtin_bswap32 on a little-endian
    // host, but does not depend on host byte order or on a compiler builtin.
    // NOTE(review): big-endian encoding is inferred from that earlier
    // byte-swap -- confirm against the stats format.
    const auto read_be32 = [](const void* src) -> int32_t {
        uint8_t raw[4];
        memcpy(raw, src, sizeof(raw));
        uint32_t v = 0;
        for (const uint8_t byte : raw) v = (v << 8) | byte;
        return static_cast<int32_t>(v);
    };
    const int32_t min_id = read_be32(id_stat->min_value.data());
    const int32_t max_id = read_be32(id_stat->max_value.data());
    ASSERT_EQ(min_id, 0);
    ASSERT_EQ(max_id, 90);
    printf(" PASS test_writer_stats\n");
}
// Checks writer-side statistics for columns that contain nulls: the null
// count is reported per column and min/max are computed over the non-null
// values only (a: 5..20 with 1 null, b: 50..100 with 2 nulls).
static void test_writer_stats_with_nulls() {
    auto schema = arrow::schema({
        arrow::field("a", arrow::int32()),
        arrow::field("b", arrow::int64()),
    });
    // ASSERT_TRUE, not assert(): the appends must still run under NDEBUG.
    arrow::Int32Builder a_b;
    ASSERT_TRUE(a_b.Append(10).ok());
    ASSERT_TRUE(a_b.AppendNull().ok());
    ASSERT_TRUE(a_b.Append(5).ok());
    ASSERT_TRUE(a_b.Append(20).ok());
    arrow::Int64Builder b_b;
    ASSERT_TRUE(b_b.AppendNull().ok());
    ASSERT_TRUE(b_b.AppendNull().ok());
    ASSERT_TRUE(b_b.Append(100).ok());
    ASSERT_TRUE(b_b.Append(50).ok());
    auto batch = arrow::RecordBatch::Make(schema, 4, {
        a_b.Finish().ValueUnsafe(), b_b.Finish().ValueUnsafe(),
    });
    mosaic::WriterOptions opts;
    opts.num_buckets = 1;
    const char* stats_cols[] = {"a", "b"};
    opts.stats_columns = stats_cols;
    opts.num_stats_columns = 2;
    MemBuffer write_buf;
    struct ArrowSchema c_schema;
    auto st = arrow::ExportSchema(*schema, &c_schema);
    ASSERT_TRUE(st.ok());
    mosaic::Writer writer(make_output(write_buf), &c_schema, opts);
    struct ArrowArray c_array;
    struct ArrowSchema c_batch_schema;
    st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
    ASSERT_TRUE(st.ok());
    writer.write(&c_array, &c_batch_schema);
    writer.close();
    ASSERT_EQ(writer.num_row_groups(), 1u);
    auto stats = writer.get_row_group_statistics(0);
    ASSERT_EQ(stats.size(), 2u);

    // Big-endian decoders. They give the same result as the previous
    // memcpy + __builtin_bswap32/64 on a little-endian host, without relying
    // on host byte order or compiler builtins.
    // NOTE(review): big-endian encoding is inferred from that earlier
    // byte-swap -- confirm against the stats format.
    const auto read_be32 = [](const void* src) -> int32_t {
        uint8_t raw[4];
        memcpy(raw, src, sizeof(raw));
        uint32_t v = 0;
        for (const uint8_t byte : raw) v = (v << 8) | byte;
        return static_cast<int32_t>(v);
    };
    const auto read_be64 = [](const void* src) -> int64_t {
        uint8_t raw[8];
        memcpy(raw, src, sizeof(raw));
        uint64_t v = 0;
        for (const uint8_t byte : raw) v = (v << 8) | byte;
        return static_cast<int64_t>(v);
    };

    auto a_stat = std::find_if(stats.begin(), stats.end(),
        [](const mosaic::ColumnStatistics& s) { return s.column_name == "a"; });
    ASSERT_TRUE(a_stat != stats.end());
    ASSERT_EQ(a_stat->null_count, 1u);
    ASSERT_TRUE(a_stat->has_min_max());
    // Guard the fixed-width reads below against short buffers.
    ASSERT_EQ(a_stat->min_value.size(), 4u);
    ASSERT_EQ(a_stat->max_value.size(), 4u);
    const int32_t min_a = read_be32(a_stat->min_value.data());
    const int32_t max_a = read_be32(a_stat->max_value.data());
    ASSERT_EQ(min_a, 5);
    ASSERT_EQ(max_a, 20);

    auto b_stat = std::find_if(stats.begin(), stats.end(),
        [](const mosaic::ColumnStatistics& s) { return s.column_name == "b"; });
    ASSERT_TRUE(b_stat != stats.end());
    ASSERT_EQ(b_stat->null_count, 2u);
    ASSERT_TRUE(b_stat->has_min_max());
    ASSERT_EQ(b_stat->min_value.size(), 8u);
    ASSERT_EQ(b_stat->max_value.size(), 8u);
    const int64_t min_b = read_be64(b_stat->min_value.data());
    const int64_t max_b = read_be64(b_stat->max_value.data());
    ASSERT_EQ(min_b, 50);
    ASSERT_EQ(max_b, 100);
    printf(" PASS test_writer_stats_with_nulls\n");
}
// Checks writer-side statistics for a column whose three values are all null:
// the null count is 3 and no min/max is reported.
static void test_writer_stats_all_null() {
    auto schema = arrow::schema({
        arrow::field("x", arrow::int32()),
    });
    // ASSERT_TRUE, not assert(): the appends must still run under NDEBUG.
    arrow::Int32Builder x_b;
    ASSERT_TRUE(x_b.AppendNull().ok());
    ASSERT_TRUE(x_b.AppendNull().ok());
    ASSERT_TRUE(x_b.AppendNull().ok());
    auto batch = arrow::RecordBatch::Make(schema, 3, {
        x_b.Finish().ValueUnsafe(),
    });
    mosaic::WriterOptions opts;
    opts.num_buckets = 1;
    const char* stats_cols[] = {"x"};
    opts.stats_columns = stats_cols;
    opts.num_stats_columns = 1;
    MemBuffer write_buf;
    struct ArrowSchema c_schema;
    auto st = arrow::ExportSchema(*schema, &c_schema);
    ASSERT_TRUE(st.ok());
    mosaic::Writer writer(make_output(write_buf), &c_schema, opts);
    struct ArrowArray c_array;
    struct ArrowSchema c_batch_schema;
    st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
    ASSERT_TRUE(st.ok());
    writer.write(&c_array, &c_batch_schema);
    writer.close();
    ASSERT_EQ(writer.num_row_groups(), 1u);
    auto stats = writer.get_row_group_statistics(0);
    ASSERT_EQ(stats.size(), 1u);
    ASSERT_EQ(stats[0].column_name, "x");
    ASSERT_EQ(stats[0].null_count, 3u);
    ASSERT_TRUE(!stats[0].has_min_max());
    printf(" PASS test_writer_stats_all_null\n");
}
static void test_writer_stats_matches_reader() {
auto schema = arrow::schema({
arrow::field("id", arrow::int32()),
arrow::field("value", arrow::float64()),
});
arrow::Int32Builder id_b;
arrow::DoubleBuilder val_b;
for (int i = 0; i < 20; i++) {
assert(id_b.Append(i * 5).ok());
assert(val_b.Append(i * 2.5).ok());
}
auto batch = arrow::RecordBatch::Make(schema, 20, {
id_b.Finish().ValueUnsafe(), val_b.Finish().ValueUnsafe(),
});
mosaic::WriterOptions opts;
opts.num_buckets = 1;
const char* stats_cols[] = {"id", "value"};
opts.stats_columns = stats_cols;
opts.num_stats_columns = 2;
MemBuffer write_buf;
struct ArrowSchema c_schema;
auto st = arrow::ExportSchema(*schema, &c_schema);
assert(st.ok());
mosaic::Writer writer(make_output(write_buf), &c_schema, opts);
struct ArrowArray c_array;
struct ArrowSchema c_batch_schema;
st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
assert(st.ok());
writer.write(&c_array, &c_batch_schema);
writer.close();
auto writer_stats = writer.get_row_group_statistics(0);
MemBuffer buf;
buf.data = write_buf.data;
auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
auto reader_stats = reader.get_row_group_statistics(0);
ASSERT_EQ(writer_stats.size(), reader_stats.size());
for (size_t i = 0; i < writer_stats.size(); i++) {
ASSERT_EQ(writer_stats[i].column_name, reader_stats[i].column_name);
ASSERT_EQ(writer_stats[i].null_count, reader_stats[i].null_count);
ASSERT_EQ(writer_stats[i].min_value, reader_stats[i].min_value);
ASSERT_EQ(writer_stats[i].max_value, reader_stats[i].max_value);
}
printf(" PASS test_writer_stats_matches_reader\n");
}
// Checks that an empty string is a valid minimum: for the values {"", "b"},
// both writer and reader must report has_min_max() == true with a
// zero-length min_value and a max_value of "b".
static void test_stats_empty_string_min() {
    auto schema = arrow::schema({
        arrow::field("s", arrow::utf8()),
    });
    // ASSERT_TRUE, not assert(): the appends must still run under NDEBUG.
    arrow::StringBuilder s_b;
    ASSERT_TRUE(s_b.Append("").ok());
    ASSERT_TRUE(s_b.Append("b").ok());
    auto batch = arrow::RecordBatch::Make(schema, 2, {
        s_b.Finish().ValueUnsafe(),
    });
    mosaic::WriterOptions opts;
    opts.num_buckets = 1;
    const char* stats_cols[] = {"s"};
    opts.stats_columns = stats_cols;
    opts.num_stats_columns = 1;
    MemBuffer write_buf;
    struct ArrowSchema c_schema;
    auto st = arrow::ExportSchema(*schema, &c_schema);
    ASSERT_TRUE(st.ok());
    mosaic::Writer writer(make_output(write_buf), &c_schema, opts);
    struct ArrowArray c_array;
    struct ArrowSchema c_batch_schema;
    st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
    ASSERT_TRUE(st.ok());
    writer.write(&c_array, &c_batch_schema);
    writer.close();
    // Writer stats
    ASSERT_EQ(writer.num_row_groups(), 1u);
    auto writer_stats = writer.get_row_group_statistics(0);
    ASSERT_EQ(writer_stats.size(), 1u);
    ASSERT_EQ(writer_stats[0].column_name, "s");
    ASSERT_TRUE(writer_stats[0].has_min_max());
    ASSERT_EQ(writer_stats[0].min_value.size(), 0u);
    ASSERT_EQ(writer_stats[0].max_value, (std::vector<uint8_t>{'b'}));
    ASSERT_EQ(writer_stats[0].null_count, 0u);
    // Reader stats
    MemBuffer buf;
    buf.data = write_buf.data;
    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
    auto reader_stats = reader.get_row_group_statistics(0);
    ASSERT_EQ(reader_stats.size(), 1u);
    ASSERT_EQ(reader_stats[0].column_name, "s");
    ASSERT_TRUE(reader_stats[0].has_min_max());
    ASSERT_EQ(reader_stats[0].min_value.size(), 0u);
    ASSERT_EQ(reader_stats[0].max_value, (std::vector<uint8_t>{'b'}));
    ASSERT_EQ(reader_stats[0].null_count, 0u);
    printf(" PASS test_stats_empty_string_min\n");
}
// Test driver: runs every test in order. A failing assertion aborts the
// process, so reaching the final line means every listed test passed.
int main() {
    printf("Running Mosaic C++ tests...\n");
    using TestFn = void (*)();
    // Single source of truth for the test list; the summary count is derived
    // from it so it cannot drift when tests are added or removed.
    const TestFn tests[] = {
        test_basic_roundtrip,
        test_null_values,
        test_all_types,
        test_timestamp_ns_roundtrip,
        test_projection,
        test_projection_empty,
        test_statistics,
        test_compression_zstd,
        test_schema_roundtrip,
        test_multiple_row_groups,
        test_writer_stats,
        test_writer_stats_with_nulls,
        test_writer_stats_all_null,
        test_writer_stats_matches_reader,
        test_stats_empty_string_min,
    };
    int passed = 0;
    for (const TestFn test : tests) {
        test();
        ++passed;
    }
    printf("All %d tests passed.\n", passed);
    return 0;
}