Entrance memory usage optimization
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java
index d9b3382..ca19f4d 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java
@@ -27,8 +27,10 @@
import org.apache.linkis.entrance.log.WebSocketCacheLogReader;
import org.apache.linkis.entrance.log.WebSocketLogWriter;
import org.apache.linkis.entrance.persistence.PersistenceManager;
+import org.apache.linkis.entrance.utils.CommonLogPathUtils;
import org.apache.linkis.governance.common.conf.GovernanceCommonConf;
import org.apache.linkis.governance.common.constant.job.JobRequestConstants;
+import org.apache.linkis.governance.common.entity.job.JobRequest;
import org.apache.linkis.governance.common.protocol.task.RequestTask$;
import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.orchestrator.plans.ast.QueryParams$;
@@ -125,11 +127,12 @@
// add resultSet path root
Map<String, String> starupMapTmp = new HashMap<>();
Map<String, Object> starupMapOri = TaskUtils.getStartupMap(getParams());
+ JobRequest jobRequest = getJobRequest();
if (starupMapOri.isEmpty()) {
TaskUtils.addStartupMap(getParams(), starupMapOri);
}
if (!starupMapOri.containsKey(JobRequestConstants.JOB_REQUEST_LIST())) {
- starupMapOri.put(JobRequestConstants.JOB_ID(), String.valueOf(getJobRequest().getId()));
+ starupMapOri.put(JobRequestConstants.JOB_ID(), String.valueOf(jobRequest.getId()));
}
for (Map.Entry<String, Object> entry : starupMapOri.entrySet()) {
if (null != entry.getKey() && null != entry.getValue()) {
@@ -142,7 +145,7 @@
runtimeMapOri = TaskUtils.getRuntimeMap(getParams());
}
if (!runtimeMapOri.containsKey(JobRequestConstants.JOB_ID())) {
- runtimeMapOri.put(JobRequestConstants.JOB_ID(), String.valueOf(getJobRequest().getId()));
+ runtimeMapOri.put(JobRequestConstants.JOB_ID(), String.valueOf(jobRequest.getId()));
}
Map<String, String> runtimeMapTmp = new HashMap<>();
for (Map.Entry<String, Object> entry : runtimeMapOri.entrySet()) {
@@ -150,13 +153,21 @@
runtimeMapTmp.put(entry.getKey(), entry.getValue().toString());
}
}
+
String resultSetPathRoot = GovernanceCommonConf.RESULT_SET_STORE_PATH().getValue(runtimeMapTmp);
+
+ if (!runtimeMapTmp.containsKey(GovernanceCommonConf.RESULT_SET_STORE_PATH().key())) {
+ String resultParentPath = CommonLogPathUtils.getResultParentPath(jobRequest);
+ CommonLogPathUtils.buildCommonPath(resultParentPath);
+ resultSetPathRoot = CommonLogPathUtils.getResultPath(jobRequest);
+ }
+
Map<String, Object> jobMap = new HashMap<String, Object>();
jobMap.put(RequestTask$.MODULE$.RESULT_SET_STORE_PATH(), resultSetPathRoot);
runtimeMapOri.put(QueryParams$.MODULE$.JOB_KEY(), jobMap);
-
+ jobRequest.setResultLocation(resultSetPathRoot);
EntranceExecuteRequest executeRequest = new EntranceExecuteRequest(this);
- List<Label<?>> labels = new ArrayList<Label<?>>(getJobRequest().getLabels());
+ List<Label<?>> labels = new ArrayList<Label<?>>(jobRequest.getLabels());
executeRequest.setLabels(labels);
return executeRequest;
}
@@ -224,26 +235,32 @@
: "not submit to ec";
StringBuffer sb = new StringBuffer();
- sb.append("Task creation time(任务创建时间): ")
+ sb.append("Task time point information(任务时间节点信息):\n")
+ .append("[Task creation time(任务创建时间)] :")
.append(createTime)
- .append(", Task scheduling time(任务调度时间): ")
+ .append("\n")
+ .append("[Task scheduling time(任务调度时间)]:")
.append(scheduleTime)
- .append(", Task start time(任务开始时间): ")
+ .append("\n")
+ .append("[Task start time(任务开始时间)] :")
.append(startTime)
- .append(", Mission end time(任务结束时间): ")
+ .append("\n")
+ .append("[Task end time(任务结束时间)] :")
.append(endTime)
.append("\n")
.append(LogUtils.generateInfo(""))
- .append("Task submit to Orchestrator time:")
+ .append("[Task submit to Orchestrator time]:")
.append(jobToOrchestrator)
- .append(", Task request EngineConn time:")
+ .append("\n")
+ .append("[Task request EngineConn time] :")
.append(jobRequestEC)
- .append(", Task submit to EngineConn time:")
+ .append("\n")
+ .append("[Task submit to EngineConn time] :")
.append(jobSubmitToEC)
.append("\n")
.append(
LogUtils.generateInfo(
- "Your mission(您的任务) "
+ "Your task jobId(您的任务) "
+ this.getJobRequest().getId()
+ " The total time spent is(总耗时时间为): "
+ runTime));
@@ -269,4 +286,13 @@
logger.warn("Close logWriter and logReader failed. {}", e.getMessage(), e);
}
}
+
+ @Override
+ public void clear() {
+ super.clear();
+ this.setParams(null);
+ JobRequest jobRequest = this.getJobRequest();
+ jobRequest.setExecutionCode(null);
+ jobRequest.setMetrics(null);
+ }
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java
index f051b05..b912b58 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java
@@ -164,6 +164,7 @@
}
cliHeartbeatMonitor.unRegisterIfCliJob(job);
updateJobStatus(job);
+ job.clear();
}
private void updateJobStatus(Job job) {
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceLabelRestfulApi.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceLabelRestfulApi.java
index 0737e25..f755860 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceLabelRestfulApi.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceLabelRestfulApi.java
@@ -17,12 +17,15 @@
package org.apache.linkis.entrance.restful;
+import org.apache.linkis.DataWorkCloudApplication;
+import org.apache.linkis.common.ServiceInstance;
import org.apache.linkis.common.conf.Configuration;
import org.apache.linkis.entrance.EntranceServer;
import org.apache.linkis.entrance.scheduler.EntranceSchedulerContext;
import org.apache.linkis.instance.label.client.InstanceLabelClient;
import org.apache.linkis.manager.label.constant.LabelKeyConstant;
import org.apache.linkis.manager.label.constant.LabelValueConstant;
+import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.protocol.label.InsLabelRefreshRequest;
import org.apache.linkis.protocol.label.InsLabelRemoveRequest;
import org.apache.linkis.rpc.Sender;
@@ -30,12 +33,15 @@
import org.apache.linkis.server.Message;
import org.apache.linkis.server.utils.ModuleUserUtils;
+import org.apache.commons.collections.CollectionUtils;
+
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.databind.JsonNode;
@@ -129,7 +135,23 @@
@ApiOperation(value = "isOnline", notes = "entrance isOnline", response = Message.class)
@RequestMapping(path = "/isOnline", method = RequestMethod.GET)
public Message isOnline(HttpServletRequest req) {
- logger.info("Whether Entrance is online: {}", !offlineFlag);
- return Message.ok().data("isOnline", !offlineFlag);
+ String thisInstance = Sender.getThisInstance();
+ ServiceInstance mainInstance = DataWorkCloudApplication.getServiceInstance();
+ ServiceInstance serviceInstance = new ServiceInstance();
+ serviceInstance.setApplicationName(mainInstance.getApplicationName());
+ serviceInstance.setInstance(thisInstance);
+ List<Label<?>> labelFromInstance =
+ InstanceLabelClient.getInstance().getLabelFromInstance(serviceInstance);
+ boolean res = true;
+ String offline = "offline";
+ if (!CollectionUtils.isEmpty(labelFromInstance)) {
+ for (Label label : labelFromInstance) {
+ if (offline.equals(label.getValue())) {
+ res = false;
+ }
+ }
+ }
+ logger.info("Whether Entrance is online: {}", res);
+ return Message.ok().data("isOnline", res);
}
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
index 24c697c..0638ef5 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
@@ -291,7 +291,10 @@
val msg = s"JobRequest (${entranceExecuteRequest.jobId()}) was submitted to Orchestrator."
logger.info(msg)
entranceExecuteRequest.getJob.getLogListener.foreach(
- _.onLogUpdate(entranceExecuteRequest.getJob, LogUtils.generateInfo(msg))
+ _.onLogUpdate(
+ entranceExecuteRequest.getJob,
+ LogUtils.generateInfo(msg + "(您的任务已经提交给Orchestrator进行编排执行)")
+ )
)
if (entranceExecuteRequest.getJob.getJobRequest.getMetrics == null) {
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/TemplateConfUtils.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/TemplateConfUtils.scala
index cdcbe01..99ae8b0 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/TemplateConfUtils.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/TemplateConfUtils.scala
@@ -237,12 +237,14 @@
// to remove metedata start param
TaskUtils.clearStartupMap(params)
- val onceLabel =
- LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(
- classOf[ExecuteOnceLabel]
- )
- logger.info("Add once label for task id:{}", requestPersistTask.getId.toString)
- requestPersistTask.getLabels.add(onceLabel)
+ if (EntranceConfiguration.TEMPLATE_CONF_ADD_ONCE_LABEL_ENABLE.getValue) {
+ val onceLabel =
+ LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(
+ classOf[ExecuteOnceLabel]
+ )
+ logger.info("Add once label for task id:{}", requestPersistTask.getId.toString)
+ requestPersistTask.getLabels.add(onceLabel)
+ }
}
}