blob: a4896b3308056bf1d6db3d5cb5461d4c113bcf8f [file] [log] [blame]
/** @file
Http2Stream.cc
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "HTTP2.h"
#include "Http2Stream.h"
#include "Http2ClientSession.h"
#include "../http/HttpSM.h"
ClassAllocator<Http2Stream> http2StreamAllocator("http2StreamAllocator");
int
Http2Stream::main_event_handler(int event, void *edata)
{
Event *e = static_cast<Event *>(edata);
Thread *this_thread = this_ethread();
if (this->get_thread() != this_thread) {
// Send on to the owning thread
if (cross_thread_event == NULL) {
cross_thread_event = this->get_thread()->schedule_imm(this, event, edata);
}
return 0;
}
ink_release_assert(this->get_thread() == this_ethread());
SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
if (e == cross_thread_event) {
cross_thread_event = NULL;
} else if (e == active_event) {
event = VC_EVENT_ACTIVE_TIMEOUT;
active_event = NULL;
} else if (e == inactive_event) {
if (inactive_timeout_at && inactive_timeout_at < Thread::get_hrtime()) {
event = VC_EVENT_INACTIVITY_TIMEOUT;
clear_inactive_timer();
}
} else if (e == read_event) {
read_event = NULL;
} else if (e == write_event) {
write_event = NULL;
}
switch (event) {
case VC_EVENT_ACTIVE_TIMEOUT:
case VC_EVENT_INACTIVITY_TIMEOUT:
if (current_reader && read_vio.ntodo() > 0) {
MUTEX_TRY_LOCK(lock, read_vio.mutex, this_ethread());
if (lock.is_locked()) {
read_vio._cont->handleEvent(event, &read_vio);
} else {
this_ethread()->schedule_imm(read_vio._cont, event, &read_vio);
}
} else if (current_reader && write_vio.ntodo() > 0) {
MUTEX_TRY_LOCK(lock, write_vio.mutex, this_ethread());
if (lock.is_locked()) {
write_vio._cont->handleEvent(event, &write_vio);
} else {
this_ethread()->schedule_imm(write_vio._cont, event, &write_vio);
}
}
break;
case VC_EVENT_WRITE_READY:
case VC_EVENT_WRITE_COMPLETE:
inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
if (e->cookie == &write_vio) {
if (write_vio.mutex) {
MUTEX_TRY_LOCK(lock, write_vio.mutex, this_ethread());
if (lock.is_locked() && write_vio._cont && this->current_reader) {
write_vio._cont->handleEvent(event, &write_vio);
} else {
this_ethread()->schedule_imm(write_vio._cont, event, &write_vio);
}
}
} else {
update_write_request(write_vio.get_reader(), INT64_MAX, true);
}
break;
case VC_EVENT_READ_COMPLETE:
case VC_EVENT_READ_READY:
inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
if (e->cookie == &read_vio) {
if (read_vio.mutex) {
MUTEX_TRY_LOCK(lock, read_vio.mutex, this_ethread());
if (lock.is_locked() && read_vio._cont && this->current_reader) {
read_vio._cont->handleEvent(event, &read_vio);
} else {
this_ethread()->schedule_imm(read_vio._cont, event, &read_vio);
}
}
} else {
this->update_read_request(INT64_MAX, true);
}
break;
case VC_EVENT_EOS: {
// If there are active VIO's send the EOS through them
if (e->cookie == &read_vio) {
SCOPED_MUTEX_LOCK(lock, read_vio.mutex, this_ethread());
read_vio._cont->handleEvent(VC_EVENT_EOS, &read_vio);
} else if (e->cookie == &write_vio) {
SCOPED_MUTEX_LOCK(lock, write_vio.mutex, this_ethread());
write_vio._cont->handleEvent(VC_EVENT_EOS, &write_vio);
} else {
// Otherwise, handle the EOS yourself and shut down
SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
// Clean up after yourself if this was an EOS
ink_release_assert(this->closed);
// Safe to initiate SSN_CLOSE if this is the last stream
static_cast<Http2ClientSession *>(parent)->connection_state.release_stream(this);
this->destroy();
}
break;
}
}
return 0;
}
Http2ErrorCode
Http2Stream::decode_header_blocks(HpackHandle &hpack_handle)
{
return http2_decode_header_blocks(&_req_header, (const uint8_t *)header_blocks, header_blocks_length, NULL, hpack_handle,
trailing_header);
}
void
Http2Stream::send_request(Http2ConnectionState &cstate)
{
// Convert header to HTTP/1.1 format
http2_convert_header_from_2_to_1_1(&_req_header);
// Write header to a buffer. Borrowing logic from HttpSM::write_header_into_buffer.
// Seems like a function like this ought to be in HTTPHdr directly
int bufindex;
int dumpoffset = 0;
int done, tmp;
IOBufferBlock *block;
do {
bufindex = 0;
tmp = dumpoffset;
block = request_buffer.get_current_block();
if (!block) {
request_buffer.add_block();
block = request_buffer.get_current_block();
}
done = _req_header.print(block->start(), block->write_avail(), &bufindex, &tmp);
dumpoffset += bufindex;
request_buffer.fill(bufindex);
if (!done) {
request_buffer.add_block();
}
} while (!done);
// Is there a read_vio request waiting?
this->update_read_request(INT64_MAX, true);
}
bool
Http2Stream::change_state(uint8_t type, uint8_t flags)
{
switch (_state) {
case HTTP2_STREAM_STATE_IDLE:
if (type == HTTP2_FRAME_TYPE_HEADERS) {
if (recv_end_stream) {
_state = HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE;
} else if (send_end_stream) {
_state = HTTP2_STREAM_STATE_HALF_CLOSED_LOCAL;
} else {
_state = HTTP2_STREAM_STATE_OPEN;
}
} else if (type == HTTP2_FRAME_TYPE_CONTINUATION) {
if (recv_end_stream) {
_state = HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE;
} else if (send_end_stream) {
_state = HTTP2_STREAM_STATE_HALF_CLOSED_LOCAL;
} else {
_state = HTTP2_STREAM_STATE_OPEN;
}
} else if (type == HTTP2_FRAME_TYPE_PUSH_PROMISE) {
_state = HTTP2_STREAM_STATE_RESERVED_LOCAL;
} else {
return false;
}
break;
case HTTP2_STREAM_STATE_OPEN:
if (type == HTTP2_FRAME_TYPE_RST_STREAM) {
_state = HTTP2_STREAM_STATE_CLOSED;
} else if (type == HTTP2_FRAME_TYPE_DATA) {
if (recv_end_stream) {
_state = HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE;
} else if (send_end_stream) {
_state = HTTP2_STREAM_STATE_HALF_CLOSED_LOCAL;
} else {
// Do not change state
}
} else {
// A stream in the "open" state may be used by both peers to send frames of any type.
return true;
}
break;
case HTTP2_STREAM_STATE_RESERVED_LOCAL:
if (type == HTTP2_FRAME_TYPE_HEADERS) {
if (flags & HTTP2_FLAGS_HEADERS_END_HEADERS) {
_state = HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE;
}
} else if (type == HTTP2_FRAME_TYPE_CONTINUATION) {
if (flags & HTTP2_FLAGS_CONTINUATION_END_HEADERS) {
_state = HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE;
}
} else {
return false;
}
break;
case HTTP2_STREAM_STATE_RESERVED_REMOTE:
// Currently ATS supports only HTTP/2 server features
return false;
case HTTP2_STREAM_STATE_HALF_CLOSED_LOCAL:
if (type == HTTP2_FRAME_TYPE_RST_STREAM || recv_end_stream) {
_state = HTTP2_STREAM_STATE_CLOSED;
} else {
// Error, set state closed
_state = HTTP2_STREAM_STATE_CLOSED;
return false;
}
break;
case HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE:
if (type == HTTP2_FRAME_TYPE_RST_STREAM || send_end_stream) {
_state = HTTP2_STREAM_STATE_CLOSED;
} else if (type == HTTP2_FRAME_TYPE_HEADERS) { // w/o END_STREAM flag
// No state change here. Expect a following DATA frame with END_STREAM flag.
return true;
} else {
// Error, set state closed
_state = HTTP2_STREAM_STATE_CLOSED;
return false;
}
break;
case HTTP2_STREAM_STATE_CLOSED:
// No state changing
return true;
default:
return false;
}
Debug("http2_stream", "%s", Http2DebugNames::get_state_name(_state));
return true;
}
VIO *
Http2Stream::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
{
if (buf) {
read_vio.buffer.writer_for(buf);
} else {
read_vio.buffer.clear();
}
read_vio.mutex = c ? c->mutex : this->mutex;
read_vio._cont = c;
read_vio.nbytes = nbytes;
read_vio.ndone = 0;
read_vio.vc_server = this;
read_vio.op = VIO::READ;
// Is there already data in the request_buffer? If so, copy it over and then
// schedule a READ_READY or READ_COMPLETE event after we return.
update_read_request(nbytes, false);
return &read_vio;
}
VIO *
Http2Stream::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *abuffer, bool owner)
{
if (abuffer) {
write_vio.buffer.reader_for(abuffer);
} else {
write_vio.buffer.clear();
}
write_vio.mutex = c ? c->mutex : this->mutex;
write_vio._cont = c;
write_vio.nbytes = nbytes;
write_vio.ndone = 0;
write_vio.vc_server = this;
write_vio.op = VIO::WRITE;
response_reader = response_buffer.alloc_reader();
return update_write_request(abuffer, nbytes, false) ? &write_vio : NULL;
}
// Initiated from SM
void
Http2Stream::do_io_close(int /* flags */)
{
SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
// disengage us from the SM
super::release(NULL);
if (!sent_delete) {
Debug("http2_stream", "do_io_close stream %d", this->get_id());
// When we get here, the SM has initiated the shutdown. Either it received a WRITE_COMPLETE, or it is shutting down. Any
// remaining IO operations back to client should be abandoned. The SM-side buffers backing these operations will be deleted
// by the time this is called from transaction_done.
sent_delete = true;
closed = true;
if (parent && this->is_client_state_writeable()) {
// Make sure any trailing end of stream frames are sent
// Wee will be removed at send_data_frames or closing connection phase
static_cast<Http2ClientSession *>(parent)->connection_state.send_data_frames(this);
}
clear_timers();
clear_io_events();
// Wait until transaction_done is called from HttpSM to signal that the TXN_CLOSE hook has been executed
}
}
/*
* HttpSM has called TXN_close hooks.
*/
void
Http2Stream::transaction_done()
{
SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
if (cross_thread_event) {
cross_thread_event->cancel();
cross_thread_event = NULL;
}
if (!closed)
do_io_close(); // Make sure we've been closed. If we didn't close the parent session better still be open
ink_release_assert(closed || !static_cast<Http2ClientSession *>(parent)->connection_state.is_state_closed());
current_reader = NULL;
if (closed) {
// Safe to initiate SSN_CLOSE if this is the last stream
ink_assert(cross_thread_event == NULL);
// Schedule the destroy to occur after we unwind here. IF we call directly, may delete with reference on the stack.
cross_thread_event = this->get_thread()->schedule_imm(this, VC_EVENT_EOS, NULL);
}
}
// Initiated from the Http2 side
void
Http2Stream::initiating_close()
{
if (!closed) {
SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
Debug("http2_stream", "initiating_close stream %d", this->get_id());
// Set the state of the connection to closed
// TODO - these states should be combined
closed = true;
_state = HTTP2_STREAM_STATE_CLOSED;
// leaving the reference to the SM, so we can detach from the SM when we actually destroy
// current_reader = NULL;
// Leaving reference to client session as well, so we can signal once the
// TXN_CLOSE has been sent
// parent = NULL;
clear_timers();
clear_io_events();
// This should result in do_io_close or release being called. That will schedule the final
// kill yourself signal
// Send the SM the EOS signal if there are no active VIO's to signal
// We are sending signals rather than calling the handlers directly to avoid the case where
// the HttpTunnel handler causes the HttpSM to be deleted on the stack.
bool sent_write_complete = false;
if (current_reader && this->is_client_state_writeable()) {
// Push out any last IO events
if (write_vio._cont) {
SCOPED_MUTEX_LOCK(lock, write_vio.mutex, this_ethread());
// Are we done?
if (write_vio.nbytes == write_vio.ndone) {
Debug("http2_stream", "handle write from destroy stream=%d event=%d", this->_id, VC_EVENT_WRITE_COMPLETE);
write_event = send_tracked_event(write_event, VC_EVENT_WRITE_COMPLETE, &write_vio);
} else {
write_event = send_tracked_event(write_event, VC_EVENT_EOS, &write_vio);
Debug("http2_stream", "handle write from destroy stream=%d event=%d", this->_id, VC_EVENT_EOS);
}
sent_write_complete = true;
}
}
// Send EOS to let SM know that we aren't sticking around
if (current_reader && read_vio._cont) {
// Only bother with the EOS if we haven't sent the write complete
if (!sent_write_complete) {
SCOPED_MUTEX_LOCK(lock, read_vio.mutex, this_ethread());
Debug("http2_stream", "send EOS to read cont stream=%d", this->_id);
read_event = send_tracked_event(read_event, VC_EVENT_EOS, &read_vio);
}
} else if (current_reader) {
SCOPED_MUTEX_LOCK(lock, current_reader->mutex, this_ethread());
current_reader->handleEvent(VC_EVENT_ERROR);
} else if (!sent_write_complete) {
// Transaction is already gone. Kill yourself
do_io_close();
}
}
}
/* Replace existing event only if the new event is different than the inprogress event */
Event *
Http2Stream::send_tracked_event(Event *event, int send_event, VIO *vio)
{
if (event != NULL) {
if (event->callback_event != send_event) {
event->cancel();
event = NULL;
}
}
if (event == NULL) {
event = this_ethread()->schedule_imm(this, send_event, vio);
}
return event;
}
void
Http2Stream::update_read_request(int64_t read_len, bool call_update)
{
if (closed || sent_delete || parent == NULL || current_reader == NULL || read_vio.mutex == NULL) {
return;
}
if (this->get_thread() != this_ethread()) {
SCOPED_MUTEX_LOCK(stream_lock, this->mutex, this_ethread());
if (cross_thread_event == NULL) {
// Send to the right thread
cross_thread_event = this->get_thread()->schedule_imm(this, VC_EVENT_READ_READY, NULL);
}
return;
}
ink_release_assert(this->get_thread() == this_ethread());
SCOPED_MUTEX_LOCK(lock, read_vio.mutex, this_ethread());
if (read_vio.nbytes > 0 && read_vio.ndone <= read_vio.nbytes) {
// If this vio has a different buffer, we must copy
ink_release_assert(this_ethread() == this->_thread);
if (read_vio.buffer.writer() != (&request_buffer)) {
int64_t num_to_read = read_vio.nbytes - read_vio.ndone;
if (num_to_read > read_len) {
num_to_read = read_len;
}
if (num_to_read > 0) {
int bytes_added = read_vio.buffer.writer()->write(request_reader, num_to_read);
if (bytes_added > 0) {
request_reader->consume(bytes_added);
read_vio.ndone += bytes_added;
int send_event = (read_vio.nbytes == read_vio.ndone) ? VC_EVENT_READ_COMPLETE : VC_EVENT_READ_READY;
if (call_update) { // Safe to call vio handler directly
inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
if (read_vio._cont && this->current_reader)
read_vio._cont->handleEvent(send_event, &read_vio);
} else { // Called from do_io_read. Still setting things up. Send event to handle this after the dust settles
read_event = send_tracked_event(read_event, send_event, &read_vio);
}
}
}
} else {
// Try to be smart and only signal if there was additional data
int send_event = (read_vio.nbytes == read_vio.ndone) ? VC_EVENT_READ_COMPLETE : VC_EVENT_READ_READY;
if (request_reader->read_avail() > 0 || send_event == VC_EVENT_READ_COMPLETE) {
if (call_update) { // Safe to call vio handler directly
inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
if (read_vio._cont && this->current_reader)
read_vio._cont->handleEvent(send_event, &read_vio);
} else { // Called from do_io_read. Still setting things up. Send event
// to handle this after the dust settles
read_event = send_tracked_event(read_event, send_event, &read_vio);
}
}
}
}
}
bool
Http2Stream::update_write_request(IOBufferReader *buf_reader, int64_t write_len, bool call_update)
{
bool retval = true;
if (!this->is_client_state_writeable() || closed || sent_delete || parent == NULL || write_vio.mutex == NULL) {
return retval;
}
if (this->get_thread() != this_ethread()) {
SCOPED_MUTEX_LOCK(stream_lock, this->mutex, this_ethread());
if (cross_thread_event == NULL) {
// Send to the right thread
cross_thread_event = this->get_thread()->schedule_imm(this, VC_EVENT_WRITE_READY, NULL);
}
return retval;
}
ink_release_assert(this->get_thread() == this_ethread());
Http2ClientSession *parent = static_cast<Http2ClientSession *>(this->get_parent());
// Copy over data in the abuffer into resp_buffer. Then schedule a WRITE_READY or
// WRITE_COMPLETE event
SCOPED_MUTEX_LOCK(lock, write_vio.mutex, this_ethread());
int64_t total_added = 0;
if (write_vio.nbytes > 0 && write_vio.ndone < write_vio.nbytes) {
int64_t num_to_write = write_vio.nbytes - write_vio.ndone;
if (num_to_write > write_len) {
num_to_write = write_len;
}
int64_t bytes_avail = buf_reader->read_avail();
if (bytes_avail > num_to_write) {
bytes_avail = num_to_write;
}
while (total_added < bytes_avail) {
int64_t bytes_added = response_buffer.write(buf_reader, bytes_avail);
buf_reader->consume(bytes_added);
total_added += bytes_added;
}
}
bool is_done = false;
this->response_process_data(is_done);
if (total_added > 0 || is_done) {
write_vio.ndone += total_added;
int send_event = (write_vio.nbytes == write_vio.ndone || is_done) ? VC_EVENT_WRITE_COMPLETE : VC_EVENT_WRITE_READY;
// Process the new data
if (!this->response_header_done) {
// Still parsing the response_header
int bytes_used = 0;
int state = this->response_header.parse_resp(&http_parser, this->response_reader, &bytes_used, false);
// this->response_reader->consume(bytes_used);
switch (state) {
case PARSE_RESULT_DONE: {
this->response_header_done = true;
// Send the response header back
parent->connection_state.send_headers_frame(this);
// See if the response is chunked. Set up the dechunking logic if it is
// Make sure to check if the chunk is complete and signal appropriately
this->response_initialize_data_handling(is_done);
if (is_done) {
send_event = VC_EVENT_WRITE_COMPLETE;
}
// If there is additional data, send it along in a data frame. Or if this was header only
// make sure to send the end of stream
if (this->response_is_data_available() || send_event == VC_EVENT_WRITE_COMPLETE) {
if (send_event != VC_EVENT_WRITE_COMPLETE) {
send_response_body();
// As with update_read_request, should be safe to call handler directly here if
// call_update is true. Commented out for now while tracking a performance regression
if (call_update) { // Coming from reenable. Safe to call the handler directly
inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
if (write_vio._cont && this->current_reader)
write_vio._cont->handleEvent(send_event, &write_vio);
} else { // Called from do_io_write. Might still be setting up state. Send an event to let the dust settle
write_event = send_tracked_event(write_event, send_event, &write_vio);
}
} else {
this->mark_body_done();
// Send the data frame
send_response_body();
retval = false;
}
}
break;
}
case PARSE_RESULT_CONT:
// Let it ride for next time
break;
default:
break;
}
} else {
if (send_event == VC_EVENT_WRITE_COMPLETE) {
// Defer sending the write complete until the send_data_frame has sent it all
// this_ethread()->schedule_imm(this, send_event, &write_vio);
this->mark_body_done();
send_response_body();
retval = false;
} else {
send_response_body();
if (call_update) { // Coming from reenable. Safe to call the handler directly
inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
if (write_vio._cont && this->current_reader)
write_vio._cont->handleEvent(send_event, &write_vio);
} else { // Called from do_io_write. Might still be setting up state. Send an event to let the dust settle
write_event = send_tracked_event(write_event, send_event, &write_vio);
}
}
}
Debug("http2_stream", "write update stream_id=%d event=%d", this->get_id(), send_event);
}
return retval;
}
void
Http2Stream::push_promise(URL &url)
{
Http2ClientSession *parent = static_cast<Http2ClientSession *>(this->get_parent());
parent->connection_state.send_push_promise_frame(this, url);
}
void
Http2Stream::send_response_body()
{
Http2ClientSession *parent = static_cast<Http2ClientSession *>(this->get_parent());
if (Http2::stream_priority_enabled) {
parent->connection_state.schedule_stream(this);
} else {
// Send DATA frames directly
parent->connection_state.send_data_frames(this);
}
}
void
Http2Stream::reenable(VIO *vio)
{
if (this->parent) {
if (vio->op == VIO::WRITE) {
SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
update_write_request(vio->get_reader(), INT64_MAX, true);
} else if (vio->op == VIO::READ) {
SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
update_read_request(INT64_MAX, true);
}
}
}
void
Http2Stream::destroy()
{
Debug("http2_stream", "Destroy stream %d, sent %" PRIu64 " bytes", this->_id, this->bytes_sent);
// Clean up the write VIO in case of inactivity timeout
this->do_io_write(NULL, 0, NULL);
HTTP2_DECREMENT_THREAD_DYN_STAT(HTTP2_STAT_CURRENT_CLIENT_STREAM_COUNT, _thread);
ink_hrtime end_time = Thread::get_hrtime();
HTTP2_SUM_THREAD_DYN_STAT(HTTP2_STAT_TOTAL_TRANSACTIONS_TIME, _thread, end_time - _start_time);
_req_header.destroy();
response_header.destroy();
// Drop references to all buffer data
request_buffer.clear();
response_buffer.clear();
// Free the mutexes in the VIO
read_vio.mutex.clear();
write_vio.mutex.clear();
if (header_blocks) {
ats_free(header_blocks);
}
chunked_handler.clear();
super::destroy();
// Current Http2ConnectionState implementation uses a memory pool for instantiating streams and DLL<> stream_list for storing
// active streams. Destroying a stream before deleting it from stream_list and then creating a new one + reusing the same chunk
// from the memory pool right away always leads to destroying the DLL structure (deadlocks, inconsistencies).
// The following is meant as a safety net since the consequences are disastrous. Until the design/implementation changes it seems
// less error prone to (double) delete before destroying (noop if already deleted).
if (parent) {
if (static_cast<Http2ClientSession *>(parent)->connection_state.delete_stream(this)) {
Warning("Http2Stream was about to be deallocated without removing it from the active stream list");
}
}
THREAD_FREE(this, http2StreamAllocator, this_ethread());
}
bool
check_stream_thread(Continuation *cont)
{
Http2Stream *stream = dynamic_cast<Http2Stream *>(cont);
if (stream) {
return stream->get_thread() == this_ethread();
} else {
return true;
}
}
bool
check_continuation(Continuation *cont)
{
Http2Stream *stream = dynamic_cast<Http2Stream *>(cont);
return stream == NULL;
}
void
Http2Stream::response_initialize_data_handling(bool &is_done)
{
is_done = false;
const char *name = "transfer-encoding";
const char *value = "chunked";
int chunked_index = response_header.value_get_index(name, strlen(name), value, strlen(value));
// -1 means this value was not found for this field
if (chunked_index >= 0) {
Debug("http2_stream", "Response is chunked");
chunked = true;
this->chunked_handler.init_by_action(this->response_reader, ChunkedHandler::ACTION_DECHUNK);
this->chunked_handler.state = ChunkedHandler::CHUNK_READ_SIZE;
this->chunked_handler.dechunked_reader = this->chunked_handler.dechunked_buffer->alloc_reader();
this->response_reader->dealloc();
this->response_reader = NULL;
// Get things going if there is already data waiting
if (this->chunked_handler.chunked_reader->is_read_avail_more_than(0)) {
response_process_data(is_done);
}
}
}
void
Http2Stream::response_process_data(bool &done)
{
done = false;
if (chunked) {
do {
if (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL) {
chunked_handler.state = ChunkedHandler::CHUNK_READ_SIZE_START;
}
done = this->chunked_handler.process_chunked_content();
} while (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL);
}
}
bool
Http2Stream::response_is_data_available() const
{
IOBufferReader *reader = this->response_get_data_reader();
return reader ? reader->is_read_avail_more_than(0) : false;
}
IOBufferReader *
Http2Stream::response_get_data_reader() const
{
return (chunked) ? chunked_handler.dechunked_reader : response_reader;
}
void
Http2Stream::set_active_timeout(ink_hrtime timeout_in)
{
active_timeout = timeout_in;
clear_active_timer();
if (active_timeout > 0) {
active_event = this_ethread()->schedule_in(this, active_timeout);
}
}
void
Http2Stream::set_inactivity_timeout(ink_hrtime timeout_in)
{
inactive_timeout = timeout_in;
if (inactive_timeout > 0) {
inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
if (!inactive_event) {
inactive_event = this_ethread()->schedule_every(this, HRTIME_SECONDS(1));
}
} else {
clear_inactive_timer();
}
}
void
Http2Stream::cancel_inactivity_timeout()
{
set_inactivity_timeout(0);
}
void
Http2Stream::clear_inactive_timer()
{
inactive_timeout_at = 0;
if (inactive_event) {
inactive_event->cancel();
inactive_event = NULL;
}
}
void
Http2Stream::clear_active_timer()
{
if (active_event) {
active_event->cancel();
active_event = NULL;
}
}
void
Http2Stream::clear_timers()
{
clear_inactive_timer();
clear_active_timer();
}
void
Http2Stream::clear_io_events()
{
if (read_event) {
read_event->cancel();
}
read_event = NULL;
if (write_event) {
write_event->cancel();
}
write_event = NULL;
}
void
Http2Stream::release(IOBufferReader *r)
{
super::release(r);
current_reader = NULL; // State machine is on its own way down.
this->do_io_close();
}