blob: eefa326b1aaefc04edd362cd278e7e910684466a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/buffered_block_mgr2.h"
#include <gtest/gtest.h>
#include <sys/stat.h>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/filesystem.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/thread/thread.hpp>
#include "runtime/disk_io_mgr.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "runtime/test_env.h"
#include "runtime/tmp_file_mgr.h"
#include "util/cpu_info.h"
#include "util/disk_info.h"
#include "util/filesystem_util.h"
#include "util/logging.h"
#include "util/monotime.h"
using boost::filesystem::directory_iterator;
using boost::filesystem::remove;
using boost::scoped_ptr;
using boost::unordered_map;
using boost::thread;
using std::string;
using std::stringstream;
using std::vector;
// Note: This is the default scratch dir created by doris.
// config::query_scratch_dirs + TmpFileMgr::_s_tmp_sub_dir_name.
// clear_scratch_dir() removes every entry found directly under this directory.
const static string SCRATCH_DIR = "/tmp/doris-scratch";
// Suffix appended to a test tmp dir; the fixture chmod()s "<tmp dir> + suffix" so that
// directories left read-only by a test can still be cleaned up.
const static string SCRATCH_SUFFIX = "/doris-scratch";
// Upper bound, in milliseconds, that WaitForWrites() keeps polling before it reports a
// test failure.
const static int WRITE_WAIT_MILLIS = 500;
// Interval, in milliseconds, between two write-completion checks in WaitForWrites().
const static int WRITE_CHECK_INTERVAL_MILLIS = 10;
namespace doris {
class BufferedBlockMgrTest : public ::testing::Test {
protected:
const static int _block_size = 1024;
// gtest fixture hook, run before every test body: installs a fresh TestEnv and a fresh
// client MemTracker so tests do not share state.
virtual void SetUp() {
_test_env.reset(new TestEnv());
// NOTE(review): -1 is presumably the "no byte limit" value for MemTracker - confirm.
_client_tracker.reset(new MemTracker(-1));
}
virtual void TearDown() {
TearDownMgrs();
_test_env.reset();
_client_tracker.reset();
// Tests modify permissions, so make sure we can delete if they didn't clean up.
for (int i = 0; i < _created_tmp_dirs.size(); ++i) {
chmod((_created_tmp_dirs[i] + SCRATCH_SUFFIX).c_str(), S_IRWXU);
}
FileSystemUtil::remove_paths(_created_tmp_dirs);
_created_tmp_dirs.clear();
}
// Reinitialize _test_env to have multiple temporary directories.
std::vector<string> InitMultipleTmpDirs(int num_dirs) {
std::vector<string> tmp_dirs;
for (int i = 0; i < num_dirs; ++i) {
std::stringstream dir_str;
dir_str << "/tmp/buffered-block-mgr-test." << i;
const string& dir = dir_str.str();
// Fix permissions in case old directories were left from previous runs of test.
chmod((dir + SCRATCH_SUFFIX).c_str(), S_IRWXU);
EXPECT_TRUE(FileSystemUtil::create_directory(dir).ok());
tmp_dirs.push_back(dir);
_created_tmp_dirs.push_back(dir);
}
_test_env->init_tmp_file_mgr(tmp_dirs, false);
EXPECT_EQ(num_dirs, _test_env->tmp_file_mgr()->num_active_tmp_devices());
return tmp_dirs;
}
// Expects that 'block' holds exactly one int32_t and that its value equals 'data'.
static void ValidateBlock(BufferedBlockMgr2::Block* block, int32_t data) {
    EXPECT_TRUE(block->valid_data_len() == sizeof(int32_t));
    const int32_t* payload = reinterpret_cast<int32_t*>(block->buffer());
    EXPECT_TRUE(*payload == data);
}
// Allocates a randomly sized payload in 'block' and fills it with a recognizable pattern.
// Layout: an int32_t holding the total size, then bytes whose value equals their offset,
// then a run of 0xff end markers.
// Returns a pointer to the payload start, i.e. to the stored size.
static int32_t* MakeRandomSizeData(BufferedBlockMgr2::Block* block) {
    // rand() % 252 is in [0, 251], so the payload is 4..255 bytes long.
    const int32_t size = (rand() % 252) + 4;
    uint8_t* bytes = block->allocate<uint8_t>(size);
    int32_t* size_header = reinterpret_cast<int32_t*>(bytes);
    *size_header = size;
    // The last five bytes are the 0xff end marker; payloads shorter than 9 bytes have only
    // (size - 4) marker bytes because the size header is never overwritten.
    const int marker_begin = size - 5;
    for (int pos = 4; pos < size; ++pos) {
        bytes[pos] = (pos < marker_begin) ? pos : 0xff;
    }
    return size_header;
}
// Checks that 'block' still holds the pattern written by MakeRandomSizeData() for a payload
// of 'size' bytes: matching valid length, matching stored size, offset-valued bytes and the
// trailing 0xff markers.
static void ValidateRandomSizeData(BufferedBlockMgr2::Block* block, int32_t size) {
    const int32_t stored_size = *(reinterpret_cast<int32_t*>(block->buffer()));
    uint8_t* bytes = reinterpret_cast<uint8_t*>(block->buffer());
    EXPECT_EQ(block->valid_data_len(), size);
    EXPECT_EQ(size, stored_size);
    // Bytes before 'marker_begin' carry their own offset, the rest are 0xff markers.
    const int marker_begin = size - 5;
    for (int pos = 4; pos < size; ++pos) {
        if (pos < marker_begin) {
            EXPECT_EQ(bytes[pos], pos);
        } else {
            EXPECT_EQ(bytes[pos], 0xff);
        }
    }
}
/// Helper to create a simple block manager.
// Creates a query state for 'query_id' through _test_env and returns the block manager of
// that state. If 'query_state' is non-NULL it receives the created RuntimeState.
BufferedBlockMgr2* CreateMgr(int64_t query_id, int max_buffers, int block_size,
                             RuntimeState** query_state = NULL) {
    RuntimeState* created_state = NULL;
    EXPECT_TRUE(_test_env->create_query_state(query_id, max_buffers, block_size, &created_state)
                        .ok());
    if (query_state) {
        *query_state = created_state;
    }
    return created_state->block_mgr2();
}
// Creates a block manager for 'query_id' (see CreateMgr()) and registers one client on it.
//   reserved_blocks: number of blocks reserved for the client, forwarded to register_client().
//   tracker:         memory tracker forwarded to register_client().
//   client:          out-parameter, must not be NULL; receives the registered client.
// Returns the created block manager.
BufferedBlockMgr2* CreateMgrAndClient(int64_t query_id, int max_buffers, int block_size,
                                      int reserved_blocks,
                                      const std::shared_ptr<MemTracker>& tracker,
                                      BufferedBlockMgr2::Client** client) {
    RuntimeState* state = NULL;
    BufferedBlockMgr2* mgr = CreateMgr(query_id, max_buffers, block_size, &state);
    // Callers usually pass the address of an uninitialized pointer. Clear it first so the
    // check below reads a defined value even when registration fails.
    *client = NULL;
    EXPECT_TRUE(mgr->register_client(reserved_blocks, tracker, state, client).ok());
    // Verify the registered client itself; the address of the out-parameter is never NULL,
    // so testing 'client' would always pass.
    EXPECT_TRUE(*client != NULL);
    return mgr;
}
// Creates 'num_mgrs' block managers with query ids start_query_id, start_query_id + 1, ...
// and registers one client on each. Managers and clients are appended to 'mgrs' and
// 'clients' in creation order, so mgrs[i] and clients[i] belong together when both vectors
// had the same size on entry.
void CreateMgrsAndClients(int64_t start_query_id, int num_mgrs, int buffers_per_mgr,
                          int block_size, int reserved_blocks_per_client,
                          const std::shared_ptr<MemTracker>& tracker,
                          std::vector<BufferedBlockMgr2*>* mgrs,
                          std::vector<BufferedBlockMgr2::Client*>* clients) {
    for (int i = 0; i < num_mgrs; ++i) {
        BufferedBlockMgr2::Client* client;
        // Honor the requested block size; the fixture-wide _block_size was used here before,
        // which silently ignored the 'block_size' argument.
        BufferedBlockMgr2* mgr =
                CreateMgrAndClient(start_query_id + i, buffers_per_mgr, block_size,
                                   reserved_blocks_per_client, tracker, &client);
        mgrs->push_back(mgr);
        clients->push_back(client);
    }
}
// Destroy all created query states and associated block managers.
// Tears down all query states of _test_env and expects that the parent tracker of the block
// managers reports no remaining consumption afterwards.
void TearDownMgrs() {
    _test_env->tear_down_query_states();
    const auto leftover_bytes = _test_env->block_mgr_parent_tracker()->consumption();
    EXPECT_EQ(leftover_bytes, 0);
}
// Requests 'num_blocks' new blocks for 'client' and appends them to 'blocks'. Each block
// gets a single int32_t payload equal to its index in 'blocks', which is what
// ValidateBlock() checks later. Stops at the first failed request (ASSERT_* returns from
// this helper only, not from the calling test).
void AllocateBlocks(BufferedBlockMgr2* block_mgr, BufferedBlockMgr2::Client* client,
                    int num_blocks, std::vector<BufferedBlockMgr2::Block*>* blocks) {
    BufferedBlockMgr2::Block* fresh_block;
    for (int remaining = num_blocks; remaining > 0; --remaining) {
        Status status = block_mgr->get_new_block(client, NULL, &fresh_block);
        ASSERT_TRUE(status.ok());
        ASSERT_TRUE(fresh_block != NULL);
        int32_t* slot = fresh_block->allocate<int32_t>(sizeof(int32_t));
        *slot = blocks->size();
        blocks->push_back(fresh_block);
    }
}
// Pin all blocks, expecting they are pinned successfully.
// Every block in 'blocks' must pin without error and report itself as pinned.
void PinBlocks(const std::vector<BufferedBlockMgr2::Block*>& blocks) {
    for (BufferedBlockMgr2::Block* block : blocks) {
        bool pinned = false;
        EXPECT_TRUE(block->pin(&pinned).ok());
        EXPECT_TRUE(pinned);
    }
}
// Unpin all blocks, expecting no errors from unpin() calls.
void UnpinBlocks(const std::vector<BufferedBlockMgr2::Block*>& blocks) {
    // Every unpin() must return an OK status.
    for (BufferedBlockMgr2::Block* block : blocks) {
        EXPECT_TRUE(block->unpin().ok());
    }
}
// Convenience overload: waits for the outstanding writes of a single block manager.
static void WaitForWrites(BufferedBlockMgr2* block_mgr) {
    const std::vector<BufferedBlockMgr2*> single_mgr(1, block_mgr);
    WaitForWrites(single_mgr);
}
// Wait for writes issued through block managers to complete.
static void WaitForWrites(const std::vector<BufferedBlockMgr2*>& block_mgrs) {
    // Poll every WRITE_CHECK_INTERVAL_MILLIS for at most WRITE_WAIT_MILLIS in total. The
    // sleep comes first, so even already-completed writes cost one interval.
    const int max_attempts = WRITE_WAIT_MILLIS / WRITE_CHECK_INTERVAL_MILLIS;
    int attempt = 0;
    while (attempt < max_attempts) {
        SleepFor(MonoDelta::FromMilliseconds(WRITE_CHECK_INTERVAL_MILLIS));
        if (AllWritesComplete(block_mgrs)) {
            return;
        }
        ++attempt;
    }
    // Timed out: record a non-fatal test failure.
    EXPECT_TRUE(false) << "Writes did not complete after " << WRITE_WAIT_MILLIS << "ms";
}
// Returns true iff the "BlockWritesOutstanding" profile counter of every manager in
// 'block_mgrs' is zero.
static bool AllWritesComplete(const std::vector<BufferedBlockMgr2*>& block_mgrs) {
    for (BufferedBlockMgr2* mgr : block_mgrs) {
        RuntimeProfile::Counter* outstanding =
                mgr->profile()->get_counter("BlockWritesOutstanding");
        if (outstanding->value() != 0) {
            return false;
        }
    }
    return true;
}
// Delete the temporary file backing a block - all subsequent writes to the file
// should fail. Expects backing file has already been allocated.
static void DeleteBackingFile(BufferedBlockMgr2::Block* block) {
    // An empty path would mean no backing file was allocated for the block yet.
    const string& backing_path = block->tmp_file_path();
    EXPECT_GT(backing_path.size(), 0);
    // remove() reports whether a file was actually deleted.
    EXPECT_TRUE(remove(backing_path));
    LOG(INFO) << "Injected fault by deleting file " << backing_path;
}
// Check that the file backing the block has dir as a prefix of its path.
static bool BlockInDir(BufferedBlockMgr2::Block* block, const string& dir) {
    // Prefix test: compare only the first dir.size() characters of the backing file path.
    const string& file_path = block->tmp_file_path();
    return file_path.compare(0, dir.size(), dir) == 0;
}
// Find a block in the list that is backed by a file with the given directory as prefix
// of its path.
static BufferedBlockMgr2::Block* FindBlockForDir(
        const std::vector<BufferedBlockMgr2::Block*>& blocks, const string& dir) {
    // The first match in list order wins; NULL signals that no block lives under 'dir'.
    for (BufferedBlockMgr2::Block* candidate : blocks) {
        if (BlockInDir(candidate, dir)) {
            return candidate;
        }
    }
    return NULL;
}
void TestGetNewBlockImpl(int block_size) {
Status status;
int max_num_blocks = 5;
BufferedBlockMgr2* block_mgr = NULL;
BufferedBlockMgr2::Client* client;
block_mgr = CreateMgrAndClient(0, max_num_blocks, block_size, 0, _client_tracker, &client);
EXPECT_EQ(_test_env->block_mgr_parent_tracker()->consumption(), 0);
// Allocate blocks until max_num_blocks, they should all succeed and memory
// usage should go up.
BufferedBlockMgr2::Block* new_block;
BufferedBlockMgr2::Block* first_block = NULL;
for (int i = 0; i < max_num_blocks; ++i) {
status = block_mgr->get_new_block(client, NULL, &new_block);
EXPECT_TRUE(new_block != NULL);
EXPECT_EQ(block_mgr->bytes_allocated(), (i + 1) * block_size);
if (first_block == NULL) {
first_block = new_block;
}
}
// Trying to allocate a new one should fail.
status = block_mgr->get_new_block(client, NULL, &new_block);
EXPECT_TRUE(new_block == NULL);
EXPECT_EQ(block_mgr->bytes_allocated(), max_num_blocks * block_size);
// We can allocate a new block by transferring an already allocated one.
uint8_t* old_buffer = first_block->buffer();
status = block_mgr->get_new_block(client, first_block, &new_block);
EXPECT_TRUE(new_block != NULL);
EXPECT_TRUE(old_buffer == new_block->buffer());
EXPECT_EQ(block_mgr->bytes_allocated(), max_num_blocks * block_size);
EXPECT_TRUE(!first_block->is_pinned());
// Trying to allocate a new one should still fail.
status = block_mgr->get_new_block(client, NULL, &new_block);
EXPECT_TRUE(new_block == NULL);
EXPECT_EQ(block_mgr->bytes_allocated(), max_num_blocks * block_size);
EXPECT_EQ(block_mgr->writes_issued(), 1);
TearDownMgrs();
}
void TestEvictionImpl(int block_size) {
Status status;
DCHECK_GT(block_size, 0);
int max_num_buffers = 5;
BufferedBlockMgr2* block_mgr = NULL;
BufferedBlockMgr2::Client* client = NULL;
block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size, 0, _client_tracker, &client);
// Check counters.
RuntimeProfile* profile = block_mgr->profile();
RuntimeProfile::Counter* buffered_pin = profile->get_counter("BufferedPins");
std::vector<BufferedBlockMgr2::Block*> blocks;
AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
EXPECT_EQ(block_mgr->bytes_allocated(), max_num_buffers * block_size);
BOOST_FOREACH (BufferedBlockMgr2::Block* block, blocks) { block->unpin(); }
// Re-pinning all blocks
for (int i = 0; i < blocks.size(); ++i) {
bool pinned = false;
status = blocks[i]->pin(&pinned);
EXPECT_TRUE(status.ok());
EXPECT_TRUE(pinned);
ValidateBlock(blocks[i], i);
}
int buffered_pins_expected = blocks.size();
EXPECT_EQ(buffered_pin->value(), buffered_pins_expected);
// Unpin all blocks
BOOST_FOREACH (BufferedBlockMgr2::Block* block, blocks) { block->unpin(); }
// Get two new blocks.
AllocateBlocks(block_mgr, client, 2, &blocks);
// At least two writes must be issued. The first (num_blocks - 2) must be in memory.
EXPECT_GE(block_mgr->writes_issued(), 2);
for (int i = 0; i < (max_num_buffers - 2); ++i) {
bool pinned = false;
status = blocks[i]->pin(&pinned);
EXPECT_TRUE(status.ok());
EXPECT_TRUE(pinned);
ValidateBlock(blocks[i], i);
}
EXPECT_GE(buffered_pin->value(), buffered_pins_expected);
// can not pin any more
for (int i = (max_num_buffers - 2); i < max_num_buffers; ++i) {
bool pinned = true;
status = blocks[i]->pin(&pinned);
EXPECT_TRUE(status.ok());
EXPECT_FALSE(pinned);
}
// the last 2 block has already been pinned
for (int i = max_num_buffers; i < blocks.size(); ++i) {
bool pinned = false;
status = blocks[i]->pin(&pinned);
EXPECT_TRUE(status.ok());
EXPECT_TRUE(pinned);
ValidateBlock(blocks[i], i);
}
TearDownMgrs();
}
// Test that randomly issues get_new_block(), pin(), unpin(), del() and cancel()
// calls. All calls made are legal - error conditions are not expected until the first
// call to cancel(). This is called 2 times with encryption+integrity on/off.
// When executed in single-threaded mode 'tid' should be SINGLE_THREADED_TID.
static const int SINGLE_THREADED_TID = -1;
void TestRandomInternalImpl(RuntimeState* state, BufferedBlockMgr2* block_mgr, int num_buffers,
int tid) {
DCHECK(block_mgr != NULL);
const int num_iterations = 100000;
const int iters_before_close = num_iterations - 5000;
bool close_called = false;
unordered_map<BufferedBlockMgr2::Block*, int> pinned_block_map;
std::vector<std::pair<BufferedBlockMgr2::Block*, int32_t>> pinned_blocks;
unordered_map<BufferedBlockMgr2::Block*, int> unpinned_block_map;
std::vector<std::pair<BufferedBlockMgr2::Block*, int32_t>> unpinned_blocks;
typedef enum { Pin, New, Unpin, Delete, Close } ApiFunction;
ApiFunction api_function;
BufferedBlockMgr2::Client* client;
Status status = block_mgr->register_client(0, _client_tracker, state, &client);
EXPECT_TRUE(status.ok());
EXPECT_TRUE(client != NULL);
pinned_blocks.reserve(num_buffers);
BufferedBlockMgr2::Block* new_block;
for (int i = 0; i < num_iterations; ++i) {
if ((i % 20000) == 0) {
LOG(ERROR) << " Iteration " << i << std::endl;
}
if (i > iters_before_close && (rand() % 5 == 0)) {
api_function = Close;
} else if (pinned_blocks.size() == 0 && unpinned_blocks.size() == 0) {
api_function = New;
} else if (pinned_blocks.size() == 0) {
// Pin or New. Can't unpin or delete.
api_function = static_cast<ApiFunction>(rand() % 2);
} else if (pinned_blocks.size() >= num_buffers) {
// Unpin or delete. Can't pin or get new.
api_function = static_cast<ApiFunction>(2 + (rand() % 2));
} else if (unpinned_blocks.size() == 0) {
// Can't pin. Unpin, new or delete.
api_function = static_cast<ApiFunction>(1 + (rand() % 3));
} else {
// Any api function.
api_function = static_cast<ApiFunction>(rand() % 4);
}
std::pair<BufferedBlockMgr2::Block*, int32_t> block_data;
int rand_pick = 0;
int32_t* data = NULL;
bool pinned = false;
switch (api_function) {
case New:
status = block_mgr->get_new_block(client, NULL, &new_block);
if (close_called || (tid != SINGLE_THREADED_TID && status.is_cancelled())) {
EXPECT_TRUE(new_block == NULL);
EXPECT_TRUE(status.is_cancelled());
continue;
}
EXPECT_TRUE(status.ok());
EXPECT_TRUE(new_block != NULL);
data = MakeRandomSizeData(new_block);
block_data = std::make_pair(new_block, *data);
pinned_blocks.push_back(block_data);
pinned_block_map.insert(std::make_pair(block_data.first, pinned_blocks.size() - 1));
break;
case Pin:
rand_pick = rand() % unpinned_blocks.size();
block_data = unpinned_blocks[rand_pick];
status = block_data.first->pin(&pinned);
if (close_called || (tid != SINGLE_THREADED_TID && status.is_cancelled())) {
EXPECT_TRUE(status.is_cancelled());
// In single-threaded runs the block should not have been pinned.
// In multi-threaded runs pin() may return the block pinned but the status to
// be cancelled. In this case we could move the block from unpinned_blocks
// to pinned_blocks. We do not do that because after is_cancelled() no actual
// block operations should take place.
// reason: when block_mgr is cancelled in one thread, the same block_mgr
// is waiting for scan-range to be ready.
if (tid == SINGLE_THREADED_TID) {
EXPECT_FALSE(pinned);
}
continue;
}
EXPECT_TRUE(status.ok());
EXPECT_TRUE(pinned);
ValidateRandomSizeData(block_data.first, block_data.second);
unpinned_blocks[rand_pick] = unpinned_blocks.back();
unpinned_blocks.pop_back();
unpinned_block_map[unpinned_blocks[rand_pick].first] = rand_pick;
pinned_blocks.push_back(block_data);
pinned_block_map.insert(std::make_pair(block_data.first, pinned_blocks.size() - 1));
break;
case Unpin:
rand_pick = rand() % pinned_blocks.size();
block_data = pinned_blocks[rand_pick];
status = block_data.first->unpin();
if (close_called || (tid != SINGLE_THREADED_TID && status.is_cancelled())) {
EXPECT_TRUE(status.is_cancelled());
continue;
}
EXPECT_TRUE(status.ok());
pinned_blocks[rand_pick] = pinned_blocks.back();
pinned_blocks.pop_back();
pinned_block_map[pinned_blocks[rand_pick].first] = rand_pick;
unpinned_blocks.push_back(block_data);
unpinned_block_map.insert(
std::make_pair(block_data.first, unpinned_blocks.size() - 1));
break;
case Delete:
rand_pick = rand() % pinned_blocks.size();
block_data = pinned_blocks[rand_pick];
block_data.first->del();
pinned_blocks[rand_pick] = pinned_blocks.back();
pinned_blocks.pop_back();
pinned_block_map[pinned_blocks[rand_pick].first] = rand_pick;
break;
case Close:
block_mgr->cancel();
close_called = true;
break;
} // end switch (apiFunction)
} // end for ()
}
// Single-threaded execution of the TestRandomInternalImpl.
void TestRandomInternalSingle(int block_size) {
DCHECK_GT(block_size, 0);
DCHECK(_test_env.get() != NULL);
const int max_num_buffers = 100;
RuntimeState* state = NULL;
BufferedBlockMgr2* block_mgr = CreateMgr(0, max_num_buffers, block_size, &state);
TestRandomInternalImpl(state, block_mgr, max_num_buffers, SINGLE_THREADED_TID);
TearDownMgrs();
}
// Multi-threaded execution of the TestRandomInternalImpl.
void TestRandomInternalMulti(int num_threads, int block_size) {
DCHECK_GT(num_threads, 0);
DCHECK_GT(block_size, 0);
DCHECK(_test_env.get() != NULL);
const int max_num_buffers = 100;
RuntimeState* state = NULL;
BufferedBlockMgr2* block_mgr =
CreateMgr(0, num_threads * max_num_buffers, block_size, &state);
boost::thread_group workers;
for (int i = 0; i < num_threads; ++i) {
thread* t = new boost::thread(boost::bind(&BufferedBlockMgrTest::TestRandomInternalImpl,
this, state, block_mgr, max_num_buffers, i));
workers.add_thread(t);
}
workers.join_all();
TearDownMgrs();
}
// Repeatedly call BufferedBlockMgr2::Create() and BufferedBlockMgr2::~BufferedBlockMgr2().
// Worker body for CreateDestroyMulti(): 100 times, creates a block manager for 'state' and
// lets it go out of scope again. 'index' only labels the log output. The returned status is
// not checked.
void CreateDestroyThread(int index, RuntimeState* state) {
    const int num_buffers = 10;
    const int iters = 100;
    for (int iter = 0; iter < iters; ++iter) {
        LOG(WARNING) << "CreateDestroyThread thread " << index << " begin " << iter << std::endl;
        // 'mgr' is released at the end of each iteration.
        boost::shared_ptr<BufferedBlockMgr2> mgr;
        Status status = BufferedBlockMgr2::create(
                state, _test_env->block_mgr_parent_tracker(), state->runtime_profile(),
                _test_env->tmp_file_mgr(), _block_size * num_buffers, _block_size, &mgr);
        LOG(WARNING) << "CreateDestroyThread thread " << index << " end " << iter << std::endl;
    }
}
// IMPALA-2286: Test for races between BufferedBlockMgr2::Create() and
// BufferedBlockMgr2::~BufferedBlockMgr2().
// Runs CreateDestroyThread() on four threads that all share one RuntimeState, then waits
// for them to finish.
void CreateDestroyMulti() {
    const int num_threads = 4;
    // Create a shared RuntimeState with no BufferedBlockMgr2. It is owned by this scope so
    // it is freed on return instead of being leaked. It is declared before 'workers' so
    // that the thread group is destroyed first.
    boost::scoped_ptr<RuntimeState> shared_state(new RuntimeState(
            TUniqueId(), TQueryOptions(), TQueryGlobals(), _test_env->exec_env()));
    boost::thread_group workers;
    for (int i = 0; i < num_threads; ++i) {
        thread* t = new boost::thread(boost::bind(&BufferedBlockMgrTest::CreateDestroyThread,
                                                  this, i, shared_state.get()));
        workers.add_thread(t);
    }
    // Every worker has finished using 'shared_state' once join_all() returns.
    workers.join_all();
}
boost::scoped_ptr<TestEnv> _test_env;
std::shared_ptr<MemTracker> _client_tracker;
std::vector<string> _created_tmp_dirs;
};
// Runs the get_new_block() scenario with 1 KB, 8 KB and 8 MB blocks.
TEST_F(BufferedBlockMgrTest, get_new_block) {
    const int block_sizes[] = {1024, 8 * 1024, 8 * 1024 * 1024};
    for (int block_size : block_sizes) {
        TestGetNewBlockImpl(block_size);
    }
    LOG(WARNING) << "finish test get_new_block." << std::endl;
}
// Mixes two smaller-than-max blocks (128 and 512 bytes) with one max-size block and checks
// how each kind is accounted: small blocks are charged to the client tracker only, while the
// max-size block also shows up in bytes_allocated() and in the parent tracker.
TEST_F(BufferedBlockMgrTest, GetNewBlockSmallBlocks) {
    const int block_size = 1024;
    const int max_num_blocks = 3;
    BufferedBlockMgr2::Client* client;
    BufferedBlockMgr2* block_mgr =
            CreateMgrAndClient(0, max_num_blocks, block_size, 0, _client_tracker, &client);
    EXPECT_EQ(0, _test_env->block_mgr_parent_tracker()->consumption());
    std::vector<BufferedBlockMgr2::Block*> blocks;

    // First small block: 128 bytes.
    BufferedBlockMgr2::Block* block = NULL;
    EXPECT_TRUE(block_mgr->get_new_block(client, NULL, &block, 128).ok());
    EXPECT_TRUE(block != NULL);
    EXPECT_EQ(block_mgr->bytes_allocated(), 0);
    EXPECT_EQ(_test_env->block_mgr_parent_tracker()->consumption(), 0);
    EXPECT_EQ(_client_tracker->consumption(), 128);
    EXPECT_TRUE(block->is_pinned());
    EXPECT_EQ(block->bytes_remaining(), 128);
    EXPECT_TRUE(block->buffer() != NULL);
    blocks.push_back(block);

    // Max-size block.
    EXPECT_TRUE(block_mgr->get_new_block(client, NULL, &block).ok());
    EXPECT_TRUE(block != NULL);
    EXPECT_EQ(block_mgr->bytes_allocated(), block_mgr->max_block_size());
    EXPECT_EQ(_test_env->block_mgr_parent_tracker()->consumption(), block_mgr->max_block_size());
    EXPECT_EQ(_client_tracker->consumption(), 128 + block_mgr->max_block_size());
    EXPECT_TRUE(block->is_pinned());
    EXPECT_EQ(block->bytes_remaining(), block_mgr->max_block_size());
    EXPECT_TRUE(block->buffer() != NULL);
    blocks.push_back(block);

    // Second small block: 512 bytes.
    EXPECT_TRUE(block_mgr->get_new_block(client, NULL, &block, 512).ok());
    EXPECT_TRUE(block != NULL);
    EXPECT_EQ(block_mgr->bytes_allocated(), block_mgr->max_block_size());
    EXPECT_EQ(_test_env->block_mgr_parent_tracker()->consumption(), block_mgr->max_block_size());
    EXPECT_EQ(_client_tracker->consumption(), 128 + 512 + block_mgr->max_block_size());
    EXPECT_TRUE(block->is_pinned());
    EXPECT_EQ(block->bytes_remaining(), 512);
    EXPECT_TRUE(block->buffer() != NULL);
    blocks.push_back(block);

    // The max-size block in the middle must survive an unpin/pin round trip.
    EXPECT_TRUE(blocks[1]->unpin().ok());
    bool pinned;
    EXPECT_TRUE(blocks[1]->pin(&pinned).ok());
    EXPECT_TRUE(pinned);

    for (BufferedBlockMgr2::Block* allocated : blocks) {
        allocated->del();
    }
    TearDownMgrs();
}
// Test pinning more blocks than the max available buffers: the extra pin must not succeed.
TEST_F(BufferedBlockMgrTest, Pin) {
    const int max_num_blocks = 5;
    const int block_size = 1024;
    BufferedBlockMgr2::Client* client;
    BufferedBlockMgr2* block_mgr =
            CreateMgrAndClient(0, max_num_blocks, block_size, 0, _client_tracker, &client);
    std::vector<BufferedBlockMgr2::Block*> blocks;
    AllocateBlocks(block_mgr, client, max_num_blocks, &blocks);

    // Give all buffers back.
    Status status;
    for (BufferedBlockMgr2::Block* block : blocks) {
        status = block->unpin();
        EXPECT_TRUE(status.ok());
    }

    // The freed buffers allow a second batch of blocks to be allocated.
    AllocateBlocks(block_mgr, client, max_num_blocks, &blocks);

    // Every buffer now backs a block of the second batch, so a block of the first batch
    // cannot be pinned: the call succeeds but reports pinned == false.
    bool pinned;
    status = blocks[0]->pin(&pinned);
    EXPECT_TRUE(status.ok());
    EXPECT_FALSE(pinned);

    // Release both batches.
    for (BufferedBlockMgr2::Block* block : blocks) {
        status = block->unpin();
        EXPECT_TRUE(status.ok());
    }

    // Exactly max_num_blocks blocks fit ...
    for (int idx = 0; idx < max_num_blocks; ++idx) {
        status = blocks[idx]->pin(&pinned);
        EXPECT_TRUE(status.ok());
        EXPECT_TRUE(pinned);
    }

    // ... and one more does not.
    status = blocks[max_num_blocks]->pin(&pinned);
    EXPECT_TRUE(status.ok());
    EXPECT_FALSE(pinned);
    TearDownMgrs();
}
// Test the eviction policy of the block mgr. No writes issued until more than
// the max available buffers are allocated. Writes must be issued in LIFO order.
TEST_F(BufferedBlockMgrTest, Eviction) {
    // Same scenario with 1 KB and with 8 MB blocks.
    const int block_sizes[] = {1024, 8 * 1024 * 1024};
    for (int block_size : block_sizes) {
        TestEvictionImpl(block_size);
    }
}
// Test deletion and reuse of blocks.
TEST_F(BufferedBlockMgrTest, Deletion) {
    const int max_num_buffers = 5;
    const int block_size = 1024;
    BufferedBlockMgr2::Client* client;
    BufferedBlockMgr2* block_mgr =
            CreateMgrAndClient(0, max_num_buffers, block_size, 0, _client_tracker, &client);

    // Profile counters that tell freshly created blocks from recycled ones.
    RuntimeProfile* profile = block_mgr->profile();
    RuntimeProfile::Counter* recycled_cnt = profile->get_counter("BlocksRecycled");
    RuntimeProfile::Counter* created_cnt = profile->get_counter("BlocksCreated");

    std::vector<BufferedBlockMgr2::Block*> blocks;
    AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
    EXPECT_TRUE(created_cnt->value() == max_num_buffers);

    for (BufferedBlockMgr2::Block* block : blocks) {
        block->del();
    }

    // The second batch must be served entirely from the deleted blocks: the created count
    // stays put and the recycled count reaches the batch size.
    AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
    EXPECT_TRUE(created_cnt->value() == max_num_buffers);
    EXPECT_TRUE(recycled_cnt->value() == max_num_buffers);
    TearDownMgrs();
}
// Delete blocks of various sizes and statuses to exercise the different code paths.
// This relies on internal validation in block manager to detect many errors.
TEST_F(BufferedBlockMgrTest, DeleteSingleBlocks) {
    const int max_num_buffers = 16;
    BufferedBlockMgr2::Client* client;
    BufferedBlockMgr2* block_mgr =
            CreateMgrAndClient(0, max_num_buffers, _block_size, 0, _client_tracker, &client);
    BufferedBlockMgr2::Block* block;

    // Case 1: max-size (I/O) block deleted while pinned.
    EXPECT_TRUE(block_mgr->get_new_block(client, NULL, &block).ok());
    EXPECT_TRUE(block != NULL);
    EXPECT_TRUE(block->is_pinned());
    EXPECT_TRUE(block->is_max_size());
    block->del();
    EXPECT_TRUE(_client_tracker->consumption() == 0);

    // Case 2: small (non-I/O) block deleted while pinned.
    const int small_block_size = 128;
    EXPECT_TRUE(block_mgr->get_new_block(client, NULL, &block, small_block_size).ok());
    EXPECT_TRUE(block != NULL);
    EXPECT_TRUE(block->is_pinned());
    EXPECT_EQ(small_block_size, _client_tracker->consumption());
    block->del();
    EXPECT_EQ(0, _client_tracker->consumption());

    // Case 3: max-size block unpinned, deleted once outstanding writes have drained.
    EXPECT_TRUE(block_mgr->get_new_block(client, NULL, &block).ok());
    EXPECT_TRUE(block != NULL);
    EXPECT_TRUE(block->is_pinned());
    EXPECT_TRUE(block->is_max_size());
    block->unpin();
    EXPECT_FALSE(block->is_pinned());
    WaitForWrites(block_mgr);
    block->del();
    EXPECT_TRUE(_client_tracker->consumption() == 0);

    // Case 4: max-size block unpinned and deleted right away, before waiting for writes.
    EXPECT_TRUE(block_mgr->get_new_block(client, NULL, &block).ok());
    EXPECT_TRUE(block != NULL);
    EXPECT_TRUE(block->is_pinned());
    EXPECT_TRUE(block->is_max_size());
    block->unpin();
    EXPECT_FALSE(block->is_pinned());
    block->del();
    WaitForWrites(block_mgr);
    EXPECT_TRUE(_client_tracker->consumption() == 0);
    TearDownMgrs();
}
// Test that all APIs return cancelled after close.
TEST_F(BufferedBlockMgrTest, Close) {
    const int max_num_buffers = 5;
    const int block_size = 1024;
    BufferedBlockMgr2::Client* client;
    BufferedBlockMgr2* block_mgr =
            CreateMgrAndClient(0, max_num_buffers, block_size, 0, _client_tracker, &client);
    std::vector<BufferedBlockMgr2::Block*> blocks;
    AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);

    block_mgr->cancel();

    // After cancel(): no new block is handed out ...
    BufferedBlockMgr2::Block* new_block;
    Status status = block_mgr->get_new_block(client, NULL, &new_block);
    EXPECT_TRUE(status.is_cancelled());
    EXPECT_TRUE(new_block == NULL);

    // ... and unpin()/pin() on existing blocks report the cancellation.
    status = blocks[0]->unpin();
    EXPECT_TRUE(status.is_cancelled());
    bool pinned;
    status = blocks[0]->pin(&pinned);
    EXPECT_TRUE(status.is_cancelled());

    // del() returns no status; it is only exercised here.
    blocks[1]->del();
    TearDownMgrs();
}
// Clear scratch directory. Return # of files deleted.
static int clear_scratch_dir() {
int num_files = 0;
directory_iterator dir_it(SCRATCH_DIR);
for (; dir_it != directory_iterator(); ++dir_it) {
++num_files;
remove_all(dir_it->path());
}
return num_files;
}
// Test that the block manager behaves correctly after a write error. Delete the scratch
// directory before an operation that would cause a write and test that subsequent API
// calls return 'CANCELLED' correctly.
TEST_F(BufferedBlockMgrTest, WriteError) {
    const int max_num_buffers = 2;
    const int block_size = 1024;
    BufferedBlockMgr2::Client* client;
    BufferedBlockMgr2* block_mgr =
            CreateMgrAndClient(0, max_num_buffers, block_size, 0, _client_tracker, &client);
    std::vector<BufferedBlockMgr2::Block*> blocks;
    AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);

    // Number of blocks pushed through the unpin/pin cycle below.
    const int num_cycled = 2;
    Status status;

    // First unpin: makes sure backing storage gets allocated in a tmp file.
    for (int idx = 0; idx < num_cycled; ++idx) {
        status = blocks[idx]->unpin();
        EXPECT_TRUE(status.ok());
    }
    WaitForWrites(block_mgr);

    // Bring the blocks back into memory.
    for (int idx = 0; idx < num_cycled; ++idx) {
        bool pinned;
        status = blocks[idx]->pin(&pinned);
        EXPECT_TRUE(status.ok());
        EXPECT_TRUE(pinned);
    }

    // Pull the backing storage away so that the writes triggered next fail.
    const int num_files = clear_scratch_dir();
    EXPECT_TRUE(num_files > 0);
    for (int idx = 0; idx < num_cycled; ++idx) {
        status = blocks[idx]->unpin();
        EXPECT_TRUE(status.ok());
    }
    WaitForWrites(block_mgr);

    // From here on the manager must behave as cancelled.
    for (int idx = 0; idx < num_cycled; ++idx) {
        blocks[idx]->del();
    }
    BufferedBlockMgr2::Block* new_block;
    status = block_mgr->get_new_block(client, NULL, &new_block);
    EXPECT_TRUE(status.is_cancelled());
    EXPECT_TRUE(new_block == NULL);
    TearDownMgrs();
}
// Test block manager error handling when temporary file space cannot be allocated to
// back an unpinned buffer.
TEST_F(BufferedBlockMgrTest, TmpFileAllocateError) {
    const int max_num_buffers = 2;
    BufferedBlockMgr2::Client* client;
    BufferedBlockMgr2* block_mgr =
            CreateMgrAndClient(0, max_num_buffers, _block_size, 0, _client_tracker, &client);
    std::vector<BufferedBlockMgr2::Block*> blocks;
    AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);

    // Unpinning the first block forces a write, which creates the tmp file.
    Status status = blocks[0]->unpin();
    EXPECT_TRUE(status.ok());
    WaitForWrites(block_mgr);

    // Delete the temporary files - operations that need them will fail from now on.
    const int num_files = clear_scratch_dir();
    EXPECT_TRUE(num_files > 0);

    // The current implementation tries to grow the tmp file right inside unpin(), so the
    // failure shows up here. That timing is not contractual, but an accidental change
    // should be noticed.
    status = blocks[1]->unpin();
    EXPECT_FALSE(status.ok());
    TearDownMgrs();
}
// Test that the block manager is able to blacklist a temporary device correctly after a
// write error. We should not allocate more blocks on that device, but existing blocks
// on the device will remain in use.
/// Disabled because blacklisting was disabled as workaround for IMPALA-2305.
TEST_F(BufferedBlockMgrTest, DISABLED_WriteErrorBlacklist) {
    // To re-enable, rename the test to:
    // TEST_F(BufferedBlockMgrTest, WriteErrorBlacklist) {
    // Two temporary directories shared by two block managers (two concurrent queries).
    std::vector<string> tmp_dirs = InitMultipleTmpDirs(2);
    const int NUM_BLOCK_MGRS = 2;
    const int MAX_NUM_BLOCKS = 4;
    const int blocks_per_mgr = MAX_NUM_BLOCKS / NUM_BLOCK_MGRS;
    std::vector<BufferedBlockMgr2*> block_mgrs;
    std::vector<BufferedBlockMgr2::Client*> clients;
    CreateMgrsAndClients(0, NUM_BLOCK_MGRS, blocks_per_mgr, _block_size, 0, _client_tracker,
                         &block_mgrs, &clients);

    // Unpin blocks so that files get allocated for all 2x2 manager/directory combinations.
    std::vector<vector<BufferedBlockMgr2::Block*>> blocks;
    std::vector<BufferedBlockMgr2::Block*> all_blocks;
    for (int mgr_idx = 0; mgr_idx < NUM_BLOCK_MGRS; ++mgr_idx) {
        std::vector<BufferedBlockMgr2::Block*> mgr_blocks;
        AllocateBlocks(block_mgrs[mgr_idx], clients[mgr_idx], blocks_per_mgr, &mgr_blocks);
        UnpinBlocks(mgr_blocks);
        for (int blk_idx = 0; blk_idx < blocks_per_mgr; ++blk_idx) {
            LOG(INFO) << "Manager " << mgr_idx << " Block " << blk_idx << " backed by file "
                      << mgr_blocks[blk_idx]->tmp_file_path();
        }
        blocks.push_back(mgr_blocks);
        all_blocks.insert(all_blocks.end(), mgr_blocks.begin(), mgr_blocks.end());
    }
    WaitForWrites(block_mgrs);

    // Manager 0 is the one that will hit the write error, in the first directory.
    const int error_mgr = 0;
    const int no_error_mgr = 1;
    const string& error_dir = tmp_dirs[0];
    const string& good_dir = tmp_dirs[1];

    // Inject the fault: delete one backing file of the first manager in the first dir.
    BufferedBlockMgr2::Block* error_block = FindBlockForDir(blocks[error_mgr], error_dir);
    ASSERT_TRUE(error_block != NULL) << "Expected a tmp file in dir " << error_dir;
    PinBlocks(all_blocks);
    DeleteBackingFile(error_block);
    // The unpin itself succeeds because the tmp file space was allocated earlier.
    UnpinBlocks(all_blocks);
    WaitForWrites(block_mgrs);
    EXPECT_TRUE(block_mgrs[error_mgr]->is_cancelled());
    EXPECT_FALSE(block_mgrs[no_error_mgr]->is_cancelled());

    // The device that produced the error must have dropped out of the active set.
    std::vector<TmpFileMgr::DeviceId> active_tmp_devices =
            _test_env->tmp_file_mgr()->active_tmp_devices();
    EXPECT_EQ(tmp_dirs.size() - 1, active_tmp_devices.size());
    for (const TmpFileMgr::DeviceId& device_id : active_tmp_devices) {
        const string& device_path = _test_env->tmp_file_mgr()->get_tmp_dir_path(device_id);
        EXPECT_EQ(string::npos, error_dir.find(device_path));
    }

    // The unaffected manager keeps using scratch space it already allocated, including
    // space in the bad directory, because it saw no write error itself. This may change in
    // future but is the intended behaviour for now.
    PinBlocks(blocks[no_error_mgr]);
    UnpinBlocks(blocks[no_error_mgr]);
    EXPECT_TRUE(FindBlockForDir(blocks[no_error_mgr], good_dir) != NULL);
    EXPECT_TRUE(FindBlockForDir(blocks[no_error_mgr], error_dir) != NULL);

    // Blocks it creates from now on must stay out of the bad directory.
    std::vector<BufferedBlockMgr2::Block*> no_error_new_blocks;
    AllocateBlocks(block_mgrs[no_error_mgr], clients[no_error_mgr], blocks_per_mgr,
                   &no_error_new_blocks);
    UnpinBlocks(no_error_new_blocks);
    for (BufferedBlockMgr2::Block* fresh_block : no_error_new_blocks) {
        LOG(INFO) << "Newly created block backed by file " << fresh_block->tmp_file_path();
        EXPECT_TRUE(BlockInDir(fresh_block, good_dir));
    }

    // A manager created after the error must use the good directory exclusively.
    BufferedBlockMgr2::Client* new_client;
    BufferedBlockMgr2* new_block_mgr =
            CreateMgrAndClient(9999, blocks_per_mgr, _block_size, 0, _client_tracker, &new_client);
    std::vector<BufferedBlockMgr2::Block*> new_mgr_blocks;
    AllocateBlocks(new_block_mgr, new_client, blocks_per_mgr, &new_mgr_blocks);
    UnpinBlocks(new_mgr_blocks);
    for (int blk_idx = 0; blk_idx < blocks_per_mgr; ++blk_idx) {
        LOG(INFO) << "New manager Block " << blk_idx << " backed by file "
                  << new_mgr_blocks[blk_idx]->tmp_file_path();
        EXPECT_TRUE(BlockInDir(new_mgr_blocks[blk_idx], good_dir));
    }
}
// Check that an allocation error caused by a scratch directory becoming inaccessible
// results in blocks being allocated in other directories.
TEST_F(BufferedBlockMgrTest, AllocationErrorHandling) {
    // Set up two buffered block managers with two temporary dirs.
    std::vector<string> tmp_dirs = InitMultipleTmpDirs(2);
    // Simulate two concurrent queries.
    const int num_block_mgrs = 2;
    const int max_num_blocks = 4;
    const int blocks_per_mgr = max_num_blocks / num_block_mgrs;
    std::vector<BufferedBlockMgr2*> block_mgrs;
    std::vector<BufferedBlockMgr2::Client*> clients;
    CreateMgrsAndClients(0, num_block_mgrs, blocks_per_mgr, _block_size, 0, _client_tracker,
                         &block_mgrs, &clients);

    // Allocate the blocks up front; they are unpinned only after the first scratch
    // subdirectory has had its permissions removed.
    std::vector<vector<BufferedBlockMgr2::Block*>> blocks;
    for (int i = 0; i < num_block_mgrs; ++i) {
        std::vector<BufferedBlockMgr2::Block*> mgr_blocks;
        LOG(INFO) << "Iter " << i;
        AllocateBlocks(block_mgrs[i], clients[i], blocks_per_mgr, &mgr_blocks);
        blocks.push_back(mgr_blocks);
    }

    // Strip every permission bit from the first scratch subdirectory. The result of
    // chmod() is checked (non-fatally, so the cleanup below still runs): if the call
    // fails, the directory stays writable and the rest of the test would pass without
    // exercising the error path at all.
    const string& bad_dir = tmp_dirs[0];
    const string bad_scratch_subdir = bad_dir + SCRATCH_SUFFIX;
    EXPECT_EQ(0, chmod(bad_scratch_subdir.c_str(), 0))
            << "Failed to remove permissions from " << bad_scratch_subdir;

    // The block mgr should attempt to allocate space in bad dir for one block, which will
    // cause an error when it tries to create/expand the file. It should recover and just
    // use the good dir.
    UnpinBlocks(blocks[0]);
    // Directories remain on active list even when they experience errors.
    EXPECT_EQ(2, _test_env->tmp_file_mgr()->num_active_tmp_devices());
    // Blocks should not be written to bad dir even if it remains non-writable.
    UnpinBlocks(blocks[1]);
    // All writes should succeed.
    WaitForWrites(block_mgrs);

    // Delete every block that was allocated above.
    for (const std::vector<BufferedBlockMgr2::Block*>& mgr_blocks : blocks) {
        for (BufferedBlockMgr2::Block* block : mgr_blocks) {
            block->del();
        }
    }
}
// Test that block manager fails cleanly when all directories are inaccessible at runtime.
TEST_F(BufferedBlockMgrTest, NoDirsAllocationError) {
    std::vector<string> tmp_dirs = InitMultipleTmpDirs(2);
    const int max_num_buffers = 2;
    BufferedBlockMgr2::Client* client;
    BufferedBlockMgr2* block_mgr =
            CreateMgrAndClient(0, max_num_buffers, _block_size, 0, _client_tracker, &client);
    std::vector<BufferedBlockMgr2::Block*> blocks;
    AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);

    // Strip every permission bit from each scratch subdirectory. The result of chmod() is
    // checked so that a failure to set up the precondition is reported directly instead
    // of showing up only as an unexplained unpin() expectation failure below.
    for (const string& tmp_dir : tmp_dirs) {
        const string tmp_scratch_subdir = tmp_dir + SCRATCH_SUFFIX;
        EXPECT_EQ(0, chmod(tmp_scratch_subdir.c_str(), 0))
                << "Failed to remove permissions from " << tmp_scratch_subdir;
    }

    // With no usable scratch directory left, every unpin is expected to return an error.
    for (BufferedBlockMgr2::Block* block : blocks) {
        EXPECT_FALSE(block->unpin().ok());
    }
}
// Create two clients with different number of reserved buffers.
TEST_F(BufferedBlockMgrTest, MultipleClients) {
    Status status;
    const int client1_buffers = 3;
    const int client2_buffers = 5;
    const int max_num_buffers = client1_buffers + client2_buffers;
    const int block_size = 1024;
    RuntimeState* runtime_state;
    BufferedBlockMgr2* block_mgr = CreateMgr(0, max_num_buffers, block_size, &runtime_state);

    // Register both clients, each with its own number of reserved buffers.
    BufferedBlockMgr2::Client* client1 = NULL;
    BufferedBlockMgr2::Client* client2 = NULL;
    status = block_mgr->register_client(client1_buffers, _client_tracker, runtime_state, &client1);
    EXPECT_TRUE(status.ok());
    EXPECT_TRUE(client1 != NULL);
    status = block_mgr->register_client(client2_buffers, _client_tracker, runtime_state, &client2);
    EXPECT_TRUE(status.ok());
    EXPECT_TRUE(client2 != NULL);

    // Reserve client 1's and 2's buffers. They should succeed.
    bool reserved = block_mgr->try_acquire_tmp_reservation(client1, 1);
    EXPECT_TRUE(reserved);
    reserved = block_mgr->try_acquire_tmp_reservation(client2, 1);
    EXPECT_TRUE(reserved);

    std::vector<BufferedBlockMgr2::Block*> client1_blocks;
    // Allocate all of client1's reserved blocks, they should all succeed.
    AllocateBlocks(block_mgr, client1, client1_buffers, &client1_blocks);

    // Try allocating one more, that should fail.
    // The out-parameter starts as NULL (as in the other tests in this file) so that the
    // checks below never read an indeterminate pointer.
    BufferedBlockMgr2::Block* block = NULL;
    status = block_mgr->get_new_block(client1, NULL, &block);
    EXPECT_TRUE(status.ok());
    EXPECT_TRUE(block == NULL);

    // Trying to reserve should also fail.
    reserved = block_mgr->try_acquire_tmp_reservation(client1, 1);
    EXPECT_FALSE(reserved);

    // Allocate all of client2's reserved blocks, these should succeed.
    std::vector<BufferedBlockMgr2::Block*> client2_blocks;
    AllocateBlocks(block_mgr, client2, client2_buffers, &client2_blocks);

    // Try allocating one more from client 2, that should fail.
    status = block_mgr->get_new_block(client2, NULL, &block);
    EXPECT_TRUE(status.ok());
    EXPECT_TRUE(block == NULL);

    // Unpin one block from client 1.
    status = client1_blocks[0]->unpin();
    EXPECT_TRUE(status.ok());

    // Client 2 should still not be able to allocate.
    status = block_mgr->get_new_block(client2, NULL, &block);
    EXPECT_TRUE(status.ok());
    EXPECT_TRUE(block == NULL);

    // Client 2 should still not be able to reserve.
    reserved = block_mgr->try_acquire_tmp_reservation(client2, 1);
    EXPECT_FALSE(reserved);

    // Client 1 should be able to though.
    status = block_mgr->get_new_block(client1, NULL, &block);
    EXPECT_TRUE(status.ok());
    EXPECT_TRUE(block != NULL);

    // Unpin two of client 1's blocks (client 1 should have 3 unpinned blocks now).
    status = client1_blocks[1]->unpin();
    EXPECT_TRUE(status.ok());
    status = client1_blocks[2]->unpin();
    EXPECT_TRUE(status.ok());

    // Clear client 1's reservation.
    block_mgr->clear_reservations(client1);

    // Client 2 should be able to reserve 1 buffer now (there are 2 left).
    reserved = block_mgr->try_acquire_tmp_reservation(client2, 1);
    EXPECT_TRUE(reserved);

    // Client one can only pin 1.
    bool pinned = false;
    status = client1_blocks[0]->pin(&pinned);
    EXPECT_TRUE(status.ok());
    EXPECT_TRUE(pinned);
    // Can't get this one.
    status = client1_blocks[1]->pin(&pinned);
    EXPECT_TRUE(status.ok());
    EXPECT_FALSE(pinned);

    // Client 2 can pick up the one reserved buffer.
    status = block_mgr->get_new_block(client2, NULL, &block);
    EXPECT_TRUE(status.ok());
    EXPECT_TRUE(block != NULL);
    // But not a second.
    BufferedBlockMgr2::Block* block2 = NULL;
    status = block_mgr->get_new_block(client2, NULL, &block2);
    EXPECT_TRUE(status.ok());
    EXPECT_TRUE(block2 == NULL);

    // Unpin client 2's block it got from the reservation. Since this is a tmp
    // reservation, client 1 can pick it up again (it is no longer reserved).
    status = block->unpin();
    EXPECT_TRUE(status.ok());
    status = client1_blocks[1]->pin(&pinned);
    EXPECT_TRUE(status.ok());
    EXPECT_TRUE(pinned);

    TearDownMgrs();
}
// Create two clients with different number of reserved buffers and some additional.
TEST_F(BufferedBlockMgrTest, MultipleClientsExtraBuffers) {
    // One reserved buffer per client, plus two buffers that nobody has reserved.
    const int reserved_for_first = 1;
    const int reserved_for_second = 1;
    const int total_buffers = reserved_for_first + reserved_for_second + 2;
    const int block_size = 1024;

    RuntimeState* state;
    BufferedBlockMgr2* mgr = CreateMgr(0, total_buffers, block_size, &state);

    BufferedBlockMgr2::Client* first = NULL;
    BufferedBlockMgr2::Client* second = NULL;
    BufferedBlockMgr2::Block* new_block = NULL;

    EXPECT_TRUE(mgr->register_client(reserved_for_first, _client_tracker, state, &first).ok());
    EXPECT_TRUE(first != NULL);
    EXPECT_TRUE(mgr->register_client(reserved_for_second, _client_tracker, state, &second).ok());
    EXPECT_TRUE(second != NULL);

    // Use up each client's reservation in turn; all of these allocations should succeed.
    std::vector<BufferedBlockMgr2::Block*> first_blocks;
    AllocateBlocks(mgr, first, reserved_for_first, &first_blocks);
    std::vector<BufferedBlockMgr2::Block*> second_blocks;
    AllocateBlocks(mgr, second, reserved_for_second, &second_blocks);

    // Two spare buffers remain, so each client should get one more block.
    EXPECT_TRUE(mgr->get_new_block(first, NULL, &new_block).ok());
    EXPECT_TRUE(new_block != NULL);
    EXPECT_TRUE(mgr->get_new_block(second, NULL, &new_block).ok());
    EXPECT_TRUE(new_block != NULL);

    // Everything is in use now: further requests return OK but hand back no block.
    EXPECT_TRUE(mgr->get_new_block(first, NULL, &new_block).ok());
    EXPECT_TRUE(new_block == NULL);
    EXPECT_TRUE(mgr->get_new_block(second, NULL, &new_block).ok());
    EXPECT_TRUE(new_block == NULL);

    TearDownMgrs();
}
// Create two clients causing oversubscription.
TEST_F(BufferedBlockMgrTest, ClientOversubscription) {
    // The two reservations (1 + 2) add up to more than the 2 buffers that exist.
    const int reserved_for_first = 1;
    const int reserved_for_second = 2;
    const int total_buffers = 2;
    const int block_size = 1024;

    RuntimeState* state;
    BufferedBlockMgr2* mgr = CreateMgr(0, total_buffers, block_size, &state);

    BufferedBlockMgr2::Client* first = NULL;
    BufferedBlockMgr2::Client* second = NULL;
    BufferedBlockMgr2::Block* new_block = NULL;

    EXPECT_TRUE(mgr->register_client(reserved_for_first, _client_tracker, state, &first).ok());
    EXPECT_TRUE(first != NULL);
    EXPECT_TRUE(mgr->register_client(reserved_for_second, _client_tracker, state, &second).ok());
    EXPECT_TRUE(second != NULL);

    // The first allocation of each client should work.
    EXPECT_TRUE(mgr->get_new_block(first, NULL, &new_block).ok());
    EXPECT_TRUE(new_block != NULL);
    EXPECT_TRUE(mgr->get_new_block(second, NULL, &new_block).ok());
    EXPECT_TRUE(new_block != NULL);

    // Both buffers are taken. The first client reserved only one, so another request
    // from it should come back without an error and without a block.
    EXPECT_TRUE(mgr->get_new_block(first, NULL, &new_block).ok());
    EXPECT_TRUE(new_block == NULL);

    // The second client reserved 2 buffers but can only ever get one, so its next
    // request should fail with MEM_LIMIT_EXCEEDED.
    EXPECT_TRUE(mgr->get_new_block(second, NULL, &new_block).is_mem_limit_exceeded());

    TearDownMgrs();
}
TEST_F(BufferedBlockMgrTest, SingleRandom_plain) {
    // Calls TestRandomInternalSingle once per entry, in the order listed.
    const int sizes[] = {1024, 8 * 1024, 8 * 1024 * 1024};
    for (int size : sizes) {
        TestRandomInternalSingle(size);
    }
}
TEST_F(BufferedBlockMgrTest, Multi2Random_plain) {
    // Calls TestRandomInternalMulti with a first argument of 2, once per entry, in the
    // order listed.
    const int sizes[] = {1024, 8 * 1024, 8 * 1024 * 1024};
    for (int size : sizes) {
        TestRandomInternalMulti(2, size);
    }
}
TEST_F(BufferedBlockMgrTest, Multi4Random_plain) {
    // Calls TestRandomInternalMulti with a first argument of 4, once per entry, in the
    // order listed.
    const int sizes[] = {1024, 8 * 1024, 8 * 1024 * 1024};
    for (int size : sizes) {
        TestRandomInternalMulti(4, size);
    }
}
// TODO: Enable when we improve concurrency/scalability of block mgr.
TEST_F(BufferedBlockMgrTest, DISABLED_Multi8Random_plain) {
    // Same helper as the Multi2/Multi4 tests, with 8 as the first argument and only the
    // 1024 size; the DISABLED_ prefix keeps gtest from running it by default.
    const int first_arg = 8;
    const int size = 1024;
    TestRandomInternalMulti(first_arg, size);
}
TEST_F(BufferedBlockMgrTest, CreateDestroyMulti) {
    // Delegates to the helper of the same name, which is not defined in this test body.
    this->CreateDestroyMulti();
}
} // end namespace doris
// Test binary entry point: assigns the doris::config values used by the tests, sets up
// logging and GoogleTest, initializes CPU/disk info, and then runs every registered test.
// Returns the value of RUN_ALL_TESTS().
int main(int argc, char** argv) {
// Loading be.conf from $DORIS_HOME is commented out; the settings are assigned directly
// below instead.
// std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf";
// if (!doris::config::init(conffile.c_str(), false)) {
// fprintf(stderr, "error read config file. \n");
// return -1;
// }
// Scratch dir root; see the SCRATCH_DIR note near the top of this file
// ("/tmp/doris-scratch" is this value plus the tmp sub dir name).
doris::config::query_scratch_dirs = "/tmp";
// doris::config::max_free_io_buffers = 128;
// 8388608 == 8 * 1024 * 1024.
doris::config::read_size = 8388608;
doris::config::min_buffer_size = 1024;
doris::config::disable_mem_pools = false;
doris::init_glog("be-test");
// InitGoogleTest parses (and removes) the gtest flags in argc/argv.
::testing::InitGoogleTest(&argc, argv);
doris::CpuInfo::init();
doris::DiskInfo::init();
return RUN_ALL_TESTS();
}