| /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <assert.h> |
| #include <stddef.h> |
| #include <stdlib.h> |
| |
| #include <apr_atomic.h> |
| #include <apr_thread_mutex.h> |
| #include <apr_thread_cond.h> |
| #include <apr_strings.h> |
| #include <apr_time.h> |
| |
| #include <httpd.h> |
| #include <http_core.h> |
| #include <http_log.h> |
| |
| #include "mod_http2.h" |
| |
| #include "h2.h" |
| #include "h2_private.h" |
| #include "h2_bucket_beam.h" |
| #include "h2_config.h" |
| #include "h2_conn.h" |
| #include "h2_ctx.h" |
| #include "h2_h2.h" |
| #include "h2_mplx.h" |
| #include "h2_ngn_shed.h" |
| #include "h2_request.h" |
| #include "h2_stream.h" |
| #include "h2_task.h" |
| #include "h2_worker.h" |
| #include "h2_workers.h" |
| #include "h2_util.h" |
| |
| |
| static void h2_beam_log(h2_bucket_beam *beam, int id, const char *msg, |
| conn_rec *c, int level) |
| { |
| if (beam && APLOG_C_IS_LEVEL(c,level)) { |
| char buffer[2048]; |
| apr_size_t off = 0; |
| |
| off += apr_snprintf(buffer+off, H2_ALEN(buffer)-off, "cl=%d, ", beam->closed); |
| off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "to_send", ", ", &beam->send_list); |
| off += h2_util_bb_print(buffer+off, H2_ALEN(buffer)-off, "recv_buffer", ", ", beam->recv_buffer); |
| off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "hold", ", ", &beam->hold_list); |
| off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "purge", "", &beam->purge_list); |
| |
| ap_log_cerror(APLOG_MARK, level, 0, c, "beam(%ld-%d): %s %s", |
| c->id, id, msg, buffer); |
| } |
| } |
| |
/* utility for iterating over ihash task sets */
typedef struct {
    h2_mplx *m;      /* the multiplexer whose task set is iterated */
    h2_task *task;   /* task selected so far by the iteration, if any */
    apr_time_t now;  /* time snapshot taken when the iteration started */
} task_iter_ctx;
| |
/* NULL or the mutex held by this thread; used to support recursive calls
 */
| static apr_threadkey_t *thread_lock; |
| |
| apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s) |
| { |
| return apr_threadkey_private_create(&thread_lock, NULL, pool); |
| } |
| |
/* Enter m->lock unless this thread already holds it.
 * @param pacquired set to 1 if the lock was acquired here (and must be
 *        released via leave_mutex), 0 if this thread already held it.
 */
static apr_status_t enter_mutex(h2_mplx *m, int *pacquired)
{
    apr_status_t status;
    void *mutex = NULL;
    
    /* Enter the mutex if this thread already holds the lock or
     * if we can acquire it. Only in the latter case do we unlock
     * on leaving the mutex.
     * This allows recursive entering of the mutex from the same thread,
     * which is what we need in certain situations involving callbacks
     */
    ap_assert(m);
    apr_threadkey_private_get(&mutex, thread_lock);
    if (mutex == m->lock) {
        *pacquired = 0;
        return APR_SUCCESS;
    }

    ap_assert(m->lock);
    status = apr_thread_mutex_lock(m->lock);
    *pacquired = (status == APR_SUCCESS);
    if (*pacquired) {
        /* remember ownership in thread-private storage so that nested
         * enter_mutex() calls on this thread do not deadlock */
        apr_threadkey_private_set(m->lock, thread_lock);
    }
    return status;
}
| |
| static void leave_mutex(h2_mplx *m, int acquired) |
| { |
| if (acquired) { |
| apr_threadkey_private_set(NULL, thread_lock); |
| apr_thread_mutex_unlock(m->lock); |
| } |
| } |
| |
| static void beam_leave(void *ctx, apr_thread_mutex_t *lock) |
| { |
| leave_mutex(ctx, 1); |
| } |
| |
| static apr_status_t beam_enter(void *ctx, h2_beam_lock *pbl) |
| { |
| h2_mplx *m = ctx; |
| int acquired; |
| apr_status_t status; |
| |
| status = enter_mutex(m, &acquired); |
| if (status == APR_SUCCESS) { |
| pbl->mutex = m->lock; |
| pbl->leave = acquired? beam_leave : NULL; |
| pbl->leave_ctx = m; |
| } |
| return status; |
| } |
| |
| static void stream_output_consumed(void *ctx, |
| h2_bucket_beam *beam, apr_off_t length) |
| { |
| h2_task *task = ctx; |
| if (length > 0 && task && task->assigned) { |
| h2_req_engine_out_consumed(task->assigned, task->c, length); |
| } |
| } |
| |
| static void stream_input_ev(void *ctx, h2_bucket_beam *beam) |
| { |
| h2_mplx *m = ctx; |
| apr_atomic_set32(&m->event_pending, 1); |
| } |
| |
| static void stream_input_consumed(void *ctx, |
| h2_bucket_beam *beam, apr_off_t length) |
| { |
| h2_mplx *m = ctx; |
| if (m->input_consumed && length) { |
| m->input_consumed(m->input_consumed_ctx, beam->id, length); |
| } |
| } |
| |
| static int can_beam_file(void *ctx, h2_bucket_beam *beam, apr_file_t *file) |
| { |
| h2_mplx *m = ctx; |
| if (m->tx_handles_reserved > 0) { |
| --m->tx_handles_reserved; |
| ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, |
| "h2_mplx(%ld-%d): beaming file %s, tx_avail %d", |
| m->id, beam->id, beam->tag, m->tx_handles_reserved); |
| return 1; |
| } |
| ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, |
| "h2_mplx(%ld-%d): can_beam_file denied on %s", |
| m->id, beam->id, beam->tag); |
| return 0; |
| } |
| |
| static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response); |
| static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master); |
| |
| static void check_tx_reservation(h2_mplx *m) |
| { |
| if (m->tx_handles_reserved <= 0) { |
| m->tx_handles_reserved += h2_workers_tx_reserve(m->workers, |
| H2MIN(m->tx_chunk_size, h2_ihash_count(m->tasks))); |
| } |
| } |
| |
| static void check_tx_free(h2_mplx *m) |
| { |
| if (m->tx_handles_reserved > m->tx_chunk_size) { |
| apr_size_t count = m->tx_handles_reserved - m->tx_chunk_size; |
| m->tx_handles_reserved = m->tx_chunk_size; |
| h2_workers_tx_free(m->workers, count); |
| } |
| else if (m->tx_handles_reserved && h2_ihash_empty(m->tasks)) { |
| h2_workers_tx_free(m->workers, m->tx_handles_reserved); |
| m->tx_handles_reserved = 0; |
| } |
| } |
| |
/* h2_ihash_iter callback: fully dispose one stream collected in m->spurge.
 * Always returns 0 so the caller restarts iteration, since destroying
 * entries invalidates the ongoing iteration. */
