// blob: b0f4afb6e85fc280332daca04bb933f41bcbc42b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <folly/Logging.h>
#include <folly/Memory.h>
#include <folly/futures/Future.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/ScopedEventBaseThread.h>
#include <gtest/gtest.h>
#include <wangle/concurrent/IOThreadPoolExecutor.h>
#include <chrono>
#include <functional>
#include <string>
#include "hbase/connection/rpc-client.h"
#include "hbase/client/async-batch-rpc-retrying-caller.h"
#include "hbase/client/async-connection.h"
#include "hbase/client/async-rpc-retrying-caller-factory.h"
#include "hbase/client/client.h"
#include "hbase/client/connection-configuration.h"
#include "hbase/client/keyvalue-codec.h"
#include "hbase/client/region-location.h"
#include "hbase/client/result.h"
#include "hbase/exceptions/exception.h"
#include "hbase/test-util/test-util.h"
#include "hbase/utils/time-util.h"
using hbase::AsyncRpcRetryingCallerFactory;
using hbase::AsyncConnection;
using hbase::AsyncRegionLocator;
using hbase::ConnectionConfiguration;
using hbase::Configuration;
using hbase::HBaseRpcController;
using hbase::RegionLocation;
using hbase::RegionLocateType;
using hbase::RpcClient;
using hbase::RequestConverter;
using hbase::ResponseConverter;
using hbase::Put;
using hbase::TimeUtil;
using hbase::Client;
using hbase::security::User;
using std::chrono::nanoseconds;
using std::chrono::milliseconds;
using namespace hbase;
using folly::exception_wrapper;
/**
 * Fixture shared by every test in this file.
 *
 * SetUpTestCase() runs once for the whole test case. It installs the glog
 * failure signal handler, starts a mini cluster through hbase::TestUtil and
 * creates the table "split-table1" with column family "d". The key list is
 * handed to CreateTable(); presumably these are the region split points
 * (TODO confirm against TestUtil).
 */
class AsyncBatchRpcRetryTest : public ::testing::Test {
 public:
  /** Owns the mini cluster; populated by SetUpTestCase(). */
  static std::unique_ptr<hbase::TestUtil> test_util;
  /** Name of the table every test reads from / writes to. */
  static std::string tableName;

  static void SetUpTestCase() {
    google::InstallFailureSignalHandler();

    test_util = std::make_unique<hbase::TestUtil>();
    test_util->StartMiniCluster(2);

    tableName = "split-table1";
    std::vector<std::string> table_keys{"test0",   "test100", "test200", "test300", "test400",
                                        "test500", "test600", "test700", "test800", "test900"};
    test_util->CreateTable(tableName, "d", table_keys);
  }
};

// Out-of-class definitions of the fixture's static members.
std::unique_ptr<hbase::TestUtil> AsyncBatchRpcRetryTest::test_util{nullptr};
std::string AsyncBatchRpcRetryTest::tableName{};
/**
 * Common base for the mock region locators used by these tests.
 *
 * Locations are served from an in-memory row -> RegionLocation map that the
 * test fills through set_region_location(). The class also hosts the per-row
 * retry bookkeeping (mtries_ / mnum_fails_) shared by the failing mocks.
 */
class AsyncRegionLocatorBase : public AsyncRegionLocator {
 public:
  AsyncRegionLocatorBase() {}
  explicit AsyncRegionLocatorBase(std::shared_ptr<RegionLocation> region_location)
      : region_location_(region_location) {}
  virtual ~AsyncRegionLocatorBase() = default;

  /**
   * Returns an already-fulfilled future holding the location registered for
   * `row`. The table name, locate type and timeout arguments are ignored.
   *
   * @throws std::out_of_range (synchronously, from std::map::at) when no
   *         location was registered for `row`.
   */
  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(const hbase::pb::TableName &,
                                                                     const std::string &row,
                                                                     const RegionLocateType,
                                                                     const int64_t) override {
    folly::Promise<std::shared_ptr<RegionLocation>> promise;
    promise.setValue(region_locations_.at(row));
    return promise.getFuture();
  }

