| /* |
| * |
| * Copyright 2019 WeBank |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package com.webank.wedatasphere.linkis.manager.am.manager |
| |
| import java.util |
| |
| import com.webank.wedatasphere.linkis.common.ServiceInstance |
| import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils} |
| import com.webank.wedatasphere.linkis.manager.common.entity.node._ |
| import com.webank.wedatasphere.linkis.manager.common.entity.persistence.PersistenceNodeEntity |
| import com.webank.wedatasphere.linkis.manager.common.protocol.engine.EngineStopRequest |
| import com.webank.wedatasphere.linkis.manager.engineplugin.common.launch.entity.EngineConnBuildRequest |
| import com.webank.wedatasphere.linkis.manager.exception.NodeInstanceDuplicateException |
| import com.webank.wedatasphere.linkis.manager.persistence.{NodeManagerPersistence, NodeMetricManagerPersistence} |
| import com.webank.wedatasphere.linkis.manager.service.common.metrics.MetricsConverter |
| import com.webank.wedatasphere.linkis.manager.service.common.pointer.NodePointerBuilder |
| import com.webank.wedatasphere.linkis.resourcemanager.service.ResourceManager |
| import org.springframework.beans.factory.annotation.Autowired |
| import org.springframework.stereotype.Component |
| |
| import scala.collection.JavaConversions._ |
| |
| |
| @Component |
| class DefaultEMNodeManager extends EMNodeManager with Logging { |
| |
| @Autowired |
| private var nodeManagerPersistence: NodeManagerPersistence = _ |
| |
| @Autowired |
| private var nodeMetricManagerPersistence: NodeMetricManagerPersistence = _ |
| |
| @Autowired |
| private var metricsConverter: MetricsConverter = _ |
| |
| @Autowired |
| private var nodePointerBuilder: NodePointerBuilder = _ |
| |
| @Autowired |
| private var resourceManager: ResourceManager = _ |
| |
| |
| override def emRegister(emNode: EMNode): Unit = { |
| nodeManagerPersistence.addNodeInstance(emNode) |
| // init metric |
| nodeMetricManagerPersistence.addOrupdateNodeMetrics(metricsConverter.getInitMetric(emNode.getServiceInstance)) |
| } |
| |
| override def addEMNodeInstance(emNode: EMNode):Unit = { |
| Utils.tryCatch(nodeManagerPersistence.addNodeInstance(emNode)){ |
| case e: NodeInstanceDuplicateException => |
| warn(s"em instance had exists, $emNode") |
| nodeManagerPersistence.updateEngineNode(emNode.getServiceInstance, emNode) |
| case t: Throwable => throw t |
| } |
| } |
| |
| override def initEMNodeMetrics(emNode: EMNode): Unit = { |
| nodeMetricManagerPersistence.addOrupdateNodeMetrics(metricsConverter.getInitMetric(emNode.getServiceInstance)) |
| } |
| |
| override def listEngines(emNode: EMNode): util.List[EngineNode] = { |
| val nodes = nodeManagerPersistence.getEngineNodeByEM(emNode.getServiceInstance) |
| val metricses = nodeMetricManagerPersistence.getNodeMetrics(nodes).map(m => (m.getServiceInstance.toString,m)).toMap |
| nodes.map{ node => |
| metricses.get(node.getServiceInstance.toString).foreach(metricsConverter.fillMetricsToNode(node,_)) |
| node |
| } |
| nodes |
| } |
| |
| |
| override def listUserEngines(emNode: EMNode, user: String): util.List[EngineNode] = { |
| listEngines(emNode).filter(_.getOwner.equals(user)) |
| } |
| |
| def listUserNodes(user: String): java.util.List[Node] = { |
| nodeManagerPersistence.getNodes(user) |
| } |
| |
| /** |
| * Get detailed em information from the persistence |
| * TODO add label to node ? |
| * |
| * @param scoreServiceInstances |
| * @return |
| */ |
| override def getEMNodes(scoreServiceInstances: Array[ScoreServiceInstance]): Array[EMNode] = { |
| |
| if (null == scoreServiceInstances || scoreServiceInstances.isEmpty) { |
| return null |
| } |
| val emNodes = scoreServiceInstances.map { |
| scoreServiceInstances => |
| val emNode = new AMEMNode() |
| emNode.setScore(scoreServiceInstances.getScore) |
| emNode.setServiceInstance(scoreServiceInstances.getServiceInstance) |
| emNode |
| } |
| //1. 增加nodeMetrics 2 增加RM信息 |
| val resourceInfo = resourceManager.getResourceInfo(scoreServiceInstances.map(_.getServiceInstance)) |
| val nodeMetrics = nodeMetricManagerPersistence.getNodeMetrics(emNodes.toList) |
| emNodes.map { emNode => |
| val optionMetrics = nodeMetrics.find(_.getServiceInstance.equals(emNode.getServiceInstance)) |
| val optionRMNode = resourceInfo.resourceInfo.find(_.getServiceInstance.equals(emNode.getServiceInstance)) |
| optionMetrics.foreach(metricsConverter.fillMetricsToNode(emNode, _)) |
| optionRMNode.foreach(rmNode => emNode.setNodeResource(rmNode.getNodeResource)) |
| emNode |
| } |
| emNodes.toArray |
| } |
| |
| override def getEM(serviceInstance: ServiceInstance): EMNode = { |
| val node = nodeManagerPersistence.getNode(serviceInstance) |
| if (null == node) { |
| info(s"This em of $serviceInstance not exists in db") |
| return null |
| } |
| val emNode = new AMEMNode() |
| emNode.setOwner(node.getOwner) |
| emNode.setServiceInstance(node.getServiceInstance) |
| node match { |
| case a: PersistenceNodeEntity => emNode.setStartTime(a.getStartTime) |
| case _ => |
| } |
| emNode.setMark(emNode.getMark) |
| metricsConverter.fillMetricsToNode(emNode, nodeMetricManagerPersistence.getNodeMetrics(emNode)) |
| emNode |
| } |
| |
| override def stopEM(emNode: EMNode): Unit = { |
| nodePointerBuilder.buildEMNodePointer(emNode).stopNode() |
| } |
| |
| override def deleteEM(emNode: EMNode): Unit = { |
| |
| nodeManagerPersistence.removeNodeInstance(emNode) |
| info("Finished to clear emNode instance info") |
| nodeMetricManagerPersistence.deleteNodeMetrics(emNode) |
| info("Finished to clear emNode metrics info") |
| } |
| |
| override def pauseEM(serviceInstance: ServiceInstance): Unit = { |
| |
| } |
| |
| /** |
| * 1. request engineManager to launch engine |
| * |
| * @param engineBuildRequest |
| * @param emNode |
| * @return |
| */ |
| override def createEngine(engineBuildRequest: EngineConnBuildRequest, emNode: EMNode): EngineNode = { |
| nodePointerBuilder.buildEMNodePointer(emNode).createEngine(engineBuildRequest) |
| } |
| |
| override def stopEngine(engineStopRequest: EngineStopRequest, emNode: EMNode): Unit = { |
| nodePointerBuilder.buildEMNodePointer(emNode).stopEngine(engineStopRequest) |
| } |
| |
| |
| } |