name: Formal RFC about: Submit a formal Request For Comments for consideration by the team. title: ‘Replicator Implementation On FDB’ labels: rfc, discussion assignees: ‘vatamane@apache.org


Introduction

This document describes the design of the replicator application for CouchDB 4.x. The replicator will rely on couch_jobs for centralized scheduling and monitoring of replication jobs.

Abstract

Replication jobs can be created from documents in _replicator databases, or by POST-ing requests to the HTTP /_replicate endpoint. Previously, in CouchDB <= 3.x, replication jobs were mapped to individual cluster nodes and a scheduler component would run up to max_jobs number of jobs at a time on each node. The new design proposes using couch_jobs, as described in the Background Jobs RFC, to have a central, FDB-based queue of replication jobs. couch_jobs application will manage job scheduling and coordination. The new design also proposes using heterogeneous node types as defined in the Node Types RFC such that replication jobs will be created only on api_frontend nodes and run only on replication nodes.

Requirements Language

The key words “MUST”, “MUST NOT”, “REQUIRED”, “SHALL”, “SHALL NOT”, “SHOULD”, “SHOULD NOT”, “RECOMMENDED”, “MAY”, and “OPTIONAL” in this document are to be interpreted as described in RFC 2119.

Terminology

_replicator databases : A database that is either named _replicator or ends with the /_replicator suffix.

transient replications : Replication jobs created by POST-ing to the /_replicate endpoint.

persistent replications : Replication jobs defined in document in a _replicator database.

continuous replications : Replication jobs created with the "continuous": true parameter. These jobs will try to run continuously until the user removes them. They may be temporarily paused to allow other jobs to make progress.

one-shot replications : Replication jobs which are not continuous. If the "continuous":true parameter is not specified, by default, replication jobs will be one-shot. These jobs will try to run until they reach the end of the changes feed, then stop.

api_frontend node : Database node which has the api_frontend type set to true as described in RFC. Replication jobs can be only be created on these nodes.

replication node : Database node which has the replication type set to true as described in RFC. Replication jobs can only be run on these nodes.

filtered replications: Replications with a user-defined filter on the source endpoint to filter its changes feed.

replication_id : An ID defined by replication jobs, which is a hash of replication parameters that affect the result of the replication. These may include source and target endpoint URLs, as well as a filter function specified in a design document on the source endpoint.

job_id : A replication job ID derived from the database and document IDs for persistent replications, and from source, target endpoint, user name and some options for transient replications. Computing a job_id, unlike a replication_id, doesn't require making any network requests. A filtered replication with a given job_id during its lifetime may change its replication_id multiple times when filter contents changes on the source.

max_jobs : Configuration parameter which specifies up to how many replication jobs to run on each replication node.

max_churn : Configuration parameter which specifies a limit of how many new jobs to spawn during each rescheduling interval.

min_backoff_penalty : Configuration parameter specifying the minimum (the base) penalty applied to jobs which crash repeatedly.

max_backoff_penalty : Configuration parameter specifying the maximum penalty applied to jobs which crash repeatedly.


Detailed Description