  /** Stores a single location; LocateRegion() does not consult it. */
  virtual void set_region_location(std::shared_ptr<RegionLocation> region_location) {
    region_location_ = region_location;
  }

  /**
   * Merges `reg_locs` into the row -> location map, overwriting entries that
   * already exist for the same row.
   */
  virtual void set_region_location(
      const std::map<std::string, std::shared_ptr<RegionLocation>> &reg_locs) {
    // Iterate by const reference: copying each pair would copy the key string
    // and bump the shared_ptr reference count for nothing.
    for (const auto &reg_loc : reg_locs) {
      region_locations_[reg_loc.first] = reg_loc.second;
    }
  }

  /** Intentionally a no-op: the mocks keep no location cache to update. */
  void UpdateCachedLocation(const RegionLocation &rl, const folly::exception_wrapper &ew) override {
  }

 protected:
  std::shared_ptr<RegionLocation> region_location_;
  std::map<std::string, std::shared_ptr<RegionLocation>> region_locations_;
  /** Number of LocateRegion() calls seen so far, per row. */
  std::map<std::string, uint32_t> mtries_;
  /** Number of leading LocateRegion() calls that must fail, per row. */
  std::map<std::string, uint32_t> mnum_fails_;

  /**
   * Lazily seeds the per-row retry maps from the currently registered rows.
   * Does nothing once either map holds an entry, so only the first call (made
   * after the locations were registered) has an effect.
   */
  void InitRetryMaps(uint32_t num_fails) {
    if (!mtries_.empty() || !mnum_fails_.empty()) {
      return;
    }
    for (const auto &reg_loc : region_locations_) {
      mtries_[reg_loc.first] = 0;
      mnum_fails_[reg_loc.first] = num_fails;
    }
  }
};
class MockAsyncRegionLocator : public AsyncRegionLocatorBase {
public:
MockAsyncRegionLocator() : AsyncRegionLocatorBase() {}
explicit MockAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
: AsyncRegionLocatorBase(region_location) {}
virtual ~MockAsyncRegionLocator() {}
};
/**
 * Locator that hands out a location with a bogus region name for the first
 * `num_fails` lookups of each row, then falls back to the real location.
 *
 * Attempts are counted per row in the maps inherited from
 * AsyncRegionLocatorBase (mtries_ / mnum_fails_).
 */
class MockWrongRegionAsyncRegionLocator : public AsyncRegionLocatorBase {
 private:
  /** Failure budget applied to every row when the retry maps are seeded. */
  uint32_t num_fails_ = 0;

 public:
  explicit MockWrongRegionAsyncRegionLocator(uint32_t num_fails)
      : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
  /** Leaves the failure budget at 0, i.e. lookups never return a bogus region. */
  explicit MockWrongRegionAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
      : AsyncRegionLocatorBase(region_location) {}
  virtual ~MockWrongRegionAsyncRegionLocator() {}

  /**
   * @return a fulfilled future with either the real location (once the row's
   *         failure budget is used up) or a copy of it whose region name is
   *         "whatever-region-name".
   * @throws std::out_of_range when `row` has no registered location.
   */
  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
      const hbase::pb::TableName &tn, const std::string &row,
      const RegionLocateType locate_type = RegionLocateType::kCurrent,
      const int64_t locate_ns = 0) override {
    InitRetryMaps(num_fails_);
    auto &tries = mtries_[row];
    const auto &num_fails = mnum_fails_[row];
    if (++tries > num_fails) {
      return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
    }

    // Look the real location up once and reuse its region info / server name.
    const auto &real_location = region_locations_.at(row);
    /* set random region name, simulating invalid region */
    auto result = std::make_shared<RegionLocation>(
        "whatever-region-name", real_location->region_info(), real_location->server_name());

    folly::Promise<std::shared_ptr<RegionLocation>> promise;
    promise.setValue(result);
    return promise.getFuture();
  }
};
/**
 * Locator whose lookups fail with an exception for the first `num_fails`
 * calls of each row, then fall back to the real location.
 *
 * Attempts are counted per row in the maps inherited from
 * AsyncRegionLocatorBase (mtries_ / mnum_fails_).
 */
