// blob: ebbd2200ef9fff32d9018aa4157b88c090f6962f [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.engineconn.core.executor
import java.util
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.engineconn.common.creation.EngineCreationContext
import com.webank.wedatasphere.linkis.engineconn.common.engineconn.EngineConn
import com.webank.wedatasphere.linkis.engineconn.core.EngineConnObject
import com.webank.wedatasphere.linkis.engineconn.core.engineconn.EngineConnManager
import com.webank.wedatasphere.linkis.engineconn.core.util.EngineConnUtils
import com.webank.wedatasphere.linkis.engineconn.executor.conf.EngineConnExecutorConfiguration
import com.webank.wedatasphere.linkis.engineconn.executor.entity.{Executor, LabelExecutor, SensibleExecutor}
import com.webank.wedatasphere.linkis.manager.engineplugin.common.EngineConnPlugin
import com.webank.wedatasphere.linkis.manager.engineplugin.common.creation.{CodeLanguageLabelExecutorFactory, ExecutorFactory, LabelExecutorFactory, MultiExecutorEngineConnFactory, SingleExecutorEngineConnFactory, SingleLabelExecutorEngineConnFactory}
import com.webank.wedatasphere.linkis.manager.engineplugin.common.exception.{EngineConnPluginErrorCode, EngineConnPluginErrorException}
import com.webank.wedatasphere.linkis.manager.label.entity.Label
import scala.collection.JavaConverters._
/**
 * Contract for the component that tracks the executors living inside one engine conn.
 */
trait ExecutorManager {

  /**
   * Looks up an executor by its id.
   *
   * @param id executor id
   * @return the matching executor
   */
  def getExecutor(id: String): Executor

  /**
   * Removes the executor with the given id from this manager.
   *
   * @param id executor id
   * @return the executor that was removed
   */
  def removeExecutor(id: String): Executor

  /** @return all executors currently known to this manager */
  def getExecutors: Array[Executor]

  /** @return an integer to be used as the id of a new executor */
  def generateExecutorId(): Int

  /** @return the executor selected for reporting */
  def getReportExecutor: Executor

}
/**
 * An [[ExecutorManager]] whose executors can additionally be addressed by a set of labels.
 */
trait LabelExecutorManager extends ExecutorManager {

  /**
   * Resolves the executor associated with the given labels.
   *
   * @param labels labels identifying the wanted executor
   * @return the label executor for those labels
   */
  def getExecutorByLabels(labels: Array[Label[_]]): LabelExecutor

  /**
   * Removes the executor associated with the given labels.
   *
   * @param labels labels identifying the executor to remove
   * @return the label executor that was removed
   */
  def removeExecutor(labels: Array[Label[_]]): LabelExecutor

}
/**
 * Default [[LabelExecutorManager]]: creates executors on demand through the factories exposed by
 * the engine-conn plugin and caches them in a concurrent map.
 *
 * Executors created for a label set are cached under the key produced by `getLabelKey`; an
 * executor created without labels by a plain default factory is cached under its own id.
 */
class LabelExecutorManagerImpl extends LabelExecutorManager with Logging {

  private lazy val executors: util.Map[String, LabelExecutor] = new ConcurrentHashMap[String, LabelExecutor](2)
  protected val GSON = EngineConnUtils.GSON
  private val idCreator = new AtomicInteger()
  protected lazy val engineConn: EngineConn = EngineConnManager.getEngineConnManager.getEngineConn
  protected val engineConnPlugin: EngineConnPlugin = EngineConnObject.getEngineConnPlugin

