blob: 58035d5a8899eaac4924550b7268797ddd4fde5f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "scheduling/executor-blacklist.h"
#include <algorithm>
#include "statestore/statestore.h"
#include "util/time.h"
// Command-line flag (default: true) that turns coordinator-local executor blacklisting
// on or off. Its value is what ExecutorBlacklist::BlacklistingEnabled() returns.
DEFINE_bool(blacklisting_enabled, true,
"(Advanced) If false, disables local blacklisting of executors by coordinators, "
"which temporarily removes executors that appear to be problematic from scheduling "
"decisions.");
namespace impala {
// The base probation timeout is the blacklist timeout (see GetBlacklistTimeoutMs())
// multiplied by this factor.
const int32_t ExecutorBlacklist::PROBATION_TIMEOUT_MULTIPLIER = 5;
// Factor applied to Statestore::FailedExecutorDetectionTimeMs() to derive the blacklist
// timeout. The extra margin covers small delays in statestore failure detection.
const float ExecutorBlacklist::BLACKLIST_TIMEOUT_PADDING = 1.2;
// Reports whether local executor blacklisting is switched on, i.e. the current value of
// the 'blacklisting_enabled' flag.
bool ExecutorBlacklist::BlacklistingEnabled() {
  const bool enabled = FLAGS_blacklisting_enabled;
  return enabled;
}
// Puts the executor described by 'be_desc' on the blacklist. Blacklisting must be
// enabled and 'be_desc.ip_address' must be non-empty (both are DCHECKed).
//
// Three cases, keyed on the executor's IP address and port:
//  - Not tracked yet: a new Entry is appended, built from 'be_desc' and the current
//    monotonic time.
//  - Already BLACKLISTED: nothing changes (two queries presumably raced to blacklist
//    it).
//  - ON_PROBATION: the entry goes back to BLACKLISTED. If its probation period had
//    already run out, the consecutive-blacklisting counter restarts at 1; otherwise it
//    is incremented. The blacklist timestamp is then refreshed.
void ExecutorBlacklist::Blacklist(const TBackendDescriptor& be_desc) {
  DCHECK(BlacklistingEnabled());
  DCHECK(!be_desc.ip_address.empty());
  // operator[] creates the per-IP vector on first use.
  vector<Entry>& be_descs = executor_list_[be_desc.ip_address];
  auto it = find_if(be_descs.begin(), be_descs.end(),
      std::bind(eqBePort, be_desc, std::placeholders::_1));
  if (it != be_descs.end()) {
    // If this executor was already blacklisted, it must be that two different queries
    // tried to blacklist it at about the same time, so just leave it as is.
    if (it->state == State::ON_PROBATION) {
      // This executor was on probation, so re-blacklist it.
      it->state = State::BLACKLISTED;

      const int64_t now = MonotonicMillis();
      const int64_t probation_timeout =
          GetBlacklistTimeoutMs() * PROBATION_TIMEOUT_MULTIPLIER;
      // Measure against the time of the *previous* blacklisting. The timestamp must not
      // be refreshed until after this computation, or 'elapsed' would always be ~0 and
      // the expiry check below could never succeed.
      const int64_t elapsed = now - it->blacklist_time_ms;
      // Since NeedsMaintenance() doesn't consider executors that can be removed from
      // probation, executors may stay on probation much longer than the timeout, so
      // check the timeout here.
      if (elapsed > probation_timeout * it->num_consecutive_blacklistings) {
        // This executor should have already been taken off probation, so act like it
        // was.
        it->num_consecutive_blacklistings = 1;
      } else {
        ++it->num_consecutive_blacklistings;
      }

      // Start the new blacklist period only now that the old timestamp has been used.
      it->blacklist_time_ms = now;
    }
  } else {
    // This executor was not already on the list, create a new Entry for it.
    be_descs.emplace_back(be_desc, MonotonicMillis());
  }
  VLOG(2) << "Blacklisted " << TNetworkAddressToString(be_desc.address)
          << ", current blacklist: " << DebugString();
}
// Removes the entry matching 'be_desc' (same IP address and port) from the list, if
// there is one, and returns the state that entry was in. Returns NOT_BLACKLISTED when
// no matching entry exists. When the removal leaves an IP address with no entries, the
// IP address itself is dropped from the map.
ExecutorBlacklist::State ExecutorBlacklist::FindAndRemove(
    const TBackendDescriptor& be_desc) {
  auto host_it = executor_list_.find(be_desc.ip_address);
  if (host_it != executor_list_.end()) {
    vector<Entry>& entries = host_it->second;
    auto match = find_if(entries.begin(), entries.end(),
        [&be_desc](const Entry& candidate) { return eqBePort(be_desc, candidate); });
    if (match != entries.end()) {
      // Remember the state before the entry is destroyed by erase().
      const State prior_state = match->state;
      entries.erase(match);
      if (entries.empty()) {
        executor_list_.erase(host_it);
      }
      return prior_state;
    }
  }
  // Either the IP address or the port was not found: the executor wasn't on the list.
  return NOT_BLACKLISTED;
}
bool ExecutorBlacklist::NeedsMaintenance() const {
int64_t blacklist_timeout = GetBlacklistTimeoutMs();
int64_t now = MonotonicMillis();
for (auto executor_it : executor_list_) {
for (auto entry_it : executor_it.second) {
if (entry_it.state == State::BLACKLISTED) {
int64_t elapsed = now - entry_it.blacklist_time_ms;
if (elapsed > blacklist_timeout * entry_it.num_consecutive_blacklistings) {
// This backend has passed the timeout and can be put on probation.
return true;
}
}
}
}
return false;
}
void ExecutorBlacklist::Maintenance(std::list<TBackendDescriptor>* probation_list) {
int64_t blacklist_timeout = GetBlacklistTimeoutMs();
int64_t probation_timeout = blacklist_timeout * PROBATION_TIMEOUT_MULTIPLIER;
int64_t now = MonotonicMillis();
auto executor_it = executor_list_.begin();
while (executor_it != executor_list_.end()) {
auto entry_it = executor_it->second.begin();
while (entry_it != executor_it->second.end()) {
int64_t elapsed = now - entry_it->blacklist_time_ms;
if (entry_it->state == State::BLACKLISTED) {
// Check if we can take it off the blacklist and put it on probation.
if (elapsed > blacklist_timeout * entry_it->num_consecutive_blacklistings) {
LOG(INFO) << "Executor " << TNetworkAddressToString(entry_it->be_desc.address)
<< " passed the timeout and will be taken off the blacklist.";
probation_list->push_back(entry_it->be_desc);
entry_it->state = State::ON_PROBATION;
}
++entry_it;
} else {
// Check if we can take it off probation.
if (elapsed > probation_timeout * entry_it->num_consecutive_blacklistings) {
entry_it = executor_it->second.erase(entry_it);
} else {
++entry_it;
}
}
}
if (executor_it->second.empty()) {
executor_it = executor_list_.erase(executor_it);
} else {
++executor_it;
}
}
VLOG(2) << "Completed blacklist maintenance. Current blacklist: " << DebugString();
}
bool ExecutorBlacklist::IsBlacklisted(const TBackendDescriptor& be_desc) const {
for (auto executor_it : executor_list_) {
if (executor_it.first == be_desc.ip_address) {
for (auto entry_it : executor_it.second) {
if (entry_it.be_desc.address.port == be_desc.address.port
&& entry_it.state == State::BLACKLISTED) {
return true;
}
}
}
}
return false;
}
std::string ExecutorBlacklist::BlacklistToString() const {
std::stringstream ss;
for (auto executor_it : executor_list_) {
for (auto entry_it : executor_it.second) {
if (entry_it.state == State::BLACKLISTED) {
ss << TNetworkAddressToString(entry_it.be_desc.address) << " ";
}
}
}
return ss.str();
}
std::string ExecutorBlacklist::DebugString() const {
std::stringstream ss;
ss << "ExecutorBlacklist[";
for (auto executor_it : executor_list_) {
for (auto entry_it : executor_it.second) {
ss << TNetworkAddressToString(entry_it.be_desc.address) << " ("
<< (entry_it.state == BLACKLISTED ? "blacklisted" : "on probation") << ") ";
}
}
ss << "]";
return ss.str();
}
// Returns the base blacklist timeout in milliseconds: the statestore's failed-executor
// detection time scaled by BLACKLIST_TIMEOUT_PADDING.
int64_t ExecutorBlacklist::GetBlacklistTimeoutMs() const {
  // The padding accounts for possible small, random delays in statestore failure
  // detection. It also lowers the odds that an executor goes down, is blacklisted, is
  // put on probation and then causes another failure just before a statestore update
  // removes it from the cluster membership.
  const auto detection_time_ms = Statestore::FailedExecutorDetectionTimeMs();
  return static_cast<int64_t>(BLACKLIST_TIMEOUT_PADDING * detection_time_ms);
}
// Predicate used to locate an entry within a single IP address's list: returns true
// when 'existing' has the same port as 'be_desc'. Callers are expected to pass an
// entry with the same IP address; this is DCHECKed rather than compared.
bool ExecutorBlacklist::eqBePort(
    const TBackendDescriptor& be_desc, const Entry& existing) {
  DCHECK_EQ(existing.be_desc.ip_address, be_desc.ip_address);
  // With the IP addresses already known to match, the port alone decides equality.
  const auto& existing_port = existing.be_desc.address.port;
  const auto& requested_port = be_desc.address.port;
  return existing_port == requested_port;
}
} // namespace impala