// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstdlib>
#include <limits>
#include <string>
#include <vector>
#include <boost/bind.hpp>
#include <boost/filesystem.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/thread/thread.hpp>
#include <boost/unordered_map.hpp>
#include "codegen/llvm-codegen.h"
#include "common/atomic.h"
#include "common/init.h"
#include "common/object-pool.h"
#include "runtime/bufferpool/buffer-allocator.h"
#include "runtime/bufferpool/buffer-pool-internal.h"
#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/bufferpool/reservation-tracker.h"
#include "runtime/query-state.h"
#include "runtime/test-env.h"
#include "service/fe-support.h"
#include "testutil/cpu-util.h"
#include "testutil/death-test-util.h"
#include "testutil/gtest-util.h"
#include "testutil/rand-util.h"
#include "util/blocking-queue.h"
#include "util/filesystem-util.h"
#include "util/metrics.h"
#include "util/spinlock.h"
#include "common/names.h"
using boost::filesystem::directory_iterator;
using std::mt19937;
using std::uniform_int_distribution;
using std::uniform_real_distribution;
DECLARE_bool(disk_spill_encryption);
// Note: This is the default scratch dir created by impala.
// FLAGS_scratch_dirs + TmpFileMgr::TMP_SUB_DIR_NAME.
const string SCRATCH_DIR = "/tmp/impala-scratch";
// This suffix is appended to a tmp dir to form the scratch sub-directory path.
const string SCRATCH_SUFFIX = "/impala-scratch";
namespace impala {
using BufferHandle = BufferPool::BufferHandle;
using ClientHandle = BufferPool::ClientHandle;
using FileGroup = TmpFileMgr::FileGroup;
using PageHandle = BufferPool::PageHandle;
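/// Test fixture for BufferPool tests. Provides helpers to set up reservation trackers,
/// scratch file groups and clients, and to create, pin, unpin and verify pages and
/// buffers.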
class BufferPoolTest : public ::testing::Test {
public:
virtual void SetUp() {
test_env_.reset(new TestEnv);
// Don't create global buffer pool in 'test_env_' - we'll create a buffer pool in
// each test function.
test_env_->DisableBufferPool();
ASSERT_OK(test_env_->Init());
RandTestUtil::SeedRng("BUFFER_POOL_TEST_SEED", &rng_);
}
virtual void TearDown() {
for (auto entry : query_reservations_) {
ReservationTracker* tracker = entry.second;
tracker->Close();
}
for (TmpFileMgr::FileGroup* file_group : file_groups_) {
file_group->Close();
}
global_reservations_.Close();
obj_pool_.Clear();
// Tests modify permissions, so make sure we can delete if they didn't clean up.
for (string created_tmp_dir : created_tmp_dirs_) {
chmod((created_tmp_dir + SCRATCH_SUFFIX).c_str(), S_IRWXU);
}
ASSERT_OK(FileSystemUtil::RemovePaths(created_tmp_dirs_));
created_tmp_dirs_.clear();
CpuTestUtil::ResetAffinity(); // Some tests modify affinity.
}
/// The minimum buffer size used in most tests.
const static int64_t TEST_BUFFER_LEN = 1024;
/// Test helper to simulate registering then deregistering a number of queries with
/// the given initial reservation and reservation limit. 'rng' is used to generate
/// any random numbers needed.
void RegisterQueriesAndClients(BufferPool* pool, int query_id_hi, int num_queries,
int64_t initial_query_reservation, int64_t query_reservation_limit, mt19937* rng);
/// Create and destroy a page multiple times.
void CreatePageLoop(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
ReservationTracker* parent_tracker, int num_ops);
protected:
/// Reinitialize test_env_ to have multiple temporary directories.
vector<string> InitMultipleTmpDirs(int num_dirs) {
vector<string> tmp_dirs;
for (int i = 0; i < num_dirs; ++i) {
const string& dir = Substitute("/tmp/buffer-pool-test.$0", i);
// Fix permissions in case old directories were left over from previous runs of the test.
chmod((dir + SCRATCH_SUFFIX).c_str(), S_IRWXU);
EXPECT_OK(FileSystemUtil::RemoveAndCreateDirectory(dir));
tmp_dirs.push_back(dir);
created_tmp_dirs_.push_back(dir);
}
test_env_.reset(new TestEnv);
test_env_->DisableBufferPool();
test_env_->SetTmpFileMgrArgs(tmp_dirs, false);
EXPECT_OK(test_env_->Init());
EXPECT_EQ(num_dirs, test_env_->tmp_file_mgr()->NumActiveTmpDevices());
return tmp_dirs;
}
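/// Construct a 64-bit query id from 32-bit high and low parts.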
static int64_t QueryId(int hi, int lo) { return static_cast<int64_t>(hi) << 32 | lo; }
/// Helper function to create one reservation tracker per query.
ReservationTracker* GetQueryReservationTracker(int64_t query_id) {
lock_guard<SpinLock> l(query_reservations_lock_);
ReservationTracker* tracker = query_reservations_[query_id];
if (tracker != NULL) return tracker;
tracker = obj_pool_.Add(new ReservationTracker());
query_reservations_[query_id] = tracker;
return tracker;
}
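/// Create a dummy profile, owned by 'obj_pool_', for use in the tests.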
RuntimeProfile* NewProfile() {
return RuntimeProfile::Create(&obj_pool_, "test profile");
}
/// Create a new file group with the default configs.
TmpFileMgr::FileGroup* NewFileGroup() {
TmpFileMgr::FileGroup* file_group =
obj_pool_.Add(new TmpFileMgr::FileGroup(test_env_->tmp_file_mgr(),
test_env_->exec_env()->disk_io_mgr(), NewProfile(), TUniqueId()));
file_groups_.push_back(file_group);
return file_group;
}
// Helper to check if the page is evicted.
bool IsEvicted(BufferPool::PageHandle* page) {
lock_guard<SpinLock> pl(page->page_->buffer_lock);
return !page->page_->buffer.is_open();
}
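// Return the number of pages in 'pages' that are currently evicted.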
int NumEvicted(vector<BufferPool::PageHandle>& pages) {
int num_evicted = 0;
for (PageHandle& page : pages) {
if (IsEvicted(&page)) ++num_evicted;
}
return num_evicted;
}
/// Allocate buffers of varying sizes at most 'max_buffer_size' that add up to
/// 'total_bytes'. Both numbers must be a multiple of the minimum buffer size.
/// If 'randomize_core' is true, will switch thread between cores randomly before
/// each allocation.
void AllocateBuffers(BufferPool* pool, BufferPool::ClientHandle* client,
int64_t max_buffer_size, int64_t total_bytes,
vector<BufferPool::BufferHandle>* buffers, bool randomize_core = false) {
int64_t curr_buffer_size = max_buffer_size;
int64_t bytes_remaining = total_bytes;
while (bytes_remaining > 0) {
while (curr_buffer_size > client->GetUnusedReservation()) curr_buffer_size /= 2;
if (randomize_core) CpuTestUtil::PinToRandomCore(&rng_);
buffers->emplace_back();
ASSERT_OK(pool->AllocateBuffer(client, curr_buffer_size, &buffers->back()));
bytes_remaining -= curr_buffer_size;
}
}
/// Do a temporary test allocation. Return the status of AllocateBuffer().
Status AllocateAndFree(BufferPool* pool, ClientHandle* client, int64_t len) {
BufferHandle tmp;
RETURN_IF_ERROR(pool->AllocateBuffer(client, len, &tmp));
pool->FreeBuffer(client, &tmp);
return Status::OK();
}
/// Create pages of varying sizes at most 'max_page_size' that add up to
/// 'total_bytes'. Both numbers must be a multiple of the minimum buffer size.
/// If 'randomize_core' is true, will switch thread between cores randomly before
/// each allocation.
void CreatePages(BufferPool* pool, BufferPool::ClientHandle* client,
int64_t max_page_size, int64_t total_bytes, vector<BufferPool::PageHandle>* pages,
bool randomize_core = false) {
ASSERT_GE(client->GetUnusedReservation(), total_bytes);
int64_t curr_page_size = max_page_size;
int64_t bytes_remaining = total_bytes;
while (bytes_remaining > 0) {
while (curr_page_size > client->GetUnusedReservation()) curr_page_size /= 2;
pages->emplace_back();
if (randomize_core) CpuTestUtil::PinToRandomCore(&rng_);
ASSERT_OK(pool->CreatePage(client, curr_page_size, &pages->back()));
bytes_remaining -= curr_page_size;
}
}
/// Free all the 'buffers' and clear the vector.
/// If 'randomize_core' is true, will switch thread between cores randomly before
/// each free.
void FreeBuffers(BufferPool* pool, BufferPool::ClientHandle* client,
vector<BufferPool::BufferHandle>* buffers, bool randomize_core = false) {
for (auto& buffer : *buffers) {
if (randomize_core) CpuTestUtil::PinToRandomCore(&rng_);
pool->FreeBuffer(client, &buffer);
}
buffers->clear();
}
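/// Pin all of 'pages'. Returns an error if any Pin() call fails.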
Status PinAll(BufferPool* pool, ClientHandle* client, vector<PageHandle>* pages) {
for (auto& page : *pages) RETURN_IF_ERROR(pool->Pin(client, &page));
return Status::OK();
}
/// Unpin all of 'pages'. If 'delay_between_unpins_ms' > 0, sleep between unpins.
void UnpinAll(BufferPool* pool, ClientHandle* client, vector<PageHandle>* pages,
int delay_between_unpins_ms = 0) {
for (auto& page : *pages) {
pool->Unpin(client, &page);
if (delay_between_unpins_ms > 0) SleepForMs(delay_between_unpins_ms);
}
}
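/// Destroy all of 'pages'.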
void DestroyAll(BufferPool* pool, ClientHandle* client, vector<PageHandle>* pages) {
for (auto& page : *pages) pool->DestroyPage(client, &page);
}
/// Write some deterministically-generated sentinel values to pages or buffers. The same
/// data is written each time for objects[i], based on start_num + i.
template <typename T>
void WriteData(const vector<T>& objects, int start_num) {
WriteOrVerifyData(objects, start_num, true);
}
template <typename T>
void WriteData(const T& object, int val) {
return WriteOrVerifyData(object, val, true);
}
/// Verify data written by WriteData().
template <typename T>
void VerifyData(const vector<T>& objects, int start_num) {
WriteOrVerifyData(objects, start_num, false);
}
template <typename T>
void VerifyData(const T& object, int val) {
return WriteOrVerifyData(object, val, false);
}
/// Implementation of WriteData() and VerifyData().
template <typename T>
void WriteOrVerifyData(const vector<T>& objects, int start_num, bool write) {
for (int i = 0; i < objects.size(); ++i) {
WriteOrVerifyData(objects[i], i + start_num, write);
}
}
template <typename T>
void WriteOrVerifyData(const T& object, int val, bool write) {
// Only write sentinel values to start and end of buffer to make writing and
// verification cheap.
MemRange mem = GetMemRange(object);
uint64_t* start_word = reinterpret_cast<uint64_t*>(mem.data());
uint64_t* end_word =
reinterpret_cast<uint64_t*>(&mem.data()[mem.len() - sizeof(uint64_t)]);
if (write) {
*start_word = val;
*end_word = ~val;
} else {
EXPECT_EQ(*start_word, val);
EXPECT_EQ(*end_word, ~val);
}
}
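/// Return the memory range backed by a buffer handle or a pinned page handle.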
MemRange GetMemRange(const BufferHandle& buffer) { return buffer.mem_range(); }
MemRange GetMemRange(const PageHandle& page) {
const BufferHandle* buffer;
EXPECT_OK(page.GetBuffer(&buffer));
return buffer->mem_range();
}
/// Set the maximum number of scavenge attempts that the pool's allocator will make.
void SetMaxScavengeAttempts(BufferPool* pool, int max_attempts) {
pool->allocator()->set_max_scavenge_attempts(max_attempts);
}
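/// Block until all in-flight page writes for 'client' have completed.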
void WaitForAllWrites(ClientHandle* client) { client->impl_->WaitForAllWrites(); }
// Remove write permissions on scratch files. Return # of scratch files.
static int RemoveScratchPerms() {
int num_files = 0;
directory_iterator dir_it(SCRATCH_DIR);
for (; dir_it != directory_iterator(); ++dir_it) {
++num_files;
EXPECT_EQ(0, chmod(dir_it->path().c_str(), 0));
}
return num_files;
}
// Remove permissions for the temporary file at 'path' - all subsequent writes
// to the file should fail. Expects that the backing file has already been allocated.
static void DisableBackingFile(const string& path) {
EXPECT_GT(path.size(), 0);
EXPECT_EQ(0, chmod(path.c_str(), 0));
LOG(INFO) << "Injected fault by removing file permissions " << path;
}
/// Write out a bunch of nonsense to replace the file's current data.
static void CorruptBackingFile(const string& path) {
EXPECT_GT(path.size(), 0);
FILE* file = fopen(path.c_str(), "rb+");
EXPECT_EQ(0, fseek(file, 0, SEEK_END));
int64_t size = ftell(file);
EXPECT_EQ(0, fseek(file, 0, SEEK_SET));
for (int64_t i = 0; i < size; ++i) fputc(123, file);
fclose(file);
LOG(INFO) << "Injected fault by corrupting file " << path;
}
/// Truncate the file to 0 bytes.
static void TruncateBackingFile(const string& path) {
EXPECT_GT(path.size(), 0);
EXPECT_EQ(0, truncate(path.c_str(), 0));
LOG(INFO) << "Injected fault by truncating file " << path;
}
// Return whether a pin is in flight for the page.
static bool PinInFlight(PageHandle* page) {
return page->page_->pin_in_flight;
}
// Return the path of the temporary file backing the page.
static string TmpFilePath(PageHandle* page) {
return page->page_->write_handle->TmpFilePath();
}
// Check that the file backing the page has dir as a prefix of its path.
static bool PageInDir(PageHandle* page, const string& dir) {
return TmpFilePath(page).find(dir) == 0;
}
// Find a page in the list that is backed by a file with the given directory as prefix
// of its path.
static PageHandle* FindPageInDir(vector<PageHandle>& pages, const string& dir) {
for (PageHandle& page : pages) {
if (PageInDir(&page, dir)) return &page;
}
return NULL;
}
/// Parameterised test implementations.
void TestBufferAllocation(bool reserved);
void TestMemoryReclamation(BufferPool* pool, int src_core, int dst_core);
void TestEvictionPolicy(int64_t page_size);
void TestCleanPageLimit(int max_clean_pages, bool randomize_core);
void TestQueryTeardown(bool write_error);
void TestWriteError(int write_delay_ms);
void TestRandomInternalSingle(int64_t buffer_len, bool multiple_pins);
void TestRandomInternalMulti(int num_threads, int64_t buffer_len, bool multiple_pins);
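/// Tid value passed to TestRandomInternalImpl() for the single-threaded variants.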
static const int SINGLE_THREADED_TID = -1;
void TestRandomInternalImpl(BufferPool* pool, FileGroup* file_group,
MemTracker* parent_mem_tracker, mt19937* rng, int tid, bool multiple_pins);
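/// Pool for test objects; cleared in TearDown().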
ObjectPool obj_pool_;
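/// Root reservation tracker shared by the tests; closed in TearDown().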
ReservationTracker global_reservations_;
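/// Test environment providing the metrics, temporary file and disk I/O infrastructure.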
boost::scoped_ptr<TestEnv> test_env_;
/// Per-test random number generator. Seeded before every test.
mt19937 rng_;
/// The file groups created - closed at end of each test.
vector<TmpFileMgr::FileGroup*> file_groups_;
/// Paths of temporary directories created during tests - deleted at end of test.
vector<string> created_tmp_dirs_;
/// Map from query_id to the reservation tracker for that query. Reads and modifications
/// of the map are protected by query_reservations_lock_.
unordered_map<int64_t, ReservationTracker*> query_reservations_;
SpinLock query_reservations_lock_;
};
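// Out-of-class definition of the static constant, required because it is ODR-used
// (e.g. bound to a reference by the gtest EXPECT/ASSERT macros).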
const int64_t BufferPoolTest::TEST_BUFFER_LEN;
void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi,
int num_queries, int64_t initial_query_reservation, int64_t query_reservation_limit,
mt19937* rng) {
Status status;
int clients_per_query = 32;
BufferPool::ClientHandle* clients[num_queries];
for (int i = 0; i < num_queries; ++i) {
int64_t query_id = QueryId(query_id_hi, i);
// Initialize a tracker for a new query.
ReservationTracker* query_reservation = GetQueryReservationTracker(query_id);
query_reservation->InitChildTracker(
NULL, &global_reservations_, NULL, query_reservation_limit);
// Test that closing then reopening child tracker works.
query_reservation->Close();
query_reservation->InitChildTracker(
NULL, &global_reservations_, NULL, query_reservation_limit);
EXPECT_TRUE(query_reservation->IncreaseReservationToFit(initial_query_reservation));
clients[i] = new BufferPool::ClientHandle[clients_per_query];
for (int j = 0; j < clients_per_query; ++j) {
// Distribute the query reservation evenly among the clients, giving the remainder
// to the first few clients.
int64_t initial_client_reservation = initial_query_reservation / clients_per_query
+ (j < initial_query_reservation % clients_per_query);
// The reservation limit can be anything greater than or equal to the initial
// reservation.
int64_t client_reservation_limit = initial_client_reservation + (*rng)() % 100000;
string name = Substitute("Client $0 for query $1", j, query_id);
EXPECT_OK(pool->RegisterClient(name, NULL, query_reservation, NULL,
client_reservation_limit, NewProfile(), &clients[i][j]));
EXPECT_TRUE(clients[i][j].IncreaseReservationToFit(initial_client_reservation));
}
for (int j = 0; j < clients_per_query; ++j) {
ASSERT_TRUE(clients[i][j].is_registered());
}
}
// Deregister clients then query.
for (int i = 0; i < num_queries; ++i) {
for (int j = 0; j < clients_per_query; ++j) {
pool->DeregisterClient(&clients[i][j]);
ASSERT_FALSE(clients[i][j].is_registered());
}
delete[] clients[i];
GetQueryReservationTracker(QueryId(query_id_hi, i))->Close();
}
}
/// Test that queries and clients can be registered and deregistered with the reservation
/// trackers and the buffer pool.
TEST_F(BufferPoolTest, BasicRegistration) {
int num_concurrent_queries = 1024;
int64_t sum_initial_reservations = 4;
int64_t reservation_limit = 1024;
// Need enough buffers for all initial reservations.
int64_t total_mem = sum_initial_reservations * num_concurrent_queries;
global_reservations_.InitRootTracker(NewProfile(), total_mem);
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, total_mem, total_mem);
RegisterQueriesAndClients(&pool, 0, num_concurrent_queries, sum_initial_reservations,
reservation_limit, &rng_);
ASSERT_EQ(global_reservations_.GetUsedReservation(), 0);
ASSERT_EQ(global_reservations_.GetChildReservations(), 0);
ASSERT_EQ(global_reservations_.GetReservation(), 0);
global_reservations_.Close();
}
/// Test that queries and clients can be registered and deregistered by concurrent
/// threads.
TEST_F(BufferPoolTest, ConcurrentRegistration) {
int queries_per_thread = 64;
int num_threads = 64;
int num_concurrent_queries = queries_per_thread * num_threads;
int64_t sum_initial_reservations = 4;
int64_t reservation_limit = 1024;
// Need enough buffers for all initial reservations.
int64_t total_mem = num_concurrent_queries * sum_initial_reservations;
global_reservations_.InitRootTracker(NewProfile(), total_mem);
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, total_mem, total_mem);
vector<mt19937> thread_rngs = RandTestUtil::CreateThreadLocalRngs(num_threads, &rng_);
// Launch threads, each with a different set of query IDs.
thread_group workers;
for (int i = 0; i < num_threads; ++i) {
workers.add_thread(new thread(bind(&BufferPoolTest::RegisterQueriesAndClients, this,
&pool, i, queries_per_thread, sum_initial_reservations, reservation_limit,
&thread_rngs[i])));
}
workers.join_all();
// All the reservations should be released at this point.
ASSERT_EQ(global_reservations_.GetUsedReservation(), 0);
ASSERT_EQ(global_reservations_.GetReservation(), 0);
global_reservations_.Close();
}
/// Test basic page handle creation.
TEST_F(BufferPoolTest, PageCreation) {
// Allocate many pages, each a power-of-two multiple of the minimum page length.
int num_pages = 16;
int64_t max_page_len = TEST_BUFFER_LEN << (num_pages - 1);
int64_t total_mem = 2 * 2 * max_page_len;
global_reservations_.InitRootTracker(NULL, total_mem);
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, total_mem, total_mem);
BufferPool::ClientHandle client;
ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
total_mem, NewProfile(), &client));
ASSERT_TRUE(client.IncreaseReservation(total_mem));
vector<BufferPool::PageHandle> handles(num_pages);
// Create pages of various valid sizes.
for (int i = 0; i < num_pages; ++i) {
int size_multiple = 1 << i;
int64_t page_len = TEST_BUFFER_LEN * size_multiple;
int64_t used_before = client.GetUsedReservation();
ASSERT_OK(pool.CreatePage(&client, page_len, &handles[i]));
ASSERT_TRUE(handles[i].is_open());
ASSERT_TRUE(handles[i].is_pinned());
const BufferHandle* buffer;
ASSERT_OK(handles[i].GetBuffer(&buffer));
ASSERT_TRUE(buffer->data() != NULL);
ASSERT_EQ(handles[i].len(), page_len);
ASSERT_EQ(buffer->len(), page_len);
ASSERT_EQ(client.GetUsedReservation(), used_before + page_len);
}
// Close the handles and check memory consumption.
for (int i = 0; i < num_pages; ++i) {
int64_t used_before = client.GetUsedReservation();
int page_len = handles[i].len();
pool.DestroyPage(&client, &handles[i]);
ASSERT_EQ(client.GetUsedReservation(), used_before - page_len);
}
pool.DeregisterClient(&client);
// All the reservations should be released at this point.
ASSERT_EQ(global_reservations_.GetReservation(), 0);
global_reservations_.Close();
}
TEST_F(BufferPoolTest, ReservedBufferAllocation) {
TestBufferAllocation(true);
}
TEST_F(BufferPoolTest, UnreservedBufferAllocation) {
TestBufferAllocation(false);
}
void BufferPoolTest::TestBufferAllocation(bool reserved) {
// Allocate many buffers, each a power-of-two multiple of the minimum buffer length.
const int NUM_BUFFERS = 16;
const int64_t MAX_BUFFER_LEN = TEST_BUFFER_LEN << (NUM_BUFFERS - 1);
// Total memory required to allocate TEST_BUFFER_LEN, 2*TEST_BUFFER_LEN, ...,
// MAX_BUFFER_LEN.
const int64_t TOTAL_MEM = 2 * MAX_BUFFER_LEN - TEST_BUFFER_LEN;
global_reservations_.InitRootTracker(NULL, TOTAL_MEM);
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
BufferPool::ClientHandle client;
ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
TOTAL_MEM, NewProfile(), &client));
if (reserved) ASSERT_TRUE(client.IncreaseReservationToFit(TOTAL_MEM));
vector<BufferPool::BufferHandle> handles(NUM_BUFFERS);
// Create buffers of various valid sizes.
int64_t total_allocated = 0;
for (int i = 0; i < NUM_BUFFERS; ++i) {
int size_multiple = 1 << i;
int64_t buffer_len = TEST_BUFFER_LEN * size_multiple;
int64_t used_before = client.GetUsedReservation();
if (reserved) {
ASSERT_OK(pool.AllocateBuffer(&client, buffer_len, &handles[i]));
} else {
// Reservation should be automatically increased.
ASSERT_OK(pool.AllocateUnreservedBuffer(&client, buffer_len, &handles[i]));
}
total_allocated += buffer_len;
ASSERT_TRUE(handles[i].is_open());
ASSERT_TRUE(handles[i].data() != NULL);
ASSERT_EQ(handles[i].len(), buffer_len);
ASSERT_EQ(client.GetUsedReservation(), used_before + buffer_len);
// Check that pool-wide values are updated correctly.
EXPECT_EQ(total_allocated, pool.GetSystemBytesAllocated());
EXPECT_EQ(0, pool.GetNumFreeBuffers());
EXPECT_EQ(0, pool.GetFreeBufferBytes());
}
if (!reserved) {
// Allocate all of the memory and test the failure path for unreserved allocations.
BufferPool::BufferHandle tmp_handle;
ASSERT_OK(pool.AllocateUnreservedBuffer(&client, TEST_BUFFER_LEN, &tmp_handle));
ASSERT_FALSE(tmp_handle.is_open()) << "No reservation for buffer";
}
// Close the handles and check memory consumption.
for (int i = 0; i < NUM_BUFFERS; ++i) {
int64_t used_before = client.GetUsedReservation();
int buffer_len = handles[i].len();
pool.FreeBuffer(&client, &handles[i]);
ASSERT_EQ(client.GetUsedReservation(), used_before - buffer_len);
}
pool.DeregisterClient(&client);
// All the reservations should be released at this point.
ASSERT_EQ(global_reservations_.GetReservation(), 0);
// But freed memory is not released to the system immediately.
EXPECT_EQ(total_allocated, pool.GetSystemBytesAllocated());
EXPECT_EQ(NUM_BUFFERS, pool.GetNumFreeBuffers());
EXPECT_EQ(total_allocated, pool.GetFreeBufferBytes());
global_reservations_.Close();
}
// Test that the buffer pool correctly reports the number of clean pages.
TEST_F(BufferPoolTest, CleanPageStats) {
const int MAX_NUM_BUFFERS = 4;
const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
ClientHandle client;
ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
nullptr, TOTAL_MEM, NewProfile(), &client));
ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
vector<PageHandle> pages;
CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages);
WriteData(pages, 0);
EXPECT_FALSE(client.has_unpinned_pages());
// Pages don't start off clean.
EXPECT_EQ(0, pool.GetNumCleanPages());
EXPECT_EQ(0, pool.GetCleanPageBytes());
// Unpin pages and wait until they're written out and therefore clean.
UnpinAll(&pool, &client, &pages);
EXPECT_TRUE(client.has_unpinned_pages());
WaitForAllWrites(&client);
EXPECT_EQ(MAX_NUM_BUFFERS, pool.GetNumCleanPages());
EXPECT_EQ(TOTAL_MEM, pool.GetCleanPageBytes());
EXPECT_TRUE(client.has_unpinned_pages());
// Do an allocation to force eviction of one page.
ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN));
EXPECT_EQ(MAX_NUM_BUFFERS - 1, pool.GetNumCleanPages());
EXPECT_EQ(TOTAL_MEM - TEST_BUFFER_LEN, pool.GetCleanPageBytes());
EXPECT_TRUE(client.has_unpinned_pages());
// Re-pin all the pages - none will be clean afterwards.
ASSERT_OK(PinAll(&pool, &client, &pages));
VerifyData(pages, 0);
EXPECT_EQ(0, pool.GetNumCleanPages());
EXPECT_EQ(0, pool.GetCleanPageBytes());
EXPECT_FALSE(client.has_unpinned_pages());
DestroyAll(&pool, &client, &pages);
EXPECT_FALSE(client.has_unpinned_pages());
pool.DeregisterClient(&client);
global_reservations_.Close();
}
/// Test that the buffer pool respects the clean page limit with all pages in
/// the same arena.
TEST_F(BufferPoolTest, CleanPageLimitOneArena) {
TestCleanPageLimit(0, false);
TestCleanPageLimit(2, false);
TestCleanPageLimit(4, false);
}
/// Test that the buffer pool respects the clean page limit with pages spread across
/// different arenas.
TEST_F(BufferPoolTest, CleanPageLimitRandomArenas) {
TestCleanPageLimit(0, true);
TestCleanPageLimit(2, true);
TestCleanPageLimit(4, true);
}
void BufferPoolTest::TestCleanPageLimit(int max_clean_pages, bool randomize_core) {
const int MAX_NUM_BUFFERS = 4;
const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
const int max_clean_page_bytes = max_clean_pages * TEST_BUFFER_LEN;
global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
MetricGroup tmp_metrics("test-metrics");
BufferPool pool(&tmp_metrics, TEST_BUFFER_LEN, TOTAL_MEM, max_clean_page_bytes);
ClientHandle client;
ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
nullptr, TOTAL_MEM, NewProfile(), &client));
ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
if (!randomize_core) CpuTestUtil::PinToCore(0);
vector<PageHandle> pages;
CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages, randomize_core);
WriteData(pages, 0);
// Unpin pages and wait until they're written out and therefore clean.
UnpinAll(&pool, &client, &pages);
WaitForAllWrites(&client);
EXPECT_EQ(max_clean_pages, pool.GetNumCleanPages());
EXPECT_EQ(max_clean_page_bytes, pool.GetCleanPageBytes());
// Do an allocation to force a buffer to be reclaimed from somewhere.
ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN));
if (randomize_core) {
// We will either evict a clean page or reclaim a free buffer, depending on the
// arena that we pick.
EXPECT_LE(pool.GetNumCleanPages(), max_clean_pages);
EXPECT_LE(pool.GetCleanPageBytes(), max_clean_page_bytes);
} else {
// We will reclaim one of the free buffers in arena 0.
EXPECT_EQ(min(MAX_NUM_BUFFERS - 1, max_clean_pages), pool.GetNumCleanPages());
const int64_t expected_clean_page_bytes =
min<int64_t>((MAX_NUM_BUFFERS - 1) * TEST_BUFFER_LEN, max_clean_page_bytes);
EXPECT_EQ(expected_clean_page_bytes, pool.GetCleanPageBytes());
}
// Re-pin all the pages - none will be clean afterwards.
ASSERT_OK(PinAll(&pool, &client, &pages));
VerifyData(pages, 0);
EXPECT_EQ(0, pool.GetNumCleanPages());
EXPECT_EQ(0, pool.GetCleanPageBytes());
DestroyAll(&pool, &client, &pages);
pool.DeregisterClient(&client);
global_reservations_.Close();
}
/// Test transfer of buffer handles between clients.
TEST_F(BufferPoolTest, BufferTransfer) {
// Each client needs to have enough reservation for a buffer.
const int num_clients = 5;
int64_t total_mem = num_clients * TEST_BUFFER_LEN;
global_reservations_.InitRootTracker(NULL, total_mem);
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, total_mem, total_mem);
BufferPool::ClientHandle clients[num_clients];
BufferPool::BufferHandle handles[num_clients];
for (int i = 0; i < num_clients; ++i) {
ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
TEST_BUFFER_LEN, NewProfile(), &clients[i]));
ASSERT_TRUE(clients[i].IncreaseReservationToFit(TEST_BUFFER_LEN));
}
// Transfer the buffer around between the clients repeatedly in a circle.
ASSERT_OK(pool.AllocateBuffer(&clients[0], TEST_BUFFER_LEN, &handles[0]));
uint8_t* data = handles[0].data();
for (int iter = 0; iter < 10; ++iter) {
for (int client = 0; client < num_clients; ++client) {
int next_client = (client + 1) % num_clients;
ASSERT_OK(pool.TransferBuffer(&clients[client], &handles[client],
&clients[next_client], &handles[next_client]));
// Check that the transfer left things in a consistent state.
ASSERT_FALSE(handles[client].is_open());
ASSERT_EQ(0, clients[client].GetUsedReservation());
ASSERT_TRUE(handles[next_client].is_open());
ASSERT_EQ(TEST_BUFFER_LEN, clients[next_client].GetUsedReservation());
// The same underlying buffer should be used.
ASSERT_EQ(data, handles[next_client].data());
}
}
pool.FreeBuffer(&clients[0], &handles[0]);
for (BufferPool::ClientHandle& client : clients) pool.DeregisterClient(&client);
ASSERT_EQ(global_reservations_.GetReservation(), 0);
global_reservations_.Close();
}
TEST_F(BufferPoolTest, BufferTransferConcurrent) {
// Transfer buffers between threads in a circular fashion. Each client needs to have
// enough reservation for two buffers, since it may receive a buffer before handing
// off the next one.
const int NUM_CLIENTS = 5;
const int64_t TOTAL_MEM = NUM_CLIENTS * TEST_BUFFER_LEN * 2;
global_reservations_.InitRootTracker(NULL, TOTAL_MEM);
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
BufferPool::ClientHandle clients[NUM_CLIENTS];
BufferPool::BufferHandle handles[NUM_CLIENTS];
SpinLock locks[NUM_CLIENTS]; // Each lock protects the corresponding BufferHandle.
for (int i = 0; i < NUM_CLIENTS; ++i) {
ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
TOTAL_MEM, NewProfile(), &clients[i]));
ASSERT_TRUE(clients[i].IncreaseReservationToFit(2 * TEST_BUFFER_LEN));
}
thread_group workers;
for (int thread_idx = 0; thread_idx < NUM_CLIENTS; ++thread_idx) {
workers.add_thread(new thread([&pool, &clients, &handles, &locks, thread_idx] {
// Transfer buffers around between the clients repeatedly in a circle.
BufferHandle handle;
{
lock_guard<SpinLock> l(locks[thread_idx]);
LOG(INFO) << "Allocate from " << (void*)&clients[thread_idx];
ASSERT_OK(pool.AllocateBuffer(
&clients[thread_idx], TEST_BUFFER_LEN, &handle));
}
for (int iter = 0; iter < 100; ++iter) {
int next_thread_idx = (thread_idx + 1) % NUM_CLIENTS;
// Transfer our buffer to the next thread.
{
unique_lock<SpinLock> l(locks[next_thread_idx]);
// Spin until we can add the handle.
while (true) {
if (!handles[next_thread_idx].is_open()) break;
l.unlock();
sched_yield();
l.lock();
}
ASSERT_TRUE(handle.is_open());
ASSERT_OK(pool.TransferBuffer(&clients[thread_idx], &handle,
&clients[next_thread_idx], &handles[next_thread_idx]));
// Check that the transfer left things in a consistent state.
ASSERT_TRUE(handles[next_thread_idx].is_open());
ASSERT_FALSE(handle.is_open());
ASSERT_GE(clients[next_thread_idx].GetUsedReservation(), TEST_BUFFER_LEN);
}
// Get a new buffer from the previous thread.
{
unique_lock<SpinLock> l(locks[thread_idx]);
// Spin until we receive a handle from the previous thread.
while (true) {
if (handles[thread_idx].is_open()) break;
l.unlock();
sched_yield();
l.lock();
}
handle = move(handles[thread_idx]);
}
}
pool.FreeBuffer(&clients[thread_idx], &handle);
}));
}
workers.join_all();
for (BufferPool::ClientHandle& client : clients) pool.DeregisterClient(&client);
ASSERT_EQ(global_reservations_.GetReservation(), 0);
global_reservations_.Close();
}
/// Test basic pinning and unpinning.
TEST_F(BufferPoolTest, Pin) {
int64_t total_mem = TEST_BUFFER_LEN * 1024;
// Set up client with enough reservation to pin twice.
int64_t child_reservation = TEST_BUFFER_LEN * 2;
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, total_mem, total_mem);
global_reservations_.InitRootTracker(NULL, total_mem);
BufferPool::ClientHandle client;
ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
NULL, child_reservation, NewProfile(), &client));
ASSERT_TRUE(client.IncreaseReservationToFit(child_reservation));
BufferPool::PageHandle handle1, handle2;
// Can pin two minimum sized pages.
const BufferHandle* page_buffer;
ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1, &page_buffer));
ASSERT_TRUE(handle1.is_open());
ASSERT_TRUE(handle1.is_pinned());
ASSERT_TRUE(page_buffer->data() != NULL);
ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2, &page_buffer));
ASSERT_TRUE(handle2.is_open());
ASSERT_TRUE(handle2.is_pinned());
ASSERT_TRUE(page_buffer->data() != NULL);
pool.Unpin(&client, &handle2);
ASSERT_FALSE(handle2.is_pinned());
// Can pin minimum-sized page twice.
ASSERT_OK(pool.Pin(&client, &handle1));
ASSERT_TRUE(handle1.is_pinned());
// Have to unpin twice.
pool.Unpin(&client, &handle1);
ASSERT_TRUE(handle1.is_pinned());
pool.Unpin(&client, &handle1);
ASSERT_FALSE(handle1.is_pinned());
// Can pin double-sized page only once.
BufferPool::PageHandle double_handle;
ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN * 2, &double_handle, &page_buffer));
ASSERT_TRUE(double_handle.is_open());
ASSERT_TRUE(double_handle.is_pinned());
ASSERT_TRUE(page_buffer->data() != NULL);
// Destroy the pages - test destroying both pinned and unpinned.
pool.DestroyPage(&client, &handle1);
pool.DestroyPage(&client, &handle2);
pool.DestroyPage(&client, &double_handle);
pool.DeregisterClient(&client);
}
// Test the various state transitions possible with async Pin() calls.
TEST_F(BufferPoolTest, AsyncPin) {
const int DATA_SEED = 1234;
// Set up pool with enough reservation to keep two buffers in memory.
const int64_t TOTAL_MEM = 2 * TEST_BUFFER_LEN;
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
global_reservations_.InitRootTracker(NULL, TOTAL_MEM);
BufferPool::ClientHandle client;
ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
NULL, TOTAL_MEM, NewProfile(), &client));
ASSERT_TRUE(client.IncreaseReservationToFit(TOTAL_MEM));
PageHandle handle;
const BufferHandle* buffer;
ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle, &buffer));
WriteData(*buffer, DATA_SEED);
// Pin() on a pinned page just increments the pin count.
ASSERT_OK(pool.Pin(&client, &handle));
EXPECT_EQ(2, handle.pin_count());
EXPECT_FALSE(PinInFlight(&handle));
pool.Unpin(&client, &handle);
pool.Unpin(&client, &handle);
ASSERT_FALSE(handle.is_pinned());
// Calling Pin() then Pin() results in double-pinning.
ASSERT_OK(pool.Pin(&client, &handle));
ASSERT_OK(pool.Pin(&client, &handle));
EXPECT_EQ(2, handle.pin_count());
EXPECT_FALSE(PinInFlight(&handle));
pool.Unpin(&client, &handle);
pool.Unpin(&client, &handle);
ASSERT_FALSE(handle.is_pinned());
// Pin() on a page that isn't evicted pins it immediately.
ASSERT_OK(pool.Pin(&client, &handle));
EXPECT_EQ(1, handle.pin_count());
EXPECT_FALSE(PinInFlight(&handle));
VerifyData(handle, 1234);
pool.Unpin(&client, &handle);
ASSERT_FALSE(handle.is_pinned());
// Force eviction. Pin() on an evicted page starts the write asynchronously.
ASSERT_OK(AllocateAndFree(&pool, &client, TOTAL_MEM));
ASSERT_OK(pool.Pin(&client, &handle));
EXPECT_EQ(1, handle.pin_count());
EXPECT_TRUE(PinInFlight(&handle));
// Block on the pin and verify the buffer.
ASSERT_OK(handle.GetBuffer(&buffer));
EXPECT_FALSE(PinInFlight(&handle));
VerifyData(*buffer, 1234);
// Test that we can unpin while in flight and the data remains valid.
pool.Unpin(&client, &handle);
ASSERT_OK(AllocateAndFree(&pool, &client, TOTAL_MEM));
ASSERT_OK(pool.Pin(&client, &handle));
EXPECT_TRUE(PinInFlight(&handle));
pool.Unpin(&client, &handle);
ASSERT_OK(pool.Pin(&client, &handle));
ASSERT_OK(handle.GetBuffer(&buffer));
VerifyData(*buffer, 1234);
// Evict the page, then destroy while we're pinning it asynchronously.
pool.Unpin(&client, &handle);
ASSERT_OK(AllocateAndFree(&pool, &client, TOTAL_MEM));
ASSERT_OK(pool.Pin(&client, &handle));
pool.DestroyPage(&client, &handle);
pool.DeregisterClient(&client);
}
/// Creating a page or pinning without sufficient reservation should DCHECK.
TEST_F(BufferPoolTest, PinWithoutReservation) {
int64_t total_mem = TEST_BUFFER_LEN * 1024;
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, total_mem, total_mem);
global_reservations_.InitRootTracker(NULL, total_mem);
BufferPool::ClientHandle client;
ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
TEST_BUFFER_LEN, NewProfile(), &client));
BufferPool::PageHandle handle;
IMPALA_ASSERT_DEBUG_DEATH(
discard_result(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle)), "");
// Should succeed after increasing reservation.
ASSERT_TRUE(client.IncreaseReservationToFit(TEST_BUFFER_LEN));
ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle));
// But we can't pin again.
IMPALA_ASSERT_DEBUG_DEATH(discard_result(pool.Pin(&client, &handle)), "");
pool.DestroyPage(&client, &handle);
pool.DeregisterClient(&client);
}
TEST_F(BufferPoolTest, ExtractBuffer) {
int64_t total_mem = TEST_BUFFER_LEN * 1024;
// Set up client with enough reservation for two buffers/pins.
int64_t child_reservation = TEST_BUFFER_LEN * 2;
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, total_mem, total_mem);
global_reservations_.InitRootTracker(NULL, total_mem);
BufferPool::ClientHandle client;
ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
NULL, child_reservation, NewProfile(), &client));
ASSERT_TRUE(client.IncreaseReservationToFit(child_reservation));
BufferPool::PageHandle page;
BufferPool::BufferHandle buffer;
// Test basic buffer extraction.
for (int len = TEST_BUFFER_LEN; len <= 2 * TEST_BUFFER_LEN; len *= 2) {
const BufferHandle* page_buffer;
ASSERT_OK(pool.CreatePage(&client, len, &page, &page_buffer));
uint8_t* page_data = page_buffer->data();
ASSERT_OK(pool.ExtractBuffer(&client, &page, &buffer));
ASSERT_FALSE(page.is_open());
ASSERT_TRUE(buffer.is_open());
ASSERT_EQ(len, buffer.len());
ASSERT_EQ(page_data, buffer.data());
ASSERT_EQ(len, client.GetUsedReservation());
pool.FreeBuffer(&client, &buffer);
ASSERT_EQ(0, client.GetUsedReservation());
}
// Test that ExtractBuffer() accounts correctly for pin count > 1.
const BufferHandle* page_buffer;
ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page, &page_buffer));
uint8_t* page_data = page_buffer->data();
ASSERT_OK(pool.Pin(&client, &page));
ASSERT_EQ(TEST_BUFFER_LEN * 2, client.GetUsedReservation());
ASSERT_OK(pool.ExtractBuffer(&client, &page, &buffer));
ASSERT_EQ(TEST_BUFFER_LEN, client.GetUsedReservation());
ASSERT_FALSE(page.is_open());
ASSERT_TRUE(buffer.is_open());
ASSERT_EQ(TEST_BUFFER_LEN, buffer.len());
ASSERT_EQ(page_data, buffer.data());
pool.FreeBuffer(&client, &buffer);
ASSERT_EQ(0, client.GetUsedReservation());
// Test that ExtractBuffer() DCHECKs for unpinned pages.
ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
pool.Unpin(&client, &page);
IMPALA_ASSERT_DEBUG_DEATH(
discard_result(pool.ExtractBuffer(&client, &page, &buffer)), "");
pool.DestroyPage(&client, &page);
pool.DeregisterClient(&client);
}
// Test concurrent creation and destruction of pages.
TEST_F(BufferPoolTest, ConcurrentPageCreation) {
int ops_per_thread = 1024;
int num_threads = 64;
// Need enough buffers for all initial reservations.
int total_mem = num_threads * TEST_BUFFER_LEN;
global_reservations_.InitRootTracker(NULL, total_mem);
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, total_mem, total_mem);
// Share a file group between the threads.
TmpFileMgr::FileGroup* file_group = NewFileGroup();
// Launch threads, each with a different set of query IDs.
thread_group workers;
for (int i = 0; i < num_threads; ++i) {
workers.add_thread(new thread(bind(&BufferPoolTest::CreatePageLoop, this, &pool,
file_group, &global_reservations_, ops_per_thread)));
}
// Build debug string to test concurrent iteration over pages_ list.
for (int i = 0; i < 64; ++i) {
LOG(INFO) << pool.DebugString();
}
workers.join_all();
// All the reservations should be released at this point.
ASSERT_EQ(global_reservations_.GetChildReservations(), 0);
global_reservations_.Close();
}
void BufferPoolTest::CreatePageLoop(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
ReservationTracker* parent_tracker, int num_ops) {
BufferPool::ClientHandle client;
ASSERT_OK(pool->RegisterClient("test client", file_group, parent_tracker, NULL,
TEST_BUFFER_LEN, NewProfile(), &client));
ASSERT_TRUE(client.IncreaseReservation(TEST_BUFFER_LEN));
for (int i = 0; i < num_ops; ++i) {
BufferPool::PageHandle handle;
ASSERT_OK(pool->CreatePage(&client, TEST_BUFFER_LEN, &handle));
pool->Unpin(&client, &handle);
ASSERT_OK(pool->Pin(&client, &handle));
pool->DestroyPage(&client, &handle);
}
pool->DeregisterClient(&client);
}
/// Test that DCHECK fires when trying to unpin a page with spilling disabled.
TEST_F(BufferPoolTest, SpillingDisabledDcheck) {
global_reservations_.InitRootTracker(NULL, 2 * TEST_BUFFER_LEN);
BufferPool pool(
test_env_->metrics(), TEST_BUFFER_LEN, 2 * TEST_BUFFER_LEN, 2 * TEST_BUFFER_LEN);
BufferPool::PageHandle handle;
BufferPool::ClientHandle client;
ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
numeric_limits<int64_t>::max(), NewProfile(), &client));
ASSERT_TRUE(client.IncreaseReservation(2 * TEST_BUFFER_LEN));
ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle));
ASSERT_OK(pool.Pin(&client, &handle));
// It's ok to Unpin() if the pin count remains positive.
pool.Unpin(&client, &handle);
// We didn't pass in a FileGroup, so spilling is disabled and we can't bring the
// pin count to 0.
IMPALA_ASSERT_DEBUG_DEATH(pool.Unpin(&client, &handle), "");
pool.DestroyPage(&client, &handle);
pool.DeregisterClient(&client);
}
/// Test simple case where pool must evict a page from the same client to fit another.
TEST_F(BufferPoolTest, EvictPageSameClient) {
global_reservations_.InitRootTracker(NULL, TEST_BUFFER_LEN);
BufferPool pool(
test_env_->metrics(), TEST_BUFFER_LEN, TEST_BUFFER_LEN, TEST_BUFFER_LEN);
BufferPool::PageHandle handle1, handle2;
BufferPool::ClientHandle client;
ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
NULL, TEST_BUFFER_LEN, NewProfile(), &client));
ASSERT_TRUE(client.IncreaseReservation(TEST_BUFFER_LEN));
ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1));
// We do not have enough reservation for a second page because the first one is pinned.
IMPALA_ASSERT_DEBUG_DEATH(
discard_result(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2)), "");
// We should be able to create a new page after unpinning and evicting the first one.
pool.Unpin(&client, &handle1);
ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2));
pool.DestroyPage(&client, &handle1);
pool.DestroyPage(&client, &handle2);
pool.DeregisterClient(&client);
}
/// Test simple case where pool must evict pages of different sizes.
TEST_F(BufferPoolTest, EvictPageDifferentSizes) {
const int64_t TOTAL_BYTES = 2 * TEST_BUFFER_LEN;
global_reservations_.InitRootTracker(NULL, TOTAL_BYTES);
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_BYTES, TOTAL_BYTES);
BufferPool::PageHandle handle1, handle2;
BufferPool::ClientHandle client;
ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
NULL, TOTAL_BYTES, NewProfile(), &client));
ASSERT_TRUE(client.IncreaseReservation(2 * TEST_BUFFER_LEN));
ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1));
pool.Unpin(&client, &handle1);
// We must evict the small page to fit the large one.
ASSERT_OK(pool.CreatePage(&client, 2 * TEST_BUFFER_LEN, &handle2));
ASSERT_TRUE(IsEvicted(&handle1));
// We must evict the large page to fit the small one.
pool.Unpin(&client, &handle2);
ASSERT_OK(pool.Pin(&client, &handle1));
ASSERT_TRUE(IsEvicted(&handle2));
pool.DestroyPage(&client, &handle1);
pool.DestroyPage(&client, &handle2);
pool.DeregisterClient(&client);
}
/// Test simple case where the pool must evict a page from one client to fit another one
/// in memory.
TEST_F(BufferPoolTest, EvictPageDifferentClient) {
const int NUM_CLIENTS = 2;
const int64_t TOTAL_BYTES = NUM_CLIENTS * TEST_BUFFER_LEN;
global_reservations_.InitRootTracker(NULL, TOTAL_BYTES);
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_BYTES, TOTAL_BYTES);
BufferPool::ClientHandle clients[NUM_CLIENTS];
for (int i = 0; i < NUM_CLIENTS; ++i) {
ASSERT_OK(pool.RegisterClient(Substitute("test client $0", i), NewFileGroup(),
&global_reservations_, NULL, TEST_BUFFER_LEN, NewProfile(), &clients[i]));
ASSERT_TRUE(clients[i].IncreaseReservation(TEST_BUFFER_LEN));
}
// Create a pinned and unpinned page for the first client.
PageHandle handle1, handle2;
const BufferHandle* page_buffer;
ASSERT_OK(pool.CreatePage(&clients[0], TEST_BUFFER_LEN, &handle1, &page_buffer));
const uint8_t TEST_VAL = 123;
memset(
page_buffer->data(), TEST_VAL, handle1.len()); // Fill page with an arbitrary value.
pool.Unpin(&clients[0], &handle1);
ASSERT_OK(pool.CreatePage(&clients[0], TEST_BUFFER_LEN, &handle2));
// Allocating a buffer for the second client requires evicting the unpinned page.
BufferHandle buffer;
ASSERT_OK(pool.AllocateBuffer(&clients[1], TEST_BUFFER_LEN, &buffer));
ASSERT_TRUE(IsEvicted(&handle1));
// Test reading back the first page, which requires swapping buffers again.
pool.Unpin(&clients[0], &handle2);
ASSERT_OK(pool.Pin(&clients[0], &handle1));
ASSERT_TRUE(IsEvicted(&handle2));
ASSERT_OK(handle1.GetBuffer(&page_buffer));
for (int i = 0; i < handle1.len(); ++i) {
EXPECT_EQ(TEST_VAL, page_buffer->data()[i]) << i;
}
// Clean up everything.
pool.DestroyPage(&clients[0], &handle1);
pool.DestroyPage(&clients[0], &handle2);
pool.FreeBuffer(&clients[1], &buffer);
for (BufferPool::ClientHandle& client : clients) pool.DeregisterClient(&client);
}
/// Regression test for IMPALA-5113 where the page flushing invariant didn't correctly
/// take multiply pinned pages into account.
TEST_F(BufferPoolTest, MultiplyPinnedPageAccounting) {
const int NUM_BUFFERS = 3;
const int64_t TOTAL_BYTES = NUM_BUFFERS * TEST_BUFFER_LEN;
global_reservations_.InitRootTracker(NULL, TOTAL_BYTES);
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_BYTES, TOTAL_BYTES);
BufferPool::ClientHandle client;
RuntimeProfile* profile = NewProfile();
ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
NULL, TOTAL_BYTES, profile, &client));
ASSERT_TRUE(client.IncreaseReservation(TOTAL_BYTES));
BufferPool::PageHandle handle1, handle2;
BufferPool::BufferHandle buffer;
ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1));
ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2));
pool.Unpin(&client, &handle1);
ASSERT_OK(pool.Pin(&client, &handle2));
ASSERT_OK(pool.AllocateBuffer(&client, TEST_BUFFER_LEN, &buffer));
// We shouldn't need to flush anything to disk since we have only three pages/buffers in
// memory. Rely on DCHECKs to check invariants and check we didn't evict the page.
EXPECT_FALSE(IsEvicted(&handle1)) << handle1.DebugString();
pool.DestroyPage(&client, &handle1);
pool.DestroyPage(&client, &handle2);
pool.FreeBuffer(&client, &buffer);
pool.DeregisterClient(&client);
}
// Constants for TestMemoryReclamation().
const int MEM_RECLAMATION_NUM_CLIENTS = 2;
// Choose a non-power-of-two so that AllocateBuffers() will allocate a mix of sizes:
// 32 + 32 + 32 + 16 + 8 + 4 + 2 + 1 = 127
const int64_t MEM_RECLAMATION_BUFFERS_PER_CLIENT = 127;
const int64_t MEM_RECLAMATION_CLIENT_RESERVATION =
BufferPoolTest::TEST_BUFFER_LEN * MEM_RECLAMATION_BUFFERS_PER_CLIENT;
const int64_t MEM_RECLAMATION_TOTAL_BYTES =
MEM_RECLAMATION_NUM_CLIENTS * MEM_RECLAMATION_CLIENT_RESERVATION;
// Test that we can reclaim buffers and pages from the same arena and from other arenas.
TEST_F(BufferPoolTest, MemoryReclamation) {
global_reservations_.InitRootTracker(NULL, MEM_RECLAMATION_TOTAL_BYTES);
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, MEM_RECLAMATION_TOTAL_BYTES,
MEM_RECLAMATION_TOTAL_BYTES);
// Assume that all cores are online. Test various combinations of cores to validate
// that it can reclaim from any other core.
for (int src = 0; src < CpuInfo::num_cores(); ++src) {
// Limit the max scavenge attempts to force use of the "locked" scavenging sometimes,
// which would otherwise only be triggered by racing threads.
SetMaxScavengeAttempts(&pool, 1 + src % 3);
for (int j = 0; j < 4; ++j) {
int dst = (src + j) % CpuInfo::num_cores();
TestMemoryReclamation(&pool, src, dst);
}
// Test with one fixed and the other randomly changing
TestMemoryReclamation(&pool, src, -1);
TestMemoryReclamation(&pool, -1, src);
}
// Test with both src and dst randomly changing.
TestMemoryReclamation(&pool, -1, -1);
global_reservations_.Close();
}
// Test that we can reclaim buffers and pages from the same arena or a different arena.
// Allocates then frees memory on 'src_core' then allocates on 'dst_core' to force
// reclamation of memory from src_core's free buffer lists and clean page lists.
// If 'src_core' or 'dst_core' is -1, randomly switch between cores instead of sticking
// to a fixed core.
void BufferPoolTest::TestMemoryReclamation(BufferPool* pool, int src_core, int dst_core) {
LOG(INFO) << "TestMemoryReclamation " << src_core << " -> " << dst_core;
const bool rand_src_core = src_core == -1;
const bool rand_dst_core = dst_core == -1;
BufferPool::ClientHandle clients[MEM_RECLAMATION_NUM_CLIENTS];
for (int i = 0; i < MEM_RECLAMATION_NUM_CLIENTS; ++i) {
ASSERT_OK(pool->RegisterClient(Substitute("test client $0", i), NewFileGroup(),
&global_reservations_, NULL, MEM_RECLAMATION_CLIENT_RESERVATION, NewProfile(),
&clients[i]));
ASSERT_TRUE(clients[i].IncreaseReservation(MEM_RECLAMATION_CLIENT_RESERVATION));
}
// Allocate and free the whole pool's buffers on src_core to populate its free lists.
if (!rand_src_core) CpuTestUtil::PinToCore(src_core);
vector<BufferPool::BufferHandle> client_buffers[MEM_RECLAMATION_NUM_CLIENTS];
AllocateBuffers(pool, &clients[0], 32 * TEST_BUFFER_LEN,
MEM_RECLAMATION_CLIENT_RESERVATION, &client_buffers[0], rand_src_core);
AllocateBuffers(pool, &clients[1], 32 * TEST_BUFFER_LEN,
MEM_RECLAMATION_CLIENT_RESERVATION, &client_buffers[1], rand_src_core);
FreeBuffers(pool, &clients[0], &client_buffers[0], rand_src_core);
FreeBuffers(pool, &clients[1], &client_buffers[1], rand_src_core);
// Allocate buffers again on dst_core. Make sure the size is bigger, smaller, and the
// same size as buffers we allocated earlier so that we exercise different code paths.
if (!rand_dst_core) CpuTestUtil::PinToCore(dst_core);
AllocateBuffers(pool, &clients[0], 4 * TEST_BUFFER_LEN,
MEM_RECLAMATION_CLIENT_RESERVATION, &client_buffers[0], rand_dst_core);
FreeBuffers(pool, &clients[0], &client_buffers[0], rand_dst_core);
// Allocate and unpin the whole pool's buffers as clean pages on src_core to populate
// its clean page lists.
if (!rand_src_core) CpuTestUtil::PinToCore(src_core);
vector<BufferPool::PageHandle> client_pages[MEM_RECLAMATION_NUM_CLIENTS];
CreatePages(pool, &clients[0], 32 * TEST_BUFFER_LEN, MEM_RECLAMATION_CLIENT_RESERVATION,
&client_pages[0], rand_src_core);
CreatePages(pool, &clients[1], 32 * TEST_BUFFER_LEN, MEM_RECLAMATION_CLIENT_RESERVATION,
&client_pages[1], rand_src_core);
for (auto& page : client_pages[0]) pool->Unpin(&clients[0], &page);
for (auto& page : client_pages[1]) pool->Unpin(&clients[1], &page);
// Allocate the buffers again to force reclamation of the buffers from the clean pages.
if (!rand_dst_core) CpuTestUtil::PinToCore(dst_core);
AllocateBuffers(pool, &clients[0], 4 * TEST_BUFFER_LEN,
MEM_RECLAMATION_CLIENT_RESERVATION, &client_buffers[0], rand_dst_core);
FreeBuffers(pool, &clients[0], &client_buffers[0]);
// Just for good measure, pin the pages again then destroy them.
for (auto& page : client_pages[0]) {
ASSERT_OK(pool->Pin(&clients[0], &page));
pool->DestroyPage(&clients[0], &page);
}
for (auto& page : client_pages[1]) {
ASSERT_OK(pool->Pin(&clients[1], &page));
pool->DestroyPage(&clients[1], &page);
}
for (BufferPool::ClientHandle& client : clients) pool->DeregisterClient(&client);
}
// Test the eviction policy of the buffer pool. Writes are issued eagerly as pages
// are unpinned, but pages are only evicted from memory when another buffer is
// allocated.
TEST_F(BufferPoolTest, EvictionPolicy) {
TestEvictionPolicy(TEST_BUFFER_LEN);
TestEvictionPolicy(2 * 1024 * 1024);
}
void BufferPoolTest::TestEvictionPolicy(int64_t page_size) {
// The eviction policy changes if there are multiple NUMA nodes, because buffers from
// clean pages on the local node are claimed in preference to free buffers on the
// non-local node. The rest of the test assumes that it executes on a single NUMA node.
if (CpuInfo::GetMaxNumNumaNodes() > 1) CpuTestUtil::PinToCore(0);
const int MAX_NUM_BUFFERS = 5;
int64_t total_mem = MAX_NUM_BUFFERS * page_size;
global_reservations_.InitRootTracker(NewProfile(), total_mem);
MetricGroup tmp_metrics("test-metrics");
BufferPool pool(&tmp_metrics, TEST_BUFFER_LEN, total_mem, total_mem);
ClientHandle client;
RuntimeProfile* profile = NewProfile();
ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
nullptr, total_mem, profile, &client));
ASSERT_TRUE(client.IncreaseReservation(total_mem));
RuntimeProfile* buffer_pool_profile = nullptr;
vector<RuntimeProfile*> profile_children;
profile->GetChildren(&profile_children);
for (RuntimeProfile* child : profile_children) {
if (child->name() == "Buffer pool") {
buffer_pool_profile = child;
break;
}
}
ASSERT_TRUE(buffer_pool_profile != nullptr);
RuntimeProfile::Counter* cumulative_bytes_alloced =
buffer_pool_profile->GetCounter("CumulativeAllocationBytes");
RuntimeProfile::Counter* write_ios = buffer_pool_profile->GetCounter("WriteIoOps");
RuntimeProfile::Counter* read_ios = buffer_pool_profile->GetCounter("ReadIoOps");
vector<PageHandle> pages;
CreatePages(&pool, &client, page_size, total_mem, &pages);
WriteData(pages, 0);
// Unpin pages. Writes should be started and memory should not be deallocated.
EXPECT_EQ(total_mem, cumulative_bytes_alloced->value());
EXPECT_EQ(total_mem, pool.GetSystemBytesAllocated());
UnpinAll(&pool, &client, &pages);
ASSERT_GT(write_ios->value(), 0);
// Re-pin all the pages and validate their data. This should not require reading the
// pages back from disk.
ASSERT_OK(PinAll(&pool, &client, &pages));
ASSERT_EQ(0, read_ios->value());
VerifyData(pages, 0);
// Unpin all pages. Writes should be started again.
int64_t prev_write_ios = write_ios->value();
UnpinAll(&pool, &client, &pages);
ASSERT_GT(write_ios->value(), prev_write_ios);
// Allocate two more buffers. Two unpinned pages must be evicted to make room.
const int NUM_EXTRA_BUFFERS = 2;
vector<BufferHandle> extra_buffers;
AllocateBuffers(
&pool, &client, page_size, page_size * NUM_EXTRA_BUFFERS, &extra_buffers);
// At least two unpinned pages should have been written out.
ASSERT_GE(write_ios->value(), prev_write_ios + NUM_EXTRA_BUFFERS);
// No additional memory should have been allocated - it should have been recycled.
EXPECT_EQ(total_mem, pool.GetSystemBytesAllocated());
// Check that two pages were evicted.
EXPECT_EQ(NUM_EXTRA_BUFFERS, NumEvicted(pages));
// Free up memory required to pin the original pages again.
FreeBuffers(&pool, &client, &extra_buffers);
ASSERT_OK(PinAll(&pool, &client, &pages));
// We only needed to read back the two evicted pages. Make sure we didn't do extra I/O.
ASSERT_EQ(NUM_EXTRA_BUFFERS, read_ios->value());
VerifyData(pages, 0);
DestroyAll(&pool, &client, &pages);
pool.DeregisterClient(&client);
global_reservations_.Close();
}
/// Test that we can destroy pages while a disk write is in flight for those pages.
TEST_F(BufferPoolTest, DestroyDuringWrite) {
const int TRIALS = 20;
const int MAX_NUM_BUFFERS = 5;
const int64_t TOTAL_MEM = TEST_BUFFER_LEN * MAX_NUM_BUFFERS;
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
ClientHandle client;
for (int trial = 0; trial < TRIALS; ++trial) {
ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
nullptr, TOTAL_MEM, NewProfile(), &client));
ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
vector<PageHandle> pages;
CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages);
// Unpin will initiate writes.
UnpinAll(&pool, &client, &pages);
// Writes should still be in flight when pages are deleted.
DestroyAll(&pool, &client, &pages);
pool.DeregisterClient(&client);
}
}
/// Test teardown of a query while writes are in flight. This was based on a
/// BufferedBlockMgr regression test for IMPALA-2252 where tear-down of the
/// query's RuntimeStates raced with scratch writes. If write_error is true,
/// force writes to hit errors.
void BufferPoolTest::TestQueryTeardown(bool write_error) {
const int64_t TOTAL_BUFFERS = 20;
const int CLIENT_BUFFERS = 10;
const int64_t TOTAL_MEM = TOTAL_BUFFERS * TEST_BUFFER_LEN;
const int64_t CLIENT_MEM = CLIENT_BUFFERS * TEST_BUFFER_LEN;
// Set up a BufferPool in the TestEnv.
test_env_.reset(new TestEnv());
test_env_->SetBufferPoolArgs(TEST_BUFFER_LEN, TOTAL_MEM);
ASSERT_OK(test_env_->Init());
BufferPool* pool = test_env_->exec_env()->buffer_pool();
RuntimeState* state;
ASSERT_OK(test_env_->CreateQueryState(0, nullptr, &state));
ClientHandle client;
ASSERT_OK(pool->RegisterClient("test client", state->query_state()->file_group(),
state->instance_buffer_reservation(),
obj_pool_.Add(new MemTracker(-1, "", state->instance_mem_tracker())), CLIENT_MEM,
NewProfile(), &client));
ASSERT_TRUE(client.IncreaseReservation(CLIENT_MEM));
vector<PageHandle> pages;
CreatePages(pool, &client, TEST_BUFFER_LEN, CLIENT_BUFFERS, &pages);
if (write_error) {
UnpinAll(pool, &client, &pages);
// Allocate more buffers to create memory pressure and force eviction of all the
// unpinned pages.
vector<BufferHandle> tmp_buffers;
AllocateBuffers(pool, &client, TEST_BUFFER_LEN, CLIENT_BUFFERS, &tmp_buffers);
string tmp_file_path = TmpFilePath(pages.data());
FreeBuffers(pool, &client, &tmp_buffers);
ASSERT_OK(PinAll(pool, &client, &pages));
// Remove temporary file to force future writes to that file to fail.
DisableBackingFile(tmp_file_path);
}
// Unpin will initiate writes. If we triggered a write error earlier, some writes may
// go down the error path.
UnpinAll(pool, &client, &pages);
// Tear down the pages, client, and query in the correct order while writes are in
// flight.
DestroyAll(pool, &client, &pages);
pool->DeregisterClient(&client);
test_env_->TearDownQueries();
// All memory should be released from the query.
EXPECT_EQ(0, test_env_->TotalQueryMemoryConsumption());
EXPECT_EQ(0, test_env_->exec_env()->buffer_reservation()->GetChildReservations());
}
TEST_F(BufferPoolTest, QueryTeardown) {
TestQueryTeardown(false);
}
TEST_F(BufferPoolTest, QueryTeardownWriteError) {
TestQueryTeardown(true);
}
// Test that the buffer pool handles a write error correctly. Remove permissions on the
// scratch directory before an operation that would cause a write and test that
// subsequent API calls return errors as expected.
void BufferPoolTest::TestWriteError(int write_delay_ms) {
int MAX_NUM_BUFFERS = 2;
int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
ClientHandle client;
ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
nullptr, TOTAL_MEM, NewProfile(), &client));
ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
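// Optionally inject a delay into each write to widen the race window (used by the
// IMPALA-4842 regression test below).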
client.impl_->set_debug_write_delay_ms(write_delay_ms);
vector<PageHandle> pages;
CreatePages(&pool, &client, TEST_BUFFER_LEN, MAX_NUM_BUFFERS, &pages);
// Unpin the two pages here to ensure that backing storage is allocated in the tmp file.
UnpinAll(&pool, &client, &pages);
WaitForAllWrites(&client);
// Re-pin the pages.
ASSERT_OK(PinAll(&pool, &client, &pages));
// Remove permissions on the backing storage so that future writes will fail.
ASSERT_GT(RemoveScratchPerms(), 0);
// Give the first write a chance to fail before the second write starts.
const int INTERVAL_MS = 10;
UnpinAll(&pool, &client, &pages, INTERVAL_MS);
WaitForAllWrites(&client);
// Subsequent calls to APIs that require allocating memory should fail: the write error
// is picked up asynchronously.
BufferHandle tmp_buffer;
PageHandle tmp_page;
Status error = pool.AllocateBuffer(&client, TEST_BUFFER_LEN, &tmp_buffer);
EXPECT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, error.code());
ASSERT_NE(string::npos, error.msg().msg().find(GetBackendString()));
EXPECT_FALSE(tmp_buffer.is_open());
error = pool.CreatePage(&client, TEST_BUFFER_LEN, &tmp_page);
EXPECT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, error.code());
EXPECT_FALSE(tmp_page.is_open());
error = pool.Pin(&client, pages.data());
EXPECT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, error.code());
EXPECT_FALSE(pages[0].is_pinned());
DestroyAll(&pool, &client, &pages);
pool.DeregisterClient(&client);
global_reservations_.Close();
}
TEST_F(BufferPoolTest, WriteError) {
TestWriteError(0);
}
// Regression test for IMPALA-4842 - inject a delay in the write to
// reproduce the issue.
TEST_F(BufferPoolTest, WriteErrorWriteDelay) {
TestWriteError(100);
}
// Test error handling when temporary file space cannot be allocated to back an unpinned
// page.
TEST_F(BufferPoolTest, TmpFileAllocateError) {
const int MAX_NUM_BUFFERS = 2;
const int64_t TOTAL_MEM = TEST_BUFFER_LEN * MAX_NUM_BUFFERS;
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
ClientHandle client;
ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
nullptr, TOTAL_MEM, NewProfile(), &client));
ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
vector<PageHandle> pages;
CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages);
// Unpin a page, which will trigger a write.
pool.Unpin(&client, pages.data());
WaitForAllWrites(&client);
// Remove permissions to the temporary files - subsequent operations will fail.
ASSERT_GT(RemoveScratchPerms(), 0);
// The write error will happen asynchronously.
pool.Unpin(&client, &pages[1]);
// Write failure causes future operations like Pin() to fail.
WaitForAllWrites(&client);
Status error = pool.Pin(&client, pages.data());
EXPECT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, error.code());
EXPECT_FALSE(pages[0].is_pinned());
DestroyAll(&pool, &client, &pages);
pool.DeregisterClient(&client);
}
// Test that scratch devices are blacklisted after a write error. The query that
// encountered the write error should not allocate more pages on that device, but
// existing pages on the device will remain in use and future queries will use the device.
TEST_F(BufferPoolTest, WriteErrorBlacklist) {
// Set up two file groups with two temporary dirs.
vector<string> tmp_dirs = InitMultipleTmpDirs(2);
// Simulate two concurrent queries, plus a third query that is registered after the
// write error.
const int TOTAL_QUERIES = 3;
const int INITIAL_QUERIES = 2;
const int MAX_NUM_PAGES = 6;
const int PAGES_PER_QUERY = MAX_NUM_PAGES / TOTAL_QUERIES;
const int64_t TOTAL_MEM = MAX_NUM_PAGES * TEST_BUFFER_LEN;
const int64_t MEM_PER_QUERY = PAGES_PER_QUERY * TEST_BUFFER_LEN;
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
vector<FileGroup*> file_groups;
vector<ClientHandle> clients(TOTAL_QUERIES);
for (int i = 0; i < INITIAL_QUERIES; ++i) {
file_groups.push_back(NewFileGroup());
ASSERT_OK(pool.RegisterClient("test client", file_groups[i], &global_reservations_,
nullptr, MEM_PER_QUERY, NewProfile(), &clients[i]));
ASSERT_TRUE(clients[i].IncreaseReservation(MEM_PER_QUERY));
}
// Allocate files for all 2x2 query/directory combinations by unpinning pages.
vector<vector<PageHandle>> pages(TOTAL_QUERIES);
for (int i = 0; i < INITIAL_QUERIES; ++i) {
CreatePages(&pool, &clients[i], TEST_BUFFER_LEN, MEM_PER_QUERY, &pages[i]);
WriteData(pages[i], 0);
UnpinAll(&pool, &clients[i], &pages[i]);
for (int j = 0; j < PAGES_PER_QUERY; ++j) {
LOG(INFO) << "Manager " << i << " Block " << j << " backed by file "
<< TmpFilePath(&pages[i][j]);
}
}
for (int i = 0; i < INITIAL_QUERIES; ++i) WaitForAllWrites(&clients[i]);
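// The first query will be exposed to the write error; the second should be unaffected.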
const int ERROR_QUERY = 0;
const int NO_ERROR_QUERY = 1;
const string& error_dir = tmp_dirs[0];
const string& good_dir = tmp_dirs[1];
// Delete one file from first scratch dir for first query to trigger an error.
PageHandle* error_page = FindPageInDir(pages[ERROR_QUERY], error_dir);
ASSERT_TRUE(error_page != NULL) << "Expected a tmp file in dir " << error_dir;
const string& error_file_path = TmpFilePath(error_page);
for (int i = 0; i < INITIAL_QUERIES; ++i) {
ASSERT_OK(PinAll(&pool, &clients[i], &pages[i]));
}
DisableBackingFile(error_file_path);
for (int i = 0; i < INITIAL_QUERIES; ++i) UnpinAll(&pool, &clients[i], &pages[i]);
// At least one write should hit an error, but it should be recoverable.
for (int i = 0; i < INITIAL_QUERIES; ++i) WaitForAllWrites(&clients[i]);
// Both clients should still be usable - test the API.
for (int i = 0; i < INITIAL_QUERIES; ++i) {
ASSERT_OK(PinAll(&pool, &clients[i], &pages[i]));
VerifyData(pages[i], 0);
UnpinAll(&pool, &clients[i], &pages[i]);
ASSERT_OK(AllocateAndFree(&pool, &clients[i], TEST_BUFFER_LEN));
}
// Temporary device with error should still be active.
vector<TmpFileMgr::DeviceId> active_tmp_devices =
test_env_->tmp_file_mgr()->ActiveTmpDevices();
ASSERT_EQ(tmp_dirs.size(), active_tmp_devices.size());
for (int i = 0; i < active_tmp_devices.size(); ++i) {
const string& device_path =
test_env_->tmp_file_mgr()->GetTmpDirPath(active_tmp_devices[i]);
ASSERT_EQ(string::npos, error_dir.find(device_path));
}
// The query that hit the error should only allocate from the device that had no error.
// The other one should continue using both devices, since it didn't encounter a write
// error itself.
vector<PageHandle> error_new_pages;
CreatePages(
&pool, &clients[ERROR_QUERY], TEST_BUFFER_LEN, MEM_PER_QUERY, &error_new_pages);
UnpinAll(&pool, &clients[ERROR_QUERY], &error_new_pages);
WaitForAllWrites(&clients[ERROR_QUERY]);
EXPECT_TRUE(FindPageInDir(error_new_pages, good_dir) != NULL);
EXPECT_TRUE(FindPageInDir(error_new_pages, error_dir) == NULL);
for (PageHandle& error_new_page : error_new_pages) {
LOG(INFO) << "Newly created page backed by file " << TmpFilePath(&error_new_page);
EXPECT_TRUE(PageInDir(&error_new_page, good_dir));
}
DestroyAll(&pool, &clients[ERROR_QUERY], &error_new_pages);
ASSERT_OK(PinAll(&pool, &clients[NO_ERROR_QUERY], &pages[NO_ERROR_QUERY]));
UnpinAll(&pool, &clients[NO_ERROR_QUERY], &pages[NO_ERROR_QUERY]);
WaitForAllWrites(&clients[NO_ERROR_QUERY]);
EXPECT_TRUE(FindPageInDir(pages[NO_ERROR_QUERY], good_dir) != NULL);
EXPECT_TRUE(FindPageInDir(pages[NO_ERROR_QUERY], error_dir) != NULL);
// The second client should use the bad directory for new pages since
// blacklisting is per-query, not global.
vector<PageHandle> no_error_new_pages;
CreatePages(&pool, &clients[NO_ERROR_QUERY], TEST_BUFFER_LEN, MEM_PER_QUERY,
&no_error_new_pages);
UnpinAll(&pool, &clients[NO_ERROR_QUERY], &no_error_new_pages);
WaitForAllWrites(&clients[NO_ERROR_QUERY]);
EXPECT_TRUE(FindPageInDir(no_error_new_pages, good_dir) != NULL);
EXPECT_TRUE(FindPageInDir(no_error_new_pages, error_dir) != NULL);
DestroyAll(&pool, &clients[NO_ERROR_QUERY], &no_error_new_pages);
// A new query should use both dirs for backing storage.
const int NEW_QUERY = 2;
ASSERT_OK(pool.RegisterClient("new test client", NewFileGroup(), &global_reservations_,
nullptr, MEM_PER_QUERY, NewProfile(), &clients[NEW_QUERY]));
ASSERT_TRUE(clients[NEW_QUERY].IncreaseReservation(MEM_PER_QUERY));
CreatePages(
&pool, &clients[NEW_QUERY], TEST_BUFFER_LEN, MEM_PER_QUERY, &pages[NEW_QUERY]);
UnpinAll(&pool, &clients[NEW_QUERY], &pages[NEW_QUERY]);
WaitForAllWrites(&clients[NEW_QUERY]);
EXPECT_TRUE(FindPageInDir(pages[NEW_QUERY], good_dir) != NULL);
EXPECT_TRUE(FindPageInDir(pages[NEW_QUERY], error_dir) != NULL);
for (int i = 0; i < TOTAL_QUERIES; ++i) {
DestroyAll(&pool, &clients[i], &pages[i]);
pool.DeregisterClient(&clients[i]);
}
}
// Test error handling when on-disk data is corrupted and the read fails.
TEST_F(BufferPoolTest, ScratchReadError) {
// Only allow one buffer in memory.
const int64_t TOTAL_MEM = TEST_BUFFER_LEN;
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
// Simulate different types of error.
enum ErrType {
CORRUPT_DATA, // Overwrite real spilled data with bogus data.
NO_PERMS, // Remove permissions on the scratch file.
TRUNCATE // Truncate the scratch file, destroying spilled data.
};
for (ErrType error_type : {CORRUPT_DATA, NO_PERMS, TRUNCATE}) {
ClientHandle client;
ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
nullptr, TOTAL_MEM, NewProfile(), &client));
ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
PageHandle page;
ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
// Unpin a page, which will trigger a write.
pool.Unpin(&client, &page);
WaitForAllWrites(&client);
// Force eviction of the page.
ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN));
string tmp_file = TmpFilePath(&page);
if (error_type == CORRUPT_DATA) {
CorruptBackingFile(tmp_file);
} else if (error_type == NO_PERMS) {
DisableBackingFile(tmp_file);
} else {
DCHECK_EQ(error_type, TRUNCATE);
TruncateBackingFile(tmp_file);
}
ASSERT_OK(pool.Pin(&client, &page));
// The read is async, so the error won't bubble up until we block on it with GetBuffer().
const BufferHandle* page_buffer;
Status status = page.GetBuffer(&page_buffer);
if (error_type == CORRUPT_DATA && !FLAGS_disk_spill_encryption) {
// Without encryption we can't detect that the data changed.
EXPECT_OK(status);
} else {
// Otherwise the read should fail.
EXPECT_FALSE(status.ok());
}
// Should be able to destroy the page, even though we hit an error.
pool.DestroyPage(&client, &page);
// If the backing file is still enabled, we should still be able to pin and unpin
// pages as normal.
ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
WriteData(page, 1);
pool.Unpin(&client, &page);
WaitForAllWrites(&client);
if (error_type == NO_PERMS) {
// The error prevents read/write of scratch files - this will fail.
EXPECT_FALSE(pool.Pin(&client, &page).ok());
} else {
// The error does not prevent read/write of scratch files.
ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN));
ASSERT_OK(pool.Pin(&client, &page));
VerifyData(page, 1);
}
pool.DestroyPage(&client, &page);
pool.DeregisterClient(&client);
}
}
/// Test that the buffer pool fails cleanly when all scratch directories are inaccessible
/// at runtime.
TEST_F(BufferPoolTest, NoDirsAllocationError) {
vector<string> tmp_dirs = InitMultipleTmpDirs(2);
int MAX_NUM_BUFFERS = 2;
int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
ClientHandle client;
ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
nullptr, TOTAL_MEM, NewProfile(), &client));
ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
vector<PageHandle> pages;
CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages);
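// Remove permissions on all scratch subdirectories so that allocating backing storage
// will fail.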
for (int i = 0; i < tmp_dirs.size(); ++i) {
const string& tmp_scratch_subdir = tmp_dirs[i] + SCRATCH_SUFFIX;
chmod(tmp_scratch_subdir.c_str(), 0);
}
// The error will happen asynchronously.
UnpinAll(&pool, &client, &pages);
WaitForAllWrites(&client);
// Write failure should result in an error getting propagated back to Pin().
for (PageHandle& page : pages) {
Status status = pool.Pin(&client, &page);
EXPECT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, status.code());
}
DestroyAll(&pool, &client, &pages);
pool.DeregisterClient(&client);
}
// Test that the buffer pool can still create pages when no scratch is present.
TEST_F(BufferPoolTest, NoTmpDirs) {
InitMultipleTmpDirs(0);
const int MAX_NUM_BUFFERS = 3;
const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
ClientHandle client;
ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
nullptr, TOTAL_MEM, NewProfile(), &client));
ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
vector<PageHandle> pages;
CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages);
// Unpinning is allowed by the BufferPool interface but we won't start any writes to
// disk because the flushing heuristic does not eagerly start writes when there are no
// active scratch devices.
UnpinAll(&pool, &client, &pages);
WaitForAllWrites(&client);
ASSERT_OK(pool.Pin(&client, pages.data()));
// Allocating another buffer will force a write, which will fail.
BufferHandle tmp_buffer;
Status status = pool.AllocateBuffer(&client, TEST_BUFFER_LEN, &tmp_buffer);
ASSERT_FALSE(status.ok());
ASSERT_EQ(TErrorCode::SCRATCH_ALLOCATION_FAILED, status.code()) << status.msg().msg();
DestroyAll(&pool, &client, &pages);
pool.DeregisterClient(&client);
}
// Test that the buffer pool can still create pages when spilling is disabled by
// setting scratch_limit = 0.
TEST_F(BufferPoolTest, ScratchLimitZero) {
const int QUERY_BUFFERS = 3;
const int64_t TOTAL_MEM = 100 * TEST_BUFFER_LEN;
const int64_t QUERY_MEM = QUERY_BUFFERS * TEST_BUFFER_LEN;
// Set up a query state with the scratch_limit option in the TestEnv.
test_env_.reset(new TestEnv());
test_env_->SetBufferPoolArgs(TEST_BUFFER_LEN, TOTAL_MEM);
ASSERT_OK(test_env_->Init());
BufferPool* pool = test_env_->exec_env()->buffer_pool();
RuntimeState* state;
TQueryOptions query_options;
query_options.scratch_limit = 0;
ASSERT_OK(test_env_->CreateQueryState(0, &query_options, &state));
ClientHandle client;
ASSERT_OK(pool->RegisterClient("test client", state->query_state()->file_group(),
state->instance_buffer_reservation(),
obj_pool_.Add(new MemTracker(-1, "", state->instance_mem_tracker())), QUERY_MEM,
NewProfile(), &client));
ASSERT_TRUE(client.IncreaseReservation(QUERY_MEM));
vector<PageHandle> pages;
CreatePages(pool, &client, TEST_BUFFER_LEN, QUERY_MEM, &pages);
// Spilling is disabled by the QueryState when scratch_limit is 0, so trying to unpin
// will cause a DCHECK.
IMPALA_ASSERT_DEBUG_DEATH(pool->Unpin(&client, pages.data()), "");
DestroyAll(pool, &client, &pages);
pool->DeregisterClient(&client);
}
TEST_F(BufferPoolTest, SingleRandom) {
TestRandomInternalSingle(8 * 1024, true);
TestRandomInternalSingle(8 * 1024, false);
}
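// Multi-threaded randomized stress tests with 2, 4 and 8 threads sharing one buffer pool.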
TEST_F(BufferPoolTest, Multi2Random) {
TestRandomInternalMulti(2, 8 * 1024, true);
TestRandomInternalMulti(2, 8 * 1024, false);
}
TEST_F(BufferPoolTest, Multi4Random) {
TestRandomInternalMulti(4, 8 * 1024, true);
TestRandomInternalMulti(4, 8 * 1024, false);
}
TEST_F(BufferPoolTest, Multi8Random) {
TestRandomInternalMulti(8, 8 * 1024, true);
TestRandomInternalMulti(8, 8 * 1024, false);
}
// Single-threaded execution of the TestRandomInternalImpl.
void BufferPoolTest::TestRandomInternalSingle(
int64_t min_buffer_len, bool multiple_pins) {
const int MAX_NUM_BUFFERS = 200;
const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * min_buffer_len;
MetricGroup tmp_metrics("test-metrics");
BufferPool pool(&tmp_metrics, min_buffer_len, TOTAL_MEM, TOTAL_MEM);
global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
MemTracker global_tracker(TOTAL_MEM);
TestRandomInternalImpl(
&pool, NewFileGroup(), &global_tracker, &rng_, SINGLE_THREADED_TID, multiple_pins);
global_reservations_.Close();
}
// Multi-threaded execution of the TestRandomInternalImpl.
void BufferPoolTest::TestRandomInternalMulti(
int num_threads, int64_t min_buffer_len, bool multiple_pins) {
const int MAX_NUM_BUFFERS_PER_THREAD = 200;
const int64_t TOTAL_MEM = num_threads * MAX_NUM_BUFFERS_PER_THREAD * min_buffer_len;
MetricGroup tmp_metrics("test-metrics");
BufferPool pool(&tmp_metrics, min_buffer_len, TOTAL_MEM, TOTAL_MEM);
global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
MemTracker global_tracker(TOTAL_MEM);
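// All threads share a single file group, simulating concurrent spilling clients within
// one query.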
FileGroup* shared_file_group = NewFileGroup();
thread_group workers;
vector<mt19937> rngs = RandTestUtil::CreateThreadLocalRngs(num_threads, &rng_);
for (int i = 0; i < num_threads; ++i) {
workers.add_thread(new thread(
[this, &pool, shared_file_group, &global_tracker, &rngs, i, multiple_pins]() {
TestRandomInternalImpl(
&pool, shared_file_group, &global_tracker, &rngs[i], i, multiple_pins);
}));
}
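// Periodically run Maintenance() in the background until the worker threads finish.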
AtomicInt32 stop_maintenance(0);
thread maintenance_thread([&pool, &stop_maintenance]() {
while (stop_maintenance.Load() == 0) {
pool.Maintenance();
SleepForMs(50);
}
});
workers.join_all();
stop_maintenance.Add(1);
maintenance_thread.join();
global_reservations_.Close();
}
/// Randomly issue AllocateBuffer(), FreeBuffer(), CreatePage(), Pin(), Unpin(), and
/// DestroyPage() calls. All calls made are legal - error conditions are not expected.
/// When executed in single-threaded mode 'tid' should be SINGLE_THREADED_TID. If
/// 'multiple_pins' is true, pages can be pinned multiple times (useful to test this
/// functionality). Otherwise they are only pinned once (useful to test the case when
/// memory is more committed).
void BufferPoolTest::TestRandomInternalImpl(BufferPool* pool, FileGroup* file_group,
MemTracker* parent_mem_tracker, mt19937* rng, int tid, bool multiple_pins) {
// Encrypting and decrypting is expensive - reduce iterations when encryption is on.
int num_iterations = FLAGS_disk_spill_encryption ? 5000 : 50000;
// All the existing pages and buffers along with the sentinel values written to them.
vector<pair<PageHandle, int>> pages;
vector<pair<BufferHandle, int>> buffers;
/// Pick power-of-two buffer sizes that are up to 2^4 times the minimum buffer length.
uniform_int_distribution<int> buffer_exponent_dist(0, 4);
ClientHandle client;
ASSERT_OK(pool->RegisterClient(Substitute("$0", tid), file_group, &global_reservations_,
obj_pool_.Add(new MemTracker(-1, "", parent_mem_tracker)), 1L << 48, NewProfile(),
&client));
for (int i = 0; i < num_iterations; ++i) {
if ((i % 10000) == 0) LOG(ERROR) << " Iteration " << i << endl;
// Pick an operation.
// New page: 15%
// Pin a page and block waiting for the result: 20%
// Pin a page and let it continue asynchronously: 10%
// Unpin a pinned page: 25% (< Pin prob. so that memory consumption increases).
// Destroy page: 10% (< New page prob. so that number of pages grows over time).
// Allocate buffer: 10%
// Free buffer: 9.9%
// Switch core that the thread is executing on: 0.1%
double p = uniform_real_distribution<double>(0.0, 1.0)(*rng);
if (p < 0.15) {
// Create a new page.
int64_t page_len = pool->min_buffer_len() << (buffer_exponent_dist)(*rng);
if (!client.IncreaseReservationToFit(page_len)) continue;
PageHandle new_page;
ASSERT_OK(pool->CreatePage(&client, page_len, &new_page));
int data = (*rng)();
WriteData(new_page, data);
pages.emplace_back(move(new_page), data);
} else if (p < 0.45) {
// Pin a page asynchronously.
if (pages.empty()) continue;
int rand_pick = uniform_int_distribution<int>(0, pages.size() - 1)(*rng);
PageHandle* page = &pages[rand_pick].first;
if (!client.IncreaseReservationToFit(page->len())) continue;
if (!page->is_pinned() || multiple_pins) ASSERT_OK(pool->Pin(&client, page));
// Block on the pin and verify data for sync pins.
if (p < 0.35) VerifyData(*page, pages[rand_pick].second);
} else if (p < 0.70) {
// Unpin a pinned page.
if (pages.empty()) continue;
int rand_pick = uniform_int_distribution<int>(0, pages.size() - 1)(*rng);
PageHandle* page = &pages[rand_pick].first;
if (page->is_pinned()) {
VerifyData(*page, pages[rand_pick].second);
pool->Unpin(&client, page);
}
} else if (p < 0.80) {
// Destroy a page.
if (pages.empty()) continue;
int rand_pick = uniform_int_distribution<int>(0, pages.size() - 1)(*rng);
auto page_data = move(pages[rand_pick]);
if (page_data.first.is_pinned()) VerifyData(page_data.first, page_data.second);
pages[rand_pick] = move(pages.back());
pages.pop_back();
pool->DestroyPage(&client, &page_data.first);
} else if (p < 0.90) {
// Allocate a buffer. Pick a random power-of-two size that is up to 2^4
// times the minimum buffer length.
int64_t buffer_len = pool->min_buffer_len() << (buffer_exponent_dist)(*rng);
if (!client.IncreaseReservationToFit(buffer_len)) continue;
BufferHandle new_buffer;
ASSERT_OK(pool->AllocateBuffer(&client, buffer_len, &new_buffer));
int data = (*rng)();
WriteData(new_buffer, data);
buffers.emplace_back(move(new_buffer), data);
} else if (p < 0.999) {
// Free a buffer.
if (buffers.empty()) continue;
int rand_pick = uniform_int_distribution<int>(0, buffers.size() - 1)(*rng);
auto buffer_data = move(buffers[rand_pick]);
buffers[rand_pick] = move(buffers.back());
buffers.pop_back();
pool->FreeBuffer(&client, &buffer_data.first);
} else {
CpuTestUtil::PinToRandomCore(rng);
}
}
// The client needs to delete all its pages.
for (auto& page : pages) pool->DestroyPage(&client, &page.first);
for (auto& buffer : buffers) pool->FreeBuffer(&client, &buffer.first);
pool->DeregisterClient(&client);
}
/// Test basic SubReservation functionality.
TEST_F(BufferPoolTest, SubReservation) {
const int64_t TOTAL_MEM = TEST_BUFFER_LEN * 10;
global_reservations_.InitRootTracker(NULL, TOTAL_MEM);
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
BufferPool::ClientHandle client;
ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
TOTAL_MEM, NewProfile(), &client));
ASSERT_TRUE(client.IncreaseReservationToFit(TEST_BUFFER_LEN));
BufferPool::SubReservation subreservation(&client);
BufferPool::BufferHandle buffer;
// Save and check that the reservation moved as expected.
client.SaveReservation(&subreservation, TEST_BUFFER_LEN);
EXPECT_EQ(0, client.GetUnusedReservation());
EXPECT_EQ(TEST_BUFFER_LEN, subreservation.GetReservation());
// Should not be able to allocate from client since the reservation was moved.
IMPALA_ASSERT_DEBUG_DEATH(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN), "");
// Restore and check that the reservation moved as expected.
client.RestoreReservation(&subreservation, TEST_BUFFER_LEN);
EXPECT_EQ(TEST_BUFFER_LEN, client.GetUnusedReservation());
EXPECT_EQ(0, subreservation.GetReservation());
// Should be able to allocate from the client after restoring.
ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN));
EXPECT_EQ(TEST_BUFFER_LEN, client.GetUnusedReservation());
subreservation.Close();
pool.DeregisterClient(&client);
}
// Check that we can decrease reservation without violating any buffer pool invariants.
TEST_F(BufferPoolTest, DecreaseReservation) {
const int MAX_NUM_BUFFERS = 4;
const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
ClientHandle client;
ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
nullptr, TOTAL_MEM, NewProfile(), &client));
ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
vector<PageHandle> pages;
CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages);
WriteData(pages, 0);
// Unpin pages and decrease reservation while the writes are in flight.
UnpinAll(&pool, &client, &pages);
ASSERT_OK(client.DecreaseReservationTo(
numeric_limits<int64_t>::max(), 2 * TEST_BUFFER_LEN));
// Two pages must be clean to stay within the reduced reservation.
EXPECT_GE(pool.GetNumCleanPages(), 2);
EXPECT_EQ(2 * TEST_BUFFER_LEN, client.GetReservation());
// Decrease it further after the pages are evicted.
WaitForAllWrites(&client);
ASSERT_OK(client.DecreaseReservationTo(
numeric_limits<int64_t>::max(), TEST_BUFFER_LEN));
EXPECT_GE(pool.GetNumCleanPages(), 3);
EXPECT_EQ(TEST_BUFFER_LEN, client.GetReservation());
// Check that we can still use the reservation.
ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN));
EXPECT_EQ(1, NumEvicted(pages));
// Check that we can decrease it to zero, with the max decrease applied.
const int64_t MAX_DECREASE = 123;
ASSERT_OK(client.DecreaseReservationTo(MAX_DECREASE, 0));
EXPECT_EQ(TEST_BUFFER_LEN - MAX_DECREASE, client.GetReservation());
ASSERT_OK(client.DecreaseReservationTo(numeric_limits<int64_t>::max(), 0));
EXPECT_EQ(0, client.GetReservation());
// Test that concurrent increases and decreases do not race. All increases go through
// and each decrease reduces the reservation by DECREASE_SIZE or less.
const int NUM_CONCURRENT_INCREASES = 50;
const int NUM_CONCURRENT_DECREASES = 50;
const int64_t INCREASE_SIZE = 13;
const int64_t DECREASE_SIZE = 7;
const int64_t START_RESERVATION = 1000;
const int64_t MIN_RESERVATION = 500;
ASSERT_TRUE(client.IncreaseReservation(START_RESERVATION));
thread increaser([&] {
for (int i = 0; i < NUM_CONCURRENT_INCREASES; ++i) {
ASSERT_TRUE(client.IncreaseReservation(INCREASE_SIZE));
SleepForMs(0);
}
});
for (int i = 0; i < NUM_CONCURRENT_DECREASES; ++i) {
ASSERT_OK(client.DecreaseReservationTo(DECREASE_SIZE, MIN_RESERVATION));
}
increaser.join();
// All increases and decreases should have been applied.
EXPECT_EQ(START_RESERVATION + INCREASE_SIZE * NUM_CONCURRENT_INCREASES
- DECREASE_SIZE * NUM_CONCURRENT_DECREASES, client.GetReservation());
ASSERT_OK(client.DecreaseReservationTo(numeric_limits<int64_t>::max(), 0));
EXPECT_EQ(0, client.GetReservation());
DestroyAll(&pool, &client, &pages);
pool.DeregisterClient(&client);
global_reservations_.Close();
}
// Test concurrent operations using the same client and different buffers.
TEST_F(BufferPoolTest, ConcurrentBufferOperations) {
const int DELETE_THREADS = 2;
const int ALLOCATE_THREADS = 2;
const int NUM_ALLOCATIONS_PER_THREAD = 128;
const int MAX_NUM_BUFFERS = 16;
const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
BufferPool::ClientHandle client;
ASSERT_OK(pool.RegisterClient("test client", nullptr, &global_reservations_, nullptr,
TOTAL_MEM, NewProfile(), &client));
ASSERT_TRUE(client.IncreaseReservationToFit(TOTAL_MEM));
thread_group allocate_threads;
thread_group delete_threads;
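// Reservation not yet used by in-flight buffers. Allocating threads deduct from it
// before allocating a buffer; deleting threads add it back after freeing one.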
AtomicInt64 available_reservation(TOTAL_MEM);
// Queue of buffers to be deleted, along with the first byte of the data in
// the buffer, for validation purposes.
BlockingQueue<pair<uint8_t, BufferHandle>> delete_queue(MAX_NUM_BUFFERS);
// Allocate threads allocate buffers whenever able and enqueue them.
for (int i = 0; i < ALLOCATE_THREADS; ++i) {
allocate_threads.add_thread(new thread([&] {
for (int j = 0; j < NUM_ALLOCATIONS_PER_THREAD; ++j) {
// Try to deduct reservation.
while (true) {
int64_t val = available_reservation.Load();
if (val >= TEST_BUFFER_LEN
&& available_reservation.CompareAndSwap(val, val - TEST_BUFFER_LEN)) {
break;
}
}
BufferHandle buffer;
ASSERT_OK(pool.AllocateBuffer(&client, TEST_BUFFER_LEN, &buffer));
uint8_t first_byte = static_cast<uint8_t>(j % 256);
buffer.data()[0] = first_byte;
delete_queue.BlockingPut(pair<uint8_t, BufferHandle>(first_byte, move(buffer)));
}
}));
}
// Delete threads pull buffers off the queue and free them.
for (int i = 0; i < DELETE_THREADS; ++i) {
delete_threads.add_thread(new thread([&] {
pair<uint8_t, BufferHandle> item;
while (delete_queue.BlockingGet(&item)) {
ASSERT_EQ(item.first, item.second.data()[0]);
pool.FreeBuffer(&client, &item.second);
available_reservation.Add(TEST_BUFFER_LEN);
}
}));
}
allocate_threads.join_all();
delete_queue.Shutdown();
delete_threads.join_all();
pool.DeregisterClient(&client);
global_reservations_.Close();
}
// IMPALA-7446: the buffer pool GC hook that's set up in ExecEnv should
// free cached buffers.
TEST_F(BufferPoolTest, BufferPoolGc) {
const int64_t BUFFER_SIZE = 1024L * 1024L * 1024L;
// Set up a small buffer pool and process mem limit that fits only a single buffer.
// A large buffer size is used so that untracked memory is small relative to the
// buffer.
test_env_.reset(new TestEnv);
test_env_->SetBufferPoolArgs(1024, BUFFER_SIZE);
// Make sure we have a process memory tracker that uses TCMalloc metrics to match
// GC behaviour of a real impalad and reproduce IMPALA-7446. We need to add some
// extra headroom for other allocations and overhead.
test_env_->SetProcessMemTrackerArgs(BUFFER_SIZE * 3 / 2, true);
ASSERT_OK(test_env_->Init());
BufferPool* buffer_pool = test_env_->exec_env()->buffer_pool();
// Set up a client with unlimited reservation.
MemTracker* client_tracker = obj_pool_.Add(
new MemTracker(-1, "client", test_env_->exec_env()->process_mem_tracker()));
BufferPool::ClientHandle client;
ASSERT_OK(buffer_pool->RegisterClient("", nullptr,
test_env_->exec_env()->buffer_reservation(), client_tracker,
numeric_limits<int>::max(), NewProfile(), &client));
BufferPool::BufferHandle buffer;
ASSERT_TRUE(client.IncreaseReservation(BUFFER_SIZE));
// Make sure that buffers/pages were gc'ed and/or recycled.
EXPECT_EQ(0, buffer_pool->GetSystemBytesAllocated());
ASSERT_OK(buffer_pool->AllocateBuffer(&client, BUFFER_SIZE, &buffer));
buffer_pool->FreeBuffer(&client, &buffer);
ASSERT_OK(client.DecreaseReservationTo(numeric_limits<int64_t>::max(), 0));
// Before IMPALA-7446 was fixed, this reservation increase would fail because the
// free buffer counted against the process memory limit.
ASSERT_TRUE(client.IncreaseReservation(BUFFER_SIZE));
ASSERT_OK(buffer_pool->AllocateBuffer(&client, BUFFER_SIZE, &buffer));
buffer_pool->FreeBuffer(&client, &buffer);
buffer_pool->DeregisterClient(&client);
}
} // namespace impala
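// Run the tests under the interesting combinations of disk spill encryption and fake
// NUMA topology.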
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
impala::InitFeSupport();
ABORT_IF_ERROR(impala::LlvmCodeGen::InitializeLlvm());
int result = 0;
for (bool encryption : {false, true}) {
for (bool numa : {false, true}) {
if (!numa && encryption) continue; // Not an interesting combination.
impala::CpuTestUtil::SetupFakeNuma(numa);
FLAGS_disk_spill_encryption = encryption;
std::cerr << "+==================================================" << std::endl
<< "| Running tests with encryption=" << encryption << " numa=" << numa
<< std::endl
<< "+==================================================" << std::endl;
if (RUN_ALL_TESTS() != 0) result = 1;
}
}
return result;
}