// blob: fdf44c16b80d4d528701fe6ad8bf2c365dc64a6c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/client-cache.h"
#include <sstream>
#include <thrift/server/TServer.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransportUtils.h>
#include <memory>
#include "common/logging.h"
#include "util/container-util.h"
#include "util/metrics.h"
#include "util/network-util.h"
#include "rpc/thrift-util.h"
#include "common/names.h"
using namespace apache::thrift;
using namespace apache::thrift::server;
using namespace apache::thrift::transport;
using namespace apache::thrift::protocol;
namespace impala {
// Checks out a client for 'address' and stores its key in '*client_key'.
//
// An idle client from the per-host cache is reused when one is available; otherwise a
// new client is created through CreateClient() using 'factory_method'. If creation
// fails, its error status is returned and the in-use metric is left untouched. On
// success the in-use metric (when metrics are enabled) is bumped by one.
Status ClientCacheHelper::GetClient(const TNetworkAddress& address,
    ClientFactory factory_method, ClientKey* client_key) {
  // Find the cache for this host, creating an empty one on first use. The global cache
  // lock only guards the host -> cache map, so it is dropped before the per-host work.
  shared_ptr<PerHostCache> per_host;
  {
    lock_guard<mutex> cache_guard(cache_lock_);
    VLOG(2) << "GetClient(" << TNetworkAddressToString(address) << ")";
    shared_ptr<PerHostCache>& slot = per_host_caches_[address];
    if (!slot) slot.reset(new PerHostCache());
    per_host = slot;
  }

  // Prefer an idle client that is already sitting in the per-host cache.
  bool reused_cached_client = false;
  {
    lock_guard<mutex> host_guard(per_host->lock);
    if (!per_host->clients.empty()) {
      *client_key = per_host->clients.front();
      VLOG(2) << "GetClient(): returning cached client for " <<
          TNetworkAddressToString(address);
      per_host->clients.pop_front();
      reused_cached_client = true;
    }
  }

  // Nothing cached: build a fresh client. The per-host lock is not needed for this.
  if (!reused_cached_client) {
    RETURN_IF_ERROR(CreateClient(address, factory_method, client_key));
  }

  if (metrics_enabled_) clients_in_use_metric_->Increment(1);
  return Status::OK();
}
// Closes the client identified by '*client_key' and replaces it with a newly created
// client for the same address.
//
// On success, '*client_key' identifies the new client and the old client's entry is
// removed from 'client_map_'. On failure, the status from CreateClient() is returned,
// the old entry is kept, and '*client_key' is restored to the original key so the caller
// can still release or destroy the (now closed) original client.
Status ClientCacheHelper::ReopenClient(ClientFactory factory_method,
    ClientKey* client_key) {
  // Clients are not ordinarily removed from the cache completely (in the future, they may
  // be); this is the only method where a client may be deleted and replaced with another.
  shared_ptr<ThriftClientImpl> client_impl;
  {
    lock_guard<mutex> lock(client_map_lock_);
    ClientMap::iterator client = client_map_.find(*client_key);
    DCHECK(client != client_map_.end());
    client_impl = client->second;
  }
  VLOG(1) << "ReopenClient(): re-creating client for " <<
      TNetworkAddressToString(client_impl->address());

  client_impl->Close();

  // TODO: Thrift's TBufferedTransport cannot be re-opened after Close() because it does
  // not clean up its internal buffers when it is reopened. To work around this issue,
  // create a new client instead.
  const ClientKey old_client_key = *client_key;
  Status status = CreateClient(client_impl->address(), factory_method, client_key);

  // Only erase the existing client from the map if creation of the new one succeeded.
  // This helps to ensure the proper accounting of metrics in the presence of
  // re-connection failures (the original client should be released as usual).
  if (status.ok()) {
    // CreateClient() increments total_clients_metric_ on success, so undo that here to
    // account for the client being replaced.
    if (metrics_enabled_) {
      total_clients_metric_->Increment(-1);
      DCHECK_GE(total_clients_metric_->GetValue(), 0);
    }
    lock_guard<mutex> lock(client_map_lock_);
    // Erase by key instead of through an iterator captured before the lock was dropped:
    // CreateClient() above (and any other thread) inserts into client_map_ in the
    // meantime, which may invalidate iterators if ClientMap is a hash-based container.
    client_map_.erase(old_client_key);
  } else {
    // Restore the client used before the failed re-opening attempt, so the caller can
    // properly release it.
    *client_key = old_client_key;
  }
  return status;
}
// Builds a new client for 'address' with 'factory_method', applies the configured
// send/receive timeouts, opens it with retries and registers it in 'client_map_' under
// '*client_key' (the factory is handed 'client_key' and presumably fills it in).
//
// If the client failed to initialise or could not be opened, '*client_key' is set to
// nullptr, nothing is registered and the corresponding error status is returned. On
// success the total-clients metric (when metrics are enabled) is bumped by one.
Status ClientCacheHelper::CreateClient(const TNetworkAddress& address,
    ClientFactory factory_method, ClientKey* client_key) {
  shared_ptr<ThriftClientImpl> new_client(factory_method(address, client_key));
  VLOG(2) << "CreateClient(): creating new client for " <<
      TNetworkAddressToString(new_client->address());

  Status status = new_client->init_status();
  if (status.ok()) {
    // Set the TSocket's send and receive timeouts before attempting to connect.
    new_client->setRecvTimeout(recv_timeout_ms_);
    new_client->setSendTimeout(send_timeout_ms_);
    status = new_client->OpenWithRetry(num_tries_, wait_ms_);
  }

  // Either initialisation or the connection attempt failed: hand back no client.
  if (!status.ok()) {
    *client_key = nullptr;
    return status;
  }

  // The client starts life 'checked out', so it is only recorded in the client map and
  // not added to its per-host cache.
  {
    lock_guard<mutex> map_guard(client_map_lock_);
    client_map_[*client_key] = new_client;
  }

  if (metrics_enabled_) total_clients_metric_->Increment(1);
  return Status::OK();
}
// Returns the checked-out client identified by '*client_key' to the per-host cache of
// the address it is connected to, decrements the in-use metric (when metrics are
// enabled) and resets '*client_key' to NULL. '*client_key' must be non-NULL and known to
// 'client_map_' (both are DCHECKed).
void ClientCacheHelper::ReleaseClient(ClientKey* client_key) {
  DCHECK(*client_key != nullptr) << "Trying to release NULL client";

  // Resolve the key to its client so that the owning host can be determined.
  shared_ptr<ThriftClientImpl> released_client;
  {
    lock_guard<mutex> map_guard(client_map_lock_);
    auto map_entry = client_map_.find(*client_key);
    DCHECK(map_entry != client_map_.end());
    released_client = map_entry->second;
  }

  VLOG(2) << "Releasing client for "
          << TNetworkAddressToString(released_client->address()) << " back to cache";

  // Append the key to the idle list of its host. The per-host lock is taken while the
  // global cache lock is still held.
  {
    lock_guard<mutex> cache_guard(cache_lock_);
    auto host_entry = per_host_caches_.find(released_client->address());
    DCHECK(host_entry != per_host_caches_.end());
    lock_guard<mutex> host_guard(host_entry->second->lock);
    host_entry->second->clients.push_back(*client_key);
  }

  if (metrics_enabled_) clients_in_use_metric_->Increment(-1);
  *client_key = nullptr;
}
// Closes the checked-out client identified by '*client_key', removes it from
// 'client_map_' and resets '*client_key' to NULL. Unlike ReleaseClient(), the client is
// not returned to its per-host cache. When metrics are enabled, both the total-clients
// and the in-use metrics are decremented. '*client_key' must be non-NULL and known to
// 'client_map_' (both are DCHECKed).
void ClientCacheHelper::DestroyClient(ClientKey* client_key) {
  DCHECK(*client_key != nullptr) << "Trying to destroy NULL client";
  shared_ptr<ThriftClientImpl> client_impl;
  {
    lock_guard<mutex> lock(client_map_lock_);
    ClientMap::iterator client = client_map_.find(*client_key);
    DCHECK(client != client_map_.end());
    client_impl = client->second;
  }
  VLOG(1) << "Broken Connection, destroy client for " <<
      TNetworkAddressToString(client_impl->address());

  // Close the connection without holding the map lock.
  client_impl->Close();
  if (metrics_enabled_) {
    total_clients_metric_->Increment(-1);
    clients_in_use_metric_->Increment(-1);
  }

  {
    lock_guard<mutex> lock(client_map_lock_);
    // Erase by key instead of through an iterator captured before the lock was dropped:
    // other threads may have inserted into client_map_ in the meantime, which may
    // invalidate iterators if ClientMap is a hash-based container.
    client_map_.erase(*client_key);
  }
  *client_key = nullptr;
}
void ClientCacheHelper::CloseConnections(const TNetworkAddress& address) {
PerHostCache* cache;
{
lock_guard<mutex> lock(cache_lock_);
PerHostCacheMap::iterator cache_it = per_host_caches_.find(address);
if (cache_it == per_host_caches_.end()) return;
cache = cache_it->second.get();
}
{
VLOG(2) << "Invalidating all " << cache->clients.size() << " clients for: "
<< TNetworkAddressToString(address);
lock_guard<mutex> entry_lock(cache->lock);
lock_guard<mutex> map_lock(client_map_lock_);
for (ClientKey client_key: cache->clients) {
ClientMap::iterator client_map_entry = client_map_.find(client_key);
DCHECK(client_map_entry != client_map_.end());
client_map_entry->second->Close();
}
}
}
// Returns a one-line summary of the cache: the number of known hosts followed by a
// space-separated list of "<address>:<number of idle cached clients>" entries. Holds the
// global cache lock for the whole call and each per-host lock while reading that host's
// client list.
string ClientCacheHelper::DebugString() {
  lock_guard<mutex> cache_guard(cache_lock_);
  stringstream summary;
  summary << "ClientCacheHelper(#hosts=" << per_host_caches_.size() << " [";

  // Empty before the first entry, a single space before every subsequent one.
  const char* separator = "";
  for (const PerHostCacheMap::value_type& host_entry: per_host_caches_) {
    lock_guard<mutex> host_guard(host_entry.second->lock);
    summary << separator << TNetworkAddressToString(host_entry.first) << ":"
            << host_entry.second->clients.size();
    separator = " ";
  }

  summary << "])";
  return summary.str();
}
void ClientCacheHelper::TestShutdown() {
vector<TNetworkAddress> addresses;
{
lock_guard<mutex> lock(cache_lock_);
for (const PerHostCacheMap::value_type& cache_entry: per_host_caches_) {
addresses.push_back(cache_entry.first);
}
}
for (const TNetworkAddress& address: addresses) {
CloseConnections(address);
}
}
// Registers the two gauges of this cache in 'metrics' and enables metric updates:
//   <key_prefix>.client-cache.clients-in-use
//   <key_prefix>.client-cache.total-clients
// Both gauges start at 0. 'metrics' must not be NULL (DCHECKed).
void ClientCacheHelper::InitMetrics(MetricGroup* metrics, const string& key_prefix) {
  DCHECK(metrics != nullptr);

  // Taking the lock is not strictly needed if InitMetrics() is called before any cache
  // usage, but it is intended to ensure that metrics_enabled_ is published.
  lock_guard<mutex> cache_guard(cache_lock_);

  const string in_use_key = key_prefix + ".client-cache.clients-in-use";
  clients_in_use_metric_ = metrics->AddGauge(in_use_key, 0);

  const string total_key = key_prefix + ".client-cache.total-clients";
  total_clients_metric_ = metrics->AddGauge(total_key, 0);

  metrics_enabled_ = true;
}
}