Verify trigger fire by sideeffect. (#272)

Until now, we verify the kafkaprovider, by looking if the action in OpenWhisk has been executed with `activation list`. The problem with this call is, that the activation may not be in the list in time, because it only returns the result of an CouchDB view. If there is much load on the database, the view-computation may be behind and not return the activation.
As side effect, we use a trigger creation. The name of the trigger will be read from the kafka message. All other things (like credentials, ...) are already present in the action, that is invoked anyway.
To check if the trigger exists, we use the ID of the trigger. So no view is involved anymore.
diff --git a/tests/dat/createTriggerActions.js b/tests/dat/createTriggerActions.js
new file mode 100644
index 0000000..58044ea
--- /dev/null
+++ b/tests/dat/createTriggerActions.js
@@ -0,0 +1,11 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more contributor
+// license agreements; and to You under the Apache License, Version 2.0.
+
+var openwhisk = require('openwhisk');
+
+function main(params) {
+    console.log(JSON.stringify(params));
+    var name = params.messages[0].value;
+    var ow = openwhisk();
+    return ow.triggers.create({name: name});
+}
diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala
index 090352c..2b99938 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -19,25 +19,20 @@
 
 import java.util.concurrent.{TimeUnit, TimeoutException}
 
+import com.jayway.restassured.RestAssured
+import common.TestUtils.NOT_FOUND
+import common._
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Inside, Matchers}
+import spray.json.DefaultJsonProtocol._
+import spray.json._
 import system.utils.KafkaUtils
+import whisk.utils.retry
 
 import scala.concurrent.duration.DurationInt
 import scala.language.postfixOps
-import org.junit.runner.RunWith
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Inside, Matchers}
-import org.scalatest.junit.JUnitRunner
-import common.JsHelpers
-import common.TestHelpers
-import common.TestUtils
-import common.Wsk
-import common.WskActorSystem
-import common.WskProps
-import common.WskTestHelpers
-import spray.json._
-import spray.json.DefaultJsonProtocol._
-import com.jayway.restassured.RestAssured
-import org.apache.kafka.clients.producer.ProducerRecord
-import whisk.utils.retry;
 
 @RunWith(classOf[JUnitRunner])
 class BasicHealthTest
@@ -180,7 +175,10 @@
           }, N = 10, waitBeforeRetry = Some(1.second))
       }
 
-      val defaultAction = Some(TestUtils.getTestActionFilename("hello.js"))
+      // This action creates a trigger if it gets executed.
+      // The name of the trigger will be the message, that has been send to kafka.
+      // We only create this trigger to verify, that the action has been executed after sending the message to kafka.
+      val defaultAction = Some("dat/createTriggerActions.js")
       val defaultActionName = s"helloKafka-$currentTime"
 
       assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
@@ -194,9 +192,17 @@
       // key to use for the produced message
       val key = "TheKey"
 
-      println(s"Producing message with key: $key and value: $currentTime")
+      val verificationName = s"trigger-$currentTime"
+
+      // Check that the verification trigger does not exist before the action ran.
+      // This will also clean up the trigger after the test.
+      assetHelper.withCleaner(wsk.trigger, verificationName) { (trigger, name) =>
+        trigger.get(name, NOT_FOUND)
+      }
+
+      println(s"Producing message with key: $key and value: $verificationName")
       val producer = kafkaUtils.createProducer()
-      val record = new ProducerRecord(topic, key, currentTime)
+      val record = new ProducerRecord(topic, key, verificationName)
       val future = producer.send(record)
 
       producer.flush()
@@ -212,11 +218,10 @@
         case e: Exception => throw e
       }
 
-      retry({
-        println("Polling for activations")
-        val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries)
-        assert(activations.nonEmpty)
-      }, N = 3)
+      // Check if the trigger, that should have been created as reaction on the kafka-message, has been created.
+      // The trigger should have been created by the action, that has been triggered by the kafka message.
+      // If we cannot find it, the most probably the action did not run.
+      retry(wsk.trigger.get(verificationName), 60, Some(1.second))
   }
 
   it should "return correct status and configuration" in withAssetCleaner(wskprops) {