blob: bde5f337910246c529a799c9ae427f67dfa83de7 [file] [log] [blame]
/*
*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.webank.wedatasphere.linkis.manager.am.service.em
import java.util.concurrent.TimeUnit
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.manager.am.conf.AMConfiguration
import com.webank.wedatasphere.linkis.manager.am.manager.EMNodeManager
import com.webank.wedatasphere.linkis.manager.common.protocol.em.{EMInfoClearRequest, StopEMRequest}
import com.webank.wedatasphere.linkis.message.annotation.Receiver
import com.webank.wedatasphere.linkis.message.builder.ServiceMethodContext
import com.webank.wedatasphere.linkis.protocol.label.NodeLabelRemoveRequest
import com.webank.wedatasphere.linkis.rpc.utils.RPCUtils
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
@Service
class DefaultEMUnregisterService extends EMUnregisterService with Logging {
@Autowired
private var emNodeManager: EMNodeManager = _
@Receiver
override def stopEM(stopEMRequest: StopEMRequest, smc: ServiceMethodContext): Unit = {
info(s" user ${stopEMRequest.getUser} prepare to stop em ${stopEMRequest.getEm}")
val node = emNodeManager.getEM(stopEMRequest.getEm)
if (null == node) return
if (node.getOwner != stopEMRequest.getUser) {
info(s" ${stopEMRequest.getUser} are not owner, will not to stopEM")
}
if (!RPCUtils.getServiceInstanceFromSender(smc.getSender).equals(stopEMRequest.getEm)) {
emNodeManager.stopEM(node)
}
info(s" user ${stopEMRequest.getUser} Finished to stop em process ${stopEMRequest.getEm}")
//clear RM info
val emClearRequest = new EMInfoClearRequest
emClearRequest.setEm(node)
emClearRequest.setUser(stopEMRequest.getUser)
val job = smc.publish(emClearRequest)
Utils.tryAndWarn(job.get(AMConfiguration.STOP_ENGINE_WAIT.getValue.toLong, TimeUnit.MILLISECONDS))
// clear Label
val instanceLabelRemoveRequest = new NodeLabelRemoveRequest(node.getServiceInstance, false)
val labelJob = smc.publish(instanceLabelRemoveRequest)
Utils.tryAndWarn(labelJob.get(AMConfiguration.STOP_ENGINE_WAIT.getValue.toLong, TimeUnit.MILLISECONDS))
//此处需要先清理ECM再等待,避免ECM重启过快,导致ECM资源没清理干净
clearEMInstanceInfo(emClearRequest)
info(s" user ${stopEMRequest.getUser} finished to stop em ${stopEMRequest.getEm}")
}
override def clearEMInstanceInfo(emClearRequest: EMInfoClearRequest): Unit = {
info(s" user ${emClearRequest.getUser} prepare to clear em info ${emClearRequest.getEm.getServiceInstance}")
emNodeManager.deleteEM(emClearRequest.getEm)
info(s" user ${emClearRequest.getUser} Finished to clear em info ${emClearRequest.getEm.getServiceInstance}")
}
}