blob: eb8c69608feb4d59479b759864b33fe28150c4d4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <math.h>
#include <stdio.h>

#include <algorithm>
#include <iostream>
#include <map>

#include "scheduling/hash-ring.h"
#include "testutil/gtest-util.h"
#include "thirdparty/pcg-cpp-0.98/include/pcg_random.hpp"
#include "util/network-util.h"
#include "gen-cpp/Types_types.h"

#include "common/names.h"
namespace impala {
class HashRingTest : public ::testing::Test {
void GetDistributionMap(const HashRing& hash_ring,
std::map<IpAddr, uint64_t>* distribution_map) {
return hash_ring.GetDistributionMap(distribution_map);
// Verify the specified hash ring has the appropriate number of nodes and replicas.
// This assumes no collisions. If there are collisions, the total replicas will be
// smaller than expected.
void VerifyCounts(const HashRing& hash_ring, uint32_t num_nodes) {
EXPECT_EQ(hash_ring.GetNumNodes(), num_nodes);
EXPECT_EQ(hash_ring.GetTotalReplicas(), num_nodes * hash_ring.GetNumReplicas());
// Verify that the allocations that GetDistributionMap() returns for each node
// sum to the size of the hash space (UINT_MAX + 1).
void VerifyTotalAllocation(const vector<IpAddr>& addresses,
const uint32_t replication) {
HashRing hash_ring(replication);
for (const IpAddr& addr : addresses) hash_ring.AddNode(addr);
std::map<IpAddr, uint64_t> dist_map;
uint64_t total_allocation = 0;
uint64_t total_uint32_range = static_cast<uint64_t>(UINT_MAX) + 1;
for (auto dist_map_it : dist_map) {
total_allocation += dist_map_it.second;
EXPECT_EQ(total_allocation, total_uint32_range);
// Verify that ratio of the maximum allocation to the minimum allocation is below
// the 'expected_ratio_limit'.
void VerifyMaxMinRatio(const vector<IpAddr>& addresses,
const uint32_t replication, double expected_ratio_limit) {
HashRing hash_ring(replication);
for (const IpAddr& addr : addresses) hash_ring.AddNode(addr);
std::map<IpAddr, uint64_t> dist_map;
uint64_t min = static_cast<uint64_t>(UINT_MAX) + 1;
uint64_t max = 0;
for (auto it = dist_map.begin(); it != dist_map.end(); it++) {
uint64_t range = it->second;
if (range < min) min = range;
if (range > max) max = range;
double max_min_ratio = (double) max / (double) min;
LOG(INFO) << "VerifyMaxMinRatio: num_nodes: " << addresses.size()
<< " replication: " << replication << " max/min ratio: " << max_min_ratio
<< " expected limit: " << expected_ratio_limit;
EXPECT_LT(max_min_ratio, expected_ratio_limit);
void GetBasicNetworkAddresses(vector<IpAddr>& addresses) {
// Collection of three addresses
void GetMultipleNetworkAddresses(string basename, uint32_t num_nodes,
vector<IpAddr>& addresses) {
for (uint32_t i = 0; i < num_nodes; i++) {
addresses.push_back(basename + std::to_string(i));
// Verify that the node and replica counts follow each node addition and removal.
TEST_F(HashRingTest, BasicAddRemove) {
  vector<IpAddr> basic_addresses;
  GetBasicNetworkAddresses(basic_addresses);

  const uint32_t replication = 10;
  HashRing h(replication);
  VerifyCounts(h, 0);

  // Add the nodes one at a time, checking the counts after each addition.
  const int total_addresses = basic_addresses.size();
  for (int i = 0; i < total_addresses; i++) {
    h.AddNode(basic_addresses[i]);
    VerifyCounts(h, i + 1);
  }

  // Remove the elements in a different order. std::random_shuffle is no longer part of
  // the standard library as of C++17, so use std::shuffle. The seed is fixed to keep
  // the removal order reproducible from run to run.
  pcg32 shuffle_rng(42);
  std::shuffle(basic_addresses.begin(), basic_addresses.end(), shuffle_rng);
  for (int i = 0; i < total_addresses; i++) {
    // NOTE(review): assumes HashRing exposes RemoveNode() as the counterpart of
    // AddNode() - confirm against hash-ring.h.
    h.RemoveNode(basic_addresses[i]);
    VerifyCounts(h, total_addresses - (i + 1));
  }
}
// Verify GetNode() on an empty ring, at the wraparound point, and for random hashes.
TEST_F(HashRingTest, GetNode) {
  vector<IpAddr> basic_addresses;
  GetBasicNetworkAddresses(basic_addresses);

  const uint32_t replication = 10;
  HashRing h(replication);

  // Test null return when empty
  EXPECT_EQ(h.GetNode(0), nullptr);

  for (const IpAddr& addr : basic_addresses) h.AddNode(addr);

  // Test wraparound
  // (Note that this assumes that nothing mapped to exactly UINT_MAX.)
  const IpAddr* first_node = h.GetNode(0);
  const IpAddr* wrapped_node = h.GetNode(UINT_MAX);
  // Two null results would compare equal, so require a real node first.
  ASSERT_NE(first_node, nullptr);
  EXPECT_EQ(first_node, wrapped_node);

  // Pick 100 random addresses and verify that they are all non-null and exist in
  // the input addresses. This is not deterministic, so log the seed to make a failure
  // reproducible.
  const time_t seed = time(nullptr);
  LOG(INFO) << "GetNode: prng seed: " << seed;
  pcg32 prng(seed);
  for (int i = 0; i < 100; i++) {
    const IpAddr* getnode_output = h.GetNode(prng());
    // Use gtest assertions rather than DCHECK so the checks also run in release
    // builds and a failure is reported as a test failure instead of an abort.
    ASSERT_NE(getnode_output, nullptr);
    auto addr_it =
        std::find(basic_addresses.begin(), basic_addresses.end(), *getnode_output);
    EXPECT_TRUE(addr_it != basic_addresses.end());
  }
}
// Verify that a copy-constructed HashRing is independent of, and equal to, its source.
TEST_F(HashRingTest, CopyConstructor) {
  vector<IpAddr> basic_addresses;
  GetBasicNetworkAddresses(basic_addresses);

  // Make a basic HashRing with a few elements
  const uint32_t replication = 10;
  unique_ptr<HashRing> old_h(new HashRing(replication));
  for (const IpAddr& addr : basic_addresses) old_h->AddNode(addr);
  std::map<IpAddr, uint64_t> old_dist_map;
  GetDistributionMap(*old_h, &old_dist_map);

  // Use copy constructor
  HashRing new_h(*old_h);

  // Destroy old HashRing so that any state the copy still shares with it is dangling.
  old_h.reset();

  // Verify that the new HashRing is the same
  VerifyCounts(new_h, basic_addresses.size());
  std::map<IpAddr, uint64_t> new_dist_map;
  // Getting the distribution visits every element, so it is an effective check for
  // use-after-free, etc.
  GetDistributionMap(new_h, &new_dist_map);

  // The distribution should be identical
  EXPECT_EQ(new_dist_map, old_dist_map);
}
// Verify that the per-node allocations cover the whole hash space, for both a small
// and a larger set of nodes.
TEST_F(HashRingTest, TotalAllocation) {
  const uint32_t replication = 10;

  // Test total allocation for basic addresses
  vector<IpAddr> basic_addresses;
  GetBasicNetworkAddresses(basic_addresses);
  VerifyTotalAllocation(basic_addresses, replication);

  // Test total allocation for 100 addresses
  vector<IpAddr> multiple_addresses;
  GetMultipleNetworkAddresses("total_alloc_host", 100, multiple_addresses);
  VerifyTotalAllocation(multiple_addresses, replication);
}
// Verify that a large ring ends up with the full number of replicas for every node.
TEST_F(HashRingTest, Collisions) {
  // Collisions should be rare, even with a relatively large number of nodes.
  // If there are collisions, the counts will not match up, because a node will not have
  // the specified replication. This verifies that there are no collisions in this
  // case. Clearly, there are cases where there will be collisions and this could fail
  // innocuously if the algorithm changes, but this is a good canary in the coalmine to
  // catch changes that cause abnormal collisions.
  const uint32_t replication = 25;
  // Single source of truth for the node count, shared by the generator and the check.
  const uint32_t num_nodes = 1000;

  vector<IpAddr> multiple_addresses;
  GetMultipleNetworkAddresses("collision_host", num_nodes, multiple_addresses);

  HashRing h(replication);
  for (const IpAddr& addr : multiple_addresses) h.AddNode(addr);
  VerifyCounts(h, num_nodes);
}
// Verify that the largest per-node allocation stays within a bounded multiple of the
// smallest one, across several node counts and base hostnames.
TEST_F(HashRingTest, MaxMinRatio) {
  // The ratio of the maximum node to the minimum node should be bounded.
  // This is purely a functional question. It makes no assumption about the underlying
  // statistics. If this ratio regresses, it could impact scheduling.
  const uint32_t replication = 25;

  // node_counts[i] is checked against the limit max_expected_ratios[i].
  const vector<uint32_t> node_counts = {10, 25, 100};
  const vector<double> max_expected_ratios = {3.0, 3.0, 4.5};
  // A gtest assertion (rather than DCHECK_EQ) keeps this check active in release
  // builds and stops the test before any out-of-range indexing below.
  ASSERT_EQ(node_counts.size(), max_expected_ratios.size());

  // Anecdotally, the base hostname has a noticeable (but bounded) impact on the ratio.
  // Test with multiple base hostnames.
  const vector<string> base_hostnames = {
      "minmaxhost", "qwert", "asdfjkl", "2a8b67", "91htgh"};

  for (size_t i = 0; i < node_counts.size(); ++i) {
    for (const string& base_hostname : base_hostnames) {
      vector<IpAddr> addresses;
      GetMultipleNetworkAddresses(base_hostname, node_counts[i], addresses);
      VerifyMaxMinRatio(addresses, replication, max_expected_ratios[i]);
    }
  }
}