static int purge_stream(void *ctx, void *val) 
{
    h2_mplx *m = ctx;
    h2_stream *stream = val;
    int stream_id = stream->id;  /* copy before the stream is destroyed */
    h2_task *task;

    /* stream_cleanup clears all buffers and destroys any buckets 
     * that might hold references into task space. Needs to be done 
     * before task destruction, otherwise it will complain. */
    h2_stream_cleanup(stream);
    
    task = h2_ihash_get(m->tasks, stream_id);
    if (task) {
        task_destroy(m, task, 1);
    }
    
    h2_stream_destroy(stream);
    h2_ihash_remove(m->spurge, stream_id);
    return 0;
}
| |
| static void purge_streams(h2_mplx *m) |
| { |
| if (!h2_ihash_empty(m->spurge)) { |
| while(!h2_ihash_iter(m->spurge, purge_stream, m)) { |
| /* repeat until empty */ |
| } |
| h2_ihash_clear(m->spurge); |
| } |
| } |
| |
/* Tear down the mplx: return reserved tx handles to the workers,
 * destroy any spare slave connections kept for reuse, then destroy the
 * mplx' own pool — which frees m itself and its allocator. */
static void h2_mplx_destroy(h2_mplx *m)
{
    conn_rec **pslave;
    ap_assert(m);
    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                  "h2_mplx(%ld): destroy, tasks=%d", 
                  m->id, (int)h2_ihash_count(m->tasks));
    check_tx_free(m);
    
    while (m->spare_slaves->nelts > 0) {
        pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
        h2_slave_destroy(*pslave);
    }
    if (m->pool) {
        /* also destroys the allocator (owner was set in h2_mplx_create) */
        apr_pool_destroy(m->pool);
    }
}
| |
/**
 * A h2_mplx needs to be thread-safe *and* will be called by both
 * the h2_session thread *and* the h2_worker threads. Therefore:
 * - calls are protected by a mutex lock, m->lock
 * - the pool needs its own allocator, since apr_allocator_t are
 *   not re-entrant. The separate allocator works without a
 *   separate lock since we already protect h2_mplx itself.
 *   Since HTTP/2 connections can be expected to live longer than
 *   their HTTP/1 cousins, the separate allocator seems to work better
 *   than protecting a shared h2_session one with its own lock.
 */
| h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, |
| const h2_config *conf, |
| apr_interval_time_t stream_timeout, |
| h2_workers *workers) |
| { |
| apr_status_t status = APR_SUCCESS; |
| apr_allocator_t *allocator = NULL; |
| h2_mplx *m; |
| ap_assert(conf); |
| |
| status = apr_allocator_create(&allocator); |
| if (status != APR_SUCCESS) { |
| return NULL; |
| } |
| |
| m = apr_pcalloc(parent, sizeof(h2_mplx)); |
| if (m) { |
| m->id = c->id; |
| APR_RING_ELEM_INIT(m, link); |
| m->c = c; |
| apr_pool_create_ex(&m->pool, parent, NULL, allocator); |
| if (!m->pool) { |
| return NULL; |
| } |
| apr_pool_tag(m->pool, "h2_mplx"); |
| apr_allocator_owner_set(allocator, m->pool); |
| |
| status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT, |
| m->pool); |
| if (status != APR_SUCCESS) { |
| h2_mplx_destroy(m); |
| return NULL; |
| } |
| |
| status = apr_thread_cond_create(&m->task_thawed, m->pool); |
| if (status != APR_SUCCESS) { |
| h2_mplx_destroy(m); |
| return NULL; |
| } |
| |
| m->bucket_alloc = apr_bucket_alloc_create(m->pool); |
| m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS); |
| m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM); |
| |
| m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id)); |
| m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id)); |
| m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id)); |
| m->q = h2_iq_create(m->pool, m->max_streams); |
| m->readyq = h2_iq_create(m->pool, m->max_streams); |
| m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id)); |
| m->redo_tasks = h2_ihash_create(m->pool, offsetof(h2_task, stream_id)); |
| |
| m->stream_timeout = stream_timeout; |
| m->workers = workers; |
| m->workers_max = workers->max_workers; |
| m->workers_limit = 6; /* the original h1 max parallel connections */ |
| m->last_limit_change = m->last_idle_block = apr_time_now(); |
| m->limit_change_interval = apr_time_from_msec(200); |
| |
| m->tx_handles_reserved = 0; |
| m->tx_chunk_size = 4; |
| |
| m->spare_slaves = apr_array_make(m->pool, 10, sizeof(conn_rec*)); |
| |
| m->ngn_shed = h2_ngn_shed_create(m->pool, m->c, m->max_streams, |
| m->stream_max_mem); |
| h2_ngn_shed_set_ctx(m->ngn_shed , m); |
| } |
| return m; |
| } |
| |
| int h2_mplx_shutdown(h2_mplx *m) |
| { |
| int acquired, max_stream_started = 0; |
| |
| if (enter_mutex(m, &acquired) == APR_SUCCESS) { |
| max_stream_started = m->max_stream_started; |
| /* Clear schedule queue, disabling existing streams from starting */ |
| h2_iq_clear(m->q); |
| leave_mutex(m, acquired); |
| } |
| return max_stream_started; |
| } |
| |
| static int input_consumed_signal(h2_mplx *m, h2_stream *stream) |
| { |
| if (stream->input) { |
| return h2_beam_report_consumption(stream->input); |
| } |
| return 0; |
| } |
| |
| static int output_consumed_signal(h2_mplx *m, h2_task *task) |
| { |
| if (task->output.beam) { |
| return h2_beam_report_consumption(task->output.beam); |
| } |
| return 0; |
| } |
| |
| |
/* Destroy a finished task and deal with its slave connection: the
 * slave is kept for reuse when there is spare room and the task ended
 * without a reset; otherwise it is destroyed as well.
 * @param called_from_master != 0 iff running on the master connection
 *        thread, in which case pending input events are flushed first
 */
