blob: af84bbebf7646443975bffa767420fe39a57166c [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <boost/bind.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/thread/thread.hpp>
#include <boost/unordered_map.hpp>
#include <cstdlib>
#include <string>
#include <vector>
#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/bufferpool/reservation-tracker.h"
#include "common/init.h"
#include "common/object-pool.h"
#include "testutil/death-test-util.h"
#include "testutil/gtest-util.h"
#include "common/names.h"
namespace impala {
/// Fixture shared by the buffer pool tests. It owns the root reservation tracker used by
/// the tests and hands out one lazily-created reservation tracker per query ID.
class BufferPoolTest : public ::testing::Test {
 public:
  virtual void SetUp() {}

  /// Closes every per-query tracker that was handed out, closes the root tracker and
  /// then releases all objects that were allocated from 'obj_pool_'.
  virtual void TearDown() {
    for (const auto& id_and_tracker : query_reservations_) {
      id_and_tracker.second->Close();
    }
    global_reservations_.Close();
    obj_pool_.Clear();
  }

  /// The minimum buffer size used in most tests.
  static const int64_t TEST_BUFFER_LEN = 1024;

  /// Test helper that simulates registering and then deregistering 'num_queries'
  /// queries with the given initial reservation and reservation limit.
  void RegisterQueriesAndClients(BufferPool* pool, int query_id_hi, int num_queries,
      int64_t initial_query_reservation, int64_t query_reservation_limit);

  /// Creates and destroys a page 'num_ops' times.
  void CreatePageLoop(BufferPool* pool, ReservationTracker* parent_tracker, int num_ops);

 protected:
  /// Builds a 64-bit query ID: 'hi' shifted left by 32 bits, bitwise OR'ed with 'lo'.
  static int64_t QueryId(int hi, int lo) {
    return (static_cast<int64_t>(hi) << 32) | lo;
  }

  /// Returns the reservation tracker for 'query_id', creating it in 'obj_pool_' the
  /// first time the ID is requested. The map is accessed under
  /// 'query_reservations_lock_'.
  ReservationTracker* GetQueryReservationTracker(int64_t query_id) {
    lock_guard<SpinLock> l(query_reservations_lock_);
    // operator[] inserts a NULL entry the first time an ID is looked up.
    ReservationTracker*& slot = query_reservations_[query_id];
    if (slot == NULL) slot = obj_pool_.Add(new ReservationTracker());
    return slot;
  }

  /// Returns a new profile named "test profile" that is owned by 'obj_pool_'.
  RuntimeProfile* NewProfile() {
    RuntimeProfile* profile = new RuntimeProfile(&obj_pool_, "test profile");
    return obj_pool_.Add(profile);
  }

  /// Owns the trackers and profiles created by the helpers above.
  ObjectPool obj_pool_;

  /// Root of the reservation tracker hierarchy used by the tests.
  ReservationTracker global_reservations_;

  // Map from query_id to the reservation tracker for that query. Reads and modifications
  // of the map are protected by query_reservations_lock_.
  unordered_map<int64_t, ReservationTracker*> query_reservations_;
  SpinLock query_reservations_lock_;
};

// Out-of-class definition of the static constant declared in the fixture.
const int64_t BufferPoolTest::TEST_BUFFER_LEN;
/// Registers 'num_queries' queries, each with 32 clients, against 'pool' and then
/// deregisters all of the clients and closes the query trackers again.
///
/// Query i uses the ID QueryId(query_id_hi, i). Its tracker is a child of
/// 'global_reservations_' with limit 'query_reservation_limit' and reserves
/// 'initial_query_reservation', which is split as evenly as possible between the
/// query's clients.
void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi,
    int num_queries, int64_t initial_query_reservation, int64_t query_reservation_limit) {
  const int clients_per_query = 32;
  // Even split of the query's reservation: every client gets the quotient and the first
  // 'num_clients_with_extra' clients get one more, so the client reservations add up to
  // exactly 'initial_query_reservation'.
  const int64_t base_client_reservation = initial_query_reservation / clients_per_query;
  const int64_t num_clients_with_extra = initial_query_reservation % clients_per_query;

  // One heap-allocated array of clients and of client trackers per query. std::vector is
  // used for the outer arrays because variable-length arrays are not standard C++.
  std::vector<BufferPool::Client*> clients(num_queries);
  std::vector<ReservationTracker*> client_reservations(num_queries);

  for (int i = 0; i < num_queries; ++i) {
    int64_t query_id = QueryId(query_id_hi, i);

    // Initialize a tracker for a new query.
    ReservationTracker* query_reservation = GetQueryReservationTracker(query_id);
    query_reservation->InitChildTracker(
        NULL, &global_reservations_, NULL, query_reservation_limit);

    // Test that closing then reopening child tracker works.
    query_reservation->Close();
    query_reservation->InitChildTracker(
        NULL, &global_reservations_, NULL, query_reservation_limit);
    EXPECT_TRUE(query_reservation->IncreaseReservationToFit(initial_query_reservation));

    clients[i] = new BufferPool::Client[clients_per_query];
    client_reservations[i] = new ReservationTracker[clients_per_query];

    for (int j = 0; j < clients_per_query; ++j) {
      // The comparison must be parenthesized: '+' binds tighter than '<', so without
      // the parentheses the whole sum would be compared against the remainder.
      int64_t initial_client_reservation =
          base_client_reservation + (j < num_clients_with_extra ? 1 : 0);
      // Reservation limit can be anything greater or equal to the initial reservation.
      int64_t client_reservation_limit = initial_client_reservation + rand() % 100000;
      client_reservations[i][j].InitChildTracker(
          NULL, query_reservation, NULL, client_reservation_limit);
      EXPECT_TRUE(
          client_reservations[i][j].IncreaseReservationToFit(initial_client_reservation));
      string name = Substitute("Client $0 for query $1", j, query_id);
      EXPECT_OK(pool->RegisterClient(
          name, &client_reservations[i][j], NewProfile(), &clients[i][j]));
    }

    for (int j = 0; j < clients_per_query; ++j) {
      ASSERT_TRUE(clients[i][j].is_registered());
    }
  }

  // Deregister clients then query.
  for (int i = 0; i < num_queries; ++i) {
    for (int j = 0; j < clients_per_query; ++j) {
      pool->DeregisterClient(&clients[i][j]);
      ASSERT_FALSE(clients[i][j].is_registered());
      client_reservations[i][j].Close();
    }

    delete[] clients[i];
    delete[] client_reservations[i];

    GetQueryReservationTracker(QueryId(query_id_hi, i))->Close();
  }
}
/// Test that queries and clients can be registered and deregistered with the reservation
/// trackers and the buffer pool.
TEST_F(BufferPoolTest, BasicRegistration) {
  int num_concurrent_queries = 1024;
  int64_t sum_initial_reservations = 4;
  int64_t reservation_limit = 1024;
  // Need enough buffers for all initial reservations.
  int64_t total_mem = sum_initial_reservations * num_concurrent_queries;
  global_reservations_.InitRootTracker(NewProfile(), total_mem);

  BufferPool pool(TEST_BUFFER_LEN, total_mem);

  RegisterQueriesAndClients(
      &pool, 0, num_concurrent_queries, sum_initial_reservations, reservation_limit);

  // Everything should have been released again. These are gtest expectations so that a
  // mismatch is reported as a test failure in every build type, and they are non-fatal
  // so that the root tracker is still closed below.
  EXPECT_EQ(0, global_reservations_.GetUsedReservation());
  EXPECT_EQ(0, global_reservations_.GetChildReservations());
  EXPECT_EQ(0, global_reservations_.GetReservation());
  global_reservations_.Close();
}
/// Test that queries and clients can be registered and deregistered by concurrent
/// threads.
TEST_F(BufferPoolTest, ConcurrentRegistration) {
  int queries_per_thread = 64;
  int num_threads = 64;
  int num_concurrent_queries = queries_per_thread * num_threads;
  int64_t sum_initial_reservations = 4;
  int64_t reservation_limit = 1024;
  // Need enough buffers for all initial reservations.
  int64_t total_mem = num_concurrent_queries * sum_initial_reservations;
  global_reservations_.InitRootTracker(NewProfile(), total_mem);

  BufferPool pool(TEST_BUFFER_LEN, total_mem);

  // Launch threads, each with a different set of query IDs: thread i passes i as the
  // high half of its query IDs.
  thread_group workers;
  for (int i = 0; i < num_threads; ++i) {
    workers.add_thread(new thread(bind(&BufferPoolTest::RegisterQueriesAndClients, this,
        &pool, i, queries_per_thread, sum_initial_reservations, reservation_limit)));
  }
  workers.join_all();

  // All the reservations should be released at this point. These are gtest expectations
  // so that a mismatch is reported as a test failure in every build type, and they are
  // non-fatal so that the root tracker is still closed below.
  EXPECT_EQ(0, global_reservations_.GetUsedReservation());
  EXPECT_EQ(0, global_reservations_.GetReservation());
  global_reservations_.Close();
}
/// Test basic page handle creation.
TEST_F(BufferPoolTest, PageCreation) {
  // Allocate many pages, each a power-of-two multiple of the minimum page length.
  int num_pages = 16;
  int64_t max_page_len = TEST_BUFFER_LEN << (num_pages - 1);
  int64_t total_mem = 2 * 2 * max_page_len;
  global_reservations_.InitRootTracker(NULL, total_mem);
  BufferPool pool(TEST_BUFFER_LEN, total_mem);
  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
  client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, total_mem);
  ASSERT_TRUE(client_tracker->IncreaseReservation(total_mem));
  BufferPool::Client client;
  ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), &client));

  vector<BufferPool::PageHandle> handles(num_pages);

  // Create pages of various valid sizes. Page i is 2^i times the minimum length.
  for (int i = 0; i < num_pages; ++i) {
    int size_multiple = 1 << i;
    int64_t page_len = TEST_BUFFER_LEN * size_multiple;
    int64_t used_before = client_tracker->GetUsedReservation();
    ASSERT_OK(pool.CreatePage(&client, page_len, &handles[i]));
    ASSERT_TRUE(handles[i].is_open());
    ASSERT_TRUE(handles[i].is_pinned());
    ASSERT_TRUE(handles[i].buffer_handle() != NULL);
    ASSERT_TRUE(handles[i].data() != NULL);
    ASSERT_EQ(handles[i].buffer_handle()->data(), handles[i].data());
    ASSERT_EQ(handles[i].len(), page_len);
    ASSERT_EQ(handles[i].buffer_handle()->len(), page_len);
    // Creating the page should consume exactly 'page_len' of the client's reservation.
    // A gtest expectation is used so the check runs in every build type.
    EXPECT_EQ(used_before + page_len, client_tracker->GetUsedReservation());
  }

  // Close the handles and check memory consumption.
  for (int i = 0; i < num_pages; ++i) {
    int64_t used_before = client_tracker->GetUsedReservation();
    // Kept as int64_t so the length is never narrowed.
    int64_t page_len = handles[i].len();
    pool.DestroyPage(&client, &handles[i]);
    EXPECT_EQ(used_before - page_len, client_tracker->GetUsedReservation());
  }

  pool.DeregisterClient(&client);
  client_tracker->Close();

  // All the reservations should be released at this point. Non-fatal so that the root
  // tracker is still closed below.
  EXPECT_EQ(0, global_reservations_.GetReservation());
  global_reservations_.Close();
}
/// Test basic buffer allocation and freeing.
TEST_F(BufferPoolTest, BufferAllocation) {
  // Allocate many buffers, each a power-of-two multiple of the minimum buffer length.
  int num_buffers = 16;
  int64_t max_buffer_len = TEST_BUFFER_LEN << (num_buffers - 1);
  int64_t total_mem = 2 * 2 * max_buffer_len;
  global_reservations_.InitRootTracker(NULL, total_mem);
  BufferPool pool(TEST_BUFFER_LEN, total_mem);
  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
  client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, total_mem);
  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(total_mem));
  BufferPool::Client client;
  ASSERT_OK(pool.RegisterClient("test client", client_tracker, NewProfile(), &client));

  vector<BufferPool::BufferHandle> handles(num_buffers);

  // Create buffers of various valid sizes. Buffer i is 2^i times the minimum length.
  for (int i = 0; i < num_buffers; ++i) {
    int size_multiple = 1 << i;
    int64_t buffer_len = TEST_BUFFER_LEN * size_multiple;
    int64_t used_before = client_tracker->GetUsedReservation();
    ASSERT_OK(pool.AllocateBuffer(&client, buffer_len, &handles[i]));
    ASSERT_TRUE(handles[i].is_open());
    ASSERT_TRUE(handles[i].data() != NULL);
    ASSERT_EQ(handles[i].len(), buffer_len);
    // Allocating should consume exactly 'buffer_len' of the client's reservation.
    // A gtest expectation is used so the check runs in every build type.
    EXPECT_EQ(used_before + buffer_len, client_tracker->GetUsedReservation());
  }

  // Close the handles and check memory consumption.
  for (int i = 0; i < num_buffers; ++i) {
    int64_t used_before = client_tracker->GetUsedReservation();
    // Kept as int64_t so the length is never narrowed.
    int64_t buffer_len = handles[i].len();
    pool.FreeBuffer(&client, &handles[i]);
    EXPECT_EQ(used_before - buffer_len, client_tracker->GetUsedReservation());
  }

  pool.DeregisterClient(&client);
  client_tracker->Close();

  // All the reservations should be released at this point. Non-fatal so that the root
  // tracker is still closed below.
  EXPECT_EQ(0, global_reservations_.GetReservation());
  global_reservations_.Close();
}
/// Test transfer of buffer handles between clients.
TEST_F(BufferPoolTest, BufferTransfer) {
  // Each client needs to have enough reservation for a buffer.
  const int num_clients = 5;
  int64_t total_mem = num_clients * TEST_BUFFER_LEN;
  global_reservations_.InitRootTracker(NULL, total_mem);
  BufferPool pool(TEST_BUFFER_LEN, total_mem);
  ReservationTracker client_trackers[num_clients];
  BufferPool::Client clients[num_clients];
  BufferPool::BufferHandle handles[num_clients];
  for (int i = 0; i < num_clients; ++i) {
    client_trackers[i].InitChildTracker(
        NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN);
    ASSERT_TRUE(client_trackers[i].IncreaseReservationToFit(TEST_BUFFER_LEN));
    ASSERT_OK(pool.RegisterClient(
        "test client", &client_trackers[i], NewProfile(), &clients[i]));
  }

  // Transfer the buffer around between the clients repeatedly in a circle. Every full
  // lap ends with the buffer back at client 0.
  ASSERT_OK(pool.AllocateBuffer(&clients[0], TEST_BUFFER_LEN, &handles[0]));
  uint8_t* data = handles[0].data();
  for (int iter = 0; iter < 10; ++iter) {
    for (int client = 0; client < num_clients; ++client) {
      int next_client = (client + 1) % num_clients;
      ASSERT_OK(pool.TransferBuffer(&clients[client], &handles[client],
          &clients[next_client], &handles[next_client]));
      // Check that the transfer left things in a consistent state.
      ASSERT_FALSE(handles[client].is_open());
      ASSERT_EQ(0, client_trackers[client].GetUsedReservation());
      ASSERT_TRUE(handles[next_client].is_open());
      ASSERT_EQ(TEST_BUFFER_LEN, client_trackers[next_client].GetUsedReservation());
      // The same underlying buffer should be used.
      ASSERT_EQ(data, handles[next_client].data());
    }
  }

  pool.FreeBuffer(&clients[0], &handles[0]);
  for (int i = 0; i < num_clients; ++i) {
    pool.DeregisterClient(&clients[i]);
    client_trackers[i].Close();
  }

  // All the reservations should be released at this point. A gtest expectation is used
  // so the check runs in every build type; it is non-fatal so that the root tracker is
  // still closed below.
  EXPECT_EQ(0, global_reservations_.GetReservation());
  global_reservations_.Close();
}
/// Test basic pinning and unpinning.
TEST_F(BufferPoolTest, Pin) {
  int64_t pool_capacity = TEST_BUFFER_LEN * 1024;
  // The client's reservation limit covers exactly two minimum-sized pins.
  int64_t client_limit = TEST_BUFFER_LEN * 2;
  BufferPool pool(TEST_BUFFER_LEN, pool_capacity);
  global_reservations_.InitRootTracker(NULL, pool_capacity);

  ReservationTracker* tracker = obj_pool_.Add(new ReservationTracker());
  tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, client_limit);
  ASSERT_TRUE(tracker->IncreaseReservationToFit(client_limit));

  BufferPool::Client client;
  ASSERT_OK(pool.RegisterClient("test client", tracker, NewProfile(), &client));

  BufferPool::PageHandle first_page;
  BufferPool::PageHandle second_page;

  // Two minimum-sized pages can be pinned at the same time.
  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &first_page));
  ASSERT_TRUE(first_page.is_open());
  ASSERT_TRUE(first_page.is_pinned());
  ASSERT_TRUE(first_page.data() != NULL);

  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &second_page));
  ASSERT_TRUE(second_page.is_open());
  ASSERT_TRUE(second_page.is_pinned());
  ASSERT_TRUE(second_page.data() != NULL);

  pool.Unpin(&client, &second_page);
  ASSERT_FALSE(second_page.is_pinned());

  // With the second page unpinned, the first page can be pinned a second time.
  ASSERT_OK(pool.Pin(&client, &first_page));
  ASSERT_TRUE(first_page.is_pinned());

  // The page stays pinned until both pins have been undone.
  pool.Unpin(&client, &first_page);
  ASSERT_TRUE(first_page.is_pinned());
  pool.Unpin(&client, &first_page);
  ASSERT_FALSE(first_page.is_pinned());

  // A double-sized page can be created and is pinned on creation.
  BufferPool::PageHandle big_page;
  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN * 2, &big_page));
  ASSERT_TRUE(big_page.is_open());
  ASSERT_TRUE(big_page.is_pinned());
  ASSERT_TRUE(big_page.data() != NULL);

  // Destroy the pages. This covers destroying both unpinned and pinned pages.
  pool.DestroyPage(&client, &first_page);
  pool.DestroyPage(&client, &second_page);
  pool.DestroyPage(&client, &big_page);

  pool.DeregisterClient(&client);
  tracker->Close();
}
/// Creating a page or pinning without sufficient reservation should DCHECK.
TEST_F(BufferPoolTest, PinWithoutReservation) {
  int64_t pool_capacity = TEST_BUFFER_LEN * 1024;
  BufferPool pool(TEST_BUFFER_LEN, pool_capacity);
  global_reservations_.InitRootTracker(NULL, pool_capacity);

  // The tracker is given a limit here, but its reservation is only increased further
  // down, after the first death check.
  ReservationTracker* tracker = obj_pool_.Add(new ReservationTracker());
  tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN);

  BufferPool::Client client;
  ASSERT_OK(pool.RegisterClient("test client", tracker, NewProfile(), &client));

  BufferPool::PageHandle page;
  // Creating a page before any reservation has been obtained is expected to die in
  // debug builds.
  IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, &page), "");

  // Once the reservation covers one buffer, creating the page works.
  ASSERT_TRUE(tracker->IncreaseReservationToFit(TEST_BUFFER_LEN));
  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));

  // The reservation is now used up, so a second pin is expected to die in debug builds.
  IMPALA_ASSERT_DEBUG_DEATH(pool.Pin(&client, &page), "");

  pool.DestroyPage(&client, &page);
  pool.DeregisterClient(&client);
  tracker->Close();
}
/// Test extracting the buffer from a pinned page.
TEST_F(BufferPoolTest, ExtractBuffer) {
  int64_t pool_capacity = TEST_BUFFER_LEN * 1024;
  // The client's reservation limit covers two minimum-sized buffers or pins.
  int64_t client_limit = TEST_BUFFER_LEN * 2;
  BufferPool pool(TEST_BUFFER_LEN, pool_capacity);
  global_reservations_.InitRootTracker(NULL, pool_capacity);

  ReservationTracker* tracker = obj_pool_.Add(new ReservationTracker());
  tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, client_limit);
  ASSERT_TRUE(tracker->IncreaseReservationToFit(client_limit));

  BufferPool::Client client;
  ASSERT_OK(pool.RegisterClient("test client", tracker, NewProfile(), &client));

  BufferPool::PageHandle page;
  BufferPool::BufferHandle buffer;

  // Basic extraction, once for a minimum-sized page and once for a double-sized page.
  for (int page_len = TEST_BUFFER_LEN; page_len <= 2 * TEST_BUFFER_LEN; page_len *= 2) {
    ASSERT_OK(pool.CreatePage(&client, page_len, &page));
    uint8_t* data_before = page.data();
    pool.ExtractBuffer(&client, &page, &buffer);
    // The page handle is closed and the buffer handle takes over the same memory.
    ASSERT_FALSE(page.is_open());
    ASSERT_TRUE(buffer.is_open());
    ASSERT_EQ(page_len, buffer.len());
    ASSERT_EQ(data_before, buffer.data());
    ASSERT_EQ(page_len, tracker->GetUsedReservation());
    pool.FreeBuffer(&client, &buffer);
    ASSERT_EQ(0, tracker->GetUsedReservation());
  }

  // Extraction from a page with pin count > 1: only one buffer's worth of reservation
  // remains in use afterwards.
  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
  uint8_t* pinned_twice_data = page.data();
  ASSERT_OK(pool.Pin(&client, &page));
  ASSERT_EQ(TEST_BUFFER_LEN * 2, tracker->GetUsedReservation());
  pool.ExtractBuffer(&client, &page, &buffer);
  ASSERT_EQ(TEST_BUFFER_LEN, tracker->GetUsedReservation());
  ASSERT_FALSE(page.is_open());
  ASSERT_TRUE(buffer.is_open());
  ASSERT_EQ(TEST_BUFFER_LEN, buffer.len());
  ASSERT_EQ(pinned_twice_data, buffer.data());
  pool.FreeBuffer(&client, &buffer);
  ASSERT_EQ(0, tracker->GetUsedReservation());

  // Extracting from an unpinned page is expected to die in debug builds.
  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
  pool.Unpin(&client, &page);
  IMPALA_ASSERT_DEBUG_DEATH(pool.ExtractBuffer(&client, &page, &buffer), "");

  pool.DestroyPage(&client, &page);
  pool.DeregisterClient(&client);
  tracker->Close();
}
/// Test concurrent creation and destruction of pages.
TEST_F(BufferPoolTest, ConcurrentPageCreation) {
  int ops_per_thread = 1024;
  int num_threads = 64;
  // Each worker reserves one TEST_BUFFER_LEN at a time in CreatePageLoop(), so the pool
  // and the root tracker need room for one minimum-sized buffer per thread. Kept as
  // int64_t to match the type of TEST_BUFFER_LEN.
  int64_t total_mem = num_threads * TEST_BUFFER_LEN;
  global_reservations_.InitRootTracker(NULL, total_mem);
  BufferPool pool(TEST_BUFFER_LEN, total_mem);

  // Launch threads, each running CreatePageLoop() with its own client and tracker
  // underneath the shared root tracker.
  thread_group workers;
  for (int i = 0; i < num_threads; ++i) {
    workers.add_thread(new thread(bind(&BufferPoolTest::CreatePageLoop, this, &pool,
        &global_reservations_, ops_per_thread)));
  }

  // Build debug string to test concurrent iteration over pages_ list.
  for (int i = 0; i < 64; ++i) {
    LOG(INFO) << pool.DebugString();
  }
  workers.join_all();

  // All the reservations should be released at this point. A gtest expectation is used
  // so the check runs in every build type; it is non-fatal so that the root tracker is
  // still closed below.
  EXPECT_EQ(0, global_reservations_.GetChildReservations());
  global_reservations_.Close();
}
/// Registers a client with 'pool' whose tracker is a child of 'parent_tracker', then
/// runs 'num_ops' iterations of: reserve one minimum-sized buffer, create a page, unpin
/// it, pin it again, destroy it and give the reservation back. The client is
/// deregistered and its tracker closed at the end.
void BufferPoolTest::CreatePageLoop(
    BufferPool* pool, ReservationTracker* parent_tracker, int num_ops) {
  ReservationTracker tracker;
  tracker.InitChildTracker(NewProfile(), parent_tracker, NULL, TEST_BUFFER_LEN);

  BufferPool::Client client;
  ASSERT_OK(pool->RegisterClient("test client", &tracker, NewProfile(), &client));

  for (int op = 0; op < num_ops; ++op) {
    BufferPool::PageHandle page;
    // Reserve just before use and hand the reservation back after the page is gone.
    ASSERT_TRUE(tracker.IncreaseReservation(TEST_BUFFER_LEN));
    ASSERT_OK(pool->CreatePage(&client, TEST_BUFFER_LEN, &page));
    pool->Unpin(&client, &page);
    ASSERT_OK(pool->Pin(&client, &page));
    pool->DestroyPage(&client, &page);
    tracker.DecreaseReservation(TEST_BUFFER_LEN);
  }

  pool->DeregisterClient(&client);
  tracker.Close();
}
/// Test error path where pool is unable to fulfill a reservation because it cannot evict
/// unpinned pages.
TEST_F(BufferPoolTest, CapacityExhausted) {
  global_reservations_.InitRootTracker(NULL, TEST_BUFFER_LEN);
  // TODO: once we enable spilling, set up buffer pool so that spilling is disabled.
  // The pool holds one more buffer than the reservation allows, so the reservation
  // limit is reached before the buffer limit.
  BufferPool pool(TEST_BUFFER_LEN, TEST_BUFFER_LEN * 2);

  BufferPool::PageHandle page_a;
  BufferPool::PageHandle page_b;
  BufferPool::PageHandle page_c;
  BufferPool::Client client;
  // The client is registered directly against the root tracker.
  ASSERT_OK(
      pool.RegisterClient("test client", &global_reservations_, NewProfile(), &client));
  ASSERT_TRUE(global_reservations_.IncreaseReservation(TEST_BUFFER_LEN));
  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page_a));

  // The pinned page uses up the whole reservation, so creating another page is
  // expected to die in debug builds.
  IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, &page_b), "");

  // Unpinning frees up the reservation, but only one more unpinned page can be created
  // because eviction is not supported yet.
  pool.Unpin(&client, &page_a);
  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page_b));
  pool.Unpin(&client, &page_b);
  ASSERT_FALSE(pool.CreatePage(&client, TEST_BUFFER_LEN, &page_c).ok());

  // Destroying a page should leave a free buffer that the next page can use.
  pool.DestroyPage(&client, &page_a);
  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page_c));

  pool.DestroyPage(&client, &page_b);
  pool.DestroyPage(&client, &page_c);
  pool.DeregisterClient(&client);
}
}
// NOTE(review): IMPALA_TEST_MAIN is a macro defined outside this file (presumably in
// testutil/gtest-util.h) and is expected to supply the test binary's main() — confirm.
IMPALA_TEST_MAIN();