blob: 8043bed0cefc31ee5ca6cabbd1dfda548738667d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "statestore/failure-detector.h"
#include <boost/assign.hpp>
#include <boost/thread.hpp>
#include "common/logging.h"
#include "common/names.h"
using boost::get_system_time;
using boost::posix_time::time_duration;
using boost::system_time;
using namespace impala;
static const map<FailureDetector::PeerState, string> PEER_STATE_TO_STRING =
boost::assign::map_list_of
(FailureDetector::OK, "OK")
(FailureDetector::SUSPECTED, "SUSPECTED")
(FailureDetector::UNKNOWN, "UNKNOWN")
(FailureDetector::FAILED, "FAILED");
const string& FailureDetector::PeerStateToString(FailureDetector::PeerState peer_state) {
map<FailureDetector::PeerState, string>::const_iterator it =
PEER_STATE_TO_STRING.find(peer_state);
DCHECK(it != PEER_STATE_TO_STRING.end());
return it->second;
}
FailureDetector::PeerState TimeoutFailureDetector::UpdateHeartbeat(
const string& peer, bool seen) {
{
lock_guard<mutex> l(lock_);
if (seen) peer_heartbeat_record_[peer] = get_system_time();
}
return GetPeerState(peer);
}
FailureDetector::PeerState TimeoutFailureDetector::GetPeerState(const string& peer) {
lock_guard<mutex> l(lock_);
map<string, system_time>::iterator heartbeat_record = peer_heartbeat_record_.find(peer);
if (heartbeat_record == peer_heartbeat_record_.end()) return UNKNOWN;
time_duration duration = get_system_time() - heartbeat_record->second;
if (duration > failure_timeout_) {
return FAILED;
} else if (duration > suspect_timeout_) {
return SUSPECTED;
}
return OK;
}
void TimeoutFailureDetector::EvictPeer(const string& peer) {
lock_guard<mutex> l(lock_);
peer_heartbeat_record_.erase(peer);
}
FailureDetector::PeerState MissedHeartbeatFailureDetector::UpdateHeartbeat(
const string& peer, bool seen) {
{
lock_guard<mutex> l(lock_);
int32_t* missed_heartbeat_count = &missed_heartbeat_counts_[peer];
if (seen) {
if (*missed_heartbeat_count != 0) {
LOG(INFO) << "Heartbeat for '" << peer << "' succeeded after "
<< *missed_heartbeat_count << " missed heartbeats. "
<< "Resetting missed heartbeat count.";
*missed_heartbeat_count = 0;
}
return OK;
} else {
++(*missed_heartbeat_count);
LOG(INFO) << *missed_heartbeat_count << " consecutive heartbeats failed for "
<< "'" << peer << "'. State is "
<< PeerStateToString(ComputePeerState(*missed_heartbeat_count));
}
}
return GetPeerState(peer);
}
FailureDetector::PeerState MissedHeartbeatFailureDetector::GetPeerState(
const string& peer) {
lock_guard<mutex> l(lock_);
auto it = missed_heartbeat_counts_.find(peer);
if (it == missed_heartbeat_counts_.end()) return UNKNOWN;
return ComputePeerState(it->second);
}
FailureDetector::PeerState MissedHeartbeatFailureDetector::ComputePeerState(
int32_t missed_heatbeat_count) {
if (missed_heatbeat_count > max_missed_heartbeats_) {
return FAILED;
} else if (missed_heatbeat_count > suspect_missed_heartbeats_) {
return SUSPECTED;
}
return OK;
}
void MissedHeartbeatFailureDetector::EvictPeer(const string& peer) {
lock_guard<mutex> l(lock_);
missed_heartbeat_counts_.erase(peer);
}