blob: 8d5e434dc884350a8bffdd92d2969e16944e4605 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "common/gpid.h"
#include "dsn.layer2_types.h"
#include "gtest/gtest.h"
#include "meta/meta_bulk_load_ingestion_context.h"
#include "meta/meta_data.h"
#include "meta_test_base.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
#include "utils/fail_point.h"
namespace dsn {
namespace replication {
class node_context_test : public meta_test_base
{
public:
void SetUp()
{
_context = ingestion_context::node_context();
_context.node_ingesting_count = 0;
_context.address = NODE;
FLAGS_bulk_load_node_max_ingesting_count = 1;
FLAGS_bulk_load_node_min_disk_count = 1;
}
void TearDown()
{
_context.disk_ingesting_counts.clear();
_context.node_ingesting_count = 0;
}
void mock_context(const std::string &disk_tag,
const uint32_t disk_count = 0,
const uint32_t total_count = 0)
{
_context.node_ingesting_count = total_count;
_context.disk_ingesting_counts[disk_tag] = disk_count;
}
void init_disk(const std::string &disk_tag) { _context.init_disk(disk_tag); }
uint32_t get_disk_count(const std::string &disk_tag)
{
if (_context.disk_ingesting_counts.find(disk_tag) == _context.disk_ingesting_counts.end()) {
return -1;
}
return _context.disk_ingesting_counts[disk_tag];
}
void mock_get_max_disk_ingestion_count(const uint32_t node_min_disk_count,
const uint32_t current_disk_count)
{
FLAGS_bulk_load_node_min_disk_count = node_min_disk_count;
_context.disk_ingesting_counts.clear();
for (auto i = 0; i < current_disk_count; i++) {
_context.init_disk(std::to_string(i));
}
}
uint32_t get_max_disk_ingestion_count(const uint32_t max_node_count) const
{
return _context.get_max_disk_ingestion_count(max_node_count);
}
bool check_if_add() { return _context.check_if_add(TAG); }
public:
ingestion_context::node_context _context;
const host_port NODE = host_port("localhost", 10086);
const std::string TAG = "default";
const std::string TAG2 = "tag2";
};
TEST_F(node_context_test, init_disk_test)
{
mock_context(TAG, 1, 1);
struct init_disk_test
{
std::string disk_tag;
uint32_t expected_disk_count;
} tests[] = {{TAG, 1}, {TAG2, 0}};
for (const auto &test : tests) {
init_disk(test.disk_tag);
ASSERT_EQ(get_disk_count(test.disk_tag), test.expected_disk_count);
}
}
TEST_F(node_context_test, get_max_disk_ingestion_count_test)
{
struct get_max_disk_ingestion_count_test
{
uint32_t max_node_count;
uint32_t min_disk_count;
uint32_t current_disk_count;
uint32_t expected_count;
} tests[] = {// min_disk_count = 1
{1, 1, 1, 1},
{2, 1, 1, 2},
// min_disk_count = 3
{1, 3, 1, 1},
{4, 3, 3, 2},
// min_disk_count = 7
{1, 7, 1, 1},
{1, 7, 11, 1},
{7, 7, 1, 1},
{7, 7, 11, 1},
{8, 7, 3, 2},
{8, 7, 8, 1},
{8, 7, 11, 1}};
for (const auto &test : tests) {
mock_get_max_disk_ingestion_count(test.min_disk_count, test.current_disk_count);
ASSERT_EQ(get_max_disk_ingestion_count(test.max_node_count), test.expected_count);
}
}
TEST_F(node_context_test, check_if_add_test)
{
fail::setup();
struct check_if_add_test
{
const uint32_t max_node_count;
const uint32_t current_node_count;
std::string max_disk_count_str;
const uint32_t current_disk_count;
bool expected_result;
} tests[] = {{1, 1, "1", 1, false}, {3, 2, "2", 2, false}, {1, 0, "7", 0, true}};
for (const auto &test : tests) {
FLAGS_bulk_load_node_max_ingesting_count = test.max_node_count;
mock_context(TAG, test.current_disk_count, test.current_node_count);
auto str = "return(" + test.max_disk_count_str + ")";
fail::cfg("ingestion_node_context_disk_count", str);
ASSERT_EQ(check_if_add(), test.expected_result);
}
fail::teardown();
}
class ingestion_context_test : public meta_test_base
{
public:
/// mock app and node info context
/// node1 node2 node3 node4
/// p0(tag1) s0(tag1) s0(tag2)
/// s1(tag1) s1(tag2) p1(tag2)
/// s2(tag2) p2(tag1) s2(tag1)
/// p3(tag1) s3(tag1) s3(tag2)
void SetUp()
{
_context = std::make_unique<ingestion_context>();
add_node_context({NODE1, NODE2, NODE3, NODE4});
mock_app();
FLAGS_bulk_load_node_min_disk_count = MIN_DISK_COUNT;
FLAGS_bulk_load_node_max_ingesting_count = MAX_NODE_COUNT;
}
void TearDown() { _context->reset_all(); }
void update_max_node_count(const uint32_t max_node_count)
{
FLAGS_bulk_load_node_max_ingesting_count = max_node_count;
}
bool check_node_ingestion(const uint32_t max_node_count,
const host_port &node,
const std::string &tag)
{
_context->reset_all();
update_max_node_count(max_node_count);
_context->_nodes_context[NODE1] = ingestion_context::node_context(NODE1, TAG1);
_context->_nodes_context[NODE1].add(TAG1);
return _context->check_node_ingestion(node, tag);
}
void mock_app()
{
app_info ainfo;
ainfo.app_id = APP_ID;
ainfo.partition_count = PARTITION_COUNT;
_app = std::make_shared<app_state>(ainfo);
_app->partitions.reserve(PARTITION_COUNT);
_app->helpers->contexts.reserve(PARTITION_COUNT);
mock_partition(0,
{NODE1, NODE2, NODE3},
{TAG1, TAG1, TAG2},
_app->partitions[0],
_app->helpers->contexts[0]);
mock_partition(1,
{NODE4, NODE1, NODE2},
{TAG2, TAG1, TAG2},
_app->partitions[1],
_app->helpers->contexts[1]);
mock_partition(2,
{NODE3, NODE1, NODE4},
{TAG1, TAG2, TAG1},
_app->partitions[2],
_app->helpers->contexts[2]);
mock_partition(3,
{NODE2, NODE3, NODE4},
{TAG1, TAG1, TAG2},
_app->partitions[3],
_app->helpers->contexts[3]);
}
void mock_partition(const uint32_t pidx,
std::vector<host_port> nodes,
const std::vector<std::string> tags,
partition_configuration &config,
config_context &cc)
{
config.pid = gpid(APP_ID, pidx);
SET_IP_AND_HOST_PORT_BY_DNS(config, primary, nodes[0]);
SET_IPS_AND_HOST_PORTS_BY_DNS(config, secondaries, nodes[1], nodes[2]);
auto count = nodes.size();
for (auto i = 0; i < count; i++) {
serving_replica r;
r.node = nodes[i];
r.disk_tag = tags[i];
cc.serving.emplace_back(r);
}
}
void add_node_context(std::vector<host_port> nodes)
{
for (const auto &address : nodes) {
ingestion_context::node_context node(address, TAG1);
node.init_disk(TAG2);
_context->_nodes_context[address] = node;
}
}
bool try_partition_ingestion(const uint32_t pidx)
{
return _context->try_partition_ingestion(_app->partitions[pidx],
_app->helpers->contexts[pidx]);
}
void add_partition(const uint32_t pidx)
{
auto pinfo = ingestion_context::partition_node_info(_app->partitions[pidx],
_app->helpers->contexts[pidx]);
_context->add_partition(pinfo);
}
void remove_partition(const uint32_t pidx) { _context->remove_partition(gpid(APP_ID, pidx)); }
bool is_partition_ingesting(const uint32_t pidx) const
{
return _context->_running_partitions.find(gpid(APP_ID, pidx)) !=
_context->_running_partitions.end();
}
uint32_t get_app_ingesting_count() const { return _context->get_app_ingesting_count(APP_ID); }
void reset_app() { return _context->reset_app(APP_ID); }
int32_t get_node_running_count(const host_port &node)
{
if (_context->_nodes_context.find(node) == _context->_nodes_context.end()) {
return 0;
}
return _context->_nodes_context[node].node_ingesting_count;
}
uint32_t get_disk_running_count(const host_port &node, const std::string &disk_tag)
{
if (_context->_nodes_context.find(node) == _context->_nodes_context.end()) {
return 0;
}
auto node_cc = _context->_nodes_context[node];
if (node_cc.disk_ingesting_counts.find(disk_tag) == node_cc.disk_ingesting_counts.end()) {
return 0;
}
return node_cc.disk_ingesting_counts[disk_tag];
}
bool validate_count(const host_port &node,
const uint32_t expected_node_count,
const uint32_t expected_disk1_count,
const uint32_t expected_disk2_count)
{
return get_node_running_count(node) == expected_node_count &&
get_disk_running_count(node, TAG1) == expected_disk1_count &&
get_disk_running_count(node, TAG2) == expected_disk2_count;
}
public:
std::unique_ptr<ingestion_context> _context;
std::shared_ptr<app_state> _app;
const uint32_t APP_ID = 1;
const uint32_t PARTITION_COUNT = 4;
const uint32_t MAX_NODE_COUNT = 2;
const uint32_t MIN_DISK_COUNT = 2;
const host_port NODE1 = host_port("localhost", 10086);
const host_port NODE2 = host_port("localhost", 10085);
const host_port NODE3 = host_port("localhost", 10087);
const host_port NODE4 = host_port("localhost", 10088);
const std::string TAG1 = "tag1";
const std::string TAG2 = "tag2";
};
TEST_F(ingestion_context_test, check_node_ingestion_test)
{
struct check_node_ingestion_test
{
host_port node;
std::string tag;
uint32_t max_node_count;
bool expected_result;
} tests[] = {{NODE2, TAG1, 1, true}, {NODE1, TAG2, 2, true}, {NODE1, TAG2, 1, false}};
for (const auto &test : tests) {
ASSERT_EQ(check_node_ingestion(test.max_node_count, test.node, test.tag),
test.expected_result);
}
}
TEST_F(ingestion_context_test, try_partition_ingestion_test)
{
update_max_node_count(1);
ASSERT_EQ(try_partition_ingestion(0), true);
ASSERT_EQ(try_partition_ingestion(1), false);
update_max_node_count(2);
ASSERT_EQ(try_partition_ingestion(1), false);
ASSERT_EQ(try_partition_ingestion(2), true);
ASSERT_EQ(try_partition_ingestion(3), false);
update_max_node_count(3);
ASSERT_EQ(try_partition_ingestion(1), true);
ASSERT_EQ(try_partition_ingestion(3), true);
ASSERT_EQ(get_app_ingesting_count(), 4);
}
TEST_F(ingestion_context_test, operation_test)
{
ASSERT_FALSE(is_partition_ingesting(0));
add_partition(0);
ASSERT_TRUE(is_partition_ingesting(0));
ASSERT_TRUE(validate_count(NODE1, 1, 1, 0));
ASSERT_TRUE(validate_count(NODE2, 1, 1, 0));
ASSERT_TRUE(validate_count(NODE3, 1, 0, 1));
ASSERT_TRUE(validate_count(NODE4, 0, 0, 0));
ASSERT_EQ(get_app_ingesting_count(), 1);
ASSERT_FALSE(is_partition_ingesting(1));
add_partition(1);
ASSERT_TRUE(is_partition_ingesting(1));
ASSERT_TRUE(validate_count(NODE1, 2, 2, 0));
ASSERT_TRUE(validate_count(NODE2, 2, 1, 1));
ASSERT_TRUE(validate_count(NODE3, 1, 0, 1));
ASSERT_TRUE(validate_count(NODE4, 1, 0, 1));
ASSERT_EQ(get_app_ingesting_count(), 2);
add_partition(2);
remove_partition(0);
ASSERT_TRUE(is_partition_ingesting(2));
ASSERT_FALSE(is_partition_ingesting(0));
ASSERT_TRUE(validate_count(NODE1, 2, 1, 1));
ASSERT_TRUE(validate_count(NODE2, 1, 0, 1));
ASSERT_TRUE(validate_count(NODE3, 1, 1, 0));
ASSERT_TRUE(validate_count(NODE4, 2, 1, 1));
ASSERT_EQ(get_app_ingesting_count(), 2);
reset_app();
ASSERT_TRUE(validate_count(NODE1, 0, 0, 0));
ASSERT_TRUE(validate_count(NODE2, 0, 0, 0));
ASSERT_TRUE(validate_count(NODE3, 0, 0, 0));
ASSERT_TRUE(validate_count(NODE4, 0, 0, 0));
ASSERT_EQ(get_app_ingesting_count(), 0);
}
} // namespace replication
} // namespace dsn