[GEARPUMP-285] Fix false alarm of shutting down executor time out
Author: huafengw <fvunicorn@gmail.com>
Closes #169 from huafengw/timeout.
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala b/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala
index c31f01f..0c46aca 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala
@@ -144,15 +144,29 @@
sealed abstract class ApplicationStatus(val status: String)
extends Serializable{
override def toString: String = status
+
+ def canTransitTo(newStatus: ApplicationStatus): Boolean
+
}
sealed abstract class ApplicationTerminalStatus(override val status: String)
- extends ApplicationStatus(status)
+ extends ApplicationStatus(status) {
+
+ override def canTransitTo(newStatus: ApplicationStatus): Boolean = false
+}
object ApplicationStatus {
- case object PENDING extends ApplicationStatus("pending")
+ case object PENDING extends ApplicationStatus("pending") {
+ override def canTransitTo(newStatus: ApplicationStatus): Boolean = {
+ !newStatus.equals(NONEXIST)
+ }
+ }
- case object ACTIVE extends ApplicationStatus("active")
+ case object ACTIVE extends ApplicationStatus("active") {
+ override def canTransitTo(newStatus: ApplicationStatus): Boolean = {
+ !newStatus.equals(NONEXIST) && !newStatus.equals(ACTIVE)
+ }
+ }
case object SUCCEEDED extends ApplicationTerminalStatus("succeeded")
@@ -160,5 +174,7 @@
case object TERMINATED extends ApplicationTerminalStatus("terminated")
- case object NONEXIST extends ApplicationStatus("nonexist")
+ case object NONEXIST extends ApplicationStatus("nonexist") {
+ override def canTransitTo(newStatus: ApplicationStatus): Boolean = false
+ }
}
\ No newline at end of file
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
index 049d11d..e41a2c5 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
@@ -231,35 +231,40 @@
timeStamp: TimeStamp, error: Throwable): Unit = {
applicationRegistry.get(appId) match {
case Some(appRuntimeInfo) =>
- var updatedStatus: ApplicationRuntimeInfo = null
- LOG.info(s"Application $appId change to ${newStatus.toString} at $timeStamp")
- newStatus match {
- case ApplicationStatus.ACTIVE =>
- updatedStatus = appRuntimeInfo.onAppMasterActivated(timeStamp)
- sender ! AppMasterActivated(appId)
- case succeeded@ApplicationStatus.SUCCEEDED =>
- killAppMaster(appId, appRuntimeInfo.worker)
- updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, succeeded)
- appResultListeners.getOrElse(appId, List.empty).foreach{ client =>
- client ! ApplicationSucceeded(appId)
- }
- case failed@ApplicationStatus.FAILED =>
- killAppMaster(appId, appRuntimeInfo.worker)
- updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, failed)
- appResultListeners.getOrElse(appId, List.empty).foreach{ client =>
- client ! ApplicationFailed(appId, error)
- }
- case terminated@ApplicationStatus.TERMINATED =>
- updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, terminated)
- case status =>
- LOG.error(s"App $appId should not change it's status to $status")
- }
+ if (appRuntimeInfo.status.canTransitTo(newStatus)) {
+ var updatedStatus: ApplicationRuntimeInfo = null
+ LOG.info(s"Application $appId change to ${newStatus.toString} at $timeStamp")
+ newStatus match {
+ case ApplicationStatus.ACTIVE =>
+ updatedStatus = appRuntimeInfo.onAppMasterActivated(timeStamp)
+ sender ! AppMasterActivated(appId)
+ case succeeded@ApplicationStatus.SUCCEEDED =>
+ killAppMaster(appId, appRuntimeInfo.worker)
+ updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, succeeded)
+ appResultListeners.getOrElse(appId, List.empty).foreach { client =>
+ client ! ApplicationSucceeded(appId)
+ }
+ case failed@ApplicationStatus.FAILED =>
+ killAppMaster(appId, appRuntimeInfo.worker)
+ updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, failed)
+ appResultListeners.getOrElse(appId, List.empty).foreach { client =>
+ client ! ApplicationFailed(appId, error)
+ }
+ case terminated@ApplicationStatus.TERMINATED =>
+ updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, terminated)
+ case status =>
+ LOG.error(s"App $appId should not change it's status to $status")
+ }
- if (newStatus.isInstanceOf[ApplicationTerminalStatus]) {
- kvService ! DeleteKVGroup(appId.toString)
+ if (newStatus.isInstanceOf[ApplicationTerminalStatus]) {
+ kvService ! DeleteKVGroup(appId.toString)
+ }
+ applicationRegistry += appId -> updatedStatus
+ kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(nextAppId, applicationRegistry))
+ } else {
+ LOG.error(s"Application $appId tries to switch status ${appRuntimeInfo.status} " +
+ s"to $newStatus")
}
- applicationRegistry += appId -> updatedStatus
- kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(nextAppId, applicationRegistry))
case None =>
LOG.error(s"Can not find application runtime info for appId $appId when it's " +
s"status changed to ${newStatus.toString}")
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/ApplicationStatusSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/ApplicationStatusSpec.scala
new file mode 100644
index 0000000..743fe34
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/ApplicationStatusSpec.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster
+
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
+class ApplicationStatusSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
+
+ "ApplicationStatus" should "check status transition properly" in {
+ val pending = ApplicationStatus.PENDING
+ assert(!pending.canTransitTo(ApplicationStatus.NONEXIST))
+ assert(pending.canTransitTo(ApplicationStatus.PENDING))
+ assert(pending.canTransitTo(ApplicationStatus.ACTIVE))
+ assert(pending.canTransitTo(ApplicationStatus.SUCCEEDED))
+
+ val active = ApplicationStatus.ACTIVE
+ assert(active.canTransitTo(ApplicationStatus.SUCCEEDED))
+ assert(active.canTransitTo(ApplicationStatus.PENDING))
+ assert(!active.canTransitTo(ApplicationStatus.ACTIVE))
+ assert(!active.canTransitTo(ApplicationStatus.NONEXIST))
+
+ val succeed = ApplicationStatus.SUCCEEDED
+ assert(!succeed.canTransitTo(ApplicationStatus.NONEXIST))
+ assert(!succeed.canTransitTo(ApplicationStatus.SUCCEEDED))
+ assert(!succeed.canTransitTo(ApplicationStatus.FAILED))
+ }
+}
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationMetaDataSpec.scala
similarity index 67%
rename from core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala
rename to core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationMetaDataSpec.scala
index 6593836..664fc9c 100644
--- a/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala
+++ b/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationMetaDataSpec.scala
@@ -22,16 +22,16 @@
import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
import org.apache.gearpump.cluster.appmaster.ApplicationMetaData
-class ApplicationStateSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
+class ApplicationMetaDataSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
- "ApplicationState" should "check equal with respect to only appId and attemptId" in {
+ "ApplicationMetaData" should "check equal with respect to only appId and attemptId" in {
val appDescription = AppDescription("app", "AppMaster", null)
- val stateA = ApplicationMetaData(0, 0, appDescription, null, null)
- val stateB = ApplicationMetaData(0, 0, appDescription, null, null)
- val stateC = ApplicationMetaData(0, 1, appDescription, null, null)
+ val metaDataA = ApplicationMetaData(0, 0, appDescription, null, null)
+ val metaDataB = ApplicationMetaData(0, 0, appDescription, null, null)
+ val metaDataC = ApplicationMetaData(0, 1, appDescription, null, null)
- assert(stateA == stateB)
- assert(stateA.hashCode == stateB.hashCode)
- assert(stateA != stateC)
+ assert(metaDataA == metaDataB)
+ assert(metaDataA.hashCode == metaDataB.hashCode)
+ assert(metaDataA != metaDataC)
}
}