blob: be87da725775411224aa072caf6f7a04465d9307 [file] [log] [blame]
/*
*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.webank.wedatasphere.linkis.manager.am.service.engine
import java.util
import java.util.concurrent.{TimeUnit, TimeoutException}
import com.webank.wedatasphere.linkis.common.exception.LinkisRetryException
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.governance.common.conf.GovernanceCommonConf
import com.webank.wedatasphere.linkis.manager.am.conf.AMConfiguration
import com.webank.wedatasphere.linkis.manager.am.label.EngineReuseLabelChooser
import com.webank.wedatasphere.linkis.manager.am.selector.NodeSelector
import com.webank.wedatasphere.linkis.manager.am.utils.AMUtils
import com.webank.wedatasphere.linkis.manager.common.constant.AMConstant
import com.webank.wedatasphere.linkis.manager.common.entity.node.EngineNode
import com.webank.wedatasphere.linkis.manager.common.protocol.engine.EngineReuseRequest
import com.webank.wedatasphere.linkis.manager.label.builder.factory.LabelBuilderFactoryContext
import com.webank.wedatasphere.linkis.manager.label.entity.Label
import com.webank.wedatasphere.linkis.manager.label.entity.engine.ReuseExclusionLabel
import com.webank.wedatasphere.linkis.manager.label.entity.node.AliasServiceInstanceLabel
import com.webank.wedatasphere.linkis.manager.label.service.{NodeLabelService, UserLabelService}
import com.webank.wedatasphere.linkis.manager.label.utils.LabelUtils
import com.webank.wedatasphere.linkis.message.annotation.Receiver
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import scala.collection.JavaConversions._
import scala.concurrent.duration.Duration
/**
 * Default [[EngineReuseService]] implementation.
 *
 * Resolves the candidate engine nodes for a reuse request from its labels, lets the
 * [[NodeSelector]] pick one of them and asks the engine node manager to reuse it,
 * retrying with the remaining candidates when a reuse attempt fails.
 */
@Service
class DefaultEngineReuseService extends AbstractEngineService with EngineReuseService with Logging {

  @Autowired
  private var nodeSelector: NodeSelector = _

  @Autowired
  private var nodeLabelService: NodeLabelService = _

  @Autowired
  private var userLabelService: UserLabelService = _

  @Autowired
  private var engineReuseLabelChoosers: util.List[EngineReuseLabelChooser] = _