  // Candidate factories that are probed with labels, plus the factory used when none of them matches.
  protected val (factories, defaultFactory): (Array[ExecutorFactory], ExecutorFactory) = engineConnPlugin.getEngineConnFactory match {
    case engineConnFactory: SingleExecutorEngineConnFactory =>
      (Array.empty, engineConnFactory)
    case engineConnFactory: SingleLabelExecutorEngineConnFactory =>
      (Array(engineConnFactory), engineConnFactory)
    case engineConnFactory: MultiExecutorEngineConnFactory =>
      (engineConnFactory.getExecutorFactories, engineConnFactory.getDefaultExecutorFactory)
    case engineConnFactory =>
      val errorMsg = s"Not supported ExecutorFactory ${engineConnFactory.getClass.getSimpleName}"
      error(errorMsg)
      throw new EngineConnPluginErrorException(20011, errorMsg)
  }

  /**
   * Creates and initialises an executor, without registering it in the cache.
   *
   * When `labels` is non-null, the first [[LabelExecutorFactory]] in `factories` whose
   * `canCreate(labels)` is true builds the executor; otherwise (or when no factory matches)
   * `defaultFactory` is used and its result is cast to [[LabelExecutor]].
   *
   * @param engineCreationContext context handed to the factory
   * @param labels                labels to create the executor for, may be null
   * @return the initialised executor
   */
  protected def tryCreateExecutor(engineCreationContext: EngineCreationContext,
                                  labels: Array[Label[_]]): LabelExecutor = {
    val fromLabelFactory: Option[LabelExecutor] = Option(labels).flatMap { _ =>
      factories.collectFirst {
        case labelExecutorFactory: LabelExecutorFactory if labelExecutorFactory.canCreate(labels) =>
          labelExecutorFactory.createExecutor(engineCreationContext, engineConn, labels)
      }
    }
    val labelExecutor = fromLabelFactory.getOrElse(
      defaultFactory.createExecutor(engineCreationContext, engineConn).asInstanceOf[LabelExecutor])
    info(s"Finished create executor ${labelExecutor.getId}.")
    labelExecutor.init()
    info(s"Finished init executor ${labelExecutor.getId}.")
    labelExecutor
  }

  /**
   * Creates the default executor and registers it.
   *
   * If the default factory is a [[CodeLanguageLabelExecutorFactory]], creation is delegated to the
   * label-based overload with the factory's default code-language label. Otherwise the executor is
   * created without labels and cached under its own id.
   *
   * @param engineCreationContext context handed to the factory
   * @return the newly created executor
   */
  protected def createExecutor(engineCreationContext: EngineCreationContext): LabelExecutor = {
    defaultFactory match {
      case labelExecutorFactory: CodeLanguageLabelExecutorFactory =>
        createExecutor(engineCreationContext, Array[Label[_]](labelExecutorFactory.getDefaultCodeLanguageLabel))
      case _ =>
        val executor = tryCreateExecutor(engineCreationContext, null)
        executors.put(executor.getId, executor)
        executor
    }
  }

  /**
   * Builds the cache key of a label set by joining the labels' string values with `&`.
   *
   * @param labels labels to build the key from, may be null
   * @return the key, or null when `labels` is null (so callers' null guards actually apply
   *         instead of failing with a NullPointerException)
   */
  protected def getLabelKey(labels: Array[Label[_]]): String =
    if (null == labels) null else labels.map(_.getStringValue).mkString("&")

  /**
   * Creates an executor for the given labels and caches it under the labels' key.
   * Null or empty labels fall back to the default, label-less creation path.
   *
   * @param engineCreationContext context handed to the factory
   * @param labels                labels to create the executor for
   * @return the newly created executor
   * @throws EngineConnPluginErrorException with code INVALID_LABELS when no key can be derived
   */
  protected def createExecutor(engineCreationContext: EngineCreationContext,
                               labels: Array[Label[_]]): LabelExecutor = {
    if (null == labels || labels.isEmpty) return createExecutor(engineCreationContext)
    val labelKey = getLabelKey(labels)
    if (null == labelKey) {
      val msg = "Cannot get label key. labels : " + GSON.toJson(labels)
      throw new EngineConnPluginErrorException(EngineConnPluginErrorCode.INVALID_LABELS, msg)
    }
    val executor = tryCreateExecutor(engineCreationContext, labels)
    executors.put(labelKey, executor)
    executor
  }

