Bugfix: WeightedRandomizedLoadBalancer returns 0 without server (#3108)

diff --git a/src/brpc/load_balancer.cpp b/src/brpc/load_balancer.cpp
index 16e051d..2532d9e 100644
--- a/src/brpc/load_balancer.cpp
+++ b/src/brpc/load_balancer.cpp
@@ -19,6 +19,7 @@
 #include <gflags/gflags.h>
 #include "brpc/reloadable_flags.h"
 #include "brpc/load_balancer.h"
+#include "brpc/socket.h"
 
 
 namespace brpc {
@@ -34,6 +35,15 @@
 // For assigning unique names for lb.
 static butil::static_atomic<int> g_lb_counter = BUTIL_STATIC_ATOMIC_INIT(0);
 
+bool LoadBalancer::IsServerAvailable(SocketId id, SocketUniquePtr* out) {
+    SocketUniquePtr ptr;
+    bool res = Socket::Address(id, &ptr) == 0 && ptr->IsAvailable();
+    if (res) {
+        *out = std::move(ptr);
+    }
+    return res;
+}
+
 void SharedLoadBalancer::DescribeLB(std::ostream& os, void* arg) {
     (static_cast<SharedLoadBalancer*>(arg))->Describe(os, DescribeOptions());
 }
diff --git a/src/brpc/load_balancer.h b/src/brpc/load_balancer.h
index cda0517..2a76fa4 100644
--- a/src/brpc/load_balancer.h
+++ b/src/brpc/load_balancer.h
@@ -105,6 +105,10 @@
 
 protected:
     virtual ~LoadBalancer() { }
+
+    // Returns true and set `out' if the server is available (not failed, not logoff).
+    // Otherwise, returns false.
+    static bool IsServerAvailable(SocketId id, SocketUniquePtr* out);
 };
 
 DECLARE_bool(show_lb_in_vars);
diff --git a/src/brpc/policy/consistent_hashing_load_balancer.cpp b/src/brpc/policy/consistent_hashing_load_balancer.cpp
index 085ddf9..d29ad55 100644
--- a/src/brpc/policy/consistent_hashing_load_balancer.cpp
+++ b/src/brpc/policy/consistent_hashing_load_balancer.cpp
@@ -323,8 +323,7 @@
     for (size_t i = 0; i < s->size(); ++i) {
         if (((i + 1) == s->size() // always take last chance
              || !ExcludedServers::IsExcluded(in.excluded, choice->server_sock.id))
-            && Socket::Address(choice->server_sock.id, out->ptr) == 0 
-            && (*out->ptr)->IsAvailable()) {
+            && IsServerAvailable(choice->server_sock.id, out->ptr)) {
             return 0;
         } else {
             if (++choice == s->end()) {
diff --git a/src/brpc/policy/locality_aware_load_balancer.cpp b/src/brpc/policy/locality_aware_load_balancer.cpp
index 68d85ad..beea516 100644
--- a/src/brpc/policy/locality_aware_load_balancer.cpp
+++ b/src/brpc/policy/locality_aware_load_balancer.cpp
@@ -302,8 +302,7 @@
             if (index < n) {
                 continue;
             }
-        } else if (Socket::Address(info.server_id, out->ptr) == 0
-                   && (*out->ptr)->IsAvailable()) {
+        } else if (IsServerAvailable(info.server_id, out->ptr)) {
             if ((ntry + 1) == n  // Instead of fail with EHOSTDOWN, we prefer
                                  // choosing the server again.
                 || !ExcludedServers::IsExcluded(in.excluded, info.server_id)) {
diff --git a/src/brpc/policy/randomized_load_balancer.cpp b/src/brpc/policy/randomized_load_balancer.cpp
index 65cfdee..4ff43d7 100644
--- a/src/brpc/policy/randomized_load_balancer.cpp
+++ b/src/brpc/policy/randomized_load_balancer.cpp
@@ -113,8 +113,7 @@
         const SocketId id = s->server_list[offset].id;
         if (((i + 1) == n  // always take last chance
              || !ExcludedServers::IsExcluded(in.excluded, id))
-            && Socket::Address(id, out->ptr) == 0
-            && (*out->ptr)->IsAvailable()) {
+            && IsServerAvailable(id, out->ptr)) {
             // We found an available server
             return 0;
         }
diff --git a/src/brpc/policy/round_robin_load_balancer.cpp b/src/brpc/policy/round_robin_load_balancer.cpp
index fa69aa8..cf67624 100644
--- a/src/brpc/policy/round_robin_load_balancer.cpp
+++ b/src/brpc/policy/round_robin_load_balancer.cpp
@@ -120,8 +120,7 @@
         const SocketId id = s->server_list[tls.offset].id;
         if (((i + 1) == n  // always take last chance
              || !ExcludedServers::IsExcluded(in.excluded, id))
-            && Socket::Address(id, out->ptr) == 0
-            && (*out->ptr)->IsAvailable()) {
+            && IsServerAvailable(id, out->ptr)) {
             s.tls() = tls;
             return 0;
         }
diff --git a/src/brpc/policy/weighted_randomized_load_balancer.cpp b/src/brpc/policy/weighted_randomized_load_balancer.cpp
index 819c550..46923ac 100644
--- a/src/brpc/policy/weighted_randomized_load_balancer.cpp
+++ b/src/brpc/policy/weighted_randomized_load_balancer.cpp
@@ -117,10 +117,6 @@
     return _db_servers.Modify(BatchRemove, servers);
 }
 
-bool WeightedRandomizedLoadBalancer::IsServerAvailable(SocketId id, SocketUniquePtr* out) {
-    return Socket::Address(id, out) == 0 && (*out)->IsAvailable();
-}
-
 int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
     butil::DoublyBufferedData<Servers>::ScopedPtr s;
     if (_db_servers.Read(&s) != 0) {
@@ -144,13 +140,13 @@
             continue;
         }
         random_traversed.insert(id);
-        if (0 == IsServerAvailable(id, out->ptr)) {
+        if (IsServerAvailable(id, out->ptr)) {
             // An available server is found.
             return 0;
         }
     }
 
-    if (random_traversed.size() == n) {
+    if (random_traversed.size() < n) {
         // Try to traverse the remaining servers to find an available server.
         uint32_t offset = butil::fast_rand_less_than(n);
         uint32_t stride = bthread::prime_offset();
@@ -161,19 +157,18 @@
                 continue;
             }
             if (IsServerAvailable(id, out->ptr)) {
-                // An available server is found.
-                return 0;
+                if (!ExcludedServers::IsExcluded(in.excluded, id)) {
+                    // Prioritize servers that are not excluded.
+                    return 0;
+                }
             }
         }
     }
 
-    if (NULL != out->ptr) {
-        // Use the excluded but available server.
-        return 0;
-    }
-
-    // After traversing the whole server list, no available server is found.
-    return EHOSTDOWN;
+    // Returns EHOSTDOWN, if no available server is found
+    // after traversing the whole server list.
+    // Otherwise, returns 0 with a available excluded server.
+    return NULL == out->ptr ? EHOSTDOWN : 0;
 }
 
 LoadBalancer* WeightedRandomizedLoadBalancer::New(
diff --git a/src/brpc/policy/weighted_randomized_load_balancer.h b/src/brpc/policy/weighted_randomized_load_balancer.h
index 3842aff..9d7a705 100644
--- a/src/brpc/policy/weighted_randomized_load_balancer.h
+++ b/src/brpc/policy/weighted_randomized_load_balancer.h
@@ -41,7 +41,7 @@
     void Describe(std::ostream& os, const DescribeOptions&) override;
 
     struct Server {
-        Server(SocketId s_id = 0, uint32_t s_w = 0, uint64_t s_c_w_s = 0)
+        explicit Server(SocketId s_id = 0, uint32_t s_w = 0, uint64_t s_c_w_s = 0)
             : id(s_id), weight(s_w), current_weight_sum(s_c_w_s) {}
         SocketId id;
         uint32_t weight;
@@ -61,7 +61,6 @@
     static bool Remove(Servers& bg, const ServerId& id);
     static size_t BatchAdd(Servers& bg, const std::vector<ServerId>& servers);
     static size_t BatchRemove(Servers& bg, const std::vector<ServerId>& servers);
-    static bool IsServerAvailable(SocketId id, SocketUniquePtr* out);
 
     butil::DoublyBufferedData<Servers> _db_servers;
 };
diff --git a/test/brpc_load_balancer_unittest.cpp b/test/brpc_load_balancer_unittest.cpp
index cca44a0..0705948 100644
--- a/test/brpc_load_balancer_unittest.cpp
+++ b/test/brpc_load_balancer_unittest.cpp
@@ -431,7 +431,7 @@
 };
 
 void* select_server(void* arg) {
-    SelectArg *sa = (SelectArg *)arg;
+    SelectArg *sa = (SelectArg*)arg;
     brpc::LoadBalancer* c = sa->lb;
     brpc::SocketUniquePtr ptr;
     CountMap *selected_count = new CountMap;
@@ -951,6 +951,7 @@
     brpc::policy::WeightedRandomizedLoadBalancer wrlb;
     size_t valid_weight_num = 4;
 
+    std::vector<brpc::SocketId> ids;
     // Add server to selected list. The server with invalid weight will be skipped.
     for (size_t i = 0;  i < ARRAY_SIZE(servers); ++i) {
         const char *addr = servers[i];
@@ -961,6 +962,7 @@
         options.remote_side = dummy;
         options.user = new SaveRecycle;
         ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
+        ids.emplace_back(id.id);
         id.tag = weight[i];
         if (i < valid_weight_num) {
             int weight_num = 0;
@@ -1010,6 +1012,16 @@
         // actual_rate <= expect_rate * 2
         ASSERT_LE(actual_rate, expect_rate * 2);
     }
+
+    for (size_t i = 1; i < ids.size(); ++i) {
+        brpc::Socket::SetFailed(ids[i]);
+    }
+    select_result.clear();
+    for (int i = 0; i < run_times; ++i) {
+        EXPECT_EQ(0, wrlb.SelectServer(in, &out));
+        // The only choice is servers[0].
+        ASSERT_STREQ(butil::endpoint2str(ptr->remote_side()).c_str(), servers[0]);
+    }
 }
 
 TEST_F(LoadBalancerTest, health_check_no_valid_server) {