Replication job creation and scheduling works roughly as follows:

  1. Persistent and transient jobs both start by creating or updating a couch_jobs record in a separate replication key-space on api_frontend nodes. Persistent jobs are driven by the couch_epi callback mechanism which notifies couch_replicator application when documents in _replicator DBs are updated, or when _replicator DBs are created and deleted. Transient jobs are created from the _replicate HTTP handler directly. Newly created jobs are in a pending state.

  2. Each replication node spawns some acceptor processes which wait in couch_jobs:accept/2 call for jobs. It will accept only jobs which are scheduled to run at a time less or equal to the current time.

  3. After a job is accepted, its state is updated to running, and then, a gen_server process monitoring these replication jobs will spawn another acceptor. That happens until the max_jobs limit is reached.

  4. The same monitoring gen_server will periodically check if there are any pending jobs in the queue and, if there are, spawn up to some max_churn number of new acceptors. These acceptors may start new jobs and, if they do, for each one of them, the oldest running job will be stopped and re-enqueued as pending. This in large follows the logic from the replication scheduler in CouchDB <= 3.x except that is uses couch_jobs as the central queuing and scheduling mechanism.

  5. After the job is marked as running, it computes its replication_id, initializes an internal replication state record from job‘s data object, and starts replicating. Underneath this level the logic is identical to what’s already happening in CouchDB <= 3.x and so it is not described further in this document.

  6. As jobs run, they periodically checkpoint, and when they do that, they also recompute their replication_id. In the case of filtered replications the replication_id may change, and if so, that job is stopped and re-enqueued as pending. Also, during checkpointing the job‘s data value is updated with stats such that the job stays active and doesn’t get re-enqueued by the couch_jobs activity monitor.

  7. If the job crashes, it will reschedule itself in gen_server:terminate/2 via couch_jobs:resubmit/3 call to run again at some future time, defined roughly as now + max(min_backoff_penalty * 2^consecutive_errors, max_backoff_penalty). If a job starts and successfully runs for some predefined period of time without crashing, it is considered to be "healed" and its consecutive_errors count is reset to 0.

  8. If the node where replication job runs crashes, or the job is manually killed via exit(Pid, kill), couch_jobs activity monitor will automatically re-enqueue the job as pending.

Replicator Job States

Description

The set of replication job states is defined as:

  • pending : A job is marked as pending in these cases:

    • As soon as a job is created from an api_frontend node
    • When it stopped to let other replication jobs run
    • When a filtered replication's replication_id changes
  • running : Set when a job is accepted by the couch_jobs:accept/2 call. This generally means the job is actually running on a node, however, in cases when a node crashes, the job may show as running on that node until couch_jobs activity monitor re-enqueues the job, and it starts running on another node.

  • crashing : The job was running, but then crashed with an intermittent error. Job's data has an error count which is incremented, and then a backoff penalty is computed and the job is rescheduled to try again at some point in the future.

  • completed : One-Shot replications which have completed

  • failed : This can happen when:

    • A replication job could not be parsed from a replication document. For example, if the user has not specified a "source" field.
    • A transient replication job crashes. Transient jobs don't get rescheduled to run again after they crash.
    • There already is another persistent replication job running or pending with the same replication_id.

State Differences From CouchDB <= 3.x

The set of states is slightly different than the ones from before. There are now fewer states as some of them have been combined together:

  • initializing was combined with pending

  • error was combined with crashing

Mapping Between couch_jobs States and Replication States

couch_jobs application has its own set of state definitions and they map to replicator states like so:

Replicator Statescouch_jobs States
pendingpending
runningrunning
crashingpending
completedfinished
failedfinished

State Transition Diagram

Jobs start in the pending state, after either a _replicator db doc update, or a POST to the /_replicate endpoint. Continuous jobs, will normally toggle between pending and running states. One-Shot jobs may toggle between pending and running a few times and then end up in completed.

_replicator doc       +-------+
POST /_replicate ---->+pending|
                      +-------+
                          ^
                          |
                          |
                          v
                      +---+---+      +--------+
            +---------+running+<---->|crashing|
            |         +---+---+      +--------+
            |             |
            |             |
            v             v
        +------+     +---------+
        |failed|     |completed|
        +------+     +---------+

Replication ID Collisions

Multiple replication jobs may specify replications which map to the same replication_id. To handle these collisions there is an FDB subspace (..., LayerPrefix, ?REPLICATION_IDS, replication_id) -> job_id to keep track of them. After the replication_id is computed, each replication job checks if there is already another job pending or running with the same replication_id. If the other job is transient, then the current job will reschedule itself as crashing. If the other job is persistent, the current job will fail permanently as failed.

Replication Parameter Validation

