// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <boost/bind.hpp>
#include <fstream>
#include <gflags/gflags.h>
#include <iostream>
#include <rapidjson/document.h>
#include <sys/sysinfo.h>
#include "gutil/strings/join.h"
#include "gutil/strings/util.h"
#include "runtime/io/data-cache.h"
#include "runtime/io/request-ranges.h"
#include "runtime/test-env.h"
#include "service/fe-support.h"
#include "testutil/gtest-util.h"
#include "testutil/scoped-flag-setter.h"
#include "util/counting-barrier.h"
#include "util/filesystem-util.h"
#include "util/thread.h"
#include "common/names.h"
#define BASE_CACHE_DIR "/tmp"
#define DEFAULT_CACHE_SIZE (4 * 1024 * 1024)
#define FNAME ("foobar")
#define NUM_TEST_DIRS (16)
#define NUM_THREADS (18)
#define MTIME (12345)
#define TEMP_BUFFER_SIZE (4096)
#define TEST_BUFFER_SIZE (8192)
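// Note: DEFAULT_CACHE_SIZE / TEMP_BUFFER_SIZE = 1024, so a working set of 1024
// chunks of TEMP_BUFFER_SIZE bytes exactly fills the cache. The tests below rely
// on this when picking offset ranges such as [0, 1024) and [0, 1028).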
DECLARE_bool(cache_force_single_shard);
DECLARE_bool(data_cache_anonymize_trace);
DECLARE_bool(data_cache_enable_tracing);
DECLARE_int64(data_cache_file_max_size_bytes);
DECLARE_int32(data_cache_max_opened_files);
DECLARE_int32(data_cache_write_concurrency);
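// These flags are defined by the data cache implementation and overridden by
// individual tests below; the google::FlagSaver installed in SetUp() restores
// their original values in TearDown().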
namespace impala {
namespace io {
class DataCacheTest : public testing::Test {
public:
const uint8_t* test_buffer() {
return reinterpret_cast<const uint8_t*>(test_buffer_);
}
const std::vector<string>& data_cache_dirs() {
return data_cache_dirs_;
}
//
// Uses multiple threads to insert and then read back a set of ranges from
// test_buffer(). Depending on the settings, the working set may or may not fit
// in the cache.
//
// 'cache' : the data cache to exercise
// 'max_start_offset' : maximum offset in test_buffer() to start copying from
// into the cache for a given entry
// 'use_per_thread_filename' : if true, the filename used when inserting into the cache
// will be prefixed with the thread name, which is unique
// per thread; if false, the prefix for filenames is empty
// 'expect_misses' : if true, expect cache misses when reading
// from the cache
//
void MultiThreadedReadWrite(DataCache* cache, int64_t max_start_offset,
bool use_per_thread_filename, bool expect_misses) {
// Barrier to synchronize all threads so no thread will start probing the cache until
// all insertions are done.
CountingBarrier barrier(NUM_THREADS);
vector<unique_ptr<Thread>> threads;
int num_misses[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; ++i) {
unique_ptr<Thread> thread;
num_misses[i] = 0;
string thread_name = Substitute("thread-$0", i);
ASSERT_OK(Thread::Create("data-cache-test", thread_name,
boost::bind(&DataCacheTest::ThreadFn, this,
use_per_thread_filename ? thread_name : "", cache, max_start_offset,
&barrier, &num_misses[i]), &thread));
threads.emplace_back(std::move(thread));
}
int cache_misses = 0;
for (int i = 0; i < NUM_THREADS; ++i) {
threads[i]->Join();
cache_misses += num_misses[i];
}
if (expect_misses) {
ASSERT_GT(cache_misses, 0);
} else {
ASSERT_EQ(0, cache_misses);
}
// Verify the backing files don't exceed size limits.
ASSERT_OK(cache->CloseFilesAndVerifySizes());
}
protected:
DataCacheTest() {
// Create a buffer of random characters.
for (int i = 0; i < TEST_BUFFER_SIZE; ++i) {
test_buffer_[i] = '!' + (rand() % 93);
}
}
// Create a bunch of test directories in which the data cache will reside.
virtual void SetUp() {
test_env_.reset(new TestEnv());
flag_saver_.reset(new google::FlagSaver());
ASSERT_OK(test_env_->Init());
for (int i = 0; i < NUM_TEST_DIRS; ++i) {
const string& path = Substitute("$0/data-cache-test.$1", BASE_CACHE_DIR, i);
ASSERT_OK(FileSystemUtil::RemoveAndCreateDirectory(path));
data_cache_dirs_.push_back(path);
}
// Force a single shard to avoid imbalance between shards in the Kudu LRU cache,
// which may lead to unintended evictions. This ensures the capacity of the cache
// can be utilized to the limit for the purpose of this test.
FLAGS_cache_force_single_shard = true;
// Allows full write concurrency for the multi-threaded tests.
FLAGS_data_cache_write_concurrency = NUM_THREADS;
}
// Delete all the test directories created.
virtual void TearDown() {
// Make sure the cache's destructor removes all backing files, except for
// potentially the trace file.
for (const string& dir_path : data_cache_dirs_) {
vector<string> entries;
ASSERT_OK(FileSystemUtil::Directory::GetEntryNames(dir_path, &entries));
if (entries.size() == 1) {
EXPECT_EQ(DataCache::Partition::TRACE_FILE_NAME, entries[0]);
} else {
ASSERT_EQ(0, entries.size());
}
}
ASSERT_OK(FileSystemUtil::RemovePaths(data_cache_dirs_));
flag_saver_.reset();
test_env_.reset();
}
private:
std::unique_ptr<TestEnv> test_env_;
char test_buffer_[TEST_BUFFER_SIZE];
vector<string> data_cache_dirs_;
// Saved configuration flags for restoring the values at the end of the test.
std::unique_ptr<google::FlagSaver> flag_saver_;
// The function invoked by each thread in the multi-threaded tests below.
// It inserts chunks of size TEMP_BUFFER_SIZE from offsets in the range
// [0, 'max_start_offset') of 'test_buffer_'. Afterwards, it tries reading back
// all the inserted chunks.
//
// 'fname_prefix' is the prefix added to the default filename when inserting into
// the cache. 'max_start_offset' and 'fname_prefix' together control the set of
// cache keys, which indirectly controls the cache footprint. 'num_misses' records
// the number of cache misses.
void ThreadFn(const string& fname_prefix, DataCache* cache, int64_t max_start_offset,
CountingBarrier* store_barrier, int* num_misses) {
const string& custom_fname = Substitute("$0file", fname_prefix);
vector<int64_t> offsets;
for (int64_t offset = 0; offset < max_start_offset; ++offset) {
offsets.push_back(offset);
}
random_shuffle(offsets.begin(), offsets.end());
for (int64_t offset : offsets) {
cache->Store(custom_fname, MTIME, offset, test_buffer() + offset,
TEMP_BUFFER_SIZE);
}
// Wait until all threads have finished inserting. Different threads may insert
// the same cache key and collide, in which case only the thread which wins the
// race inserts the cache entry. Make sure the threads which lose the race wait
// for the insertion to complete before proceeding.
store_barrier->Notify();
store_barrier->Wait();
for (int64_t offset : offsets) {
uint8_t buffer[TEMP_BUFFER_SIZE];
memset(buffer, 0, TEMP_BUFFER_SIZE);
int64_t bytes_read =
cache->Lookup(custom_fname, MTIME, offset, TEMP_BUFFER_SIZE, buffer);
if (bytes_read == TEMP_BUFFER_SIZE) {
ASSERT_EQ(0, memcmp(buffer, test_buffer() + offset, TEMP_BUFFER_SIZE));
} else {
ASSERT_EQ(bytes_read, 0);
++(*num_misses);
}
}
}
};
// This test exercises the basic insertion and lookup paths by inserting a known set of
// offsets which fit in the cache entirely. Also tries reading entries which are never
// inserted into the cache.
TEST_F(DataCacheTest, TestBasics) {
const int64_t cache_size = DEFAULT_CACHE_SIZE;
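// The cache is configured with a string of the form "<dir1>,<dir2>,...:<capacity>",
// e.g. a single 4 MB partition here; the MultiPartitions test below joins several
// directories with commas to configure multiple partitions.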
DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
ASSERT_OK(cache.Init());
// Temporary buffer for holding results read from the cache.
uint8_t buffer[TEMP_BUFFER_SIZE];
// Read and then insert a range of offsets. Expect all misses in the first
// iteration and all hits in the second iteration.
for (int i = 0; i < 2; ++i) {
for (int64_t offset = 0; offset < 1024; ++offset) {
int expected_bytes = i * TEMP_BUFFER_SIZE;
memset(buffer, 0, TEMP_BUFFER_SIZE);
ASSERT_EQ(expected_bytes,
cache.Lookup(FNAME, MTIME, offset, TEMP_BUFFER_SIZE, buffer)) << offset;
if (i == 0) {
ASSERT_TRUE(cache.Store(FNAME, MTIME, offset, test_buffer() + offset,
TEMP_BUFFER_SIZE));
} else {
ASSERT_EQ(0, memcmp(test_buffer() + offset, buffer, TEMP_BUFFER_SIZE));
}
}
}
// Read the same range inserted previously but with a different filename.
for (int64_t offset = 0; offset < 1024; ++offset) {
const string& alt_fname = "random";
ASSERT_EQ(0, cache.Lookup(alt_fname, MTIME, offset, TEMP_BUFFER_SIZE, buffer));
}
// Read the same range inserted previously but with a different mtime.
for (int64_t offset = 0; offset < 1024; ++offset) {
int64_t alt_mtime = 67890;
ASSERT_EQ(0, cache.Lookup(FNAME, alt_mtime, offset, TEMP_BUFFER_SIZE, buffer));
}
// Read a range of offsets which should miss in the cache.
for (int64_t offset = 1024; offset < TEST_BUFFER_SIZE; ++offset) {
ASSERT_EQ(0, cache.Lookup(FNAME, MTIME, offset, TEMP_BUFFER_SIZE, buffer));
}
// Read the same range inserted previously. The entries should still all be in the cache.
for (int64_t offset = 0; offset < 1024; ++offset) {
memset(buffer, 0, TEMP_BUFFER_SIZE);
ASSERT_EQ(TEMP_BUFFER_SIZE,
cache.Lookup(FNAME, MTIME, offset, TEMP_BUFFER_SIZE + 10, buffer));
ASSERT_EQ(0, memcmp(test_buffer() + offset, buffer, TEMP_BUFFER_SIZE));
ASSERT_EQ(TEMP_BUFFER_SIZE - 10,
cache.Lookup(FNAME, MTIME, offset, TEMP_BUFFER_SIZE - 10, buffer));
ASSERT_EQ(0, memcmp(test_buffer() + offset, buffer, TEMP_BUFFER_SIZE - 10));
}
// Insert with the same key but different length.
uint8_t buffer2[TEMP_BUFFER_SIZE + 10];
const int64_t larger_entry_size = TEMP_BUFFER_SIZE + 10;
ASSERT_TRUE(cache.Store(FNAME, MTIME, 0, test_buffer(), larger_entry_size));
memset(buffer2, 0, larger_entry_size);
ASSERT_EQ(larger_entry_size,
cache.Lookup(FNAME, MTIME, 0, larger_entry_size, buffer2));
ASSERT_EQ(0, memcmp(test_buffer(), buffer2, larger_entry_size));
// Check that an insertion larger than the cache size will fail.
ASSERT_FALSE(cache.Store(FNAME, MTIME, 0, test_buffer(), cache_size * 2));
// Test with uncacheable 'mtime' to make sure the entry is not stored.
ASSERT_FALSE(cache.Store(FNAME, ScanRange::INVALID_MTIME, 0, test_buffer(),
TEMP_BUFFER_SIZE));
ASSERT_EQ(0, cache.Lookup(FNAME, ScanRange::INVALID_MTIME, 0, TEMP_BUFFER_SIZE,
buffer));
// Test with bad 'mtime' to make sure the entry is not stored.
ASSERT_FALSE(cache.Store(FNAME, -1000, 0, test_buffer(), TEMP_BUFFER_SIZE));
ASSERT_EQ(0, cache.Lookup(FNAME, -1000, 0, TEMP_BUFFER_SIZE, buffer));
// Test with bad 'offset' to make sure the entry is not stored.
ASSERT_FALSE(cache.Store(FNAME, MTIME, -2000, test_buffer(), TEMP_BUFFER_SIZE));
ASSERT_EQ(0, cache.Lookup(FNAME, MTIME, -2000, TEMP_BUFFER_SIZE, buffer));
// Test with bad 'buffer_len' to make sure the entry is not stored.
ASSERT_FALSE(cache.Store(FNAME, MTIME, 0, test_buffer(), -5000));
ASSERT_EQ(0, cache.Lookup(FNAME, MTIME, 0, -5000, buffer));
}
// Tests backing file rotation by setting FLAGS_data_cache_file_max_size_bytes to be 1/4
// of the cache size. This forces rotation of backing files.
TEST_F(DataCacheTest, RotateFiles) {
// Set the maximum size of backing files to be 1/4 of the cache size.
FLAGS_data_cache_file_max_size_bytes = 1024 * 1024;
const int64_t cache_size = 4 * FLAGS_data_cache_file_max_size_bytes;
DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
ASSERT_OK(cache.Init());
// Read and then insert a range of offsets. Expect all misses in the first
// iteration and all hits in the second iteration.
for (int i = 0; i < 2; ++i) {
for (int64_t offset = 0; offset < 1024; ++offset) {
int expected_bytes = i * TEMP_BUFFER_SIZE;
uint8_t buffer[TEMP_BUFFER_SIZE];
memset(buffer, 0, TEMP_BUFFER_SIZE);
ASSERT_EQ(expected_bytes,
cache.Lookup(FNAME, MTIME, offset, TEMP_BUFFER_SIZE, buffer)) << offset;
if (i == 0) {
ASSERT_TRUE(cache.Store(FNAME, MTIME, offset, test_buffer() + offset,
TEMP_BUFFER_SIZE));
} else {
ASSERT_EQ(0, memcmp(test_buffer() + offset, buffer, TEMP_BUFFER_SIZE));
}
}
}
// Verify that file rotation left exactly 4 backing files.
vector<string> entries;
ASSERT_OK(FileSystemUtil::Directory::GetEntryNames(data_cache_dirs()[0], &entries, -1,
FileSystemUtil::Directory::EntryType::DIR_ENTRY_REG));
ASSERT_EQ(4, entries.size());
// Verify the backing files don't exceed size limits.
ASSERT_OK(cache.CloseFilesAndVerifySizes());
}
// Tests backing file rotation by setting --data_cache_file_max_size_bytes to be 1/4
// of the cache size. This forces rotation of backing files. Also sets
// --data_cache_max_opened_files to 1 so that only one underlying
// file is allowed. This exercises the lazy deletion path.
TEST_F(DataCacheTest, RotateAndDeleteFiles) {
// Set the maximum size of backing files to be 1/4 of the cache size.
FLAGS_data_cache_file_max_size_bytes = 1024 * 1024;
// Allow only one backing file at a time.
FLAGS_data_cache_max_opened_files = 1;
const int64_t cache_size = 4 * FLAGS_data_cache_file_max_size_bytes;
DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
ASSERT_OK(cache.Init());
// Read and insert a working set the same size as the cache. Expect all misses
// in all iterations as each backing file is only 1/4 of the cache capacity.
uint8_t buffer[TEMP_BUFFER_SIZE];
for (int i = 0; i < 4; ++i) {
for (int64_t offset = 0; offset < 1024; ++offset) {
memset(buffer, 0, TEMP_BUFFER_SIZE);
ASSERT_EQ(0, cache.Lookup(FNAME, MTIME, offset, TEMP_BUFFER_SIZE, buffer));
ASSERT_TRUE(cache.Store(FNAME, MTIME, offset, test_buffer() + offset,
TEMP_BUFFER_SIZE));
}
}
// Verifies that the last part of the working set which fits in a single backing file
// is still in the cache.
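// With a 1 MB backing file cap and TEMP_BUFFER_SIZE (4 KB) entries, a single
// backing file holds 256 entries, so entries from offset 1024 - 256 = 768
// onwards should still be readable.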
int64_t in_cache_offset =
1024 - FLAGS_data_cache_file_max_size_bytes / TEMP_BUFFER_SIZE;
for (int64_t offset = in_cache_offset; offset < 1024; ++offset) {
memset(buffer, 0, TEMP_BUFFER_SIZE);
ASSERT_EQ(TEMP_BUFFER_SIZE, cache.Lookup(FNAME, MTIME, offset,
TEMP_BUFFER_SIZE, buffer));
ASSERT_EQ(0, memcmp(buffer, test_buffer() + offset, TEMP_BUFFER_SIZE));
}
// Make sure only one backing file exists. Allow up to 10 seconds for the
// file deleter thread to run.
int num_entries = 0;
const int num_wait_secs = 10;
for (int i = 0; i <= num_wait_secs; ++i) {
vector<string> entries;
ASSERT_OK(FileSystemUtil::Directory::GetEntryNames(data_cache_dirs()[0], &entries, -1,
FileSystemUtil::Directory::EntryType::DIR_ENTRY_REG));
num_entries = entries.size();
if (num_entries == 1) break;
// Flag a failure on the 11th pass around this loop.
ASSERT_LT(i, num_wait_secs);
sleep(1);
}
}
// Tests eviction in the cache by inserting a large entry which evicts all existing
// entries in the cache.
TEST_F(DataCacheTest, Eviction) {
const int64_t cache_size = DEFAULT_CACHE_SIZE;
DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
ASSERT_OK(cache.Init());
// Read and then insert chunks of size TEMP_BUFFER_SIZE from the test buffer.
// Expect all misses in both iterations due to LRU eviction.
uint8_t buffer[TEMP_BUFFER_SIZE];
int64_t offset = 0;
for (int i = 0; i < 2; ++i) {
for (offset = 0; offset < 1028; ++offset) {
ASSERT_EQ(0, cache.Lookup(FNAME, MTIME, offset, TEMP_BUFFER_SIZE, buffer));
ASSERT_TRUE(cache.Store(FNAME, MTIME, offset, test_buffer() + offset,
TEMP_BUFFER_SIZE));
}
}
// Verifies that the cache is full.
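// 1028 entries were inserted but the cache can hold at most
// DEFAULT_CACHE_SIZE / TEMP_BUFFER_SIZE = 1024 of them, so exactly 4 entries
// were evicted and 1024 lookups should hit.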
int hit_count = 0;
for (offset = 0; offset < 1028; ++offset) {
int64_t bytes_read = cache.Lookup(FNAME, MTIME, offset, TEMP_BUFFER_SIZE, buffer);
DCHECK(bytes_read == 0 || bytes_read == TEMP_BUFFER_SIZE);
if (bytes_read == TEMP_BUFFER_SIZE) ++hit_count;
}
ASSERT_EQ(1024, hit_count);
// Create a buffer which has the same size as the cache and insert it into the cache.
// This should evict all the existing entries in the cache.
unique_ptr<uint8_t[]> large_buffer(new uint8_t[DEFAULT_CACHE_SIZE]);
for (offset = 0; offset < cache_size; offset += TEST_BUFFER_SIZE) {
memcpy(large_buffer.get() + offset, test_buffer(), TEST_BUFFER_SIZE);
}
const string& alt_fname = "random";
offset = 0;
ASSERT_TRUE(cache.Store(alt_fname, MTIME, offset, large_buffer.get(), cache_size));
// Verifies that all previous entries were evicted.
for (offset = 0; offset < 1028; ++offset) {
ASSERT_EQ(0, cache.Lookup(FNAME, MTIME, offset, TEMP_BUFFER_SIZE, buffer));
}
// The large buffer should still be in the cache.
unique_ptr<uint8_t[]> temp_buffer(new uint8_t[DEFAULT_CACHE_SIZE]);
offset = 0;
ASSERT_EQ(cache_size,
cache.Lookup(alt_fname, MTIME, offset, cache_size, temp_buffer.get()));
ASSERT_EQ(0, memcmp(temp_buffer.get(), large_buffer.get(), cache_size));
// Verify the backing files don't exceed size limits.
ASSERT_OK(cache.CloseFilesAndVerifySizes());
}
// Tests insertion and lookup in the cache with multiple threads.
// Inserts a working set which will fit in the cache. Despite potential
// collision during insertion, all entries in the working set should be found.
TEST_F(DataCacheTest, MultiThreadedNoMisses) {
int64_t cache_size = DEFAULT_CACHE_SIZE;
DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
ASSERT_OK(cache.Init());
int64_t max_start_offset = 1024;
bool use_per_thread_filename = false;
bool expect_misses = false;
MultiThreadedReadWrite(&cache, max_start_offset, use_per_thread_filename,
expect_misses);
}
// Inserts a working set which is known to be larger than the cache's capacity.
// Expect some cache misses in lookups.
TEST_F(DataCacheTest, MultiThreadedWithMisses) {
int64_t cache_size = DEFAULT_CACHE_SIZE;
DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
ASSERT_OK(cache.Init());
int64_t max_start_offset = 1024;
bool use_per_thread_filename = true;
bool expect_misses = true;
MultiThreadedReadWrite(&cache, max_start_offset, use_per_thread_filename,
expect_misses);
}
// Test insertion and lookup with a cache configured with multiple partitions.
TEST_F(DataCacheTest, MultiPartitions) {
StringPiece delimiter(",");
string cache_base = JoinStrings(data_cache_dirs(), delimiter);
const int64_t cache_size = DEFAULT_CACHE_SIZE;
DataCache cache(Substitute("$0:$1", cache_base, std::to_string(cache_size)));
ASSERT_OK(cache.Init());
int64_t max_start_offset = 512;
bool use_per_thread_filename = false;
bool expect_misses = false;
MultiThreadedReadWrite(&cache, max_start_offset, use_per_thread_filename,
expect_misses);
}
// Tests insertion of a working set whose size is 1/8 of the total memory size.
// This likely exceeds the size of the OS page cache, forcing write-back of dirty
// pages to the backing files and reads from the backing files during lookups.
TEST_F(DataCacheTest, LargeFootprint) {
struct sysinfo info;
ASSERT_EQ(0, sysinfo(&info));
ASSERT_GT(info.totalram, 0);
DataCache cache(
Substitute("$0:$1", data_cache_dirs()[0], std::to_string(info.totalram)));
ASSERT_OK(cache.Init());
const int64_t footprint = info.totalram / 8;
for (int64_t i = 0; i < footprint / TEST_BUFFER_SIZE; ++i) {
int64_t offset = i * TEST_BUFFER_SIZE;
ASSERT_TRUE(cache.Store(FNAME, MTIME, offset, test_buffer(), TEST_BUFFER_SIZE));
}
uint8_t buffer[TEST_BUFFER_SIZE];
for (int64_t i = 0; i < footprint / TEST_BUFFER_SIZE; ++i) {
int64_t offset = i * TEST_BUFFER_SIZE;
memset(buffer, 0, TEST_BUFFER_SIZE);
ASSERT_EQ(TEST_BUFFER_SIZE,
cache.Lookup(FNAME, MTIME, offset, TEST_BUFFER_SIZE, buffer));
ASSERT_EQ(0, memcmp(buffer, test_buffer(), TEST_BUFFER_SIZE));
}
}
TEST_F(DataCacheTest, TestAccessTrace) {
FLAGS_data_cache_enable_tracing = true;
for (bool anon : { false, true }) {
SCOPED_TRACE(anon);
FLAGS_data_cache_anonymize_trace = anon;
{
int64_t cache_size = DEFAULT_CACHE_SIZE;
DataCache cache(Substitute(
"$0:$1", data_cache_dirs()[0], std::to_string(cache_size)));
ASSERT_OK(cache.Init());
int64_t max_start_offset = 1024;
bool use_per_thread_filename = true;
bool expect_misses = true;
MultiThreadedReadWrite(&cache, max_start_offset, use_per_thread_filename,
expect_misses);
}
// Read the trace file and ensure that all of the lines are valid JSON with the
// expected fields.
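// Each line is expected to be a JSON object with fields "ts", "s", "m" and "f"
// (presumably timestamp, status, mtime and filename). Schematically, a line
// looks like (values illustrative, not actual output):
//   {"ts": <double>, "s": "<status>", "m": <int64>, "f": "<filename>"}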
std::ifstream trace(Substitute("$0/$1", data_cache_dirs()[0],
DataCache::Partition::TRACE_FILE_NAME));
int line_num = 1;
while (trace.good()) {
SCOPED_TRACE(line_num++);
string line;
std::getline(trace, line);
if (line.empty()) {
ASSERT_TRUE(trace.eof());
break;
}
SCOPED_TRACE(line);
rapidjson::Document d;
d.Parse<0>(line.c_str());
ASSERT_TRUE(d.IsObject());
ASSERT_TRUE(d["ts"].IsDouble());
ASSERT_TRUE(d["s"].IsString());
ASSERT_TRUE(d["m"].IsInt64());
ASSERT_TRUE(d["f"].IsString());
if (anon) {
// We expect anonymized filenames to be 22-character fingerprints:
// - 128 bit fingerprint = 16 bytes
// - 16 * 4/3 (base64 encoding) = 21.3333
// - Round up to 22.
EXPECT_EQ(22, d["f"].GetStringLength());
} else {
EXPECT_TRUE(MatchPattern(d["f"].GetString(), "thread-*file"));
}
}
}
}
} // namespace io
} // namespace impala
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
impala::InitFeSupport();
int rand_seed = time(NULL);
LOG(INFO) << "rand_seed: " << rand_seed;
srand(rand_seed);
// Skip if the platform is affected by KUDU-1508.
if (!impala::FileSystemUtil::CheckForBuggyExtFS(BASE_CACHE_DIR).ok()) {
LOG(WARNING) << "Skipping data-cache-test due to KUDU-1508.";
return 0;
}
return RUN_ALL_TESTS();
}