blob: bcf27b5332051a3bfceeea51946df6f62b37aca0 [file] [log] [blame]
import {
preprocessAttachments,
isLocalId,
processDocs,
parseDoc
} from 'pouchdb-adapter-utils';
import {
compactTree
} from 'pouchdb-merge';
import {
safeJsonParse,
safeJsonStringify
} from 'pouchdb-json';
import {
MISSING_STUB,
createError
} from 'pouchdb-errors';
import {
DOC_STORE,
BY_SEQ_STORE,
ATTACH_STORE,
ATTACH_AND_SEQ_STORE
} from './constants';
import {
select,
stringifyDoc,
compactRevs,
websqlError,
escapeBlob
} from './utils';
function websqlBulkDocs(dbOpts, req, opts, api, db, websqlChanges, callback) {
var newEdits = opts.new_edits;
var userDocs = req.docs;
// Parse the docs, give them a sequence number for the result
var docInfos = userDocs.map(function (doc) {
if (doc._id && isLocalId(doc._id)) {
return doc;
}
var newDoc = parseDoc(doc, newEdits);
return newDoc;
});
var docInfoErrors = docInfos.filter(function (docInfo) {
return docInfo.error;
});
if (docInfoErrors.length) {
return callback(docInfoErrors[0]);
}
var tx;
var results = new Array(docInfos.length);
var fetchedDocs = new Map();
var preconditionErrored;
function complete() {
if (preconditionErrored) {
return callback(preconditionErrored);
}
websqlChanges.notify(api._name);
callback(null, results);
}
function verifyAttachment(digest, callback) {
var sql = 'SELECT count(*) as cnt FROM ' + ATTACH_STORE +
' WHERE digest=?';
tx.executeSql(sql, [digest], function (tx, result) {
if (result.rows.item(0).cnt === 0) {
var err = createError(MISSING_STUB,
'unknown stub attachment with digest ' +
digest);
callback(err);
} else {
callback();
}
});
}
function verifyAttachments(finish) {
var digests = [];
docInfos.forEach(function (docInfo) {
if (docInfo.data && docInfo.data._attachments) {
Object.keys(docInfo.data._attachments).forEach(function (filename) {
var att = docInfo.data._attachments[filename];
if (att.stub) {
digests.push(att.digest);
}
});
}
});
if (!digests.length) {
return finish();
}
var numDone = 0;
var err;
function checkDone() {
if (++numDone === digests.length) {
finish(err);
}
}
digests.forEach(function (digest) {
verifyAttachment(digest, function (attErr) {
if (attErr && !err) {
err = attErr;
}
checkDone();
});
});
}
function writeDoc(docInfo, winningRev, winningRevIsDeleted, newRevIsDeleted,
isUpdate, delta, resultsIdx, callback) {
function finish() {
var data = docInfo.data;
var deletedInt = newRevIsDeleted ? 1 : 0;
var id = data._id;
var rev = data._rev;
var json = stringifyDoc(data);
var sql = 'INSERT INTO ' + BY_SEQ_STORE +
' (doc_id, rev, json, deleted) VALUES (?, ?, ?, ?);';
var sqlArgs = [id, rev, json, deletedInt];
// map seqs to attachment digests, which
// we will need later during compaction
function insertAttachmentMappings(seq, callback) {
var attsAdded = 0;
var attsToAdd = Object.keys(data._attachments || {});
if (!attsToAdd.length) {
return callback();
}
function checkDone() {
if (++attsAdded === attsToAdd.length) {
callback();
}
return false; // ack handling a constraint error
}
function add(att) {
var sql = 'INSERT INTO ' + ATTACH_AND_SEQ_STORE +
' (digest, seq) VALUES (?,?)';
var sqlArgs = [data._attachments[att].digest, seq];
tx.executeSql(sql, sqlArgs, checkDone, checkDone);
// second callback is for a constaint error, which we ignore
// because this docid/rev has already been associated with
// the digest (e.g. when new_edits == false)
}
for (var i = 0; i < attsToAdd.length; i++) {
add(attsToAdd[i]); // do in parallel
}
}
tx.executeSql(sql, sqlArgs, function (tx, result) {
var seq = result.insertId;
insertAttachmentMappings(seq, function () {
dataWritten(tx, seq);
});
}, function () {
// constraint error, recover by updating instead (see #1638)
var fetchSql = select('seq', BY_SEQ_STORE, null,
'doc_id=? AND rev=?');
tx.executeSql(fetchSql, [id, rev], function (tx, res) {
var seq = res.rows.item(0).seq;
var sql = 'UPDATE ' + BY_SEQ_STORE +
' SET json=?, deleted=? WHERE doc_id=? AND rev=?;';
var sqlArgs = [json, deletedInt, id, rev];
tx.executeSql(sql, sqlArgs, function (tx) {
insertAttachmentMappings(seq, function () {
dataWritten(tx, seq);
});
});
});
return false; // ack that we've handled the error
});
}
function collectResults(attachmentErr) {
if (!err) {
if (attachmentErr) {
err = attachmentErr;
callback(err);
} else if (recv === attachments.length) {
finish();
}
}
}
var err = null;
var recv = 0;
docInfo.data._id = docInfo.metadata.id;
docInfo.data._rev = docInfo.metadata.rev;
var attachments = Object.keys(docInfo.data._attachments || {});
if (newRevIsDeleted) {
docInfo.data._deleted = true;
}
function attachmentSaved(err) {
recv++;
collectResults(err);
}
attachments.forEach(function (key) {
var att = docInfo.data._attachments[key];
if (!att.stub) {
var data = att.data;
delete att.data;
att.revpos = parseInt(winningRev, 10);
var digest = att.digest;
saveAttachment(digest, data, attachmentSaved);
} else {
recv++;
collectResults();
}
});
if (!attachments.length) {
finish();
}
function dataWritten(tx, seq) {
var id = docInfo.metadata.id;
var revsToCompact = docInfo.stemmedRevs || [];
if (isUpdate && api.auto_compaction) {
revsToCompact = compactTree(docInfo.metadata).concat(revsToCompact);
}
if (revsToCompact.length) {
compactRevs(revsToCompact, id, tx);
}
docInfo.metadata.seq = seq;
var rev = docInfo.metadata.rev;
delete docInfo.metadata.rev;
var sql = isUpdate ?
'UPDATE ' + DOC_STORE +
' SET json=?, max_seq=?, winningseq=' +
'(SELECT seq FROM ' + BY_SEQ_STORE +
' WHERE doc_id=' + DOC_STORE + '.id AND rev=?) WHERE id=?'
: 'INSERT INTO ' + DOC_STORE +
' (id, winningseq, max_seq, json) VALUES (?,?,?,?);';
var metadataStr = safeJsonStringify(docInfo.metadata);
var params = isUpdate ?
[metadataStr, seq, winningRev, id] :
[id, seq, seq, metadataStr];
tx.executeSql(sql, params, function () {
results[resultsIdx] = {
ok: true,
id: docInfo.metadata.id,
rev: rev
};
fetchedDocs.set(id, docInfo.metadata);
callback();
});
}
}
function websqlProcessDocs() {
processDocs(dbOpts.revs_limit, docInfos, api, fetchedDocs, tx,
results, writeDoc, opts);
}
function fetchExistingDocs(callback) {
if (!docInfos.length) {
return callback();
}
var numFetched = 0;
function checkDone() {
if (++numFetched === docInfos.length) {
callback();
}
}
docInfos.forEach(function (docInfo) {
if (docInfo._id && isLocalId(docInfo._id)) {
return checkDone(); // skip local docs
}
var id = docInfo.metadata.id;
tx.executeSql('SELECT json FROM ' + DOC_STORE +
' WHERE id = ?', [id], function (tx, result) {
if (result.rows.length) {
var metadata = safeJsonParse(result.rows.item(0).json);
fetchedDocs.set(id, metadata);
}
checkDone();
});
});
}
function saveAttachment(digest, data, callback) {
var sql = 'SELECT digest FROM ' + ATTACH_STORE + ' WHERE digest=?';
tx.executeSql(sql, [digest], function (tx, result) {
if (result.rows.length) { // attachment already exists
return callback();
}
// we could just insert before selecting and catch the error,
// but my hunch is that it's cheaper not to serialize the blob
// from JS to C if we don't have to (TODO: confirm this)
sql = 'INSERT INTO ' + ATTACH_STORE +
' (digest, body, escaped) VALUES (?,?,1)';
tx.executeSql(sql, [digest, escapeBlob(data)], function () {
callback();
}, function () {
// ignore constaint errors, means it already exists
callback();
return false; // ack we handled the error
});
});
}
preprocessAttachments(docInfos, 'binary', function (err) {
if (err) {
return callback(err);
}
db.transaction(function (txn) {
tx = txn;
verifyAttachments(function (err) {
if (err) {
preconditionErrored = err;
} else {
fetchExistingDocs(websqlProcessDocs);
}
});
}, websqlError(callback), complete);
});
}
export default websqlBulkDocs;