static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master)
{
    conn_rec *slave = NULL;
    int reuse_slave = 0;
    
    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
                  "h2_task(%s): destroy", task->id);
    if (called_from_master) {
        /* Process outstanding events before destruction */
        h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
        if (stream) {
            input_consumed_signal(m, stream);
        }
    }
    
    /* detach the produced callback before tearing the task down */
    h2_beam_on_produced(task->output.beam, NULL, NULL);
    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, 
                  APLOGNO(03385) "h2_task(%s): destroy "
                  "output beam empty=%d, holds proxies=%d", 
                  task->id,
                  h2_beam_empty(task->output.beam),
                  h2_beam_holds_proxies(task->output.beam));
    
    slave = task->c;
    /* reuse only when the spare list has room and the task was not reset */
    reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc)
                   && !task->rst_error);
    
    h2_ihash_remove(m->tasks, task->stream_id);
    h2_ihash_remove(m->redo_tasks, task->stream_id);
    h2_task_destroy(task);

    if (slave) {
        if (reuse_slave && slave->keepalive == AP_CONN_KEEPALIVE) {
            APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave;
        }
        else {
            slave->sbh = NULL;
            h2_slave_destroy(slave);
        }
    }
    
    /* possibly return now-surplus file handles to the workers */
    check_tx_free(m);
}
| |
/* Called (on the master connection) when processing of 'stream' is
 * finished. Detaches the stream from scheduling and lookup, shuts down
 * its input, and either destroys it right away or — when its task is
 * still running — parks it in m->shold for later cleanup.
 * @param rst_error the HTTP/2 error code if the stream was reset, else 0
 */
static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error) 
{
    h2_task *task;
    
    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
                  "h2_stream(%ld-%d): done", m->c->id, stream->id);
    /* Situation: we are, on the master connection, done with processing 
     * the stream. Either we have handled it successfully, or the stream 
     * was reset by the client or the connection is gone and we are 
     * shutting down the whole session.
     * 
     * We possibly have created a task for this stream to be processed 
     * on a slave connection. The processing might actually be ongoing 
     * right now or has already finished. A finished task waits for its 
     * stream to be done. This is the common case. 
     * 
     * If the stream had input (e.g. the request had a body), a task 
     * may have read, or is still reading buckets from the input beam. 
     * This means that the task is referencing memory from the stream's 
     * pool (or the master connection bucket alloc). Before we can free 
     * the stream pool, we need to make sure that those references are 
     * gone. This is what h2_beam_shutdown() on the input waits for.
     * 
     * With the input handled, we can tear down that beam and care 
     * about the output beam. The stream might still have buffered some
     * buckets read from the output, so we need to get rid of those. That
     * is done by h2_stream_cleanup().
     * 
     * Now it is safe to destroy the task (if it exists and is finished).
     * 
     * FIXME: we currently destroy the stream, even if the task is still
     * ongoing. This is not ok, since task->request is coming from stream
     * memory. We should either copy it on task creation or wait with the
     * stream destruction until the task is done. 
     */
    h2_iq_remove(m->q, stream->id);
    h2_ihash_remove(m->streams, stream->id);
    
    h2_stream_cleanup(stream);
    /* reclaim file handles that were beamed through the input */
    m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
    h2_beam_on_consumed(stream->input, NULL, NULL, NULL);
    /* Let anyone blocked reading know that there is no more to come */
    h2_beam_abort(stream->input);
    /* Remove mutex after, so that abort still finds cond to signal */
    h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
    m->tx_handles_reserved += h2_beam_get_files_beamed(stream->output);

    task = h2_ihash_get(m->tasks, stream->id);
    if (task) {
        if (!task->worker_done) {
            /* task still running, cleanup once it is done */
            if (rst_error) {
                h2_task_rst(task, rst_error);
            }
            h2_ihash_add(m->shold, stream);
            return;
        }
        else {
            /* already finished */
            task_destroy(m, task, 1);
        }
    }
    h2_stream_destroy(stream);
}
| |
| static int stream_done_iter(void *ctx, void *val) |
| { |
| stream_done((h2_mplx*)ctx, val, 0); |
| return 0; |
| } |
| |
/* adapter context: runs a public h2_mplx_stream_cb through h2_ihash_iter */
typedef struct {
    h2_mplx_stream_cb *cb;  /* user callback invoked per stream */
    void *ctx;              /* user context passed through unchanged */
} stream_iter_ctx_t;

/* h2_ihash_iter callback that unwraps the context and calls the user cb */
static int stream_iter_wrap(void *ctx, void *stream)
{
    stream_iter_ctx_t *x = ctx;
    return x->cb(stream, x->ctx);
}
| |
| apr_status_t h2_mplx_stream_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx) |
| { |
| apr_status_t status; |
| int acquired; |
| |
| if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { |
| stream_iter_ctx_t x; |
| x.cb = cb; |
| x.ctx = ctx; |
| h2_ihash_iter(m->streams, stream_iter_wrap, &x); |
| |
| leave_mutex(m, acquired); |
| } |
| return status; |
| } |
| |
| static int task_print(void *ctx, void *val) |
| { |
| h2_mplx *m = ctx; |
| h2_task *task = val; |
| |
| if (task) { |
| h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); |
| |
| ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */ |
| "->03198: h2_stream(%s): %s %s %s" |
| "[orph=%d/started=%d/done=%d/frozen=%d]", |
| task->id, task->request->method, |
| task->request->authority, task->request->path, |
| (stream? 0 : 1), task->worker_started, |
| task->worker_done, task->frozen); |
| } |
| else { |
| ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */ |
| "->03198: h2_stream(%ld-NULL): NULL", m->id); |
| } |
| return 1; |
| } |
| |
| static int task_abort_connection(void *ctx, void *val) |
| { |
| h2_task *task = val; |
| if (!task->worker_done) { |
| if (task->c) { |
| task->c->aborted = 1; |
| } |
| h2_beam_abort(task->input.beam); |
| h2_beam_abort(task->output.beam); |
| } |
| return 1; |
| } |
| |
/* ihash iterator: trace-log the state of a stream still on hold, used
 * for diagnostics while waiting for workers during shutdown */
