blob: 538d3dbe45be8d0b97ac696b160a26e121f92271 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/fs/error_manager.h"

#include <algorithm>
#include <cstdlib>
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include <glog/logging.h>
#include <gtest/gtest.h>

#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/threading/thread_collision_warner.h"
#include "kudu/util/monotime.h"
#include "kudu/util/test_util.h"
using std::map;
using std::set;
using std::string;
using std::thread;
using std::unique_ptr;
using std::vector;
// Number of entries in the vector shared by all error-notification callbacks
// in these tests (and hence the number of concurrent callback invocations
// TestSerialization issues).
constexpr int kVecSize = 10;
namespace kudu {
namespace fs {
// These tests are designed to ensure callback serialization by using callbacks
// that all update a single shared vector.
class FsErrorManagerTest : public KuduTest {
 public:
  void SetUp() override {
    KuduTest::SetUp();
    em_.reset(new FsErrorManager());
    // Every slot starts out "empty", which these tests represent as -1.
    test_vec_.resize(kVecSize, -1);
  }

  // Returns a space-separated, stringified version of the single vector that
  // each thread is updating.
  string test_vec_string() {
    return JoinInts(test_vec_, " ");
  }

  // Sleeps for a random amount of time in the range [0, 500) milliseconds.
  void SleepForRand() {
    SleepFor(MonoDelta::FromMilliseconds(rand() % 500));
  }

  // Returns the index of the first instance of 'k' in test_vec_, or -1 if 'k'
  // is not present.
  int FindFirst(int k) {
    const auto it = std::find(test_vec_.begin(), test_vec_.end(), k);
    if (it == test_vec_.end()) {
      return -1;
    }
    return static_cast<int>(it - test_vec_.begin());
  }

  // Writes 'i' to the first available (-1) entry in test_vec_ after sleeping a
  // random amount of time. The target slot is chosen *before* sleeping, so if
  // multiple calls to this are running at the same time, it is likely that
  // some will write to the same entry. The DFAKE_SCOPED_LOCK on fake_lock_ is
  // there to flag such concurrent entry.
  //
  // If no entry is available, logs a message and leaves test_vec_ untouched.
  //
  // NOTE: this can be curried into an ErrorNotificationCb.
  void SleepAndWriteFirstEmptyCb(int i, const string& /* s */) {
    DFAKE_SCOPED_LOCK(fake_lock_);
    const int first_available = FindFirst(-1);
    SleepForRand();
    if (first_available == -1) {
      LOG(INFO) << "No available entries!";
      return;
    }
    test_vec_[first_available] = i;
  }

  // Returns a map between each unique value in test_vec_ and the set of
  // indices within test_vec_ at which that value is located.
  map<int, set<int>> GetPositions() {
    map<int, set<int>> positions;
    // Compare against a signed copy of the size to avoid a signed/unsigned
    // comparison in the loop condition.
    const int size = static_cast<int>(test_vec_.size());
    for (int i = 0; i < size; i++) {
      positions[test_vec_[i]].insert(i);
    }
    return positions;
  }

  FsErrorManager* em() const { return em_.get(); }

 protected:
  // The single vector that the error notification callbacks will all write to.
  vector<int> test_vec_;

 private:
  unique_ptr<FsErrorManager> em_;

  // Fake lock used to ensure threads don't run error-handling at the same time.
  DFAKE_MUTEX(fake_lock_);
};
// Tests the basic functionality (i.e. registering, unregistering, calling
// callbacks) of the error manager.
TEST_F(FsErrorManagerTest, TestBasicRegistration) {
  // Before registering anything, test_vec_ holds only '-1's, so neither
  // handler type's value should be found in it.
  ASSERT_EQ(-1, FindFirst(ErrorHandlerType::DISK_ERROR));
  ASSERT_EQ(-1, FindFirst(ErrorHandlerType::NO_AVAILABLE_DISKS));

  // Register a callback that, after waiting a random amount of time, writes
  // DISK_ERROR into the first '-1' entry of test_vec_.
  em()->SetErrorNotificationCb(
      ErrorHandlerType::DISK_ERROR, [this](const string& uuid) {
        this->SleepAndWriteFirstEmptyCb(ErrorHandlerType::DISK_ERROR, uuid);
      });
  em()->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, "");
  // The first free slot was index 0.
  ASSERT_EQ(0, FindFirst(ErrorHandlerType::DISK_ERROR));

