import changesHandler from './changesHandler';
import {
  clone,
  filterChange,
  uuid
} from 'pouchdb-utils';
import {
  ATTACH_STORE,
  BY_SEQ_STORE,
  DOC_STORE
} from './constants';
import {
  decodeDoc,
  decodeMetadata,
  fetchAttachmentsIfNecessary,
  idbError,
  postProcessAttachments,
  openTransactionSafely
} from './utils';
import runBatchedCursor from './runBatchedCursor';
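// Implements the _changes feed for the IndexedDB adapter. Continuous
// feeds are delegated to the shared changesHandler; one-shot feeds walk
// the by-sequence store with a batched cursor, reporting each change
// through opts.onChange and finishing via opts.complete.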
function changes(opts, api, dbName, idb) {
  opts = clone(opts);

  if (opts.continuous) {
    var id = dbName + ':' + uuid();
    changesHandler.addListener(dbName, id, api, opts);
    changesHandler.notify(dbName);
    return {
      cancel: function () {
        changesHandler.removeListener(dbName, id);
      }
    };
  }

  var docIds = opts.doc_ids && new Set(opts.doc_ids);

  opts.since = opts.since || 0;
  var lastSeq = opts.since;

  var limit = 'limit' in opts ? opts.limit : -1;
  if (limit === 0) {
    limit = 1; // per CouchDB _changes spec
  }

  var results = [];
  var numResults = 0;
  var filter = filterChange(opts);
  var docIdsToMetadata = new Map();

  var txn;
  var bySeqStore;
  var docStore;
  var docIdRevIndex;
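  // Called once per cursor batch with the raw seq keys and the encoded
  // docs stored at those seqs. Each entry is resolved to its metadata and
  // winning revision in parallel; the batch is emitted once all resolve.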
  function onBatch(batchKeys, batchValues, cursor) {
    if (!cursor || !batchKeys.length) { // done
      return;
    }

    var winningDocs = new Array(batchKeys.length);
    var metadatas = new Array(batchKeys.length);

    function processMetadataAndWinningDoc(metadata, winningDoc) {
      var change = opts.processChange(winningDoc, metadata, opts);
      lastSeq = change.seq = metadata.seq;

      var filtered = filter(change);
      if (typeof filtered === 'object') { // anything but true/false indicates error
        return Promise.reject(filtered);
      }

      if (!filtered) {
        return Promise.resolve();
      }
      numResults++;
      if (opts.return_docs) {
        results.push(change);
      }
      // process the attachment immediately
      // for the benefit of live listeners
      if (opts.attachments && opts.include_docs) {
        return new Promise(function (resolve) {
          fetchAttachmentsIfNecessary(winningDoc, opts, txn, function () {
            postProcessAttachments([change], opts.binary).then(function () {
              resolve(change);
            });
          });
        });
      } else {
        return Promise.resolve(change);
      }
    }
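    // All metadata/winning-doc lookups for this batch have resolved.
    // Filter and emit the surviving changes in seq order, stopping the
    // cursor once `limit` results have been produced.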
    function onBatchDone() {
      var promises = [];
      for (var i = 0, len = winningDocs.length; i < len; i++) {
        if (numResults === limit) {
          break;
        }
        var winningDoc = winningDocs[i];
        if (!winningDoc) {
          continue;
        }
        var metadata = metadatas[i];
        promises.push(processMetadataAndWinningDoc(metadata, winningDoc));
      }

      Promise.all(promises).then(function (changes) {
        for (var i = 0, len = changes.length; i < len; i++) {
          if (changes[i]) {
            opts.onChange(changes[i]);
          }
        }
      }).catch(opts.complete);

      if (numResults !== limit) {
        cursor.continue();
      }
    }

    // Fetch all metadata/winning docs from this batch in parallel, then
    // process them only once all the data has been collected. This is done
    // in parallel because it's faster than doing it one at a time.
    var numDone = 0;
    batchValues.forEach(function (value, i) {
      var doc = decodeDoc(value);
      var seq = batchKeys[i];
      fetchWinningDocAndMetadata(doc, seq, function (metadata, winningDoc) {
        metadatas[i] = metadata;
        winningDocs[i] = winningDoc;
        if (++numDone === batchKeys.length) {
          onBatchDone();
        }
      });
    });
  }
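  // Decide whether this (doc, seq) pair represents the doc's latest change.
  // If a later seq exists for the doc, the change is skipped; if this
  // revision lost to another, the winning revision is fetched via the
  // _doc_id_rev index instead.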
  function onGetMetadata(doc, seq, metadata, cb) {
    if (metadata.seq !== seq) {
      // some other seq is later
      return cb();
    }

    if (metadata.winningRev === doc._rev) {
      // this is the winning doc
      return cb(metadata, doc);
    }

    // fetch winning doc in separate request
    var docIdRev = doc._id + '::' + metadata.winningRev;
    var req = docIdRevIndex.get(docIdRev);
    req.onsuccess = function (e) {
      cb(metadata, decodeDoc(e.target.result));
    };
  }
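  // Resolve the metadata for a doc read off the cursor, consulting the
  // per-feed cache first so that docs appearing at multiple seqs only hit
  // the doc store once.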
  function fetchWinningDocAndMetadata(doc, seq, cb) {
    if (docIds && !docIds.has(doc._id)) {
      return cb();
    }

    var metadata = docIdsToMetadata.get(doc._id);
    if (metadata) { // cached
      return onGetMetadata(doc, seq, metadata, cb);
    }
    // metadata not cached, have to go fetch it
    docStore.get(doc._id).onsuccess = function (e) {
      metadata = decodeMetadata(e.target.result);
      docIdsToMetadata.set(doc._id, metadata);
      onGetMetadata(doc, seq, metadata, cb);
    };
  }
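  // Report the accumulated results (empty unless opts.return_docs) and the
  // seq of the last change actually processed.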
  function finish() {
    opts.complete(null, {
      results,
      last_seq: lastSeq
    });
  }
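  // Runs once the read transaction settles, i.e. after the cursor has
  // drained or the limit was hit.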
  function onTxnComplete() {
    if (!opts.continuous && opts.attachments) {
      // cannot guarantee that postProcessing was already done,
      // so do it again
      postProcessAttachments(results).then(finish);
    } else {
      finish();
    }
  }
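  // Open a single read-only transaction over every store the feed may
  // touch; the attachment store is only needed when attachments were
  // requested.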
  var objectStores = [DOC_STORE, BY_SEQ_STORE];
  if (opts.attachments) {
    objectStores.push(ATTACH_STORE);
  }
  var txnResult = openTransactionSafely(idb, objectStores, 'readonly');
  if (txnResult.error) {
    return opts.complete(txnResult.error);
  }
  txn = txnResult.txn;
  txn.onabort = idbError(opts.complete);
  txn.oncomplete = onTxnComplete;

  bySeqStore = txn.objectStore(BY_SEQ_STORE);
  docStore = txn.objectStore(DOC_STORE);
  docIdRevIndex = bySeqStore.index('_doc_id_rev');
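  // An ascending scan can start just past `since`; a descending scan has
  // to start from the top regardless, so no key range is applied.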
  var keyRange = (opts.since && !opts.descending) ?
    IDBKeyRange.lowerBound(opts.since, true) : null;

  runBatchedCursor(bySeqStore, keyRange, opts.descending, limit, onBatch);
}
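// Illustrative usage sketch only: callers normally reach this through the
// public db.changes() API rather than invoking it directly, and the `api`,
// `dbName`, and `idb` arguments below are assumed to be supplied by an
// initialized idb adapter instance (processChange is likewise supplied by
// PouchDB core, not defined here):
//
//   changes({
//     since: 0,
//     include_docs: true,
//     return_docs: true,
//     processChange: processChange,
//     onChange: function (change) { /* change.id, change.seq, change.doc */ },
//     complete: function (err, res) { /* res.results, res.last_seq */ }
//   }, api, dbName, idb);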
export default changes;