blob: bff2be4f1e2de7d7bbe7835b6751f78def436566 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <unistd.h>
#include <atomic>
#include <cstdint>
#include <mutex>
#include <ostream>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/consensus/consensus_meta.h"
#include "kudu/consensus/consensus_meta_manager.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/barrier.h"
#include "kudu/util/locks.h"
#include "kudu/util/random.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
using std::atomic;
using std::lock_guard;
using std::string;
using std::thread;
using std::unordered_map;
using std::vector;
namespace kudu {
namespace consensus {
// Term value handed to ConsensusMetadataManager::Create() for every cmeta
// created by the tests in this file.
static constexpr int64_t kInitialTerm = 1;

// Table of per-tablet "locks" used by the tests: the presence of a tablet id as
// a key means some thread currently has exclusive access to that tablet id.
// The mapped string is only a marker value.
using LockTable = unordered_map<string, string>;
// Multithreaded stress tests for the cmeta manager.
class ConsensusMetadataManagerStressTest : public KuduTest {
public:
ConsensusMetadataManagerStressTest()
: rng_(SeedRandom()),
fs_manager_(env_, FsManagerOpts(GetTestPath("fs_root"))),
cmeta_manager_(new ConsensusMetadataManager(&fs_manager_)) {
}
void SetUp() override {
KuduTest::SetUp();
ASSERT_OK(fs_manager_.CreateInitialFileSystemLayout());
ASSERT_OK(fs_manager_.Open());
// Initialize test configuration.
config_.set_opid_index(kInvalidOpIdIndex);
RaftPeerPB* peer = config_.add_peers();
peer->set_permanent_uuid(fs_manager_.uuid());
peer->set_member_type(RaftPeerPB::VOTER);
}
protected:
enum OpType {
kCreate,
kLoad,
kDelete,
kNumOpTypes, // Must come last.
};
ThreadSafeRandom rng_;
FsManager fs_manager_;
scoped_refptr<ConsensusMetadataManager> cmeta_manager_;
RaftConfigPB config_;
// Lock used by tests.
simple_spinlock lock_;
};
// Concurrency test to check whether TSAN will flag unsafe concurrent
// operations for simultaneous access to the cmeta manager by different threads
// on different tablet ids. For a given tablet id, a lock table is used as
// external synchronization to ensure exclusive access by a single thread.
//
// Each thread repeatedly picks a random tablet id and a random operation
// (create / load / delete), and checks that the returned Status matches the
// expected state tracked in 'tablet_cmeta_exists'.
TEST_F(ConsensusMetadataManagerStressTest, CreateLoadDeleteTSANTest) {
  static const int kNumTablets = 26;
  static const int kNumThreads = 8;
  static const int kNumOpsPerThread = 1000;

  // Set of tablets we are operating on.
  vector<string> tablet_ids;
  tablet_ids.reserve(kNumTablets);

  // Map of tablet_id -> cmeta existence.
  unordered_map<string, bool> tablet_cmeta_exists;

  // Each entry in 'lock_table' protects each value of
  // 'tablet_cmeta_exists[tablet_id]'. We never resize 'tablet_cmeta_exists'.
  LockTable lock_table;

  // Tablet ids are the single-character strings "a", "b", ...
  for (int i = 0; i < kNumTablets; i++) {
    string tablet_id = string(1, 'a' + i);
    // None of the cmetas have been created yet.
    InsertOrDie(&tablet_cmeta_exists, tablet_id, false);
    tablet_ids.push_back(std::move(tablet_id));
  }

  // Eventually exit if the test hangs.
  alarm(60);
  auto c = MakeScopedCleanup([&] { alarm(0); });

  // Counts only the operations that were expected to succeed (and did).
  atomic<int64_t> ops_performed(0);
  // Makes all threads start issuing operations at the same time.
  Barrier barrier(kNumThreads);
  vector<thread> threads;
  threads.reserve(kNumThreads);
  for (int thread_num = 0; thread_num < kNumThreads; thread_num++) {
    threads.emplace_back([&] {
      barrier.Wait();
      for (int op_num = 0; op_num < kNumOpsPerThread; op_num++) {
        const string& tablet_id = tablet_ids[rng_.Uniform(kNumTablets)];

        // Acquire lock in lock table or bail.
        {
          // 'lock_' protects 'lock_table'.
          lock_guard<simple_spinlock> l(lock_);
          if (ContainsKey(lock_table, tablet_id)) {
            // Another thread has access to this tablet id. Bail.
            continue;
          }
          InsertOrDie(&lock_table, tablet_id, "lock for test");
        }
        // We now own the lock table entry; release it at the end of this
        // iteration no matter which operation is run below.
        auto unlocker = MakeScopedCleanup([&] {
          lock_guard<simple_spinlock> l(lock_);
          CHECK(lock_table.erase(tablet_id));
        });

        // Look the entry up once with at() instead of operator[]:
        // operator[] is a mutating member that inserts (and may rehash) on a
        // missing key, so calling it concurrently on the shared map is not
        // safe. at() never modifies the container; only the mapped bool, which
        // is protected by our lock table entry, is read and written.
        bool& cmeta_exists = tablet_cmeta_exists.at(tablet_id);

        OpType type = static_cast<OpType>(rng_.Uniform(kNumOpTypes));
        switch (type) {
          case kCreate: {
            Status s = cmeta_manager_->Create(tablet_id, config_, kInitialTerm);
            if (cmeta_exists) {
              CHECK(s.IsAlreadyPresent()) << s.ToString();
            } else {
              CHECK(s.ok()) << s.ToString();
              ops_performed.fetch_add(1, std::memory_order_relaxed);
            }
            cmeta_exists = true;
            break;
          }
          case kLoad: {
            scoped_refptr<ConsensusMetadata> cmeta;
            Status s = cmeta_manager_->Load(tablet_id, &cmeta);
            if (cmeta_exists) {
              CHECK(s.ok()) << s.ToString();
              ops_performed.fetch_add(1, std::memory_order_relaxed);
            } else {
              CHECK(s.IsNotFound()) << tablet_id << ": " << s.ToString();
            }
            // Load() does not change 'tablet_cmeta_exists' status.
            break;
          }
          case kDelete: {
            Status s = cmeta_manager_->Delete(tablet_id);
            if (cmeta_exists) {
              CHECK(s.ok()) << s.ToString();
              ops_performed.fetch_add(1, std::memory_order_relaxed);
            } else {
              CHECK(s.IsNotFound()) << s.ToString();
            }
            cmeta_exists = false;
            break;
          }
          default: LOG(FATAL) << type; break;
        }
      }
    });
  }

  for (thread& t : threads) {
    t.join();
  }

  LOG(INFO) << "Ops performed: " << ops_performed.load(std::memory_order_relaxed);
}
} // namespace consensus
} // namespace kudu