Do not skip last sequence if an exception occurs (#364)

diff --git a/provider/service.py b/provider/service.py
index fa8e109..b8a272f 100644
--- a/provider/service.py
+++ b/provider/service.py
@@ -68,11 +68,6 @@
                     # check whether or not the feed is capable of detecting canary
                     # documents
                     if change != None:
-                        # Record the sequence in case the changes feed needs to be
-                        # restarted. This way the new feed can pick up right where
-                        # the old one left off.
-                        self.lastSequence = change['seq']
-
                         if "deleted" in change and change["deleted"] == True:
                             logging.info('[changes] Found a delete')
                             consumer = self.consumers.getConsumerForTrigger(change['id'])
@@ -109,7 +104,20 @@
                                 elif triggerIsAssignedToMe:
                                     logging.info('[{}] Found a change to an existing trigger'.format(change["id"]))
 
-                                    if existingConsumer.desiredState() == Consumer.State.Disabled and self.__isTriggerDocActive(document):
+                                    if existingConsumer.desiredState() == Consumer.State.Dead and self.__isTriggerDocActive(document):
+                                        # if a delete occurs followed quickly by a create the consumer might get stuck in a dead state,
+                                        # so we need to forcefully delete the process before recreating it.
+                                        logging.info('[{}] A create event occurred for a trigger that is shutting down'.format(change["id"]))
+
+                                        if existingConsumer.process.is_alive():
+                                            logging.info('[{}] Joining dead process.'.format(existingConsumer.trigger))
+                                            existingConsumer.process.join(1)
+                                        else:
+                                            logging.info('[{}] Process is already dead.'.format(existingConsumer.trigger))
+
+                                        self.consumers.removeConsumerForTrigger(existingConsumer.trigger)
+                                        self.createAndRunConsumer(document)
+                                    elif existingConsumer.desiredState() == Consumer.State.Disabled and self.__isTriggerDocActive(document):
                                         # disabled trigger has become active
                                         logging.info('[{}] Existing disabled trigger should become active'.format(change["id"]))
                                         self.createAndRunConsumer(document)
@@ -123,6 +131,11 @@
                             self.lastCanaryTime = datetime.now()
                         else:
                             logging.debug('[changes] Found a change for a non-trigger document')
+
+                        # Record the sequence in case the changes feed needs to be
+                        # restarted. This way the new feed can pick up right where
+                        # the old one left off.
+                        self.lastSequence = change['seq']
             except Exception as e:
                 logging.error('[canary] Exception caught from changes feed. Restarting changes feed...')
                 logging.error(e)
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index 0c05d63..9974bf5 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -16,10 +16,9 @@
  */
 package system.packages
 
-import system.utils.KafkaUtils
-
 import scala.concurrent.duration.DurationInt
 import scala.language.postfixOps
+import system.utils.KafkaUtils
 import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FlatSpec
@@ -40,6 +39,8 @@
 import org.apache.openwhisk.utils.retry
 import org.apache.openwhisk.core.entity.Annotations
 import java.util.concurrent.ExecutionException
+import common.ActivationResult
+import common.TestUtils.SUCCESS_EXIT
 
 @RunWith(classOf[JUnitRunner])
 class MessageHubFeedTests
@@ -113,6 +114,62 @@
     runActionWithExpectedResult(actionName, "dat/multipleValueTypes.json", expectedOutput, false)
   }
 
+  it should "create a trigger, delete that trigger, and quickly create it again with successful trigger fires" in withAssetCleaner(wskprops) {
+    val currentTime = s"${System.currentTimeMillis}"
+
+    (wp, assetHelper) =>
+      val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
+      val ruleName = s"dummyMessageHub-helloKafka-$currentTime"
+      val parameters = Map(
+        "user" -> getAsJson("user"),
+        "password" -> getAsJson("password"),
+        "api_key" -> getAsJson("api_key"),
+        "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+        "kafka_brokers_sasl" -> getAsJson("brokers"),
+        "topic" -> topic.toJson
+      )
+
+      val key = "TheKey"
+      val verificationName = s"trigger-$currentTime"
+      val defaultAction = Some("dat/createTriggerActions.js")
+      val defaultActionName = s"helloKafka-$currentTime"
+
+      createTrigger(assetHelper, triggerName, parameters)
+
+      assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) =>
+        action.create(name, defaultAction, annotations = Map(Annotations.ProvideApiKeyAnnotationName -> JsBoolean(true)))
+      }
+
+      assetHelper.withCleaner(wsk.rule, ruleName) { (rule, name) =>
+        rule.create(name, trigger = triggerName, action = defaultActionName)
+      }
+
+      assetHelper.withCleaner(wsk.trigger, verificationName) { (trigger, name) =>
+        trigger.get(name, NOT_FOUND)
+      }
+
+      produceMessage(topic, key, verificationName)
+      retry(wsk.trigger.get(verificationName), 60, Some(1.second))
+
+      wsk.trigger.delete(verificationName, expectedExitCode = SUCCESS_EXIT)
+      wsk.trigger.delete(triggerName, expectedExitCode = SUCCESS_EXIT)
+
+      val feedCreationResult = wsk.trigger.create(triggerName, feed = Some(s"/whisk.system/messaging/messageHubFeed"), parameters = parameters)
+      val activation = wsk.parseJsonString(feedCreationResult.stdout.substring(0, feedCreationResult.stdout.indexOf("ok: created trigger"))).convertTo[ActivationResult]
+      activation.response.success shouldBe true
+
+      wsk.rule.enable(ruleName, expectedExitCode = SUCCESS_EXIT)
+
+      println("Giving the consumer a moment to get ready")
+      Thread.sleep(KafkaUtils.consumerInitTime)
+
+      val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "")
+      consumerExists(uuid)
+
+      produceMessage(topic, key, verificationName)
+      retry(wsk.trigger.get(verificationName), 60, Some(1.second))
+  }
+
   it should "fire multiple triggers for two large payloads" in withAssetCleaner(wskprops) {
     // payload size should be under the payload limit, but greater than 50% of the limit
     val testPayloadSize = 600000