blob: d1c37f6af0ca3d0e759777aef6d842fb45152e66 [file] [log] [blame]
/** @file
Http2Stream.cc
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "HTTP2.h"
#include "Http2Stream.h"
#include "Http2ClientSession.h"
#include "../http/HttpSM.h"
#include <numeric>
// Debug logging helper for Http2Stream member functions. Each message is prefixed with the
// connection id of the `parent` session in scope and the id of this stream.
// The expansion intentionally has no trailing semicolon: the call site supplies it, so the
// macro behaves like a single statement (e.g. inside an unbraced if/else).
#define Http2StreamDebug(fmt, ...) \
  SsnDebug(parent, "http2_stream", "[%" PRId64 "] [%u] " fmt, parent->connection_id(), this->get_id(), ##__VA_ARGS__)
// Allocator that Http2Stream objects are handed back to in Http2Stream::destroy() (via THREAD_FREE).
ClassAllocator<Http2Stream> http2StreamAllocator("http2StreamAllocator");
int
Http2Stream::main_event_handler(int event, void *edata)
{
SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
if (!this->_switch_thread_if_not_on_right_thread(event, edata)) {
// Not on the right thread
return 0;
}
ink_release_assert(this->_thread == this_ethread());
Event *e = static_cast<Event *>(edata);
reentrancy_count++;
if (e == cross_thread_event) {
cross_thread_event = nullptr;
} else if (e == active_event) {
event = VC_EVENT_ACTIVE_TIMEOUT;
active_event = nullptr;
} else if (e == inactive_event) {
if (inactive_timeout_at && inactive_timeout_at < Thread::get_hrtime()) {
event = VC_EVENT_INACTIVITY_TIMEOUT;
clear_inactive_timer();
}
} else if (e == read_event) {
read_event = nullptr;
} else if (e == write_event) {
write_event = nullptr;
} else if (e == buffer_full_write_event) {
buffer_full_write_event = nullptr;
}
switch (event) {
case VC_EVENT_ACTIVE_TIMEOUT:
case VC_EVENT_INACTIVITY_TIMEOUT:
if (current_reader && read_vio.ntodo() > 0) {
MUTEX_TRY_LOCK(lock, read_vio.mutex, this_ethread());
if (lock.is_locked()) {
read_vio.cont->handleEvent(event, &read_vio);
} else {
this_ethread()->schedule_imm(read_vio.cont, event, &read_vio);
}
} else if (current_reader && write_vio.ntodo() > 0) {
MUTEX_TRY_LOCK(lock, write_vio.mutex, this_ethread());
if (lock.is_locked()) {
write_vio.cont->handleEvent(event, &write_vio);
} else {
this_ethread()->schedule_imm(write_vio.cont, event, &write_vio);
}
}
break;
case VC_EVENT_WRITE_READY:
case VC_EVENT_WRITE_COMPLETE:
inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
if (e->cookie == &write_vio) {
if (write_vio.mutex) {
MUTEX_TRY_LOCK(lock, write_vio.mutex, this_ethread());
if (lock.is_locked() && write_vio.cont && this->current_reader) {
write_vio.cont->handleEvent(event, &write_vio);
} else {
this_ethread()->schedule_imm(write_vio.cont, event, &write_vio);
}
}
} else {
update_write_request(write_vio.get_reader(), INT64_MAX, true);
}
break;
case VC_EVENT_READ_COMPLETE:
case VC_EVENT_READ_READY:
inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
if (e->cookie == &read_vio) {
if (read_vio.mutex) {
MUTEX_TRY_LOCK(lock, read_vio.mutex, this_ethread());
if (lock.is_locked() && read_vio.cont && this->current_reader) {
read_vio.cont->handleEvent(event, &read_vio);
} else {
this_ethread()->schedule_imm(read_vio.cont, event, &read_vio);
}
}
} else {
this->update_read_request(INT64_MAX, true);
}
break;
case VC_EVENT_EOS:
if (e->cookie == &read_vio) {
SCOPED_MUTEX_LOCK(lock, read_vio.mutex, this_ethread());
read_vio.cont->handleEvent(VC_EVENT_EOS, &read_vio);
} else if (e->cookie == &write_vio) {
SCOPED_MUTEX_LOCK(lock, write_vio.mutex, this_ethread());
write_vio.cont->handleEvent(VC_EVENT_EOS, &write_vio);
}
break;
}
reentrancy_count--;
// Clean stream up if the terminate flag is set and we are at the bottom of the handler stack
terminate_if_possible();
return 0;
}
/**
 * Decode the header block bytes collected for this stream into the request header.
 *
 * @param hpack_handle HPACK context handed to the decoder.
 * @param maximum_table_size Table size limit handed to the decoder.
 * @return The error code reported by http2_decode_header_blocks().
 */