static int report_stream_iter(void *ctx, void *val) {
    h2_mplx *m = ctx;
    h2_stream *stream = val;
    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                  "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, ready=%d", 
                  m->id, stream->id, stream->started, stream->scheduled,
                  h2_stream_is_ready(stream));
    return 1;
}
| |
| static int task_done_iter(void *ctx, void *val); |
| |
/* Shut down the mplx for good: abort everything, wait for all busy
 * workers to return their tasks, then destroy all remaining state and
 * finally the mplx itself. Runs on the master connection thread.
 * The numbered comments below describe the order-critical sequence.
 * @param wait condition variable used to block while workers finish
 */
apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
{
    apr_status_t status;
    int acquired;

    /* How to shut down a h2 connection:
     * 1. tell the workers that no more tasks will come from us */
    h2_workers_unregister(m->workers, m);
    
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
        int i, wait_secs = 60;

        /* 2. disable WINDOW_UPDATEs and set the mplx to aborted, clear
         *    our TODO list and purge any streams we have collected */
        h2_mplx_set_consumed_cb(m, NULL, NULL);
        h2_mplx_abort(m);
        h2_iq_clear(m->q);
        purge_streams(m);

        /* 3. wakeup all sleeping tasks. Mark all still active streams as 'done'. 
         *    m->streams has to be empty afterwards with streams either in
         *    a) m->shold because a task is still active
         *    b) m->spurge because task is done, or was not started */
        h2_ihash_iter(m->tasks, task_abort_connection, m);
        apr_thread_cond_broadcast(m->task_thawed);
        while (!h2_ihash_iter(m->streams, stream_done_iter, m)) {
            /* iterate until all streams have been removed */
        }
        ap_assert(h2_ihash_empty(m->streams));

        /* 4. purge all streams we collected by marking them 'done' */
        purge_streams(m);
        
        /* 5. while workers are busy on this connection, meaning they
         *    are processing tasks from this connection, wait on them finishing
         *    to wake us and check again. Eventually, this has to succeed. */    
        m->join_wait = wait;
        for (i = 0; m->workers_busy > 0; ++i) {
            status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
            
            if (APR_STATUS_IS_TIMEUP(status)) {
                /* This can happen if we have very long running requests
                 * that do not time out on IO. */
                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03198)
                              "h2_mplx(%ld): release, waiting for %d seconds now for "
                              "%d h2_workers to return, have still %d tasks outstanding", 
                              m->id, i*wait_secs, m->workers_busy,
                              (int)h2_ihash_count(m->tasks));
                h2_ihash_iter(m->shold, report_stream_iter, m);
                h2_ihash_iter(m->tasks, task_print, m);
            }
            purge_streams(m);
        }
        m->join_wait = NULL;
        
        /* 6. All workers for this connection are done, we are in 
         *    single-threaded processing now effectively. */
        leave_mutex(m, acquired);

        if (!h2_ihash_empty(m->tasks)) {
            /* when we are here, we lost track of the tasks still present.
             * this currently happens with mod_proxy_http2 when we shut
             * down a h2_req_engine with tasks assigned. Since no parallel
             * processing is going on any more, we just clean them up. */ 
            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
                          "h2_mplx(%ld): 3. release_join with %d tasks",
                          m->id, (int)h2_ihash_count(m->tasks));
            h2_ihash_iter(m->tasks, task_print, m);
            
            while (!h2_ihash_iter(m->tasks, task_done_iter, m)) {
                /* iterate until all tasks have been removed */
            }
        }

        /* 7. With all tasks done, the stream hold should be empty and all
         *    remaining streams are ready for purging */
        ap_assert(h2_ihash_empty(m->shold));
        purge_streams(m);
        
        /* 8. close the h2_req_enginge shed and self destruct */
        h2_ngn_shed_destroy(m->ngn_shed);
        m->ngn_shed = NULL;
        h2_mplx_destroy(m);
    }
    return status;
}
| |
/* Mark the mplx as aborted and abort the request-engine shed.
 * NOTE(review): m->aborted is tested before the mutex is taken; the
 * flag appears to only ever transition 0 -> 1, so a race here would at
 * worst take the lock once more — confirm no writer resets it. */
void h2_mplx_abort(h2_mplx *m)
{
    int acquired;
    
    if (!m->aborted && enter_mutex(m, &acquired) == APR_SUCCESS) {
        m->aborted = 1;
        h2_ngn_shed_abort(m->ngn_shed);
        leave_mutex(m, acquired);
    }
}
| |
| apr_status_t h2_mplx_stream_done(h2_mplx *m, h2_stream *stream) |
| { |
| apr_status_t status = APR_SUCCESS; |
| int acquired; |
| |
| if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { |
| ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, |
| "h2_mplx(%ld-%d): marking stream as done.", |
| m->id, stream->id); |
| stream_done(m, stream, stream->rst_error); |
| purge_streams(m); |
| leave_mutex(m, acquired); |
| } |
| return status; |
| } |
| |
| h2_stream *h2_mplx_stream_get(h2_mplx *m, int id) |
| { |
| h2_stream *s = NULL; |
| int acquired; |
| |
| if ((enter_mutex(m, &acquired)) == APR_SUCCESS) { |
| s = h2_ihash_get(m->streams, id); |
| leave_mutex(m, acquired); |
| } |
| return s; |
| } |
| |
| void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx) |
| { |
| m->input_consumed = cb; |
| m->input_consumed_ctx = ctx; |
| } |
| |
| static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes) |
| { |
| h2_mplx *m = ctx; |
| apr_status_t status; |
| h2_stream *stream; |
| int acquired; |
| |
| if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { |
| stream = h2_ihash_get(m->streams, beam->id); |
| if (stream) { |
| have_out_data_for(m, stream, 0); |
| } |
| leave_mutex(m, acquired); |
| } |
| } |
| |
/* Wire up the output beam of a stream whose task starts producing a
 * response: install consumed/produced callbacks, adjust the tx file
 * handle accounting, protect the beam with the mplx mutex and signal
 * that output exists. Caller must hold the mplx lock.
 * @return APR_ECONNABORTED when task or stream no longer exist
 */
