// blob: d06f24b5405d390afddb3672fa01867d79e59ab1 [file] [log] [blame]
// Size, in bytes, of the buffer IO.readline allocates for each read from stdin.
const MAX_LINE = 10 * 1024
/**
 * Line-oriented text I/O over the process's stdin and stdout.
 *
 * Input is split on "\n"; every outgoing message is terminated with "\n".
 */
class IO {
  constructor () {
    this.encoder = new TextEncoder()
    this.decoder = new TextDecoder()
    // Bytes already read from stdin that have not been returned as a line yet.
    this.pending = new Uint8Array(0)
    // Tail of the chain of queued writes; keeps output in call order even
    // though most callers do not await print().
    this.writeQueue = Promise.resolve()
  }

  /**
   * Read the next line from stdin.
   *
   * Bytes are accumulated across reads until a newline is seen, so a line may
   * be longer than MAX_LINE and one read may carry several lines. Only the
   * bytes of a complete line are decoded, which keeps multi-byte UTF-8
   * sequences intact.
   *
   * @returns {Promise<string|undefined>} the line without its trailing
   *   newline; a final unterminated line is returned as-is; `undefined` once
   *   the input is exhausted.
   */
  async readline () {
    let scanFrom = 0
    for (;;) {
      const newlineAt = this.pending.indexOf(0x0a, scanFrom)
      if (newlineAt !== -1) {
        const line = this.pending.subarray(0, newlineAt)
        this.pending = this.pending.subarray(newlineAt + 1)
        return this.decoder.decode(line)
      }
      // Everything buffered so far has been scanned; only look at new bytes.
      scanFrom = this.pending.length

      const chunk = new Uint8Array(MAX_LINE)
      const bytes = await Deno.read(Deno.stdin.rid, chunk)
      if (!bytes) {
        // End of input: hand back whatever is left, then signal the end.
        if (this.pending.length === 0) return undefined
        const rest = this.pending
        this.pending = new Uint8Array(0)
        return this.decoder.decode(rest)
      }

      const merged = new Uint8Array(this.pending.length + bytes)
      merged.set(this.pending)
      merged.set(chunk.subarray(0, bytes), this.pending.length)
      this.pending = merged
    }
  }

  /**
   * Write `msg` followed by a newline to stdout.
   *
   * Writes are queued so that messages never interleave, and each message is
   * written completely even when a single write accepts only part of it.
   *
   * @param {string} msg text to send
   * @returns {Promise<void>} settles once this message has been written
   */
  async print (msg) {
    const data = this.encoder.encode(msg + '\n')
    const written = this.writeQueue.then(() => this.writeAll(data))
    // A failed write must not block later messages; the failure is still
    // reported through the promise returned to this caller.
    this.writeQueue = written.catch(() => {})
    return written
  }

  /**
   * Helper for print(): keep writing until every byte of `data` is accepted.
   *
   * @param {Uint8Array} data bytes to send to stdout
   */
  async writeAll (data) {
    let offset = 0
    while (offset < data.length) {
      offset += await Deno.write(Deno.stdout.rid, data.subarray(offset))
    }
  }

  /**
   * Serialize `obj` as JSON and send it as one line.
   *
   * If serialization fails, two log lines describing the failure are sent
   * instead.
   *
   * @param {*} obj value to serialize
   */
  writeline (obj) {
    try {
      this.print(JSON.stringify(obj))
    } catch (e) {
      this.log("Error converting object to JSON: " + e.toString())
      this.log("error on obj: " + (obj.toSource ? obj.toSource() : obj.toString()))
    }
  }

  /**
   * Send a `["log", text]` line. Non-string messages are JSON-encoded first.
   *
   * @param {*} message value to log
   */
  log (message) {
    if (typeof message !== "string") {
      message = JSON.stringify(message)
    }
    this.writeline(["log", String(message)])
  }
}
/**
 * Holds the functions, library and query configuration that commands install,
 * and acknowledges each change by printing "true".
 */
