| /* |
| * Copyright 2019 WeBank |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package com.webank.wedatasphere.linkis.ecm.server.service.impl |
| |
| import java.util |
| import java.util.concurrent.ConcurrentHashMap |
| |
| import com.google.common.collect.Interners |
| import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils} |
| import com.webank.wedatasphere.linkis.ecm.core.engineconn.{EngineConn, YarnEngineConn} |
| import com.webank.wedatasphere.linkis.ecm.core.launch.EngineConnLaunchRunner |
| import com.webank.wedatasphere.linkis.ecm.core.listener.{ECMEvent, ECMEventListener} |
| import com.webank.wedatasphere.linkis.ecm.server.ECMApplication |
| import com.webank.wedatasphere.linkis.ecm.server.converter.ECMEngineConverter |
| import com.webank.wedatasphere.linkis.ecm.server.listener._ |
| import com.webank.wedatasphere.linkis.ecm.server.service.EngineConnListService |
| import com.webank.wedatasphere.linkis.manager.common.entity.enumeration.NodeStatus |
| import com.webank.wedatasphere.linkis.manager.common.entity.resource.{Resource, ResourceType} |
| |
| import scala.collection.JavaConversions._ |
| |
| |
/**
  * In-memory registry of the engine connections known to this ECM instance.
  *
  * Connections are stored in a concurrent map keyed by ticket id. Updates to a
  * single connection are serialized by synchronizing on the interned id string
  * (see `lock`). The service also reacts to ECM events through `onEvent`.
  */
class DefaultEngineConnListService extends EngineConnListService with ECMEventListener with Logging {

  /**
    * key: ticket id, value: engineConn
    */
  private val engineConnMap = new ConcurrentHashMap[String, EngineConn]

  /**
    * Weak interner used to obtain one canonical String instance per id, so that
    * `synchronized` on the interned value gives a per-id monitor.
    */
  val lock = Interners.newWeakInterner[String]

  /** No initialization is required. */
  override def init(): Unit = {}

  /**
    * Looks up an engine connection.
    *
    * @param engineConnId key of the connection in the registry
    * @return the connection, or `None` when the id is not registered
    */
  override def getEngineConn(engineConnId: String): Option[EngineConn] =
    Option(engineConnMap.get(engineConnId))

  /** @return a snapshot list of all registered engine connections */
  override def getEngineConns: util.List[EngineConn] = engineConnMap.values().toList

  /**
    * Registers a connection under its ticket id. The connection is only stored
    * while `ECMApplication.isReady` is true; otherwise the call is a no-op.
    *
    * @param engineConn connection to register
    */
  override def addEngineConn(engineConn: EngineConn): Unit = {
    if (ECMApplication.isReady) {
      engineConnMap.put(engineConn.getTickedId, engineConn)
    }
  }

  /**
    * Removes the connection from the registry and closes it. Failures while
    * closing are logged as warnings by `Utils.tryAndWarn` and not rethrown.
    * Unknown ids are ignored.
    *
    * @param engineConnId key of the connection to kill
    */
  override def killEngineConn(engineConnId: String): Unit = {
    Option(engineConnMap.remove(engineConnId)).foreach { conn =>
      Utils.tryAndWarn {
        conn.close()
        info(s"engineconn ${conn.getPid} was closed.")
      }
    }
  }

  /** @return the sum of the minimum resources of all registered connections */
  override def getUsedResources: Resource =
    engineConnMap.values()
      .map(_.getResource.getMinResource)
      .fold(Resource.initResource(ResourceType.Default))(_ + _)

  /** Not implemented: always throws `NotImplementedError`. */
  override def submit(runner: EngineConnLaunchRunner): Option[EngineConn] = ???

  /** Stores the YARN application id carried by the event on the matching connection. */
  def updateYarnAppId(event: YarnAppIdCallbackEvent): Unit = {
    updateYarnEngineConn(x => x.setApplicationId(event.protocol.applicationId), event.protocol.nodeId)
  }

