| import { explainError } from 'pouchdb-utils'; |
| import { collate } from 'pouchdb-collate'; |
| |
// Format version stamped on the checkpoint documents written by this module.
const CHECKPOINT_VERSION = 1;

// Value stored in the `replicator` field of checkpoint documents.
const REPLICATOR = 'pouchdb';

// Arbitrary cap on the number of replication-history entries kept in a
// checkpoint document. Keeping more makes the checkpoint docs very big;
// keeping fewer raises the risk of having to re-read all changes from 0
// when checkpoint PUTs fail. CouchDB 2.0 has a more involved history
// pruning, but the simple version is used here for now.
const CHECKPOINT_HISTORY_SIZE = 5;

// Sequence returned when no usable checkpoint is available.
const LOWEST_SEQ = 0;
| |
/**
 * Reads and writes the replication checkpoint document (`this.id`) on a
 * source and a target database.
 *
 * Both databases are only used through `get(id)` and `put(doc)`; the target
 * database's `adapter` property is additionally inspected when a checkpoint
 * document is missing.
 */
class CheckpointerInternal {
  /**
   * @param {object} src - Source database.
   * @param {object} target - Target database.
   * @param {string} id - `_id` of the checkpoint document on both sides.
   * @param {object} returnValue - Object whose `cancelled` flag is checked
   *   before a checkpoint is written.
   * @param {object} [opts] - Options. `writeSourceCheckpoint` and
   *   `writeTargetCheckpoint` each default to `true` when undefined.
   */
  constructor(src, target, id, returnValue, opts = {}) {
    this.src = src;
    this.target = target;
    this.id = id;
    this.returnValue = returnValue;

    // Work on a private shallow copy: the flags are defaulted here and may be
    // flipped later (see _updateSource / getCheckpoint), and none of that
    // should leak into the object owned by the caller. Object.assign also
    // tolerates a null `opts`.
    this.opts = Object.assign({}, opts);

    if (typeof this.opts.writeSourceCheckpoint === "undefined") {
      this.opts.writeSourceCheckpoint = true;
    }

    if (typeof this.opts.writeTargetCheckpoint === "undefined") {
      this.opts.writeTargetCheckpoint = true;
    }
  }

  /**
   * Writes `checkpoint` to the target and then to the source.
   *
   * @param {*} checkpoint - Sequence to store as `last_seq`.
   * @param {*} session - Session id of the current replication.
   * @returns {Promise<*>} Result of the source update.
   */
  async writeCheckpoint(checkpoint, session) {
    // update target before source every time
    // because otherwise, compareHistory will pick a too new seq from source
    // after an error writing a checkpoint to the target
    await this._updateTarget(checkpoint, session);
    return this._updateSource(checkpoint, session);
  }

  /**
   * Writes the checkpoint to the target database.
   *
   * @returns {Promise<*>} `true` when target checkpointing is disabled,
   *   otherwise the result of `_updateCheckpoint`.
   */
  async _updateTarget(checkpoint, session) {
    if (!this.opts.writeTargetCheckpoint) {
      return true;
    }
    return this._updateCheckpoint(this.target, this.id, checkpoint,
      session, this.returnValue);
  }

  /**
   * Writes the checkpoint to the source database.
   *
   * If the write fails with an error accepted by `isForbiddenError`, source
   * checkpointing is switched off for this instance and `true` is returned;
   * any other error is rethrown.
   *
   * @returns {Promise<*>} `true` when source checkpointing is (or just
   *   became) disabled, otherwise the result of `_updateCheckpoint`.
   */
  async _updateSource(checkpoint, session) {
    if (!this.opts.writeSourceCheckpoint) {
      // Mirror _updateTarget so writeCheckpoint resolves consistently.
      return true;
    }
    try {
      // `await` is required here so that rejections reach the catch below.
      return await this._updateCheckpoint(this.src, this.id, checkpoint,
        session, this.returnValue);
    } catch (err) {
      if (isForbiddenError(err)) {
        this.opts.writeSourceCheckpoint = false;
        return true;
      }
      throw err;
    }
  }