class State {
  /**
   * @param couch object providing compileFunction(source, ddoc)
   * @param io    object providing print(text)
   */
  constructor (couch, io) {
    Object.assign(this, { couch, io, funs: [], lib: null, query_config: {} })
  }

  /**
   * Drop all installed functions and the library, and store `config`
   * (an empty object when none is given).
   */
  reset (config) {
    this.lib = null
    this.funs = []
    this.query_config = config ? config : {}
    this.io.print("true") // indicates success
  }

  /**
   * Compile `newFun` and append the result to `funs`.
   * The current library is handed to the compiler as `views.lib`.
   */
  addFun (newFun) {
    const ddocStub = { views: { lib: this.lib } }
    const compiled = this.couch.compileFunction(newFun, ddocStub)
    this.funs.push(compiled)
    Debug.debug(`this.funs`)
    Debug.debug(this.funs)
    this.io.print("true")
  }

  /** Remember `lib` for functions added afterwards. TODO: take out */
  addLib (lib) {
    this.lib = lib
    this.io.print("true")
  }
}
/**
 * Runs design-document filter functions, and map functions used as filters,
 * over a batch of documents and reports one boolean per document.
 */
class Filter {
  /**
   * @param couch object that provides createSandbox() itself or through its
   *              `qs` property
   * @param io    object providing writeline(value)
   */
  constructor (couch, io) {
    this.couch = couch
    this.io = io
    // Set by emit(); tells filter_view whether the current doc emitted.
    this.view_emit = false
  }

  /** Sandbox `emit`: the arguments are ignored, only the call is recorded. */
  emit (key, value) {
    this.view_emit = true
  }

  /**
   * Build a sandbox whose `emit` records calls on this instance.
   *
   * The sandbox factory is taken from `couch.createSandbox` when present and
   * from `couch.qs.createSandbox` otherwise.
   */
  createFilterSandbox () {
    const provider = typeof this.couch.createSandbox === 'function'
      ? this.couch
      : this.couch.qs
    const sandbox = provider.createSandbox()
    // Arrow function keeps `this` bound when emit is called as a bare name.
    sandbox.emit = (key, value) => this.emit(key, value)
    return sandbox
  }

  /**
   * Apply `fun` to every document and write `[true, results]`.
   *
   * @param {Function} fun called as fun.apply(ddoc, [doc, req])
   * @param ddoc `this` value for `fun`
   * @param {Array} args `[docs, req]`
   */
  filter (fun, ddoc, args) {
    const docs = args[0]
    const req = args[1]
    const results = []
    for (let i = 0; i < docs.length; i++) {
      results.push(Boolean(fun.apply(ddoc, [docs[i], req])))
    }
    this.io.writeline([true, results])
  }

  /**
   * Use a map function as a filter: a document passes when the function
   * calls emit() for it. Writes `[true, results]`.
   *
   * @param {Function} fun map function; it is recompiled from its source so
   *   that `emit` resolves to this filter's recorder
   * @param ddoc unused
   * @param {Array} args `[docs]`
   */
  filter_view (fun, ddoc, args) {
    const sandbox = this.createFilterSandbox()
    const source = fun.toSource ? fun.toSource() : '(' + fun.toString() + ')'
    // These locals look unused, but they are the names the direct eval()
    // below resolves inside the recompiled function.
    const emit = sandbox.emit
    const log = sandbox.log
    const JSON = sandbox.JSON
    // NOTE(review): eval of function source; the source is whatever the
    // caller supplied, so it runs with this process's privileges.
    const viewFun = eval(source)
    const docs = args[0]
    const results = []
    for (let i = 0; i < docs.length; i++) {
      this.view_emit = false
      viewFun(docs[i])
      results.push(Boolean(this.view_emit))
    }
    this.io.writeline([true, results])
  }
}
/**
 * Stores design documents and runs functions found inside them.
 *
 * Function sources are compiled on first use and the compiled function is
 * cached back onto the stored design document.
 */