  /**
    * Applies `updateFunction` to the YARN engine connection registered under
    * `nodeId`, holding the per-id lock.
    *
    * If the registered connection is not yet a [[YarnEngineConn]] it is converted
    * first; the update is then applied to the converted connection before it
    * replaces the original in the registry, so the update is not lost. If no
    * connection is registered under `nodeId` a warning is logged and nothing
    * else happens.
    *
    * NOTE(review): the parameter list is `implicit`, which makes BOTH parameters
    * implicit (including the `String`). It is kept only so that the signature
    * stays unchanged for existing callers; consider dropping it.
    *
    * @param updateFunction mutation to apply to the YARN engine connection
    * @param nodeId         key of the connection in the registry
    */
  def updateYarnEngineConn(implicit updateFunction: YarnEngineConn => Unit, nodeId: String): Unit = {
    lock.intern(nodeId) synchronized {
      engineConnMap.get(nodeId) match {
        case yarnConn: YarnEngineConn => updateFunction(yarnConn)
        case conn: EngineConn =>
          val yarnConn = ECMEngineConverter.engineConn2YarnEngineConn(conn)
          // Apply the update before publishing, otherwise this callback's data is dropped.
          updateFunction(yarnConn)
          // killEngineConn does not take this lock, so only swap if the original
          // mapping is still present; never re-insert a connection that was removed.
          if (!engineConnMap.replace(nodeId, conn, yarnConn)) {
            warn(s"engineconn $nodeId was removed while being converted to a yarn engineconn, update ignored.")
          }
        case _ =>
          // get() returned null: type patterns do not match null, so handle it explicitly.
          warn(s"Cannot update yarn engineconn $nodeId because it is not registered.")
      }
    }
  }

  /**
    * Applies `updateFunction` to the connection registered under `nodeId`,
    * holding the per-id lock. If no connection is registered under `nodeId`
    * a warning is logged and the update is skipped.
    *
    * @param updateFunction mutation to apply to the connection
    * @param nodeId         key of the connection in the registry
    */
  def updateEngineConn(updateFunction: EngineConn => Unit, nodeId: String): Unit = {
    lock.intern(nodeId) synchronized {
      Option(engineConnMap.get(nodeId)) match {
        case Some(conn) => updateFunction(conn)
        case None => warn(s"Cannot update engineconn $nodeId because it is not registered.")
      }
    }
  }

  /** Stores the YARN application URL carried by the event on the matching connection. */
  def updateYarnInfo(event: YarnInfoCallbackEvent): Unit = {
    updateYarnEngineConn(x => x.setApplicationURL(event.protocol.uri), event.protocol.nodeId)
  }

  /** Stores the pid and service instance carried by the event on the matching connection. */
  def updatePid(event: EngineConnPidCallbackEvent): Unit = {
    updateEngineConn(x => {
      x.setPid(event.protocol.pid)
      x.setServiceInstance(event.protocol.serviceInstance)
    }, event.protocol.ticketId)
  }

  /**
    * Sets the status of the connection registered under `tickedId`.
    *
    * @param tickedId     key of the connection in the registry
    * @param updateStatus new status
    */
  def updateEngineConnStatus(tickedId: String, updateStatus: NodeStatus): Unit = {
    updateEngineConn(x => x.setStatus(updateStatus), tickedId)
  }

  /**
    * Dispatches an ECM event to the matching handler. Event types not listed
    * here are ignored.
    */
  override def onEvent(event: ECMEvent): Unit = {
    info(s"Deal event $event")
    event match {
      case event: ECMClosedEvent => shutdownEngineConns(event)
      case event: YarnAppIdCallbackEvent => updateYarnAppId(event)
      case event: YarnInfoCallbackEvent => updateYarnInfo(event)
      case event: EngineConnPidCallbackEvent => updatePid(event)
      case EngineConnAddEvent(engineConn) => addEngineConn(engineConn)
      case EngineConnStatusChangeEvent(tickedId, updateStatus) => updateEngineConnStatus(tickedId, updateStatus)
      case _ =>
    }
  }

  /** Kills every registered connection; the event itself carries no data that is used here. */
  private def shutdownEngineConns(event: ECMClosedEvent): Unit = {
    engineConnMap.keys().foreach(killEngineConn)
  }

}