class MockFailingAsyncRegionLocator : public AsyncRegionLocatorBase {
 private:
  /** Failure budget applied to every row when the retry maps are seeded. */
  uint32_t num_fails_ = 0;

 public:
  explicit MockFailingAsyncRegionLocator(uint32_t num_fails)
      : AsyncRegionLocatorBase(), num_fails_(num_fails) {}
  /** Leaves the failure budget at 0, i.e. lookups never fail artificially. */
  explicit MockFailingAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
      : AsyncRegionLocatorBase(region_location) {}
  virtual ~MockFailingAsyncRegionLocator() {}

  /**
   * @return the real location once the row's failure budget is used up;
   *         before that, a future already failed with std::runtime_error.
   */
  folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
      const hbase::pb::TableName &tn, const std::string &row,
      const RegionLocateType locate_type = RegionLocateType::kCurrent,
      const int64_t locate_ns = 0) override {
    InitRetryMaps(num_fails_);
    auto &tries = mtries_[row];
    const auto &num_fails = mnum_fails_[row];
    if (++tries > num_fails) {
      return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
    }

    // Still within the failure budget: report the lookup as failed.
    folly::Promise<std::shared_ptr<RegionLocation>> promise;
    promise.setException(std::runtime_error{"Failed to look up region location"});
    return promise.getFuture();
  }
};
/**
 * AsyncConnection implementation assembled from caller-supplied parts, so a
 * test can inject its own region locator, executors and retry timer.
 *
 * Init() must be called after the object is owned by a std::shared_ptr,
 * because it relies on shared_from_this().
 */
class MockAsyncConnection : public AsyncConnection,
                            public std::enable_shared_from_this<MockAsyncConnection> {
 public:
  MockAsyncConnection(std::shared_ptr<ConnectionConfiguration> conn_conf,
                      std::shared_ptr<folly::HHWheelTimer> retry_timer,
                      std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
                      std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
                      std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor,
                      std::shared_ptr<RpcClient> rpc_client,
                      std::shared_ptr<AsyncRegionLocator> region_locator)
      // Listed in member declaration order, which is the order the members
      // are actually initialized in (avoids -Wreorder).
      : retry_timer_(retry_timer),
        conn_conf_(conn_conf),
        rpc_client_(rpc_client),
        region_locator_(region_locator),
        cpu_executor_(cpu_executor),
        io_executor_(io_executor),
        retry_executor_(retry_executor) {}
  ~MockAsyncConnection() {}

  /** Creates the retrying-caller factory bound to this connection and its retry timer. */
  void Init() {
    caller_factory_ =
        std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_);
  }

  /** The mock carries no Configuration; always returns nullptr. */
  std::shared_ptr<Configuration> conf() override { return nullptr; }
  std::shared_ptr<ConnectionConfiguration> connection_conf() override { return conn_conf_; }
  /** Null until Init() has been called. */
  std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() override {
    return caller_factory_;
  }
  std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; }
  std::shared_ptr<AsyncRegionLocator> region_locator() override { return region_locator_; }
  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; }
  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; }
  std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override {
    return retry_executor_;
  }

  /** Destroys the retry timer, then stops the retry, IO and CPU executors in that order. */
  void Close() override {
    retry_timer_->destroy();
    retry_executor_->stop();
    io_executor_->stop();
    cpu_executor_->stop();
  }

  /** Returns a fresh controller on every call. */
  std::shared_ptr<HBaseRpcController> CreateRpcController() override {
    return std::make_shared<HBaseRpcController>();
  }

 private:
  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
  std::shared_ptr<ConnectionConfiguration> conn_conf_;
  std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_;
  std::shared_ptr<RpcClient> rpc_client_;
  std::shared_ptr<AsyncRegionLocator> region_locator_;
  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
  std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_;
};
/**
 * Minimal stand-in for a raw async table: it only knows how to run a batch of
 * actions through the connection's retrying-caller factory.
 */