Http2ErrorCode
Http2Stream::decode_header_blocks(HpackHandle &hpack_handle, uint32_t maximum_table_size)
{
  const uint8_t *encoded_blocks = reinterpret_cast<const uint8_t *>(header_blocks);

  return http2_decode_header_blocks(&_req_header, encoded_blocks, header_blocks_length, nullptr, hpack_handle, trailing_header,
                                    maximum_table_size);
}
/**
 * Convert the decoded request header to HTTP/1.1 form, print it into request_buffer and then
 * give a pending read VIO a chance to pick the data up.
 *
 * @param cstate Not used by this function.
 */
void
Http2Stream::send_request(Http2ConnectionState &cstate)
{
  // Convert header to HTTP/1.1 format
  http2_convert_header_from_2_to_1_1(&_req_header);

  // Print the header block by block (same approach as HttpSM::write_header_into_buffer).
  // `dumpoffset` tracks how much of the header has been emitted so far.
  int dumpoffset = 0;
  for (;;) {
    int bufindex = 0;
    int tmp      = dumpoffset;

    IOBufferBlock *block = request_buffer.get_current_block();
    if (block == nullptr) {
      request_buffer.add_block();
      block = request_buffer.get_current_block();
    }

    const int done = _req_header.print(block->start(), block->write_avail(), &bufindex, &tmp);
    dumpoffset += bufindex;
    request_buffer.fill(bufindex);

    if (done) {
      break;
    }
    // The header did not fit; continue printing into a fresh block.
    request_buffer.add_block();
  }

  // Is there a read_vio request waiting?
  this->update_read_request(INT64_MAX, true);
}
/**
 * Advance the stream state machine for a frame of the given type.
 *
 * @param type  HTTP/2 frame type being processed.
 * @param flags Flags of that frame (only consulted in the reserved (local) state).
 * @return true when the frame is acceptable in the current state, false otherwise.
 *         Transitions that fall out of the switch are logged with the resulting state name.
 */
bool
Http2Stream::change_state(uint8_t type, uint8_t flags)
{
  switch (_state) {
  case Http2StreamState::HTTP2_STREAM_STATE_IDLE:
    // HEADERS and CONTINUATION are treated identically when leaving the idle state.
    if (type == HTTP2_FRAME_TYPE_HEADERS || type == HTTP2_FRAME_TYPE_CONTINUATION) {
      if (recv_end_stream) {
        _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE;
      } else if (send_end_stream) {
        _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_LOCAL;
      } else {
        _state = Http2StreamState::HTTP2_STREAM_STATE_OPEN;
      }
    } else if (type == HTTP2_FRAME_TYPE_PUSH_PROMISE) {
      _state = Http2StreamState::HTTP2_STREAM_STATE_RESERVED_LOCAL;
    } else {
      return false;
    }
    break;

  case Http2StreamState::HTTP2_STREAM_STATE_OPEN:
    if (type != HTTP2_FRAME_TYPE_RST_STREAM && type != HTTP2_FRAME_TYPE_DATA) {
      // A stream in the "open" state may be used by both peers to send frames of any type.
      return true;
    }
    if (type == HTTP2_FRAME_TYPE_RST_STREAM) {
      _state = Http2StreamState::HTTP2_STREAM_STATE_CLOSED;
    } else if (recv_end_stream) {
      _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE;
    } else if (send_end_stream) {
      _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_LOCAL;
    }
    // A DATA frame without either end-of-stream marker leaves the state untouched.
    break;

  case Http2StreamState::HTTP2_STREAM_STATE_RESERVED_LOCAL:
    if (type == HTTP2_FRAME_TYPE_HEADERS) {
      if (flags & HTTP2_FLAGS_HEADERS_END_HEADERS) {
        _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE;
      }
    } else if (type == HTTP2_FRAME_TYPE_CONTINUATION) {
      if (flags & HTTP2_FLAGS_CONTINUATION_END_HEADERS) {
        _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE;
      }
    } else {
      return false;
    }
    break;

  case Http2StreamState::HTTP2_STREAM_STATE_RESERVED_REMOTE:
    // Currently ATS supports only HTTP/2 server features
    return false;

  case Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_LOCAL: {
    // Every frame closes the stream here; anything but RST_STREAM / end of stream is an error.
    const bool acceptable = (type == HTTP2_FRAME_TYPE_RST_STREAM || recv_end_stream);
    _state                = Http2StreamState::HTTP2_STREAM_STATE_CLOSED;
    if (!acceptable) {
      return false;
    }
    break;
  }

  case Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE:
    if (type == HTTP2_FRAME_TYPE_RST_STREAM || send_end_stream) {
      _state = Http2StreamState::HTTP2_STREAM_STATE_CLOSED;
    } else if (type == HTTP2_FRAME_TYPE_HEADERS) { // w/o END_STREAM flag
      // No state change here. Expect a following DATA frame with END_STREAM flag.
      return true;
    } else {
      // Error, set state closed
      _state = Http2StreamState::HTTP2_STREAM_STATE_CLOSED;
      return false;
    }
    break;

  case Http2StreamState::HTTP2_STREAM_STATE_CLOSED:
    // No state changing
    return true;

  default:
    return false;
  }

  Http2StreamDebug("%s", Http2DebugNames::get_state_name(_state));
  return true;
}
/**
 * Set up the read VIO for this stream.
 *
 * @param c      Continuation to signal; when null the VIO uses the stream's own mutex.
 * @param nbytes Number of bytes the caller wants to read.
 * @param buf    Buffer to write into; when null the VIO's buffer is cleared.
 * @return Pointer to the stream's read VIO.
 */
