/* batch_fsync.c --- efficiently fsync multiple targets
 *
 * ====================================================================
 *    Licensed to the Apache Software Foundation (ASF) under one
 *    or more contributor license agreements.  See the NOTICE file
 *    distributed with this work for additional information
 *    regarding copyright ownership.  The ASF licenses this file
 *    to you under the Apache License, Version 2.0 (the
 *    "License"); you may not use this file except in compliance
 *    with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing,
 *    software distributed under the License is distributed on an
 *    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *    KIND, either express or implied.  See the License for the
 *    specific language governing permissions and limitations
 *    under the License.
 * ====================================================================
 */

#include <apr_thread_pool.h>

#include "batch_fsync.h"
#include "svn_pools.h"
#include "svn_hash.h"
#include "svn_dirent_uri.h"
#include "svn_private_config.h"

#include "private/svn_atomic.h"
#include "private/svn_dep_compat.h"
#include "private/svn_mutex.h"
#include "private/svn_subr_private.h"
#include "private/svn_thread_cond.h"
#include "private/svn_waitable_counter.h"

/* Evaluate the APR call X exactly once.  If it yields a non-zero
 * apr_status_t, return from the *enclosing function* with that status
 * wrapped into an svn_error_t that carries MSG.  Otherwise, execution
 * simply continues after the macro.
 *
 * The body is wrapped in do { } while (0) so that the expansion forms a
 * single statement.  That makes "WRAP_APR_ERR(...);" safe to use as the
 * unbraced branch of an if / else. */
#define WRAP_APR_ERR(x,msg)                         \
  do                                                \
    {                                               \
      apr_status_t status_ = (x);                   \
      if (status_)                                  \
        return svn_error_wrap_apr(status_, msg);    \
    }                                               \
  while (0)

/* Entry type for the svn_fs_x__batch_fsync_t collection.  There is one
 * instance per file handle.
 *
 * Instances are created by internal_open_file() and are allocated in
 * their own POOL, i.e. destroying POOL also releases the struct itself.
 */
typedef struct to_sync_t
{
  /* Open handle of the file / directory to fsync.  Allocated in POOL. */
  apr_file_t *file;

  /* Pool to use with FILE.  It is private to FILE such that it can be
   * used safely together with FILE in a separate thread. */
  apr_pool_t *pool;

  /* Result of the file operations.  Starts out as SVN_NO_ERROR and is
   * written by svn_fs_x__batch_fsync_run() or by flush_task(). */
  svn_error_t *result;

  /* Counter to increment when we completed the task.  This is the same
   * object as the COUNTER member of the owning batch. */
  svn_waitable_counter_t *counter;
} to_sync_t;

/* The actual collection object.  Created by svn_fs_x__batch_fsync_create()
 * and processed (then emptied) by svn_fs_x__batch_fsync_run(). */
struct svn_fs_x__batch_fsync_t
{
  /* Maps open file handles: C-string path to to_sync_t *.  The keys are
   * copies allocated in the hash's own pool; the values live in their
   * respective to_sync_t.POOL. */
  apr_hash_t *files;

  /* Counts the number of completed fsync tasks.  It gets reset at the
   * start of each run and is incremented by every flush_task(). */
  svn_waitable_counter_t *counter;

  /* Perform fsyncs only if this flag has been set.  If it is not set,
   * a run only flushes the APR-level buffers and closes the files. */
  svn_boolean_t flush_to_disk;
};

/* Data structures for concurrent fsync execution are only available if
 * we have threading support.
 */
#if APR_HAS_THREADS

/* Number of microseconds (here: one second) that an unused thread remains
 * in the pool before being terminated.
 *
 * Higher values are useful if clients frequently send small requests and
 * you want to minimize the latency for those.
 */
#define THREADPOOL_THREAD_IDLE_LIMIT 1000000

/* Maximum number of threads in THREAD_POOL, i.e. number of paths we can
 * fsync concurrently throughout the process. */
#define MAX_THREADS 16

/* Thread pool to execute the fsync tasks.  It is process-global, set by
 * create_thread_pool() and reset to NULL by thread_pool_pre_cleanup(). */
