blob: ed9382655d09ef087f45239c81b3d979c22ebb43 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/disk_io_mgr.h"
#include <gtest/gtest.h>
#include <sched.h>
#include <sys/stat.h>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
#include "util/cpu_info.h"
#include "util/disk_info.h"
#include "util/logging.h"
using std::string;
using std::stringstream;
using std::vector;
using std::list;
using boost::lock_guard;
using boost::unique_lock;
using boost::mutex;
using boost::mem_fn;
using boost::condition_variable;
using boost::scoped_ptr;
using boost::thread;
using boost::thread_group;
namespace doris {
// Smallest / largest buffer sizes (bytes) handed to several DiskIoMgr instances below.
constexpr int MIN_BUFFER_SIZE = 512;
constexpr int MAX_BUFFER_SIZE = 1024;
// 1 GiB memory limit for trackers the tests do not expect to exhaust.
constexpr int LARGE_MEM_LIMIT = 1 << 30;
class DiskIoMgrTest : public testing::Test {
public:
void write_validate_callback(int num_writes, DiskIoMgr::WriteRange** written_range,
DiskIoMgr* io_mgr, DiskIoMgr::RequestContext* reader,
int32_t* data, Status expected_status, const Status& status) {
if (expected_status.code() == TStatusCode::CANCELLED) {
EXPECT_TRUE(status.ok() || status.is_cancelled());
} else {
EXPECT_TRUE(status.code() == expected_status.code());
}
if (status.ok()) {
DiskIoMgr::ScanRange* scan_range = _pool->add(new DiskIoMgr::ScanRange());
scan_range->reset(NULL, (*written_range)->file(), (*written_range)->len(),
(*written_range)->offset(), 0, false, false,
DiskIoMgr::ScanRange::NEVER_CACHE);
validate_sync_read(io_mgr, reader, scan_range, reinterpret_cast<const char*>(data),
sizeof(int32_t));
}
{
lock_guard<mutex> l(_written_mutex);
++_num_ranges_written;
if (_num_ranges_written == num_writes) {
_writes_done.notify_one();
}
}
}
void write_complete_callback(int num_writes, const Status& status) {
EXPECT_TRUE(status.ok());
{
lock_guard<mutex> l(_written_mutex);
++_num_ranges_written;
if (_num_ranges_written == num_writes) {
_writes_done.notify_all();
}
}
}
protected:
void CreateTempFile(const char* filename, const char* data) {
FILE* file = fopen(filename, "w");
EXPECT_TRUE(file != NULL);
fwrite(data, 1, strlen(data), file);
fclose(file);
}
int CreateTempFile(const char* filename, int file_size) {
FILE* file = fopen(filename, "w");
EXPECT_TRUE(file != NULL);
int success = fclose(file);
if (success != 0) {
LOG(ERROR) << "Error closing file " << filename;
return success;
}
return truncate(filename, file_size);
}
// Validates that buffer[i] is \0 or expected[i]
static void validate_empty_or_correct(const char* expected, const char* buffer, int len) {
for (int i = 0; i < len; ++i) {
if (buffer[i] != '\0') {
EXPECT_EQ(expected[i], buffer[i]) << (int)expected[i] << " != " << (int)buffer[i];
}
}
}
static void validate_sync_read(DiskIoMgr* io_mgr, DiskIoMgr::RequestContext* reader,
DiskIoMgr::ScanRange* range, const char* expected,
int expected_len = -1) {
DiskIoMgr::BufferDescriptor* buffer;
Status status = io_mgr->read(reader, range, &buffer);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(buffer != NULL);
EXPECT_EQ(buffer->len(), range->len());
if (expected_len < 0) {
expected_len = strlen(expected);
}
int cmp = memcmp(buffer->buffer(), expected, expected_len);
EXPECT_TRUE(cmp == 0);
buffer->return_buffer();
}
static void validate_scan_range(DiskIoMgr::ScanRange* range, const char* expected,
int expected_len, const Status& expected_status) {
char result[expected_len + 1];
memset(result, 0, expected_len + 1);
while (true) {
DiskIoMgr::BufferDescriptor* buffer = NULL;
Status status = range->get_next(&buffer);
ASSERT_TRUE(status.ok() || status.code() == expected_status.code());
if (buffer == NULL || !status.ok()) {
if (buffer != NULL) buffer->return_buffer();
break;
}
ASSERT_LE(buffer->len(), expected_len);
memcpy(result + range->offset() + buffer->scan_range_offset(), buffer->buffer(),
buffer->len());
buffer->return_buffer();
}
validate_empty_or_correct(expected, result, expected_len);
}
// Continues pulling scan ranges from the io mgr until they are all done.
// Updates num_ranges_processed with the number of ranges seen by this thread.
static void scan_range_thread(DiskIoMgr* io_mgr, DiskIoMgr::RequestContext* reader,
const char* expected_result, int expected_len,
const Status& expected_status, int max_ranges,
AtomicInt<int>* num_ranges_processed) {
int num_ranges = 0;
while (max_ranges == 0 || num_ranges < max_ranges) {
DiskIoMgr::ScanRange* range;
Status status = io_mgr->get_next_range(reader, &range);
ASSERT_TRUE(status.ok() || status.code() == expected_status.code());
if (range == NULL) break;
validate_scan_range(range, expected_result, expected_len, expected_status);
++(*num_ranges_processed);
++num_ranges;
}
}
DiskIoMgr::ScanRange* init_range(int num_buffers, const char* file_path, int offset, int len,
int disk_id, int64_t mtime, void* meta_data = NULL,
bool is_cached = false) {
DiskIoMgr::ScanRange* range = _pool->add(new DiskIoMgr::ScanRange(num_buffers));
range->reset(NULL, file_path, len, offset, disk_id, is_cached, true, mtime, meta_data);
EXPECT_EQ(mtime, range->mtime());
return range;
}
boost::scoped_ptr<ObjectPool> _pool;
mutex _written_mutex;
condition_variable _writes_done;
int _num_ranges_written;
};
// Test a single writer with multiple disks and threads per disk. Each WriteRange
// writes random 4-byte integers, and upon completion, the written data is validated
// by reading the data back via a separate IoMgr instance. All writes are expected to
// complete successfully.
TEST_F(DiskIoMgrTest, SingleWriter) {
    std::shared_ptr<MemTracker> mem_tracker(new MemTracker(LARGE_MEM_LIMIT));
    _num_ranges_written = 0;
    string tmp_file = "/tmp/disk_io_mgr_test.txt";
    int num_ranges = 100;
    int64_t file_size = 1024 * 1024;
    int64_t cur_offset = 0;
    int success = CreateTempFile(tmp_file.c_str(), file_size);
    if (success != 0) {
        LOG(ERROR) << "Error creating temp file " << tmp_file.c_str() << " of size " << file_size;
    }
    // Every write below targets this file, so there is no point continuing without it.
    ASSERT_EQ(0, success);
    // Separate io mgr used by write_validate_callback to read the written bytes back.
    boost::scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 10));
    std::shared_ptr<MemTracker> reader_mem_tracker(new MemTracker(LARGE_MEM_LIMIT));
    Status status = read_io_mgr->init(reader_mem_tracker);
    ASSERT_TRUE(status.ok());
    DiskIoMgr::RequestContext* reader;
    status = read_io_mgr->register_context(&reader, reader_mem_tracker);
    ASSERT_TRUE(status.ok());
    for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
        for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
            _pool.reset(new ObjectPool);
            DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 10);
            status = io_mgr.init(mem_tracker);
            ASSERT_TRUE(status.ok());
            DiskIoMgr::RequestContext* writer;
            status = io_mgr.register_context(&writer, mem_tracker);
            ASSERT_TRUE(status.ok());
            for (int i = 0; i < num_ranges; ++i) {
                int32_t* data = _pool->add(new int32_t);
                *data = rand();
                DiskIoMgr::WriteRange** new_range = _pool->add(new DiskIoMgr::WriteRange*);
                DiskIoMgr::WriteRange::WriteDoneCallback callback =
                        bind(mem_fn(&DiskIoMgrTest::write_validate_callback), this, num_ranges,
                             new_range, read_io_mgr.get(), reader, data, Status::OK(), _1);
                // Spread the writes round-robin over the disks. Using the loop-invariant
                // num_ranges here would send every write to the same disk.
                *new_range = _pool->add(new DiskIoMgr::WriteRange(tmp_file, cur_offset,
                                                                  i % num_disks, callback));
                (*new_range)->set_data(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
                Status add_status = io_mgr.add_write_range(writer, *new_range);
                EXPECT_TRUE(add_status.ok());
                cur_offset += sizeof(int32_t);
            }
            {
                unique_lock<mutex> lock(_written_mutex);
                while (_num_ranges_written < num_ranges) {
                    _writes_done.wait(lock);
                }
            }
            _num_ranges_written = 0;
            io_mgr.unregister_context(writer);
        }
    }
    read_io_mgr->unregister_context(reader);
    read_io_mgr.reset();
}
// Perform invalid writes (e.g. non-existent file, negative offset) and validate
// that an error status is returned via the write callback.
TEST_F(DiskIoMgrTest, InvalidWrite) {
    std::shared_ptr<MemTracker> mem_tracker(new MemTracker(LARGE_MEM_LIMIT));
    _num_ranges_written = 0;
    string tmp_file = "/tmp/non-existent.txt";
    // Make sure the target really is missing (e.g. not left over from an earlier run);
    // otherwise the first write could succeed. ENOENT is the normal result, so the
    // return value is deliberately ignored.
    unlink(tmp_file.c_str());
    DiskIoMgr io_mgr(1, 1, 1, 10);
    Status status = io_mgr.init(mem_tracker);
    ASSERT_TRUE(status.ok());
    DiskIoMgr::RequestContext* writer;
    status = io_mgr.register_context(&writer);
    ASSERT_TRUE(status.ok());
    _pool.reset(new ObjectPool);
    int32_t* data = _pool->add(new int32_t);
    *data = rand();
    // Write to a non-existent file.
    DiskIoMgr::WriteRange** new_range = _pool->add(new DiskIoMgr::WriteRange*);
    DiskIoMgr::WriteRange::WriteDoneCallback callback = bind(
            mem_fn(&DiskIoMgrTest::write_validate_callback), this, 2, new_range, (DiskIoMgr*)NULL,
            (DiskIoMgr::RequestContext*)NULL, data, Status::InternalError("Test Failure"), _1);
    *new_range = _pool->add(new DiskIoMgr::WriteRange(tmp_file, rand(), 0, callback));
    (*new_range)->set_data(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
    status = io_mgr.add_write_range(writer, *new_range);
    EXPECT_TRUE(status.ok());
    // Write to a bad location in a file that exists.
    tmp_file = "/tmp/disk_io_mgr_test.txt";
    int success = CreateTempFile(tmp_file.c_str(), 100);
    if (success != 0) {
        LOG(ERROR) << "Error creating temp file " << tmp_file.c_str() << " of size 100";
    }
    // The second case needs an existing file; without it the test is meaningless.
    ASSERT_EQ(0, success);
    new_range = _pool->add(new DiskIoMgr::WriteRange*);
    callback = bind(mem_fn(&DiskIoMgrTest::write_validate_callback), this, 2, new_range,
                    (DiskIoMgr*)NULL, (DiskIoMgr::RequestContext*)NULL, data,
                    Status::InternalError("Test Failure"), _1);
    *new_range = _pool->add(new DiskIoMgr::WriteRange(tmp_file, -1, 0, callback));
    (*new_range)->set_data(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
    status = io_mgr.add_write_range(writer, *new_range);
    EXPECT_TRUE(status.ok());
    // Both writes are expected to fail, each still invoking its callback.
    {
        unique_lock<mutex> lock(_written_mutex);
        while (_num_ranges_written < 2) {
            _writes_done.wait(lock);
        }
    }
    _num_ranges_written = 0;
    io_mgr.unregister_context(writer);
}
// Issue a number of writes, cancel the writer context and issue more writes.
// add_write_range() is expected to succeed before the cancel and fail after it.
// The writes themselves may finish with status cancelled or ok.
TEST_F(DiskIoMgrTest, SingleWriterCancel) {
    std::shared_ptr<MemTracker> mem_tracker(new MemTracker(LARGE_MEM_LIMIT));
    _num_ranges_written = 0;
    string tmp_file = "/tmp/disk_io_mgr_test.txt";
    int num_ranges = 100;
    int num_ranges_before_cancel = 25;
    int64_t file_size = 1024 * 1024;
    int64_t cur_offset = 0;
    int success = CreateTempFile(tmp_file.c_str(), file_size);
    if (success != 0) {
        LOG(ERROR) << "Error creating temp file " << tmp_file.c_str() << " of size " << file_size;
    }
    ASSERT_EQ(0, success);
    // Separate io mgr used by write_validate_callback to read successful writes back.
    boost::scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 10));
    std::shared_ptr<MemTracker> reader_mem_tracker(new MemTracker(LARGE_MEM_LIMIT));
    Status status = read_io_mgr->init(reader_mem_tracker);
    ASSERT_TRUE(status.ok());
    DiskIoMgr::RequestContext* reader;
    status = read_io_mgr->register_context(&reader, reader_mem_tracker);
    ASSERT_TRUE(status.ok());
    for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
        for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
            _pool.reset(new ObjectPool);
            DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 10);
            status = io_mgr.init(mem_tracker);
            ASSERT_TRUE(status.ok());
            DiskIoMgr::RequestContext* writer;
            status = io_mgr.register_context(&writer, mem_tracker);
            ASSERT_TRUE(status.ok());
            // add_write_range() must succeed until the context is cancelled and return
            // CANCELLED afterwards.
            Status validate_status = Status::OK();
            for (int i = 0; i < num_ranges; ++i) {
                if (i == num_ranges_before_cancel) {
                    io_mgr.cancel_context(writer);
                    validate_status = Status::Cancelled("");
                }
                int32_t* data = _pool->add(new int32_t);
                *data = rand();
                DiskIoMgr::WriteRange** new_range = _pool->add(new DiskIoMgr::WriteRange*);
                DiskIoMgr::WriteRange::WriteDoneCallback callback =
                        bind(mem_fn(&DiskIoMgrTest::write_validate_callback), this,
                             num_ranges_before_cancel, new_range, read_io_mgr.get(), reader, data,
                             Status::Cancelled(""), _1);
                // Spread the writes round-robin over the disks instead of sending all of
                // them to the disk chosen by the loop-invariant num_ranges.
                *new_range = _pool->add(new DiskIoMgr::WriteRange(tmp_file, cur_offset,
                                                                  i % num_disks, callback));
                (*new_range)->set_data(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
                cur_offset += sizeof(int32_t);
                Status add_status = io_mgr.add_write_range(writer, *new_range);
                EXPECT_TRUE(add_status.code() == validate_status.code());
            }
            // Wait for exactly num_ranges_before_cancel callbacks. This assumes ranges
            // rejected after the cancel never invoke their callback.
            {
                unique_lock<mutex> lock(_written_mutex);
                while (_num_ranges_written < num_ranges_before_cancel) {
                    _writes_done.wait(lock);
                }
            }
            _num_ranges_written = 0;
            io_mgr.unregister_context(writer);
        }
    }
    read_io_mgr->unregister_context(reader);
    read_io_mgr.reset();
}
// Basic test with a single reader, testing multiple threads, disks and a different
// number of buffers.
TEST_F(DiskIoMgrTest, SingleReader) {
    std::shared_ptr<MemTracker> mem_tracker(new MemTracker(LARGE_MEM_LIMIT));
    const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
    const char* data = "abcdefghijklm";
    int len = strlen(data);
    CreateTempFile(tmp_file, data);
    // Get mtime for file. Every scan range is created with it, and a failed stat() would
    // leave stat_val uninitialized.
    struct stat stat_val;
    ASSERT_EQ(0, stat(tmp_file, &stat_val)) << "Error stat-ing temp file " << tmp_file;
    int64_t iters = 0;
    for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
        for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
            for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
                for (int num_read_threads = 1; num_read_threads <= 5; ++num_read_threads) {
                    _pool.reset(new ObjectPool);
                    LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
                              << " num_disk=" << num_disks << " num_buffers=" << num_buffers
                              << " num_read_threads=" << num_read_threads;
                    if (++iters % 5000 == 0) {
                        LOG(ERROR) << "Starting iteration " << iters;
                    }
                    DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 1);
                    Status status = io_mgr.init(mem_tracker);
                    ASSERT_TRUE(status.ok());
                    std::shared_ptr<MemTracker> reader_mem_tracker(new MemTracker(LARGE_MEM_LIMIT));
                    DiskIoMgr::RequestContext* reader;
                    status = io_mgr.register_context(&reader, reader_mem_tracker);
                    ASSERT_TRUE(status.ok());
                    // `len` ranges, each covering the whole file, spread over the disks.
                    std::vector<DiskIoMgr::ScanRange*> ranges;
                    for (int i = 0; i < len; ++i) {
                        int disk_id = i % num_disks;
                        ranges.push_back(init_range(num_buffers, tmp_file, 0, len, disk_id,
                                                    stat_val.st_mtime));
                    }
                    status = io_mgr.add_scan_ranges(reader, ranges);
                    ASSERT_TRUE(status.ok());
                    AtomicInt<int> num_ranges_processed;
                    thread_group threads;
                    for (int i = 0; i < num_read_threads; ++i) {
                        threads.add_thread(new thread(scan_range_thread, &io_mgr, reader, data, len,
                                                      Status::OK(), 0, &num_ranges_processed));
                    }
                    threads.join_all();
                    EXPECT_EQ(num_ranges_processed, ranges.size());
                    io_mgr.unregister_context(reader);
                    EXPECT_EQ(reader_mem_tracker->consumption(), 0);
                }
            }
        }
    }
    EXPECT_EQ(mem_tracker->consumption(), 0);
}
// This test adds more scan ranges while some are still in flight.
TEST_F(DiskIoMgrTest, AddScanRangeTest) {
    std::shared_ptr<MemTracker> mem_tracker(new MemTracker(LARGE_MEM_LIMIT));
    const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
    const char* data = "abcdefghijklm";
    int len = strlen(data);
    CreateTempFile(tmp_file, data);
    // Get mtime for file. A failed stat() would leave stat_val uninitialized.
    struct stat stat_val;
    ASSERT_EQ(0, stat(tmp_file, &stat_val)) << "Error stat-ing temp file " << tmp_file;
    int64_t iters = 0;
    for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
        for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
            for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
                _pool.reset(new ObjectPool);
                LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
                          << " num_disk=" << num_disks << " num_buffers=" << num_buffers;
                if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
                DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 1);
                Status status = io_mgr.init(mem_tracker);
                ASSERT_TRUE(status.ok());
                std::shared_ptr<MemTracker> reader_mem_tracker(new MemTracker(LARGE_MEM_LIMIT));
                DiskIoMgr::RequestContext* reader;
                status = io_mgr.register_context(&reader, reader_mem_tracker);
                ASSERT_TRUE(status.ok());
                // One single-byte range per file offset, split into two batches.
                std::vector<DiskIoMgr::ScanRange*> ranges_first_half;
                std::vector<DiskIoMgr::ScanRange*> ranges_second_half;
                for (int i = 0; i < len; ++i) {
                    int disk_id = i % num_disks;
                    if (i > len / 2) {
                        ranges_second_half.push_back(init_range(num_buffers, tmp_file, i, 1,
                                                                disk_id, stat_val.st_mtime));
                    } else {
                        ranges_first_half.push_back(init_range(num_buffers, tmp_file, i, 1, disk_id,
                                                               stat_val.st_mtime));
                    }
                }
                AtomicInt<int> num_ranges_processed;
                // Issue first half the scan ranges.
                status = io_mgr.add_scan_ranges(reader, ranges_first_half);
                ASSERT_TRUE(status.ok());
                // Read a couple of them
                scan_range_thread(&io_mgr, reader, data, len, Status::OK(), 2,
                                  &num_ranges_processed);
                // Issue second half
                status = io_mgr.add_scan_ranges(reader, ranges_second_half);
                ASSERT_TRUE(status.ok());
                // Drain the remaining ranges with a few concurrent threads. They tolerate a
                // CANCELLED status, although nothing in this test cancels the context.
                thread_group threads;
                for (int i = 0; i < 3; ++i) {
                    threads.add_thread(new thread(scan_range_thread, &io_mgr, reader, data, len,
                                                  Status::Cancelled(""), 0,
                                                  &num_ranges_processed));
                }
                threads.join_all();
                EXPECT_EQ(num_ranges_processed, len);
                io_mgr.unregister_context(reader);
                EXPECT_EQ(reader_mem_tracker->consumption(), 0);
            }
        }
    }
    EXPECT_EQ(mem_tracker->consumption(), 0);
}
// Test to make sure that sync reads and async reads work together
// Note: this test is constructed so the number of buffers is greater than the
// number of scan ranges.
TEST_F(DiskIoMgrTest, SyncReadTest) {
    std::shared_ptr<MemTracker> mem_tracker(new MemTracker(LARGE_MEM_LIMIT));
    const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
    const char* data = "abcdefghijklm";
    int len = strlen(data);
    CreateTempFile(tmp_file, data);
    // Get mtime for file. A failed stat() would leave stat_val uninitialized.
    struct stat stat_val;
    ASSERT_EQ(0, stat(tmp_file, &stat_val)) << "Error stat-ing temp file " << tmp_file;
    int64_t iters = 0;
    for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
        for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
            for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
                _pool.reset(new ObjectPool);
                LOG(INFO) << "Starting SyncReadTest test with num_threads_per_disk="
                          << num_threads_per_disk << " num_disk=" << num_disks
                          << " num_buffers=" << num_buffers;
                if (++iters % 5000 == 0) {
                    LOG(ERROR) << "Starting iteration " << iters;
                }
                DiskIoMgr io_mgr(num_disks, num_threads_per_disk, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
                Status status = io_mgr.init(mem_tracker);
                ASSERT_TRUE(status.ok());
                std::shared_ptr<MemTracker> reader_mem_tracker(new MemTracker(LARGE_MEM_LIMIT));
                DiskIoMgr::RequestContext* reader;
                status = io_mgr.register_context(&reader, reader_mem_tracker);
                ASSERT_TRUE(status.ok());
                // Whole-file range reused for every synchronous read below.
                DiskIoMgr::ScanRange* complete_range =
                        init_range(1, tmp_file, 0, len, 0, stat_val.st_mtime);
                // Issue some reads before the async ones are issued
                validate_sync_read(&io_mgr, reader, complete_range, data);
                validate_sync_read(&io_mgr, reader, complete_range, data);
                std::vector<DiskIoMgr::ScanRange*> ranges;
                for (int i = 0; i < len; ++i) {
                    int disk_id = i % num_disks;
                    ranges.push_back(
                            init_range(num_buffers, tmp_file, 0, len, disk_id, stat_val.st_mtime));
                }
                status = io_mgr.add_scan_ranges(reader, ranges);
                ASSERT_TRUE(status.ok());
                AtomicInt<int> num_ranges_processed;
                thread_group threads;
                for (int i = 0; i < 5; ++i) {
                    threads.add_thread(new thread(scan_range_thread, &io_mgr, reader, data, len,
                                                  Status::OK(), 0, &num_ranges_processed));
                }
                // Issue some more sync ranges while the async readers are running.
                for (int i = 0; i < 5; ++i) {
                    sched_yield();
                    validate_sync_read(&io_mgr, reader, complete_range, data);
                }
                threads.join_all();
                validate_sync_read(&io_mgr, reader, complete_range, data);
                validate_sync_read(&io_mgr, reader, complete_range, data);
                EXPECT_EQ(num_ranges_processed, ranges.size());
                io_mgr.unregister_context(reader);
                EXPECT_EQ(reader_mem_tracker->consumption(), 0);
            }
        }
    }
    EXPECT_EQ(mem_tracker->consumption(), 0);
}
// Tests a single reader cancelling half way through scan ranges.
TEST_F(DiskIoMgrTest, SingleReaderCancel) {
    std::shared_ptr<MemTracker> mem_tracker(new MemTracker(LARGE_MEM_LIMIT));
    const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
    const char* data = "abcdefghijklm";
    int len = strlen(data);
    CreateTempFile(tmp_file, data);
    // Get mtime for file. A failed stat() would leave stat_val uninitialized.
    struct stat stat_val;
    ASSERT_EQ(0, stat(tmp_file, &stat_val)) << "Error stat-ing temp file " << tmp_file;
    int64_t iters = 0;
    for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
        for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
            for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
                _pool.reset(new ObjectPool);
                LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
                          << " num_disk=" << num_disks << " num_buffers=" << num_buffers;
                if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
                DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 1);
                Status status = io_mgr.init(mem_tracker);
                ASSERT_TRUE(status.ok());
                std::shared_ptr<MemTracker> reader_mem_tracker(new MemTracker(LARGE_MEM_LIMIT));
                DiskIoMgr::RequestContext* reader;
                status = io_mgr.register_context(&reader, reader_mem_tracker);
                ASSERT_TRUE(status.ok());
                std::vector<DiskIoMgr::ScanRange*> ranges;
                for (int i = 0; i < len; ++i) {
                    int disk_id = i % num_disks;
                    ranges.push_back(
                            init_range(num_buffers, tmp_file, 0, len, disk_id, stat_val.st_mtime));
                }
                status = io_mgr.add_scan_ranges(reader, ranges);
                ASSERT_TRUE(status.ok());
                AtomicInt<int> num_ranges_processed;
                int num_successful_ranges = ranges.size() / 2;
                // Read half the ranges, one at a time, on this thread.
                for (int i = 0; i < num_successful_ranges; ++i) {
                    scan_range_thread(&io_mgr, reader, data, len, Status::OK(), 1,
                                      &num_ranges_processed);
                }
                EXPECT_EQ(num_ranges_processed, num_successful_ranges);
                // Start up some threads and then cancel; the threads accept CANCELLED.
                thread_group threads;
                for (int i = 0; i < 3; ++i) {
                    threads.add_thread(new thread(scan_range_thread, &io_mgr, reader, data, len,
                                                  Status::Cancelled(""), 0,
                                                  &num_ranges_processed));
                }
                io_mgr.cancel_context(reader);
                sched_yield();
                threads.join_all();
                EXPECT_TRUE(io_mgr.context_status(reader).is_cancelled());
                io_mgr.unregister_context(reader);
                EXPECT_EQ(reader_mem_tracker->consumption(), 0);
            }
        }
    }
    EXPECT_EQ(mem_tracker->consumption(), 0);
}
// Test when the reader goes over the mem limit
TEST_F(DiskIoMgrTest, MemTrackers) {
    const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
    const char* data = "abcdefghijklm";
    int len = strlen(data);
    CreateTempFile(tmp_file, data);
    // Get mtime for file. A failed stat() would leave stat_val uninitialized.
    struct stat stat_val;
    ASSERT_EQ(0, stat(tmp_file, &stat_val)) << "Error stat-ing temp file " << tmp_file;
    const int num_buffers = 25;
    // Give the reader more buffers than the limit
    const int mem_limit_num_buffers = 2;
    {
        _pool.reset(new ObjectPool);
        // The io mgr's tracker only has room for mem_limit_num_buffers max-size buffers.
        std::shared_ptr<MemTracker> mem_tracker(
                new MemTracker(mem_limit_num_buffers * MAX_BUFFER_SIZE));
        DiskIoMgr io_mgr(1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
        Status status = io_mgr.init(mem_tracker);
        ASSERT_TRUE(status.ok());
        std::shared_ptr<MemTracker> reader_mem_tracker(new MemTracker(LARGE_MEM_LIMIT));
        DiskIoMgr::RequestContext* reader;
        status = io_mgr.register_context(&reader, reader_mem_tracker);
        ASSERT_TRUE(status.ok());
        std::vector<DiskIoMgr::ScanRange*> ranges;
        for (int i = 0; i < num_buffers; ++i) {
            ranges.push_back(init_range(num_buffers, tmp_file, 0, len, 0, stat_val.st_mtime));
        }
        status = io_mgr.add_scan_ranges(reader, ranges);
        ASSERT_TRUE(status.ok());
        // Don't return buffers to force memory pressure
        std::vector<DiskIoMgr::BufferDescriptor*> buffers;
        AtomicInt<int> num_ranges_processed;
        scan_range_thread(&io_mgr, reader, data, len, Status::MemoryLimitExceeded("Mem"), 1,
                          &num_ranges_processed);
        // Heap-backed scratch buffer instead of a non-standard variable-length array.
        std::vector<char> result(len + 1);
        // Keep reading new ranges without returning buffers. This forces us
        // to go over the limit eventually.
        while (true) {
            memset(result.data(), 0, result.size());
            DiskIoMgr::ScanRange* range = NULL;
            status = io_mgr.get_next_range(reader, &range);
            ASSERT_TRUE(status.ok() || status.is_mem_limit_exceeded());
            if (range == NULL) break;
            while (true) {
                DiskIoMgr::BufferDescriptor* buffer = NULL;
                // Named separately so it does not shadow the outer `status`.
                Status read_status = range->get_next(&buffer);
                ASSERT_TRUE(read_status.ok() || read_status.is_mem_limit_exceeded());
                if (buffer == NULL) break;
                memcpy(result.data() + range->offset() + buffer->scan_range_offset(),
                       buffer->buffer(), buffer->len());
                buffers.push_back(buffer);
            }
            validate_empty_or_correct(data, result.data(), len);
        }
        for (DiskIoMgr::BufferDescriptor* buffer : buffers) {
            buffer->return_buffer();
        }
        EXPECT_TRUE(io_mgr.context_status(reader).is_mem_limit_exceeded());
        io_mgr.unregister_context(reader);
        EXPECT_EQ(reader_mem_tracker->consumption(), 0);
    }
}
#if 0
// Test when some scan ranges are marked as being cached.
// Since these files are not in HDFS, the cached path always fails so this
// only tests the fallback mechanism.
// TODO: we can fake the cached read path without HDFS
// NOTE(review): this test is compiled out by the surrounding #if 0. Unlike the enabled
// read tests, it builds the reader tracker with MemTracker() (no limit argument); confirm
// that constructor still exists before re-enabling it.
TEST_F(DiskIoMgrTest, CachedReads) {
std::shared_ptr<MemTracker> mem_tracker(new MemTracker(LARGE_MEM_LIMIT));
const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
const char* data = "abcdefghijklm";
int len = strlen(data);
CreateTempFile(tmp_file, data);
// Get mtime for file
struct stat stat_val;
stat(tmp_file, &stat_val);
const int num_disks = 2;
const int num_buffers = 3;
int64_t iters = 0;
{
_pool.reset(new ObjectPool);
if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
DiskIoMgr io_mgr(num_disks, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
Status status = io_mgr.init(mem_tracker);
ASSERT_TRUE(status.ok());
std::shared_ptr<MemTracker> reader_mem_tracker(new MemTracker());
DiskIoMgr::RequestContext* reader;
status = io_mgr.register_context(&reader, reader_mem_tracker);
ASSERT_TRUE(status.ok());
// Whole-file range flagged as cached (last init_range argument is `is_cached`).
DiskIoMgr::ScanRange* complete_range =
init_range(1, tmp_file, 0, strlen(data), 0, stat_val.st_mtime, NULL, true);
// Issue some reads before the async ones are issued
validate_sync_read(&io_mgr, reader, complete_range, data);
validate_sync_read(&io_mgr, reader, complete_range, data);
// `len` whole-file ranges, also flagged as cached, spread over the disks.
std::vector<DiskIoMgr::ScanRange*> ranges;
for (int i = 0; i < len; ++i) {
int disk_id = i % num_disks;
ranges.push_back(init_range(num_buffers, tmp_file, 0, len, disk_id,
stat_val.st_mtime, NULL, true));
}
status = io_mgr.add_scan_ranges(reader, ranges);
ASSERT_TRUE(status.ok());
AtomicInt<int> num_ranges_processed;
thread_group threads;
for (int i = 0; i < 5; ++i) {
threads.add_thread(new thread(scan_range_thread, &io_mgr, reader, data,
strlen(data), Status::OK(), 0, &num_ranges_processed));
}
// Issue some more sync ranges
for (int i = 0; i < 5; ++i) {
sched_yield();
validate_sync_read(&io_mgr, reader, complete_range, data);
}
threads.join_all();
validate_sync_read(&io_mgr, reader, complete_range, data);
validate_sync_read(&io_mgr, reader, complete_range, data);
EXPECT_EQ(num_ranges_processed, ranges.size());
io_mgr.unregister_context(reader);
EXPECT_EQ(reader_mem_tracker->consumption(), 0);
}
EXPECT_EQ(mem_tracker->consumption(), 0);
}
#endif // end #if 0
// Interleaves writes and reads of one file across several request contexts.
TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
    std::shared_ptr<MemTracker> mem_tracker(new MemTracker(LARGE_MEM_LIMIT));
    const int ITERATIONS = 1;
    const char* data = "abcdefghijklmnopqrstuvwxyz";
    const int num_contexts = 5;
    const int file_size = 4 * 1024;
    const int num_writes_queued = 5;
    const int num_reads_queued = 5;
    string file_name = "/tmp/disk_io_mgr_test.txt";
    int success = CreateTempFile(file_name.c_str(), file_size);
    if (success != 0) {
        LOG(ERROR) << "Error creating temp file " << file_name.c_str() << " of size " << file_size;
        ASSERT_TRUE(false);
    }
    // Get mtime for file. A failed stat() would leave stat_val uninitialized.
    struct stat stat_val;
    ASSERT_EQ(0, stat(file_name.c_str(), &stat_val)) << "Error stat-ing temp file " << file_name;
    int64_t iters = 0;
    std::vector<DiskIoMgr::RequestContext*> contexts(num_contexts);
    Status status;
    for (int iteration = 0; iteration < ITERATIONS; ++iteration) {
        for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) {
            for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
                DiskIoMgr io_mgr(num_disks, threads_per_disk, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
                status = io_mgr.init(mem_tracker);
                ASSERT_TRUE(status.ok());
                for (int file_index = 0; file_index < num_contexts; ++file_index) {
                    status = io_mgr.register_context(&contexts[file_index]);
                    ASSERT_TRUE(status.ok());
                }
                _pool.reset(new ObjectPool);
                int read_offset = 0;
                int write_offset = 0;
                while (read_offset < file_size) {
                    for (int context_index = 0; context_index < num_contexts; ++context_index) {
                        if (++iters % 5000 == 0) {
                            LOG(ERROR) << "Starting iteration " << iters;
                        }
                        AtomicInt<int> num_ranges_processed;
                        thread_group threads;
                        // NOTE(review): these ranges are only collected in `ranges`; nothing
                        // here passes them to io_mgr.add_scan_ranges(). Presumably the reader
                        // threads therefore get no range and exit immediately. Confirm that
                        // this is intended before wiring the ranges in, because
                        // validate_scan_range() indexes its scratch buffer by file offset.
                        std::vector<DiskIoMgr::ScanRange*> ranges;
                        int num_scan_ranges =
                                std::min<int>(num_reads_queued, write_offset - read_offset);
                        for (int i = 0; i < num_scan_ranges; ++i) {
                            ranges.push_back(init_range(1, file_name.c_str(), read_offset, 1,
                                                        i % num_disks, stat_val.st_mtime));
                            threads.add_thread(new thread(
                                    scan_range_thread, &io_mgr, contexts[context_index],
                                    reinterpret_cast<const char*>(data +
                                                                  (read_offset % strlen(data))),
                                    1, Status::OK(), num_scan_ranges, &num_ranges_processed));
                            ++read_offset;
                        }
                        _num_ranges_written = 0;
                        int num_write_ranges =
                                std::min<int>(num_writes_queued, file_size - write_offset);
                        for (int i = 0; i < num_write_ranges; ++i) {
                            DiskIoMgr::WriteRange::WriteDoneCallback callback =
                                    bind(mem_fn(&DiskIoMgrTest::write_complete_callback), this,
                                         num_write_ranges, _1);
                            DiskIoMgr::WriteRange* new_range = _pool->add(new DiskIoMgr::WriteRange(
                                    file_name, write_offset, i % num_disks, callback));
                            new_range->set_data(reinterpret_cast<const uint8_t*>(
                                                        data + (write_offset % strlen(data))),
                                                1);
                            status = io_mgr.add_write_range(contexts[context_index], new_range);
                            // A rejected write never calls back, so the wait below would hang;
                            // at least record why.
                            EXPECT_TRUE(status.ok());
                            ++write_offset;
                        }
                        {
                            unique_lock<mutex> lock(_written_mutex);
                            while (_num_ranges_written < num_write_ranges) {
                                _writes_done.wait(lock);
                            }
                        }
                        threads.join_all();
                    } // for (int context_index
                } // while (read_offset < file_size)
                for (int file_index = 0; file_index < num_contexts; ++file_index) {
                    io_mgr.unregister_context(contexts[file_index]);
                }
            } // for (int num_disks
        } // for (int threads_per_disk
    } // for (int iteration
}
// This test will test multiple concurrent reads each reading a different file.
TEST_F(DiskIoMgrTest, MultipleReader) {
    std::shared_ptr<MemTracker> mem_tracker(new MemTracker(LARGE_MEM_LIMIT));
    const int NUM_READERS = 5;
    const int DATA_LEN = 50;
    const int ITERATIONS = 25;
    const int NUM_THREADS_PER_READER = 3;
    // Per-reader file path, mtime, contents and request context.
    std::vector<string> file_names(NUM_READERS);
    std::vector<int64_t> mtimes(NUM_READERS);
    std::vector<string> data(NUM_READERS);
    std::vector<DiskIoMgr::RequestContext*> readers(NUM_READERS);
    // Initialize data for each reader. The data will be
    // 'abcd...' for reader one, 'bcde...' for reader two (wrapping around at 'z')
    for (int i = 0; i < NUM_READERS; ++i) {
        char buf[DATA_LEN];
        for (int j = 0; j < DATA_LEN; ++j) {
            int c = (j + i) % 26;
            buf[j] = 'a' + c;
        }
        data[i] = string(buf, DATA_LEN);
        std::stringstream ss;
        ss << "/tmp/disk_io_mgr_test" << i << ".txt";
        file_names[i] = ss.str();
        CreateTempFile(file_names[i].c_str(), data[i].c_str());
        // Get mtime for file. A failed stat() would leave stat_val uninitialized.
        struct stat stat_val;
        ASSERT_EQ(0, stat(file_names[i].c_str(), &stat_val))
                << "Error stat-ing temp file " << file_names[i];
        mtimes[i] = stat_val.st_mtime;
        // The original also heap-allocated a result buffer per reader here. It was never
        // read and never freed (a leak), so it has been dropped.
    }
    // This exercises concurrency, run the test multiple times
    int64_t iters = 0;
    for (int iteration = 0; iteration < ITERATIONS; ++iteration) {
        for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) {
            for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
                for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
                    _pool.reset(new ObjectPool);
                    LOG(INFO) << "Starting test with num_threads_per_disk=" << threads_per_disk
                              << " num_disk=" << num_disks << " num_buffers=" << num_buffers;
                    if (++iters % 2500 == 0) LOG(ERROR) << "Starting iteration " << iters;
                    DiskIoMgr io_mgr(num_disks, threads_per_disk, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
                    Status status = io_mgr.init(mem_tracker);
                    ASSERT_TRUE(status.ok());
                    // One context per reader, each with one single-byte range per file offset.
                    for (int i = 0; i < NUM_READERS; ++i) {
                        status = io_mgr.register_context(&readers[i], NULL);
                        ASSERT_TRUE(status.ok());
                        std::vector<DiskIoMgr::ScanRange*> ranges;
                        for (int j = 0; j < DATA_LEN; ++j) {
                            int disk_id = j % num_disks;
                            ranges.push_back(init_range(num_buffers, file_names[i].c_str(), j, 1,
                                                        disk_id, mtimes[i]));
                        }
                        status = io_mgr.add_scan_ranges(readers[i], ranges);
                        ASSERT_TRUE(status.ok());
                    }
                    AtomicInt<int> num_ranges_processed;
                    thread_group threads;
                    for (int i = 0; i < NUM_READERS; ++i) {
                        for (int j = 0; j < NUM_THREADS_PER_READER; ++j) {
                            threads.add_thread(new thread(scan_range_thread, &io_mgr, readers[i],
                                                          data[i].c_str(), data[i].size(),
                                                          Status::OK(), 0, &num_ranges_processed));
                        }
                    }
                    threads.join_all();
                    EXPECT_EQ(num_ranges_processed, DATA_LEN * NUM_READERS);
                    for (int i = 0; i < NUM_READERS; ++i) {
                        io_mgr.unregister_context(readers[i]);
                    }
                }
            }
        }
    }
    EXPECT_EQ(mem_tracker->consumption(), 0);
}
#if 0
// Stress test with several concurrent clients, with cancellation enabled.
// Currently compiled out.
// TODO: extend the stress app to also issue sync reads and to add scan ranges
// while reading is already under way.
TEST_F(DiskIoMgrTest, StressTest) {
    const int num_disks = 5;
    const int threads_per_disk = 5;
    const int num_clients = 10;
    const bool with_cancellation = true;
    DiskIoMgrStress stress(num_disks, threads_per_disk, num_clients, with_cancellation);
    stress.Run(2); // In seconds
}
#endif
// Exercises DiskIoMgr's free-buffer handling:
// - request sizes are rounded up to a supported buffer size;
// - returned buffers are reused;
// - gc_io_buffers() releases buffers that are not checked out.
// The memory tracker's consumption is checked after each step.
TEST_F(DiskIoMgrTest, Buffers) {
    // Default min/max buffer size.
    const int min_size = 1024;
    const int max_size = 8 * 1024 * 1024; // 8 MB
    std::shared_ptr<MemTracker> tracker(new MemTracker(max_size * 2));
    DiskIoMgr mgr(1, 1, min_size, max_size);
    Status init_status = mgr.init(tracker);
    ASSERT_TRUE(init_status.ok());
    ASSERT_EQ(tracker->consumption(), 0);

    // A 1-byte request is rounded up to the minimum buffer size.
    int64_t len = 1;
    char* buf = mgr.get_free_buffer(&len);
    EXPECT_EQ(len, min_size);
    EXPECT_EQ(mgr._num_allocated_buffers, 1);
    mgr.return_free_buffer(buf, len);
    EXPECT_EQ(tracker->consumption(), min_size);

    // A request of exactly min_size reuses the buffer that was just returned.
    len = min_size;
    buf = mgr.get_free_buffer(&len);
    EXPECT_EQ(len, min_size);
    EXPECT_EQ(mgr._num_allocated_buffers, 1);
    mgr.return_free_buffer(buf, len);
    EXPECT_EQ(tracker->consumption(), min_size);

    // One byte over min_size bumps the request to the next size (2 * min_size).
    // That needs a second allocation; the earlier min_size buffer stays allocated.
    len = min_size + 1;
    buf = mgr.get_free_buffer(&len);
    EXPECT_EQ(len, min_size * 2);
    EXPECT_EQ(mgr._num_allocated_buffers, 2);
    EXPECT_EQ(tracker->consumption(), min_size * 3);

    // gc_io_buffers() frees the idle min_size buffer, not the one still checked out.
    mgr.gc_io_buffers();
    EXPECT_EQ(mgr._num_allocated_buffers, 1);
    EXPECT_EQ(tracker->consumption(), min_size * 2);
    mgr.return_free_buffer(buf, len);

    // A max_size request is served with exactly max_size.
    len = max_size;
    buf = mgr.get_free_buffer(&len);
    EXPECT_EQ(len, max_size);
    EXPECT_EQ(mgr._num_allocated_buffers, 2);
    mgr.return_free_buffer(buf, len);
    EXPECT_EQ(tracker->consumption(), min_size * 2 + max_size);

    // With nothing checked out, gc_io_buffers() releases every buffer.
    mgr.gc_io_buffers();
    EXPECT_EQ(mgr._num_allocated_buffers, 0);
    EXPECT_EQ(tracker->consumption(), 0);
}
// IMPALA-2366: handle partial read where range goes past end of file.
// A synchronous read of a scan range that extends 1000 bytes past EOF must return
// exactly the file's contents, in one buffer flagged as end-of-range. After teardown,
// both memory trackers must report zero consumption.
TEST_F(DiskIoMgrTest, PartialRead) {
    std::shared_ptr<MemTracker> mem_tracker(new MemTracker(LARGE_MEM_LIMIT));
    const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
    const char* data = "the quick brown fox jumped over the lazy dog";
    const int len = static_cast<int>(strlen(data));
    const int read_len = len + 1000; // Read past end of file.
    CreateTempFile(tmp_file, data);

    // Get mtime for file. The scan range below is built from st_mtime, so stop here
    // if stat() fails. Otherwise an uninitialized value would be passed along.
    struct stat stat_val;
    ASSERT_EQ(0, stat(tmp_file, &stat_val)) << "stat() failed for " << tmp_file;

    _pool.reset(new ObjectPool);
    boost::scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, read_len, read_len));
    Status status = io_mgr->init(mem_tracker);
    ASSERT_TRUE(status.ok());
    std::shared_ptr<MemTracker> reader_mem_tracker(new MemTracker(LARGE_MEM_LIMIT));
    DiskIoMgr::RequestContext* reader = nullptr;
    status = io_mgr->register_context(&reader, reader_mem_tracker);
    ASSERT_TRUE(status.ok());

    // We should not read past the end of file.
    DiskIoMgr::ScanRange* range = init_range(1, tmp_file, 0, read_len, 0, stat_val.st_mtime);
    DiskIoMgr::BufferDescriptor* buffer = nullptr;
    status = io_mgr->read(reader, range, &buffer);
    ASSERT_TRUE(status.ok());
    ASSERT_TRUE(buffer != nullptr);
    ASSERT_TRUE(buffer->eosr());
    ASSERT_EQ(len, buffer->len());
    // ASSERT_EQ on memcmp's result reports the actual value if the contents differ.
    ASSERT_EQ(0, memcmp(buffer->buffer(), data, len));
    buffer->return_buffer();

    // Release everything before checking that all memory went back to the trackers.
    io_mgr->unregister_context(reader);
    _pool.reset();
    io_mgr.reset();
    EXPECT_EQ(reader_mem_tracker->consumption(), 0);
    EXPECT_EQ(mem_tracker->consumption(), 0);
}
} // end namespace doris
// Applies the BE configuration overrides this test binary runs with. Values are set
// directly instead of being loaded from $DORIS_HOME/conf/be.conf. That file-based
// loading is deliberately not done here, presumably so the test does not depend on
// an installed config.
static void configure_for_test() {
    doris::config::query_scratch_dirs = "/tmp";
    doris::config::max_free_io_buffers = 128;
    doris::config::disable_mem_pools = false;
}

// Test entry point. Applies the config overrides, sets up logging and GoogleTest,
// initializes CPU and disk info, then runs every registered test.
int main(int argc, char** argv) {
    configure_for_test();
    doris::init_glog("be-test");
    ::testing::InitGoogleTest(&argc, argv);
    doris::CpuInfo::init();
    doris::DiskInfo::init();
    return RUN_ALL_TESTS();
}