  /**
   * Loads (or creates) the checkpoint document in `db`, records `checkpoint`
   * for `session` at the front of its history and stores it.
   *
   * @param {object} db - Database to write to.
   * @param {string} id - Checkpoint document id.
   * @param {*} checkpoint - Sequence to store as `last_seq`.
   * @param {*} session - Session id of the current replication.
   * @param {object} returnValue - Checked for `cancelled` after the read.
   * @returns {Promise<*>} Result of `db.put`, or `undefined` when the
   *   replication was cancelled or the stored `last_seq` already equals
   *   `checkpoint`.
   */
  async _updateCheckpoint(db, id, checkpoint, session, returnValue) {
    // retrieve checkpoint doc from db or create a new one
    const doc = await this._initCheckpointDoc(db, id, session);

    if (returnValue.cancelled) {
      return;
    }
    // if the checkpoint has not changed, do not update
    if (doc.last_seq === checkpoint) {
      return;
    }

    // Filter out current entry for this replication
    doc.history = (doc.history || []).filter(item => item.session_id !== session);

    // Add the latest checkpoint to history
    doc.history.unshift({
      last_seq: checkpoint,
      session_id: session
    });

    // Just take the last pieces in history, to
    // avoid really big checkpoint docs.
    // see comment on CHECKPOINT_HISTORY_SIZE
    doc.history = doc.history.slice(0, CHECKPOINT_HISTORY_SIZE);

    doc.version = CHECKPOINT_VERSION;
    doc.replicator = REPLICATOR;

    doc.session_id = session;
    doc.last_seq = checkpoint;

    try {
      return await db.put(doc);
    } catch (err) {
      if (err.status === 409) {
        // retry; someone is trying to write a checkpoint simultaneously
        // NOTE(review): the retry is unbounded; it re-reads the doc each time.
        return this._updateCheckpoint(db, id, checkpoint, session, returnValue);
      }
      throw err;
    }
  }

  /**
   * Fetches the checkpoint document, or builds a fresh, unsaved one when the
   * database answers 404.
   *
   * @returns {Promise<object>} The stored document or a new skeleton.
   */
  async _initCheckpointDoc(db, id, session) {
    try {
      return await db.get(id);
    } catch (err) {
      if (err.status !== 404) {
        throw err;
      }
      if (db.adapter === 'http' || db.adapter === 'https') {
        explainError(
          404, 'PouchDB is just checking if a remote checkpoint exists.'
        );
      }
      return {
        session_id: session,
        _id: id,
        history: [],
        replicator: REPLICATOR,
        version: CHECKPOINT_VERSION
      };
    }
  }

  /**
   * Determines the sequence from which replication should resume.
   *
   * - Both checkpoints disabled: `LOWEST_SEQ`.
   * - Only one side enabled: that side's `last_seq` (or `LOWEST_SEQ`).
   * - Both enabled: the two documents are compared with the function in
   *   `comparisons` matching their version; differing versions or an unknown
   *   version yield `LOWEST_SEQ`.
   *
   * A 404 while reading a checkpoint document yields `LOWEST_SEQ`; other
   * read errors are rethrown.
   *
   * @returns {Promise<*>} Sequence to start from.
   */
  async getCheckpoint() {
    if (!this.opts.writeSourceCheckpoint && !this.opts.writeTargetCheckpoint) {
      return LOWEST_SEQ;
    }

    if (this.opts.writeSourceCheckpoint && !this.opts.writeTargetCheckpoint) {
      try {
        const sourceDoc = await this.src.get(this.id);
        return sourceDoc.last_seq || LOWEST_SEQ;
      } catch (err) {
        /* istanbul ignore if */
        if (err.status !== 404) {
          throw err;
        }
        return LOWEST_SEQ;
      }
    }

    let targetDoc;
    try {
      targetDoc = await this.target.get(this.id);
    } catch (err) {
      if (err.status !== 404) {
        throw err;
      }
      return LOWEST_SEQ;
    }

    if (this.opts.writeTargetCheckpoint && !this.opts.writeSourceCheckpoint) {
      return targetDoc.last_seq || LOWEST_SEQ;
    }

    try {
      const sourceDoc = await this.src.get(this.id);
      // Since we can't migrate an old version doc to a new one
      // (no session id), we just go with the lowest seq in this case
      /* istanbul ignore if */
      if (targetDoc.version !== sourceDoc.version) {
        return LOWEST_SEQ;
      }
      const version = targetDoc.version ? targetDoc.version.toString() : "undefined";

      if (version in comparisons) {
        return comparisons[version](targetDoc, sourceDoc);
      }
      /* istanbul ignore next */
      return LOWEST_SEQ;
    } catch (err) {
      if (err.status === 404 && targetDoc.last_seq) {
        // The target has a checkpoint but the source does not: try to seed
        // the source with an empty checkpoint and restart from the beginning.
        try {
          await this.src.put({
            _id: this.id,
            last_seq: LOWEST_SEQ
          });
          return LOWEST_SEQ;
        } catch (putErr) {
          if (isForbiddenError(putErr)) {
            // Source cannot be written: stop trying and trust the target.
            this.opts.writeSourceCheckpoint = false;
            return targetDoc.last_seq;
          }
          /* istanbul ignore next */
          return LOWEST_SEQ;
        }
      }
      // missing sourceDoc on initial replication returns LOWEST_SEQ
      if (err.status === 404) {
        return LOWEST_SEQ;
      }
      throw err;
    }
  }
}
| |
// Checkpoint comparison functions keyed by the stringified `version` of the
// checkpoint documents. Each takes (targetDoc, sourceDoc) and returns the
// sequence to resume from.
const comparisons = {
  // Docs without a version: the previous comparison function. The source
  // sequence is used only when both sides collate as equal, otherwise 0.
  "undefined"(targetDoc, sourceDoc) {
    /* istanbul ignore next */
    return collate(targetDoc.last_seq, sourceDoc.last_seq) === 0
      ? sourceDoc.last_seq
      : 0;
  },
  // Version 1 docs: the comparison function ported from CouchDB.
  "1"(targetDoc, sourceDoc) {
    const { last_seq: lastSeq } = compareReplicationLogs(sourceDoc, targetDoc);
    return lastSeq;
  },
};
| |
// The checkpoint comparison below is ported from CouchDB's source; the
// original code lives here:
// https://github.com/apache/couchdb-couch-replicator/blob/master/src/couch_replicator.erl#L863-L906

