blob: 8e24d6b8d464e734fc05785cc74957bbf5c9d711 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.linkis.entrance.restful;
import org.apache.linkis.common.log.LogUtils;
import org.apache.linkis.entrance.EntranceServer;
import org.apache.linkis.entrance.annotation.EntranceServerBeanAnnotation;
import org.apache.linkis.entrance.conf.EntranceConfiguration;
import org.apache.linkis.entrance.execute.EntranceJob;
import org.apache.linkis.entrance.log.LogReader;
import org.apache.linkis.entrance.utils.JobHistoryHelper;
import org.apache.linkis.governance.common.entity.job.JobRequest;
import org.apache.linkis.protocol.constants.TaskConstant;
import org.apache.linkis.protocol.engine.JobProgressInfo;
import org.apache.linkis.protocol.utils.ZuulEntranceUtils;
import org.apache.linkis.rpc.Sender;
import org.apache.linkis.scheduler.listener.LogListener;
import org.apache.linkis.scheduler.queue.Job;
import org.apache.linkis.scheduler.queue.SchedulerEventState;
import org.apache.linkis.server.Message;
import org.apache.linkis.server.security.SecurityFilter;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import com.fasterxml.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.*;
import scala.Option;
import javax.servlet.http.HttpServletRequest;
import java.util.*;
/**
* Description: an implementation class of EntranceRestfulRemote
*/
@RestController
@RequestMapping(path = "/entrance")
public class EntranceRestfulApi implements EntranceRestfulRemote {
private EntranceServer entranceServer;
private static final Logger logger = LoggerFactory.getLogger(EntranceRestfulApi.class);
@EntranceServerBeanAnnotation.EntranceServerAutowiredAnnotation
public void setEntranceServer(EntranceServer entranceServer) {
this.entranceServer = entranceServer;
}
/**
* The execute function handles the request submitted by the user to execute the task, and the execution ID is returned to the user.
* execute函数处理的是用户提交执行任务的请求,返回给用户的是执行ID
* json Incoming key-value pair(传入的键值对)
* Repsonse
*/
@RequestMapping(path = "/execute",method = RequestMethod.POST)
public Message execute(HttpServletRequest req,@RequestBody Map<String, Object> json) {
Message message = null;
// try{
logger.info("Begin to get an execID");
json.put(TaskConstant.UMUSER, SecurityFilter.getLoginUsername(req));
HashMap<String, String> map = (HashMap) json.get(TaskConstant.SOURCE);
if(map == null){
map = new HashMap<>();
json.put(TaskConstant.SOURCE, map);
}
String ip = JobHistoryHelper.getRequestIpAddr(req);
map.put(TaskConstant.REQUEST_IP, ip);
String execID = entranceServer.execute(json);
Job job = entranceServer.getJob(execID).get();
JobRequest jobReq = ((EntranceJob) job).getJobRequest();
Long taskID = jobReq.getId();
pushLog(LogUtils.generateInfo("You have submitted a new job, script code (after variable substitution) is"), job);
pushLog("************************************SCRIPT CODE************************************", job);
pushLog(jobReq.getExecutionCode(), job);
pushLog("************************************SCRIPT CODE************************************", job);
pushLog(LogUtils.generateInfo("Your job is accepted, jobID is " + execID + " and taskID is " + taskID + " in " + Sender.getThisServiceInstance().toString() + ". Please wait it to be scheduled"), job);
execID = ZuulEntranceUtils.generateExecID(execID, Sender.getThisServiceInstance().getApplicationName(), new String[]{Sender.getThisInstance()});
message = Message.ok();
message.setMethod("/api/entrance/execute");
message.data("execID", execID);
message.data("taskID", taskID);
logger.info("End to get an an execID: {}, taskID: {}", execID, taskID);
// }catch(ErrorException e){
// message = Message.error(e.getDesc());
// message.setStatus(1);
// message.setMethod("/api/entrance/execute");
// }
return message;
}
@RequestMapping(path = "/submit",method = RequestMethod.POST)
public Message submit(HttpServletRequest req, @RequestBody Map<String, Object> json) {
Message message = null;
logger.info("Begin to get an execID");
json.put(TaskConstant.SUBMIT_USER, SecurityFilter.getLoginUsername(req));
HashMap<String, String> map = (HashMap) json.get(TaskConstant.SOURCE);
if(map == null){
map = new HashMap<>();
json.put(TaskConstant.SOURCE, map);
}
String ip = JobHistoryHelper.getRequestIpAddr(req);
map.put(TaskConstant.REQUEST_IP, ip);
String execID = entranceServer.execute(json);
Job job = entranceServer.getJob(execID).get();
JobRequest jobRequest = ((EntranceJob) job).getJobRequest();
Long taskID = jobRequest.getId();
pushLog(LogUtils.generateInfo("You have submitted a new job, script code (after variable substitution) is"), job);
pushLog("************************************SCRIPT CODE************************************", job);
pushLog(jobRequest.getExecutionCode(), job);
pushLog("************************************SCRIPT CODE************************************", job);
pushLog(LogUtils.generateInfo("Your job is accepted, jobID is " + execID + " and taskID is " + taskID + " in " + Sender.getThisServiceInstance().toString() + ". Please wait it to be scheduled"), job);
execID = ZuulEntranceUtils.generateExecID(execID, Sender.getThisServiceInstance().getApplicationName(), new String[]{Sender.getThisInstance()});
message = Message.ok();
message.setMethod("/api/entrance/submit");
message.data("execID", execID);
message.data("taskID", taskID);
logger.info("End to get an an execID: {}, taskID: {}", execID, taskID);
return message;
}
private void pushLog(String log, Job job) {
entranceServer.getEntranceContext().getOrCreateLogManager().onLogUpdate(job, log);
}
@RequestMapping(path = "/{id}/status",method = RequestMethod.GET)
public Message status(@PathVariable("id") String id, @RequestParam(value = "taskID",required = false) String taskID) {
Message message = null;
String realId = ZuulEntranceUtils.parseExecID(id)[3];
Option<Job> job = Option.apply(null);
try {
job = entranceServer.getJob(realId);
} catch (Exception e) {
logger.warn("获取任务 {} 状态时出现错误", realId, e.getMessage());
long realTaskID = Long.parseLong(taskID);
String status = JobHistoryHelper.getStatusByTaskID(realTaskID);
message = Message.ok();
message.setMethod("/api/entrance/" + id + "/status");
message.data("status", status).data("execID", id);
return message;
}
if (job.isDefined()) {
message = Message.ok();
message.setMethod("/api/entrance/" + id + "/status");
message.data("status", job.get().getState().toString()).data("execID", id);
} else {
message = Message.error("ID The corresponding job is empty and cannot obtain the corresponding task status.(ID 对应的job为空,不能获取相应的任务状态)");
}
return message;
}
@RequestMapping(path = "/{id}/progress",method = RequestMethod.GET)
public Message progress(@PathVariable("id") String id) {
Message message = null;
String realId = ZuulEntranceUtils.parseExecID(id)[3];
Option<Job> job = entranceServer.getJob(realId);
if (job.isDefined()) {
JobProgressInfo[] jobProgressInfos = ((EntranceJob) job.get()).getProgressInfo();
if (jobProgressInfos == null) {
message = Message.error("Can not get the corresponding progress information, it may be that the corresponding progress information has not been generated(不能获取相应的进度信息,可能是相应的进度信息还未生成)");
message.setMethod("/api/entrance/" + id + "/progress");
} else {
List<Map<String, Object>> list = new ArrayList<>();
for (JobProgressInfo jobProgressInfo : jobProgressInfos) {
if ("true".equals(EntranceConfiguration.PROGRESS_PUSH().getValue()) || jobProgressInfo.totalTasks() > 0) {
setJobProgressInfos(list, jobProgressInfo);
}
}
message = Message.ok();
message.setMethod("/api/entrance/" + id + "/progress");
//TODO 去掉绝对值判断
message.data("progress", Math.abs(job.get().getProgress())).data("execID", id).data("progressInfo", list);
}
} else {
message = Message.error("The job corresponding to the ID is empty, and the corresponding task progress cannot be obtained.(ID 对应的job为空,不能获取相应的任务进度)");
}
return message;
}
private void setJobProgressInfos(List<Map<String, Object>> list, JobProgressInfo jobProgressInfo) {
Map<String, Object> map = new HashMap<>();
map.put("id", jobProgressInfo.id());
map.put("succeedTasks", jobProgressInfo.succeedTasks());
map.put("failedTasks", jobProgressInfo.failedTasks());
map.put("runningTasks", jobProgressInfo.runningTasks());
map.put("totalTasks", jobProgressInfo.totalTasks());
list.add(map);
}
@RequestMapping(path = "/{id}/log",method = RequestMethod.GET)
public Message log(HttpServletRequest req, @PathVariable("id") String id) {
String realId = ZuulEntranceUtils.parseExecID(id)[3];
Option<Job> job = Option.apply(null);
Message message = null;
try {
job = entranceServer.getJob(realId);
} catch (final Throwable t) {
message = Message.error("The job you just executed has ended. This interface no longer provides a query. It is recommended that you download the log file for viewing.(您刚刚执行的job已经结束,本接口不再提供查询,建议您下载日志文件进行查看)");
message.setMethod("/api/entrance/" + id + "/log");
return message;
}
if (job.isDefined()) {
logger.debug("begin to get log for {}(开始获取 {} 的日志)", job.get().getId(),job.get().getId());
LogReader logReader = entranceServer.getEntranceContext().getOrCreateLogManager().getLogReader(realId);
int fromLine = 0;
int size = 100;
boolean distinctLevel = true;
if (req != null) {
String fromLineStr = req.getParameter("fromLine");
String sizeStr = req.getParameter("size");
if (StringUtils.isNotBlank(fromLineStr)) {
fromLine = Math.max(Integer.parseInt(fromLineStr), 0);
}
if (StringUtils.isNotBlank(sizeStr)) {
size = Integer.parseInt(sizeStr) >= 0 ? Integer.parseInt(sizeStr) : 10000;
}
String distinctLevelStr = req.getParameter("distinctLevel");
if ("false".equals(distinctLevelStr)) {
distinctLevel = false;
}
}
Object retLog = null;
int retFromLine = 0;
try {
if (distinctLevel) {
String[] logs = new String[4];
retFromLine = logReader.readArray(logs, fromLine, size);
retLog = new ArrayList<String>(Arrays.asList(logs));
} else {
StringBuilder sb = new StringBuilder();
retFromLine = logReader.read(sb, fromLine, size);
retLog = sb.toString();
}
} catch (IllegalStateException e) {
logger.debug("Failed to get log information for :{}(为 {} 获取日志失败)", job.get().getId(), job.get().getId(),e);
message = Message.ok();
message.setMethod("/api/entrance/" + id + "/log");
message.data("log", "").data("execID", id).data("fromLine", retFromLine + fromLine);
} catch (final IllegalArgumentException e) {
logger.debug("Failed to get log information for :{}(为 {} 获取日志失败)", job.get().getId(), job.get().getId(),e);
message = Message.ok();
message.setMethod("/api/entrance/" + id + "/log");
message.data("log", "").data("execID", id).data("fromLine", retFromLine + fromLine);
return message;
} catch (final Exception e1) {
logger.debug("Failed to get log information for :{}(为 {} 获取日志失败)", job.get().getId(), job.get().getId(),e1);
message = Message.error("Failed to get log information(获取日志信息失败)");
message.setMethod("/api/entrance/" + id + "/log");
message.data("log", "").data("execID", id).data("fromLine", retFromLine + fromLine);
return message;
} finally {
if (null != logReader && job.get().isCompleted()) {
IOUtils.closeQuietly(logReader);
}
}
message = Message.ok();
message.setMethod("/api/entrance/" + id + "/log");
message.data("log", retLog).data("execID", id).data("fromLine", retFromLine + fromLine);
logger.debug("success to get log for {} (获取 {} 日志成功)", job.get().getId(),job.get().getId());
} else {
message = Message.error("Can't find execID(不能找到execID): " + id + "Corresponding job, can not get the corresponding log(对应的job,不能获得对应的日志)");
message.setMethod("/api/entrance/" + id + "/log");
}
return message;
}
@RequestMapping(path = "/killJobs",method = RequestMethod.POST)
public Message killJobs(HttpServletRequest req,@RequestBody JsonNode jsonNode) {
JsonNode idNode = jsonNode.get("idList");
JsonNode taskIDNode = jsonNode.get("taskIDList");
ArrayList<Long> waitToForceKill = new ArrayList<>();
if(idNode.size() != taskIDNode.size()){
return Message.error("The length of the ID list does not match the length of the TASKID list(id列表的长度与taskId列表的长度不一致)");
}
if(!idNode.isArray() || !taskIDNode.isArray()){
return Message.error("Request parameter error, please use array(请求参数错误,请使用数组)");
}
ArrayList<Message> messages = new ArrayList<>();
for(int i = 0; i < idNode.size(); i++){
String id = idNode.get(i).asText();
Long taskID = taskIDNode.get(i).asLong();
String realId = ZuulEntranceUtils.parseExecID(id)[3];
//通过jobid获取job,可能会由于job找不到而导致有looparray的报错,一旦报错的话,就可以将该任务直接置为Cancenlled
Option<Job> job = Option.apply(null);
try {
job = entranceServer.getJob(realId);
} catch (Exception e) {
logger.warn("can not find a job in entranceServer, will force to kill it", e.getMessage());
//如果在内存中找不到该任务,那么该任务可能已经完成了,或者就是重启导致的
waitToForceKill.add(taskID);
Message message = Message.ok("Forced Kill task (强制杀死任务)");
message.setMethod("/api/entrance/" + id + "/kill");
message.setStatus(0);
messages.add(message);
continue;
}
Message message = null;
if (job.isEmpty()) {
logger.warn("can not find a job in entranceServer, will force to kill it");
waitToForceKill.add(taskID);
message = Message.ok("Forced Kill task (强制杀死任务)");
message.setMethod("/api/entrance/" + id + "/killJobs");
message.setStatus(0);
messages.add(message);
} else {
try {
logger.info("begin to kill job {} ", job.get().getId());
job.get().kill();
message = Message.ok("Successfully killed the job(成功kill了job)");
message.setMethod("/api/entrance/" + id + "/kill");
message.setStatus(0);
message.data("execID", id);
//ensure the job's state is cancelled in database
if (job.get() instanceof EntranceJob) {
EntranceJob entranceJob = (EntranceJob) job.get();
JobRequest jobReq = entranceJob.getJobRequest();
entranceJob.updateJobRequestStatus(SchedulerEventState.Cancelled().toString());
jobReq.setProgress("1.0f");
LogListener logListener = entranceJob.getLogListener().getOrElse(null);
if (null != logListener) {
logListener.onLogUpdate(entranceJob, "Job " + jobReq.getId() + " was kill by user successfully(任务" + jobReq.getId() + "已成功取消)");
}
this.entranceServer.getEntranceContext().getOrCreatePersistenceManager().createPersistenceEngine().updateIfNeeded(jobReq);
}
logger.info("end to kill job {} ", job.get().getId());
} catch (Throwable t) {
logger.error("kill job {} failed ", job.get().getId(), t);
message = Message.error("An exception occurred while killing the job, kill failed(kill job的时候出现了异常,kill失败)");
message.setMethod("/api/entrance/" + id + "/kill");
message.setStatus(1);
}
}
messages.add(message);
}
if(!waitToForceKill.isEmpty()){
JobHistoryHelper.forceBatchKill(waitToForceKill);
}
return Message.ok("停止任务成功").data("messages", messages);
}
@RequestMapping(path = "/{id}/kill",method = RequestMethod.GET)
public Message kill(@PathVariable("id") String id, @RequestParam(value = "taskID",required = false) Long taskID) {
String realId = ZuulEntranceUtils.parseExecID(id)[3];
//通过jobid获取job,可能会由于job找不到而导致有looparray的报错,一旦报错的话,就可以将该任务直接置为Cancenlled
Option<Job> job = Option.apply(null);
try {
job = entranceServer.getJob(realId);
} catch (Exception e) {
logger.warn("can not find a job in entranceServer, will force to kill it", e);
//如果在内存中找不到该任务,那么该任务可能已经完成了,或者就是重启导致的
JobHistoryHelper.forceKill(taskID);
Message message = Message.ok("Forced Kill task (强制杀死任务)");
message.setMethod("/api/entrance/" + id + "/kill");
message.setStatus(0);
return message;
}
Message message = null;
if (job.isEmpty()) {
logger.warn("can not find a job in entranceServer, will force to kill it");
//如果在内存中找不到该任务,那么该任务可能已经完成了,或者就是重启导致的
JobHistoryHelper.forceKill(taskID);
message = Message.ok("Forced Kill task (强制杀死任务)");
message.setMethod("/api/entrance/" + id + "/kill");
message.setStatus(0);
return message;
} else {
try {
logger.info("begin to kill job {} ", job.get().getId());
job.get().kill();
message = Message.ok("Successfully killed the job(成功kill了job)");
message.setMethod("/api/entrance/" + id + "/kill");
message.setStatus(0);
message.data("execID", id);
//ensure the job's state is cancelled in database
if (job.get() instanceof EntranceJob) {
EntranceJob entranceJob = (EntranceJob) job.get();
JobRequest jobReq = entranceJob.getJobRequest();
entranceJob.updateJobRequestStatus(SchedulerEventState.Cancelled().toString());
this.entranceServer.getEntranceContext().getOrCreatePersistenceManager().createPersistenceEngine().updateIfNeeded(jobReq);
}
logger.info("end to kill job {} ", job.get().getId());
} catch (Throwable t) {
logger.error("kill job {} failed ", job.get().getId(), t);
message = Message.error("An exception occurred while killing the job, kill failed(kill job的时候出现了异常,kill失败)");
message.setMethod("/api/entrance/" + id + "/kill");
message.setStatus(1);
}
}
return message;
}
@RequestMapping(path = "/{id}/pause",method = RequestMethod.GET)
public Message pause(@PathVariable("id") String id) {
String realId = ZuulEntranceUtils.parseExecID(id)[3];
Option<Job> job = entranceServer.getJob(realId);
Message message = null;
if (job.isEmpty()) {
message = Message.error("can not find the job of exexID :" + id +" can not pause (不能找到execID: " + id + "对应的job,不能进行pause)");
message.setMethod("/api/entrance/" + id + "/pause");
message.setStatus(1);
} else {
try {
//todo job pause 接口还未实现和给出
//job.pause();
logger.info("begin to pause job {} ", job.get().getId());
message = Message.ok("success to pause job (成功pause了job)");
message.setStatus(0);
message.data("execID", id);
message.setMethod("/api/entrance/" + id + "/pause");
logger.info("end to pause job {} ", job.get().getId());
} catch (Throwable t) {
logger.info("pause job {} failed ", job.get().getId());
message = Message.error("Abnormal when pausing job, pause failed(pause job的时候出现了异常,pause失败)");
message.setMethod("/api/entrance/" + id + "/pause");
message.setStatus(1);
}
}
return message;
}
}