static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
{
    apr_status_t status = APR_SUCCESS;
    h2_task *task = h2_ihash_get(m->tasks, stream_id);
    h2_stream *stream = h2_ihash_get(m->streams, stream_id);
    apr_size_t beamed_count;
    
    if (!task || !stream) {
        return APR_ECONNABORTED;
    }
    
    if (APLOGctrace2(m->c)) {
        h2_beam_log(beam, stream_id, "out_open", m->c, APLOG_TRACE2);
    }
    else {
        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, 
                      "h2_mplx(%s): out open", task->id);
    }
    
    h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, task);
    h2_beam_on_produced(stream->output, output_produced, m);
    /* account for file handles already beamed before this point */
    beamed_count = h2_beam_get_files_beamed(stream->output);
    if (m->tx_handles_reserved >= beamed_count) {
        m->tx_handles_reserved -= beamed_count;
    }
    else {
        m->tx_handles_reserved = 0;
    }
    if (!task->output.copy_files) {
        h2_beam_on_file_beam(stream->output, can_beam_file, m);
    }
    
    /* time to protect the beam against multi-threaded use */
    h2_beam_mutex_set(stream->output, beam_enter, task->cond, m);
    
    /* we might see some file buckets in the output, see
     * if we have enough handles reserved. */
    check_tx_reservation(m);
    have_out_data_for(m, stream, 0);
    return status;
}
| |
| apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) |
| { |
| apr_status_t status; |
| int acquired; |
| |
| if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { |
| if (m->aborted) { |
| status = APR_ECONNABORTED; |
| } |
| else { |
| status = out_open(m, stream_id, beam); |
| } |
| leave_mutex(m, acquired); |
| } |
| return status; |
| } |
| |
| static apr_status_t out_close(h2_mplx *m, h2_task *task) |
| { |
| apr_status_t status = APR_SUCCESS; |
| h2_stream *stream; |
| |
| if (!task) { |
| return APR_ECONNABORTED; |
| } |
| |
| stream = h2_ihash_get(m->streams, task->stream_id); |
| if (!stream) { |
| return APR_ECONNABORTED; |
| } |
| |
| ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, |
| "h2_mplx(%s): close", task->id); |
| status = h2_beam_close(task->output.beam); |
| h2_beam_log(task->output.beam, task->stream_id, "out_close", m->c, |
| APLOG_TRACE2); |
| output_consumed_signal(m, task); |
| have_out_data_for(m, stream, 0); |
| return status; |
| } |
| |
/* Wait (up to 'timeout') for output or events to become available.
 * Returns immediately with APR_SUCCESS when an event is already
 * pending, or APR_ECONNABORTED when the mplx is aborted; otherwise
 * blocks on 'iowait' until signaled by have_out_data_for() or timeout.
 */
apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
                                 apr_thread_cond_t *iowait)
{
    apr_status_t status;
    int acquired;
    
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
        if (m->aborted) {
            status = APR_ECONNABORTED;
        }
        else if (apr_atomic_read32(&m->event_pending) > 0) {
            status = APR_SUCCESS;
        }
        else {
            purge_streams(m);
            /* expose the cond so producers can signal us, clear after */
            m->added_output = iowait;
            status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
            if (APLOGctrace2(m->c)) {
                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                              "h2_mplx(%ld): trywait on data for %f ms)",
                              m->id, timeout/1000.0);
            }
            m->added_output = NULL;
        }
        leave_mutex(m, acquired);
    }
    return status;
}
| |
/* Record that output/data became available for 'stream': queue it in
 * the ready list, raise the event flag and wake a session thread that
 * may be blocked in h2_mplx_out_trywait().
 * The 'response' parameter is unused in the visible code. */
static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response)
{
    ap_assert(m);
    ap_assert(stream);
    h2_iq_append(m->readyq, stream->id);
    apr_atomic_set32(&m->event_pending, 1);
    if (m->added_output) {
        apr_thread_cond_signal(m->added_output);
    }
}
| |
| apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx) |
| { |
| apr_status_t status; |
| int acquired; |
| |
| if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { |
| if (m->aborted) { |
| status = APR_ECONNABORTED; |
| } |
| else { |
| h2_iq_sort(m->q, cmp, ctx); |
| ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, |
| "h2_mplx(%ld): reprioritize tasks", m->id); |
| } |
| leave_mutex(m, acquired); |
| } |
| return status; |
| } |
| |
/* Hand a new stream to the mplx for processing. Streams that already
 * have a response ready go straight to the ready queue; all others are
 * added to the schedule queue and, if workers are available, the mplx
 * registers itself with the worker pool.
 * @param cmp/ctx priority comparator used to insert into the queue
 */
apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, 
                             h2_stream_pri_cmp *cmp, void *ctx)
{
    apr_status_t status;
    int do_registration = 0;
    int acquired;
    
    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
        if (m->aborted) {
            status = APR_ECONNABORTED;
        }
        else {
            h2_ihash_add(m->streams, stream);
            if (h2_stream_is_ready(stream)) {
                /* already answered (e.g. from cache): no task needed */
                apr_atomic_set32(&m->event_pending, 1);
                h2_iq_append(m->readyq, stream->id);
            }
            else {
                if (!m->need_registration) {
                    m->need_registration = h2_iq_empty(m->q);
                }
                if (m->workers_busy < m->workers_max) {
                    do_registration = m->need_registration;
                }
                h2_iq_add(m->q, stream->id, cmp, ctx);
            }
            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
                          "h2_mplx(%ld-%d): process", m->c->id, stream->id);
        }
        leave_mutex(m, acquired);
    }
    if (do_registration) {
        /* NOTE(review): need_registration is cleared outside the mutex
         * here — presumably safe because only the master thread calls
         * h2_mplx_process, but confirm against other writers. */
        m->need_registration = 0;
        h2_workers_register(m->workers, m);
    }
    return status;
}
| |
/* Pop scheduled streams off the queue and create a task for the first
 * one that still exists, reusing a spare slave connection when one is
 * available. Caller must hold the mplx lock.
 * @return the newly created task, or NULL when the queue is empty, the
 *         mplx is aborted, or the worker limit is reached
 */