/**
 * Compares a source and a target checkpoint document.
 *
 * When both documents carry the same `session_id` the source's `last_seq`
 * and `history` are returned directly; otherwise the two histories are
 * compared with `compareReplicationHistory`.
 *
 * @param {object} srcDoc - Checkpoint document read from the source.
 * @param {object} tgtDoc - Checkpoint document read from the target.
 * @returns {{last_seq: *, history: Array}} Sequence to resume from and the
 *   related history.
 */
function compareReplicationLogs(srcDoc, tgtDoc) {
  const sameSession = srcDoc.session_id === tgtDoc.session_id;
  if (!sameSession) {
    return compareReplicationHistory(srcDoc.history, tgtDoc.history);
  }

  const { last_seq, history } = srcDoc;
  return { last_seq, history };
}
| |
/**
 * Walks the source and target histories in lock-step looking for a session
 * both sides know about.
 *
 * At each step the head of the source history is looked up in the remaining
 * target history, then the head of the target history is looked up in the
 * rest of the source history. The first match wins. If either history runs
 * out first, `LOWEST_SEQ` with an empty history is returned.
 *
 * The Erlang original loops via function arguments; here that is expressed
 * as a plain loop over the shrinking histories.
 *
 * @param {Array} sourceHistory - History entries of the source checkpoint.
 * @param {Array} targetHistory - History entries of the target checkpoint.
 * @returns {{last_seq: *, history: Array}}
 */
function compareReplicationHistory(sourceHistory, targetHistory) {
  let remainingSource = sourceHistory;
  let remainingTarget = targetHistory;

  for (;;) {
    const [sourceHead, ...sourceTail] = remainingSource;
    const [targetHead, ...targetTail] = remainingTarget;

    if (!sourceHead || remainingTarget.length === 0) {
      return { last_seq: LOWEST_SEQ, history: [] };
    }

    /* istanbul ignore if */
    if (hasSessionId(sourceHead.session_id, remainingTarget)) {
      return { last_seq: sourceHead.last_seq, history: remainingSource };
    }

    if (hasSessionId(targetHead.session_id, sourceTail)) {
      return { last_seq: targetHead.last_seq, history: targetTail };
    }

    // No match at this depth: drop both heads and try again.
    remainingSource = sourceTail;
    remainingTarget = targetTail;
  }
}
| |
/**
 * Tells whether any entry of `history` carries the given session id.
 *
 * @param {*} sessionId - Session id to look for; a falsy id never matches.
 * @param {Array} history - History entries with a `session_id` field.
 * @returns {boolean}
 */
function hasSessionId(sessionId, history) {
  const matchesSession = (entry) => entry.session_id === sessionId;
  return Boolean(sessionId) && history.length !== 0 && history.some(matchesSession);
}
| |
/**
 * Tells whether `err` carries a numeric `status` in the 4xx range.
 *
 * @param {{status: *}} err - Error object to inspect.
 * @returns {boolean}
 */
function isForbiddenError(err) {
  const { status } = err;
  if (typeof status !== 'number') {
    return false;
  }
  return Math.floor(status / 100) === 4;
}
| |
/**
 * Public entry point: builds a `CheckpointerInternal`, whether it is invoked
 * with `new` or as a plain function.
 *
 * Only when invoked with a `CheckpointerInternal` instance as `this` does it
 * return the `Checkpointer` function itself instead.
 *
 * @param {object} src - Source database.
 * @param {object} target - Target database.
 * @param {string} id - Checkpoint document id.
 * @param {object} returnValue - Replication state object.
 * @param {object} [opts] - Checkpointer options.
 * @returns {CheckpointerInternal|Function}
 */
function Checkpointer(src, target, id, returnValue, opts) {
  const boundToInternal = this instanceof CheckpointerInternal;
  return boundToInternal
    ? Checkpointer
    : new CheckpointerInternal(src, target, id, returnValue, opts);
}

export default Checkpointer;