  /**
   * Tries to find an engine node matching the request labels and to reuse it.
   *
   * @param engineReuseRequest the reuse request; its labels, user, timeout and reuse count are read.
   *                           A non-positive timeout / reuse count falls back to
   *                           `AMConfiguration.ENGINE_REUSE_MAX_TIME` / `AMConfiguration.ENGINE_REUSE_COUNT_LIMIT`.
   * @return the engine node returned by the engine node manager, carrying the labels recorded for it
   *         in the scored node map when its service instance is found there
   * @throws LinkisRetryException when no candidate exists (before or after applying the exclusion label),
   *                              when the selector returns nothing, when the number of attempts exceeds
   *                              the reuse limit, or when the timeout elapses
   */
  @Receiver
  @throws[LinkisRetryException]
  override def reuseEngine(engineReuseRequest: EngineReuseRequest): EngineNode = {
    info(s"Start to reuse Engine for request: $engineReuseRequest")
    //TODO Label Factory And builder
    val labelBuilderFactory = LabelBuilderFactoryContext.getLabelBuilderFactory
    // Labels: the labels passed in the request merged with the user's default labels, de-duplicated
    var labelList: util.List[Label[_]] = LabelUtils.distinctLabel(labelBuilderFactory.getLabels(engineReuseRequest.getLabels),
      userLabelService.getUserLabels(engineReuseRequest.getUser))

    val engineConnAliasLabel = labelBuilderFactory.createLabel(classOf[AliasServiceInstanceLabel])
    engineConnAliasLabel.setAlias(GovernanceCommonConf.ENGINE_CONN_SPRING_NAME.getValue)
    labelList.add(engineConnAliasLabel)

    // Label choosers: each chooser may replace the label list that is used for matching
    if (null != engineReuseLabelChoosers) {
      engineReuseLabelChoosers.foreach { chooser =>
        labelList = chooser.chooseLabels(labelList)
      }
    }

    val instances = nodeLabelService.getScoredNodeMapsByLabels(labelList)
    if (null == instances || instances.isEmpty) {
      throw new LinkisRetryException(AMConstant.ENGINE_ERROR_CODE, s"No engine can be reused")
    }

    // Drop every candidate whose instance is named by the ReuseExclusionLabel (if the request has one).
    labelList.collectFirst { case exclusionLabel: ReuseExclusionLabel => exclusionLabel }.foreach { exclusionLabel =>
      val exclusionInstances = exclusionLabel.getInstances
      if (null != exclusionInstances) {
        // Use the map's own entry-set iterator. Going through `instances.iterator` yields a Scala
        // iterator (via JavaConversions); calling `remove` on it is routed through an implicit
        // Java-iterator wrapper whose `remove` always throws UnsupportedOperationException.
        // NOTE(review): assumes getScoredNodeMapsByLabels returns a mutable java.util.Map - confirm.
        val entryIterator = instances.entrySet().iterator()
        while (entryIterator.hasNext) {
          val candidate = entryIterator.next().getKey
          if (exclusionInstances.contains(candidate.getServiceInstance.getInstance)) {
            entryIterator.remove()
          }
        }
      }
    }
    // The exclusion step may have removed every candidate
    if (null == instances || instances.isEmpty) {
      throw new LinkisRetryException(AMConstant.ENGINE_ERROR_CODE, s"No engine can be reused")
    }

    var engineScoreList = getEngineNodeManager.getEngineNodes(instances.map(_._1).toSeq.toArray)

    var engine: EngineNode = null
    var count = 1
    val timeout = if (engineReuseRequest.getTimeOut <= 0) AMConfiguration.ENGINE_REUSE_MAX_TIME.getValue.toLong else engineReuseRequest.getTimeOut
    val reuseLimit = if (engineReuseRequest.getReuseCount <= 0) AMConfiguration.ENGINE_REUSE_COUNT_LIMIT.getValue else engineReuseRequest.getReuseCount

    // One reuse attempt; returns true once an engine has been obtained. Invoked repeatedly by waitUntil.
    def selectEngineToReuse: Boolean = {
      if (count > reuseLimit) {
        throw new LinkisRetryException(AMConstant.ENGINE_ERROR_CODE, s"Engine reuse exceeds limit: $reuseLimit")
      }
      // 3. Run the selector: judge label score, availability and load
      val choseNode = nodeSelector.choseNode(engineScoreList.toArray)
      // 4. Take the top-ranked engine after selection, update the EngineNode's labels to the new labels,
      //    and send the reuse request through the EngineNodeManager
      if (choseNode.isEmpty) {
        throw new LinkisRetryException(AMConstant.ENGINE_ERROR_CODE, "No engine can be reused")
      }
      //TODO should a label-mismatch check be added here?
      // 5. Ask the EngineNodeManager to reuse; if the reuse fails, drop that engine and retry steps 3 and 4
      engine = Utils.tryAndWarn(getEngineNodeManager.reuseEngine(choseNode.get.asInstanceOf[EngineNode]))
      if (null == engine) {
        count = count + 1
        engineScoreList = engineScoreList.filter(!_.equals(choseNode.get))
      }
      null != engine
    }

    val startTime = System.currentTimeMillis()
    try {
      Utils.waitUntil(() => selectEngineToReuse, Duration(timeout, TimeUnit.MILLISECONDS))
    } catch {
      case _: TimeoutException =>
        throw new LinkisRetryException(AMConstant.ENGINE_ERROR_CODE, s"Waiting for Engine initialization failure, already waiting $timeout ms")
      case t: Throwable =>
        // Only record the elapsed time here; the failure itself is rethrown unchanged
        info(s"Failed to reuse engineConn time taken ${System.currentTimeMillis() - startTime}")
        throw t
    }
    info(s"Finished to reuse Engine for request: $engineReuseRequest get EngineNode $engine, time taken ${System.currentTimeMillis() - startTime}")

    // Attach the labels that the scored node map holds for the chosen engine's service instance
    val engineServiceLabelList = instances.filter(kv => kv._1.getServiceInstance.equals(engine.getServiceInstance))
    if (null != engineServiceLabelList && engineServiceLabelList.nonEmpty) {
      engine.setLabels(engineServiceLabelList.head._2)
    } else {
      error("Get choosen engineNode : " + AMUtils.GSON.toJson(engine) + " from engineLabelMap : " + AMUtils.GSON.toJson(instances))
    }
    engine
  }

}