static h2_task *next_stream_task(h2_mplx *m)
{
    h2_task *task = NULL;
    h2_stream *stream;
    int sid;
    while (!m->aborted && !task && (m->workers_busy < m->workers_limit)
           && (sid = h2_iq_shift(m->q)) > 0) {
        
        stream = h2_ihash_get(m->streams, sid);
        if (stream) {
            conn_rec *slave, **pslave;
            int new_conn = 0;

            /* prefer a parked spare slave connection over creating one */
            pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
            if (pslave) {
                slave = *pslave;
            }
            else {
                slave = h2_slave_create(m->c, stream->id, m->pool);
                new_conn = 1;
            }
            
            slave->sbh = m->c->sbh;
            slave->aborted = 0;
            task = h2_task_create(slave, stream->id, stream->request, 
                                  stream->input, stream->output, m);
            h2_ihash_add(m->tasks, task);
            
            m->c->keepalives++;
            apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id);
            if (new_conn) {
                /* run pre-connection hooks only once per slave */
                h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave));
            }
            stream->started = 1;
            stream->can_be_cleaned = 0;
            task->worker_started = 1;
            task->started_at = apr_time_now();
            if (sid > m->max_stream_started) {
                m->max_stream_started = sid;
            }

            /* wire up the input beam: timeouts, consumption callbacks,
             * file-handle pass-through and mutex protection */
            h2_beam_timeout_set(stream->input, m->stream_timeout);
            h2_beam_on_consumed(stream->input, stream_input_ev, 
                                stream_input_consumed, m);
            h2_beam_on_file_beam(stream->input, can_beam_file, m);
            h2_beam_mutex_set(stream->input, beam_enter, task->cond, m);
            
            h2_beam_buffer_size_set(stream->output, m->stream_max_mem);
            h2_beam_timeout_set(stream->output, m->stream_timeout);
            ++m->workers_busy;
        }
    }
    return task;
}
| |
| h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more) |
| { |
| h2_task *task = NULL; |
| apr_status_t status; |
| int acquired; |
| |
| if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { |
| if (m->aborted) { |
| *has_more = 0; |
| } |
| else { |
| task = next_stream_task(m); |
| *has_more = !h2_iq_empty(m->q); |
| } |
| |
| if (has_more && !task) { |
| m->need_registration = 1; |
| } |
| leave_mutex(m, acquired); |
| } |
| return task; |
| } |
| |
/* Process the completion of 'task' by a worker (or engine): closes its
 * output, settles engine accounting, possibly re-schedules the task
 * (redo), and performs per-stream cleanup depending on whether the
 * stream is still open, parked in m->shold, or gone. Caller must hold
 * the mplx lock.
 * @param ngn the request engine the task ran on, or NULL
 */
