| /* |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| #include "ts/ts.h" |
| #include <yaml-cpp/yaml.h> |
| #include "ats_wasm.h" |
| |
| #ifdef WAMR |
| #include "include/proxy-wasm/wamr.h" |
| #endif |
| |
| #ifdef WASMEDGE |
| #include "include/proxy-wasm/wasmedge.h" |
| #endif |
| |
| #ifdef WASMTIME |
| #include "include/proxy-wasm/wasmtime.h" |
| #endif |
| |
| #include <getopt.h> |
| #include <sys/types.h> |
| #include <unistd.h> |
| #include <fcntl.h> |
| |
| #include <string> |
| |
| // struct for storing plugin configuration |
| struct WasmInstanceConfig { |
| std::list<std::string> config_filenames = {}; |
| |
| std::list<std::pair<std::shared_ptr<ats_wasm::Wasm>, std::shared_ptr<proxy_wasm::PluginBase>>> configs = {}; |
| |
| std::list<std::pair<std::shared_ptr<ats_wasm::Wasm>, std::shared_ptr<proxy_wasm::PluginBase>>> deleted_configs = {}; |
| }; |
| |
| static std::unique_ptr<WasmInstanceConfig> wasm_config = nullptr; |
| |
| // handler for transform event |
| static int |
| transform_handler(TSCont contp, ats_wasm::TransformInfo *ti) |
| { |
| TSVConn output_conn; |
| TSVIO input_vio; |
| TSIOBufferReader input_reader; |
| TSIOBufferBlock blk; |
| int64_t toread, towrite, blk_len, upstream_done, input_avail; |
| const char *start; |
| const char *res; |
| size_t res_len; |
| bool eos, write_down, empty_input; |
| |
| ats_wasm::Context *c; |
| |
| Dbg(ats_wasm::dbg_ctl, "[%s] transform handler begins", __FUNCTION__); |
| c = ti->context; |
| |
| output_conn = TSTransformOutputVConnGet(contp); |
| input_vio = TSVConnWriteVIOGet(contp); |
| |
| empty_input = false; |
| |
| Dbg(ats_wasm::dbg_ctl, "[%s] checking input VIO", __FUNCTION__); |
| if (!TSVIOBufferGet(input_vio)) { |
| if (ti->output_vio) { |
| Dbg(ats_wasm::dbg_ctl, "[%s] reenabling output VIO after input VIO does not exist", __FUNCTION__); |
| TSVIONBytesSet(ti->output_vio, ti->total); |
| TSVIOReenable(ti->output_vio); |
| return 0; |
| } else { |
| Dbg(ats_wasm::dbg_ctl, "[%s] no input VIO and output VIO", __FUNCTION__); |
| empty_input = true; |
| } |
| } |
| |
| if (!empty_input) { |
| input_reader = TSVIOReaderGet(input_vio); |
| } |
| |
| Dbg(ats_wasm::dbg_ctl, "[%s] creating buffer and reader", __FUNCTION__); |
| if (!ti->output_buffer) { |
| ti->output_buffer = TSIOBufferCreate(); |
| ti->output_reader = TSIOBufferReaderAlloc(ti->output_buffer); |
| |
| ti->reserved_buffer = TSIOBufferCreate(); |
| ti->reserved_reader = TSIOBufferReaderAlloc(ti->reserved_buffer); |
| |
| if (!empty_input) { |
| ti->upstream_bytes = TSVIONBytesGet(input_vio); |
| } else { |
| ti->upstream_bytes = 0; |
| } |
| |
| ti->downstream_bytes = INT64_MAX; |
| } |
| |
| Dbg(ats_wasm::dbg_ctl, "[%s] init variables inside handler", __FUNCTION__); |
| if (!empty_input) { |
| input_avail = TSIOBufferReaderAvail(input_reader); |
| upstream_done = TSVIONDoneGet(input_vio); |
| toread = TSVIONTodoGet(input_vio); |
| |
| if (toread <= input_avail) { // upstream finished |
| eos = true; |
| } else { |
| eos = false; |
| } |
| } else { |
| input_avail = 0; |
| upstream_done = 0; |
| toread = 0; |
| eos = true; |
| } |
| |
| if (input_avail > 0) { |
| // move to the reserved.buffer |
| TSIOBufferCopy(ti->reserved_buffer, input_reader, input_avail, 0); |
| |
| // reset input |
| TSIOBufferReaderConsume(input_reader, input_avail); |
| TSVIONDoneSet(input_vio, upstream_done + input_avail); |
| } |
| |
| write_down = false; |
| if (!empty_input) { |
| towrite = TSIOBufferReaderAvail(ti->reserved_reader); |
| } else { |
| towrite = 0; |
| } |
| |
| do { |
| Dbg(ats_wasm::dbg_ctl, "[%s] inside transform handler loop", __FUNCTION__); |
| proxy_wasm::FilterDataStatus status = proxy_wasm::FilterDataStatus::Continue; |
| |
| if (towrite == 0 && !empty_input) { |
| break; |
| } |
| |
| Dbg(ats_wasm::dbg_ctl, "[%s] retrieving text and calling the wasm handler function", __FUNCTION__); |
| if (!empty_input) { |
| blk = TSIOBufferReaderStart(ti->reserved_reader); |
| start = TSIOBufferBlockReadStart(blk, ti->reserved_reader, &blk_len); |
| |
| int size = 0; |
| if (towrite > blk_len) { |
| c->setTransformResult(start, blk_len); |
| towrite -= blk_len; |
| TSIOBufferReaderConsume(ti->reserved_reader, blk_len); |
| size = blk_len; |
| } else { |
| c->setTransformResult(start, towrite); |
| TSIOBufferReaderConsume(ti->reserved_reader, towrite); |
| size = towrite; |
| towrite = 0; |
| } |
| |
| if (!towrite && eos) { |
| if (ti->request) { |
| status = c->onRequestBody(size, true); |
| } else { |
| status = c->onResponseBody(size, true); |
| } |
| } else { |
| if (ti->request) { |
| status = c->onRequestBody(size, false); |
| } else { |
| status = c->onResponseBody(size, false); |
| } |
| } |
| } else { |
| c->setTransformResult(nullptr, 0); |
| if (ti->request) { |
| status = c->onRequestBody(0, true); |
| } else { |
| status = c->onResponseBody(0, true); |
| } |
| } |
| |
| Dbg(ats_wasm::dbg_ctl, "[%s] retrieving returns from wasm handler function and pass back to ATS", __FUNCTION__); |
| if ((status == proxy_wasm::FilterDataStatus::Continue) || |
| ((status == proxy_wasm::FilterDataStatus::StopIterationAndBuffer || |
| status == proxy_wasm::FilterDataStatus::StopIterationAndWatermark) && |
| eos && !towrite)) { |
| res = c->getTransformResult(&res_len); |
| |
| if (res && res_len > 0) { |
| if (!ti->output_vio) { |
| if (eos && !towrite) { |
| ti->output_vio = TSVConnWrite(output_conn, contp, ti->output_reader, res_len); // HttpSM go on |
| } else { |
| ti->output_vio = TSVConnWrite(output_conn, contp, ti->output_reader, ti->downstream_bytes); // HttpSM go on |
| } |
| } |
| |
| TSIOBufferWrite(ti->output_buffer, res, res_len); |
| ti->total += res_len; |
| write_down = true; |
| } |
| |
| c->clearTransformResult(); |
| } |
| |
| if (status == proxy_wasm::FilterDataStatus::StopIterationNoBuffer) { |
| c->clearTransformResult(); |
| } |
| |
| if (eos && !towrite) { // EOS |
| break; |
| } |
| |
| } while (towrite > 0); |
| |
| if (eos && !ti->output_vio) { |
| ti->output_vio = TSVConnWrite(output_conn, contp, ti->output_reader, 0); |
| } |
| |
| if (write_down || eos) { |
| TSVIOReenable(ti->output_vio); |
| } |
| |
| if (toread > input_avail) { // upstream not finished. |
| if (eos) { |
| // this should not happen because eos is set to true if toread <= input_avail |
| // we are, though, expecting that eos may be set by the wasm module function in the future |
| TSVIONBytesSet(ti->output_vio, ti->total); |
| if (!empty_input) { |
| TSContCall(TSVIOContGet(input_vio), TS_EVENT_VCONN_EOS, input_vio); |
| } |
| } else { |
| if (!empty_input) { |
| TSContCall(TSVIOContGet(input_vio), TS_EVENT_VCONN_WRITE_READY, input_vio); |
| } |
| } |
| } else { // upstream is finished. |
| TSVIONBytesSet(ti->output_vio, ti->total); |
| if (!empty_input) { |
| TSContCall(TSVIOContGet(input_vio), TS_EVENT_VCONN_WRITE_COMPLETE, input_vio); |
| } |
| } |
| |
| return 0; |
| } |
| |
| static int |
| transform_entry(TSCont contp, TSEvent ev, void * /* edata ATS_UNUSED */) |
| { |
| int event; |
| TSVIO input_vio; |
| ats_wasm::TransformInfo *ti; |
| |
| event = static_cast<int>(ev); |
| ti = static_cast<ats_wasm::TransformInfo *>(TSContDataGet(contp)); |
| |
| Dbg(ats_wasm::dbg_ctl, "[%s] begin transform entry", __FUNCTION__); |
| if (TSVConnClosedGet(contp)) { |
| delete ti; |
| TSContDestroy(contp); |
| return 0; |
| } |
| |
| Dbg(ats_wasm::dbg_ctl, "[%s] checking event inside transform entry", __FUNCTION__); |
| switch (event) { |
| case TS_EVENT_ERROR: |
| Dbg(ats_wasm::dbg_ctl, "[%s] event error", __FUNCTION__); |
| input_vio = TSVConnWriteVIOGet(contp); |
| TSContCall(TSVIOContGet(input_vio), TS_EVENT_ERROR, input_vio); |
| break; |
| |
| // we should handle TS_EVENT_VCONN_EOS similarly here if we support setting EOS from wasm module |
| case TS_EVENT_VCONN_WRITE_COMPLETE: |
| Dbg(ats_wasm::dbg_ctl, "[%s] event vconn write complete", __FUNCTION__); |
| TSVConnShutdown(TSTransformOutputVConnGet(contp), 0, 1); |
| break; |
| |
| case TS_EVENT_VCONN_WRITE_READY: |
| default: |
| Dbg(ats_wasm::dbg_ctl, "[%s] event vconn write ready/default", __FUNCTION__); |
| transform_handler(contp, ti); |
| break; |
| } |
| |
| return 0; |
| } |
| |
| // handler for timer event |
| static int |
| schedule_handler(TSCont contp, TSEvent /*event*/, void * /*data*/) |
| { |
| Dbg(ats_wasm::dbg_ctl, "[%s] Inside schedule_handler", __FUNCTION__); |
| |
| auto *c = static_cast<ats_wasm::Context *>(TSContDataGet(contp)); |
| |
| auto *old_wasm = static_cast<ats_wasm::Wasm *>(c->wasm()); |
| TSMutexLock(old_wasm->mutex()); |
| |
| c->onTick(0); // use 0 as token |
| |
| if (wasm_config->configs.empty()) { |
| TSError("[wasm][%s] Configuration objects are empty", __FUNCTION__); |
| TSMutexUnlock(old_wasm->mutex()); |
| return 0; |
| } |
| |
| bool found = false; |
| for (auto it = wasm_config->configs.begin(); it != wasm_config->configs.end(); it++) { |
| std::shared_ptr<ats_wasm::Wasm> wbp = it->first; |
| if (wbp.get() == old_wasm) { |
| found = true; |
| auto *wasm = static_cast<ats_wasm::Wasm *>(c->wasm()); |
| uint32_t root_context_id = c->id(); |
| if (wasm->existsTimerPeriod(root_context_id)) { |
| Dbg(ats_wasm::dbg_ctl, "[%s] reschedule continuation", __FUNCTION__); |
| std::chrono::milliseconds period = wasm->getTimerPeriod(root_context_id); |
| TSContScheduleOnPool(contp, static_cast<TSHRTime>(period.count()), TS_THREAD_POOL_NET); |
| } else { |
| Dbg(ats_wasm::dbg_ctl, "[%s] can't find period for root context id: %d", __FUNCTION__, root_context_id); |
| } |
| break; |
| } |
| } |
| |
| if (!found) { |
| std::shared_ptr<ats_wasm::Wasm> temp = nullptr; |
| uint32_t root_context_id = c->id(); |
| old_wasm->removeTimerPeriod(root_context_id); |
| |
| if (old_wasm->readyShutdown()) { |
| Dbg(ats_wasm::dbg_ctl, "[%s] starting WasmBase Shutdown", __FUNCTION__); |
| old_wasm->startShutdown(); |
| if (!old_wasm->readyDelete()) { |
| Dbg(ats_wasm::dbg_ctl, "[%s] not ready to delete WasmBase/PluginBase", __FUNCTION__); |
| } else { |
| Dbg(ats_wasm::dbg_ctl, "[%s] remove wasm from deleted_configs", __FUNCTION__); |
| bool advance = true; |
| for (auto it = wasm_config->deleted_configs.begin(); it != wasm_config->deleted_configs.end(); advance ? it++ : it) { |
| advance = true; |
| Dbg(ats_wasm::dbg_ctl, "[%s] looping through deleted_configs", __FUNCTION__); |
| std::shared_ptr<ats_wasm::Wasm> wbp = it->first; |
| temp = wbp; |
| if (wbp.get() == old_wasm) { |
| Dbg(ats_wasm::dbg_ctl, "[%s] found matching WasmBase", __FUNCTION__); |
| it = wasm_config->deleted_configs.erase(it); |
| advance = false; |
| } |
| } |
| } |
| } else { |
| Dbg(ats_wasm::dbg_ctl, "[%s] not ready to shutdown WasmBase", __FUNCTION__); |
| } |
| |
| Dbg(ats_wasm::dbg_ctl, "[%s] config wasm has changed. thus not scheduling", __FUNCTION__); |
| } |
| |
| TSMutexUnlock(old_wasm->mutex()); |
| |
| return 0; |
| } |
| |
| // handler for transaction event |
| static int |
| http_event_handler(TSCont contp, TSEvent event, void *data) |
| { |
| int result = -1; |
| auto *context = static_cast<ats_wasm::Context *>(TSContDataGet(contp)); |
| auto *old_wasm = static_cast<ats_wasm::Wasm *>(context->wasm()); |
| |
| context->resetTxnReenable(); |
| |
| TSMutexLock(old_wasm->mutex()); |
| std::shared_ptr<ats_wasm::Wasm> temp = nullptr; |
| auto *txnp = static_cast<TSHttpTxn>(data); |
| |
| TSMBuffer buf = nullptr; |
| TSMLoc hdr_loc = nullptr; |
| int count = 0; |
| |
| switch (event) { |
| case TS_EVENT_HTTP_TXN_START: |
| break; |
| |
| case TS_EVENT_HTTP_READ_REQUEST_HDR: |
| if (TSHttpTxnClientReqGet(txnp, &buf, &hdr_loc) != TS_SUCCESS) { |
| TSError("[wasm][%s] cannot retrieve client request", __FUNCTION__); |
| TSMutexUnlock(old_wasm->mutex()); |
| TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE); |
| context->setTxnReenable(); |
| return 0; |
| } |
| count = TSMimeHdrFieldsCount(buf, hdr_loc); |
| TSHandleMLocRelease(buf, TS_NULL_MLOC, hdr_loc); |
| |
| result = context->onRequestHeaders(count, false) == proxy_wasm::FilterHeadersStatus::Continue ? 0 : 1; |
| break; |
| |
| case TS_EVENT_HTTP_POST_REMAP: |
| break; |
| |
| case TS_EVENT_HTTP_CACHE_LOOKUP_COMPLETE: |
| break; |
| |
| case TS_EVENT_HTTP_SEND_REQUEST_HDR: |
| break; |
| |
| case TS_EVENT_HTTP_READ_RESPONSE_HDR: |
| if (TSHttpTxnServerRespGet(txnp, &buf, &hdr_loc) != TS_SUCCESS) { |
| TSError("[wasm][%s] cannot retrieve server response", __FUNCTION__); |
| TSMutexUnlock(old_wasm->mutex()); |
| TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE); |
| context->setTxnReenable(); |
| return 0; |
| } |
| count = TSMimeHdrFieldsCount(buf, hdr_loc); |
| TSHandleMLocRelease(buf, TS_NULL_MLOC, hdr_loc); |
| |
| result = context->onResponseHeaders(count, false) == proxy_wasm::FilterHeadersStatus::Continue ? 0 : 1; |
| break; |
| |
| case TS_EVENT_HTTP_SEND_RESPONSE_HDR: |
| context->onLocalReply(); |
| result = 0; |
| break; |
| |
| case TS_EVENT_HTTP_PRE_REMAP: |
| break; |
| |
| case TS_EVENT_HTTP_OS_DNS: |
| break; |
| |
| case TS_EVENT_HTTP_READ_CACHE_HDR: |
| break; |
| |
| case TS_EVENT_HTTP_TXN_CLOSE: { |
| context->onDone(); |
| context->onDelete(); |
| |
| bool found = false; |
| for (auto it = wasm_config->configs.begin(); it != wasm_config->configs.end(); it++) { |
| std::shared_ptr<ats_wasm::Wasm> wbp = it->first; |
| if (wbp.get() == context->wasm()) { |
| found = true; |
| break; |
| } |
| } |
| |
| if (found) { |
| Dbg(ats_wasm::dbg_ctl, "[%s] config wasm has not changed", __FUNCTION__); |
| } else { |
| if (old_wasm->readyShutdown()) { |
| Dbg(ats_wasm::dbg_ctl, "[%s] starting WasmBase Shutdown", __FUNCTION__); |
| old_wasm->startShutdown(); |
| if (!old_wasm->readyDelete()) { |
| Dbg(ats_wasm::dbg_ctl, "[%s] not ready to delete WasmBase/PluginBase", __FUNCTION__); |
| } else { |
| Dbg(ats_wasm::dbg_ctl, "[%s] remove wasm from deleted_configs", __FUNCTION__); |
| bool advance = true; |
| for (auto it = wasm_config->deleted_configs.begin(); it != wasm_config->deleted_configs.end(); advance ? it++ : it) { |
| advance = true; |
| Dbg(ats_wasm::dbg_ctl, "[%s] looping through deleted_configs", __FUNCTION__); |
| std::shared_ptr<ats_wasm::Wasm> wbp = it->first; |
| temp = wbp; |
| if (wbp.get() == old_wasm) { |
| Dbg(ats_wasm::dbg_ctl, "[%s] found matching WasmBase", __FUNCTION__); |
| it = wasm_config->deleted_configs.erase(it); |
| advance = false; |
| } |
| } |
| } |
| } else { |
| Dbg(ats_wasm::dbg_ctl, "[%s] not ready to shutdown WasmBase", __FUNCTION__); |
| } |
| |
| Dbg(ats_wasm::dbg_ctl, "[%s] config wasm has changed", __FUNCTION__); |
| } |
| |
| delete context; |
| |
| TSContDestroy(contp); |
| result = 0; |
| break; |
| } |
| default: |
| break; |
| } |
| |
| TSMutexUnlock(old_wasm->mutex()); |
| |
| // check if we have reenable transaction already or not |
| if ((context == nullptr) || (!context->isTxnReenable())) { |
| Dbg(ats_wasm::dbg_ctl, "[%s] no context or not yet reenabled transaction", __FUNCTION__); |
| |
| if (result == 0) { |
| TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE); |
| if (context != nullptr) { |
| context->setTxnReenable(); |
| } |
| } else if (result < 0) { |
| Dbg(ats_wasm::dbg_ctl, "[%s] abnormal event, continue with error", __FUNCTION__); |
| TSHttpTxnReenable(txnp, TS_EVENT_HTTP_ERROR); |
| if (context != nullptr) { |
| context->setTxnReenable(); |
| } |
| } else { |
| if (context->isLocalReply()) { |
| Dbg(ats_wasm::dbg_ctl, "[%s] abnormal return, continue with error due to local reply", __FUNCTION__); |
| TSHttpTxnReenable(txnp, TS_EVENT_HTTP_ERROR); |
| if (context != nullptr) { |
| context->setTxnReenable(); |
| } |
| } else { |
| Dbg(ats_wasm::dbg_ctl, "[%s] abnormal return, no continue, context id: %d", __FUNCTION__, context->id()); |
| } |
| } |
| } else { |
| Dbg(ats_wasm::dbg_ctl, "[%s] transaction already reenabled", __FUNCTION__); |
| } |
| return 0; |
| } |
| |
| // main handler/entry point for the plugin |
| static int |
| global_hook_handler(TSCont /*contp*/, TSEvent /*event*/, void *data) |
| { |
| auto *txnp = static_cast<TSHttpTxn>(data); |
| for (auto it = wasm_config->configs.begin(); it != wasm_config->configs.end(); it++) { |
| std::shared_ptr<ats_wasm::Wasm> wbp = it->first; |
| std::shared_ptr<proxy_wasm::PluginBase> plg = it->second; |
| auto *wasm = wbp.get(); |
| TSMutexLock(wasm->mutex()); |
| auto *rootContext = wasm->getRootContext(plg, false); |
| auto *context = new ats_wasm::Context(wasm, rootContext->id(), plg); |
| context->initialize(txnp); |
| context->onCreate(); |
| TSMutexUnlock(wasm->mutex()); |
| |
| // create continuation for transaction |
| TSCont txn_contp = TSContCreate(http_event_handler, nullptr); |
| TSHttpTxnHookAdd(txnp, TS_HTTP_READ_REQUEST_HDR_HOOK, txn_contp); |
| TSHttpTxnHookAdd(txnp, TS_HTTP_READ_RESPONSE_HDR_HOOK, txn_contp); |
| TSHttpTxnHookAdd(txnp, TS_HTTP_TXN_CLOSE_HOOK, txn_contp); |
| // add send response hook for local reply if needed |
| TSHttpTxnHookAdd(txnp, TS_HTTP_SEND_RESPONSE_HDR_HOOK, txn_contp); |
| |
| TSContDataSet(txn_contp, context); |
| |
| // create transform items |
| Dbg(ats_wasm::dbg_ctl, "[%s] creating transform info, continuation and hook", __FUNCTION__); |
| ats_wasm::TransformInfo *reqbody_ti = new ats_wasm::TransformInfo(); |
| reqbody_ti->request = true; |
| reqbody_ti->context = context; |
| ats_wasm::TransformInfo *respbody_ti = new ats_wasm::TransformInfo(); |
| respbody_ti->request = false; |
| respbody_ti->context = context; |
| |
| TSVConn reqbody_connp = TSTransformCreate(transform_entry, txnp); |
| TSContDataSet(reqbody_connp, reqbody_ti); |
| TSVConn respbody_connp = TSTransformCreate(transform_entry, txnp); |
| TSContDataSet(respbody_connp, respbody_ti); |
| |
| TSHttpTxnHookAdd(txnp, TS_HTTP_REQUEST_TRANSFORM_HOOK, reqbody_connp); |
| TSHttpTxnHookAdd(txnp, TS_HTTP_RESPONSE_TRANSFORM_HOOK, respbody_connp); |
| } |
| |
| TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE); |
| return 0; |
| } |
| |
| // function to read a file |
| static inline int |
| read_file(const std::string &fn, std::string *s) |
| { |
| auto fd = open(fn.c_str(), O_RDONLY); |
| if (fd < 0) { |
| char *errmsg = strerror(errno); |
| TSError("[wasm][%s] wasm unable to open: %s", __FUNCTION__, errmsg); |
| return -1; |
| } |
| auto n = ::lseek(fd, 0, SEEK_END); |
| if (n < 0) { |
| char *errmsg = strerror(errno); |
| TSError("[wasm][%s] wasm unable to lseek: %s", __FUNCTION__, errmsg); |
| return -1; |
| } |
| ::lseek(fd, 0, SEEK_SET); |
| s->resize(n); |
| auto nn = ::read(fd, const_cast<char *>(&*s->begin()), n); |
| if (nn < 0) { |
| char *errmsg = strerror(errno); |
| TSError("[wasm][%s] wasm unable to read: %s", __FUNCTION__, errmsg); |
| return -1; |
| } |
| if (nn != static_cast<ssize_t>(n)) { |
| TSError("[wasm][%s] wasm unable to read: size different from buffer", __FUNCTION__); |
| return -1; |
| } |
| return 0; |
| } |
| |
| // function to read configuration |
| static bool |
| read_configuration() |
| { |
| std::list<std::pair<std::shared_ptr<ats_wasm::Wasm>, std::shared_ptr<proxy_wasm::PluginBase>>> new_configs = {}; |
| |
| for (auto const &cfn : wasm_config->config_filenames) { |
| // PluginBase parameters |
| std::string name = ""; |
| std::string root_id = ""; |
| std::string configuration = ""; |
| bool fail_open = true; |
| |
| // WasmBase parameters |
| std::string runtime = ""; |
| std::string vm_id = ""; |
| std::string vm_configuration = ""; |
| std::string wasm_filename = ""; |
| bool allow_precompiled = true; |
| |
| proxy_wasm::AllowedCapabilitiesMap cap_maps; |
| std::unordered_map<std::string, std::string> envs; |
| |
| try { |
| YAML::Node config = YAML::LoadFile(cfn); |
| |
| for (YAML::const_iterator it = config.begin(); it != config.end(); ++it) { |
| const std::string &node_name = it->first.as<std::string>(); |
| YAML::NodeType::value type = it->second.Type(); |
| |
| if (node_name != "config" || type != YAML::NodeType::Map) { |
| TSError( |
| "[wasm][%s] Invalid YAML Configuration format for wasm: %s, reason: Top level nodes must be named config and be of " |
| "type map", |
| __FUNCTION__, cfn.c_str()); |
| return false; |
| } |
| |
| for (YAML::const_iterator it2 = it->second.begin(); it2 != it->second.end(); ++it2) { |
| const YAML::Node first = it2->first; |
| const YAML::Node second = it2->second; |
| |
| const std::string &key = first.as<std::string>(); |
| if (second.IsScalar()) { |
| const std::string &value = second.as<std::string>(); |
| if (key == "name") { |
| name = value; |
| } |
| if (key == "root_id" || key == "rootId") { |
| root_id = value; |
| } |
| if (key == "configuration") { |
| configuration = value; |
| } |
| if (key == "fail_open") { |
| if (value == "false") { |
| fail_open = false; |
| } |
| } |
| } |
| if (second.IsMap() && (key == "capability_restriction_config")) { |
| if (second["allowed_capabilities"]) { |
| const YAML::Node ac_node = second["allowed_capabilities"]; |
| if (ac_node.IsSequence()) { |
| for (const auto &i : ac_node) { |
| auto ac = i.as<std::string>(); |
| proxy_wasm::SanitizationConfig sc; |
| sc.argument_list = {}; |
| sc.is_allowlist = false; |
| cap_maps[ac] = sc; |
| } |
| } |
| } |
| } |
| |
| if (second.IsMap() && (key == "vm_config" || key == "vmConfig")) { |
| for (YAML::const_iterator it3 = second.begin(); it3 != second.end(); ++it3) { |
| const YAML::Node vm_config_first = it3->first; |
| const YAML::Node vm_config_second = it3->second; |
| |
| const std::string &vm_config_key = vm_config_first.as<std::string>(); |
| if (vm_config_second.IsScalar()) { |
| const std::string &vm_config_value = vm_config_second.as<std::string>(); |
| if (vm_config_key == "runtime") { |
| runtime = vm_config_value; |
| } |
| if (vm_config_key == "vm_id" || vm_config_key == "vmId") { |
| vm_id = vm_config_value; |
| } |
| if (vm_config_key == "configuration") { |
| vm_configuration = vm_config_value; |
| } |
| if (vm_config_key == "allow_precompiled") { |
| if (vm_config_value == "false") { |
| allow_precompiled = false; |
| } |
| } |
| } |
| |
| if (vm_config_key == "environment_variables" && vm_config_second.IsMap()) { |
| if (vm_config_second["host_env_keys"]) { |
| const YAML::Node ek_node = vm_config_second["host_env_keys"]; |
| if (ek_node.IsSequence()) { |
| for (const auto &i : ek_node) { |
| auto ek = i.as<std::string>(); |
| if (auto *value = std::getenv(ek.data())) { |
| envs[ek] = value; |
| } |
| } |
| } |
| } |
| if (vm_config_second["key_values"]) { |
| const YAML::Node kv_node = vm_config_second["key_values"]; |
| if (kv_node.IsMap()) { |
| for (YAML::const_iterator it4 = kv_node.begin(); it4 != kv_node.end(); ++it4) { |
| envs[it4->first.as<std::string>()] = it4->second.as<std::string>(); |
| } |
| } |
| } |
| } |
| |
| if (vm_config_key == "code" && vm_config_second.IsMap()) { |
| if (vm_config_second["local"]) { |
| const YAML::Node local_node = vm_config_second["local"]; |
| if (local_node["filename"]) { |
| wasm_filename = local_node["filename"].as<std::string>(); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| // only allowed one config block (first one) for now |
| break; |
| } |
| } catch (const YAML::Exception &e) { |
| TSError("[wasm][%s] YAML::Exception %s when parsing YAML config file %s for wasm", __FUNCTION__, e.what(), cfn.c_str()); |
| return false; |
| } |
| |
| std::shared_ptr<ats_wasm::Wasm> wasm; |
| if (runtime == "ats.wasm.runtime.wasmedge") { |
| #ifdef WASMEDGE |
| wasm = std::make_shared<ats_wasm::Wasm>(proxy_wasm::createWasmEdgeVm(), // VM |
| vm_id, // vm_id |
| vm_configuration, // vm_configuration |
| "", // vm_key, |
| envs, // envs |
| cap_maps // allowed capabilities |
| ); |
| #else |
| TSError("[wasm][%s] wasm unable to use WasmEdge runtime", __FUNCTION__); |
| return false; |
| #endif |
| } else if (runtime == "ats.wasm.runtime.wamr") { |
| #ifdef WAMR |
| wasm = std::make_shared<ats_wasm::Wasm>(proxy_wasm::createWamrVm(), // VM |
| vm_id, // vm_id |
| vm_configuration, // vm_configuration |
| "", // vm_key, |
| envs, // envs |
| cap_maps // allowed capabilities |
| ); |
| #else |
| TSError("[wasm][%s] wasm unable to use WAMR runtime", __FUNCTION__); |
| return false; |
| #endif |
| } else if (runtime == "ats.wasm.runtime.wasmtime") { |
| #ifdef WASMTIME |
| wasm = std::make_shared<ats_wasm::Wasm>(proxy_wasm::createWasmtimeVm(), // VM |
| vm_id, // vm_id |
| vm_configuration, // vm_configuration |
| "", // vm_key, |
| envs, // envs |
| cap_maps // allowed capabilities |
| ); |
| #else |
| TSError("[wasm][%s] wasm unable to use Wasmtime runtime", __FUNCTION__); |
| return false; |
| #endif |
| } else { |
| TSError("[wasm][%s] wasm unable to use %s runtime", __FUNCTION__, runtime.c_str()); |
| return false; |
| } |
| wasm->wasm_vm()->integration() = std::make_unique<ats_wasm::ATSWasmVmIntegration>(); |
| |
| auto plugin = std::make_shared<proxy_wasm::PluginBase>(name, // name |
| root_id, // root_id |
| vm_id, // vm_id |
| runtime, // engine |
| configuration, // plugin_configuration |
| fail_open, // failopen |
| "" // TODO: plugin key from where ? |
| ); |
| |
| if (*wasm_filename.begin() != '/') { |
| wasm_filename = std::string(TSConfigDirGet()) + "/" + wasm_filename; |
| } |
| std::string code; |
| if (read_file(wasm_filename, &code) < 0) { |
| TSError("[wasm][%s] wasm unable to read file '%s'", __FUNCTION__, wasm_filename.c_str()); |
| return false; |
| } |
| |
| if (code.empty()) { |
| TSError("[wasm][%s] code is empty", __FUNCTION__); |
| return false; |
| } |
| |
| if (!wasm) { |
| TSError("[wasm][%s] wasm wasm wasm unable to create vm", __FUNCTION__); |
| return false; |
| } |
| if (!wasm->load(code, allow_precompiled)) { |
| TSError("[wasm][%s] Failed to load Wasm code", __FUNCTION__); |
| return false; |
| } |
| if (!wasm->initialize()) { |
| TSError("[wasm][%s] Failed to initialize Wasm code", __FUNCTION__); |
| return false; |
| } |
| |
| TSCont contp = TSContCreate(schedule_handler, TSMutexCreate()); |
| auto *rootContext = wasm->start(plugin, contp); |
| |
| if (!wasm->configure(rootContext, plugin)) { |
| TSError("[wasm][%s] Failed to configure Wasm", __FUNCTION__); |
| return false; |
| } |
| |
| auto new_config = std::make_pair(wasm, plugin); |
| new_configs.push_front(new_config); |
| } |
| |
| auto old_configs = wasm_config->configs; |
| |
| wasm_config->configs = new_configs; |
| |
| for (auto it = old_configs.begin(); it != old_configs.end(); it++) { |
| std::shared_ptr<ats_wasm::Wasm> old_wasm = it->first; |
| std::shared_ptr<proxy_wasm::PluginBase> old_plugin = it->second; |
| |
| if (old_wasm != nullptr) { |
| Dbg(ats_wasm::dbg_ctl, "[%s] previous WasmBase exists", __FUNCTION__); |
| TSMutexLock(old_wasm->mutex()); |
| if (old_wasm->readyShutdown()) { |
| Dbg(ats_wasm::dbg_ctl, "[%s] starting WasmBase Shutdown", __FUNCTION__); |
| old_wasm->startShutdown(); |
| if (!old_wasm->readyDelete()) { |
| Dbg(ats_wasm::dbg_ctl, "[%s] not ready to delete WasmBase/PluginBase", __FUNCTION__); |
| auto deleted_config = std::make_pair(old_wasm, old_plugin); |
| wasm_config->deleted_configs.push_front(deleted_config); |
| } |
| } else { |
| Dbg(ats_wasm::dbg_ctl, "[%s] not ready to shutdown WasmBase", __FUNCTION__); |
| auto deleted_config = std::make_pair(old_wasm, old_plugin); |
| wasm_config->deleted_configs.push_front(deleted_config); |
| } |
| TSMutexUnlock(old_wasm->mutex()); |
| } |
| } |
| |
| return true; |
| } |
| |
| // handler for configuration event |
| static int |
| config_handler(TSCont /*contp*/, TSEvent /*event*/, void * /*data*/) |
| { |
| Dbg(ats_wasm::dbg_ctl, "[%s] configuration reloading", __FUNCTION__); |
| read_configuration(); |
| Dbg(ats_wasm::dbg_ctl, "[%s] configuration reloading ends", __FUNCTION__); |
| return 0; |
| } |
| |
| // main function for the plugin |
| void |
| TSPluginInit(int argc, const char *argv[]) |
| { |
| TSPluginRegistrationInfo info; |
| info.plugin_name = "wasm"; |
| info.vendor_name = "Apache Software Foundation"; |
| info.support_email = "dev@trafficserver.apache.org"; |
| if (TSPluginRegister(&info) != TS_SUCCESS) { |
| TSError("[wasm] Plugin registration failed"); |
| } |
| |
| if (argc < 2) { |
| TSError("[wasm][%s] wasm config argument missing", __FUNCTION__); |
| return; |
| } |
| |
| wasm_config = std::make_unique<WasmInstanceConfig>(); |
| |
| for (int i = 1; i < argc; i++) { |
| std::string filename = std::string(argv[i]); |
| if (*filename.begin() != '/') { |
| filename = std::string(TSConfigDirGet()) + "/" + filename; |
| } |
| wasm_config->config_filenames.push_front(filename); |
| } |
| |
| if (!read_configuration()) { |
| return; |
| } |
| |
| // global handler |
| TSCont global_contp = TSContCreate(global_hook_handler, nullptr); |
| if (global_contp == nullptr) { |
| TSError("[wasm][%s] could not create transaction start continuation", __FUNCTION__); |
| return; |
| } |
| TSHttpHookAdd(TS_HTTP_TXN_START_HOOK, global_contp); |
| |
| // configuration handler |
| TSCont config_contp = TSContCreate(config_handler, nullptr); |
| if (config_contp == nullptr) { |
| TSError("[ts_lua][%s] could not create configuration continuation", __FUNCTION__); |
| return; |
| } |
| TSMgmtUpdateRegister(config_contp, "wasm"); |
| } |