blob: ea394c80452e1c2ad0a03beb22d7af46610d8138 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <thrift/thrift-config.h>
#include <thrift/server/TNonblockingServer.h>
#include <thrift/concurrency/Exception.h>
#include <thrift/transport/TSocket.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/transport/PlatformSocket.h>
#include <algorithm>
#include <iostream>
#ifdef HAVE_POLL_H
#include <poll.h>
#elif HAVE_SYS_POLL_H
#include <sys/poll.h>
#elif HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#include <netinet/tcp.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#ifdef HAVE_NETDB_H
#include <netdb.h>
#endif
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
#include <assert.h>
#ifdef HAVE_SCHED_H
#include <sched.h>
#endif
#ifndef AF_LOCAL
#define AF_LOCAL AF_UNIX
#endif
#ifdef HAVE_INTTYPES_H
#include <inttypes.h>
#endif
#ifdef HAVE_STDINT_H
#include <stdint.h>
#endif
namespace apache {
namespace thrift {
namespace server {
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::concurrency;
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TTransportException;
using std::shared_ptr;
/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState { SOCKET_RECV_FRAMING, SOCKET_RECV, SOCKET_SEND };
/**
* Five states for the nonblocking server:
* 1) initialize
* 2) read 4 byte frame size
* 3) read frame of data
* 4) send back data (if any)
* 5) force immediate connection close
*/
enum TAppState {
APP_INIT,
APP_READ_FRAME_SIZE,
APP_READ_REQUEST,
APP_WAIT_TASK,
APP_SEND_RESULT,
APP_CLOSE_CONNECTION
};
/**
* Represents a connection that is handled via libevent. This connection
* essentially encapsulates a socket that has some associated libevent state.
*/
class TNonblockingServer::TConnection {
private:
/// Server IO Thread handling this connection
TNonblockingIOThread* ioThread_;
/// Server handle
TNonblockingServer* server_;
/// TProcessor
std::shared_ptr<TProcessor> processor_;
/// Object wrapping network socket
std::shared_ptr<TSocket> tSocket_;
/// Libevent object
struct event event_;
/// Libevent flags
short eventFlags_;
/// Socket mode
TSocketState socketState_;
/// Application state
TAppState appState_;
/// How much data needed to read
uint32_t readWant_;
/// Where in the read buffer are we
uint32_t readBufferPos_;
/// Read buffer
uint8_t* readBuffer_;
/// Read buffer size
uint32_t readBufferSize_;
/// Write buffer
uint8_t* writeBuffer_;
/// Write buffer size
uint32_t writeBufferSize_;
/// How far through writing are we?
uint32_t writeBufferPos_;
/// Largest size of write buffer seen since buffer was constructed
size_t largestWriteBufferSize_;
/// Count of the number of calls for use with getResizeBufferEveryN().
int32_t callsForResize_;
/// Transport to read from
std::shared_ptr<TMemoryBuffer> inputTransport_;
/// Transport that processor writes to
std::shared_ptr<TMemoryBuffer> outputTransport_;
/// extra transport generated by transport factory (e.g. BufferedRouterTransport)
std::shared_ptr<TTransport> factoryInputTransport_;
std::shared_ptr<TTransport> factoryOutputTransport_;
/// Protocol decoder
std::shared_ptr<TProtocol> inputProtocol_;
/// Protocol encoder
std::shared_ptr<TProtocol> outputProtocol_;
/// Server event handler, if any
std::shared_ptr<TServerEventHandler> serverEventHandler_;
/// Thrift call context, if any
void* connectionContext_;
/// Go into read mode
void setRead() { setFlags(EV_READ | EV_PERSIST); }
/// Go into write mode
void setWrite() { setFlags(EV_WRITE | EV_PERSIST); }
/// Set socket idle
void setIdle() { setFlags(0); }
/**
* Set event flags for this connection.
*
* @param eventFlags flags we pass to libevent for the connection.
*/
void setFlags(short eventFlags);
/**
* Libevent handler called (via our static wrapper) when the connection
* socket had something happen. Rather than use the flags libevent passed,
* we use the connection state to determine whether we need to read or
* write the socket.
*/
void workSocket();
public:
class Task;
/// Constructor
TConnection(std::shared_ptr<TSocket> socket,
TNonblockingIOThread* ioThread) {
readBuffer_ = nullptr;
readBufferSize_ = 0;
ioThread_ = ioThread;
server_ = ioThread->getServer();
// Allocate input and output transports these only need to be allocated
// once per TConnection (they don't need to be reallocated on init() call)
inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
outputTransport_.reset(
new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));
tSocket_ = socket;
init(ioThread);
}
~TConnection() { std::free(readBuffer_); }
/// Close this connection and free or reset its resources.
void close();
/**
* Check buffers against any size limits and shrink it if exceeded.
*
* @param readLimit we reduce read buffer size to this (if nonzero).
* @param writeLimit if nonzero and write buffer is larger, replace it.
*/
void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
/// Initialize
void init(TNonblockingIOThread* ioThread);
/// set socket for connection
void setSocket(std::shared_ptr<TSocket> socket);
/**
* This is called when the application transitions from one state into
* another. This means that it has finished writing the data that it needed
* to, or finished receiving the data that it needed to.
*/
void transition();
/**
* C-callable event handler for connection events. Provides a callback
* that libevent can understand which invokes connection_->workSocket().
*
* @param fd the descriptor the event occurred on.
* @param which the flags associated with the event.
* @param v void* callback arg where we placed TConnection's "this".
*/
static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
assert(fd == static_cast<evutil_socket_t>(((TConnection*)v)->getTSocket()->getSocketFD()));
((TConnection*)v)->workSocket();
}
/**
* Notification to server that processing has ended on this request.
* Can be called either when processing is completed or when a waiting
* task has been preemptively terminated (on overload).
*
* Don't call this from the IO thread itself.
*
* @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR).
*/
bool notifyIOThread() { return ioThread_->notify(this); }
/*
* Returns the number of this connection's currently assigned IO
* thread.
*/
int getIOThreadNumber() const { return ioThread_->getThreadNumber(); }
/// Force connection shutdown for this connection.
void forceClose() {
appState_ = APP_CLOSE_CONNECTION;
if (!notifyIOThread()) {
server_->decrementActiveProcessors();
close();
throw TException("TConnection::forceClose: failed write on notify pipe");
}
}
/// return the server this connection was initialized for.
TNonblockingServer* getServer() const { return server_; }
/// get state of connection.
TAppState getState() const { return appState_; }
/// return the TSocket transport wrapping this network connection
std::shared_ptr<TSocket> getTSocket() const { return tSocket_; }
/// return the server event handler if any
std::shared_ptr<TServerEventHandler> getServerEventHandler() { return serverEventHandler_; }
/// return the Thrift connection context if any
void* getConnectionContext() { return connectionContext_; }
};
class TNonblockingServer::TConnection::Task : public Runnable {
public:
Task(std::shared_ptr<TProcessor> processor,
std::shared_ptr<TProtocol> input,
std::shared_ptr<TProtocol> output,
TConnection* connection)
: processor_(processor),
input_(input),
output_(output),
connection_(connection),
serverEventHandler_(connection_->getServerEventHandler()),
connectionContext_(connection_->getConnectionContext()) {}
void run() override {
try {
for (;;) {
if (serverEventHandler_) {
serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
}
if (!processor_->process(input_, output_, connectionContext_)
|| !input_->getTransport()->peek()) {
break;
}
}
} catch (const TTransportException& ttx) {
GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
} catch (const std::bad_alloc&) {
GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
exit(1);
} catch (const std::exception& x) {
GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
typeid(x).name(),
x.what());
} catch (...) {
GlobalOutput.printf("TNonblockingServer: unknown exception while processing.");
}
// Signal completion back to the libevent thread via a pipe
if (!connection_->notifyIOThread()) {
GlobalOutput.printf("TNonblockingServer: failed to notifyIOThread, closing.");
connection_->server_->decrementActiveProcessors();
connection_->close();
throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
}
}
TConnection* getTConnection() { return connection_; }
private:
std::shared_ptr<TProcessor> processor_;
std::shared_ptr<TProtocol> input_;
std::shared_ptr<TProtocol> output_;
TConnection* connection_;
std::shared_ptr<TServerEventHandler> serverEventHandler_;
void* connectionContext_;
};
void TNonblockingServer::TConnection::init(TNonblockingIOThread* ioThread) {
ioThread_ = ioThread;
server_ = ioThread->getServer();
appState_ = APP_INIT;
eventFlags_ = 0;
readBufferPos_ = 0;
readWant_ = 0;
writeBuffer_ = nullptr;
writeBufferSize_ = 0;
writeBufferPos_ = 0;
largestWriteBufferSize_ = 0;
socketState_ = SOCKET_RECV_FRAMING;
callsForResize_ = 0;
// get input/transports
factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(inputTransport_);
factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(outputTransport_);
// Create protocol
if (server_->getHeaderTransport()) {
inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_,
factoryOutputTransport_);
outputProtocol_ = inputProtocol_;
} else {
inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
}
// Set up for any server event handler
serverEventHandler_ = server_->getEventHandler();
if (serverEventHandler_) {
connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
} else {
connectionContext_ = nullptr;
}
// Get the processor
processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
}
void TNonblockingServer::TConnection::setSocket(std::shared_ptr<TSocket> socket) {
tSocket_ = socket;
}
void TNonblockingServer::TConnection::workSocket() {
int got = 0, left = 0, sent = 0;
uint32_t fetch = 0;
switch (socketState_) {
case SOCKET_RECV_FRAMING:
union {
uint8_t buf[sizeof(uint32_t)];
uint32_t size;
} framing;
// if we've already received some bytes we kept them here
framing.size = readWant_;
// determine size of this frame
try {
// Read from the socket
fetch = tSocket_->read(&framing.buf[readBufferPos_],
uint32_t(sizeof(framing.size) - readBufferPos_));
if (fetch == 0) {
// Whenever we get here it means a remote disconnect
close();
return;
}
readBufferPos_ += fetch;
} catch (TTransportException& te) {
//In Nonblocking SSLSocket some operations need to be retried again.
//Current approach is parsing exception message, but a better solution needs to be investigated.
if(!strstr(te.what(), "retry")) {
GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
close();
return;
}
}
if (readBufferPos_ < sizeof(framing.size)) {
// more needed before frame size is known -- save what we have so far
readWant_ = framing.size;
return;
}
readWant_ = ntohl(framing.size);
if (readWant_ > server_->getMaxFrameSize()) {
// Don't allow giant frame sizes. This prevents bad clients from
// causing us to try and allocate a giant buffer.
GlobalOutput.printf(
"TNonblockingServer: frame size too large "
"(%" PRIu32 " > %" PRIu64
") from client %s. "
"Remote side not using TFramedTransport?",
readWant_,
(uint64_t)server_->getMaxFrameSize(),
tSocket_->getSocketInfo().c_str());
close();
return;
}
// size known; now get the rest of the frame
transition();
// If the socket has more data than the frame header, continue to work on it. This is not strictly necessary for
// regular sockets, because if there is more data, libevent will fire the event handler registered for read
// readiness, which will in turn call workSocket(). However, some socket types (such as TSSLSocket) may have the
// data sitting in their internal buffers and from libevent's perspective, there is no further data available. In
// that case, not having this workSocket() call here would result in a hang as we will never get to work the socket,
// despite having more data.
if (tSocket_->hasPendingDataToRead())
{
workSocket();
}
return;
case SOCKET_RECV:
// It is an error to be in this state if we already have all the data
if (!(readBufferPos_ < readWant_)) {
GlobalOutput.printf("TNonblockingServer: frame size too short");
close();
return;
}
try {
// Read from the socket
fetch = readWant_ - readBufferPos_;
got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
} catch (TTransportException& te) {
//In Nonblocking SSLSocket some operations need to be retried again.
//Current approach is parsing exception message, but a better solution needs to be investigated.
if(!strstr(te.what(), "retry")) {
GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
close();
}
return;
}
if (got > 0) {
// Move along in the buffer
readBufferPos_ += got;
// Check that we did not overdo it
assert(readBufferPos_ <= readWant_);
// We are done reading, move onto the next state
if (readBufferPos_ == readWant_) {
transition();
}
return;
}
// Whenever we get down here it means a remote disconnect
close();
return;
case SOCKET_SEND:
// Should never have position past size
assert(writeBufferPos_ <= writeBufferSize_);
// If there is no data to send, then let us move on
if (writeBufferPos_ == writeBufferSize_) {
GlobalOutput("WARNING: Send state with no data to send");
transition();
return;
}
try {
left = writeBufferSize_ - writeBufferPos_;
sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
} catch (TTransportException& te) {
GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
close();
return;
}
writeBufferPos_ += sent;
// Did we overdo it?
assert(writeBufferPos_ <= writeBufferSize_);
// We are done!
if (writeBufferPos_ == writeBufferSize_) {
transition();
}
return;
default:
GlobalOutput.printf("Unexpected Socket State %d", socketState_);
assert(0);
}
}
bool TNonblockingServer::getHeaderTransport() {
// Currently if there is no output protocol factory,
// we assume header transport (without having to create
// a new transport and check)
return getOutputProtocolFactory() == nullptr;
}
/**
* This is called when the application transitions from one state into
* another. This means that it has finished writing the data that it needed
* to, or finished receiving the data that it needed to.
*/
void TNonblockingServer::TConnection::transition() {
// ensure this connection is active right now
assert(ioThread_);
assert(server_);
// Switch upon the state that we are currently in and move to a new state
switch (appState_) {
case APP_READ_REQUEST:
// We are done reading the request, package the read buffer into transport
// and get back some data from the dispatch function
if (server_->getHeaderTransport()) {
inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
outputTransport_->resetBuffer();
} else {
// We saved room for the framing size in case header transport needed it,
// but just skip it for the non-header case
inputTransport_->resetBuffer(readBuffer_ + 4, readBufferPos_ - 4);
outputTransport_->resetBuffer();
// Prepend four bytes of blank space to the buffer so we can
// write the frame size there later.
outputTransport_->getWritePtr(4);
outputTransport_->wroteBytes(4);
}
server_->incrementActiveProcessors();
if (server_->isThreadPoolProcessing()) {
// We are setting up a Task to do this work and we will wait on it
// Create task and dispatch to the thread manager
std::shared_ptr<Runnable> task = std::shared_ptr<Runnable>(
new Task(processor_, inputProtocol_, outputProtocol_, this));
// The application is now waiting on the task to finish
appState_ = APP_WAIT_TASK;
// Set this connection idle so that libevent doesn't process more
// data on it while we're still waiting for the threadmanager to
// finish this task
setIdle();
try {
server_->addTask(task);
} catch (IllegalStateException& ise) {
// The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
server_->decrementActiveProcessors();
close();
} catch (TimedOutException& to) {
GlobalOutput.printf("[ERROR] TimedOutException: Server::process() %s", to.what());
server_->decrementActiveProcessors();
close();
}
return;
} else {
try {
if (serverEventHandler_) {
serverEventHandler_->processContext(connectionContext_, getTSocket());
}
// Invoke the processor
processor_->process(inputProtocol_, outputProtocol_, connectionContext_);
} catch (const TTransportException& ttx) {
GlobalOutput.printf(
"TNonblockingServer transport error in "
"process(): %s",
ttx.what());
server_->decrementActiveProcessors();
close();
return;
} catch (const std::exception& x) {
GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
typeid(x).name(),
x.what());
server_->decrementActiveProcessors();
close();
return;
} catch (...) {
GlobalOutput.printf("Server::process() unknown exception");
server_->decrementActiveProcessors();
close();
return;
}
}
// fallthrough
// Intentionally fall through here, the call to process has written into
// the writeBuffer_
case APP_WAIT_TASK:
// We have now finished processing a task and the result has been written
// into the outputTransport_, so we grab its contents and place them into
// the writeBuffer_ for actual writing by the libevent thread
server_->decrementActiveProcessors();
// Get the result of the operation
outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
// If the function call generated return data, then move into the send
// state and get going
// 4 bytes were reserved for frame size
if (writeBufferSize_ > 4) {
// Move into write state
writeBufferPos_ = 0;
socketState_ = SOCKET_SEND;
// Put the frame size into the write buffer
auto frameSize = (int32_t)htonl(writeBufferSize_ - 4);
memcpy(writeBuffer_, &frameSize, 4);
// Socket into write mode
appState_ = APP_SEND_RESULT;
setWrite();
return;
}
// In this case, the request was oneway and we should fall through
// right back into the read frame header state
goto LABEL_APP_INIT;
case APP_SEND_RESULT:
// it's now safe to perform buffer size housekeeping.
if (writeBufferSize_ > largestWriteBufferSize_) {
largestWriteBufferSize_ = writeBufferSize_;
}
if (server_->getResizeBufferEveryN() > 0
&& ++callsForResize_ >= server_->getResizeBufferEveryN()) {
checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
server_->getIdleWriteBufferLimit());
callsForResize_ = 0;
}
// fallthrough
// N.B.: We also intentionally fall through here into the INIT state!
LABEL_APP_INIT:
case APP_INIT:
// Clear write buffer variables
writeBuffer_ = nullptr;
writeBufferPos_ = 0;
writeBufferSize_ = 0;
// Into read4 state we go
socketState_ = SOCKET_RECV_FRAMING;
appState_ = APP_READ_FRAME_SIZE;
readBufferPos_ = 0;
// Register read event
setRead();
return;
case APP_READ_FRAME_SIZE:
readWant_ += 4;
// We just read the request length
// Double the buffer size until it is big enough
if (readWant_ > readBufferSize_) {
if (readBufferSize_ == 0) {
readBufferSize_ = 1;
}
uint32_t newSize = readBufferSize_;
while (readWant_ > newSize) {
newSize *= 2;
}
auto* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
if (newBuffer == nullptr) {
// nothing else to be done...
throw std::bad_alloc();
}
readBuffer_ = newBuffer;
readBufferSize_ = newSize;
}
readBufferPos_ = 4;
*((uint32_t*)readBuffer_) = htonl(readWant_ - 4);
// Move into read request state
socketState_ = SOCKET_RECV;
appState_ = APP_READ_REQUEST;
return;
case APP_CLOSE_CONNECTION:
server_->decrementActiveProcessors();
close();
return;
default:
GlobalOutput.printf("Unexpected Application State %d", appState_);
assert(0);
}
}
void TNonblockingServer::TConnection::setFlags(short eventFlags) {
// Catch the do nothing case
if (eventFlags_ == eventFlags) {
return;
}
// Delete a previously existing event
if (eventFlags_ && event_del(&event_) == -1) {
GlobalOutput.perror("TConnection::setFlags() event_del", THRIFT_GET_SOCKET_ERROR);
return;
}
// Update in memory structure
eventFlags_ = eventFlags;
// Do not call event_set if there are no flags
if (!eventFlags_) {
return;
}
/*
* event_set:
*
* Prepares the event structure &event to be used in future calls to
* event_add() and event_del(). The event will be prepared to call the
* eventHandler using the 'sock' file descriptor to monitor events.
*
* The events can be either EV_READ, EV_WRITE, or both, indicating
* that an application can read or write from the file respectively without
* blocking.
*
* The eventHandler will be called with the file descriptor that triggered
* the event and the type of event which will be one of: EV_TIMEOUT,
* EV_SIGNAL, EV_READ, EV_WRITE.
*
* The additional flag EV_PERSIST makes an event_add() persistent until
* event_del() has been called.
*
* Once initialized, the &event struct can be used repeatedly with
* event_add() and event_del() and does not need to be reinitialized unless
* the eventHandler and/or the argument to it are to be changed. However,
* when an ev structure has been added to libevent using event_add() the
* structure must persist until the event occurs (assuming EV_PERSIST
* is not set) or is removed using event_del(). You may not reuse the same
* ev structure for multiple monitored descriptors; each descriptor needs
* its own ev.
*/
event_set(&event_, tSocket_->getSocketFD(), eventFlags_, TConnection::eventHandler, this);
event_base_set(ioThread_->getEventBase(), &event_);
// Add the event
if (event_add(&event_, nullptr) == -1) {
GlobalOutput.perror("TConnection::setFlags(): could not event_add", THRIFT_GET_SOCKET_ERROR);
}
}
/**
* Closes a connection
*/
void TNonblockingServer::TConnection::close() {
setIdle();
if (serverEventHandler_) {
serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
}
ioThread_ = nullptr;
// Close the socket
tSocket_->close();
// close any factory produced transports
factoryInputTransport_->close();
factoryOutputTransport_->close();
// release processor and handler
processor_.reset();
// Give this object back to the server that owns it
server_->returnConnection(this);
}
void TNonblockingServer::TConnection::checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit) {
if (readLimit > 0 && readBufferSize_ > readLimit) {
free(readBuffer_);
readBuffer_ = nullptr;
readBufferSize_ = 0;
}
if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
// just start over
outputTransport_->resetBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize()));
largestWriteBufferSize_ = 0;
}
}
TNonblockingServer::~TNonblockingServer() {
// Close any active connections (moves them to the idle connection stack)
while (activeConnections_.size()) {
activeConnections_.front()->close();
}
// Clean up unused TConnection objects in connectionStack_
while (!connectionStack_.empty()) {
TConnection* connection = connectionStack_.top();
connectionStack_.pop();
delete connection;
}
// The TNonblockingIOThread objects have shared_ptrs to the Thread
// objects and the Thread objects have shared_ptrs to the TNonblockingIOThread
// objects (as runnable) so these objects will never deallocate without help.
while (!ioThreads_.empty()) {
std::shared_ptr<TNonblockingIOThread> iot = ioThreads_.back();
ioThreads_.pop_back();
iot->setThread(std::shared_ptr<Thread>());
}
}
/**
* Creates a new connection either by reusing an object off the stack or
* by allocating a new one entirely
*/
TNonblockingServer::TConnection* TNonblockingServer::createConnection(std::shared_ptr<TSocket> socket) {
// Check the stack
Guard g(connMutex_);
// pick an IO thread to handle this connection -- currently round robin
assert(nextIOThread_ < ioThreads_.size());
int selectedThreadIdx = nextIOThread_;
nextIOThread_ = static_cast<uint32_t>((nextIOThread_ + 1) % ioThreads_.size());
TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();
// Check the connection stack to see if we can re-use
TConnection* result = nullptr;
if (connectionStack_.empty()) {
result = new TConnection(socket, ioThread);
++numTConnections_;
} else {
result = connectionStack_.top();
connectionStack_.pop();
result->setSocket(socket);
result->init(ioThread);
}
activeConnections_.push_back(result);
return result;
}
/**
* Returns a connection to the stack
*/
void TNonblockingServer::returnConnection(TConnection* connection) {
Guard g(connMutex_);
activeConnections_.erase(std::remove(activeConnections_.begin(),
activeConnections_.end(),
connection),
activeConnections_.end());
if (connectionStackLimit_ && (connectionStack_.size() >= connectionStackLimit_)) {
delete connection;
--numTConnections_;
} else {
connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
connectionStack_.push(connection);
}
}
/**
* Server socket had something happen. We accept all waiting client
* connections on fd and assign TConnection objects to handle those requests.
*/
void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
(void)which;
// Make sure that libevent didn't mess up the socket handles
assert(fd == serverSocket_);
// Going to accept a new client socket
std::shared_ptr<TSocket> clientSocket;
clientSocket = serverTransport_->accept();
if (clientSocket) {
// If we're overloaded, take action here
if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
Guard g(connMutex_);
nConnectionsDropped_++;
nTotalConnectionsDropped_++;
if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
clientSocket->close();
return;
} else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
if (!drainPendingTask()) {
// Nothing left to discard, so we drop connection instead.
clientSocket->close();
return;
}
}
}
// Create a new TConnection for this client socket.
TConnection* clientConnection = createConnection(clientSocket);
// Fail fast if we could not create a TConnection object
if (clientConnection == nullptr) {
GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
clientSocket->close();
return;
}
/*
* Either notify the ioThread that is assigned this connection to
* start processing, or if it is us, we'll just ask this
* connection to do its initial state change here.
*
* (We need to avoid writing to our own notification pipe, to
* avoid possible deadlocks if the pipe is full.)
*
* The IO thread #0 is the only one that handles these listen
* events, so unless the connection has been assigned to thread #0
* we know it's not on our thread.
*/
if (clientConnection->getIOThreadNumber() == 0) {
clientConnection->transition();
} else {
if (!clientConnection->notifyIOThread()) {
GlobalOutput.perror("[ERROR] notifyIOThread failed on fresh connection, closing", errno);
clientConnection->close();
}
}
}
}
/**
* Creates a socket to listen on and binds it to the local port.
*/
void TNonblockingServer::createAndListenOnSocket() {
serverTransport_->listen();
serverSocket_ = serverTransport_->getSocketFD();
}
void TNonblockingServer::setThreadManager(std::shared_ptr<ThreadManager> threadManager) {
threadManager_ = threadManager;
if (threadManager) {
threadManager->setExpireCallback(
std::bind(&TNonblockingServer::expireClose,
this,
std::placeholders::_1));
threadPoolProcessing_ = true;
} else {
threadPoolProcessing_ = false;
}
}
bool TNonblockingServer::serverOverloaded() {
size_t activeConnections = numTConnections_ - connectionStack_.size();
if (numActiveProcessors_ > maxActiveProcessors_ || activeConnections > maxConnections_) {
if (!overloaded_) {
GlobalOutput.printf("TNonblockingServer: overload condition begun.");
overloaded_ = true;
}
} else {
if (overloaded_ && (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_)
&& (activeConnections <= overloadHysteresis_ * maxConnections_)) {
GlobalOutput.printf(
"TNonblockingServer: overload ended; "
"%u dropped (%llu total)",
nConnectionsDropped_,
nTotalConnectionsDropped_);
nConnectionsDropped_ = 0;
overloaded_ = false;
}
}
return overloaded_;
}
bool TNonblockingServer::drainPendingTask() {
if (threadManager_) {
std::shared_ptr<Runnable> task = threadManager_->removeNextPending();
if (task) {
TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
connection->forceClose();
return true;
}
}
return false;
}
void TNonblockingServer::expireClose(std::shared_ptr<Runnable> task) {
TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
connection->forceClose();
}
void TNonblockingServer::stop() {
// Breaks the event loop in all threads so that they end ASAP.
for (auto & ioThread : ioThreads_) {
ioThread->stop();
}
}
void TNonblockingServer::registerEvents(event_base* user_event_base) {
userEventBase_ = user_event_base;
// init listen socket
if (serverSocket_ == THRIFT_INVALID_SOCKET)
createAndListenOnSocket();
// set up the IO threads
assert(ioThreads_.empty());
if (!numIOThreads_) {
numIOThreads_ = DEFAULT_IO_THREADS;
}
// User-provided event-base doesn't works for multi-threaded servers
assert(numIOThreads_ == 1 || !userEventBase_);
for (uint32_t id = 0; id < numIOThreads_; ++id) {
// the first IO thread also does the listening on server socket
THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);
shared_ptr<TNonblockingIOThread> thread(
new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
ioThreads_.push_back(thread);
}
// Notify handler of the preServe event
if (eventHandler_) {
eventHandler_->preServe();
}
// Start all of our helper IO threads. Note that the threads run forever,
// only terminating if stop() is called.
assert(ioThreads_.size() == numIOThreads_);
assert(ioThreads_.size() > 0);
GlobalOutput.printf("TNonblockingServer: Serving with %d io threads.",
ioThreads_.size());
// Launch all the secondary IO threads in separate threads
if (ioThreads_.size() > 1) {
ioThreadFactory_.reset(new ThreadFactory(
false // detached
));
assert(ioThreadFactory_.get());
// intentionally starting at thread 1, not 0
for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
ioThreads_[i]->setThread(thread);
thread->start();
}
}
// Register the events for the primary (listener) IO thread
ioThreads_[0]->registerEvents();
}
/**
* Main workhorse function, starts up the server listening on a port and
* loops over the libevent handler.
*/
void TNonblockingServer::serve() {
if (ioThreads_.empty())
registerEvents(nullptr);
// Run the primary (listener) IO thread loop in our main thread; this will
// only return when the server is shutting down.
ioThreads_[0]->run();
// Ensure all threads are finished before exiting serve()
for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
ioThreads_[i]->join();
GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
}
}
TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
int number,
THRIFT_SOCKET listenSocket,
bool useHighPriority)
: server_(server),
number_(number),
threadId_{},
listenSocket_(listenSocket),
useHighPriority_(useHighPriority),
eventBase_(nullptr),
ownEventBase_(false),
serverEvent_{},
notificationEvent_{} {
notificationPipeFDs_[0] = -1;
notificationPipeFDs_[1] = -1;
}
TNonblockingIOThread::~TNonblockingIOThread() {
// make sure our associated thread is fully finished
join();
if (eventBase_ && ownEventBase_) {
event_base_free(eventBase_);
ownEventBase_ = false;
}
if (listenSocket_ != THRIFT_INVALID_SOCKET) {
if (0 != ::THRIFT_CLOSESOCKET(listenSocket_)) {
GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ", THRIFT_GET_SOCKET_ERROR);
}
listenSocket_ = THRIFT_INVALID_SOCKET;
}
for (auto notificationPipeFD : notificationPipeFDs_) {
if (notificationPipeFD >= 0) {
if (0 != ::THRIFT_CLOSESOCKET(notificationPipeFD)) {
GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
THRIFT_GET_SOCKET_ERROR);
}
notificationPipeFD = THRIFT_INVALID_SOCKET;
}
}
}
void TNonblockingIOThread::createNotificationPipe() {
if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
throw TException("can't create notification pipe");
}
if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0
|| evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) {
::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK");
}
for (auto notificationPipeFD : notificationPipeFDs_) {
#if LIBEVENT_VERSION_NUMBER < 0x02000000
int flags;
if ((flags = THRIFT_FCNTL(notificationPipeFD, F_GETFD, 0)) < 0
|| THRIFT_FCNTL(notificationPipeFD, F_SETFD, flags | FD_CLOEXEC) < 0) {
#else
if (evutil_make_socket_closeonexec(notificationPipeFD) < 0) {
#endif
::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
throw TException(
"TNonblockingServer::createNotificationPipe() "
"FD_CLOEXEC");
}
}
}
/**
* Register the core libevent events onto the proper base.
*/
void TNonblockingIOThread::registerEvents() {
threadId_ = Thread::get_current();
assert(eventBase_ == nullptr);
eventBase_ = getServer()->getUserEventBase();
if (eventBase_ == nullptr) {
eventBase_ = event_base_new();
ownEventBase_ = true;
}
// Print some libevent stats
if (number_ == 0) {
GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
event_get_version(),
event_base_get_method(eventBase_));
}
if (listenSocket_ != THRIFT_INVALID_SOCKET) {
// Register the server event
event_set(&serverEvent_,
listenSocket_,
EV_READ | EV_PERSIST,
TNonblockingIOThread::listenHandler,
server_);
event_base_set(eventBase_, &serverEvent_);
// Add the event and start up the server
if (-1 == event_add(&serverEvent_, nullptr)) {
throw TException(
"TNonblockingServer::serve(): "
"event_add() failed on server listen event");
}
GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.", number_);
}
createNotificationPipe();
// Create an event to be notified when a task finishes
event_set(&notificationEvent_,
getNotificationRecvFD(),
EV_READ | EV_PERSIST,
TNonblockingIOThread::notifyHandler,
this);
// Attach to the base
event_base_set(eventBase_, &notificationEvent_);
// Add the event and start up the server
if (-1 == event_add(&notificationEvent_, nullptr)) {
throw TException(
"TNonblockingServer::serve(): "
"event_add() failed on task-done notification event");
}
GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.", number_);
}
bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
auto fd = getNotificationSendFD();
if (fd < 0) {
return false;
}
int ret = -1;
long kSize = sizeof(conn);
const char * pos = (const char *)const_cast_sockopt(&conn);
#if defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H)
struct pollfd pfd = {fd, POLLOUT, 0};
while (kSize > 0) {
pfd.revents = 0;
ret = poll(&pfd, 1, -1);
if (ret < 0) {
return false;
} else if (ret == 0) {
continue;
}
if (pfd.revents & POLLHUP || pfd.revents & POLLERR) {
::THRIFT_CLOSESOCKET(fd);
return false;
}
if (pfd.revents & POLLOUT) {
ret = send(fd, pos, kSize, 0);
if (ret < 0) {
if (errno == EAGAIN) {
continue;
}
::THRIFT_CLOSESOCKET(fd);
return false;
}
kSize -= ret;
pos += ret;
}
}
#else
fd_set wfds, efds;
while (kSize > 0) {
FD_ZERO(&wfds);
FD_ZERO(&efds);
FD_SET(fd, &wfds);
FD_SET(fd, &efds);
ret = select(static_cast<int>(fd + 1), nullptr, &wfds, &efds, nullptr);
if (ret < 0) {
return false;
} else if (ret == 0) {
continue;
}
if (FD_ISSET(fd, &efds)) {
::THRIFT_CLOSESOCKET(fd);
return false;
}
if (FD_ISSET(fd, &wfds)) {
ret = send(fd, pos, kSize, 0);
if (ret < 0) {
if (errno == EAGAIN) {
continue;
}
::THRIFT_CLOSESOCKET(fd);
return false;
}
kSize -= ret;
pos += ret;
}
}
#endif
return true;
}
/* static */
void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
auto* ioThread = (TNonblockingIOThread*)v;
assert(ioThread);
(void)which;
while (true) {
TNonblockingServer::TConnection* connection = nullptr;
const int kSize = sizeof(connection);
long nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
if (nBytes == kSize) {
if (connection == nullptr) {
// this is the command to stop our thread, exit the handler!
ioThread->breakLoop(false);
return;
}
connection->transition();
} else if (nBytes > 0) {
// throw away these bytes and hope that next time we get a solid read
GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d", nBytes, kSize);
ioThread->breakLoop(true);
return;
} else if (nBytes == 0) {
GlobalOutput.printf("notifyHandler: Notify socket closed!");
ioThread->breakLoop(false);
// exit the loop
break;
} else { // nBytes < 0
if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK
&& THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
GlobalOutput.perror("TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
ioThread->breakLoop(true);
return;
}
// exit the loop
break;
}
}
}
void TNonblockingIOThread::breakLoop(bool error) {
if (error) {
GlobalOutput.printf("TNonblockingServer: IO thread #%d exiting with error.", number_);
// TODO: figure out something better to do here, but for now kill the
// whole process.
GlobalOutput.printf("TNonblockingServer: aborting process.");
::abort();
}
// If we're running in the same thread, we can't use the notify(0)
// mechanism to stop the thread, but happily if we're running in the
// same thread, this means the thread can't be blocking in the event
// loop either.
if (!Thread::is_current(threadId_)) {
notify(nullptr);
} else {
// cause the loop to stop ASAP - even if it has things to do in it
event_base_loopbreak(eventBase_);
}
}
void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
#ifdef HAVE_SCHED_H
// Start out with a standard, low-priority setup for the sched params.
struct sched_param sp;
bzero((void*)&sp, sizeof(sp));
int policy = SCHED_OTHER;
// If desired, set up high-priority sched params structure.
if (value) {
// FIFO scheduler, ranked above default SCHED_OTHER queue
policy = SCHED_FIFO;
// The priority only compares us to other SCHED_FIFO threads, so we
// just pick a random priority halfway between min & max.
const int priority = (sched_get_priority_max(policy) + sched_get_priority_min(policy)) / 2;
sp.sched_priority = priority;
}
// Actually set the sched params for the current thread.
if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
GlobalOutput.printf("TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
} else {
GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR);
}
#else
THRIFT_UNUSED_VARIABLE(value);
#endif
}
void TNonblockingIOThread::run() {
if (eventBase_ == nullptr) {
registerEvents();
}
if (useHighPriority_) {
setCurrentThreadHighPriority(true);
}
if (eventBase_ != nullptr)
{
GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
// Run libevent engine, never returns, invokes calls to eventHandler
event_base_loop(eventBase_, 0);
if (useHighPriority_) {
setCurrentThreadHighPriority(false);
}
// cleans up our registered events
cleanupEvents();
}
GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_);
}
void TNonblockingIOThread::cleanupEvents() {
// stop the listen socket, if any
if (listenSocket_ != THRIFT_INVALID_SOCKET) {
if (event_del(&serverEvent_) == -1) {
GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", THRIFT_GET_SOCKET_ERROR);
}
}
event_del(&notificationEvent_);
}
void TNonblockingIOThread::stop() {
// This should cause the thread to fall out of its event loop ASAP.
breakLoop(false);
}
void TNonblockingIOThread::join() {
// If this was a thread created by a factory (not the thread that called
// serve()), we join() it to make sure we shut down fully.
if (thread_) {
try {
// Note that it is safe to both join() ourselves twice, as well as join
// the current thread as the pthread implementation checks for deadlock.
thread_->join();
} catch (...) {
// swallow everything
}
}
}
}
}
} // apache::thrift::server