Report the final exit status when the EngineConn (EC) exits (#4574)
* Report the final exit status when EC exits
* Report the final exit status when EC exits
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-clustered-engineconn/linkis-once-engineconn/src/main/scala/org/apache/linkis/engineconn/once/executor/OnceExecutor.scala b/linkis-computation-governance/linkis-engineconn/linkis-clustered-engineconn/linkis-once-engineconn/src/main/scala/org/apache/linkis/engineconn/once/executor/OnceExecutor.scala
index 67f8af0..dee4a88 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-clustered-engineconn/linkis-once-engineconn/src/main/scala/org/apache/linkis/engineconn/once/executor/OnceExecutor.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-clustered-engineconn/linkis-once-engineconn/src/main/scala/org/apache/linkis/engineconn/once/executor/OnceExecutor.scala
@@ -209,6 +209,7 @@
msg,
EngineConnObject.getEngineCreationContext.getTicketId
)
+ engineReleaseRequest.setNodeStatus(getStatus)
Utils.tryAndWarn(Thread.sleep(500))
logger.info("To send release request to linkis manager")
ManagerService.getManagerService.requestReleaseEngineConn(engineReleaseRequest)
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
index fb5dbb6..581b08a 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
@@ -94,15 +94,16 @@
)
return
}
+ val nodeStatus = accessibleExecutor.getStatus
if (NodeStatus.isCompleted(accessibleExecutor.getStatus)) {
logger.error(
s"${accessibleExecutor.getId} has completed with status ${accessibleExecutor.getStatus}, now stop it."
)
- requestManagerReleaseExecutor("Completed release")
+ requestManagerReleaseExecutor("Completed release", nodeStatus)
ShutdownHook.getShutdownHook.notifyStop()
} else if (accessibleExecutor.getStatus == NodeStatus.ShuttingDown) {
logger.warn(s"${accessibleExecutor.getId} is ShuttingDown...")
- requestManagerReleaseExecutor(" ShuttingDown release")
+ requestManagerReleaseExecutor(" ShuttingDown release", nodeStatus)
ShutdownHook.getShutdownHook.notifyStop()
} else if (
maxFreeTime > 0 && (NodeStatus.Unlock.equals(
@@ -118,7 +119,7 @@
s"${accessibleExecutor.getId} has not been used for $maxFreeTimeStr, now try to shutdown it."
)
accessibleExecutor.tryShutdown()
- requestManagerReleaseExecutor(" idle release")
+ requestManagerReleaseExecutor(" idle release", nodeStatus)
ShutdownHook.getShutdownHook.notifyStop()
}
@@ -131,14 +132,14 @@
)
}
- def requestManagerReleaseExecutor(msg: String): Unit = {
+ def requestManagerReleaseExecutor(msg: String, nodeStatus: NodeStatus): Unit = {
val engineReleaseRequest = new EngineConnReleaseRequest(
Sender.getThisServiceInstance,
Utils.getJvmUser,
msg,
EngineConnObject.getEngineCreationContext.getTicketId
)
- Utils.tryAndWarn(Thread.sleep(100))
+ engineReleaseRequest.setNodeStatus(nodeStatus)
logger.info("To send release request to linkis manager")
ManagerService.getManagerService.requestReleaseEngineConn(engineReleaseRequest)
}
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/AccessibleService.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/AccessibleService.scala
index f103a66..604a9a1 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/AccessibleService.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/AccessibleService.scala
@@ -32,6 +32,7 @@
def dealEngineStopRequest(engineSuicideRequest: EngineSuicideRequest, sender: Sender): Unit
+ @deprecated
def requestManagerReleaseExecutor(msg: String): Unit
def dealRequestNodeStatus(requestNodeStatus: RequestNodeStatus): ResponseNodeStatus
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
index 3c8b327..421c92a 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
@@ -196,6 +196,7 @@
failedEcNode.getLabels.addAll(
LabelUtils.distinctLabel(labelFilter.choseEngineLabel(labelList), emNode.getLabels)
)
+ failedEcNode.setNodeStatus(NodeStatus.Failed)
engineStopService.engineConnInfoClear(failedEcNode)
}
throw t
@@ -220,6 +221,7 @@
failedEcNode.getLabels.addAll(
LabelUtils.distinctLabel(labelFilter.choseEngineLabel(labelList), emNode.getLabels)
)
+ failedEcNode.setNodeStatus(NodeStatus.Failed)
engineStopService.engineConnInfoClear(failedEcNode)
}
throw new LinkisRetryException(
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.scala
index 757fb39..1dc4d92 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.scala
@@ -20,6 +20,7 @@
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.manager.am.conf.AMConfiguration
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
import org.apache.linkis.manager.common.entity.node.EngineNode
import org.apache.linkis.manager.common.exception.RMErrorException
import org.apache.linkis.manager.common.protocol.engine.{
@@ -78,6 +79,9 @@
logger.info(s"Finished to kill engine invoke enginePointer ${node.getServiceInstance}")
}(s"Failed to stop engine ${node.getServiceInstance}")
node.setLabels(nodeLabelService.getNodeLabels(engineStopRequest.getServiceInstance))
+ if (null == node.getNodeStatus) {
+ node.setNodeStatus(NodeStatus.ShuttingDown)
+ }
engineConnInfoClear(node)
logger.info(
s" user ${engineStopRequest.getUser} finished to stop engine ${engineStopRequest.getServiceInstance}"
@@ -92,7 +96,7 @@
logger.info(s"Start to clear ec info $ecNode")
// 1. to clear engine resource
Utils.tryCatch {
- resourceManager.resourceReleased(ecNode.getLabels)
+ resourceManager.resourceReleased(ecNode)
} {
case exception: RMErrorException =>
if (exception.getErrCode != RMErrorCode.LABEL_RESOURCE_NOT_FOUND.getCode) {
@@ -136,6 +140,7 @@
s"Send stop engine request ${engineConnReleaseRequest.getServiceInstance.toString}"
)
engineNode.setLabels(nodeLabelService.getNodeLabels(engineNode.getServiceInstance))
+ engineNode.setNodeStatus(engineConnReleaseRequest.getNodeStatus)
engineConnInfoClear(engineNode)
} else {
logger.warn(
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/ResourceManager.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/ResourceManager.scala
index d22ad64..78e414d 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/ResourceManager.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/ResourceManager.scala
@@ -18,6 +18,7 @@
package org.apache.linkis.manager.rm.service
import org.apache.linkis.common.ServiceInstance
+import org.apache.linkis.manager.common.entity.node.EngineNode
import org.apache.linkis.manager.common.entity.resource.NodeResource
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.rm.{ResourceInfo, ResultResource}
@@ -82,9 +83,9 @@
/**
* Method called when the resource usage is released 当资源使用完成释放后,调用的方法
*
- * @param labels
+ * @param ecNode the engine node whose resources are released; its labels and (optional) node status are read
*/
- def resourceReleased(labels: util.List[Label[_]]): Unit
+ def resourceReleased(ecNode: EngineNode): Unit
/**
* If the IP and port are empty, return the resource status of all modules of a module * Return
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala
index 8358c21..e720e47 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala
@@ -23,7 +23,7 @@
import org.apache.linkis.manager.am.service.engine.EngineStopService
import org.apache.linkis.manager.common.conf.RMConfiguration
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
-import org.apache.linkis.manager.common.entity.node.{AMEMNode, AMEngineNode, InfoRMNode}
+import org.apache.linkis.manager.common.entity.node.{AMEMNode, AMEngineNode, EngineNode, InfoRMNode}
import org.apache.linkis.manager.common.entity.persistence.{
PersistenceLabel,
PersistenceLock,
@@ -211,6 +211,7 @@
ecNodes.foreach { engineNode =>
Utils.tryAndWarn {
engineNode.setLabels(nodeLabelService.getNodeLabels(engineNode.getServiceInstance))
+ engineNode.setNodeStatus(NodeStatus.Failed)
engineStopService.engineConnInfoClear(engineNode)
}
}
@@ -368,9 +369,6 @@
* 当资源被实例化后,返回实际占用的资源总量
*
* @param labels
- * In general, resourceReleased will release the resources occupied by the user, but if the
- * process that uses the resource does not have time to call the resourceReleased method to die,
- * you need to unregister to release the resource.
* @param usedResource
*/
override def resourceUsed(labels: util.List[Label[_]], usedResource: NodeResource): Unit = {
@@ -610,10 +608,10 @@
/**
* Method called when the resource usage is released 当资源使用完成释放后,调用的方法
*
- * @param labels
+ * @param ecNode engine node to release; its labels locate the resource, and its node status is used when non-null
*/
- override def resourceReleased(labels: util.List[Label[_]]): Unit = {
- val labelContainer = labelResourceService.enrichLabels(labels)
+ override def resourceReleased(ecNode: EngineNode): Unit = {
+ val labelContainer = labelResourceService.enrichLabels(ecNode.getLabels)
if (null == labelContainer.getEngineInstanceLabel) {
throw new RMErrorException(
RMErrorCode.LABEL_RESOURCE_NOT_FOUND.getCode,
@@ -638,7 +636,12 @@
logger.info(
s"resourceRelease ready:${labelContainer.getEngineInstanceLabel.getServiceInstance},current node resource${usedResource}"
)
- val status = getNodeStatus(labelContainer.getEngineInstanceLabel)
+
+ val status = if (null == ecNode.getNodeStatus) {
+ getNodeStatus(labelContainer.getEngineInstanceLabel)
+ } else {
+ ecNode.getNodeStatus
+ }
labelContainer.getResourceLabels.asScala
.filter(!_.isInstanceOf[EngineInstanceLabel])
@@ -844,7 +847,11 @@
logger.warn(
s"serviceInstance ${engineInstanceLabel.getServiceInstance} lock resource timeout, clear resource"
)
- resourceReleased(labels)
+ val ecNode = new AMEngineNode()
+ ecNode.setServiceInstance(engineInstanceLabel.getServiceInstance)
+ ecNode.setNodeStatus(NodeStatus.Failed)
+ ecNode.setLabels(labels)
+ resourceReleased(ecNode)
case _ =>
}
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/protocol/engine/EngineConnReleaseRequest.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/protocol/engine/EngineConnReleaseRequest.java
index b7c8172..31e269d 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/protocol/engine/EngineConnReleaseRequest.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/protocol/engine/EngineConnReleaseRequest.java
@@ -18,6 +18,7 @@
package org.apache.linkis.manager.common.protocol.engine;
import org.apache.linkis.common.ServiceInstance;
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus;
public class EngineConnReleaseRequest implements EngineRequest {
@@ -27,6 +28,8 @@
private String msg;
+ private NodeStatus nodeStatus;
+
public String getTicketId() {
return ticketId;
}
@@ -71,4 +74,12 @@
public void setMsg(String msg) {
this.msg = msg;
}
+
+ public NodeStatus getNodeStatus() {
+ return nodeStatus;
+ }
+
+ public void setNodeStatus(NodeStatus nodeStatus) {
+ this.nodeStatus = nodeStatus;
+ }
}