VIO *
Http2Stream::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
{
  if (buf == nullptr) {
    read_vio.buffer.clear();
  } else {
    read_vio.buffer.writer_for(buf);
  }

  read_vio.mutex     = c ? c->mutex : this->mutex;
  read_vio.cont      = c;
  read_vio.op        = VIO::READ;
  read_vio.vc_server = this;
  read_vio.nbytes    = nbytes;
  read_vio.ndone     = 0;

  // Request data may already be waiting in request_buffer. If so it is copied over and a
  // READ_READY / READ_COMPLETE event is scheduled for after we return.
  update_read_request(nbytes, false, true);

  return &read_vio;
}
/**
 * Set up the write VIO for this stream and start processing any data already available.
 *
 * @param c       Continuation to signal; when null the VIO uses the stream's own mutex.
 * @param nbytes  Number of bytes the caller intends to write.
 * @param abuffer Reader supplying the response bytes; when null the VIO's buffer is cleared.
 * @param owner   Not used by this function.
 * @return Pointer to the stream's write VIO.
 */
VIO *
Http2Stream::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *abuffer, bool owner)
{
  if (abuffer == nullptr) {
    write_vio.buffer.clear();
  } else {
    write_vio.buffer.reader_for(abuffer);
  }

  write_vio.mutex     = c ? c->mutex : this->mutex;
  write_vio.cont      = c;
  write_vio.op        = VIO::WRITE;
  write_vio.vc_server = this;
  write_vio.nbytes    = nbytes;
  write_vio.ndone     = 0;

  // The response is consumed through this reader from now on.
  response_reader = abuffer;
  update_write_request(abuffer, nbytes, false);

  return &write_vio;
}
// Initiated from SM
void
Http2Stream::do_io_close(int /* flags */)
{
SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
super::release(nullptr);
if (!closed) {
Http2StreamDebug("do_io_close");
// When we get here, the SM has initiated the shutdown. Either it received a WRITE_COMPLETE, or it is shutting down. Any
// remaining IO operations back to client should be abandoned. The SM-side buffers backing these operations will be deleted
// by the time this is called from transaction_done.
closed = true;
if (parent && this->is_client_state_writeable()) {
// Make sure any trailing end of stream frames are sent
// Wee will be removed at send_data_frames or closing connection phase
static_cast<Http2ClientSession *>(parent)->connection_state.send_data_frames(this);
}
clear_timers();
clear_io_events();
// Wait until transaction_done is called from HttpSM to signal that the TXN_CLOSE hook has been executed
}
}
/*
 * HttpSM has called TXN_close hooks.
 *
 * Cancels a pending cross-thread event, makes sure the stream has been closed, detaches the
 * reader and, once closed, flags the stream for termination.
 */
