| import { |
| guardedConsole, |
| nextTick, |
| isRemote |
| } from 'pouchdb-utils'; |
| |
| import { |
| base64StringToBlobOrBuffer as b64ToBluffer |
| } from 'pouchdb-binary-utils'; |
| |
| import { |
| collate, |
| toIndexableString, |
| normalizeKey, |
| parseIndexableString |
| } from 'pouchdb-collate'; |
| |
| import { generateErrorFromResponse } from 'pouchdb-errors'; |
| import { Headers } from 'pouchdb-fetch'; |
| import TaskQueue from './taskqueue'; |
| import createView from './createView'; |
| import { |
| callbackify, |
| sequentialize, |
| uniq, |
| fin, |
| promisedCallback, |
| mapToKeysArray, |
| QueryParseError, |
| NotFoundError, |
| BuiltInError |
| } from 'pouchdb-mapreduce-utils'; |
| |
// One TaskQueue per persistent view, keyed by view name and created lazily by
// getQueue(), so that work on the same view is run through the same queue.
const persistentQueues = {};
// Single shared queue on which all temporary-view queries are scheduled.
const tempViewQueue = new TaskQueue();
// Default number of changes fetched per batch while indexing a view; used when
// the database options do not set view_update_changes_batch_size.
const CHANGES_BATCH_SIZE = 50;
| |
/**
 * Splits a view name into its parts.
 *
 * A name without a slash ('viewname') is shorthand for a view that lives in
 * a design doc of the same name, so it yields [name, name]. Otherwise the
 * name is split on every '/'.
 *
 * @param {string} name - 'ddocname/viewname' or just 'viewname'
 * @returns {string[]} the parts; index 0 is the design doc name and
 *   index 1 the view name
 */
function parseViewName(name) {
  if (name.includes('/')) {
    return name.split('/');
  }
  return [name, name];
}
| |
/**
 * Tells whether a change describes a brand-new document: exactly one leaf
 * revision, and that revision is of generation 1 (its rev starts with "1-").
 *
 * @param {Array<{rev: string}>} changes - the leaf revisions of one change
 * @returns {boolean}
 */
function isGenOne(changes) {
  if (changes.length !== 1) {
    // several leafs (or none): cannot be a pristine first revision
    return false;
  }
  const onlyLeaf = changes[0];
  return /^1-/.test(onlyLeaf.rev);
}
| |
/**
 * Reports an error thrown by a user-supplied map/reduce function by emitting
 * an 'error' event on the database. If emitting itself throws, a debugging
 * hint plus the original error and its context are logged through
 * guardedConsole instead.
 *
 * @param {Object} db - object exposing emit(eventName, payload)
 * @param {*} e - the error thrown by the user function
 * @param {Object} data - context (function and arguments) logged with the error
 */
function emitError(db, e, data) {
  try {
    db.emit('error', e);
  } catch (err) {
    const hint = [
      'The user\'s map/reduce function threw an uncaught error.\n',
      'You can debug this error by doing:\n',
      'myDatabase.on(\'error\', function (err) { debugger; });\n',
      'Please double-check your map/reduce function.'
    ].join('');
    guardedConsole('error', hint);
    guardedConsole('error', e, data);
  }
}
| |
/**
 * Returns an "abstract" mapreduce object of the form:
 *
 *   {
 *     query: queryFun,
 *     viewCleanup: viewCleanupFun
 *   }
 *
 * Arguments are:
 *
 * localDocName: string
 *   The name of the local doc that gets saved in order to track the
 *   "dependent" DBs and clean them up for viewCleanup. It should be
 *   unique, so that indexer plugins don't collide with each other.
 * mapper: function (mapFunDef, emit)
 *   Returns a map function based on the mapFunDef, which in the case of
 *   normal map/reduce is just the de-stringified function, but may be
 *   something else, such as an object in the case of pouchdb-find.
 * reducer: function (reduceFunDef)
 *   Ditto, but for reducing. Modules don't have to support reducing
 *   (e.g. pouchdb-find).
 * ddocValidator: function (ddoc, viewName)
 *   Throws an error if the ddoc or viewName is not valid.
 *   This can be used to tell the user that the configuration of the
 *   indexer is invalid.
 */
