| /* | |
| * Licensed to the Apache Software Foundation (ASF) under one or more | |
| * contributor license agreements. See the NOTICE file distributed | |
| * with this work for additional information regarding copyright | |
| * ownership. The ASF licenses this file to you under the Apache | |
| * License, Version 2.0 (the "License"); you may not use this file | |
| * except in compliance with the License. You may obtain a copy of | |
| * the License at | |
| * | |
| * http://www.apache.org/licenses/LICENSE-2.0 | |
| * | |
| * Unless required by applicable law or agreed to in writing, software | |
| * distributed under the License is distributed on an "AS IS" BASIS, | |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
| * implied. See the License for the specific language governing | |
| * permissions and limitations under the License. | |
| */ | |
| /* | |
| * etch_threadpool_apr.c | |
| * apache portable runtime threadpool code | |
| */ | |
| #include <assert.h> | |
| #include "etch_threadpool_apr.h" | |
| #include "apr_ring.h" | |
| #include "apr_thread_cond.h" | |
| #include "apr_portable.h" | |
| #define TASK_PRIORITY_SEGS 4 | |
| #define TASK_PRIORITY_SEG(x) (((x)->dispatch.priority & 0xFF) / 64) | |
| typedef struct apr_thread_pool_task | |
| { | |
| APR_RING_ENTRY(apr_thread_pool_task) link; | |
| apr_thread_start_t func; | |
| void *param; | |
| void *owner; | |
| union | |
| { | |
| apr_byte_t priority; | |
| apr_time_t time; | |
| } dispatch; | |
| } apr_thread_pool_task_t; | |
| APR_RING_HEAD(apr_thread_pool_tasks, apr_thread_pool_task); | |
| struct apr_thread_list_elt | |
| { | |
| APR_RING_ENTRY(apr_thread_list_elt) link; | |
| apr_thread_t *thd; | |
| volatile void *current_owner; | |
| volatile enum { TH_RUN, TH_STOP, TH_PROBATION } state; | |
| }; | |
| APR_RING_HEAD(apr_thread_list, apr_thread_list_elt); | |
| struct apr_thread_pool | |
| { | |
| apr_pool_t *pool; | |
| volatile apr_size_t thd_max; | |
| volatile apr_size_t idle_max; | |
| volatile apr_interval_time_t idle_wait; | |
| volatile apr_size_t thd_cnt; | |
| volatile apr_size_t idle_cnt; | |
| volatile apr_size_t task_cnt; | |
| volatile apr_size_t scheduled_task_cnt; | |
| volatile apr_size_t threshold; | |
| volatile apr_size_t tasks_run; | |
| volatile apr_size_t tasks_high; | |
| volatile apr_size_t thd_high; | |
| volatile apr_size_t thd_timed_out; | |
| struct apr_thread_pool_tasks *tasks; | |
| struct apr_thread_pool_tasks *scheduled_tasks; | |
| struct apr_thread_list *busy_thds; | |
| struct apr_thread_list *idle_thds; | |
| apr_thread_mutex_t *lock; | |
| apr_thread_mutex_t *cond_lock; | |
| apr_thread_cond_t *cond; | |
| volatile int terminated; | |
| struct apr_thread_pool_tasks *recycled_tasks; | |
| struct apr_thread_list *recycled_thds; | |
| apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS]; | |
| }; | |
| static apr_status_t thread_pool_construct(apr_thread_pool_t * me, | |
| apr_size_t init_threads, | |
| apr_size_t max_threads) | |
| { | |
| apr_status_t rv; | |
| int i; | |
| me->thd_max = max_threads; | |
| me->idle_max = init_threads; | |
| me->threshold = init_threads / 2; | |
| rv = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED, | |
| me->pool); | |
| if (APR_SUCCESS != rv) { | |
| return rv; | |
| } | |
| rv = apr_thread_mutex_create(&me->cond_lock, APR_THREAD_MUTEX_UNNESTED, | |
| me->pool); | |
| if (APR_SUCCESS != rv) { | |
| apr_thread_mutex_destroy(me->lock); | |
| return rv; | |
| } | |
| rv = apr_thread_cond_create(&me->cond, me->pool); | |
| if (APR_SUCCESS != rv) { | |
| apr_thread_mutex_destroy(me->lock); | |
| apr_thread_mutex_destroy(me->cond_lock); | |
| return rv; | |
| } | |
| me->tasks = apr_palloc(me->pool, sizeof(*me->tasks)); | |
| if (!me->tasks) { | |
| goto CATCH_ENOMEM; | |
| } | |
| APR_RING_INIT(me->tasks, apr_thread_pool_task, link); | |
| me->scheduled_tasks = apr_palloc(me->pool, sizeof(*me->scheduled_tasks)); | |
| if (!me->scheduled_tasks) { | |
| goto CATCH_ENOMEM; | |
| } | |
| APR_RING_INIT(me->scheduled_tasks, apr_thread_pool_task, link); | |
| me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks)); | |
| if (!me->recycled_tasks) { | |
| goto CATCH_ENOMEM; | |
| } | |
| APR_RING_INIT(me->recycled_tasks, apr_thread_pool_task, link); | |
| me->busy_thds = apr_palloc(me->pool, sizeof(*me->busy_thds)); | |
| if (!me->busy_thds) { | |
| goto CATCH_ENOMEM; | |
| } | |
| APR_RING_INIT(me->busy_thds, apr_thread_list_elt, link); | |
| me->idle_thds = apr_palloc(me->pool, sizeof(*me->idle_thds)); | |
| if (!me->idle_thds) { | |
| goto CATCH_ENOMEM; | |
| } | |
| APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link); | |
| me->recycled_thds = apr_palloc(me->pool, sizeof(*me->recycled_thds)); | |
| if (!me->recycled_thds) { | |
| goto CATCH_ENOMEM; | |
| } | |
| APR_RING_INIT(me->recycled_thds, apr_thread_list_elt, link); | |
| me->thd_cnt = me->idle_cnt = me->task_cnt = me->scheduled_task_cnt = 0; | |
| me->tasks_run = me->tasks_high = me->thd_high = me->thd_timed_out = 0; | |
| me->idle_wait = 0; | |
| me->terminated = 0; | |
| for (i = 0; i < TASK_PRIORITY_SEGS; i++) { | |
| me->task_idx[i] = NULL; | |
| } | |
| goto FINAL_EXIT; | |
| CATCH_ENOMEM: | |
| rv = APR_ENOMEM; | |
| apr_thread_mutex_destroy(me->lock); | |
| apr_thread_mutex_destroy(me->cond_lock); | |
| apr_thread_cond_destroy(me->cond); | |
| FINAL_EXIT: | |
| return rv; | |
| } | |
| /* | |
| * NOTE: This function is not thread safe by itself. Caller should hold the lock | |
| */ | |
| static apr_thread_pool_task_t *pop_task(apr_thread_pool_t * me) | |
| { | |
| apr_thread_pool_task_t *task = NULL; | |
| int seg; | |
| /* check for scheduled tasks */ | |
| if (me->scheduled_task_cnt > 0) { | |
| task = APR_RING_FIRST(me->scheduled_tasks); | |
| assert(task != NULL); | |
| assert(task != | |
| APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, | |
| link)); | |
| /* if it's time */ | |
| if (task->dispatch.time <= apr_time_now()) { | |
| --me->scheduled_task_cnt; | |
| APR_RING_REMOVE(task, link); | |
| return task; | |
| } | |
| } | |
| /* check for normal tasks if we're not returning a scheduled task */ | |
| if (me->task_cnt == 0) { | |
| return NULL; | |
| } | |
| task = APR_RING_FIRST(me->tasks); | |
| assert(task != NULL); | |
| assert(task != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)); | |
| --me->task_cnt; | |
| seg = TASK_PRIORITY_SEG(task); | |
| if (task == me->task_idx[seg]) { | |
| me->task_idx[seg] = APR_RING_NEXT(task, link); | |
| if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks, | |
| apr_thread_pool_task, link) | |
| || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) { | |
| me->task_idx[seg] = NULL; | |
| } | |
| } | |
| APR_RING_REMOVE(task, link); | |
| return task; | |
| } | |
| static apr_interval_time_t waiting_time(apr_thread_pool_t * me) | |
| { | |
| apr_thread_pool_task_t *task = NULL; | |
| task = APR_RING_FIRST(me->scheduled_tasks); | |
| assert(task != NULL); | |
| assert(task != | |
| APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, | |
| link)); | |
| return task->dispatch.time - apr_time_now(); | |
| } | |
| /* | |
| * NOTE: This function is not thread safe by itself. Caller should hold the lock | |
| */ | |
| static struct apr_thread_list_elt *elt_new(apr_thread_pool_t * me, | |
| apr_thread_t * t) | |
| { | |
| struct apr_thread_list_elt *elt; | |
| if (APR_RING_EMPTY(me->recycled_thds, apr_thread_list_elt, link)) { | |
| elt = apr_pcalloc(me->pool, sizeof(*elt)); | |
| if (NULL == elt) { | |
| return NULL; | |
| } | |
| } | |
| else { | |
| elt = APR_RING_FIRST(me->recycled_thds); | |
| APR_RING_REMOVE(elt, link); | |
| } | |
| APR_RING_ELEM_INIT(elt, link); | |
| elt->thd = t; | |
| elt->current_owner = NULL; | |
| elt->state = TH_RUN; | |
| return elt; | |
| } | |
| /* | |
| * The worker thread function. Take a task from the queue and perform it if | |
| * there is any. Otherwise, put itself into the idle thread list and waiting | |
| * for signal to wake up. | |
| * The thread terminate directly by detach and exit when it is asked to stop | |
| * after finishing a task. Otherwise, the thread should be in idle thread list | |
| * and should be joined. | |
| */ | |
| static void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param) | |
| { | |
| apr_status_t rv = APR_SUCCESS; | |
| apr_thread_pool_t *me = param; | |
| apr_thread_pool_task_t *task = NULL; | |
| apr_interval_time_t wait; | |
| struct apr_thread_list_elt *elt; | |
| apr_thread_mutex_lock(me->lock); | |
| elt = elt_new(me, t); | |
| if (!elt) { | |
| apr_thread_mutex_unlock(me->lock); | |
| apr_thread_exit(t, APR_ENOMEM); | |
| } | |
| while (!me->terminated && elt->state != TH_STOP) { | |
| /* Test if not new element, it is awakened from idle */ | |
| if (APR_RING_NEXT(elt, link) != elt) { | |
| --me->idle_cnt; | |
| APR_RING_REMOVE(elt, link); | |
| } | |
| APR_RING_INSERT_TAIL(me->busy_thds, elt, apr_thread_list_elt, link); | |
| task = pop_task(me); | |
| while (NULL != task && !me->terminated) { | |
| ++me->tasks_run; | |
| elt->current_owner = task->owner; | |
| apr_thread_mutex_unlock(me->lock); | |
| apr_thread_data_set(task, "apr_thread_pool_task", NULL, t); | |
| task->func(t, task->param); | |
| apr_thread_mutex_lock(me->lock); | |
| APR_RING_INSERT_TAIL(me->recycled_tasks, task, | |
| apr_thread_pool_task, link); | |
| elt->current_owner = NULL; | |
| if (TH_STOP == elt->state) { | |
| break; | |
| } | |
| task = pop_task(me); | |
| } | |
| assert(NULL == elt->current_owner); | |
| if (TH_STOP != elt->state) | |
| APR_RING_REMOVE(elt, link); | |
| /* Test if a busy thread been asked to stop, which is not joinable */ | |
| if ((me->idle_cnt >= me->idle_max | |
| && !(me->scheduled_task_cnt && 0 >= me->idle_max) | |
| && !me->idle_wait) | |
| || me->terminated || elt->state != TH_RUN) { | |
| --me->thd_cnt; | |
| if ((TH_PROBATION == elt->state) && me->idle_wait) | |
| ++me->thd_timed_out; | |
| APR_RING_INSERT_TAIL(me->recycled_thds, elt, | |
| apr_thread_list_elt, link); | |
| apr_thread_mutex_unlock(me->lock); | |
| apr_thread_detach(t); | |
| apr_thread_exit(t, APR_SUCCESS); | |
| return NULL; /* should not be here, safe net */ | |
| } | |
| /* busy thread become idle */ | |
| ++me->idle_cnt; | |
| APR_RING_INSERT_TAIL(me->idle_thds, elt, apr_thread_list_elt, link); | |
| /* | |
| * If there is a scheduled task, always scheduled to perform that task. | |
| * Since there is no guarantee that current idle threads are scheduled | |
| * for next scheduled task. | |
| */ | |
| if (me->scheduled_task_cnt) | |
| wait = waiting_time(me); | |
| else if (me->idle_cnt > me->idle_max) { | |
| wait = me->idle_wait; | |
| elt->state = TH_PROBATION; | |
| } | |
| else | |
| wait = -1; | |
| apr_thread_mutex_unlock(me->lock); | |
| apr_thread_mutex_lock(me->cond_lock); | |
| if (wait >= 0) { | |
| rv = apr_thread_cond_timedwait(me->cond, me->cond_lock, wait); | |
| } | |
| else { | |
| rv = apr_thread_cond_wait(me->cond, me->cond_lock); | |
| } | |
| apr_thread_mutex_unlock(me->cond_lock); | |
| apr_thread_mutex_lock(me->lock); | |
| } | |
| /* idle thread been asked to stop, will be joined */ | |
| --me->thd_cnt; | |
| apr_thread_mutex_unlock(me->lock); | |
| apr_thread_exit(t, APR_SUCCESS); | |
| return NULL; /* should not be here, safe net */ | |
| } | |
| static apr_status_t thread_pool_cleanup(void *me) | |
| { | |
| apr_thread_pool_t *_self = me; | |
| _self->terminated = 1; | |
| etch_apr_thread_pool_idle_max_set(_self, 0); | |
| while (_self->thd_cnt) { | |
| apr_sleep(20 * 1000); /* spin lock with 20 ms */ | |
| } | |
| apr_thread_mutex_destroy(_self->lock); | |
| apr_thread_mutex_destroy(_self->cond_lock); | |
| apr_thread_cond_destroy(_self->cond); | |
| return APR_SUCCESS; | |
| } | |
| apr_status_t etch_apr_thread_pool_create(apr_thread_pool_t ** me, | |
| apr_size_t init_threads, | |
| apr_size_t max_threads, | |
| apr_pool_t * pool) | |
| { | |
| apr_thread_t *t; | |
| apr_status_t rv = APR_SUCCESS; | |
| *me = apr_pcalloc(pool, sizeof(**me)); | |
| if (!*me) { | |
| return APR_ENOMEM; | |
| } | |
| (*me)->pool = pool; | |
| rv = thread_pool_construct(*me, init_threads, max_threads); | |
| if (APR_SUCCESS != rv) { | |
| *me = NULL; | |
| return rv; | |
| } | |
| apr_pool_cleanup_register(pool, *me, thread_pool_cleanup, | |
| apr_pool_cleanup_null); | |
| while (init_threads) { | |
| rv = apr_thread_create(&t, NULL, thread_pool_func, *me, (*me)->pool); | |
| if (APR_SUCCESS != rv) { | |
| break; | |
| } | |
| ++(*me)->thd_cnt; | |
| if ((*me)->thd_cnt > (*me)->thd_high) | |
| (*me)->thd_high = (*me)->thd_cnt; | |
| --init_threads; | |
| } | |
| return rv; | |
| } | |
| apr_status_t etch_apr_thread_pool_destroy(apr_thread_pool_t * me) | |
| { | |
| return apr_pool_cleanup_run(me->pool, me, thread_pool_cleanup); | |
| } | |
| /* | |
| * NOTE: This function is not thread safe by itself. Caller should hold the lock | |
| */ | |
| static apr_thread_pool_task_t *task_new(apr_thread_pool_t * me, | |
| apr_thread_start_t func, | |
| void *param, apr_byte_t priority, | |
| void *owner, apr_time_t time) | |
| { | |
| apr_thread_pool_task_t *t; | |
| if (APR_RING_EMPTY(me->recycled_tasks, apr_thread_pool_task, link)) { | |
| t = apr_pcalloc(me->pool, sizeof(*t)); | |
| if (NULL == t) { | |
| return NULL; | |
| } | |
| } | |
| else { | |
| t = APR_RING_FIRST(me->recycled_tasks); | |
| APR_RING_REMOVE(t, link); | |
| } | |
| APR_RING_ELEM_INIT(t, link); | |
| t->func = func; | |
| t->param = param; | |
| t->owner = owner; | |
| if (time > 0) { | |
| t->dispatch.time = apr_time_now() + time; | |
| } | |
| else { | |
| t->dispatch.priority = priority; | |
| } | |
| return t; | |
| } | |
| /* | |
| * Test it the task is the only one within the priority segment. | |
| * If it is not, return the first element with same or lower priority. | |
| * Otherwise, add the task into the queue and return NULL. | |
| * | |
| * NOTE: This function is not thread safe by itself. Caller should hold the lock | |
| */ | |
| static apr_thread_pool_task_t *add_if_empty(apr_thread_pool_t * me, | |
| apr_thread_pool_task_t * const t) | |
| { | |
| int seg; | |
| int next; | |
| apr_thread_pool_task_t *t_next; | |
| seg = TASK_PRIORITY_SEG(t); | |
| if (me->task_idx[seg]) { | |
| assert(APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) != | |
| me->task_idx[seg]); | |
| t_next = me->task_idx[seg]; | |
| while (t_next->dispatch.priority > t->dispatch.priority) { | |
| t_next = APR_RING_NEXT(t_next, link); | |
| if (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) == | |
| t_next) { | |
| return t_next; | |
| } | |
| } | |
| return t_next; | |
| } | |
| for (next = seg - 1; next >= 0; next--) { | |
| if (me->task_idx[next]) { | |
| APR_RING_INSERT_BEFORE(me->task_idx[next], t, link); | |
| break; | |
| } | |
| } | |
| if (0 > next) { | |
| APR_RING_INSERT_TAIL(me->tasks, t, apr_thread_pool_task, link); | |
| } | |
| me->task_idx[seg] = t; | |
| return NULL; | |
| } | |
| /* | |
| * schedule a task to run in "time" microseconds. Find the spot in the ring where | |
| * the time fits. Adjust the short_time so the thread wakes up when the time is reached. | |
| */ | |
| static apr_status_t schedule_task(apr_thread_pool_t *me, | |
| apr_thread_start_t func, void *param, | |
| void *owner, apr_interval_time_t time) | |
| { | |
| apr_thread_pool_task_t *t; | |
| apr_thread_pool_task_t *t_loc; | |
| apr_thread_t *thd; | |
| apr_status_t rv = APR_SUCCESS; | |
| apr_thread_mutex_lock(me->lock); | |
| t = task_new(me, func, param, 0, owner, time); | |
| if (NULL == t) { | |
| apr_thread_mutex_unlock(me->lock); | |
| return APR_ENOMEM; | |
| } | |
| t_loc = APR_RING_FIRST(me->scheduled_tasks); | |
| while (NULL != t_loc) { | |
| /* if the time is less than the entry insert ahead of it */ | |
| if (t->dispatch.time < t_loc->dispatch.time) { | |
| ++me->scheduled_task_cnt; | |
| APR_RING_INSERT_BEFORE(t_loc, t, link); | |
| break; | |
| } | |
| else { | |
| t_loc = APR_RING_NEXT(t_loc, link); | |
| if (t_loc == | |
| APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, | |
| link)) { | |
| ++me->scheduled_task_cnt; | |
| APR_RING_INSERT_TAIL(me->scheduled_tasks, t, | |
| apr_thread_pool_task, link); | |
| break; | |
| } | |
| } | |
| } | |
| /* there should be at least one thread for scheduled tasks */ | |
| if (0 == me->thd_cnt) { | |
| rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool); | |
| if (APR_SUCCESS == rv) { | |
| ++me->thd_cnt; | |
| if (me->thd_cnt > me->thd_high) | |
| me->thd_high = me->thd_cnt; | |
| } | |
| } | |
| apr_thread_mutex_unlock(me->lock); | |
| apr_thread_mutex_lock(me->cond_lock); | |
| apr_thread_cond_signal(me->cond); | |
| apr_thread_mutex_unlock(me->cond_lock); | |
| return rv; | |
| } | |
| static apr_status_t add_task(apr_thread_pool_t *me, apr_thread_start_t func, | |
| void *param, apr_byte_t priority, int push, | |
| void *owner) | |
| { | |
| apr_thread_pool_task_t *t; | |
| apr_thread_pool_task_t *t_loc; | |
| apr_thread_t *thd; | |
| apr_status_t rv = APR_SUCCESS; | |
| apr_thread_mutex_lock(me->lock); | |
| t = task_new(me, func, param, priority, owner, 0); | |
| if (NULL == t) { | |
| apr_thread_mutex_unlock(me->lock); | |
| return APR_ENOMEM; | |
| } | |
| t_loc = add_if_empty(me, t); | |
| if (NULL == t_loc) { | |
| goto FINAL_EXIT; | |
| } | |
| if (push) { | |
| while (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) != | |
| t_loc && t_loc->dispatch.priority >= t->dispatch.priority) { | |
| t_loc = APR_RING_NEXT(t_loc, link); | |
| } | |
| } | |
| APR_RING_INSERT_BEFORE(t_loc, t, link); | |
| if (!push) { | |
| if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) { | |
| me->task_idx[TASK_PRIORITY_SEG(t)] = t; | |
| } | |
| } | |
| FINAL_EXIT: | |
| me->task_cnt++; | |
| if (me->task_cnt > me->tasks_high) | |
| me->tasks_high = me->task_cnt; | |
| if (0 == me->thd_cnt || (0 == me->idle_cnt && me->thd_cnt < me->thd_max && | |
| me->task_cnt > me->threshold)) { | |
| rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool); | |
| if (APR_SUCCESS == rv) { | |
| ++me->thd_cnt; | |
| if (me->thd_cnt > me->thd_high) | |
| me->thd_high = me->thd_cnt; | |
| } | |
| } | |
| apr_thread_mutex_unlock(me->lock); | |
| apr_thread_mutex_lock(me->cond_lock); | |
| apr_thread_cond_signal(me->cond); | |
| apr_thread_mutex_unlock(me->cond_lock); | |
| return rv; | |
| } | |
| apr_status_t etch_apr_thread_pool_push(apr_thread_pool_t *me, | |
| apr_thread_start_t func, | |
| void *param, | |
| apr_byte_t priority, | |
| void *owner) | |
| { | |
| return add_task(me, func, param, priority, 1, owner); | |
| } | |
| apr_status_t etch_apr_thread_pool_schedule(apr_thread_pool_t *me, | |
| apr_thread_start_t func, | |
| void *param, | |
| apr_interval_time_t time, | |
| void *owner) | |
| { | |
| return schedule_task(me, func, param, owner, time); | |
| } | |
| apr_status_t etch_apr_thread_pool_top(apr_thread_pool_t *me, | |
| apr_thread_start_t func, | |
| void *param, | |
| apr_byte_t priority, | |
| void *owner) | |
| { | |
| return add_task(me, func, param, priority, 0, owner); | |
| } | |
| static apr_status_t remove_scheduled_tasks(apr_thread_pool_t *me, | |
| void *owner) | |
| { | |
| apr_thread_pool_task_t *t_loc; | |
| apr_thread_pool_task_t *next; | |
| t_loc = APR_RING_FIRST(me->scheduled_tasks); | |
| while (t_loc != | |
| APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, | |
| link)) { | |
| next = APR_RING_NEXT(t_loc, link); | |
| /* if this is the owner remove it */ | |
| if (t_loc->owner == owner) { | |
| --me->scheduled_task_cnt; | |
| APR_RING_REMOVE(t_loc, link); | |
| } | |
| t_loc = next; | |
| } | |
| return APR_SUCCESS; | |
| } | |
| static apr_status_t remove_tasks(apr_thread_pool_t *me, void *owner) | |
| { | |
| apr_thread_pool_task_t *t_loc; | |
| apr_thread_pool_task_t *next; | |
| int seg; | |
| t_loc = APR_RING_FIRST(me->tasks); | |
| while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) { | |
| next = APR_RING_NEXT(t_loc, link); | |
| if (t_loc->owner == owner) { | |
| --me->task_cnt; | |
| seg = TASK_PRIORITY_SEG(t_loc); | |
| if (t_loc == me->task_idx[seg]) { | |
| me->task_idx[seg] = APR_RING_NEXT(t_loc, link); | |
| if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks, | |
| apr_thread_pool_task, | |
| link) | |
| || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) { | |
| me->task_idx[seg] = NULL; | |
| } | |
| } | |
| APR_RING_REMOVE(t_loc, link); | |
| } | |
| t_loc = next; | |
| } | |
| return APR_SUCCESS; | |
| } | |
| static void wait_on_busy_threads(apr_thread_pool_t *me, void *owner) | |
| { | |
| #ifndef NDEBUG | |
| apr_os_thread_t *os_thread; | |
| #endif | |
| struct apr_thread_list_elt *elt; | |
| apr_thread_mutex_lock(me->lock); | |
| elt = APR_RING_FIRST(me->busy_thds); | |
| while (elt != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) { | |
| if (elt->current_owner != owner) { | |
| elt = APR_RING_NEXT(elt, link); | |
| continue; | |
| } | |
| #ifndef NDEBUG | |
| /* make sure the thread is not the one calling tasks_cancel */ | |
| apr_os_thread_get(&os_thread, elt->thd); | |
| #ifdef WIN32 | |
| /* hack for apr win32 bug */ | |
| assert(!apr_os_thread_equal(apr_os_thread_current(), os_thread)); | |
| #else | |
| assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread)); | |
| #endif | |
| #endif | |
| while (elt->current_owner == owner) { | |
| apr_thread_mutex_unlock(me->lock); | |
| apr_sleep(200 * 1000); | |
| apr_thread_mutex_lock(me->lock); | |
| } | |
| elt = APR_RING_FIRST(me->busy_thds); | |
| } | |
| apr_thread_mutex_unlock(me->lock); | |
| return; | |
| } | |
| apr_status_t etch_apr_thread_pool_tasks_cancel(apr_thread_pool_t *me, | |
| void *owner) | |
| { | |
| apr_status_t rv = APR_SUCCESS; | |
| apr_thread_mutex_lock(me->lock); | |
| if (me->task_cnt > 0) { | |
| rv = remove_tasks(me, owner); | |
| } | |
| if (me->scheduled_task_cnt > 0) { | |
| rv = remove_scheduled_tasks(me, owner); | |
| } | |
| apr_thread_mutex_unlock(me->lock); | |
| wait_on_busy_threads(me, owner); | |
| return rv; | |
| } | |
| apr_size_t etch_apr_thread_pool_tasks_count(apr_thread_pool_t *me) | |
| { | |
| return me->task_cnt; | |
| } | |
| apr_size_t etch_apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t *me) | |
| { | |
| return me->scheduled_task_cnt; | |
| } | |
| apr_size_t etch_apr_thread_pool_threads_count(apr_thread_pool_t *me) | |
| { | |
| return me->thd_cnt; | |
| } | |
| apr_size_t etch_apr_thread_pool_busy_count(apr_thread_pool_t *me) | |
| { | |
| return me->thd_cnt - me->idle_cnt; | |
| } | |
| apr_size_t etch_apr_thread_pool_idle_count(apr_thread_pool_t *me) | |
| { | |
| return me->idle_cnt; | |
| } | |
| apr_size_t etch_apr_thread_pool_tasks_run_count(apr_thread_pool_t * me) | |
| { | |
| return me->tasks_run; | |
| } | |
| apr_size_t etch_apr_thread_pool_tasks_high_count(apr_thread_pool_t * me) | |
| { | |
| return me->tasks_high; | |
| } | |
| apr_size_t etch_apr_thread_pool_threads_high_count(apr_thread_pool_t * me) | |
| { | |
| return me->thd_high; | |
| } | |
| apr_size_t etch_apr_thread_pool_threads_idle_timeout_count(apr_thread_pool_t * me) | |
| { | |
| return me->thd_timed_out; | |
| } | |
| apr_size_t etch_apr_thread_pool_idle_max_get(apr_thread_pool_t *me) | |
| { | |
| return me->idle_max; | |
| } | |
| apr_interval_time_t etch_apr_thread_pool_idle_wait_get(apr_thread_pool_t * me) | |
| { | |
| return me->idle_wait; | |
| } | |
| /* | |
| * This function stop extra idle threads to the cnt. | |
| * @return the number of threads stopped | |
| * NOTE: There could be busy threads become idle during this function | |
| */ | |
| static struct apr_thread_list_elt *trim_threads(apr_thread_pool_t *me, | |
| apr_size_t *cnt, int idle) | |
| { | |
| struct apr_thread_list *thds; | |
| apr_size_t n, n_dbg, i; | |
| struct apr_thread_list_elt *head, *tail, *elt; | |
| apr_thread_mutex_lock(me->lock); | |
| if (idle) { | |
| thds = me->idle_thds; | |
| n = me->idle_cnt; | |
| } | |
| else { | |
| thds = me->busy_thds; | |
| n = me->thd_cnt - me->idle_cnt; | |
| } | |
| if (n <= *cnt) { | |
| apr_thread_mutex_unlock(me->lock); | |
| *cnt = 0; | |
| return NULL; | |
| } | |
| n -= *cnt; | |
| head = APR_RING_FIRST(thds); | |
| for (i = 0; i < *cnt; i++) { | |
| head = APR_RING_NEXT(head, link); | |
| } | |
| tail = APR_RING_LAST(thds); | |
| if (idle) { | |
| APR_RING_UNSPLICE(head, tail, link); | |
| me->idle_cnt = *cnt; | |
| } | |
| n_dbg = 0; | |
| for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) { | |
| elt->state = TH_STOP; | |
| n_dbg++; | |
| } | |
| elt->state = TH_STOP; | |
| n_dbg++; | |
| assert(n == n_dbg); | |
| *cnt = n; | |
| apr_thread_mutex_unlock(me->lock); | |
| APR_RING_PREV(head, link) = NULL; | |
| APR_RING_NEXT(tail, link) = NULL; | |
| return head; | |
| } | |
| static apr_size_t trim_idle_threads(apr_thread_pool_t *me, apr_size_t cnt) | |
| { | |
| apr_size_t n_dbg; | |
| struct apr_thread_list_elt *elt, *head, *tail; | |
| apr_status_t rv; | |
| elt = trim_threads(me, &cnt, 1); | |
| apr_thread_mutex_lock(me->cond_lock); | |
| apr_thread_cond_broadcast(me->cond); | |
| apr_thread_mutex_unlock(me->cond_lock); | |
| n_dbg = 0; | |
| if (NULL != (head = elt)) { | |
| while (elt) { | |
| tail = elt; | |
| apr_thread_join(&rv, elt->thd); | |
| elt = APR_RING_NEXT(elt, link); | |
| ++n_dbg; | |
| } | |
| apr_thread_mutex_lock(me->lock); | |
| APR_RING_SPLICE_TAIL(me->recycled_thds, head, tail, | |
| apr_thread_list_elt, link); | |
| apr_thread_mutex_unlock(me->lock); | |
| } | |
| assert(cnt == n_dbg); | |
| return cnt; | |
| } | |
| /* don't join on busy threads for performance reasons, who knows how long will | |
| * the task takes to perform | |
| */ | |
| static apr_size_t trim_busy_threads(apr_thread_pool_t *me, apr_size_t cnt) | |
| { | |
| trim_threads(me, &cnt, 0); | |
| return cnt; | |
| } | |
| apr_size_t etch_apr_thread_pool_idle_max_set(apr_thread_pool_t *me, | |
| apr_size_t cnt) | |
| { | |
| me->idle_max = cnt; | |
| cnt = trim_idle_threads(me, cnt); | |
| return cnt; | |
| } | |
| apr_interval_time_t etch_apr_thread_pool_idle_wait_set(apr_thread_pool_t * me, | |
| apr_interval_time_t timeout) | |
| { | |
| apr_interval_time_t oldtime; | |
| oldtime = me->idle_wait; | |
| me->idle_wait = timeout; | |
| return oldtime; | |
| } | |
| apr_size_t etch_apr_thread_pool_thread_max_get(apr_thread_pool_t *me) | |
| { | |
| return me->thd_max; | |
| } | |
| /* | |
| * This function stop extra working threads to the new limit. | |
| * NOTE: There could be busy threads become idle during this function | |
| */ | |
| apr_size_t etch_apr_thread_pool_thread_max_set(apr_thread_pool_t *me, | |
| apr_size_t cnt) | |
| { | |
| unsigned int n; | |
| me->thd_max = cnt; | |
| if (0 == cnt || me->thd_cnt <= cnt) { | |
| return 0; | |
| } | |
| n = (unsigned) me->thd_cnt - cnt; | |
| if (n >= me->idle_cnt) { | |
| trim_busy_threads(me, n - me->idle_cnt); | |
| trim_idle_threads(me, 0); | |
| } | |
| else { | |
| trim_idle_threads(me, me->idle_cnt - n); | |
| } | |
| return n; | |
| } | |
| apr_size_t etch_apr_thread_pool_threshold_get(apr_thread_pool_t *me) | |
| { | |
| return me->threshold; | |
| } | |
| apr_size_t etch_apr_thread_pool_threshold_set(apr_thread_pool_t *me, | |
| apr_size_t val) | |
| { | |
| apr_size_t ov; | |
| ov = me->threshold; | |
| me->threshold = val; | |
| return ov; | |
| } | |
| apr_status_t etch_apr_thread_pool_task_owner_get(apr_thread_t *thd, | |
| void **owner) | |
| { | |
| apr_status_t rv; | |
| apr_thread_pool_task_t *task; | |
| void *data; | |
| rv = apr_thread_data_get(&data, "apr_thread_pool_task", thd); | |
| if (rv != APR_SUCCESS) { | |
| return rv; | |
| } | |
| task = data; | |
| if (!task) { | |
| *owner = NULL; | |
| return APR_BADARG; | |
| } | |
| *owner = task->owner; | |
| return APR_SUCCESS; | |
| } | |
| /* vim: set ts=4 sw=4 et cin tw=80: */ |