blob: 2a9d1016566659a91308c8358c14df1354d2650b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <future>
#include <boost/filesystem.hpp>
#include "gutil/strings/substitute.h"
#include "kudu/util/env.h"
#include "runtime/tuple-cache-mgr.h"
#include "testutil/gtest-util.h"
#include "util/filesystem-util.h"
#include "util/time.h"
#include "common/names.h"
namespace filesystem = boost::filesystem;
using std::async;
using std::future;
using std::launch;
DECLARE_bool(cache_force_single_shard);
namespace impala {
// Fixture for TupleCacheMgr tests. Gives each test its own scratch cache
// directory under /tmp and cleans it up afterwards.
class TupleCacheMgrTest : public ::testing::Test {
 public:
  // Create a fresh, empty cache directory for this test.
  void SetUp() override {
    cache_dir_ = ("/tmp" / boost::filesystem::unique_path()).string();
    ASSERT_OK(FileSystemUtil::RemoveAndCreateDirectory(cache_dir_));
  }

  // Remove the per-test cache directory.
  void TearDown() override {
    ASSERT_OK(FileSystemUtil::RemovePaths({cache_dir_}));
  }

  // Construct a TupleCacheMgr with the given settings. An empty 'cache_dir'
  // produces an empty config string, which disables the cache.
  TupleCacheMgr GetCache(const string& cache_dir, const string& capacity = "1MB",
      string eviction_policy = "LRU", uint8_t debug_pos = TupleCacheMgr::NO_FILES,
      uint32_t sync_pool_size = 0, uint32_t sync_pool_queue_depth = 1000,
      string outstanding_write_limit_str = "1GB",
      uint32_t outstanding_write_chunk_bytes = 0) {
    string cache_config =
        cache_dir.empty() ? string() : Substitute("$0:$1", cache_dir, capacity);
    return TupleCacheMgr{move(cache_config), move(eviction_policy), &metrics_, debug_pos,
        sync_pool_size, sync_pool_queue_depth, move(outstanding_write_limit_str),
        outstanding_write_chunk_bytes};
  }

  // Convenience form: default settings pointed at the per-test directory.
  TupleCacheMgr GetCache() { return GetCache(GetCacheDir()); }

  // Cache whose allocations are forced to fail via the debug hook.
  TupleCacheMgr GetFailAllocateCache() {
    return GetCache(GetCacheDir(), "1MB", "LRU",
        TupleCacheMgr::FAIL_ALLOCATE | TupleCacheMgr::NO_FILES);
  }

  // Cache whose inserts are forced to fail via the debug hook.
  TupleCacheMgr GetFailInsertCache() {
    return GetCache(GetCacheDir(), "1MB", "LRU",
        TupleCacheMgr::FAIL_INSERT | TupleCacheMgr::NO_FILES);
  }

  std::string GetCacheDir() const { return cache_dir_; }