static apr_thread_pool_t *thread_pool = NULL;

#endif

/* Keeps track of whether we have already created THREAD_POOL.  This is
 * the state variable handed to svn_atomic__init_once(); it is declared
 * unconditionally because svn_fs_x__batch_fsync_init() uses it even in
 * builds without threading support. */
static svn_atomic_t thread_pool_initialized = FALSE;

/* We open non-directory files with these flags. */
#define FILE_FLAGS (APR_READ | APR_WRITE | APR_BUFFERED | APR_CREATE)

#if APR_HAS_THREADS

/* Pool pre-cleanup callback that tears down the global THREAD_POOL,
   implicitly cleaning up any threads still running in it.

   Only the first invocation does any work:  it resets THREAD_POOL and
   THREAD_POOL_INITIALIZED and then destroys the thread pool.  Every later
   invocation finds THREAD_POOL to be NULL and returns APR_SUCCESS right
   away.  DATA is not used.

   Must be run as a pre-cleanup hook.
 */
static apr_status_t
thread_pool_pre_cleanup(void *data)
{
  apr_thread_pool_t *doomed;

  /* Nothing to do if the pool has already been torn down. */
  if (thread_pool == NULL)
    return APR_SUCCESS;

  /* Clear the global state before destroying the object such that
     a second invocation of this hook becomes a no-op. */
  doomed = thread_pool;
  thread_pool = NULL;
  thread_pool_initialized = FALSE;

  return apr_thread_pool_destroy(doomed);
}

#endif

/* Core implementation of svn_fs_x__batch_fsync_init.
 *
 * This is the callback given to svn_atomic__init_once().  In builds with
 * threading support, it creates the global THREAD_POOL (no initial
 * threads, at most MAX_THREADS) and arranges for it to be destroyed
 * before either its own private pool or OWNING_POOL gets cleaned up.
 * Without threading support, it does nothing.  BATON is not used.
 *
 * Returns an error if the APR thread pool could not be created.  In that
 * case, the private pool created here is released again instead of being
 * leaked.
 */
static svn_error_t *
create_thread_pool(void *baton,
                   apr_pool_t *owning_pool)
{
#if APR_HAS_THREADS
  apr_status_t status;

  /* The thread-pool must be allocated from a thread-safe pool.
     OWNING_POOL may be single-threaded, though.  Therefore, use a
     separate pool without a parent. */
  apr_pool_t *pool = svn_pool_create(NULL);

  status = apr_thread_pool_create(&thread_pool, 0, MAX_THREADS, pool);
  if (status)
    {
      /* Nothing else references POOL.  Release it before bailing out. */
      svn_pool_destroy(pool);
      return svn_error_wrap_apr(status,
                                _("Can't create fsync thread pool in FSX"));
    }

  /* Work around an APR bug:  The cleanup must happen in the pre-cleanup
     hook instead of the normal cleanup hook.  Otherwise, the sub-pools
     containing the thread objects would already be invalid.

     The hook is registered with both pools such that the thread pool
     gets destroyed by whichever of them is cleaned up first.  The hook
     itself makes sure that it will do the actual work only once. */
  apr_pool_pre_cleanup_register(pool, NULL, thread_pool_pre_cleanup);
  apr_pool_pre_cleanup_register(owning_pool, NULL, thread_pool_pre_cleanup);

  /* let idle threads linger for a while in case more requests are
     coming in */
  apr_thread_pool_idle_wait_set(thread_pool, THREADPOOL_THREAD_IDLE_LIMIT);

  /* don't queue requests unless we reached the worker thread limit */
  apr_thread_pool_threshold_set(thread_pool, 0);

#endif

  return SVN_NO_ERROR;
}

/* Set up the process-global infrastructure used by the batch fsync code.
 * svn_atomic__init_once() protects against multiple calls, i.e.
 * create_thread_pool() is the function that does the actual work.
 * OWNING_POOL is passed through to create_thread_pool(). */
svn_error_t *
svn_fs_x__batch_fsync_init(apr_pool_t *owning_pool)
{
  svn_error_t *err = svn_atomic__init_once(&thread_pool_initialized,
                                           create_thread_pool,
                                           NULL, owning_pool);

  return svn_error_trace(err);
}

