This is a technical description of the resharding logic. The discussion will focus on job creation and life-cycle, data definitions, and the state transition mechanisms.
Job creation happens in the mem3_reshard_httpd API handler module. That module uses mem3_reshard_httpd_util to do some validation, and eventually calls mem3_reshard:start_split_job/2 on one or more nodes in the cluster, depending on where the new jobs should run.

mem3_reshard:start_split_job/2 is the main Erlang API entry point. After some more validation it creates a #job{} record and calls the mem3_reshard job manager. The manager will then add the job to its jobs ets table, save it to a _local document in the shards db, and, most importantly, start a new resharding process. That process will be supervised by the mem3_reshard_job_sup supervisor.
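For illustration, this is roughly what kicking off a split from a remsh could look like. The shard name below is made up, and the {ok, JobId} return shape of mem3_reshard:start_split_job/1 is an assumption, so treat this as a sketch rather than a reference:

```erlang
%% Hypothetical remsh sketch: the shard name is invented and the
%% {ok, JobId} return shape is an assumption.
ShardName = <<"shards/00000000-7fffffff/mydb.1553786862">>.
{ok, JobId} = mem3_reshard:start_split_job(ShardName).
```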
Each job runs in a gen_server implemented in the mem3_reshard_job module. When splitting a shard, a job goes through a series of steps such as initial_copy, build_indices, update_shard_map, etc. Between each step it reports progress and checkpoints with the mem3_reshard manager. A checkpoint involves the mem3_reshard manager persisting that job's state to disk in a _local document in the _dbs db. The job then continues until it reaches the completed state or until it fails and ends up in the failed state.
If a user stops shard splitting on the whole cluster, then all running jobs will stop. When shard splitting is resumed, they will try to recover from their last checkpoint.
A job can also be individually stopped or resumed. If a job is individually stopped it will stay so even if the global shard splitting state is running. A user has to explicitly set that job to a running state for it to resume. If a node with running jobs is turned off, the jobs will resume from their last checkpoint when the node is rebooted.
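As a hedged sketch of what that control could look like from a remsh: mem3_reshard:start/0 and mem3_reshard:stop/1 are mentioned later in this description, while stop_job/2 and resume_job/1 are assumed per-job counterparts whose exact names and signatures may differ:

```erlang
%% Sketch only. stop/1 and start/0 are referenced elsewhere in this text;
%% stop_job/2 and resume_job/1 are assumed per-job equivalents.

%% Stop resharding on this node; running jobs checkpoint and stop.
ok = mem3_reshard:stop(<<"cluster maintenance">>).

%% Resume resharding; jobs that were not individually stopped pick up
%% from their last checkpoint.
ok = mem3_reshard:start().

%% Stop or resume a single job by its id (JobId is assumed to be bound
%% to an existing job id binary).
ok = mem3_reshard:stop_job(JobId, <<"paused by admin">>).
ok = mem3_reshard:resume_job(JobId).
```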
This section focuses on record definitions and how data is transformed to and from various formats.
Right below the mem3_reshard:start_split_job/1 API level a job is converted to a #job{} record defined in the mem3_reshard.hrl header file. That record is then used throughout most of the resharding code. The job manager mem3_reshard stores it in its jobs ets table, and when a job process is spawned its single argument is also just a #job{} record. As a job process executes, it periodically reports state back to the mem3_reshard manager as an updated #job{} record.
Some interesting fields from the #job{} record:

id: Uniquely identifies a job in a cluster. It is derived from the source shard name, the node and a version (currently = 1).

type: Currently the only type supported is split, but merge or rebalance might be added in the future.

job_state: The running state of the job. Indicates if the job is running, stopped, completed or failed.

split_state: Once the job is running, this indicates how far along it got in the splitting process.

source: Source shard file. If/when merge is implemented this will be a list.

target: List of target shard files. This is currently expected to be a list of 2 items.

history: A time-line of state transitions represented as a list of tuples.

pid: When the job is running, this will be set to the pid of the job process.

In the mem3_reshard_store module the #job{} record is translated to a json document so it can be persisted to disk. The translation functions to and from json in that module are also used by the HTTP API layer to return a job's state and other information to the user.
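To make the shape of that record easier to picture, here is a partial, hypothetical sketch limited to the fields listed above; the real definition in mem3_reshard.hrl has more fields and may use different names, types and defaults:

```erlang
%% Partial, hypothetical sketch of the record described above; the real
%% definition in mem3_reshard.hrl is the authoritative one.
-record(job, {
    id :: binary() | undefined,   % derived from source shard name, node and version
    type = split :: atom(),       % only split is currently supported
    job_state :: atom(),          % new | running | stopped | completed | failed
    split_state :: atom(),        % how far along the split has progressed
    source,                       % source shard (a single shard for split)
    target = [],                  % list of target shards (currently 2)
    history = [],                 % list of state transition tuples (shape assumed)
    pid :: pid() | undefined      % job process pid while running
}).
```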
Another important piece of data is the global resharding state. When a user disables resharding on a cluster, a call is made to the mem3_reshard manager on each node and they store that in a #state{} record. This record is defined in the mem3_reshard.hrl header file and, just like the #job{} record, it can be translated to/from ejson in the mem3_reshard_store module and stored to and loaded from disk.
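Along the same lines, a minimal, speculative sketch of what such a record could look like; only the running/stopped value is taken from the text above, the other fields are assumptions:

```erlang
%% Speculative sketch of the global resharding state record; field names
%% other than the running/stopped value are assumptions.
-record(state, {
    state = running :: running | stopped,  % per-node resharding toggle
    state_info = [],                       % e.g. the reason supplied when stopping
    update_time                            % when the state last changed
}).
```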
Resharding logic has 3 separate states to keep track of:
Per-node resharding state. This state can be toggled between running and stopped. That toggle happens via the mem3_reshard:start/0 and mem3_reshard:stop/1 functions. This state is kept in the #state{} record of the mem3_reshard manager gen_server. It is also persisted to the local shard map database as a _local document so that it is maintained through a node restart. When the state is running, all jobs that are not individually stopped, and have not failed or completed, will be running. When the state is stopped, all the running jobs will be stopped.
Job's running state, held in the #job{} job_state field. This is the general running state of a resharding job. It can be new, running, stopped, completed or failed. This state is most relevant for the mem3_reshard manager. In other words, it is the mem3_reshard gen_server that starts the job and monitors it to see if it exits successfully on completion or with an error.
Job's internal splitting state. This state tracks the steps taken during shard splitting by each job. This state is mostly relevant for the mem3_reshard_job module and is kept in the #job{}'s split_state field. The progression of these states is linear, going from one state to the next. That's reflected in the code as well: when one state is done, mem3_reshard_job:get_next_state/1 is called, which returns the next state in the list. The list is defined in the SPLIT_STATES macro. This simplistic transition is also one of the reasons why a gen_fsm wasn't considered for the mem3_reshard_job logic.
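The following is a compact sketch of that linear progression. The state names and their order here are only illustrative (taken from steps mentioned earlier in this description); the authoritative list lives in the ?SPLIT_STATES macro, and the actual get_next_state/1 presumably operates on the #job{} record rather than a bare atom:

```erlang
%% Illustrative sketch of a linear "next state" lookup over a list of states.
-module(split_states_sketch).
-export([get_next_state/1]).

-define(SPLIT_STATES, [
    new,
    initial_copy,
    build_indices,
    update_shard_map,
    completed
]).

%% Return the state that follows Current; completed is terminal.
get_next_state(completed) ->
    completed;
get_next_state(Current) ->
    next(Current, ?SPLIT_STATES).

next(Current, [Current, Next | _]) ->
    Next;
next(Current, [_ | Rest]) ->
    next(Current, Rest).
```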
Another interesting aspect is how the split_state transitions happen in the mem3_reshard_job module. What follows is an examination of that.
A job starts running in the new state or from a previously checkpointed state. In the latter case, the job goes through some recovery logic (see the ?STATE_RESTART macro in mem3_reshard_job) where it tries to resume its work from where it left off. For example, if it was interrupted in the initial_copy state it might have to reset the target files and copy everything again. After recovery, the state execution logic is driven by run(#job{}), which ends up calling the state-specific ?MODULE:State(#job{}) function for each state.
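A minimal sketch of that dispatch pattern follows; the placeholder state functions here stand in for the real implementations:

```erlang
%% Simplified sketch: the current split_state atom doubles as the name of
%% the function implementing that state. Bodies are placeholders only.
-module(run_dispatch_sketch).
-export([run/1, initial_copy/1, build_indices/1]).

-record(job, {split_state}).

run(#job{split_state = State} = Job) ->
    %% Equivalent of the ?MODULE:State(#job{}) call described above.
    ?MODULE:State(Job).

initial_copy(#job{} = Job) ->
    %% ... copy source data into the targets ...
    {ok, Job}.

build_indices(#job{} = Job) ->
    %% ... trigger index builds on the targets ...
    {ok, Job}.
```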
In mem3_reshard_job:switch_to_next_state/2 the job's history is updated, any current state_info is cleared, and the job state is switched in the #job{} record. Then the new state is checkpointed in the checkpoint/1 function. Checkpointing casts a message to the mem3_reshard manager. After that message is sent, the job process sits and waits.
In the meantime, the mem3_reshard manager checkpoints the state: it updates its ETS table with the new #job{} record and persists the state with the mem3_reshard_store module, then, finally, it notifies the job process that checkpointing is done by calling mem3_reshard_job:checkpoint_done/1.
The mem3_reshard_job:checkpoint_done/1 function call sends a checkpoint_done message to the job's process, at which point it starts executing that state.
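Putting the three paragraphs above together, here is a toy, self-contained sketch of the handshake. The message shapes, function names and the use of plain message passing are simplifications of the real gen_server based interaction between mem3_reshard_job and the mem3_reshard manager:

```erlang
%% Toy sketch of the checkpoint handshake; persist/1 stands in for the
%% mem3_reshard_store persistence step.
-module(checkpoint_sketch).
-export([demo/0, checkpoint_done/1]).

demo() ->
    JobPid = self(),
    %% "Manager": persist the new job state, then tell the job it may proceed.
    Manager = spawn(fun() ->
        receive
            {checkpoint, Job, From} ->
                persist(Job),
                checkpoint_done(From)
        end
    end),
    %% "Job": report the new state and block until checkpointing is done.
    Manager ! {checkpoint, #{split_state => build_indices}, JobPid},
    receive
        checkpoint_done ->
            continue_with_state
    after 5000 ->
        timeout
    end.

%% Counterpart of mem3_reshard_job:checkpoint_done/1 in this sketch.
checkpoint_done(JobPid) ->
    JobPid ! checkpoint_done,
    ok.

persist(_Job) ->
    ok.
```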
Most states in mem3_reshard_job try not to block the main job process and instead launch worker processes to perform long running operations. It is usually just one worker process, but there could be multiple as well. After launching them, the job waits for the workers to finish and inspects their exit signals (see the wait_for_workers/1 function). When all the workers for a particular split_state have exited, the job is switched to the next state with switch_to_next_state/1 and the whole thing repeats until the completed state is reached, at which point the whole job exits normally.
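A simplified sketch of that launch-and-wait pattern follows; the real wait_for_workers/1 tracks workers inside the #job{} record, whereas this version just takes a list of funs:

```erlang
%% Sketch of the "launch workers, then wait for their exit signals" pattern.
-module(workers_sketch).
-export([run_workers/1]).

%% Spawn one monitored worker per fun and wait for all of them to exit.
%% Returns ok if every worker exited normally, {error, Reason} otherwise.
run_workers(WorkFuns) ->
    Refs = [element(2, spawn_monitor(F)) || F <- WorkFuns],
    wait_for_workers(Refs, ok).

wait_for_workers([], Result) ->
    Result;
wait_for_workers(Refs, Result) ->
    receive
        {'DOWN', Ref, process, _Pid, normal} ->
            wait_for_workers(lists:delete(Ref, Refs), Result);
        {'DOWN', Ref, process, _Pid, Reason} ->
            %% Keep the first failure but continue draining 'DOWN' messages.
            NewResult = case Result of
                ok -> {error, Reason};
                _ -> Result
            end,
            wait_for_workers(lists:delete(Ref, Refs), NewResult)
    end.
```

A state function could then call run_workers/1 with, say, one fun per target shard and only switch to the next split_state when it returns ok.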
If the source is updated at a high rate and the cluster is under load, resharding jobs may take longer to finish. The cluster would have to be running at the limit where both compaction and internal replication have difficulty catching up, since fundamentally the logic used for the initial bulk copy is similar to the compaction code, and the topoff states just reuse the internal replicator code. Eventually, when the load subsides, the jobs should catch up and finish.
These are mostly random notes about various modules involved in resharding. Most, but not all, are in the mem3 application.
mem3_reshard: Main API entry point and the job manager.
mem3_reshard_job: Individual job logic.
mem3_reshard_dbdoc: Responsible for updating the shard doc in the _dbs database. Besides a bunch of utility functions, there is a gen_server spawned which is used to update shard documents in the cluster in a way that minimizes the risk of conflicts. That is accomplished by having each shard updater call only one such updater for the whole cluster. This coordinator is picked by sorting the list of all the live mem3 nodes and picking the first one in the list (see the sketch at the end of this section).
mem3_reshard_httpd: API endpoint definitions.
mem3_reshard_api: Cluster API endpoint. This module is responsible for sending requests to all the nodes in a cluster and gathering results.
mem3_reshard_index: A helper module used by workers in the build_indices state.
mem3_reshard_job_sup: Simple one for one supervisor which keeps track of running jobs.
mem3_reshard_store: State persistence module. It knows how to save/restore #job{} and #state{} records to/from _local docs. It is also re-used for serializing #job{} into ejson by the HTTP API module.
mem3_reshard_validate: Validates that the source exists, that target ranges don't have gaps in them, etc.
couch_db_split: This module is not in the mem3 app, but it does all the heavy lifting during the initial data copy. Given a source db, some targets, and a function to decide which docs go to which target, it will copy all data from the source to the targets. It's best to think of this module as a form of compactor. Unlike couch_bt_engine_compactor, this one lives above the couch_db_engine API, and instead of copying data to one new file it copies it to two or more. Unsurprisingly, because of that it uses some lower level couch_db_engine APIs directly, including linking to a couch_db_updater, force-setting db update sequences, and others.
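As a small illustration of the coordinator-picking rule described for mem3_reshard_dbdoc above, the sketch below sorts the live mem3 nodes and picks the first one; how the live node list is computed here is an assumption:

```erlang
%% Sketch of "sort the live mem3 nodes and pick the first one".
-module(coordinator_pick_sketch).
-export([pick/0]).

pick() ->
    %% Filtering mem3:nodes/0 against the connected nodes is an assumed
    %% way of getting the "live" list.
    Live = [N || N <- mem3:nodes(), lists:member(N, [node() | nodes()])],
    hd(lists:sort(Live)).
```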