blob: 678392eb224cbe2bd573678560ddc3a03f282aea [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/master/ts_descriptor.h"
#include <cmath>
#include <mutex>
#include <ostream>
#include <unordered_set>
#include <utility>
#include <vector>
#include <boost/optional/optional.hpp>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/common/common.pb.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus.proxy.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/master.pb.h"
#include "kudu/tserver/tserver_admin.proxy.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/net/dns_resolver.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/pb_util.h"
DEFINE_int32(tserver_unresponsive_timeout_ms, 60 * 1000,
"The period of time that a Master can go without receiving a heartbeat from a "
"tablet server before considering it unresponsive. Unresponsive servers are not "
"selected when assigning replicas during table creation or re-replication.");
TAG_FLAG(tserver_unresponsive_timeout_ms, advanced);
DEFINE_double(tserver_last_replica_creations_halflife_ms, 60 * 1000,
"The half-life of last replica creations time. Only for testing!");
TAG_FLAG(tserver_last_replica_creations_halflife_ms, hidden);
using kudu::pb_util::SecureDebugString;
using kudu::pb_util::SecureShortDebugString;
using std::shared_ptr;
using std::string;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace master {
Status TSDescriptor::RegisterNew(const NodeInstancePB& instance,
const ServerRegistrationPB& registration,
const boost::optional<std::string>& location,
DnsResolver* dns_resolver,
shared_ptr<TSDescriptor>* desc) {
shared_ptr<TSDescriptor> ret(TSDescriptor::make_shared(instance.permanent_uuid()));
RETURN_NOT_OK(ret->Register(
instance, registration, location, dns_resolver));
*desc = std::move(ret);
return Status::OK();
}
TSDescriptor::TSDescriptor(std::string perm_id)
: permanent_uuid_(std::move(perm_id)),
latest_seqno_(-1),
last_heartbeat_(MonoTime::Now()),
needs_full_report_(false),
recent_replica_creations_(0),
last_replica_creations_decay_(MonoTime::Now()),
num_live_replicas_(0) {
}
// Compares two repeated HostPortPB fields. Returns true if equal, false otherwise.
static bool HostPortPBsEqual(const google::protobuf::RepeatedPtrField<HostPortPB>& pb1,
const google::protobuf::RepeatedPtrField<HostPortPB>& pb2) {
if (pb1.size() != pb2.size()) {
return false;
}
// Do a set-based equality search.
std::unordered_set<HostPort, HostPortHasher, HostPortEqualityPredicate> hostports1;
std::unordered_set<HostPort, HostPortHasher, HostPortEqualityPredicate> hostports2;
for (int i = 0; i < pb1.size(); i++) {
hostports1.emplace(HostPortFromPB(pb1.Get(i)));
hostports2.emplace(HostPortFromPB(pb2.Get(i)));
}
return hostports1 == hostports2;
}
Status TSDescriptor::Register(const NodeInstancePB& instance,
const ServerRegistrationPB& registration,
const boost::optional<std::string>& location,
DnsResolver* dns_resolver) {
std::lock_guard<rw_spinlock> l(lock_);
CHECK_EQ(instance.permanent_uuid(), permanent_uuid_);
// TODO(KUDU-418): we don't currently support changing RPC addresses since the
// host/port is stored persistently in each tablet's metadata.
if (registration_ &&
!HostPortPBsEqual(registration_->rpc_addresses(), registration.rpc_addresses())) {
string msg = Substitute(
"Tablet server $0 is attempting to re-register with a different host/port. "
"This is not currently supported. Old: {$1} New: {$2}",
instance.permanent_uuid(),
SecureShortDebugString(*registration_),
SecureShortDebugString(registration));
LOG(ERROR) << msg;
return Status::InvalidArgument(msg);
}
if (registration.rpc_addresses().empty()) {
return Status::InvalidArgument(
"invalid registration: must have at least one RPC address",
SecureShortDebugString(registration));
}
if (instance.instance_seqno() < latest_seqno_) {
return Status::AlreadyPresent(Substitute(
"Cannot register with sequence number $0:"
" Already have a registration from sequence number $1",
instance.instance_seqno(), latest_seqno_));
} else if (instance.instance_seqno() == latest_seqno_) {
// It's possible that the TS registered, but our response back to it
// got lost, so it's trying to register again with the same sequence
// number. That's fine.
LOG(INFO) << "Processing retry of TS registration from "
<< SecureShortDebugString(instance);
}
latest_seqno_ = instance.instance_seqno();
registration_.reset(new ServerRegistrationPB(registration));
ts_admin_proxy_.reset();
consensus_proxy_.reset();
dns_resolver_ = dns_resolver;
location_ = location;
return Status::OK();
}
void TSDescriptor::UpdateHeartbeatTime() {
std::lock_guard<rw_spinlock> l(lock_);
last_heartbeat_ = MonoTime::Now();
}
MonoDelta TSDescriptor::TimeSinceHeartbeat() const {
MonoTime now(MonoTime::Now());
shared_lock<rw_spinlock> l(lock_);
return now - last_heartbeat_;
}
void TSDescriptor::UpdateNeedsFullTabletReport(bool needs_report) {
std::lock_guard<rw_spinlock> l(lock_);
needs_full_report_ = needs_report;
}
bool TSDescriptor::needs_full_report() const {
shared_lock<rw_spinlock> l(lock_);
return needs_full_report_;
}
bool TSDescriptor::PresumedDead() const {
return TimeSinceHeartbeat().ToMilliseconds() >= FLAGS_tserver_unresponsive_timeout_ms;
}
int64_t TSDescriptor::latest_seqno() const {
shared_lock<rw_spinlock> l(lock_);
return latest_seqno_;
}
void TSDescriptor::DecayRecentReplicaCreationsUnlocked() {
// In most cases, we won't have any recent replica creations, so
// we don't need to bother calling the clock, etc.
if (recent_replica_creations_ == 0) return;
const double kHalflifeSecs = FLAGS_tserver_last_replica_creations_halflife_ms / 1000;
MonoTime now = MonoTime::Now();
double secs_since_last_decay = (now - last_replica_creations_decay_).ToSeconds();
recent_replica_creations_ *= pow(0.5, secs_since_last_decay / kHalflifeSecs);
// If sufficiently small, reset down to 0 to take advantage of the fast path above.
if (recent_replica_creations_ < 1e-12) {
recent_replica_creations_ = 0;
}
last_replica_creations_decay_ = now;
}
void TSDescriptor::IncrementRecentReplicaCreations() {
std::lock_guard<rw_spinlock> l(lock_);
DecayRecentReplicaCreationsUnlocked();
recent_replica_creations_ += 1;
}
double TSDescriptor::RecentReplicaCreations() {
// NOTE: not a shared lock because of the "Decay" side effect.
std::lock_guard<rw_spinlock> l(lock_);
DecayRecentReplicaCreationsUnlocked();
return recent_replica_creations_;
}
void TSDescriptor::GetRegistration(ServerRegistrationPB* reg) const {
shared_lock<rw_spinlock> l(lock_);
CHECK(registration_) << "No registration";
CHECK_NOTNULL(reg)->CopyFrom(*registration_);
}
void TSDescriptor::GetTSInfoPB(TSInfoPB* tsinfo_pb) const {
shared_lock<rw_spinlock> l(lock_);
CHECK(registration_);
const auto& reg = *registration_;
tsinfo_pb->mutable_rpc_addresses()->CopyFrom(reg.rpc_addresses());
if (reg.has_unix_domain_socket_path()) {
tsinfo_pb->set_unix_domain_socket_path(reg.unix_domain_socket_path());
}
if (location_) {
tsinfo_pb->set_location(*location_);
}
}
void TSDescriptor::GetNodeInstancePB(NodeInstancePB* instance_pb) const {
shared_lock<rw_spinlock> l(lock_);
instance_pb->set_permanent_uuid(permanent_uuid_);
instance_pb->set_instance_seqno(latest_seqno_);
}
Status TSDescriptor::ResolveSockaddr(Sockaddr* addr, string* host) const {
vector<HostPort> hostports;
{
shared_lock<rw_spinlock> l(lock_);
for (const HostPortPB& addr : registration_->rpc_addresses()) {
hostports.emplace_back(addr.host(), addr.port());
}
}
// Resolve DNS outside the lock.
HostPort last_hostport;
vector<Sockaddr> addrs;
for (const HostPort& hostport : hostports) {
RETURN_NOT_OK(dns_resolver_->ResolveAddresses(hostport, &addrs));
if (!addrs.empty()) {
last_hostport = hostport;
break;
}
}
if (addrs.empty()) {
return Status::NetworkError("Unable to find the TS address: ",
SecureDebugString(*registration_));
}
if (addrs.size() > 1) {
LOG(WARNING) << "TS address " << last_hostport.ToString()
<< " resolves to " << addrs.size() << " different addresses. Using "
<< addrs[0].ToString();
}
*addr = addrs[0];
*host = last_hostport.host();
return Status::OK();
}
Status TSDescriptor::GetTSAdminProxy(const shared_ptr<rpc::Messenger>& messenger,
shared_ptr<tserver::TabletServerAdminServiceProxy>* proxy) {
{
shared_lock<rw_spinlock> l(lock_);
if (ts_admin_proxy_) {
*proxy = ts_admin_proxy_;
return Status::OK();
}
}
Sockaddr addr;
string host;
RETURN_NOT_OK(ResolveSockaddr(&addr, &host));
std::lock_guard<rw_spinlock> l(lock_);
if (!ts_admin_proxy_) {
ts_admin_proxy_.reset(new tserver::TabletServerAdminServiceProxy(
messenger, addr, std::move(host)));
}
*proxy = ts_admin_proxy_;
return Status::OK();
}
Status TSDescriptor::GetConsensusProxy(const shared_ptr<rpc::Messenger>& messenger,
shared_ptr<consensus::ConsensusServiceProxy>* proxy) {
{
shared_lock<rw_spinlock> l(lock_);
if (consensus_proxy_) {
*proxy = consensus_proxy_;
return Status::OK();
}
}
Sockaddr addr;
string host;
RETURN_NOT_OK(ResolveSockaddr(&addr, &host));
std::lock_guard<rw_spinlock> l(lock_);
if (!consensus_proxy_) {
consensus_proxy_.reset(new consensus::ConsensusServiceProxy(
messenger, addr, std::move(host)));
}
*proxy = consensus_proxy_;
return Status::OK();
}
string TSDescriptor::ToString() const {
shared_lock<rw_spinlock> l(lock_);
CHECK(!registration_->rpc_addresses().empty());
const auto& addr = registration_->rpc_addresses(0);
return Substitute("$0 ($1:$2)", permanent_uuid_, addr.host(), addr.port());
}
} // namespace master
} // namespace kudu