  // Running callbacks that haven't been registered should do nothing.
  em()->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS, "");
  ASSERT_EQ(0, FindFirst(ErrorHandlerType::DISK_ERROR));
  ASSERT_EQ(-1, FindFirst(ErrorHandlerType::NO_AVAILABLE_DISKS));

  // Now register another callback.
  em()->SetErrorNotificationCb(
      ErrorHandlerType::NO_AVAILABLE_DISKS, [this](const string& uuid) {
        this->SleepAndWriteFirstEmptyCb(ErrorHandlerType::NO_AVAILABLE_DISKS, uuid);
      });
  em()->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS, "");
  ASSERT_EQ(1, FindFirst(ErrorHandlerType::NO_AVAILABLE_DISKS));

  // Now unregister one of the callbacks. This should not affect the other.
  em()->UnsetErrorNotificationCb(ErrorHandlerType::DISK_ERROR);
  em()->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, "");
  em()->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS, "");
  LOG(INFO) << "Final state of the vector: " << test_vec_string();

  map<int, set<int>> positions = GetPositions();
  set<int> disk_set = { 0 };         // The first entry should be DISK_ERROR...
  set<int> no_disks_set = { 1, 2 };  // ...followed by two NO_AVAILABLE_DISKS.
  ASSERT_EQ(disk_set, FindOrDie(positions, ErrorHandlerType::DISK_ERROR));
  ASSERT_EQ(no_disks_set, FindOrDie(positions, ErrorHandlerType::NO_AVAILABLE_DISKS));
}
// Test that the callbacks get run serially.
TEST_F(FsErrorManagerTest, TestSerialization) {
  em()->SetErrorNotificationCb(
      ErrorHandlerType::DISK_ERROR, [this](const string& uuid) {
        this->SleepAndWriteFirstEmptyCb(ErrorHandlerType::DISK_ERROR, uuid);
      });
  em()->SetErrorNotificationCb(
      ErrorHandlerType::NO_AVAILABLE_DISKS, [this](const string& uuid) {
        this->SleepAndWriteFirstEmptyCb(ErrorHandlerType::NO_AVAILABLE_DISKS, uuid);
      });

  // Swap back and forth between error-handler types: even indices map to
  // DISK_ERROR, odd indices to NO_AVAILABLE_DISKS. Needs no captures.
  const auto IntToEnum = [] (int i) {
    return i % 2 == 0 ? ErrorHandlerType::DISK_ERROR : ErrorHandlerType::NO_AVAILABLE_DISKS;
  };

  vector<thread> cb_threads;
  cb_threads.reserve(kVecSize);
  for (int i = 0; i < kVecSize; i++) {
    // Each call will update the first available entry in test_vec_ after
    // waiting a random amount of time. Without proper serialization, these
    // could end up writing to the same entry. Capturing by reference is safe
    // because every thread is joined before this scope ends.
    cb_threads.emplace_back([&, i] {
      em()->RunErrorNotificationCb(IntToEnum(i), "");
    });
  }
  for (auto& t : cb_threads) {
    t.join();
  }
  LOG(INFO) << "Final state of the vector: " << test_vec_string();

  // Because the callbacks ran serially, all threads should have been able to
  // get a unique slot in test_vec_, and thus, all entries should be filled.
  // Note that the order of the calls does not matter, only the fact that they
  // ran serially. Use a gtest assertion rather than a glog CHECK so a failure
  // is reported as a test failure instead of aborting the test binary.
  ASSERT_EQ(-1, FindFirst(-1));
}
} // namespace fs
} // namespace kudu