class MockRawAsyncTableImpl {
 public:
  explicit MockRawAsyncTableImpl(std::shared_ptr<MockAsyncConnection> conn,
                                 std::shared_ptr<hbase::pb::TableName> tn)
      : conn_(conn), tn_(tn) {}
  virtual ~MockRawAsyncTableImpl() = default;

  /* implement this in real RawAsyncTableImpl. */
  /**
   * Submits `rows` as one batch, with timeouts, pause, attempt limit and
   * log threshold taken from the connection configuration.
   *
   * @return a future with one folly::Try<RESP> per submitted action.
   */
  template <typename REQ, typename RESP>
  folly::Future<std::vector<folly::Try<RESP>>> Batch(const std::vector<REQ> &rows) {
    // The caller works on its own shared copy of the actions.
    auto batch_actions = std::make_shared<std::vector<REQ>>(rows);
    const auto settings = conn_->connection_conf();

    /* configure and build the retrying batch caller */
    auto async_caller = conn_->caller_factory()
                            ->Batch<REQ, RESP>()
                            ->table(tn_)
                            ->actions(batch_actions)
                            ->rpc_timeout(settings->read_rpc_timeout())
                            ->operation_timeout(settings->operation_timeout())
                            ->pause(settings->pause())
                            ->max_attempts(settings->max_retries())
                            ->start_log_errors_count(settings->start_log_errors_count())
                            ->Build();

    // The continuation holds a copy of async_caller; presumably this is what
    // keeps the caller alive until the batch completes.
    auto pending = async_caller->Call();
    return pending.then([async_caller](auto tries) { return tries; });
  }

 private:
  std::shared_ptr<MockAsyncConnection> conn_;
  std::shared_ptr<hbase::pb::TableName> tn_;
};
/**
 * Builds a MockAsyncConnection around `region_locator`.
 *
 * The IO executor is borrowed from `client`'s own async connection; the CPU
 * executor, retry executor, retry timer and RpcClient are created here.
 *
 * @param client                    client whose IO executor is reused
 * @param operation_timeout_millis  operation timeout, in milliseconds
 * @param tries                     value passed as the configuration's max retries
 * @param region_locator            locator the connection will expose
 * @return the connection; the caller still has to invoke Init() on it.
 */
std::shared_ptr<MockAsyncConnection> getAsyncConnection(
    Client &client, uint32_t operation_timeout_millis, uint32_t tries,
    std::shared_ptr<AsyncRegionLocatorBase> region_locator) {
  /* executors */
  auto cpu_pool = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
  auto io_pool = client.async_connection()->io_executor();
  auto retry_pool = std::make_shared<wangle::IOThreadPoolExecutor>(1);

  /* rpc channel */
  auto kv_codec = std::make_shared<hbase::KeyValueCodec>();
  auto rpc = std::make_shared<RpcClient>(io_pool, cpu_pool, kv_codec,
                                         AsyncBatchRpcRetryTest::test_util->conf());

  /* retry timer, driven by the retry executor's event base */
  std::shared_ptr<folly::HHWheelTimer> timer =
      folly::HHWheelTimer::newTimer(retry_pool->getEventBase());

  /* connection configuration */
  const auto connect_timeout = TimeUtil::SecondsToNanos(20);
  const auto operation_timeout = TimeUtil::MillisToNanos(operation_timeout_millis);
  const auto rpc_timeout = TimeUtil::SecondsToNanos(60);
  const auto pause = TimeUtil::MillisToNanos(100);
  auto conn_settings = std::make_shared<ConnectionConfiguration>(
      connect_timeout, operation_timeout, rpc_timeout, pause,
      tries,  // max retries
      1);     // start log errors count

  return std::make_shared<MockAsyncConnection>(conn_settings, timer, cpu_pool, io_pool, retry_pool,
                                               rpc, region_locator);
}
/**
 * Wraps a copy of every action in a std::shared_ptr<hbase::Row>.
 *
 * @tparam ACTION  concrete action type; a shared_ptr<ACTION> must convert to
 *                 shared_ptr<hbase::Row>.
 * @param actions  actions to wrap (order is preserved)
 * @return one shared pointer per action, each owning its own copy.
 */
