blob: fc785c087db8faf4ae1513518403723ec8a6e2c0 [file] [log] [blame]
var request = require('request');
/**
* Feed to listen to Kafka messages
* @param {string} brokers - array of Kafka brokers
* @param {string} topic - topic to subscribe to
* @param {bool} isJSONData - attempt to parse messages as JSON
* @param {bool} isBinaryKey - encode message as Base64
* @param {bool} isBinaryValue - encode key as Base64
* @param {string} endpoint - address to OpenWhisk deployment
*/
function main(params) {
var promise = new Promise(function(resolve, reject) {
if(!params.package_endpoint) {
reject('Could not find the package_endpoint parameter.');
return;
}
var triggerComponents = params.triggerName.split("/");
var namespace = encodeURIComponent(process.env['__OW_NAMESPACE']);
var trigger = encodeURIComponent(triggerComponents[2]);
var feedServiceURL = 'http://' + params.package_endpoint + '/triggers/' + namespace + '/' + trigger;
if (params.lifecycleEvent === 'CREATE') {
// makes a PUT request to create the trigger in the feed service provider
var putTrigger = function(validatedParams) {
var body = validatedParams;
// params.endpoint may already include the protocol - if so,
// strip it out
var massagedAPIHost = params.endpoint.replace(/https?:\/\/(.*)/, "$1");
body.triggerURL = 'https://' + process.env['__OW_API_KEY'] + "@" + massagedAPIHost + '/api/v1/namespaces/' + namespace + '/triggers/' + trigger;
var options = {
method: 'PUT',
url: feedServiceURL,
body: JSON.stringify(body),
headers: {
'Content-Type': 'application/json',
'User-Agent': 'whisk'
}
};
return doRequest(options);
};
validateParameters(params)
.then(putTrigger)
.then(resolve)
.catch(reject);
} else if (params.lifecycleEvent === 'DELETE') {
var authorizationHeader = 'Basic ' + new Buffer(process.env['__OW_API_KEY']).toString('base64');
var options = {
method: 'DELETE',
url: feedServiceURL,
headers: {
'Content-Type': 'application/json',
'Authorization': authorizationHeader,
'User-Agent': 'whisk'
}
};
doRequest(options)
.then(resolve)
.catch(reject);
}
});
return promise;
}
function doRequest(options) {
var requestPromise = new Promise(function (resolve, reject) {
request(options, function (error, response, body) {
if (error) {
reject({
response: response,
error: error,
body: JSON.parse(body)
});
} else {
console.log("Status code: " + response.statusCode);
if (response.statusCode >= 400) {
console.log("Response from Kafka feed service: " + body);
reject({
statusCode: response.statusCode,
response: JSON.parse(body)
});
} else {
resolve({
response: JSON.parse(body)
});
}
}
});
});
return requestPromise;
}
function validateParameters(rawParams) {
var promise = new Promise(function(resolve, reject) {
var validatedParams = {};
validatedParams.isMessageHub = false;
validatedParams.isJSONData = getBooleanFromArgs(rawParams, 'isJSONData');
validatedParams.isBinaryValue = getBooleanFromArgs(rawParams, 'isBinaryValue');
validatedParams.isBinaryKey = getBooleanFromArgs(rawParams, 'isBinaryKey');
if (validatedParams.isJSONData && validatedParams.isBinaryValue) {
reject('isJSONData and isBinaryValue cannot both be enabled.');
return;
}
if (rawParams.topic && rawParams.topic.length > 0) {
validatedParams.topic = rawParams.topic;
} else {
reject('You must supply a "topic" parameter.');
return;
}
if (isNonEmptyArray(rawParams.brokers)) {
validatedParams.brokers = rawParams.brokers;
} else {
reject('You must supply a "brokers" parameter as an array of Kafka brokers.');
return;
}
resolve(validatedParams);
});
return promise;
}
function getBooleanFromArgs(args, key) {
return (typeof args[key] !== 'undefined' && args[key] && (args[key] === true || args[key].toString().trim().toLowerCase() === 'true'));
}
function isNonEmptyArray(obj) {
return obj && Array.isArray(obj) && obj.length !== 0;
}