blob: eaa79288b57bd702f893d843b60b7be3c110c6d1 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "scheduling/executor-blacklist.h"
#include <algorithm>
#include "statestore/statestore.h"
#include "util/time.h"
DEFINE_bool(blacklisting_enabled, true,
"(Advanced) If false, disables local blacklisting of executors by coordinators, "
"which temporarily removes executors that appear to be problematic from scheduling "
"decisions.");
namespace impala {
const int32_t ExecutorBlacklist::PROBATION_TIMEOUT_MULTIPLIER = 5;
const float ExecutorBlacklist::BLACKLIST_TIMEOUT_PADDING = 1.2;
bool ExecutorBlacklist::BlacklistingEnabled() {
return FLAGS_blacklisting_enabled;
}
void ExecutorBlacklist::Blacklist(
const BackendDescriptorPB& be_desc, const Status& cause) {
DCHECK(BlacklistingEnabled());
DCHECK(be_desc.has_backend_id());
auto entry_it = executor_list_.find(be_desc.backend_id());
if (entry_it != executor_list_.end()) {
// If this executor was already blacklisted, it must be that two different queries
// tried to blacklist it at about the same time, so just leave it as is.
Entry& entry = entry_it->second;
if (entry.state == State::ON_PROBATION) {
// This executor was on probation, so re-blacklist it.
int64_t probation_timeout = GetBlacklistTimeoutMs() * PROBATION_TIMEOUT_MULTIPLIER;
int64_t current_time_ms = MonotonicMillis();
int64_t elapsed = current_time_ms - entry.blacklist_time_ms;
entry.state = State::BLACKLISTED;
entry.blacklist_time_ms = current_time_ms;
entry.cause = cause;
// Since NeedsMaintenance() doesn't consider executors that can be removed from
// probation, executors may stay on probation much longer than the timeout, so check
// the timeout here.
if (elapsed > probation_timeout * entry.num_consecutive_blacklistings) {
// This executor should have already been taken off probation, so act like it was.
entry.num_consecutive_blacklistings = 1;
} else {
++entry.num_consecutive_blacklistings;
}
}
} else {
// This executor was not already on the list, create a new Entry for it.
executor_list_.insert(
make_pair(be_desc.backend_id(), Entry(be_desc, MonotonicMillis(), cause)));
}
VLOG(2) << "Blacklisted " << be_desc.address() << ", current blacklist: "
<< DebugString();
}
ExecutorBlacklist::State ExecutorBlacklist::FindAndRemove(
const BackendDescriptorPB& be_desc) {
auto remove_it = executor_list_.find(be_desc.backend_id());
if (remove_it == executor_list_.end()) {
// Executor wasn't on the blacklist.
return NOT_BLACKLISTED;
}
State removed_state = remove_it->second.state;
executor_list_.erase(remove_it);
return removed_state;
}
bool ExecutorBlacklist::NeedsMaintenance() const {
int64_t blacklist_timeout = GetBlacklistTimeoutMs();
int64_t now = MonotonicMillis();
for (auto entry_it : executor_list_) {
const Entry& entry = entry_it.second;
if (entry.state == State::BLACKLISTED) {
int64_t elapsed = now - entry.blacklist_time_ms;
if (elapsed > blacklist_timeout * entry.num_consecutive_blacklistings) {
// This backend has passed the timeout and can be put on probation.
return true;
}
}
}
return false;
}
void ExecutorBlacklist::Maintenance(std::list<BackendDescriptorPB>* probation_list) {
int64_t blacklist_timeout = GetBlacklistTimeoutMs();
int64_t probation_timeout = blacklist_timeout * PROBATION_TIMEOUT_MULTIPLIER;
int64_t now = MonotonicMillis();
auto entry_it = executor_list_.begin();
while (entry_it != executor_list_.end()) {
Entry& entry = entry_it->second;
int64_t elapsed = now - entry.blacklist_time_ms;
if (entry.state == State::BLACKLISTED) {
// Check if we can take it off the blacklist and put it on probation.
if (elapsed > blacklist_timeout * entry.num_consecutive_blacklistings) {
LOG(INFO) << "Executor " << entry.be_desc.address()
<< " passed the timeout and will be taken off the blacklist.";
probation_list->push_back(entry.be_desc);
entry.state = State::ON_PROBATION;
}
++entry_it;
} else {
// Check if we can take it off probation.
if (elapsed > probation_timeout * entry.num_consecutive_blacklistings) {
entry_it = executor_list_.erase(entry_it);
} else {
++entry_it;
}
}
}
VLOG(2) << "Completed blacklist maintenance. Current blacklist: " << DebugString();
}
bool ExecutorBlacklist::IsBlacklisted(
const BackendDescriptorPB& be_desc, Status* cause, int64_t* time_remaining_ms) const {
auto entry_it = executor_list_.find(be_desc.backend_id());
if (entry_it != executor_list_.end()) {
const Entry& entry = entry_it->second;
if (entry.state == State::BLACKLISTED) {
if (cause != nullptr) *cause = entry.cause;
if (time_remaining_ms != nullptr) {
int64_t elapsed_ms = MonotonicMillis() - entry.blacklist_time_ms;
int64_t total_timeout_ms =
GetBlacklistTimeoutMs() * entry.num_consecutive_blacklistings;
*time_remaining_ms = total_timeout_ms - elapsed_ms;
}
return true;
}
}
return false;
}
std::string ExecutorBlacklist::BlacklistToString() const {
std::stringstream ss;
for (auto entry_it : executor_list_) {
const Entry& entry = entry_it.second;
if (entry.state == State::BLACKLISTED) ss << entry.be_desc.address() << " ";
}
return ss.str();
}
std::string ExecutorBlacklist::DebugString() const {
std::stringstream ss;
ss << "ExecutorBlacklist[";
for (auto entry_it : executor_list_) {
const Entry& entry = entry_it.second;
DCHECK(entry.state == BLACKLISTED || entry.state == ON_PROBATION);
ss << entry.be_desc.address() << " ("
<< (entry.state == BLACKLISTED ? "blacklisted" : "on probation") << ") ";
}
ss << "]";
return ss.str();
}
int64_t ExecutorBlacklist::GetBlacklistTimeoutMs() const {
// We add some padding to the timeout to account for possible small, random delays in
// statestore failure detection and to minimize the chance that an executor goes down,
// is blacklisted, and then is put on probation and causes another failure right before
// being removed from the cluster membership by a statestore update.
return BLACKLIST_TIMEOUT_PADDING * Statestore::FailedExecutorDetectionTimeMs();
}
} // namespace impala