blob: da2188f763f6dc992aa9b3d45222f914935e3da9 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.entrance
import com.webank.wedatasphere.linkis.common.exception.{DWCException, DWCRuntimeException, ErrorException}
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.entrance.exception.{EntranceErrorException, SubmitFailedException}
import com.webank.wedatasphere.linkis.entrance.execute.EntranceJob
import com.webank.wedatasphere.linkis.entrance.log.LogReader
import com.webank.wedatasphere.linkis.protocol.query.RequestPersistTask
import com.webank.wedatasphere.linkis.scheduler.queue.{Job, SchedulerEventState}
import com.webank.wedatasphere.linkis.server.conf.ServerConfiguration
import org.apache.commons.lang.StringUtils
import org.apache.commons.lang.exception.ExceptionUtils
/**
* Created by enjoyyin on 2018/9/4.
*/
abstract class EntranceServer extends Logging {
private var entranceWebSocketService: Option[EntranceWebSocketService] = None
def init(): Unit
def getName: String
def getEntranceContext: EntranceContext
/**
* Execute a task and return an execId(执行一个task,返回一个execId)
* @param params
* @return
*/
def execute(params: java.util.Map[String, Any]): String = {
if(!params.containsKey(EntranceServer.DO_NOT_PRINT_PARAMS_LOG)) info("received a request: " + params)
else params.remove(EntranceServer.DO_NOT_PRINT_PARAMS_LOG)
var task = getEntranceContext.getOrCreateEntranceParser().parseToTask(params)
task match {
case t: RequestPersistTask => if(StringUtils.isBlank(t.getRequestApplicationName))
throw new EntranceErrorException(20038, "requestApplicationName cannot be empty.")
case _ =>
}
//After parse the map into a task, we need to store it in the database, and the task can get a unique taskID.
//将map parse 成 task 之后,我们需要将它存储到数据库中,task可以获得唯一的taskID
getEntranceContext.getOrCreatePersistenceManager().createPersistenceEngine().persist(task)
val logAppender = new java.lang.StringBuilder()
Utils.tryThrow(getEntranceContext.getOrCreateEntranceInterceptors().foreach(int => task = int.apply(task, logAppender))) { t =>
val error = t match {
case error: ErrorException => error
case t1:Throwable => val exception = new EntranceErrorException(20039, "解析task失败!原因:" + ExceptionUtils.getRootCauseMessage(t1))
exception.initCause(t1)
exception
case _ => new EntranceErrorException(20039, "解析task失败!原因:" + ExceptionUtils.getRootCauseMessage(t))
}
task match {
case t: RequestPersistTask =>
t.setErrCode(error.getErrCode)
t.setErrDesc(error.getDesc)
t.setStatus(SchedulerEventState.Failed.toString)
t.setProgress(1.0f)
case _ =>
}
getEntranceContext.getOrCreatePersistenceManager().createPersistenceEngine().updateIfNeeded(task)
error
}
getEntranceContext.getOrCreatePersistenceManager().createPersistenceEngine().updateIfNeeded(task)
val job = getEntranceContext.getOrCreateEntranceParser().parseToJob(task)
job.init()
job.setLogListener(getEntranceContext.getOrCreateLogManager())
job.setProgressListener(getEntranceContext.getOrCreatePersistenceManager())
job.setJobListener(getEntranceContext.getOrCreatePersistenceManager())
job match {
case entranceJob: EntranceJob => entranceJob.setEntranceListenerBus(getEntranceContext.getOrCreateEventListenerBus)
case _ =>
}
Utils.tryCatch{
if(logAppender.length() > 0) job.getLogListener.foreach(_.onLogUpdate(job, logAppender.toString.trim))
}{
t => logger.error("Failed to write init log, reason: ", t)
}
Utils.tryThrow{
getEntranceContext.getOrCreateScheduler().submit(job)
}{t =>
job.onFailure("Submitting the query failed!(提交查询失败!)", t)
val _task = getEntranceContext.getOrCreateEntranceParser().parseToTask(job)
getEntranceContext.getOrCreatePersistenceManager().createPersistenceEngine().updateIfNeeded(_task)
t match {
case e: DWCException => e
case e: DWCRuntimeException => e
case t: Throwable =>
new SubmitFailedException(30009, "Submitting the query failed!(提交查询失败!)" + ExceptionUtils.getRootCauseMessage(t), t)
}
}
task match {
case requestPersistTask:RequestPersistTask => logger.info(s"Job ${job.getId} submitted and taskID is ${requestPersistTask.getTaskID}")
case _ => info(s"Job $job submitted!")
}
job.getId
}
def logReader(execId: String): LogReader
def getJob(execId: String): Option[Job] = getEntranceContext.getOrCreateScheduler().get(execId).map(_.asInstanceOf[Job])
private[entrance] def getEntranceWebSocketService: Option[EntranceWebSocketService] = if(ServerConfiguration.BDP_SERVER_SOCKET_MODE.getValue) {
if(entranceWebSocketService.isEmpty) synchronized {
if(entranceWebSocketService.isEmpty) {
entranceWebSocketService = Some(new EntranceWebSocketService)
entranceWebSocketService.foreach(_.setEntranceServer(this))
entranceWebSocketService.foreach(getEntranceContext.getOrCreateEventListenerBus.addListener)
}
}
entranceWebSocketService
} else None
}
object EntranceServer {
val DO_NOT_PRINT_PARAMS_LOG = "doNotPrintParamsLog"
}