class DesignDoc {
  /**
   * @param couch object providing compileFunction(source, ddoc, name)
   * @param io    object providing print(text)
   */
  constructor (couch, io) {
    this.couch = couch
    this.io = io
    this.validate = new Validate(io)
    this.filter = new Filter(couch, io)
    // First path segment -> [receiver, method] that runs the function.
    this.ddoc_dispatch = {
      "filters": [this.filter, this.filter.filter],
      "views": [this.filter, this.filter.filter_view],
      "validate_doc_update": [this.validate, this.validate.validate]
    }
    this.ddocs = {}
  }

  /**
   * Handle one `ddoc` command.
   *
   * Two argument shapes are accepted:
   *   ("new", ddocId, ddoc)       store `ddoc` under `ddocId`, print "true"
   *   (ddocId, funPath, funArgs)  run the function at `funPath` in the stored
   *                               design doc with `funArgs`
   *
   * @throws {Array} ["fatal", "query_protocol_error", ...] if `ddocId` was
   *   never stored
   * @throws {Array} ["fatal", "unknown_command", ...] if `funPath[0]` is not
   *   a key of `ddoc_dispatch`
   * @throws {Array} ["error", "not_found", ...] if any segment of `funPath`
   *   is missing from the design doc
   */
  ddoc (...args) {
    Debug.debug('> ddoc()')
    Debug.debug('ddoc args' + JSON.stringify(args))
    let ddocId = args.shift()

    if (ddocId === "new") {
      // get the real ddocId.
      ddocId = args.shift()
      // store the ddoc, functions are lazily compiled.
      this.ddocs[ddocId] = args.shift()
      Debug.debug('ddocs: ' + JSON.stringify(this.ddocs))
      this.io.print("true")
      return
    }

    // Couch makes sure we know this ddoc already.
    Debug.debug('ddocs: known')
    const ddoc = this.ddocs[ddocId]
    if (!ddoc) throw (["fatal", "query_protocol_error", "uncached design doc: " + ddocId])

    const funPath = args.shift()
    // the first member of the fun path determines the type of operation
    const cmd = funPath[0]
    const funArgs = args.shift()

    // Own-property check so names inherited from Object.prototype
    // (e.g. "constructor") are not mistaken for commands.
    if (!Object.prototype.hasOwnProperty.call(this.ddoc_dispatch, cmd)) {
      // unknown command, quit and hope the restarted version is better
      throw (["fatal", "unknown_command", "unknown ddoc command '" + cmd + "'"])
    }

    const missing = (segment) => ["error", "not_found",
      "missing " + funPath[0] + " function " + segment +
      " on design doc " + ddocId]

    // Walk down to the object that holds the function.
    let point = ddoc
    const last = funPath.length - 1
    for (let i = 0; i < last; i++) {
      point = point[funPath[i]]
      // Report a missing intermediate segment as not_found rather than
      // letting the next property access raise a TypeError.
      if (point == null) throw (missing(funPath[i]))
    }

    let fun = point[funPath[last]]
    if (!fun) throw (missing(funPath[last]))
    if (typeof fun !== "function") {
      fun = this.couch.compileFunction(fun, ddoc, funPath.join('.'))
      // cache the compiled fun on the ddoc
      point[funPath[last]] = fun
    }

    // run the correct responder with the cmd body
    const [object, method] = this.ddoc_dispatch[cmd]
    method.apply(object, [fun, ddoc, funArgs])
  }
}
/**
 * Runs map and reduce functions and reports their results through `io`.
 */
class Views {
  /**
   * @param couch object providing compileFunction(source)
   * @param io    object providing print(text), writeline(value), log(message)
   * @param state object exposing `funs`, `query_config` and `line_length`
   */
  constructor (couch, io, state) {
    this.couch = couch
    this.io = io
    this.map_results = [] // holds temporary emitted values during doc map
    this.state = state
  }

