blob: b8c4afc89fdf342e07aa9da6013355aad8dc65ad [file] [log] [blame]
/** @file
*
* A brief file description
*
* @section license License
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "I_VConnection.h"
#include "QUICStreamVCAdapter.h"
QUICStreamVCAdapter::QUICStreamVCAdapter(QUICStream &stream) : QUICStreamAdapter(stream), VConnection(new_ProxyMutex())
{
SET_HANDLER(&QUICStreamVCAdapter::state_stream_open);
}
QUICStreamVCAdapter::~QUICStreamVCAdapter()
{
if (this->_read_event) {
this->_read_event->cancel();
this->_read_event = nullptr;
}
if (this->_write_event) {
this->_write_event->cancel();
this->_write_event = nullptr;
}
}
int64_t
QUICStreamVCAdapter::write(QUICOffset offset, const uint8_t *data, uint64_t data_length, bool fin)
{
uint64_t bytes_added = -1;
if (this->_read_vio.op == VIO::READ) {
SCOPED_MUTEX_LOCK(lock, this->_read_vio.mutex, this_ethread());
bytes_added = this->_read_vio.get_writer()->write(data, data_length);
// Until receive FIN flag, keep nbytes INT64_MAX
if (fin && bytes_added == data_length) {
this->_read_vio.nbytes = offset + data_length;
}
}
return bytes_added;
}
Ptr<IOBufferBlock>
QUICStreamVCAdapter::_read(size_t len)
{
Ptr<IOBufferBlock> block;
if (this->_write_vio.op == VIO::WRITE) {
SCOPED_MUTEX_LOCK(lock, this->_write_vio.mutex, this_ethread());
IOBufferReader *reader = this->_write_vio.get_reader();
block = make_ptr<IOBufferBlock>(reader->get_current_block()->clone());
if (block->size()) {
block->consume(reader->start_offset);
block->_end = std::min(block->start() + len, block->_buf_end);
this->_write_vio.ndone += len;
}
reader->consume(block->size());
}
return block;
}
bool
QUICStreamVCAdapter::is_eos()
{
if (this->_write_vio.op == VIO::WRITE) {
SCOPED_MUTEX_LOCK(lock, this->_write_vio.mutex, this_ethread());
if (this->_write_vio.nbytes == INT64_MAX) {
return false;
}
if (this->_write_vio.ntodo() != 0) {
return false;
}
return true;
} else {
return false;
}
}
uint64_t
QUICStreamVCAdapter::unread_len()
{
if (this->_write_vio.op == VIO::WRITE) {
SCOPED_MUTEX_LOCK(lock, this->_write_vio.mutex, this_ethread());
return this->_write_vio.get_reader()->block_read_avail();
} else {
return 0;
}
}
uint64_t
QUICStreamVCAdapter::read_len()
{
if (this->_write_vio.op == VIO::WRITE) {
SCOPED_MUTEX_LOCK(lock, this->_write_vio.mutex, this_ethread());
return this->_write_vio.ndone;
} else {
return 0;
}
}
uint64_t
QUICStreamVCAdapter::total_len()
{
if (this->_write_vio.op == VIO::WRITE) {
SCOPED_MUTEX_LOCK(lock, this->_write_vio.mutex, this_ethread());
return this->_write_vio.nbytes;
} else {
return 0;
}
}
/**
* @brief Signal event to this->_read_vio.cont
*/
void
QUICStreamVCAdapter::encourge_read()
{
if (this->_read_vio.op == VIO::READ) {
SCOPED_MUTEX_LOCK(lock, this->_read_vio.mutex, this_ethread());
if (this->_read_vio.cont == nullptr) {
return;
}
int event = this->_read_vio.nbytes == INT64_MAX ? VC_EVENT_READ_READY : VC_EVENT_READ_COMPLETE;
this_ethread()->schedule_imm(this->_read_vio.cont, event, &this->_read_vio);
}
}
/**
* @brief Signal event to this->_write_vio.cont
*/
void
QUICStreamVCAdapter::encourge_write()
{
if (this->_write_vio.op == VIO::WRITE) {
SCOPED_MUTEX_LOCK(lock, this->_write_vio.mutex, this_ethread());
if (this->_write_vio.cont == nullptr) {
return;
}
int event = this->_write_vio.ntodo() ? VC_EVENT_WRITE_READY : VC_EVENT_WRITE_COMPLETE;
this_ethread()->schedule_imm(this->_write_vio.cont, event, &this->_write_vio);
}
}
/**
* @brief Signal event to this->_read_vio.cont
*/
void
QUICStreamVCAdapter::notify_eos()
{
if (this->_read_vio.op == VIO::READ) {
if (this->_read_vio.cont == nullptr) {
return;
}
int event = VC_EVENT_EOS;
MUTEX_TRY_LOCK(lock, this->_read_vio.mutex, this_ethread());
if (lock.is_locked()) {
this->_read_vio.cont->handleEvent(event, &this->_read_vio);
} else {
this_ethread()->schedule_imm(this->_read_vio.cont, event, &this->_read_vio);
}
}
}
// this->_read_vio.nbytes should be INT64_MAX until receive FIN flag
VIO *
QUICStreamVCAdapter::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
{
if (buf) {
this->_read_vio.buffer.writer_for(buf);
} else {
this->_read_vio.buffer.clear();
}
this->_read_vio.mutex = c ? c->mutex : this->mutex;
this->_read_vio.cont = c;
this->_read_vio.nbytes = nbytes;
this->_read_vio.ndone = 0;
this->_read_vio.vc_server = this;
this->_read_vio.op = VIO::READ;
return &this->_read_vio;
}
VIO *
QUICStreamVCAdapter::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner)
{
if (buf) {
this->_write_vio.buffer.reader_for(buf);
} else {
this->_write_vio.buffer.clear();
}
this->_write_vio.mutex = c ? c->mutex : this->mutex;
this->_write_vio.cont = c;
this->_write_vio.nbytes = nbytes;
this->_write_vio.ndone = 0;
this->_write_vio.vc_server = this;
this->_write_vio.op = VIO::WRITE;
return &this->_write_vio;
}
void
QUICStreamVCAdapter::do_io_close(int lerrno)
{
SET_HANDLER(&QUICStreamVCAdapter::state_stream_closed);
this->_read_vio.buffer.clear();
this->_read_vio.nbytes = 0;
this->_read_vio.op = VIO::NONE;
this->_read_vio.cont = nullptr;
this->_write_vio.buffer.clear();
this->_write_vio.nbytes = 0;
this->_write_vio.op = VIO::NONE;
this->_write_vio.cont = nullptr;
}
void
QUICStreamVCAdapter::do_io_shutdown(ShutdownHowTo_t howto)
{
ink_assert(false); // unimplemented yet
return;
}
void
QUICStreamVCAdapter::reenable(VIO *vio)
{
// TODO We probably need to tell QUICStream that the application consumed received data
// to update receive window here. In other words, we should not update receive window
// until the application consume data.
}
int
QUICStreamVCAdapter::state_stream_open(int event, void *data)
{
QUICErrorUPtr error = nullptr;
switch (event) {
case VC_EVENT_READ_READY:
case VC_EVENT_READ_COMPLETE: {
this->encourge_read();
break;
}
case VC_EVENT_WRITE_READY:
case VC_EVENT_WRITE_COMPLETE: {
this->encourge_write();
break;
}
case VC_EVENT_EOS:
case VC_EVENT_ERROR:
case VC_EVENT_INACTIVITY_TIMEOUT:
case VC_EVENT_ACTIVE_TIMEOUT: {
// TODO
ink_assert(false);
break;
}
default:
ink_assert(false);
}
return EVENT_DONE;
}
int
QUICStreamVCAdapter::state_stream_closed(int event, void *data)
{
switch (event) {
case VC_EVENT_READ_READY:
case VC_EVENT_READ_COMPLETE: {
// ignore
break;
}
case VC_EVENT_WRITE_READY:
case VC_EVENT_WRITE_COMPLETE: {
// ignore
break;
}
case VC_EVENT_EOS:
case VC_EVENT_ERROR:
case VC_EVENT_INACTIVITY_TIMEOUT:
case VC_EVENT_ACTIVE_TIMEOUT: {
// TODO
ink_assert(false);
break;
}
default:
ink_assert(false);
}
return EVENT_DONE;
}