blob: b9acb136866493124046f9332add33fa76667ec6 [file] [log] [blame]
'use strict';
var levelup = require('levelup');
var originalLeveldown = require('leveldown');
var sublevel = require('level-sublevel');
var through = require('through2').obj;
var errors = require('../deps/errors');
var merge = require('../merge');
var utils = require('../utils');
var migrate = require('../deps/migrate');
var vuvuzela = require('vuvuzela');
var Deque = require("double-ended-queue");
// names of the sublevels opened on the database in afterDBCreated()
var DOC_STORE = 'document-store';
var BY_SEQ_STORE = 'by-sequence';
var ATTACHMENT_STORE = 'attach-store';
var BINARY_STORE = 'attach-binary-store';
var LOCAL_STORE = 'local-store';
var META_STORE = 'meta-store';
// leveldb barks if we try to open a db multiple times
// so we cache opened connections here for initstore()
// (a Map keyed by leveldown.name whose values are Maps keyed by db name)
var dbStores = new utils.Map();
// key under which the value of update_seq is persisted.
// NOTE(review): an earlier note here said it lives in the by-sequence store
// (whose other keys are integers, so the name can never conflict); in this
// file it is read and written through the meta store.
var UPDATE_SEQ_KEY = '_local_last_update_seq';
// key under which the document count is persisted in the meta store
var DOC_COUNT_KEY = '_local_doc_count';
// key under which the instance uuid is persisted in the meta store
var UUID_KEY = '_local_uuid';
// prepended to the MD5 result to build an attachment digest
var MD5_PREFIX = 'md5-';
// custom value encoding backed by vuvuzela's stringify/parse; used for the
// document store, which holds the per-document metadata
var vuvuEncoding = {
encode: vuvuzela.stringify,
decode: vuvuzela.parse,
buffer: false,
type: 'cheap-json'
};
function LevelPouch(opts, callback) {
// --- adapter setup -------------------------------------------------------
// work on a copy so the caller's options object is never mutated
opts = utils.clone(opts);
var api = this;
// uuid of this database; assigned in afterDBCreated()
var instanceId;
// sublevel handles (docStore, bySeqStore, ...); filled in afterDBCreated()
var stores = {};
// the sublevel-wrapped levelup instance shared by every adapter on `name`
var db;
var name = opts.name;
if (typeof opts.createIfMissing === 'undefined') {
opts.createIfMissing = true;
}
// a custom backend may be supplied through opts.db
var leveldown = opts.db || originalLeveldown;
if (typeof leveldown.destroy !== 'function') {
// backends without a destroy() get a no-op that just calls back
leveldown.destroy = function (name, cb) { cb(); };
}
// per-backend cache of opened databases, keyed by database name
var dbStore;
if (dbStores.has(leveldown.name)) {
dbStore = dbStores.get(leveldown.name);
} else {
dbStore = new utils.Map();
dbStores.set(leveldown.name, dbStore);
}
if (dbStore.has(name)) {
// reuse the cached connection.
// NOTE(review): the cache entry is stored before levelup's open callback
// has run, so db._docCountQueue / db._writeQueue may not exist yet if a
// second adapter is created while the first is still opening -- confirm.
db = dbStore.get(name);
afterDBCreated();
} else {
dbStore.set(name, sublevel(levelup(name, opts, function (err) {
if (err) {
// opening failed: forget the cached handle and report the error
dbStore.delete(name);
return callback(err);
}
db = dbStore.get(name);
// doc-count bookkeeping shared through the db object; docCount stays -1
// until afterDBCreated() has loaded the persisted value
db._docCountQueue = {
queue : [],
running : false,
docCount : -1
};
// queue of pending locked operations, see writeLock()
db._writeQueue = new Deque();
if (opts.db || opts.noMigrate) {
afterDBCreated();
} else {
migrate.toSublevel(name, db, afterDBCreated);
}
})));
}
// Opens every sublevel, restores the persisted counters and the instance
// uuid, and finally hands the ready adapter to the constructor callback.
function afterDBCreated() {
  stores.docStore = db.sublevel(DOC_STORE, {valueEncoding: vuvuEncoding});
  stores.bySeqStore = db.sublevel(BY_SEQ_STORE, {valueEncoding: 'json'});
  stores.attachmentStore =
    db.sublevel(ATTACHMENT_STORE, {valueEncoding: 'json'});
  stores.binaryStore = db.sublevel(BINARY_STORE, {valueEncoding: 'binary'});
  stores.localStore = db.sublevel(LOCAL_STORE, {valueEncoding: 'json'});
  stores.metaStore = db.sublevel(META_STORE, {valueEncoding: 'json'});

  // step 3: load (or mint) the instance uuid, persist it, report readiness
  function restoreInstanceId() {
    stores.metaStore.get(UUID_KEY, function (uuidErr, storedUuid) {
      instanceId = uuidErr ? utils.uuid() : storedUuid;
      stores.metaStore.put(UUID_KEY, instanceId, function () {
        process.nextTick(function () {
          callback(null, api);
        });
      });
    });
  }

  // step 2: seed the cached doc count, then let the count queue run once
  function restoreDocCount() {
    stores.metaStore.get(DOC_COUNT_KEY, function (countErr, storedCount) {
      db._docCountQueue.docCount = countErr ? 0 : storedCount;
      // notify queue that the docCount is ready
      countDocs(function (queueErr) {
        if (queueErr) {
          api.emit('error', queueErr);
        }
        restoreInstanceId();
      });
    });
  }

  // step 1: seed the in-memory update_seq unless it is already set
  function restoreUpdateSeq() {
    stores.metaStore.get(UPDATE_SEQ_KEY, function (seqErr, storedSeq) {
      if (typeof db._updateSeq === 'undefined') {
        db._updateSeq = storedSeq || 0;
      }
      restoreDocCount();
    });
  }

  migrate.localAndMetaStores(db, stores, function () {
    restoreUpdateSeq();
  });
}
// Reports the current document count through callback(err, docCount).
//
// The request is always routed through the doc-count queue as a zero delta.
// The previous "use cached value" shortcut returned the number to the caller
// and never invoked `callback`; every caller in this file relies on the
// callback, so hitting that branch (queue non-empty, nothing running, count
// already initialised) left the caller waiting forever and the queued items
// unprocessed. Enqueueing also kicks the queue in that situation.
function countDocs(callback) {
  return incrementDocCount(0, callback);
}
// Processes the next queued doc-count request, one at a time.
//
// Does nothing while another request is running, while the queue is empty,
// or before the persisted count has been loaded (docCount === -1).
// Each queue item is {delta, callback}; the callback receives
// (err, docCount) where docCount is the count after the delta was applied
// (or the unchanged stored count when the delta is 0 or the write failed).
function applyNextDocCountDelta() {
  var state = db._docCountQueue;
  if (state.running || !state.queue.length || state.docCount === -1) {
    return;
  }
  state.running = true;
  var item = state.queue.shift();
  if (db.isClosed()) {
    // release the queue before failing the item; otherwise `running` stays
    // true forever and every later request is silently dropped
    state.running = false;
    item.callback(new Error('database is closed'));
    return applyNextDocCountDelta();
  }
  stores.metaStore.get(DOC_COUNT_KEY, function (err, docCount) {
    // a missing/unreadable counter is treated as zero documents
    docCount = !err ? docCount : 0;
    function complete(err) {
      state.docCount = docCount;
      item.callback(err, docCount);
      state.running = false;
      applyNextDocCountDelta();
    }
    if (item.delta === 0) {
      return complete();
    }
    var updatedCount = docCount + item.delta;
    stores.metaStore.put(DOC_COUNT_KEY, updatedCount, function (putErr) {
      if (!putErr) {
        // cache and report what was actually persisted, not the old value
        docCount = updatedCount;
      }
      complete(putErr);
    });
  });
}
// Queues a change of `delta` to the persisted document count and starts the
// queue if it is idle; `callback` is invoked once the request was handled.
function incrementDocCount(delta, callback) {
  var request = {delta : delta, callback : callback};
  db._docCountQueue.queue.push(request);
  applyNextDocCountDelta();
}
// Identifies this adapter.
api.type = function () {
  var adapterType = 'leveldb';
  return adapterType;
};
// Hands the instance uuid (loaded in afterDBCreated) to `callback`.
api._id = function (callback) {
  var currentId = instanceId;
  callback(null, currentId);
};
// Reports {doc_count, update_seq} for the database.
// update_seq comes from the meta store; if that read fails, the in-memory
// db._updateSeq is used instead.
api._info = function (callback) {
  function onDocCount(countErr, docCount) {
    if (countErr) {
      return callback(countErr);
    }
    stores.metaStore.get(UPDATE_SEQ_KEY, function (seqErr, storedSeq) {
      var updateSeq = seqErr ? db._updateSeq : storedSeq;
      return callback(null, {
        doc_count: docCount,
        update_seq: updateSeq
      });
    });
  }
  countDocs(onDocCount);
};
// Locked operations on the database run one after another through
// db._writeQueue, similar to how websql/idb works. This avoids problems such
// as compaction needing exclusive access while it updates stuff. In the
// future we can revisit this.
//
// writeLock(fun) returns a wrapper around `fun`. The wrapper expects the
// caller's callback as its LAST argument and swaps it for one that first
// forwards the result and then, on the next tick, starts the next queued
// operation.
function writeLock(fun) {
  return utils.getArguments(function (args) {
    var lastIdx = args.length - 1;
    var userCallback = args[lastIdx];

    function runNextQueued() {
      db._writeQueue.shift();
      if (db._writeQueue.length) {
        db._writeQueue.peekFront()();
      }
    }

    args[lastIdx] = utils.getArguments(function (cbArgs) {
      userCallback.apply(null, cbArgs);
      process.nextTick(function () {
        runNextQueued();
      });
    });

    db._writeQueue.push(function () {
      fun.apply(null, args);
    });

    // nothing else pending: start right away
    if (db._writeQueue.length === 1) {
      db._writeQueue.peekFront()();
    }
  });
}
// Left-pads `n` with zeros and keeps the last 16 characters, producing the
// fixed-width key used for the by-sequence store.
function formatSeq(n) {
  var padded = '0000000000000000' + n;
  return padded.substring(padded.length - 16);
}
// Turns a by-sequence key back into a number (base 10).
function parseSeq(s) {
  var seq = parseInt(s, 10);
  return seq;
}
// Builds the {doc, metadata} result for a raw {data, metadata} record.
// The document body gets its _id from the metadata; a stored _rev must match
// the metadata's rev, otherwise callback receives a 'wrong doc returned'
// error. Bodies without a _rev (we didn't always store it) inherit it.
function makeDoc(rawDoc, callback) {
  var body = rawDoc.data;
  body._id = rawDoc.metadata.id;
  var hasStoredRev = '_rev' in body;
  if (hasStoredRev && body._rev !== rawDoc.metadata.rev) {
    return callback(new Error('wrong doc returned'));
  }
  if (!hasStoredRev) {
    body._rev = rawDoc.metadata.rev;
  }
  callback(null, {doc: body, metadata: rawDoc.metadata});
}
// Fetches one document revision.
//
// id       - document id
// opts     - cloned before use; opts.rev selects a specific revision,
//            otherwise the winning revision is used
// callback - called with (err) or (null, {doc, metadata})
//
// While the two reads are in flight, the function listens for documents
// emitted by concurrent writes so that a just-written revision can be
// answered from the emitted record instead of the store.
api._get = function (id, opts, callback) {
  opts = utils.clone(opts);
  // records emitted for this id while the metadata read is pending
  var docChanged = [];
  function didDocChange(doc) {
    docChanged.push(doc);
  }
  db.on('pouchdb-id-' + id, didDocChange);
  stores.docStore.get(id, function (err, metadata) {
    db.removeListener('pouchdb-id-' + id, didDocChange);
    if (err || !metadata) {
      return callback(errors.MISSING_DOC);
    }
    if (utils.isDeleted(metadata) && !opts.rev) {
      return callback(errors.error(errors.MISSING_DOC, "deleted"));
    }
    var updated;
    function ifUpdate(doc) {
      updated = doc;
    }
    var rev = merge.winningRev(metadata);
    rev = opts.rev ? opts.rev : rev;
    var seq = metadata.rev_map[rev];
    var anyChanged = docChanged.filter(function (doc) {
      return doc.metadata.seq === seq;
    });
    if (anyChanged.length) {
      return makeDoc(anyChanged.pop(), callback);
    }
    // The writer emits 'pouchdb-' + formatSeq(seq) (the zero-padded key), so
    // the listener must use the same formatted name; listening on the raw
    // number never matched any emitted event.
    var seqKey = formatSeq(seq);
    var seqEvent = 'pouchdb-' + seqKey;
    db.on(seqEvent, ifUpdate);
    stores.bySeqStore.get(seqKey, function (err, doc) {
      db.removeListener(seqEvent, ifUpdate);
      if (updated) {
        return makeDoc(updated, callback);
      }
      if (!doc) {
        return callback(errors.MISSING_DOC);
      }
      if ('_id' in doc && doc._id !== metadata.id) {
        // this failing implies something very wrong
        return callback(new Error('wrong doc returned'));
      }
      doc._id = metadata.id;
      if ('_rev' in doc) {
        if (doc._rev !== rev) {
          // this failing implies something very wrong
          return callback(new Error('wrong doc returned'));
        }
      } else {
        // we didn't always store this
        doc._rev = rev;
      }
      return callback(null, {doc: doc, metadata: metadata});
    });
  });
};
// not technically part of the spec, but if putAttachment has its own
// method...
//
// Loads the binary for attachment.digest and passes it to callback.
// With opts.encode the data is base64 text; otherwise it is a Blob in the
// browser and the stored value elsewhere. A digest with no stored binary is
// treated as an empty attachment.
api._getAttachment = function (attachment, opts, callback) {
  var digest = attachment.digest;

  function emptyAttachment() {
    if (opts.encode) {
      return '';
    }
    if (process.browser) {
      return utils.createBlob([''], {type: attachment.content_type});
    }
    return new Buffer('');
  }

  stores.binaryStore.get(digest, function (err, attach) {
    if (err) {
      if (err.name === 'NotFoundError') {
        // Empty attachment
        return callback(null, emptyAttachment());
      }
      return callback(err);
    }
    var data;
    if (opts.encode) {
      data = utils.btoa(attach);
    } else if (process.browser) {
      data = utils.createBlob([utils.fixBinary(attach)],
        {type: attachment.content_type});
    } else {
      data = attach;
    }
    callback(null, data);
  });
};
api._bulkDocs = writeLock(function (req, opts, callback) {
var newEdits = opts.new_edits;
var results = new Array(req.docs.length);
var fetchedDocs = new utils.Map();
// parse the docs and give each a sequence number
var userDocs = req.docs;
var docInfos = userDocs.map(function (doc, i) {
if (doc._id && utils.isLocalId(doc._id)) {
return doc;
}
var newDoc = utils.parseDoc(doc, newEdits);
if (newDoc.metadata && !newDoc.metadata.rev_map) {
newDoc.metadata.rev_map = {};
}
return newDoc;
});
var infoErrors = docInfos.filter(function (doc) {
return doc.error;
});
if (infoErrors.length) {
return callback(infoErrors[0]);
}
// precondition test: a stub attachment must refer to a digest that is
// already present in the attachment store; otherwise callback receives an
// error with status 412
function verifyAttachment(digest, callback) {
  stores.attachmentStore.get(digest, function (levelErr) {
    if (!levelErr) {
      return callback();
    }
    var err = new Error('unknown stub attachment with digest ' + digest);
    err.status = 412;
    callback(err);
  });
}
// Checks every stub attachment of every incoming doc and calls
// finish(firstError) once all checks have reported back (finish() with no
// argument when there is nothing to check or nothing failed).
function verifyAttachments(finish) {
  var digests = [];
  userDocs.forEach(function (doc) {
    if (!doc || !doc._attachments) {
      return;
    }
    var atts = doc._attachments;
    Object.keys(atts).forEach(function (filename) {
      var att = atts[filename];
      if (att.stub) {
        digests.push(att.digest);
      }
    });
  });

  if (!digests.length) {
    return finish();
  }

  var remaining = digests.length;
  var firstErr;
  digests.forEach(function (digest) {
    verifyAttachment(digest, function (attErr) {
      if (attErr && !firstErr) {
        firstErr = attErr;
      }
      remaining -= 1;
      if (remaining === 0) {
        finish(firstErr);
      }
    });
  });
}
// Looks up the stored metadata for every non-local incoming doc and caches
// what it finds in fetchedDocs; callback() fires after the last lookup.
// Lookup errors are ignored: a doc that cannot be read is simply not cached.
function fetchExistingDocs(callback) {
  var pending = docInfos.length;
  if (!pending) {
    return callback();
  }

  function oneDone() {
    pending -= 1;
    if (pending === 0) {
      callback();
    }
  }

  docInfos.forEach(function (docInfo) {
    var isLocal = docInfo._id && utils.isLocalId(docInfo._id);
    if (isLocal) {
      return oneDone(); // skip local docs
    }
    var id = docInfo.metadata.id;
    stores.docStore.get(id, function (err, oldDoc) {
      if (oldDoc) {
        fetchedDocs.set(id, oldDoc);
      }
      oneDone();
    });
  });
}
// Hands the parsed docs to the shared utils.processDocs routine, which
// calls writeDoc for the documents to store and complete when finished.
function processDocs() {
  utils.processDocs(
    docInfos,
    api,
    fetchedDocs,
    null,
    results,
    writeDoc,
    opts,
    complete
  );
}
// Stores one parsed document: first its attachments, then the document body
// (by-sequence store) and metadata (document store) in a single batch.
//
// doc        - {data, metadata} as produced by utils.parseDoc
// delta      - change to apply to the persisted document count
// winningRev - not used by this implementation
// deleted    - when truthy the stored body is flagged _deleted
// callback2  - invoked when this document is done; the outcome itself is
//              recorded in results[resultsIdx]
// isUpdate   - not used by this implementation
// resultsIdx - slot in `results` that receives the doc or the error
function writeDoc(doc, delta, winningRev, deleted, callback2, isUpdate,
                  resultsIdx) {
  var err = null;
  var recv = 0;

  doc.data._id = doc.metadata.id;
  doc.data._rev = doc.metadata.rev;

  if (deleted) {
    doc.data._deleted = true;
  }

  var attachments = doc.data._attachments ?
    Object.keys(doc.data._attachments) :
    [];

  // first attachment error aborts this doc; otherwise write the doc once
  // every attachment has reported back
  function collectResults(attachmentErr) {
    if (!err) {
      if (attachmentErr) {
        err = attachmentErr;
        callback2(err);
      } else if (recv === attachments.length) {
        finish();
      }
    }
  }

  function attachmentSaved(err) {
    recv++;
    collectResults(err);
  }

  function onMD5Load(doc, key, data, attachmentSaved) {
    return function (result) {
      saveAttachment(doc, MD5_PREFIX + result, key, data, attachmentSaved);
    };
  }

  function onLoadEnd(doc, key, attachmentSaved) {
    return function (data) {
      utils.MD5(data).then(
        onMD5Load(doc, key, data, attachmentSaved)
      );
    };
  }

  for (var i = 0; i < attachments.length; i++) {
    var key = attachments[i];
    var att = doc.data._attachments[key];

    if (att.stub) {
      // still need to update the refs mapping
      var id = doc.data._id;
      var rev = doc.data._rev;
      saveAttachmentRefs(id, rev, att.digest, attachmentSaved);
      continue;
    }
    var data;
    if (typeof att.data === 'string') {
      try {
        data = utils.atob(att.data);
      } catch (e) {
        // note: this reports to the bulkDocs callback (not callback2), so
        // a malformed attachment fails the whole request
        callback(utils.extend({}, errors.BAD_ARG,
          {reason: "Attachments need to be base64 encoded"}));
        return;
      }
    } else if (!process.browser) {
      data = att.data;
    } else { // browser
      utils.readAsBinaryString(att.data,
        onLoadEnd(doc, key, attachmentSaved));
      continue;
    }
    utils.MD5(data).then(
      onMD5Load(doc, key, data, attachmentSaved)
    );
  }

  function finish() {
    var seq = doc.metadata.rev_map[doc.metadata.rev];
    if (!seq) {
      // check that there aren't any existing revisions with the same
      // revision id, else we shouldn't increment updateSeq
      seq = ++db._updateSeq;
    }
    doc.metadata.rev_map[doc.metadata.rev] = doc.metadata.seq = seq;
    var seqKey = formatSeq(seq);
    // announce the record to in-flight readers before and after the write
    db.emit('pouchdb-id-' + doc.metadata.id, doc);
    db.emit('pouchdb-' + seqKey, doc);
    db.batch([{
      key: seqKey,
      value: doc.data,
      prefix: stores.bySeqStore,
      type: 'put',
      valueEncoding: 'json'
    }, {
      key: doc.metadata.id,
      value: doc.metadata,
      prefix: stores.docStore,
      type: 'put',
      valueEncoding: vuvuEncoding
    }], function (batchErr) {
      if (!batchErr) {
        db.emit('pouchdb-id-' + doc.metadata.id, doc);
        db.emit('pouchdb-' + seqKey, doc);
      }
      return stores.metaStore.put(UPDATE_SEQ_KEY, db._updateSeq,
        function (putErr) {
          // a failed batch must not be reported as a stored document; the
          // batch error used to be shadowed and lost here
          var writeErr = batchErr || putErr;
          if (writeErr) {
            results[resultsIdx] = writeErr;
          } else {
            results[resultsIdx] = doc;
          }
          // nothing was written when the batch failed, so the document
          // count must stay as it is
          incrementDocCount(batchErr ? 0 : delta, callback2);
        });
    });
  }

  if (!attachments.length) {
    finish();
  }
}
// Records that revision `rev` of document `id` references `digest`.
// callback(err) on failure, otherwise callback(null, newAttachment) where
// newAttachment is true when the digest had no entry before.
function saveAttachmentRefs(id, rev, digest, callback) {
  stores.attachmentStore.get(digest, function (getErr, oldAtt) {
    if (getErr && getErr.name !== 'NotFoundError') {
      return callback(getErr);
    }
    // any error left at this point is a NotFoundError
    var newAttachment = getErr ? true : false;
    var ref = [id, rev].join('@');
    var newAtt = {};

    if (!oldAtt) {
      newAtt.refs = {};
      newAtt.refs[ref] = true;
    } else if (oldAtt.refs) {
      // only update references if this attachment already has them
      // since we cannot migrate old style attachments here without
      // doing a full db scan for references
      newAtt.refs = oldAtt.refs;
      newAtt.refs[ref] = true;
    }

    stores.attachmentStore.put(digest, newAtt, function (putErr) {
      if (putErr) {
        return callback(putErr);
      }
      callback(null, newAttachment);
    });
  });
}
// Replaces the inline data of attachment `key` with its digest and length,
// registers the reference, and stores the binary when it is new and
// non-empty.
function saveAttachment(docInfo, digest, key, data, callback) {
  var att = docInfo.data._attachments[key];
  delete att.data;
  att.digest = digest;
  att.length = data.length;

  var id = docInfo.metadata.id;
  var rev = docInfo.metadata.rev;

  saveAttachmentRefs(id, rev, digest, function (err, newAttachment) {
    // no binary write on error, for empty attachments, or when the digest
    // is already stored (small optimization - don't bother writing it again)
    var skipBinaryWrite = err || data.length === 0 || !newAttachment;
    if (skipBinaryWrite) {
      return callback(err);
    }
    // doing this in batch causes a test to fail, wtf?
    stores.binaryStore.put(digest, data, function (putErr) {
      callback(putErr);
    });
  });
}
// Final step of bulkDocs: converts the collected per-document results into
// the response rows, notifies change listeners and calls back on next tick.
function complete(err) {
  if (err) {
    return callback(err);
  }

  function toResponseRow(result) {
    // results with no own enumerable keys are reported as a bare success
    if (!Object.keys(result).length) {
      return {
        ok: true
      };
    }
    if (result.error) {
      return result;
    }
    var metadata = result.metadata;
    var rev = merge.winningRev(metadata);
    return {
      ok: true,
      id: metadata.id,
      rev: rev
    };
  }

  var aresults = results.map(function (result) {
    return toResponseRow(result);
  });

  LevelPouch.Changes.notify(name);
  process.nextTick(function () {
    callback(null, aresults);
  });
}
// start the pipeline: stub check -> fetch existing docs -> process/write
verifyAttachments(function (stubErr) {
  if (!stubErr) {
    fetchExistingDocs(processDocs);
    return;
  }
  return callback(stubErr);
});
});
// Lists documents from the document store.
//
// opts (cloned before use) may carry startkey / endkey / key, descending,
// skip, limit, inclusive_end, include_docs, conflicts, attachments and
// deleted ('ok' keeps deleted documents in the output).
// callback receives (err) or (null, {total_rows, offset, rows}).
api._allDocs = function (opts, callback) {
opts = utils.clone(opts);
// the document count is fetched first because it is reported as total_rows
countDocs(function (err, docCount) {
if (err) {
return callback(err);
}
var readstreamOpts = {};
var skip = opts.skip || 0;
if (opts.startkey) {
readstreamOpts.start = opts.startkey;
}
if (opts.endkey) {
readstreamOpts.end = opts.endkey;
}
if (opts.key) {
// a single key is a range that starts and ends on that key
readstreamOpts.start = readstreamOpts.end = opts.key;
}
if (opts.descending) {
readstreamOpts.reverse = true;
// switch start and ends
var tmp = readstreamOpts.start;
readstreamOpts.start = readstreamOpts.end;
readstreamOpts.end = tmp;
}
// -1 means "no limit": the `limit-- === 0` test below can then never match
var limit;
if (typeof opts.limit === 'number') {
limit = opts.limit;
} else {
limit = -1;
}
if (limit === 0 ||
('start' in readstreamOpts && 'end' in readstreamOpts &&
readstreamOpts.start > readstreamOpts.end)) {
// should return 0 results when start is greater than end.
// normally level would "fix" this for us by reversing the order,
// so short-circuit instead
// NOTE(review): offset is opts.skip as passed, i.e. undefined when the
// caller gave no skip (the local `skip` defaults to 0) -- confirm intent
return callback(null, {
total_rows: docCount,
offset: opts.skip,
rows: []
});
}
var results = [];
var docstream = stores.docStore.readStream(readstreamOpts);
// Replaces every attachment of every collected row with an object holding
// its digest, content_type and base64 `data`; a digest without stored
// binary yields empty data. Resolves when all rows are done.
function fetchAttachments() {
return utils.Promise.all(results.map(function (row) {
if (row.doc && row.doc._attachments) {
var attNames = Object.keys(row.doc._attachments);
return utils.Promise.all(attNames.map(function (att) {
var attObj = row.doc._attachments[att];
return new utils.Promise(function (resolve, reject) {
stores.binaryStore.get(attObj.digest, function (err, buffer) {
var base64 = '';
if (err && err.name !== 'NotFoundError') {
return reject(err);
} else if (!err) {
base64 = utils.btoa(buffer);
}
row.doc._attachments[att] = utils.extend(
utils.pick(attObj, ['digest', 'content_type']),
{data: base64}
);
resolve();
});
});
}));
}
}));
}
// transform step: one call per document-store entry
var throughStream = through(function (entry, _, next) {
if (!utils.isDeleted(entry.value)) {
// skip and limit only count documents that are not deleted
if (skip-- > 0) {
next();
return;
} else if (limit-- === 0) {
// limit reached: stop reading; the 'unpipe' handler ends this stream
docstream.unpipe();
docstream.destroy();
next();
return;
}
} else if (opts.deleted !== 'ok') {
next();
return;
}
// builds the row for one document and appends it to `results`
function allDocsInner(metadata, data) {
var doc = {
id: metadata.id,
key: metadata.id,
value: {
rev: merge.winningRev(metadata)
}
};
if (opts.include_docs) {
// NOTE(review): `data` is undefined when the by-sequence read failed, in
// which case the next assignment would throw -- confirm this cannot occur
doc.doc = data;
doc.doc._rev = doc.value.rev;
if (opts.conflicts) {
doc.doc._conflicts = merge.collectConflicts(metadata);
}
// attachments are reported as stubs here; fetchAttachments() may later
// replace them with the actual data
for (var att in doc.doc._attachments) {
if (doc.doc._attachments.hasOwnProperty(att)) {
doc.doc._attachments[att].stub = true;
}
}
}
if (opts.inclusive_end === false && metadata.id === opts.endkey) {
return next();
} else if (utils.isDeleted(metadata)) {
if (opts.deleted === 'ok') {
doc.value.deleted = true;
doc.doc = null;
} else {
return next();
}
}
results.push(doc);
next();
}
var metadata = entry.value;
if (opts.include_docs) {
// load the body stored under the winning revision's sequence number
var seq = metadata.rev_map[merge.winningRev(metadata)];
stores.bySeqStore.get(formatSeq(seq), function (err, data) {
allDocsInner(metadata, data);
});
}
else {
allDocsInner(metadata);
}
}, function (next) {
// flush step: optionally inline attachment data, then report the rows;
// a rejection from fetchAttachments() is passed to callback as the error
utils.Promise.resolve().then(function () {
return opts.attachments && fetchAttachments();
}).then(function () {
callback(null, {
total_rows: docCount,
offset: opts.skip,
rows: results
});
}, callback);
next();
}).on('unpipe', function () {
throughStream.end();
});
docstream.on('error', callback);
docstream.pipe(throughStream);
});
};
// Streams changes from the by-sequence store.
//
// opts (cloned before use):
//   continuous    - register a listener with LevelPouch.Changes instead of
//                   reading once
//   since         - sequence after which to start (ascending reads only)
//   descending    - read the store in reverse
//   limit         - maximum number of reported changes (values > 0 only)
//   returnDocs    - collect changes into the final result (default true)
//   processChange - builds the change object for a document
//   onChange      - called for every reported change
//   complete      - called with (null, {results, last_seq}) when finished,
//                   unless the feed was cancelled
// Returns an object with a cancel() method.
api._changes = function (opts) {
  opts = utils.clone(opts);

  if (opts.continuous) {
    var id = name + ':' + utils.uuid();
    LevelPouch.Changes.addListener(name, id, api, opts);
    LevelPouch.Changes.notify(name);
    return {
      cancel: function () {
        LevelPouch.Changes.removeListener(name, id);
      }
    };
  }

  var descending = opts.descending;
  var results = [];
  var last_seq = 0;
  var called = 0;
  // guards complete() against running more than once, see below
  var completed = false;
  var streamOpts = {
    reverse: descending
  };
  var limit;
  if ('limit' in opts && opts.limit > 0) {
    limit = opts.limit;
  }
  if (!streamOpts.reverse) {
    streamOpts.start = formatSeq(opts.since ? opts.since + 1 : 0);
  }
  var filter = utils.filterChange(opts);
  var returnDocs;
  if ('returnDocs' in opts) {
    returnDocs = opts.returnDocs;
  } else {
    returnDocs = true;
  }

  function complete() {
    // unpipe() below synchronously fires the 'unpipe' handler, which calls
    // complete() again; without this guard opts.complete could be invoked
    // twice for a single feed
    if (completed) {
      return;
    }
    completed = true;
    opts.done = true;
    if (returnDocs && opts.limit) {
      if (opts.limit < results.length) {
        results.length = opts.limit;
      }
    }
    changeStream.unpipe(throughStream);
    changeStream.destroy();
    if (!opts.continuous && !opts.cancelled) {
      opts.complete(null, {results: results, last_seq: last_seq});
    }
  }

  var changeStream = stores.bySeqStore.readStream(streamOpts);
  var throughStream = through(function (data, _, next) {
    if (limit && called >= limit) {
      complete();
      return next();
    }
    if (opts.cancelled || opts.done) {
      return next();
    }

    stores.docStore.get(data.value._id, function (err, metadata) {
      // entries whose metadata cannot be loaded are skipped instead of
      // dereferencing an undefined `metadata`
      if (opts.cancelled || opts.done || db.isClosed() ||
          err || !metadata || utils.isLocalId(metadata.id)) {
        return next();
      }
      var doc = data.value;
      doc._rev = merge.winningRev(metadata);
      var change = opts.processChange(doc, metadata, opts);
      change.seq = metadata.seq;

      if (last_seq < metadata.seq) {
        last_seq = metadata.seq;
      }

      // Ensure duplicated dont overwrite winning rev
      if (parseSeq(data.key) === metadata.rev_map[change.doc._rev] &&
          filter(change)) {
        called++;

        utils.call(opts.onChange, change);

        if (returnDocs) {
          results.push(change);
        }
      }
      next();
    });
  }, function (next) {
    if (opts.cancelled) {
      return next();
    }
    if (returnDocs && opts.limit) {
      if (opts.limit < results.length) {
        results.length = opts.limit;
      }
    }

    next();
  }).on('unpipe', function () {
    throughStream.end();
    complete();
  });
  changeStream.pipe(throughStream);
  return {
    cancel: function () {
      opts.cancelled = true;
      complete();
    }
  };
};
// Closes the underlying database and drops it from the connection cache.
// Fails with errors.NOT_OPEN when the database is already closed.
api._close = function (callback) {
  if (db.isClosed()) {
    return callback(errors.NOT_OPEN);
  }
  db.close(function (closeErr) {
    if (!closeErr) {
      dbStore.delete(name);
      callback();
      return;
    }
    callback(closeErr);
  });
};
// Passes the stored rev_tree of `docId` to callback; any read error is
// reported as errors.MISSING_DOC.
api._getRevisionTree = function (docId, callback) {
  stores.docStore.get(docId, function (getErr, metadata) {
    if (!getErr) {
      callback(null, metadata.rev_tree);
      return;
    }
    callback(errors.MISSING_DOC);
  });
};
// Compacts the given revisions of one document (runs under the write lock).
//
// docId    - id of the document
// revs     - revision ids ('pos-hash') to compact
// callback - called with (err) or the result of the final batch
//
// The revisions are marked 'missing' in the rev tree, their bodies are
// deleted from the by-sequence store, and attachments that are referenced
// by no other revision afterwards are deleted as well. Everything is
// written in one batch at the end.
api._doCompaction = writeLock(function (docId, revs, callback) {
  if (!revs.length) {
    return callback();
  }
  stores.docStore.get(docId, function (err, metadata) {
    if (err) {
      return callback(err);
    }
    var seqs = metadata.rev_map; // map from rev to seq
    merge.traverseRevTree(metadata.rev_tree, function (isLeaf, pos,
                                                       revHash, ctx, opts) {
      var rev = pos + '-' + revHash;
      if (revs.indexOf(rev) !== -1) {
        opts.status = 'missing';
      }
    });

    var batch = [];
    batch.push({
      key: metadata.id,
      value: metadata,
      type: 'put',
      valueEncoding: vuvuEncoding,
      prefix: stores.docStore
    });

    // digests referenced by the compacted revisions
    var digestMap = {};
    var numDone = 0;
    var overallErr;
    // called exactly once per entry of `revs`
    function checkDone(err) {
      if (err) {
        overallErr = err;
      }
      if (++numDone === revs.length) { // done
        if (overallErr) {
          // report the recorded error; `err` belongs to the last call only
          // and may be undefined even though an earlier lookup failed
          return callback(overallErr);
        }
        deleteOrphanedAttachments();
      }
    }

    function finish(err) {
      if (err) {
        return callback(err);
      }
      db.batch(batch, callback);
    }

    function deleteOrphanedAttachments() {
      var possiblyOrphanedAttachments = Object.keys(digestMap);
      if (!possiblyOrphanedAttachments.length) {
        return finish();
      }
      var numDone = 0;
      var overallErr;
      function checkDone(err) {
        if (err) {
          overallErr = err;
        }
        if (++numDone === possiblyOrphanedAttachments.length) {
          finish(overallErr);
        }
      }
      var refsToDelete = new utils.Map();
      revs.forEach(function (rev) {
        refsToDelete.set(docId + '@' + rev, true);
      });
      possiblyOrphanedAttachments.forEach(function (digest) {
        stores.attachmentStore.get(digest, function (err, attData) {
          if (err) {
            if (err.name === 'NotFoundError') {
              return checkDone();
            } else {
              return checkDone(err);
            }
          }
          var refs = Object.keys(attData.refs || {}).filter(function (ref) {
            return !refsToDelete.has(ref);
          });
          var newRefs = {};
          refs.forEach(function (ref) {
            newRefs[ref] = true;
          });
          if (refs.length) { // not orphaned
            batch.push({
              key: digest,
              type: 'put',
              valueEncoding: 'json',
              value: {refs: newRefs},
              prefix: stores.attachmentStore
            });
          } else { // orphaned, can safely delete
            batch = batch.concat([{
              key: digest,
              type: 'del',
              prefix: stores.attachmentStore
            }, {
              key: digest,
              type: 'del',
              prefix: stores.binaryStore
            }]);
          }
          checkDone();
        });
      });
    }

    revs.forEach(function (rev) {
      var seq = seqs[rev];
      if (!seq) {
        // nothing is stored for this revision, but it still has to be
        // counted: otherwise numDone never reaches revs.length, the
        // callback never fires and the write lock is never released
        return checkDone();
      }
      batch.push({
        key: formatSeq(seq),
        type: 'del',
        prefix: stores.bySeqStore
      });
      stores.bySeqStore.get(formatSeq(seq), function (err, doc) {
        if (err) {
          if (err.name === 'NotFoundError') {
            return checkDone();
          } else {
            return checkDone(err);
          }
        }
        var atts = Object.keys(doc._attachments || {});
        atts.forEach(function (attName) {
          var digest = doc._attachments[attName].digest;
          digestMap[digest] = true;
        });
        checkDone();
      });
    });
  });
});
// Reads a local document; any read error is reported as errors.MISSING_DOC.
api._getLocal = function (id, callback) {
  stores.localStore.get(id, function (getErr, localDoc) {
    if (!getErr) {
      callback(null, localDoc);
      return;
    }
    callback(errors.MISSING_DOC);
  });
};
// Write-locked wrapper around _putLocalNoLock; `opts` is optional and not
// forwarded.
api._putLocal = writeLock(function (doc, opts, callback) {
  var done = (typeof opts === 'function') ? opts : callback;
  api._putLocalNoLock(doc, done);
});
// the NoLock version is for use by bulkDocs
//
// Stores a local document. The supplied _rev must match the stored one
// (errors.REV_CONFLICT otherwise); the new revision is '0-<n+1>', or '0-1'
// when no _rev was supplied. On success callback gets {ok, id, rev}.
api._putLocalNoLock = function (doc, opts, callback) {
  var done = (typeof opts === 'function') ? opts : callback;

  delete doc._revisions; // ignore this, trust the rev
  var id = doc._id;
  var oldRev = doc._rev;

  stores.localStore.get(id, function (getErr, stored) {
    var missingButRevGiven = getErr && oldRev;
    var revMismatch = stored && stored._rev !== oldRev;
    if (missingButRevGiven || revMismatch) {
      return done(errors.REV_CONFLICT);
    }

    var nextGeneration = oldRev ?
      parseInt(oldRev.split('-')[1], 10) + 1 :
      1;
    doc._rev = '0-' + nextGeneration;

    stores.localStore.put(id, doc, function (putErr) {
      if (putErr) {
        return done(putErr);
      }
      done(null, {ok: true, id: doc._id, rev: doc._rev});
    });
  });
};
// Write-locked wrapper around _removeLocalNoLock.
api._removeLocal = writeLock(function (doc, callback) {
  var done = callback;
  api._removeLocalNoLock(doc, done);
});
// the NoLock version is for use by bulkDocs
//
// Deletes a local document after checking that doc._rev matches the stored
// revision (errors.REV_CONFLICT otherwise). Read and delete errors are
// passed through; on success callback gets {ok, id, rev: '0-0'}.
api._removeLocalNoLock = function (doc, callback) {
  var id = doc._id;
  stores.localStore.get(id, function (getErr, stored) {
    if (getErr) {
      return callback(getErr);
    }
    if (stored._rev !== doc._rev) {
      return callback(errors.REV_CONFLICT);
    }
    stores.localStore.del(doc._id, function (delErr) {
      if (delErr) {
        return callback(delErr);
      }
      callback(null, {ok: true, id: doc._id, rev: '0-0'});
    });
  });
};
}
// Tells whether this adapter can be used in the current environment: a
// `process` global must exist and must not be flagged as a browser shim.
// The typeof check avoids a ReferenceError where no `process` global is
// defined at all.
LevelPouch.valid = function () {
  return typeof process !== 'undefined' && !process.browser;
};
// close and delete open leveldb stores
//
// Closes a cached connection for `name` (if there is one), removes its
// change listeners and cache entry, then asks the backend to destroy the
// database. opts.db selects a custom backend.
LevelPouch.destroy = utils.toPromise(function (name, opts, callback) {
  opts = utils.clone(opts);
  var leveldown = opts.db || originalLeveldown;

  function callDestroy(name, cb) {
    if (typeof leveldown.destroy !== 'function') {
      // backend cannot destroy anything: just report completion
      process.nextTick(callback);
      return;
    }
    leveldown.destroy(name, cb);
  }

  // no cache for this backend at all: nothing to close
  if (!dbStores.has(leveldown.name)) {
    return callDestroy(name, callback);
  }
  var dbStore = dbStores.get(leveldown.name);

  if (!dbStore.has(name)) {
    callDestroy(name, callback);
    return;
  }

  LevelPouch.Changes.removeAllListeners(name);
  dbStore.get(name).close(function () {
    dbStore.delete(name);
    callDestroy(name, callback);
  });
});
// NOTE(review): presumably tells the PouchDB core not to prefix database
// names for this adapter -- confirm against the core's use of use_prefix
LevelPouch.use_prefix = false;
// change-notification hub shared by all instances; this file calls its
// addListener / removeListener / removeAllListeners / notify methods
LevelPouch.Changes = new utils.Changes();
// the adapter constructor is the module's only export
module.exports = LevelPouch;