blob: 322c81bd35cc95e4d433e9f65e2a67336a04f28a [file] [log] [blame]
package com.webank.wedatasphere.linkis.engine.shell.executor
import java.io.{BufferedReader, InputStreamReader}
import com.webank.wedatasphere.linkis.engine.execute.{EngineExecutor, EngineExecutorContext}
import com.webank.wedatasphere.linkis.engine.shell.conf.ShellEngineConfiguration
import com.webank.wedatasphere.linkis.engine.shell.exception.ShellCodeErrorException
import com.webank.wedatasphere.linkis.protocol.engine.JobProgressInfo
import com.webank.wedatasphere.linkis.resourcemanager.{LoadInstanceResource, Resource}
import com.webank.wedatasphere.linkis.rpc.Sender
import com.webank.wedatasphere.linkis.scheduler.executer._
import org.apache.commons.io.IOUtils
import org.apache.commons.lang.StringUtils
/**
* created by cooperyang on 2019/5/14
* Description:
*/
class ShellEngineExecutor(user:String)
extends EngineExecutor(ShellEngineConfiguration.OUTPUT_LIMIT.getValue, isSupportParallelism = false) with SingleTaskOperateSupport with SingleTaskInfoSupport{
private val name:String = Sender.getThisServiceInstance.getInstance
override def getName: String = name
override def getActualUsedResources: Resource =
new LoadInstanceResource(Runtime.getRuntime.totalMemory() - Runtime.getRuntime.freeMemory(), 2, 1)
private var process:Process = _
override protected def executeLine(engineExecutorContext: EngineExecutorContext, code: String): ExecuteResponse = {
val trimCode = code.trim
info(s"user $user begin to run code $trimCode")
var bufferedReader:BufferedReader = null
var errorsReader:BufferedReader = null
try{
val processBuilder:ProcessBuilder = new ProcessBuilder(generateRunCode(code):_*)
processBuilder.redirectErrorStream(true)
process = processBuilder.start()
bufferedReader = new BufferedReader(new InputStreamReader(process.getInputStream))
errorsReader = new BufferedReader(new InputStreamReader(process.getErrorStream))
var line:String = null
while({line = bufferedReader.readLine(); line != null}){
info(line)
engineExecutorContext.appendStdout(line)
}
val errorLog = Stream.continually(errorsReader.readLine).takeWhile(_ != null).mkString("\n")
val exitCode = process.waitFor()
if (StringUtils.isNotEmpty(errorLog) || exitCode != 0){
error(s"exitCode is $exitCode")
error(errorLog)
engineExecutorContext.appendStdout("shell执行失败")
engineExecutorContext.appendStdout(errorLog)
ErrorExecuteResponse("run shell failed", ShellCodeErrorException())
}else SuccessExecuteResponse()
}catch{
case e:Exception => {
error("Execute shell code failed, reason:", e)
ErrorExecuteResponse("run shell failed" ,e)
}
case t:Throwable => ErrorExecuteResponse("执行shell进程内部错误", t)
}finally {
IOUtils.closeQuietly(bufferedReader)
IOUtils.closeQuietly(errorsReader)
}
}
private def generateRunCode(code: String):Array[String] = {
Array("sh", "-c", code)
}
override protected def executeCompletely(engineExecutorContext: EngineExecutorContext, code: String, completedLine: String): ExecuteResponse = {
val completeCode = code + completedLine
executeLine(engineExecutorContext, completeCode)
}
override def kill(): Boolean = {
try{
process.destroy()
true
}catch{
case e:Exception => error(s"kill process ${process.toString} failed ", e)
false
case t:Throwable => error(s"kill process ${process.toString} failed " ,t)
false
}
}
override def pause(): Boolean = true
override def resume(): Boolean = true
override def progress(): Float = 0.5f
override def getProgressInfo: Array[JobProgressInfo] = Array()
override def log(): String = ""
override def close(): Unit = {
}
}