void
Http2Stream::transaction_done()
{
  SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());

  if (cross_thread_event != nullptr) {
    cross_thread_event->cancel();
    cross_thread_event = nullptr;
  }

  // Make sure we've been closed. If we didn't close the parent session better still be open
  if (!closed) {
    do_io_close();
  }
  ink_release_assert(closed || !static_cast<Http2ClientSession *>(parent)->connection_state.is_state_closed());

  current_reader = nullptr;

  if (!closed) {
    return;
  }

  // Safe to initiate SSN_CLOSE if this is the last stream
  ink_assert(cross_thread_event == nullptr);
  // The actual destroy is deferred until the handler stack has unwound (see terminate_if_possible);
  // destroying directly could delete the stream while it is still referenced on the stack.
  terminate_stream = true;
  terminate_if_possible();
}
void
Http2Stream::terminate_if_possible()
{
if (terminate_stream && reentrancy_count == 0) {
Http2ClientSession *h2_parent = static_cast<Http2ClientSession *>(parent);
SCOPED_MUTEX_LOCK(lock, h2_parent->connection_state.mutex, this_ethread());
h2_parent->connection_state.delete_stream(this);
destroy();
}
}
// Initiated from the Http2 side
//
// Closes the stream (no-op when it is already closed): marks it closed, moves the state machine
// to CLOSED, cancels timers and pending IO events, and then notifies whoever is attached:
//   - with a reader and a write continuation: schedules WRITE_COMPLETE (all bytes done) or EOS
//     on the write VIO;
//   - with a reader and a read continuation: schedules EOS on the read VIO, unless a write
//     signal was already scheduled;
//   - with a reader but no read continuation: delivers VC_EVENT_ERROR to the reader directly;
//   - with no reader and nothing scheduled: closes and destroys the stream right here.
void
Http2Stream::initiating_close()
{
// NOTE(review): `closed` is tested before the stream mutex is taken below.
if (!closed) {
SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
Http2StreamDebug("initiating_close");
// Set the state of the connection to closed
// TODO - these states should be combined
closed = true;
_state = Http2StreamState::HTTP2_STREAM_STATE_CLOSED;
// leaving the reference to the SM, so we can detach from the SM when we actually destroy
// current_reader = NULL;
// Leaving reference to client session as well, so we can signal once the
// TXN_CLOSE has been sent
// parent = NULL;
clear_timers();
clear_io_events();
// This should result in do_io_close or release being called. That will schedule the final
// kill yourself signal
// Send the SM the EOS signal if there are no active VIO's to signal
// We are sending signals rather than calling the handlers directly to avoid the case where
// the HttpTunnel handler causes the HttpSM to be deleted on the stack.
// Set once any event (WRITE_COMPLETE or EOS) has been scheduled on the write VIO.
bool sent_write_complete = false;
if (current_reader) {
// Push out any last IO events
if (write_vio.cont) {
SCOPED_MUTEX_LOCK(lock, write_vio.mutex, this_ethread());
// Are we done?
if (write_vio.nbytes == write_vio.ndone) {
Http2StreamDebug("handle write from destroy (event=%d)", VC_EVENT_WRITE_COMPLETE);
write_event = send_tracked_event(write_event, VC_EVENT_WRITE_COMPLETE, &write_vio);
} else {
write_event = send_tracked_event(write_event, VC_EVENT_EOS, &write_vio);
Http2StreamDebug("handle write from destroy (event=%d)", VC_EVENT_EOS);
}
sent_write_complete = true;
}
}
// Send EOS to let SM know that we aren't sticking around
if (current_reader && read_vio.cont) {
// Only bother with the EOS if we haven't sent the write complete
if (!sent_write_complete) {
SCOPED_MUTEX_LOCK(lock, read_vio.mutex, this_ethread());
Http2StreamDebug("send EOS to read cont");
read_event = send_tracked_event(read_event, VC_EVENT_EOS, &read_vio);
}
} else if (current_reader) {
// Reader attached but no read continuation registered: report the error to it directly.
SCOPED_MUTEX_LOCK(lock, current_reader->mutex, this_ethread());
current_reader->handleEvent(VC_EVENT_ERROR);
} else if (!sent_write_complete) {
// Transaction is already gone or not started. Kill yourself
// `closed` is already true here, so do_io_close() only performs its super::release() call.
do_io_close();
destroy();
}
}
}
/*
 * Replace existing event only if the new event is different than the inprogress event.
 *
 * Returns the event that is now pending: the existing one when it already carries
 * send_event, otherwise a freshly scheduled immediate event for this stream.
 */
Event *
Http2Stream::send_tracked_event(Event *event, int send_event, VIO *vio)
{
  // A pending event for a different callback is superseded by the new one.
  if (event != nullptr && event->callback_event != send_event) {
    event->cancel();
    event = nullptr;
  }

  if (event != nullptr) {
    return event;
  }
  return this_ethread()->schedule_imm(this, send_event, vio);
}
void
Http2Stream::update_read_request(int64_t read_len, bool call_update, bool check_eos)
{
if (closed || parent == nullptr || current_reader == nullptr || read_vio.mutex == nullptr) {
return;
}
if (!this->_switch_thread_if_not_on_right_thread(VC_EVENT_READ_READY, nullptr)) {
// Not on the right thread
return;
}
ink_release_assert(this->_thread == this_ethread());
SCOPED_MUTEX_LOCK(lock, read_vio.mutex, this_ethread());
if (read_vio.nbytes > 0 && read_vio.ndone <= read_vio.nbytes) {
// If this vio has a different buffer, we must copy
ink_release_assert(this_ethread() == this->_thread);
if (read_vio.buffer.writer() != (&request_buffer)) {
int64_t num_to_read = read_vio.nbytes - read_vio.ndone;
if (num_to_read > read_len) {
num_to_read = read_len;
}
if (num_to_read > 0) {
int bytes_added = read_vio.buffer.writer()->write(request_reader, num_to_read);
if (bytes_added > 0 || (check_eos && recv_end_stream)) {
request_reader->consume(bytes_added);
read_vio.ndone += bytes_added;
int send_event = (read_vio.nbytes == read_vio.ndone || recv_end_stream) ? VC_EVENT_READ_COMPLETE : VC_EVENT_READ_READY;
if (call_update) { // Safe to call vio handler directly
inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
if (read_vio.cont && this->current_reader) {
read_vio.cont->handleEvent(send_event, &read_vio);
}
} else { // Called from do_io_read. Still setting things up. Send event to handle this after the dust settles
read_event = send_tracked_event(read_event, send_event, &read_vio);
}
}
}
} else {
// Try to be smart and only signal if there was additional data
int send_event = (read_vio.nbytes == read_vio.ndone) ? VC_EVENT_READ_COMPLETE : VC_EVENT_READ_READY;
if (request_reader->read_avail() > 0 || send_event == VC_EVENT_READ_COMPLETE) {
if (call_update) { // Safe to call vio handler directly
inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
if (read_vio.cont && this->current_reader) {
read_vio.cont->handleEvent(send_event, &read_vio);
}
} else { // Called from do_io_read. Still setting things up. Send event
// to handle this after the dust settles
read_event = send_tracked_event(read_event, send_event, &read_vio);
}
}
}
}
}
/**
 * Resume sending the response body; the write VIO continuation may be called directly.
 */
