blob: dee4a885ddde4b6e2a2de22f37c28aaae2598d62 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.linkis.engineconn.once.executor
import org.apache.linkis.bml.client.BmlClientFactory
import org.apache.linkis.common.conf.Configuration
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.engineconn.acessible.executor.entity.AccessibleExecutor
import org.apache.linkis.engineconn.common.creation.EngineCreationContext
import org.apache.linkis.engineconn.core.EngineConnObject
import org.apache.linkis.engineconn.core.hook.ShutdownHook
import org.apache.linkis.engineconn.core.util.EngineConnUtils
import org.apache.linkis.engineconn.executor.entity.{
ExecutableExecutor,
LabelExecutor,
ResourceExecutor
}
import org.apache.linkis.engineconn.executor.service.ManagerService
import org.apache.linkis.engineconn.once.executor.exception.OnceEngineConnErrorException
import org.apache.linkis.governance.common.protocol.task.RequestTask
import org.apache.linkis.governance.common.utils.OnceExecutorContentUtils
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
import org.apache.linkis.manager.common.protocol.engine.EngineConnReleaseRequest
import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext
import org.apache.linkis.manager.label.entity.{JobLabel, Label}
import org.apache.linkis.rpc.Sender
import org.apache.linkis.scheduler.executer.{
AsynReturnExecuteResponse,
ErrorExecuteResponse,
ExecuteResponse,
SuccessExecuteResponse
}
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import java.util
import scala.collection.convert.wrapAsScala._
import scala.collection.mutable.ArrayBuffer
trait OnceExecutor extends ExecutableExecutor[ExecuteResponse] with LabelExecutor with Logging {
private var executorLabels: util.List[Label[_]] = new util.ArrayList[Label[_]]
override def getExecutorLabels(): util.List[Label[_]] = executorLabels
override def setExecutorLabels(labels: util.List[Label[_]]): Unit = this.executorLabels = labels
override final def execute(engineCreationContext: EngineCreationContext): ExecuteResponse = {
val onceExecutorExecutionContext = createOnceExecutorExecutionContext(engineCreationContext)
val arrayBuffer = new ArrayBuffer[Label[_]]
executorLabels.foreach(l => arrayBuffer += l)
onceExecutorExecutionContext.setLabels(arrayBuffer.toArray)
initOnceExecutorExecutionContext(onceExecutorExecutionContext)
execute(onceExecutorExecutionContext)
}
def execute(onceExecutorExecutionContext: OnceExecutorExecutionContext): ExecuteResponse
protected def createOnceExecutorExecutionContext(
engineCreationContext: EngineCreationContext
): OnceExecutorExecutionContext = {
val resource =
engineCreationContext.getOptions.get(OnceExecutorContentUtils.ONCE_EXECUTOR_CONTENT_KEY)
if (StringUtils.isEmpty(resource)) {
throw new OnceEngineConnErrorException(
12560,
OnceExecutorContentUtils.ONCE_EXECUTOR_CONTENT_KEY + " is not exist."
)
}
val bmlResource = OnceExecutorContentUtils.valueToResource(resource)
val bmlClient = BmlClientFactory.createBmlClient(engineCreationContext.getUser)
val contentStr = Utils.tryFinally {
val inputStream = bmlClient
.downloadResource(
engineCreationContext.getUser,
bmlResource.getResourceId,
bmlResource.getVersion
)
.inputStream
Utils.tryFinally(IOUtils.toString(inputStream, Configuration.BDP_ENCODING.getValue))(
IOUtils.closeQuietly(inputStream)
)
}(bmlClient.close())
val contentMap = EngineConnUtils.GSON.fromJson(contentStr, classOf[util.Map[String, Object]])
val onceExecutorContent = OnceExecutorContentUtils.mapToContent(contentMap)
new OnceExecutorExecutionContext(engineCreationContext, onceExecutorContent)
}
protected def initOnceExecutorExecutionContext(
onceExecutorExecutionContext: OnceExecutorExecutionContext
): Unit = {
val properties = onceExecutorExecutionContext.getOnceExecutorContent.getRuntimeMap
if (properties.containsKey(RequestTask.RESULT_SET_STORE_PATH)) {
onceExecutorExecutionContext.setStorePath(
properties.get(RequestTask.RESULT_SET_STORE_PATH).toString
)
logger.info(s"ResultSet storePath: ${onceExecutorExecutionContext.getStorePath}.")
}
if (onceExecutorExecutionContext.getOnceExecutorContent.getExtraLabels != null) {
val extraLabelsList = LabelBuilderFactoryContext.getLabelBuilderFactory
.getLabels(onceExecutorExecutionContext.getOnceExecutorContent.getExtraLabels)
val extraLabels = new ArrayBuffer[Label[_]]()
extraLabelsList.foreach(executorLabels += _)
onceExecutorExecutionContext.setLabels(
onceExecutorExecutionContext.getLabels ++: extraLabels.toArray
)
}
onceExecutorExecutionContext.getLabels.foreach {
case jobLabel: JobLabel =>
onceExecutorExecutionContext.setJobId(jobLabel.getJobId)
logger.info(s"JobId: ${onceExecutorExecutionContext.getJobId}.")
case _ =>
}
}
override def init(): Unit = tryReady()
override def tryReady(): Boolean = true
}
trait ManageableOnceExecutor extends AccessibleExecutor with OnceExecutor with ResourceExecutor {
private val notifyListeners = new ArrayBuffer[ExecuteResponse => Unit]
private var response: ExecuteResponse = _
override def tryReady(): Boolean = {
transition(NodeStatus.Running)
super.tryReady()
}
override def execute(
onceExecutorExecutionContext: OnceExecutorExecutionContext
): ExecuteResponse = {
submit(onceExecutorExecutionContext)
waitToRunning()
transition(NodeStatus.Busy)
new AsynReturnExecuteResponse {
override def notify(rs: ExecuteResponse => Unit): Unit = notifyListeners += rs
}
}
protected def submit(onceExecutorExecutionContext: OnceExecutorExecutionContext): Unit
protected def waitToRunning(): Unit
def waitForComplete(): Unit = this synchronized wait()
def getResponse: ExecuteResponse = response
protected def setResponse(response: ExecuteResponse): Unit = this.response = response
override protected def onStatusChanged(fromStatus: NodeStatus, toStatus: NodeStatus): Unit = {
if (NodeStatus.isCompleted(toStatus)) {
if (response == null) toStatus match {
case NodeStatus.Success => response = SuccessExecuteResponse()
case _ => response = ErrorExecuteResponse("Unknown reason.", null)
}
Utils.tryFinally(notifyListeners.foreach(_(getResponse)))(this synchronized notifyAll)
}
super.onStatusChanged(fromStatus, toStatus)
}
override def tryShutdown(): Boolean = tryFailed()
def tryFailed(): Boolean = {
if (isClosed) return true
val msg = s"$getId has failed with old status $getStatus, now stop it."
logger.error(msg)
Utils.tryFinally {
this.ensureAvailable(transition(NodeStatus.Failed))
close()
}(stopOnceExecutor(msg))
true
}
def trySucceed(): Boolean = {
if (isClosed) return true
val msg = s"$getId has succeed with old status $getStatus, now stop it."
logger.warn(msg)
Utils.tryFinally {
this.ensureAvailable(transition(NodeStatus.Success))
close()
}(stopOnceExecutor(msg))
true
}
private def stopOnceExecutor(msg: String): Unit = {
val engineReleaseRequest = new EngineConnReleaseRequest(
Sender.getThisServiceInstance,
Utils.getJvmUser,
msg,
EngineConnObject.getEngineCreationContext.getTicketId
)
engineReleaseRequest.setNodeStatus(getStatus)
Utils.tryAndWarn(Thread.sleep(500))
logger.info("To send release request to linkis manager")
ManagerService.getManagerService.requestReleaseEngineConn(engineReleaseRequest)
ShutdownHook.getShutdownHook.notifyStop()
}
override protected def callback(): Unit = {}
override def supportCallBackLogs(): Boolean = true
}