/* Destructor for svn_fs_x__batch_fsync_t.  Releases all global pool memory
 * and closes all open file handles. */
static apr_status_t
fsync_batch_cleanup(void *data)
{
  svn_fs_x__batch_fsync_t *batch = data;
  apr_hash_index_t *hi;

  /* Close all files (implicitly) and release memory. */
  for (hi = apr_hash_first(apr_hash_pool_get(batch->files), batch->files);
       hi;
       hi = apr_hash_next(hi))
    {
      to_sync_t *to_sync = apr_hash_this_val(hi);
      svn_pool_destroy(to_sync->pool);
    }

  return APR_SUCCESS;
}

/* Allocate a new, empty batch in RESULT_POOL and return it in *RESULT_P.
 * FLUSH_TO_DISK is stored in the batch and later decides whether a run
 * actually performs fsyncs.  A cleanup gets registered with RESULT_POOL
 * that releases all entries still in the batch. */
svn_error_t *
svn_fs_x__batch_fsync_create(svn_fs_x__batch_fsync_t **result_p,
                             svn_boolean_t flush_to_disk,
                             apr_pool_t *result_pool)
{
  svn_fs_x__batch_fsync_t *batch = apr_pcalloc(result_pool, sizeof(*batch));

  batch->flush_to_disk = flush_to_disk;
  batch->files = svn_hash__make(result_pool);
  SVN_ERR(svn_waitable_counter__create(&batch->counter, result_pool));

  /* Registered last, i.e. after the counter has been created. */
  apr_pool_cleanup_register(result_pool, batch, fsync_batch_cleanup,
                            apr_pool_cleanup_null);

  *result_p = batch;
  return SVN_NO_ERROR;
}

/* Return in *FILE the open handle that BATCH holds for PATH.  If BATCH
 * does not have one, yet, open PATH with FLAGS first and add the new
 * handle to BATCH.
 *
 * On POSIX platforms and if FLAGS contain APR_CREATE, a PATH that did not
 * exist on disk before this call also gets its parent folder scheduled
 * for fsync via svn_fs_x__batch_fsync_new_path().
 *
 * Use SCRATCH_POOL for temporaries.
 */
static svn_error_t *
internal_open_file(apr_file_t **file,
                   svn_fs_x__batch_fsync_t *batch,
                   const char *path,
                   apr_int32_t flags,
                   apr_pool_t *scratch_pool)
{
  to_sync_t *entry = svn_hash_gets(batch->files, path);
  apr_pool_t *file_pool;
  svn_error_t *open_err;
  const char *key;
#ifdef SVN_ON_POSIX
  svn_boolean_t path_is_new = FALSE;
#endif

  /* Known path?  Then simply hand out the existing handle. */
  if (entry != NULL)
    {
      *file = entry->file;
      return SVN_NO_ERROR;
    }

  /* Calling fsync on PATH is going to be expensive in any case, so we can
   * afford the extra overhead of figuring out whether the file already
   * exists.  If it doesn't, we must schedule parent folder updates further
   * down, if required on this platform.
   *
   * See svn_fs_x__batch_fsync_new_path() for when such extra fsyncs may be
   * needed at all. */
#ifdef SVN_ON_POSIX

  if (flags & APR_CREATE)
    {
      /* The open call below might create the file.  Find out now, i.e.
       * before it does, whether there is something at PATH already. */
      svn_node_kind_t kind;

      SVN_ERR(svn_io_check_path(path, &kind, scratch_pool));
      path_is_new = (kind == svn_node_none);
    }

#endif

  /* To be able to process each file in a separate thread, they must use
   * separate, thread-safe pools.  Allocating a sub-pool from the standard
   * memory pool achieves exactly that. */
  file_pool = svn_pool_create(NULL);
  open_err = svn_io_file_open(file, path, flags, APR_OS_DEFAULT, file_pool);
  if (open_err)
    {
      svn_pool_destroy(file_pool);
      return svn_error_trace(open_err);
    }

  /* The entry shares the pool - and thus the lifetime - of its file. */
  entry = apr_pcalloc(file_pool, sizeof(*entry));
  entry->pool = file_pool;
  entry->file = *file;
  entry->counter = batch->counter;
  entry->result = SVN_NO_ERROR;

  /* The key must live as long as the hash, so copy PATH into its pool. */
  key = apr_pstrdup(apr_hash_pool_get(batch->files), path);
  svn_hash_sets(batch->files, key, entry);

  /* If we just created a new file, schedule any additional necessary fsyncs.
   * Note that this can only recurse once since the parent folder already
   * exists on disk. */
#ifdef SVN_ON_POSIX

  if (path_is_new)
    SVN_ERR(svn_fs_x__batch_fsync_new_path(batch, path, scratch_pool));

#endif

  return SVN_NO_ERROR;
}