| function createAbstractMapReduce(localDocName, mapper, reducer, ddocValidator) { |
| |
/**
 * Runs a map function on a document. An error thrown by the map function is
 * reported through emitError() and not rethrown. Keeping the try/catch in
 * its own small function also avoids deoptimizing the caller.
 *
 * @param {Object} db - database the error event is emitted on
 * @param {Function} fun - map function, called with the document
 * @param {Object} doc - the document to map
 */
function tryMap(db, fun, doc) {
  try {
    fun(doc);
  } catch (mapError) {
    const context = {fun, doc};
    emitError(db, mapError, context);
  }
}
| |
/**
 * Runs a reduce function and returns the outcome instead of throwing.
 * This is the counterpart of tryMap(); it is a separate function because
 * the map case is common and should not pay for allocating a result
 * object, whereas custom reduce functions are rare.
 *
 * @param {Object} db - database the error event is emitted on
 * @param {Function} fun - reduce function (keys, values, rereduce)
 * @param {Array} keys
 * @param {Array} values
 * @param {boolean} rereduce
 * @returns {{output: *}|{error: *}} the reduce result, or the thrown error
 */
function tryReduce(db, fun, keys, values, rereduce) {
  let output;
  try {
    output = fun(keys, values, rereduce);
  } catch (reduceError) {
    const context = {fun, keys, values, rereduce};
    emitError(db, reduceError, context);
    return {error: reduceError};
  }
  return {output};
}
| |
/**
 * Comparator for emitted rows: orders by key and breaks ties by value,
 * both compared with collate().
 *
 * @param {{key: *, value: *}} x
 * @param {{key: *, value: *}} y
 * @returns {number} the collate() result of the deciding comparison
 */
function sortByKeyThenValue(x, y) {
  const byKey = collate(x.key, y.key);
  if (byKey !== 0) {
    return byKey;
  }
  return collate(x.value, y.value);
}
| |
/**
 * Applies skip/limit paging to an array of results.
 *
 * @param {Array} results
 * @param {number} [limit] - maximum number of rows; ignored unless a number
 * @param {number} [skip] - number of leading rows to drop; falsy means 0
 * @returns {Array} a sliced copy, or `results` itself when there is
 *   neither a numeric limit nor a positive skip
 */
function sliceResults(results, limit, skip) {
  const offset = skip || 0;
  const hasLimit = typeof limit === 'number';
  if (!hasLimit && !(offset > 0)) {
    // nothing to cut: hand back the very same array
    return results;
  }
  return hasLimit
    ? results.slice(offset, limit + offset)
    : results.slice(offset);
}
| |
/**
 * Determines which document should be joined to a view row.
 *
 * Users can explicitly specify a joined doc by emitting an object value
 * with a truthy `_id`; otherwise the row's own `id` (the doc that emitted
 * the key/value) is used.
 *
 * @param {{id: *, value: *}} row
 * @returns {*} the doc id to look up
 */
function rowToDocId(row) {
  const emitted = row.value;
  if (emitted && typeof emitted === 'object' && emitted._id) {
    return emitted._id;
  }
  return row.id;
}
| |
/**
 * Replaces, in place, the `data` of every attachment found on the docs of a
 * query result with the output of b64ToBluffer(data, content_type).
 * Rows without a doc, or whose doc has no `_attachments`, are skipped.
 *
 * @param {{rows: Array<{doc?: Object}>}} res - query result, mutated
 */
function readAttachmentsAsBlobOrBuffer(res) {
  for (const {doc} of res.rows) {
    const attachments = doc && doc._attachments;
    if (attachments) {
      for (const attachment of Object.values(attachments)) {
        attachment.data = b64ToBluffer(attachment.data, attachment.content_type);
      }
    }
  }
}
| |
/**
 * Builds a result post-processor for the given query options.
 *
 * @param {Object} opts - query options
 * @returns {function(Object): Object} a function that converts attachment
 *   data via readAttachmentsAsBlobOrBuffer() when include_docs, attachments
 *   and binary are all set, and always returns the result it was given
 */
function postprocessAttachments(opts) {
  return (res) => {
    const wantsBinaryAttachments =
      opts.include_docs && opts.attachments && opts.binary;
    if (wantsBinaryAttachments) {
      readAttachmentsAsBlobOrBuffer(res);
    }
    return res;
  };
}
| |
/**
 * Appends "name=value" to `params` when `opts[paramName]` is defined.
 *
 * @param {string} paramName - option name, also used as the parameter name
 * @param {Object} opts - query options to read the value from
 * @param {string[]} params - list of query-string fragments, mutated
 * @param {boolean} [asJson] - when truthy the value is JSON-stringified and
 *   URI-component-encoded; otherwise it is appended as is
 */
function addHttpParam(paramName, opts, params, asJson) {
  const rawValue = opts[paramName];
  if (typeof rawValue === 'undefined') {
    return;
  }
  const encoded = asJson
    ? encodeURIComponent(JSON.stringify(rawValue))
    : rawValue;
  params.push(paramName + '=' + encoded);
}
| |
/**
 * Converts an integer-like value (e.g. the string '5') to a number.
 * Anything that is not exactly an integer is returned unchanged so that
 * later validation can reject it; this prevents e.g. '1foo' or '1.1'
 * from being coerced to 1.
 *
 * @param {*} integerCandidate
 * @returns {*} the number, the untouched input, or undefined for undefined
 */
function coerceInteger(integerCandidate) {
  if (typeof integerCandidate === 'undefined') {
    return undefined;
  }
  const asNumber = Number(integerCandidate);
  const isExactInteger =
    !isNaN(asNumber) && asNumber === parseInt(integerCandidate, 10);
  return isExactInteger ? asNumber : integerCandidate;
}
| |
/**
 * Runs coerceInteger() over the integer-valued query options
 * (group_level, limit, skip), writing the results back onto `opts`.
 *
 * @param {Object} opts - query options, mutated
 * @returns {Object} the same `opts` object
 */
function coerceOptions(opts) {
  for (const optionName of ['group_level', 'limit', 'skip']) {
    opts[optionName] = coerceInteger(opts[optionName]);
  }
  return opts;
}
| |
/**
 * Validates a numeric query option. Falsy values (undefined, 0, ...) are
 * accepted without further checks.
 *
 * @param {*} number - the option value
 * @returns {QueryParseError|undefined} an error (returned, not thrown) when
 *   the value is not of type number or is negative; undefined otherwise
 */
function checkPositiveInteger(number) {
  if (!number) {
    return undefined;
  }
  if (typeof number !== 'number') {
    return new QueryParseError(`Invalid value for integer: "${number}"`);
  }
  if (number < 0) {
    return new QueryParseError(`Invalid value for positive integer: "${number}"`);
  }
  return undefined;
}
| |
/**
 * Validates query options against a view definition.
 *
 * @param {Object} options - query options
 * @param {Object} fun - view definition; only `fun.reduce` is inspected
 * @throws {QueryParseError} when the key range is empty, when include_docs
 *   or an ungrouped multi-key fetch is combined with reducing, or when
 *   group_level / limit / skip fail checkPositiveInteger()
 */
function checkQueryParseError(options, fun) {
  // with descending, the roles of startkey and endkey are swapped
  const [lowName, highName] = options.descending
    ? ['endkey', 'startkey']
    : ['startkey', 'endkey'];
  const low = options[lowName];
  const high = options[highName];
  const hasFullRange =
    typeof low !== 'undefined' && typeof high !== 'undefined';

  if (hasFullRange && collate(low, high) > 0) {
    throw new QueryParseError('No rows can match your key range, ' +
      'reverse your start_key and end_key or set {descending : true}');
  }

  const willReduce = fun.reduce && options.reduce !== false;
  if (willReduce) {
    if (options.include_docs) {
      throw new QueryParseError('{include_docs:true} is invalid for reduce');
    }
    const isMultiKey = options.keys && options.keys.length > 1;
    if (isMultiKey && !options.group && !options.group_level) {
      throw new QueryParseError('Multi-key fetches for reduce views must use ' +
        '{group: true}');
    }
  }

  for (const optionName of ['group_level', 'limit', 'skip']) {
    const problem = checkPositiveInteger(options[optionName]);
    if (problem) {
      throw problem;
    }
  }
}
| |
/**
 * Runs a map/reduce query against a remote database over HTTP.
 *
 * @param {Object} db - database handle; only `db.fetch(path, init)` is used
 * @param {string|Object} fun - a view name ('ddocname/viewname' or
 *   'viewname'), or a temporary-view definition such as {map, reduce}
 * @param {Object} opts - query options; recognized options are copied into
 *   the query string, `keys` may instead be sent in the request body
 * @returns {Promise<Object>} the parsed JSON response, passed through
 *   postprocessAttachments(opts)
 * @throws the error built by generateErrorFromResponse() when the response
 *   is not ok; an Error when a design-doc view row carries a
 *   "builtin_reduce_error"
 */
async function httpQuery(db, fun, opts) {
  const isDesignDocView = typeof fun === 'string';

  // [option name, JSON-encode the value?] in query-string order.
  // If reduce=false the results are those of the map function only,
  // not the final result of map and reduce.
  const httpParams = [
    ['reduce', false],
    ['include_docs', false],
    ['attachments', false],
    ['limit', false],
    ['descending', false],
    ['group', false],
    ['group_level', false],
    ['skip', false],
    ['stale', false],
    ['conflicts', false],
    ['startkey', true],
    ['start_key', true],
    ['endkey', true],
    ['end_key', true],
    ['inclusive_end', false],
    ['key', true],
    ['update_seq', false]
  ];
  const paramList = [];
  for (const [paramName, asJson] of httpParams) {
    addHttpParam(paramName, opts, paramList, asJson);
  }

  // Format the list of parameters into a valid URI query string
  let query = paramList.join('&');
  query = query === '' ? '' : '?' + query;

  let method = 'GET';
  // keys that have to travel in the request body instead of the URL
  let postedKeys;

  // If keys are supplied, issue a POST to circumvent GET query string limits
  // see http://wiki.apache.org/couchdb/HTTP_view_API#Querying_Options
  if (typeof opts.keys !== 'undefined') {
    // according to http://stackoverflow.com/a/417184/680742,
    // the de facto URL length limit is 2000 characters
    const MAX_URL_LENGTH = 2000;

    const keysAsString = `keys=${encodeURIComponent(JSON.stringify(opts.keys))}`;
    if (keysAsString.length + query.length + 1 <= MAX_URL_LENGTH) {
      // If the keys are short enough, do a GET. we do this to work around
      // Safari not understanding 304s on POSTs (see pouchdb/pouchdb#1239)
      query += (query[0] === '?' ? '&' : '?') + keysAsString;
    } else {
      method = 'POST';
      postedKeys = opts.keys;
    }
  }

  let path;
  let body;
  if (isDesignDocView) {
    // We are referencing a query defined in the design doc
    const parts = parseViewName(fun);
    path = '_design/' + parts[0] + '/_view/' + parts[1] + query;
    if (typeof postedKeys !== 'undefined') {
      body = {keys: postedKeys};
    }
  } else {
    // We are using a temporary view, terrible for performance, good for testing
    path = '_temp_view' + query;
    method = 'POST';
    body = {};
    for (const key of Object.keys(fun)) {
      body[key] = Array.isArray(fun[key]) ? fun[key] : fun[key].toString();
    }
    // the keys go into the body only; the caller's `fun` is left untouched
    if (typeof postedKeys !== 'undefined') {
      body.keys = postedKeys;
    }
  }

  const response = await db.fetch(path, {
    headers: new Headers({'Content-Type': 'application/json'}),
    method,
    // JSON.stringify(undefined) is undefined, i.e. no body at all
    body: JSON.stringify(body)
  });
  const result = await response.json();

  if (!response.ok) {
    result.status = response.status;
    throw generateErrorFromResponse(result);
  }

  if (isDesignDocView) {
    // fail the entire request if the result contains an error
    for (const row of result.rows) {
      /* istanbul ignore if */
      if (row.value && row.value.error && row.value.error === "builtin_reduce_error") {
        // the error object is the row's value, so that is where the reason is
        throw new Error(row.value.reason || row.reason);
      }
    }
  }

  return postprocessAttachments(opts)(result);
}
| |
// custom adapters can define their own api._query
// and override the default behavior.
// Wraps the callback-style db._query(fun, opts, callback) in a promise that
// rejects with the callback's error or resolves with its result.
/* istanbul ignore next */
function customQuery(db, fun, opts) {
  return new Promise((resolve, reject) => {
    db._query(fun, opts, (err, res) => {
      if (err) {
        reject(err);
        return;
      }
      resolve(res);
    });
  });
}
| |
// custom adapters can define their own api._viewCleanup
// and override the default behavior.
// Wraps the callback-style db._viewCleanup(callback) in a promise that
// rejects with the callback's error or resolves with its result.
/* istanbul ignore next */
function customViewCleanup(db) {
  return new Promise((resolve, reject) => {
    db._viewCleanup((err, res) => {
      if (err) {
        reject(err);
        return;
      }
      resolve(res);
    });
  });
}
| |
/**
 * Builds a rejection handler that turns a "not found" error into a default.
 *
 * @param {*} value - what to return for an error with status 404
 * @returns {function(*): *} handler that returns `value` when
 *   reason.status === 404 and rethrows any other reason
 */
function defaultsTo(value) {
  return (reason) => {
    /* istanbul ignore if */
    if (reason.status !== 404) {
      throw reason;
    }
    return value;
  };
}
| |
/**
 * Returns a promise for the list of docs to write to the view database
 * for one source document: its key/value docs (updated, newly created or
 * marked as deleted) followed by its meta doc.
 *
 * the order doesn't matter, because post-3.2.0, bulkDocs
 * is an atomic operation in all three adapters.
 *
 * @param {string} docId - id of the source document
 * @param {Object} view - view object; `view.db` is the view database
 * @param {Map} docIdsToChangesAndEmits - maps a doc id to
 *   [indexableKeysToKeyValues (Map), changes (leaf revisions)]
 * @returns {Promise<Object[]>} the docs to pass to bulkDocs
 */
async function getDocsToPersist(docId, view, docIdsToChangesAndEmits) {
  const metaDocId = '_local/doc_' + docId;
  const defaultMetaDoc = {_id: metaDocId, keys: []};
  const docData = docIdsToChangesAndEmits.get(docId);
  const indexableKeysToKeyValues = docData[0];
  const changes = docData[1];

  // generation 1, so we can safely assume initial state
  // for performance reasons (avoids unnecessary GETs)
  const metaDoc = isGenOne(changes)
    ? defaultMetaDoc
    : await view.db.get(metaDocId).catch(defaultsTo(defaultMetaDoc));

  // no keys, no need for a lookup
  const kvDocsRes = metaDoc.keys.length
    ? await view.db.allDocs({keys: metaDoc.keys, include_docs: true})
    : {rows: []};

  const kvDocs = [];
  const oldKeys = new Set();

  for (const row of kvDocsRes.rows) {
    const doc = row.doc;
    if (!doc) { // deleted
      continue;
    }
    kvDocs.push(doc);
    oldKeys.add(doc._id);
    doc._deleted = !indexableKeysToKeyValues.has(doc._id);
    if (doc._deleted) {
      continue;
    }
    const keyValue = indexableKeysToKeyValues.get(doc._id);
    if ('value' in keyValue) {
      doc.value = keyValue.value;
    } else {
      // the key is now emitted without a value: drop the value stored by
      // a previous emit, otherwise queries would keep returning it
      delete doc.value;
    }
  }

  const newKeys = mapToKeysArray(indexableKeysToKeyValues);
  for (const key of newKeys) {
    if (oldKeys.has(key)) {
      continue;
    }
    // new doc
    const kvDoc = {_id: key};
    const keyValue = indexableKeysToKeyValues.get(key);
    if ('value' in keyValue) {
      kvDoc.value = keyValue.value;
    }
    kvDocs.push(kvDoc);
  }

  // keep the old keys as well, so the meta doc still lists kv docs
  // that were just marked as deleted
  metaDoc.keys = uniq(newKeys.concat(metaDoc.keys));
  kvDocs.push(metaDoc);

  return kvDocs;
}
| |
/**
 * Copies the purgeSeq of the source database's '_local/purges' doc into
 * the view database's '_local/purgeSeq' doc, creating it if necessary.
 *
 * with this approach, we just assume to have processed all missing purges
 * and write the latest purgeSeq into the _local/purgeSeq doc.
 *
 * @param {Object} view - view object with `sourceDB` and `db`
 * @returns {Promise<*>} the result of the put, or undefined when a step
 *   failed with status 404; other errors are rethrown
 */
async function updatePurgeSeq(view) {
  try {
    const purgesDoc = await view.sourceDB.get('_local/purges');
    const purgeSeq = purgesDoc.purgeSeq;

    // reuse the current revision if the doc already exists
    const rev = await view.db.get('_local/purgeSeq')
      .then((purgeSeqDoc) => purgeSeqDoc._rev)
      .catch(defaultsTo(undefined));

    return await view.db.put({
      _id: '_local/purgeSeq',
      _rev: rev,
      purgeSeq,
    });
  } catch (err) {
    if (err.status !== 404) {
      throw err;
    }
    return undefined;
  }
}
| |
/**
 * updates all emitted key/value docs and metaDocs in the mrview database
 * for the given batch of documents from the source database
 *
 * @param {Object} view - view object; `view.db` is the view database
 * @param {Map} docIdsToChangesAndEmits - per-doc emits and changes of the batch
 * @param {*} seq - sequence stored in the '_local/lastSeq' doc
 * @returns {Promise<*>} settles with the outcome of updatePurgeSeq(view)
 */
async function saveKeyValues(view, docIdsToChangesAndEmits, seq) {
  const seqDocId = '_local/lastSeq';
  const lastSeqDoc = await view.db.get(seqDocId)
    .catch(defaultsTo({_id: seqDocId, seq: 0}));

  const docIds = mapToKeysArray(docIdsToChangesAndEmits);
  const listOfDocsToPersist = await Promise.all(
    docIds.map((docId) => getDocsToPersist(docId, view, docIdsToChangesAndEmits))
  );

  lastSeqDoc.seq = seq;
  const docsToPersist = [...listOfDocsToPersist.flat(), lastSeqDoc];
  // write all docs in a single operation, update the seq once
  await view.db.bulkDocs({docs : docsToPersist});

  // TODO: this should be placed somewhere else, probably? we're querying both docs twice
  // (first time when getting the actual purges).
  return updatePurgeSeq(view);
}
| |
/**
 * Returns the TaskQueue registered in persistentQueues for a view,
 * creating and registering it on first use.
 *
 * @param {string|{name: string}} view - a view name, or an object whose
 *   `name` is used as the key
 * @returns {TaskQueue}
 */
function getQueue(view) {
  const viewName = typeof view === 'string' ? view : view.name;
  if (!persistentQueues[viewName]) {
    persistentQueues[viewName] = new TaskQueue();
  }
  return persistentQueues[viewName];
}
| |
/**
 * Schedules an index update for the view on the view's own queue
 * (see getQueue) via sequentialize().
 *
 * @param {Object} view
 * @param {Object} opts - passed through to updateViewInQueue()
 * @returns {Promise<*>} whatever the sequentialized call yields
 */
async function updateView(view, opts) {
  const runInQueue = sequentialize(
    getQueue(view),
    () => updateViewInQueue(view, opts)
  );
  return runInQueue();
}
| |
/**
 * Brings the view index up to date with its source database.
 *
 * Reads the source changes feed in batches of `opts.changes_batch_size`
 * starting at `view.seq`, runs the map function over every changed
 * document, and queues one saveKeyValues() call per batch. Documents
 * listed in the source's '_local/purges' doc that are not part of the
 * batch are added as synthetic changes. Progress is reported through
 * 'indexing' events and the source database's activeTasks.
 *
 * @param {Object} view - view object (sourceDB, db, mapFun, name, seq)
 * @param {{changes_batch_size: number}} opts
 * @returns {Promise<undefined>} errors are handed to activeTasks.remove()
 *   and not rethrown
 */
async function updateViewInQueue(view, opts) {
  // bind the emit function once; mapResults and doc are swapped
  // for every document right before the map function runs
  let mapResults;
  let doc;
  let taskId;

  function emit(key, value) {
    const output = {id: doc._id, key: normalizeKey(key)};
    // Don't explicitly store the value unless it's defined and non-null.
    // This saves on storage space, because often people don't use it.
    if (typeof value !== 'undefined' && value !== null) {
      output.value = normalizeKey(value);
    }
    mapResults.push(output);
  }

  const mapFun = mapper(view.mapFun, emit);

  let currentSeq = view.seq || 0;

  // registers the indexing run with the source db's active tasks
  function createTask() {
    return view.sourceDB.info().then(function (info) {
      taskId = view.sourceDB.activeTasks.add({
        name: 'view_indexing',
        total_items: info.update_seq - currentSeq,
      });
    });
  }

  // returns the queue task that persists one batch
  function processChange(docIdsToChangesAndEmits, seq) {
    return function () {
      return saveKeyValues(view, docIdsToChangesAndEmits, seq);
    };
  }

  let indexedDocs = 0;
  view.sourceDB.emit('indexing', {
    view: view.name,
    indexed_docs: indexedDocs
  });

  const queue = new TaskQueue();

  async function processNextBatch() {
    const response = await view.sourceDB.changes({
      return_docs: true,
      conflicts: true,
      include_docs: true,
      style: 'all_docs',
      since: currentSeq,
      limit: opts.changes_batch_size
    });
    const purges = await getRecentPurges();
    return processBatch(response, purges);
  }

  // resolves with [{docId, doc?}] for every purge recorded after the
  // purgeSeq stored in the view db; `doc` is the current source doc and
  // is absent when the lookup yields a 404
  function getRecentPurges() {
    return view.db.get('_local/purgeSeq').then(function (res) {
      return res.purgeSeq;
    })
    .catch(defaultsTo(-1))
    .then(function (purgeSeq) {
      return view.sourceDB.get('_local/purges').then(function (res) {
        const recentPurges = res.purges.filter(function (purge, index) {
          return index > purgeSeq;
        }).map((purge) => purge.docId);

        // a Set keeps the first occurrence of every doc id, in order
        const uniquePurges = Array.from(new Set(recentPurges));

        return Promise.all(uniquePurges.map(function (docId) {
          return view.sourceDB.get(docId).then(function (doc) {
            return { docId, doc };
          })
          .catch(defaultsTo({ docId }));
        }));
      })
      .catch(defaultsTo([]));
    });
  }

  function processBatch(response, purges) {
    const results = response.results;
    if (!results.length && !purges.length) {
      return;
    }

    for (const purge of purges) {
      const index = results.findIndex(function (change) {
        return change.id === purge.docId;
      });
      if (index < 0) {
        // mimic a db.remove() on the changes feed
        const entry = {
          _id: purge.docId,
          doc: {
            _id: purge.docId,
            _deleted: 1,
          },
          changes: [],
        };

        if (purge.doc) {
          // update with new winning rev after purge
          entry.doc = purge.doc;
          entry.changes.push({ rev: purge.doc._rev });
        }

        results.push(entry);
      }
    }

    const docIdsToChangesAndEmits = createDocIdsToChangesAndEmits(results);

    queue.add(processChange(docIdsToChangesAndEmits, currentSeq));

    indexedDocs = indexedDocs + results.length;
    const progress = {
      view: view.name,
      last_seq: response.last_seq,
      results_count: results.length,
      indexed_docs: indexedDocs
    };
    view.sourceDB.emit('indexing', progress);
    view.sourceDB.activeTasks.update(taskId, {completed_items: indexedDocs});

    if (results.length < opts.changes_batch_size) {
      return;
    }
    return processNextBatch();
  }

  function createDocIdsToChangesAndEmits(results) {
    const docIdsToChangesAndEmits = new Map();
    for (const change of results) {
      if (change.doc._id[0] !== '_') {
        mapResults = [];
        doc = change.doc;

        if (!doc._deleted) {
          tryMap(view.sourceDB, mapFun, doc);
        }
        mapResults.sort(sortByKeyThenValue);

        const indexableKeysToKeyValues = createIndexableKeysToKeyValues(mapResults);
        docIdsToChangesAndEmits.set(change.doc._id, [
          indexableKeysToKeyValues,
          change.changes
        ]);
      }
      // the synthetic purge entries built in processBatch() carry no seq;
      // they must not reset the sequence reached through real changes
      if (typeof change.seq !== 'undefined') {
        currentSeq = change.seq;
      }
    }
    return docIdsToChangesAndEmits;
  }

  function createIndexableKeysToKeyValues(mapResults) {
    const indexableKeysToKeyValues = new Map();
    let lastKey;
    for (let i = 0, len = mapResults.length; i < len; i++) {
      const emittedKeyValue = mapResults[i];
      const complexKey = [emittedKeyValue.key, emittedKeyValue.id];
      if (i > 0 && collate(emittedKeyValue.key, lastKey) === 0) {
        complexKey.push(i); // dup key+id, so make it unique
      }
      indexableKeysToKeyValues.set(toIndexableString(complexKey), emittedKeyValue);
      lastKey = emittedKeyValue.key;
    }
    return indexableKeysToKeyValues;
  }

  try {
    await createTask();
    await processNextBatch();
    await queue.finish();
    view.seq = currentSeq;
    view.sourceDB.activeTasks.remove(taskId);
  } catch (error) {
    // NOTE(review): the error is only recorded on the active task and not
    // rethrown, so callers see a successful update — confirm this is intended
    view.sourceDB.activeTasks.remove(taskId, error);
  }
}
| |
/**
 * Reduces the mapped rows of a view.
 *
 * Consecutive rows whose group key collates equal are reduced together.
 * The group key is null without grouping, otherwise the row key, cut to
 * `group_level` elements when it is an array.
 *
 * @param {Object} view - view object; reduceFun and sourceDB are used
 * @param {Array<{key: *, id: *, value: *}>} results - mapped rows
 * @param {Object} options - query options (group, group_level, limit,
 *   skip); a group_level of 0 is deleted from this object
 * @returns {{rows: Array<{value: *, key: *}>}} reduced rows after paging
 * @throws {BuiltInError} when the reduce function fails with one
 */
function reduceView(view, results, options) {
  if (options.group_level === 0) {
    delete options.group_level;
  }

  const shouldGroup = options.group || options.group_level;
  const reduceFun = reducer(view.reduceFun);
  const lvl = isNaN(options.group_level)
    ? Number.POSITIVE_INFINITY
    : options.group_level;

  // only set group_level for array keys
  const toGroupKey = (key) => {
    if (!shouldGroup) {
      return null;
    }
    return Array.isArray(key) ? key.slice(0, lvl) : key;
  };

  const groups = [];
  for (const {key, id, value} of results) {
    const groupKey = toGroupKey(key);
    const current = groups[groups.length - 1];
    const startsNewGroup = !current || collate(current.groupKey, groupKey) !== 0;

    if (startsNewGroup) {
      groups.push({
        keys: [[key, id]],
        values: [value],
        groupKey
      });
    } else {
      current.keys.push([key, id]);
      current.values.push(value);
    }
  }

  const reducedRows = [];
  for (const group of groups) {
    const attempt = tryReduce(view.sourceDB, reduceFun, group.keys, group.values, false);
    if (attempt.error && attempt.error instanceof BuiltInError) {
      // CouchDB returns an error if a built-in errors out
      throw attempt.error;
    }
    reducedRows.push({
      // CouchDB just sets the value to null if a non-built-in errors out
      value: attempt.error ? null : attempt.output,
      key: group.groupKey
    });
  }

  // no total_rows/offset when reducing
  return { rows: sliceResults(reducedRows, options.limit, options.skip) };
}
| |
/**
 * Schedules a query of the view on the view's own queue (see getQueue)
 * via sequentialize().
 *
 * @param {Object} view
 * @param {Object} opts - query options, passed to queryViewInQueue()
 * @returns {*} whatever the sequentialized call returns
 */
function queryView(view, opts) {
  const runInQueue = sequentialize(
    getQueue(view),
    () => queryViewInQueue(view, opts)
  );
  return runInQueue();
}
| |
/**
 * Answers a query from the (already built) view database.
 *
 * Translates the query options into allDocs() ranges over the indexable
 * strings stored in the view db, converts the stored docs back into
 * {key, id, value} rows, then reduces them or wraps them in a result
 * object, optionally joining the source docs.
 *
 * @param {Object} view - view object (db, sourceDB, reduceFun, seq)
 * @param {Object} opts - query options; an empty `keys` array is replaced
 *   by `limit: 0` on this object
 * @returns {Promise<Object>} the query result
 */
async function queryViewInQueue(view, opts) {
  const shouldReduce = view.reduceFun && opts.reduce !== false;
  const skip = opts.skip || 0;
  // total_rows reported by the most recently completed allDocs() call
  let totalRows;

  if (typeof opts.keys !== 'undefined' && !opts.keys.length) {
    // equivalent query
    opts.limit = 0;
    delete opts.keys;
  }

  // turns a row of the view db into a {key, id, value} view row
  function toViewRow(result) {
    const stored = result.doc;
    // implicit migration - in older versions of PouchDB,
    // we explicitly stored the doc as {id: ..., key: ..., value: ...}
    // this is tested in a migration test
    /* istanbul ignore next */
    if ('value' in stored && typeof stored.value === 'object' &&
        stored.value !== null) {
      const keys = Object.keys(stored.value).sort();
      // this detection method is not perfect, but it's unlikely the user
      // emitted a value which was an object with these 3 exact keys
      const expectedKeys = ['id', 'key', 'value'];
      if (String(keys) === String(expectedKeys)) {
        return stored.value;
      }
    }

    const parsedKeyAndDocId = parseIndexableString(stored._id);
    return {
      key: parsedKeyAndDocId[0],
      id: parsedKeyAndDocId[1],
      value: ('value' in stored ? stored.value : null)
    };
  }

  async function fetchFromView(viewOpts) {
    viewOpts.include_docs = true;
    const res = await view.db.allDocs(viewOpts);
    totalRows = res.total_rows;
    return res.rows.map((row) => toViewRow(row));
  }

  async function onMapResultsReady(rows) {
    let finalResults;
    if (shouldReduce) {
      finalResults = reduceView(view, rows, opts);
    } else {
      const isKeysQuery = typeof opts.keys !== 'undefined';
      finalResults = {
        total_rows: totalRows,
        offset: skip,
        // support limit, skip for keys query
        rows: isKeysQuery ? sliceResults(rows, opts.limit, opts.skip) : rows
      };
    }
    /* istanbul ignore if */
    if (opts.update_seq) {
      finalResults.update_seq = view.seq;
    }
    if (opts.include_docs) {
      const docIds = uniq(rows.map(rowToDocId));

      const allDocsRes = await view.sourceDB.allDocs({
        keys: docIds,
        include_docs: true,
        conflicts: opts.conflicts,
        attachments: opts.attachments,
        binary: opts.binary
      });
      const docIdsToDocs = new Map(
        allDocsRes.rows.map((row) => [row.id, row.doc])
      );
      for (const row of rows) {
        const joinedDoc = docIdsToDocs.get(rowToDocId(row));
        if (joinedDoc) {
          row.doc = joinedDoc;
        }
      }
    }
    return finalResults;
  }

  if (typeof opts.keys !== 'undefined') {
    // one range lookup per requested key
    const fetchPromises = opts.keys.map((key) => {
      const viewOpts = {
        startkey : toIndexableString([key]),
        endkey : toIndexableString([key, {}])
      };
      /* istanbul ignore if */
      if (opts.update_seq) {
        viewOpts.update_seq = true;
      }
      return fetchFromView(viewOpts);
    });
    const rowsPerKey = await Promise.all(fetchPromises);
    return onMapResultsReady(rowsPerKey.flat());
  }

  // normal query, no 'keys'
  const viewOpts = {
    descending : opts.descending
  };
  /* istanbul ignore if */
  if (opts.update_seq) {
    viewOpts.update_seq = true;
  }

  // startkey/endkey win over their start_key/end_key spellings
  const startkey = 'startkey' in opts ? opts.startkey : opts.start_key;
  const endkey = 'endkey' in opts ? opts.endkey : opts.end_key;

  if (typeof startkey !== 'undefined') {
    viewOpts.startkey = toIndexableString(
      opts.descending ? [startkey, {}] : [startkey]);
  }
  if (typeof endkey !== 'undefined') {
    // descending flips which side of the key the range has to end on
    const inclusiveEnd =
      (opts.inclusive_end !== false) !== Boolean(opts.descending);
    viewOpts.endkey = toIndexableString(
      inclusiveEnd ? [endkey, {}] : [endkey]);
  }
  if (typeof opts.key !== 'undefined') {
    const keyStart = toIndexableString([opts.key]);
    const keyEnd = toIndexableString([opts.key, {}]);
    if (viewOpts.descending) {
      viewOpts.endkey = keyStart;
      viewOpts.startkey = keyEnd;
    } else {
      viewOpts.startkey = keyStart;
      viewOpts.endkey = keyEnd;
    }
  }
  if (!shouldReduce) {
    if (typeof opts.limit === 'number') {
      viewOpts.limit = opts.limit;
    }
    viewOpts.skip = skip;
  }

  const rows = await fetchFromView(viewOpts);
  return onMapResultsReady(rows);
}
| |
/**
 * Asks a remote database to clean up its views by POSTing to
 * '_view_cleanup'.
 *
 * @param {Object} db - database handle; only `db.fetch(path, init)` is used
 * @returns {Promise<Object>} the parsed JSON response body
 */
async function httpViewCleanup(db) {
  const requestInit = {
    headers: new Headers({'Content-Type': 'application/json'}),
    method: 'POST'
  };
  const response = await db.fetch('_view_cleanup', requestInit);
  return response.json();
}
| |
/**
 * Destroys the view databases of a local database that are no longer
 * backed by a view: the design doc was deleted or no longer defines it.
 *
 * The bookkeeping comes from the '_local/<localDocName>' doc, whose
 * `views` map full view names to objects keyed by view database name.
 *
 * @param {Object} db - the source database
 * @returns {Promise<{ok: boolean}>} {ok: true}, also when a step inside
 *   the try block fails with status 404; other errors are rethrown
 */
async function localViewCleanup(db) {
  try {
    const metaDoc = await db.get('_local/' + localDocName);

    // design doc id -> names of the views recorded for it
    const docsToViews = new Map();
    for (const fullViewName of Object.keys(metaDoc.views)) {
      const [ddocPart, viewName] = parseViewName(fullViewName);
      const designDocName = '_design/' + ddocPart;
      if (!docsToViews.get(designDocName)) {
        docsToViews.set(designDocName, new Set());
      }
      docsToViews.get(designDocName).add(viewName);
    }

    const res = await db.allDocs({
      keys : mapToKeysArray(docsToViews),
      include_docs : true
    });

    // view db name -> truthy if at least one existing view still uses it
    const viewsToStatus = {};
    for (const row of res.rows) {
      const ddocName = row.key.substring(8); // cuts off '_design/'
      for (const viewName of docsToViews.get(row.key)) {
        const slashedName = ddocName + '/' + viewName;
        // new format, without slashes, to support PouchDB 2.2.0
        // migration test in pouchdb's browser.migration.js verifies this
        /* istanbul ignore next */
        const fullViewName = metaDoc.views[slashedName] ? slashedName : viewName;
        const viewDBNames = Object.keys(metaDoc.views[fullViewName]);
        // design doc deleted, or view function nonexistent
        const statusIsGood = row.doc && row.doc.views &&
          row.doc.views[viewName];
        for (const viewDBName of viewDBNames) {
          viewsToStatus[viewDBName] = viewsToStatus[viewDBName] || statusIsGood;
        }
      }
    }

    const dbsToDelete = Object.keys(viewsToStatus)
      .filter((viewDBName) => !viewsToStatus[viewDBName]);

    // each destroy goes through the queue of the view db it removes
    const destroyPromises = dbsToDelete.map((viewDBName) => {
      const destroyInQueue = sequentialize(
        getQueue(viewDBName),
        () => new db.constructor(viewDBName, db.__opts).destroy()
      );
      return destroyInQueue();
    });

    // returned without await: a failing destroy rejects the result
    // directly instead of passing through the 404 handling below
    return Promise.all(destroyPromises).then(() => ({ok: true}));
  } catch (err) {
    if (err.status !== 404) {
      throw err;
    }
    return {ok: true};
  }
}
| |
/**
 * Dispatches a query to the right implementation and returns its result.
 *
 * Order of precedence: an adapter-provided db._query, the HTTP
 * implementation for remote databases, then the local implementation,
 * which builds a temporary view for a view definition object or uses
 * the persistent view named by a string.
 *
 * @param {Object} db - the database being queried
 * @param {string|Object} fun - view name or {map, reduce} definition
 * @param {Object} opts - query options
 * @returns {Promise<Object>} the query result
 * @throws {NotFoundError} when the design doc has no view of that name
 */
async function queryPromised(db, fun, opts) {
  /* istanbul ignore next */
  if (typeof db._query === 'function') {
    return customQuery(db, fun, opts);
  }
  if (isRemote(db)) {
    return httpQuery(db, fun, opts);
  }

  const updateViewOpts = {
    changes_batch_size: db.__opts.view_update_changes_batch_size || CHANGES_BATCH_SIZE
  };

  if (typeof fun !== 'string') {
    // temp_view
    checkQueryParseError(opts, fun);

    tempViewQueue.add(async () => {
      const tempView = await createView(
        /* sourceDB */ db,
        /* viewName */ 'temp_view/temp_view',
        /* mapFun */ fun.map,
        /* reduceFun */ fun.reduce,
        /* temporary */ true,
        /* localDocName */ localDocName);

      const updateThenQuery = updateView(tempView, updateViewOpts)
        .then(() => queryView(tempView, opts));
      // the temporary view db is destroyed once the query has settled
      return fin(updateThenQuery, () => tempView.db.destroy());
    });
    return tempViewQueue.finish();
  }

  // persistent view
  const fullViewName = fun;
  const [designDocName, viewName] = parseViewName(fullViewName);

  const doc = await db.get('_design/' + designDocName);
  const viewDef = doc.views && doc.views[viewName];

  if (!viewDef) {
    // basic validator; it's assumed that every subclass would want this
    throw new NotFoundError(`ddoc ${doc._id} has no view named ${viewName}`);
  }

  ddocValidator(doc, viewName);
  checkQueryParseError(opts, viewDef);

  const view = await createView(
    /* sourceDB */ db,
    /* viewName */ fullViewName,
    /* mapFun */ viewDef.map,
    /* reduceFun */ viewDef.reduce,
    /* temporary */ false,
    /* localDocName */ localDocName);

  switch (opts.stale) {
    case 'update_after':
      // answer from the current index, refresh it afterwards
      nextTick(() => {
        updateView(view, updateViewOpts);
      });
      break;
    case 'ok':
      break;
    default: // stale not ok
      await updateView(view, updateViewOpts);
  }
  return queryView(view, opts);
}
| |
/**
 * The `query` function of the abstract mapreduce object; called with the
 * database as `this`.
 *
 * @param {string|Function|Object} fun - view name, map function (wrapped
 *   as {map: fun}), or view definition object
 * @param {Object|Function} [opts] - query options, or the callback
 * @param {Function} [callback] - handed to promisedCallback() together
 *   with the query promise
 * @returns {Promise<Object>} promise for the query result
 */
function abstractQuery(fun, opts, callback) {
  const db = this;

  // (fun, callback) call style: the second argument is the callback
  const optsIsCallback = typeof opts === 'function';
  const done = optsIsCallback ? opts : callback;
  const givenOpts = optsIsCallback ? {} : opts;
  const queryOpts = givenOpts ? coerceOptions(givenOpts) : {};

  const viewDef = typeof fun === 'function' ? {map : fun} : fun;

  // start from a resolved promise so that synchronous throws of
  // queryPromised() surface as rejections
  const promise = Promise.resolve().then(() => queryPromised(db, viewDef, queryOpts));
  promisedCallback(promise, done);
  return promise;
}
| |
// The `viewCleanup` function of the abstract mapreduce object, wrapped with
// callbackify(); called with the database as `this`. Picks the
// adapter-provided cleanup if there is one, else the HTTP cleanup for remote
// databases, else the local cleanup.
const abstractViewCleanup = callbackify(function () {
  const db = this;
  const hasCustomCleanup = typeof db._viewCleanup === 'function';
  /* istanbul ignore next */
  if (hasCustomCleanup) {
    return customViewCleanup(db);
  }
  return isRemote(db) ? httpViewCleanup(db) : localViewCleanup(db);
});
| |
| return { |
| query: abstractQuery, |
| viewCleanup: abstractViewCleanup |
| }; |
| } |
| |
| export default createAbstractMapReduce; |