blob: 2a76fa4305202f5ddfb46d832023a1ec9ca27998 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef BRPC_LOAD_BALANCER_H
#define BRPC_LOAD_BALANCER_H
#include "bvar/passive_status.h"
#include "brpc/describable.h"
#include "brpc/destroyable.h"
#include "brpc/excluded_servers.h" // ExcludedServers
#include "brpc/shared_object.h" // SharedObject
#include "brpc/server_id.h" // ServerId
#include "brpc/extension.h" // Extension<T>
namespace brpc {
class Controller;
// Select a server from a set of servers (in form of ServerId).
class LoadBalancer : public NonConstDescribable, public Destroyable {
public:
struct SelectIn {
int64_t begin_time_us;
// Weight of different nodes could be changed.
bool changable_weights;
bool has_request_code;
uint64_t request_code;
const ExcludedServers* excluded;
};
struct SelectOut {
explicit SelectOut(SocketUniquePtr* ptr_in)
: ptr(ptr_in), need_feedback(false) {}
SocketUniquePtr* ptr;
bool need_feedback;
};
struct CallInfo {
// Exactly same with SelectIn.begin_time_us, may be different from
// controller->_begin_time_us which is beginning of the RPC.
int64_t begin_time_us;
// Remote side of the call.
SocketId server_id;
// A RPC may have multiple calls, this error may be different from
// controller->ErrorCode();
int error_code;
// The controller for the RPC. Should NOT be saved in Feedback()
// and used after the function.
const Controller* controller;
};
LoadBalancer() { }
// ====================================================================
// All methods must be thread-safe!
// Take a look at policy/round_robin_load_balancer.cpp to see how to
// make SelectServer() low contended by using DoublyBufferedData<>
// =====================================================================
// Add `server' into this balancer.
// Returns true on added.
virtual bool AddServer(const ServerId& server) = 0;
// Remove `server' from this balancer.
// Returns true iff the server was removed.
virtual bool RemoveServer(const ServerId& server) = 0;
// Add a list of `servers' into this balancer.
// Returns number of servers added.
virtual size_t AddServersInBatch(const std::vector<ServerId>& servers) = 0;
// Remove a list of `servers' from this balancer.
// Returns number of servers removed.
virtual size_t RemoveServersInBatch(const std::vector<ServerId>& servers) = 0;
// Select a server and address it into `out->ptr'.
// If Feedback() should be called when the RPC is done, set
// out->need_feedback to true.
// Returns 0 on success, errno otherwise.
virtual int SelectServer(const SelectIn& in, SelectOut* out) = 0;
// Feedback this balancer with CallInfo gathered before RPC finishes.
// This function is only called when corresponding SelectServer was
// successful and out->need_feedback was set to true.
virtual void Feedback(const CallInfo& /*info*/) { }
// Create/destroy an instance.
// Caller is responsible for Destroy() the instance after usage.
virtual LoadBalancer* New(const butil::StringPiece& params) const = 0;
protected:
virtual ~LoadBalancer() { }
// Returns true and set `out' if the server is available (not failed, not logoff).
// Otherwise, returns false.
static bool IsServerAvailable(SocketId id, SocketUniquePtr* out);
};
DECLARE_bool(show_lb_in_vars);
DECLARE_int32(default_weight_of_wlb);
// A intrusively shareable load balancer created from name.
class SharedLoadBalancer : public SharedObject, public NonConstDescribable {
public:
SharedLoadBalancer();
~SharedLoadBalancer();
int Init(const char* lb_name);
int SelectServer(const LoadBalancer::SelectIn& in,
LoadBalancer::SelectOut* out) {
if (FLAGS_show_lb_in_vars && !_exposed) {
ExposeLB();
}
return _lb->SelectServer(in, out);
}
void Feedback(const LoadBalancer::CallInfo& info) { _lb->Feedback(info); }
bool AddServer(const ServerId& server) {
if (_lb->AddServer(server)) {
_weight_sum.fetch_add(1, butil::memory_order_relaxed);
return true;
}
return false;
}
bool RemoveServer(const ServerId& server) {
if (_lb->RemoveServer(server)) {
_weight_sum.fetch_sub(1, butil::memory_order_relaxed);
return true;
}
return false;
}
size_t AddServersInBatch(const std::vector<ServerId>& servers) {
size_t n = _lb->AddServersInBatch(servers);
if (n) {
_weight_sum.fetch_add(n, butil::memory_order_relaxed);
}
return n;
}
size_t RemoveServersInBatch(const std::vector<ServerId>& servers) {
size_t n = _lb->RemoveServersInBatch(servers);
if (n) {
_weight_sum.fetch_sub(n, butil::memory_order_relaxed);
}
return n;
}
virtual void Describe(std::ostream& os, const DescribeOptions&);
virtual int Weight() {
return _weight_sum.load(butil::memory_order_relaxed);
}
private:
static bool ParseParameters(const butil::StringPiece& lb_protocol,
std::string* lb_name,
butil::StringPiece* lb_params);
static void DescribeLB(std::ostream& os, void* arg);
void ExposeLB();
LoadBalancer* _lb;
butil::atomic<int> _weight_sum;
volatile bool _exposed;
butil::Mutex _st_mutex;
bvar::PassiveStatus<std::string> _st;
};
// For registering global instances.
inline Extension<const LoadBalancer>* LoadBalancerExtension() {
return Extension<const LoadBalancer>::instance();
}
} // namespace brpc
#endif // BRPC_LOAD_BALANCER_H