_replicator documents in CouchDB <= 3.x were parsed and validated in a two-step process:

  1. In a validate-doc-update (VDU) javascript function from a programmatically inserted _design document. This validation happened when the document was updated, and performed some rough checks on field names and value types. If this validation failed, the document update operation was rejected.

  2. Inside replicator‘s Erlang code when it was translated to an internal record used by the replication application. This validation was more thorough but didn’t have very friendly error messages. If validation failed here, the job would be marked as failed.

For CouchDB 4.x the proposal is to use only the Erlang parser. It would be called from the before_doc_update callback. This is a callback which runs before every document update. If validation fails there it would reject the document update operation. This should reduce code duplication and also provide better feedback to the users directly when they update the _replicator documents.

Transient Job Behavior

In CouchDB <= 3.x transient replication jobs ran in memory on a particular node in the cluster. If the node where the replication job ran crashes, the job would simply disappear without a trace. It was up to the user to periodically monitor the job status and re-create the job. In the current design, transient jobs are persisted to FDB as couch_jobs records, and so would survive node restarts. Also after transient jobs complete or failed, they used to disappear immediately. This design proposes keeping them around for a configurable emount of time to allow users to retrieve their status via _scheduler/jobs/$id API.

Monitoring Endpoints

_active_tasks, _scheduler/jobs and _scheduler/docs endpoint are handled by traversing the replication job‘s data using a new couch_jobs:fold_jobs/4 API function to retrieve each job’s data. _active_tasks implementation already works that way and _scheduler/* endpoint will work similarly.

Replication Documents Not Updated For Transient Errors

Configuration option [replicator] update_docs = false was introduced with the scheduling replicator in a 2.x release. It controls whether to update replication documents with transient states like triggered and error. It defaulted to false and was mainly for compatibility with older monitoring user scripts. That behavior now becomes hard-coded such that replication documents are only updated with terminal states of failed and completed. Users should use _scheduler/docs API to check for completion status instead.

Advantages and Disadvantages

Advantages:

  • Simplicity: re-using couch_jobs means having a lot less code to maintain in couch_replicator. In the draft implementation there are about 3000 lines of code saved compared to the replicator application in CouchDB 3.x

  • Simpler endpoint and monitoring implementation

  • Fewer replication job states to keep track of

  • Transient replications can survive node crashes and restarts

  • Simplified and improved validation logic

  • Using node types allows tightening firewall rules such that only replication nodes are the ones which may make arbitrary requests outside the cluster, and frontend_api nodes are the only ones that may accept incoming connections.

Disadvantages:

  • Behavior changes for transient jobs

  • Centralized job queue might mean handling some number of conflicts generated in the FDB backend when jobs are accepted. These are mitigated using the startup_jitter configuration parameter and a configurable number of max acceptors per node.

  • In monitoring API responses, running job state might not immediately reflect the running process state on the replication node. If the node crashes, it might take up to a minute or two until the job is re-enqueued by the couch_jobs activity monitor.

Key Changes

  • Behavior changes for transient jobs

  • A delay in running state as reflected in monitoring API responses

  • [replicator] update_docs = false configuration option becomes hard-coded

Applications and Modules affected

  • couch_jobs : New APIs to fold jobs and get pending count job estimate

  • fabric2_db : Adding EPI db create/delete callbacks

  • couch_replicator :

    • Remove couch_replicator_scheduler* modules
    • Remove couch_replicator_doc_processor_* modules
    • couch_replicator : job creation and a general API entry-point for couch_replicator.
    • couch_replicator_job : runs each replication job
    • couch_replicator_job_server : replication job monitoring gen_server
    • couch_replicator_parse : parses replication document and HTTP _replicate POST bodies

HTTP API additions

N/A

HTTP API deprecations

N/A

Security Considerations

Ability to confine replication jobs to run on replication nodes improves the security posture. It is possible to set up firewall rules which allow egress traffic sent out only from those nodes.

References

Co-authors

  • @davisp

Acknowledgements

  • @davisp