  /**
   * Compile and run each reduce function, then print `[true, reductions]`.
   *
   * A function that throws a non-fatal error contributes `null`.
   * The caller's `reduceFuns` array is left untouched.
   *
   * @param {string[]} reduceFuns reduce function sources
   * @param {?Array} keys passed through to each function
   * @param {Array} values passed through to each function
   * @param {boolean} rereduce passed through to each function
   * @throws {Array} ["error", "reduce_overflow_error", ...] when
   *   `query_config.reduce_limit` is set to something other than "log" and
   *   the output is longer than 4096 characters and more than half the
   *   input size
   */
  runReduce (reduceFuns, keys, values, rereduce) {
    let code_size = 0
    const compiled = []
    for (const fun_body of reduceFuns) {
      code_size += fun_body.length
      compiled.push(this.couch.compileFunction(fun_body))
    }

    const reductions = compiled.map((fun) => {
      try {
        return fun(keys, values, rereduce)
      } catch (err) {
        Debug.debug(`reduce err`)
        this.handleViewError(err)
        // if the error is not fatal, ignore the results and continue
        return null
      }
    })

    const reduce_line = JSON.stringify(reductions)
    const reduce_length = reduce_line.length
    const input_length = this.state.line_length - code_size
    const response = "[true," + reduce_line + "]"
    const config = this.state.query_config
    // TODO make reduce_limit config into a number
    const overflow = config && config.reduce_limit &&
      reduce_length > 4096 && ((reduce_length * 2) > input_length)

    if (!overflow) {
      this.io.print(response)
      return
    }

    const log_message = [
      "Reduce output must shrink more rapidly:",
      "input size:", input_length,
      "output size:", reduce_length
    ].join(" ")
    if (config.reduce_limit === "log") {
      this.io.log("reduce_overflow_error: " + log_message)
      // `response` is already serialized, so it must go out through print();
      // writeline() would JSON-encode it a second time.
      this.io.print(response)
    } else {
      throw (["error", "reduce_overflow_error", log_message])
    }
  }

  /**
   * Decide what to do with an error thrown by a view function.
   *
   * - "fatal_error" is rethrown as ["error", "map_runtime_error", ...]:
   *   processing of the current command stops but the process lives on.
   * - An error whose first element is "fatal" is rethrown unchanged.
   * - Anything else is logged and swallowed so the caller can continue.
   *
   * @param {*} err the thrown value
   * @param {Object} [doc] document being processed; its `_id` is added to
   *   the log message
   */
  handleViewError (err, doc) {
    if (err == "fatal_error") {
      // Lets a function abort the current command by rethrowing
      // "fatal_error" from its own error handling.
      throw (["error", "map_runtime_error", "function raised 'fatal_error'"])
    } else if (err && err[0] == "fatal") {
      // Errors of the form ["fatal","error_key","reason"] are passed on
      // unchanged for the caller's error handler to act on.
      throw (err)
    }
    let detail
    if (err && err.toSource) {
      detail = err.toSource()
    } else if (err && err.stack) {
      detail = err.stack
    } else {
      // Thrown plain values (strings, arrays, objects) have no stack.
      detail = JSON.stringify(err)
    }
    let message = "function raised exception " + detail
    if (doc) message += " with doc._id " + doc._id
    this.io.log(message)
  }

  // view helper function
  /** Record one key/value pair emitted by the map function being run. */
  emit (key, value) {
    Debug.debug(`emit called`)
    Debug.debug(key)
    Debug.debug(value)
    this.map_results.push([key, value])
  }

  /** Add up all enumerable entries of `values`, starting from 0. */
  sum (values) {
    let rv = 0
    for (const i in values) {
      rv += values[i]
    }
    return rv
  }

  /**
   * Reduce a list of `[key, value]` pairs.
   *
   * @param {string[]} reduceFuns reduce function sources
   * @param {Array} kvs `[key, value]` pairs
   */
  reduce (reduceFuns, kvs) {
    const keys = kvs.map((kv) => kv[0])
    const values = kvs.map((kv) => kv[1])
    this.runReduce(reduceFuns, keys, values, false)
  }

  /** Reduce previously reduced `values`; keys are passed as null. */
  rereduce (reduceFuns, values) {
    this.runReduce(reduceFuns, null, values, true)
  }

