// blob: 3e0bf3acdb0d359193dd56061893650431c8119f [file] [log] [blame]
import { createError, MISSING_STUB } from 'pouchdb-errors';
import {
preprocessAttachments,
processDocs,
isLocalId,
parseDoc
} from 'pouchdb-adapter-utils';
import {
compactTree
} from 'pouchdb-merge';
import {
ATTACH_AND_SEQ_STORE,
ATTACH_STORE,
BY_SEQ_STORE,
DOC_STORE,
LOCAL_STORE,
META_STORE
} from './constants';
import {
compactRevs,
decodeMetadata,
encodeMetadata,
idbError,
openTransactionSafely
} from './utils';
import changesHandler from './changesHandler';
/**
 * Writes a batch of documents to IndexedDB inside one readwrite transaction.
 *
 * Every non-local doc in `req.docs` is parsed in place with parseDoc. If any
 * parsed doc carries an `error` property, the first such doc is passed to
 * `callback` and nothing else happens. Otherwise the attachments are
 * preprocessed and startTransaction() is called.
 *
 * @param {Object} dbOpts - database options; `revs_limit` is forwarded to
 *   processDocs
 * @param {Object} req - request whose `docs` array holds the documents to
 *   write (entries are replaced in place by their parsed form)
 * @param {Object} opts - write options; `new_edits` is passed to parseDoc and
 *   the whole object is passed to processDocs
 * @param {Object} api - adapter instance; `_meta.blobSupport`, `_meta.name`
 *   and `auto_compaction` are read
 * @param {Object} idb - database handle passed to openTransactionSafely
 *   (presumably an IDBDatabase; confirm against the caller)
 * @param {Function} callback - called with an error, or with (null, results)
 *   when the transaction completes
 */
