/* blob: 5e8d75bcf321a2772d9c5874c27d56322836b3bf [file] [log] [blame] (source-browser residue, not part of the original file) */
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "ts_lua_util.h"
static int ts_lua_transform_handler(TSCont contp, ts_lua_http_transform_ctx *transform_ctx, TSEvent event, int n);
/*
 * Continuation callback for the Lua transform vconnection.
 *
 * contp  the transform continuation; its data pointer holds the
 *        ts_lua_http_transform_ctx for this transform.
 * ev     the event being delivered.
 * edata  event payload; for TS_LUA_EVENT_COROUTINE_CONT it carries an integer
 *        (stored in the pointer) that is handed to the handler as the
 *        argument count for resuming the coroutine.
 *
 * Behaviour:
 *   - vconnection already closed: destroy the transform context and stop.
 *   - TS_EVENT_ERROR: forward the error to the continuation that owns the
 *     input (write) VIO.
 *   - TS_EVENT_VCONN_WRITE_COMPLETE: shut down the write side of the output
 *     vconnection.
 *   - anything else (including WRITE_READY and coroutine continuation):
 *     run ts_lua_transform_handler().
 *
 * Always returns 0.
 */
int
ts_lua_transform_entry(TSCont contp, TSEvent ev, void *edata)
{
  int evt                        = (int)ev;
  ts_lua_http_transform_ctx *ctx = (ts_lua_http_transform_ctx *)TSContDataGet(contp);

  if (TSVConnClosedGet(contp)) {
    ts_lua_destroy_http_transform_ctx(ctx);
    return 0;
  }

  if (evt == TS_EVENT_ERROR) {
    TSVIO upstream_vio = TSVConnWriteVIOGet(contp);

    TSContCall(TSVIOContGet(upstream_vio), TS_EVENT_ERROR, upstream_vio);
  } else if (evt == TS_EVENT_VCONN_WRITE_COMPLETE) {
    TSVConnShutdown(TSTransformOutputVConnGet(contp), 0, 1);
  } else {
    /* Only a coroutine continuation supplies a resume count; every other event uses 0. */
    int resume_args = (evt == TS_LUA_EVENT_COROUTINE_CONT) ? (int)(intptr_t)edata : 0;

    ts_lua_transform_handler(contp, ctx, evt, resume_args);
  }

  return 0;
}
/*
 * Run one step of the Lua body transform.
 *
 * Whatever is currently available on the input VIO is moved into
 * transform_ctx->reserved.buffer and then handed, one buffer block per call,
 * to the Lua function stored in the globals table under the light-userdata
 * key `transform_ctx`. Each call receives (data, eos_flag). On success the
 * function is expected to leave exactly two values on the stack: the output
 * string and an integer "finished" flag. Non-empty output is appended to
 * transform_ctx->output.buffer and added to transform_ctx->total.
 *
 * contp          transform continuation; source of the input VIO and the
 *                output vconnection.
 * transform_ctx  per-transform state (buffers, output VIO, byte counters,
 *                coroutine info).
 * event          triggering event; TS_LUA_EVENT_COROUTINE_CONT resumes a
 *                previously yielded coroutine instead of starting a new call.
 * n              argument count passed to lua_resume() when resuming; it is
 *                overwritten with 2 for a fresh call.
 *
 * Always returns 0. Returns early (without signalling upstream) when the
 * coroutine yields, and when the input buffer is gone but an output VIO
 * already exists.
 */
