blob: f27d19ef32769e205fc6161b217856367c91b179 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gtest/gtest.h>
#include "olap/byte_buffer.h"
#include "olap/file_stream.h"
#include "olap/in_stream.h"
#include "olap/out_stream.h"
#include "olap/rowset/column_reader.h"
#include "olap/rowset/run_length_byte_reader.h"
#include "olap/rowset/run_length_byte_writer.h"
#include "olap/stream_index_reader.h"
#include "olap/stream_index_writer.h"
#include "util/logging.h"
namespace doris {
using namespace testing;
TEST(TestStream, UncompressOutStream) {
    // An uncompressed OutStream holding a single byte must flush to exactly one
    // buffer: a StreamHead describing one raw byte, followed by that byte.
    OutStream* stream =
            new (std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, NULL);
    ASSERT_TRUE(stream != NULL);
    ASSERT_TRUE(stream->_compressor == NULL);

    stream->write(0x5a);
    stream->flush();

    ASSERT_EQ(stream->get_stream_length(), sizeof(StreamHead) + 1);
    const std::vector<StorageByteBuffer*>& buffers = stream->output_buffers();
    ASSERT_EQ(buffers.size(), 1);

    StorageByteBuffer* buffer = buffers.front();
    ASSERT_EQ(buffer->position(), 0);

    // Header: an UNCOMPRESSED chunk whose payload length is 1.
    StreamHead head;
    buffer->get(reinterpret_cast<char*>(&head), sizeof(head));
    ASSERT_EQ(head.type, StreamHead::UNCOMPRESSED);
    ASSERT_EQ(head.length, 1);

    // Payload: the written byte, after which the buffer has nothing left.
    char byte;
    ASSERT_EQ(OLAP_SUCCESS, buffer->get(&byte));
    ASSERT_EQ(0x5A, byte);
    ASSERT_NE(OLAP_SUCCESS, buffer->get(&byte));

    SAFE_DELETE(stream);
}
TEST(TestStream, UncompressOutStream2) {
    OutStream* writer =
            new (std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, NULL);
    ASSERT_TRUE(writer != NULL);
    ASSERT_TRUE(writer->_compressor == NULL);

    // Write BUFFER_SIZE + 1 single bytes: the first buffer fills up and the
    // final byte has to go into a second one.
    for (int32_t n = 0; n <= OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE; ++n) {
        writer->write(0x5a);
    }
    writer->flush();

    // Two headers plus every payload byte.
    const uint64_t expected_length =
            2 * sizeof(StreamHead) + OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE + 1;
    ASSERT_EQ(writer->get_stream_length(), expected_length);
    ASSERT_EQ(writer->output_buffers().size(), 2);

    // Create a reference buffer over each produced buffer for the reader.
    std::vector<StorageByteBuffer*> inputs;
    for (StorageByteBuffer* buf : writer->output_buffers()) {
        inputs.push_back(StorageByteBuffer::reference_buffer(buf, 0, buf->limit()));
    }

    // The second range begins right after the first header and full payload.
    std::vector<uint64_t> offsets = {0, sizeof(StreamHead) + OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE};
    InStream* reader =
            new (std::nothrow) InStream(&inputs, offsets, writer->get_stream_length(), NULL,
                                        writer->get_total_buffer_size());

    // All BUFFER_SIZE + 1 bytes read back as 0x5a, then the stream is exhausted.
    char data;
    for (int32_t n = 0; n <= OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE; ++n) {
        ASSERT_EQ(reader->read(&data), OLAP_SUCCESS);
        ASSERT_EQ(data, 0x5a);
    }
    ASSERT_NE(reader->read(&data), OLAP_SUCCESS);

    SAFE_DELETE(writer);
    SAFE_DELETE(reader);
    for (StorageByteBuffer* input : inputs) {
        delete input;
    }
}
TEST(TestStream, UncompressOutStream3) {
    OutStream* writer =
            new (std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, NULL);
    ASSERT_TRUE(writer != NULL);
    ASSERT_TRUE(writer->_compressor == NULL);

    // Fill the first buffer one byte at a time, then append a two-byte block
    // that must end up in a second buffer.
    for (int32_t n = 0; n < OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE; ++n) {
        writer->write(0x5a);
    }
    char tail[2] = {0x5a, 0x5a};
    writer->write(tail, 2);
    writer->flush();

    const uint64_t expected_length =
            2 * sizeof(StreamHead) + OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE + 2;
    ASSERT_EQ(writer->get_stream_length(), expected_length);
    ASSERT_EQ(writer->output_buffers().size(), 2);

    std::vector<StorageByteBuffer*> inputs;
    for (StorageByteBuffer* buf : writer->output_buffers()) {
        inputs.push_back(StorageByteBuffer::reference_buffer(buf, 0, buf->limit()));
    }

    // The second range starts after the first header plus a full payload.
    std::vector<uint64_t> offsets = {0, sizeof(StreamHead) + OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE};
    InStream* reader =
            new (std::nothrow) InStream(&inputs, offsets, writer->get_stream_length(), NULL,
                                        writer->get_total_buffer_size());

    // BUFFER_SIZE + 2 bytes of 0x5a, followed by end of stream.
    char data;
    for (int32_t n = 0; n < OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE + 2; ++n) {
        ASSERT_EQ(reader->read(&data), OLAP_SUCCESS);
        ASSERT_EQ(data, 0x5a);
    }
    ASSERT_NE(reader->read(&data), OLAP_SUCCESS);

    SAFE_DELETE(reader);
    SAFE_DELETE(writer);
    for (StorageByteBuffer* input : inputs) {
        delete input;
    }
}
TEST(TestStream, UncompressInStream) {
    OutStream* writer =
            new (std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, NULL);
    ASSERT_TRUE(writer != NULL);
    ASSERT_TRUE(writer->_compressor == NULL);
    writer->write(0x5a);
    writer->flush();

    // Feed the reader a single range that covers the first output buffer's
    // whole capacity.
    const std::vector<StorageByteBuffer*>& outputs = writer->output_buffers();
    ASSERT_FALSE(outputs.empty());
    StorageByteBuffer* first = outputs.front();
    std::vector<StorageByteBuffer*> inputs;
    inputs.push_back(StorageByteBuffer::reference_buffer(first, 0, first->capacity()));
    std::vector<uint64_t> offsets(inputs.size(), 0);

    InStream* reader =
            new (std::nothrow) InStream(&inputs, offsets, writer->get_stream_length(), NULL,
                                        writer->get_total_buffer_size());
    // The writer is released before any read; the reader must not depend on it.
    SAFE_DELETE(writer);

    ASSERT_EQ(reader->available(), 1);
    char data;
    ASSERT_EQ(reader->read(&data), OLAP_SUCCESS);
    ASSERT_EQ(data, 0x5a);

    SAFE_DELETE(reader);
    for (StorageByteBuffer* input : inputs) {
        delete input;
    }
}
// Compression only takes effect when the compressed length is smaller than the original stream,
// so this test writes highly repetitive data to make sure the compressor is used.
TEST(TestStream, CompressOutStream) {
    // A full buffer of identical bytes must be flushed as a COMPRESSED chunk.
    OutStream* out_stream =
            new (std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lz4_compress);
    ASSERT_TRUE(out_stream != NULL);
    ASSERT_TRUE(out_stream->_compressor != NULL);

    // std::vector instead of new[] so the payload is freed even when an
    // ASSERT_* below aborts the test early.
    std::vector<char> write_data(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, 0x5a);
    out_stream->write(write_data.data(), OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE);
    out_stream->flush();

    // Guard the dereference below: reading begin() of an empty vector is undefined.
    const std::vector<StorageByteBuffer*>& buffers = out_stream->output_buffers();
    ASSERT_FALSE(buffers.empty());

    StreamHead head;
    buffers.front()->get(reinterpret_cast<char*>(&head), sizeof(head));
    ASSERT_EQ(head.type, StreamHead::COMPRESSED);
    // Compressed payload length produced by lz4 for this input (lzo would give 49).
    ASSERT_EQ(51, head.length);

    SAFE_DELETE(out_stream);
}
TEST(TestStream, CompressOutStream2) {
    OutStream* writer =
            new (std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lz4_compress);
    ASSERT_TRUE(writer != NULL);
    ASSERT_TRUE(writer->_compressor != NULL);

    // BUFFER_SIZE + 1 identical bytes: the first buffer fills and the last byte
    // spills into a second one.
    for (int32_t n = 0; n <= OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE; ++n) {
        writer->write(0x5a);
    }
    writer->flush();

    std::vector<StorageByteBuffer*> inputs;
    for (StorageByteBuffer* buf : writer->output_buffers()) {
        inputs.push_back(StorageByteBuffer::reference_buffer(buf, 0, buf->limit()));
    }

    // 59 is where the second range begins with lz4; with lzo it would be 57.
    std::vector<uint64_t> offsets = {0, 59};
    InStream* reader =
            new (std::nothrow) InStream(&inputs, offsets, writer->get_stream_length(),
                                        lz4_decompress, writer->get_total_buffer_size());

    // All BUFFER_SIZE + 1 bytes decompress back to 0x5a, then the stream ends.
    char data;
    for (int32_t n = 0; n <= OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE; ++n) {
        ASSERT_EQ(reader->read(&data), OLAP_SUCCESS);
        ASSERT_EQ(data, 0x5a);
    }
    ASSERT_NE(reader->read(&data), OLAP_SUCCESS);

    SAFE_DELETE(reader);
    SAFE_DELETE(writer);
    for (StorageByteBuffer* input : inputs) {
        delete input;
    }
}
TEST(TestStream, CompressOutStream3) {
    OutStream* out_stream =
            new (std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lz4_compress);
    ASSERT_TRUE(out_stream != NULL);
    ASSERT_TRUE(out_stream->_compressor != NULL);

    // Fill the first buffer byte-by-byte ...
    for (int32_t i = 0; i < OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE; i++) {
        out_stream->write(0x5a);
    }
    // ... then append a 100-byte block that has to go into a second buffer.
    char write_data[100];
    memset(write_data, 0x5a, sizeof(write_data));
    out_stream->write(write_data, sizeof(write_data));
    out_stream->flush();

    std::vector<StorageByteBuffer*> inputs;
    for (const auto& it : out_stream->output_buffers()) {
        inputs.push_back(StorageByteBuffer::reference_buffer(it, 0, it->limit()));
    }

    // The first BUFFER_SIZE bytes are exactly what CompressOutStream2 writes,
    // so the second range starts at the same lz4 offset, 59. The previous value,
    // 57, is the offset lzo would produce.
    std::vector<uint64_t> offsets;
    offsets.push_back(0);
    offsets.push_back(59);
    InStream* in_stream =
            new (std::nothrow) InStream(&inputs, offsets, out_stream->get_stream_length(),
                                        lz4_decompress, out_stream->get_total_buffer_size());

    char data;
    for (int32_t i = 0; i < OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE; i++) {
        ASSERT_EQ(in_stream->read(&data), OLAP_SUCCESS);
        ASSERT_EQ(data, 0x5a);
    }
    // size_t index: avoids the signed/unsigned comparison against sizeof.
    for (size_t i = 0; i < sizeof(write_data); i++) {
        ASSERT_EQ(in_stream->read(&data), OLAP_SUCCESS);
        ASSERT_EQ(data, write_data[i]);
    }
    ASSERT_NE(in_stream->read(&data), OLAP_SUCCESS);

    SAFE_DELETE(in_stream);
    SAFE_DELETE(out_stream);
    for (auto input : inputs) {
        delete input;
    }
}
// Tests _slice(), in particular its `while (len > 0 && m_current_range < m_inputs.size())` loop.
TEST(TestStream, CompressOutStream4) {
    // A tiny 18-byte buffer plus an explicit spill produces several small chunks.
    OutStream* writer = new (std::nothrow) OutStream(18, lz4_compress);
    ASSERT_TRUE(writer != NULL);
    ASSERT_TRUE(writer->_compressor != NULL);

    // 15 repeated bytes, forced out by _spill() ...
    for (int32_t n = 0; n < 15; ++n) {
        writer->write(0x5a);
    }
    writer->_spill();
    // ... then 12 more repeated bytes followed by the values 0..5.
    for (int32_t n = 0; n < 12; ++n) {
        writer->write(0x5a);
    }
    for (int32_t n = 0; n < 6; ++n) {
        writer->write(n);
    }
    writer->flush();

    std::vector<StorageByteBuffer*> inputs;
    for (StorageByteBuffer* buf : writer->output_buffers()) {
        inputs.push_back(StorageByteBuffer::reference_buffer(buf, 0, buf->limit()));
    }
    std::vector<uint64_t> offsets = {0, 16};
    InStream* reader =
            new (std::nothrow) InStream(&inputs, offsets, writer->get_stream_length(),
                                        lz4_decompress, writer->get_total_buffer_size());

    // The 15 + 12 repeated bytes come back first ...
    char data;
    for (int32_t n = 0; n < 15 + 12; ++n) {
        ASSERT_EQ(reader->read(&data), OLAP_SUCCESS);
        ASSERT_EQ(data, 0x5a);
    }
    // ... then the counter values, and then nothing.
    for (int32_t n = 0; n < 6; ++n) {
        ASSERT_EQ(reader->read(&data), OLAP_SUCCESS);
        ASSERT_EQ(data, n);
    }
    ASSERT_NE(reader->read(&data), OLAP_SUCCESS);

    SAFE_DELETE(reader);
    SAFE_DELETE(writer);
    for (StorageByteBuffer* input : inputs) {
        delete input;
    }
}
TEST(TestStream, CompressMassOutStream) {
    // 100-byte buffer: one chunk of repeated bytes, then one chunk of 0..99.
    OutStream* writer = new (std::nothrow) OutStream(100, lz4_compress);
    ASSERT_TRUE(writer != NULL);
    ASSERT_TRUE(writer->_compressor != NULL);

    for (int32_t n = 0; n < 100; ++n) {
        writer->write(0x5a);
    }
    for (int32_t n = 0; n < 100; ++n) {
        writer->write(n);
    }
    writer->flush();

    std::vector<StorageByteBuffer*> inputs;
    for (StorageByteBuffer* buf : writer->output_buffers()) {
        inputs.push_back(StorageByteBuffer::reference_buffer(buf, 0, buf->limit()));
    }
    // 19 is where the second range begins with lz4; with lzo it would be 17.
    std::vector<uint64_t> offsets = {0, 19};
    InStream* reader =
            new (std::nothrow) InStream(&inputs, offsets, writer->get_stream_length(),
                                        lz4_decompress, writer->get_total_buffer_size());
    // Reading continues after the writer is gone.
    SAFE_DELETE(writer);

    char data;
    for (int32_t n = 0; n < 100; ++n) {
        ASSERT_EQ(reader->read(&data), OLAP_SUCCESS);
        ASSERT_EQ(data, 0x5a);
    }
    for (int32_t n = 0; n < 100; ++n) {
        ASSERT_EQ(reader->read(&data), OLAP_SUCCESS);
        ASSERT_EQ(data, n);
    }
    ASSERT_NE(reader->read(&data), OLAP_SUCCESS);

    SAFE_DELETE(reader);
    for (StorageByteBuffer* input : inputs) {
        delete input;
    }
}
TEST(TestStream, CompressInStream) {
    OutStream* writer =
            new (std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lz4_compress);
    ASSERT_TRUE(writer != NULL);
    ASSERT_TRUE(writer->_compressor != NULL);

    // One full buffer of 0x5a written as a single block.
    char* payload = new char[OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE];
    memset(payload, 0x5a, OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE);
    writer->write(payload, OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE);
    writer->flush();

    // A single input range over the first output buffer's full capacity.
    const std::vector<StorageByteBuffer*>& outputs = writer->output_buffers();
    ASSERT_FALSE(outputs.empty());
    StorageByteBuffer* first = outputs.front();
    std::vector<StorageByteBuffer*> inputs;
    inputs.push_back(StorageByteBuffer::reference_buffer(first, 0, first->capacity()));
    std::vector<uint64_t> offsets(inputs.size(), 0);

    InStream* reader =
            new (std::nothrow) InStream(&inputs, offsets, writer->get_stream_length(),
                                        lz4_decompress, writer->get_total_buffer_size());
    ASSERT_EQ(reader->available(), OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE);

    // Every byte decompresses back to 0x5a, and then the stream is exhausted.
    char data;
    for (int32_t n = 0; n < OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE; ++n) {
        ASSERT_EQ(reader->read(&data), OLAP_SUCCESS);
        ASSERT_EQ(data, 0x5a);
    }
    ASSERT_NE(reader->read(&data), OLAP_SUCCESS);

    SAFE_DELETE_ARRAY(payload);
    SAFE_DELETE(writer);
    SAFE_DELETE(reader);
    for (StorageByteBuffer* input : inputs) {
        delete input;
    }
}
TEST(TestStream, SeekUncompress) {
    OutStream* writer =
            new (std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, NULL);
    ASSERT_TRUE(writer != NULL);
    ASSERT_TRUE(writer->_compressor == NULL);

    // Capture the stream position between the two bytes.
    writer->write(0x5a);
    PositionEntryWriter index_entry;
    writer->get_position(&index_entry);
    writer->write(0x5b);
    ASSERT_EQ(index_entry.positions_count(), 2);
    ASSERT_EQ(index_entry.positions(0), 0);
    ASSERT_EQ(index_entry.positions(1), 1);
    writer->flush();

    const std::vector<StorageByteBuffer*>& outputs = writer->output_buffers();
    ASSERT_FALSE(outputs.empty());
    StorageByteBuffer* first = outputs.front();
    std::vector<StorageByteBuffer*> inputs;
    inputs.push_back(StorageByteBuffer::reference_buffer(first, 0, first->capacity()));
    std::vector<uint64_t> offsets(inputs.size(), 0);
    InStream* reader =
            new (std::nothrow) InStream(&inputs, offsets, writer->get_stream_length(), NULL,
                                        writer->get_total_buffer_size());
    ASSERT_EQ(reader->available(), 2);

    // Round-trip the captured position through its serialized form.
    char index_buf[256];
    index_entry.write_to_buffer(index_buf);
    StreamIndexHeader header;
    header.position_format = index_entry.positions_count();
    header.statistic_format = OLAP_FIELD_TYPE_TINYINT;
    PositionEntryReader entry;
    entry.init(&header, OLAP_FIELD_TYPE_TINYINT, false);
    entry.attach(index_buf);
    PositionProvider position(&entry);
    ASSERT_EQ(entry.positions_count(), 2);
    ASSERT_EQ(entry.positions(0), 0);
    ASSERT_EQ(entry.positions(1), 1);

    // Seeking there must land on the second byte.
    reader->seek(&position);
    char data;
    ASSERT_EQ(reader->read(&data), OLAP_SUCCESS);
    ASSERT_EQ(data, 0x5b);

    SAFE_DELETE(writer);
    SAFE_DELETE(reader);
    for (StorageByteBuffer* input : inputs) {
        delete input;
    }
}
TEST(TestStream, SkipUncompress) {
    OutStream* writer =
            new (std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, NULL);
    ASSERT_TRUE(writer != NULL);
    ASSERT_TRUE(writer->_compressor == NULL);

    // Four distinct bytes followed by a fifth sentinel byte.
    char write_data[] = {0x5a, 0x5b, 0x5c, 0x5d};
    for (char byte : write_data) {
        writer->write(byte);
    }
    writer->write(0x5e);
    writer->flush();

    const std::vector<StorageByteBuffer*>& outputs = writer->output_buffers();
    ASSERT_FALSE(outputs.empty());
    StorageByteBuffer* first = outputs.front();
    std::vector<StorageByteBuffer*> inputs;
    inputs.push_back(StorageByteBuffer::reference_buffer(first, 0, first->capacity()));
    std::vector<uint64_t> offsets(inputs.size(), 0);
    InStream* reader =
            new (std::nothrow) InStream(&inputs, offsets, writer->get_stream_length(), NULL,
                                        writer->get_total_buffer_size());
    ASSERT_EQ(reader->available(), sizeof(write_data) + 1);

    // Skipping all but the last of the four bytes must land on that last byte.
    const size_t last = sizeof(write_data) - 1;
    reader->skip(last);
    char data;
    ASSERT_EQ(reader->read(&data), OLAP_SUCCESS);
    ASSERT_EQ(data, write_data[last]);

    SAFE_DELETE(writer);
    SAFE_DELETE(reader);
    for (StorageByteBuffer* input : inputs) {
        delete input;
    }
}
TEST(TestStream, SeekCompress) {
    OutStream* writer =
            new (std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lz4_compress);
    ASSERT_TRUE(writer != NULL);

    // Ten repeated bytes, then record the position, then the target byte.
    for (int32_t n = 0; n < 10; ++n) {
        writer->write(0x5a);
    }
    PositionEntryWriter index_entry;
    writer->get_position(&index_entry);
    writer->write(0x5b);
    ASSERT_EQ(index_entry.positions_count(), 2);
    ASSERT_EQ(index_entry.positions(0), 0);
    ASSERT_EQ(index_entry.positions(1), 10);
    writer->flush();

    const std::vector<StorageByteBuffer*>& outputs = writer->output_buffers();
    ASSERT_FALSE(outputs.empty());
    StorageByteBuffer* first = outputs.front();
    std::vector<StorageByteBuffer*> inputs;
    inputs.push_back(StorageByteBuffer::reference_buffer(first, 0, first->capacity()));
    std::vector<uint64_t> offsets(inputs.size(), 0);
    InStream* reader =
            new (std::nothrow) InStream(&inputs, offsets, writer->get_stream_length(),
                                        lz4_decompress, writer->get_total_buffer_size());

    // Rebuild the recorded position from its serialized form and seek to it.
    char index_buf[256];
    index_entry.write_to_buffer(index_buf);
    StreamIndexHeader header;
    header.position_format = index_entry.positions_count();
    header.statistic_format = OLAP_FIELD_TYPE_TINYINT;
    PositionEntryReader entry;
    entry.init(&header, OLAP_FIELD_TYPE_TINYINT, false);
    entry.attach(index_buf);
    PositionProvider position(&entry);
    reader->seek(&position);

    char data;
    ASSERT_EQ(reader->read(&data), OLAP_SUCCESS);
    ASSERT_EQ(data, 0x5b);

    SAFE_DELETE(writer);
    SAFE_DELETE(reader);
    for (StorageByteBuffer* input : inputs) {
        delete input;
    }
}
TEST(TestStream, SkipCompress) {
    OutStream* writer =
            new (std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, lz4_compress);
    ASSERT_TRUE(writer != NULL);

    // Ten repeated bytes followed by a distinct sentinel.
    for (int32_t n = 0; n < 10; ++n) {
        writer->write(0x5a);
    }
    writer->write(0x5e);
    writer->flush();

    const std::vector<StorageByteBuffer*>& outputs = writer->output_buffers();
    ASSERT_FALSE(outputs.empty());
    StorageByteBuffer* first = outputs.front();
    std::vector<StorageByteBuffer*> inputs;
    inputs.push_back(StorageByteBuffer::reference_buffer(first, 0, first->capacity()));
    std::vector<uint64_t> offsets(inputs.size(), 0);
    InStream* reader =
            new (std::nothrow) InStream(&inputs, offsets, writer->get_stream_length(),
                                        lz4_decompress, writer->get_total_buffer_size());

    // Skipping the ten repeated bytes must land on the sentinel.
    reader->skip(10);
    char data;
    ASSERT_EQ(reader->read(&data), OLAP_SUCCESS);
    ASSERT_EQ(data, 0x5e);

    SAFE_DELETE(writer);
    SAFE_DELETE(reader);
    for (StorageByteBuffer* input : inputs) {
        delete input;
    }
}
class TestRunLengthByte : public testing::Test {
public:
TestRunLengthByte() {}
virtual ~TestRunLengthByte() {}
virtual void SetUp() {
system("mkdir -p ./ut_dir");
system("rm -rf ./ut_dir/tmp_file");
_out_stream = new (std::nothrow) OutStream(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, NULL);
ASSERT_TRUE(_out_stream != NULL);
_writer = new (std::nothrow) RunLengthByteWriter(_out_stream);
ASSERT_TRUE(_writer != NULL);
}
virtual void TearDown() {
SAFE_DELETE(_reader);
SAFE_DELETE(_out_stream);
SAFE_DELETE(_writer);
SAFE_DELETE(_shared_buffer);
SAFE_DELETE(_stream);
}
void CreateReader() {
ASSERT_EQ(OLAP_SUCCESS,
helper.open_with_mode(_file_path.c_str(), O_CREAT | O_EXCL | O_WRONLY,
S_IRUSR | S_IWUSR));
_out_stream->write_to_file(&helper, 0);
helper.close();
ASSERT_EQ(OLAP_SUCCESS,
helper.open_with_mode(_file_path.c_str(), O_RDONLY, S_IRUSR | S_IWUSR));
_shared_buffer = StorageByteBuffer::create(OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE +
sizeof(StreamHead));
ASSERT_TRUE(_shared_buffer != NULL);
_stream = new (std::nothrow)
ReadOnlyFileStream(&helper, &_shared_buffer, 0, helper.length(), NULL,
OLAP_DEFAULT_COLUMN_STREAM_BUFFER_SIZE, &_stats);
ASSERT_EQ(OLAP_SUCCESS, _stream->init());
_reader = new (std::nothrow) RunLengthByteReader(_stream);
ASSERT_TRUE(_reader != NULL);
}
RunLengthByteReader* _reader;
OutStream* _out_stream;
RunLengthByteWriter* _writer;
FileHandler helper;
StorageByteBuffer* _shared_buffer;
ReadOnlyFileStream* _stream;
OlapReaderStatistics _stats;
std::string _file_path = "./ut_dir/tmp_file";
};
TEST_F(TestRunLengthByte, ReadWriteOneByte) {
    // Round-trip a single byte through the run-length encoder.
    _writer->write(0x5a);
    _writer->flush();
    // CreateReader() asserts internally; stop here if it failed rather than
    // dereferencing a reader that was never built.
    ASSERT_NO_FATAL_FAILURE(CreateReader());

    ASSERT_TRUE(_reader->has_next());
    char value = 0xff;
    ASSERT_EQ(OLAP_SUCCESS, _reader->next(&value));
    ASSERT_EQ(value, 0X5A);
    ASSERT_FALSE(_reader->has_next());
}
TEST_F(TestRunLengthByte, ReadWriteMultiBytes) {
    // Four distinct bytes: stored as a literal run.
    char write_data[] = {0x5a, 0x5b, 0x5c, 0x5d};
    for (char byte : write_data) {
        _writer->write(byte);
    }
    _writer->flush();
    // The stream contains the head, one control byte and the four literal bytes.
    ASSERT_EQ(_out_stream->get_stream_length(), sizeof(StreamHead) + 1 + 4);

    // Stop if the reader could not be built (CreateReader asserts internally).
    ASSERT_NO_FATAL_FAILURE(CreateReader());
    for (char expected : write_data) {
        ASSERT_TRUE(_reader->has_next());
        char value = 0xff;
        ASSERT_EQ(OLAP_SUCCESS, _reader->next(&value));
        ASSERT_EQ(value, expected);
    }
    ASSERT_FALSE(_reader->has_next());
}
TEST_F(TestRunLengthByte, ReadWriteSameBytes) {
    // Four identical bytes: stored as a repeat run.
    char write_data[] = {0x5a, 0x5a, 0x5a, 0x5a};
    for (char byte : write_data) {
        _writer->write(byte);
    }
    _writer->flush();
    // The stream contains the head, one control byte (4 - 3) and one value byte.
    ASSERT_EQ(_out_stream->get_stream_length(), sizeof(StreamHead) + 1 + 1);

    // Stop if the reader could not be built (CreateReader asserts internally).
    ASSERT_NO_FATAL_FAILURE(CreateReader());
    for (char expected : write_data) {
        ASSERT_TRUE(_reader->has_next());
        char value = 0xff;
        ASSERT_EQ(OLAP_SUCCESS, _reader->next(&value));
        ASSERT_EQ(value, expected);
    }
    ASSERT_FALSE(_reader->has_next());
}
TEST_F(TestRunLengthByte, Seek) {
    // Write four bytes, record the position, then write four more.
    char write_data[] = {0x5a, 0x5b, 0x5c, 0x5d};
    for (char byte : write_data) {
        _writer->write(byte);
    }
    PositionEntryWriter index_entry;
    _writer->get_position(&index_entry);
    _writer->write(0x5e);
    _writer->write(0x5f);
    _writer->write(0x60);
    _writer->write(0x61);
    _writer->flush();

    // Stop if the reader could not be built (CreateReader asserts internally).
    ASSERT_NO_FATAL_FAILURE(CreateReader());

    // Rebuild the recorded position from its serialized form.
    char buffer[256];
    index_entry.write_to_buffer(buffer);
    StreamIndexHeader header;
    header.position_format = index_entry.positions_count();
    header.statistic_format = OLAP_FIELD_TYPE_TINYINT;
    PositionEntryReader entry;
    entry.init(&header, OLAP_FIELD_TYPE_TINYINT, false);
    entry.attach(buffer);
    PositionProvider position(&entry);

    // After seeking, exactly the bytes written after get_position() follow.
    _reader->seek(&position);
    const char expected[] = {0x5e, 0x5f, 0x60, 0x61};
    for (char want : expected) {
        char value = 0xff;
        ASSERT_EQ(OLAP_SUCCESS, _reader->next(&value));
        ASSERT_EQ(value, want);
    }
}
TEST_F(TestRunLengthByte, Skip) {
    // Four distinct bytes followed by a sentinel.
    char write_data[] = {0x5a, 0x5b, 0x5c, 0x5d};
    for (char byte : write_data) {
        _writer->write(byte);
    }
    _writer->write(0x5e);
    _writer->flush();

    // Stop if the reader could not be built (CreateReader asserts internally).
    ASSERT_NO_FATAL_FAILURE(CreateReader());

    // Skipping all but the last of the four bytes lands on that byte; the
    // sentinel comes next.
    _reader->skip(sizeof(write_data) - 1);
    char value = 0xff;
    ASSERT_EQ(OLAP_SUCCESS, _reader->next(&value));
    ASSERT_EQ(value, write_data[sizeof(write_data) - 1]);
    ASSERT_EQ(OLAP_SUCCESS, _reader->next(&value));
    ASSERT_EQ(value, 0x5e);
}
} // namespace doris
// Test entry point: loads $DORIS_HOME/conf/be.conf, initializes logging, then
// runs all registered tests.
// Returns -1 if DORIS_HOME is unset or the config cannot be read; otherwise
// returns the result of RUN_ALL_TESTS().
int main(int argc, char** argv) {
    // getenv() returns a null pointer for an unset variable, and constructing
    // std::string from a null pointer is undefined behavior, so check first.
    const char* doris_home = getenv("DORIS_HOME");
    if (doris_home == nullptr) {
        fprintf(stderr, "DORIS_HOME is not set. \n");
        return -1;
    }
    std::string conffile = std::string(doris_home) + "/conf/be.conf";
    if (!doris::config::init(conffile.c_str(), false)) {
        fprintf(stderr, "error read config file. \n");
        return -1;
    }
    doris::init_glog("be-test");
    testing::InitGoogleTest(&argc, argv);
    int ret = RUN_ALL_TESTS();
    google::protobuf::ShutdownProtobufLibrary();
    return ret;
}