blob: 333f6251c1e7a871ddcd325c2e39dd97b65ddf68 [file] [log] [blame]
/* task.c : Implement the parallel task execution machine.
*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*/
#include "private/svn_task.h"
#include <assert.h>
#include "private/svn_atomic.h"
#include "private/svn_thread_cond.h"
/* Top of the task tree.
*
* It is accessible from all tasks and contains all necessary resource
* pools and synchronization mechanisms.
*/
typedef struct root_t
{
/* The actual root task. */
svn_task__t *task;
/* Pools "segregated" for reduced lock contention when multi-threading.
* Ordered by lifetime of the objects allocated in them (long to short).
*/
/* Allocate tasks and callbacks here.
* These have the longest lifetimes and will (currently) not be released
* until this root object gets cleaned up.
*/
apr_pool_t *task_pool;
/* Allocate per-task parameters (process_baton) from sub-pools of this.
* They should be cleaned up as soon as the respective task has been
* processed (the parameters will not be needed anymore).
*/
apr_pool_t *process_pool;
/* Allocate per-task output_t as well as the actual outputs here.
* Allocation will happen just before calling the processing function.
* Release the memory immediately afterwards, unless some actual output
* has been produced.
*/
apr_pool_t *output_pool;
/* Context construction parameters as passed in to svn_task__run(). */
svn_task__thread_context_constructor_t context_constructor;
void *context_baton;
} root_t;
/* Sub-structure of svn_task__t containing that task's output.
*/
typedef struct output_t
{
/* (Last part of the) output produced by the task.
* If the task has sub-tasks, additional output (produced before creating
* each sub-task) may be found in the respective sub-task's
* PRIOR_PARENT_OUTPUT. NULL, if no output was produced.
*/
void *output;
/* Error code returned by the proccessing function. */
svn_error_t *error;
/* Parent task's output before this task has been created, i.e. the part
* that shall be passed to the output function before this task's output.
* NULL, if there is no prior parent output.
*
* This has to be allocated in the parent's OUTPUT->POOL.
*/
void *prior_parent_output;
/* The tasks' output may be split into multiple parts, produced before
* and in between sub-tasks. Those will be stored in the OUTPUT structs
* of those sub-tasks but have been allocated in (their parent's) POOL.
*
* This flag indicates if such partial results exist. POOL must not be
* destroyed in that case, until all sub-tasks' outputs have been handled.
*/
svn_boolean_t has_partial_results;
/* Pool used to allocate this structure as well as the contents of OUTPUT
* and PRIOR_PARENT_OUTPUT in any immediate sub-task. */
apr_pool_t *pool;
} output_t;
/* The task's callbacks.
*
* We keep them in a separate structure such that they may be shared
* easily between task & sub-task.
*/
typedef struct callbacks_t
{
/* Process function to call.
* The respective baton is task-specific and held in svn_task__t.
*
* NULL is legal here (for stability reasons and maybe future extensions)
* but pointless as no processing will happen and no output can be
* produced, in turn bypassing OUTPUT_FUNC.
*/
svn_task__process_func_t process_func;
/* Output function to call, if there was output. */
svn_task__output_func_t output_func;
/* Baton to pass into OUTPUT_FUNC. */
void *output_baton;
} callbacks_t;
/* Task main data structure - a node in the task tree.
*/
struct svn_task__t
{
/* Root object where all allocation & synchronization happens. */
root_t *root;
/* Tree structure */
/* Parent task.
* NULL for the root node:
* (PARENT==NULL) == (*this==ROOT->TASK).
*/
svn_task__t *parent;
/* First immediate sub-task (in creation order). */
svn_task__t *first_sub;
/* Latest immediate sub-task (in creation order). */
svn_task__t *last_sub;
/* Next sibling, i.e. next in the list of PARENT's immediate sub-tasks
* (in creation order). */
svn_task__t *next;
/* Index of this task within the PARENT's sub-task list, i.e. the number
* of siblings created before this one. The value will *not* be adjusted
* should prior siblings be removed.
*
* This will be used to efficiently determine before / after relationships
* between arbitrary siblings.
*/
apr_size_t sub_task_idx;
/* Efficiently track tasks that need processing */
/* The first task, in pre-order, of this sub-tree whose processing has not
* been started yet. This will be NULL, iff for all tasks in this sub-tree,
* processing has at least been started. If this==FIRST_READY, this task
* itself waits for being proccessed.
*
* Note that immediate and/or indirect sub-tasks may already have been
* processed before this one.
*/
svn_task__t *first_ready;
/* Task state. */
/* The callbacks to use. Never NULL. */
callbacks_t *callbacks;
/* Process baton to pass into CALLBACKS->PROCESS_FUNC. */
void *process_baton;
/* Pool used to allocate the PROCESS_BATON.
* Sub-pool of ROOT->PROCESS_POOL.
*
* NULL, iff processing of this task has completed (sub-tasks may still
* need processing). Used to check whether processing has been completed.
*/
apr_pool_t *process_pool;
/* The processing results.
*
* Will be NULL before processing and may be NULL afterwards, if all
* fields would be NULL.
*/
output_t *output;
};
/* Adding tasks to the tree. */
/* Return the index of the first immediate sub-task of TASK with a ready
* sub-task in its respective sub-tree. TASK must have at least one such
* sub-task.
*/
static apr_size_t first_ready_sub_task_idx(const svn_task__t *task)
{
svn_task__t *sub_task = task->first_ready;
assert(sub_task);
while (sub_task->parent != task)
sub_task = sub_task->parent;
return sub_task->sub_task_idx;
}
/* Link TASK up with TASK->PARENT. */
static svn_error_t *link_new_task(svn_task__t *task)
{
svn_task__t *current, *parent;
/* Insert into parent's sub-task list. */
if (task->parent->last_sub)
{
task->parent->last_sub->next = task;
task->sub_task_idx = task->parent->last_sub->sub_task_idx + 1;
}
task->parent->last_sub = task;
if (!task->parent->first_sub)
task->parent->first_sub = task;
/* TASK is ready for execution.
*
* It may be the first one in pre-order. Update parents until they
* have a "FIRST_READY" in a sub-tree before (in pre-order) the one
* containing TASK. */
for (current = task, parent = task->parent;
parent
&& ( !parent->first_ready
|| first_ready_sub_task_idx(parent) >= current->sub_task_idx);
current = parent, parent = parent->parent)
{
parent->first_ready = task;
}
return SVN_NO_ERROR;
}
/* If TASK has no OUTPUT sub-structure, add one. Return the OUTPUT struct.
*
* In multi-threaded environments, calls to this must be serialized with
* root_t changes. */
static output_t *ensure_output(svn_task__t *task)
{
if (!task->output)
{
apr_pool_t * output_pool = svn_pool_create(task->root->output_pool);
task->output = apr_pcalloc(output_pool, sizeof(*task->output));
task->output->pool = output_pool;
}
return task->output;
}
/* Allocate a new task in POOL and return it in *RESULT.
*
* In multi-threaded environments, calls to this must be serialized with
* root_t changes. */
static svn_error_t *alloc_task(svn_task__t **result, apr_pool_t *pool)
{
*result = apr_pcalloc(pool, sizeof(**result));
return SVN_NO_ERROR;
}
/* Allocate a new callbacks structure in POOL and return it in *RESULT.
*
* In multi-threaded environments, calls to this must be serialized with
* root_t changes. */
static svn_error_t *alloc_callbacks(callbacks_t **result, apr_pool_t *pool)
{
*result = apr_pcalloc(pool, sizeof(**result));
return SVN_NO_ERROR;
}
/* Allocate a new task and append it to PARENT's sub-task list.
* Store PROCESS_POOL, CALLBACKS and PROCESS_BATON in the respective
* fields of the task struct. If PARTIAL_OUTPUT is not NULL, it will
* be stored in the new task's OUTPUT structure.
*
* This function does not return the new struct. Instead, it is scheduled
* to eventually be picked up by the task runner, i.e. execution is fully
* controlled by the execution model and sub-tasks may only be added when
* the new task itself is being processed.
*/
static svn_error_t *add_task(
svn_task__t *parent,
apr_pool_t *process_pool,
void *partial_output,
callbacks_t *callbacks,
void *process_baton)
{
svn_task__t *new_task;
SVN_ERR(alloc_task(&new_task, parent->root->task_pool));
new_task->root = parent->root;
new_task->process_baton = process_baton;
new_task->process_pool = process_pool;
new_task->parent = parent;
if (partial_output && parent->callbacks->output_func)
{
ensure_output(parent)->has_partial_results = TRUE;
ensure_output(new_task)->prior_parent_output = partial_output;
}
/* The new task will be ready for execution once we link it up in PARENT. */
new_task->first_ready = new_task;
new_task->callbacks = callbacks;
SVN_ERR(link_new_task(new_task));
return SVN_NO_ERROR;
}
svn_error_t *svn_task__add(
svn_task__t *current,
apr_pool_t *process_pool,
void *partial_output,
svn_task__process_func_t process_func,
void *process_baton,
svn_task__output_func_t output_func,
void *output_baton)
{
callbacks_t *callbacks;
SVN_ERR(alloc_callbacks(&callbacks, current->root->task_pool));
callbacks->process_func = process_func;
callbacks->output_func = output_func;
callbacks->output_baton = output_baton;
return svn_error_trace(add_task(current, process_pool, partial_output,
callbacks, process_baton));
}
svn_error_t* svn_task__add_similar(
svn_task__t* current,
apr_pool_t *process_pool,
void* partial_output,
void* process_baton)
{
return svn_error_trace(add_task(current, process_pool, partial_output,
current->callbacks, process_baton));
}
apr_pool_t *svn_task__create_process_pool(
svn_task__t *parent)
{
return svn_pool_create(parent->root->process_pool);
}
/* Removing tasks from the tree */
/* Remove TASK from the parent tree.
* TASK must have been fully processed and there shall be no more sub-tasks.
*/
static svn_error_t *remove_task(svn_task__t *task)
{
svn_task__t *parent = task->parent;
assert(task->first_ready == NULL);
assert(task->first_sub == NULL);
if (parent)
{
if (parent->first_sub == task)
parent->first_sub = task->next;
if (parent->last_sub == task)
parent->last_sub = NULL;
}
return SVN_NO_ERROR;
}
/* Recursively free all errors in TASK.
*
* Don't try to clean up pools etc. because we will call this function
* mainly to prevent error leaks when terminating the task runner early.
* It is better to have the standard pool cleanups to deal with memory
* management in that case.
*/
static void free_sub_tasks(svn_task__t *task)
{
while (task->first_sub)
free_sub_tasks(task->first_sub);
if (task->output)
svn_error_clear(task->output->error);
if (task->parent && task->parent->first_sub == task)
task->parent->first_sub = task->next;
}
/* Picking the next task to process */
/* Utility function that follows the chain of siblings and returns the first
* that has *some* unprocessed task in its sub-tree.
*
* Returns TASK if either TASK or any of its sub-tasks is unprocessed.
* Returns NULL if all direct or indirect sub-tasks of TASK->PARENT are
* already being processed or have been completed.
*/
static svn_task__t *next_ready(svn_task__t *task)
{
for (; task; task = task->next)
if (task->first_ready)
break;
return task;
}
/* Mark TASK as no longer being unprocessed.
* Call this before starting actual processing of TASK.
*/
static void unready_task(svn_task__t *task)
{
svn_task__t *parent, *current;
svn_task__t *first_ready = NULL;
/* Make sure that processing on TASK has not already started. */
assert(task->first_ready == task);
/* Also, there should be no sub-tasks before processing this one.
* Sub-tasks may only be added by processing the immediate parent. */
assert(task->first_sub == NULL);
/* There are no sub-tasks, hence nothing in the sub-tree could be ready. */
task->first_ready = NULL;
/* Bubble up the tree while TASK is the "first ready" one.
* Update the pointers to the next one ready. */
for (current = task, parent = task->parent;
parent && (parent->first_ready == task);
current = parent, parent = parent->parent)
{
/* If we have not found another task that is ready, search the
* siblings for one. A suitable one cannot be *before* CURRENT
* or otherwise, PARENT->FIRST_READY would not equal TASK.
* It is possible that we won't find one at the current level. */
if (!first_ready)
{
svn_task__t *first_ready_sub_task = next_ready(current->next);
first_ready = first_ready_sub_task
? first_ready_sub_task->first_ready
: NULL;
}
parent->first_ready = first_ready;
}
}
/* Task processing and outputting results */
/* The forground output_processed() function will now consider TASK's
* processing function to be completed. Sub-tasks may still be pending. */
static void set_processed(svn_task__t *task)
{
task->process_pool = NULL;
}
/* Return whether TASK's processing function has been completed.
* Pending sub-tasks will be ignored. */
static svn_boolean_t is_processed(const svn_task__t *task)
{
return (task->process_pool == NULL);
}
/* Process a single TASK within the given THREAD_CONTEXT. It may add
* sub-tasks but those need separate calls to this function to be processed.
*
* Pass CANCEL_FUNC, CANCEL_BATON and SCRATCH_POOL to the TASK's process
* function.
*
* This will destroy TASK->PROCESS_POOL but will not reset the pointer.
* You have to do that explicitly by calling set_processed(). The reason
* is that in multi-threaded execution, you may want that transition to
* be atomic with other tree operations.
*/
static void process(svn_task__t *task,
void *thread_context,
svn_cancel_func_t cancel_func,
void *cancel_baton,
apr_pool_t *scratch_pool)
{
callbacks_t *callbacks = task->callbacks;
if (callbacks->process_func)
{
/* Depending on whether there is prior parent output, we may or
* may not have already an OUTPUT structure allocated for TASK. */
output_t *output = ensure_output(task);
output->error = callbacks->process_func(&output->output, task,
thread_context,
task->process_baton,
cancel_func, cancel_baton,
output->pool, scratch_pool);
/* If there is no way to output the results, we simply ignore them. */
if (!callbacks->output_func)
output->output = NULL;
/* Anything left that we may want to output? */
if ( !output->error
&& !output->output
&& !output->prior_parent_output
&& !output->has_partial_results)
{
/* Nope. Release the memory and reset OUTPUT such that
* output_processed() can quickly skip it. */
svn_pool_destroy(output->pool);
task->output = NULL;
}
}
svn_pool_destroy(task->process_pool);
}
/* Output *TASK results in post-order until we encounter a task that has not
* been processed, yet - which may be *TASK itself - and return it in *TASK.
*
* Pass CANCEL_FUNC, CANCEL_BATON and RESULT_POOL into the respective task
* output functions. Use SCRATCH_POOL for temporary allocations.
*/
static svn_error_t *output_processed(
svn_task__t **task,
svn_cancel_func_t cancel_func,
void *cancel_baton,
apr_pool_t *result_pool,
apr_pool_t *scratch_pool)
{
svn_task__t *current = *task;
apr_pool_t *iterpool = svn_pool_create(scratch_pool);
output_t *output;
callbacks_t *callbacks;
while (current && is_processed(current))
{
svn_pool_clear(iterpool);
/* Post-order, i.e. dive into sub-tasks first.
*
* Note that the "background" processing for CURRENT has completed
* and only the output function may add no further sub-tasks. */
if (current->first_sub)
{
current = current->first_sub;
output = current->output;
callbacks = current->parent->callbacks;
/* We will handle this sub-task in the next iteration but we
* may have to process results produced before or in between
* sub-tasks. Also note that PRIOR_PARENT_OUTPUT being true
* implies that OUTPUT_FUNC is not NULL. */
if (output && output->prior_parent_output)
SVN_ERR(callbacks->output_func(
current->parent, output->prior_parent_output,
callbacks->output_baton,
cancel_func, cancel_baton,
result_pool, iterpool));
}
else
{
/* No deeper sub-task. Process the results from CURRENT. */
output = current->output;
if (output)
{
/* Return errors.
* Make sure they don't get cleaned up with the task tree. */
svn_error_t *err = output->error;
output->error = SVN_NO_ERROR;
SVN_ERR(err);
/* Handle remaining output of the CURRENT task. */
callbacks = current->callbacks;
if (output->output)
SVN_ERR(callbacks->output_func(
current, output->output,
callbacks->output_baton,
cancel_func, cancel_baton,
result_pool, iterpool));
}
/* The output function may have added further sub-tasks.
* Handle those in the next iteration. */
if (!current->first_sub)
{
/* Task completed. No further sub-tasks.
* Remove this task from the tree and continue at the parent,
* recursing into the next sub-task (==NEXT, if not NULL)
* with the next iteration. */
svn_task__t *to_delete = current;
current = to_delete->parent;
SVN_ERR(remove_task(to_delete));
/* We have output all sub-nodes, including all partial results.
* Therefore, the last used thing allocated in OUTPUT->POOL is
* OUTPUT itself and it is safe to clean that up. */
if (output)
svn_pool_destroy(output->pool);
}
}
}
svn_pool_destroy(iterpool);
*task = current;
return SVN_NO_ERROR;
}
/* Execution models */
/* Run the (root) TASK to completion, including dynamically added sub-tasks.
* Pass CANCEL_FUNC and CANCEL_BATON directly into the task callbacks.
* Pass the RESULT_POOL into the task output functions and use SCRATCH_POOL
* for everything else (unless covered by task pools).
*/
static svn_error_t *execute_serially(
svn_task__t *task,
svn_cancel_func_t cancel_func,
void *cancel_baton,
apr_pool_t *result_pool,
apr_pool_t *scratch_pool)
{
root_t* root = task->root;
svn_error_t *task_err = SVN_NO_ERROR;
apr_pool_t *iterpool = svn_pool_create(scratch_pool);
/* Task to execute currently.
* Always the first unprocessed task in pre-order. */
svn_task__t *current = task;
/* The context may be quite complex, so we use the ITERPOOL to clean up
* any memory that was used temporarily during context creation. */
void *thread_context = NULL;
if (root->context_constructor)
SVN_ERR(root->context_constructor(&thread_context, root->context_baton,
scratch_pool, iterpool));
/* Process one task at a time, stop upon error or when the whole tree
* has been completed. */
while (current && !task_err)
{
svn_pool_clear(iterpool);
/* "would-be background" processing the CURRENT task. */
unready_task(current);
process(current, thread_context, cancel_func, cancel_baton, iterpool);
set_processed(current);
/* Output results in "forground" and move CURRENT to the next one
* needing processing. */
task_err = output_processed(&current,
cancel_func, cancel_baton,
result_pool, scratch_pool);
}
/* Explicitly release any (other) error and pool not cleaned up so far.
* This is important in the case of early exists due to error returns. */
free_sub_tasks(task);
svn_pool_destroy(iterpool);
/* Get rid of all remaining tasks. */
return svn_error_trace(task_err);
}
/* Root data structure */
svn_error_t *svn_task__run(
apr_int32_t thread_count,
svn_task__process_func_t process_func,
void *process_baton,
svn_task__output_func_t output_func,
void *output_baton,
svn_task__thread_context_constructor_t context_constructor,
void *context_baton,
svn_cancel_func_t cancel_func,
void *cancel_baton,
apr_pool_t *result_pool,
apr_pool_t *scratch_pool)
{
root_t *root = apr_pcalloc(scratch_pool, sizeof(*root));
/* Allocation on stack is fine as this function will not exit before
* all task processing has been completed. */
callbacks_t callbacks;
/* For now, we only have single-threaded execution.
* So, no special consideration required regarding pools & serialization.*/
root->task_pool = scratch_pool;
root->process_pool = scratch_pool;
root->output_pool = scratch_pool;
callbacks.process_func = process_func;
callbacks.output_func = output_func;
callbacks.output_baton = output_baton;
root->task = apr_pcalloc(scratch_pool, sizeof(*root->task));
root->task->root = root;
root->task->first_ready = root->task;
root->task->callbacks = &callbacks;
root->task->process_baton = process_baton;
root->task->process_pool = svn_pool_create(root->process_pool);
root->context_baton = context_baton;
root->context_constructor = context_constructor;
/* For now, there is only single-threaded execution. */
SVN_ERR(execute_serially(root->task,
cancel_func, cancel_baton,
result_pool, scratch_pool));
return SVN_NO_ERROR;
}