blob: cf1d8ef0c4308119ec2c66ca965656d79a18ea96 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "hbase/client/location-cache.h"
#include <folly/Conv.h>
#include <folly/Logging.h>
#include <folly/io/IOBuf.h>
#include <wangle/concurrent/CPUThreadPoolExecutor.h>
#include <wangle/concurrent/IOThreadPoolExecutor.h>
#include <map>
#include <shared_mutex>
#include <utility>
#include "hbase/connection/response.h"
#include "hbase/connection/rpc-connection.h"
#include "hbase/client/meta-utils.h"
#include "hbase/exceptions/exception.h"
#include "client/Client.pb.h"
#include "server/zookeeper/ZooKeeper.pb.h"
#include "hbase/serde/region-info.h"
#include "hbase/serde/server-name.h"
#include "hbase/serde/zk.h"
using hbase::pb::MetaRegionServer;
using hbase::pb::ServerName;
using hbase::pb::TableName;
namespace hbase {
LocationCache::LocationCache(std::shared_ptr<hbase::Configuration> conf,
std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
std::shared_ptr<ConnectionPool> cp)
: conf_(conf),
io_executor_(io_executor),
cpu_executor_(cpu_executor),
cp_(cp),
meta_promise_(nullptr),
meta_lock_(),
meta_util_(),
zk_(nullptr),
cached_locations_(),
locations_lock_() {
zk_quorum_ = ZKUtil::ParseZooKeeperQuorum(*conf_);
EnsureZooKeeperConnection();
}
LocationCache::~LocationCache() { CloseZooKeeperConnection(); }
void LocationCache::CloseZooKeeperConnection() {
if (zk_ != nullptr) {
zookeeper_close(zk_);
zk_ = nullptr;
LOG(INFO) << "Closed connection to ZooKeeper.";
}
}
void LocationCache::EnsureZooKeeperConnection() {
if (zk_ == nullptr) {
LOG(INFO) << "Connecting to ZooKeeper. Quorum:" + zk_quorum_;
auto session_timeout = ZKUtil::SessionTimeout(*conf_);
zk_ = zookeeper_init(zk_quorum_.c_str(), nullptr, session_timeout, nullptr, nullptr, 0);
}
}
folly::Future<ServerName> LocationCache::LocateMeta() {
std::lock_guard<std::recursive_mutex> g(meta_lock_);
if (meta_promise_ == nullptr) {
this->RefreshMetaLocation();
}
return meta_promise_->getFuture().onError([&](const folly::exception_wrapper &ew) {
auto promise = InvalidateMeta();
if (promise) {
promise->setException(ew);
}
throw ew;
return ServerName{};
});
}
std::shared_ptr<folly::SharedPromise<hbase::pb::ServerName>> LocationCache::InvalidateMeta() {
VLOG(2) << "Invalidating meta location";
std::lock_guard<std::recursive_mutex> g(meta_lock_);
if (meta_promise_ != nullptr) {
// return the unique_ptr back to the caller.
std::shared_ptr<folly::SharedPromise<hbase::pb::ServerName>> ret = nullptr;
std::swap(ret, meta_promise_);
return ret;
} else {
return nullptr;
}
}
void LocationCache::RefreshMetaLocation() {
meta_promise_ = std::make_shared<folly::SharedPromise<ServerName>>();
auto p = meta_promise_;
cpu_executor_->add([this, p] {
std::lock_guard<std::recursive_mutex> g(meta_lock_);
p->setWith([&] { return this->ReadMetaLocation(); });
});
}
// Note: this is a blocking call to zookeeper
ServerName LocationCache::ReadMetaLocation() {
auto buf = folly::IOBuf::create(4096);
ZkDeserializer derser;
EnsureZooKeeperConnection();
// This needs to be int rather than size_t as that's what ZK expects.
int len = buf->capacity();
std::string zk_node = ZKUtil::MetaZNode(*conf_);
int zk_result = zoo_get(this->zk_, zk_node.c_str(), 0,
reinterpret_cast<char *>(buf->writableData()), &len, nullptr);
if (zk_result != ZOK || len < 9) {
LOG(ERROR) << "Error getting meta location.";
// We just close the zk connection, and let the upper levels retry.
CloseZooKeeperConnection();
throw std::runtime_error("Error getting meta location. Quorum: " + zk_quorum_);
}
buf->append(len);
MetaRegionServer mrs;
if (derser.Parse(buf.get(), &mrs) == false) {
LOG(ERROR) << "Unable to decode";
throw std::runtime_error("Error getting meta location (Unable to decode). Quorum: " +
zk_quorum_);
}
return mrs.server();
}
folly::Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta(
const TableName &tn, const std::string &row) {
return this->LocateMeta()
.via(cpu_executor_.get())
.then([this](ServerName sn) {
// TODO: use RpcClient?
auto remote_id = std::make_shared<ConnectionId>(sn.host_name(), sn.port());
return this->cp_->GetConnection(remote_id);
})
.then([tn, row, this](std::shared_ptr<RpcConnection> rpc_connection) {
return rpc_connection->SendRequest(std::move(meta_util_.MetaRequest(tn, row)));
})
.onError([&](const folly::exception_wrapper &ew) {
auto promise = InvalidateMeta();
throw ew;
return static_cast<std::unique_ptr<Response>>(nullptr);
})
.then([tn, this](std::unique_ptr<Response> resp) {
// take the protobuf response and make it into
// a region location.
return meta_util_.CreateLocation(std::move(*resp), tn);
})
.then([tn, this](std::shared_ptr<RegionLocation> rl) {
// Make sure that the correct location was found.
if (rl->region_info().table_name().namespace_() != tn.namespace_() ||
rl->region_info().table_name().qualifier() != tn.qualifier()) {
throw TableNotFoundException(folly::to<std::string>(tn));
}
return rl;
})
.then([this](std::shared_ptr<RegionLocation> rl) {
auto remote_id =
std::make_shared<ConnectionId>(rl->server_name().host_name(), rl->server_name().port());
return rl;
})
.then([tn, this](std::shared_ptr<RegionLocation> rl) {
// now add fetched location to the cache.
this->CacheLocation(tn, rl);
return rl;
});
}
constexpr const char *MetaUtil::kMetaRegionName;
folly::Future<std::shared_ptr<RegionLocation>> LocationCache::LocateRegion(
const TableName &tn, const std::string &row, const RegionLocateType locate_type,
const int64_t locate_ns) {
// We maybe asked to locate meta itself
if (MetaUtil::IsMeta(tn)) {
return LocateMeta().then([this](const ServerName &server_name) {
auto rl = std::make_shared<RegionLocation>(MetaUtil::kMetaRegionName,
meta_util_.meta_region_info(), server_name);
return rl;
});
}
// TODO: implement region locate type and timeout
auto cached_loc = this->GetCachedLocation(tn, row);
if (cached_loc != nullptr) {
return cached_loc;
} else {
return this->LocateFromMeta(tn, row);
}
}
// must hold shared lock on locations_lock_
std::shared_ptr<RegionLocation> LocationCache::GetCachedLocation(const hbase::pb::TableName &tn,
const std::string &row) {
auto t_locs = this->GetTableLocations(tn);
std::shared_lock<folly::SharedMutexWritePriority> lock(locations_lock_);
// looking for the "floor" key as a start key
auto possible_region = t_locs->upper_bound(row);
if (t_locs->empty()) {
VLOG(5) << "Could not find region in cache, table map is empty";
return nullptr;
}
if (possible_region == t_locs->begin()) {
VLOG(5) << "Could not find region in cache, all keys are greater, row:" << row
<< " ,possible_region:" << possible_region->second->DebugString();
return nullptr;
}
--possible_region;
VLOG(5) << "Found possible region in cache for row:" << row
<< " ,possible_region:" << possible_region->second->DebugString();
// found possible start key, now need to check end key
if (possible_region->second->region_info().end_key() == "" ||
possible_region->second->region_info().end_key() > row) {
VLOG(2) << "Found region in cache for row:" << row
<< " ,region:" << possible_region->second->DebugString();
return possible_region->second;
} else {
return nullptr;
}
}
// must hold unique lock on locations_lock_
void LocationCache::CacheLocation(const hbase::pb::TableName &tn,
const std::shared_ptr<RegionLocation> loc) {
auto t_locs = this->GetTableLocations(tn);
std::unique_lock<folly::SharedMutexWritePriority> lock(locations_lock_);
(*t_locs)[loc->region_info().start_key()] = loc;
VLOG(1) << "Cached location for region:" << loc->DebugString();
}
// must hold shared lock on locations_lock_
bool LocationCache::IsLocationCached(const hbase::pb::TableName &tn, const std::string &row) {
return (this->GetCachedLocation(tn, row) != nullptr);
}
// shared lock needed for cases when this table has been requested before;
// in the rare case it hasn't, unique lock will be grabbed to add it to cache
std::shared_ptr<hbase::PerTableLocationMap> LocationCache::GetTableLocations(
const hbase::pb::TableName &tn) {
auto found_locs = this->GetCachedTableLocations(tn);
if (found_locs == nullptr) {
found_locs = this->GetNewTableLocations(tn);
}
return found_locs;
}
std::shared_ptr<hbase::PerTableLocationMap> LocationCache::GetCachedTableLocations(
const hbase::pb::TableName &tn) {
folly::SharedMutexWritePriority::ReadHolder r_holder{locations_lock_};
auto table_locs = cached_locations_.find(tn);
if (table_locs != cached_locations_.end()) {
return table_locs->second;
} else {
return nullptr;
}
}
std::shared_ptr<hbase::PerTableLocationMap> LocationCache::GetNewTableLocations(
const hbase::pb::TableName &tn) {
// double-check locking under upgradable lock
folly::SharedMutexWritePriority::UpgradeHolder u_holder{locations_lock_};
auto table_locs = cached_locations_.find(tn);
if (table_locs != cached_locations_.end()) {
return table_locs->second;
}
folly::SharedMutexWritePriority::WriteHolder w_holder{std::move(u_holder)};
auto t_locs_p = std::make_shared<std::map<std::string, std::shared_ptr<RegionLocation>>>();
cached_locations_.insert(std::make_pair(tn, t_locs_p));
return t_locs_p;
}
// must hold unique lock on locations_lock_
void LocationCache::ClearCache() {
std::unique_lock<folly::SharedMutexWritePriority> lock(locations_lock_);
cached_locations_.clear();
}
// must hold unique lock on locations_lock_
void LocationCache::ClearCachedLocations(const hbase::pb::TableName &tn) {
VLOG(1) << "ClearCachedLocations, table:" << folly::to<std::string>(tn);
std::unique_lock<folly::SharedMutexWritePriority> lock(locations_lock_);
cached_locations_.erase(tn);
if (MetaUtil::IsMeta(tn)) {
InvalidateMeta();
}
}
// must hold unique lock on locations_lock_
void LocationCache::ClearCachedLocation(const hbase::pb::TableName &tn, const std::string &row) {
VLOG(1) << "ClearCachedLocation, table:" << folly::to<std::string>(tn) << ", row:" << row;
auto table_locs = this->GetTableLocations(tn);
std::unique_lock<folly::SharedMutexWritePriority> lock(locations_lock_);
table_locs->erase(row);
if (MetaUtil::IsMeta(tn)) {
InvalidateMeta();
}
}
void LocationCache::UpdateCachedLocation(const RegionLocation &loc,
const folly::exception_wrapper &error) {
// TODO: just clears the location for now. We can inspect RegionMovedExceptions, etc later.
ClearCachedLocation(loc.region_info().table_name(), loc.region_info().start_key());
}
} // namespace hbase