  /**
   * Run every function in `state.funs` against `doc` and write one array of
   * emitted `[key, value]` pairs per function.
   *
   * Example output after three functions:
   * [
   *   [["Key","Value"]],                     <- fun 1 emitted 1 pair
   *   [],                                    <- fun 2 emitted nothing
   *   [["Key1","Value1"],["Key2","Value2"]]  <- fun 3 emitted 2 pairs
   * ]
   */
  mapDoc (doc) {
    const buf = []
    for (const fun of this.state.funs) {
      this.map_results = []
      try {
        fun(doc)
        buf.push(this.map_results)
      } catch (err) {
        Debug.debug(`map_doc err ${err}`)
        this.handleViewError(err, doc)
        // If the error is not fatal, we treat the doc as if it
        // did not emit anything, by buffering an empty array.
        buf.push([])
      }
    }
    this.io.writeline(buf)
  }
}
/**
 * Runs a validation function and reports the outcome through `io`.
 */
class Validate {
  /** @param io object providing writeline(value) */
  constructor (io) {
    this.io = io
  }

  /**
   * Call `fun` with `ddoc` as `this` and `args` as its arguments.
   *
   * Writes 1 when the function returns normally. A thrown value that has
   * both `name` and `stack` is rethrown; any other thrown value is written
   * out as the response.
   */
  validate (fun, ddoc, args) {
    try {
      fun.apply(ddoc, args)
      this.io.writeline(1)
    } catch (thrown) {
      const looksLikeRuntimeError = thrown.name && thrown.stack
      if (!looksLikeRuntimeError) {
        this.io.writeline(thrown)
        return
      }
      throw thrown
    }
  }
}
/**
 * Debug tracing hook. Tracing is currently switched off, so calls do nothing.
 */
class Debug {
  /**
   * Accept a debug message and discard it.
   *
   * @param {*} message value a caller wants traced
   * @returns {Promise<undefined>}
   */
  static async debug (message) {
    // Intentionally empty. To trace, write `message` somewhere here, e.g.:
    //   Deno.writeTextFileSync('/tmp/deno-qs.log', message.toString() + '\n')
    return undefined
  }
}
/**
 * Compiles function source text into callable functions.
 */
class Couch {
  /** @param qs object providing createSandbox() */
  constructor (qs) {
    this.qs = qs
  }

  /**
   * Build a sandbox by delegating to `qs.createSandbox()`, so holders of a
   * Couch instance can call `couch.createSandbox()` directly.
   */
  createSandbox () {
    return this.qs.createSandbox()
  }

  /**
   * Turn function source text into a function.
   *
   * The source is wrapped in parentheses and evaluated with `emit`, `sum`,
   * `log` and `JSON` from a fresh sandbox in scope.
   *
   * @param {string} source function source text
   * @param ddoc accepted but not used here
   * @param name accepted but not used here
   * @returns {Function} the compiled function
   * @throws {Array} ["error", "not_found", ...] when `source` is empty
   * @throws {Array} ["error", "compilation_error", ...] when evaluation
   *   fails or does not produce a function
   */
  compileFunction (source, ddoc, name) {
    if (!source) throw (["error", "not_found", "missing function"])
    const sandbox = this.qs.createSandbox()
    let functionObject = null
    try {
      const wrapped = `(${source})`
      // These locals look unused, but they are the names the direct eval()
      // below resolves inside the compiled function.
      const emit = sandbox.emit
      const sum = sandbox.sum
      const log = sandbox.log
      const JSON = sandbox.JSON
      Debug.debug('nnnL: ' + wrapped)
      // NOTE(review): eval of caller-supplied source; it runs with this
      // process's privileges and can see this scope.
      functionObject = eval(wrapped)
    } catch (err) {
      throw ([
        "error",
        "compilation_error",
        (err.toSource ? err.toSource() : err.stack) + " (" + source + ")"
      ])
    }
    if (typeof functionObject !== "function") {
      throw (["error", "compilation_error",
        "Expression does not eval to a function. (" + source.toString() + ")"
      ])
    }
    Debug.debug(`compiled fun ${functionObject}`)
    return functionObject
  }
}
/**
 * Wires the components together and runs the command loop: one JSON array
 * per input line, dispatched on its first element.
 */
