Do not skip last sequence if an exception occurs (#364)
diff --git a/provider/service.py b/provider/service.py
index fa8e109..b8a272f 100644
--- a/provider/service.py
+++ b/provider/service.py
@@ -68,11 +68,6 @@
# check whether or not the feed is capable of detecting canary
# documents
if change != None:
- # Record the sequence in case the changes feed needs to be
- # restarted. This way the new feed can pick up right where
- # the old one left off.
- self.lastSequence = change['seq']
-
if "deleted" in change and change["deleted"] == True:
logging.info('[changes] Found a delete')
consumer = self.consumers.getConsumerForTrigger(change['id'])
@@ -109,7 +104,20 @@
elif triggerIsAssignedToMe:
logging.info('[{}] Found a change to an existing trigger'.format(change["id"]))
- if existingConsumer.desiredState() == Consumer.State.Disabled and self.__isTriggerDocActive(document):
+ if existingConsumer.desiredState() == Consumer.State.Dead and self.__isTriggerDocActive(document):
+ # if a delete occurs followed quickly by a create the consumer might get stuck in a dead state,
+ # so we need to forcefully delete the process before recreating it.
+ logging.info('[{}] A create event occurred for a trigger that is shutting down'.format(change["id"]))
+
+ if existingConsumer.process.is_alive():
+ logging.info('[{}] Joining dead process.'.format(existingConsumer.trigger))
+ existingConsumer.process.join(1)
+ else:
+ logging.info('[{}] Process is already dead.'.format(existingConsumer.trigger))
+
+ self.consumers.removeConsumerForTrigger(existingConsumer.trigger)
+ self.createAndRunConsumer(document)
+ elif existingConsumer.desiredState() == Consumer.State.Disabled and self.__isTriggerDocActive(document):
# disabled trigger has become active
logging.info('[{}] Existing disabled trigger should become active'.format(change["id"]))
self.createAndRunConsumer(document)
@@ -123,6 +131,11 @@
self.lastCanaryTime = datetime.now()
else:
logging.debug('[changes] Found a change for a non-trigger document')
+
+ # Record the sequence in case the changes feed needs to be
+ # restarted. This way the new feed can pick up right where
+ # the old one left off.
+ self.lastSequence = change['seq']
except Exception as e:
logging.error('[canary] Exception caught from changes feed. Restarting changes feed...')
logging.error(e)
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index 0c05d63..9974bf5 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -16,10 +16,9 @@
*/
package system.packages
-import system.utils.KafkaUtils
-
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
+import system.utils.KafkaUtils
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
@@ -40,6 +39,8 @@
import org.apache.openwhisk.utils.retry
import org.apache.openwhisk.core.entity.Annotations
import java.util.concurrent.ExecutionException
+import common.ActivationResult
+import common.TestUtils.SUCCESS_EXIT
@RunWith(classOf[JUnitRunner])
class MessageHubFeedTests
@@ -113,6 +114,62 @@
runActionWithExpectedResult(actionName, "dat/multipleValueTypes.json", expectedOutput, false)
}
+ it should "create a trigger, delete that trigger, and quickly create it again with successful trigger fires" in withAssetCleaner(wskprops) {
+ val currentTime = s"${System.currentTimeMillis}"
+
+ (wp, assetHelper) =>
+ val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
+ val ruleName = s"dummyMessageHub-helloKafka-$currentTime"
+ val parameters = Map(
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "api_key" -> getAsJson("api_key"),
+ "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
+ "topic" -> topic.toJson
+ )
+
+ val key = "TheKey"
+ val verificationName = s"trigger-$currentTime"
+ val defaultAction = Some("dat/createTriggerActions.js")
+ val defaultActionName = s"helloKafka-$currentTime"
+
+ createTrigger(assetHelper, triggerName, parameters)
+
+ assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
+ action.create(name, defaultAction, annotations = Map(Annotations.ProvideApiKeyAnnotationName -> JsBoolean(true)))
+ }
+
+ assetHelper.withCleaner(wsk.rule, ruleName) { (rule, name) =>
+ rule.create(name, trigger = triggerName, action = defaultActionName)
+ }
+
+ assetHelper.withCleaner(wsk.trigger, verificationName) { (trigger, name) =>
+ trigger.get(name, NOT_FOUND)
+ }
+
+ produceMessage(topic, key, verificationName)
+ retry(wsk.trigger.get(verificationName), 60, Some(1.second))
+
+ wsk.trigger.delete(verificationName, expectedExitCode = SUCCESS_EXIT)
+ wsk.trigger.delete(triggerName, expectedExitCode = SUCCESS_EXIT)
+
+ val feedCreationResult = wsk.trigger.create(triggerName, feed = Some(s"/whisk.system/messaging/messageHubFeed"), parameters = parameters)
+ val activation = wsk.parseJsonString(feedCreationResult.stdout.substring(0, feedCreationResult.stdout.indexOf("ok: created trigger"))).convertTo[ActivationResult]
+ activation.response.success shouldBe true
+
+ wsk.rule.enable(ruleName, expectedExitCode = SUCCESS_EXIT)
+
+ println("Giving the consumer a moment to get ready")
+ Thread.sleep(KafkaUtils.consumerInitTime)
+
+ val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "")
+ consumerExists(uuid)
+
+ produceMessage(topic, key, verificationName)
+ retry(wsk.trigger.get(verificationName), 60, Some(1.second))
+ }
+
it should "fire multiple triggers for two large payloads" in withAssetCleaner(wskprops) {
// payload size should be under the payload limit, but greater than 50% of the limit
val testPayloadSize = 600000