blob: 37d918198111bbfd9d807d6d9c7541155f5389dc [file] [log] [blame]
/*
* Copyright 2015 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
///////////////////////////////////////////////////////////////////////////////
// This file defines the Server class.
// Server is a Base class for any heron server. The application server
// derives from the base class by implementing a bunch of virtual methods
// dealing with what to do when a new connection appears, a new packet appears
// and so on.
// The Server class knows how to bind and listen on a particular port.
// It opens up a new Connection when a client connects to that port. It reads
// new requests off of the Connection and passes it on to the Derived class
// via the virtual methods. It also carries responses from the Derived class
// thru the Connection class to the remote client.
// An application Server derives from the Server class and has to implement
// the following methods.
// 1. HandleNewConnection:- When a new connection is established with a remote
// client, the server calls this method. Server can
// send greeting message and/or establish some state
// associated with the particular connection
// 2. HandleConnectionClose:- When a client connection closes(either because
// the client hung up, or we explicitly closed the
// connection or because of a read/write error, this
// virtual method is called. Application servers can
// cleanup stuff assoicated with the particular
// Connection in this method.
// 3. HandleNewRequest:- This method is called upon receipt of a new Packet
// from the server. Application server can parse the
// request in this method and queue it for further
// processing or send back response.
// 4. HandleSentResponse:- This method is called after a response that is
// sent by the Application server using the
// SendResponse method is finally sent. Application
// server can examine the status code that is sent
// in this message to see if the response was sent
// properly or not and update their state.
// Application servers can use the SendResponse method to send a
// message/response to the client. They can terminate the connection using
// the CloseConnection method.
// The server class can be used as both a single threaded server or with a
// thread pool to service requests. In case where a thread pool is specified,
// the server will run HandleNewConnection, HandleConnectionClose,
// HandleNewRequest and HandleSentResponse in the thread pool.
// An example server can be seen at network/tests/echoserver.h/cpp
///////////////////////////////////////////////////////////////////////////////
#ifndef SERVER_H_
#define SERVER_H_
#include <google/protobuf/message.h>
#include <google/protobuf/repeated_field.h>
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>
#include <typeindex>
#include <list>
#include "basics/basics.h"
#include "glog/logging.h"
#include "network/connection.h"
#include "network/baseserver.h"
#include "network/baseconnection.h"
#include "network/event_loop.h"
#include "network/networkoptions.h"
#include "network/network_error.h"
#include "network/packet.h"
/*
* Server class definition
* Given a host/port, the server binds and listens on that host/port
* It calls various virtual methods when events happen. The events are
* HandleNewConnection:- Upon a new connection accept, we invoke
* this method. In this method derived classes can
* send greeting messages, close the connection, etc
* HandleConnectionClose:- We call this function after closing the connection.
* Derived classes can cleanup stuff in this method
* HandleNewRequest:- Whenever a new packet arrives, we call this method.
* Derived classes can parse stuff in this method.
* HandleSentResponse:- Whenever we send a response to the client, we call
* this method. Derived classes can update their state, etc.
* Derived classes can use the SendResponse method to send a packet.
* They can use the CloseConnection to explicitly close a connection.
* Note that during this method, the Server will call the
* HandleConnectionClose as well.
*/
class Server : public BaseServer {
public:
// Constructor
// The Constructor simply inits the member variable.
// Users must call Start method to start sending/receiving packets.
Server(EventLoop* eventLoop, const NetworkOptions& options);
// Destructor.
virtual ~Server();
// Start listening on the host port pair for new requests
// A zero return value means success. A negative value implies
// that the server could not bind/listen on the port.
sp_int32 Start();
// Close all active connections and stop listening.
// A zero return value means success. No more new connections
// will be accepted from this point onwards. All active
// connections will be closed. This might result in responses
// that were sent using SendResponse but not yet acked by HandleSentResponse
// being discarded.
// A negative return value implies some error happened.
sp_int32 Stop();
// Send a response back to the client. This is the primary way of
// communicating with the client.
// When the method returns it doesn't mean that the packet was sent out.
// but that it was merely queued up. Server now owns the response object
void SendResponse(REQID id, Connection* connection, const google::protobuf::Message& response);
// Send a message to initiate a non request-response style communication
// message is now owned by the Server class
void SendMessage(Connection* connection, const google::protobuf::Message& message);
void SendMessage(Connection* _connection,
sp_int32 _byte_size,
const sp_string _type_name,
const char* _message);
// Close a connection. This function doesn't return anything.
// When the connection is attempted to be closed(which can happen
// at a later time if using thread pool), The HandleConnectionClose
// will contain a status of how the closing process went.
void CloseConnection(Connection* connection);
// Add a timer function to be called after msecs microseconds.
void AddTimer(VCallback<> cb, sp_int64 msecs);
// Register a handler for a particular request type
template <typename T, typename M>
void InstallRequestHandler(void (T::*method)(REQID id, Connection* conn, M*)) {
google::protobuf::Message* m = new M();
T* t = static_cast<T*>(this);
requestHandlers[m->GetTypeName()] = std::bind(&Server::dispatchRequest<T, M>, this, t, method,
std::placeholders::_1, std::placeholders::_2);
delete m;
}
// Register a handler for a particular message type
template <typename T, typename M>
void InstallMessageHandler(void (T::*method)(Connection* conn, M*)) {
google::protobuf::Message* m = new M();
T* t = static_cast<T*>(this);
messageHandlers[m->GetTypeName()] = std::bind(&Server::dispatchMessage<T, M>, this, t, method,
std::placeholders::_1, std::placeholders::_2);
delete m;
}
// One can also send requests to the client
void SendRequest(Connection* _conn, google::protobuf::Message* _request, void* _ctx,
google::protobuf::Message* _response_placeholder);
void SendRequest(Connection* _conn, google::protobuf::Message* _request, void* _ctx,
sp_int64 _msecs, google::protobuf::Message* _response_placeholder);
// Backpressure handler
virtual void StartBackPressureConnectionCb(Connection* connection);
// Backpressure Reliever
virtual void StopBackPressureConnectionCb(Connection* _connection);
// Return the underlying EventLoop.
EventLoop* getEventLoop() { return eventLoop_; }
protected:
// Called when a new connection is accepted.
virtual void HandleNewConnection(Connection* newConnection) = 0;
// Called when a connection is closed.
// The connection object must not be used by the application after this call.
virtual void HandleConnectionClose(Connection* connection, NetworkErrorCode _status) = 0;
// Handle the responses for any sent requests
// We provide a basic handler that just deletes the response
virtual void HandleResponse(google::protobuf::Message* _response, void* _ctx,
NetworkErrorCode _status);
public:
// The interfaces implemented of the BaseServer
// Create the connection
BaseConnection* CreateConnection(ConnectionEndPoint* endpoint, ConnectionOptions* options,
EventLoop* ss);
// Called when connection is accepted
virtual void HandleNewConnection_Base(BaseConnection* newConnection);
// Called when the connection is closed
virtual void HandleConnectionClose_Base(BaseConnection* connection, NetworkErrorCode _status);
private:
// When a new packet arrives on the connection, this is invoked by the Connection
void OnNewPacket(Connection* connection, IncomingPacket* packet);
void InternalSendResponse(Connection* _connection, OutgoingPacket* _packet);
template <typename T, typename M>
void dispatchRequest(T* _t, void (T::*method)(REQID id, Connection* conn, M*), Connection* _conn,
IncomingPacket* _ipkt) {
REQID rid;
CHECK(_ipkt->UnPackREQID(&rid) == 0) << "REQID unpacking failed";
M* m = nullptr;
m = __global_protobuf_pool_acquire__(m);
if (_ipkt->UnPackProtocolBuffer(m) != 0) {
// We could not decode the pb properly
std::cerr << "Could not decode protocol buffer of type " << m->GetTypeName();
__global_protobuf_pool_release__(m);
CloseConnection(_conn);
return;
}
CHECK(m->IsInitialized());
std::function<void()> cb = std::bind(method, _t, rid, _conn, m);
cb();
}
template <typename T, typename M>
void dispatchMessage(T* _t, void (T::*method)(Connection* conn, M*), Connection* _conn,
IncomingPacket* _ipkt) {
REQID rid;
CHECK(_ipkt->UnPackREQID(&rid) == 0) << "REQID unpacking failed";
M* m = nullptr;
m = __global_protobuf_pool_acquire__(m);
if (_ipkt->UnPackProtocolBuffer(m) != 0) {
// We could not decode the pb properly
std::cerr << "Could not decode protocol buffer of type " << m->GetTypeName();
__global_protobuf_pool_release__(m);
CloseConnection(_conn);
return;
}
std::function<void()> cb = std::bind(method, _t, _conn, m);
cb();
}
void InternalSendRequest(Connection* _conn, google::protobuf::Message* _request, sp_int64 _msecs,
google::protobuf::Message* _response_placeholder, void* _ctx);
void OnPacketTimer(REQID _id, EventLoop::Status status);
typedef std::function<void(Connection*, IncomingPacket*)> handler;
std::unordered_map<std::string, handler> requestHandlers;
std::unordered_map<std::string, handler> messageHandlers;
// For acting like a client
std::unordered_map<REQID, std::pair<google::protobuf::Message*, void*> > context_map_;
REQID_Generator* request_rid_gen_;
};
#endif // SERVER_H_