static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
{
    if (task->frozen) {
        /* this task was handed over to an engine for processing 
         * and the original worker has finished. That means the 
         * engine may start processing now. */
        h2_task_thaw(task);
        apr_thread_cond_broadcast(m->task_thawed);
        return;
    }
    else {
        h2_stream *stream;
        
        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                      "h2_mplx(%ld): task(%s) done", m->id, task->id);
        out_close(m, task);
        
        if (ngn) {
            apr_off_t bytes = 0;
            h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
            bytes += h2_beam_get_buffered(task->output.beam);
            if (bytes > 0) {
                /* we need to report consumed and current buffered output
                 * to the engine. The request will be streamed out or cancelled,
                 * no more data is coming from it and the engine should update
                 * its calculations before we destroy this information. */
                h2_req_engine_out_consumed(ngn, task->c, bytes);
            }
        }
        
        if (task->engine) {
            /* this task owned an engine; warn if it was not shut down */
            if (!m->aborted && !task->c->aborted 
                && !h2_req_engine_is_shutdown(task->engine)) {
                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
                              "h2_mplx(%ld): task(%s) has not-shutdown "
                              "engine(%s)", m->id, task->id, 
                              h2_req_engine_get_id(task->engine));
            }
            h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
        }
        
        stream = h2_ihash_get(m->streams, task->stream_id);
        if (!m->aborted && stream 
            && h2_ihash_get(m->redo_tasks, task->stream_id)) {
            /* reset and schedule again */
            h2_task_redo(task);
            h2_ihash_remove(m->redo_tasks, task->stream_id);
            h2_iq_add(m->q, task->stream_id, NULL, NULL);
            return;
        }
        
        task->worker_done = 1;
        task->done_at = apr_time_now();
        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                      "h2_mplx(%s): request done, %f ms elapsed", task->id, 
                      (task->done_at - task->started_at) / 1000.0);
        if (task->started_at > m->last_idle_block) {
            /* this task finished without causing an 'idle block', e.g.
             * a block by flow control.
             */
            if (task->done_at- m->last_limit_change >= m->limit_change_interval
                && m->workers_limit < m->workers_max) {
                /* Well behaving stream, allow it more workers */
                m->workers_limit = H2MIN(m->workers_limit * 2, 
                                         m->workers_max);
                m->last_limit_change = task->done_at;
                m->need_registration = 1;
                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                              "h2_mplx(%ld): increase worker limit to %d",
                              m->id, m->workers_limit);
            }
        }
        
        if (stream) {
            /* hang around until the stream deregisters */
            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                          "h2_mplx(%s): task_done, stream still open", 
                          task->id);
            /* more data will not arrive, resume the stream */
            have_out_data_for(m, stream, 0);
            h2_beam_on_consumed(stream->output, NULL, NULL, NULL);
            h2_beam_mutex_set(stream->output, NULL, NULL, NULL);
        }
        else {
            /* stream no longer active, was it placed in hold? */
            stream = h2_ihash_get(m->shold, task->stream_id);
            if (stream) {
                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                              "h2_mplx(%s): task_done, stream %d in hold", 
                              task->id, stream->id);
                /* We cannot destroy the stream here since this is 
                 * called from a worker thread and freeing memory pools
                 * is only safe in the only thread using it (and its
                 * parent pool / allocator) */
                h2_beam_on_consumed(stream->output, NULL, NULL, NULL);
                h2_beam_mutex_set(stream->output, NULL, NULL, NULL);
                h2_ihash_remove(m->shold, stream->id);
                h2_ihash_add(m->spurge, stream);
            }
            else {
                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                              "h2_mplx(%s): task_done, stream not found", 
                              task->id);
                task_destroy(m, task, 0);
            }
        }
    }
}
| |
| static int task_done_iter(void *ctx, void *val) |
| { |
| task_done((h2_mplx*)ctx, val, 0); |
| return 0; |
| } |
| |
| void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) |
| { |
| int acquired; |
| |
| if (enter_mutex(m, &acquired) == APR_SUCCESS) { |
| task_done(m, task, NULL); |
| --m->workers_busy; |
| if (m->join_wait) { |
| apr_thread_cond_signal(m->join_wait); |
| } |
| if (ptask) { |
| /* caller wants another task */ |
| *ptask = next_stream_task(m); |
| } |
| leave_mutex(m, acquired); |
| } |
| } |
| |
| /******************************************************************************* |
| * h2_mplx DoS protection |
| ******************************************************************************/ |
| |
| static int latest_repeatable_unsubmitted_iter(void *data, void *val) |
| { |
| task_iter_ctx *ctx = data; |
| h2_stream *stream; |
| h2_task *task = val; |
| if (!task->worker_done && h2_task_can_redo(task) |
| && !h2_ihash_get(ctx->m->redo_tasks, task->stream_id)) { |
| stream = h2_ihash_get(ctx->m->streams, task->stream_id); |
| if (stream && !h2_stream_is_ready(stream)) { |
| /* this task occupies a worker, the response has not been submitted |
| * yet, not been cancelled and it is a repeatable request |
| * -> it can be re-scheduled later */ |
| if (!ctx->task || ctx->task->started_at < task->started_at) { |
| /* we did not have one or this one was started later */ |
| ctx->task = task; |
| } |
| } |
| } |
| return 1; |
| } |
| |
| static h2_task *get_latest_repeatable_unsubmitted_task(h2_mplx *m) |
| { |
| task_iter_ctx ctx; |
| ctx.m = m; |
| ctx.task = NULL; |
| h2_ihash_iter(m->tasks, latest_repeatable_unsubmitted_iter, &ctx); |
| return ctx.task; |
| } |
| |
| static int timed_out_busy_iter(void *data, void *val) |
| { |
| task_iter_ctx *ctx = data; |
| h2_task *task = val; |
| if (!task->worker_done |
| && (ctx->now - task->started_at) > ctx->m->stream_timeout) { |
| /* timed out stream occupying a worker, found */ |
| ctx->task = task; |
| return 0; |
| } |
| return 1; |
| } |
| |
| static h2_task *get_timed_out_busy_task(h2_mplx *m) |
| { |
| task_iter_ctx ctx; |
| ctx.m = m; |
| ctx.task = NULL; |
| ctx.now = apr_time_now(); |
| h2_ihash_iter(m->tasks, timed_out_busy_iter, &ctx); |
| return ctx.task; |
| } |
| |
| static apr_status_t unschedule_slow_tasks(h2_mplx *m) |
| { |
| h2_task *task; |
| int n; |
| |
| /* Try to get rid of streams that occupy workers. Look for safe requests |
| * that are repeatable. If none found, fail the connection. |
| */ |
| n = (m->workers_busy - m->workers_limit - (int)h2_ihash_count(m->redo_tasks)); |
| while (n > 0 && (task = get_latest_repeatable_unsubmitted_task(m))) { |
| h2_task_rst(task, H2_ERR_CANCEL); |
| h2_ihash_add(m->redo_tasks, task); |
| --n; |
| } |
| |
| if ((m->workers_busy - h2_ihash_count(m->redo_tasks)) > m->workers_limit) { |
| task = get_timed_out_busy_task(m); |
| if (task) { |
| /* Too many busy workers, unable to cancel enough streams |
| * and with a busy, timed out stream, we tell the client |
| * to go away... */ |
| return APR_TIMEUP; |
| } |
| } |
| return APR_SUCCESS; |
| } |
| |
| apr_status_t h2_mplx_idle(h2_mplx *m) |
| { |
| apr_status_t status = APR_SUCCESS; |
| apr_time_t now; |
| int acquired; |
| |
| if (enter_mutex(m, &acquired) == APR_SUCCESS) { |
| apr_size_t scount = h2_ihash_count(m->streams); |
| if (scount > 0 && m->workers_busy) { |
| /* If we have streams in connection state 'IDLE', meaning |
| * all streams are ready to sent data out, but lack |
| * WINDOW_UPDATEs. |
| * |
| * This is ok, unless we have streams that still occupy |
| * h2 workers. As worker threads are a scarce resource, |
| * we need to take measures that we do not get DoSed. |
| * |
| * This is what we call an 'idle block'. Limit the amount |
| * of busy workers we allow for this connection until it |
| * well behaves. |
| */ |
| now = apr_time_now(); |
| m->last_idle_block = now; |
| if (m->workers_limit > 2 |
| && now - m->last_limit_change >= m->limit_change_interval) { |
| if (m->workers_limit > 16) { |
| m->workers_limit = 16; |
| } |
| else if (m->workers_limit > 8) { |
| m->workers_limit = 8; |
| } |
| else if (m->workers_limit > 4) { |
| m->workers_limit = 4; |
| } |
| else if (m->workers_limit > 2) { |
| m->workers_limit = 2; |
| } |
| m->last_limit_change = now; |
| ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, |
| "h2_mplx(%ld): decrease worker limit to %d", |
| m->id, m->workers_limit); |
| } |
| |
| if (m->workers_busy > m->workers_limit) { |
| status = unschedule_slow_tasks(m); |
| } |
| } |
| leave_mutex(m, acquired); |
| } |
| return status; |
| } |
| |
| /******************************************************************************* |
| * HTTP/2 request engines |
| ******************************************************************************/ |
| |
/* Context for iterating all tasks assigned to a request engine while
 * reporting consumed output (updating flow-control windows). */