 private:
  std::string cache_dir_;
  MetricGroup metrics_{"tuple-cache-test"};
};
TEST_F(TupleCacheMgrTest, Disabled) {
  // An empty cache directory string disables the cache entirely.
  TupleCacheMgr cache = GetCache("");
  ASSERT_OK(cache.Init());
  TupleCacheMgr::UniqueHandle entry = cache.Lookup("a_key");
  // A disabled cache never offers an entry for reading or for writing.
  EXPECT_FALSE(cache.IsAvailableForRead(entry));
  EXPECT_FALSE(cache.IsAvailableForWrite(entry));
}
TEST_F(TupleCacheMgrTest, TestMiss) {
  TupleCacheMgr cache = GetCache();
  ASSERT_OK(cache.Init());
  // Lookup without acquiring: a miss yields neither read nor write access.
  TupleCacheMgr::UniqueHandle entry = cache.Lookup("a_key");
  EXPECT_FALSE(cache.IsAvailableForRead(entry));
  EXPECT_FALSE(cache.IsAvailableForWrite(entry));
}
TEST_F(TupleCacheMgrTest, TestMissAcquire) {
  TupleCacheMgr cache = GetCache();
  ASSERT_OK(cache.Init());
  // Lookup with acquire=true: a miss grants the caller the write slot.
  TupleCacheMgr::UniqueHandle entry = cache.Lookup("a_key", true);
  EXPECT_FALSE(cache.IsAvailableForRead(entry));
  EXPECT_TRUE(cache.IsAvailableForWrite(entry));
}
TEST_F(TupleCacheMgrTest, TestFailAllocate) {
  // With the FAIL_ALLOCATE debug hook set, acquiring a new entry cannot
  // succeed: neither read nor write access is granted.
  TupleCacheMgr cache = GetFailAllocateCache();
  ASSERT_OK(cache.Init());
  TupleCacheMgr::UniqueHandle entry = cache.Lookup("a_key", true);
  EXPECT_FALSE(cache.IsAvailableForRead(entry));
  EXPECT_FALSE(cache.IsAvailableForWrite(entry));
}
TEST_F(TupleCacheMgrTest, TestFailInsert) {
  // With the FAIL_INSERT debug hook set, the new entry cannot be inserted,
  // so the lookup grants neither read nor write access.
  TupleCacheMgr cache = GetFailInsertCache();
  ASSERT_OK(cache.Init());
  TupleCacheMgr::UniqueHandle entry = cache.Lookup("a_key", true);
  EXPECT_FALSE(cache.IsAvailableForRead(entry));
  EXPECT_FALSE(cache.IsAvailableForWrite(entry));
}
TEST_F(TupleCacheMgrTest, TestHit) {
  TupleCacheMgr cache = GetCache();
  ASSERT_OK(cache.Init());
  // First lookup acquires the entry for writing; complete the write.
  TupleCacheMgr::UniqueHandle handle = cache.Lookup("a_key", true);
  EXPECT_TRUE(cache.IsAvailableForWrite(handle));
  cache.CompleteWrite(move(handle), 100);
  // Second lookup should hit: readable, but no longer writable.
  handle = cache.Lookup("a_key", true);
  EXPECT_TRUE(cache.IsAvailableForRead(handle));
  EXPECT_FALSE(cache.IsAvailableForWrite(handle));
  // The entry's backing file lives in the cache directory, named after the key.
  filesystem::path expected_path =
      filesystem::path(GetCacheDir()) / "tuple-cache-a_key";
  EXPECT_EQ(expected_path.string(), cache.GetPath(handle));
}
TEST_F(TupleCacheMgrTest, TestTombstone) {
  TupleCacheMgr cache = GetCache();
  ASSERT_OK(cache.Init());
  // Acquire a fresh entry for writing, then abort it with a tombstone.
  TupleCacheMgr::UniqueHandle handle = cache.Lookup("tombstone_key", true);
  EXPECT_FALSE(cache.IsAvailableForRead(handle));
  EXPECT_TRUE(cache.IsAvailableForWrite(handle));
  cache.AbortWrite(move(handle), true);
  // The tombstone blocks both reads and any further write acquisition.
  handle = cache.Lookup("tombstone_key", true);
  EXPECT_FALSE(cache.IsAvailableForRead(handle));
  EXPECT_FALSE(cache.IsAvailableForWrite(handle));
}
TEST_F(TupleCacheMgrTest, TestConcurrentWrite) {
  TupleCacheMgr cache = GetCache();
  ASSERT_OK(cache.Init());
  // Race 100 threads to acquire the same entry for writing. Each thread
  // returns its handle if it won the write slot, or an empty handle otherwise.
  vector<future<TupleCacheMgr::UniqueHandle>> futures;
  futures.reserve(100);
  for (int i = 0; i < 100; ++i) {
    futures.emplace_back(async(launch::async, [&cache]() {
      TupleCacheMgr::UniqueHandle h = cache.Lookup("concurrent_key", true);
      EXPECT_FALSE(cache.IsAvailableForRead(h));
      if (!cache.IsAvailableForWrite(h)) {
        return TupleCacheMgr::UniqueHandle{nullptr};
      }
      return h;
    }));
  }
  // Hold the winner's handle until every thread finishes; aborting earlier
  // would let a second thread acquire the write slot.
  for (auto& f : futures) f.wait();
  int num_writers = 0;
  for (auto& f : futures) {
    if (TupleCacheMgr::UniqueHandle h = f.get(); h) {
      ++num_writers;
      cache.AbortWrite(move(h), false);
    }
  }
  // Exactly one thread should have won the write slot.
  EXPECT_EQ(1, num_writers);
}
TEST_F(TupleCacheMgrTest, TestConcurrentTombstone) {
  TupleCacheMgr cache = GetCache();
  ASSERT_OK(cache.Init());
  // Race 100 threads to acquire the same entry; the winner tombstones it
  // immediately so every other thread sees the tombstoned state.
  vector<future<bool>> futures;
  futures.reserve(100);
  for (int i = 0; i < 100; ++i) {
    futures.emplace_back(async(launch::async, [&cache]() -> bool {
      TupleCacheMgr::UniqueHandle h = cache.Lookup("concurrent_key", true);
      EXPECT_FALSE(cache.IsAvailableForRead(h));
      if (!cache.IsAvailableForWrite(h)) return false;
      cache.AbortWrite(move(h), true);
      return true;
    }));
  }
  int num_writers = 0;
  for (auto& f : futures) {
    f.wait();
    if (f.get()) ++num_writers;
  }
  // A tombstoned entry can never be acquired again, so only one can succeed.
  EXPECT_EQ(1, num_writers);
}
TEST_F(TupleCacheMgrTest, TestConcurrentAbort) {
  TupleCacheMgr cache = GetCache();
  ASSERT_OK(cache.Init());
  // Race 100 threads on one entry; the write winner aborts WITHOUT a
  // tombstone, which frees the write slot for a later thread to claim.
  vector<future<bool>> futures;
  futures.reserve(100);
  for (int i = 0; i < 100; ++i) {
    futures.emplace_back(async(launch::async, [&cache]() -> bool {
      TupleCacheMgr::UniqueHandle h = cache.Lookup("concurrent_key", true);
      EXPECT_FALSE(cache.IsAvailableForRead(h));
      if (!cache.IsAvailableForWrite(h)) return false;
      cache.AbortWrite(move(h), false);
      return true;
    }));
  }
  int num_writers = 0;
  for (auto& f : futures) {
    f.wait();
    if (f.get()) ++num_writers;
  }
  // Multiple acquisitions should succeed. This is somewhat probabilistic, but
  // the odds of all 99 other threads racing through between the winner's
  // IsAvailableForWrite and AbortWrite are very low.
  EXPECT_GT(num_writers, 1);
}
TEST_F(TupleCacheMgrTest, TestConcurrentComplete) {
  TupleCacheMgr cache = GetCache();
  ASSERT_OK(cache.Init());
  // Race 100 threads on one entry: the write winner completes the write
  // immediately, and any thread arriving afterwards should read it.
  vector<future<bool>> futures;
  futures.reserve(100);
  for (int i = 0; i < 100; ++i) {
    futures.emplace_back(async(launch::async, [&cache]() -> bool {
      TupleCacheMgr::UniqueHandle h = cache.Lookup("concurrent_key", true);
      if (cache.IsAvailableForRead(h)) return true;
      if (cache.IsAvailableForWrite(h)) {
        cache.CompleteWrite(move(h), 10);
      }
      return false;
    }));
  }
  int num_readers = 0;
  for (auto& f : futures) {
    f.wait();
    if (f.get()) ++num_readers;
  }
  // At least one thread should arrive after the write completes. This is
  // somewhat probabilistic, but the odds of all other threads finishing before
  // the winner's CompleteWrite are very low.
  EXPECT_GT(num_readers, 0);
}
TEST_F(TupleCacheMgrTest, TestConcurrentEviction) {
  FLAGS_cache_force_single_shard = true;
  // Use a tiny 1KB cache so that 100 concurrent writes must trigger evictions.
  TupleCacheMgr cache = GetCache(GetCacheDir(), "1KB");
  ASSERT_OK(cache.Init());
  vector<future<void>> futures;
  futures.reserve(100);
  for (int i = 0; i < 100; ++i) {
    futures.emplace_back(async(launch::async, [&cache, i]() {
      TupleCacheMgr::UniqueHandle h =
          cache.Lookup(Substitute("concurrent_key$0", i), true);
      EXPECT_FALSE(cache.IsAvailableForRead(h));
      EXPECT_TRUE(cache.IsAvailableForWrite(h));
      // Touch the 1st key from every thread so it stays resident.
      cache.Lookup("concurrent_key0");
      cache.CompleteWrite(move(h), 20+i/2);
    }));
  }
  for (auto& f : futures) f.wait();
  // The repeatedly-touched 1st key survives; the 2nd should have been evicted.
  TupleCacheMgr::UniqueHandle handle0 = cache.Lookup("concurrent_key0");
  EXPECT_TRUE(cache.IsAvailableForRead(handle0));
  TupleCacheMgr::UniqueHandle handle1 = cache.Lookup("concurrent_key1");
  EXPECT_FALSE(cache.IsAvailableForRead(handle1));
}
TEST_F(TupleCacheMgrTest, TestMaxSize) {
  FLAGS_cache_force_single_shard = true;
  // A capacity string of "1KB" should parse to exactly 1024 bytes.
  TupleCacheMgr small_cache = GetCache(GetCacheDir(), "1KB");
  ASSERT_OK(small_cache.Init());
  EXPECT_EQ(1024, small_cache.MaxSize());
}
// Exercises RequestWriteSize(): growing an in-progress entry's reservation,
// eviction of completed entries to make room, rejection past MaxSize(), and
// release of the outstanding-writes accounting on all three exits from the
// in-progress state (abort, tombstone, complete).
TEST_F(TupleCacheMgrTest, TestRequestWriteSize) {
  FLAGS_cache_force_single_shard = true;
  TupleCacheMgr cache = GetCache(GetCacheDir(), "1KB");
  ASSERT_OK(cache.Init());
  // Write 5 entries of 200 bytes each
  for (int i = 0; i < 5; ++i) {
    TupleCacheMgr::UniqueHandle handle = cache.Lookup(Substitute("a_key_$0", i), true);
    EXPECT_TRUE(cache.IsAvailableForWrite(handle));
    cache.CompleteWrite(move(handle), 200);
  }
  // The cache now holds 5 * 200 = 1000 of 1024 bytes, so the reservations
  // below must evict existing entries to make room.
  TupleCacheMgr::UniqueHandle handle = cache.Lookup("update_entry_then_abort", true);
  EXPECT_TRUE(cache.IsAvailableForWrite(handle));
  // Update to 200 bytes. This should evict one entry.
  Status status = cache.RequestWriteSize(&handle, 200);
  EXPECT_OK(status);
  EXPECT_EQ(cache.tuple_cache_entries_evicted_->GetValue(), 1);
  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 200);
  // Update to 900. This will evict all the others
  status = cache.RequestWriteSize(&handle, 900);
  EXPECT_OK(status);
  EXPECT_EQ(cache.tuple_cache_entries_evicted_->GetValue(), 5);
  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 900);
  // Update to MaxSize(). This will succeed.
  status = cache.RequestWriteSize(&handle, cache.MaxSize());
  EXPECT_OK(status);
  EXPECT_EQ(cache.tuple_cache_entries_evicted_->GetValue(), 5);
  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), cache.MaxSize());
  // Try to update to MaxSize() + 1. This will fail.
  status = cache.RequestWriteSize(&handle, cache.MaxSize() + 1);
  EXPECT_FALSE(status.ok());
  EXPECT_EQ(status.code(), TErrorCode::TUPLE_CACHE_ENTRY_SIZE_LIMIT_EXCEEDED);
  // The failed request must leave both metrics unchanged.
  EXPECT_EQ(cache.tuple_cache_entries_evicted_->GetValue(), 5);
  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), cache.MaxSize());
  // Need to test the three state transitions out of IN_PROGRESS
  // Path #1: AbortWrite without tombstone
  cache.AbortWrite(move(handle), /* tombstone */ false);
  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 0);
  // Path #2: AbortWrite with tombstone
  handle = cache.Lookup("update_entry_then_tombstone", true);
  EXPECT_TRUE(cache.IsAvailableForWrite(handle));
  status = cache.RequestWriteSize(&handle, 900);
  EXPECT_OK(status);
  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 900);
  cache.AbortWrite(move(handle), /* tombstone */ true);
  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 0);
  // Path #3: CompleteWrite
  handle = cache.Lookup("update_entry_then_complete", true);
  EXPECT_TRUE(cache.IsAvailableForWrite(handle));
  status = cache.RequestWriteSize(&handle, 900);
  EXPECT_OK(status);
  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 900);
  cache.CompleteWrite(move(handle), 900);
  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 0);
}
// Verifies that RequestWriteSize() enforces the total outstanding-write limit
// across multiple simultaneously-open write handles.
TEST_F(TupleCacheMgrTest, TestOutstandingWriteLimit) {
  FLAGS_cache_force_single_shard = true;
  // Set up a cache with an outstanding write limit of 1KB
  // (positional args: debug_pos=0, sync_pool_size=0, sync_pool_queue_depth=0,
  // outstanding_write_limit_str="1KB")
  TupleCacheMgr cache = GetCache(GetCacheDir(), "1KB", "LRU", 0, 0, 0, "1KB");
  ASSERT_OK(cache.Init());
  // Open two handles
  TupleCacheMgr::UniqueHandle handle1 = cache.Lookup("outstanding_write_limit_1", true);
  EXPECT_TRUE(cache.IsAvailableForWrite(handle1));
  TupleCacheMgr::UniqueHandle handle2 = cache.Lookup("outstanding_write_limit_2", true);
  EXPECT_TRUE(cache.IsAvailableForWrite(handle2));
  // UpdateWrite size to 512 bytes for each, so it is equal to the limit and succeeds.
  Status status = cache.RequestWriteSize(&handle1, 512);
  EXPECT_OK(status);
  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 512);
  status = cache.RequestWriteSize(&handle2, 512);
  EXPECT_OK(status);
  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 1024);
  // Going one byte past the limit should fail
  // This does not set exceeded_max_size
  status = cache.RequestWriteSize(&handle1, 513);
  EXPECT_FALSE(status.ok());
  EXPECT_EQ(status.code(), TErrorCode::TUPLE_CACHE_OUTSTANDING_WRITE_LIMIT_EXCEEDED);
  // The failed request leaves the outstanding bytes unchanged.
  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 1024);
  // Clean up
  cache.AbortWrite(move(handle1), /* tombstone */ false);
  cache.AbortWrite(move(handle2), /* tombstone */ false);
}
// Verifies that concurrent writers hitting the outstanding-write limit fail
// cleanly and are counted by the backpressure-halted metric.
TEST_F(TupleCacheMgrTest, TestOutstandingWriteLimitConcurrent) {
  FLAGS_cache_force_single_shard = true;
  // Set up a cache with a low outstanding write limit of 1KB to make it easy to hit
  // the limit.
  TupleCacheMgr cache = GetCache(GetCacheDir(), "100KB", "LRU", 0, 0, 0, "1KB");
  ASSERT_OK(cache.Init());
  // This attempts to do 100 512-byte writes to the cache with 64 byte request chunks.
  // The cache is big enough to fit all of the writes, so the only reason they should
  // fail is when they hit the outstanding write limit.
  vector<future<bool>> results;
  results.reserve(100);
  for (int i = 0; i < 100; ++i) {
    results.emplace_back(async(launch::async, [&cache, i]() {
      TupleCacheMgr::UniqueHandle handle = cache.Lookup(Substitute("write$0", i), true);
      EXPECT_TRUE(cache.IsAvailableForWrite(handle));
      // Write in 64 byte chunks, 8 chunks = 512 bytes
      for (int num_chunks = 1; num_chunks <= 8; ++num_chunks) {
        Status status = cache.RequestWriteSize(&handle, num_chunks * 64);
        if (!status.ok()) {
          // Hit the limit: give up this write and release the reservation.
          cache.AbortWrite(move(handle), /* tombstone */ false);
          return false;
        }
      }
      cache.CompleteWrite(move(handle), 512);
      return true;
    }));
  }
  // Wait for all threads to complete and count the number of failures
  uint32_t num_failures = 0;
  for (auto& result : results) {
    result.wait();
    if (!result.get()) num_failures++;
  }
  // This test case has race conditions. We expect the failures to line up with the
  // number of backpressure halted. We expect at least one thread to succeed.
  // There are scenarios where all the threads can succeed, so this doesn't require
  // num_failures > 0.
  EXPECT_EQ(cache.tuple_cache_backpressure_halted_->GetValue(), num_failures);
  EXPECT_LT(num_failures, 100);
}
// Verifies that outstanding-write accounting is done in whole chunks: small
// requests round up to a chunk, requests never round past MaxSize(), and the
// reservation is released on all three exits from the in-progress state.
TEST_F(TupleCacheMgrTest, TestOutstandingWriteChunkSize) {
  FLAGS_cache_force_single_shard = true;
  uint32_t chunk_size = 250;
  // Set up a cache with an outstanding write limit of 1KB and a chunk size of 250
  // The chunk size is specifically not a clean divisor of 1KB.
  TupleCacheMgr cache =
      GetCache(GetCacheDir(), "1KB", "LRU", 0, 0, 0, "2KB", chunk_size);
  ASSERT_OK(cache.Init());
  TupleCacheMgr::UniqueHandle handle = cache.Lookup("outstanding_chunk_then_abort", true);
  EXPECT_TRUE(cache.IsAvailableForWrite(handle));
  // Update write size to 1, but this is counted as the chunk size
  Status status = cache.RequestWriteSize(&handle, 1);
  EXPECT_OK(status);
  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), chunk_size);
  // Request write size to be equal to the chunk size. This doesn't change the outstanding
  // write bytes.
  status = cache.RequestWriteSize(&handle, chunk_size);
  EXPECT_OK(status);
  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), chunk_size);
  // Request write size to be one above the chunk size. This grabs a second chunk.
  status = cache.RequestWriteSize(&handle, chunk_size + 1);
  EXPECT_OK(status);
  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 2 * chunk_size);
  // The chunk size avoids conflicts with the MaxSize(). This requests a size that
  // would round to larger than MaxSize (the chunk size is not a clean divisor of the
  // cache size), but it does not result in an error. Instead, it reserves MaxSize().
  status = cache.RequestWriteSize(&handle,
      ((cache.MaxSize() / chunk_size) * chunk_size) + 1);
  EXPECT_OK(status);
  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), cache.MaxSize());
  // Request size can go all the way to MaxSize() even with chunk size.
  status = cache.RequestWriteSize(&handle, cache.MaxSize());
  EXPECT_OK(status);
  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), cache.MaxSize());
  // Need to test the three state transitions out of IN_PROGRESS
  // Path #1: AbortWrite without tombstone
  cache.AbortWrite(move(handle), /* tombstone */ false);
  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 0);
  // Path #2: AbortWrite with tombstone
  handle = cache.Lookup("outstanding_chunk_then_tombstone", true);
  EXPECT_TRUE(cache.IsAvailableForWrite(handle));
  status = cache.RequestWriteSize(&handle, chunk_size + 1);
  EXPECT_OK(status);
  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 2 * chunk_size);
  cache.AbortWrite(move(handle), /* tombstone */ true);
  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 0);
  // Path #3: CompleteWrite
  handle = cache.Lookup("outstanding_chunk_then_complete", true);
  EXPECT_TRUE(cache.IsAvailableForWrite(handle));
  status = cache.RequestWriteSize(&handle, chunk_size + 1);
  EXPECT_OK(status);
  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 2 * chunk_size);
  cache.CompleteWrite(move(handle), chunk_size + 1);
  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 0);
}
// Verifies the async sync-to-disk path: a completed entry whose backing file
// is missing is evicted by the sync thread, while an entry with a real file
// stays readable.
TEST_F(TupleCacheMgrTest, TestSyncToDisk) {
  // Need the debug_pos to be zero so that DebugPos::NO_FILES is not set.
  TupleCacheMgr cache =
      GetCache(GetCacheDir(), "1KB", "LRU", /* debug_pos */ 0, /* sync_pool_size */ 10);
  ASSERT_OK(cache.Init());
  // Error case: If there is no file, then the thread doing sync will get an error
  // when trying to open the file. This causes the entry to be evicted.
  TupleCacheMgr::UniqueHandle handle = cache.Lookup("key_without_file", true);
  EXPECT_TRUE(cache.IsAvailableForWrite(handle));
  cache.CompleteWrite(move(handle), 100);
  // Sleep a bit to let the thread pool process the entry
  SleepForMs(100);
  handle = cache.Lookup("key_without_file", false);
  EXPECT_FALSE(cache.IsAvailableForRead(handle));
  // Success case: If there is a file that can be synced to disk, everything behaves
  // normally.
  handle = cache.Lookup("key_with_file", true);
  std::string file_path = cache.GetPath(handle);
  std::unique_ptr<kudu::WritableFile> cache_file;
  kudu::Status s = kudu::Env::Default()->NewWritableFile(file_path, &cache_file);
  EXPECT_TRUE(s.ok());
  std::string data("data");
  // Check the write/close statuses: a silently failed Append would make the
  // read assertion below fail for a confusing reason. Close also flushes the
  // data before the sync thread opens the file by path.
  EXPECT_TRUE(cache_file->Append(Slice(data)).ok());
  EXPECT_TRUE(cache_file->Close().ok());
  cache.CompleteWrite(move(handle), 100);
  // Sleep a bit to let the thread pool process the entry
  SleepForMs(100);
  handle = cache.Lookup("key_with_file", false);
  EXPECT_TRUE(cache.IsAvailableForRead(handle));
}
// Verifies that an overwhelmed sync pool drops syncs and that each dropped
// sync is reflected both in the metric and as an unreadable entry.
TEST_F(TupleCacheMgrTest, TestDroppedSyncs) {
  // Need the debug_pos to be zero so that DebugPos::NO_FILES is not set.
  // We set a small sync_pool_size (1) and the bare minimum sync_pool_queue_depth (1)
  // to force some syncs to be dropped.
  FLAGS_cache_force_single_shard = true;
  TupleCacheMgr cache = GetCache(GetCacheDir(), "10KB", "LRU", /* debug_pos */ 0,
      /* sync_pool_size */ 1, /* sync_pool_queue_depth */ 1);
  ASSERT_OK(cache.Init());
  // Attempt to write entries to the cache concurrently to stress the sync pool.
  // This uses many writers, but the writes are small and can all fit into the
  // cache. The only reason something would fail to write to the cache is if the
  // sync pool gets overwhelmed.
  vector<future<bool>> results;
  results.reserve(100);
  for (int i = 0; i < 100; ++i) {
    results.emplace_back(async(launch::async, [&cache, i]() {
      TupleCacheMgr::UniqueHandle handle = cache.Lookup(Substitute("write$0", i), true);
      EXPECT_TRUE(cache.IsAvailableForWrite(handle));
      std::string file_path = cache.GetPath(handle);
      std::unique_ptr<kudu::WritableFile> cache_file;
      kudu::Status s = kudu::Env::Default()->NewWritableFile(file_path, &cache_file);
      EXPECT_TRUE(s.ok());
      std::string data("data");
      // Check the write/close statuses: a silently failed Append would skew the
      // failure count that the dropped-sync metric is compared against below.
      EXPECT_TRUE(cache_file->Append(Slice(data)).ok());
      EXPECT_TRUE(cache_file->Close().ok());
      cache.CompleteWrite(move(handle), 100);
      // CompleteWrite doesn't return status, so we can only tell if the sync failed
      // by looking up the entry.
      handle = cache.Lookup(Substitute("write$0", i), false);
      return cache.IsAvailableForRead(handle);
    }));
  }
  // Wait for all threads to complete and count the number of failures
  uint32_t num_failures = 0;
  for (auto& result : results) {
    result.wait();
    if (!result.get()) num_failures++;
  }
  // The sync pool should get overwhelmed and the number of dropped syncs should match
  // the number of failures.
  EXPECT_GT(cache.tuple_cache_dropped_sync_->GetValue(), 0);
  EXPECT_EQ(cache.tuple_cache_dropped_sync_->GetValue(), num_failures);
}
} // namespace impala