blob: 2eb76175635f8094940c7348456e0889ff176d94 [file] [log] [blame]
/**
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/**
* @file InterceptPlugin.cc
*/
#include "tscpp/api/InterceptPlugin.h"
#include "ts/ts.h"
#include "logging_internal.h"
#include "tscpp/api/noncopyable.h"
#include "utils_internal.h"
#include <cerrno>
#include <climits>
#include <cstdlib>
#include <memory>
#ifndef INT64_MAX
#define INT64_MAX (9223372036854775807LL)
#endif
using namespace atscppapi;
using std::string;
/**
 * @private
 *
 * Bookkeeping for a single intercept. An instance is created by the
 * InterceptPlugin constructor, stored as the continuation's data and read
 * back by the continuation callback.
 *
 * The destructors below release raw Traffic Server handles, so both this
 * type and IoHandle are non-copyable: a copy would release the same handles
 * a second time.
 */
struct InterceptPlugin::State {
  TSCont cont_;
  TSVConn net_vc_ = nullptr;

  /** A VIO together with the IO buffer and reader that back it. */
  struct IoHandle {
    TSVIO vio_               = nullptr;
    TSIOBuffer buffer_       = nullptr;
    TSIOBufferReader reader_ = nullptr;

    IoHandle() = default;

    IoHandle(const IoHandle &) = delete;
    IoHandle &operator=(const IoHandle &) = delete;

    /** Frees the reader first, then the buffer it was allocated from. */
    ~IoHandle()
    {
      if (reader_) {
        TSIOBufferReaderFree(reader_);
      }
      if (buffer_) {
        TSIOBufferDestroy(buffer_);
      }
    }
  };

  IoHandle input_;
  IoHandle output_;

  /** the API doesn't recognize end of input; so we have to explicitly
   * figure out when to continue reading and when to stop */
  TSHttpParser http_parser_;
  int expected_body_size_  = 0;
  int num_body_bytes_read_ = 0;
  bool hdr_parsed_         = false;

  TSMBuffer hdr_buf_ = nullptr;
  TSMLoc hdr_loc_    = nullptr;

  /** running total of bytes handed to the output buffer */
  int num_bytes_written_ = 0;

  std::shared_ptr<Mutex> plugin_mutex_;
  InterceptPlugin *plugin_ = nullptr;
  Headers request_headers_;

  /** these two fields to be used by the continuation callback only */
  TSEvent saved_event_ = TS_EVENT_NONE;
  void *saved_edata_   = nullptr;

  /** pending retry scheduled by the continuation callback, if any */
  TSAction timeout_action_ = nullptr;
  /** set once the plugin has declared its output complete */
  bool plugin_io_done_ = false;

  /**
   * @param cont continuation that will deliver the intercept's events
   * @param plugin owning plugin; its mutex is shared with this state
   */
  State(TSCont cont, InterceptPlugin *plugin) : cont_(cont), plugin_(plugin)
  {
    plugin_mutex_ = plugin->getMutex();
    http_parser_  = TSHttpParserCreate();
  }

  State(const State &) = delete;
  State &operator=(const State &) = delete;

  /** Destroys the parser, then releases the header location and its buffer. */
  ~State()
  {
    TSHttpParserDestroy(http_parser_);
    if (hdr_loc_) {
      TSHandleMLocRelease(hdr_buf_, TS_NULL_MLOC, hdr_loc_);
    }
    if (hdr_buf_) {
      TSMBufferDestroy(hdr_buf_);
    }
  }
};
namespace
{
/** Continuation callback for the intercept; defined further down in this file. */
int handleEvents(TSCont cont, TSEvent pristine_event, void *pristine_edata);

/** Closes the intercept's connection and destroys its continuation; defined further down in this file. */
void destroyCont(InterceptPlugin::State *state);
} // namespace
/**
 * Creates the continuation and per-intercept state, links the two, and
 * registers the continuation as an intercept on the transaction.
 *
 * @param transaction transaction to intercept
 * @param type SERVER_INTERCEPT registers a server intercept; any other value
 *        registers a plain transaction intercept
 */
