| /* |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
/**
 * @file bg_fetch.cpp
 * @brief Background fetch related classes (implementation).
 */
| |
| #include <arpa/inet.h> |
| #include <sys/types.h> |
| #include <netinet/in.h> |
| #include <netinet/ip.h> |
| #include <cstring> |
| #include <sys/socket.h> |
| #include <cinttypes> |
| |
| #include "ts/ts.h" /* ATS API */ |
| #include "fetch.h" |
| #include "headers.h" |
| |
| const char * |
| getPrefetchMetricsNames(int metric) |
| { |
| switch (metric) { |
| case FETCH_ACTIVE: |
| return "fetch.active"; |
| break; |
| case FETCH_COMPLETED: |
| return "fetch.completed"; |
| break; |
| case FETCH_ERRORS: |
| return "fetch.errors"; |
| break; |
| case FETCH_TIMEOOUTS: |
| return "fetch.timeouts"; |
| break; |
| case FETCH_THROTTLED: |
| return "fetch.throttled"; |
| break; |
| case FETCH_ALREADY_CACHED: |
| return "fetch.already_cached"; |
| break; |
| case FETCH_TOTAL: |
| return "fetch.total"; |
| break; |
| case FETCH_UNIQUE_YES: |
| return "fetch.unique.yes"; |
| break; |
| case FETCH_UNIQUE_NO: |
| return "fetch.unique.no"; |
| break; |
| case FETCH_MATCH_YES: |
| return "fetch.match.yes"; |
| break; |
| case FETCH_MATCH_NO: |
| return "fetch.match.no"; |
| break; |
| case FETCH_POLICY_YES: |
| return "fetch.policy.yes"; |
| break; |
| case FETCH_POLICY_NO: |
| return "fetch.policy.no"; |
| break; |
| case FETCH_POLICY_SIZE: |
| return "fetch.policy.size"; |
| break; |
| case FETCH_POLICY_MAXSIZE: |
| return "fetch.policy.maxsize"; |
| break; |
| default: |
| return "unknown"; |
| break; |
| } |
| } |
| |
| static bool |
| createStat(const String &prefix, const String &space, const char *module, const char *statName, TSRecordDataType statType, |
| int &statId) |
| { |
| String name(prefix); |
| name.append(".").append(space); |
| if (nullptr != module) { |
| name.append(".").append(module); |
| } |
| name.append(".").append(statName); |
| |
| if (TSStatFindName(name.c_str(), &statId) == TS_ERROR) { |
| statId = TSStatCreate(name.c_str(), TS_RECORDDATATYPE_INT, TS_STAT_NON_PERSISTENT, TS_STAT_SYNC_SUM); |
| if (statId == TS_ERROR) { |
| PrefetchError("failed to register '%s'", name.c_str()); |
| return false; |
| } |
| |
| TSStatIntSet(statId, 0); |
| } |
| |
| PrefetchDebug("created metric '%s (id:%d)'", name.c_str(), statId); |
| |
| return true; |
| } |
| |
| BgFetchState::BgFetchState() |
| { |
| _policyLock = TSMutexCreate(); |
| if (nullptr == _policyLock) { |
| PrefetchError("failed to initialize lock"); |
| } else { |
| PrefetchDebug("initialized lock"); |
| } |
| |
| _lock = TSMutexCreate(); |
| if (nullptr == _lock) { |
| PrefetchError("failed to initialize lock"); |
| } else { |
| PrefetchDebug("initialized lock"); |
| } |
| } |
| |
| BgFetchState::~BgFetchState() |
| { |
| TSMutexLock(_policyLock); |
| delete _policy; |
| TSMutexUnlock(_policyLock); |
| |
| TSMutexLock(_lock); |
| delete _unique; |
| TSMutexUnlock(_lock); |
| |
| TSMutexDestroy(_policyLock); |
| TSMutexDestroy(_lock); |
| |
| TSTextLogObjectFlush(_log); |
| TSTextLogObjectDestroy(_log); |
| } |
| |
| static bool |
| initializePolicy(FetchPolicy *&policy, const char *policyName) |
| { |
| bool status = true; |
| if (nullptr == policy) { |
| policy = FetchPolicy::getInstance(policyName); |
| if (nullptr == policy) { |
| PrefetchError("failed to initialize the %s policy", policyName); |
| status = false; |
| } |
| } else { |
| PrefetchDebug("state already initialized"); |
| } |
| return status; |
| } |
| |
| bool |
| initializeMetrics(PrefetchMetricInfo metrics[], const PrefetchConfig &config) |
| { |
| bool status = true; |
| for (int i = FETCH_ACTIVE; i < FETCHES_MAX_METRICS; i++) { |
| if (-1 == metrics[i].id) { |
| status = createStat(config.getMetricsPrefix(), config.getNameSpace(), nullptr, getPrefetchMetricsNames(i), metrics[i].type, |
| metrics[i].id); |
| } else { |
| PrefetchDebug("metric %s already initialized", getPrefetchMetricsNames(i)); |
| } |
| } |
| return status; |
| } |
| |
| bool |
| initializeLog(TSTextLogObject &log, const PrefetchConfig &config) |
| { |
| bool status = true; |
| if (!config.getLogName().empty()) { |
| if (nullptr == log) { |
| TSReturnCode error = TSTextLogObjectCreate(config.getLogName().c_str(), TS_LOG_MODE_ADD_TIMESTAMP, &log); |
| if (error != TS_SUCCESS) { |
| PrefetchError("failed to create log file"); |
| status = false; |
| } else { |
| PrefetchDebug("initialized log file '%s'", config.getLogName().c_str()); |
| } |
| } else { |
| PrefetchDebug("log file '%s' already initialized", config.getLogName().c_str()); |
| } |
| } else { |
| PrefetchDebug("skip creating log file"); |
| } |
| return status; |
| } |
| |
| bool |
| BgFetchState::init(const PrefetchConfig &config) |
| { |
| int status = true; |
| |
| /* Is throttling configured, 0 - don't throttle */ |
| _concurrentFetchesMax = config.getFetchMax(); |
| |
| /* Initialize the state */ |
| TSMutexLock(_lock); |
| |
| /* Initialize 'simple' policy used to avoid concurrent fetches of the same object */ |
| status &= initializePolicy(_unique, "simple"); |
| |
| /* Initialize the fetch metrics */ |
| status &= initializeMetrics(_metrics, config); |
| |
| /* Initialize the "pre-fetch" log */ |
| status &= initializeLog(_log, config); |
| |
| TSMutexUnlock(_lock); |
| |
| /* Initialize fetching policy */ |
| TSMutexLock(_policyLock); |
| |
| if (!config.getFetchPolicy().empty() && 0 != config.getFetchPolicy().compare("simple")) { |
| status &= initializePolicy(_policy, config.getFetchPolicy().c_str()); |
| if (nullptr != _policy) { |
| setMetric(FETCH_POLICY_MAXSIZE, _policy->getMaxSize()); |
| } |
| } else { |
| PrefetchDebug("Policy not specified or 'simple' policy chosen (skipping)"); |
| } |
| |
| TSMutexUnlock(_policyLock); |
| |
| return status; |
| } |
| |
| bool |
| BgFetchState::acquire(const String &url) |
| { |
| bool permitted = true; |
| if (nullptr != _policy) { |
| TSMutexLock(_policyLock); |
| permitted = _policy->acquire(url); |
| TSMutexUnlock(_policyLock); |
| } |
| |
| if (permitted) { |
| incrementMetric(FETCH_POLICY_YES); |
| } else { |
| incrementMetric(FETCH_POLICY_NO); |
| } |
| |
| if (nullptr != _policy) { |
| setMetric(FETCH_POLICY_SIZE, _policy->getSize()); |
| } |
| |
| return permitted; |
| } |
| |
| bool |
| BgFetchState::release(const String &url) |
| { |
| bool ret = true; |
| if (nullptr != _policy) { |
| TSMutexLock(_policyLock); |
| ret &= _policy->release(url); |
| TSMutexUnlock(_policyLock); |
| } |
| |
| if (nullptr != _policy) { |
| setMetric(FETCH_POLICY_SIZE, _policy->getSize()); |
| } |
| |
| return ret; |
| } |
| |
| bool |
| BgFetchState::uniqueAcquire(const String &url) |
| { |
| bool permitted = true; |
| bool throttled = false; |
| size_t cachedCounter = 0; |
| |
| TSMutexLock(_lock); |
| if (0 == _concurrentFetchesMax || _concurrentFetches < _concurrentFetchesMax) { |
| permitted = _unique->acquire(url); |
| if (permitted) { |
| cachedCounter = ++_concurrentFetches; |
| } |
| } else { |
| throttled = true; |
| } |
| TSMutexUnlock(_lock); |
| |
| /* Update the metrics, no need to lock? */ |
| if (throttled) { |
| incrementMetric(FETCH_THROTTLED); |
| } |
| |
| if (permitted && !throttled) { |
| incrementMetric(FETCH_UNIQUE_YES); |
| incrementMetric(FETCH_TOTAL); |
| setMetric(FETCH_ACTIVE, cachedCounter); |
| } else { |
| incrementMetric(FETCH_UNIQUE_NO); |
| } |
| |
| return permitted; |
| } |
| |
| bool |
| BgFetchState::uniqueRelease(const String &url) |
| { |
| bool permitted = true; |
| ssize_t cachedCounter = 0; |
| |
| TSMutexLock(_lock); |
| cachedCounter = --_concurrentFetches; |
| permitted = _unique->release(url); |
| TSMutexUnlock(_lock); |
| |
| PrefetchDebug("cachedCounter: %zd", cachedCounter); |
| |
| /* Update the metrics, no need to lock? */ |
| if (permitted) { |
| setMetric(FETCH_ACTIVE, cachedCounter); |
| } |
| return permitted; |
| } |
| |
| void |
| BgFetchState::incrementMetric(PrefetchMetric m) |
| { |
| if (-1 != _metrics[m].id) { |
| TSStatIntIncrement(_metrics[m].id, 1); |
| } |
| } |
| |
| void |
| BgFetchState::setMetric(PrefetchMetric m, size_t value) |
| { |
| if (-1 != _metrics[m].id) { |
| TSStatIntSet(_metrics[m].id, value); |
| } |
| } |
| |
| inline TSTextLogObject |
| BgFetchState::getLog() |
| { |
| return _log; |
| } |
| BgFetchStates *BgFetchStates::_prefetchStates = nullptr; |
| |
| BgFetch::BgFetch(BgFetchState *state, const PrefetchConfig &config, bool lock) |
| : _headerLoc(TS_NULL_MLOC), |
| _urlLoc(TS_NULL_MLOC), |
| vc(nullptr), |
| req_io_buf(nullptr), |
| resp_io_buf(nullptr), |
| req_io_buf_reader(nullptr), |
| resp_io_buf_reader(nullptr), |
| r_vio(nullptr), |
| w_vio(nullptr), |
| _bytes(0), |
| _cont(nullptr), |
| _state(state), |
| _config(config), |
| _askPermission(lock), |
| _startTime(0) |
| { |
| _mbuf = TSMBufferCreate(); |
| memset(&client_ip, 0, sizeof(client_ip)); |
| } |
| |
| BgFetch::~BgFetch() |
| { |
| TSHandleMLocRelease(_mbuf, TS_NULL_MLOC, _headerLoc); |
| TSHandleMLocRelease(_mbuf, TS_NULL_MLOC, _urlLoc); |
| |
| TSMBufferDestroy(_mbuf); |
| |
| if (vc) { |
| PrefetchError("Destroyed BgFetch while VC was alive"); |
| TSVConnClose(vc); |
| vc = nullptr; |
| } |
| |
| if (nullptr != _cont) { |
| if (_askPermission) { |
| _state->release(_cachekey); |
| _state->uniqueRelease(_cachekey); |
| } |
| |
| TSContDestroy(_cont); |
| _cont = nullptr; |
| |
| TSIOBufferReaderFree(req_io_buf_reader); |
| TSIOBufferDestroy(req_io_buf); |
| TSIOBufferReaderFree(resp_io_buf_reader); |
| TSIOBufferDestroy(resp_io_buf); |
| } |
| } |
| |
| bool |
| BgFetch::schedule(BgFetchState *state, const PrefetchConfig &config, bool askPermission, TSMBuffer requestBuffer, |
| TSMLoc requestHeaderLoc, TSHttpTxn txnp, const char *path, size_t pathLen, const String &cachekey) |
| { |
| bool ret = false; |
| BgFetch *fetch = new BgFetch(state, config, askPermission); |
| if (fetch->init(requestBuffer, requestHeaderLoc, txnp, path, pathLen, cachekey)) { |
| fetch->schedule(); |
| ret = true; |
| } else { |
| delete fetch; |
| } |
| return ret; |
| } |
| |
| bool |
| BgFetch::saveIp(TSHttpTxn txnp) |
| { |
| struct sockaddr const *ip = TSHttpTxnClientAddrGet(txnp); |
| if (ip) { |
| if (ip->sa_family == AF_INET) { |
| memcpy(&client_ip, ip, sizeof(sockaddr_in)); |
| } else if (ip->sa_family == AF_INET6) { |
| memcpy(&client_ip, ip, sizeof(sockaddr_in6)); |
| } else { |
| PrefetchError("unknown address family %d", ip->sa_family); |
| } |
| } else { |
| PrefetchError("failed to get client host info"); |
| return false; |
| } |
| return true; |
| } |
| |
| inline void |
| BgFetch::addBytes(int64_t b) |
| { |
| _bytes += b; |
| } |
| /** |
| * Initialize the background fetch |
| */ |
| bool |
| BgFetch::init(TSMBuffer reqBuffer, TSMLoc reqHdrLoc, TSHttpTxn txnp, const char *fetchPath, size_t fetchPathLen, |
| const String &cachekey) |
| { |
| TSAssert(TS_NULL_MLOC == _headerLoc); |
| TSAssert(TS_NULL_MLOC == _urlLoc); |
| |
| if (_askPermission) { |
| if (!_state->acquire(cachekey)) { |
| PrefetchDebug("request is not fetchable"); |
| return false; |
| } |
| |
| if (!_state->uniqueAcquire(cachekey)) { |
| PrefetchDebug("already fetching the object"); |
| _state->release(cachekey); |
| return false; |
| } |
| } |
| |
| _cachekey.assign(cachekey); |
| |
| /* Save the IP info */ |
| if (!saveIp(txnp)) { |
| return false; |
| } |
| |
| /* Create HTTP header */ |
| _headerLoc = TSHttpHdrCreate(_mbuf); |
| |
| /* Copy the headers to the new marshal buffer */ |
| if (TS_SUCCESS != TSHttpHdrCopy(_mbuf, _headerLoc, reqBuffer, reqHdrLoc)) { |
| PrefetchError("header copy failed"); |
| } |
| |
| /* Copy the pristine request URL into fetch marshal buffer */ |
| TSMLoc pristineUrlLoc; |
| if (TS_SUCCESS == TSHttpTxnPristineUrlGet(txnp, &reqBuffer, &pristineUrlLoc)) { |
| if (TS_SUCCESS != TSUrlClone(_mbuf, reqBuffer, pristineUrlLoc, &_urlLoc)) { |
| PrefetchError("failed to clone URL"); |
| TSHandleMLocRelease(reqBuffer, TS_NULL_MLOC, pristineUrlLoc); |
| return false; |
| } |
| TSHandleMLocRelease(reqBuffer, TS_NULL_MLOC, pristineUrlLoc); |
| } else { |
| PrefetchError("failed to get pristine URL"); |
| return false; |
| } |
| |
| /* Save the path before changing */ |
| int pathLen; |
| const char *path = TSUrlPathGet(_mbuf, _urlLoc, &pathLen); |
| if (nullptr == path) { |
| PrefetchError("failed to get a URL path"); |
| return false; |
| } |
| |
| /* Now set or remove the prefetch API header */ |
| const String &header = _config.getApiHeader(); |
| if (_config.isFront()) { |
| if (setHeader(_mbuf, _headerLoc, header.c_str(), static_cast<int>(header.length()), path, pathLen)) { |
| PrefetchDebug("set header '%.*s: %.*s'", (int)header.length(), header.c_str(), (int)fetchPathLen, fetchPath); |
| } |
| } else { |
| if (removeHeader(_mbuf, _headerLoc, header.c_str(), header.length())) { |
| PrefetchDebug("remove header '%.*s'", (int)header.length(), header.c_str()); |
| } |
| } |
| |
| /* Make sure we remove the RANGE header to avoid 416 "Request Range Not Satisfiable" response when |
| * the current request is a RANGE request and its range turns out invalid for the "next" object */ |
| if (removeHeader(_mbuf, _headerLoc, TS_MIME_FIELD_RANGE, TS_MIME_LEN_RANGE)) { |
| PrefetchDebug("remove header '%.*s'", TS_MIME_LEN_RANGE, TS_MIME_FIELD_RANGE); |
| } |
| |
| /* Overwrite the path if required */ |
| if (nullptr != fetchPath && 0 != fetchPathLen) { |
| if (TS_SUCCESS == TSUrlPathSet(_mbuf, _urlLoc, fetchPath, fetchPathLen)) { |
| PrefetchDebug("setting URL path to %.*s", (int)fetchPathLen, fetchPath); |
| } else { |
| PrefetchError("failed to set a URL path %.*s", (int)fetchPathLen, fetchPath); |
| } |
| } |
| |
| /* Come up with the host name to be used in the fetch request */ |
| const char *hostName = nullptr; |
| int hostNameLen = 0; |
| if (_config.getReplaceHost().empty()) { |
| hostName = TSUrlHostGet(_mbuf, _urlLoc, &hostNameLen); |
| } else { |
| hostName = _config.getReplaceHost().c_str(); |
| hostNameLen = _config.getReplaceHost().length(); |
| } |
| |
| /* Set the URI host */ |
| if (TS_SUCCESS == TSUrlHostSet(_mbuf, _urlLoc, hostName, hostNameLen)) { |
| PrefetchDebug("setting URL host: %.*s", hostNameLen, hostName); |
| } else { |
| PrefetchError("failed to set URL host: %.*s", hostNameLen, hostName); |
| } |
| |
| /* Set the host header */ |
| if (setHeader(_mbuf, _headerLoc, TS_MIME_FIELD_HOST, TS_MIME_LEN_HOST, hostName, hostNameLen)) { |
| PrefetchDebug("setting Host header: %.*s", hostNameLen, hostName); |
| } else { |
| PrefetchError("failed to set Host header: %.*s", hostNameLen, hostName); |
| } |
| |
| /* Save the URL to be fetched with this fetch for debugging purposes, expensive TSUrlStringGet() |
| * but really helpful when debugging multi-remap / host-replacement use cases */ |
| int urlLen = 0; |
| char *url = TSUrlStringGet(_mbuf, _urlLoc, &urlLen); |
| if (nullptr != url) { |
| _url.assign(url, urlLen); |
| TSfree(static_cast<void *>(url)); |
| } |
| |
| /* TODO: TBD is this the right place? */ |
| if (TS_SUCCESS != TSHttpHdrUrlSet(_mbuf, _headerLoc, _urlLoc)) { |
| return false; |
| } |
| |
| /* Initialization is success */ |
| return true; |
| } |
| |
| /** |
| * @brief Create, setup and schedule the background fetch continuation. |
| */ |
| void |
| BgFetch::schedule() |
| { |
| TSAssert(nullptr == _cont); |
| |
| /* Setup the continuation */ |
| _cont = TSContCreate(handler, TSMutexCreate()); |
| TSContDataSet(_cont, static_cast<void *>(this)); |
| |
| /* Initialize the VIO (for the fetch) */ |
| req_io_buf = TSIOBufferCreate(); |
| req_io_buf_reader = TSIOBufferReaderAlloc(req_io_buf); |
| resp_io_buf = TSIOBufferCreate(); |
| resp_io_buf_reader = TSIOBufferReaderAlloc(resp_io_buf); |
| |
| /* Schedule */ |
| PrefetchDebug("schedule fetch: %s", _url.c_str()); |
| _startTime = TShrtime(); |
| TSContScheduleOnPool(_cont, 0, TS_THREAD_POOL_NET); |
| } |
| |
| /* Log format is: name-space bytes status url */ |
| void |
| BgFetch::logAndMetricUpdate(TSEvent event) const |
| { |
| const char *status; |
| |
| switch (event) { |
| case TS_EVENT_VCONN_EOS: |
| status = "EOS"; |
| _state->incrementMetric(FETCH_COMPLETED); |
| break; |
| case TS_EVENT_VCONN_INACTIVITY_TIMEOUT: |
| status = "TIMEOUT"; |
| _state->incrementMetric(FETCH_TIMEOOUTS); |
| break; |
| case TS_EVENT_ERROR: |
| _state->incrementMetric(FETCH_ERRORS); |
| status = "ERROR"; |
| break; |
| case TS_EVENT_VCONN_READ_COMPLETE: |
| _state->incrementMetric(FETCH_COMPLETED); |
| status = "READ_COMP"; |
| break; |
| default: |
| status = "UNKNOWN"; |
| break; |
| } |
| |
| if (TSIsDebugTagSet(PLUGIN_NAME "_log")) { |
| TSHRTime now = TShrtime(); |
| double elapsed = static_cast<double>(now - _startTime) / 1000000.0; |
| |
| PrefetchDebug("ns=%s bytes=%" PRId64 " time=%1.3lf status=%s url=%s key=%s", _config.getNameSpace().c_str(), _bytes, elapsed, |
| status, _url.c_str(), _cachekey.c_str()); |
| if (_state->getLog()) { |
| TSTextLogObjectWrite(_state->getLog(), "ns=%s bytes=%" PRId64 " time=%1.3lf status=%s url=%s key=%s", |
| _config.getNameSpace().c_str(), _bytes, elapsed, status, _url.c_str(), _cachekey.c_str()); |
| } |
| } |
| } |
| |
| /** |
| * @brief Continuation to perform a background fill of a URL. |
| * |
| * This is pretty expensive (memory allocations etc.) |
| */ |
| int |
| BgFetch::handler(TSCont contp, TSEvent event, void * /* edata ATS_UNUSED */) |
| { |
| BgFetch *fetch = static_cast<BgFetch *>(TSContDataGet(contp)); |
| int64_t avail; |
| |
| PrefetchDebug("event: %s (%d)", TSHttpEventNameLookup(event), event); |
| |
| switch (event) { |
| case TS_EVENT_IMMEDIATE: |
| case TS_EVENT_TIMEOUT: |
| // Debug info for this particular bg fetch (put all debug in here please) |
| if (TSIsDebugTagSet(PLUGIN_NAME)) { |
| char buf[INET6_ADDRSTRLEN]; |
| const sockaddr *sockaddress = reinterpret_cast<const sockaddr *>(&fetch->client_ip); |
| |
| switch (sockaddress->sa_family) { |
| case AF_INET: |
| inet_ntop(AF_INET, &(((struct sockaddr_in *)sockaddress)->sin_addr), buf, INET_ADDRSTRLEN); |
| PrefetchDebug("client IPv4 = %s", buf); |
| break; |
| case AF_INET6: |
| inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sockaddress)->sin6_addr), buf, INET6_ADDRSTRLEN); |
| PrefetchDebug("client IPv6 = %s", buf); |
| break; |
| default: |
| TSError("[%s] Unknown address family %d", PLUGIN_NAME, sockaddress->sa_family); |
| break; |
| } |
| PrefetchDebug("Starting background fetch."); |
| dumpHeaders(fetch->_mbuf, fetch->_headerLoc); |
| } |
| |
| // Setup the NetVC for background fetch |
| TSAssert(nullptr == fetch->vc); |
| if ((fetch->vc = TSHttpConnect(reinterpret_cast<sockaddr *>(&fetch->client_ip))) != nullptr) { |
| TSHttpHdrPrint(fetch->_mbuf, fetch->_headerLoc, fetch->req_io_buf); |
| // We never send a body with the request. ToDo: Do we ever need to support that ? |
| TSIOBufferWrite(fetch->req_io_buf, "\r\n", 2); |
| |
| fetch->r_vio = TSVConnRead(fetch->vc, contp, fetch->resp_io_buf, INT64_MAX); |
| fetch->w_vio = TSVConnWrite(fetch->vc, contp, fetch->req_io_buf_reader, TSIOBufferReaderAvail(fetch->req_io_buf_reader)); |
| } else { |
| delete fetch; |
| PrefetchError("Failed to connect to internal process, major malfunction"); |
| } |
| break; |
| |
| case TS_EVENT_VCONN_WRITE_COMPLETE: |
| // TSVConnShutdown(data->vc, 0, 1); |
| // TSVIOReenable(data->w_vio); |
| PrefetchDebug("write complete"); |
| break; |
| |
| case TS_EVENT_VCONN_READ_READY: |
| avail = TSIOBufferReaderAvail(fetch->resp_io_buf_reader); |
| fetch->addBytes(avail); |
| TSIOBufferReaderConsume(fetch->resp_io_buf_reader, avail); |
| TSVIONDoneSet(fetch->r_vio, TSVIONDoneGet(fetch->r_vio) + avail); |
| TSVIOReenable(fetch->r_vio); |
| break; |
| |
| case TS_EVENT_VCONN_READ_COMPLETE: |
| case TS_EVENT_VCONN_EOS: |
| case TS_EVENT_VCONN_INACTIVITY_TIMEOUT: |
| case TS_EVENT_ERROR: |
| if (event == TS_EVENT_VCONN_INACTIVITY_TIMEOUT) { |
| PrefetchDebug("encountered Inactivity Timeout"); |
| TSVConnAbort(fetch->vc, TS_VC_CLOSE_ABORT); |
| } else { |
| TSVConnClose(fetch->vc); |
| } |
| |
| PrefetchDebug("closing background transaction"); |
| avail = TSIOBufferReaderAvail(fetch->resp_io_buf_reader); |
| fetch->addBytes(avail); |
| TSIOBufferReaderConsume(fetch->resp_io_buf_reader, avail); |
| TSVIONDoneSet(fetch->r_vio, TSVIONDoneGet(fetch->r_vio) + avail); |
| fetch->logAndMetricUpdate(event); |
| |
| /* Close, release and cleanup */ |
| fetch->vc = nullptr; |
| delete fetch; |
| break; |
| |
| default: |
| PrefetchDebug("unhandled event"); |
| break; |
| } |
| |
| return 0; |
| } |