| /* task.c : Implement the parallel task execution machine. |
| * |
| * ==================================================================== |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * ==================================================================== |
| */ |
| |
| #include "private/svn_task.h" |
| |
| #include <assert.h> |
| #include <apr_thread_proc.h> |
| |
| #include "private/svn_atomic.h" |
| #include "private/svn_thread_cond.h" |
| |
| |
| /* Top of the task tree. |
| * |
| * It is accessible from all tasks and contains all necessary resource |
| * pools and synchronization mechanisms. |
| */ |
| typedef struct root_t |
| { |
| /* Global mutex protecting the whole task tree. |
| * Any modification on the tree structure or task state requires |
| * serialization through this mutex. |
| * |
| * In single-threaded execution, this will be a no-op dummy. |
| */ |
| svn_mutex__t *mutex; |
| |
| /* Used to portably implement a memory barrier in C. */ |
| svn_mutex__t *memory_barrier_mutex; |
| |
| /* Signals to waiting ("sleeping") worker threads that they need to wake |
| * up. This may be due to new tasks being available or because the task |
| * runner is about to terminate. |
| * |
| * Waiting tasks must hold the MUTEX when entering the waiting state. |
| * |
| * NULL if execution is single-threaded. |
| */ |
| svn_thread_cond__t *worker_wakeup; |
| |
| /* Signals to the foreground thread that some tasks may have been processed |
| * and output process may commence. However, there is no guarantee that |
| * any task actually completed nor that it is the one whose output needs |
| * to be processed next. |
| * |
| * Waiting tasks must hold the MUTEX when entering the waiting state. |
| * |
| * NULL if execution is single-threaded. |
| */ |
| svn_thread_cond__t *task_processed; |
| |
| /* The actual root task. */ |
| svn_task__t *task; |
| |
| /* All allocations from TASK_POOL must be serialized through this mutex. */ |
| svn_mutex__t *task_alloc_mutex; |
| |
| /* Pools "segregated" for reduced lock contention when multi-threading. |
| * Ordered by lifetime of the objects allocated in them (long to short). |
| */ |
| |
| /* Allocate tasks and callbacks here. |
| * These have the longest lifetimes and will (currently) not be released |
| * until this root object gets cleaned up. |
| */ |
| apr_pool_t *task_pool; |
| |
| /* Allocate per-task parameters (process_baton) from sub-pools of this. |
| * They should be cleaned up as soon as the respective task has been |
| * processed (the parameters will not be needed anymore). |
| */ |
| apr_pool_t *process_pool; |
| |
| /* Allocate per-task results_t as well as the actual outputs from sub-pools |
| * of this. Allocation will happen just before calling the processing |
| * function. Release the memory immediately afterwards, unless some actual |
| * output has been produced. |
| */ |
| apr_pool_t *results_pool; |
| |
| /* Context construction parameters as passed in to svn_task__run(). */ |
| svn_task__thread_context_constructor_t context_constructor; |
| void *context_baton; |
| |
| /* If TRUE, end task processing. In multi-threaded execution, the main |
| * (= output) thread will set this upon error, cancellation or simply |
| * when all work is done. Worker threads will check for it and terminate |
| * asap. |
| */ |
| svn_atomic_t terminate; |
| |
| } root_t; |
| |
| /* Sub-structure of svn_task__t containing that task's processing output. |
| */ |
| typedef struct results_t |
| { |
| /* (Last part of the) output produced by the task. |
| * If the task has sub-tasks, additional output (produced before creating |
| * each sub-task) may be found in the respective sub-task's |
| * PRIOR_PARENT_OUTPUT. NULL, if no output was produced. |
| */ |
| void *output; |
| |
| /* Error code returned by the proccessing function. */ |
| svn_error_t *error; |
| |
| /* Parent task's output before this task has been created, i.e. the part |
| * that shall be passed to the output function before this task's output. |
| * NULL, if there is no prior parent output. |
| * |
| * This has to be allocated in the parent's OUTPUT->POOL. |
| */ |
| void *prior_parent_output; |
| |
| /* The tasks' output may be split into multiple parts, produced before |
| * and in between sub-tasks. Those will be stored in the OUTPUT structs |
| * of those sub-tasks but have been allocated in (their parent's) POOL. |
| * |
| * This flag indicates if such partial results exist. POOL must not be |
| * destroyed in that case, until all sub-tasks' outputs have been handled. |
| */ |
| svn_boolean_t has_partial_results; |
| |
| /* Pool used to allocate this structure as well as the contents of OUTPUT |
| * and PRIOR_PARENT_OUTPUT in any immediate sub-task. */ |
| apr_pool_t *pool; |
| |
| } results_t; |
| |
| /* The task's callbacks. |
| * |
| * We keep them in a separate structure such that they may be shared |
| * easily between task & sub-task. |
| */ |
| typedef struct callbacks_t |
| { |
| /* Process function to call. |
| * The respective baton is task-specific and held in svn_task__t. |
| * |
| * NULL is legal here (for stability reasons and maybe future extensions) |
| * but pointless as no processing will happen and no output can be |
| * produced, in turn bypassing OUTPUT_FUNC. |
| */ |
| svn_task__process_func_t process_func; |
| |
| /* Output function to call, if there was output. */ |
| svn_task__output_func_t output_func; |
| |
| /* Baton to pass into OUTPUT_FUNC. */ |
| void *output_baton; |
| |
| } callbacks_t; |
| |
| /* Task main data structure - a node in the task tree. |
| */ |
| struct svn_task__t |
| { |
| /* Root object where all allocation & synchronization happens. */ |
| root_t *root; |
| |
| /* Tree structure */ |
| |
| /* Parent task. |
| * NULL for the root node: |
| * (PARENT==NULL) == (this==ROOT->TASK). |
| */ |
| svn_task__t *parent; |
| |
| /* First immediate sub-task (in creation order). */ |
| svn_task__t *first_sub; |
| |
| /* Latest immediate sub-task (in creation order). */ |
| svn_task__t *last_sub; |
| |
| /* Next sibling, i.e. next in the list of PARENT's immediate sub-tasks |
| * (in creation order). */ |
| svn_task__t *next; |
| |
| /* Index of this task within the PARENT's sub-task list, i.e. the number |
| * of siblings created before this one. The value will *not* be adjusted |
| * should prior siblings be removed. |
| * |
| * This will be used to efficiently determine before / after relationships |
| * between arbitrary siblings. |
| */ |
| apr_size_t sub_task_idx; |
| |
| /* Efficiently track tasks that need processing */ |
| |
| /* The first task, in pre-order, of this sub-tree whose processing has not |
| * been started yet. This will be NULL, iff for all tasks in this sub-tree, |
| * processing has at least been started. |
| * |
| * If this==FIRST_READY, this task itself waits for being proccessed. |
| * In that case, there can't be any sub-tasks. |
| */ |
| svn_task__t *first_ready; |
| |
| /* The first immidiate sub-task that has not been processed. If this is |
| * NULL, they might still be unprocessed tasks deeper down the tree. |
| * |
| * Use this to efficiently find unprocessed tasks high up in the tree. |
| */ |
| svn_task__t *first_unprocessed; |
| |
| /* Task state. */ |
| |
| /* The callbacks to use. Never NULL. */ |
| callbacks_t *callbacks; |
| |
| /* Process baton to pass into CALLBACKS->PROCESS_FUNC. */ |
| void *process_baton; |
| |
| /* Pool used to allocate the PROCESS_BATON. |
| * Sub-pool of ROOT->PROCESS_POOL. |
| * |
| * NULL, iff processing of this task has completed (sub-tasks may still |
| * need processing). |
| */ |
| apr_pool_t *process_pool; |
| |
| /* The processing results. |
| * |
| * Will be NULL before processing and may be NULL afterwards, if all |
| * fields would be NULL. |
| */ |
| results_t *results; |
| }; |
| |
| |
| /* Adding tasks to the tree. */ |
| |
| /* Return the index of the first immediate sub-task of TASK with a ready |
| * sub-task in its respective sub-tree. TASK must have at least one proper |
| * sub-task. |
| */ |
| static apr_size_t first_ready_sub_task_idx(const svn_task__t *task) |
| { |
| svn_task__t *sub_task = task->first_ready; |
| |
| /* Don't use this function if there is no ready sub-task. */ |
| assert(sub_task); |
| assert(sub_task != task); |
| |
| while (sub_task->parent != task) |
| sub_task = sub_task->parent; |
| |
| return sub_task->sub_task_idx; |
| } |
| |
| /* Link TASK up with TASK->PARENT. */ |
| static svn_error_t *link_new_task(svn_task__t *task) |
| { |
| svn_task__t *current, *parent; |
| |
| /* Insert into parent's sub-task list. */ |
| if (task->parent->last_sub) |
| { |
| task->parent->last_sub->next = task; |
| task->sub_task_idx = task->parent->last_sub->sub_task_idx + 1; |
| } |
| |
| task->parent->last_sub = task; |
| if (!task->parent->first_sub) |
| task->parent->first_sub = task; |
| |
| /* TASK is ready for execution. |
| * |
| * It may be the first one in pre-order. Update parents until they |
| * have a "FIRST_READY" in a sub-tree before (in pre-order) the one |
| * containing TASK. */ |
| for (current = task, parent = task->parent; |
| parent |
| && ( !parent->first_ready |
| || first_ready_sub_task_idx(parent) >= current->sub_task_idx); |
| current = parent, parent = parent->parent) |
| { |
| parent->first_ready = task; |
| } |
| |
| if (task->parent->first_unprocessed == NULL) |
| task->parent->first_unprocessed = task; |
| |
| /* Test invariants for new tasks. |
| * |
| * We have to do it while still holding task tree mutex; background |
| * processing of this task may already have started otherwise. */ |
| assert(task->parent != NULL); |
| assert(task->first_sub == NULL); |
| assert(task->last_sub == NULL); |
| assert(task->next == NULL); |
| assert(task->first_ready == task); |
| assert(task->first_unprocessed == NULL); |
| assert(task->callbacks != NULL); |
| assert(task->process_pool != NULL); |
| |
| return SVN_NO_ERROR; |
| } |
| |
| /* If TASK has no RESULTS sub-structure, add one. Return the RESULTS struct. |
| * |
| * In multi-threaded environments, calls to this must be serialized with |
| * root_t changes. */ |
| static results_t *ensure_results(svn_task__t *task) |
| { |
| if (!task->results) |
| { |
| apr_pool_t * results_pool = svn_pool_create(task->root->results_pool); |
| task->results = apr_pcalloc(results_pool, sizeof(*task->results)); |
| task->results->pool = results_pool; |
| } |
| |
| return task->results; |
| } |
| |
| /* Allocate a new task in POOL and return it in *RESULT. |
| * |
| * In multi-threaded environments, calls to this must be serialized with |
| * root_t changes. */ |
| static svn_error_t *alloc_task(svn_task__t **result, apr_pool_t *pool) |
| { |
| *result = apr_pcalloc(pool, sizeof(**result)); |
| return SVN_NO_ERROR; |
| } |
| |
| /* Allocate a new callbacks structure in POOL and return it in *RESULT. |
| * |
| * In multi-threaded environments, calls to this must be serialized with |
| * root_t changes. */ |
| static svn_error_t *alloc_callbacks(callbacks_t **result, apr_pool_t *pool) |
| { |
| *result = apr_pcalloc(pool, sizeof(**result)); |
| return SVN_NO_ERROR; |
| } |
| |
| /* Allocate a new task and append it to PARENT's sub-task list. |
| * Store PROCESS_POOL, CALLBACKS and PROCESS_BATON in the respective |
| * fields of the task struct. If PARTIAL_OUTPUT is not NULL, it will |
| * be stored in the new task's OUTPUT structure. |
| * |
| * This function does not return the new struct. Instead, it is scheduled |
| * to eventually be picked up by the task runner, i.e. execution is fully |
| * controlled by the execution model and sub-tasks may only be added when |
| * the new task itself is being processed. |
| */ |
| static svn_error_t *add_task( |
| svn_task__t *parent, |
| apr_pool_t *process_pool, |
| void *partial_output, |
| callbacks_t *callbacks, |
| void *process_baton) |
| { |
| svn_task__t *new_task; |
| |
| /* The root node has its own special construction code and does not use |
| * this function. So, here we will always have a parent. */ |
| assert(parent != NULL); |
| |
| /* Catch construction snafus early in the process. */ |
| assert(callbacks != NULL); |
| |
| SVN_MUTEX__WITH_LOCK(parent->root->task_alloc_mutex, |
| alloc_task(&new_task, parent->root->task_pool)); |
| |
| new_task->root = parent->root; |
| new_task->process_baton = process_baton; |
| new_task->process_pool = process_pool; |
| |
| new_task->parent = parent; |
| if (partial_output && parent->callbacks->output_func) |
| { |
| ensure_results(parent)->has_partial_results = TRUE; |
| ensure_results(new_task)->prior_parent_output = partial_output; |
| } |
| |
| /* The new task will be ready for execution once we link it up in PARENT. */ |
| new_task->first_ready = new_task; |
| new_task->callbacks = callbacks; |
| |
| SVN_MUTEX__WITH_LOCK(new_task->root->mutex, link_new_task(new_task)); |
| |
| /* Wake up all waiting worker threads: There is work to do. |
| * If there is not enough work for all, some will go back to sleep. |
| * |
| * In single-threaded execution, no signalling is needed. */ |
| if (new_task->root->worker_wakeup) |
| SVN_ERR(svn_thread_cond__broadcast(new_task->root->worker_wakeup)); |
| |
| return SVN_NO_ERROR; |
| } |
| |
| svn_error_t *svn_task__add( |
| svn_task__t *current, |
| apr_pool_t *process_pool, |
| void *partial_output, |
| svn_task__process_func_t process_func, |
| void *process_baton, |
| svn_task__output_func_t output_func, |
| void *output_baton) |
| { |
| callbacks_t *callbacks; |
| SVN_MUTEX__WITH_LOCK(current->root->task_alloc_mutex, |
| alloc_callbacks(&callbacks, current->root->task_pool)); |
| |
| callbacks->process_func = process_func; |
| callbacks->output_func = output_func; |
| callbacks->output_baton = output_baton; |
| |
| return svn_error_trace(add_task(current, process_pool, partial_output, |
| callbacks, process_baton)); |
| } |
| |
| svn_error_t* svn_task__add_similar( |
| svn_task__t* current, |
| apr_pool_t *process_pool, |
| void* partial_output, |
| void* process_baton) |
| { |
| return svn_error_trace(add_task(current, process_pool, partial_output, |
| current->callbacks, process_baton)); |
| } |
| |
| apr_pool_t *svn_task__create_process_pool( |
| svn_task__t *parent) |
| { |
| return svn_pool_create(parent->root->process_pool); |
| } |
| |
| |
| /* Removing tasks from the tree */ |
| |
| /* Remove TASK from the parent tree. |
| * TASK must have been fully processed and there shall be no more sub-tasks. |
| */ |
| static svn_error_t *remove_task(svn_task__t *task) |
| { |
| svn_task__t *parent = task->parent; |
| |
| assert(task->first_ready == NULL); |
| assert(task->first_sub == NULL); |
| |
| if (parent) |
| { |
| if (parent->first_sub == task) |
| parent->first_sub = task->next; |
| if (parent->last_sub == task) |
| parent->last_sub = NULL; |
| } |
| else |
| { |
| /* Root task. Keep it because we might rely on it being present. |
| * However, make sure nobody tries to process its outputs twice. */ |
| task->results = NULL; |
| } |
| |
| return SVN_NO_ERROR; |
| } |
| |
| /* Recursively free all errors in TASK. |
| */ |
| static void clear_errors(svn_task__t *task) |
| { |
| svn_task__t *sub_task; |
| for (sub_task = task->first_sub; sub_task; sub_task = sub_task->next) |
| clear_errors(sub_task); |
| |
| if (task->results) |
| svn_error_clear(task->results->error); |
| } |
| |
| |
| /* Picking the next task to process */ |
| |
| /* Utility function that follows the chain of siblings and returns the first |
| * that has *some* unprocessed task in its sub-tree. |
| * |
| * Returns TASK if either TASK or any of its sub-tasks is unprocessed. |
| * Returns NULL if all direct or indirect sub-tasks of TASK->PARENT are |
| * already being processed or have been completed. |
| */ |
| static svn_task__t *next_ready(svn_task__t *task) |
| { |
| for (; task; task = task->next) |
| if (task->first_ready) |
| break; |
| |
| return task; |
| } |
| |
| /* Utility function that follows the chain of siblings and returns the |
| * first unprocessed task. |
| * |
| * Returns TASK if TASK is unprocessed. Returns NULL if all direct sub- |
| * tasks of TASK->PARENT are already being processed or have been completed. |
| */ |
| static svn_task__t *next_unprocessed(svn_task__t *task) |
| { |
| for (; task; task = task->next) |
| if (task->first_ready == task) |
| break; |
| |
| return task; |
| } |
| |
| /* Mark TASK as no longer being unprocessed. |
| * Call this before starting actual processing of TASK. |
| */ |
| static void unready_task(svn_task__t *task) |
| { |
| svn_task__t *parent, *current; |
| svn_task__t *first_ready = NULL; |
| |
| /* Make sure that processing on TASK has not already started. */ |
| assert(task->first_ready == task); |
| |
| /* Also, there should be no sub-tasks before processing this one. |
| * Sub-tasks may only be added by processing the immediate parent. */ |
| assert(task->first_sub == NULL); |
| |
| /* There are no sub-tasks, hence nothing in the sub-tree could be ready. */ |
| task->first_ready = NULL; |
| |
| /* Bubble up the tree while TASK is the "first ready" one. |
| * Update the pointers to the next one ready. */ |
| for (current = task, parent = task->parent; |
| parent && (parent->first_ready == task); |
| current = parent, parent = parent->parent) |
| { |
| /* If we have not found another task that is ready, search the |
| * siblings for one. A suitable one cannot be *before* CURRENT |
| * or otherwise, PARENT->FIRST_READY would not equal TASK. |
| * It is possible that we won't find one at the current level. */ |
| if (!first_ready) |
| { |
| svn_task__t *first_ready_sub_task = next_ready(current->next); |
| first_ready = first_ready_sub_task |
| ? first_ready_sub_task->first_ready |
| : NULL; |
| } |
| |
| parent->first_ready = first_ready; |
| } |
| |
| /* Update FIRST_PROCESSED as well. Since this points only from parent to |
| * some immediate sub-task, no bubble-up action is required here. */ |
| if (task->parent && task->parent->first_unprocessed == task) |
| task->parent->first_unprocessed = next_unprocessed(task->next); |
| } |
| |
| |
| /* Task processing and outputting results */ |
| |
| /* Return TRUE if there are signs that another worker thread is working on |
| * the sub-tree of TASK or its next sibling. Detection does not need to be |
| * perfect as this is just a hint to the scheduling strategy. See |
| * set_processed_and_pick() for details. |
| * |
| * This function must be called with TASK->ROOT->MUTEX acquired. |
| */ |
| static svn_boolean_t is_contented(const svn_task__t *task) |
| { |
| /* Assuming TASK has just been processed, the first sub-task should now |
| * be ready for execution. Having no sub-tasks is also o.k. If both |
| * pointers differ, some other worker already picked up a sub-task. */ |
| if (task->first_sub != task->first_ready) |
| return TRUE; |
| |
| /* If this whole sub-tree has been completed, check whether we can |
| * continue with the next sibling. If that is already being processed, |
| * we would "step on somebody else's toes". */ |
| if ( task->first_ready == NULL |
| && task->next |
| && task->next->first_ready == task->next) |
| return TRUE; |
| |
| /* No signs of a clash found. */ |
| return FALSE; |
| } |
| |
| /* The forground output_processed() function will now consider TASK's |
| * processing function to be completed. Sub-tasks may still be pending. */ |
| static void set_processed(svn_task__t *task) |
| { |
| task->process_pool = NULL; |
| } |
| |
| /* Return whether TASK's processing function has been completed. |
| * Pending sub-tasks will be ignored. */ |
| static svn_boolean_t is_processed(const svn_task__t *task) |
| { |
| return (task->process_pool == NULL); |
| } |
| |
| /* Mark TASK as "processing completed" and pick a *NEXT_TASK to continue |
| * and mark it as "being processed". If no good candidate has been found, |
| * set *NEXT_TASK to NULL. |
| * |
| * The heuristics in here are crucial for an efficient parallel traversal |
| * of deep, unbalanced and growing trees. In their sequential form, many |
| * of the processing functions use parent context information to cache data |
| * and speed up processing in general. Processing tasks in tree order |
| * takes full advantage of this. |
| * |
| * However, having N workers serving the same sub-tree, that advantage gets |
| * cut down propertionally and may turn into pure overhead. Ideally, we |
| * want workers to be on distant sub-trees where they can process many tasks |
| * before stepping onto a different worker's "turf". Whenever the latter |
| * happends, we crawl up the tree to find the most distant (=most high up) |
| * unprocessed task in the tree. If that fails too, return NULL here and |
| * continue with the "canonical" next_task(). |
| * |
| * Please note that we are free to execute tasks in *any* order and this |
| * here is just about being efficient. |
| * |
| * This function must be called with TASK->ROOT->MUTEX acquired. |
| */ |
| static svn_error_t *set_processed_and_pick(svn_task__t **next_task, |
| svn_task__t *task) |
| { |
| set_processed(task); |
| |
| /* Are we still alone in our sub-tree? */ |
| if (is_contented(task)) |
| { |
| /* Nope. |
| * Maybe there is some untouched sub-tree under one of our parents. |
| * If so, find the one the highest up in the tree. |
| * |
| * Note that we may not find any despite some "cousin" being a good |
| * option. This is because we only walk up the ancestry line and |
| * check for immediate children there. */ |
| while (task->parent && task->parent->first_unprocessed) |
| task = task->parent; |
| |
| task = task->first_unprocessed; |
| } |
| else |
| { |
| /* Probably yes. |
| * Just pick the next task, continue at the parent as needed. */ |
| while (task->first_ready == NULL && task->parent) |
| task = task->parent; |
| |
| task = task->first_ready; |
| } |
| |
| /* Atomic operation: We need to mark the new task as "in process" while |
| * still holding the mutex. */ |
| if (task) |
| unready_task(task); |
| |
| *next_task = task; |
| return SVN_NO_ERROR; |
| } |
| |
| /* Full memory barrier to make sure we see all data changes made by other |
| * cores up to this point. ROOT provides any necessary synchronization |
| * objects. |
| */ |
| static svn_error_t *enforce_sequential_consistency(root_t *root) |
| { |
| /* We only need GCC's "asm volatile" construct but that is not portable. */ |
| |
| /* Mutexes come with full memory barriers as a side-effect. But because |
| * we use several mutexes for each single task, adding this mutex one, |
| * which is even never contested, adds only moderate overhead. */ |
| SVN_MUTEX__WITH_LOCK(root->memory_barrier_mutex, SVN_NO_ERROR); |
| return SVN_NO_ERROR; |
| } |
| |
| /* Process a single TASK within the given THREAD_CONTEXT. It may add |
| * sub-tasks but those need separate calls to this function to be processed. |
| * |
| * Pass CANCEL_FUNC, CANCEL_BATON and SCRATCH_POOL to the TASK's process |
| * function. |
| * |
| * This will destroy TASK->PROCESS_POOL but will not reset the pointer. |
| * You have to do that explicitly by calling set_processed(). The reason |
| * is that in multi-threaded execution, you may want that transition to |
| * be atomic with other tree operations. |
| */ |
| static void process(svn_task__t *task, |
| void *thread_context, |
| svn_cancel_func_t cancel_func, |
| void *cancel_baton, |
| apr_pool_t *scratch_pool) |
| { |
| callbacks_t *callbacks = task->callbacks; |
| |
| if (callbacks->process_func) |
| { |
| /* Depending on whether there is prior parent output, we may or |
| * may not have already an OUTPUT structure allocated for TASK. */ |
| results_t *results = ensure_results(task); |
| results->error = callbacks->process_func(&results->output, task, |
| thread_context, |
| task->process_baton, |
| cancel_func, cancel_baton, |
| results->pool, scratch_pool); |
| |
| /* If there is no way to output the results, we simply ignore them. */ |
| if (!callbacks->output_func) |
| results->output = NULL; |
| |
| /* Anything left that we may want to output? */ |
| if ( !results->error |
| && !results->output |
| && !results->prior_parent_output |
| && !results->has_partial_results) |
| { |
| /* Nope. Release the memory and reset OUTPUT such that |
| * output_processed() can quickly skip it. */ |
| svn_pool_destroy(results->pool); |
| task->results = NULL; |
| } |
| } |
| |
| svn_pool_destroy(task->process_pool); |
| } |
| |
| /* Output *TASK results in post-order until we encounter a task that has not |
| * been processed, yet - which may be *TASK itself - and return it in *TASK. |
| * |
| * Pass CANCEL_FUNC, CANCEL_BATON and RESULT_POOL into the respective task |
| * output functions. Use SCRATCH_POOL for temporary allocations. |
| */ |
| static svn_error_t *output_processed( |
| svn_task__t **task, |
| svn_cancel_func_t cancel_func, |
| void *cancel_baton, |
| apr_pool_t *result_pool, |
| apr_pool_t *scratch_pool) |
| { |
| svn_task__t *current = *task; |
| apr_pool_t *iterpool = svn_pool_create(scratch_pool); |
| results_t *results; |
| callbacks_t *callbacks; |
| |
| while (current && is_processed(current)) |
| { |
| svn_pool_clear(iterpool); |
| |
| /* When tasks are executed in background threads, we do have a |
| * potential memory ordering issue: We may see a mix of old and new |
| * state in CURRENT, i.e. the "is processed" state (new) but without |
| * the updates to the other fields and sub-structures. |
| * |
| * The worker threads acquire a mutex before setting the "is processed" |
| * state and those act as full memory barriers, i.e. commit all prior |
| * changes to memory. On the hardware level, this is enough to make |
| * sure the we read the updated data once we saw the state change. |
| * |
| * The compiler, however, does not know that this is (potentially) |
| * threaded code. So, VERY technically, it is allowed to reorder |
| * operations and read fields of CURRENT *before* reading the state. |
| * This would no longer be sequentially consistent. Prevent that. */ |
| enforce_sequential_consistency(current->root); |
| |
| /* Post-order, i.e. dive into sub-tasks first. |
| * |
| * Note that the post-order refers to the task ordering and the output |
| * plus errors returned by the processing function. The CURRENT task |
| * may have produced additional, partial outputs and attached them to |
| * the sub-tasks. These outputs will be processed with the respective |
| * sub-tasks. |
| * |
| * Also note that the "background" processing for CURRENT has completed |
| * and only the output function may add further sub-tasks. */ |
| if (current->first_sub) |
| { |
| current = current->first_sub; |
| results = current->results; |
| callbacks = current->parent->callbacks; |
| |
| /* We will handle this sub-task in the next iteration but we |
| * may have to process results produced before or in between |
| * sub-tasks. Also note that PRIOR_PARENT_OUTPUT not being NULL |
| * implies that OUTPUT_FUNC is also not NULL. */ |
| if (results && results->prior_parent_output) |
| SVN_ERR(callbacks->output_func( |
| current->parent, results->prior_parent_output, |
| callbacks->output_baton, |
| cancel_func, cancel_baton, |
| result_pool, iterpool)); |
| } |
| else |
| { |
| /* No deeper sub-task. Process the results from CURRENT. */ |
| results = current->results; |
| if (results) |
| { |
| /* Return errors. |
| * Make sure they don't get cleaned up with the task tree. */ |
| svn_error_t *err = results->error; |
| results->error = SVN_NO_ERROR; |
| SVN_ERR(err); |
| |
| /* Handle remaining output of the CURRENT task. */ |
| callbacks = current->callbacks; |
| if (results->output) |
| SVN_ERR(callbacks->output_func( |
| current, results->output, |
| callbacks->output_baton, |
| cancel_func, cancel_baton, |
| result_pool, iterpool)); |
| } |
| |
| /* The output function may have added further sub-tasks. |
| * Handle those in the next iteration. */ |
| if (!current->first_sub) |
| { |
| /* Task completed. No further sub-tasks. |
| * Remove this task from the tree and continue at the parent, |
| * recursing into the next sub-task (==NEXT, if not NULL) |
| * with the next iteration. */ |
| svn_task__t *to_delete = current; |
| current = to_delete->parent; |
| SVN_MUTEX__WITH_LOCK(to_delete->root->mutex, |
| remove_task(to_delete)); |
| |
| /* We have output all sub-nodes, including all partial results. |
| * Therefore, the last used thing allocated in OUTPUT->POOL is |
| * OUTPUT itself and it is safe to clean that up. */ |
| if (results) |
| svn_pool_destroy(results->pool); |
| } |
| } |
| } |
| |
| svn_pool_destroy(iterpool); |
| *task = current; |
| |
| return SVN_NO_ERROR; |
| } |
| |
| |
| /* Execution models */ |
| |
| #if APR_HAS_THREADS |
| |
| /* From ROOT, find the first unprocessed task - in pre-order - mark it as |
| * "in process" and return it in *TASK. If no such task exists, wait for |
| * the ROOT->WORKER_WAKEUP condition and retry. |
| * |
| * If ROOT->TERMINATE is set, return NULL for *TASK. |
| * |
| * If the main thread is waiting on us to process tasks, this logic will |
| * implicitly pick that task. So, by default, all workers start on those |
| * tasks that are immediately useful for the output processing. Only later |
| * will contention detection bounce most of them off to other sub-tasks. |
| * However, there are still likely to come back to this once in a while and |
| * ensure that progress is made at the beginning of the tree and the output |
| * is not delayed much. |
| * |
| * This function must be called with ROOT->MUTEX acquired. |
| */ |
| static svn_error_t *next_task(svn_task__t **task, root_t *root) |
| { |
| while (TRUE) |
| { |
| /* Spurious wakeups are being handled implicitly |
| * (check conditions and go back to sleep). */ |
| |
| /* Worker thread needs to terminate? */ |
| if (svn_atomic_read(&root->terminate)) |
| { |
| *task = NULL; |
| return SVN_NO_ERROR; |
| } |
| |
| /* If there are unprocessed tasks, pick the first one. */ |
| if (root->task->first_ready) |
| { |
| svn_task__t *current = root->task->first_ready; |
| unready_task(current); |
| *task = current; |
| |
| return SVN_NO_ERROR; |
| } |
| |
| /* No task, no termination. Wait for one of these to happen. */ |
| SVN_ERR(svn_thread_cond__wait(root->worker_wakeup, root->mutex)); |
| } |
| |
| return SVN_NO_ERROR; |
| } |
| |
| /* Cancellation function to be used within background threads. |
| * BATON is the root_t object. |
| * |
| * This simply checks for cancellation by the forground thread. |
| * Normal cancellation is handled by the output function and then simply |
| * indicated by flag. |
| * |
| * Note that termination due to errors returned by other tasks will also |
| * be treated as a cancellation with the respective SVN_ERR_CANCELLED |
| * error being returned from the current tasks in all workers. |
| */ |
| static svn_error_t *worker_cancelled(void *baton) |
| { |
| root_t *root = baton; |
| return svn_atomic_read(&root->terminate) |
| ? svn_error_create(SVN_ERR_CANCELLED, NULL, NULL) |
| : SVN_NO_ERROR; |
| } |
| |
| /* Set the TERMINATE flag in ROOT and make sure all worker threads get the |
| * message. The latter is required to actually terminate all workers once |
| * all tasks have been completed, because workers don't terminate themselves |
| * unless there is some internal error. |
| */ |
| static svn_error_t *send_terminate(root_t *root) |
| { |
| svn_atomic_set(&root->terminate, TRUE); |
| return svn_thread_cond__broadcast(root->worker_wakeup); |
| } |
| |
| /* Background worker processing any task in ROOT until termination has been |
| * signalled in ROOT. Use SCRATCH_POOL for temporary allocations. |
| */ |
| static svn_error_t *worker(root_t *root, apr_pool_t *scratch_pool) |
| { |
| apr_pool_t *iterpool = svn_pool_create(scratch_pool); |
| svn_task__t *task = NULL; |
| |
| /* The context may be quite complex, so we use the ITERPOOL to clean up any |
| * memory that was used temporarily during context creation. */ |
| void *thread_context = NULL; |
| if (root->context_constructor) |
| SVN_ERR(root->context_constructor(&thread_context, root->context_baton, |
| scratch_pool, iterpool)); |
| |
| /* Keep processing tasks until termination. |
| * If no tasks need processing, sleep until being signalled |
| * (new task or termination). */ |
| while (!svn_atomic_read(&root->terminate)) |
| { |
| svn_pool_clear(iterpool); |
| if (!task) |
| { |
| /* We did not pick a suitable task to continue with. |
| * |
| * Make sure the output task is not sleeping (we may have processed |
| * many tasks in a large sub-tree without telling the forground |
| * thread), so it may tell us to continue or terminate. */ |
| SVN_ERR(svn_thread_cond__signal(root->task_processed)); |
| |
| /* Pick the next task in pre-order. If none exists, sleep until |
| * woken up. */ |
| SVN_MUTEX__WITH_LOCK(root->mutex, next_task(&task, root)); |
| |
| /* None existed, we slept, got woken up and there still was nothing. |
| * This implies termination. */ |
| if (!task) |
| break; |
| } |
| |
| /* Process this TASK and pick a suitable next one, if available. */ |
| process(task, thread_context, worker_cancelled, root, iterpool); |
| SVN_MUTEX__WITH_LOCK(root->mutex, |
| set_processed_and_pick(&task, task)); |
| } |
| |
| /* Cleanup. */ |
| svn_pool_destroy(iterpool); |
| |
| return SVN_NO_ERROR; |
| } |
| |
| /* The plain APR thread around the worker function. |
| * DATA is the root_t object to work on. */ |
| static void * APR_THREAD_FUNC |
| worker_thread(apr_thread_t *thread, void *data) |
| { |
| /* Each thread uses a separate single-threaded pool tree for minimum overhead |
| */ |
| apr_pool_t *pool = apr_allocator_owner_get(svn_pool_create_allocator(FALSE)); |
| |
| apr_status_t result = APR_SUCCESS; |
| svn_error_t *err = worker(data, pool); |
| if (err) |
| { |
| result = err->apr_err; |
| svn_error_clear(err); |
| } |
| |
| svn_pool_destroy(pool); |
| |
| /* End thread explicitly to prevent APR_INCOMPLETE return codes in |
| apr_thread_join(). */ |
| apr_thread_exit(thread, result); |
| return NULL; |
| } |
| |
| /* If TASK has not been processed, yet, wait for it. Before waiting for |
| * the "task processed" signal, start a new worker thread, allocated in a |
| * THREAD_SAFE_POOL, and add it to the array of THREADS. |
| * |
| * So, everytime we run out of processing results, we add a new worker. |
| * This results in a slightly delayed spawning of new threads. The total |
| * number of worker threads is limited to THREAD_COUNT. |
| * |
| * This function must be called with TASK->ROOT->MUTEX acquired. |
| */ |
| static svn_error_t *wait_for_outputting_state( |
| svn_task__t *task, |
| apr_int32_t thread_count, |
| apr_array_header_t *threads, |
| apr_pool_t *thread_safe_pool) |
| { |
| root_t* root = task->root; |
| while (TRUE) |
| { |
| if (is_processed(task)) |
| return SVN_NO_ERROR; |
| |
| /* Maybe spawn another worker thread because there are waiting tasks. |
| */ |
| if (thread_count > threads->nelts) |
| { |
| apr_thread_t *thread; |
| apr_status_t status = apr_thread_create(&thread, NULL, |
| worker_thread, |
| root, |
| thread_safe_pool); |
| if (status) |
| return svn_error_wrap_apr(status, |
| "Creating worker thread failed"); |
| |
| APR_ARRAY_PUSH(threads, apr_thread_t *) = thread; |
| } |
| |
| /* Efficiently wait for tasks to (maybe) be completed. */ |
| SVN_ERR(svn_thread_cond__wait(root->task_processed, root->mutex)); |
| } |
| |
| return SVN_NO_ERROR; |
| } |
| |
| #endif /* APR_HAS_THREADS */ |
| |
| /* Run the (root) TASK to completion, including dynamically added sub-tasks. |
| * Use up to THREAD_COUNT worker threads for that. |
| * |
| * Pass CANCEL_FUNC and CANCEL_BATON only into the output function callbacks. |
| * Pass the RESULT_POOL into the task output functions and use SCRATCH_POOL |
| * for everything else (unless covered by task pools). |
| */ |
| static svn_error_t *execute_concurrently( |
| svn_task__t *task, |
| apr_int32_t thread_count, |
| svn_cancel_func_t cancel_func, |
| void *cancel_baton, |
| apr_pool_t *result_pool, |
| apr_pool_t *scratch_pool) |
| { |
| #if APR_HAS_THREADS |
| |
| int i; |
| svn_task__t *current = task; |
| root_t *root = task->root; |
| svn_error_t *task_err = SVN_NO_ERROR; |
| svn_error_t *sync_err = SVN_NO_ERROR; |
| apr_pool_t *iterpool = svn_pool_create(scratch_pool); |
| apr_array_header_t *threads = apr_array_make(scratch_pool, thread_count, |
| sizeof(apr_thread_t *)); |
| |
| /* We need a thread-safe-pool to create the actual thread objects. */ |
| apr_pool_t *thread_safe_pool = svn_pool_create(root->results_pool); |
| |
| /* Main execution loop */ |
| while (current && !task_err) |
| { |
| svn_pool_clear(iterpool); |
| |
| /* Spawns worker thread as needed. |
| * |
| * Acquiring the mutex also acts as a full memory barrier such that we |
| * will see updates to task states. Since we set_processed() last, |
| * all other information relevant to the task will be valid, too. */ |
| SVN_MUTEX__WITH_LOCK(root->mutex, |
| wait_for_outputting_state(current, thread_count, |
| threads, |
| thread_safe_pool)); |
| |
| /* Crawl processed tasks and output results until we exhaust processed |
| * tasks. */ |
| task_err = output_processed(¤t, |
| cancel_func, cancel_baton, |
| result_pool, iterpool); |
| } |
| |
| /* Tell all worker threads to terminate. */ |
| SVN_MUTEX__WITH_LOCK(root->mutex, send_terminate(root)); |
| |
| /* Wait for all threads to terminate. */ |
| for (i = 0; i < threads->nelts; ++i) |
| { |
| apr_thread_t *thread = APR_ARRAY_IDX(threads, i, apr_thread_t *); |
| apr_status_t retval; |
| apr_status_t sync_status = apr_thread_join(&retval, thread); |
| |
| if (retval != APR_SUCCESS) |
| sync_err = svn_error_compose_create(sync_err, |
| svn_error_wrap_apr(retval, |
| "Worker thread returned error")); |
| |
| if (sync_status != APR_SUCCESS) |
| sync_err = svn_error_compose_create(sync_err, |
| svn_error_wrap_apr(sync_status, |
| "Worker thread join error")); |
| } |
| |
| /* Explicitly release any (other) error. Leave pools as they are. |
| * This is important in the case of early exists due to error returns. |
| * |
| * However, don't do it if something went wrong while waiting for worker |
| * threads to terminate. They might still be doing something and might |
| * crash at any time. Doing nothing here might increase our chance for |
| * the SYNC_ERR to eventually be reported to the user. */ |
| if (!sync_err) |
| clear_errors(task); |
| svn_pool_destroy(iterpool); |
| |
| /* Get rid of all remaining tasks. */ |
| return svn_error_trace(svn_error_compose_create(sync_err, task_err)); |
| |
| #else |
| |
| /* How the hell did we end up here? */ |
| SVN_ERR_MALFUNCTION(); |
| return SVN_NO_ERROR; |
| |
| #endif |
| } |
| |
| /* Run the (root) TASK to completion, including dynamically added sub-tasks. |
| * Pass CANCEL_FUNC and CANCEL_BATON directly into the task callbacks. |
| * Pass the RESULT_POOL into the task output functions and use SCRATCH_POOL |
| * for everything else (unless covered by task pools). |
| */ |
| static svn_error_t *execute_serially( |
| svn_task__t *task, |
| svn_cancel_func_t cancel_func, |
| void *cancel_baton, |
| apr_pool_t *result_pool, |
| apr_pool_t *scratch_pool) |
| { |
| root_t* root = task->root; |
| svn_error_t *task_err = SVN_NO_ERROR; |
| apr_pool_t *iterpool = svn_pool_create(scratch_pool); |
| |
| /* Task to execute currently. |
| * Always the first unprocessed task in pre-order. */ |
| svn_task__t *current = task; |
| |
| /* The context may be quite complex, so we use the ITERPOOL to clean up |
| * any memory that was used temporarily during context creation. */ |
| void *thread_context = NULL; |
| if (root->context_constructor) |
| SVN_ERR(root->context_constructor(&thread_context, root->context_baton, |
| scratch_pool, iterpool)); |
| |
| /* Process one task at a time, stop upon error or when the whole tree |
| * has been completed. */ |
| while (current && !task_err) |
| { |
| svn_pool_clear(iterpool); |
| |
| /* "would-be background" processing the CURRENT task. */ |
| unready_task(current); |
| process(current, thread_context, cancel_func, cancel_baton, iterpool); |
| set_processed(current); |
| |
| /* Output results in "forground" and move CURRENT to the next one |
| * needing processing. */ |
| task_err = output_processed(¤t, |
| cancel_func, cancel_baton, |
| result_pool, scratch_pool); |
| } |
| |
| /* Explicitly release any (other) error. Leave pools as they are. |
| * This is important in the case of early exists due to error returns. */ |
| clear_errors(task); |
| svn_pool_destroy(iterpool); |
| |
| /* Get rid of all remaining tasks. */ |
| return svn_error_trace(task_err); |
| } |
| |
| |
| /* Root data structure */ |
| |
| /* Pool cleanup function to make sure we free the root pools (allocators) */ |
| static apr_status_t |
| root_cleanup(void *baton) |
| { |
| root_t *root = baton; |
| svn_pool_destroy(root->task_pool); |
| svn_pool_destroy(root->process_pool); |
| svn_pool_destroy(root->results_pool); |
| |
| return APR_SUCCESS; |
| } |
| |
| svn_error_t *svn_task__run( |
| apr_int32_t thread_count, |
| svn_task__process_func_t process_func, |
| void *process_baton, |
| svn_task__output_func_t output_func, |
| void *output_baton, |
| svn_task__thread_context_constructor_t context_constructor, |
| void *context_baton, |
| svn_cancel_func_t cancel_func, |
| void *cancel_baton, |
| apr_pool_t *result_pool, |
| apr_pool_t *scratch_pool) |
| { |
| root_t *root = apr_pcalloc(scratch_pool, sizeof(*root)); |
| |
| /* Pick execution model. |
| * |
| * Note that multi-threading comes with significant overheads and should |
| * not be used unless requested. */ |
| #if APR_HAS_THREADS |
| svn_boolean_t threaded_execution = thread_count > 1; |
| #else |
| svn_boolean_t threaded_execution = FALSE; |
| #endif |
| |
| /* Allocation on stack is fine as this function will not exit before |
| * all task processing has been completed. */ |
| callbacks_t callbacks; |
| |
| /* The mutexes must always be constructed. But we only need their light- |
| * weight versions in single-threaded execution. */ |
| SVN_ERR(svn_mutex__init(&root->mutex, threaded_execution, scratch_pool)); |
| SVN_ERR(svn_mutex__init(&root->memory_barrier_mutex, threaded_execution, |
| scratch_pool)); |
| SVN_ERR(svn_mutex__init(&root->task_alloc_mutex, threaded_execution, |
| scratch_pool)); |
| |
| /* Inter-thread signalling (condition variables) are only needed when |
| * we use worker threads. */ |
| if (threaded_execution) |
| { |
| SVN_ERR(svn_thread_cond__create(&root->task_processed, scratch_pool)); |
| SVN_ERR(svn_thread_cond__create(&root->worker_wakeup, scratch_pool)); |
| } |
| |
| /* Permanently allocating a pool for each task is too expensive. |
| * So, we will allocate directly from this pool - which requires |
| * serialization of each apr_alloc() call. Therefore, we don't need |
| * the added overhead of allocator serialization here. */ |
| root->task_pool = |
| apr_allocator_owner_get(svn_pool_create_allocator(FALSE)); |
| |
| /* The sub-pools of these will used in a single thread each but created |
| * from different worker threads. Using allocator serialization is good |
| * enough for that. */ |
| root->process_pool = |
| apr_allocator_owner_get(svn_pool_create_allocator(threaded_execution)); |
| root->results_pool = |
| apr_allocator_owner_get(svn_pool_create_allocator(threaded_execution)); |
| |
| /* Be sure to clean the root pools up afterwards. */ |
| apr_pool_cleanup_register(scratch_pool, root, root_cleanup, |
| apr_pool_cleanup_null); |
| |
| callbacks.process_func = process_func; |
| callbacks.output_func = output_func; |
| callbacks.output_baton = output_baton; |
| |
| root->task = apr_pcalloc(scratch_pool, sizeof(*root->task)); |
| root->task->root = root; |
| root->task->first_ready = root->task; |
| root->task->callbacks = &callbacks; |
| root->task->process_baton = process_baton; |
| root->task->process_pool = svn_pool_create(root->process_pool); |
| |
| root->context_baton = context_baton; |
| root->context_constructor = context_constructor; |
| root->terminate = FALSE; |
| |
| /* Go, go, go! */ |
| if (threaded_execution) |
| { |
| SVN_ERR(execute_concurrently(root->task, thread_count, |
| cancel_func, cancel_baton, |
| result_pool, scratch_pool)); |
| } |
| else |
| { |
| SVN_ERR(execute_serially(root->task, |
| cancel_func, cancel_baton, |
| result_pool, scratch_pool)); |
| } |
| |
| return SVN_NO_ERROR; |
| } |