void
Http2Stream::restart_sending()
{
  const bool call_update = true;
  this->send_response_body(call_update);
}
/**
 * Process response bytes made available on the write VIO.
 *
 * Until the response header has been parsed, the bytes are fed to the header parser; once the
 * header is complete it is sent as a HEADERS frame and body handling is initialized. After
 * that, every call forwards body data via send_response_body().
 *
 * Does nothing when the client side is not writeable, the stream is closed, there is no parent
 * session, the write VIO has not been set up, or there is neither a reader nor a length. When
 * called from a thread other than the stream's thread the work is handed to
 * _switch_thread_if_not_on_right_thread() and this call returns.
 *
 * @param buf_reader  Reader supplied by the caller (only used for the early-out check).
 * @param write_len   Upper bound on the number of bytes to consider in this call.
 * @param call_update Passed through to send_response_body().
 */
void
Http2Stream::update_write_request(IOBufferReader *buf_reader, int64_t write_len, bool call_update)
{
  if (!this->is_client_state_writeable() || closed || parent == nullptr || write_vio.mutex == nullptr ||
      (buf_reader == nullptr && write_len == 0)) {
    return;
  }
  if (!this->_switch_thread_if_not_on_right_thread(VC_EVENT_WRITE_READY, nullptr)) {
    // Not on the right thread
    return;
  }
  ink_release_assert(this->_thread == this_ethread());

  // Named differently from the `parent` member to avoid shadowing it.
  Http2ClientSession *h2_parent = static_cast<Http2ClientSession *>(this->get_parent());

  SCOPED_MUTEX_LOCK(lock, write_vio.mutex, this_ethread());

  // if response is chunked, limit the dechunked_buffer size.
  bool is_done = false;
  if (this->chunked) {
    if (chunked_handler.dechunked_buffer && chunked_handler.dechunked_buffer->max_read_avail() > HTTP2_MAX_BUFFER_USAGE) {
      // Too much dechunked data is buffered already; retry later instead of dechunking more.
      if (buffer_full_write_event == nullptr) {
        buffer_full_write_event = _thread->schedule_imm(this, VC_EVENT_WRITE_READY);
      }
    } else {
      this->response_process_data(is_done);
    }
  }

  if (this->response_get_data_reader() == nullptr) {
    return;
  }

  int64_t bytes_avail = this->response_get_data_reader()->read_avail();
  if (write_vio.nbytes > 0 && write_vio.ntodo() > 0) {
    int64_t num_to_write = write_vio.ntodo();
    if (num_to_write > write_len) {
      num_to_write = write_len;
    }
    if (bytes_avail > num_to_write) {
      bytes_avail = num_to_write;
    }
  }

  Http2StreamDebug("write_vio.nbytes=%" PRId64 ", write_vio.ndone=%" PRId64 ", write_vio.write_avail=%" PRId64
                   ", reader.read_avail=%" PRId64,
                   write_vio.nbytes, write_vio.ndone, write_vio.get_writer()->write_avail(), bytes_avail);

  if (bytes_avail <= 0 && !is_done) {
    return;
  }

  // Process the new data
  if (!this->response_header_done) {
    // Still parsing the response_header
    int bytes_used = 0;
    int state      = this->response_header.parse_resp(&http_parser, this->response_reader, &bytes_used, false);
    // HTTPHdr::parse_resp() consumed the response_reader in above
    write_vio.ndone += this->response_header.length_get();

    switch (state) {
    case PARSE_RESULT_DONE: {
      this->response_header_done = true;

      // Schedule session shutdown if response header has "Connection: close"
      MIMEField *field = this->response_header.field_find(MIME_FIELD_CONNECTION, MIME_LEN_CONNECTION);
      if (field) {
        int len           = 0;
        const char *value = field->value_get(&len);
        // Compare only when the value holds at least HTTP_LEN_CLOSE bytes, so memcmp never reads
        // beyond the `len` bytes reported for the field value.
        if (value != nullptr && len >= HTTP_LEN_CLOSE && memcmp(HTTP_VALUE_CLOSE, value, HTTP_LEN_CLOSE) == 0) {
          SCOPED_MUTEX_LOCK(cstate_lock, h2_parent->connection_state.mutex, this_ethread());
          if (h2_parent->connection_state.get_shutdown_state() == HTTP2_SHUTDOWN_NONE) {
            h2_parent->connection_state.set_shutdown_state(HTTP2_SHUTDOWN_NOT_INITIATED, Http2ErrorCode::HTTP2_ERROR_NO_ERROR);
          }
        }
      }

      {
        SCOPED_MUTEX_LOCK(cstate_lock, h2_parent->connection_state.mutex, this_ethread());
        // Send the response header back
        h2_parent->connection_state.send_headers_frame(this);
      }

      // See if the response is chunked. Set up the dechunking logic if it is
      // Make sure to check if the chunk is complete and signal appropriately
      this->response_initialize_data_handling(is_done);

      // If there is additional data, send it along in a data frame. Or if this was header only
      // make sure to send the end of stream
      is_done |= (write_vio.ntodo() + this->response_header.length_get()) == bytes_avail;
      if (this->response_is_data_available() || is_done) {
        this->send_response_body(call_update);
      }
      break;
    }
    case PARSE_RESULT_CONT:
      // Let it ride for next time
      break;
    default:
      break;
    }
  } else {
    this->send_response_body(call_update);
  }

  return;
}
/**
 * Tell the write VIO continuation that more data can be written or that the write is complete.
 *
 * Nothing is signalled when the write VIO has no continuation, has no operation set, or its
 * writer has no space available.
 *
 * @param call_update true: call the continuation directly; false: schedule a tracked event.
 */
