blob: b5e8979471a4c9124d4c02409e19a5cac7e33ea2 [file] [log] [blame]
import levelup from 'levelup';
import sublevel from 'sublevel-pouchdb';
import { obj as through } from 'through2';
import getArguments from 'argsarray';
import Deque from 'double-ended-queue';
import bufferFrom from 'buffer-from'; // ponyfill for Node <6
import {
clone,
changesHandler as Changes,
filterChange,
functionName,
uuid,
nextTick
} from 'pouchdb-utils';
import {
isDeleted,
isLocalId,
parseDoc,
processDocs
} from 'pouchdb-adapter-utils';
import {
winningRev as calculateWinningRev,
traverseRevTree,
compactTree,
collectConflicts,
latest as getLatest
} from 'pouchdb-merge';
import {
safeJsonParse,
safeJsonStringify
} from 'pouchdb-json';
import {
binaryMd5
} from 'pouchdb-md5';
import {
atob,
binaryStringToBlobOrBuffer as binStringToBluffer
} from 'pouchdb-binary-utils';
import readAsBluffer from './readAsBlobOrBuffer';
import prepareAttachmentForStorage from './prepareAttachmentForStorage';
import createEmptyBluffer from './createEmptyBlobOrBuffer';
import LevelTransaction from './transaction';
import {
MISSING_DOC,
REV_CONFLICT,
NOT_OPEN,
BAD_ARG,
MISSING_STUB,
createError
} from 'pouchdb-errors';
var DOC_STORE = 'document-store';
var BY_SEQ_STORE = 'by-sequence';
var ATTACHMENT_STORE = 'attach-store';
var BINARY_STORE = 'attach-binary-store';
var LOCAL_STORE = 'local-store';
var META_STORE = 'meta-store';
// leveldb barks if we try to open a db multiple times
// so we cache opened connections here for initstore()
var dbStores = new Map();
// store the value of update_seq in the by-sequence store the key name will
// never conflict, since the keys in the by-sequence store are integers
var UPDATE_SEQ_KEY = '_local_last_update_seq';
var DOC_COUNT_KEY = '_local_doc_count';
var UUID_KEY = '_local_uuid';
var MD5_PREFIX = 'md5-';
var safeJsonEncoding = {
encode: safeJsonStringify,
decode: safeJsonParse,
buffer: false,
type: 'cheap-json'
};
var levelChanges = new Changes();
// winningRev and deleted are performance-killers, but
// in newer versions of PouchDB, they are cached on the metadata
function getWinningRev(metadata) {
return 'winningRev' in metadata ?
metadata.winningRev : calculateWinningRev(metadata);
}
function getIsDeleted(metadata, winningRev) {
return 'deleted' in metadata ?
metadata.deleted : isDeleted(metadata, winningRev);
}
function fetchAttachment(att, stores, opts) {
var type = att.content_type;
return new Promise(function (resolve, reject) {
stores.binaryStore.get(att.digest, function (err, buffer) {
var data;
if (err) {
/* istanbul ignore if */
if (err.name !== 'NotFoundError') {
return reject(err);
} else {
// empty
if (!opts.binary) {
data = '';
} else {
data = binStringToBluffer('', type);
}
}
} else { // non-empty
if (opts.binary) {
data = readAsBluffer(buffer, type);
} else {
data = buffer.toString('base64');
}
}
delete att.stub;
delete att.length;
att.data = data;
resolve();
});
});
}
function fetchAttachments(results, stores, opts) {
var atts = [];
results.forEach(function (row) {
if (!(row.doc && row.doc._attachments)) {
return;
}
var attNames = Object.keys(row.doc._attachments);
attNames.forEach(function (attName) {
var att = row.doc._attachments[attName];
if (!('data' in att)) {
atts.push(att);
}
});
});
return Promise.all(atts.map(function (att) {
return fetchAttachment(att, stores, opts);
}));
}
function LevelPouch(opts, callback) {
opts = clone(opts);
var api = this;
var instanceId;
var stores = {};
var revLimit = opts.revs_limit;
var db;
var name = opts.name;
// TODO: this is undocumented and unused probably
/* istanbul ignore else */
if (typeof opts.createIfMissing === 'undefined') {
opts.createIfMissing = true;
}
var leveldown = opts.db;
var dbStore;
var leveldownName = functionName(leveldown);
if (dbStores.has(leveldownName)) {
dbStore = dbStores.get(leveldownName);
} else {
dbStore = new Map();
dbStores.set(leveldownName, dbStore);
}
if (dbStore.has(name)) {
db = dbStore.get(name);
afterDBCreated();
} else {
dbStore.set(name, sublevel(levelup(name, opts, function (err) {
/* istanbul ignore if */
if (err) {
dbStore.delete(name);
return callback(err);
}
db = dbStore.get(name);
db._docCount = -1;
db._queue = new Deque();
/* istanbul ignore else */
if (typeof opts.migrate === 'object') { // migration for leveldown
opts.migrate.doMigrationOne(name, db, afterDBCreated);
} else {
afterDBCreated();
}
})));
}
function afterDBCreated() {
stores.docStore = db.sublevel(DOC_STORE, {valueEncoding: safeJsonEncoding});
stores.bySeqStore = db.sublevel(BY_SEQ_STORE, {valueEncoding: 'json'});
stores.attachmentStore =
db.sublevel(ATTACHMENT_STORE, {valueEncoding: 'json'});
stores.binaryStore = db.sublevel(BINARY_STORE, {valueEncoding: 'binary'});
stores.localStore = db.sublevel(LOCAL_STORE, {valueEncoding: 'json'});
stores.metaStore = db.sublevel(META_STORE, {valueEncoding: 'json'});
/* istanbul ignore else */
if (typeof opts.migrate === 'object') { // migration for leveldown
opts.migrate.doMigrationTwo(db, stores, afterLastMigration);
} else {
afterLastMigration();
}
}
function afterLastMigration() {
stores.metaStore.get(UPDATE_SEQ_KEY, function (err, value) {
if (typeof db._updateSeq === 'undefined') {
db._updateSeq = value || 0;
}
stores.metaStore.get(DOC_COUNT_KEY, function (err, value) {
db._docCount = !err ? value : 0;
stores.metaStore.get(UUID_KEY, function (err, value) {
instanceId = !err ? value : uuid();
stores.metaStore.put(UUID_KEY, instanceId, function () {
nextTick(function () {
callback(null, api);
});
});
});
});
});
}
function countDocs(callback) {
/* istanbul ignore if */
if (db.isClosed()) {
return callback(new Error('database is closed'));
}
return callback(null, db._docCount); // use cached value
}
api.type = function () {
return 'leveldb';
};
api._id = function (callback) {
callback(null, instanceId);
};
api._info = function (callback) {
var res = {
doc_count: db._docCount,
update_seq: db._updateSeq,
backend_adapter: functionName(leveldown)
};
return nextTick(function () {
callback(null, res);
});
};
function tryCode(fun, args) {
try {
fun.apply(null, args);
} catch (err) {
args[args.length - 1](err);
}
}
function executeNext() {
var firstTask = db._queue.peekFront();
if (firstTask.type === 'read') {
runReadOperation(firstTask);
} else { // write, only do one at a time
runWriteOperation(firstTask);
}
}
function runReadOperation(firstTask) {
// do multiple reads at once simultaneously, because it's safe
var readTasks = [firstTask];
var i = 1;
var nextTask = db._queue.get(i);
while (typeof nextTask !== 'undefined' && nextTask.type === 'read') {
readTasks.push(nextTask);
i++;
nextTask = db._queue.get(i);
}
var numDone = 0;
readTasks.forEach(function (readTask) {
var args = readTask.args;
var callback = args[args.length - 1];
args[args.length - 1] = getArguments(function (cbArgs) {
callback.apply(null, cbArgs);
if (++numDone === readTasks.length) {
nextTick(function () {
// all read tasks have finished
readTasks.forEach(function () {
db._queue.shift();
});
if (db._queue.length) {
executeNext();
}
});
}
});
tryCode(readTask.fun, args);
});
}
function runWriteOperation(firstTask) {
var args = firstTask.args;
var callback = args[args.length - 1];
args[args.length - 1] = getArguments(function (cbArgs) {
callback.apply(null, cbArgs);
nextTick(function () {
db._queue.shift();
if (db._queue.length) {
executeNext();
}
});
});
tryCode(firstTask.fun, args);
}
// all read/write operations to the database are done in a queue,
// similar to how websql/idb works. this avoids problems such
// as e.g. compaction needing to have a lock on the database while
// it updates stuff. in the future we can revisit this.
function writeLock(fun) {
return getArguments(function (args) {
db._queue.push({
fun: fun,
args: args,
type: 'write'
});
if (db._queue.length === 1) {
nextTick(executeNext);
}
});
}
// same as the writelock, but multiple can run at once
function readLock(fun) {
return getArguments(function (args) {
db._queue.push({
fun: fun,
args: args,
type: 'read'
});
if (db._queue.length === 1) {
nextTick(executeNext);
}
});
}
function formatSeq(n) {
return ('0000000000000000' + n).slice(-16);
}
function parseSeq(s) {
return parseInt(s, 10);
}
api._get = readLock(function (id, opts, callback) {
opts = clone(opts);
stores.docStore.get(id, function (err, metadata) {
if (err || !metadata) {
return callback(createError(MISSING_DOC, 'missing'));
}
var rev;
if(!opts.rev) {
rev = getWinningRev(metadata);
var deleted = getIsDeleted(metadata, rev);
if (deleted) {
return callback(createError(MISSING_DOC, "deleted"));
}
} else {
rev = opts.latest ? getLatest(opts.rev, metadata) : opts.rev;
}
var seq = metadata.rev_map[rev];
stores.bySeqStore.get(formatSeq(seq), function (err, doc) {
if (!doc) {
return callback(createError(MISSING_DOC));
}
/* istanbul ignore if */
if ('_id' in doc && doc._id !== metadata.id) {
// this failing implies something very wrong
return callback(new Error('wrong doc returned'));
}
doc._id = metadata.id;
if ('_rev' in doc) {
/* istanbul ignore if */
if (doc._rev !== rev) {
// this failing implies something very wrong
return callback(new Error('wrong doc returned'));
}
} else {
// we didn't always store this
doc._rev = rev;
}
return callback(null, {doc: doc, metadata: metadata});
});
});
});
// not technically part of the spec, but if putAttachment has its own
// method...
api._getAttachment = function (docId, attachId, attachment, opts, callback) {
var digest = attachment.digest;
var type = attachment.content_type;
stores.binaryStore.get(digest, function (err, attach) {
if (err) {
/* istanbul ignore if */
if (err.name !== 'NotFoundError') {
return callback(err);
}
// Empty attachment
return callback(null, opts.binary ? createEmptyBluffer(type) : '');
}
if (opts.binary) {
callback(null, readAsBluffer(attach, type));
} else {
callback(null, attach.toString('base64'));
}
});
};
api._bulkDocs = writeLock(function (req, opts, callback) {
var newEdits = opts.new_edits;
var results = new Array(req.docs.length);
var fetchedDocs = new Map();
var stemmedRevs = new Map();
var txn = new LevelTransaction();
var docCountDelta = 0;
var newUpdateSeq = db._updateSeq;
// parse the docs and give each a sequence number
var userDocs = req.docs;
var docInfos = userDocs.map(function (doc) {
if (doc._id && isLocalId(doc._id)) {
return doc;
}
var newDoc = parseDoc(doc, newEdits);
if (newDoc.metadata && !newDoc.metadata.rev_map) {
newDoc.metadata.rev_map = {};
}
return newDoc;
});
var infoErrors = docInfos.filter(function (doc) {
return doc.error;
});
if (infoErrors.length) {
return callback(infoErrors[0]);
}
// verify any stub attachments as a precondition test
function verifyAttachment(digest, callback) {
txn.get(stores.attachmentStore, digest, function (levelErr) {
if (levelErr) {
var err = createError(MISSING_STUB,
'unknown stub attachment with digest ' +
digest);
callback(err);
} else {
callback();
}
});
}
function verifyAttachments(finish) {
var digests = [];
userDocs.forEach(function (doc) {
if (doc && doc._attachments) {
Object.keys(doc._attachments).forEach(function (filename) {
var att = doc._attachments[filename];
if (att.stub) {
digests.push(att.digest);
}
});
}
});
if (!digests.length) {
return finish();
}
var numDone = 0;
var err;
digests.forEach(function (digest) {
verifyAttachment(digest, function (attErr) {
if (attErr && !err) {
err = attErr;
}
if (++numDone === digests.length) {
finish(err);
}
});
});
}
function fetchExistingDocs(finish) {
var numDone = 0;
var overallErr;
function checkDone() {
if (++numDone === userDocs.length) {
return finish(overallErr);
}
}
userDocs.forEach(function (doc) {
if (doc._id && isLocalId(doc._id)) {
// skip local docs
return checkDone();
}
txn.get(stores.docStore, doc._id, function (err, info) {
if (err) {
/* istanbul ignore if */
if (err.name !== 'NotFoundError') {
overallErr = err;
}
} else {
fetchedDocs.set(doc._id, info);
}
checkDone();
});
});
}
function compact(revsMap, callback) {
var promise = Promise.resolve();
revsMap.forEach(function (revs, docId) {
// TODO: parallelize, for now need to be sequential to
// pass orphaned attachment tests
promise = promise.then(function () {
return new Promise(function (resolve, reject) {
api._doCompactionNoLock(docId, revs, {ctx: txn}, function (err) {
/* istanbul ignore if */
if (err) {
return reject(err);
}
resolve();
});
});
});
});
promise.then(function () {
callback();
}, callback);
}
function autoCompact(callback) {
var revsMap = new Map();
fetchedDocs.forEach(function (metadata, docId) {
revsMap.set(docId, compactTree(metadata));
});
compact(revsMap, callback);
}
function finish() {
compact(stemmedRevs, function (error) {
/* istanbul ignore if */
if (error) {
complete(error);
}
if (api.auto_compaction) {
return autoCompact(complete);
}
complete();
});
}
function writeDoc(docInfo, winningRev, winningRevIsDeleted, newRevIsDeleted,
isUpdate, delta, resultsIdx, callback2) {
docCountDelta += delta;
var err = null;
var recv = 0;
docInfo.metadata.winningRev = winningRev;
docInfo.metadata.deleted = winningRevIsDeleted;
docInfo.data._id = docInfo.metadata.id;
docInfo.data._rev = docInfo.metadata.rev;
if (newRevIsDeleted) {
docInfo.data._deleted = true;
}
if (docInfo.stemmedRevs.length) {
stemmedRevs.set(docInfo.metadata.id, docInfo.stemmedRevs);
}
var attachments = docInfo.data._attachments ?
Object.keys(docInfo.data._attachments) :
[];
function attachmentSaved(attachmentErr) {
recv++;
if (!err) {
/* istanbul ignore if */
if (attachmentErr) {
err = attachmentErr;
callback2(err);
} else if (recv === attachments.length) {
finish();
}
}
}
function onMD5Load(doc, key, data, attachmentSaved) {
return function (result) {
saveAttachment(doc, MD5_PREFIX + result, key, data, attachmentSaved);
};
}
function doMD5(doc, key, attachmentSaved) {
return function (data) {
binaryMd5(data, onMD5Load(doc, key, data, attachmentSaved));
};
}
for (var i = 0; i < attachments.length; i++) {
var key = attachments[i];
var att = docInfo.data._attachments[key];
if (att.stub) {
// still need to update the refs mapping
var id = docInfo.data._id;
var rev = docInfo.data._rev;
saveAttachmentRefs(id, rev, att.digest, attachmentSaved);
continue;
}
var data;
if (typeof att.data === 'string') {
// input is assumed to be a base64 string
try {
data = atob(att.data);
} catch (e) {
callback(createError(BAD_ARG,
'Attachment is not a valid base64 string'));
return;
}
doMD5(docInfo, key, attachmentSaved)(data);
} else {
prepareAttachmentForStorage(att.data,
doMD5(docInfo, key, attachmentSaved));
}
}
function finish() {
var seq = docInfo.metadata.rev_map[docInfo.metadata.rev];
/* istanbul ignore if */
if (seq) {
// check that there aren't any existing revisions with the same
// revision id, else we shouldn't do anything
return callback2();
}
seq = ++newUpdateSeq;
docInfo.metadata.rev_map[docInfo.metadata.rev] =
docInfo.metadata.seq = seq;
var seqKey = formatSeq(seq);
var batch = [{
key: seqKey,
value: docInfo.data,
prefix: stores.bySeqStore,
type: 'put'
}, {
key: docInfo.metadata.id,
value: docInfo.metadata,
prefix: stores.docStore,
type: 'put'
}];
txn.batch(batch);
results[resultsIdx] = {
ok: true,
id: docInfo.metadata.id,
rev: docInfo.metadata.rev
};
fetchedDocs.set(docInfo.metadata.id, docInfo.metadata);
callback2();
}
if (!attachments.length) {
finish();
}
}
// attachments are queued per-digest, otherwise the refs could be
// overwritten by concurrent writes in the same bulkDocs session
var attachmentQueues = {};
function saveAttachmentRefs(id, rev, digest, callback) {
function fetchAtt() {
return new Promise(function (resolve, reject) {
txn.get(stores.attachmentStore, digest, function (err, oldAtt) {
/* istanbul ignore if */
if (err && err.name !== 'NotFoundError') {
return reject(err);
}
resolve(oldAtt);
});
});
}
function saveAtt(oldAtt) {
var ref = [id, rev].join('@');
var newAtt = {};
if (oldAtt) {
if (oldAtt.refs) {
// only update references if this attachment already has them
// since we cannot migrate old style attachments here without
// doing a full db scan for references
newAtt.refs = oldAtt.refs;
newAtt.refs[ref] = true;
}
} else {
newAtt.refs = {};
newAtt.refs[ref] = true;
}
return new Promise(function (resolve) {
txn.batch([{
type: 'put',
prefix: stores.attachmentStore,
key: digest,
value: newAtt
}]);
resolve(!oldAtt);
});
}
// put attachments in a per-digest queue, to avoid two docs with the same
// attachment overwriting each other
var queue = attachmentQueues[digest] || Promise.resolve();
attachmentQueues[digest] = queue.then(function () {
return fetchAtt().then(saveAtt).then(function (isNewAttachment) {
callback(null, isNewAttachment);
}, callback);
});
}
function saveAttachment(docInfo, digest, key, data, callback) {
var att = docInfo.data._attachments[key];
delete att.data;
att.digest = digest;
att.length = data.length;
var id = docInfo.metadata.id;
var rev = docInfo.metadata.rev;
att.revpos = parseInt(rev, 10);
saveAttachmentRefs(id, rev, digest, function (err, isNewAttachment) {
/* istanbul ignore if */
if (err) {
return callback(err);
}
// do not try to store empty attachments
if (data.length === 0) {
return callback(err);
}
if (!isNewAttachment) {
// small optimization - don't bother writing it again
return callback(err);
}
txn.batch([{
type: 'put',
prefix: stores.binaryStore,
key: digest,
value: bufferFrom(data, 'binary')
}]);
callback();
});
}
function complete(err) {
/* istanbul ignore if */
if (err) {
return nextTick(function () {
callback(err);
});
}
txn.batch([
{
prefix: stores.metaStore,
type: 'put',
key: UPDATE_SEQ_KEY,
value: newUpdateSeq
},
{
prefix: stores.metaStore,
type: 'put',
key: DOC_COUNT_KEY,
value: db._docCount + docCountDelta
}
]);
txn.execute(db, function (err) {
/* istanbul ignore if */
if (err) {
return callback(err);
}
db._docCount += docCountDelta;
db._updateSeq = newUpdateSeq;
levelChanges.notify(name);
nextTick(function () {
callback(null, results);
});
});
}
if (!docInfos.length) {
return callback(null, []);
}
verifyAttachments(function (err) {
if (err) {
return callback(err);
}
fetchExistingDocs(function (err) {
/* istanbul ignore if */
if (err) {
return callback(err);
}
processDocs(revLimit, docInfos, api, fetchedDocs, txn, results,
writeDoc, opts, finish);
});
});
});
api._allDocs = readLock(function (opts, callback) {
opts = clone(opts);
countDocs(function (err, docCount) {
/* istanbul ignore if */
if (err) {
return callback(err);
}
var readstreamOpts = {};
var skip = opts.skip || 0;
if (opts.startkey) {
readstreamOpts.gte = opts.startkey;
}
if (opts.endkey) {
readstreamOpts.lte = opts.endkey;
}
if (opts.key) {
readstreamOpts.gte = readstreamOpts.lte = opts.key;
}
if (opts.descending) {
readstreamOpts.reverse = true;
// switch start and ends
var tmp = readstreamOpts.lte;
readstreamOpts.lte = readstreamOpts.gte;
readstreamOpts.gte = tmp;
}
var limit;
if (typeof opts.limit === 'number') {
limit = opts.limit;
}
if (limit === 0 ||
('start' in readstreamOpts && 'end' in readstreamOpts &&
readstreamOpts.start > readstreamOpts.end)) {
// should return 0 results when start is greater than end.
// normally level would "fix" this for us by reversing the order,
// so short-circuit instead
return callback(null, {
total_rows: docCount,
offset: opts.skip,
rows: []
});
}
var results = [];
var docstream = stores.docStore.readStream(readstreamOpts);
var throughStream = through(function (entry, _, next) {
var metadata = entry.value;
// winningRev and deleted are performance-killers, but
// in newer versions of PouchDB, they are cached on the metadata
var winningRev = getWinningRev(metadata);
var deleted = getIsDeleted(metadata, winningRev);
if (!deleted) {
if (skip-- > 0) {
next();
return;
} else if (typeof limit === 'number' && limit-- <= 0) {
docstream.unpipe();
docstream.destroy();
next();
return;
}
} else if (opts.deleted !== 'ok') {
next();
return;
}
function allDocsInner(data) {
var doc = {
id: metadata.id,
key: metadata.id,
value: {
rev: winningRev
}
};
if (opts.include_docs) {
doc.doc = data;
doc.doc._rev = doc.value.rev;
if (opts.conflicts) {
var conflicts = collectConflicts(metadata);
if (conflicts.length) {
doc.doc._conflicts = conflicts;
}
}
for (var att in doc.doc._attachments) {
if (doc.doc._attachments.hasOwnProperty(att)) {
doc.doc._attachments[att].stub = true;
}
}
}
if (opts.inclusive_end === false && metadata.id === opts.endkey) {
return next();
} else if (deleted) {
if (opts.deleted === 'ok') {
doc.value.deleted = true;
doc.doc = null;
} else {
/* istanbul ignore next */
return next();
}
}
results.push(doc);
next();
}
if (opts.include_docs) {
var seq = metadata.rev_map[winningRev];
stores.bySeqStore.get(formatSeq(seq), function (err, data) {
allDocsInner(data);
});
}
else {
allDocsInner();
}
}, function (next) {
Promise.resolve().then(function () {
if (opts.include_docs && opts.attachments) {
return fetchAttachments(results, stores, opts);
}
}).then(function () {
callback(null, {
total_rows: docCount,
offset: opts.skip,
rows: results
});
}, callback);
next();
}).on('unpipe', function () {
throughStream.end();
});
docstream.on('error', callback);
docstream.pipe(throughStream);
});
});
api._changes = function (opts) {
opts = clone(opts);
if (opts.continuous) {
var id = name + ':' + uuid();
levelChanges.addListener(name, id, api, opts);
levelChanges.notify(name);
return {
cancel: function () {
levelChanges.removeListener(name, id);
}
};
}
var descending = opts.descending;
var results = [];
var lastSeq = opts.since || 0;
var called = 0;
var streamOpts = {
reverse: descending
};
var limit;
if ('limit' in opts && opts.limit > 0) {
limit = opts.limit;
}
if (!streamOpts.reverse) {
streamOpts.start = formatSeq(opts.since || 0);
}
var docIds = opts.doc_ids && new Set(opts.doc_ids);
var filter = filterChange(opts);
var docIdsToMetadata = new Map();
var returnDocs;
if ('return_docs' in opts) {
returnDocs = opts.return_docs;
} else if ('returnDocs' in opts) {
// TODO: Remove 'returnDocs' in favor of 'return_docs' in a future release
returnDocs = opts.returnDocs;
} else {
returnDocs = true;
}
function complete() {
opts.done = true;
if (returnDocs && opts.limit) {
/* istanbul ignore if */
if (opts.limit < results.length) {
results.length = opts.limit;
}
}
changeStream.unpipe(throughStream);
changeStream.destroy();
if (!opts.continuous && !opts.cancelled) {
if (opts.include_docs && opts.attachments) {
fetchAttachments(results, stores, opts).then(function () {
opts.complete(null, {results: results, last_seq: lastSeq});
});
} else {
opts.complete(null, {results: results, last_seq: lastSeq});
}
}
}
var changeStream = stores.bySeqStore.readStream(streamOpts);
var throughStream = through(function (data, _, next) {
if (limit && called >= limit) {
complete();
return next();
}
if (opts.cancelled || opts.done) {
return next();
}
var seq = parseSeq(data.key);
var doc = data.value;
if (seq === opts.since && !descending) {
// couchdb ignores `since` if descending=true
return next();
}
if (docIds && !docIds.has(doc._id)) {
return next();
}
var metadata;
function onGetMetadata(metadata) {
var winningRev = getWinningRev(metadata);
function onGetWinningDoc(winningDoc) {
var change = opts.processChange(winningDoc, metadata, opts);
change.seq = metadata.seq;
var filtered = filter(change);
if (typeof filtered === 'object') {
return opts.complete(filtered);
}
if (filtered) {
called++;
if (opts.attachments && opts.include_docs) {
// fetch attachment immediately for the benefit
// of live listeners
fetchAttachments([change], stores, opts).then(function () {
opts.onChange(change);
});
} else {
opts.onChange(change);
}
if (returnDocs) {
results.push(change);
}
}
next();
}
if (metadata.seq !== seq) {
// some other seq is later
return next();
}
lastSeq = seq;
if (winningRev === doc._rev) {
return onGetWinningDoc(doc);
}
// fetch the winner
var winningSeq = metadata.rev_map[winningRev];
stores.bySeqStore.get(formatSeq(winningSeq), function (err, doc) {
onGetWinningDoc(doc);
});
}
metadata = docIdsToMetadata.get(doc._id);
if (metadata) { // cached
return onGetMetadata(metadata);
}
// metadata not cached, have to go fetch it
stores.docStore.get(doc._id, function (err, metadata) {
/* istanbul ignore if */
if (opts.cancelled || opts.done || db.isClosed() ||
isLocalId(metadata.id)) {
return next();
}
docIdsToMetadata.set(doc._id, metadata);
onGetMetadata(metadata);
});
}, function (next) {
if (opts.cancelled) {
return next();
}
if (returnDocs && opts.limit) {
/* istanbul ignore if */
if (opts.limit < results.length) {
results.length = opts.limit;
}
}
next();
}).on('unpipe', function () {
throughStream.end();
complete();
});
changeStream.pipe(throughStream);
return {
cancel: function () {
opts.cancelled = true;
complete();
}
};
};
api._close = function (callback) {
/* istanbul ignore if */
if (db.isClosed()) {
return callback(createError(NOT_OPEN));
}
db.close(function (err) {
/* istanbul ignore if */
if (err) {
callback(err);
} else {
dbStore.delete(name);
callback();
}
});
};
api._getRevisionTree = function (docId, callback) {
stores.docStore.get(docId, function (err, metadata) {
if (err) {
callback(createError(MISSING_DOC));
} else {
callback(null, metadata.rev_tree);
}
});
};
api._doCompaction = writeLock(function (docId, revs, opts, callback) {
api._doCompactionNoLock(docId, revs, opts, callback);
});
// the NoLock version is for use by bulkDocs
api._doCompactionNoLock = function (docId, revs, opts, callback) {
if (typeof opts === 'function') {
callback = opts;
opts = {};
}
if (!revs.length) {
return callback();
}
var txn = opts.ctx || new LevelTransaction();
txn.get(stores.docStore, docId, function (err, metadata) {
/* istanbul ignore if */
if (err) {
return callback(err);
}
var seqs = revs.map(function (rev) {
var seq = metadata.rev_map[rev];
delete metadata.rev_map[rev];
return seq;
});
traverseRevTree(metadata.rev_tree, function (isLeaf, pos,
revHash, ctx, opts) {
var rev = pos + '-' + revHash;
if (revs.indexOf(rev) !== -1) {
opts.status = 'missing';
}
});
var batch = [];
batch.push({
key: metadata.id,
value: metadata,
type: 'put',
prefix: stores.docStore
});
var digestMap = {};
var numDone = 0;
var overallErr;
function checkDone(err) {
/* istanbul ignore if */
if (err) {
overallErr = err;
}
if (++numDone === revs.length) { // done
/* istanbul ignore if */
if (overallErr) {
return callback(overallErr);
}
deleteOrphanedAttachments();
}
}
function finish(err) {
/* istanbul ignore if */
if (err) {
return callback(err);
}
txn.batch(batch);
if (opts.ctx) {
// don't execute immediately
return callback();
}
txn.execute(db, callback);
}
function deleteOrphanedAttachments() {
var possiblyOrphanedAttachments = Object.keys(digestMap);
if (!possiblyOrphanedAttachments.length) {
return finish();
}
var numDone = 0;
var overallErr;
function checkDone(err) {
/* istanbul ignore if */
if (err) {
overallErr = err;
}
if (++numDone === possiblyOrphanedAttachments.length) {
finish(overallErr);
}
}
var refsToDelete = new Map();
revs.forEach(function (rev) {
refsToDelete.set(docId + '@' + rev, true);
});
possiblyOrphanedAttachments.forEach(function (digest) {
txn.get(stores.attachmentStore, digest, function (err, attData) {
/* istanbul ignore if */
if (err) {
if (err.name === 'NotFoundError') {
return checkDone();
} else {
return checkDone(err);
}
}
var refs = Object.keys(attData.refs || {}).filter(function (ref) {
return !refsToDelete.has(ref);
});
var newRefs = {};
refs.forEach(function (ref) {
newRefs[ref] = true;
});
if (refs.length) { // not orphaned
batch.push({
key: digest,
type: 'put',
value: {refs: newRefs},
prefix: stores.attachmentStore
});
} else { // orphaned, can safely delete
batch = batch.concat([{
key: digest,
type: 'del',
prefix: stores.attachmentStore
}, {
key: digest,
type: 'del',
prefix: stores.binaryStore
}]);
}
checkDone();
});
});
}
seqs.forEach(function (seq) {
batch.push({
key: formatSeq(seq),
type: 'del',
prefix: stores.bySeqStore
});
txn.get(stores.bySeqStore, formatSeq(seq), function (err, doc) {
/* istanbul ignore if */
if (err) {
if (err.name === 'NotFoundError') {
return checkDone();
} else {
return checkDone(err);
}
}
var atts = Object.keys(doc._attachments || {});
atts.forEach(function (attName) {
var digest = doc._attachments[attName].digest;
digestMap[digest] = true;
});
checkDone();
});
});
});
};
api._getLocal = function (id, callback) {
stores.localStore.get(id, function (err, doc) {
if (err) {
callback(createError(MISSING_DOC));
} else {
callback(null, doc);
}
});
};
api._putLocal = function (doc, opts, callback) {
if (typeof opts === 'function') {
callback = opts;
opts = {};
}
if (opts.ctx) {
api._putLocalNoLock(doc, opts, callback);
} else {
api._putLocalWithLock(doc, opts, callback);
}
};
api._putLocalWithLock = writeLock(function (doc, opts, callback) {
api._putLocalNoLock(doc, opts, callback);
});
// the NoLock version is for use by bulkDocs
api._putLocalNoLock = function (doc, opts, callback) {
delete doc._revisions; // ignore this, trust the rev
var oldRev = doc._rev;
var id = doc._id;
var txn = opts.ctx || new LevelTransaction();
txn.get(stores.localStore, id, function (err, resp) {
if (err && oldRev) {
return callback(createError(REV_CONFLICT));
}
if (resp && resp._rev !== oldRev) {
return callback(createError(REV_CONFLICT));
}
doc._rev =
oldRev ? '0-' + (parseInt(oldRev.split('-')[1], 10) + 1) : '0-1';
var batch = [
{
type: 'put',
prefix: stores.localStore,
key: id,
value: doc
}
];
txn.batch(batch);
var ret = {ok: true, id: doc._id, rev: doc._rev};
if (opts.ctx) {
// don't execute immediately
return callback(null, ret);
}
txn.execute(db, function (err) {
/* istanbul ignore if */
if (err) {
return callback(err);
}
callback(null, ret);
});
});
};
api._removeLocal = function (doc, opts, callback) {
if (typeof opts === 'function') {
callback = opts;
opts = {};
}
if (opts.ctx) {
api._removeLocalNoLock(doc, opts, callback);
} else {
api._removeLocalWithLock(doc, opts, callback);
}
};
api._removeLocalWithLock = writeLock(function (doc, opts, callback) {
api._removeLocalNoLock(doc, opts, callback);
});
// the NoLock version is for use by bulkDocs
api._removeLocalNoLock = function (doc, opts, callback) {
var txn = opts.ctx || new LevelTransaction();
txn.get(stores.localStore, doc._id, function (err, resp) {
if (err) {
/* istanbul ignore if */
if (err.name !== 'NotFoundError') {
return callback(err);
} else {
return callback(createError(MISSING_DOC));
}
}
if (resp._rev !== doc._rev) {
return callback(createError(REV_CONFLICT));
}
txn.batch([{
prefix: stores.localStore,
type: 'del',
key: doc._id
}]);
var ret = {ok: true, id: doc._id, rev: '0-0'};
if (opts.ctx) {
// don't execute immediately
return callback(null, ret);
}
txn.execute(db, function (err) {
/* istanbul ignore if */
if (err) {
return callback(err);
}
callback(null, ret);
});
});
};
// close and delete open leveldb stores
api._destroy = function (opts, callback) {
var dbStore;
var leveldownName = functionName(leveldown);
/* istanbul ignore else */
if (dbStores.has(leveldownName)) {
dbStore = dbStores.get(leveldownName);
} else {
return callDestroy(name, callback);
}
/* istanbul ignore else */
if (dbStore.has(name)) {
levelChanges.removeAllListeners(name);
dbStore.get(name).close(function () {
dbStore.delete(name);
callDestroy(name, callback);
});
} else {
callDestroy(name, callback);
}
};
function callDestroy(name, cb) {
leveldown.destroy(name, cb);
}
}
export default LevelPouch;