InterceptPlugin::InterceptPlugin(Transaction &transaction, InterceptPlugin::Type type) : TransactionPlugin(transaction)
{
  TSCont intercept_cont = TSContCreate(handleEvents, TSMutexCreate());
  state_                = new State(intercept_cont, this);
  TSContDataSet(intercept_cont, state_);

  TSHttpTxn ats_txn = static_cast<TSHttpTxn>(transaction.getAtsHandle());
  if (type != SERVER_INTERCEPT) {
    TSHttpTxnIntercept(intercept_cont, ats_txn);
  } else {
    TSHttpTxnServerIntercept(intercept_cont, ats_txn);
  }
}
/**
 * Deletes the state directly when its continuation is already gone
 * (destroyCont() nulls cont_). Otherwise only detaches the plugin from the
 * state and leaves the continuation callback to clean up.
 */
InterceptPlugin::~InterceptPlugin()
{
  if (!state_->cont_) { // safe to cleanup
    LOG_DEBUG("Normal cleanup");
    delete state_;
    return;
  }

  LOG_DEBUG("Relying on callback for cleanup");
  state_->plugin_ = nullptr; // prevent callback from invoking plugin
}
bool
InterceptPlugin::produce(const void *data, int data_size)
{
std::lock_guard<Mutex> lock(*getMutex());
if (!state_->net_vc_) {
LOG_ERROR("Intercept not operational");
return false;
}
if (!state_->output_.buffer_) {
state_->output_.buffer_ = TSIOBufferCreate();
state_->output_.reader_ = TSIOBufferReaderAlloc(state_->output_.buffer_);
state_->output_.vio_ = TSVConnWrite(state_->net_vc_, state_->cont_, state_->output_.reader_, INT64_MAX);
}
int num_bytes_written = TSIOBufferWrite(state_->output_.buffer_, data, data_size);
if (num_bytes_written != data_size) {
LOG_ERROR("Error while writing to buffer! Attempted %d bytes but only wrote %d bytes", data_size, num_bytes_written);
return false;
}
TSVIOReenable(state_->output_.vio_);
state_->num_bytes_written_ += data_size;
LOG_DEBUG("Wrote %d bytes in response", data_size);
return true;
}
/**
 * Marks the response as finished: sets the write VIO's byte count to the
 * total produced so far, re-enables the VIO and records that plugin IO is
 * done.
 *
 * @return false if the intercept has no connection yet or nothing has been
 *         produced; true otherwise
 */
bool
InterceptPlugin::setOutputComplete()
{
  std::lock_guard<Mutex> guard(*getMutex());

  if (!state_->net_vc_) {
    LOG_ERROR("Intercept not operational");
    return false;
  }

  auto &out = state_->output_;
  if (!out.buffer_) {
    LOG_ERROR("No output produced so far");
    return false;
  }

  TSVIONBytesSet(out.vio_, state_->num_bytes_written_);
  TSVIOReenable(out.vio_);
  state_->plugin_io_done_ = true;
  LOG_DEBUG("Response complete");
  return true;
}
/**
 * @return a reference to the request headers stored in the intercept state
 */
Headers &
InterceptPlugin::getRequestHeaders()
{
  Headers &request_headers = state_->request_headers_;
  return request_headers;
}
/**
 * @return the SSL connection of the intercept's net connection, or nullptr
 *         (after logging an error) if no connection has been accepted yet
 */