/* Open FILENAME with FILE_FLAGS through BATCH - or reuse the handle that
 * BATCH already has for it - and return the handle in *FILE.  In both
 * cases, the file pointer gets positioned at the start of the file.
 * Use SCRATCH_POOL for temporaries. */
svn_error_t *
svn_fs_x__batch_fsync_open_file(apr_file_t **file,
                                svn_fs_x__batch_fsync_t *batch,
                                const char *filename,
                                apr_pool_t *scratch_pool)
{
  apr_off_t start_of_file = 0;

  SVN_ERR(internal_open_file(file, batch, filename, FILE_FLAGS,
                             scratch_pool));

  return svn_error_trace(svn_io_file_seek(*file, APR_SET, &start_of_file,
                                          scratch_pool));
}

/* Schedule in BATCH whatever fsync is needed to make the newly created
 * PATH persistent.  Which object gets sync'ed depends on the platform;
 * see the comments within.  Use SCRATCH_POOL for temporaries. */
svn_error_t *
svn_fs_x__batch_fsync_new_path(svn_fs_x__batch_fsync_t *batch,
                               const char *path,
                               apr_pool_t *scratch_pool)
{
  /* We only want the handle to be part of BATCH; we don't use it here. */
  apr_file_t *unused_handle;

#ifdef SVN_ON_POSIX

  /* On POSIX, we need to sync the parent directory because it contains
   * the name for the file / folder given by PATH. */
  const char *parent_dir = svn_dirent_dirname(path, scratch_pool);

  SVN_ERR(internal_open_file(&unused_handle, batch, parent_dir, APR_READ,
                             scratch_pool));

#else

  svn_node_kind_t kind;

  /* On non-POSIX systems, we assume that sync'ing the given PATH is the
   * right thing to do.  Also, we assume that only files may be sync'ed. */
  SVN_ERR(svn_io_check_path(path, &kind, scratch_pool));
  if (kind == svn_node_file)
    {
      SVN_ERR(internal_open_file(&unused_handle, batch, path, FILE_FLAGS,
                                 scratch_pool));
    }

#endif

  return SVN_NO_ERROR;
}

/* Thread-pool task:  Flush the to_sync_t instance given by DATA to disk,
 * store the outcome in its RESULT member and then signal completion by
 * incrementing its COUNTER.  TID is not used.  Always returns NULL. */
static void * APR_THREAD_FUNC
flush_task(apr_thread_t *tid,
           void *data)
{
  to_sync_t *entry = data;
  svn_error_t *sync_err = svn_io_file_flush_to_disk(entry->file,
                                                    entry->pool);

  entry->result = svn_error_trace(sync_err);

  /* As soon as the increment call returns, ENTRY may be invalid
     (the main thread may have woken up and released the struct).

     Therefore, we cannot chain this error into ENTRY->RESULT.
     OTOH, the main thread will probably deadlock anyway if we got
     an error here, thus there is no point in trying to tell the
     main thread what the problem was. */
  svn_error_clear(svn_waitable_counter__increment(entry->counter));

  return NULL;
}

