/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
 * http://www.apache.org/licenses/LICENSE-2.0
 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <assert.h>
#include <stddef.h>
#include <apr_atomic.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>
#include <apr_strings.h>
#include <apr_time.h>
#include <httpd.h>
#include <http_core.h>
#include <http_log.h>
#include "h2_private.h"
#include "h2_config.h"
#include "h2_conn.h"
#include "h2_h2.h"
#include "h2_io.h"
#include "h2_io_set.h"
#include "h2_response.h"
#include "h2_mplx.h"
#include "h2_request.h"
#include "h2_stream.h"
#include "h2_stream_set.h"
#include "h2_task.h"
#include "h2_task_input.h"
#include "h2_task_output.h"
#include "h2_task_queue.h"
#include "h2_worker.h"
#include "h2_workers.h"
#include "h2_util.h"
#define H2_MPLX_IO_OUT(lvl,m,io,msg) \
do { \
if (APLOG_C_IS_LEVEL((m)->c,lvl)) \
h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbout); \
} while(0)
#define H2_MPLX_IO_IN(lvl,m,io,msg) \
do { \
if (APLOG_C_IS_LEVEL((m)->c,lvl)) \
h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbin); \
} while(0)
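/* Check whether the multiplexer has been aborted; if so, report
 * APR_ECONNABORTED in *pstatus. Call only with m->lock held. */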
static int is_aborted(h2_mplx *m, apr_status_t *pstatus) {
AP_DEBUG_ASSERT(m);
if (m->aborted) {
*pstatus = APR_ECONNABORTED;
return 1;
}
return 0;
}
static void have_out_data_for(h2_mplx *m, int stream_id);
static void h2_mplx_destroy(h2_mplx *m)
{
AP_DEBUG_ASSERT(m);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
"h2_mplx(%ld): destroy, refs=%d",
m->id, m->refs);
m->aborted = 1;
if (m->ready_ios) {
h2_io_set_destroy(m->ready_ios);
m->ready_ios = NULL;
}
if (m->stream_ios) {
h2_io_set_destroy(m->stream_ios);
m->stream_ios = NULL;
}
if (m->pool) {
apr_pool_destroy(m->pool);
}
}
/**
 * An h2_mplx needs to be thread-safe, as it will be called by
 * the h2_session thread *and* the h2_worker threads. Therefore:
 * - calls are protected by a mutex lock, m->lock
 * - the pool needs its own allocator, since apr_allocator_t is
 *   not thread-safe. The separate allocator works without a
 *   separate lock since we already protect h2_mplx itself.
 *   Since HTTP/2 connections can be expected to live longer than
 *   their HTTP/1 cousins, the separate allocator seems to work better
 *   than protecting a shared h2_session one with its own lock.
*/
h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, h2_workers *workers)
{
apr_status_t status = APR_SUCCESS;
h2_config *conf = h2_config_get(c);
apr_allocator_t *allocator = NULL;
h2_mplx *m;
AP_DEBUG_ASSERT(conf);
status = apr_allocator_create(&allocator);
if (status != APR_SUCCESS) {
return NULL;
}
m = apr_pcalloc(parent, sizeof(h2_mplx));
if (m) {
m->id = c->id;
APR_RING_ELEM_INIT(m, link);
m->refs = 1;
m->c = c;
apr_pool_create_ex(&m->pool, parent, NULL, allocator);
        if (!m->pool) {
            apr_allocator_destroy(allocator);
            return NULL;
        }
apr_allocator_owner_set(allocator, m->pool);
status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT,
m->pool);
if (status != APR_SUCCESS) {
h2_mplx_destroy(m);
return NULL;
}
m->bucket_alloc = apr_bucket_alloc_create(m->pool);
m->q = h2_tq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS));
m->stream_ios = h2_io_set_create(m->pool);
m->ready_ios = h2_io_set_create(m->pool);
m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
m->workers = workers;
m->file_handles_allowed = h2_config_geti(conf, H2_CONF_SESSION_FILES);
}
return m;
}
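/* Drop one reference to this multiplexer. With lock != 0, m->lock is
 * taken and a thread waiting in h2_mplx_release_and_join is woken up;
 * with lock == 0, the caller already holds m->lock. */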
static void release(h2_mplx *m, int lock)
{
if (lock) {
apr_thread_mutex_lock(m->lock);
--m->refs;
if (m->join_wait) {
apr_thread_cond_signal(m->join_wait);
}
apr_thread_mutex_unlock(m->lock);
}
else {
--m->refs;
}
}
void h2_mplx_reference(h2_mplx *m)
{
apr_thread_mutex_lock(m->lock);
++m->refs;
apr_thread_mutex_unlock(m->lock);
}
void h2_mplx_release(h2_mplx *m)
{
release(m, 1);
}
static void workers_register(h2_mplx *m) {
    /* Initially, we increased the ref count here as well, but
     * that is not needed, even harmful.
     * h2_workers is only a hub for all the h2_worker instances.
     * At the end-of-life of this h2_mplx, we always unregister at
     * the workers. The things to manage are the h2_worker instances
     * out there. Those may hold a reference to this h2_mplx and we
     * cannot force them to unregister.
     *
     * Therefore: ref counting for h2_workers is not needed, ref counting
     * for each h2_worker using this h2_mplx is critical.
*/
h2_workers_register(m->workers, m);
}
static void workers_unregister(h2_mplx *m) {
h2_workers_unregister(m->workers, m);
}
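/* Remove the io from all sets, return its file handle count to the
 * session budget and destroy it. Its cleared pool is kept around as
 * m->spare_pool for reuse by the next stream. m->lock must be held. */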
static void io_destroy(h2_mplx *m, h2_io *io)
{
apr_pool_t *pool = io->pool;
io->pool = NULL;
    /* The pool is cleared/destroyed, which also closes all of the io's
     * allocated file handles. Give their count back to the session's
     * file handle budget. */
m->file_handles_allowed += io->files_handles_owned;
h2_io_set_remove(m->stream_ios, io);
h2_io_set_remove(m->ready_ios, io);
h2_io_destroy(io);
if (pool) {
apr_pool_clear(pool);
if (m->spare_pool) {
apr_pool_destroy(m->spare_pool);
}
m->spare_pool = pool;
}
}
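/* The stream for this io is done. If its task never started or has
 * already finished, destroy the io right away; otherwise mark it as
 * orphaned for cleanup in h2_mplx_task_done(). Returns non-zero iff
 * the io lives on as an orphan. m->lock must be held. */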
static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error)
{
    /* Remove io from the ready set; we will never submit it */
h2_io_set_remove(m->ready_ios, io);
if (io->task_done || h2_tq_remove(m->q, io->id)) {
/* already finished or not even started yet */
io_destroy(m, io);
return 0;
}
else {
/* cleanup once task is done */
io->orphaned = 1;
if (rst_error) {
h2_io_rst(io, rst_error);
}
return 1;
}
}
static int stream_done_iter(void *ctx, h2_io *io) {
return io_stream_done((h2_mplx*)ctx, io, 0);
}
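/* Orphan or destroy all remaining ios, give up our own reference and
 * wait on the given condition until every other reference has been
 * released, then destroy the multiplexer itself. */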
apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
{
apr_status_t status;
workers_unregister(m);
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) {
            /* iterate until all h2_io have been orphaned or destroyed */
}
release(m, 0);
while (m->refs > 0) {
m->join_wait = wait;
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
"h2_mplx(%ld): release_join, refs=%d, waiting...",
m->id, m->refs);
apr_thread_cond_wait(wait, m->lock);
}
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
"h2_mplx(%ld): release_join -> destroy, (#ios=%ld)",
m->id, (long)h2_io_set_size(m->stream_ios));
h2_mplx_destroy(m);
        /* all gone; m->lock was allocated from m->pool and has been
         * destroyed with it, so it must not be unlocked here. */
}
return status;
}
void h2_mplx_abort(h2_mplx *m)
{
apr_status_t status;
AP_DEBUG_ASSERT(m);
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
m->aborted = 1;
apr_thread_mutex_unlock(m->lock);
}
}
apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error)
{
apr_status_t status;
AP_DEBUG_ASSERT(m);
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
/* there should be an h2_io, once the stream has been scheduled
* for processing, e.g. when we received all HEADERs. But when
* a stream is cancelled very early, it will not exist. */
if (io) {
io_stream_done(m, io, rst_error);
}
apr_thread_mutex_unlock(m->lock);
}
return status;
}
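/* Called by the worker side when the task for a stream has finished.
 * An io that the session side has already orphaned is destroyed here. */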
void h2_mplx_task_done(h2_mplx *m, int stream_id)
{
apr_status_t status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%ld): task(%d) done", m->id, stream_id);
if (io) {
io->task_done = 1;
if (io->orphaned) {
io_destroy(m, io);
}
else {
                /* hang around until the stream deregisters */
}
}
apr_thread_mutex_unlock(m->lock);
}
}
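/* Read request input for the stream into bb. With APR_BLOCK_READ, wait
 * on iowait until input arrives, the input is closed or the
 * multiplexer aborts. */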
apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
int stream_id, apr_bucket_brigade *bb,
struct apr_thread_cond_t *iowait)
{
apr_status_t status;
AP_DEBUG_ASSERT(m);
if (m->aborted) {
return APR_ECONNABORTED;
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io && !io->orphaned) {
io->input_arrived = iowait;
H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_pre");
status = h2_io_in_read(io, bb, -1);
while (APR_STATUS_IS_EAGAIN(status)
&& !is_aborted(m, &status)
&& block == APR_BLOCK_READ) {
apr_thread_cond_wait(io->input_arrived, m->lock);
status = h2_io_in_read(io, bb, -1);
}
H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_post");
io->input_arrived = NULL;
}
else {
status = APR_EOF;
}
apr_thread_mutex_unlock(m->lock);
}
return status;
}
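/* Append request body data from the session to the stream's input and
 * wake up any worker blocked in h2_mplx_in_read(). */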
apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id,
apr_bucket_brigade *bb)
{
apr_status_t status;
AP_DEBUG_ASSERT(m);
if (m->aborted) {
return APR_ECONNABORTED;
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io && !io->orphaned) {
H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_pre");
status = h2_io_in_write(io, bb);
H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_post");
if (io->input_arrived) {
apr_thread_cond_signal(io->input_arrived);
}
}
else {
status = APR_EOF;
}
apr_thread_mutex_unlock(m->lock);
}
return status;
}
apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id)
{
apr_status_t status;
AP_DEBUG_ASSERT(m);
if (m->aborted) {
return APR_ECONNABORTED;
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io && !io->orphaned) {
status = h2_io_in_close(io);
H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_close");
if (io->input_arrived) {
apr_thread_cond_signal(io->input_arrived);
}
}
else {
status = APR_ECONNABORTED;
}
apr_thread_mutex_unlock(m->lock);
}
return status;
}
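/* Iteration context for h2_mplx_in_update_windows(): passes consumed
 * input bytes per stream to the session callback and counts the
 * streams that had anything to report. */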
typedef struct {
h2_mplx_consumed_cb *cb;
void *cb_ctx;
int streams_updated;
} update_ctx;
static int update_window(void *ctx, h2_io *io)
{
if (io->input_consumed) {
update_ctx *uctx = (update_ctx*)ctx;
uctx->cb(uctx->cb_ctx, io->id, io->input_consumed);
io->input_consumed = 0;
++uctx->streams_updated;
}
return 1;
}
apr_status_t h2_mplx_in_update_windows(h2_mplx *m,
h2_mplx_consumed_cb *cb, void *cb_ctx)
{
apr_status_t status;
AP_DEBUG_ASSERT(m);
if (m->aborted) {
return APR_ECONNABORTED;
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
update_ctx ctx;
ctx.cb = cb;
ctx.cb_ctx = cb_ctx;
ctx.streams_updated = 0;
status = APR_EAGAIN;
h2_io_set_iter(m->stream_ios, update_window, &ctx);
if (ctx.streams_updated) {
status = APR_SUCCESS;
}
apr_thread_mutex_unlock(m->lock);
}
return status;
}
apr_status_t h2_mplx_out_readx(h2_mplx *m, int stream_id,
h2_io_data_cb *cb, void *ctx,
apr_off_t *plen, int *peos,
apr_table_t **ptrailers)
{
apr_status_t status;
AP_DEBUG_ASSERT(m);
if (m->aborted) {
return APR_ECONNABORTED;
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io && !io->orphaned) {
H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_readx_pre");
status = h2_io_out_readx(io, cb, ctx, plen, peos);
H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_readx_post");
if (status == APR_SUCCESS && cb && io->output_drained) {
apr_thread_cond_signal(io->output_drained);
}
}
else {
status = APR_ECONNABORTED;
}
        *ptrailers = (io && *peos && io->response)? io->response->trailers : NULL;
apr_thread_mutex_unlock(m->lock);
}
return status;
}
apr_status_t h2_mplx_out_read_to(h2_mplx *m, int stream_id,
apr_bucket_brigade *bb,
apr_off_t *plen, int *peos,
apr_table_t **ptrailers)
{
apr_status_t status;
AP_DEBUG_ASSERT(m);
if (m->aborted) {
return APR_ECONNABORTED;
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io && !io->orphaned) {
H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_read_to_pre");
status = h2_io_out_read_to(io, bb, plen, peos);
H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_read_to_post");
if (status == APR_SUCCESS && io->output_drained) {
apr_thread_cond_signal(io->output_drained);
}
}
else {
status = APR_ECONNABORTED;
}
        *ptrailers = (io && *peos && io->response)? io->response->trailers : NULL;
apr_thread_mutex_unlock(m->lock);
}
return status;
}
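/* Return the next stream with a response ready for submission, handing
 * the response over to the stream. An io whose stream has already gone
 * away is reset or destroyed instead. Returns NULL if nothing is ready. */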
h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_stream_set *streams)
{
apr_status_t status;
h2_stream *stream = NULL;
AP_DEBUG_ASSERT(m);
if (m->aborted) {
return NULL;
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
h2_io *io = h2_io_set_pop_highest_prio(m->ready_ios);
if (io) {
stream = h2_stream_set_get(streams, io->id);
if (stream) {
if (io->rst_error) {
h2_stream_rst(stream, io->rst_error);
}
else {
AP_DEBUG_ASSERT(io->response);
H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_next_submit_pre");
h2_stream_set_response(stream, io->response, io->bbout);
H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_next_submit_post");
}
}
else {
/* We have the io ready, but the stream has gone away, maybe
* reset by the client. Should no longer happen since such
* streams should clear io's from the ready queue.
*/
ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c, APLOGNO(02953)
"h2_mplx(%ld): stream for response %d closed, "
"resetting io to close request processing",
m->id, io->id);
                io->orphaned = 1;
                if (io->task_done) {
                    io_destroy(m, io);
                    io = NULL; /* destroyed, must not be touched below */
                }
                else {
                    /* hang around until the h2_task is done */
                    h2_io_rst(io, H2_ERR_STREAM_CLOSED);
                }
            }
            if (io && io->output_drained) {
                apr_thread_cond_signal(io->output_drained);
            }
}
apr_thread_mutex_unlock(m->lock);
}
return stream;
}
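/* Move bb into the stream's output, blocking on iowait whenever the
 * buffered output length reaches m->stream_max_mem. m->lock must be
 * held; the condition wait releases it while sleeping. */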
static apr_status_t out_write(h2_mplx *m, h2_io *io,
ap_filter_t* f, apr_bucket_brigade *bb,
apr_table_t *trailers,
struct apr_thread_cond_t *iowait)
{
apr_status_t status = APR_SUCCESS;
/* We check the memory footprint queued for this stream_id
* and block if it exceeds our configured limit.
* We will not split buckets to enforce the limit to the last
* byte. After all, the bucket is already in memory.
*/
while (!APR_BRIGADE_EMPTY(bb)
&& (status == APR_SUCCESS)
&& !is_aborted(m, &status)) {
status = h2_io_out_write(io, bb, m->stream_max_mem, trailers,
&m->file_handles_allowed);
/* Wait for data to drain until there is room again */
while (!APR_BRIGADE_EMPTY(bb)
&& iowait
&& status == APR_SUCCESS
&& (m->stream_max_mem <= h2_io_out_length(io))
&& !is_aborted(m, &status)) {
trailers = NULL;
io->output_drained = iowait;
if (f) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
"h2_mplx(%ld-%d): waiting for out drain",
m->id, io->id);
}
apr_thread_cond_wait(io->output_drained, m->lock);
io->output_drained = NULL;
}
}
apr_brigade_cleanup(bb);
return status;
}
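/* Set the response on the io and place it in the ready set for
 * submission by the session thread; write any initial output in bb.
 * m->lock must be held. */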
static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response,
ap_filter_t* f, apr_bucket_brigade *bb,
struct apr_thread_cond_t *iowait)
{
apr_status_t status = APR_SUCCESS;
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io && !io->orphaned) {
if (f) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
"h2_mplx(%ld-%d): open response: %d, rst=%d",
m->id, stream_id, response->http_status,
response->rst_error);
}
h2_io_set_response(io, response);
h2_io_set_add(m->ready_ios, io);
if (bb) {
status = out_write(m, io, f, bb, response->trailers, iowait);
}
have_out_data_for(m, stream_id);
}
else {
status = APR_ECONNABORTED;
}
return status;
}
apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response,
ap_filter_t* f, apr_bucket_brigade *bb,
struct apr_thread_cond_t *iowait)
{
apr_status_t status;
AP_DEBUG_ASSERT(m);
if (m->aborted) {
return APR_ECONNABORTED;
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
status = out_open(m, stream_id, response, f, bb, iowait);
if (APLOGctrace1(m->c)) {
h2_util_bb_log(m->c, stream_id, APLOG_TRACE1, "h2_mplx_out_open", bb);
}
        if (m->aborted) {
            status = APR_ECONNABORTED;
        }
        apr_thread_mutex_unlock(m->lock);
}
return status;
}
apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id,
ap_filter_t* f, apr_bucket_brigade *bb,
apr_table_t *trailers,
struct apr_thread_cond_t *iowait)
{
apr_status_t status;
AP_DEBUG_ASSERT(m);
if (m->aborted) {
return APR_ECONNABORTED;
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
if (!m->aborted) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io && !io->orphaned) {
status = out_write(m, io, f, bb, trailers, iowait);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
"h2_mplx(%ld-%d): write with trailers=%s",
m->id, io->id, trailers? "yes" : "no");
H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_write");
have_out_data_for(m, stream_id);
                if (m->aborted) {
                    status = APR_ECONNABORTED;
                }
}
else {
status = APR_ECONNABORTED;
}
}
        apr_thread_mutex_unlock(m->lock);
}
return status;
}
apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id, apr_table_t *trailers)
{
apr_status_t status;
AP_DEBUG_ASSERT(m);
if (m->aborted) {
return APR_ECONNABORTED;
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
if (!m->aborted) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io && !io->orphaned) {
if (!io->response && !io->rst_error) {
/* In case a close comes before a response was created,
* insert an error one so that our streams can properly
* reset.
*/
h2_response *r = h2_response_die(stream_id, APR_EGENERAL,
io->request, m->pool);
status = out_open(m, stream_id, r, NULL, NULL, NULL);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
"h2_mplx(%ld-%d): close, no response, no rst",
m->id, io->id);
}
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
"h2_mplx(%ld-%d): close with trailers=%s",
m->id, io->id, trailers? "yes" : "no");
status = h2_io_out_close(io, trailers);
H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close");
have_out_data_for(m, stream_id);
                if (m->aborted) {
                    /* if we were the last output, the whole session might
                     * have gone down in the meantime. Still report success,
                     * but do not leave with m->lock held. */
                    status = APR_SUCCESS;
                }
}
else {
status = APR_ECONNABORTED;
}
}
apr_thread_mutex_unlock(m->lock);
}
return status;
}
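/* Reset the stream's output with the given HTTP/2 error code. An io
 * without a response yet is placed in the ready set so the session
 * picks up the reset. */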
apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error)
{
apr_status_t status;
AP_DEBUG_ASSERT(m);
if (m->aborted) {
return APR_ECONNABORTED;
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
if (!m->aborted) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io && !io->rst_error && !io->orphaned) {
h2_io_rst(io, error);
if (!io->response) {
h2_io_set_add(m->ready_ios, io);
}
H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_rst");
have_out_data_for(m, stream_id);
if (io->output_drained) {
apr_thread_cond_signal(io->output_drained);
}
}
else {
status = APR_ECONNABORTED;
}
}
apr_thread_mutex_unlock(m->lock);
}
return status;
}
int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id)
{
int has_eos = 0;
apr_status_t status;
AP_DEBUG_ASSERT(m);
if (m->aborted) {
return 0;
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io) {
has_eos = io->orphaned || h2_io_in_has_eos_for(io);
}
apr_thread_mutex_unlock(m->lock);
}
return has_eos;
}
int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id)
{
apr_status_t status;
int has_data = 0;
AP_DEBUG_ASSERT(m);
if (m->aborted) {
return 0;
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io) {
has_data = h2_io_out_has_data(io);
}
apr_thread_mutex_unlock(m->lock);
}
return has_data;
}
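/* Let the session wait up to timeout for any stream to produce new
 * output; iowait is the condition that have_out_data_for() signals. */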
apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
apr_thread_cond_t *iowait)
{
apr_status_t status;
AP_DEBUG_ASSERT(m);
if (m->aborted) {
return APR_ECONNABORTED;
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
m->added_output = iowait;
status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
if (APLOGctrace2(m->c)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%ld): trywait on data for %f ms)",
m->id, timeout/1000.0);
}
m->added_output = NULL;
apr_thread_mutex_unlock(m->lock);
}
return status;
}
static void have_out_data_for(h2_mplx *m, int stream_id)
{
(void)stream_id;
AP_DEBUG_ASSERT(m);
if (m->added_output) {
apr_thread_cond_signal(m->added_output);
}
}
apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
{
apr_status_t status;
AP_DEBUG_ASSERT(m);
if (m->aborted) {
return APR_ECONNABORTED;
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
h2_tq_sort(m->q, cmp, ctx);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): reprioritize tasks", m->id);
apr_thread_mutex_unlock(m->lock);
}
workers_register(m);
return status;
}
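/* Create the io for a new stream, reusing the spare pool left behind
 * by a previously destroyed io when available. m->lock must be held. */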
static h2_io *open_io(h2_mplx *m, int stream_id)
{
apr_pool_t *io_pool = m->spare_pool;
h2_io *io;
if (!io_pool) {
apr_pool_create(&io_pool, m->pool);
}
else {
m->spare_pool = NULL;
}
io = h2_io_create(stream_id, io_pool, m->bucket_alloc);
h2_io_set_add(m->stream_ios, io);
return io;
}
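/* Schedule a stream's request for processing: create its io and insert
 * the stream id into the task queue according to the given priority
 * comparator, then make sure workers are registered. */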
apr_status_t h2_mplx_process(h2_mplx *m, int stream_id,
const h2_request *req, int eos,
h2_stream_pri_cmp *cmp, void *ctx)
{
apr_status_t status;
AP_DEBUG_ASSERT(m);
if (m->aborted) {
return APR_ECONNABORTED;
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
h2_io *io = open_io(m, stream_id);
io->request = req;
io->request_body = !eos;
if (eos) {
status = h2_io_in_close(io);
}
h2_tq_add(m->q, io->id, cmp, ctx);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
"h2_mplx(%ld-%d): process", m->c->id, stream_id);
H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_process");
apr_thread_mutex_unlock(m->lock);
}
if (status == APR_SUCCESS) {
workers_register(m);
}
return status;
}
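/* Called by workers to fetch the next task to execute. Sets *has_more
 * when further tasks are queued; returns NULL when the queue is empty
 * or the multiplexer is aborted. */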
h2_task *h2_mplx_pop_task(h2_mplx *m, h2_worker *w, int *has_more)
{
h2_task *task = NULL;
apr_status_t status;
AP_DEBUG_ASSERT(m);
if (m->aborted) {
*has_more = 0;
return NULL;
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
int sid;
while (!task && (sid = h2_tq_shift(m->q)) > 0) {
            /* Anything not already set up correctly in the task
             * needs to be done now, as the task will be executed right
             * after this method returns. */
h2_io *io = h2_io_set_get(m->stream_ios, sid);
if (io) {
task = h2_worker_create_task(w, m, io->request, !io->request_body);
}
}
*has_more = !h2_tq_empty(m->q);
apr_thread_mutex_unlock(m->lock);
}
return task;
}