void
Http2Stream::signal_write_event(bool call_update)
{
  if (this->write_vio.cont == nullptr || this->write_vio.op == VIO::NONE) {
    return;
  }
  if (this->write_vio.get_writer()->write_avail() == 0) {
    return;
  }

  int send_event = VC_EVENT_WRITE_READY;
  if (this->write_vio.ntodo() == 0) {
    send_event = VC_EVENT_WRITE_COMPLETE;
  }

  if (!call_update) {
    // Called from do_io_write. Might still be setting up state. Send an event to let the dust settle
    write_event = send_tracked_event(write_event, send_event, &write_vio);
    return;
  }

  // Coming from reenable. Safe to call the handler directly
  if (write_vio.cont && this->current_reader) {
    write_vio.cont->handleEvent(send_event, &write_vio);
  }
}
/**
 * Ask the connection state to send a PUSH_PROMISE frame for @a url on behalf of this stream.
 *
 * @param url             URL passed on to send_push_promise_frame().
 * @param accept_encoding Field passed on to send_push_promise_frame().
 */
void
Http2Stream::push_promise(URL &url, const MIMEField *accept_encoding)
{
  Http2ClientSession *h2_session = static_cast<Http2ClientSession *>(this->get_parent());

  SCOPED_MUTEX_LOCK(lock, h2_session->connection_state.mutex, this_ethread());
  h2_session->connection_state.send_push_promise_frame(this, url, accept_encoding);
}
void
Http2Stream::send_response_body(bool call_update)
{
Http2ClientSession *parent = static_cast<Http2ClientSession *>(this->get_parent());
inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
if (Http2::stream_priority_enabled) {
SCOPED_MUTEX_LOCK(lock, parent->connection_state.mutex, this_ethread());
parent->connection_state.schedule_stream(this);
// signal_write_event() will be called from `Http2ConnectionState::send_data_frames_depends_on_priority()`
// when write_vio is consumed
} else {
SCOPED_MUTEX_LOCK(lock, parent->connection_state.mutex, this_ethread());
parent->connection_state.send_data_frames(this);
this->signal_write_event(call_update);
// XXX The call to signal_write_event can destroy/free the Http2Stream.
// Don't modify the Http2Stream after calling this method.
}
}
/**
 * Re-enable processing for @a vio. Ignored when the stream has no parent session.
 *
 * @param vio VIO to re-enable; its op selects the write or the read path.
 */