/* Process all files / folders collected in BATCH:
 *
 *  1. Flush the APR-internal buffers of all handles.
 *  2. If BATCH->FLUSH_TO_DISK is set, fsync all handles.  With threading
 *     support and more than one handle in BATCH, this happens concurrently
 *     in THREAD_POOL; otherwise, it is done in the calling thread.
 *  3. Wait for all tasks pushed to the thread pool to complete.
 *  4. Close all handles, destroy their pools and empty BATCH.
 *
 * Errors do not stop the processing of the remaining files.  They are
 * accumulated and returned as one error chain.  That includes the errors
 * from step 1, which must not be dropped:  they are composed into the
 * chain right away because the RESULT member of the entries gets written
 * to again in step 2.
 *
 * Use SCRATCH_POOL for temporaries.
 */
svn_error_t *
svn_fs_x__batch_fsync_run(svn_fs_x__batch_fsync_t *batch,
                          apr_pool_t *scratch_pool)
{
  apr_hash_index_t *hi;

  /* Number of tasks sent to the thread pool. */
  int tasks = 0;

  /* Because we allocated the open files from our global pool, don't bail
   * out on the first error.  Instead, process all files but accumulate
   * the errors in this chain.
   */
  svn_error_t *chain = SVN_NO_ERROR;

  /* First, flush APR-internal buffers. This should minimize / prevent the
   * introduction of additional meta-data changes during the next phase.
   * We might otherwise issue redundant fsyncs.
   */
  for (hi = apr_hash_first(scratch_pool, batch->files);
       hi;
       hi = apr_hash_next(hi))
    {
      to_sync_t *to_sync = apr_hash_this_val(hi);

      /* Report flush errors through CHAIN directly.  TO_SYNC->RESULT is
       * reserved for the outcome of the fsync phase below and would be
       * overwritten there, losing (and leaking) the error. */
      chain = svn_error_compose_create(chain,
                                       svn_error_trace(svn_io_file_flush
                                           (to_sync->file, to_sync->pool)));
      to_sync->result = SVN_NO_ERROR;
    }

  /* Make sure the task completion counter is set to 0. */
  chain = svn_error_compose_create(chain,
                                   svn_waitable_counter__reset(batch->counter));

  /* Start the actual fsyncing process. */
  if (batch->flush_to_disk)
    {
      for (hi = apr_hash_first(scratch_pool, batch->files);
           hi;
           hi = apr_hash_next(hi))
        {
          to_sync_t *to_sync = apr_hash_this_val(hi);

#if APR_HAS_THREADS

          /* Forgot to call _init() or cleaned up the owning pool too early?
           */
          SVN_ERR_ASSERT(thread_pool);

          /* If there are multiple fsyncs to perform, run them in parallel.
           * Otherwise, skip the thread-pool and synchronization overhead. */
          if (apr_hash_count(batch->files) > 1)
            {
              apr_status_t status = APR_SUCCESS;
              status = apr_thread_pool_push(thread_pool, flush_task, to_sync,
                                            0, NULL);

              /* Only count tasks that will actually run.  Otherwise, we
               * would wait for a completion that never happens. */
              if (status)
                to_sync->result = svn_error_wrap_apr(status,
                                                     _("Can't push task"));
              else
                tasks++;
            }
          else

#endif

            {
              to_sync->result = svn_error_trace(svn_io_file_flush_to_disk
                                                  (to_sync->file,
                                                   to_sync->pool));
            }
        }
    }

  /* Wait for all outstanding flush operations to complete.
   * TASKS may be 0 here, e.g. if nothing was pushed to the thread pool. */
  chain = svn_error_compose_create(chain,
                                   svn_waitable_counter__wait_for(
                                       batch->counter, tasks));

  /* Collect the results, close all files and release memory. */
  for (hi = apr_hash_first(scratch_pool, batch->files);
       hi;
       hi = apr_hash_next(hi))
    {
      to_sync_t *to_sync = apr_hash_this_val(hi);

      /* RESULT only carries information if the fsync phase has run. */
      if (batch->flush_to_disk)
        chain = svn_error_compose_create(chain, to_sync->result);

      chain = svn_error_compose_create(chain,
                                       svn_io_file_close(to_sync->file,
                                                         scratch_pool));

      /* This releases TO_SYNC itself as well. */
      svn_pool_destroy(to_sync->pool);
    }

  /* Don't process any file / folder twice. */
  apr_hash_clear(batch->files);

  /* Report the errors that we encountered. */
  return svn_error_trace(chain);
}
