blob: 25f5d976b4fb7112766234915ccb2bd5655685c6 [file] [log] [blame]
/*
*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.webank.wedatasphere.linkis.manager.am.service.engine
import java.util.concurrent.TimeUnit
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.governance.common.conf.GovernanceCommonConf
import com.webank.wedatasphere.linkis.manager.am.conf.AMConfiguration
import com.webank.wedatasphere.linkis.manager.common.protocol.engine.{EngineInfoClearRequest, EngineStopRequest, EngineSuicideRequest}
import com.webank.wedatasphere.linkis.manager.label.service.NodeLabelService
import com.webank.wedatasphere.linkis.message.annotation.Receiver
import com.webank.wedatasphere.linkis.message.builder.ServiceMethodContext
import com.webank.wedatasphere.linkis.protocol.label.NodeLabelRemoveRequest
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
@Service
class DefaultEngineStopService extends AbstractEngineService with EngineStopService with Logging {
@Autowired
private var nodeLabelService: NodeLabelService = _
@Receiver
override def stopEngine(engineStopRequest: EngineStopRequest, smc: ServiceMethodContext): Unit = {
//TODO delete
engineStopRequest.getServiceInstance.setApplicationName(GovernanceCommonConf.ENGINE_CONN_SPRING_NAME.getValue)
info(s" user ${engineStopRequest.getUser} prepare to stop engine ${engineStopRequest.getServiceInstance}")
val node = getEngineNodeManager.getEngineNode(engineStopRequest.getServiceInstance)
if (null == node) {
info(s" engineConn is not exists in db: $engineStopRequest ")
return
}
node.setLabels(nodeLabelService.getNodeLabels(engineStopRequest.getServiceInstance))
//clear RM and AM info
val engineInfoClearRequest = new EngineInfoClearRequest
engineInfoClearRequest.setEngineNode(node)
engineInfoClearRequest.setUser(engineStopRequest.getUser)
val job = smc.publish(engineInfoClearRequest)
Utils.tryAndWarn(job.get(AMConfiguration.STOP_ENGINE_WAIT.getValue.toLong, TimeUnit.MILLISECONDS))
info(s"Finished to clear RM info and stop Engine $node")
// clear Label
val instanceLabelRemoveRequest = new NodeLabelRemoveRequest(node.getServiceInstance, true)
val labelJob = smc.publish(instanceLabelRemoveRequest)
Utils.tryAndWarn(labelJob.get(AMConfiguration.STOP_ENGINE_WAIT.getValue.toLong, TimeUnit.MILLISECONDS))
info(s"Finished to clear engineNode $node Label info")
getEngineNodeManager.deleteEngineNode(node)
info(s" user ${engineStopRequest.getUser} finished to stop engine ${engineStopRequest.getServiceInstance}")
}
@Receiver
override def engineSuicide(engineSuicideRequest: EngineSuicideRequest, smc: ServiceMethodContext): Unit = {
info(s"Will ask engine : ${engineSuicideRequest.getServiceInstance.toString} of user : ${engineSuicideRequest.getUser} to suicide.")
EngineStopService.askEngineToSuicide(engineSuicideRequest)
}
}