---
name: Formal RFC
about: Submit a formal Request For Comments for consideration by the team.
title: 'Replicator Implementation On FDB'
labels: rfc, discussion
assignees: 'vatamane@apache.org'
---
This document describes the design of the replicator application for CouchDB 4.x. The replicator will rely on `couch_jobs` for centralized scheduling and monitoring of replication jobs. Replication jobs can be created from documents in `_replicator` databases, or by `POST`-ing requests to the HTTP `/_replicate` endpoint.

Previously, in CouchDB <= 3.x, replication jobs were mapped to individual cluster nodes and a scheduler component would run up to `max_jobs` number of jobs at a time on each node. The new design proposes using `couch_jobs`, as described in the Background Jobs RFC, to have a central, FDB-based queue of replication jobs. The `couch_jobs` application will manage job scheduling and coordination. The new design also proposes using heterogeneous node types, as defined in the Node Types RFC, such that replication jobs will be created only on `api_frontend` nodes and run only on `replication` nodes.
The key words “MUST”, “MUST NOT”, “REQUIRED”, “SHALL”, “SHALL NOT”, “SHOULD”, “SHOULD NOT”, “RECOMMENDED”, “MAY”, and “OPTIONAL” in this document are to be interpreted as described in RFC 2119.
- `_replicator` databases: A database that is either named `_replicator` or ends with the `/_replicator` suffix.
- `transient` replications: Replication jobs created by `POST`-ing to the `/_replicate` endpoint.
- `persistent` replications: Replication jobs defined in a document in a `_replicator` database.
- `continuous` replications: Replication jobs created with the `"continuous": true` parameter. These jobs will try to run continuously until the user removes them. They may be temporarily paused to allow other jobs to make progress.
- `one-shot` replications: Replication jobs which are not `continuous`. If the `"continuous": true` parameter is not specified, replication jobs are `one-shot` by default. These jobs will try to run until they reach the end of the changes feed, then stop.
- `api_frontend` node: Database node which has the `api_frontend` type set to `true`, as described in the Node Types RFC. Replication jobs can only be created on these nodes.
- `replication` node: Database node which has the `replication` type set to `true`, as described in the Node Types RFC. Replication jobs can only be run on these nodes.
- `filtered` replications: Replications with a user-defined filter on the source endpoint to filter its changes feed.
- `replication_id`: An ID defined by replication jobs, which is a hash of the replication parameters that affect the result of the replication. These may include the source and target endpoint URLs, as well as a filter function specified in a design document on the source endpoint.
- `job_id`: A replication job ID derived from the database and document IDs for persistent replications, and from the source and target endpoints, user name, and some options for transient replications. Computing a `job_id`, unlike a `replication_id`, doesn't require making any network requests. A filtered replication with a given `job_id` may change its `replication_id` multiple times during its lifetime as the filter contents change on the source.
- `max_jobs`: Configuration parameter which specifies up to how many replication jobs to run on each `replication` node.
- `max_churn`: Configuration parameter which specifies a limit on how many new jobs to spawn during each rescheduling interval.
- `min_backoff_penalty`: Configuration parameter specifying the minimum (the base) penalty applied to jobs which crash repeatedly.
- `max_backoff_penalty`: Configuration parameter specifying the maximum penalty applied to jobs which crash repeatedly.
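For illustration, these scheduling parameters would live in the `[replicator]` section of the configuration, alongside options like `update_docs` discussed below. The values here are examples only, not proposed defaults, and the time units (seconds) are an assumption:

```ini
[replicator]
; maximum number of replication jobs to run on each replication node
max_jobs = 500
; maximum number of new jobs to spawn per rescheduling interval
max_churn = 20
; base backoff penalty, in seconds, for repeatedly crashing jobs
min_backoff_penalty = 32
; maximum backoff penalty, in seconds, for repeatedly crashing jobs
max_backoff_penalty = 86400
```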
Replication job creation and scheduling works roughly as follows:
`Persistent` and `transient` jobs both start by creating or updating a `couch_jobs` record in a separate replication key-space on `api_frontend` nodes. Persistent jobs are driven by the `couch_epi` callback mechanism, which notifies the `couch_replicator` application when documents in `_replicator` DBs are updated, or when `_replicator` DBs are created and deleted. Transient jobs are created from the `_replicate` HTTP handler directly. Newly created jobs are in a `pending` state.
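As a rough sketch, creating such a job record might look like the following. The `?REP_JOBS` job type, the job data fields, and the exact `couch_jobs:add/5` signature are assumptions based on the Background Jobs RFC, not final APIs:

```erlang
-define(REP_JOBS, replication_jobs).  % hypothetical job type

%% Minimal sketch of job creation from the _replicate HTTP handler
%% or from the couch_epi doc update callback.
add_rep_job(JobId, #{} = RepParams) ->
    JobData = #{
        <<"rep_parameters">> => RepParams,
        <<"state">> => <<"pending">>,
        <<"consecutive_errors">> => 0
    },
    NowSec = erlang:system_time(second),
    %% `undefined` lets couch_jobs open its own transaction.
    couch_jobs:add(undefined, ?REP_JOBS, JobId, JobData, NowSec).
```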
Each `replication` node spawns some acceptor processes which wait in a `couch_jobs:accept/2` call for jobs. They will accept only jobs which are scheduled to run at a time less than or equal to the current time. After a job is accepted, its state is updated to `running`, and then a gen_server process monitoring these replication jobs will spawn another acceptor. That happens until the `max_jobs` limit is reached.
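An acceptor might look roughly like this, using the hypothetical `?REP_JOBS` type from the earlier sketch. The options map with a `max_sched_time` cutoff is an assumption modeled on the behavior described above, and the return shapes of `couch_jobs:accept/2` are illustrative:

```erlang
%% Sketch of an acceptor process; helper names are hypothetical.
acceptor_loop(Server) ->
    Opts = #{max_sched_time => erlang:system_time(second)},
    case couch_jobs:accept(?REP_JOBS, Opts) of
        {ok, Job, JobData} ->
            %% Report to the monitoring gen_server so it can spawn a
            %% replacement acceptor, up to the max_jobs limit.
            gen_server:cast(Server, {accepted, self(), Job}),
            run_replication(Job, JobData);
        {error, not_found} ->
            acceptor_loop(Server)
    end.
```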
The same monitoring gen_server will periodically check if there are any pending jobs in the queue and, if there are, spawn up to some `max_churn` number of new acceptors. These acceptors may start new jobs, and if they do, for each one of them, the oldest running job will be stopped and re-enqueued as `pending`. This largely follows the logic from the replication scheduler in CouchDB <= 3.x, except that it uses `couch_jobs` as the central queuing and scheduling mechanism.
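One rescheduling cycle in that monitoring gen_server might then look like this sketch; `couch_jobs:pending_count/2` stands in for the proposed pending-count API mentioned under affected modules, and the other helpers are hypothetical:

```erlang
%% Periodic rescheduling: spawn up to max_churn acceptors when jobs
%% are pending. Each job they accept causes the oldest running job
%% to be stopped and re-enqueued as pending (not shown here).
handle_info(reschedule, #{max_churn := MaxChurn} = St) ->
    Server = self(),
    Pending = couch_jobs:pending_count(undefined, ?REP_JOBS),
    ToSpawn = min(Pending, MaxChurn),
    [spawn_link(fun() -> acceptor_loop(Server) end)
        || _ <- lists:seq(1, ToSpawn)],
    erlang:send_after(reschedule_interval_msec(), Server, reschedule),
    {noreply, St}.
```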
After the job is marked as `running`, it computes its `replication_id`, initializes an internal replication state record from the job's data object, and starts replicating. Underneath this level the logic is identical to what's already happening in CouchDB <= 3.x, so it is not described further in this document.
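Conceptually the `replication_id` is a hash over the parameters that affect the replication result; a minimal illustration follows (the real parameter set is richer, and the hashing scheme here is not the actual one):

```erlang
%% Illustration only: hash the parameters which affect the result of
%% the replication. For filtered replications the filter code must
%% first be fetched from the source endpoint.
replication_id(#{source := Source, target := Target} = RepParams) ->
    Filter = maps:get(filter_code, RepParams, undefined),
    Hash = crypto:hash(md5, term_to_binary({Source, Target, Filter})),
    couch_util:to_hex(Hash).
```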
As jobs run, they periodically checkpoint, and when they do, they also recompute their `replication_id`. In the case of filtered replications the `replication_id` may change, and if so, that job is stopped and re-enqueued as `pending`. Also, during checkpointing the job's data value is updated with stats, so that the job stays active and doesn't get re-enqueued by the `couch_jobs` activity monitor.
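At checkpoint time that logic might look like this sketch; the `couch_jobs:update/3` usage and all helper names are assumptions:

```erlang
%% Sketch: refresh job data with stats (which also keeps the job
%% "active" for the couch_jobs activity monitor), then re-enqueue if
%% a filtered replication's replication_id has changed.
checkpoint(Job, #{rep_id := OldRepId, stats := Stats} = St) ->
    {ok, Job1} = couch_jobs:update(undefined, Job, #{<<"stats">> => Stats}),
    case compute_replication_id(St) of
        OldRepId -> {ok, Job1, St};
        _Changed -> {stop_and_reenqueue, Job1, St}
    end.
```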
If the job crashes, it will reschedule itself in `gen_server:terminate/2` via a `couch_jobs:resubmit/3` call to run again at some future time, defined roughly as `now + min(min_backoff_penalty * 2^consecutive_errors, max_backoff_penalty)`. If a job starts and successfully runs for some predefined period of time without crashing, it is considered to be "healed" and its `consecutive_errors` count is reset to 0.
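Expressed as code, the backoff computation is simply an exponential penalty capped at `max_backoff_penalty`; seconds are an assumed unit:

```erlang
%% now + min(min_backoff_penalty * 2^consecutive_errors,
%%           max_backoff_penalty)
next_run_sec(ConsecutiveErrors, MinPenalty, MaxPenalty) ->
    Penalty = min(MinPenalty * (1 bsl ConsecutiveErrors), MaxPenalty),
    erlang:system_time(second) + Penalty.
```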
If the node where a replication job runs crashes, or the job is manually killed via `exit(Pid, kill)`, the `couch_jobs` activity monitor will automatically re-enqueue the job as `pending`.
The set of replication job states is defined as:
- `pending`: A job is marked as `pending` in these cases:
  - when the job is first created on an `api_frontend` node
  - when a running job is stopped and re-enqueued, for example when its `replication_id` changes
- `running`: Set when a job is accepted by the `couch_jobs:accept/2` call. This generally means the job is actually running on a node; however, in cases when a node crashes, the job may show as `running` on that node until the `couch_jobs` activity monitor re-enqueues the job and it starts running on another node.
- `crashing`: The job was running, but then crashed with an intermittent error. The job's data has an error count which is incremented, and then a backoff penalty is computed and the job is rescheduled to try again at some point in the future.
- `completed`: One-shot replications which have completed.
- `failed`: This can happen when:
  - the replication document is malformed, for example missing the `"source"` field
  - another persistent replication job is already running or pending with the same `replication_id`

The set of states is slightly different from the one used before. There are now fewer states, as some of them have been combined:

- `initializing` was combined with `pending`
- `error` was combined with `crashing`
The `couch_jobs` application has its own set of state definitions, which map to replicator states like so:

| Replicator States | couch_jobs States |
|-------------------|-------------------|
| `pending`         | `pending`         |
| `running`         | `running`         |
| `crashing`        | `pending`         |
| `completed`       | `finished`        |
| `failed`          | `finished`        |
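Because two replicator states can map to the same `couch_jobs` state, recovering the replicator state for monitoring responses has to consult the job's data as well; in this sketch the `<<"errors">>` and `<<"finished_state">>` fields are hypothetical names:

```erlang
%% Sketch: derive the replicator state from the couch_jobs state
%% plus job data fields (field names are assumptions).
rep_state(running, _Data) -> running;
rep_state(pending, #{<<"errors">> := N}) when N > 0 -> crashing;
rep_state(pending, _Data) -> pending;
rep_state(finished, #{<<"finished_state">> := <<"completed">>}) -> completed;
rep_state(finished, _Data) -> failed.
```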
Jobs start in the `pending` state, after either a `_replicator` db doc update or a `POST` to the `/_replicate` endpoint. Continuous jobs will normally toggle between the `pending` and `running` states. One-shot jobs may toggle between `pending` and `running` a few times and then end up in `completed`.
```
 _replicator doc        +-------+
 POST /_replicate ----->|pending|
                        +-------+
                           ^ |
                           | |
                           | v
                        +-------+      +--------+
                        |running|<---->|crashing|
                        +-------+      +--------+
                           | |
                           | |
                           v v
                    +------+ +---------+
                    |failed| |completed|
                    +------+ +---------+
```
Multiple replication jobs may specify replications which map to the same `replication_id`. To handle these collisions, there is an FDB subspace `(..., LayerPrefix, ?REPLICATION_IDS, replication_id) -> job_id` to keep track of them. After the `replication_id` is computed, each replication job checks if there is already another job pending or running with the same `replication_id`. If the other job is transient, then the current job will reschedule itself as `crashing`. If the other job is persistent, the current job will fail permanently as `failed`.
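A sketch of that check, assuming `erlfdb` tuple encoding as used elsewhere in CouchDB 4.x; the transaction plumbing and error shapes are illustrative:

```erlang
%% Sketch: claim or verify the (?REPLICATION_IDS, RepId) -> JobId
%% mapping inside a transaction.
check_replication_id(Tx, LayerPrefix, RepId, JobId) ->
    Key = erlfdb_tuple:pack({?REPLICATION_IDS, RepId}, LayerPrefix),
    case erlfdb:wait(erlfdb:get(Tx, Key)) of
        not_found ->
            erlfdb:set(Tx, Key, JobId),
            ok;
        JobId ->
            ok;  % this job already owns the replication_id
        OtherJobId ->
            {error, {duplicate_replication_id, OtherJobId}}
    end.
```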
`_replicator` documents in CouchDB <= 3.x were parsed and validated in a two-step process:

1. In a validate-doc-update (VDU) JavaScript function from a programmatically inserted `_design` document. This validation happened when the document was updated, and performed some rough checks on field names and value types. If this validation failed, the document update operation was rejected.
2. Inside the replicator's Erlang code, when the document was translated to an internal record used by the replication application. This validation was more thorough but didn't have very friendly error messages. If validation failed here, the job would be marked as `failed`.

For CouchDB 4.x the proposal is to use only the Erlang parser. It would be called from the `before_doc_update` callback, which runs before every document update. If validation fails there, the document update operation is rejected. This should reduce code duplication and also provide better feedback to users directly when they update `_replicator` documents.
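A sketch of how the Erlang parser could be wired into that callback; `couch_replicator_parse:parse_rep_doc/1` is an assumed function name, and throwing `{forbidden, Reason}` follows the usual CouchDB idiom for rejecting a document update:

```erlang
%% Sketch: validate a _replicator doc before every update, rejecting
%% invalid documents with a (hopefully friendlier) error message.
before_doc_update(#doc{body = {Props}} = Doc, _Db, _UpdateType) ->
    case couch_replicator_parse:parse_rep_doc(Props) of
        {ok, _Rep} -> Doc;
        {error, Reason} -> throw({forbidden, Reason})
    end.
```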
In CouchDB <= 3.x, transient replication jobs ran in memory on a particular node in the cluster. If the node where the replication job ran crashed, the job would simply disappear without a trace, and it was up to the user to periodically monitor the job status and re-create the job. In the current design, `transient` jobs are persisted to FDB as `couch_jobs` records, and so will survive node restarts. Also, transient jobs used to disappear immediately after they completed or failed; this design proposes keeping them around for a configurable amount of time to allow users to retrieve their status via the `_scheduler/jobs/$id` API.
The `_active_tasks`, `_scheduler/jobs` and `_scheduler/docs` endpoints are handled by traversing the replication jobs' data using a new `couch_jobs:fold_jobs/4` API function to retrieve each job's data. The `_active_tasks` implementation already works that way, and the `_scheduler/*` endpoints will work similarly.
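For example, gathering all replication jobs for `_scheduler/jobs` might look like this; the fold function's exact arity and arguments are assumptions about the proposed `couch_jobs:fold_jobs/4` API:

```erlang
%% Sketch: accumulate each job's state and data for a monitoring
%% endpoint response.
all_rep_jobs() ->
    FoldFun = fun(_JTx, JobId, JobState, JobData, Acc) ->
        [#{id => JobId, state => JobState, data => JobData} | Acc]
    end,
    couch_jobs:fold_jobs(undefined, ?REP_JOBS, FoldFun, []).
```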
The configuration option `[replicator] update_docs = false` was introduced with the scheduling replicator in a 2.x release. It controls whether to update replication documents with transient states like `triggered` and `error`. It defaulted to `false` and was mainly for compatibility with older monitoring user scripts. That behavior now becomes hard-coded, such that replication documents are only updated with the terminal states of `failed` and `completed`. Users should use the `_scheduler/docs` API to check for completion status instead.
Advantages:

- Simplicity: re-using `couch_jobs` means having a lot less code to maintain in `couch_replicator`. In the draft implementation, about 3000 lines of code are saved compared to the replicator application in CouchDB 3.x.
- Simpler endpoint and monitoring implementation
- Fewer replication job states to keep track of
- Transient replications can survive node crashes and restarts
- Simplified and improved validation logic
- Using node types allows tightening firewall rules such that `replication` nodes are the only ones which may make arbitrary requests outside the cluster, and `api_frontend` nodes are the only ones that may accept incoming connections.
Disadvantages:

- Behavior changes for transient jobs
- A centralized job queue might mean handling some number of conflicts generated in the FDB backend when jobs are accepted. These are mitigated using the `startup_jitter` configuration parameter and a configurable number of maximum acceptors per node.
- In monitoring API responses, the `running` job state might not immediately reflect the state of the running process on the replication node. If the node crashes, it might take up to a minute or two until the job is re-enqueued by the `couch_jobs` activity monitor.
Key changes:

- Behavior changes for transient jobs
- A delay in the `running` state as reflected in monitoring API responses
- The `[replicator] update_docs = false` configuration option becomes hard-coded
Modules and applications affected:

- `couch_jobs`: new APIs to fold jobs and to get a pending job count estimate
- `fabric2_db`: adding EPI db create/delete callbacks
- `couch_replicator`:
  - `couch_replicator_scheduler*` modules are removed
  - `couch_replicator_doc_processor_*` modules are removed
  - `couch_replicator`: job creation and a general API entry-point for couch_replicator
  - `couch_replicator_job`: runs each replication job
  - `couch_replicator_job_server`: the replication job monitoring gen_server
  - `couch_replicator_parse`: parses replication documents and HTTP `_replicate` POST bodies

HTTP API additions: N/A
HTTP API deprecations: N/A
The ability to confine replication jobs to run only on `replication` nodes improves the security posture. It is possible to set up firewall rules which allow egress traffic to be sent out only from those nodes.