blob: 6975677639a97cc65ed71bab664444e67e65a896 [file] [log] [blame]
const common = require('./lib/common');
const Database = require('./lib/Database');
/**
 * Feed action that manages the lifecycle of a trigger listening to Kafka messages.
 *
 * The work performed is selected by `params.__ow_method`:
 *  - "put": validates the parameters via `validateParameters`, then calls
 *    `db.ensureTriggerIsUnique` and `common.verifyTriggerAuth` in parallel and,
 *    once both succeed, `db.recordTrigger`.
 *  - "delete": calls `common.verifyTriggerAuth` for the trigger URL and then
 *    `db.deleteTrigger`.
 *  - anything else: answered with a 400 plain-text response.
 *
 * NOTE(review): `topic`, `isJSONData`, `isBinaryKey` and `isBinaryValue` are not
 * read directly in this file; presumably they are consumed by
 * `common.performCommonParameterValidation` - confirm.
 *
 * @param {Object} params - invocation parameters
 * @param {string} params.__ow_method - request method; only "put" and "delete" are handled
 * @param {string[]} params.brokers - array of Kafka brokers (checked by `validateParameters` on "put")
 * @param {string} params.topic - topic to subscribe to
 * @param {boolean} params.isJSONData - attempt to parse messages as JSON
 * @param {boolean} params.isBinaryKey - encode key as Base64
 * @param {boolean} params.isBinaryValue - encode message as Base64
 * @param {string} params.triggerName - trigger name (used to build the trigger URL and to delete the trigger on "delete")
 * @param {string} params.authKey - key used to build the trigger URL on "delete"
 * @param {string} params.endpoint - address to OpenWhisk deployment (expected to be bound at deployment)
 * @param {string} params.DB_URL - URL for the DB, must include authentication (expected to be bound at deployment)
 * @param {string} params.DB_NAME - DB name (expected to be bound at deployment)
 * @returns {Promise} For "put": resolves with `undefined` on success, or with a
 *   `{statusCode, headers, body}` plain-text response describing the failure
 *   (400 for validation errors, 401 for auth errors, 500 otherwise).
 *   For "delete": resolves with the result of `db.deleteTrigger` and rejects
 *   with the underlying error on failure.
 *   For any other method: resolves with a 400 plain-text response.
 */
function main(params) {
    // Start from an already-resolved promise so that a synchronous throw while
    // dispatching (for example `params` being undefined) still surfaces as a
    // rejection of the returned promise rather than escaping to the caller.
    return Promise.resolve().then(() => {
        if (params.__ow_method === "put") {
            return createKafkaFeedTrigger(params);
        }
        if (params.__ow_method === "delete") {
            return deleteKafkaFeedTrigger(params);
        }
        return plainTextResponse(400, 'unsupported lifecycleEvent');
    });
}

/** Builds the plain-text, HTTP-style response object handed back to the caller. */
function plainTextResponse(statusCode, body) {
    return {
        statusCode: statusCode,
        headers: {'Content-Type': 'text/plain'},
        body: body
    };
}

/** Maps a failure of the "put" flow to a response: 400 validation, 401 auth, 500 otherwise. */
function triggerCreationErrorResponse(error) {
    // `error` may be any rejection value, including null/undefined, so every
    // property access is guarded and String() is used instead of .toString().
    if (error && error.validationError) {
        return plainTextResponse(400, error.validationError);
    }
    if (error && error.authError) {
        return plainTextResponse(401, error.authError);
    }
    return plainTextResponse(500, String(error));
}

/** Handles "put": validate, run the uniqueness and auth checks in parallel, then record the trigger. */
function createKafkaFeedTrigger(params) {
    return validateParameters(params)
        .then(validatedParams => {
            console.log(`VALIDATED: ${JSON.stringify(validatedParams, null, 2)}`);
            // hold off creating the database client until it is definitely needed
            const db = new Database(params.DB_URL, params.DB_NAME);
            // the two checks are independent of each other, so do them in parallel
            return Promise.all([
                db.ensureTriggerIsUnique(validatedParams.triggerName),
                common.verifyTriggerAuth(validatedParams.triggerURL)
            ]).then(() => db.recordTrigger(validatedParams));
        })
        .then(() => {
            // success deliberately resolves with no value
            console.log('successfully wrote the trigger');
        })
        .catch(error => {
            console.log(`Failed to write the trigger ${error}`);
            // failures of the "put" flow are reported as a response, not a rejection
            return triggerCreationErrorResponse(error);
        });
}

/** Handles "delete": verify the trigger auth, then remove the trigger from the database. */
function deleteKafkaFeedTrigger(params) {
    const triggerURL = common.getTriggerURL(params.authKey, params.endpoint, params.triggerName);
    return common.verifyTriggerAuth(triggerURL)
        .then(() => {
            // only create the database client once the caller is authorized
            const db = new Database(params.DB_URL, params.DB_NAME);
            return db.deleteTrigger(params.triggerName);
        });
}
/**
 * Validates the raw invocation parameters for creating a trigger.
 *
 * Runs `common.performCommonParameterValidation` first and then checks the
 * `brokers` parameter through `common.validateBrokerParam`.
 *
 * @param {Object} rawParams - parameters as received by the action
 * @returns {Promise<Object>} resolves with the validated parameters from the
 *   common validation, extended with `brokers` and `isMessageHub: false`;
 *   rejects with an object carrying a `validationError` property otherwise.
 */
function validateParameters(rawParams) {
    return new Promise((resolve, reject) => {
        const commonResult = common.performCommonParameterValidation(rawParams);
        if (commonResult.validationError) {
            // pass the common validation failure through untouched
            return reject(commonResult);
        }

        const checkedParams = commonResult.validatedParams;

        // brokers
        if (!rawParams.brokers) {
            return reject( { validationError: "You must supply a 'brokers' parameter." });
        }

        checkedParams.brokers = common.validateBrokerParam(rawParams.brokers);
        if (!checkedParams.brokers) {
            return reject( { validationError: "You must supply a 'brokers' parameter as an array of Message Hub brokers." });
        }

        checkedParams.isMessageHub = false;
        resolve(checkedParams);
    });
}
// Expose the feed action entry point as this module's `main` export.
exports.main = main;