blob: b590b43fd316ddc88ad2d6a912ef3bbb248e425b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <folly/Logging.h>
#include <folly/Memory.h>
#include <folly/futures/Future.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/ScopedEventBaseThread.h>
#include <gmock/gmock.h>
#include <wangle/concurrent/IOThreadPoolExecutor.h>
#include <chrono>
#include <functional>
#include <string>
#include "hbase/connection/request.h"
#include "hbase/connection/response.h"
#include "hbase/connection/rpc-client.h"
#include "hbase/client/async-connection.h"
#include "hbase/client/async-rpc-retrying-caller-factory.h"
#include "hbase/client/async-rpc-retrying-caller.h"
#include "hbase/client/client.h"
#include "hbase/client/connection-configuration.h"
#include "hbase/client/hbase-rpc-controller.h"
#include "hbase/client/keyvalue-codec.h"
#include "hbase/client/region-location.h"
#include "hbase/client/request-converter.h"
#include "hbase/client/response-converter.h"
#include "hbase/client/result.h"
#include "hbase/exceptions/exception.h"
#include "hbase/if/Client.pb.h"
#include "hbase/if/HBase.pb.h"
#include "hbase/test-util/test-util.h"
#include "hbase/utils/time-util.h"
using hbase::AsyncRpcRetryingCallerFactory;
using hbase::AsyncConnection;
using hbase::AsyncRegionLocator;
using hbase::ConnectionConfiguration;
using hbase::Configuration;
using hbase::HBaseRpcController;
using hbase::RegionLocation;
using hbase::RegionLocateType;
using hbase::RpcClient;
using hbase::RequestConverter;
using hbase::ResponseConverter;
using hbase::ReqConverter;
using hbase::RespConverter;
using hbase::Put;
using hbase::TimeUtil;
using hbase::Client;
using hbase::security::User;
using ::testing::Return;
using ::testing::_;
using std::chrono::nanoseconds;
using std::chrono::milliseconds;
using namespace hbase;
using folly::exception_wrapper;
class AsyncRpcRetryTest : public ::testing::Test {
public:
static std::unique_ptr<hbase::TestUtil> test_util;
static void SetUpTestCase() {
google::InstallFailureSignalHandler();
test_util = std::make_unique<hbase::TestUtil>();
test_util->StartMiniCluster(2);
}
};
std::unique_ptr<hbase::TestUtil> AsyncRpcRetryTest::test_util = nullptr;
class AsyncRegionLocatorBase : public AsyncRegionLocator {
public:
AsyncRegionLocatorBase() {}
explicit AsyncRegionLocatorBase(std::shared_ptr<RegionLocation> region_location)
: region_location_(region_location) {}
virtual ~AsyncRegionLocatorBase() = default;
folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(const hbase::pb::TableName &,
const std::string &,
const RegionLocateType,
const int64_t) override {
folly::Promise<std::shared_ptr<RegionLocation>> promise;
promise.setValue(region_location_);
return promise.getFuture();
}
virtual void set_region_location(std::shared_ptr<RegionLocation> region_location) {
region_location_ = region_location;
}
void UpdateCachedLocation(const RegionLocation &, const folly::exception_wrapper &) override {}
protected:
std::shared_ptr<RegionLocation> region_location_;
};
class MockAsyncRegionLocator : public AsyncRegionLocatorBase {
public:
MockAsyncRegionLocator() : AsyncRegionLocatorBase() {}
explicit MockAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
: AsyncRegionLocatorBase(region_location) {}
virtual ~MockAsyncRegionLocator() {}
};
class MockWrongRegionAsyncRegionLocator : public AsyncRegionLocatorBase {
private:
uint32_t tries_ = 0;
uint32_t num_fails_ = 0;
public:
explicit MockWrongRegionAsyncRegionLocator(uint32_t num_fails)
: AsyncRegionLocatorBase(), num_fails_(num_fails) {}
explicit MockWrongRegionAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
: AsyncRegionLocatorBase(region_location) {}
virtual ~MockWrongRegionAsyncRegionLocator() {}
folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
const hbase::pb::TableName &tn, const std::string &row,
const RegionLocateType locate_type = RegionLocateType::kCurrent,
const int64_t locate_ns = 0) override {
// Fail for num_fails_ times, then delegate to the super class which will give the correct
// region location.
if (tries_++ > num_fails_) {
return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
}
folly::Promise<std::shared_ptr<RegionLocation>> promise;
/* set random region name, simulating invalid region */
auto result = std::make_shared<RegionLocation>(
"whatever-region-name", region_location_->region_info(), region_location_->server_name());
promise.setValue(result);
return promise.getFuture();
}
};
class MockFailingAsyncRegionLocator : public AsyncRegionLocatorBase {
private:
uint32_t tries_ = 0;
uint32_t num_fails_ = 0;
public:
explicit MockFailingAsyncRegionLocator(uint32_t num_fails)
: AsyncRegionLocatorBase(), num_fails_(num_fails) {}
explicit MockFailingAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location)
: AsyncRegionLocatorBase(region_location) {}
virtual ~MockFailingAsyncRegionLocator() {}
folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(
const hbase::pb::TableName &tn, const std::string &row,
const RegionLocateType locate_type = RegionLocateType::kCurrent,
const int64_t locate_ns = 0) override {
// Fail for num_fails_ times, then delegate to the super class which will give the correct
// region location.
if (tries_++ > num_fails_) {
return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns);
}
folly::Promise<std::shared_ptr<RegionLocation>> promise;
promise.setException(std::runtime_error{"Failed to look up region location"});
return promise.getFuture();
}
};
class MockAsyncConnection : public AsyncConnection,
public std::enable_shared_from_this<MockAsyncConnection> {
public:
MockAsyncConnection(std::shared_ptr<ConnectionConfiguration> conn_conf,
std::shared_ptr<folly::HHWheelTimer> retry_timer,
std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor,
std::shared_ptr<RpcClient> rpc_client,
std::shared_ptr<AsyncRegionLocator> region_locator)
: conn_conf_(conn_conf),
retry_timer_(retry_timer),
cpu_executor_(cpu_executor),
io_executor_(io_executor),
retry_executor_(retry_executor),
rpc_client_(rpc_client),
region_locator_(region_locator) {}
~MockAsyncConnection() {}
void Init() {
caller_factory_ =
std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_);
}
std::shared_ptr<Configuration> conf() override { return nullptr; }
std::shared_ptr<ConnectionConfiguration> connection_conf() override { return conn_conf_; }
std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() override {
return caller_factory_;
}
std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; }
std::shared_ptr<AsyncRegionLocator> region_locator() override { return region_locator_; }
std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; }
std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; }
std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override {
return retry_executor_;
}
void Close() override {}
std::shared_ptr<HBaseRpcController> CreateRpcController() override {
return std::make_shared<HBaseRpcController>();
}
private:
std::shared_ptr<folly::HHWheelTimer> retry_timer_;
std::shared_ptr<ConnectionConfiguration> conn_conf_;
std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_;
std::shared_ptr<RpcClient> rpc_client_;
std::shared_ptr<AsyncRegionLocator> region_locator_;
std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_;
};
template <typename CONN>
class MockRawAsyncTableImpl {
public:
explicit MockRawAsyncTableImpl(std::shared_ptr<CONN> conn) : conn_(conn) {}
virtual ~MockRawAsyncTableImpl() = default;
/* implement this in real RawAsyncTableImpl. */
/* in real RawAsyncTableImpl, this should be private. */
folly::Future<std::shared_ptr<hbase::Result>> GetCall(
std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller,
std::shared_ptr<RegionLocation> loc, const hbase::Get &get) {
hbase::RpcCall<hbase::Request, hbase::Response> rpc_call = [](
std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<RegionLocation> loc,
std::shared_ptr<HBaseRpcController> controller,
std::unique_ptr<hbase::Request> preq) -> folly::Future<std::unique_ptr<hbase::Response>> {
VLOG(1) << "entering MockRawAsyncTableImpl#GetCall, calling AsyncCall, loc:"
<< loc->DebugString();
return rpc_client->AsyncCall(loc->server_name().host_name(), loc->server_name().port(),
std::move(preq), User::defaultUser(), "ClientService");
};
return Call<hbase::Get, hbase::Request, hbase::Response, std::shared_ptr<hbase::Result>>(
rpc_client, controller, loc, get, &hbase::RequestConverter::ToGetRequest, rpc_call,
&hbase::ResponseConverter::FromGetResponse);
}
/* in real RawAsyncTableImpl, this should be private. */
template <typename REQ, typename PREQ, typename PRESP, typename RESP>
folly::Future<RESP> Call(std::shared_ptr<hbase::RpcClient> rpc_client,
std::shared_ptr<HBaseRpcController> controller,
std::shared_ptr<RegionLocation> loc, const REQ &req,
ReqConverter<std::unique_ptr<PREQ>, REQ, std::string> req_converter,
hbase::RpcCall<PREQ, PRESP> rpc_call,
RespConverter<RESP, PRESP> resp_converter) {
promise_ = std::make_shared<folly::Promise<std::shared_ptr<hbase::Result>>>();
auto f = promise_->getFuture();
VLOG(1) << "calling rpc_call";
rpc_call(rpc_client, loc, controller, std::move(req_converter(req, loc->region_name())))
.then([&, this, resp_converter](std::unique_ptr<PRESP> presp) {
VLOG(1) << "MockRawAsyncTableImpl#call succeded: ";
RESP result = resp_converter(*presp);
promise_->setValue(result);
})
.onError([this](const exception_wrapper &e) {
VLOG(1) << "entering MockRawAsyncTableImpl#call, exception: " << e.what();
VLOG(1) << "entering MockRawAsyncTableImpl#call, error typeinfo: " << typeid(e).name();
promise_->setException(e);
});
return f;
}
private:
std::shared_ptr<CONN> conn_;
std::shared_ptr<folly::Promise<std::shared_ptr<hbase::Result>>> promise_;
};
void runTest(std::shared_ptr<AsyncRegionLocatorBase> region_locator, std::string tableName,
uint32_t operation_timeout_millis = 1200000) {
AsyncRpcRetryTest::test_util->CreateTable(tableName, "d");
// Create TableName and Row to be fetched from HBase
auto tn = folly::to<hbase::pb::TableName>(tableName);
auto row = "test2";
// Get to be performed on above HBase Table
hbase::Get get(row);
// Create a client
Client client(*(AsyncRpcRetryTest::test_util->conf()));
// Get connection to HBase Table
auto table = client.Table(tn);
table->Put(Put{"test2"}.AddColumn("d", "2", "value2"));
table->Put(Put{"test2"}.AddColumn("d", "extra", "value for extra"));
/* init region location and rpc channel */
auto region_location = table->GetRegionLocation(row);
// auto io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(4);
auto cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
auto io_executor_ = client.async_connection()->io_executor();
auto retry_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1);
auto codec = std::make_shared<hbase::KeyValueCodec>();
auto rpc_client = std::make_shared<RpcClient>(io_executor_, cpu_executor_, codec,
AsyncRpcRetryTest::test_util->conf());
// auto retry_event_base_ = std::make_shared<folly::ScopedEventBaseThread>(true);
std::shared_ptr<folly::HHWheelTimer> retry_timer =
folly::HHWheelTimer::newTimer(retry_executor_->getEventBase());
/* init connection configuration */
auto connection_conf = std::make_shared<ConnectionConfiguration>(
TimeUtil::SecondsToNanos(20), // connect_timeout
TimeUtil::MillisToNanos(operation_timeout_millis), // operation_timeout
TimeUtil::SecondsToNanos(60), // rpc_timeout
TimeUtil::MillisToNanos(100), // pause
5, // max retries
9); // start log errors count
/* set region locator */
region_locator->set_region_location(region_location);
/* init hbase client connection */
auto conn = std::make_shared<MockAsyncConnection>(connection_conf, retry_timer, cpu_executor_,
io_executor_, retry_executor_, rpc_client,
region_locator);
conn->Init();
/* init retry caller factory */
auto tableImpl = std::make_shared<MockRawAsyncTableImpl<MockAsyncConnection>>(conn);
/* init request caller builder */
auto builder = conn->caller_factory()->Single<std::shared_ptr<hbase::Result>>();
/* call with retry to get result */
auto async_caller =
builder->table(std::make_shared<hbase::pb::TableName>(tn))
->row(row)
->rpc_timeout(conn->connection_conf()->read_rpc_timeout())
->operation_timeout(conn->connection_conf()->operation_timeout())
->action([=, &get](std::shared_ptr<hbase::HBaseRpcController> controller,
std::shared_ptr<hbase::RegionLocation> loc,
std::shared_ptr<hbase::RpcClient> rpc_client)
-> folly::Future<std::shared_ptr<hbase::Result>> {
return tableImpl->GetCall(rpc_client, controller, loc, get);
})
->Build();
auto promise = std::make_shared<folly::Promise<std::shared_ptr<hbase::Result>>>();
auto result = async_caller->Call().get(milliseconds(500000));
// Test the values, should be same as in put executed on hbase shell
ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty.";
EXPECT_EQ("test2", result->Row());
EXPECT_EQ("value2", *(result->Value("d", "2")));
EXPECT_EQ("value for extra", *(result->Value("d", "extra")));
retry_timer->destroy();
table->Close();
client.Close();
retry_executor_->stop();
}
// Test successful case
TEST_F(AsyncRpcRetryTest, TestGetBasic) {
std::shared_ptr<AsyncRegionLocatorBase> region_locator(
std::make_shared<MockAsyncRegionLocator>());
runTest(region_locator, "table1");
}
// Tests the RPC failing 3 times, then succeeding
TEST_F(AsyncRpcRetryTest, TestHandleException) {
std::shared_ptr<AsyncRegionLocatorBase> region_locator(
std::make_shared<MockWrongRegionAsyncRegionLocator>(3));
runTest(region_locator, "table2");
}
// Tests the RPC failing 5 times, throwing an exception
TEST_F(AsyncRpcRetryTest, TestFailWithException) {
std::shared_ptr<AsyncRegionLocatorBase> region_locator(
std::make_shared<MockWrongRegionAsyncRegionLocator>(5));
EXPECT_ANY_THROW(runTest(region_locator, "table3"));
}
// Tests the region location lookup failing 3 times, then succeeding
TEST_F(AsyncRpcRetryTest, TestHandleExceptionFromRegionLocationLookup) {
std::shared_ptr<AsyncRegionLocatorBase> region_locator(
std::make_shared<MockFailingAsyncRegionLocator>(3));
runTest(region_locator, "table4");
}
// Tests the region location lookup failing 5 times, throwing an exception
TEST_F(AsyncRpcRetryTest, TestFailWithExceptionFromRegionLocationLookup) {
std::shared_ptr<AsyncRegionLocatorBase> region_locator(
std::make_shared<MockFailingAsyncRegionLocator>(5));
EXPECT_ANY_THROW(runTest(region_locator, "table5"));
}
// Tests hitting operation timeout, thus not retrying anymore
TEST_F(AsyncRpcRetryTest, TestFailWithOperationTimeout) {
std::shared_ptr<AsyncRegionLocatorBase> region_locator(
std::make_shared<MockFailingAsyncRegionLocator>(3));
EXPECT_ANY_THROW(runTest(region_locator, "table6", 200));
}