| import changesHandler from './changesHandler'; |
| import { |
| clone, |
| filterChange, |
| uuid |
| } from 'pouchdb-utils'; |
| import { |
| Map, |
| Set |
| } from 'pouchdb-collections'; |
| import { |
| ATTACH_STORE, |
| BY_SEQ_STORE, |
| DOC_STORE |
| } from './constants'; |
| import { |
| decodeDoc, |
| decodeMetadata, |
| fetchAttachmentsIfNecessary, |
| idbError, |
| postProcessAttachments, |
| openTransactionSafely |
| } from './utils'; |
| |
| |
/**
 * Run a changes-feed query against the IndexedDB-backed stores.
 *
 * Continuous requests are registered with the shared changesHandler and get
 * back a handle whose cancel() unregisters them again. One-shot requests open
 * a read-only transaction, walk the by-sequence store with a cursor, report
 * every matching change through opts.onChange, and call opts.complete once
 * the transaction has completed.
 *
 * @param {Object} opts - Changes options. Read here: continuous, doc_ids,
 *   since, limit, return_docs (or the legacy returnDocs), descending,
 *   attachments, include_docs, binary, and the callbacks
 *   processChange(doc, metadata, opts), onChange(change) and
 *   complete(err, result).
 * @param {Object} api - Forwarded to changesHandler.addListener for
 *   continuous requests; unused otherwise.
 * @param {string} dbName - Listener key and prefix of the generated
 *   listener id.
 * @param {Object} idb - Database handle passed to openTransactionSafely
 *   (presumably an IDBDatabase; confirm against the caller).
 * @returns {{cancel: Function}|undefined} A cancel handle for continuous
 *   requests, undefined for one-shot requests (results arrive via callbacks).
 */
function changes(opts, api, dbName, idb) {
  // All defaults below are written to the object clone() returns, not to the
  // caller's argument.
  opts = clone(opts);

  if (opts.continuous) {
    var id = dbName + ':' + uuid();
    changesHandler.addListener(dbName, id, api, opts);
    changesHandler.notify(dbName);
    return {
      cancel: function () {
        changesHandler.removeListener(dbName, id);
      }
    };
  }

  var docIds = opts.doc_ids && new Set(opts.doc_ids);

  opts.since = opts.since || 0;
  var lastSeq = opts.since;

  // -1 never equals numResults, i.e. "no limit".
  var limit = 'limit' in opts ? opts.limit : -1;
  if (limit === 0) {
    limit = 1; // per CouchDB _changes spec
  }

  var returnDocs;
  if ('return_docs' in opts) {
    returnDocs = opts.return_docs;
  } else if ('returnDocs' in opts) {
    // TODO: Remove 'returnDocs' in favor of 'return_docs' in a future release
    returnDocs = opts.returnDocs;
  } else {
    returnDocs = true;
  }

  var results = [];
  var numResults = 0;
  var filter = filterChange(opts);
  // Per-query cache so each document's metadata is fetched at most once.
  var docIdsToMetadata = new Map();
  // Set once opts.complete has been called with a filter error, so that the
  // transaction's later "complete" event does not report success on top of it.
  var failed = false;

  var txn;
  var bySeqStore;
  var docStore;
  var docIdRevIndex;

  // Handle one by-sequence row: resolve its metadata, then its winning
  // revision, then filter and emit it.
  function onGetCursor(cursor) {
    var doc = decodeDoc(cursor.value);
    var seq = cursor.key;

    if (docIds && !docIds.has(doc._id)) {
      return cursor.continue();
    }

    var metadata;

    function onGetMetadata() {
      if (metadata.seq !== seq) {
        // some other seq is later
        return cursor.continue();
      }

      lastSeq = seq;

      if (metadata.winningRev === doc._rev) {
        return onGetWinningDoc(doc);
      }

      fetchWinningDoc();
    }

    // The row at this seq is not the winning revision; look the winner up
    // through the doc-id/rev index.
    function fetchWinningDoc() {
      var docIdRev = doc._id + '::' + metadata.winningRev;
      var req = docIdRevIndex.get(docIdRev);
      req.onsuccess = function (e) {
        onGetWinningDoc(decodeDoc(e.target.result));
      };
    }

    function onGetWinningDoc(winningDoc) {
      var change = opts.processChange(winningDoc, metadata, opts);
      change.seq = metadata.seq;

      var filtered = filter(change);
      if (typeof filtered === 'object') {
        // A non-boolean filter result is reported as the error. The cursor is
        // not advanced, so the transaction will complete right after this.
        failed = true;
        return opts.complete(filtered);
      }

      if (filtered) {
        numResults++;
        if (returnDocs) {
          results.push(change);
        }
        // process the attachment immediately
        // for the benefit of live listeners
        if (opts.attachments && opts.include_docs) {
          fetchAttachmentsIfNecessary(winningDoc, opts, txn, function () {
            postProcessAttachments([change], opts.binary).then(function () {
              opts.onChange(change);
            });
          });
        } else {
          opts.onChange(change);
        }
      }
      if (numResults !== limit) {
        cursor.continue();
      }
    }

    metadata = docIdsToMetadata.get(doc._id);
    if (metadata) { // cached
      return onGetMetadata();
    }
    // metadata not cached, have to go fetch it
    docStore.get(doc._id).onsuccess = function (event) {
      metadata = decodeMetadata(event.target.result);
      docIdsToMetadata.set(doc._id, metadata);
      onGetMetadata();
    };
  }

  function onsuccess(event) {
    var cursor = event.target.result;

    if (!cursor) {
      // Cursor exhausted; completion is reported from txn.oncomplete.
      return;
    }
    onGetCursor(cursor);
  }

  function onTxnComplete() {
    if (failed) {
      // opts.complete already received the filter error.
      return;
    }

    function finish() {
      opts.complete(null, {
        results: results,
        last_seq: lastSeq
      });
    }

    if (!opts.continuous && opts.attachments) {
      // cannot guarantee that postProcessing was already done,
      // so do it again -- in the same format as the per-change pass
      postProcessAttachments(results, opts.binary).then(finish);
    } else {
      finish();
    }
  }

  function fetchChanges() {
    var objectStores = [DOC_STORE, BY_SEQ_STORE];
    if (opts.attachments) {
      objectStores.push(ATTACH_STORE);
    }
    var txnResult = openTransactionSafely(idb, objectStores, 'readonly');
    if (txnResult.error) {
      return opts.complete(txnResult.error);
    }
    txn = txnResult.txn;
    txn.onabort = idbError(opts.complete);
    txn.oncomplete = onTxnComplete;

    bySeqStore = txn.objectStore(BY_SEQ_STORE);
    docStore = txn.objectStore(DOC_STORE);
    docIdRevIndex = bySeqStore.index('_doc_id_rev');

    var req;

    if (opts.descending) {
      req = bySeqStore.openCursor(null, 'prev');
    } else {
      // Exclusive lower bound: start strictly after opts.since.
      req = bySeqStore.openCursor(IDBKeyRange.lowerBound(opts.since, true));
    }

    req.onsuccess = onsuccess;
  }

  fetchChanges();
}
| |
| export default changes; |