blob: d0783ceaeebe6941801f3df6992b1b7b8aa26e35 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tserver/heartbeater.h"
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <memory>
#include <vector>
#include "kudu/common/wire_protocol.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/master.h"
#include "kudu/master/master_rpc.h"
#include "kudu/master/master.proxy.h"
#include "kudu/server/webserver.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/tablet_server_options.h"
#include "kudu/tserver/ts_tablet_manager.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/thread.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/status.h"
DEFINE_int32(heartbeat_rpc_timeout_ms, 15000,
"Timeout used for the TS->Master heartbeat RPCs.");
TAG_FLAG(heartbeat_rpc_timeout_ms, advanced);
DEFINE_int32(heartbeat_interval_ms, 1000,
"Interval at which the TS heartbeats to the master.");
TAG_FLAG(heartbeat_interval_ms, advanced);
DEFINE_int32(heartbeat_max_failures_before_backoff, 3,
"Maximum number of consecutive heartbeat failures until the "
"Tablet Server backs off to the normal heartbeat interval, "
"rather than retrying.");
TAG_FLAG(heartbeat_max_failures_before_backoff, advanced);
using google::protobuf::RepeatedPtrField;
using kudu::HostPortPB;
using kudu::consensus::RaftPeerPB;
using kudu::master::GetLeaderMasterRpc;
using kudu::master::ListMastersResponsePB;
using kudu::master::Master;
using kudu::master::MasterServiceProxy;
using kudu::rpc::RpcController;
using std::shared_ptr;
using strings::Substitute;
namespace kudu {
namespace tserver {
namespace {
// Creates a proxy to 'hostport'.
Status MasterServiceProxyForHostPort(const HostPort& hostport,
const shared_ptr<rpc::Messenger>& messenger,
gscoped_ptr<MasterServiceProxy>* proxy) {
vector<Sockaddr> addrs;
RETURN_NOT_OK(hostport.ResolveAddresses(&addrs));
if (addrs.size() > 1) {
LOG(WARNING) << "Master address '" << hostport.ToString() << "' "
<< "resolves to " << addrs.size() << " different addresses. Using "
<< addrs[0].ToString();
}
proxy->reset(new MasterServiceProxy(messenger, addrs[0]));
return Status::OK();
}
} // anonymous namespace
// Most of the actual logic of the heartbeater is inside this inner class,
// to avoid having too many dependencies from the header itself.
//
// This is basically the "PIMPL" pattern.
class Heartbeater::Thread {
public:
Thread(const TabletServerOptions& opts, TabletServer* server);
Status Start();
Status Stop();
void TriggerASAP();
private:
void RunThread();
Status FindLeaderMaster(const MonoTime& deadline,
HostPort* leader_hostport);
Status ConnectToMaster();
int GetMinimumHeartbeatMillis() const;
int GetMillisUntilNextHeartbeat() const;
Status DoHeartbeat();
Status SetupRegistration(master::TSRegistrationPB* reg);
void SetupCommonField(master::TSToMasterCommonPB* common);
bool IsCurrentThread() const;
// The hosts/ports of masters that we may heartbeat to.
//
// We keep the HostPort around rather than a Sockaddr because the
// masters may change IP addresses, and we'd like to re-resolve on
// every new attempt at connecting.
vector<HostPort> master_addrs_;
// Index of the master we last succesfully obtained the master
// consensus configuration information from.
int last_locate_master_idx_;
// The server for which we are heartbeating.
TabletServer* const server_;
// The actual running thread (NULL before it is started)
scoped_refptr<kudu::Thread> thread_;
// Host and port of the most recent leader master.
HostPort leader_master_hostport_;
// Current RPC proxy to the leader master.
gscoped_ptr<master::MasterServiceProxy> proxy_;
// The most recent response from a heartbeat.
master::TSHeartbeatResponsePB last_hb_response_;
// The number of heartbeats which have failed in a row.
// This is tracked so as to back-off heartbeating.
int consecutive_failed_heartbeats_;
// Mutex/condition pair to trigger the heartbeater thread
// to either heartbeat early or exit.
Mutex mutex_;
ConditionVariable cond_;
// Protected by mutex_.
bool should_run_;
bool heartbeat_asap_;
DISALLOW_COPY_AND_ASSIGN(Thread);
};
////////////////////////////////////////////////////////////
// Heartbeater
////////////////////////////////////////////////////////////
Heartbeater::Heartbeater(const TabletServerOptions& opts, TabletServer* server)
: thread_(new Thread(opts, server)) {
}
Heartbeater::~Heartbeater() {
WARN_NOT_OK(Stop(), "Unable to stop heartbeater thread");
}
Status Heartbeater::Start() { return thread_->Start(); }
Status Heartbeater::Stop() { return thread_->Stop(); }
void Heartbeater::TriggerASAP() { thread_->TriggerASAP(); }
////////////////////////////////////////////////////////////
// Heartbeater::Thread
////////////////////////////////////////////////////////////
Heartbeater::Thread::Thread(const TabletServerOptions& opts, TabletServer* server)
: master_addrs_(opts.master_addresses),
last_locate_master_idx_(0),
server_(server),
consecutive_failed_heartbeats_(0),
cond_(&mutex_),
should_run_(false),
heartbeat_asap_(true) {
CHECK(!master_addrs_.empty());
}
namespace {
void LeaderMasterCallback(HostPort* dst_hostport,
Synchronizer* sync,
const Status& status,
const HostPort& result) {
if (status.ok()) {
*dst_hostport = result;
}
sync->StatusCB(status);
}
} // anonymous namespace
Status Heartbeater::Thread::FindLeaderMaster(const MonoTime& deadline,
HostPort* leader_hostport) {
Status s = Status::OK();
if (master_addrs_.size() == 1) {
// "Shortcut" the process when a single master is specified.
*leader_hostport = master_addrs_[0];
return Status::OK();
}
vector<Sockaddr> master_sock_addrs;
for (const HostPort& master_addr : master_addrs_) {
vector<Sockaddr> addrs;
Status s = master_addr.ResolveAddresses(&addrs);
if (!s.ok()) {
LOG(WARNING) << "Unable to resolve address '" << master_addr.ToString()
<< "': " << s.ToString();
continue;
}
if (addrs.size() > 1) {
LOG(WARNING) << "Master address '" << master_addr.ToString() << "' "
<< "resolves to " << addrs.size() << " different addresses. Using "
<< addrs[0].ToString();
}
master_sock_addrs.push_back(addrs[0]);
}
if (master_sock_addrs.empty()) {
return Status::NotFound("unable to resolve any of the master addresses!");
}
Synchronizer sync;
scoped_refptr<GetLeaderMasterRpc> rpc(new GetLeaderMasterRpc(
Bind(&LeaderMasterCallback,
leader_hostport,
&sync),
master_sock_addrs,
deadline,
server_->messenger()));
rpc->SendRpc();
return sync.Wait();
}
Status Heartbeater::Thread::ConnectToMaster() {
vector<Sockaddr> addrs;
MonoTime deadline = MonoTime::Now(MonoTime::FINE);
deadline.AddDelta(MonoDelta::FromMilliseconds(FLAGS_heartbeat_rpc_timeout_ms));
// TODO send heartbeats without tablet reports to non-leader masters.
RETURN_NOT_OK(FindLeaderMaster(deadline, &leader_master_hostport_));
gscoped_ptr<MasterServiceProxy> new_proxy;
MasterServiceProxyForHostPort(leader_master_hostport_,
server_->messenger(),
&new_proxy);
RETURN_NOT_OK(leader_master_hostport_.ResolveAddresses(&addrs));
// Ping the master to verify that it's alive.
master::PingRequestPB req;
master::PingResponsePB resp;
RpcController rpc;
rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_heartbeat_rpc_timeout_ms));
RETURN_NOT_OK_PREPEND(new_proxy->Ping(req, &resp, &rpc),
Substitute("Failed to ping master at $0", addrs[0].ToString()));
LOG(INFO) << "Connected to a leader master server at " << leader_master_hostport_.ToString();
proxy_.reset(new_proxy.release());
return Status::OK();
}
void Heartbeater::Thread::SetupCommonField(master::TSToMasterCommonPB* common) {
common->mutable_ts_instance()->CopyFrom(server_->instance_pb());
}
Status Heartbeater::Thread::SetupRegistration(master::TSRegistrationPB* reg) {
reg->Clear();
vector<Sockaddr> addrs;
RETURN_NOT_OK(CHECK_NOTNULL(server_->rpc_server())->GetBoundAddresses(&addrs));
RETURN_NOT_OK_PREPEND(AddHostPortPBs(addrs, reg->mutable_rpc_addresses()),
"Failed to add RPC addresses to registration");
addrs.clear();
RETURN_NOT_OK_PREPEND(CHECK_NOTNULL(server_->web_server())->GetBoundAddresses(&addrs),
"Unable to get bound HTTP addresses");
RETURN_NOT_OK_PREPEND(AddHostPortPBs(addrs, reg->mutable_http_addresses()),
"Failed to add HTTP addresses to registration");
return Status::OK();
}
int Heartbeater::Thread::GetMinimumHeartbeatMillis() const {
// If we've failed a few heartbeats in a row, back off to the normal
// interval, rather than retrying in a loop.
if (consecutive_failed_heartbeats_ == FLAGS_heartbeat_max_failures_before_backoff) {
LOG(WARNING) << "Failed " << consecutive_failed_heartbeats_ <<" heartbeats "
<< "in a row: no longer allowing fast heartbeat attempts.";
}
return consecutive_failed_heartbeats_ > FLAGS_heartbeat_max_failures_before_backoff ?
FLAGS_heartbeat_interval_ms : 0;
}
int Heartbeater::Thread::GetMillisUntilNextHeartbeat() const {
// If the master needs something from us, we should immediately
// send another heartbeat with that info, rather than waiting for the interval.
if (last_hb_response_.needs_reregister() ||
last_hb_response_.needs_full_tablet_report()) {
return GetMinimumHeartbeatMillis();
}
return FLAGS_heartbeat_interval_ms;
}
Status Heartbeater::Thread::DoHeartbeat() {
if (PREDICT_FALSE(server_->fail_heartbeats_for_tests())) {
return Status::IOError("failing all heartbeats for tests");
}
CHECK(IsCurrentThread());
if (!proxy_) {
VLOG(1) << "No valid master proxy. Connecting...";
RETURN_NOT_OK(ConnectToMaster());
DCHECK(proxy_);
}
master::TSHeartbeatRequestPB req;
SetupCommonField(req.mutable_common());
if (last_hb_response_.needs_reregister()) {
LOG(INFO) << "Registering TS with master...";
RETURN_NOT_OK_PREPEND(SetupRegistration(req.mutable_registration()),
"Unable to set up registration");
}
if (last_hb_response_.needs_full_tablet_report()) {
LOG(INFO) << "Sending a full tablet report to master...";
server_->tablet_manager()->GenerateFullTabletReport(
req.mutable_tablet_report());
} else {
VLOG(2) << "Sending an incremental tablet report to master...";
server_->tablet_manager()->GenerateIncrementalTabletReport(
req.mutable_tablet_report());
}
req.set_num_live_tablets(server_->tablet_manager()->GetNumLiveTablets());
RpcController rpc;
rpc.set_timeout(MonoDelta::FromSeconds(10));
VLOG(2) << "Sending heartbeat:\n" << req.DebugString();
master::TSHeartbeatResponsePB resp;
RETURN_NOT_OK_PREPEND(proxy_->TSHeartbeat(req, &resp, &rpc),
"Failed to send heartbeat");
if (resp.has_error()) {
return StatusFromPB(resp.error().status());
}
VLOG(2) << "Received heartbeat response:\n" << resp.DebugString();
if (!resp.leader_master()) {
// If the master is no longer a leader, reset proxy so that we can
// determine the master and attempt to heartbeat during in the
// next heartbeat interval.
proxy_.reset();
return Status::ServiceUnavailable("master is no longer the leader");
}
last_hb_response_.Swap(&resp);
// TODO: Handle TSHeartbeatResponsePB (e.g. deleted tablets and schema changes)
server_->tablet_manager()->MarkTabletReportAcknowledged(req.tablet_report());
return Status::OK();
}
void Heartbeater::Thread::RunThread() {
CHECK(IsCurrentThread());
VLOG(1) << "Heartbeat thread starting";
// Set up a fake "last heartbeat response" which indicates that we
// need to register -- since we've never registered before, we know
// this to be true. This avoids an extra
// heartbeat/response/heartbeat cycle.
last_hb_response_.set_needs_reregister(true);
last_hb_response_.set_needs_full_tablet_report(true);
while (true) {
MonoTime next_heartbeat = MonoTime::Now(MonoTime::FINE);
next_heartbeat.AddDelta(MonoDelta::FromMilliseconds(GetMillisUntilNextHeartbeat()));
// Wait for either the heartbeat interval to elapse, or for an "ASAP" heartbeat,
// or for the signal to shut down.
{
MutexLock l(mutex_);
while (true) {
MonoDelta remaining = next_heartbeat.GetDeltaSince(MonoTime::Now(MonoTime::FINE));
if (remaining.ToMilliseconds() <= 0 ||
heartbeat_asap_ ||
!should_run_) {
break;
}
cond_.TimedWait(remaining);
}
heartbeat_asap_ = false;
if (!should_run_) {
VLOG(1) << "Heartbeat thread finished";
return;
}
}
Status s = DoHeartbeat();
if (!s.ok()) {
LOG(WARNING) << "Failed to heartbeat to " << leader_master_hostport_.ToString()
<< ": " << s.ToString();
consecutive_failed_heartbeats_++;
if (master_addrs_.size() > 1) {
// If we encountered a network error (e.g., connection
// refused) and there's more than one master available, try
// determining the leader master again.
if (s.IsNetworkError() ||
consecutive_failed_heartbeats_ == FLAGS_heartbeat_max_failures_before_backoff) {
proxy_.reset();
}
}
continue;
}
consecutive_failed_heartbeats_ = 0;
}
}
bool Heartbeater::Thread::IsCurrentThread() const {
return thread_.get() == kudu::Thread::current_thread();
}
Status Heartbeater::Thread::Start() {
CHECK(thread_ == nullptr);
should_run_ = true;
return kudu::Thread::Create("heartbeater", "heartbeat",
&Heartbeater::Thread::RunThread, this, &thread_);
}
Status Heartbeater::Thread::Stop() {
if (!thread_) {
return Status::OK();
}
{
MutexLock l(mutex_);
should_run_ = false;
cond_.Signal();
}
RETURN_NOT_OK(ThreadJoiner(thread_.get()).Join());
thread_ = nullptr;
return Status::OK();
}
void Heartbeater::Thread::TriggerASAP() {
MutexLock l(mutex_);
heartbeat_asap_ = true;
cond_.Signal();
}
} // namespace tserver
} // namespace kudu