blob: 2e70f42ec9c2caecb9b343b9c22ec278337c9dd8 [file] [log] [blame]
/*
*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.webank.wedatasphere.linkis.manager.am.service.engine
import java.util
import com.webank.wedatasphere.linkis.common.ServiceInstance
import com.webank.wedatasphere.linkis.common.utils.Logging
import com.webank.wedatasphere.linkis.manager.am.recycle.RecyclingRuleExecutor
import com.webank.wedatasphere.linkis.manager.common.protocol.engine.{EngineRecyclingRequest, EngineStopRequest}
import com.webank.wedatasphere.linkis.message.annotation.Receiver
import com.webank.wedatasphere.linkis.message.publisher.MessagePublisher
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import scala.collection.JavaConversions._
@Service
class DefaultEngineRecycleService extends AbstractEngineService with EngineRecycleService with Logging {
@Autowired
private var ruleExecutorList: util.List[RecyclingRuleExecutor] = _
@Autowired
private var publisher: MessagePublisher = _
@Receiver
override def recycleEngine(engineRecyclingRequest: EngineRecyclingRequest): Array[ServiceInstance] = {
if (null == ruleExecutorList) {
error("has not recycling rule")
return null
}
info(s"start to recycle engine by ${engineRecyclingRequest.getUser}")
//1. 规则解析
val ruleList = engineRecyclingRequest.getRecyclingRuleList
//2. 返回一系列待回收Engine,
val recyclingNodeSet = ruleList.flatMap { rule =>
val ruleExecutorOption = ruleExecutorList.find(_.ifAccept(rule))
if (ruleExecutorOption.isDefined) {
ruleExecutorOption.get.executeRule(rule)
} else {
Nil
}
}.filter(null != _).toSet
if (null == recyclingNodeSet) {
return null
}
info(s"The list of engines recycled this time is as follows:${recyclingNodeSet}")
//3. 调用EMService stopEngine
recyclingNodeSet.foreach { serviceInstance =>
val stopEngineRequest = new EngineStopRequest(serviceInstance, engineRecyclingRequest.getUser)
publisher.publish(stopEngineRequest)
}
info(s"Finished to recycle engine ,num ${recyclingNodeSet.size}")
recyclingNodeSet.toArray
}
}