blob: 37bbed5c944309eca3a5bd02cec29ae79e9f61af [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <map>
#include <random>
#include "scheduling/hash-ring.h"
#include "thirdparty/pcg-cpp-0.98/include/pcg_random.hpp"
#include "util/container-util.h"
#include "util/hash-util.h"
#include "util/network-util.h"
#include "gen-cpp/Types_types.h"
#include "common/names.h"
namespace impala {
HashRing::HashRing(const HashRing& hash_ring)
: nodes_(hash_ring.nodes_), num_replicas_(hash_ring.num_replicas_) {
for (const auto& old_pair : hash_ring.hash_to_node_) {
uint32_t hash_value = old_pair.first;
NodeIterator old_node_it = old_pair.second;
// Look up the equivalent node iterator in the new nodes_ set.
NodeIterator new_node_it = nodes_.find(*old_node_it);
DCHECK(new_node_it != nodes_.end());
hash_to_node_.emplace(hash_value, new_node_it);
}
}
void HashRing::AddNode(const IpAddr& node) {
// This node should not already be in the set.
std::pair<NodeIterator, bool> node_pair = nodes_.insert(node);
// 'second' tells whether a new element was inserted. It must be true.
DCHECK(node_pair.second) << "Failed to add: " << node;
NodeIterator node_it = node_pair.first;
// Generate multiple hashes of the IpAddr by using the hash as a seed to a PRNG.
uint32_t hash = HashUtil::Hash(node.data(), node.length(), 0);
pcg32 prng(hash);
for (uint32_t i = 0; i < num_replicas_; i++) {
uint32_t hash_val = prng();
// Check for hash collision
auto hashmap_it = hash_to_node_.find(hash_val);
// Write this new value if:
// 1. There is no hash collision.
// -OR-
// 2. There is a hash collision, but the underlying IpAddr is less than
// the current value.
// This guarantees consistency regardless of the order of adds and removes.
if (hashmap_it == hash_to_node_.end() || *node_it < *hashmap_it->second) {
hash_to_node_[hash_val] = node_it;
}
}
}
void HashRing::RemoveNode(const IpAddr& node) {
// This node must be in the set. Keep the iterator to erase it later.
NodeIterator node_it = nodes_.find(node);
DCHECK(node_it != nodes_.end());
// Walk the map and remove any entries that have a NodeIterator pointing to this node.
size_t num_removed = 0;
auto hash_to_node_it = hash_to_node_.begin();
while (hash_to_node_it != hash_to_node_.end()) {
if (hash_to_node_it->second == node_it) {
hash_to_node_it = hash_to_node_.erase(hash_to_node_it);
num_removed++;
} else {
hash_to_node_it++;
}
}
DCHECK_GT(num_removed, 0);
nodes_.erase(node_it);
}
const IpAddr* HashRing::GetNode(uint32_t hash_value) const {
if (hash_to_node_.empty()) return nullptr;
// Find the element that immediately follows this hash value
auto next_elem = hash_to_node_.lower_bound(hash_value);
if (next_elem == hash_to_node_.end()) {
// This is larger than the largest elem. Return the smallest elem
next_elem = hash_to_node_.begin();
}
NodeIterator node_it = next_elem->second;
return &(*node_it);
}
void HashRing::GetDistributionMap(
map<IpAddr, uint64_t>* distribution_map) const {
// Start at zero and add up the ranges for each distinct node by walking the map.
// There are UINT_MAX + 1 total values. GetNode() uses map::lower_bound, which is
// inclusive of the value (i.e. GetNode(5) would match a hash value of 5).
// This means all of ranges are off by one compared to typical indexes. This is
// irrelevent for the internal ranges, but the (end - start) calculation is wrong for
// the first range. It needs to be one longer. This is incorporated into the final
// section.
uint32_t last_index = 0;
for (const auto& hash_to_node_pair : hash_to_node_) {
const IpAddr& addr = *(hash_to_node_pair.second);
uint32_t end_index = hash_to_node_pair.first;
uint32_t range = end_index - last_index;
(*distribution_map)[addr] += range;
last_index = end_index;
}
// Any room from last_index to UINT_MAX goes to the min element
uint64_t range = UINT_MAX - last_index;
// Incorporate the missing +1 from the first range
range++;
const IpAddr& addr = *(hash_to_node_.begin()->second);
(*distribution_map)[addr] += range;
}
}