blob: 566115497c6345315ea9b960986b03dec7be61dc [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "scheduling/executor-group.h"

#include <utility>
namespace impala {
// Number of virtual nodes (replicas) each host contributes to the consistent hash ring.
// Hand-testing shows that 25 replicas produces a reasonable balance between nodes
// across the hash ring. See HashRingTest::MaxMinRatio() for some empirical results
// at similar replication levels. There is nothing special about 25 (i.e. 24 or 26
// would be similar). Increasing this results in a more even distribution.
// TODO: This can be tuned further with real world tests
static const uint32_t NUM_HASH_RING_REPLICAS = 25;
// Convenience constructor: creates a group with the given name and a minimum size of 1.
// 'name' is taken by value and moved, avoiding a copy (performance-unnecessary-value-param).
ExecutorGroup::ExecutorGroup(string name) : ExecutorGroup(std::move(name), 1) {}
// Creates a group with the given name and minimum size. 'name' is taken by value and
// moved into the member, avoiding a copy (performance-unnecessary-value-param). The hash
// ring is created with NUM_HASH_RING_REPLICAS virtual nodes per host.
ExecutorGroup::ExecutorGroup(string name, int64_t min_size)
  : name_(std::move(name)),
    min_size_(min_size),
    executor_ip_hash_ring_(NUM_HASH_RING_REPLICAS) {
  // A group that can never admit any executor would be a configuration error.
  DCHECK_GT(min_size_, 0);
}
// Creates a group from its Thrift descriptor, taking the name and minimum size from it.
ExecutorGroup::ExecutorGroup(const TExecutorGroupDesc& desc)
  : ExecutorGroup(desc.name, desc.min_size) {}
const ExecutorGroup::Executors& ExecutorGroup::GetExecutorsForHost(
    const IpAddr& ip) const {
  // Callers must only pass IPs that are known to this group; an unknown IP is a
  // programming error.
  auto host_entry = executor_map_.find(ip);
  DCHECK(host_entry != executor_map_.end());
  return host_entry->second;
}
ExecutorGroup::IpAddrs ExecutorGroup::GetAllExecutorIps() const {
  // Collect the key of every per-host entry: one IP address per known host.
  IpAddrs addresses;
  addresses.reserve(NumHosts());
  for (const auto& host_entry : executor_map_) {
    addresses.push_back(host_entry.first);
  }
  return addresses;
}
ExecutorGroup::Executors ExecutorGroup::GetAllExecutorDescriptors() const {
  // Flatten the per-host executor lists into a single vector of descriptors.
  Executors all_executors;
  for (const auto& host_entry : executor_map_) {
    const Executors& host_executors = host_entry.second;
    all_executors.insert(
        all_executors.end(), host_executors.begin(), host_executors.end());
  }
  return all_executors;
}
// Adds a backend to the group, registering its host in the hash ring and the
// hostname->IP map if it is the first executor on that host. Duplicate or inconsistent
// backends are logged and ignored.
void ExecutorGroup::AddExecutor(const TBackendDescriptor& be_desc) {
  // be_desc.is_executor can be false for the local backend when scheduling queries to run
  // on the coordinator host.
  DCHECK(!be_desc.ip_address.empty());
  // Check consistency before touching executor_map_: operator[] below default-inserts an
  // entry, and an early return afterwards would leak an empty per-host entry (inflating
  // NumHosts() and making LookUpExecutorIp() succeed for a host without executors).
  if (!CheckConsistencyOrWarn(be_desc)) {
    LOG(WARNING) << "Ignoring inconsistent backend for executor group: "
                 << be_desc.krpc_address;
    return;
  }
  Executors& be_descs = executor_map_[be_desc.ip_address];
  auto eq = [&be_desc](const TBackendDescriptor& existing) {
    // The IP addresses must already match, so it is sufficient to check the port.
    DCHECK_EQ(existing.ip_address, be_desc.ip_address);
    return existing.address.port == be_desc.address.port;
  };
  if (find_if(be_descs.begin(), be_descs.end(), eq) != be_descs.end()) {
    LOG(DFATAL) << "Tried to add existing backend to executor group: "
                << be_desc.krpc_address;
    return;
  }
  // First executor on this host: make the host visible on the hash ring.
  if (be_descs.empty()) {
    executor_ip_hash_ring_.AddNode(be_desc.ip_address);
  }
  be_descs.push_back(be_desc);
  executor_ip_map_[be_desc.address.hostname] = be_desc.ip_address;
}
void ExecutorGroup::RemoveExecutor(const TBackendDescriptor& be_desc) {
auto be_descs_it = executor_map_.find(be_desc.ip_address);
if (be_descs_it == executor_map_.end()) {
LOG(DFATAL) << "Tried to remove a backend from non-existing host: "
<< be_desc.krpc_address;
return;
}
auto eq = [&be_desc](const TBackendDescriptor& existing) {
// The IP addresses must already match, so it is sufficient to check the port.
DCHECK_EQ(existing.ip_address, be_desc.ip_address);
return existing.address.port == be_desc.address.port;
};
Executors& be_descs = be_descs_it->second;
auto remove_it = find_if(be_descs.begin(), be_descs.end(), eq);
if (remove_it == be_descs.end()) {
LOG(DFATAL) << "Tried to remove non-existing backend from per-host list: "
<< be_desc.krpc_address;
return;
}
be_descs.erase(remove_it);
if (be_descs.empty()) {
executor_map_.erase(be_descs_it);
executor_ip_map_.erase(be_desc.address.hostname);
executor_ip_hash_ring_.RemoveNode(be_desc.ip_address);
}
}
// Resolves 'hostname' to an executor IP address. Returns true and writes the result
// through 'ip' (if non-null) when the host is known to this group, false otherwise.
bool ExecutorGroup::LookUpExecutorIp(const Hostname& hostname, IpAddr* ip) const {
  // 'hostname' may itself be an IP address that keys the executor map directly.
  if (executor_map_.count(hostname) > 0) {
    if (ip != nullptr) *ip = hostname;
    return true;
  }
  // Otherwise fall back to the hostname -> IP translation table.
  auto mapping = executor_ip_map_.find(hostname);
  if (mapping == executor_ip_map_.end()) return false;
  if (ip != nullptr) *ip = mapping->second;
  return true;
}
// Finds the backend descriptor whose address matches 'host' exactly. Returns nullptr
// when the host is unknown or no executor on it listens on the requested port.
const TBackendDescriptor* ExecutorGroup::LookUpBackendDesc(
    const TNetworkAddress& host) const {
  IpAddr resolved_ip;
  // Without a known IP for the hostname there can be no matching backend.
  if (!LookUpExecutorIp(host.hostname, &resolved_ip)) return nullptr;
  for (const TBackendDescriptor& candidate : GetExecutorsForHost(resolved_ip)) {
    if (candidate.address == host) return &candidate;
  }
  return nullptr;
}
int ExecutorGroup::NumExecutors() const {
int count = 0;
for (const auto& executor_list : executor_map_) count += executor_list.second.size();
return count;
}
// A group is healthy iff it currently has at least 'min_size_' executors registered.
// Logs a warning when it does not.
bool ExecutorGroup::IsHealthy() const {
  const int num_executors = NumExecutors();
  if (num_executors >= min_size_) return true;
  LOG(WARNING) << "Executor group " << name_ << " is unhealthy: " << num_executors
               << " out of " << min_size_ << " are available.";
  return false;
}
// Returns true when the backend's own group configuration is compatible with this group:
// either the backend lists this group with a matching minimum size, or it does not list
// this group at all. Logs a warning and returns false on a minimum-size mismatch.
bool ExecutorGroup::CheckConsistencyOrWarn(const TBackendDescriptor& be_desc) const {
  for (const TExecutorGroupDesc& group : be_desc.executor_groups) {
    if (group.name != name_) continue;
    if (group.min_size == min_size_) return true;
    LOG(WARNING) << "Backend " << be_desc << " is configured for executor group "
                 << group << " but group has minimum size " << min_size_;
    return false;
  }
  // A backend that does not mention the group is considered consistent. This allows
  // backends to be added to unrelated groups, e.g. for coordinator-only scheduling.
  return true;
}
} // end ns impala