blob: a92937c7549c1f75ef27d1109c378ba522ffc25c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <string>
#include <vector>
#include "runtime/rpc/group_address.h"
#include "runtime/rpc/group_host_port.h"
#include "runtime/rpc/rpc_host_port.h"
#include "utils/autoref_ptr.h"
#include "utils/fmt_logging.h"
#include "utils/rand.h"
#include "utils/synchronize.h"
namespace dsn {
static constexpr int kInvalidIndex = -1;
// Base on group_address, a group of host_post.
// Please use host_port like example if you want call group of host_port.
// e.g.
//
// dsn::rpc_host_port group;
// group.assign_group("test");
// group.group_host_port()->add(host_port("test_fqdn", 34601));
// group.group_host_port()->add(host_port("test_fqdn", 34602));
// group.group_host_port()->add(host_port("test_fqdn", 34603));
//
class rpc_group_host_port
{
public:
rpc_group_host_port(const char *name);
rpc_group_host_port(const rpc_group_address *g_addr);
rpc_group_host_port(const rpc_group_host_port &other);
rpc_group_host_port &operator=(const rpc_group_host_port &other);
bool add(const host_port &hp) WARN_UNUSED_RESULT;
void add_list(const std::vector<host_port> &hps)
{
for (const auto &hp : hps) {
LOG_WARNING_IF(!add(hp), "duplicate adress {}", hp);
}
}
void set_leader(const host_port &hp);
bool remove(const host_port &hp) WARN_UNUSED_RESULT;
bool contains(const host_port &hp) const WARN_UNUSED_RESULT;
int count() const;
const std::vector<host_port> &members() const
{
arl_t l(_lock);
return _members;
}
uint32_t random_index_unlocked() const;
host_port random_member() const
{
arl_t l(_lock);
return _members.empty() ? host_port::s_invalid_host_port
: _members[random_index_unlocked()];
}
host_port next(const host_port &current) const;
host_port leader() const
{
arl_t l(_lock);
return _leader_index >= 0 ? _members[_leader_index] : host_port::s_invalid_host_port;
}
void leader_forward();
// We should use 'possible_leader' for rpc group call, but not 'leader()'.
// Caz we not have leader sometimes in initialization phase.
host_port possible_leader();
// failure_detector should avoid failure detecting logic is affected by rpc failure or rpc
// forwarding. So we need a switch to contronl update leader automatically.
bool is_update_leader_automatically() const { return _update_leader_automatically; }
void set_update_leader_automatically(bool value) { _update_leader_automatically = value; }
const char *name() const { return _name.c_str(); }
private:
typedef std::vector<host_port> members_t;
typedef utils::auto_read_lock arl_t;
typedef utils::auto_write_lock awl_t;
mutable utils::rw_lock_nr _lock;
members_t _members;
// It's not always valid even if _members is not empty.
// Initialization is a possible value, which needs to be negotiated.
int _leader_index;
bool _update_leader_automatically;
std::string _name;
};
// ------------------ inline implementation --------------------
inline rpc_group_host_port::rpc_group_host_port(const char *name)
{
_name = name;
_leader_index = kInvalidIndex;
_update_leader_automatically = true;
}
inline rpc_group_host_port::rpc_group_host_port(const rpc_group_host_port &other)
{
_name = other._name;
_leader_index = other._leader_index;
_update_leader_automatically = other._update_leader_automatically;
_members = other._members;
}
inline rpc_group_host_port::rpc_group_host_port(const rpc_group_address *g_addr)
{
_name = g_addr->name();
for (const auto &addr : g_addr->members()) {
CHECK_TRUE(add(host_port::from_address(addr)));
}
_update_leader_automatically = g_addr->is_update_leader_automatically();
set_leader(host_port::from_address(g_addr->leader()));
}
inline rpc_group_host_port &rpc_group_host_port::operator=(const rpc_group_host_port &other)
{
if (this == &other) {
return *this;
}
_name = other._name;
_leader_index = other._leader_index;
_update_leader_automatically = other._update_leader_automatically;
_members = other._members;
return *this;
}
inline bool rpc_group_host_port::add(const host_port &hp)
{
CHECK_EQ_MSG(hp.type(), HOST_TYPE_IPV4, "rpc group host_port member must be ipv4");
awl_t l(_lock);
if (_members.end() == std::find(_members.begin(), _members.end(), hp)) {
_members.push_back(hp);
return true;
} else {
return false;
}
}
inline void rpc_group_host_port::leader_forward()
{
awl_t l(_lock);
if (_members.empty()) {
return;
}
_leader_index = (_leader_index + 1) % _members.size();
}
inline void rpc_group_host_port::set_leader(const host_port &hp)
{
awl_t l(_lock);
if (hp.is_invalid()) {
_leader_index = kInvalidIndex;
return;
}
CHECK_EQ_MSG(hp.type(), HOST_TYPE_IPV4, "rpc group host_port member must be ipv4");
for (int i = 0; i < _members.size(); i++) {
if (_members[i] == hp) {
_leader_index = i;
return;
}
}
_members.push_back(hp);
_leader_index = static_cast<int>(_members.size() - 1);
}
inline uint32_t rpc_group_host_port::random_index_unlocked() const
{
CHECK(!_members.empty(), "invaild group member size");
return rand::next_u32(0, static_cast<uint32_t>(_members.size() - 1));
}
inline host_port rpc_group_host_port::possible_leader()
{
awl_t l(_lock);
if (_members.empty()) {
return host_port::s_invalid_host_port;
}
if (_leader_index == kInvalidIndex) {
_leader_index = random_index_unlocked();
}
return _members[_leader_index];
}
inline bool rpc_group_host_port::remove(const host_port &hp)
{
awl_t l(_lock);
auto it = std::find(_members.begin(), _members.end(), hp);
if (it == _members.end()) {
return false;
}
if (kInvalidIndex != _leader_index && hp == _members[_leader_index]) {
_leader_index = kInvalidIndex;
}
_members.erase(it);
return true;
}
inline bool rpc_group_host_port::contains(const host_port &hp) const
{
arl_t l(_lock);
return _members.end() != std::find(_members.begin(), _members.end(), hp);
}
inline int rpc_group_host_port::count() const
{
arl_t l(_lock);
return _members.size();
}
inline host_port rpc_group_host_port::next(const host_port &current) const
{
arl_t l(_lock);
if (_members.empty()) {
return host_port::s_invalid_host_port;
}
if (current.is_invalid()) {
return _members[random_index_unlocked()];
}
auto it = std::find(_members.begin(), _members.end(), current);
if (it == _members.end()) {
return _members[random_index_unlocked()];
}
it++;
return it == _members.end() ? _members[0] : *it;
}
} // namespace dsn