blob: 0ef9a1668f722a6488bc148850f0e7c087405036 [file] [log] [blame]
/* batch_fsync.c --- efficiently fsync multiple targets
*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*/
#include <apr_thread_pool.h>
#include "batch_fsync.h"
#include "svn_pools.h"
#include "svn_hash.h"
#include "svn_dirent_uri.h"
#include "svn_private_config.h"
#include "private/svn_atomic.h"
#include "private/svn_dep_compat.h"
#include "private/svn_mutex.h"
#include "private/svn_subr_private.h"
#include "private/svn_thread_cond.h"
#include "private/svn_waitable_counter.h"
/* Handy macro to check APR function results and turning them into
* svn_error_t upon failure. */
#define WRAP_APR_ERR(x,msg) \
{ \
apr_status_t status_ = (x); \
if (status_) \
return svn_error_wrap_apr(status_, msg); \
}
/* Entry type for the svn_fs_x__batch_fsync_t collection. There is one
* instance per file handle.
*/
typedef struct to_sync_t
{
/* Open handle of the file / directory to fsync. */
apr_file_t *file;
/* Pool to use with FILE. It is private to FILE such that it can be
* used safely together with FILE in a separate thread. */
apr_pool_t *pool;
/* Result of the file operations. */
svn_error_t *result;
/* Counter to increment when we completed the task. */
svn_waitable_counter_t *counter;
} to_sync_t;
/* The actual collection object. */
struct svn_fs_x__batch_fsync_t
{
/* Maps open file handles: C-string path to to_sync_t *. */
apr_hash_t *files;
/* Counts the number of completed fsync tasks. */
svn_waitable_counter_t *counter;
/* Perform fsyncs only if this flag has been set. */
svn_boolean_t flush_to_disk;
};
/* Data structures for concurrent fsync execution are only available if
* we have threading support.
*/
#if APR_HAS_THREADS
/* Number of microseconds that an unused thread remains in the pool before
* being terminated.
*
* Higher values are useful if clients frequently send small requests and
* you want to minimize the latency for those.
*/
#define THREADPOOL_THREAD_IDLE_LIMIT 1000000
/* Maximum number of threads in THREAD_POOL, i.e. number of paths we can
* fsync concurrently throughout the process. */
#define MAX_THREADS 16
/* Thread pool to execute the fsync tasks. */
static apr_thread_pool_t *thread_pool = NULL;
#endif
/* Keep track on whether we already created the THREAD_POOL . */
static svn_atomic_t thread_pool_initialized = FALSE;
/* We open non-directory files with these flags. */
#define FILE_FLAGS (APR_READ | APR_WRITE | APR_BUFFERED | APR_CREATE)
#if APR_HAS_THREADS
/* Destructor function that implicitly cleans up any running threads
in the TRHEAD_POOL *once*.
Must be run as a pre-cleanup hook.
*/
static apr_status_t
thread_pool_pre_cleanup(void *data)
{
apr_thread_pool_t *tp = thread_pool;
if (!thread_pool)
return APR_SUCCESS;
thread_pool = NULL;
thread_pool_initialized = FALSE;
return apr_thread_pool_destroy(tp);
}
#endif
/* Core implementation of svn_fs_x__batch_fsync_init. */
static svn_error_t *
create_thread_pool(void *baton,
apr_pool_t *owning_pool)
{
#if APR_HAS_THREADS
/* The thread-pool must be allocated from a thread-safe pool.
GLOBAL_POOL may be single-threaded, though. */
apr_pool_t *pool = svn_pool_create(NULL);
/* This thread pool will get cleaned up automatically when GLOBAL_POOL
gets cleared. No additional cleanup callback is needed. */
WRAP_APR_ERR(apr_thread_pool_create(&thread_pool, 0, MAX_THREADS, pool),
_("Can't create fsync thread pool in FSX"));
/* Work around an APR bug: The cleanup must happen in the pre-cleanup
hook instead of the normal cleanup hook. Otherwise, the sub-pools
containing the thread objects would already be invalid. */
apr_pool_pre_cleanup_register(pool, NULL, thread_pool_pre_cleanup);
apr_pool_pre_cleanup_register(owning_pool, NULL, thread_pool_pre_cleanup);
/* let idle threads linger for a while in case more requests are
coming in */
apr_thread_pool_idle_wait_set(thread_pool, THREADPOOL_THREAD_IDLE_LIMIT);
/* don't queue requests unless we reached the worker thread limit */
apr_thread_pool_threshold_set(thread_pool, 0);
#endif
return SVN_NO_ERROR;
}
svn_error_t *
svn_fs_x__batch_fsync_init(apr_pool_t *owning_pool)
{
/* Protect against multiple calls. */
return svn_error_trace(svn_atomic__init_once(&thread_pool_initialized,
create_thread_pool,
NULL, owning_pool));
}
/* Destructor for svn_fs_x__batch_fsync_t. Releases all global pool memory
* and closes all open file handles. */
static apr_status_t
fsync_batch_cleanup(void *data)
{
svn_fs_x__batch_fsync_t *batch = data;
apr_hash_index_t *hi;
/* Close all files (implicitly) and release memory. */
for (hi = apr_hash_first(apr_hash_pool_get(batch->files), batch->files);
hi;
hi = apr_hash_next(hi))
{
to_sync_t *to_sync = apr_hash_this_val(hi);
svn_pool_destroy(to_sync->pool);
}
return APR_SUCCESS;
}
svn_error_t *
svn_fs_x__batch_fsync_create(svn_fs_x__batch_fsync_t **result_p,
svn_boolean_t flush_to_disk,
apr_pool_t *result_pool)
{
svn_fs_x__batch_fsync_t *result = apr_pcalloc(result_pool, sizeof(*result));
result->files = svn_hash__make(result_pool);
result->flush_to_disk = flush_to_disk;
SVN_ERR(svn_waitable_counter__create(&result->counter, result_pool));
apr_pool_cleanup_register(result_pool, result, fsync_batch_cleanup,
apr_pool_cleanup_null);
*result_p = result;
return SVN_NO_ERROR;
}
/* If BATCH does not contain a handle for PATH, yet, create one with FLAGS
* and add it to BATCH. Set *FILE to the open file handle.
* Use SCRATCH_POOL for temporaries.
*/
static svn_error_t *
internal_open_file(apr_file_t **file,
svn_fs_x__batch_fsync_t *batch,
const char *path,
apr_int32_t flags,
apr_pool_t *scratch_pool)
{
svn_error_t *err;
apr_pool_t *pool;
to_sync_t *to_sync;
#ifdef SVN_ON_POSIX
svn_boolean_t is_new_file;
#endif
/* If we already have a handle for PATH, return that. */
to_sync = svn_hash_gets(batch->files, path);
if (to_sync)
{
*file = to_sync->file;
return SVN_NO_ERROR;
}
/* Calling fsync in PATH is going to be expensive in any case, so we can
* allow for some extra overhead figuring out whether the file already
* exists. If it doesn't, be sure to schedule parent folder updates, if
* required on this platform.
*
* See svn_fs_x__batch_fsync_new_path() for when such extra fsyncs may be
* needed at all. */
#ifdef SVN_ON_POSIX
is_new_file = FALSE;
if (flags & APR_CREATE)
{
svn_node_kind_t kind;
/* We might actually be about to create a new file.
* Check whether the file already exists. */
SVN_ERR(svn_io_check_path(path, &kind, scratch_pool));
is_new_file = kind == svn_node_none;
}
#endif
/* To be able to process each file in a separate thread, they must use
* separate, thread-safe pools. Allocating a sub-pool from the standard
* memory pool achieves exactly that. */
pool = svn_pool_create(NULL);
err = svn_io_file_open(file, path, flags, APR_OS_DEFAULT, pool);
if (err)
{
svn_pool_destroy(pool);
return svn_error_trace(err);
}
to_sync = apr_pcalloc(pool, sizeof(*to_sync));
to_sync->file = *file;
to_sync->pool = pool;
to_sync->result = SVN_NO_ERROR;
to_sync->counter = batch->counter;
svn_hash_sets(batch->files,
apr_pstrdup(apr_hash_pool_get(batch->files), path),
to_sync);
/* If we just created a new file, schedule any additional necessary fsyncs.
* Note that this can only recurse once since the parent folder already
* exists on disk. */
#ifdef SVN_ON_POSIX
if (is_new_file)
SVN_ERR(svn_fs_x__batch_fsync_new_path(batch, path, scratch_pool));
#endif
return SVN_NO_ERROR;
}
svn_error_t *
svn_fs_x__batch_fsync_open_file(apr_file_t **file,
svn_fs_x__batch_fsync_t *batch,
const char *filename,
apr_pool_t *scratch_pool)
{
apr_off_t offset = 0;
SVN_ERR(internal_open_file(file, batch, filename, FILE_FLAGS,
scratch_pool));
SVN_ERR(svn_io_file_seek(*file, APR_SET, &offset, scratch_pool));
return SVN_NO_ERROR;
}
svn_error_t *
svn_fs_x__batch_fsync_new_path(svn_fs_x__batch_fsync_t *batch,
const char *path,
apr_pool_t *scratch_pool)
{
apr_file_t *file;
#ifdef SVN_ON_POSIX
/* On POSIX, we need to sync the parent directory because it contains
* the name for the file / folder given by PATH. */
path = svn_dirent_dirname(path, scratch_pool);
SVN_ERR(internal_open_file(&file, batch, path, APR_READ, scratch_pool));
#else
svn_node_kind_t kind;
/* On non-POSIX systems, we assume that sync'ing the given PATH is the
* right thing to do. Also, we assume that only files may be sync'ed. */
SVN_ERR(svn_io_check_path(path, &kind, scratch_pool));
if (kind == svn_node_file)
SVN_ERR(internal_open_file(&file, batch, path, FILE_FLAGS,
scratch_pool));
#endif
return SVN_NO_ERROR;
}
/* Thread-pool task Flush the to_sync_t instance given by DATA. */
static void * APR_THREAD_FUNC
flush_task(apr_thread_t *tid,
void *data)
{
to_sync_t *to_sync = data;
to_sync->result = svn_error_trace(svn_io_file_flush_to_disk
(to_sync->file, to_sync->pool));
/* As soon as the increment call returns, TO_SYNC may be invalid
(the main thread may have woken up and released the struct.
Therefore, we cannot chain this error into TO_SYNC->RESULT.
OTOH, the main thread will probably deadlock anyway if we got
an error here, thus there is no point in trying to tell the
main thread what the problem was. */
svn_error_clear(svn_waitable_counter__increment(to_sync->counter));
return NULL;
}
svn_error_t *
svn_fs_x__batch_fsync_run(svn_fs_x__batch_fsync_t *batch,
apr_pool_t *scratch_pool)
{
apr_hash_index_t *hi;
/* Number of tasks sent to the thread pool. */
int tasks = 0;
/* Because we allocated the open files from our global pool, don't bail
* out on the first error. Instead, process all files and but accumulate
* the errors in this chain.
*/
svn_error_t *chain = SVN_NO_ERROR;
/* First, flush APR-internal buffers. This should minimize / prevent the
* introduction of additional meta-data changes during the next phase.
* We might otherwise issue redundant fsyncs.
*/
for (hi = apr_hash_first(scratch_pool, batch->files);
hi;
hi = apr_hash_next(hi))
{
to_sync_t *to_sync = apr_hash_this_val(hi);
to_sync->result = svn_error_trace(svn_io_file_flush
(to_sync->file, to_sync->pool));
}
/* Make sure the task completion counter is set to 0. */
chain = svn_error_compose_create(chain,
svn_waitable_counter__reset(batch->counter));
/* Start the actual fsyncing process. */
if (batch->flush_to_disk)
{
for (hi = apr_hash_first(scratch_pool, batch->files);
hi;
hi = apr_hash_next(hi))
{
to_sync_t *to_sync = apr_hash_this_val(hi);
#if APR_HAS_THREADS
/* Forgot to call _init() or cleaned up the owning pool too early?
*/
SVN_ERR_ASSERT(thread_pool);
/* If there are multiple fsyncs to perform, run them in parallel.
* Otherwise, skip the thread-pool and synchronization overhead. */
if (apr_hash_count(batch->files) > 1)
{
apr_status_t status = APR_SUCCESS;
status = apr_thread_pool_push(thread_pool, flush_task, to_sync,
0, NULL);
if (status)
to_sync->result = svn_error_wrap_apr(status,
_("Can't push task"));
else
tasks++;
}
else
#endif
{
to_sync->result = svn_error_trace(svn_io_file_flush_to_disk
(to_sync->file,
to_sync->pool));
}
}
}
/* Wait for all outstanding flush operations to complete. */
chain = svn_error_compose_create(chain,
svn_waitable_counter__wait_for(
batch->counter, tasks));
/* Collect the results, close all files and release memory. */
for (hi = apr_hash_first(scratch_pool, batch->files);
hi;
hi = apr_hash_next(hi))
{
to_sync_t *to_sync = apr_hash_this_val(hi);
if (batch->flush_to_disk)
chain = svn_error_compose_create(chain, to_sync->result);
chain = svn_error_compose_create(chain,
svn_io_file_close(to_sync->file,
scratch_pool));
svn_pool_destroy(to_sync->pool);
}
/* Don't process any file / folder twice. */
apr_hash_clear(batch->files);
/* Report the errors that we encountered. */
return svn_error_trace(chain);
}