static int
ts_lua_transform_handler(TSCont contp, ts_lua_http_transform_ctx *transform_ctx, TSEvent event, int n)
{
TSVConn output_conn;
TSVIO input_vio;
TSIOBufferReader input_reader;
TSIOBufferBlock blk;
int64_t toread, towrite, blk_len, upstream_done, input_avail, input_wm_bytes, l;
const char *start;
const char *res;
size_t res_len;
int ret, eos, write_down, rc, top, empty_input;
ts_lua_coroutine *crt;
ts_lua_cont_info *ci;
lua_State *L;
TSMutex mtxp;
/* Fetch the coroutine, its Lua state and the associated mutex from the transform context. */
ci = &transform_ctx->cinfo;
crt = &ci->routine;
mtxp = crt->mctx->mutexp;
L = crt->lua;
output_conn = TSTransformOutputVConnGet(contp);
input_vio = TSVConnWriteVIOGet(contp);
empty_input = 0;
/*
 * The input VIO has no buffer. If output was already started, fix its length
 * to the bytes written so far, reenable it and stop. Otherwise remember that
 * there is no input at all; Lua is still called once below with an empty
 * string and the "finished" flag set.
 */
if (!TSVIOBufferGet(input_vio)) {
if (transform_ctx->output.vio) {
TSDebug(TS_LUA_DEBUG_TAG, "[%s] reenabling output VIO after input VIO does not exist", __FUNCTION__);
TSVIONBytesSet(transform_ctx->output.vio, transform_ctx->total);
TSVIOReenable(transform_ctx->output.vio);
return 0;
} else {
TSDebug(TS_LUA_DEBUG_TAG, "[%s] no input VIO and output VIO", __FUNCTION__);
empty_input = 1;
}
} else { // input VIO exists
/* Apply upstream_watermark_bytes to the input buffer when it is non-negative and differs from the current watermark. */
input_wm_bytes = TSIOBufferWaterMarkGet(TSVIOBufferGet(input_vio));
if (transform_ctx->upstream_watermark_bytes >= 0 && transform_ctx->upstream_watermark_bytes != input_wm_bytes) {
TSDebug(TS_LUA_DEBUG_TAG, "[%s] Setting input_vio watermark to %" PRId64 " bytes", __FUNCTION__,
transform_ctx->upstream_watermark_bytes);
TSIOBufferWaterMarkSet(TSVIOBufferGet(input_vio), transform_ctx->upstream_watermark_bytes);
}
}
/* input_reader is only assigned (and only used later) when there is input. */
if (empty_input == 0) {
input_reader = TSVIOReaderGet(input_vio);
}
/*
 * First pass (no output buffer yet): create the output and reserved buffers
 * with their readers, record the upstream length, and use INT64_MAX as the
 * provisional downstream length.
 */
if (!transform_ctx->output.buffer) {
transform_ctx->output.buffer = TSIOBufferCreate();
transform_ctx->output.reader = TSIOBufferReaderAlloc(transform_ctx->output.buffer);
transform_ctx->reserved.buffer = TSIOBufferCreate();
transform_ctx->reserved.reader = TSIOBufferReaderAlloc(transform_ctx->reserved.buffer);
if (empty_input == 0) {
transform_ctx->upstream_bytes = TSVIONBytesGet(input_vio);
} else {
transform_ctx->upstream_bytes = 0;
}
transform_ctx->downstream_bytes = INT64_MAX;
}
/*
 * Snapshot the input state. eos is set when everything upstream still owes
 * (ntodo) is already sitting in the input buffer, or when there is no input.
 */
if (empty_input == 0) {
input_avail = TSIOBufferReaderAvail(input_reader);
upstream_done = TSVIONDoneGet(input_vio);
toread = TSVIONTodoGet(input_vio);
if (toread <= input_avail) { // upstream finished
eos = 1;
} else {
eos = 0;
}
} else {
input_avail = 0;
upstream_done = 0;
toread = 0;
eos = 1;
}
if (input_avail > 0) {
// move to the reserved.buffer
TSIOBufferCopy(transform_ctx->reserved.buffer, input_reader, input_avail, 0);
// reset input
TSIOBufferReaderConsume(input_reader, input_avail);
TSVIONDoneSet(input_vio, upstream_done + input_avail);
}
write_down = 0;
/* towrite: bytes staged in reserved.buffer that still have to be handed to Lua. */
if (empty_input == 0) {
towrite = TSIOBufferReaderAvail(transform_ctx->reserved.reader);
} else {
towrite = 0;
}
/* Hold mtxp while working with the Lua state; it is released on yield and after the loop. */
TSMutexLock(mtxp);
ts_lua_set_cont_info(L, ci);
do {
/*
 * Resuming a yielded coroutine: skip argument setup and jump straight to
 * lua_resume() with the caller-supplied n. event is cleared so that later
 * iterations take the normal path.
 */
if (event == TS_LUA_EVENT_COROUTINE_CONT) {
event = 0;
goto launch;
} else {
/* Fresh call: two arguments (data, eos flag). */
n = 2;
}
/* Input exists but nothing is staged: nothing to feed to Lua in this pass. */
if (towrite == 0 && empty_input == 0) {
break;
}
if (empty_input == 0) {
/* Pass at most one buffer block to Lua per call and consume it from the reserved reader. */
blk = TSIOBufferReaderStart(transform_ctx->reserved.reader);
start = TSIOBufferBlockReadStart(blk, transform_ctx->reserved.reader, &blk_len);
lua_pushlightuserdata(L, transform_ctx);
lua_rawget(L, LUA_GLOBALSINDEX); /* push function */
if (towrite > blk_len) {
lua_pushlstring(L, start, (size_t)blk_len);
towrite -= blk_len;
TSIOBufferReaderConsume(transform_ctx->reserved.reader, blk_len);
} else {
lua_pushlstring(L, start, (size_t)towrite);
TSIOBufferReaderConsume(transform_ctx->reserved.reader, towrite);
towrite = 0;
}
if (!towrite && eos) {
lua_pushinteger(L, 1); /* second param, data finished */
} else {
lua_pushinteger(L, 0); /* second param, data not finished */
}
} else {
/* No input at all: call the function once with an empty string and the finished flag. */
lua_pushlightuserdata(L, transform_ctx);
lua_rawget(L, LUA_GLOBALSINDEX); /* push function */
lua_pushlstring(L, "", 0);
lua_pushinteger(L, 1); /* second param, data finished */
}
launch:
rc = lua_resume(L, n);
top = lua_gettop(L);
switch (rc) {
case LUA_YIELD: // coroutine yield
/* Release the mutex and return; a later TS_LUA_EVENT_COROUTINE_CONT re-enters through the goto above. */
TSMutexUnlock(mtxp);
return 0;
case 0: // coroutine success
/* Expect exactly two results on the stack: output string below, finished flag on top. */
if (top == 2) {
ret = lua_tointeger(L, -1); /* 0 is not finished, 1 is finished */
res = lua_tolstring(L, -2, &res_len);
} else { // unexpected number of results: treat as no output and not finished
ret = 0;
res = NULL;
res_len = 0;
}
break;
default: // coroutine failed
/* Log the error message on top of the stack and treat the transform as finished with no output. */
TSError("[ts_lua][%s] lua_resume failed: %s", __FUNCTION__, lua_tostring(L, -1));
ret = 1;
res = NULL;
res_len = 0;
break;
}
if (res && res_len > 0) {
/*
 * Start the downstream write on first output. If Lua reported "finished"
 * on this same call, res_len is used as the length; otherwise the
 * provisional downstream_bytes is used.
 */
if (!transform_ctx->output.vio) {
l = transform_ctx->downstream_bytes;
if (ret) {
l = res_len;
}
transform_ctx->output.vio = TSVConnWrite(output_conn, contp, transform_ctx->output.reader, l); // HttpSM go on
}
TSIOBufferWrite(transform_ctx->output.buffer, res, res_len);
transform_ctx->total += res_len;
write_down = 1;
}
/* Remove everything the call left on the Lua stack (results or error message). */
lua_pop(L, top);
if (ret || (eos && !towrite)) { // EOS
eos = 1;
break;
}
} while (towrite > 0);
TSMutexUnlock(mtxp);
/* End of stream without any output so far: start a zero-length downstream write so the output VIO exists. */
if (eos && !transform_ctx->output.vio) {
transform_ctx->output.vio = TSVConnWrite(output_conn, contp, transform_ctx->output.reader, 0);
}
if (write_down || eos) {
TSVIOReenable(transform_ctx->output.vio);
}
/*
 * Signal the owner of the input VIO (skipped when there is no input):
 *   - upstream not finished and eos set: fix the output length, send VCONN_EOS;
 *   - upstream not finished and eos clear: send WRITE_READY;
 *   - upstream finished: fix the output length, send WRITE_COMPLETE.
 */
if (toread > input_avail) { // upstream not finished.
if (eos) {
TSVIONBytesSet(transform_ctx->output.vio, transform_ctx->total);
if (empty_input == 0) {
TSContCall(TSVIOContGet(input_vio), TS_EVENT_VCONN_EOS, input_vio);
}
} else {
if (empty_input == 0) {
TSContCall(TSVIOContGet(input_vio), TS_EVENT_VCONN_WRITE_READY, input_vio);
}
}
} else { // upstream is finished.
TSVIONBytesSet(transform_ctx->output.vio, transform_ctx->total);
if (empty_input == 0) {
TSContCall(TSVIOContGet(input_vio), TS_EVENT_VCONN_WRITE_COMPLETE, input_vio);
}
}
return 0;
}