void
Http2Stream::reenable(VIO *vio)
{
  if (!this->parent) {
    return;
  }

  switch (vio->op) {
  case VIO::WRITE: {
    SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
    update_write_request(vio->get_reader(), INT64_MAX, true);
    break;
  }
  case VIO::READ: {
    SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
    update_read_request(INT64_MAX, true);
    break;
  }
  default:
    // Other operations need no action.
    break;
  }
}
void
Http2Stream::destroy()
{
Http2StreamDebug("Destroy stream, sent %" PRIu64 " bytes", this->bytes_sent);
SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
// Clean up after yourself if this was an EOS
ink_release_assert(this->closed);
ink_release_assert(reentrancy_count == 0);
// Safe to initiate SSN_CLOSE if this is the last stream
if (parent) {
Http2ClientSession *h2_parent = static_cast<Http2ClientSession *>(parent);
SCOPED_MUTEX_LOCK(lock, h2_parent->connection_state.mutex, this_ethread());
// Make sure the stream is removed from the stream list and priority tree
// In many cases, this has been called earlier, so this call is a no-op
h2_parent->connection_state.delete_stream(this);
// Update session's stream counts, so it accurately goes into keep-alive state
h2_parent->connection_state.release_stream(this);
}
// Clean up the write VIO in case of inactivity timeout
this->do_io_write(nullptr, 0, nullptr);
HTTP2_DECREMENT_THREAD_DYN_STAT(HTTP2_STAT_CURRENT_CLIENT_STREAM_COUNT, _thread);
ink_hrtime end_time = Thread::get_hrtime();
HTTP2_SUM_THREAD_DYN_STAT(HTTP2_STAT_TOTAL_TRANSACTIONS_TIME, _thread, end_time - _start_time);
_req_header.destroy();
response_header.destroy();
// Drop references to all buffer data
request_buffer.clear();
// Free the mutexes in the VIO
read_vio.mutex.clear();
write_vio.mutex.clear();
if (header_blocks) {
ats_free(header_blocks);
}
chunked_handler.clear();
clear_timers();
clear_io_events();
super::destroy();
THREAD_FREE(this, http2StreamAllocator, this_ethread());
}
/**
 * Inspect the parsed response header and, for a chunked response, set up de-chunking.
 *
 * @param[out] is_done Reset to false; updated by response_process_data() when chunked data is
 *                     already waiting.
 */
void
Http2Stream::response_initialize_data_handling(bool &is_done)
{
  is_done = false;

  const char *name  = "transfer-encoding";
  const char *value = "chunked";
  // A negative index means this value was not found for this field
  const int chunked_index = response_header.value_get_index(name, strlen(name), value, strlen(value));
  if (chunked_index < 0) {
    return;
  }

  Http2StreamDebug("Response is chunked");
  chunked = true;

  this->chunked_handler.init_by_action(this->response_reader, ChunkedHandler::ACTION_DECHUNK);
  this->chunked_handler.state            = ChunkedHandler::CHUNK_READ_SIZE;
  this->chunked_handler.dechunked_reader = this->chunked_handler.dechunked_buffer->alloc_reader();

  // From here on the body is read through the de-chunked reader; the raw reader is released.
  this->response_reader->dealloc();
  this->response_reader = nullptr;

  // Get things going if there is already data waiting
  if (this->chunked_handler.chunked_reader->is_read_avail_more_than(0)) {
    response_process_data(is_done);
  }
}
/**
 * Run the chunked handler over the available response data (no-op for non-chunked responses).
 *
 * @param[out] done false for non-chunked responses, otherwise the result of the last
 *                  process_chunked_content() call.
 */
void
Http2Stream::response_process_data(bool &done)
{
  done = false;
  if (!chunked) {
    return;
  }

  do {
    // Leave the flow control state before processing so the handler resumes reading chunk sizes.
    if (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL) {
      chunked_handler.state = ChunkedHandler::CHUNK_READ_SIZE_START;
    }
    done = this->chunked_handler.process_chunked_content();
  } while (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL);
}
bool
Http2Stream::response_is_data_available() const
{
IOBufferReader *reader = this->response_get_data_reader();
return reader ? reader->is_read_avail_more_than(0) : false;
}
/**
 * @return The reader response body data should be taken from: the de-chunked reader for
 *         chunked responses, the plain response reader otherwise.
 */
IOBufferReader *
Http2Stream::response_get_data_reader() const
{
  if (chunked) {
    return chunked_handler.dechunked_reader;
  }
  return response_reader;
}
/**
 * Set the active timeout, replacing any previously armed active timer.
 *
 * @param timeout_in New timeout; a value of zero or less leaves the timer disarmed.
 */
void
Http2Stream::set_active_timeout(ink_hrtime timeout_in)
{
  active_timeout = timeout_in;
  clear_active_timer();

  if (active_timeout <= 0) {
    return;
  }
  active_event = this_ethread()->schedule_in(this, active_timeout);
}
/**
 * Set the inactivity timeout.
 *
 * A positive value moves the deadline to now + timeout and makes sure the periodic check event
 * is scheduled; any other value disarms the inactivity timer.
 *
 * @param timeout_in New inactivity timeout.
 */
void
Http2Stream::set_inactivity_timeout(ink_hrtime timeout_in)
{
  inactive_timeout = timeout_in;

  if (inactive_timeout <= 0) {
    clear_inactive_timer();
    return;
  }

  inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
  if (inactive_event == nullptr) {
    // The deadline is checked once per second by main_event_handler.
    inactive_event = this_ethread()->schedule_every(this, HRTIME_SECONDS(1));
  }
}
void
Http2Stream::cancel_inactivity_timeout()
{
set_inactivity_timeout(0);
}
/**
 * Reset the inactivity deadline and cancel the periodic inactivity event, if one is scheduled.
 */
void
Http2Stream::clear_inactive_timer()
{
  inactive_timeout_at = 0;

  if (inactive_event == nullptr) {
    return;
  }
  inactive_event->cancel();
  inactive_event = nullptr;
}
/**
 * Cancel the active timeout event, if one is scheduled.
 */
