| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed |
| * with this work for additional information regarding copyright |
| * ownership. The ASF licenses this file to you under the Apache |
| * License, Version 2.0 (the "License"); you may not use this file |
| * except in compliance with the License. You may obtain a copy of |
| * the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| * implied. See the License for the specific language governing |
| * permissions and limitations under the License. |
| */ |
| |
| #include <assert.h> |
| #include "apr_thread_pool.h" |
| #include "apr_ring.h" |
| #include "apr_thread_cond.h" |
| #include "apr_portable.h" |
| |
| #if APR_HAS_THREADS |
| |
| #define TASK_PRIORITY_SEGS 4 |
| #define TASK_PRIORITY_SEG(x) (((x)->dispatch.priority & 0xFF) / 64) |
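
/*
 * A task priority in [0, 255] maps onto one of TASK_PRIORITY_SEGS segments
 * of the task queue, e.g. APR_THREAD_TASK_PRIORITY_NORMAL (127) lands in
 * segment 1 and APR_THREAD_TASK_PRIORITY_HIGHEST (255) in segment 3.  The
 * task_idx[] member of apr_thread_pool below caches the first task of each
 * segment so that insertion need not scan the whole queue.
 */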
| |
| typedef struct apr_thread_pool_task |
| { |
| APR_RING_ENTRY(apr_thread_pool_task) link; |
| apr_thread_start_t func; |
| void *param; |
| void *owner; |
| union |
| { |
| apr_byte_t priority; |
| apr_time_t time; |
| } dispatch; |
| } apr_thread_pool_task_t; |
| |
| APR_RING_HEAD(apr_thread_pool_tasks, apr_thread_pool_task); |
| |
struct apr_thread_list_elt
{
    APR_RING_ENTRY(apr_thread_list_elt) link;
    apr_thread_t *thd;
    void *current_owner;    /* owner of the task being run, NULL between tasks */
    enum {
        TH_RUN,             /* running a task or waiting for one */
        TH_STOP,            /* asked to stop */
        TH_PROBATION        /* surplus idle thread, times out after idle_wait */
    } state;
    int signal_work_done;   /* a canceller waits on work_done for this thread */
};
| |
| APR_RING_HEAD(apr_thread_list, apr_thread_list_elt); |
| |
struct apr_thread_pool
{
    apr_pool_t *pool;
    volatile apr_size_t thd_max;            /* hard limit on the number of threads */
    volatile apr_size_t idle_max;           /* maximum number of idle threads kept */
    volatile apr_interval_time_t idle_wait; /* grace period before a surplus idle
                                             * thread exits */
    volatile apr_size_t thd_cnt;            /* current number of threads */
    volatile apr_size_t idle_cnt;           /* current number of idle threads */
    volatile apr_size_t busy_cnt;           /* current number of busy threads */
    volatile apr_size_t task_cnt;           /* number of queued tasks */
    volatile apr_size_t scheduled_task_cnt; /* number of scheduled tasks */
    volatile apr_size_t threshold;          /* queued tasks beyond which extra
                                             * threads are spawned */
    volatile apr_size_t tasks_run;          /* total number of tasks performed */
    volatile apr_size_t tasks_high;         /* high-water mark of task_cnt */
    volatile apr_size_t thd_high;           /* high-water mark of thd_cnt */
    volatile apr_size_t thd_timed_out;      /* threads that exited after idling out */
    struct apr_thread_pool_tasks *tasks;
    struct apr_thread_pool_tasks *scheduled_tasks;
    struct apr_thread_list *busy_thds;
    struct apr_thread_list *idle_thds;
    struct apr_thread_list *dead_thds;      /* exited threads awaiting join */
    apr_thread_cond_t *more_work;           /* wakes idle threads (new work or stop) */
    apr_thread_cond_t *work_done;           /* signals a waiting tasks_cancel() */
    apr_thread_cond_t *all_done;            /* signaled when the last thread exits */
    apr_thread_mutex_t *lock;               /* protects the mutable fields above */
    volatile int terminated;
    struct apr_thread_pool_tasks *recycled_tasks;
    struct apr_thread_list *recycled_thds;
    apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS];
};
| |
| static apr_status_t thread_pool_construct(apr_thread_pool_t **tp, |
| apr_size_t init_threads, |
| apr_size_t max_threads, |
| apr_pool_t *pool) |
| { |
| apr_status_t rv; |
| apr_thread_pool_t *me; |
| |
| me = *tp = apr_pcalloc(pool, sizeof(apr_thread_pool_t)); |
| me->thd_max = max_threads; |
| me->idle_max = init_threads; |
| me->threshold = init_threads / 2; |
| |
| /* This pool will be used by different threads. As we cannot ensure that |
| * our caller won't use the pool without acquiring the mutex, we must |
| * create a new sub pool. |
| */ |
| rv = apr_pool_create(&me->pool, pool); |
| if (APR_SUCCESS != rv) { |
| return rv; |
| } |
| /* Create the mutex on the parent pool such that it's always alive from |
| * apr_thread_pool_{push,schedule,top}() callers. |
| */ |
| rv = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED, pool); |
| if (APR_SUCCESS != rv) { |
| return rv; |
| } |
| rv = apr_thread_cond_create(&me->more_work, me->pool); |
| if (APR_SUCCESS != rv) { |
| apr_thread_mutex_destroy(me->lock); |
| return rv; |
| } |
| rv = apr_thread_cond_create(&me->work_done, me->pool); |
| if (APR_SUCCESS != rv) { |
| apr_thread_cond_destroy(me->more_work); |
| apr_thread_mutex_destroy(me->lock); |
| return rv; |
| } |
| rv = apr_thread_cond_create(&me->all_done, me->pool); |
| if (APR_SUCCESS != rv) { |
| apr_thread_cond_destroy(me->work_done); |
| apr_thread_cond_destroy(me->more_work); |
| apr_thread_mutex_destroy(me->lock); |
| return rv; |
| } |
| me->tasks = apr_palloc(me->pool, sizeof(*me->tasks)); |
| if (!me->tasks) { |
| goto CATCH_ENOMEM; |
| } |
| APR_RING_INIT(me->tasks, apr_thread_pool_task, link); |
| me->scheduled_tasks = apr_palloc(me->pool, sizeof(*me->scheduled_tasks)); |
| if (!me->scheduled_tasks) { |
| goto CATCH_ENOMEM; |
| } |
| APR_RING_INIT(me->scheduled_tasks, apr_thread_pool_task, link); |
| me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks)); |
| if (!me->recycled_tasks) { |
| goto CATCH_ENOMEM; |
| } |
| APR_RING_INIT(me->recycled_tasks, apr_thread_pool_task, link); |
| me->busy_thds = apr_palloc(me->pool, sizeof(*me->busy_thds)); |
| if (!me->busy_thds) { |
| goto CATCH_ENOMEM; |
| } |
| APR_RING_INIT(me->busy_thds, apr_thread_list_elt, link); |
| me->idle_thds = apr_palloc(me->pool, sizeof(*me->idle_thds)); |
| if (!me->idle_thds) { |
| goto CATCH_ENOMEM; |
| } |
| APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link); |
| me->dead_thds = apr_palloc(me->pool, sizeof(*me->dead_thds)); |
| if (!me->dead_thds) { |
| goto CATCH_ENOMEM; |
| } |
| APR_RING_INIT(me->dead_thds, apr_thread_list_elt, link); |
| me->recycled_thds = apr_palloc(me->pool, sizeof(*me->recycled_thds)); |
| if (!me->recycled_thds) { |
| goto CATCH_ENOMEM; |
| } |
| APR_RING_INIT(me->recycled_thds, apr_thread_list_elt, link); |
| goto FINAL_EXIT; |
| CATCH_ENOMEM: |
| rv = APR_ENOMEM; |
| apr_thread_cond_destroy(me->all_done); |
| apr_thread_cond_destroy(me->work_done); |
| apr_thread_cond_destroy(me->more_work); |
| apr_thread_mutex_destroy(me->lock); |
| FINAL_EXIT: |
| return rv; |
| } |
| |
| /* |
| * NOTE: This function is not thread safe by itself. Caller should hold the lock |
| */ |
| static apr_thread_pool_task_t *pop_task(apr_thread_pool_t * me) |
| { |
| apr_thread_pool_task_t *task = NULL; |
| int seg; |
| |
| /* check for scheduled tasks */ |
| if (me->scheduled_task_cnt > 0) { |
| task = APR_RING_FIRST(me->scheduled_tasks); |
| assert(task != NULL); |
| assert(task != |
| APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, |
| link)); |
| /* if it's time */ |
| if (task->dispatch.time <= apr_time_now()) { |
| --me->scheduled_task_cnt; |
| APR_RING_REMOVE(task, link); |
| return task; |
| } |
| } |
| /* check for normal tasks if we're not returning a scheduled task */ |
| if (me->task_cnt == 0) { |
| return NULL; |
| } |
| |
| task = APR_RING_FIRST(me->tasks); |
| assert(task != NULL); |
| assert(task != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)); |
| --me->task_cnt; |
| seg = TASK_PRIORITY_SEG(task); |
| if (task == me->task_idx[seg]) { |
| me->task_idx[seg] = APR_RING_NEXT(task, link); |
| if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks, |
| apr_thread_pool_task, link) |
| || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) { |
| me->task_idx[seg] = NULL; |
| } |
| } |
| APR_RING_REMOVE(task, link); |
| return task; |
| } |
| |
| static apr_interval_time_t waiting_time(apr_thread_pool_t * me) |
| { |
| apr_thread_pool_task_t *task = NULL; |
| |
| task = APR_RING_FIRST(me->scheduled_tasks); |
| assert(task != NULL); |
| assert(task != |
| APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, |
| link)); |
| return task->dispatch.time - apr_time_now(); |
| } |
| |
| /* |
| * NOTE: This function is not thread safe by itself. Caller should hold the lock |
| */ |
| static struct apr_thread_list_elt *elt_new(apr_thread_pool_t * me, |
| apr_thread_t * t) |
| { |
| struct apr_thread_list_elt *elt; |
| |
| if (APR_RING_EMPTY(me->recycled_thds, apr_thread_list_elt, link)) { |
| elt = apr_palloc(me->pool, sizeof(*elt)); |
| if (NULL == elt) { |
| return NULL; |
| } |
| } |
| else { |
| elt = APR_RING_FIRST(me->recycled_thds); |
| APR_RING_REMOVE(elt, link); |
| } |
| |
| APR_RING_ELEM_INIT(elt, link); |
| elt->thd = t; |
| elt->current_owner = NULL; |
| elt->signal_work_done = 0; |
| elt->state = TH_RUN; |
| return elt; |
| } |
| |
| /* |
| * The worker thread function. Take a task from the queue and perform it if |
| * there is any. Otherwise, put itself into the idle thread list and waiting |
| * for signal to wake up. |
| * The thread terminates directly and exits when it is asked to stop, after |
| * handling its task if busy. The thread will then be in the dead_thds list |
| * and should be joined. |
| */ |
| static void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param) |
| { |
| apr_thread_pool_t *me = param; |
| apr_thread_pool_task_t *task = NULL; |
| apr_interval_time_t wait; |
| struct apr_thread_list_elt *elt; |
| |
| apr_thread_mutex_lock(me->lock); |
| apr_pool_owner_set(me->pool, 0); |
| |
| elt = elt_new(me, t); |
| if (!elt) { |
| apr_thread_mutex_unlock(me->lock); |
| apr_thread_exit(t, APR_ENOMEM); |
| } |
| |
| for (;;) { |
        /* If not a newly created element, it was awakened from the idle list */
| if (APR_RING_NEXT(elt, link) != elt) { |
| --me->idle_cnt; |
| APR_RING_REMOVE(elt, link); |
| } |
| |
| if (elt->state != TH_STOP) { |
| ++me->busy_cnt; |
| APR_RING_INSERT_TAIL(me->busy_thds, elt, |
| apr_thread_list_elt, link); |
| do { |
| task = pop_task(me); |
| if (!task) { |
| break; |
| } |
| ++me->tasks_run; |
| elt->current_owner = task->owner; |
| apr_thread_mutex_unlock(me->lock); |
| |
| /* Run the task (or drop it if terminated already) */ |
| if (!me->terminated) { |
| apr_thread_data_set(task, "apr_thread_pool_task", NULL, t); |
| task->func(t, task->param); |
| } |
| |
| apr_thread_mutex_lock(me->lock); |
| apr_pool_owner_set(me->pool, 0); |
| APR_RING_INSERT_TAIL(me->recycled_tasks, task, |
| apr_thread_pool_task, link); |
| elt->current_owner = NULL; |
| if (elt->signal_work_done) { |
| elt->signal_work_done = 0; |
| apr_thread_cond_signal(me->work_done); |
| } |
| } while (elt->state != TH_STOP); |
| APR_RING_REMOVE(elt, link); |
| --me->busy_cnt; |
| } |
| assert(NULL == elt->current_owner); |
| |
| /* thread should die? */ |
| if (me->terminated |
| || elt->state != TH_RUN |
| || (me->idle_cnt >= me->idle_max |
| && (me->idle_max || !me->scheduled_task_cnt) |
| && !me->idle_wait)) { |
| if ((TH_PROBATION == elt->state) && me->idle_wait) |
| ++me->thd_timed_out; |
| break; |
| } |
| |
        /* busy thread becomes idle */
| ++me->idle_cnt; |
| APR_RING_INSERT_TAIL(me->idle_thds, elt, apr_thread_list_elt, link); |
| |
| /* |
| * If there is a scheduled task, always scheduled to perform that task. |
| * Since there is no guarantee that current idle threads are scheduled |
| * for next scheduled task. |
| */ |
| if (me->scheduled_task_cnt) |
| wait = waiting_time(me); |
| else if (me->idle_cnt > me->idle_max) { |
| wait = me->idle_wait; |
| elt->state = TH_PROBATION; |
| } |
| else |
| wait = -1; |
| |
| if (wait >= 0) { |
| apr_thread_cond_timedwait(me->more_work, me->lock, wait); |
| } |
| else { |
| apr_thread_cond_wait(me->more_work, me->lock); |
| } |
| apr_pool_owner_set(me->pool, 0); |
| } |
| |
| /* Dead thread, to be joined */ |
| APR_RING_INSERT_TAIL(me->dead_thds, elt, apr_thread_list_elt, link); |
| if (--me->thd_cnt == 0 && me->terminated) { |
| apr_thread_cond_signal(me->all_done); |
| } |
| apr_thread_mutex_unlock(me->lock); |
| |
| apr_thread_exit(t, APR_SUCCESS); |
    return NULL; /* not reached, safety net */
| } |
| |
| /* Must be locked by the caller */ |
| static void join_dead_threads(apr_thread_pool_t *me) |
| { |
| while (!APR_RING_EMPTY(me->dead_thds, apr_thread_list_elt, link)) { |
| struct apr_thread_list_elt *elt; |
| apr_status_t status; |
| |
| elt = APR_RING_FIRST(me->dead_thds); |
| APR_RING_REMOVE(elt, link); |
| apr_thread_mutex_unlock(me->lock); |
| |
| apr_thread_join(&status, elt->thd); |
| |
| apr_thread_mutex_lock(me->lock); |
| apr_pool_owner_set(me->pool, 0); |
| APR_RING_INSERT_TAIL(me->recycled_thds, elt, |
| apr_thread_list_elt, link); |
| } |
| } |
| |
| static apr_status_t thread_pool_cleanup(void *me) |
| { |
| apr_thread_pool_t *_myself = me; |
| |
| _myself->terminated = 1; |
| apr_thread_pool_tasks_cancel(_myself, NULL); |
| apr_thread_pool_thread_max_set(_myself, 0); |
| apr_thread_mutex_lock(_myself->lock); |
| apr_pool_owner_set(_myself->pool, 0); |
| |
    /* Re-check in a loop, condition waits may wake up spuriously */
    while (_myself->thd_cnt) {
        apr_thread_cond_wait(_myself->all_done, _myself->lock);
        apr_pool_owner_set(_myself->pool, 0);
    }
| |
| /* All threads should be dead now, join them */ |
| join_dead_threads(_myself); |
| |
| apr_thread_mutex_unlock(_myself->lock); |
| |
| return APR_SUCCESS; |
| } |
| |
| APR_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me, |
| apr_size_t init_threads, |
| apr_size_t max_threads, |
| apr_pool_t * pool) |
| { |
| apr_thread_t *t; |
| apr_status_t rv = APR_SUCCESS; |
| apr_thread_pool_t *tp; |
| |
| *me = NULL; |
| |
| rv = thread_pool_construct(&tp, init_threads, max_threads, pool); |
| if (APR_SUCCESS != rv) |
| return rv; |
| apr_pool_pre_cleanup_register(tp->pool, tp, thread_pool_cleanup); |
| |
| /* Grab the mutex as apr_thread_create() and thread_pool_func() will |
| * allocate from (*me)->pool. This is dangerous if there are multiple |
| * initial threads to create. |
| */ |
| apr_thread_mutex_lock(tp->lock); |
| apr_pool_owner_set(tp->pool, 0); |
| while (init_threads--) { |
| rv = apr_thread_create(&t, NULL, thread_pool_func, tp, tp->pool); |
| if (APR_SUCCESS != rv) { |
| break; |
| } |
| tp->thd_cnt++; |
| if (tp->thd_cnt > tp->thd_high) { |
| tp->thd_high = tp->thd_cnt; |
| } |
| } |
| apr_thread_mutex_unlock(tp->lock); |
| |
| if (rv == APR_SUCCESS) { |
| *me = tp; |
| } |
| |
| return rv; |
| } |
| |
| APR_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me) |
| { |
| apr_pool_destroy(me->pool); |
| return APR_SUCCESS; |
| } |
| |
| /* |
| * NOTE: This function is not thread safe by itself. Caller should hold the lock |
| */ |
| static apr_thread_pool_task_t *task_new(apr_thread_pool_t * me, |
| apr_thread_start_t func, |
| void *param, apr_byte_t priority, |
| void *owner, apr_time_t time) |
| { |
| apr_thread_pool_task_t *t; |
| |
| if (APR_RING_EMPTY(me->recycled_tasks, apr_thread_pool_task, link)) { |
| t = apr_palloc(me->pool, sizeof(*t)); |
| if (NULL == t) { |
| return NULL; |
| } |
| } |
| else { |
| t = APR_RING_FIRST(me->recycled_tasks); |
| APR_RING_REMOVE(t, link); |
| } |
| APR_RING_ELEM_INIT(t, link); |
| |
| t->func = func; |
| t->param = param; |
| t->owner = owner; |
| if (time > 0) { |
| t->dispatch.time = apr_time_now() + time; |
| } |
| else { |
| t->dispatch.priority = priority; |
| } |
| return t; |
| } |
| |
| /* |
| * Test it the task is the only one within the priority segment. |
| * If it is not, return the first element with same or lower priority. |
| * Otherwise, add the task into the queue and return NULL. |
| * |
| * NOTE: This function is not thread safe by itself. Caller should hold the lock |
| */ |
| static apr_thread_pool_task_t *add_if_empty(apr_thread_pool_t * me, |
| apr_thread_pool_task_t * const t) |
| { |
| int seg; |
| int next; |
| apr_thread_pool_task_t *t_next; |
| |
| seg = TASK_PRIORITY_SEG(t); |
| if (me->task_idx[seg]) { |
| assert(APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) != |
| me->task_idx[seg]); |
| t_next = me->task_idx[seg]; |
| while (t_next->dispatch.priority > t->dispatch.priority) { |
| t_next = APR_RING_NEXT(t_next, link); |
| if (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) == |
| t_next) { |
| return t_next; |
| } |
| } |
| return t_next; |
| } |
| |
| for (next = seg - 1; next >= 0; next--) { |
| if (me->task_idx[next]) { |
| APR_RING_INSERT_BEFORE(me->task_idx[next], t, link); |
| break; |
| } |
| } |
| if (0 > next) { |
| APR_RING_INSERT_TAIL(me->tasks, t, apr_thread_pool_task, link); |
| } |
| me->task_idx[seg] = t; |
| return NULL; |
| } |
| |
| /* |
| * schedule a task to run in "time" microseconds. Find the spot in the ring where |
| * the time fits. Adjust the short_time so the thread wakes up when the time is reached. |
| */ |
| static apr_status_t schedule_task(apr_thread_pool_t *me, |
| apr_thread_start_t func, void *param, |
| void *owner, apr_interval_time_t time) |
| { |
| apr_thread_pool_task_t *t; |
| apr_thread_pool_task_t *t_loc; |
| apr_thread_t *thd; |
| apr_status_t rv = APR_SUCCESS; |
| |
| apr_thread_mutex_lock(me->lock); |
| apr_pool_owner_set(me->pool, 0); |
| |
| if (me->terminated) { |
| /* Let the caller know that we are done */ |
| apr_thread_mutex_unlock(me->lock); |
| return APR_NOTFOUND; |
| } |
| |
| /* Maintain dead threads */ |
| join_dead_threads(me); |
| |
| t = task_new(me, func, param, 0, owner, time); |
| if (NULL == t) { |
| apr_thread_mutex_unlock(me->lock); |
| return APR_ENOMEM; |
| } |
| t_loc = APR_RING_FIRST(me->scheduled_tasks); |
| while (NULL != t_loc) { |
        /* if the time is less than the entry's, insert ahead of it */
| if (t->dispatch.time < t_loc->dispatch.time) { |
| ++me->scheduled_task_cnt; |
| APR_RING_INSERT_BEFORE(t_loc, t, link); |
| break; |
| } |
| else { |
| t_loc = APR_RING_NEXT(t_loc, link); |
| if (t_loc == |
| APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, |
| link)) { |
| ++me->scheduled_task_cnt; |
| APR_RING_INSERT_TAIL(me->scheduled_tasks, t, |
| apr_thread_pool_task, link); |
| break; |
| } |
| } |
| } |
| /* there should be at least one thread for scheduled tasks */ |
| if (0 == me->thd_cnt) { |
| rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool); |
| if (APR_SUCCESS == rv) { |
| ++me->thd_cnt; |
| if (me->thd_cnt > me->thd_high) |
| me->thd_high = me->thd_cnt; |
| } |
| } |
| apr_thread_cond_signal(me->more_work); |
| apr_thread_mutex_unlock(me->lock); |
| |
| return rv; |
| } |
| |
| static apr_status_t add_task(apr_thread_pool_t *me, apr_thread_start_t func, |
| void *param, apr_byte_t priority, int push, |
| void *owner) |
| { |
| apr_thread_pool_task_t *t; |
| apr_thread_pool_task_t *t_loc; |
| apr_thread_t *thd; |
| apr_status_t rv = APR_SUCCESS; |
| |
| apr_thread_mutex_lock(me->lock); |
| apr_pool_owner_set(me->pool, 0); |
| |
| if (me->terminated) { |
| /* Let the caller know that we are done */ |
| apr_thread_mutex_unlock(me->lock); |
| return APR_NOTFOUND; |
| } |
| |
| /* Maintain dead threads */ |
| join_dead_threads(me); |
| |
| t = task_new(me, func, param, priority, owner, 0); |
| if (NULL == t) { |
| apr_thread_mutex_unlock(me->lock); |
| return APR_ENOMEM; |
| } |
| |
| t_loc = add_if_empty(me, t); |
| if (NULL == t_loc) { |
| goto FINAL_EXIT; |
| } |
| |
| if (push) { |
| while (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) != |
| t_loc && t_loc->dispatch.priority >= t->dispatch.priority) { |
| t_loc = APR_RING_NEXT(t_loc, link); |
| } |
| } |
| APR_RING_INSERT_BEFORE(t_loc, t, link); |
| if (!push) { |
| if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) { |
| me->task_idx[TASK_PRIORITY_SEG(t)] = t; |
| } |
| } |
| |
| FINAL_EXIT: |
| me->task_cnt++; |
| if (me->task_cnt > me->tasks_high) |
| me->tasks_high = me->task_cnt; |
| if (0 == me->thd_cnt || (0 == me->idle_cnt && me->thd_cnt < me->thd_max && |
| me->task_cnt > me->threshold)) { |
| rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool); |
| if (APR_SUCCESS == rv) { |
| ++me->thd_cnt; |
| if (me->thd_cnt > me->thd_high) |
| me->thd_high = me->thd_cnt; |
| } |
| } |
| |
| apr_thread_cond_signal(me->more_work); |
| apr_thread_mutex_unlock(me->lock); |
| |
| return rv; |
| } |
| |
| APR_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t *me, |
| apr_thread_start_t func, |
| void *param, |
| apr_byte_t priority, |
| void *owner) |
| { |
| return add_task(me, func, param, priority, 1, owner); |
| } |
| |
| APR_DECLARE(apr_status_t) apr_thread_pool_schedule(apr_thread_pool_t *me, |
| apr_thread_start_t func, |
| void *param, |
| apr_interval_time_t time, |
| void *owner) |
| { |
| return schedule_task(me, func, param, owner, time); |
| } |
| |
| APR_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t *me, |
| apr_thread_start_t func, |
| void *param, |
| apr_byte_t priority, |
| void *owner) |
| { |
| return add_task(me, func, param, priority, 0, owner); |
| } |
| |
| static apr_status_t remove_scheduled_tasks(apr_thread_pool_t *me, |
| void *owner) |
| { |
| apr_thread_pool_task_t *t_loc; |
| apr_thread_pool_task_t *next; |
| |
| t_loc = APR_RING_FIRST(me->scheduled_tasks); |
| while (t_loc != |
| APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, |
| link)) { |
| next = APR_RING_NEXT(t_loc, link); |
        /* remove it if it matches the owner (a NULL owner matches all) */
| if (!owner || t_loc->owner == owner) { |
| --me->scheduled_task_cnt; |
| APR_RING_REMOVE(t_loc, link); |
| } |
| t_loc = next; |
| } |
| return APR_SUCCESS; |
| } |
| |
| static apr_status_t remove_tasks(apr_thread_pool_t *me, void *owner) |
| { |
| apr_thread_pool_task_t *t_loc; |
| apr_thread_pool_task_t *next; |
| int seg; |
| |
| t_loc = APR_RING_FIRST(me->tasks); |
| while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) { |
| next = APR_RING_NEXT(t_loc, link); |
| if (!owner || t_loc->owner == owner) { |
| --me->task_cnt; |
| seg = TASK_PRIORITY_SEG(t_loc); |
| if (t_loc == me->task_idx[seg]) { |
| me->task_idx[seg] = APR_RING_NEXT(t_loc, link); |
| if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks, |
| apr_thread_pool_task, |
| link) |
| || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) { |
| me->task_idx[seg] = NULL; |
| } |
| } |
| APR_RING_REMOVE(t_loc, link); |
| } |
| t_loc = next; |
| } |
| return APR_SUCCESS; |
| } |
| |
| /* Must be locked by the caller */ |
| static void wait_on_busy_threads(apr_thread_pool_t *me, void *owner) |
| { |
| #ifndef NDEBUG |
| apr_os_thread_t *os_thread; |
| #endif |
| struct apr_thread_list_elt *elt; |
| |
| elt = APR_RING_FIRST(me->busy_thds); |
| while (elt != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) { |
| if (owner ? owner != elt->current_owner : !elt->current_owner) { |
| elt = APR_RING_NEXT(elt, link); |
| continue; |
| } |
| |
| #ifndef NDEBUG |
| /* make sure the thread is not the one calling tasks_cancel */ |
| apr_os_thread_get(&os_thread, elt->thd); |
| #ifdef WIN32 |
| /* hack for apr win32 bug */ |
| assert(!apr_os_thread_equal(apr_os_thread_current(), os_thread)); |
| #else |
| assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread)); |
| #endif |
| #endif |
| |
| elt->signal_work_done = 1; |
| apr_thread_cond_wait(me->work_done, me->lock); |
| apr_pool_owner_set(me->pool, 0); |
| |
| /* Restart */ |
| elt = APR_RING_FIRST(me->busy_thds); |
| } |
| |
| /* Maintain dead threads */ |
| join_dead_threads(me); |
| } |
| |
| APR_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t *me, |
| void *owner) |
| { |
| apr_status_t rv = APR_SUCCESS; |
| |
| apr_thread_mutex_lock(me->lock); |
| apr_pool_owner_set(me->pool, 0); |
| |
| if (me->task_cnt > 0) { |
| rv = remove_tasks(me, owner); |
| } |
| if (me->scheduled_task_cnt > 0) { |
| rv = remove_scheduled_tasks(me, owner); |
| } |
| |
| wait_on_busy_threads(me, owner); |
| |
| apr_thread_mutex_unlock(me->lock); |
| |
| return rv; |
| } |
| |
| APR_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t *me) |
| { |
| return me->task_cnt; |
| } |
| |
| APR_DECLARE(apr_size_t) |
| apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t *me) |
| { |
| return me->scheduled_task_cnt; |
| } |
| |
| APR_DECLARE(apr_size_t) apr_thread_pool_threads_count(apr_thread_pool_t *me) |
| { |
| return me->thd_cnt; |
| } |
| |
| APR_DECLARE(apr_size_t) apr_thread_pool_busy_count(apr_thread_pool_t *me) |
| { |
| return me->busy_cnt; |
| } |
| |
| APR_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t *me) |
| { |
| return me->idle_cnt; |
| } |
| |
| APR_DECLARE(apr_size_t) |
| apr_thread_pool_tasks_run_count(apr_thread_pool_t * me) |
| { |
| return me->tasks_run; |
| } |
| |
| APR_DECLARE(apr_size_t) |
| apr_thread_pool_tasks_high_count(apr_thread_pool_t * me) |
| { |
| return me->tasks_high; |
| } |
| |
| APR_DECLARE(apr_size_t) |
| apr_thread_pool_threads_high_count(apr_thread_pool_t * me) |
| { |
| return me->thd_high; |
| } |
| |
| APR_DECLARE(apr_size_t) |
| apr_thread_pool_threads_idle_timeout_count(apr_thread_pool_t * me) |
| { |
| return me->thd_timed_out; |
| } |
| |
| |
| APR_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t *me) |
| { |
| return me->idle_max; |
| } |
| |
| APR_DECLARE(apr_interval_time_t) |
| apr_thread_pool_idle_wait_get(apr_thread_pool_t * me) |
| { |
| return me->idle_wait; |
| } |
| |
| /* |
| * Stop threads above given *cnt, set the number of threads stopped in *cnt. |
| * NOTE: There could be busy threads become idle during this function |
| */ |
| static void stop_threads(apr_thread_pool_t *me, apr_size_t *cnt, int idle) |
| { |
| struct apr_thread_list *thds; |
| struct apr_thread_list_elt *elt, *last; |
| apr_size_t n, i; |
| |
| apr_thread_mutex_lock(me->lock); |
| apr_pool_owner_set(me->pool, 0); |
| |
| if (idle) { |
| thds = me->idle_thds; |
| n = me->idle_cnt; |
| } |
| else { |
| thds = me->busy_thds; |
| n = me->busy_cnt; |
| } |
| if (n <= *cnt) { |
| apr_thread_mutex_unlock(me->lock); |
| *cnt = 0; |
| return; |
| } |
| |
| elt = APR_RING_FIRST(thds); |
| last = APR_RING_LAST(thds); |
| for (i = 0; i < *cnt; ++i) { |
| elt = APR_RING_NEXT(elt, link); |
| } |
| for (; i < n; ++i) { |
| elt->state = TH_STOP; |
| if (elt == last) { |
| break; |
| } |
| elt = APR_RING_NEXT(elt, link); |
| } |
| assert(i + 1 == n); |
    *cnt = n - *cnt; /* number of threads stopped */
| |
| join_dead_threads(me); |
| |
| apr_thread_mutex_unlock(me->lock); |
| } |
| |
| static apr_size_t stop_idle_threads(apr_thread_pool_t *me, apr_size_t cnt) |
| { |
| stop_threads(me, &cnt, 1); |
| if (cnt) { |
| apr_thread_mutex_lock(me->lock); |
| apr_pool_owner_set(me->pool, 0); |
| apr_thread_cond_broadcast(me->more_work); |
| apr_thread_mutex_unlock(me->lock); |
| } |
| return cnt; |
| } |
| |
| static apr_size_t stop_busy_threads(apr_thread_pool_t *me, apr_size_t cnt) |
| { |
| stop_threads(me, &cnt, 0); |
| return cnt; |
| } |
| |
| APR_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t *me, |
| apr_size_t cnt) |
| { |
| me->idle_max = cnt; |
| return stop_idle_threads(me, cnt); |
| } |
| |
| APR_DECLARE(apr_interval_time_t) |
| apr_thread_pool_idle_wait_set(apr_thread_pool_t * me, |
| apr_interval_time_t timeout) |
| { |
| apr_interval_time_t oldtime; |
| |
| oldtime = me->idle_wait; |
| me->idle_wait = timeout; |
| |
| return oldtime; |
| } |
| |
| APR_DECLARE(apr_size_t) apr_thread_pool_thread_max_get(apr_thread_pool_t *me) |
| { |
| return me->thd_max; |
| } |
| |
| /* |
| * This function stop extra working threads to the new limit. |
| * NOTE: There could be busy threads become idle during this function |
| */ |
| APR_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t *me, |
| apr_size_t cnt) |
| { |
| apr_size_t n, i; |
| |
| me->thd_max = cnt; |
| n = me->thd_cnt; |
| if (n <= cnt) { |
| return 0; |
| } |
| n -= cnt; /* #threads to stop */ |
| |
| i = me->idle_cnt; |
| if (n >= i) { |
| stop_busy_threads(me, n - i); |
| n = i; /* stop all idle threads */ |
| } |
| stop_idle_threads(me, i - n); |
| |
| return n; |
| } |
| |
| APR_DECLARE(apr_size_t) apr_thread_pool_threshold_get(apr_thread_pool_t *me) |
| { |
| return me->threshold; |
| } |
| |
| APR_DECLARE(apr_size_t) apr_thread_pool_threshold_set(apr_thread_pool_t *me, |
| apr_size_t val) |
| { |
| apr_size_t ov; |
| |
| ov = me->threshold; |
| me->threshold = val; |
| return ov; |
| } |
| |
| APR_DECLARE(apr_status_t) apr_thread_pool_task_owner_get(apr_thread_t *thd, |
| void **owner) |
| { |
| apr_status_t rv; |
| apr_thread_pool_task_t *task; |
| void *data; |
| |
| rv = apr_thread_data_get(&data, "apr_thread_pool_task", thd); |
| if (rv != APR_SUCCESS) { |
| return rv; |
| } |
| |
| task = data; |
| if (!task) { |
| *owner = NULL; |
| return APR_BADARG; |
| } |
| |
| *owner = task->owner; |
| return APR_SUCCESS; |
| } |
| |
| #endif /* APR_HAS_THREADS */ |
| |
| /* vim: set ts=4 sw=4 et cin tw=80: */ |