function idbBulkDocs(dbOpts, req, opts, api, idb, callback) {
// Aliases req.docs, so the parsing below mutates the caller's array.
var docInfos = req.docs;
// Transaction and object-store handles; assigned in startTransaction().
var txn;
var docStore;
var bySeqStore;
var attachStore;
var attachAndSeqStore;
var metaStore;
// First parsed doc that carries an `error` property, if any.
var docInfoError;
// Record read from META_STORE; its docCount is updated in
// updateDocCountIfReady().
var metaDoc;
for (var i = 0, len = docInfos.length; i < len; i++) {
var doc = docInfos[i];
// Docs with a local id are left unparsed.
if (doc._id && isLocalId(doc._id)) {
continue;
}
doc = docInfos[i] = parseDoc(doc, opts.new_edits);
if (doc.error && !docInfoError) {
docInfoError = doc;
}
}
// Report the first parse error before any transaction is opened.
if (docInfoError) {
return callback(docInfoError);
}
// Set to true by onAllDocsProcessed().
var allDocsProcessed = false;
// Accumulated in writeDoc(); added to metaDoc.docCount once ready.
var docCountDelta = 0;
// One slot per input doc; filled by index in finishDoc().
var results = new Array(docInfos.length);
// Metadata keyed by doc id; populated by fetchExistingDocs() and finishDoc().
var fetchedDocs = new Map();
// Set when stub verification fails, so complete() does not call back again.
var preconditionErrored = false;
// Attachment representation requested from preprocessAttachments.
var blobType = api._meta.blobSupport ? 'blob' : 'base64';
preprocessAttachments(docInfos, blobType, function (err) {
if (err) {
return callback(err);
}
startTransaction();
});
/**
 * Opens one readwrite transaction over all object stores, caches the store
 * handles in the enclosing scope, starts loading the meta doc and kicks off
 * stub-attachment verification. Existing docs are fetched only after the
 * verification succeeds.
 *
 * If the transaction cannot be opened, `callback` receives the error.
 */
function startTransaction() {
  var opened = openTransactionSafely(idb, [
    DOC_STORE,
    BY_SEQ_STORE,
    ATTACH_STORE,
    LOCAL_STORE,
    ATTACH_AND_SEQ_STORE,
    META_STORE
  ], 'readwrite');
  if (opened.error) {
    return callback(opened.error);
  }

  // Keep the meta doc around so the doc count can be written back later.
  function onMetaDocLoaded(event) {
    metaDoc = event.target.result;
    updateDocCountIfReady();
  }

  // A failed verification is reported here; the flag keeps complete() from
  // calling back a second time.
  function onAttachmentsVerified(err) {
    if (err) {
      preconditionErrored = true;
      return callback(err);
    }
    fetchExistingDocs();
  }

  txn = opened.txn;
  txn.onabort = idbError(callback);
  txn.ontimeout = idbError(callback);
  txn.oncomplete = complete;

  docStore = txn.objectStore(DOC_STORE);
  bySeqStore = txn.objectStore(BY_SEQ_STORE);
  attachStore = txn.objectStore(ATTACH_STORE);
  attachAndSeqStore = txn.objectStore(ATTACH_AND_SEQ_STORE);
  metaStore = txn.objectStore(META_STORE);

  metaStore.get(META_STORE).onsuccess = onMetaDocLoaded;
  verifyAttachments(onAttachmentsVerified);
}
/**
 * Completion hook passed to processDocs by idbProcessDocs(): sets the
 * allDocsProcessed flag and then tries to write the cached doc count, which
 * only happens once the meta doc has been loaded as well.
 */
function onAllDocsProcessed() {
allDocsProcessed = true;
updateDocCountIfReady();
}
/**
 * Hands the parsed docs and the metadata collected by fetchExistingDocs() to
 * processDocs, passing writeDoc and onAllDocsProcessed as its callbacks.
 */
function idbProcessDocs() {
  var revLimit = dbOpts.revs_limit;
  processDocs(
    revLimit,
    docInfos,
    api,
    fetchedDocs,
    txn,
    results,
    writeDoc,
    opts,
    onAllDocsProcessed
  );
}
/**
 * Adds the accumulated docCountDelta to the meta doc and stores it, but only
 * once the meta doc has been loaded AND every doc has been processed.
 *
 * The doc count is cached because it saves a lot of time in allDocs() and
 * info(), which is why this bookkeeping exists at all.
 */
function updateDocCountIfReady() {
  var ready = metaDoc && allDocsProcessed;
  if (ready) {
    metaDoc.docCount += docCountDelta;
    metaStore.put(metaDoc);
  }
}
/**
 * Reads the stored metadata of every non-local doc in the batch into
 * fetchedDocs and, once each doc has been accounted for, continues with
 * idbProcessDocs(). Does nothing for an empty batch.
 */
function fetchExistingDocs() {
  if (docInfos.length === 0) {
    return;
  }

  var accounted = 0;

  function markAccounted() {
    accounted += 1;
    if (accounted === docInfos.length) {
      idbProcessDocs();
    }
  }

  function onMetadataRead(event) {
    var stored = decodeMetadata(event.target.result);
    if (stored) {
      fetchedDocs.set(stored.id, stored);
    }
    markAccounted();
  }

  docInfos.forEach(function (info) {
    var isLocal = info._id && isLocalId(info._id);
    if (isLocal) {
      // local docs are skipped, but still count towards completion
      markAccounted();
      return;
    }
    docStore.get(info.metadata.id).onsuccess = onMetadataRead;
  });
}
/**
 * Transaction `oncomplete` handler: notifies the changes handler for this
 * database and reports the results. Stays silent when stub verification has
 * already reported an error through `callback`.
 */
function complete() {
  if (!preconditionErrored) {
    changesHandler.notify(api._meta.name);
    callback(null, results);
  }
}
/**
 * Checks that an attachment with the given digest exists in the attachment
 * store.
 *
 * @param {string} digest - digest used as the key in the attachment store
 * @param {Function} callback - called with no arguments when the attachment
 *   exists, or with a MISSING_STUB error (status 412) when it does not
 */
function verifyAttachment(digest, callback) {
  attachStore.get(digest).onsuccess = function (event) {
    if (event.target.result) {
      callback();
      return;
    }
    var missing = createError(MISSING_STUB,
      'unknown stub attachment with digest ' +
      digest);
    missing.status = 412;
    callback(missing);
  };
}
/**
 * Verifies every stub attachment in the batch against the attachment store.
 *
 * @param {Function} finish - called once all stubs have been checked, with
 *   the first error that was reported (or undefined); called immediately
 *   with no arguments when the batch contains no stubs
 */
function verifyAttachments(finish) {
  var stubDigests = [];
  docInfos.forEach(function (docInfo) {
    var attachments = docInfo.data && docInfo.data._attachments;
    if (!attachments) {
      return;
    }
    var names = Object.keys(attachments);
    for (var i = 0; i < names.length; i++) {
      var att = docInfo.data._attachments[names[i]];
      if (att.stub) {
        stubDigests.push(att.digest);
      }
    }
  });

  if (stubDigests.length === 0) {
    return finish();
  }

  var checked = 0;
  var firstError;
  stubDigests.forEach(function (digest) {
    verifyAttachment(digest, function (attErr) {
      if (!firstError && attErr) {
        firstError = attErr;
      }
      checked += 1;
      if (checked === stubDigests.length) {
        finish(firstError);
      }
    });
  });
}
/**
 * Per-document writer passed to processDocs by idbProcessDocs(). Stamps the
 * winning revision onto the metadata, prepares the doc body, records the
 * doc-count delta and then stores the doc, going through writeAttachments()
 * first when the doc carries attachments.
 *
 * @param {Object} docInfo - parsed doc with `metadata` and `data`
 * @param {string} winningRev - winning revision, stored on the metadata
 * @param {boolean} winningRevIsDeleted - stored as `metadata.deleted`
 * @param {boolean} newRevIsDeleted - when truthy, the doc body is marked
 *   `_deleted`
 * @param {boolean} isUpdate - forwarded to finishDoc / writeAttachments
 * @param {number} delta - change to apply to the cached doc count
 * @param {number} resultsIdx - slot in `results` for this doc
 * @param {Function} callback - forwarded to finishDoc / writeAttachments
 * @returns {*} whatever writeAttachments returns when the doc has
 *   attachments, otherwise undefined
 */
function writeDoc(docInfo, winningRev, winningRevIsDeleted, newRevIsDeleted,
                  isUpdate, delta, resultsIdx, callback) {
  docInfo.metadata.winningRev = winningRev;
  docInfo.metadata.deleted = winningRevIsDeleted;

  var doc = docInfo.data;
  doc._id = docInfo.metadata.id;
  doc._rev = docInfo.metadata.rev;
  if (newRevIsDeleted) {
    doc._deleted = true;
  }

  // Account for the doc before branching: the delta applies whether or not
  // the doc has attachments. Previously the attachment path returned early
  // and its delta was silently dropped from the cached doc count.
  docCountDelta += delta;
  updateDocCountIfReady();

  var hasAttachments = doc._attachments &&
    Object.keys(doc._attachments).length;
  if (hasAttachments) {
    return writeAttachments(docInfo, winningRev, winningRevIsDeleted,
      isUpdate, resultsIdx, callback);
  }

  finishDoc(docInfo, winningRev, winningRevIsDeleted,
    isUpdate, resultsIdx, callback);
}
/**
 * Stores the doc body in the by-seq store, then its encoded metadata in the
 * doc store, then records the result and the attachment mappings.
 *
 * The body is keyed by `_doc_id_rev` ("<id>::<rev>"); `_id` and `_rev` are
 * removed from it before it is stored.
 *
 * @param {Object} docInfo - parsed doc with `metadata` and `data`
 * @param {string} winningRev - passed to encodeMetadata
 * @param {boolean} winningRevIsDeleted - passed to encodeMetadata
 * @param {boolean} isUpdate - enables compaction when api.auto_compaction
 *   is set
 * @param {number} resultsIdx - slot in `results` for this doc
 * @param {Function} callback - forwarded to insertAttachmentMappings
 */
function finishDoc(docInfo, winningRev, winningRevIsDeleted,
                   isUpdate, resultsIdx, callback) {
  var record = docInfo.data;
  var metadata = docInfo.metadata;

  record._doc_id_rev = metadata.id + '::' + metadata.rev;
  delete record._id;
  delete record._rev;

  // Last step: publish the result and map the seq to its attachments.
  function onMetadataStored() {
    results[resultsIdx] = {
      ok: true,
      id: metadata.id,
      rev: metadata.rev
    };
    fetchedDocs.set(docInfo.metadata.id, docInfo.metadata);
    insertAttachmentMappings(docInfo, metadata.seq, callback);
  }

  // The body is stored: compact obsolete revs, then store the metadata
  // under the seq that the put produced.
  function onBodyStored(event) {
    var obsoleteRevs = docInfo.stemmedRevs || [];
    if (isUpdate && api.auto_compaction) {
      obsoleteRevs = obsoleteRevs.concat(compactTree(docInfo.metadata));
    }
    if (obsoleteRevs.length) {
      compactRevs(obsoleteRevs, docInfo.metadata.id, txn);
    }
    metadata.seq = event.target.result;
    // The current _rev is calculated from _rev_tree on read.
    var encoded = encodeMetadata(metadata, winningRev, winningRevIsDeleted);
    docStore.put(encoded).onsuccess = onMetadataStored;
  }

  // ConstraintError: this id/rev already exists, so it has to be updated
  // under its existing key instead of inserted (see #1638 for details).
  function onBodyRejected(event) {
    event.preventDefault(); // avoid transaction abort
    event.stopPropagation(); // avoid transaction onerror
    var lookup = bySeqStore.index('_doc_id_rev').getKey(record._doc_id_rev);
    lookup.onsuccess = function (found) {
      bySeqStore.put(record, found.target.result).onsuccess = onBodyStored;
    };
  }

  var insert = bySeqStore.put(record);
  insert.onsuccess = onBodyStored;
  insert.onerror = onBodyRejected;
}
/**
 * Saves every non-stub attachment of the doc and calls finishDoc() once all
 * attachments (stubs included) have been handled.
 *
 * For each saved attachment the inline `data` is removed from the doc and
 * `revpos` is set to the integer prefix of `winningRev`.
 *
 * @param {Object} docInfo - parsed doc whose `data._attachments` is written
 * @param {string} winningRev - source of `revpos`; forwarded to finishDoc
 * @param {boolean} winningRevIsDeleted - forwarded to finishDoc
 * @param {boolean} isUpdate - forwarded to finishDoc
 * @param {number} resultsIdx - forwarded to finishDoc
 * @param {Function} callback - forwarded to finishDoc
 */
function writeAttachments(docInfo, winningRev, winningRevIsDeleted,
                          isUpdate, resultsIdx, callback) {
  var names = Object.keys(docInfo.data._attachments);
  var handled = 0;

  function attachmentHandled() {
    handled += 1;
    if (handled === names.length) {
      finishDoc(docInfo, winningRev, winningRevIsDeleted,
        isUpdate, resultsIdx, callback);
    }
  }

  names.forEach(function (name) {
    var att = docInfo.data._attachments[name];
    if (att.stub) {
      // nothing to store for a stub, it only counts towards completion
      attachmentHandled();
      return;
    }
    var body = att.data;
    delete att.data;
    att.revpos = parseInt(winningRev, 10);
    saveAttachment(att.digest, body, attachmentHandled);
  });
}
// Records a seq -> attachment digest mapping for every attachment on the
// doc; these mappings are needed later during compaction. `callback` is
// called with no arguments once every mapping has been handled, or
// immediately when the doc has no attachments.
function insertAttachmentMappings(docInfo, seq, callback) {
  var attachments = docInfo.data._attachments || {};
  var names = Object.keys(attachments);
  if (names.length === 0) {
    return callback();
  }

  var handled = 0;

  function mappingHandled() {
    handled += 1;
    if (handled === names.length) {
      callback();
    }
  }

  // A failure here is a constraint error, which is ignored because this
  // docid/rev has already been associated with the digest
  // (e.g. when new_edits == false).
  function ignoreConstraintError(event) {
    event.preventDefault(); // avoid transaction abort
    event.stopPropagation(); // avoid transaction onerror
    mappingHandled();
  }

  // All puts are issued up front, so they run in parallel.
  names.forEach(function (name) {
    var digest = attachments[name].digest;
    var putReq = attachAndSeqStore.put({
      seq: seq,
      digestSeq: digest + '::' + seq
    });
    putReq.onsuccess = mappingHandled;
    putReq.onerror = ignoreConstraintError;
  });
}
/**
 * Stores an attachment body under its digest unless an entry with that
 * digest already exists.
 *
 * @param {string} digest - key of the attachment in the attachment store
 * @param {*} data - attachment body, stored as `body`
 * @param {Function} callback - called with no arguments when the attachment
 *   already exists, otherwise installed as the put request's success handler
 */
function saveAttachment(digest, data, callback) {
  attachStore.count(digest).onsuccess = function (event) {
    var existing = event.target.result;
    if (existing) {
      return callback(); // already exists
    }
    attachStore.put({
      digest: digest,
      body: data
    }).onsuccess = callback;
  };
}
}
export default idbBulkDocs;