// blob: beee21e65b5ecf262cb30541d88c315d608a2e6e [file] [log] [blame]
'use strict';
import { DOC_STORE, processAttachment } from './util';
import { uuid, filterChange } from 'pouchdb-utils';
/**
 * Serve a changes request from the `seq` index of the document store.
 *
 * Continuous requests are registered as a listener on `idbChanges` and a
 * handle with a `cancel()` method is returned. One-shot requests walk the
 * `seq` index with a cursor, call `opts.onChange` for every change accepted by
 * the filter, and finally call `opts.complete` exactly once.
 *
 * @param {Object} txn - Wrapper exposing `error` (reported straight to
 *   `opts.complete` when set) and `txn`, the object whose `objectStore()` and
 *   `oncomplete` are used here (presumably an IDBTransaction - confirm against
 *   the caller).
 * @param {Object} idbChanges - Listener registry providing `addListener`,
 *   `notify` and `removeListener`; only used for continuous requests.
 * @param {Object} api - Handed to `idbChanges.addListener`; `api.blobSupport`
 *   is forwarded to `processAttachment`.
 * @param {Object} dbOpts - Database options; only `name` is read.
 * @param {Object} opts - Request options. Read here: `continuous`, `limit`,
 *   `since`, `descending`, `doc_ids`, `return_docs`, `include_docs`,
 *   `attachments`, `binary`, plus the callbacks `processChange`, `onChange`
 *   and `complete`. The whole object is also passed to `filterChange`.
 * @returns {{cancel: Function}|*} A cancel handle for continuous requests;
 *   the return value of `opts.complete` when `txn.error` is set; otherwise
 *   `undefined`.
 */
export default function changes(txn, idbChanges, api, dbOpts, opts) {
  if (txn.error) {
    return opts.complete(txn.error);
  }

  if (opts.continuous) {
    const id = dbOpts.name + ':' + uuid();
    idbChanges.addListener(dbOpts.name, id, api, opts);
    idbChanges.notify(dbOpts.name);
    return {
      cancel: function () {
        idbChanges.removeListener(dbOpts.name, id);
      }
    };
  }

  // No limit is represented by -1; a limit of 0 is treated as 1.
  let limit = 'limit' in opts ? opts.limit : -1;
  if (limit === 0) {
    limit = 1;
  }

  // Use one normalised `since` for both the initial last_seq and the cursor
  // range: IDBKeyRange.lowerBound() throws when given `undefined`.
  const since = opts.since || 0;

  // Built once so the per-row membership test is not a linear array scan.
  const wantedIds = opts.doc_ids ? new Set(opts.doc_ids) : null;

  const store = txn.txn.objectStore(DOC_STORE).index('seq');
  const filter = filterChange(opts);

  let received = 0;
  let lastSeq = since;
  const results = [];
  const processing = [];
  let completed = false;

  // Report the outcome through opts.complete at most once. Without this a
  // filter error would be followed by a second, "successful" completion when
  // the transaction finishes.
  function finish(...args) {
    if (completed) {
      return;
    }
    completed = true;
    opts.complete(...args);
  }

  function onReqSuccess(e) {
    // Stop once the outcome was reported or the cursor is exhausted.
    if (completed || !e.target.result) {
      return;
    }

    const cursor = e.target.result;
    const doc = cursor.value;

    // Overwrite doc.data, which may have been rewritten (see rewrite.js) with
    // the clean version for that rev
    doc.data = doc.revs[doc.rev].data;
    doc.data._id = doc.id;
    doc.data._rev = doc.rev;
    if (doc.deleted) {
      doc.data._deleted = true;
    }

    if (wantedIds && !wantedIds.has(doc.id)) {
      return cursor.continue();
    }

    // WARNING: expecting possible old format
    const change = opts.processChange(doc.data, doc, opts);
    change.seq = doc.seq;
    lastSeq = doc.seq;

    const filtered = filter(change);

    // The filter signals an error by returning an object instead of a boolean
    if (typeof filtered === 'object') {
      return finish(filtered);
    }

    if (filtered) {
      received++;
      if (opts.return_docs) {
        results.push(change);
      }

      if (opts.include_docs && opts.attachments && doc.data._attachments) {
        const promises = [];
        for (const name in doc.data._attachments) {
          const p = processAttachment(name, doc, change.doc, opts.binary, api.blobSupport);
          // We add the processing promise to 2 arrays, one tracks all
          // the promises needed before we fire onChange, the other
          // ensure we process all attachments before onComplete
          promises.push(p);
          processing.push(p);
        }
        // A failed attachment is reported through opts.complete instead of
        // being left as an unhandled rejection. The two-argument form of
        // then() keeps exceptions thrown by onChange from being swallowed.
        Promise.all(promises).then(function () {
          if (!completed) {
            opts.onChange(change);
          }
        }, finish);
      } else {
        opts.onChange(change);
      }
    }

    if (received !== limit) {
      cursor.continue();
    }
  }

  function onTxnComplete() {
    if (completed) {
      return;
    }
    // Wait for outstanding attachment work before reporting the final result.
    Promise.all(processing).then(function () {
      finish(null, {
        results,
        last_seq: lastSeq
      });
    }, finish);
  }

  let req;
  if (opts.descending) {
    // Walks the whole index backwards; `since` is not applied in this mode.
    req = store.openCursor(null, 'prev');
  } else {
    // Exclusive lower bound: only rows with seq greater than `since`.
    req = store.openCursor(IDBKeyRange.lowerBound(since, true));
  }

  txn.txn.oncomplete = onTxnComplete;
  req.onsuccess = onReqSuccess;
}