blob: 09f386f0b8a2504815b82e589a1f5dd2b8a6e7c4 [file] [log] [blame]
/** @file passthru.cc
*
* Example protocol plugin.
*
* @section license License
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Passthru plugin.
*
* This plugin demonstrates:
*
* - Using TSMgmtStringCreate() to add custom records into records.config.
* - Listening on a custom socket with TSPortDescriptorAccept().
* - Using TSHttpConnectWithPluginId() and the VConn API to proxy HTTP traffic.
*/
#include <ts/ts.h>
#include <cinttypes>
#include <cstring>
// Name used as the debug tag, in error messages and as the plugin id.
#define PLUGIN_NAME "passthru"

// Emit a TSDebug() message under the PLUGIN_NAME tag, prefixed with the
// address of the session it concerns. The do/while(0) wrapper makes the
// expansion usable as a single statement.
#define PassthruSessionDebug(session, format, ...)                 \
  do {                                                             \
    TSDebug(PLUGIN_NAME, "sp=%p " format, session, ##__VA_ARGS__); \
  } while (0)

// Event handler attached to every session continuation. It is declared here
// because the PassthruSession constructor refers to it before its definition.
static int PassthruSessionEvent(TSCont contp, TSEvent event_id, void *event_data);
// Typed view over the opaque event payload pointer. The handlers in this file
// read the same pointer either as a TSVConn or as a TSVIO, depending on the
// event they are processing.
union EventArgument {
  void *edata;   // the payload exactly as it was passed to the handler
  TSVConn vconn; // payload viewed as a connection
  TSVIO vio;     // payload viewed as a VIO

  EventArgument(void *payload) : edata(payload) {}
};
struct PassthruIO {
TSVIO vio;
TSIOBuffer iobuf;
TSIOBufferReader reader;
PassthruIO() : vio(nullptr), iobuf(nullptr), reader(nullptr) {}
~PassthruIO() { clear(); }
void
clear()
{
if (this->reader) {
TSIOBufferReaderFree(this->reader);
}
if (this->iobuf) {
TSIOBufferDestroy(this->iobuf);
}
this->reader = nullptr;
this->iobuf = nullptr;
this->vio = nullptr;
}
// Start a read operation.
void
read(TSVConn vconn, TSCont contp)
{
TSReleaseAssert(this->vio == nullptr);
this->iobuf = TSIOBufferCreate();
this->reader = TSIOBufferReaderAlloc(this->iobuf);
this->vio = TSVConnRead(vconn, contp, this->iobuf, INT64_MAX);
}
// Start a write operation.
void
write(TSVConn vconn, TSCont contp)
{
TSReleaseAssert(this->vio == nullptr);
this->iobuf = TSIOBufferCreate();
this->reader = TSIOBufferReaderAlloc(this->iobuf);
this->vio = TSVConnWrite(vconn, contp, this->reader, INT64_MAX);
}
// Transfer data from this IO object to the target IO object. We use
// TSIOBufferCopy to move the data without actually duplicating it.
int64_t
transfer_to(PassthruIO &to)
{
int64_t consumed = 0;
int64_t avail = TSIOBufferReaderAvail(this->reader);
if (avail) {
consumed = TSIOBufferCopy(to.iobuf, this->reader, avail, 0 /* offset */);
TSIOBufferReaderConsume(this->reader, consumed);
}
return consumed;
}
// noncopyable
PassthruIO(const PassthruIO &) = delete;
PassthruIO &operator=(const PassthruIO &) = delete;
};
struct PassthruSession {
// VC session to the client.
struct {
TSVConn vconn;
PassthruIO readio;
PassthruIO writeio;
} client;
// VC session to Traffic Server via TSHttpConnect.
struct {
TSVConn vconn;
PassthruIO readio;
PassthruIO writeio;
} server;
TSCont contp;
PassthruSession() : contp(TSContCreate(PassthruSessionEvent, TSMutexCreate()))
{
this->client.vconn = this->server.vconn = nullptr;
TSContDataSet(this->contp, this);
}
~PassthruSession()
{
if (this->server.vconn) {
TSVConnClose(this->server.vconn);
}
if (this->client.vconn) {
TSVConnClose(this->client.vconn);
}
TSContDataSet(this->contp, nullptr);
TSContDestroy(this->contp);
PassthruSessionDebug(this, "destroyed session");
}
// noncopyable
PassthruSession(const PassthruSession &) = delete;
PassthruSession &operator=(const PassthruSession &) = delete;
};
// Report whether the session can be torn down.
//
// A session is finished once it has no server vconn (it was never started,
// or it has been closed) and the client write buffer has been fully drained.
// When the session is not finished, the number of bytes still buffered for
// the client is logged.
static bool
PassthruSessionIsFinished(PassthruSession *sp)
{
  const int64_t buffered = TSIOBufferReaderAvail(sp->client.writeio.reader);
  const bool finished    = (sp->server.vconn == nullptr) && (buffered == 0);

  if (!finished) {
    PassthruSessionDebug(sp, "continuing session with %" PRId64 " buffered client bytes", buffered);
  }

  return finished;
}
// Continuation handler for both the client and the server side of a session.
//
// READ_READY  - connect to the server if that has not happened yet, shuffle
//               data in both directions, then re-arm the VIO that fired.
// WRITE_READY - destroy the session once it is finished.
// EOS         - from the client: drop the whole session; from the server:
//               flush what is left to the client and close the server side.
//
// Returns TS_EVENT_NONE for the events above. Any other event is reported
// with TSError() and yields TS_EVENT_ERROR.
static int
PassthruSessionEvent(TSCont cont, TSEvent event, void *edata)
{
  EventArgument arg(edata);
  PassthruSession *sp = static_cast<PassthruSession *>(TSContDataGet(cont));

  PassthruSessionDebug(sp, "session event on vconn=%p event=%d (%s)", TSVIOVConnGet(arg.vio), event, TSHttpEventNameLookup(event));

  switch (event) {
  case TS_EVENT_VCONN_READ_READY: {
    // On the first read, wire up the internal transfer to the server.
    if (sp->server.vconn == nullptr) {
      sp->server.vconn = TSHttpConnectWithPluginId(TSNetVConnRemoteAddrGet(sp->client.vconn), PLUGIN_NAME, 0);
      TSReleaseAssert(sp->server.vconn != nullptr);

      // Start the server end of the IO before we write any data.
      sp->server.readio.read(sp->server.vconn, sp->contp);
      sp->server.writeio.write(sp->server.vconn, sp->contp);
    }

    // Client -> server.
    const int64_t to_server = sp->client.readio.transfer_to(sp->server.writeio);
    PassthruSessionDebug(sp, "proxied %" PRId64 " bytes from client vconn=%p to server vconn=%p", to_server, sp->client.vconn,
                         sp->server.vconn);
    if (to_server != 0) {
      TSVIOReenable(sp->client.readio.vio);
      TSVIOReenable(sp->server.writeio.vio);
    }

    // Server -> client.
    const int64_t to_client = sp->server.readio.transfer_to(sp->client.writeio);
    PassthruSessionDebug(sp, "proxied %" PRId64 " bytes from server vconn=%p to client vconn=%p", to_client, sp->server.vconn,
                         sp->client.vconn);
    if (to_client != 0) {
      TSVIOReenable(sp->server.readio.vio);
      TSVIOReenable(sp->client.writeio.vio);
    }

    if (PassthruSessionIsFinished(sp)) {
      delete sp;
      return TS_EVENT_NONE;
    }

    // Re-arm the VIO that delivered this event.
    TSVIOReenable(arg.vio);
    return TS_EVENT_NONE;
  }

  case TS_EVENT_VCONN_WRITE_READY:
    if (PassthruSessionIsFinished(sp)) {
      delete sp;
    }
    return TS_EVENT_NONE;

  case TS_EVENT_VCONN_EOS: {
    const TSVConn origin = TSVIOVConnGet(arg.vio);

    // If we get EOS from the client, just abort everything; we don't
    // care any more.
    if (origin == sp->client.vconn) {
      PassthruSessionDebug(sp, "got EOS from client vconn=%p", sp->client.vconn);
      delete sp;
      return TS_EVENT_NONE;
    }

    // If we get EOS from the server, then we should make sure that we
    // drain any outstanding data before shutting down the client.
    if (origin == sp->server.vconn) {
      PassthruSessionDebug(sp, "EOS from server vconn=%p", sp->server.vconn);
      TSReleaseAssert(sp->client.vconn != nullptr);

      if (TSIOBufferReaderAvail(sp->server.readio.reader) > 0) {
        sp->server.readio.transfer_to(sp->client.writeio);
        TSVIOReenable(sp->client.writeio.vio);
      }

      TSVConnClose(sp->server.vconn);
      sp->server.vconn = nullptr;
      sp->server.readio.clear();
      sp->server.writeio.clear();
    }

    return TS_EVENT_NONE;
  }

  default:
    break;
  }

  TSError("[%s] unexpected event %s (%d) edata=%p", PLUGIN_NAME, TSHttpEventNameLookup(event), event, arg.edata);
  return TS_EVENT_ERROR;
}
// Accept handler: creates a session for the newly accepted connection and
// starts the client side of the IO. The event must be TS_EVENT_NET_ACCEPT
// (asserted); the payload is used as the accepted connection. Always returns
// TS_EVENT_NONE.
static int
PassthruAccept(TSCont /* cont */, TSEvent event, void *edata)
{
  EventArgument arg(edata);
  PassthruSession *session = new PassthruSession();
  const TSVConn accepted   = arg.vconn;

  PassthruSessionDebug(session, "accepting connection on vconn=%p event=%d", accepted, event);
  TSReleaseAssert(event == TS_EVENT_NET_ACCEPT);

  // Start the client end of the IO. We delay starting the server end until
  // we get the first read from the client end.
  session->client.vconn = accepted;
  session->client.readio.read(accepted, session->contp);
  session->client.writeio.write(accepted, session->contp);

  return TS_EVENT_NONE;
}
// Read config.plugin.passthru.server_ports, parse it into a port descriptor
// and start accepting connections on it with PassthruAccept as the handler.
//
// Returns TS_ERROR if the record cannot be read or cannot be parsed;
// otherwise returns whatever TSPortDescriptorAccept() reports.
static TSReturnCode
PassthruListen()
{
  TSMgmtString ports = nullptr;
  if (TSMgmtStringGet("config.plugin.passthru.server_ports", &ports) == TS_ERROR) {
    TSError("[%s] missing config.plugin.passthru.server_ports configuration", PLUGIN_NAME);
    return TS_ERROR;
  }

  TSPortDescriptor descriptor = TSPortDescriptorParse(ports);
  if (descriptor != nullptr) {
    TSDebug(PLUGIN_NAME, "listening on port '%s'", ports);
  } else {
    TSError("[%s] failed to parse config.plugin.passthru.server_ports", PLUGIN_NAME);
  }

  // The configuration string is no longer needed on either path.
  TSfree(ports);

  if (descriptor == nullptr) {
    return TS_ERROR;
  }

  // The accept continuation is created without a mutex.
  TSCont acceptor = TSContCreate(PassthruAccept, nullptr);
  return TSPortDescriptorAccept(descriptor, acceptor);
}
// Plugin entry point. Creates the server_ports configuration record (default
// is the empty string), starts listening on the configured port and then
// registers the plugin. Failure to listen or to register trips a release
// assertion.
void
TSPluginInit(int /* argc */, const char * /* argv */ [])
{
  TSMgmtStringCreate(TS_RECORDTYPE_CONFIG, "config.plugin.passthru.server_ports", const_cast<char *>(""),
                     TS_RECORDUPDATE_RESTART_TS, TS_RECORDCHECK_NULL, nullptr /* check_regex */, TS_RECORDACCESS_NULL);

  // Start listening on the configured port.
  TSReturnCode status = PassthruListen();
  TSReleaseAssert(status == TS_SUCCESS);

  // Now that succeeded, we can register.
  TSPluginRegistrationInfo info = {PLUGIN_NAME, "Apache Software Foundation", "dev@trafficserver.apache.org"};

  status = TSPluginRegister(&info);
  TSReleaseAssert(status == TS_SUCCESS);
}
// vim: set sw=2 ts=2 sts=2 et: