blob: c115b603c618af0a559eddd423847ec52580fc93 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "server.h"
#include <sstream>
#include "server_intercept.h"
#include "server_connection.h"
#include "connection_pool.h"
#include "ats_fastcgi.h"
using namespace ats_plugin;
uint UniqueRequesID::_id = 1;
bool
interceptTransferData(ServerIntercept *intercept, ServerConnection *server_conn)
{
TSIOBufferBlock block;
int64_t consumed = 0;
bool responseStatus = false;
std::ostringstream output;
// Walk the list of buffer blocks in from the read VIO.
for (block = TSIOBufferReaderStart(server_conn->readio.reader); block; block = TSIOBufferBlockNext(block)) {
int64_t remain = 0;
const char *ptr;
ptr = TSIOBufferBlockReadStart(block, server_conn->readio.reader, &remain);
if (remain) {
responseStatus = server_conn->fcgiRequest()->fcgiDecodeRecordChunk((uchar *)ptr, remain, output);
}
consumed += remain;
}
if (consumed) {
TSDebug(PLUGIN_NAME, "[%s] Read %ld bytes from server and writing it to client side.", __FUNCTION__, consumed);
TSIOBufferReaderConsume(server_conn->readio.reader, consumed);
}
TSVIONDoneSet(server_conn->readio.vio, TSVIONDoneGet(server_conn->readio.vio) + consumed);
std::string data = std::move(output.str());
// TODO(oschaaf): check this if statement.
if (data.size()) {
// std::cout << "Output: " << data << std::endl;
intercept->writeResponseChunkToATS(data);
}
return responseStatus;
}
static int
handlePHPConnectionEvents(TSCont contp, TSEvent event, void *edata)
{
TSDebug(PLUGIN_NAME, "[%s]: event( %d )\tEventName: %s\tContp: %p ", __FUNCTION__, event, TSHttpEventNameLookup(event), contp);
ServerConnectionInfo *conn_info = (ServerConnectionInfo *)TSContDataGet(contp);
Server *server = conn_info->server;
ServerConnection *server_connection = conn_info->server_connection;
switch (event) {
case TS_EVENT_NET_CONNECT: {
TSStatIntIncrement(InterceptGlobal::phpConnCount, 1);
server_connection->vc_ = (TSVConn)edata;
server_connection->setState(ServerConnection::READY);
TSDebug(PLUGIN_NAME, "%s: New Connection success, %p", __FUNCTION__, server_connection);
ServerIntercept *intercept = server->getIntercept(server_connection->requestId());
if (intercept)
server_connection->createFCGIClient(intercept);
} break;
case TS_EVENT_NET_CONNECT_FAILED: {
// Try reconnection with new connection.
// TODO: Have to stop trying to reconnect after some tries
TSStatIntIncrement(InterceptGlobal::phpConnCount, 1);
server->reConnect(server_connection->requestId());
server_connection->setState(ServerConnection::CLOSED);
server->connectionClosed(server_connection);
return TS_EVENT_NONE;
} break;
case TS_EVENT_VCONN_READ_READY: {
ServerIntercept *intercept = server->getIntercept(server_connection->requestId());
if (intercept && interceptTransferData(intercept, server_connection)) {
server_connection->setState(ServerConnection::COMPLETE);
intercept->setResponseOutputComplete();
TSStatIntIncrement(InterceptGlobal::respBegId, 1);
// TSContCall(TSVIOContGet(server_connection->readio.vio), TS_EVENT_VCONN_READ_COMPLETE, server_connection->readio.vio);
}
} break;
case TS_EVENT_VCONN_READ_COMPLETE: {
TSDebug(PLUGIN_NAME, "[%s]: ResponseComplete...Sending Response to client stream. _request_id: %d", __FUNCTION__,
server_connection->requestId());
ServerIntercept *intercept = server->getIntercept(server_connection->requestId());
server_connection->setState(ServerConnection::COMPLETE);
intercept->setResponseOutputComplete();
TSStatIntIncrement(InterceptGlobal::respBegId, 1);
} break;
case TS_EVENT_VCONN_WRITE_READY: {
if (server_connection->writeio.readEnable) {
TSContCall(TSVIOContGet(server_connection->writeio.vio), TS_EVENT_VCONN_WRITE_COMPLETE, server_connection->writeio.vio);
}
} break;
case TS_EVENT_VCONN_WRITE_COMPLETE: {
TSStatIntIncrement(InterceptGlobal::reqEndId, 1);
server_connection->readio.read(server_connection->vc_, server_connection->contp());
} break;
case TS_EVENT_VCONN_EOS: {
ServerIntercept *intercept = server->getIntercept(server_connection->requestId());
if (!server_connection->writeio.readEnable) {
TSDebug(PLUGIN_NAME, "[%s]: EOS Request Failed. _request_id: %d, connection: %p,maxConn: %d, requestCount: %d", __FUNCTION__,
server_connection->requestId(), server_connection, server_connection->maxRequests(),
server_connection->requestCount());
server->reConnect(server_connection->requestId());
server_connection->setState(ServerConnection::CLOSED);
server->connectionClosed(server_connection);
break;
}
if (server_connection->getState() != ServerConnection::COMPLETE)
if (intercept && !intercept->getOutputCompleteState()) {
TSDebug(PLUGIN_NAME, "[%s]: EOS intercept->setResponseOutputComplete, _request_id: %d, connection: %p", __FUNCTION__,
server_connection->requestId(), server_connection);
Transaction &transaction = utils::internal::getTransaction(intercept->_txn);
transaction.error("Internal server error");
}
server_connection->setState(ServerConnection::CLOSED);
server->connectionClosed(server_connection);
} break;
case TS_EVENT_ERROR: {
TSVConnAbort(server_connection->vc_, 1);
ServerIntercept *intercept = server->getIntercept(server_connection->requestId());
if (intercept) {
TSDebug(PLUGIN_NAME, "[%s]:ERROR intercept->setResponseOutputComplete", __FUNCTION__);
server_connection->setState(ServerConnection::CLOSED);
Transaction &transaction = utils::internal::getTransaction(intercept->_txn);
transaction.setStatusCode(HTTP_STATUS_BAD_GATEWAY);
transaction.error("Internal server error");
}
server->connectionClosed(server_connection);
} break;
default:
break;
}
return TS_EVENT_NONE;
}
void
ThreadData::createConnectionPool(Server *server)
{
_connection_pool = new ConnectionPool(server, handlePHPConnectionEvents);
}
Server *
Server::server()
{
return InterceptGlobal::gServer;
}
Server::Server() : _reqId_mutex(TSMutexCreate()), _intecept_mutex(TSMutexCreate()) {}
bool
Server::setupThreadLocalStorage()
{
int result = 0;
if ((result = pthread_key_create(&InterceptGlobal::threadKey, nullptr)) == 0) {
ThreadData *threadData;
if ((threadData = static_cast<ThreadData *>(pthread_getspecific(InterceptGlobal::threadKey))) == nullptr) {
threadData = new ThreadData(this);
if (pthread_setspecific(InterceptGlobal::threadKey, threadData)) {
TSDebug(PLUGIN_NAME, "[Server:%s] Unable to set threadData to the key", __FUNCTION__);
pthread_key_delete(InterceptGlobal::threadKey);
InterceptGlobal::threadKey = 0;
return false;
}
TSStatIntIncrement(InterceptGlobal::threadCount, 1);
TSDebug(PLUGIN_NAME, "[Server:%s] Data is set for this thread [threadData]%p [threadID]%lu", __FUNCTION__, threadData,
pthread_self());
}
return true;
}
TSDebug(PLUGIN_NAME, "[Server:%s] Could not create key", __FUNCTION__);
return false;
}
ServerIntercept *
Server::getIntercept(uint request_id)
{
TSMutexLock(_intecept_mutex);
auto itr = _intercept_list.find(request_id);
if (itr != _intercept_list.end()) {
TSMutexUnlock(_intecept_mutex);
return std::get<0>(itr->second);
}
TSMutexUnlock(_intecept_mutex);
return nullptr;
}
ServerConnection *
Server::getServerConnection(uint request_id)
{
TSMutexLock(_intecept_mutex);
auto itr = _intercept_list.find(request_id);
if (itr != _intercept_list.end()) {
TSMutexUnlock(_intecept_mutex);
return std::get<1>(itr->second);
}
TSMutexUnlock(_intecept_mutex);
return nullptr;
}
void
Server::removeIntercept(uint request_id)
{
ThreadData *tdata = static_cast<ThreadData *>(pthread_getspecific(InterceptGlobal::threadKey));
TSMutexLock(_intecept_mutex);
auto itr = _intercept_list.find(request_id);
if (itr != _intercept_list.end()) {
ServerConnection *serv_conn = std::get<1>(itr->second);
_intercept_list.erase(itr);
TSMutexUnlock(_intecept_mutex);
TSDebug(PLUGIN_NAME, "[Server:%s] ReqQueueLength:%d ,request_id: %d,ServerConn: %p ,max_requests: %d, req_count: %d ",
__FUNCTION__, tdata->getRequestQueue()->getSize(), serv_conn->requestId(), serv_conn, serv_conn->maxRequests(),
serv_conn->requestCount());
serv_conn->releaseFCGIClient();
serv_conn->setRequestId(0);
// TODO(oschaaf): fix hang and re-enable.
if (false && serv_conn->maxRequests() > serv_conn->requestCount()) {
tdata->getConnectionPool()->reuseConnection(serv_conn);
} else {
serv_conn->setState(ServerConnection::CLOSED);
connectionClosed(serv_conn);
}
ServerIntercept *intercept = tdata->getRequestQueue()->popFromQueue();
if (intercept) {
connect(intercept);
}
return;
}
TSMutexUnlock(_intecept_mutex);
return;
}
bool
Server::writeRequestHeader(uint request_id)
{
ServerConnection *server_conn = getServerConnection(request_id);
if (!server_conn) {
return false;
}
TSDebug(PLUGIN_NAME, "[Server::%s] : Write Request Header: _request_id: %d,ServerConn: %p", __FUNCTION__, request_id,
server_conn);
FCGIClientRequest *fcgiRequest = server_conn->fcgiRequest();
unsigned char *clientReq;
int reqLen = 0;
// TODO: possibly move all this as one function in server_connection
fcgiRequest->createBeginRequest();
clientReq = fcgiRequest->addClientRequest(reqLen);
bool endflag = false;
server_conn->writeio.phpWrite(server_conn->vc_, server_conn->contp(), clientReq, reqLen, endflag);
return true;
}
bool
Server::writeRequestBody(uint request_id, const string &data)
{
ServerConnection *server_conn = getServerConnection(request_id);
if (!server_conn) {
return false;
}
TSDebug(PLUGIN_NAME, "[Server::%s] : Write Request Body: request_id: %d,Server_conn: %p", __FUNCTION__, request_id, server_conn);
FCGIClientRequest *fcgiRequest = server_conn->fcgiRequest();
// TODO: possibly move all this as one function in server_connection
unsigned char *clientReq;
int reqLen = 0;
fcgiRequest->postData = data;
fcgiRequest->postBodyChunk();
clientReq = fcgiRequest->addClientRequest(reqLen);
bool endflag = false;
server_conn->writeio.phpWrite(server_conn->vc_, server_conn->contp(), clientReq, reqLen, endflag);
return true;
}
bool
Server::writeRequestBodyComplete(uint request_id)
{
ServerConnection *server_conn = getServerConnection(request_id);
if (!server_conn) {
return false;
}
TSDebug(PLUGIN_NAME, "[Server::%s] : Write Request Complete: request_id: %d,Server_conn: %p", __FUNCTION__, request_id,
server_conn);
FCGIClientRequest *fcgiRequest = server_conn->fcgiRequest();
// TODO: possibly move all this as one function in server_connection
unsigned char *clientReq;
int reqLen = 0;
fcgiRequest->emptyParam();
clientReq = fcgiRequest->addClientRequest(reqLen);
bool endflag = true;
server_conn->writeio.phpWrite(server_conn->vc_, server_conn->contp(), clientReq, reqLen, endflag);
return true;
}
const uint
Server::connect(ServerIntercept *intercept)
{
// Get connections from thread Local storage and use it or store it in thread Queue
ThreadData *tdata = static_cast<ThreadData *>(pthread_getspecific(InterceptGlobal::threadKey));
if (tdata) {
ServerConnection *conn = nullptr;
conn = tdata->getConnectionPool()->getAvailableConnection();
if (conn) {
initiateBackendConnection(intercept, conn);
return 0;
}
TSDebug(PLUGIN_NAME, "[Server:%s] : Added to RequestQueue. QueueSize: %d", __FUNCTION__, tdata->getRequestQueue()->getSize());
tdata->getRequestQueue()->addToQueue(intercept);
return 0;
}
return 1;
}
void
Server::reConnect(uint request_id)
{
ServerIntercept *intercept = getIntercept(request_id);
if (intercept) {
TSMutexLock(_intecept_mutex);
_intercept_list.erase(request_id);
TSMutexUnlock(_intecept_mutex);
connect(intercept);
TSDebug(PLUGIN_NAME, "[Server:%s]: Initiating reconnection...", __FUNCTION__);
}
}
void
Server::initiateBackendConnection(ServerIntercept *intercept, ServerConnection *conn)
{
TSMutexLock(_reqId_mutex);
const uint request_id = UniqueRequesID::getNext();
TSMutexUnlock(_reqId_mutex);
intercept->setRequestId(request_id);
conn->setRequestId(request_id);
TSMutexLock(_intecept_mutex);
_intercept_list[request_id] = std::make_tuple(intercept, conn);
TSMutexUnlock(_intecept_mutex);
TSDebug(PLUGIN_NAME, "[Server: %s] ServerConn: %p,_request_id: %d", __FUNCTION__, conn, request_id);
if (conn->getState() != ServerConnection::READY) {
TSDebug(PLUGIN_NAME, "[Server: %s] Setting up a new php Connection..", __FUNCTION__);
conn->createConnection();
return;
}
conn->createFCGIClient(intercept);
return;
}
void
Server::connectionClosed(ServerConnection *server_conn)
{
TSMutexLock(_intecept_mutex);
auto itr = _intercept_list.find(server_conn->requestId());
if (itr != _intercept_list.end()) {
_intercept_list.erase(itr);
}
TSMutexUnlock(_intecept_mutex);
ThreadData *tdata = static_cast<ThreadData *>(pthread_getspecific(InterceptGlobal::threadKey));
tdata->getConnectionPool()->connectionClosed(server_conn);
TSStatIntDecrement(InterceptGlobal::phpConnCount, 1);
}