blob: 04d20229c18cca35eea6c7dbd0e342a62658e00c [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.ecm.server.service.impl
import java.util
import java.util.concurrent.ConcurrentHashMap
import com.google.common.collect.Interners
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.ecm.core.engineconn.{EngineConn, YarnEngineConn}
import com.webank.wedatasphere.linkis.ecm.core.launch.EngineConnLaunchRunner
import com.webank.wedatasphere.linkis.ecm.core.listener.{ECMEvent, ECMEventListener}
import com.webank.wedatasphere.linkis.ecm.server.ECMApplication
import com.webank.wedatasphere.linkis.ecm.server.converter.ECMEngineConverter
import com.webank.wedatasphere.linkis.ecm.server.listener._
import com.webank.wedatasphere.linkis.ecm.server.service.EngineConnListService
import com.webank.wedatasphere.linkis.manager.common.entity.enumeration.NodeStatus
import com.webank.wedatasphere.linkis.manager.common.entity.resource.{Resource, ResourceType}
import scala.collection.JavaConversions._
class DefaultEngineConnListService extends EngineConnListService with ECMEventListener with Logging {
/**
* key:tickedId,value :engineConn
*/
private val engineConnMap = new ConcurrentHashMap[String, EngineConn]
val lock = Interners.newWeakInterner[String]
override def init(): Unit = {}
override def getEngineConn(engineConnId: String): Option[EngineConn] = Option(engineConnMap.get(engineConnId))
override def getEngineConns: util.List[EngineConn] = engineConnMap.values().toList
override def addEngineConn(engineConn: EngineConn): Unit = {
if (ECMApplication.isReady)
engineConnMap.put(engineConn.getTickedId, engineConn)
}
override def killEngineConn(engineConnId: String): Unit = {
val conn = engineConnMap.remove(engineConnId)
if (conn != null) {
Utils.tryAndWarn{
conn.close()
info(s"engineconn ${conn.getPid} was closed.")
}
}
}
override def getUsedResources: Resource = engineConnMap.values().map(_.getResource.getMinResource).fold(Resource.initResource(ResourceType.Default))(_ + _)
override def submit(runner: EngineConnLaunchRunner): Option[EngineConn] = ???
def updateYarnAppId(event: YarnAppIdCallbackEvent): Unit = {
updateYarnEngineConn(x => x.setApplicationId(event.protocol.applicationId), event.protocol.nodeId)
}
def updateYarnEngineConn(implicit updateFunction: YarnEngineConn => Unit, nodeId: String): Unit = {
lock.intern(nodeId) synchronized {
engineConnMap.get(nodeId) match {
case e: YarnEngineConn => updateFunction(e)
case e: EngineConn =>
engineConnMap.put(nodeId, ECMEngineConverter.engineConn2YarnEngineConn(e))
}
}
}
def updateEngineConn(updateFunction: EngineConn => Unit, nodeId: String): Unit = {
lock.intern(nodeId) synchronized {
engineConnMap.get(nodeId) match {
case e: EngineConn => updateFunction(e)
}
}
}
def updateYarnInfo(event: YarnInfoCallbackEvent): Unit = {
updateYarnEngineConn(x => x.setApplicationURL(event.protocol.uri), event.protocol.nodeId)
}
def updatePid(event: EngineConnPidCallbackEvent): Unit = {
updateEngineConn(x => {
x.setPid(event.protocol.pid)
x.setServiceInstance(event.protocol.serviceInstance)
}, event.protocol.ticketId)
}
def updateEngineConnStatus(tickedId: String, updateStatus: NodeStatus): Unit = {
updateEngineConn(x => x.setStatus(updateStatus), tickedId)
}
override def onEvent(event: ECMEvent): Unit = {
info(s"Deal event $event")
event match {
case event: ECMClosedEvent => shutdownEngineConns(event)
case event: YarnAppIdCallbackEvent => updateYarnAppId(event)
case event: YarnInfoCallbackEvent => updateYarnInfo(event)
case event: EngineConnPidCallbackEvent => updatePid(event)
case EngineConnAddEvent(engineConn) => addEngineConn(engineConn)
case EngineConnStatusChangeEvent(tickedId, updateStatus) => updateEngineConnStatus(tickedId, updateStatus)
case _ =>
}
}
private def shutdownEngineConns(event: ECMClosedEvent): Unit = {
engineConnMap.keys().foreach(killEngineConn)
}
}