blob: e4a3614158f4697ec3ce4f0d21d619c4dedfbf9f [file]
import { explainError } from 'pouchdb-utils';
import { collate } from 'pouchdb-collate';
const CHECKPOINT_VERSION = 1;
const REPLICATOR = "pouchdb";
// This is an arbitrary number to limit the
// amount of replication history we save in the checkpoint.
// If we save too much, the checkpoint docs will become very big,
// if we save fewer, we'll run a greater risk of having to
// read all the changes from 0 when checkpoint PUTs fail
// CouchDB 2.0 has a more involved history pruning,
// but let's go for the simple version for now.
const CHECKPOINT_HISTORY_SIZE = 5;
const LOWEST_SEQ = 0;
class CheckpointerInternal {
constructor(src, target, id, returnValue, opts = {
writeSourceCheckpoint: true,
writeTargetCheckpoint: true,
}) {
this.src = src;
this.target = target;
this.id = id;
this.returnValue = returnValue;
this.opts = opts;
if (typeof opts.writeSourceCheckpoint === "undefined") {
opts.writeSourceCheckpoint = true;
}
if (typeof opts.writeTargetCheckpoint === "undefined") {
opts.writeTargetCheckpoint = true;
}
}
async writeCheckpoint(checkpoint, session) {
// update target before source every time
// because otherwise, compareHistory will pick a too new seq from source
// after an error writing a checkpoint to the target
await this._updateTarget(checkpoint, session);
return this._updateSource(checkpoint, session);
}
async _updateTarget(checkpoint, session) {
if (this.opts.writeTargetCheckpoint) {
return await this._updateCheckpoint(this.target, this.id, checkpoint,
session, this.returnValue);
} return true;
}
async _updateSource(checkpoint, session) {
if (this.opts.writeSourceCheckpoint) {
try {
return await this._updateCheckpoint(this.src, this.id, checkpoint,
session, this.returnValue);
} catch (err) {
if (isForbiddenError(err)) {
this.opts.writeSourceCheckpoint = false;
return true;
}
throw err;
}
}
}
async _updateCheckpoint(db, id, checkpoint, session, returnValue) {
//retrieve checkpoint doc from db or create a new one
const doc = await this._initCheckpointDoc(db, id, session);
if (returnValue.cancelled) {
return;
}
// if the checkpoint has not changed, do not update
if (doc.last_seq === checkpoint) {
return;
}
// Filter out current entry for this replication
doc.history = (doc.history || []).filter(item => item.session_id !== session);
// Add the latest checkpoint to history
doc.history.unshift({
last_seq: checkpoint,
session_id: session
});
// Just take the last pieces in history, to
// avoid really big checkpoint docs.
// see comment on history size above
doc.history = doc.history.slice(0, CHECKPOINT_HISTORY_SIZE);
doc.version = CHECKPOINT_VERSION;
doc.replicator = REPLICATOR;
doc.session_id = session;
doc.last_seq = checkpoint;
try {
return await db.put(doc);
} catch (err) {
if (err.status === 409) {
// retry; someone is trying to write a checkpoint simultaneously
return this._updateCheckpoint(db, id, checkpoint, session, returnValue);
}
throw err;
}
}
async _initCheckpointDoc(db, id, session) {
try {
return await db.get(id);
} catch (err) {
if (err.status === 404) {
if (db.adapter === 'http' || db.adapter === 'https') {
explainError(
404, 'PouchDB is just checking if a remote checkpoint exists.'
);
}
return {
session_id: session,
_id: id,
history: [],
replicator: REPLICATOR,
version: CHECKPOINT_VERSION
};
} else {
throw err;
}
}
}
async getCheckpoint() {
if (!this.opts.writeSourceCheckpoint && !this.opts.writeTargetCheckpoint) {
return LOWEST_SEQ;
}
if (this.opts && this.opts.writeSourceCheckpoint && !this.opts.writeTargetCheckpoint) {
try {
const sourceDoc = await this.src.get(this.id);
return sourceDoc.last_seq || LOWEST_SEQ;
} catch (err) {
/* istanbul ignore if */
if (err.status !== 404) {
throw err;
}
return LOWEST_SEQ;
}
}
let targetDoc;
try {
targetDoc = await this.target.get(this.id);
} catch (err) {
if (err.status !== 404) {
throw err;
}
return LOWEST_SEQ;
}
if (this.opts && this.opts.writeTargetCheckpoint && !this.opts.writeSourceCheckpoint) {
return targetDoc.last_seq || LOWEST_SEQ;
}
try {
const sourceDoc = await this.src.get(this.id);
// Since we can't migrate an old version doc to a new one
// (no session id), we just go with the lowest seq in this case
/* istanbul ignore if */
if (targetDoc.version !== sourceDoc.version) {
return LOWEST_SEQ;
}
const version = targetDoc.version ? targetDoc.version.toString() : "undefined";
if (version in comparisons) {
return comparisons[version](targetDoc, sourceDoc);
}
/* istanbul ignore next */
return LOWEST_SEQ;
} catch (err) {
if (err.status === 404 && targetDoc.last_seq) {
try {
await this.src.put({
_id: this.id,
last_seq: LOWEST_SEQ
});
return LOWEST_SEQ;
} catch (err) {
if (isForbiddenError(err)) {
this.opts.writeSourceCheckpoint = false;
return targetDoc.last_seq;
}
/* istanbul ignore next */
return LOWEST_SEQ;
}
}
//missing sourceDoc on initial replication returns LOWEST_SEQ
if (err.status === 404) {
return LOWEST_SEQ;
}
throw err;
}
}
}
const comparisons = {
"undefined": (targetDoc, sourceDoc) => {
// This is the previous comparison function
/* istanbul ignore next */
return collate(targetDoc.last_seq, sourceDoc.last_seq) === 0 ? sourceDoc.last_seq : 0;
},
"1": (targetDoc, sourceDoc) => {
// This is the comparison function ported from CouchDB
return compareReplicationLogs(sourceDoc, targetDoc).last_seq;
}
};
// This checkpoint comparison is ported from CouchDBs source
// they come from here:
// https://github.com/apache/couchdb-couch-replicator/blob/master/src/couch_replicator.erl#L863-L906
function compareReplicationLogs(srcDoc, tgtDoc) {
if (srcDoc.session_id === tgtDoc.session_id) {
return {
last_seq: srcDoc.last_seq,
history: srcDoc.history
};
}
return compareReplicationHistory(srcDoc.history, tgtDoc.history);
}
function compareReplicationHistory(sourceHistory, targetHistory) {
// the erlang loop via function arguments is not so easy to repeat in JS
// therefore, doing this as recursion
const [S, ...sourceRest] = sourceHistory;
const [T, ...targetRest] = targetHistory;
if (!S || targetHistory.length === 0) {
return {
last_seq: LOWEST_SEQ,
history: []
};
}
const sourceId = S.session_id;
/* istanbul ignore if */
if (hasSessionId(sourceId, targetHistory)) {
return {
last_seq: S.last_seq,
history: sourceHistory
};
}
const targetId = T.session_id;
if (hasSessionId(targetId, sourceRest)) {
return {
last_seq: T.last_seq,
history: targetRest
};
}
return compareReplicationHistory(sourceRest, targetRest);
}
function hasSessionId(sessionId, history) {
if (!sessionId || history.length === 0) {
return false;
}
return history.some(entry => entry.session_id === sessionId);
}
function isForbiddenError(err) {
return typeof err.status === 'number' && Math.floor(err.status / 100) === 4;
}
function Checkpointer(src, target, id, returnValue, opts) {
if (!(this instanceof CheckpointerInternal)) {
return new CheckpointerInternal(src, target, id, returnValue, opts);
}
return Checkpointer;
}
export default Checkpointer;