blob: 35a2afd6d9e83db099f23996f6528aa110d5cb09 [file] [log] [blame]
/**
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/**
* @file TransformationPlugin.cc
*/
#include "tscpp/api/TransformationPlugin.h"
#include "ts/ts.h"
#include <cstddef>
#include <cinttypes>
#include "utils_internal.h"
#include "logging_internal.h"
#include "tscpp/api/noncopyable.h"
#include "tscpp/api/Continuation.h"
#ifndef INT64_MAX
#define INT64_MAX (9223372036854775807LL)
#endif
namespace atscppapi
{
namespace detail
{
class ResumeAfterPauseCont : public Continuation
{
public:
ResumeAfterPauseCont() : Continuation() {}
ResumeAfterPauseCont(Continuation::Mutex m) : Continuation(m) {}
protected:
int _run(TSEvent event, void *edata) override;
};
} // end namespace detail
/**
* @private
*/
struct TransformationPluginState : noncopyable, public detail::ResumeAfterPauseCont {
TSVConn vconn_;
Transaction &transaction_;
TransformationPlugin &transformation_plugin_;
TransformationPlugin::Type type_;
TSVIO output_vio_; // this gets initialized on an output().
TSHttpTxn txn_;
TSIOBuffer output_buffer_;
TSIOBufferReader output_buffer_reader_;
int64_t bytes_written_;
bool paused_;
// We can only send a single WRITE_COMPLETE even though
// we may receive an immediate event after we've sent a
// write complete, so we'll keep track of whether or not we've
// sent the input end our write complete.
bool input_complete_dispatched_;
std::string request_xform_output_; // in case of request xform, data produced is buffered here
TransformationPluginState(atscppapi::Transaction &transaction, TransformationPlugin &transformation_plugin,
TransformationPlugin::Type type, TSHttpTxn txn)
: detail::ResumeAfterPauseCont(TSMutexCreate()),
vconn_(nullptr),
transaction_(transaction),
transformation_plugin_(transformation_plugin),
type_(type),
output_vio_(nullptr),
txn_(txn),
output_buffer_(nullptr),
output_buffer_reader_(nullptr),
bytes_written_(0),
paused_(false),
input_complete_dispatched_(false)
{
output_buffer_ = TSIOBufferCreate();
output_buffer_reader_ = TSIOBufferReaderAlloc(output_buffer_);
};
~TransformationPluginState() override
{
if (output_buffer_reader_) {
TSIOBufferReaderFree(output_buffer_reader_);
output_buffer_reader_ = nullptr;
}
if (output_buffer_) {
TSIOBufferDestroy(output_buffer_);
output_buffer_ = nullptr;
}
}
};
} // end namespace atscppapi
using namespace atscppapi;
namespace
{
using ResumeAfterPauseCont = atscppapi::detail::ResumeAfterPauseCont;
void
cleanupTransformation(TSCont contp)
{
LOG_DEBUG("Destroying transformation contp=%p", contp);
TSContDataSet(contp, reinterpret_cast<void *>(0xDEADDEAD));
TSContDestroy(contp);
}
int
handleTransformationPluginRead(TSCont contp, TransformationPluginState *state)
{
// Traffic Server naming is quite confusing, in this context the write_vio
// is actually the vio we read from.
TSVIO write_vio = TSVConnWriteVIOGet(contp);
if (write_vio) {
if (state->paused_) {
LOG_DEBUG("Transformation contp=%p write_vio=%p, is paused", contp, write_vio);
return 0;
}
int64_t to_read = TSVIONTodoGet(write_vio);
LOG_DEBUG("Transformation contp=%p write_vio=%p, to_read=%" PRId64, contp, write_vio, to_read);
if (to_read > 0) {
/*
* The amount of data left to read needs to be truncated by
* the amount of data actually in the read buffer.
**/
int64_t avail = TSIOBufferReaderAvail(TSVIOReaderGet(write_vio));
LOG_DEBUG("Transformation contp=%p write_vio=%p, to_read=%" PRId64 ", buffer reader avail=%" PRId64, contp, write_vio,
to_read, avail);
if (to_read > avail) {
to_read = avail;
LOG_DEBUG("Transformation contp=%p write_vio=%p, to read > avail, fixing to_read to be equal to avail. to_read=%" PRId64
", buffer reader avail=%" PRId64,
contp, write_vio, to_read, avail);
}
if (to_read > 0) {
/* Create a buffer and a buffer reader */
TSIOBuffer input_buffer = TSIOBufferCreate();
TSIOBufferReader input_reader = TSIOBufferReaderAlloc(input_buffer);
/* Copy the data from the read buffer to the input buffer. */
TSIOBufferCopy(input_buffer, TSVIOReaderGet(write_vio), to_read, 0);
/* Tell the read buffer that we have read the data and are no
longer interested in it. */
TSIOBufferReaderConsume(TSVIOReaderGet(write_vio), to_read);
/* Modify the read VIO to reflect how much data we've completed. */
TSVIONDoneSet(write_vio, TSVIONDoneGet(write_vio) + to_read);
std::string in_data = utils::internal::consumeFromTSIOBufferReader(input_reader);
LOG_DEBUG("Transformation contp=%p write_vio=%p consumed %ld bytes from bufferreader", contp, write_vio, in_data.length());
/* Clean up the buffer and reader */
TSIOBufferReaderFree(input_reader);
TSIOBufferDestroy(input_buffer);
/* Now call the client to tell them about data */
if (in_data.length() > 0) {
state->transformation_plugin_.consume(in_data);
}
}
/* now that we've finished reading we will check if there is anything left to read. */
TSCont vio_cont = TSVIOContGet(write_vio); // for some reason this can occasionally be null
if (TSVIONTodoGet(write_vio) > 0) {
LOG_DEBUG("Transformation contp=%p write_vio=%p, vio_cont=%p still has bytes left to process, todo > 0.", contp, write_vio,
vio_cont);
if (to_read > 0) {
TSVIOReenable(write_vio);
/* Call back the read VIO continuation to let it know that we are ready for more data. */
if (vio_cont) {
TSContCall(vio_cont, static_cast<TSEvent>(TS_EVENT_VCONN_WRITE_READY), write_vio);
}
}
} else {
LOG_DEBUG("Transformation contp=%p write_vio=%p, vio_cont=%p has no bytes left to process, will send WRITE_COMPLETE.",
contp, write_vio, vio_cont);
/* Call back the write VIO continuation to let it know that we have completed the write operation. */
if (!state->input_complete_dispatched_) {
state->transformation_plugin_.handleInputComplete();
state->input_complete_dispatched_ = true;
if (vio_cont && nullptr != TSVIOBufferGet(write_vio)) {
TSContCall(vio_cont, static_cast<TSEvent>(TS_EVENT_VCONN_WRITE_COMPLETE), write_vio);
}
}
}
} else {
TSCont vio_cont = TSVIOContGet(write_vio); // for some reason this can occasionally be null?
LOG_DEBUG("Transformation contp=%p write_vio=%p, vio_cont=%p has no bytes left to process.", contp, write_vio, vio_cont);
/* Call back the write VIO continuation to let it know that we have completed the write operation. */
if (!state->input_complete_dispatched_) {
state->transformation_plugin_.handleInputComplete();
state->input_complete_dispatched_ = true;
if (vio_cont && nullptr != TSVIOBufferGet(write_vio)) {
TSContCall(vio_cont, static_cast<TSEvent>(TS_EVENT_VCONN_WRITE_COMPLETE), write_vio);
}
}
}
} else {
LOG_ERROR("Transformation contp=%p write_vio=%p was nullptr!", contp, write_vio);
}
return 0;
}
int
handleTransformationPluginEvents(TSCont contp, TSEvent event, void *edata)
{
TransformationPluginState *state = static_cast<TransformationPluginState *>(TSContDataGet(contp));
LOG_DEBUG("Transformation contp=%p event=%d edata=%p tshttptxn=%p", contp, event, edata, state->txn_);
// The first thing you always do is check if the VConn is closed.
int connection_closed = TSVConnClosedGet(state->vconn_);
if (connection_closed) {
LOG_DEBUG("Transformation contp=%p tshttptxn=%p is closed connection_closed=%d ", contp, state->txn_, connection_closed);
// we'll do the cleanupTransformation in the TransformationPlugin destructor.
return 0;
}
if (event == TS_EVENT_VCONN_WRITE_COMPLETE) {
TSVConn output_vconn = TSTransformOutputVConnGet(state->vconn_);
LOG_DEBUG("Transformation contp=%p tshttptxn=%p received WRITE_COMPLETE, shutting down outputvconn=%p ", contp, state->txn_,
output_vconn);
TSVConnShutdown(output_vconn, 0, 1); // The other end is done reading our output
return 0;
} else if (event == TS_EVENT_ERROR) {
TSVIO write_vio;
/* Get the write VIO for the write operation that was
performed on ourself. This VIO contains the continuation of
our parent transformation. */
write_vio = TSVConnWriteVIOGet(state->vconn_);
TSCont vio_cont = TSVIOContGet(write_vio);
LOG_ERROR("Transformation contp=%p tshttptxn=%p received EVENT_ERROR forwarding to write_vio=%p viocont=%p", contp, state->txn_,
write_vio, vio_cont);
if (vio_cont) {
TSContCall(vio_cont, TS_EVENT_ERROR, write_vio);
}
return 0;
}
// All other events including WRITE_READY will just attempt to transform more data.
return handleTransformationPluginRead(state->vconn_, state);
}
} /* anonymous namespace */
TransformationPlugin::TransformationPlugin(Transaction &transaction, TransformationPlugin::Type type)
: TransactionPlugin(transaction), state_(nullptr)
{
state_ =
std::make_unique<TransformationPluginState>(transaction, *this, type, static_cast<TSHttpTxn>(transaction.getAtsHandle()));
state_->vconn_ = TSTransformCreate(handleTransformationPluginEvents, state_->txn_);
TSContDataSet(state_->vconn_, static_cast<void *>(state_.get())); // edata in a TransformationHandler is NOT a TSHttpTxn.
LOG_DEBUG("Creating TransformationPlugin=%p (vconn)contp=%p tshttptxn=%p transformation_type=%d", this, state_->vconn_,
state_->txn_, type);
TSHttpTxnHookAdd(state_->txn_, utils::internal::convertInternalTransformationTypeToTsHook(type), state_->vconn_);
}
TransformationPlugin::~TransformationPlugin()
{
LOG_DEBUG("Destroying TransformationPlugin=%p", this);
cleanupTransformation(state_->vconn_);
}
void
TransformationPlugin::pause()
{
if (state_->paused_) {
LOG_ERROR("Can not pause transformation, already paused TransformationPlugin=%p (vconn)contp=%p tshttptxn=%p", this,
state_->vconn_, state_->txn_);
} else if (state_->input_complete_dispatched_) {
LOG_ERROR("Can not pause transformation (transformation completed) TransformationPlugin=%p (vconn)contp=%p tshttptxn=%p", this,
state_->vconn_, state_->txn_);
} else {
state_->paused_ = true;
if (!static_cast<bool>(static_cast<ResumeAfterPauseCont *>(state_.get()))) {
*static_cast<ResumeAfterPauseCont *>(state_.get()) =
ResumeAfterPauseCont(TSContMutexGet(reinterpret_cast<TSCont>(state_->txn_)));
}
}
}
bool
TransformationPlugin::isPaused() const
{
return state_->paused_;
}
Continuation &
TransformationPlugin::resumeCont()
{
TSReleaseAssert(state_->paused_);
// The cast to a pointer to the intermediate base class ResumeAfterPauseCont is not strictly necessary. It is
// possible that the transform plugin might want to defer work to other continuations in the future. This would
// naturally result in TransactionPluginState having Continuation as an indirect base class multiple times, making
// disambiguation necessary when converting.
//
return *static_cast<ResumeAfterPauseCont *>(state_.get());
}
int
ResumeAfterPauseCont::_run(TSEvent /* event ATS_UNUSED */, void * /* edata ATS_UNUSED */)
{
auto state = static_cast<TransformationPluginState *>(this);
state->paused_ = false;
handleTransformationPluginRead(state->vconn_, state);
return TS_SUCCESS;
}
size_t
TransformationPlugin::doProduce(std::string_view data)
{
LOG_DEBUG("TransformationPlugin=%p tshttptxn=%p producing output with length=%ld", this, state_->txn_, data.length());
int64_t write_length = static_cast<int64_t>(data.length());
if (!write_length) {
return 0;
}
if (!state_->output_vio_) {
TSVConn output_vconn = TSTransformOutputVConnGet(state_->vconn_);
LOG_DEBUG("TransformationPlugin=%p tshttptxn=%p will issue a TSVConnWrite, output_vconn=%p.", this, state_->txn_, output_vconn);
if (output_vconn) {
// If you're confused about the following reference the traffic server transformation docs.
// You always write INT64_MAX, this basically says you're not sure how much data you're going to write
state_->output_vio_ = TSVConnWrite(output_vconn, state_->vconn_, state_->output_buffer_reader_, INT64_MAX);
} else {
LOG_ERROR("TransformationPlugin=%p tshttptxn=%p output_vconn=%p cannot issue TSVConnWrite due to null output vconn.", this,
state_->txn_, output_vconn);
return 0;
}
if (!state_->output_vio_) {
LOG_ERROR("TransformationPlugin=%p tshttptxn=%p state_->output_vio=%p, TSVConnWrite failed.", this, state_->txn_,
state_->output_vio_);
return 0;
}
}
// Finally we can copy this data into the output_buffer
int64_t bytes_written = TSIOBufferWrite(state_->output_buffer_, data.data(), write_length);
state_->bytes_written_ += bytes_written; // So we can set BytesDone on outputComplete().
LOG_DEBUG("TransformationPlugin=%p tshttptxn=%p write to TSIOBuffer %" PRId64 " bytes total bytes written %" PRId64, this,
state_->txn_, bytes_written, state_->bytes_written_);
// Sanity Checks
if (bytes_written != write_length) {
LOG_ERROR("TransformationPlugin=%p tshttptxn=%p bytes written < expected. bytes_written=%" PRId64 " write_length=%" PRId64,
this, state_->txn_, bytes_written, write_length);
}
int connection_closed = TSVConnClosedGet(state_->vconn_);
LOG_DEBUG("TransformationPlugin=%p tshttptxn=%p vconn=%p connection_closed=%d", this, state_->txn_, state_->vconn_,
connection_closed);
if (!connection_closed) {
TSVIOReenable(state_->output_vio_); // Wake up the downstream vio
} else {
LOG_ERROR(
"TransformationPlugin=%p tshttptxn=%p output_vio=%p connection_closed=%d : Couldn't reenable output vio (connection closed).",
this, state_->txn_, state_->output_vio_, connection_closed);
}
return static_cast<size_t>(bytes_written);
}
size_t
TransformationPlugin::produce(std::string_view data)
{
if (state_->type_ == REQUEST_TRANSFORMATION) {
state_->request_xform_output_.append(data.data(), data.length());
return data.size();
} else if (state_->type_ == CLIENT_RESPONSE_SINK_TRANSFORMATION) {
LOG_DEBUG("produce TransformationPlugin=%p tshttptxn=%p : This is a client response sink transform. Not producing any output",
this, state_->txn_);
return 0;
} else if (state_->type_ == CLIENT_REQUEST_SINK_TRANSFORMATION) {
LOG_DEBUG("produce TransformationPlugin=%p tshttptxn=%p : This is a client request sink transform. Not producing any output",
this, state_->txn_);
return 0;
} else {
return doProduce(data);
}
}
size_t
TransformationPlugin::setOutputComplete()
{
if (state_->type_ == CLIENT_RESPONSE_SINK_TRANSFORMATION || state_->type_ == CLIENT_REQUEST_SINK_TRANSFORMATION) {
// There's no output stream for a sink transform, so we do nothing
//
// Warning: don't try to shutdown the VConn, since the default implementation (DummyVConnection)
// has a stubbed out shutdown/close implementation
return 0;
} else if (state_->type_ == REQUEST_TRANSFORMATION) {
doProduce(state_->request_xform_output_);
}
int connection_closed = TSVConnClosedGet(state_->vconn_);
LOG_DEBUG("OutputComplete TransformationPlugin=%p tshttptxn=%p vconn=%p connection_closed=%d, total bytes written=%" PRId64, this,
state_->txn_, state_->vconn_, connection_closed, state_->bytes_written_);
if (!connection_closed && !state_->output_vio_) {
LOG_DEBUG("TransformationPlugin=%p tshttptxn=%p output complete without writing any data, initiating write of 0 bytes.", this,
state_->txn_);
// We're done without ever outputting anything, to correctly
// clean up we'll initiate a write then immediately set it to 0 bytes done.
state_->output_vio_ = TSVConnWrite(TSTransformOutputVConnGet(state_->vconn_), state_->vconn_, state_->output_buffer_reader_, 0);
if (state_->output_vio_) {
TSVIONDoneSet(state_->output_vio_, 0);
TSVIOReenable(state_->output_vio_); // Wake up the downstream vio
} else {
LOG_ERROR("TransformationPlugin=%p tshttptxn=%p unable to reenable output_vio=%p because VConnWrite failed.", this,
state_->txn_, state_->output_vio_);
}
return 0;
}
if (!connection_closed) {
// So there is a possible race condition here, if we wake up a dead
// VIO it can cause a segfault, so we must check that the VCONN is not dead.
int connection_closed = TSVConnClosedGet(state_->vconn_);
if (!connection_closed) {
TSVIONBytesSet(state_->output_vio_, state_->bytes_written_);
TSVIOReenable(state_->output_vio_); // Wake up the downstream vio
} else {
LOG_ERROR("TransformationPlugin=%p tshttptxn=%p unable to reenable output_vio=%p connection was closed=%d.", this,
state_->txn_, state_->output_vio_, connection_closed);
}
} else {
LOG_ERROR("TransformationPlugin=%p tshttptxn=%p unable to reenable output_vio=%p connection was closed=%d.", this, state_->txn_,
state_->output_vio_, connection_closed);
}
return state_->bytes_written_;
}