/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <assert.h>
#include <apr_atomic.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>
#include <mpm_common.h>
#include <httpd.h>
#include <http_core.h>
#include <http_log.h>
#include "h2.h"
#include "h2_private.h"
#include "h2_mplx.h"
#include "h2_task.h"
#include "h2_worker.h"
#include "h2_workers.h"
static int in_list(h2_workers *workers, h2_mplx *m)
{
    h2_mplx *e;
    for (e = H2_MPLX_LIST_FIRST(&workers->mplxs);
         e != H2_MPLX_LIST_SENTINEL(&workers->mplxs);
         e = H2_MPLX_NEXT(e)) {
        if (e == m) {
            return 1;
        }
    }
    return 0;
}
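
/* Destroy all workers that have terminated and were parked on the zombie
 * list. When lock is non-zero, workers->lock is taken around the list
 * manipulation; callers that already hold the lock pass 0. */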
static void cleanup_zombies(h2_workers *workers, int lock)
{
    if (lock) {
        apr_thread_mutex_lock(workers->lock);
    }
    while (!H2_WORKER_LIST_EMPTY(&workers->zombies)) {
        h2_worker *zombie = H2_WORKER_LIST_FIRST(&workers->zombies);
        H2_WORKER_REMOVE(zombie);
        ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
                     "h2_workers: cleanup zombie %d", zombie->id);
        h2_worker_destroy(zombie);
    }
    if (lock) {
        apr_thread_mutex_unlock(workers->lock);
    }
}
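
/* Fetch the next task from the queue of registered h2_mplx, or NULL when
 * none of them currently has a task to hand out. */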
static h2_task *next_task(h2_workers *workers)
{
    h2_task *task = NULL;
    h2_mplx *last = NULL;
    int has_more;
    
    /* Get the next h2_mplx to process that has a task to hand out.
     * If it does, place it at the end of the queue and return the
     * task to the worker.
     * If it (currently) has no tasks, remove it so that it needs
     * to register again for scheduling.
     * If we run out of h2_mplx in the queue, we need to wait for
     * new mplx to arrive. Depending on how many workers exist,
     * we do a timed wait or block indefinitely.
     */
    while (!task && !H2_MPLX_LIST_EMPTY(&workers->mplxs)) {
        h2_mplx *m = H2_MPLX_LIST_FIRST(&workers->mplxs);
        
        if (last == m) {
            break;
        }
        H2_MPLX_REMOVE(m);
        --workers->mplx_count;
        
        task = h2_mplx_pop_task(m, &has_more);
        if (has_more) {
            H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m);
            ++workers->mplx_count;
            if (!last) {
                last = m;
            }
        }
    }
    return task;
}

/**
 * Get the next task for the given worker. Blocks until a task arrives or,
 * when more workers than the configured minimum exist, until the idle
 * timeout (max_idle_secs) expires and the worker is told to shut down.
 */
static apr_status_t get_mplx_next(h2_worker *worker, void *ctx,
                                  h2_task **ptask, int *psticky)
{
    apr_status_t status;
    apr_time_t wait_until = 0, now;
    h2_workers *workers = ctx;
    h2_task *task = NULL;
    
    *ptask = NULL;
    *psticky = 0;
    
    status = apr_thread_mutex_lock(workers->lock);
    if (status == APR_SUCCESS) {
        ++workers->idle_workers;
        ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
                     "h2_worker(%d): looking for work", worker->id);
        
        while (!h2_worker_is_aborted(worker) && !workers->aborted
               && !(task = next_task(workers))) {
        
            /* Need to wait for new tasks to arrive. If we are above the
             * minimum number of workers, we do a timed wait. When the
             * timeout occurs and we still have surplus workers, we shut
             * down one after the other. */
            cleanup_zombies(workers, 0);
            if (workers->worker_count > workers->min_workers) {
                now = apr_time_now();
                if (now >= wait_until) {
                    wait_until = now + apr_time_from_sec(workers->max_idle_secs);
                }
                
                ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
                             "h2_worker(%d): waiting signal, "
                             "workers=%d, idle=%d", worker->id,
                             (int)workers->worker_count,
                             workers->idle_workers);
                status = apr_thread_cond_timedwait(workers->mplx_added,
                                                   workers->lock,
                                                   wait_until - now);
                if (status == APR_TIMEUP
                    && workers->worker_count > workers->min_workers) {
                    /* waited long enough without getting a task and
                     * we are above min workers, abort this one. */
                    ap_log_error(APLOG_MARK, APLOG_TRACE3, 0,
                                 workers->s,
                                 "h2_workers: aborting idle worker");
                    h2_worker_abort(worker);
                    break;
                }
            }
            else {
                ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
                             "h2_worker(%d): waiting signal (eternal), "
                             "worker_count=%d, idle=%d", worker->id,
                             (int)workers->worker_count,
                             workers->idle_workers);
                apr_thread_cond_wait(workers->mplx_added, workers->lock);
            }
        }
        
        /* Here, we either have gotten a task or decided to shut down
         * the calling worker.
         */
        if (task) {
            /* Ok, we got something to give back to the worker for execution.
             * If the h2_mplx in our queue do not outnumber our maximum
             * number of workers, we let the worker be sticky, i.e. have it
             * poll the task's h2_mplx instance for more work before asking
             * back here.
             * This avoids entering our global lock as long as enough idle
             * workers remain. Stickiness of a worker ends when the connection
             * has no new tasks to process, so the worker will get back here
             * eventually.
             */
            *ptask = task;
            *psticky = (workers->max_workers >= workers->mplx_count);
            
            if (workers->mplx_count && workers->idle_workers > 1) {
                apr_thread_cond_signal(workers->mplx_added);
            }
        }
        
        --workers->idle_workers;
        apr_thread_mutex_unlock(workers->lock);
    }
    
    return *ptask? APR_SUCCESS : APR_EOF;
}
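
/* Callback invoked when a worker thread finishes: unlink it from the
 * list of active workers and park it on the zombie list, to be reaped
 * by cleanup_zombies() later. */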
static void worker_done(h2_worker *worker, void *ctx)
{
    h2_workers *workers = ctx;
    apr_status_t status = apr_thread_mutex_lock(workers->lock);
    if (status == APR_SUCCESS) {
        ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
                     "h2_worker(%d): done", worker->id);
        H2_WORKER_REMOVE(worker);
        --workers->worker_count;
        H2_WORKER_LIST_INSERT_TAIL(&workers->zombies, worker);
        apr_thread_mutex_unlock(workers->lock);
    }
}
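
/* Create a new worker thread and add it to the list of active workers.
 * Callers must hold workers->lock. */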
static apr_status_t add_worker(h2_workers *workers)
{
    h2_worker *w = h2_worker_create(workers->next_worker_id++,
                                    workers->pool, workers->thread_attr,
                                    get_mplx_next, worker_done, workers);
    if (!w) {
        return APR_ENOMEM;
    }
    ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
                 "h2_workers: adding worker(%d)", w->id);
    ++workers->worker_count;
    H2_WORKER_LIST_INSERT_TAIL(&workers->workers, w);
    return APR_SUCCESS;
}
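
/* Start the configured minimum number of worker threads. */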
static apr_status_t h2_workers_start(h2_workers *workers)
{
    apr_status_t status = apr_thread_mutex_lock(workers->lock);
    if (status == APR_SUCCESS) {
        ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
                     "h2_workers: starting");
        while (workers->worker_count < workers->min_workers
               && status == APR_SUCCESS) {
            status = add_worker(workers);
        }
        apr_thread_mutex_unlock(workers->lock);
    }
    return status;
}
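
/* Create the worker set for a server: allocates everything from an own
 * subpool of server_pool and immediately starts min_workers threads.
 * Returns NULL when initialization fails. */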
h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool,
                              int min_workers, int max_workers,
                              apr_size_t max_tx_handles)
{
    apr_status_t status;
    h2_workers *workers;
    apr_pool_t *pool;

    AP_DEBUG_ASSERT(s);
    AP_DEBUG_ASSERT(server_pool);

    /* let's have our own pool that will be the parent of all h2_worker
     * instances we create. Creation happens in various threads, but is
     * always guarded by our lock. Without this pool, all subpool creations
     * would happen on the pool handed to us, which we do not guard.
     */
    apr_pool_create(&pool, server_pool);
    apr_pool_tag(pool, "h2_workers");
    workers = apr_pcalloc(pool, sizeof(h2_workers));
    if (workers) {
        workers->s = s;
        workers->pool = pool;
        workers->min_workers = min_workers;
        workers->max_workers = max_workers;
        workers->max_idle_secs = 10;

        workers->max_tx_handles = max_tx_handles;
        workers->spare_tx_handles = workers->max_tx_handles;

        apr_threadattr_create(&workers->thread_attr, workers->pool);
        if (ap_thread_stacksize != 0) {
            apr_threadattr_stacksize_set(workers->thread_attr,
                                         ap_thread_stacksize);
            ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s,
                         "h2_workers: using stacksize=%ld",
                         (long)ap_thread_stacksize);
        }

        APR_RING_INIT(&workers->workers, h2_worker, link);
        APR_RING_INIT(&workers->zombies, h2_worker, link);
        APR_RING_INIT(&workers->mplxs, h2_mplx, link);

        status = apr_thread_mutex_create(&workers->lock,
                                         APR_THREAD_MUTEX_DEFAULT,
                                         workers->pool);
        if (status == APR_SUCCESS) {
            status = apr_thread_cond_create(&workers->mplx_added,
                                            workers->pool);
        }
        if (status == APR_SUCCESS) {
            status = apr_thread_mutex_create(&workers->tx_lock,
                                             APR_THREAD_MUTEX_DEFAULT,
                                             workers->pool);
        }
        if (status == APR_SUCCESS) {
            status = h2_workers_start(workers);
        }
        if (status != APR_SUCCESS) {
            h2_workers_destroy(workers);
            workers = NULL;
        }
    }
    return workers;
}
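
/* Tear everything down: reap zombies, destroy the sync primitives, unlink
 * any remaining list entries and destroy the pool, which releases the
 * h2_workers structure itself. */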
void h2_workers_destroy(h2_workers *workers)
{
    /* before we go, cleanup any zombie workers that may have accumulated */
    cleanup_zombies(workers, 1);
    
    if (workers->mplx_added) {
        apr_thread_cond_destroy(workers->mplx_added);
        workers->mplx_added = NULL;
    }
    if (workers->lock) {
        apr_thread_mutex_destroy(workers->lock);
        workers->lock = NULL;
    }
    while (!H2_MPLX_LIST_EMPTY(&workers->mplxs)) {
        h2_mplx *m = H2_MPLX_LIST_FIRST(&workers->mplxs);
        H2_MPLX_REMOVE(m);
    }
    while (!H2_WORKER_LIST_EMPTY(&workers->workers)) {
        h2_worker *w = H2_WORKER_LIST_FIRST(&workers->workers);
        H2_WORKER_REMOVE(w);
    }
    if (workers->pool) {
        apr_pool_destroy(workers->pool);
        /* workers is gone */
    }
}
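
/* Register an h2_mplx that has tasks ready for execution. Signals an idle
 * worker if there is one, otherwise starts an additional worker while
 * below max_workers. Returns APR_EAGAIN when already registered. */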
apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
{
    apr_status_t status = apr_thread_mutex_lock(workers->lock);
    if (status == APR_SUCCESS) {
        ap_log_error(APLOG_MARK, APLOG_TRACE3, status, workers->s,
                     "h2_workers: register mplx(%ld), idle=%d",
                     m->id, workers->idle_workers);
        if (in_list(workers, m)) {
            status = APR_EAGAIN;
        }
        else {
            H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m);
            ++workers->mplx_count;
            status = APR_SUCCESS;
        }
        
        if (workers->idle_workers > 0) {
            apr_thread_cond_signal(workers->mplx_added);
        }
        else if (status == APR_SUCCESS
                 && workers->worker_count < workers->max_workers) {
            ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
                         "h2_workers: got %d workers, adding 1",
                         workers->worker_count);
            add_worker(workers);
        }
        apr_thread_mutex_unlock(workers->lock);
    }
    return status;
}
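
/* Remove an h2_mplx from the scheduling list, if present. Returns
 * APR_EAGAIN when it was not registered. */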
apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m)
{
    apr_status_t status = apr_thread_mutex_lock(workers->lock);
    if (status == APR_SUCCESS) {
        status = APR_EAGAIN;
        if (in_list(workers, m)) {
            H2_MPLX_REMOVE(m);
            status = APR_SUCCESS;
        }
        apr_thread_mutex_unlock(workers->lock);
    }
    return status;
}
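
/* Configure how long surplus workers (above the minimum) may sit idle
 * before they are shut down. Non-positive values are rejected. */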
void h2_workers_set_max_idle_secs(h2_workers *workers, int idle_secs)
{
    if (idle_secs <= 0) {
        ap_log_error(APLOG_MARK, APLOG_WARNING, 0, workers->s,
                     APLOGNO(02962) "h2_workers: max_worker_idle_sec value of %d"
                     " is not valid, ignored.", idle_secs);
        return;
    }
    workers->max_idle_secs = idle_secs;
}
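
/* Reserve up to count tx handles from the shared spare pool, guarded by
 * tx_lock. Returns the number actually reserved, which may be less than
 * requested, or 0 when the lock cannot be taken. */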
apr_size_t h2_workers_tx_reserve(h2_workers *workers, apr_size_t count)
{
    apr_status_t status = apr_thread_mutex_lock(workers->tx_lock);
    if (status == APR_SUCCESS) {
        count = H2MIN(workers->spare_tx_handles, count);
        workers->spare_tx_handles -= count;
        ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
                     "h2_workers: reserved %d tx handles, %d/%d left",
                     (int)count, (int)workers->spare_tx_handles,
                     (int)workers->max_tx_handles);
        apr_thread_mutex_unlock(workers->tx_lock);
        return count;
    }
    return 0;
}
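
/* Return previously reserved tx handles to the shared spare pool. */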
void h2_workers_tx_free(h2_workers *workers, apr_size_t count)
{
    apr_status_t status = apr_thread_mutex_lock(workers->tx_lock);
    if (status == APR_SUCCESS) {
        workers->spare_tx_handles += count;
        ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
                     "h2_workers: freed %d tx handles, %d/%d left",
                     (int)count, (int)workers->spare_tx_handles,
                     (int)workers->max_tx_handles);
        apr_thread_mutex_unlock(workers->tx_lock);
    }
}