  override def generateExecutorId(): Int = idCreator.getAndIncrement()

  /**
   * Returns the executor cached for the given labels, creating it first if necessary.
   *
   * Creation is guarded by the monitor of the cache map, so concurrent callers asking for the
   * same labels do not each create (and initialise) their own executor.
   *
   * @param labels labels identifying the wanted executor
   * @return the executor, or null when no key can be derived from `labels`
   */
  override def getExecutorByLabels(labels: Array[Label[_]]): LabelExecutor = {
    val labelKey = getLabelKey(labels)
    if (null == labelKey) {
      null
    } else {
      val cached = executors.get(labelKey)
      if (null != cached) {
        cached
      } else executors.synchronized {
        // Re-check under the lock: another thread may have created the executor meanwhile.
        val createdMeanwhile = executors.get(labelKey)
        if (null != createdMeanwhile) {
          createdMeanwhile
        } else {
          val created = createExecutor(engineConn.getEngineCreationContext, labels)
          // createExecutor may register under a different key (empty labels take the default
          // path), in which case the lookup misses; hand back what was just created rather
          // than null.
          val registered = executors.get(labelKey)
          if (null != registered) registered else created
        }
      }
    }
  }

  override def getExecutor(id: String): Executor = executors.values().asScala.find(_.getId == id).orNull

  override def getExecutors: Array[Executor] = executors.values().asScala.toArray

  /**
   * Removes the executor cached under the key of the given labels.
   *
   * @param labels labels identifying the executor to remove
   * @return the removed executor, or null when there was none (or no key could be derived)
   */
  override def removeExecutor(labels: Array[Label[_]]): LabelExecutor = {
    val labelKey = getLabelKey(labels)
    // Map.remove already yields null for an absent key, so no separate containsKey probe is needed.
    if (null == labelKey) null else executors.remove(labelKey)
  }

  /**
   * Removes the first cached executor whose id equals `id`.
   *
   * @param id executor id
   * @return the removed executor, or null when no executor has that id
   */
  override def removeExecutor(id: String): Executor =
    executors.asScala.collectFirst {
      case (key, executor) if executor.getId == id => key
    }.map(key => executors.remove(key)).orNull

  /**
   * Picks the executor to report: a freshly created default executor when none exists yet,
   * otherwise the one ranking highest by last activity time ([[SensibleExecutor]]) or by the
   * hash code of its id (any other executor).
   */
  override def getReportExecutor: Executor = {
    // Take one snapshot so the emptiness check and maxBy see the same elements;
    // maxBy throws on an empty array.
    val current = getExecutors
    if (current.isEmpty) createExecutor(engineConn.getEngineCreationContext)
    else current.maxBy {
      case executor: SensibleExecutor => executor.getLastActivityTime
      case executor: Executor => executor.getId.hashCode
    }
  }

}
/**
 * Holder of the process-wide [[ExecutorManager]] instance, which is instantiated on first use
 * from the class configured in `EngineConnExecutorConfiguration.EXECUTOR_MANAGER_CLASS`.
 */
object ExecutorManager {

  // Must be volatile: getInstance reads this field outside the lock, and without a volatile
  // write/read pair another thread may observe a non-null reference to an instance whose
  // construction is not yet visible to it.
  @volatile private var executorManager: ExecutorManager = _

  /** Instantiates the configured manager class and publishes it. Called with the lock held. */
  private def init(): Unit = {
    executorManager = Utils.getClassInstance[ExecutorManager](EngineConnExecutorConfiguration.EXECUTOR_MANAGER_CLASS.acquireNew)
  }

  /**
   * @return the shared manager, creating it on the first call
   */
  def getInstance: ExecutorManager = {
    if (executorManager == null) synchronized {
      // Second check: another thread may have initialised it while this one waited for the lock.
      if (executorManager == null) init()
    }
    executorManager
  }

}