support for rewriting rules, plus some code cleanup
diff --git a/client/lib/activations.js b/client/lib/activations.js
new file mode 100644
index 0000000..2ab4f51
--- /dev/null
+++ b/client/lib/activations.js
@@ -0,0 +1,57 @@
+var pollIntervalMillis = 200,
+ openwhisk = require('openwhisk'),
+ ok_ = require('./repl-messages').ok_,
+ errorWhile = require('./repl-messages').errorWhile,
+ api = {
+ host: 'https://openwhisk.ng.bluemix.net',
+ path: '/api/v1'
+ };
+
+exports.waitForActivationCompletion = function waitForActivationCompletion(wskprops, eventBus, waitForThisAction, activation) {
+ var key = wskprops.AUTH;
+ var ow = openwhisk({
+ api: api.host + api.path,
+ api_key: key,
+ namespace: '_' // special here, as activations are currently stored in the user's default namespace
+ });
+
+ return new Promise((resolve, reject) => {
+ if (activation && activation.activationId) {
+ // successfully invoked
+
+ /*if (!attachedTo) {
+ console.log('Successfully invoked with activationId', activation.activationId);
+
+ } else {
+ // we'll wait for the result...
+ }*/
+
+ //
+ // wait for activation completion
+ //
+ var pollOnce = function() {
+ ow.activations.list({ limit: 10 }).then(list => {
+ var allDone = false;
+ for (var i = 0; i < list.length; i++) {
+ var activation = list[i];
+ if (activation.name === waitForThisAction) {
+ ow.activations.get({ activation: activation.activationId })
+ .then(activationDetails => {
+ console.log(JSON.stringify(activationDetails, undefined, 4));
+ eventBus.emit('invocation-done', activationDetails);
+ resolve(activationDetails);
+ }).catch(errorWhile('fetching activation detais', reject));
+ allDone = true;
+ break;
+ }
+ }
+ if (!allDone) {
+ setTimeout(pollOnce, pollIntervalMillis);
+ }
+
+ }).catch(errorWhile('listing activations', reject));
+ };
+ setTimeout(pollOnce, pollIntervalMillis);
+ }
+ });
+};
diff --git a/client/lib/commands/fire.js b/client/lib/commands/fire.js
new file mode 100644
index 0000000..5e7a91e
--- /dev/null
+++ b/client/lib/commands/fire.js
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2015-2016 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+var inquirer = require('inquirer'),
+ _list = require('./list')._list,
+ ok = require('../repl-messages').ok,
+ ok_ = require('../repl-messages').ok_,
+ errorWhile = require('../repl-messages').errorWhile,
+ setupOpenWhisk = require('../util').setupOpenWhisk;
+
+exports.created = {};
+
+/**
+ * Fires a trigger
+ *
+ */
+exports.fire = function fireTrigger(wskprops, next, name) {
+ var ow = setupOpenWhisk(wskprops);
+
+ function doFire(name) {
+ return ow.triggers.invoke(name)
+ .then(ok(next))
+ .catch(errorWhile('firing trigger'), next);
+ }
+
+ if (!name) {
+ _list(ow, function(L) {
+ inquirer
+ .prompt([{ name: 'triggerName', type: 'list',
+ message: 'Which trigger do you wish to fire?',
+ choices: L.map(trigger => trigger.name)
+ }])
+ .then(doFire)
+ .catch(next);
+
+ }, 'triggers');
+ } else {
+ doFire({ triggerName: name });
+ }
+};
diff --git a/client/lib/repl.js b/client/lib/repl.js
index d399199..9f1a985 100644
--- a/client/lib/repl.js
+++ b/client/lib/repl.js
@@ -75,6 +75,11 @@
needsEventBus: true,
synchronous: true
};
+var fire = {
+ handler: require('./commands/fire').fire,
+ description: 'Fire a trigger',
+ synchronous: true
+};
var list = {
handler: lister.listToConsole,
description: 'List available actions',
@@ -103,6 +108,9 @@
invoke: invoke,
i: invoke,
+ fire: fire,
+ f: fire,
+
attach: attach,
a: attach,
diff --git a/client/lib/rewriter.js b/client/lib/rewriter.js
index 95c278d..0bc22db 100644
--- a/client/lib/rewriter.js
+++ b/client/lib/rewriter.js
@@ -15,9 +15,12 @@
*/
var uuid = require('uuid'),
+ fs = require('fs'),
+ path = require('path'),
inquirer = require('inquirer'),
openwhisk = require('openwhisk'),
setupOpenWhisk = require('./util').setupOpenWhisk,
+ waitForActivationCompletion = require('./activations').waitForActivationCompletion,
lister = require('./commands/list'),
Namer = require('./namer'),
ok = require('./repl-messages').ok,
@@ -30,6 +33,9 @@
api = {
host: 'https://openwhisk.ng.bluemix.net',
path: '/api/v1'
+ },
+ debugBroker = {
+ host: 'https://owdbg-broker.mybluemix.net'
};
/** the dictionary of live attachments to actions */
@@ -64,16 +70,29 @@
if (counter === 0) {
return resolve(toClean.length);
}
- function countDown() {
+ function _countDown(resolver) {
if (--counter === 0) {
- resolve(toClean.length);
+ resolver(toClean.length);
}
}
+ var countDownError = _countDown.bind(undefined, reject);
+ var countDown = _countDown.bind(undefined, resolve);
+
toClean.forEach(function(entity) {
var params = {};
params[type + 'Name'] = entity.name;
- ow[types].delete(params).then(countDown,
- errorWhile('cleaning ' + types, countDown));
+ function clean() {
+ ow[types].delete(params)
+ .then(countDown)
+ .catch(errorWhile('cleaning ' + entity.name, countDownError));
+ }
+ if (type === 'rule') {
+ ow.rules.disable(params)
+ .then(clean)
+ .catch(errorWhile('disabling rule ' + entity.name, countDownError));
+ } else {
+ clean();
+ }
});
}, types);
});
@@ -85,90 +104,90 @@
])
.then(() =>
cleanType('rule')
- .then(ok(next),
- errorWhile('cleaning rules', next)),
- errorWhile('cleaning actions and triggers', next));
+ .then(ok(next))
+ .catch(errorWhile('cleaning rules', next)))
+ .catch(errorWhile('cleaning actions and triggers', next));
};
-function createUpstreamAdapterNames(continuationName) {
- return {
- ruleName: Namer.name('continuation-rule'),
- triggerName: Namer.name('continuation-trigger'),
- continuationName: continuationName || Namer.name('continuation-action'),
- createContinuationPlease: !continuationName,
- debugStubName: Namer.name('stub')
- };
-}
-
-function createUpstreamAdapter(ow, actionBeingDebugged, actionBeingDebuggedNamespace, names) {
- try {
- var work = [
- ow.triggers.create(names),
- ow.packages.create({ packageName: names.debugStubName,
- package: {
- binding: {
- namespace: invokerPackageNamespace,
- name: invokerPackageName
- },
- parameters: [{ key: 'action', value: actionBeingDebugged },
- { key: 'namespace', value: actionBeingDebuggedNamespace },
- { key: 'onDone_trigger', value: names.triggerName }
- ]
- }
- })
- ];
- if (names.createContinuationPlease) {
- work.push(ow.actions.create({ actionName: names.continuationName, action: echoContinuation(actionBeingDebugged,
- actionBeingDebuggedNamespace) }));
- }
- return Promise.all(work)
- .then(() => ow.rules.create({ ruleName: names.ruleName, trigger: names.triggerName, action: names.continuationName }),
- errorWhile('creating upstream adapter part 1'))
- .then(() => names, errorWhile('creating upstream adapter part 2'));
- } catch (e) {
- console.error(e);
- }
-}
-
-/**
- * Create a rule splice
- */
-function splice(ow, entity, entityNamespace, next) {
- try {
- var names = attached[entity] = {
- debugStubName: Namer.name('stub'),
+var UpstreamAdapter = {
+ createNames: function createUpstreamAdapterNames(continuationName) {
+ return {
+ ruleName: Namer.name('continuation-rule'),
triggerName: Namer.name('continuation-trigger'),
- continuationName: Namer.name('continuation-action'),
- ruleName: Namer.name('continuation-rule')
+ continuationName: continuationName || Namer.name('continuation-action'),
+ createContinuationPlease: !continuationName,
+ debugStubName: Namer.name('stub')
};
+ },
- Promise.all([ow.triggers.create(names),
- ow.actions.create({ actionName: names.continuationName, action: echoContinuation(entity, entityNamespace) }),
- ow.packages.create({ packageName: names.debugStubName,
- package: {
- binding: {
- namespace: invokerPackageNamespace,
- name: invokerPackageName
- },
- parameters: [{ key: 'action', value: entity },
- { key: 'namespace', value: entityNamespace },
- { key: 'onDone_trigger', value: names.triggerName }
- ]
- }
- })
- ])
- .then(function onSuccessOfPart1() {
- ow.rules
- .create({ ruleName: names.ruleName, trigger: names.triggerName, action: names.continuationName })
- .then(function() { next(names); },
- errorWhile('attaching to action', next));
- }, errorWhile('attaching to action', next));
+ invokerFQN: function(entityNamespace, names) {
+ return '/' + entityNamespace + '/' + names.debugStubName;// + '/' + invokerActionName;
+ },
+ invokerName: function(names) {
+ return names.debugStubName;// + '/' + invokerActionName;
+ },
- } catch (e) {
- console.error(e);
- next();
+ createInvoker: function createUpstreamAdapterInvoker_withActionClone(ow, names, actionBeingDebugged, actionBeingDebuggedNamespace) {
+ return new Promise((resolve, reject) => {
+ fs.readFile(path.join('..', 'invoker', 'owdbg-invoker.js'), (err, codeBuffer) => {
+ if (err) {
+ reject(err);
+ } else {
+ ow.actions.create({
+ actionName: names.debugStubName,
+ action: {
+ parameters: [{ key: 'action', value: actionBeingDebugged },
+ { key: 'namespace', value: actionBeingDebuggedNamespace },
+ { key: 'broker', value: debugBroker.host },
+ { key: 'onDone_trigger', value: names.triggerName }
+ ],
+ exec: {
+ kind: 'nodejs:6',
+ code: codeBuffer.toString('utf8')
+ }
+ }
+ }).then(resolve);
+ }
+ });
+ });
+ },
+ createInvoker_usingPackageBinding: function createUpstreamAdapterInvoker_usingPackageBinding(ow, names, actionBeingDebugged, actionBeingDebuggedNamespace) {
+ return ow.packages.create({ packageName: names.debugStubName,
+ package: {
+ binding: {
+ namespace: invokerPackageNamespace,
+ name: invokerPackageName
+ },
+ parameters: [{ key: 'action', value: actionBeingDebugged },
+ { key: 'namespace', value: actionBeingDebuggedNamespace },
+ { key: 'onDone_trigger', value: names.triggerName }
+ ]
+ }
+ });
+ },
+ create: function createUpstreamAdapter(ow, actionBeingDebugged, actionBeingDebuggedNamespace, names) {
+ try {
+ if (!names) {
+ names = UpstreamAdapter.createNames();
+ }
+ var work = [
+ ow.triggers.create(names), // create onDone_trigger
+ UpstreamAdapter.createInvoker(ow, names, actionBeingDebugged, actionBeingDebuggedNamespace),
+ ];
+ if (names.createContinuationPlease) {
+ work.push(ow.actions.create({ actionName: names.continuationName, action: echoContinuation(actionBeingDebugged,
+ actionBeingDebuggedNamespace) }));
+ }
+ return Promise.all(work)
+ .then(() => ow.rules.create({ ruleName: names.ruleName, trigger: names.triggerName, action: names.continuationName }),
+ errorWhile('creating upstream adapter part 1'))
+ .then(() => names, errorWhile('creating upstream adapter part 2'));
+ } catch (e) {
+ console.error(e);
+ console.error(e.stack);
+ }
}
-}
+};
/**
* Does the given sequence entity use the given action entity located in the given entityNamespace?
@@ -199,9 +218,9 @@
rewrite: function cloneRule(ow, ruleEntityWithDetails, entity, entityNamespace, names) {
return ow.rules.create({ ruleName: Namer.name('rule-clone'),
trigger: ruleEntityWithDetails.trigger,
- action: names.debugStubName // FIXME we need to copy the invoker into the user's namespace :(
- });
- //.then((newRule) => chainAttached(ruleEntityWithDetails.name) = newRule
+ action: names.debugStubName
+ })
+ .then(newRule => chainAttached[ruleEntityWithDetails.name] = names);
}
};
@@ -231,9 +250,9 @@
var fqn = '/' + entityNamespace + '/' + entity;
var afterSpliceContinuation = Namer.name('sequence-splice-after');
- var upstreamAdapterNames = createUpstreamAdapterNames(afterSpliceContinuation);
+ var upstreamAdapterNames = UpstreamAdapter.createNames(afterSpliceContinuation);
- var beforeSpliceUpstream = '/' + entityNamespace + '/' + upstreamAdapterNames.debugStubName + '/' + invokerActionName;
+ var beforeSpliceUpstream = UpstreamAdapter.invokerFQN(entityNamespace, upstreamAdapterNames);
//var afterSpliceContinuation = '/' + entityNamespace + '/' + upstreamAdapterNames.continuationName;
return Promise.all([
@@ -246,9 +265,9 @@
sequence,
afterSpliceSplitter.bind(undefined, fqn, finalBit)) // after: -\__continuation
- ]).then((beforeAndAfter) => { // a destructuring bind would clean this up
+ ]).then(beforeAndAfter => { // a destructuring bind would clean this up
// after the breakpoint, continue with the afterSplice
- return createUpstreamAdapter(ow, entity, entityNamespace, upstreamAdapterNames)
+ return UpstreamAdapter.create(ow, entity, entityNamespace, upstreamAdapterNames)
.then(() => {
//
// this sequence splice uses its own downstream trigger, not the generic one from the action splice
@@ -271,14 +290,14 @@
function doPar(ow, type, entity, next, each) {
var types = type + 's';
ow[types].list({ limit: 200 })
- .then((entities) => {
+ .then(entities => {
var counter = entities.length;
function countDown(names) {
if (--counter <= 0) {
ok_(next);
}
}
- entities.forEach((otherEntity) => {
+ entities.forEach(otherEntity => {
if (otherEntity.name === entity) {
// this is the entity itself. skip, because
// we're looking for uses in *other* entities
@@ -288,7 +307,7 @@
var opts = { namespace: otherEntity.namespace };
opts[type + 'Name'] = otherEntity.name;
ow[types].get(opts)
- .then((otherEntityWithDetails) => each(otherEntityWithDetails, countDown))
+ .then(otherEntityWithDetails => each(otherEntityWithDetails, countDown))
.catch(errorWhile('processing one ' + type, countDown));
}
});
@@ -318,7 +337,10 @@
var ow = setupOpenWhisk(wskprops);
console.log(' Creating action trampoline'.green);
- splice(ow, entity, entityNamespace, function afterSplice(names) {
+ UpstreamAdapter.create(ow, entity, entityNamespace).then(names => {
+ // remember the names, so that we can route invocations to the debug version
+ attached[entity] = names;
+
if (options && options['action-only']) {
//
// user asked not to instrument any rules or sequences
@@ -404,28 +426,33 @@
if (names) {
try {
var ow = setupOpenWhisk(wskprops);
- ow.rules.disable(names).then(function() {
- try {
- // first delete the action and rule and debug package
- Promise.all([ow.triggers.delete(names),
- ow.actions.delete({ actionName: names.continuationName }),
- ow.packages.delete({ packageName: names.debugStubName })
- ])
- .then(function(values) {
- // then we can delete the rule
- ow.rules.delete(names).then(function() {
- try {
- delete attached[entity];
- ok_(next);
- } catch (err) {
- errlog(5, true)(err);
- }
- }, errlog(4));
- }, errlog(3));
- } catch (err) { errlog(2, true)(err); }
- }, errlog(1));
- } catch (e) {
- console.error(e);
+ ow.rules.disable(names)
+ .then(() => {
+ try {
+ // first delete the action and rule and debug package
+ Promise.all([ow.triggers.delete(names),
+ ow.actions.delete({ actionName: names.continuationName }),
+ ow.actions.delete({ actionName: names.debugStubName }) // keep in sync with UpstreamAdapter
+ ])
+ .then(() => {
+ // then we can delete the rule
+ ow.rules.delete(names)
+ .then(() => {
+ try {
+ delete attached[entity];
+ ok_(next);
+ } catch (err) {
+ errlog(5, true)(err);
+ }
+ }).
+ catch(errlog(4));
+ })
+ .catch(errlog(3));
+ }
+ catch (err) { errlog(2, true)(err); }
+ }).catch(errlog(1));
+ } catch (err) {
+ errlog(0)(err);
}
}
}
@@ -485,8 +512,15 @@
if (!attachedTo) {
var seq = chainAttached[action];
if (seq) {
- invokeThisAction = seq.before;
- waitForThisAction = seq.after;
+ if (seq.before) {
+ // sequence
+ invokeThisAction = seq.before;
+ waitForThisAction = seq.after;
+ } else {
+ // rule: invoke the rule's action
+ invokeThisAction = seq.debugStubName;
+ waitForThisAction = seq.continuationName;
+ }
} else {
invokeThisAction = action;
@@ -494,7 +528,7 @@
}
} else {
- invokeThisAction = attachedTo.debugStubName + '/' + invokerActionName;
+ invokeThisAction = UpstreamAdapter.invokerName(attachedTo);
// these are now part of the debug stub binding
// params.action = action;
@@ -510,48 +544,10 @@
return next();
}
- var key = wskprops.AUTH;
var ow = setupOpenWhisk(wskprops);
- var owForActivations = openwhisk({
- api: api.host + api.path,
- api_key: key,
- namespace: '_'
- });
- ow.actions.invoke({
- actionName: invokeThisAction,
- params: params
- }).then(function onSuccess(activation) {
- if (activation && activation.activationId) {
- // successfully invoked
- if (!attachedTo) {
- console.log('Successfully invoked with activationId', activation.activationId);
- } else {
- // we'll wait for the result...
- }
-
- //
- // wait for activation completion
- //
- var timer = setInterval(function waitForResponse() {
- owForActivations.activations.list({ limit: 10 }).then(function(list) {
- for (var i = 0; i < list.length; i++) {
- var activation = list[i];
- if (activation.name === waitForThisAction) {
- clearInterval(timer);
- owForActivations.activations.get({ activation: activation.activationId }).then(function(activation) {
- console.log(JSON.stringify(activation, undefined, 4));
- eventBus.emit('invocation-done', activation);
- ok_(next);
- });
- break;
- }
- }
- });
- }, 300);
- }
- }, function onError(err) {
- console.error('Unable to invoke your specified action');
- next();
- });
+ ow.actions.invoke({ actionName: invokeThisAction, params: params })
+ .then(waitForActivationCompletion.bind(undefined, wskprops, eventBus, waitForThisAction))
+ .then(ok(next))
+ .catch(errorWhile('invoking your specified action', next));
};