TSSslConnection
InterceptPlugin::getSslConnection()
{
  if (state_->net_vc_) {
    return TSVConnSslConnectionGet(state_->net_vc_);
  }

  LOG_ERROR("Intercept Plugin is not ready to provide SSL Connection");
  return nullptr;
}
bool
InterceptPlugin::doRead()
{
int avail = TSIOBufferReaderAvail(state_->input_.reader_);
if (avail == TS_ERROR) {
LOG_ERROR("Error while getting number of bytes available");
return false;
}
int consumed = 0; // consumed is used to update the input buffers
if (avail > 0) {
int64_t num_body_bytes_in_block;
int64_t data_len; // size of all data (header + body) in a block
const char *data, *startptr;
TSIOBufferBlock block = TSIOBufferReaderStart(state_->input_.reader_);
while (block != nullptr) {
startptr = data = TSIOBufferBlockReadStart(block, state_->input_.reader_, &data_len);
num_body_bytes_in_block = 0;
if (!state_->hdr_parsed_) {
const char *endptr = data + data_len;
if (TSHttpHdrParseReq(state_->http_parser_, state_->hdr_buf_, state_->hdr_loc_, &data, endptr) == TS_PARSE_DONE) {
LOG_DEBUG("Parsed header");
string content_length_str = state_->request_headers_.value("Content-Length");
if (!content_length_str.empty()) {
const char *start_ptr = content_length_str.data();
char *end_ptr;
int content_length = strtol(start_ptr, &end_ptr, 10 /* base */);
if ((errno != ERANGE) && (end_ptr != start_ptr) && (*end_ptr == '\0')) {
LOG_DEBUG("Got content length: %d", content_length);
state_->expected_body_size_ = content_length;
} else {
LOG_ERROR("Invalid content length header [%s]; Assuming no content", content_length_str.c_str());
}
}
if (state_->request_headers_.value("Transfer-Encoding") == "chunked") {
// implementing a "dechunker" is non-trivial and in the real
// world, most browsers don't send chunked requests
LOG_ERROR("Support for chunked request not implemented! Assuming no body");
}
LOG_DEBUG("Expecting %d bytes of request body", state_->expected_body_size_);
state_->hdr_parsed_ = true;
// remaining data in this block is body; 'data' will be pointing to first byte of the body
num_body_bytes_in_block = endptr - data;
}
consume(string(startptr, data - startptr), InterceptPlugin::REQUEST_HEADER);
} else {
num_body_bytes_in_block = data_len;
}
if (num_body_bytes_in_block) {
state_->num_body_bytes_read_ += num_body_bytes_in_block;
consume(string(data, num_body_bytes_in_block), InterceptPlugin::REQUEST_BODY);
}
consumed += data_len;
block = TSIOBufferBlockNext(block);
}
}
LOG_DEBUG("Consumed %d bytes from input vio", consumed);
TSIOBufferReaderConsume(state_->input_.reader_, consumed);
// Modify the input VIO to reflect how much data we've completed.
TSVIONDoneSet(state_->input_.vio_, TSVIONDoneGet(state_->input_.vio_) + consumed);
if (isWebsocket()) {
TSVIOReenable(state_->input_.vio_);
return true;
}
if ((state_->hdr_parsed_) && (state_->num_body_bytes_read_ >= state_->expected_body_size_)) {
LOG_DEBUG("Completely read body");
if (state_->num_body_bytes_read_ > state_->expected_body_size_) {
LOG_ERROR("Read more data than specified in request");
// TODO: any further action required?
}
handleInputComplete();
} else {
LOG_DEBUG("Reenabling input vio as %d bytes still need to be read", state_->expected_body_size_ - state_->num_body_bytes_read_);
TSVIOReenable(state_->input_.vio_);
}
return true;
}
void
InterceptPlugin::handleEvent(int abstract_event, void *edata)
{
TSEvent event = static_cast<TSEvent>(abstract_event);
LOG_DEBUG("Received event %d", event);
switch (event) {
case TS_EVENT_NET_ACCEPT:
LOG_DEBUG("Handling net accept");
state_->net_vc_ = static_cast<TSVConn>(edata);
state_->input_.buffer_ = TSIOBufferCreate();
state_->input_.reader_ = TSIOBufferReaderAlloc(state_->input_.buffer_);
state_->input_.vio_ = TSVConnRead(state_->net_vc_, state_->cont_, state_->input_.buffer_,
INT64_MAX /* number of bytes to read - high value initially */);
state_->hdr_buf_ = TSMBufferCreate();
state_->hdr_loc_ = TSHttpHdrCreate(state_->hdr_buf_);
state_->request_headers_.reset(state_->hdr_buf_, state_->hdr_loc_);
TSHttpHdrTypeSet(state_->hdr_buf_, state_->hdr_loc_, TS_HTTP_TYPE_REQUEST);
break;
case TS_EVENT_VCONN_WRITE_READY: // nothing to do
LOG_DEBUG("Got write ready");
break;
case TS_EVENT_VCONN_READ_READY:
LOG_DEBUG("Handling read ready");
if (doRead()) {
break;
}
// else fall through into the next shut down cases
LOG_ERROR("Error while reading request!");
// fallthrough
case TS_EVENT_VCONN_READ_COMPLETE: // fall throughs intentional
case TS_EVENT_VCONN_WRITE_COMPLETE:
case TS_EVENT_VCONN_EOS:
case TS_EVENT_ERROR: // erroring out, nothing more to do
case TS_EVENT_NET_ACCEPT_FAILED: // somebody canceled the transaction
if (event == TS_EVENT_ERROR) {
LOG_ERROR("Unknown Error!");
} else if (event == TS_EVENT_NET_ACCEPT_FAILED) {
LOG_ERROR("Got net_accept_failed!");
}
LOG_DEBUG("Shutting down intercept");
destroyCont(state_);
break;
default:
LOG_ERROR("Unknown event %d", event);
}
}
namespace
{
/**
 * Scoped guard that makes one non-blocking attempt to lock a mutex on
 * construction and unlocks it on destruction only if that attempt succeeded.
 *
 * Non-copyable: a copy would unlock the same mutex a second time.
 */
class TryLockGuard
{
public:
  /** @param m mutex to try to lock; must outlive this guard */
  explicit TryLockGuard(Mutex &m) : _m(m), _isLocked(m.try_lock()) {}