void
Http2Stream::clear_active_timer()
{
  if (active_event == nullptr) {
    return;
  }
  active_event->cancel();
  active_event = nullptr;
}
void
Http2Stream::clear_timers()
{
clear_inactive_timer();
clear_active_timer();
}
/**
 * Cancel and forget every pending IO event tracked by the stream (read, write and
 * buffer-full write).
 */
void
Http2Stream::clear_io_events()
{
  if (read_event != nullptr) {
    read_event->cancel();
    read_event = nullptr;
  }

  if (write_event != nullptr) {
    write_event->cancel();
    write_event = nullptr;
  }

  if (buffer_full_write_event != nullptr) {
    buffer_full_write_event->cancel();
    buffer_full_write_event = nullptr;
  }
}
/**
 * Release the stream: forwards to the base class, detaches the reader and closes the stream.
 *
 * @param r Reader passed on to the base class release().
 */
void
Http2Stream::release(IOBufferReader *r)
{
  super::release(r);

  // State machine is on its own way down.
  current_reader = nullptr;

  do_io_close();
}
/**
 * @return The current value of the client receive window counter.
 */
ssize_t
Http2Stream::client_rwnd() const
{
  return _client_rwnd;
}
/**
 * Grow the client receive window and record the increment in the recent-increment ring.
 *
 * @param amount Number of bytes to add to the window.
 * @return HTTP2_ERROR_ENHANCE_YOUR_CALM when the average of the recorded increments is below
 *         Http2::min_avg_window_update, HTTP2_ERROR_NO_ERROR otherwise.
 */
Http2ErrorCode
Http2Stream::increment_client_rwnd(size_t amount)
{
  this->_client_rwnd += amount;

  // Store the increment in the current slot and advance the slot index, wrapping around.
  this->_recent_rwnd_increment[this->_recent_rwnd_increment_index] = amount;
  ++this->_recent_rwnd_increment_index;
  this->_recent_rwnd_increment_index %= this->_recent_rwnd_increment.size();

  const double total   = std::accumulate(this->_recent_rwnd_increment.begin(), this->_recent_rwnd_increment.end(), 0.0);
  const double average = total / this->_recent_rwnd_increment.size();

  return (average < Http2::min_avg_window_update) ? Http2ErrorCode::HTTP2_ERROR_ENHANCE_YOUR_CALM :
                                                    Http2ErrorCode::HTTP2_ERROR_NO_ERROR;
}
/**
 * Shrink the client receive window.
 *
 * @param amount Number of bytes to subtract from the window.
 * @return HTTP2_ERROR_PROTOCOL_ERROR when the window became negative, HTTP2_ERROR_NO_ERROR otherwise.
 */
Http2ErrorCode
Http2Stream::decrement_client_rwnd(size_t amount)
{
  this->_client_rwnd -= amount;

  const bool window_underflow = this->_client_rwnd < 0;
  return window_underflow ? Http2ErrorCode::HTTP2_ERROR_PROTOCOL_ERROR : Http2ErrorCode::HTTP2_ERROR_NO_ERROR;
}
/**
 * @return The current value of the server receive window counter.
 */
ssize_t
Http2Stream::server_rwnd() const
{
  return _server_rwnd;
}
/**
 * Grow the server receive window.
 *
 * @param amount Number of bytes to add to the window.
 * @return Always HTTP2_ERROR_NO_ERROR.
 */
Http2ErrorCode
Http2Stream::increment_server_rwnd(size_t amount)
{
  _server_rwnd += amount;
  return Http2ErrorCode::HTTP2_ERROR_NO_ERROR;
}
/**
 * Shrink the server receive window.
 *
 * @param amount Number of bytes to subtract from the window.
 * @return HTTP2_ERROR_PROTOCOL_ERROR when the window became negative, HTTP2_ERROR_NO_ERROR otherwise.
 */
Http2ErrorCode
Http2Stream::decrement_server_rwnd(size_t amount)
{
  this->_server_rwnd -= amount;

  const bool window_underflow = this->_server_rwnd < 0;
  return window_underflow ? Http2ErrorCode::HTTP2_ERROR_PROTOCOL_ERROR : Http2ErrorCode::HTTP2_ERROR_NO_ERROR;
}
/**
 * Check whether the caller runs on the stream's thread and, if not, forward the event there.
 *
 * @param event Event code to deliver on the stream's thread.
 * @param edata Data to deliver with the event.
 * @return true when already on the stream's thread; false when the caller must stop processing
 *         (an event is scheduled on the stream's thread unless one is already pending).
 */
bool
Http2Stream::_switch_thread_if_not_on_right_thread(int event, void *edata)
{
  if (this->_thread == this_ethread()) {
    return true;
  }

  SCOPED_MUTEX_LOCK(stream_lock, this->mutex, this_ethread());
  if (cross_thread_event == nullptr) {
    // Send to the right thread
    cross_thread_event = this->_thread->schedule_imm(this, event, edata);
  }
  return false;
}