Bugfix: WeightedRandomizedLoadBalancer returns 0 without server (#3108)
diff --git a/src/brpc/load_balancer.cpp b/src/brpc/load_balancer.cpp
index 16e051d..2532d9e 100644
--- a/src/brpc/load_balancer.cpp
+++ b/src/brpc/load_balancer.cpp
@@ -19,6 +19,7 @@
#include <gflags/gflags.h>
#include "brpc/reloadable_flags.h"
#include "brpc/load_balancer.h"
+#include "brpc/socket.h"
namespace brpc {
@@ -34,6 +35,15 @@
// For assigning unique names for lb.
static butil::static_atomic<int> g_lb_counter = BUTIL_STATIC_ATOMIC_INIT(0);
+bool LoadBalancer::IsServerAvailable(SocketId id, SocketUniquePtr* out) {
+ SocketUniquePtr ptr;
+ bool res = Socket::Address(id, &ptr) == 0 && ptr->IsAvailable();
+ if (res) {
+ *out = std::move(ptr);
+ }
+ return res;
+}
+
void SharedLoadBalancer::DescribeLB(std::ostream& os, void* arg) {
(static_cast<SharedLoadBalancer*>(arg))->Describe(os, DescribeOptions());
}
diff --git a/src/brpc/load_balancer.h b/src/brpc/load_balancer.h
index cda0517..2a76fa4 100644
--- a/src/brpc/load_balancer.h
+++ b/src/brpc/load_balancer.h
@@ -105,6 +105,10 @@
protected:
virtual ~LoadBalancer() { }
+
+ // Returns true and set `out' if the server is available (not failed, not logoff).
+ // Otherwise, returns false.
+ static bool IsServerAvailable(SocketId id, SocketUniquePtr* out);
};
DECLARE_bool(show_lb_in_vars);
diff --git a/src/brpc/policy/consistent_hashing_load_balancer.cpp b/src/brpc/policy/consistent_hashing_load_balancer.cpp
index 085ddf9..d29ad55 100644
--- a/src/brpc/policy/consistent_hashing_load_balancer.cpp
+++ b/src/brpc/policy/consistent_hashing_load_balancer.cpp
@@ -323,8 +323,7 @@
for (size_t i = 0; i < s->size(); ++i) {
if (((i + 1) == s->size() // always take last chance
|| !ExcludedServers::IsExcluded(in.excluded, choice->server_sock.id))
- && Socket::Address(choice->server_sock.id, out->ptr) == 0
- && (*out->ptr)->IsAvailable()) {
+ && IsServerAvailable(choice->server_sock.id, out->ptr)) {
return 0;
} else {
if (++choice == s->end()) {
diff --git a/src/brpc/policy/locality_aware_load_balancer.cpp b/src/brpc/policy/locality_aware_load_balancer.cpp
index 68d85ad..beea516 100644
--- a/src/brpc/policy/locality_aware_load_balancer.cpp
+++ b/src/brpc/policy/locality_aware_load_balancer.cpp
@@ -302,8 +302,7 @@
if (index < n) {
continue;
}
- } else if (Socket::Address(info.server_id, out->ptr) == 0
- && (*out->ptr)->IsAvailable()) {
+ } else if (IsServerAvailable(info.server_id, out->ptr)) {
if ((ntry + 1) == n // Instead of fail with EHOSTDOWN, we prefer
// choosing the server again.
|| !ExcludedServers::IsExcluded(in.excluded, info.server_id)) {
diff --git a/src/brpc/policy/randomized_load_balancer.cpp b/src/brpc/policy/randomized_load_balancer.cpp
index 65cfdee..4ff43d7 100644
--- a/src/brpc/policy/randomized_load_balancer.cpp
+++ b/src/brpc/policy/randomized_load_balancer.cpp
@@ -113,8 +113,7 @@
const SocketId id = s->server_list[offset].id;
if (((i + 1) == n // always take last chance
|| !ExcludedServers::IsExcluded(in.excluded, id))
- && Socket::Address(id, out->ptr) == 0
- && (*out->ptr)->IsAvailable()) {
+ && IsServerAvailable(id, out->ptr)) {
// We found an available server
return 0;
}
diff --git a/src/brpc/policy/round_robin_load_balancer.cpp b/src/brpc/policy/round_robin_load_balancer.cpp
index fa69aa8..cf67624 100644
--- a/src/brpc/policy/round_robin_load_balancer.cpp
+++ b/src/brpc/policy/round_robin_load_balancer.cpp
@@ -120,8 +120,7 @@
const SocketId id = s->server_list[tls.offset].id;
if (((i + 1) == n // always take last chance
|| !ExcludedServers::IsExcluded(in.excluded, id))
- && Socket::Address(id, out->ptr) == 0
- && (*out->ptr)->IsAvailable()) {
+ && IsServerAvailable(id, out->ptr)) {
s.tls() = tls;
return 0;
}
diff --git a/src/brpc/policy/weighted_randomized_load_balancer.cpp b/src/brpc/policy/weighted_randomized_load_balancer.cpp
index 819c550..46923ac 100644
--- a/src/brpc/policy/weighted_randomized_load_balancer.cpp
+++ b/src/brpc/policy/weighted_randomized_load_balancer.cpp
@@ -117,10 +117,6 @@
return _db_servers.Modify(BatchRemove, servers);
}
-bool WeightedRandomizedLoadBalancer::IsServerAvailable(SocketId id, SocketUniquePtr* out) {
- return Socket::Address(id, out) == 0 && (*out)->IsAvailable();
-}
-
int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
butil::DoublyBufferedData<Servers>::ScopedPtr s;
if (_db_servers.Read(&s) != 0) {
@@ -144,13 +140,13 @@
continue;
}
random_traversed.insert(id);
- if (0 == IsServerAvailable(id, out->ptr)) {
+ if (IsServerAvailable(id, out->ptr)) {
// An available server is found.
return 0;
}
}
- if (random_traversed.size() == n) {
+ if (random_traversed.size() < n) {
// Try to traverse the remaining servers to find an available server.
uint32_t offset = butil::fast_rand_less_than(n);
uint32_t stride = bthread::prime_offset();
@@ -161,19 +157,18 @@
continue;
}
if (IsServerAvailable(id, out->ptr)) {
- // An available server is found.
- return 0;
+ if (!ExcludedServers::IsExcluded(in.excluded, id)) {
+ // Prioritize servers that are not excluded.
+ return 0;
+ }
}
}
}
- if (NULL != out->ptr) {
- // Use the excluded but available server.
- return 0;
- }
-
- // After traversing the whole server list, no available server is found.
- return EHOSTDOWN;
+ // Returns EHOSTDOWN, if no available server is found
+ // after traversing the whole server list.
+ // Otherwise, returns 0 with a available excluded server.
+ return NULL == out->ptr ? EHOSTDOWN : 0;
}
LoadBalancer* WeightedRandomizedLoadBalancer::New(
diff --git a/src/brpc/policy/weighted_randomized_load_balancer.h b/src/brpc/policy/weighted_randomized_load_balancer.h
index 3842aff..9d7a705 100644
--- a/src/brpc/policy/weighted_randomized_load_balancer.h
+++ b/src/brpc/policy/weighted_randomized_load_balancer.h
@@ -41,7 +41,7 @@
void Describe(std::ostream& os, const DescribeOptions&) override;
struct Server {
- Server(SocketId s_id = 0, uint32_t s_w = 0, uint64_t s_c_w_s = 0)
+ explicit Server(SocketId s_id = 0, uint32_t s_w = 0, uint64_t s_c_w_s = 0)
: id(s_id), weight(s_w), current_weight_sum(s_c_w_s) {}
SocketId id;
uint32_t weight;
@@ -61,7 +61,6 @@
static bool Remove(Servers& bg, const ServerId& id);
static size_t BatchAdd(Servers& bg, const std::vector<ServerId>& servers);
static size_t BatchRemove(Servers& bg, const std::vector<ServerId>& servers);
- static bool IsServerAvailable(SocketId id, SocketUniquePtr* out);
butil::DoublyBufferedData<Servers> _db_servers;
};
diff --git a/test/brpc_load_balancer_unittest.cpp b/test/brpc_load_balancer_unittest.cpp
index cca44a0..0705948 100644
--- a/test/brpc_load_balancer_unittest.cpp
+++ b/test/brpc_load_balancer_unittest.cpp
@@ -431,7 +431,7 @@
};
void* select_server(void* arg) {
- SelectArg *sa = (SelectArg *)arg;
+ SelectArg *sa = (SelectArg*)arg;
brpc::LoadBalancer* c = sa->lb;
brpc::SocketUniquePtr ptr;
CountMap *selected_count = new CountMap;
@@ -951,6 +951,7 @@
brpc::policy::WeightedRandomizedLoadBalancer wrlb;
size_t valid_weight_num = 4;
+ std::vector<brpc::SocketId> ids;
// Add server to selected list. The server with invalid weight will be skipped.
for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) {
const char *addr = servers[i];
@@ -961,6 +962,7 @@
options.remote_side = dummy;
options.user = new SaveRecycle;
ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
+ ids.emplace_back(id.id);
id.tag = weight[i];
if (i < valid_weight_num) {
int weight_num = 0;
@@ -1010,6 +1012,16 @@
// actual_rate <= expect_rate * 2
ASSERT_LE(actual_rate, expect_rate * 2);
}
+
+ for (size_t i = 1; i < ids.size(); ++i) {
+ brpc::Socket::SetFailed(ids[i]);
+ }
+ select_result.clear();
+ for (int i = 0; i < run_times; ++i) {
+ EXPECT_EQ(0, wrlb.SelectServer(in, &out));
+ // The only choice is servers[0].
+ ASSERT_STREQ(butil::endpoint2str(ptr->remote_side()).c_str(), servers[0]);
+ }
}
TEST_F(LoadBalancerTest, health_check_no_valid_server) {