class QueryServer {
  constructor () {
    this.io = new IO()
    this.couch = new Couch(this)
    this.state = new State(this.couch, this.io)
    this.ddoc = new DesignDoc(this.couch, this.io)
    this.views = new Views(this.couch, this.io, this.state)
    // Command name -> [receiver, method].
    this.dispatch = {
      "ddoc": [this.ddoc, this.ddoc.ddoc],
      "reset": [this.state, this.state.reset],
      "add_fun": [this.state, this.state.addFun],
      "map_doc": [this.views, this.views.mapDoc],
      "reduce": [this.views, this.views.reduce],
      "rereduce": [this.views, this.views.rereduce]
    }
  }

  /**
   * Build the set of helpers made visible to compiled functions:
   * `emit`, `sum`, `log` and `JSON`.
   */
  createSandbox () {
    const sandbox = {}
    sandbox.emit = (k, v) => this.views.emit(k, v)
    sandbox.sum = (values) => this.views.sum(values)
    sandbox.log = (message) => this.io.log(message)
    sandbox.JSON = JSON
    return sandbox
  }

  /**
   * Report an error raised while handling a command.
   *
   * - ["fatal", key, reason]: sent as ["error", key, reason]; once that line
   *   has been written the process exits with code -1.
   * - ["error", key, reason]: sent as-is.
   * - {error, reason}: sent as ["error", error, reason].
   * - anything with a `name`: sent as ["error", name, reason], where the
   *   reason is the message for Error instances and the value itself
   *   otherwise.
   * - anything else: sent as ["error", "unnamed_error", description].
   *
   * @param {*} e the thrown value
   * @returns {Promise<void>}
   */
  async handleError (e) {
    Debug.debug(e)
    if (e === null || e === undefined) {
      // `throw null` / `throw undefined` carry no information to inspect.
      this.io.writeline(["error", "unnamed_error", String(e)])
      return
    }
    Debug.debug(e.stack)
    const type = e[0]
    if (type == "fatal") {
      e[0] = "error" // we tell the client it was a fatal error by dying
      try {
        // Wait for the line to be written; exiting first could drop it.
        await this.io.print(JSON.stringify(e))
      } finally {
        Deno.exit(-1)
      }
    } else if (type == "error") {
      this.io.writeline(e)
    } else if (e.error && e.reason) {
      // compatibility with old error format
      this.io.writeline(["error", e.error, e.reason])
    } else if (e.name) {
      // An Error instance serializes to "{}", so send its message instead.
      this.io.writeline(["error", e.name, e instanceof Error ? e.message : e])
    } else {
      this.io.writeline(["error", "unnamed_error",
        e.toSource ? e.toSource() : (e.stack || String(e))])
    }
  }

  /**
   * Read commands until readline() yields a falsy value (end of input or an
   * empty line). Errors thrown by a command are passed to handleError().
   */
  async loop () {
    let line
    while (line = await this.io.readline()) {
      const cmd = JSON.parse(line)
      Debug.debug(`line: ` + JSON.stringify(cmd) + "\n")
      this.state.line_length = line.length
      try {
        const cmdkey = cmd.shift()
        Debug.debug(`cmdkey: ` + JSON.stringify(cmdkey) + "\n")
        Debug.debug(`cmd: ` + JSON.stringify(cmd) + "\n")
        if (this.dispatch[cmdkey]) {
          // run the correct responder with the cmd body
          const [object, method] = this.dispatch[cmdkey]
          method.apply(object, cmd)
        } else {
          // unknown command, quit and hope the restarted version is better
          throw (["fatal", "unknown_command", "unknown command '" + cmdkey + "'"])
        }
      } catch (e) {
        await this.handleError(e)
      }
    }
  }
}
// Entry point: build the server and start its command loop.
// loop() is async; the promise it returns is deliberately not awaited here.
const query_server = new QueryServer()
void query_server.loop()