typedef struct {
    h2_mplx * m;            /* the multiplexer owning the tasks */
    h2_req_engine *ngn;     /* the engine whose tasks are of interest */
    int streams_updated;    /* number of tasks with consumption reported */
} ngn_update_ctx;
| |
| static int ngn_update_window(void *ctx, void *val) |
| { |
| ngn_update_ctx *uctx = ctx; |
| h2_task *task = val; |
| if (task && task->assigned == uctx->ngn |
| && output_consumed_signal(uctx->m, task)) { |
| ++uctx->streams_updated; |
| } |
| return 1; |
| } |
| |
| static apr_status_t ngn_out_update_windows(h2_mplx *m, h2_req_engine *ngn) |
| { |
| ngn_update_ctx ctx; |
| |
| ctx.m = m; |
| ctx.ngn = ngn; |
| ctx.streams_updated = 0; |
| h2_ihash_iter(m->tasks, ngn_update_window, &ctx); |
| |
| return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN; |
| } |
| |
| apr_status_t h2_mplx_req_engine_push(const char *ngn_type, |
| request_rec *r, |
| http2_req_engine_init *einit) |
| { |
| apr_status_t status; |
| h2_mplx *m; |
| h2_task *task; |
| int acquired; |
| |
| task = h2_ctx_rget_task(r); |
| if (!task) { |
| return APR_ECONNABORTED; |
| } |
| m = task->mplx; |
| |
| if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { |
| h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); |
| |
| if (stream) { |
| status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit); |
| } |
| else { |
| status = APR_ECONNABORTED; |
| } |
| leave_mutex(m, acquired); |
| } |
| return status; |
| } |
| |
/* Pull the next request for the given engine from its shed.
 * A blocking pull (APR_BLOCK_READ) requests engine shutdown when empty;
 * if requests are still queued on the mplx, retry once after waiting a
 * short while for a task to thaw before the terminating pull.
 * NOTE(review): assumes m->lock guards m->task_thawed as required by
 * apr_thread_cond_timedwait — confirm against the h2_mplx definition. */
apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn,
                                     apr_read_type_e block,
                                     int capacity,
                                     request_rec **pr)
{
    h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
    h2_mplx *m = h2_ngn_shed_get_ctx(shed);
    apr_status_t status;
    int acquired;

    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
        int want_shutdown = (block == APR_BLOCK_READ);

        /* Take this opportunity to update output consumption
         * for this engine */
        ngn_out_update_windows(m, ngn);

        if (want_shutdown && !h2_iq_empty(m->q)) {
            /* For a blocking read, check first if requests are to be
             * had and, if not, wait a short while before doing the
             * blocking, and if unsuccessful, terminating read.
             */
            status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
            if (APR_STATUS_IS_EAGAIN(status)) {
                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                              "h2_mplx(%ld): start block engine pull", m->id);
                /* wait up to 20 ms for a task to thaw, then pull again */
                apr_thread_cond_timedwait(m->task_thawed, m->lock,
                                          apr_time_from_msec(20));
                status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
            }
        }
        else {
            status = h2_ngn_shed_pull_request(shed, ngn, capacity,
                                              want_shutdown, pr);
        }
        leave_mutex(m, acquired);
    }
    return status;
}
| |
| void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn, |
| apr_status_t status) |
| { |
| h2_task *task = h2_ctx_cget_task(r_conn); |
| |
| if (task) { |
| h2_mplx *m = task->mplx; |
| int acquired; |
| |
| if (enter_mutex(m, &acquired) == APR_SUCCESS) { |
| ngn_out_update_windows(m, ngn); |
| h2_ngn_shed_done_task(m->ngn_shed, ngn, task); |
| if (status != APR_SUCCESS && h2_task_can_redo(task) |
| && !h2_ihash_get(m->redo_tasks, task->stream_id)) { |
| h2_ihash_add(m->redo_tasks, task); |
| } |
| if (task->engine) { |
| /* cannot report that as done until engine returns */ |
| } |
| else { |
| task_done(m, task, ngn); |
| } |
| /* Take this opportunity to update output consummation |
| * for this engine */ |
| leave_mutex(m, acquired); |
| } |
| } |
| } |
| |
| /******************************************************************************* |
| * mplx master events dispatching |
| ******************************************************************************/ |
| |
| int h2_mplx_has_master_events(h2_mplx *m) |
| { |
| return apr_atomic_read32(&m->event_pending) > 0; |
| } |
| |
/* h2_ihash_iter callback: signal consumed input for one stream.
 * ctx is the h2_mplx, val the h2_stream. Always returns 1 so the
 * iteration covers all streams. */
static int report_consumption_iter(void *ctx, void *val)
{
    input_consumed_signal(ctx, val);
    return 1;
}
| |
| apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, |
| stream_ev_callback *on_resume, |
| void *on_ctx) |
| { |
| apr_status_t status; |
| int acquired; |
| int ids[100]; |
| h2_stream *stream; |
| size_t i, n; |
| |
| if (!h2_mplx_has_master_events(m)) { |
| return APR_EAGAIN; |
| } |
| |
| if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { |
| ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, |
| "h2_mplx(%ld): dispatch events", m->id); |
| apr_atomic_set32(&m->event_pending, 0); |
| /* update input windows for streams */ |
| h2_ihash_iter(m->streams, report_consumption_iter, m); |
| |
| if (!h2_iq_empty(m->readyq)) { |
| n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids)); |
| for (i = 0; i < n; ++i) { |
| stream = h2_ihash_get(m->streams, ids[i]); |
| if (stream) { |
| on_resume(on_ctx, stream); |
| } |
| } |
| } |
| if (!h2_iq_empty(m->readyq)) { |
| apr_atomic_set32(&m->event_pending, 1); |
| } |
| leave_mutex(m, acquired); |
| } |
| return status; |
| } |
| |
| apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id) |
| { |
| apr_status_t status; |
| int acquired; |
| |
| if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { |
| h2_stream *s = h2_ihash_get(m->streams, stream_id); |
| if (s) { |
| h2_iq_append(m->readyq, stream_id); |
| apr_atomic_set32(&m->event_pending, 1); |
| } |
| leave_mutex(m, acquired); |
| } |
| return status; |
| } |
| |
| int h2_mplx_awaits_data(h2_mplx *m) |
| { |
| apr_status_t status; |
| int acquired, waiting = 1; |
| |
| if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { |
| if (h2_ihash_empty(m->streams)) { |
| waiting = 0; |
| } |
| if (h2_iq_empty(m->q) && h2_ihash_empty(m->tasks)) { |
| waiting = 0; |
| } |
| leave_mutex(m, acquired); |
| } |
| return waiting; |
| } |