template <typename ACTION>
std::vector<std::shared_ptr<hbase::Row>> getRows(std::vector<ACTION> actions) {
  std::vector<std::shared_ptr<hbase::Row>> rows;
  rows.reserve(actions.size());
  // Iterate by const reference: make_shared already copies the action once,
  // so a by-value loop variable would copy each one twice.
  for (const auto &action : actions) {
    rows.push_back(std::make_shared<ACTION>(action));
  }
  return rows;
}
/**
 * Unwraps the per-action outcomes of a batch call.
 *
 * @param actions   actions that were submitted; only used to name the row in
 *                  the error log, indexed by the position in `tresults`
 * @param tresults  one folly::Try per action
 * @return the values of all successful entries, in order. An entry holding
 *         neither a value nor an exception is skipped.
 * @throws folly::exception_wrapper for the first entry that holds an exception.
 */
template <typename REQ, typename RESP>
std::vector<std::shared_ptr<hbase::Result>> getResults(std::vector<REQ> &actions,
                                                       std::vector<folly::Try<RESP>> &tresults) {
  std::vector<std::shared_ptr<hbase::Result>> results{};
  results.reserve(tresults.size());
  uint64_t num = 0;
  // Iterate by const reference so each Try is inspected in place, not copied.
  for (const auto &tresult : tresults) {
    if (tresult.hasValue()) {
      results.push_back(tresult.value());
    } else if (tresult.hasException()) {
      folly::exception_wrapper ew = tresult.exception();
      LOG(ERROR) << "Caught exception:- " << ew.what().toStdString() << " for "
                 << actions[num].row();
      // The wrapper object itself is thrown, not the exception it wraps.
      throw ew;
    }
    ++num;
  }
  return results;
}
template <typename ACTION>
std::map<std::string, std::shared_ptr<RegionLocation>> getRegionLocationsAndActions(
uint64_t num_rows, std::vector<ACTION> &actions, std::shared_ptr<Table> table) {
std::map<std::string, std::shared_ptr<RegionLocation>> region_locations;
for (uint64_t i = 0; i < num_rows; ++i) {
auto row = "test" + std::to_string(i);
ACTION action(row);
actions.push_back(action);
region_locations[row] = table->GetRegionLocation(row);
}
return region_locations;
}
void runMultiGets(std::shared_ptr<AsyncRegionLocatorBase> region_locator,
const std::string &table_name, bool split_regions, uint32_t tries = 3,
uint32_t operation_timeout_millis = 600000, uint64_t num_rows = 1000) {
// Create TableName and Row to be fetched from HBase
auto tn = folly::to<hbase::pb::TableName>(AsyncBatchRpcRetryTest::tableName);
// Create a client
Client client(*AsyncBatchRpcRetryTest::test_util->conf());
// Get connection to HBase Table
std::shared_ptr<Table> table = client.Table(tn);
for (uint64_t i = 0; i < num_rows; i++) {
table->Put(Put{"test" + std::to_string(i)}.AddColumn("d", std::to_string(i),
"value" + std::to_string(i)));
}
std::vector<hbase::Get> gets;
auto region_locations = getRegionLocationsAndActions<hbase::Get>(num_rows, gets, table);
/* set region locator */
region_locator->set_region_location(region_locations);
/* init hbase client connection */
auto conn = getAsyncConnection(client, operation_timeout_millis, tries, region_locator);
conn->Init();
/* init retry caller factory */
auto tableImpl =
std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn));
std::vector<std::shared_ptr<hbase::Row>> rows = getRows<hbase::Get>(gets);
auto tresults = tableImpl->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(rows).get(
milliseconds(operation_timeout_millis));
ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty.";
auto results = getResults<hbase::Get, std::shared_ptr<Result>>(gets, tresults);
// Test the values, should be same as in put executed on hbase shell
ASSERT_TRUE(!results.empty()) << "Results shouldn't be empty.";
uint32_t i = 0;
for (; i < num_rows; ++i) {
ASSERT_TRUE(!results[i]->IsEmpty()) << "Result for Get " << gets[i].row()
<< " must not be empty";
EXPECT_EQ("test" + std::to_string(i), results[i]->Row());
EXPECT_EQ("value" + std::to_string(i), results[i]->Value("d", std::to_string(i)).value());
}
table->Close();
client.Close();
conn->Close();
}
void runMultiPuts(std::shared_ptr<AsyncRegionLocatorBase> region_locator,
const std::string &table_name, bool split_regions, uint32_t tries = 3,
uint32_t operation_timeout_millis = 600000, uint32_t num_rows = 1000) {
// Create TableName and Row to be fetched from HBase
auto tn = folly::to<hbase::pb::TableName>(AsyncBatchRpcRetryTest::tableName);
// Create a client
Client client(*AsyncBatchRpcRetryTest::test_util->conf());
// Get connection to HBase Table
std::shared_ptr<Table> table = client.Table(tn);
std::vector<hbase::Put> puts;
auto region_locations = getRegionLocationsAndActions<hbase::Put>(num_rows, puts, table);
/* set region locator */
region_locator->set_region_location(region_locations);
/* init hbase client connection */
auto conn = getAsyncConnection(client, operation_timeout_millis, tries, region_locator);
conn->Init();
/* init retry caller factory */
auto tableImpl =
std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn));
std::vector<std::shared_ptr<hbase::Row>> rows = getRows<hbase::Put>(puts);
auto tresults = tableImpl->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(rows).get(
milliseconds(operation_timeout_millis));
ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty.";
auto results = getResults<hbase::Put, std::shared_ptr<Result>>(puts, tresults);
// Test the values, should be same as in put executed on hbase shell
ASSERT_TRUE(!results.empty()) << "Results shouldn't be empty.";
table->Close();
client.Close();
conn->Close();
}
// Gets, happy path: the locator always returns the real region location.
TEST_F(AsyncBatchRpcRetryTest, MultiGets) {
  auto locator = std::make_shared<MockAsyncRegionLocator>();
  runMultiGets(locator, "table1", false);
}
// Gets: the locator returns a bogus region for the first 3 lookups of each
// row; with 5 tries the batch is expected to succeed.
TEST_F(AsyncBatchRpcRetryTest, HandleException) {
  auto locator = std::make_shared<MockWrongRegionAsyncRegionLocator>(3);
  runMultiGets(locator, "table2", false, 5);
}
// Gets: the locator returns a bogus region for the first 4 lookups of each
// row; with the default 3 tries the batch is expected to throw.
TEST_F(AsyncBatchRpcRetryTest, FailWithException) {
  auto locator = std::make_shared<MockWrongRegionAsyncRegionLocator>(4);
  EXPECT_ANY_THROW(runMultiGets(locator, "table3", false));
}
// Gets: the region lookup fails for the first 3 calls of each row; the batch
// is expected to succeed with the default settings.
TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookup) {
  auto locator = std::make_shared<MockFailingAsyncRegionLocator>(3);
  runMultiGets(locator, "table4", false);
}
// Gets: the region lookup fails for the first 4 calls of each row; with 3
// tries the batch is expected to throw.
TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookup) {
  auto locator = std::make_shared<MockFailingAsyncRegionLocator>(4);
  EXPECT_ANY_THROW(runMultiGets(locator, "table5", false, 3));
}
// Gets: the region lookup fails for the first 6 calls of each row while the
// operation timeout is only 100 ms; the batch is expected to throw.
TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeout) {
  auto locator = std::make_shared<MockFailingAsyncRegionLocator>(6);
  EXPECT_ANY_THROW(runMultiGets(locator, "table6", false, 5, 100, 1000));
}
////////////////////// Put variants of the tests above