  TryLockGuard(const TryLockGuard &) = delete;
  TryLockGuard &operator=(const TryLockGuard &) = delete;

  /** @return true if the constructor acquired the lock */
  bool
  isLocked() const
  {
    return _isLocked;
  }

  ~TryLockGuard()
  {
    if (_isLocked) {
      _m.unlock();
    }
  }

private:
  std::recursive_mutex &_m;
  const bool _isLocked;
};
/**
 * Continuation callback for the intercept.
 *
 * Tries to take the plugin mutex without blocking. If that fails, the event
 * is saved (unless it is itself a retry timeout) and the continuation is
 * rescheduled. Once the lock is held, a retry timeout is turned back into
 * the saved event, or into a fake EOS when the plugin has already finished
 * its IO. The event is then dispatched to the plugin. If the plugin has
 * already been destroyed, the continuation and state are cleaned up here.
 *
 * @param cont continuation whose data is the InterceptPlugin::State
 * @param pristine_event event as delivered
 * @param pristine_edata event payload as delivered
 * @return always 0
 */
int
handleEvents(TSCont cont, TSEvent pristine_event, void *pristine_edata)
{
  // Separating pristine and mutable data helps debugging
  TSEvent event = pristine_event;
  void *edata   = pristine_edata;

  InterceptPlugin::State *state = static_cast<InterceptPlugin::State *>(TSContDataGet(cont));
  if (!state) { // plugin is done, return.
    return 0;
  }

  // Hold our own reference to the mutex. The cleanup branch below deletes 'state', which may drop the last
  // other owner; this local is declared before the guard, so it is destroyed after the guard has unlocked.
  const std::shared_ptr<Mutex> plugin_mutex = state->plugin_mutex_;
  TryLockGuard scopedTryLock(*plugin_mutex);
  if (!scopedTryLock.isLocked()) {
    LOG_ERROR("Couldn't get plugin lock. Will retry");
    if (event != TS_EVENT_TIMEOUT) { // save only "non-retry" info
      state->saved_event_ = event;
      state->saved_edata_ = edata;
    }
    state->timeout_action_ = TSContScheduleOnPool(cont, 1, TS_THREAD_POOL_NET);
    return 0;
  }

  if (event == TS_EVENT_TIMEOUT) { // we have a saved event to restore
    state->timeout_action_ = nullptr;
    if (state->plugin_io_done_) { // plugin is done, so can't send it saved event
      event = TS_EVENT_VCONN_EOS; // fake completion
      edata = nullptr;
    } else {
      event = state->saved_event_;
      edata = state->saved_edata_;
    }
  }

  if (state->plugin_) {
    utils::internal::dispatchInterceptEvent(state->plugin_, event, edata);
  } else { // plugin was destroyed before intercept was completed; cleaning up here
    LOG_DEBUG("Cleaning up as intercept plugin is already destroyed");
    // Detach the state while the continuation is still valid; destroyCont() destroys the continuation.
    TSContDataSet(cont, nullptr);
    destroyCont(state);
    delete state;
  }
  return 0;
}
/**
 * Tears down the intercept's Traffic Server resources. Each handle is nulled
 * after release, so calling this again is a no-op.
 *
 * Shuts down and closes the net connection if there is one. Then, if the
 * continuation still exists, cancels any pending retry action and destroys
 * the continuation.
 *
 * @param state intercept state whose handles are released
 */
void
destroyCont(InterceptPlugin::State *state)
{
  TSVConn vc = state->net_vc_;
  if (vc) {
    TSVConnShutdown(vc, 1, 1);
    TSVConnClose(vc);
    state->net_vc_ = nullptr;
  }

  if (!state->cont_) {
    return;
  }

  TSAction pending_retry = state->timeout_action_;
  if (pending_retry) {
    TSActionCancel(pending_retry);
    state->timeout_action_ = nullptr;
  }
  TSContDestroy(state->cont_);
  state->cont_ = nullptr;
}
} // namespace