| /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <assert.h> |
| #include <stddef.h> |
| #include <stdlib.h> |
| |
| #include <apr_thread_mutex.h> |
| #include <apr_thread_cond.h> |
| #include <apr_strings.h> |
| #include <apr_time.h> |
| |
| #include <httpd.h> |
| #include <http_core.h> |
| #include <http_log.h> |
| |
| #include "mod_http2.h" |
| |
| #include "h2_private.h" |
| #include "h2.h" |
| #include "h2_config.h" |
| #include "h2_conn.h" |
| #include "h2_ctx.h" |
| #include "h2_h2.h" |
| #include "h2_mplx.h" |
| #include "h2_response.h" |
| #include "h2_request.h" |
| #include "h2_task.h" |
| #include "h2_util.h" |
| #include "h2_ngn_shed.h" |
| |
| |
| typedef struct h2_ngn_entry h2_ngn_entry; |
| struct h2_ngn_entry { |
| APR_RING_ENTRY(h2_ngn_entry) link; |
| h2_task *task; |
| }; |
| |
| #define H2_NGN_ENTRY_NEXT(e) APR_RING_NEXT((e), link) |
| #define H2_NGN_ENTRY_PREV(e) APR_RING_PREV((e), link) |
| #define H2_NGN_ENTRY_REMOVE(e) APR_RING_REMOVE((e), link) |
| |
| #define H2_REQ_ENTRIES_SENTINEL(b) APR_RING_SENTINEL((b), h2_ngn_entry, link) |
| #define H2_REQ_ENTRIES_EMPTY(b) APR_RING_EMPTY((b), h2_ngn_entry, link) |
| #define H2_REQ_ENTRIES_FIRST(b) APR_RING_FIRST(b) |
| #define H2_REQ_ENTRIES_LAST(b) APR_RING_LAST(b) |
| |
| #define H2_REQ_ENTRIES_INSERT_HEAD(b, e) do { \ |
| h2_ngn_entry *ap__b = (e); \ |
| APR_RING_INSERT_HEAD((b), ap__b, h2_ngn_entry, link); \ |
| } while (0) |
| |
| #define H2_REQ_ENTRIES_INSERT_TAIL(b, e) do { \ |
| h2_ngn_entry *ap__b = (e); \ |
| APR_RING_INSERT_TAIL((b), ap__b, h2_ngn_entry, link); \ |
| } while (0) |
| |
| struct h2_req_engine { |
| const char *id; /* identifier */ |
| const char *type; /* name of the engine type */ |
| apr_pool_t *pool; /* pool for engine specific allocations */ |
| conn_rec *c; /* connection this engine is assigned to */ |
| h2_task *task; /* the task this engine is base on, running in */ |
| h2_ngn_shed *shed; |
| |
| unsigned int shutdown : 1; /* engine is being shut down */ |
| unsigned int done : 1; /* engine has finished */ |
| |
| APR_RING_HEAD(h2_req_entries, h2_ngn_entry) entries; |
| apr_uint32_t capacity; /* maximum concurrent requests */ |
| apr_uint32_t no_assigned; /* # of assigned requests */ |
| apr_uint32_t no_live; /* # of live */ |
| apr_uint32_t no_finished; /* # of finished */ |
| |
| h2_output_consumed *out_consumed; |
| void *out_consumed_ctx; |
| }; |
| |
| const char *h2_req_engine_get_id(h2_req_engine *engine) |
| { |
| return engine->id; |
| } |
| |
| int h2_req_engine_is_shutdown(h2_req_engine *engine) |
| { |
| return engine->shutdown; |
| } |
| |
| void h2_req_engine_out_consumed(h2_req_engine *engine, conn_rec *c, |
| apr_off_t bytes) |
| { |
| if (engine->out_consumed) { |
| engine->out_consumed(engine->out_consumed_ctx, c, bytes); |
| } |
| } |
| |
| h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c, |
| apr_uint32_t default_capacity, |
| apr_uint32_t req_buffer_size) |
| { |
| h2_ngn_shed *shed; |
| |
| shed = apr_pcalloc(pool, sizeof(*shed)); |
| shed->c = c; |
| shed->pool = pool; |
| shed->default_capacity = default_capacity; |
| shed->req_buffer_size = req_buffer_size; |
| shed->ngns = apr_hash_make(pool); |
| |
| return shed; |
| } |
| |
| void h2_ngn_shed_set_ctx(h2_ngn_shed *shed, void *user_ctx) |
| { |
| shed->user_ctx = user_ctx; |
| } |
| |
| void *h2_ngn_shed_get_ctx(h2_ngn_shed *shed) |
| { |
| return shed->user_ctx; |
| } |
| |
| h2_ngn_shed *h2_ngn_shed_get_shed(h2_req_engine *ngn) |
| { |
| return ngn->shed; |
| } |
| |
| void h2_ngn_shed_abort(h2_ngn_shed *shed) |
| { |
| ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c, APLOGNO(03394) |
| "h2_ngn_shed(%ld): abort", shed->c->id); |
| shed->aborted = 1; |
| } |
| |
| static void ngn_add_task(h2_req_engine *ngn, h2_task *task) |
| { |
| h2_ngn_entry *entry = apr_pcalloc(task->pool, sizeof(*entry)); |
| APR_RING_ELEM_INIT(entry, link); |
| entry->task = task; |
| H2_REQ_ENTRIES_INSERT_TAIL(&ngn->entries, entry); |
| } |
| |
| |
| apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type, |
| h2_task *task, http2_req_engine_init *einit) |
| { |
| h2_req_engine *ngn; |
| |
| AP_DEBUG_ASSERT(shed); |
| |
| ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c, |
| "h2_ngn_shed(%ld): PUSHing request (task=%s)", shed->c->id, |
| task->id); |
| if (task->ser_headers) { |
| /* Max compatibility, deny processing of this */ |
| return APR_EOF; |
| } |
| |
| ngn = apr_hash_get(shed->ngns, ngn_type, APR_HASH_KEY_STRING); |
| if (ngn && !ngn->shutdown) { |
| /* this task will be processed in another thread, |
| * freeze any I/O for the time being. */ |
| ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, |
| "h2_ngn_shed(%ld): pushing request %s to %s", |
| shed->c->id, task->id, ngn->id); |
| if (!h2_task_is_detached(task)) { |
| h2_task_freeze(task); |
| } |
| /* FIXME: sometimes ngn is garbage, probly alread freed */ |
| ngn_add_task(ngn, task); |
| ngn->no_assigned++; |
| return APR_SUCCESS; |
| } |
| |
| /* no existing engine or being shut down, start a new one */ |
| if (einit) { |
| apr_status_t status; |
| apr_pool_t *pool = task->pool; |
| h2_req_engine *newngn; |
| |
| newngn = apr_pcalloc(pool, sizeof(*ngn)); |
| newngn->pool = pool; |
| newngn->id = apr_psprintf(pool, "ngn-%s", task->id); |
| newngn->type = apr_pstrdup(pool, ngn_type); |
| newngn->c = task->c; |
| newngn->shed = shed; |
| newngn->capacity = shed->default_capacity; |
| newngn->no_assigned = 1; |
| newngn->no_live = 1; |
| APR_RING_INIT(&newngn->entries, h2_ngn_entry, link); |
| |
| status = einit(newngn, newngn->id, newngn->type, newngn->pool, |
| shed->req_buffer_size, task->r, |
| &newngn->out_consumed, &newngn->out_consumed_ctx); |
| ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, APLOGNO(03395) |
| "h2_ngn_shed(%ld): create engine %s (%s)", |
| shed->c->id, newngn->id, newngn->type); |
| if (status == APR_SUCCESS) { |
| AP_DEBUG_ASSERT(task->engine == NULL); |
| newngn->task = task; |
| task->engine = newngn; |
| task->assigned = newngn; |
| apr_hash_set(shed->ngns, newngn->type, APR_HASH_KEY_STRING, newngn); |
| } |
| return status; |
| } |
| return APR_EOF; |
| } |
| |
| static h2_ngn_entry *pop_detached(h2_req_engine *ngn) |
| { |
| h2_ngn_entry *entry; |
| for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries); |
| entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries); |
| entry = H2_NGN_ENTRY_NEXT(entry)) { |
| if (h2_task_is_detached(entry->task) |
| || (entry->task->engine == ngn)) { |
| /* The task hosting this engine can always be pulled by it. |
| * For other task, they need to become detached, e.g. no longer |
| * assigned to another worker. */ |
| H2_NGN_ENTRY_REMOVE(entry); |
| return entry; |
| } |
| } |
| return NULL; |
| } |
| |
| apr_status_t h2_ngn_shed_pull_task(h2_ngn_shed *shed, |
| h2_req_engine *ngn, |
| apr_uint32_t capacity, |
| int want_shutdown, |
| h2_task **ptask) |
| { |
| h2_ngn_entry *entry; |
| |
| AP_DEBUG_ASSERT(ngn); |
| *ptask = NULL; |
| ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c, APLOGNO(03396) |
| "h2_ngn_shed(%ld): pull task for engine %s, shutdown=%d", |
| shed->c->id, ngn->id, want_shutdown); |
| if (shed->aborted) { |
| ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c, APLOGNO(03397) |
| "h2_ngn_shed(%ld): abort while pulling requests %s", |
| shed->c->id, ngn->id); |
| ngn->shutdown = 1; |
| return APR_ECONNABORTED; |
| } |
| |
| ngn->capacity = capacity; |
| if (H2_REQ_ENTRIES_EMPTY(&ngn->entries)) { |
| if (want_shutdown) { |
| ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c, |
| "h2_ngn_shed(%ld): emtpy queue, shutdown engine %s", |
| shed->c->id, ngn->id); |
| ngn->shutdown = 1; |
| } |
| return ngn->shutdown? APR_EOF : APR_EAGAIN; |
| } |
| |
| if ((entry = pop_detached(ngn))) { |
| ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, entry->task->c, APLOGNO(03398) |
| "h2_ngn_shed(%ld): pulled request %s for engine %s", |
| shed->c->id, entry->task->id, ngn->id); |
| ngn->no_live++; |
| *ptask = entry->task; |
| entry->task->assigned = ngn; |
| /* task will now run in ngn's own thread. Modules like lua |
| * seem to require the correct thread set in the conn_rec. |
| * See PR 59542. */ |
| if (entry->task->c && ngn->c) { |
| entry->task->c->current_thread = ngn->c->current_thread; |
| } |
| if (entry->task->engine == ngn) { |
| /* If an engine pushes its own base task, and then pulls |
| * it back to itself again, it needs to be thawed. |
| */ |
| h2_task_thaw(entry->task); |
| } |
| return APR_SUCCESS; |
| } |
| |
| if (1) { |
| h2_ngn_entry *entry = H2_REQ_ENTRIES_FIRST(&ngn->entries); |
| ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c, APLOGNO(03399) |
| "h2_ngn_shed(%ld): pull task, nothing, first task %s", |
| shed->c->id, entry->task->id); |
| } |
| return APR_EAGAIN; |
| } |
| |
| static apr_status_t ngn_done_task(h2_ngn_shed *shed, h2_req_engine *ngn, |
| h2_task *task, int waslive, int aborted) |
| { |
| ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c, APLOGNO(03400) |
| "h2_ngn_shed(%ld): task %s %s by %s", |
| shed->c->id, task->id, aborted? "aborted":"done", ngn->id); |
| ngn->no_finished++; |
| if (waslive) ngn->no_live--; |
| ngn->no_assigned--; |
| task->assigned = NULL; |
| |
| return APR_SUCCESS; |
| } |
| |
| apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed, |
| struct h2_req_engine *ngn, h2_task *task) |
| { |
| return ngn_done_task(shed, ngn, task, 1, 0); |
| } |
| |
| void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn) |
| { |
| if (ngn->done) { |
| return; |
| } |
| |
| if (!shed->aborted && !H2_REQ_ENTRIES_EMPTY(&ngn->entries)) { |
| h2_ngn_entry *entry; |
| ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c, |
| "h2_ngn_shed(%ld): exit engine %s (%s), " |
| "has still requests queued, shutdown=%d," |
| "assigned=%ld, live=%ld, finished=%ld", |
| shed->c->id, ngn->id, ngn->type, |
| ngn->shutdown, |
| (long)ngn->no_assigned, (long)ngn->no_live, |
| (long)ngn->no_finished); |
| for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries); |
| entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries); |
| entry = H2_NGN_ENTRY_NEXT(entry)) { |
| h2_task *task = entry->task; |
| ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c, |
| "h2_ngn_shed(%ld): engine %s has queued task %s, " |
| "frozen=%d, aborting", |
| shed->c->id, ngn->id, task->id, task->frozen); |
| ngn_done_task(shed, ngn, task, 0, 1); |
| } |
| } |
| if (!shed->aborted && (ngn->no_assigned > 1 || ngn->no_live > 1)) { |
| ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c, |
| "h2_ngn_shed(%ld): exit engine %s (%s), " |
| "assigned=%ld, live=%ld, finished=%ld", |
| shed->c->id, ngn->id, ngn->type, |
| (long)ngn->no_assigned, (long)ngn->no_live, |
| (long)ngn->no_finished); |
| } |
| else { |
| ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c, |
| "h2_ngn_shed(%ld): exit engine %s", |
| shed->c->id, ngn->id); |
| } |
| |
| apr_hash_set(shed->ngns, ngn->type, APR_HASH_KEY_STRING, NULL); |
| ngn->done = 1; |
| } |