// Puts, happy path: the locator always returns the real region location.
TEST_F(AsyncBatchRpcRetryTest, MultiPuts) {
  auto locator = std::make_shared<MockAsyncRegionLocator>();
  runMultiPuts(locator, "table1", false);
}
// Puts: the locator returns a bogus region for the first 3 lookups of each
// row; with 5 tries the batch is expected to succeed.
TEST_F(AsyncBatchRpcRetryTest, PutsHandleException) {
  auto locator = std::make_shared<MockWrongRegionAsyncRegionLocator>(3);
  runMultiPuts(locator, "table2", false, 5);
}
// Puts: the locator returns a bogus region for the first 4 lookups of each
// row; with the default 3 tries the batch is expected to throw.
TEST_F(AsyncBatchRpcRetryTest, PutsFailWithException) {
  auto locator = std::make_shared<MockWrongRegionAsyncRegionLocator>(4);
  EXPECT_ANY_THROW(runMultiPuts(locator, "table3", false));
}
// Puts: the region lookup fails for the first 3 calls of each row; the batch
// is expected to succeed with the default settings.
TEST_F(AsyncBatchRpcRetryTest, PutsHandleExceptionFromRegionLocationLookup) {
  auto locator = std::make_shared<MockFailingAsyncRegionLocator>(3);
  runMultiPuts(locator, "table4", false);
}
// Puts: the region lookup fails for the first 4 calls of each row; with 3
// tries the batch is expected to throw.
TEST_F(AsyncBatchRpcRetryTest, PutsFailWithExceptionFromRegionLocationLookup) {
  auto locator = std::make_shared<MockFailingAsyncRegionLocator>(4);
  EXPECT_ANY_THROW(runMultiPuts(locator, "table5", false, 3));
}
// Puts: the region lookup fails for the first 6 calls of each row while the
// operation timeout is only 100 ms; the batch is expected to throw.
TEST_F(AsyncBatchRpcRetryTest, PutsFailWithOperationTimeout) {
  auto locator = std::make_shared<MockFailingAsyncRegionLocator>(6);
  EXPECT_ANY_THROW(runMultiPuts(locator, "table6", false, 5, 100, 1000));
}
// Gets, happy path, called with split_regions = true.
TEST_F(AsyncBatchRpcRetryTest, MultiGetsSplitRegions) {
  auto locator = std::make_shared<MockAsyncRegionLocator>();
  runMultiGets(locator, "table7", true);
}
// Gets with split_regions = true: bogus region for the first 3 lookups of
// each row; with 5 tries the batch is expected to succeed.
TEST_F(AsyncBatchRpcRetryTest, HandleExceptionSplitRegions) {
  auto locator = std::make_shared<MockWrongRegionAsyncRegionLocator>(3);
  runMultiGets(locator, "table8", true, 5);
}
// Gets with split_regions = true: bogus region for the first 4 lookups of
// each row; with the default 3 tries the batch is expected to throw.
TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionSplitRegions) {
  auto locator = std::make_shared<MockWrongRegionAsyncRegionLocator>(4);
  EXPECT_ANY_THROW(runMultiGets(locator, "table9", true));
}
// Gets with split_regions = true: the region lookup fails for the first 3
// calls of each row; the batch is expected to succeed with the default settings.
TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookupSplitRegions) {
  auto locator = std::make_shared<MockFailingAsyncRegionLocator>(3);
  runMultiGets(locator, "table10", true);
}
// Gets with split_regions = true: the region lookup fails for the first 4
// calls of each row; with 3 tries the batch is expected to throw.
TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookupSplitRegions) {
  auto locator = std::make_shared<MockFailingAsyncRegionLocator>(4);
  EXPECT_ANY_THROW(runMultiGets(locator, "table11", true, 3));
}
// Gets with split_regions = true: the region lookup fails for the first 6
// calls of each row while the operation timeout is only 100 ms; the batch is
// expected to throw.
TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeoutSplitRegions) {
  auto locator = std::make_shared<MockFailingAsyncRegionLocator>(6);
  EXPECT_ANY_THROW(runMultiGets(locator, "table12", true, 5, 100, 1000));
}