blob: cd1b7bd9e28152460519b79bdedd15cae33c8a98 [file] [log] [blame] [view]
Developer Oriented Replicator Description
=========================================
This description of scheduling replicator's functionality is mainly geared to
CouchDB developers. It dives a bit into the internal and explains how
everything is connected together.
A natural place to start is the top application supervisor:
`couch_replicator_sup`. It's a `rest_for_one` restart strategy supervisor,
so if a child process terminates, the rest of the children in the hierarchy
following it are also terminated. This structure implies a useful constraint --
children lower in the list can safely call their siblings which are higher in
the list.
A description of each child:
* `couch_replication_event`: Starts a gen_event publication bus to publish
replication related events. This used to wait for replication jobs in
couch_replicator and in tests to monitor for replication events.
Notification is performed via the `couch_replicator_notifier:notify/1`
function.
* `couch_replicator_connection`: Maintains a global replication connection
pool. It allows reusing connections across replication tasks. The main
interface is `acquire/1` and `release/1`. The general idea is once a
connection is established, it is kept around for
`replicator.connection_close_interval` milliseconds in case another
replication task wants to re-use it. It is worth pointing out how linking
and monitoring is handled: workers are linked to the connection pool when
they are created. If they crash, the connection pool will receive an 'EXIT'
event and clean up after the worker. The connection pool also monitors
owners (by monitoring the `Pid` from the `From` argument in the call to
`acquire/1`) and cleans up if owner dies, and the pool receives a 'DOWN'
message. Another interesting thing is that connection establishment
(creation) happens in the owner process so the pool is not blocked on it.
* `couch_replicator_rate_limiter`: Implements a rate limiter to handle
connection throttling from sources or targets where requests return 429
error codes. Uses the Additive Increase / Multiplicative Decrease feedback
control algorithm to converge on the channel capacity. Implemented using a
16-way sharded ETS table to maintain connection state. The table sharding
code is split out to `couch_replicator_rate_limiter_tables` module. The
purpose of the module it to maintain and continually estimate sleep
intervals for each connection represented as a `{Method, Url}` pair. The
interval is updated accordingly on each call to `failure/1` or `success/1`
calls. For a successful request, a client should call `success/1`. Whenever
a 429 response is received the client should call `failure/1`. When no
failures are happening the code ensures the ETS tables are empty in
order to have a lower impact on a running system.
* `couch_replicator_scheduler`: This is the core component of the scheduling
replicator. It's main task is to switch between replication jobs, by
stopping some and starting others to ensure all of them make progress.
Replication jobs which fail are penalized using an exponential backoff.
That is, each consecutive failure will double the time penalty. This frees
up system resources for more useful work than just continuously trying to
run the same subset of failing jobs.
The main API function is `add_job/1`. Its argument is an instance of the
`#rep{}` record, which could be the result of a document update from a
`_replicator` db or the result of a POST to `_replicate` endpoint.
Each job internally is represented by the `#job{}` record. It contains the
original `#rep{}` but also, maintains an event history. The history is a
sequence of past events for each job. These are timestamped and ordered
such that the most recent event is at the head. History length is limited
based on the `replicator.max_history` configuration value. The default is
20 entries. History events types are:
* `added` : job was just added to the scheduler. This is the first event.
* `started` : job was started. This was an attempt to run the job.
* `stopped` : job was stopped by the scheduler.
* `crashed` : job has crashed (instead of stopping cleanly).
The core of the scheduling algorithm is the `reschedule/1` function. This
function is called every `replicator.interval` milliseconds (default is
60000 i.e. a minute). During each call the scheduler will try to stop some
jobs, start some new ones and will also try to keep the maximum number of
jobs running less than `replicator.max_jobs` (default 500). So the
functions does these operations (actual code paste):
```
Running = running_job_count(),
Pending = pending_job_count(),
stop_excess_jobs(State, Running),
start_pending_jobs(State, Running, Pending),
rotate_jobs(State, Running, Pending),
update_running_jobs_stats(State#state.stats_pid)
```
`Running` is the total number of currently running jobs. `Pending` is the
total number of jobs waiting to be run. `stop_excess_jobs` will stop any
exceeding the `replicator.max_jobs` configured limit. This code takes
effect if user reduces the `max_jobs` configuration value.
`start_pending_jobs` will start any jobs if there is more room available.
This will take effect on startup or when user increases the `max_jobs`
configuration value. `rotate_jobs` is where all the action happens. The
scheduler picks `replicator.max_churn` running jobs to stop and then picks
the same number of pending jobs to start. The default value of `max_churn`
is 20. So by default every minute, 20 running jobs are stopped, and 20 new
pending jobs are started.
Before moving on it is worth pointing out that scheduler treats continuous
and non-continuous replications differently. Normal (non-continuous)
replications once started will be allowed to run to completion. That
behavior is to preserve their semantics of replicating a snapshot of the
source database to the target. For example if new documents are added to
the source after the replication are started, those updates should not show
up on the target database. Stopping and restarting a normal replication
would violate that constraint. The only exception to the rule is the user
explicitly reduces `replicator.max_jobs` configuration value. Even then
scheduler will first attempt to stop as many continuous jobs as possible
and only if it has no choice left will it stop normal jobs.
Keeping that in mind and going back to the scheduling algorithm, the next
interesting part is how the scheduler picks which jobs to stop and which
ones to start:
* Stopping: When picking jobs to stop the scheduler will pick longest
running continuous jobs first. The sorting callback function to get the
longest running jobs is unsurprisingly called `longest_running/2`. To
pick the longest running jobs it looks at the most recent `started`
event. After it gets a sorted list by longest running, it simply picks
first few depending on the value of `max_churn` using `lists:sublist/2`.
Then those jobs are stopped.
* Starting: When starting the scheduler will pick the jobs which have been
waiting the longest. Surprisingly, in this case it also looks at the
`started` timestamp and picks the jobs which have the oldest `started`
timestamp. If there are 3 jobs, A[started=10], B[started=7],
C[started=9], then B will be picked first, then C then A. This ensures
that jobs are not starved, which is a classic scheduling pitfall.
In the code, the list of pending jobs is picked slightly differently than
how the list of running jobs is picked. `pending_jobs/1` uses `ets:foldl`
to iterate over all the pending jobs. As it iterates it tries to keep only
up to `max_churn` oldest items in the accumulator. The reason this is done
is that there could be a very large number of pending jobs and loading them
all in a list (making a copy from ETS) and then sorting it can be quite
expensive performance-wise. The tricky part of the iteration is happening
in `pending_maybe_replace/2`. A `gb_sets` ordered set is used to keep top-N
longest waiting jobs so far. The code has a comment with a helpful example
on how this algorithm works.
The last part is how the scheduler treats jobs which keep crashing. If a
job is started but then crashes then that job is considered unhealthy. The
main idea is to penalize such jobs such that they are forced to wait an
exponentially larger amount of time with each consecutive crash. A central
part to this algorithm is determining what forms a sequence of consecutive
crashes. If a job starts then quickly crashes, and after its next start it
crashes again, then that would become a sequence of 2 consecutive crashes.
The penalty then would be calculated by `backoff_micros/1` function where
the consecutive crash count would end up as the exponent. However for
practical concerns there is also maximum penalty specified and that's the
equivalent of 10 consecutive crashes. Timewise it ends up being about 8
hours. That means even a job which keep crashing will still get a chance to
retry once in 8 hours.
There is subtlety when calculating consecutive crashes and that is deciding
when the sequence stops. That is, figuring out when a job becomes healthy
again. The scheduler considers a job healthy again if it started and hasn't
crashed in a while. The "in a while" part is a configuration parameter
`replicator.health_threshold` defaulting to 2 minutes. This means if job
has been crashing, for example 5 times in a row, but then on the 6th
attempt it started and ran for more than 2 minutes then it is considered
healthy again. The next time it crashes its sequence of consecutive crashes
will restart at 1.
* `couch_replicator_doc_processor`: The doc processor component is in charge
of processing replication document updates, turning them into replication
jobs and adding those jobs to the scheduler. Unfortunately the only reason
there is even a `couch_replicator_doc_processor` gen_server, instead of
replication documents being turned to jobs and inserted into the scheduler
directly, is because of one corner case -- filtered replications using
custom (JavaScript mostly) filters. More about this later. It is better to
start with how updates flow through the doc processor:
Document updates come via the `db_change/3` callback from
`couch_multidb_changes`, then go to the `process_change/2` function.
In `process_change/2` a few decisions are made regarding how to proceed. The
first is "ownership" check. That is a check if the replication document
belongs on the current node. If not, then it is ignored. In a cluster, in
general there would be N copies of a document change and we only want to run
the replication once. Another check is to see if the update has arrived
during a time when the cluster is considered "unstable". If so, it is
ignored, because soon enough a rescan will be launched and all the documents
will be reprocessed anyway. Another noteworthy thing in `process_change/2`
is handling of upgrades from the previous version of the replicator when
transient states were written to the documents. Two such states were
`triggered` and `error`. Both of those states are removed from the document
then then update proceeds in the regular fashion. `failed` documents are
also ignored here. `failed` is a terminal state which indicates the document
was somehow unsuitable to become a replication job (it was malformed or a
duplicate). Otherwise the state update proceeds to `process_updated/2`.
`process_updated/2` is where replication document updates are parsed and
translated to `#rep{}` records. The interesting part here is that the
replication ID isn't calculated yet. Unsurprisingly the parsing function
used is called `parse_rep_doc_without_id/1`. Also note that up until now
everything is still running in the context of the `db_change/3` callback.
After replication filter type is determined the update gets passed to the
`couch_replicator_doc_processor` gen_server.
The `couch_replicator_doc_processor` gen_server's main role is to try to
calculate replication IDs for each `#rep{}` record passed to it, then add
that as a scheduler job. As noted before, `#rep{}` records parsed up until
this point lack a replication ID. The reason is replication ID calculation
includes a hash of the filter code. And because user defined replication
filters live in the source DB, which most likely involves a remote network
fetch there is a possibility of blocking and a need to handle various
network failures and retries. Because of that `replication_doc_processor`
dispatches all of that blocking and retrying to a separate `worker` process
(`couch_replicator_doc_processor_worker` module).
`couch_replicator_doc_processor_worker` is where replication IDs are
calculated for each individual doc update. There are two separate modules
which contain utilities related to replication ID calculation:
`couch_replicator_ids` and `couch_replicator_filters`. The first one
contains ID calculation algorithms and the second one knows how to parse and
fetch user filters from a remote source DB. One interesting thing about the
worker is that it is time-bounded and is guaranteed to not be stuck forever.
That's why it spawns an extra process with `spawn_monitor`, just so it can
do an `after` clause in receive and bound the maximum time this worker will
take.
A doc processor worker will either succeed or fail but never block for too
long. Success and failure are returned as exit values. Those are handled in
the `worker_returned/3` doc processor clauses. The most common pattern is
that a worker is spawned to add a replication job, it does so and returns a
`{ok, ReplicationID}` value in `worker_returned`.
In case of a filtered replication with custom user code there are two case to
consider:
1. Filter fetching code has failed. In that case worker returns an error.
But because the error could be a transient network error, another
worker is started to try again. It could fail and return an error
again, then another one is started and so on. However each consecutive
worker will do an exponential backoff, not unlike the scheduler code.
`error_backoff/1` is where the backoff period is calculated.
Consecutive errors are held in the `errcnt` field in the ETS table.
2. Fetching filter code succeeds, replication ID is calculated and job is
added to the scheduler. However, because this is a filtered replication
the source database could get an updated filter. Which means
replication ID could change again. So the worker is spawned to
periodically check the filter and see if it changed. In other words doc
processor will do the work of checking for filtered replications, get
an updated filter and will then refresh the replication job (remove the
old one and add a new one with a different ID). The filter checking
interval is determined by the `filter_backoff` function. An unusual
thing about that function is it calculates the period based on the size
of the ETS table. The idea there is for a few replications in a
cluster, it's ok to check filter changes often. But when there are lots
of replications running, having each one checking their filter often is
not a good idea.