blob: 1c1840cbed487e24149cefd03486bb091badd2ce [file] [log] [blame]
import changesHandler from './changesHandler';
import {
clone,
filterChange,
uuid
} from 'pouchdb-utils';
import {
ATTACH_STORE,
BY_SEQ_STORE,
DOC_STORE
} from './constants';
import {
decodeDoc,
decodeMetadata,
fetchAttachmentsIfNecessary,
idbError,
postProcessAttachments,
openTransactionSafely
} from './utils';
import runBatchedCursor from './runBatchedCursor';
function changes(opts, api, dbName, idb) {
opts = clone(opts);
if (opts.continuous) {
var id = dbName + ':' + uuid();
changesHandler.addListener(dbName, id, api, opts);
changesHandler.notify(dbName);
return {
cancel: function () {
changesHandler.removeListener(dbName, id);
}
};
}
var docIds = opts.doc_ids && new Set(opts.doc_ids);
opts.since = opts.since || 0;
var lastSeq = opts.since;
var limit = 'limit' in opts ? opts.limit : -1;
if (limit === 0) {
limit = 1; // per CouchDB _changes spec
}
var results = [];
var numResults = 0;
var filter = filterChange(opts);
var docIdsToMetadata = new Map();
var txn;
var bySeqStore;
var docStore;
var docIdRevIndex;
function onBatch(batchKeys, batchValues, cursor) {
if (!cursor || !batchKeys.length) { // done
return;
}
var winningDocs = new Array(batchKeys.length);
var metadatas = new Array(batchKeys.length);
function processMetadataAndWinningDoc(metadata, winningDoc) {
var change = opts.processChange(winningDoc, metadata, opts);
lastSeq = change.seq = metadata.seq;
var filtered = filter(change);
if (typeof filtered === 'object') { // anything but true/false indicates error
return Promise.reject(filtered);
}
if (!filtered) {
return Promise.resolve();
}
numResults++;
if (opts.return_docs) {
results.push(change);
}
// process the attachment immediately
// for the benefit of live listeners
if (opts.attachments && opts.include_docs) {
return new Promise(function (resolve) {
fetchAttachmentsIfNecessary(winningDoc, opts, txn, function () {
postProcessAttachments([change], opts.binary).then(function () {
resolve(change);
});
});
});
} else {
return Promise.resolve(change);
}
}
function onBatchDone() {
var promises = [];
for (var i = 0, len = winningDocs.length; i < len; i++) {
if (numResults === limit) {
break;
}
var winningDoc = winningDocs[i];
if (!winningDoc) {
continue;
}
var metadata = metadatas[i];
promises.push(processMetadataAndWinningDoc(metadata, winningDoc));
}
Promise.all(promises).then(function (changes) {
for (var i = 0, len = changes.length; i < len; i++) {
if (changes[i]) {
opts.onChange(changes[i]);
}
}
}).catch(opts.complete);
if (numResults !== limit) {
cursor.continue();
}
}
// Fetch all metadatas/winningdocs from this batch in parallel, then process
// them all only once all data has been collected. This is done in parallel
// because it's faster than doing it one-at-a-time.
var numDone = 0;
batchValues.forEach(function (value, i) {
var doc = decodeDoc(value);
var seq = batchKeys[i];
fetchWinningDocAndMetadata(doc, seq, function (metadata, winningDoc) {
metadatas[i] = metadata;
winningDocs[i] = winningDoc;
if (++numDone === batchKeys.length) {
onBatchDone();
}
});
});
}
function onGetMetadata(doc, seq, metadata, cb) {
if (metadata.seq !== seq) {
// some other seq is later
return cb();
}
if (metadata.winningRev === doc._rev) {
// this is the winning doc
return cb(metadata, doc);
}
// fetch winning doc in separate request
var docIdRev = doc._id + '::' + metadata.winningRev;
var req = docIdRevIndex.get(docIdRev);
req.onsuccess = function (e) {
cb(metadata, decodeDoc(e.target.result));
};
}
function fetchWinningDocAndMetadata(doc, seq, cb) {
if (docIds && !docIds.has(doc._id)) {
return cb();
}
var metadata = docIdsToMetadata.get(doc._id);
if (metadata) { // cached
return onGetMetadata(doc, seq, metadata, cb);
}
// metadata not cached, have to go fetch it
docStore.get(doc._id).onsuccess = function (e) {
metadata = decodeMetadata(e.target.result);
docIdsToMetadata.set(doc._id, metadata);
onGetMetadata(doc, seq, metadata, cb);
};
}
function finish() {
opts.complete(null, {
results,
last_seq: lastSeq
});
}
function onTxnComplete() {
if (!opts.continuous && opts.attachments) {
// cannot guarantee that postProcessing was already done,
// so do it again
postProcessAttachments(results).then(finish);
} else {
finish();
}
}
var objectStores = [DOC_STORE, BY_SEQ_STORE];
if (opts.attachments) {
objectStores.push(ATTACH_STORE);
}
var txnResult = openTransactionSafely(idb, objectStores, 'readonly');
if (txnResult.error) {
return opts.complete(txnResult.error);
}
txn = txnResult.txn;
txn.onabort = idbError(opts.complete);
txn.oncomplete = onTxnComplete;
bySeqStore = txn.objectStore(BY_SEQ_STORE);
docStore = txn.objectStore(DOC_STORE);
docIdRevIndex = bySeqStore.index('_doc_id_rev');
var keyRange = (opts.since && !opts.descending) ?
IDBKeyRange.lowerBound(opts.since, true) : null;
runBatchedCursor(bySeqStore, keyRange, opts.descending, limit, onBatch);
}
export default changes;