| /** @file |
| |
| Implements callin functions for plugins |
| |
| @section license License |
| |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| #include "tscore/ink_config.h" |
| #include "FetchSM.h" |
| #include <cstdio> |
| #include "HTTP.h" |
| #include "PluginVC.h" |
| #include "ts/ts.h" // Ugly, but we need a bunch of the public APIs here ... :-/ |
| |
// Diagnostics tag passed to every Debug() call in this file.
| #define DEBUG_TAG "FetchSM" |
// Delay used by ext_destroy() when it reschedules itself after failing to
// take the state machine lock.
| #define FETCH_LOCK_RETRY_TIME HRTIME_MSECONDS(10) |
| |
// Allocator that cleanUp() hands finished FetchSM instances back to.
| ClassAllocator<FetchSM> FetchSMAllocator("FetchSMAllocator"); |
| void |
| FetchSM::cleanUp() |
| { |
| Debug(DEBUG_TAG, "[%s] calling cleanup", __FUNCTION__); |
| |
| if (!ink_atomic_cas(&destroyed, false, true)) { |
| Debug(DEBUG_TAG, "Error: Double delete on FetchSM, this:%p", this); |
| return; |
| } |
| |
| if (resp_is_chunked > 0 && (fetch_flags & TS_FETCH_FLAGS_DECHUNK)) { |
| chunked_handler.clear(); |
| } |
| |
| free_MIOBuffer(req_buffer); |
| free_MIOBuffer(resp_buffer); |
| mutex.clear(); |
| http_parser_clear(&http_parser); |
| client_response_hdr.destroy(); |
| ats_free(client_response); |
| cont_mutex.clear(); |
| http_vc->do_io_close(); |
| FetchSMAllocator.free(this); |
| } |
| |
| void |
| FetchSM::httpConnect() |
| { |
| PluginIdentity *pi = dynamic_cast<PluginIdentity *>(contp); |
| const char *tag = pi ? pi->getPluginTag() : "fetchSM"; |
| int64_t id = pi ? pi->getPluginId() : 0; |
| |
| Debug(DEBUG_TAG, "[%s] calling httpconnect write pi=%p tag=%s id=%" PRId64, __FUNCTION__, pi, tag, id); |
| http_vc = reinterpret_cast<PluginVC *>(TSHttpConnectWithPluginId(&_addr.sa, tag, id)); |
| |
| /* |
| * TS-2906: We need a way to unset internal request when using FetchSM, the use case for this |
| * is H2 when it creates outgoing requests it uses FetchSM and the outgoing requests |
| * are spawned via H2 SYN packets which are definitely not internal requests. |
| */ |
| if (!is_internal_request) { |
| PluginVC *other_side = reinterpret_cast<PluginVC *>(http_vc)->get_other_side(); |
| if (other_side != nullptr) { |
| other_side->set_is_internal_request(false); |
| } |
| } |
| |
| read_vio = http_vc->do_io_read(this, INT64_MAX, resp_buffer); |
| write_vio = http_vc->do_io_write(this, getReqLen() + req_content_length, req_reader); |
| } |
| |
| char * |
| FetchSM::resp_get(int *length) |
| { |
| *length = client_bytes; |
| return client_response; |
| } |
| |
| int |
| FetchSM::InvokePlugin(int event, void *data) |
| { |
| EThread *mythread = this_ethread(); |
| |
| MUTEX_TAKE_LOCK(contp->mutex, mythread); |
| |
| int ret = contp->handleEvent(event, data); |
| |
| MUTEX_UNTAKE_LOCK(contp->mutex, mythread); |
| |
| return ret; |
| } |
| |
| bool |
| FetchSM::has_body() |
| { |
| int status_code; |
| HTTPHdr *hdr; |
| |
| if (!header_done) { |
| return false; |
| } |
| |
| if (is_method_head) { |
| return false; |
| } |
| // |
| // The following code comply with HTTP/1.1: |
| // http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.4 |
| // |
| |
| hdr = &client_response_hdr; |
| |
| status_code = hdr->status_get(); |
| if (status_code < 200 || status_code == 204 || status_code == 304) { |
| return false; |
| } |
| |
| if (check_chunked()) { |
| return true; |
| } |
| |
| resp_content_length = hdr->value_get_int64(MIME_FIELD_CONTENT_LENGTH, MIME_LEN_CONTENT_LENGTH); |
| if (!resp_content_length) { |
| if (check_connection_close()) { |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| return true; |
| } |
| |
| bool |
| FetchSM::check_body_done() |
| { |
| if (!check_chunked()) { |
| if (resp_content_length == resp_received_body_len + resp_reader->read_avail()) { |
| return true; |
| } |
| |
| return false; |
| } |
| |
| // |
| // TODO: check whether the chunked body is done |
| // |
| return true; |
| } |
| |
| bool |
| FetchSM::check_for_field_value(const char *name, size_t name_len, char const *value, size_t value_len) |
| { |
| bool zret = false; // not found. |
| StrList slist; |
| HTTPHdr *hdr = &client_response_hdr; |
| int ret = hdr->value_get_comma_list(name, name_len, &slist); |
| |
| ink_release_assert(header_done); |
| |
| if (ret) { |
| for (Str *f = slist.head; f != nullptr; f = f->next) { |
| if (f->len == value_len && 0 == strncasecmp(f->str, value, value_len)) { |
| Debug(DEBUG_TAG, "[%s] field '%.*s', value '%.*s'", __FUNCTION__, static_cast<int>(name_len), name, |
| static_cast<int>(value_len), value); |
| zret = true; |
| break; |
| } |
| } |
| } |
| return zret; |
| } |
| |
| bool |
| FetchSM::check_chunked() |
| { |
| static const char CHUNKED_TEXT[] = "chunked"; |
| static size_t const CHUNKED_LEN = sizeof(CHUNKED_TEXT) - 1; |
| |
| if (resp_is_chunked < 0) { |
| resp_is_chunked = static_cast<int>( |
| this->check_for_field_value(MIME_FIELD_TRANSFER_ENCODING, MIME_LEN_TRANSFER_ENCODING, CHUNKED_TEXT, CHUNKED_LEN)); |
| |
| if (resp_is_chunked && (fetch_flags & TS_FETCH_FLAGS_DECHUNK)) { |
| ChunkedHandler *ch = &chunked_handler; |
| ch->init_by_action(resp_reader, ChunkedHandler::ACTION_DECHUNK); |
| ch->dechunked_reader = ch->dechunked_buffer->alloc_reader(); |
| ch->state = ChunkedHandler::CHUNK_READ_SIZE; |
| resp_reader->dealloc(); |
| } |
| } |
| return resp_is_chunked > 0; |
| } |
| |
| bool |
| FetchSM::check_connection_close() |
| { |
| static const char CLOSE_TEXT[] = "close"; |
| static size_t const CLOSE_LEN = sizeof(CLOSE_TEXT) - 1; |
| |
| if (resp_received_close < 0) { |
| resp_received_close = |
| static_cast<int>(this->check_for_field_value(MIME_FIELD_CONNECTION, MIME_LEN_CONNECTION, CLOSE_TEXT, CLOSE_LEN)); |
| } |
| return resp_received_close > 0; |
| } |
| |
| int |
| FetchSM::dechunk_body() |
| { |
| ink_assert(resp_is_chunked > 0); |
| // |
| // Return Value: |
| // - 0: need to read more data. |
| // - TS_FETCH_EVENT_EXT_BODY_READY. |
| // - TS_FETCH_EVENT_EXT_BODY_DONE. |
| // |
| if (chunked_handler.process_chunked_content()) { |
| return TS_FETCH_EVENT_EXT_BODY_DONE; |
| } |
| |
| if (chunked_handler.dechunked_reader->read_avail()) { |
| return TS_FETCH_EVENT_EXT_BODY_READY; |
| } |
| |
| return 0; |
| } |
| |
| void |
| FetchSM::InvokePluginExt(int fetch_event) |
| { |
| int event; |
| EThread *mythread = this_ethread(); |
| bool read_complete_event = (fetch_event == TS_EVENT_VCONN_READ_COMPLETE) || (fetch_event == TS_EVENT_VCONN_EOS); |
| |
| // |
| // Increasing *recursion* to prevent |
| // FetchSM being deleted by callback. |
| // |
| recursion++; |
| |
| if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) { |
| MUTEX_TAKE_LOCK(cont_mutex, mythread); |
| } |
| |
| if (!contp) { |
| goto out; |
| } |
| |
| if (fetch_event && !read_complete_event) { |
| contp->handleEvent(fetch_event, this); |
| goto out; |
| } |
| |
| if (!has_sent_header) { |
| if (fetch_event != TS_EVENT_VCONN_EOS) { |
| contp->handleEvent(TS_FETCH_EVENT_EXT_HEAD_DONE, this); |
| has_sent_header = true; |
| } else { |
| contp->handleEvent(fetch_event, this); |
| goto out; |
| } |
| } |
| |
| // TS-3112: always check 'contp' after handleEvent() |
| // since handleEvent effectively calls the plugin (or H2 layer) |
| // which may call TSFetchDestroy in error conditions. |
| // TSFetchDestroy sets contp to NULL, but, doesn't destroy FetchSM yet, |
| // since, it¹s in a tight loop protected by 'recursion' counter. |
| // When handleEvent returns, 'recursion' is decremented and contp is |
| // already null, so, FetchSM gets destroyed. |
| if (!contp) { |
| goto out; |
| } |
| |
| if (!has_body()) { |
| contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this); |
| goto out; |
| } |
| |
| Debug(DEBUG_TAG, "[%s] chunked:%d, content_len: %" PRId64 ", received_len: %" PRId64 ", avail: %" PRId64 "", __FUNCTION__, |
| resp_is_chunked, resp_content_length, resp_received_body_len, |
| resp_is_chunked > 0 ? chunked_handler.chunked_reader->read_avail() : resp_reader->read_avail()); |
| |
| if (resp_is_chunked > 0) { |
| if (!chunked_handler.chunked_reader->read_avail()) { |
| if (read_complete_event) { |
| contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this); |
| } |
| goto out; |
| } |
| } else if (!resp_reader->read_avail()) { |
| if (read_complete_event) { |
| contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this); |
| } |
| goto out; |
| } |
| |
| if (!check_chunked()) { |
| if (!check_body_done() && !read_complete_event) { |
| contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_READY, this); |
| } else { |
| contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this); |
| } |
| } else if (fetch_flags & TS_FETCH_FLAGS_DECHUNK) { |
| do { |
| if (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL) { |
| chunked_handler.state = ChunkedHandler::CHUNK_READ_SIZE_START; |
| } |
| |
| event = dechunk_body(); |
| if (!event) { |
| read_vio->reenable(); |
| goto out; |
| } |
| |
| contp->handleEvent(event, this); |
| |
| // contp may be null after handleEvent |
| if (!contp) { |
| goto out; |
| } |
| |
| } while (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL); |
| } else if (check_body_done()) { |
| contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this); |
| } else { |
| contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_READY, this); |
| } |
| |
| out: |
| if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) { |
| MUTEX_UNTAKE_LOCK(cont_mutex, mythread); |
| } |
| recursion--; |
| |
| if (!contp && !recursion) { |
| cleanUp(); |
| } |
| |
| return; |
| } |
| |
| void |
| FetchSM::get_info_from_buffer(IOBufferReader *reader) |
| { |
| char *buf, *info; |
| IOBufferBlock *blk; |
| int64_t read_avail, read_done; |
| |
| if (!reader) { |
| client_bytes = 0; |
| return; |
| } |
| |
| /* Read the data out of the reader */ |
| if (reader->block != NULL) |
| reader->skip_empty_blocks(); |
| |
| read_avail = reader->read_avail(); |
| Debug(DEBUG_TAG, "[%s] total avail %" PRId64, __FUNCTION__, read_avail); |
| if (!read_avail) { |
| client_bytes = 0; |
| return; |
| } |
| |
| info = (char *)ats_malloc(sizeof(char) * (read_avail + 1)); |
| client_response = info; |
| |
| blk = reader->block.get(); |
| |
| // This is the equivalent of TSIOBufferBlockReadStart() |
| buf = blk->start() + reader->start_offset; |
| read_done = blk->read_avail() - reader->start_offset; |
| |
| if (header_done == 0 && read_done > 0) { |
| int bytes_used = 0; |
| header_done = 1; |
| if (client_response_hdr.parse_resp(&http_parser, reader, &bytes_used, 0) == PARSE_RESULT_DONE) { |
| if ((bytes_used > 0) && (bytes_used <= read_avail)) { |
| memcpy(info, buf, bytes_used); |
| info += bytes_used; |
| client_bytes += bytes_used; |
| } |
| } else { |
| Error("Failed to parse headers in FetchSM buffer"); |
| } |
| // adjust the read_avail |
| read_avail -= bytes_used; |
| } |
| |
| // Send the body without dechunk when neither streaming nor dechunk flag is set |
| // Or when the body is not chunked |
| if (!((fetch_flags & TS_FETCH_FLAGS_STREAM) || (fetch_flags & TS_FETCH_FLAGS_DECHUNK)) || !check_chunked()) { |
| /* Read the data out of the reader */ |
| while (read_avail > 0) { |
| if (reader->block) { |
| reader->skip_empty_blocks(); |
| } |
| |
| blk = reader->block.get(); |
| |
| // This is the equivalent of TSIOBufferBlockReadStart() |
| buf = blk->start() + reader->start_offset; |
| read_done = blk->read_avail() - reader->start_offset; |
| |
| if ((read_done > 0) && ((read_done <= read_avail))) { |
| memcpy(info, buf, read_done); |
| reader->consume(read_done); |
| read_avail -= read_done; |
| info += read_done; |
| client_bytes += read_done; |
| } |
| } |
| client_response[client_bytes] = '\0'; |
| return; |
| } |
| |
| reader = chunked_handler.dechunked_reader; |
| do { |
| if (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL) { |
| chunked_handler.state = ChunkedHandler::CHUNK_READ_SIZE_START; |
| } |
| |
| if (!dechunk_body()) { |
| break; |
| } |
| |
| /* Read the data out of the reader */ |
| read_avail = reader->read_avail(); |
| while (read_avail > 0) { |
| if (reader->block) { |
| reader->skip_empty_blocks(); |
| } |
| |
| IOBufferBlock *blk = reader->block.get(); |
| |
| // This is the equivalent of TSIOBufferBlockReadStart() |
| buf = blk->start() + reader->start_offset; |
| read_done = blk->read_avail() - reader->start_offset; |
| |
| if ((read_done > 0) && (read_done <= read_avail)) { |
| memcpy(info, buf, read_done); |
| reader->consume(read_done); |
| read_avail -= read_done; |
| info += read_done; |
| client_bytes += read_done; |
| } |
| } |
| } while (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL); |
| |
| client_response[client_bytes] = '\0'; |
| return; |
| } |
| |
| void |
| FetchSM::process_fetch_read(int event) |
| { |
| Debug(DEBUG_TAG, "[%s] I am here read", __FUNCTION__); |
| int64_t bytes; |
| int bytes_used; |
| int64_t total_bytes_copied = 0; |
| |
| switch (event) { |
| case TS_EVENT_VCONN_READ_READY: |
| // duplicate the bytes for backward compatibility with TSFetchUrl() |
| if (!(fetch_flags & TS_FETCH_FLAGS_STREAM)) { |
| bytes = resp_reader->read_avail(); |
| Debug(DEBUG_TAG, "[%s] number of bytes in read ready %" PRId64, __FUNCTION__, bytes); |
| |
| while (total_bytes_copied < bytes) { |
| int64_t actual_bytes_copied; |
| actual_bytes_copied = resp_buffer->write(resp_reader, bytes, 0); |
| Debug(DEBUG_TAG, "[%s] copied %" PRId64 " bytes", __FUNCTION__, actual_bytes_copied); |
| if (actual_bytes_copied <= 0) { |
| break; |
| } |
| total_bytes_copied += actual_bytes_copied; |
| } |
| Debug(DEBUG_TAG, "[%s] total copied %" PRId64 " bytes", __FUNCTION__, total_bytes_copied); |
| resp_reader->consume(total_bytes_copied); |
| } |
| |
| if (header_done == 0 && ((fetch_flags & TS_FETCH_FLAGS_STREAM) || callback_options == AFTER_HEADER)) { |
| if (client_response_hdr.parse_resp(&http_parser, resp_reader, &bytes_used, false) == PARSE_RESULT_DONE) { |
| header_done = true; |
| if (fetch_flags & TS_FETCH_FLAGS_STREAM) { |
| return InvokePluginExt(); |
| } else { |
| InvokePlugin(callback_events.success_event_id, (void *)&client_response_hdr); |
| } |
| } |
| } else { |
| if (fetch_flags & TS_FETCH_FLAGS_STREAM) { |
| return InvokePluginExt(); |
| } |
| } |
| read_vio->reenable(); |
| break; |
| case TS_EVENT_VCONN_READ_COMPLETE: |
| case TS_EVENT_VCONN_EOS: |
| if (fetch_flags & TS_FETCH_FLAGS_STREAM) { |
| return InvokePluginExt(event); |
| } |
| if (callback_options == AFTER_HEADER || callback_options == AFTER_BODY) { |
| get_info_from_buffer(resp_reader); |
| InvokePlugin(callback_events.success_event_id, (void *)this); |
| } |
| Debug(DEBUG_TAG, "[%s] received EOS", __FUNCTION__); |
| cleanUp(); |
| break; |
| case TS_EVENT_ERROR: |
| default: |
| if (fetch_flags & TS_FETCH_FLAGS_STREAM) { |
| return InvokePluginExt(event); |
| } |
| InvokePlugin(callback_events.failure_event_id, nullptr); |
| cleanUp(); |
| break; |
| } |
| } |
| |
| void |
| FetchSM::process_fetch_write(int event) |
| { |
| Debug(DEBUG_TAG, "[%s] calling process write", __FUNCTION__); |
| switch (event) { |
| case TS_EVENT_VCONN_WRITE_COMPLETE: |
| req_finished = true; |
| break; |
| case TS_EVENT_VCONN_WRITE_READY: |
| // data is processed in chunks of 32k; if there is more than 32k |
| // of input data, we have to continue reenabling until all data is |
| // read (we have already written all the data to the buffer) |
| if (req_reader->read_avail() > 0) { |
| ((PluginVC *)http_vc)->reenable(write_vio); |
| } |
| break; |
| case TS_EVENT_ERROR: |
| if (fetch_flags & TS_FETCH_FLAGS_STREAM) { |
| return InvokePluginExt(event); |
| } |
| InvokePlugin(callback_events.failure_event_id, nullptr); |
| cleanUp(); |
| break; |
| default: |
| break; |
| } |
| } |
| |
| int |
| FetchSM::fetch_handler(int event, void *edata) |
| { |
| Debug(DEBUG_TAG, "[%s] calling fetch_plugin", __FUNCTION__); |
| |
| if (edata == read_vio) { |
| process_fetch_read(event); |
| } else if (edata == write_vio) { |
| process_fetch_write(event); |
| } else { |
| if (fetch_flags & TS_FETCH_FLAGS_STREAM) { |
| InvokePluginExt(event); |
| return 1; |
| } |
| InvokePlugin(callback_events.failure_event_id, nullptr); |
| cleanUp(); |
| } |
| return 1; |
| } |
| |
| void |
| FetchSM::ext_init(Continuation *cont, const char *method, const char *url, const char *version, const sockaddr *client_addr, |
| int flags) |
| { |
| init_comm(); |
| |
| if (flags & TS_FETCH_FLAGS_NEWLOCK) { |
| mutex = new_ProxyMutex(); |
| cont_mutex = cont->mutex; |
| } else { |
| mutex = cont->mutex; |
| } |
| |
| contp = cont; |
| _addr.assign(client_addr); |
| |
| // |
| // Enable stream IO automatically. |
| // |
| fetch_flags = (TS_FETCH_FLAGS_STREAM | flags); |
| if (fetch_flags & TS_FETCH_FLAGS_NOT_INTERNAL_REQUEST) { |
| set_internal_request(false); |
| } |
| |
| // |
| // These options are not used when enable |
| // stream IO. |
| // |
| memset(&callback_options, 0, sizeof(callback_options)); |
| memset(&callback_events, 0, sizeof(callback_events)); |
| |
| int method_len = strlen(method); |
| req_buffer->write(method, method_len); |
| req_buffer->write(" ", 1); |
| req_buffer->write(url, strlen(url)); |
| req_buffer->write(" ", 1); |
| req_buffer->write(version, strlen(version)); |
| req_buffer->write("\r\n", 2); |
| |
| if ((method_len == HTTP_LEN_HEAD) && !memcmp(method, HTTP_METHOD_HEAD, HTTP_LEN_HEAD)) { |
| is_method_head = true; |
| } |
| } |
| |
| void |
| FetchSM::ext_add_header(const char *name, int name_len, const char *value, int value_len) |
| { |
| if (TS_MIME_LEN_CONTENT_LENGTH == name_len && !strncasecmp(TS_MIME_FIELD_CONTENT_LENGTH, name, name_len)) { |
| req_content_length = atoll(value); |
| } |
| |
| req_buffer->write(name, name_len); |
| req_buffer->write(": ", 2); |
| req_buffer->write(value, value_len); |
| req_buffer->write("\r\n", 2); |
| } |
| |
| void |
| FetchSM::ext_launch() |
| { |
| req_buffer->write("\r\n", 2); |
| httpConnect(); |
| } |
| |
| void |
| FetchSM::ext_write_data(const void *data, size_t len) |
| { |
| if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) { |
| MUTEX_TAKE_LOCK(mutex, this_ethread()); |
| } |
| req_buffer->write(data, len); |
| |
| Debug(DEBUG_TAG, "[%s] re-enabling write_vio, header_done %u", __FUNCTION__, header_done); |
| write_vio->reenable(); |
| |
| if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) { |
| MUTEX_UNTAKE_LOCK(mutex, this_ethread()); |
| } |
| } |
| |
| ssize_t |
| FetchSM::ext_read_data(char *buf, size_t len) |
| { |
| const char *start; |
| TSIOBufferReader reader; |
| TSIOBufferBlock blk, next_blk; |
| int64_t already, blk_len, need, wavail; |
| |
| if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) { |
| MUTEX_TRY_LOCK(lock, mutex, this_ethread()); |
| if (!lock.is_locked()) { |
| return 0; |
| } |
| } |
| |
| if (!header_done) { |
| return 0; |
| } |
| |
| if (check_chunked() && (fetch_flags & TS_FETCH_FLAGS_DECHUNK)) { |
| reader = (tsapi_bufferreader *)chunked_handler.dechunked_reader; |
| } else { |
| reader = (TSIOBufferReader)resp_reader; |
| } |
| |
| already = 0; |
| blk = TSIOBufferReaderStart(reader); |
| |
| while (blk) { |
| wavail = len - already; |
| |
| next_blk = TSIOBufferBlockNext(blk); |
| start = TSIOBufferBlockReadStart(blk, reader, &blk_len); |
| |
| need = blk_len > wavail ? wavail : blk_len; |
| |
| memcpy(&buf[already], start, need); |
| already += need; |
| |
| if (already >= static_cast<int64_t>(len)) { |
| break; |
| } |
| |
| blk = next_blk; |
| } |
| |
| resp_received_body_len += already; |
| TSIOBufferReaderConsume(reader, already); |
| |
| read_vio->reenable(); |
| return already; |
| } |
| |
| void |
| FetchSM::ext_destroy() |
| { |
| contp = nullptr; |
| |
| if (recursion) { |
| return; |
| } |
| |
| if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) { |
| MUTEX_TRY_LOCK(lock, mutex, this_ethread()); |
| if (!lock.is_locked()) { |
| eventProcessor.schedule_in(this, FETCH_LOCK_RETRY_TIME); |
| return; |
| } |
| } |
| |
| cleanUp(); |
| } |
| |
| void |
| FetchSM::ext_set_user_data(void *data) |
| { |
| user_data = data; |
| } |
| |
| void * |
| FetchSM::ext_get_user_data() |
| { |
| return user_data; |
| } |
| |
| TSMBuffer |
| FetchSM::resp_hdr_bufp() |
| { |
| HdrHeapSDKHandle *heap; |
| heap = (HdrHeapSDKHandle *)&client_response_hdr; |
| |
| return (TSMBuffer)heap; |
| } |
| |
| TSMLoc |
| FetchSM::resp_hdr_mloc() |
| { |
| return (TSMLoc)client_response_hdr.m_http; |
| } |