blob: e01587877279ffcb29cb3c7d7d79230478dace8d [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.engine.Interpreter
import java.io.{BufferedReader, InputStreamReader, PrintWriter}
import java.util.concurrent.TimeUnit
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.engine.spark.common.{LineBufferedStream, Starting, State, _}
import com.webank.wedatasphere.linkis.scheduler.executer.{ErrorExecuteResponse, ExecuteResponse, SuccessExecuteResponse}
import org.apache.commons.io.IOUtils
import org.json4s._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
/**
  * Base class for interpreters that are backed by an external child process.
  *
  * The class wires up the child's stdin/stdout/stderr, tracks a lifecycle
  * [[State]], and starts a background task that records how the process
  * exited. Subclasses provide the wire protocol through
  * `sendExecuteRequest` and, optionally, `sendShutdownRequest`.
  *
  * Created by allenlliu on 2018/11/19.
  *
  * @param process the child process whose streams are used to talk to the interpreter
  */
abstract class ProcessInterpreter(process: Process) extends Interpreter with Logging {

  /** Execution context used for the exit watcher and the asynchronous shutdown request. */
  implicit val executor: ExecutionContext = ExecutionContext.global

  // Written by the exit watcher on a pool thread and read/written by callers of
  // `execute`/`close`, hence @volatile so every thread observes the latest value.
  @volatile protected[this] var _state: State = Starting()

  /** Writer connected to the child process's standard input. */
  protected[this] val stdin = new PrintWriter(process.getOutputStream)

  /** Reader over the child process's standard output, using a one-character buffer. */
  protected[this] val stdout = new BufferedReader(new InputStreamReader(process.getInputStream()), 1)

  /** Line-buffered view of the child process's standard error. */
  protected[this] val errOut = new LineBufferedStream(process.getErrorStream())

  /** Current lifecycle state of the interpreter. */
  override def state: State = _state

  /**
    * Sends `code` to the interpreter and reports the outcome.
    *
    * The literal `"SHUTDOWN"` closes the interpreter instead of being executed
    * and yields an `ErrorExecuteResponse("shutdown", ...)`.
    *
    * @param code the code to run, or the `"SHUTDOWN"` control command
    * @return `SuccessExecuteResponse` when the subclass returned a reply,
    *         `ErrorExecuteResponse` for the shutdown command
    * @throws IllegalStateException if the interpreter is not `Idle`
    * @throws Exception carrying the collected stderr lines when the subclass
    *                   returned no reply
    */
  override def execute(code: String): ExecuteResponse = {
    // Cancel commands are forwarded regardless of the current state so that they
    // can reach an interpreter that is busy.
    // NOTE(review): when the interpreter is Idle the same command is sent a second
    // time by the regular path below - confirm whether that is intended.
    if (code == "sc.cancelAllJobs" || code == "sc.cancelAllJobs()") {
      sendExecuteRequest(code)
    }
    _state match {
      case (Dead() | ShuttingDown() | Error() | Success()) =>
        throw new IllegalStateException("interpreter is not running")
      case Idle() =>
        // Second read through the public accessor: the exit watcher may have
        // changed the state since the match above.
        require(state == Idle())
        code match {
          case "SHUTDOWN" =>
            // close() sends the shutdown request itself; sending it here as well
            // would deliver it twice.
            close()
            ErrorExecuteResponse("shutdown", new Exception("shutdown"))
          case _ =>
            _state = Busy()
            sendExecuteRequest(code) match {
              case Some(_) =>
                // The reply payload is not used; only its presence matters.
                _state = Idle()
                SuccessExecuteResponse()
              case None =>
                _state = Error()
                val errorMsg = errOut.lines.mkString(", ")
                throw new Exception(errorMsg)
            }
        }
      case _ => throw new IllegalStateException(s"interpreter is in ${_state} state, cannot do query.")
    }
  }

  // Exit watcher: records the final state once the child process terminates.
  // `blocking` tells the pool that this task parks a thread for the lifetime of
  // the process.
  // NOTE(review): this also runs after close() destroys the process, so the
  // `Dead` state set by close() may be replaced by `Error`/`Success` here.
  Future {
    val exitCode = scala.concurrent.blocking(process.waitFor())
    if (exitCode != 0) {
      errOut.lines.foreach(println)
      println(getClass.getSimpleName + " has stopped with exit code " + exitCode)
      _state = Error()
    } else {
      println(getClass.getSimpleName + " has finished.")
      _state = Success()
    }
  }

  /**
    * Subclass hook; not invoked from this class.
    * Presumably blocks until the child process can accept requests - TODO confirm.
    */
  protected def waitUntilReady(): Unit

  /**
    * Sends one request to the child process.
    *
    * @param request the code to execute
    * @return the reply, or `None`, which `execute` treats as a failure
    */
  protected def sendExecuteRequest(request: String): Option[JValue]

  /** Asks the child process to shut down; does nothing unless overridden. */
  protected def sendShutdownRequest(): Unit = {}

  /**
    * Shuts the interpreter down and releases the child process.
    *
    * A shutdown request is sent unless the interpreter was already `Dead`,
    * `ShuttingDown` or `Success`. The request gets up to 10 seconds; afterwards
    * the process is destroyed and the streams are closed, even if the request
    * failed or timed out (in which case the failure is rethrown).
    */
  override def close(): Unit = {
    // Decide from the state as it was on entry, before it is overwritten with
    // Dead below, so the decision cannot race with that assignment.
    val stateOnEntry = _state
    _state = Dead()

    val shutdownRequest: Future[Unit] = stateOnEntry match {
      case (Dead() | ShuttingDown() | Success()) =>
        Future.successful(())
      case _ =>
        Future(sendShutdownRequest())
    }

    // Give ourselves 10 seconds to tear down the process. The streams stay open
    // until the request has finished so that it can still be written and answered.
    Utils.tryFinally(Await.result(shutdownRequest, Duration(10, TimeUnit.SECONDS))) {
      try {
        process.destroy()
      } finally {
        IOUtils.closeQuietly(stdin)
        IOUtils.closeQuietly(stdout)
        errOut.close
      }
    }
  }
}