blob: 4d994aff09a7cd3415d4727c8a50eac4723442a5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
const { initializeActionHandler, NodeActionRunner } = require('../runner');
/**
 * Service object that tracks the lifecycle of one action: it starts the HTTP
 * server, initializes the user code exactly once, and then runs it.
 *
 * Lifecycle states: `ready` -> `starting` -> `ready` -> `running` -> `ready`,
 * with `stopped` as a terminal state entered on any init or run failure.
 *
 * @param config service configuration. Reads `config.port` (passed to
 *        `app.listen`) and the optional `config.allowConcurrent` flag, which
 *        may be a boolean or the string 'true'/'false' (case-insensitive).
 *        When the flag is true the service never enters the `running` state,
 *        so overlapping runCode calls are not rejected.
 */
function NodeActionService(config) {
    const Status = {
        ready: 'ready',
        starting: 'starting',
        running: 'running',
        stopped: 'stopped',
    };

    // String() coercion lets a real boolean be passed as well as its string form;
    // calling toLowerCase() directly on a boolean would throw.
    const ignoreRunStatus = config.allowConcurrent === undefined
        ? false
        : String(config.allowConcurrent).toLowerCase() === 'true';

    let status = Status.ready;
    let server = undefined;
    let userCodeRunner = undefined;

    /**
     * Moves the service to `newStatus` unless it has already been stopped;
     * `stopped` is terminal and is never left again.
     */
    function setStatus(newStatus) {
        if (status !== Status.stopped) {
            status = newStatus;
        }
    }

    /**
     * An ad-hoc format for the endpoints returning a Promise representing,
     * eventually, an HTTP response.
     *
     * The promised values (whether successful or not) have the form:
     * { code: int, response: object }
     *
     */
    function responseMessage(code, response) {
        return { code: code, response: response };
    }

    /** Builds a response message whose payload is `{ error: errorMsg }`. */
    function errorMessage(code, errorMsg) {
        return responseMessage(code, { error: errorMsg });
    }

    /**
     * Indicates if we have been initialized which is determined by if we have
     * created a NodeActionRunner.
     * @returns {boolean}
     */
    this.initialized = function isInitialized() {
        return (typeof userCodeRunner !== 'undefined');
    };

    /**
     * Starts the server.
     *
     * @param app express app
     */
    this.start = function start(app) {
        server = app.listen(config.port);
        // This is required as http server will auto disconnect in 2 minutes, this to not auto disconnect at all
        server.timeout = 0;
    };

    /** Returns a promise of a response to the /init invocation.
     *
     * req.body = { value: { main: String, code: String, binary: Boolean } }
     *
     * Resolves with { code: 200, response: { OK: true } } on success. Rejects
     * with a 403 message when the payload is invalid, the action was already
     * initialized, or the service is not ready; rejects with a 502 message
     * (and stops the service) when the initialization itself fails.
     */
    this.initCode = function initCode(req) {
        if (status === Status.ready && userCodeRunner === undefined) {
            setStatus(Status.starting);

            const body = req.body || {};
            const message = body.value || {};

            if (message.main && message.code && typeof message.main === 'string' && typeof message.code === 'string') {
                return doInit(message).then(_ => {
                    setStatus(Status.ready);
                    return responseMessage(200, { OK: true });
                }).catch(error => {
                    setStatus(Status.stopped);
                    // Guard against a null/undefined rejection value before reading .stack.
                    const detail = error && error.stack ? String(error.stack) : error;
                    const errStr = `Initialization has failed due to: ${detail}`;
                    return Promise.reject(errorMessage(502, errStr));
                });
            } else {
                // A malformed request is not fatal: go back to ready so that a
                // corrected /init can still succeed.
                setStatus(Status.ready);
                const msg = 'Missing main/no code to execute.';
                return Promise.reject(errorMessage(403, msg));
            }
        } else if (userCodeRunner !== undefined) {
            const msg = 'Cannot initialize the action more than once.';
            console.error('Internal system error:', msg);
            return Promise.reject(errorMessage(403, msg));
        } else {
            const msg = `System not ready, status is ${status}.`;
            console.error('Internal system error:', msg);
            return Promise.reject(errorMessage(403, msg));
        }
    };

    /**
     * Returns a promise of a response to the /exec invocation.
     * Note that the promise is failed if and only if there was an unhandled error
     * (the user code threw an exception, or our proxy had an internal error).
     * Actions returning { error: ... } are modeled as a Promise successful resolution.
     *
     * req.body = { value: Object, meta { activationId : int } }
     */
    this.runCode = function runCode(req) {
        if (status !== Status.ready || userCodeRunner === undefined) {
            const msg = userCodeRunner ? `System not ready, status is ${status}.` : 'System not initialized.';
            console.error('Internal system error:', msg);
            return Promise.reject(errorMessage(403, msg));
        }

        // these are defensive checks against the expected interface invariants
        const msg = req && req.body || {};
        if (msg.value === null || msg.value === undefined) {
            msg.value = {};
        } else if (typeof msg.value !== 'object') {
            const errStr = `Internal system error: the argument must be a dictionary but has type '${typeof msg.value}'.`;
            console.error('Internal system error:', errStr);
            return Promise.reject(errorMessage(403, errStr));
        }

        // Mark the service busy only after the request has been validated.
        // Doing it earlier would leave the status stuck at `running` when the
        // validation above rejects, refusing every later request.
        if (!ignoreRunStatus) {
            setStatus(Status.running);
        }

        return doRun(msg).then(result => {
            if (!ignoreRunStatus) {
                setStatus(Status.ready);
            }
            if (typeof result !== 'object') {
                return errorMessage(502, 'The action did not return a dictionary.');
            } else {
                return responseMessage(200, result);
            }
        }).catch(error => {
            const errStr = `An error has occurred: ${error}`;
            // A failed activation is terminal for this service instance.
            setStatus(Status.stopped);
            return Promise.reject(errorMessage(502, errStr));
        });
    };

    /**
     * Builds the action handler from the init message and wraps it in a
     * NodeActionRunner. Re-rejects with the original error on failure.
     */
    function doInit(message) {
        return initializeActionHandler(message)
            .then(handler => {
                userCodeRunner = new NodeActionRunner(handler);
            })
            // emit error to activation log then flush the logs as this is the end of the activation
            .catch(error => {
                console.error('Error during initialization:', error);
                writeMarkers();
                return Promise.reject(error);
            });
    }

    /**
     * Runs the user code with `msg.value` as its argument and writes the
     * end-of-activation markers whether the run succeeds or fails.
     */
    function doRun(msg) {
        // Move per-activation keys to process env. vars with __OW_ (reserved) prefix
        Object.keys(msg).forEach(k => {
            if (typeof msg[k] === 'string' && k !== 'value') {
                const envVariable = '__OW_' + k.toUpperCase();
                process.env[envVariable] = msg[k];
            }
        });

        return userCodeRunner
            .run(msg.value)
            .then(result => {
                if (typeof result !== 'object') {
                    console.error(`Result must be of type object but has type "${typeof result}":`, result);
                }
                writeMarkers();
                return result;
            }).catch(error => {
                console.error(error);
                writeMarkers();
                return Promise.reject(error);
            });
    }

    /** Writes the end-of-activation sentinel line to both stdout and stderr. */
    function writeMarkers() {
        console.log('XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX');
        console.error('XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX');
    }
}
// Factory helper: builds a new service instance for the given configuration.
NodeActionService.getService = (config) => {
    return new NodeActionService(config);
};

// The constructor (with the factory attached) is the module's sole export.
module.exports = NodeActionService;