blob: 124e7b7ea3a761991413ddf3991e78ab32f3cdd9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <arrow/filesystem/localfs.h>
#include <arrow/result.h>
#include <gtest/gtest.h>
#include "iceberg/avro/avro_stream_internal.h"
#include "iceberg/test/temp_file_test_base.h"
namespace iceberg::avro {
class AVROStreamTest : public TempFileTestBase {
public:
void SetUp() override {
TempFileTestBase::SetUp();
temp_filepath_ = CreateNewTempFilePath();
local_fs_ = std::make_shared<::arrow::fs::LocalFileSystem>();
}
std::shared_ptr<AvroOutputStream> CreateOutputStream(const std::string& path,
int64_t buffer_size) {
auto arrow_out_ret = local_fs_->OpenOutputStream(path);
if (!arrow_out_ret.ok()) {
throw std::runtime_error("Failed to open output stream: " +
arrow_out_ret.status().message());
}
return std::make_shared<AvroOutputStream>(std::move(arrow_out_ret.ValueUnsafe()),
buffer_size);
}
std::shared_ptr<AvroInputStream> CreateInputStream(const std::string& path,
int64_t buffer_size) {
auto arrow_in_ret = local_fs_->OpenInputFile(path);
if (!arrow_in_ret.ok()) {
throw std::runtime_error("Failed to open input stream: " +
arrow_in_ret.status().message());
}
return std::make_shared<AvroInputStream>(std::move(arrow_in_ret.ValueUnsafe()),
buffer_size);
}
void WriteDataToStream(const std::shared_ptr<AvroOutputStream>& avro_output_stream,
const std::string& data) {
uint8_t* buf;
size_t buf_size;
ASSERT_TRUE(avro_output_stream->next(&buf, &buf_size));
std::memcpy(buf, data.data(), data.size());
avro_output_stream->backup(1024 - data.size());
avro_output_stream->flush();
}
void ReadDataFromStream(const std::shared_ptr<AvroInputStream>& avro_input_stream,
std::string& data) {
const uint8_t* buf{};
size_t len{};
ASSERT_TRUE(avro_input_stream->next(&buf, &len));
data = std::string(reinterpret_cast<const char*>(buf), len);
}
void CheckStreamEof(const std::shared_ptr<AvroInputStream>& avro_input_stream) {
const uint8_t* buf{};
size_t len{};
ASSERT_FALSE(avro_input_stream->next(&buf, &len));
}
int64_t buffer_size_ = 1024;
std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
std::string temp_filepath_;
};
TEST_F(AVROStreamTest, TestAvroBasicStream) {
// Write test data
const std::string test_data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_);
WriteDataToStream(avro_output_stream, test_data);
auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_);
const uint8_t* data{};
size_t len{};
ASSERT_TRUE(avro_input_stream->next(&data, &len));
EXPECT_EQ(len, test_data.size());
EXPECT_EQ(avro_input_stream->byteCount(), test_data.size());
EXPECT_EQ(std::string(reinterpret_cast<const char*>(data), len), test_data);
ASSERT_FALSE(avro_input_stream->next(&data, &len));
}
TEST_F(AVROStreamTest, InputStreamBackup) {
// Write test data
const std::string test_data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_);
WriteDataToStream(avro_output_stream, test_data);
// Create a test input stream
auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_);
// Read data
const uint8_t* data{};
size_t len{};
ASSERT_TRUE(avro_input_stream->next(&data, &len));
EXPECT_EQ(len, test_data.size());
// Backup 10 bytes
const size_t backupSize = 10;
avro_input_stream->backup(backupSize);
// Check byteCount after backup
EXPECT_EQ(avro_input_stream->byteCount(), test_data.size() - backupSize);
// Read the backed-up data again
ASSERT_TRUE(avro_input_stream->next(&data, &len));
EXPECT_EQ(len, backupSize);
EXPECT_EQ(std::string(reinterpret_cast<const char*>(data), len),
test_data.substr(test_data.size() - backupSize));
// Check that we've reached the end of the stream
ASSERT_FALSE(avro_input_stream->next(&data, &len));
}
TEST_F(AVROStreamTest, InputStreamSkip) {
// Write test data
const std::string test_data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_);
WriteDataToStream(avro_output_stream, test_data);
// Create a test input stream
auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_);
// Skip the first 10 bytes
const size_t skipSize = 10;
avro_input_stream->skip(skipSize);
// Check byteCount after skip
EXPECT_EQ(avro_input_stream->byteCount(), skipSize);
// Read the remaining data
const uint8_t* data{};
size_t len{};
ASSERT_TRUE(avro_input_stream->next(&data, &len));
EXPECT_EQ(len, test_data.size() - skipSize);
EXPECT_EQ(std::string(reinterpret_cast<const char*>(data), len),
test_data.substr(skipSize));
// Check that we've reached the end of the stream
ASSERT_FALSE(avro_input_stream->next(&data, &len));
}
TEST_F(AVROStreamTest, InputStreamSeek) {
// Write test data
const std::string test_data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_);
WriteDataToStream(avro_output_stream, test_data);
// Create a test input stream
auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_);
// Seek to position 15
const int64_t seekPos = 15;
avro_input_stream->seek(seekPos);
// Check byteCount after seek
EXPECT_EQ(avro_input_stream->byteCount(), static_cast<size_t>(seekPos));
// Read the remaining data
const uint8_t* data{};
size_t len{};
ASSERT_TRUE(avro_input_stream->next(&data, &len));
EXPECT_EQ(len, test_data.size() - seekPos);
EXPECT_EQ(std::string(reinterpret_cast<const char*>(data), len),
test_data.substr(seekPos));
// Check that we've reached the end of the stream
ASSERT_FALSE(avro_input_stream->next(&data, &len));
}
TEST_F(AVROStreamTest, OutputStreamBasic) {
auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_);
// Test next()
uint8_t* data{};
size_t len{};
ASSERT_TRUE(avro_output_stream->next(&data, &len));
EXPECT_EQ(len, static_cast<size_t>(buffer_size_));
// Write some data
const std::string test_data = "Hello, Avro!";
std::memcpy(data, test_data.data(), test_data.size());
// Backup unused bytes
avro_output_stream->backup(len - test_data.size());
// Check byteCount
EXPECT_EQ(avro_output_stream->byteCount(), test_data.size());
// Flush the data
avro_output_stream->flush();
// Verify the written data
auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_);
std::string read_data;
ReadDataFromStream(avro_input_stream, read_data);
EXPECT_EQ(read_data, test_data);
CheckStreamEof(avro_input_stream);
}
TEST_F(AVROStreamTest, OutputStreamMultipleWrites) {
auto avro_output_stream = CreateOutputStream(temp_filepath_, buffer_size_);
// Write first chunk
const std::string chunk1 = "First chunk of data. ";
uint8_t* data1{};
size_t len1{};
ASSERT_TRUE(avro_output_stream->next(&data1, &len1));
std::memcpy(data1, chunk1.data(), chunk1.size());
avro_output_stream->backup(len1 - chunk1.size());
// Write second chunk
const std::string chunk2 = "Second chunk of data.";
uint8_t* data2{};
size_t len2{};
ASSERT_TRUE(avro_output_stream->next(&data2, &len2));
std::memcpy(data2, chunk2.data(), chunk2.size());
avro_output_stream->backup(len2 - chunk2.size());
// Check byteCount
EXPECT_EQ(avro_output_stream->byteCount(), chunk1.size() + chunk2.size());
// Flush and close
avro_output_stream->flush();
// Verify the written data
std::string expectedData = chunk1 + chunk2;
auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_);
std::string read_data;
ReadDataFromStream(avro_input_stream, read_data);
EXPECT_EQ(read_data, expectedData);
CheckStreamEof(avro_input_stream);
}
TEST_F(AVROStreamTest, InputStreamLargeData) {
// Create a large test data set (larger than the buffer size)
const size_t dataSize = buffer_size_ * 2.5;
std::vector<char> test_data(dataSize);
for (size_t i = 0; i < dataSize; ++i) {
test_data[i] = static_cast<char>(i % 256);
}
auto arrow_out_ret = local_fs_->OpenOutputStream(temp_filepath_);
if (!arrow_out_ret.ok()) {
throw std::runtime_error("Failed to open output stream: " +
arrow_out_ret.status().message());
}
auto arrow_out = arrow_out_ret.ValueUnsafe();
// Write the test data
auto status = arrow_out->Write(test_data.data(), test_data.size());
ASSERT_TRUE(status.ok());
status = arrow_out->Flush();
ASSERT_TRUE(status.ok());
status = arrow_out->Close();
ASSERT_TRUE(status.ok());
// Create an AvroInputStream with a smaller buffer
const int64_t smallBufferSize = buffer_size_ / 2;
auto avro_input_stream = CreateInputStream(temp_filepath_, smallBufferSize);
// Read all the data in chunks
std::vector<char> readData;
const uint8_t* data{};
size_t len{};
while (avro_input_stream->next(&data, &len)) {
readData.insert(readData.end(), data, data + len);
}
// Verify all data was read correctly
EXPECT_EQ(readData.size(), test_data.size());
EXPECT_EQ(std::memcmp(readData.data(), test_data.data(), test_data.size()), 0);
EXPECT_EQ(avro_input_stream->byteCount(), test_data.size());
}
TEST_F(AVROStreamTest, OutputStreamLargeData) {
// Create an AvroOutputStream with a small buffer
const int64_t smallBufferSize = 256;
auto avro_output_stream = CreateOutputStream(temp_filepath_, smallBufferSize);
// Create a large test data set (larger than the buffer size)
const size_t dataSize = smallBufferSize * 3.5;
std::vector<char> test_data(dataSize);
for (size_t i = 0; i < dataSize; ++i) {
test_data[i] = static_cast<char>(i % 256);
}
// Write the data in chunks
size_t bytesWritten = 0;
while (bytesWritten < dataSize) {
uint8_t* buffer{};
size_t bufferSize{};
ASSERT_TRUE(avro_output_stream->next(&buffer, &bufferSize));
size_t bytesToWrite = std::min(bufferSize, dataSize - bytesWritten);
std::memcpy(buffer, test_data.data() + bytesWritten, bytesToWrite);
if (bytesToWrite < bufferSize) {
avro_output_stream->backup(bufferSize - bytesToWrite);
}
bytesWritten += bytesToWrite;
}
// Flush and close
avro_output_stream->flush();
// Verify the written data
auto avro_input_stream = CreateInputStream(temp_filepath_, buffer_size_);
std::vector<char> readData;
const uint8_t* data{};
size_t len{};
while (avro_input_stream->next(&data, &len)) {
readData.insert(readData.end(), data, data + len);
}
// Verify all data was read correctly
EXPECT_EQ(readData.size(), test_data.size());
EXPECT_EQ(std::memcmp(readData.data(), test_data.data(), test_data.size()), 0);
EXPECT_EQ(avro_input_stream->byteCount(), test_data.size());
}
} // namespace iceberg::avro