Merge pull request #5126 from WeDataSphere/master-1.6.0
1. Entrance result set directory unified optimization
2. Entrance memory usage Optimize
3. JDBC Driver support use default db
diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala
index 9443f15..5522424 100644
--- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala
+++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala
@@ -65,6 +65,8 @@
val JOB_HISTORY_ADMIN = CommonVars("wds.linkis.jobhistory.admin", "hadoop")
+ val JOB_HISTORY_DEPARTMENT_ADMIN = CommonVars("wds.linkis.jobhistory.department.admin", "hadoop")
+
// Only the specified token has permission to call some api
val GOVERNANCE_STATION_ADMIN_TOKEN_STARTWITH = "ADMIN-"
@@ -124,6 +126,11 @@
.exists(username.equalsIgnoreCase)
}
+ def isDepartmentAdmin(username: String): Boolean = {
+ val departmentAdminUsers = JOB_HISTORY_DEPARTMENT_ADMIN.getHotValue.split(",")
+ departmentAdminUsers.exists(username.equalsIgnoreCase)
+ }
+
def getJobHistoryAdmin(): Array[String] = {
val adminUsers = GOVERNANCE_STATION_ADMIN.getHotValue.split(",")
val historyAdminUsers = JOB_HISTORY_ADMIN.getHotValue.split(",")
diff --git a/linkis-commons/linkis-hadoop-common/src/main/java/org/apache/linkis/hadoop/common/utils/KerberosUtils.java b/linkis-commons/linkis-hadoop-common/src/main/java/org/apache/linkis/hadoop/common/utils/KerberosUtils.java
index 6c5c125..67fecd0 100644
--- a/linkis-commons/linkis-hadoop-common/src/main/java/org/apache/linkis/hadoop/common/utils/KerberosUtils.java
+++ b/linkis-commons/linkis-hadoop-common/src/main/java/org/apache/linkis/hadoop/common/utils/KerberosUtils.java
@@ -17,12 +17,14 @@
package org.apache.linkis.hadoop.common.utils;
+import org.apache.linkis.common.utils.Utils;
import org.apache.linkis.hadoop.common.conf.HadoopConf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +35,10 @@
public class KerberosUtils {
private static final Logger LOG = LoggerFactory.getLogger(KerberosUtils.class);
+ private static boolean kerberosRefreshStarted = false;
+
+ private static final Object kerberosRefreshLock = new Object();
+
private KerberosUtils() {}
private static Configuration createKerberosSecurityConfiguration() {
@@ -81,20 +87,20 @@
public static Long getKerberosRefreshInterval() {
long refreshInterval;
- String refreshIntervalString = "86400000";
- // defined in linkis-env.sh, if not initialized then the default value is 86400000 ms (1d).
- if (System.getenv("LINKIS_JDBC_KERBEROS_REFRESH_INTERVAL") != null) {
- refreshIntervalString = System.getenv("LINKIS_JDBC_KERBEROS_REFRESH_INTERVAL");
+ String refreshIntervalString = "43200";
+ // defined in linkis-env.sh, if not initialized then the default value is 43200 s (0.5d).
+ if (System.getenv("LINKIS_KERBEROS_REFRESH_INTERVAL") != null) {
+ refreshIntervalString = System.getenv("LINKIS_KERBEROS_REFRESH_INTERVAL");
}
try {
refreshInterval = Long.parseLong(refreshIntervalString);
} catch (NumberFormatException e) {
LOG.error(
- "Cannot get time in MS for the given string, "
+ "Cannot get time in S for the given string, "
+ refreshIntervalString
- + " defaulting to 86400000 ",
+ + " defaulting to 43200 ",
e);
- refreshInterval = 86400000L;
+ refreshInterval = 43200;
}
return refreshInterval;
}
@@ -102,14 +108,13 @@
public static Integer kinitFailTimesThreshold() {
Integer kinitFailThreshold = 5;
// defined in linkis-env.sh, if not initialized then the default value is 5.
- if (System.getenv("LINKIS_JDBC_KERBEROS_KINIT_FAIL_THRESHOLD") != null) {
+ if (System.getenv("LINKIS_KERBEROS_KINIT_FAIL_THRESHOLD") != null) {
try {
- kinitFailThreshold =
- new Integer(System.getenv("LINKIS_JDBC_KERBEROS_KINIT_FAIL_THRESHOLD"));
+ kinitFailThreshold = new Integer(System.getenv("LINKIS_KERBEROS_KINIT_FAIL_THRESHOLD"));
} catch (Exception e) {
LOG.error(
"Cannot get integer value from the given string, "
- + System.getenv("LINKIS_JDBC_KERBEROS_KINIT_FAIL_THRESHOLD")
+ + System.getenv("LINKIS_KERBEROS_KINIT_FAIL_THRESHOLD")
+ " defaulting to "
+ kinitFailThreshold,
e);
@@ -117,4 +122,70 @@
}
return kinitFailThreshold;
}
+
+ public static void checkStatus() {
+ try {
+ LOG.info("isSecurityEnabled:" + UserGroupInformation.isSecurityEnabled());
+ LOG.info(
+ "userAuthenticationMethod:"
+ + UserGroupInformation.getLoginUser().getAuthenticationMethod());
+ UserGroupInformation loginUsr = UserGroupInformation.getLoginUser();
+ UserGroupInformation curUsr = UserGroupInformation.getCurrentUser();
+ LOG.info("LoginUser: " + loginUsr);
+ LOG.info("CurrentUser: " + curUsr);
+ if (curUsr == null) {
+ LOG.info("CurrentUser is null");
+ } else {
+ LOG.info("CurrentUser is not null");
+ }
+ if (loginUsr.getClass() != curUsr.getClass()) {
+ LOG.info("getClass() is different");
+ } else {
+ LOG.info("getClass() is same");
+ }
+ if (loginUsr.equals(curUsr)) {
+ LOG.info("subject is equal");
+ } else {
+ LOG.info("subject is not equal");
+ }
+ } catch (Exception e) {
+ LOG.error("UGI error: ", e.getMessage());
+ }
+ }
+
+ public static void startKerberosRefreshThread() {
+
+ if (kerberosRefreshStarted || !HadoopConf.KERBEROS_ENABLE()) {
+ LOG.warn(
+ "kerberos refresh thread had start or not kerberos {}", HadoopConf.HDFS_ENABLE_CACHE());
+ return;
+ }
+ synchronized (kerberosRefreshLock) {
+ if (kerberosRefreshStarted) {
+ LOG.warn("kerberos refresh thread had start");
+ return;
+ }
+ kerberosRefreshStarted = true;
+ LOG.info("kerberos Refresh tread started");
+ Utils.defaultScheduler()
+ .scheduleAtFixedRate(
+ () -> {
+ try {
+ checkStatus();
+ if (UserGroupInformation.isLoginKeytabBased()) {
+ LOG.info("Trying re-login from keytab");
+ UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+ } else if (UserGroupInformation.isLoginTicketBased()) {
+ LOG.info("Trying re-login from ticket cache");
+ UserGroupInformation.getLoginUser().reloginFromTicketCache();
+ }
+ } catch (Exception e) {
+ LOG.error("Unable to re-login", e);
+ }
+ },
+ getKerberosRefreshInterval(),
+ getKerberosRefreshInterval(),
+ TimeUnit.SECONDS);
+ }
+ }
}
diff --git a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala
index 16fb45e..c550b3f 100644
--- a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala
+++ b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/conf/HadoopConf.scala
@@ -23,7 +23,7 @@
val HADOOP_ROOT_USER = CommonVars("wds.linkis.hadoop.root.user", "hadoop")
- val KERBEROS_ENABLE = CommonVars("wds.linkis.keytab.enable", false)
+ val KERBEROS_ENABLE = CommonVars("wds.linkis.keytab.enable", false).getValue
val KERBEROS_ENABLE_MAP =
CommonVars("linkis.keytab.enable.map", "cluster1=false,cluster2=true")
diff --git a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala
index 1a69510..5e8d8a6 100644
--- a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala
+++ b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala
@@ -66,8 +66,7 @@
)
}
.foreach { hdfsFileSystemContainer =>
- val locker =
- hdfsFileSystemContainer.getUser + JOINT + hdfsFileSystemContainer.getLabel + LOCKER_SUFFIX
+ val locker = hdfsFileSystemContainer.getUser + LOCKER_SUFFIX
locker.intern() synchronized {
if (hdfsFileSystemContainer.canRemove()) {
fileSystemCache.remove(
@@ -248,7 +247,7 @@
def isKerberosEnabled(label: String): Boolean = {
if (label == null) {
- KERBEROS_ENABLE.getValue
+ KERBEROS_ENABLE
} else {
kerberosValueMapParser(KERBEROS_ENABLE_MAP.getValue).get(label).contains("true")
}
diff --git a/linkis-commons/linkis-hadoop-common/src/test/java/org/apache/linkis/hadoop/common/utils/KerberosUtilsTest.java b/linkis-commons/linkis-hadoop-common/src/test/java/org/apache/linkis/hadoop/common/utils/KerberosUtilsTest.java
index b84988a..5b29e1f 100644
--- a/linkis-commons/linkis-hadoop-common/src/test/java/org/apache/linkis/hadoop/common/utils/KerberosUtilsTest.java
+++ b/linkis-commons/linkis-hadoop-common/src/test/java/org/apache/linkis/hadoop/common/utils/KerberosUtilsTest.java
@@ -28,7 +28,7 @@
public void getKerberosRefreshIntervalTest() {
Long refreshInterval = KerberosUtils.getKerberosRefreshInterval();
- Assertions.assertTrue(86400000L == refreshInterval.longValue());
+ Assertions.assertTrue(43200L == refreshInterval.longValue());
}
@Test
diff --git a/linkis-commons/linkis-hadoop-common/src/test/scala/org/apache/linkis/hadoop/common/conf/HadoopConfTest.scala b/linkis-commons/linkis-hadoop-common/src/test/scala/org/apache/linkis/hadoop/common/conf/HadoopConfTest.scala
index 44ca1da..7c2c7b3 100644
--- a/linkis-commons/linkis-hadoop-common/src/test/scala/org/apache/linkis/hadoop/common/conf/HadoopConfTest.scala
+++ b/linkis-commons/linkis-hadoop-common/src/test/scala/org/apache/linkis/hadoop/common/conf/HadoopConfTest.scala
@@ -26,7 +26,7 @@
def constTest(): Unit = {
Assertions.assertEquals("hadoop", HadoopConf.HADOOP_ROOT_USER.getValue)
- Assertions.assertFalse(HadoopConf.KERBEROS_ENABLE.getValue)
+ Assertions.assertFalse(HadoopConf.KERBEROS_ENABLE)
Assertions.assertEquals("/appcom/keytab/", HadoopConf.KEYTAB_FILE.getValue)
Assertions.assertEquals("127.0.0.1", HadoopConf.KEYTAB_HOST.getValue)
Assertions.assertFalse(HadoopConf.KEYTAB_HOST_ENABLED.getValue)
diff --git a/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/AbstractHttpClient.scala b/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/AbstractHttpClient.scala
index d402d7a..5e42540 100644
--- a/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/AbstractHttpClient.scala
+++ b/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/AbstractHttpClient.scala
@@ -59,12 +59,21 @@
ConnectTimeoutException,
HttpHostConnectException
}
+import org.apache.http.conn.ssl.{SSLConnectionSocketFactory, TrustSelfSignedStrategy}
import org.apache.http.entity.{ContentType, StringEntity}
import org.apache.http.entity.mime.MultipartEntityBuilder
-import org.apache.http.impl.client.{BasicCookieStore, CloseableHttpClient, HttpClients}
+import org.apache.http.impl.client.{
+ BasicCookieStore,
+ CloseableHttpClient,
+ HttpClientBuilder,
+ HttpClients
+}
import org.apache.http.message.BasicNameValuePair
+import org.apache.http.ssl.SSLContextBuilder
import org.apache.http.util.EntityUtils
+import javax.net.ssl.{HostnameVerifier, SSLContext, SSLSession}
+
import java.net.URI
import java.nio.charset.Charset
import java.util
@@ -81,12 +90,26 @@
protected val cookieStore = new BasicCookieStore
- protected val httpClient: CloseableHttpClient = HttpClients
+ private val httpClientBuilder: HttpClientBuilder = HttpClients
.custom()
.setDefaultCookieStore(cookieStore)
.setMaxConnTotal(clientConfig.getMaxConnection)
.setMaxConnPerRoute(clientConfig.getMaxConnection / 2)
- .build
+
+ protected val httpClient: CloseableHttpClient = if (clientConfig.isSSL) {
+ val sslContext: SSLContext =
+ SSLContextBuilder.create.loadTrustMaterial(null, new TrustSelfSignedStrategy).build
+
+ val sslConnectionFactory = new SSLConnectionSocketFactory(
+ sslContext,
+ new HostnameVerifier() {
+ override def verify(hostname: String, session: SSLSession) = true
+ }
+ )
+ httpClientBuilder.setSSLSocketFactory(sslConnectionFactory).build()
+ } else {
+ httpClientBuilder.build()
+ }
if (clientConfig.getAuthenticationStrategy != null) {
clientConfig.getAuthenticationStrategy match {
diff --git a/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/config/ClientConfig.scala b/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/config/ClientConfig.scala
index dbce2d3..dea081b 100644
--- a/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/config/ClientConfig.scala
+++ b/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/config/ClientConfig.scala
@@ -44,6 +44,7 @@
private var maxConnection: Int = 20
private var retryEnabled: Boolean = _
private var retryHandler: RetryHandler = _
+ private var ssl: Boolean = false
protected[config] def this(
serverUrl: String,
@@ -59,7 +60,8 @@
retryEnabled: Boolean,
retryHandler: RetryHandler,
authTokenKey: String,
- authTokenValue: String
+ authTokenValue: String,
+ isSSL: Boolean = false
) = {
this()
this.serverUrl = serverUrl
@@ -78,6 +80,7 @@
this.retryHandler = retryHandler
this.authTokenKey = authTokenKey
this.authTokenValue = authTokenValue
+ this.ssl = isSSL
authenticationStrategy match {
case ab: AbstractAuthenticationStrategy => ab.setClientConfig(this)
case _ =>
@@ -123,4 +126,6 @@
def getRetryHandler: RetryHandler = retryHandler
+ def isSSL: Boolean = ssl
+
}
diff --git a/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/config/ClientConfigBuilder.scala b/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/config/ClientConfigBuilder.scala
index b1fc579..a574b89 100644
--- a/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/config/ClientConfigBuilder.scala
+++ b/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/config/ClientConfigBuilder.scala
@@ -40,6 +40,8 @@
protected var maxConnection: Int = _
protected var retryEnabled: Boolean = true
+ protected var ssl: Boolean = false
+
protected var retryHandler: RetryHandler = {
val retryHandler = new DefaultRetryHandler
retryHandler.addRetryException(classOf[LinkisRetryException])
@@ -112,6 +114,11 @@
this
}
+ def setSSL(isSSL: Boolean): this.type = {
+ this.ssl = isSSL
+ this
+ }
+
def build(): ClientConfig = new ClientConfig(
serverUrl,
discoveryEnabled,
@@ -126,7 +133,8 @@
retryEnabled,
retryHandler,
authTokenKey,
- authTokenValue
+ authTokenValue,
+ ssl
)
}
diff --git a/linkis-commons/linkis-module/src/main/java/org/apache/linkis/proxy/ProxyUserService.java b/linkis-commons/linkis-module/src/main/java/org/apache/linkis/proxy/ProxyUserService.java
new file mode 100644
index 0000000..cb4e6ee
--- /dev/null
+++ b/linkis-commons/linkis-module/src/main/java/org/apache/linkis/proxy/ProxyUserService.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.proxy;
+
+public interface ProxyUserService {
+
+ ProxyUserEntity getProxyUserEntity(String proxyUser, String loginUser);
+}
diff --git a/linkis-commons/linkis-module/src/main/java/org/apache/linkis/server/utils/ModuleUserUtils.java b/linkis-commons/linkis-module/src/main/java/org/apache/linkis/server/utils/ModuleUserUtils.java
index 3661a66..0c4b7de 100644
--- a/linkis-commons/linkis-module/src/main/java/org/apache/linkis/server/utils/ModuleUserUtils.java
+++ b/linkis-commons/linkis-module/src/main/java/org/apache/linkis/server/utils/ModuleUserUtils.java
@@ -113,4 +113,8 @@
}
return tokenUser;
}
+
+ public static void printAuditLog(String auditLogMsg) {
+ LOGGER.info(auditLogMsg);
+ }
}
diff --git a/linkis-commons/linkis-module/src/main/java/org/apache/linkis/utils/LinkisSpringUtils.java b/linkis-commons/linkis-module/src/main/java/org/apache/linkis/utils/LinkisSpringUtils.java
new file mode 100644
index 0000000..8021bb1
--- /dev/null
+++ b/linkis-commons/linkis-module/src/main/java/org/apache/linkis/utils/LinkisSpringUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.utils;
+
+import javax.servlet.http.HttpServletRequest;
+
+public class LinkisSpringUtils {
+
+ public static String getClientIP(HttpServletRequest request) {
+ String clientIp = request.getHeader("X-Forwarded-For");
+
+ if (clientIp == null || clientIp.isEmpty() || "unknown".equalsIgnoreCase(clientIp)) {
+ clientIp = request.getHeader("Proxy-Client-IP");
+ }
+ if (clientIp == null || clientIp.isEmpty() || "unknown".equalsIgnoreCase(clientIp)) {
+ clientIp = request.getHeader("WL-Proxy-Client-IP");
+ }
+ if (clientIp == null || clientIp.isEmpty() || "unknown".equalsIgnoreCase(clientIp)) {
+ clientIp = request.getHeader("HTTP_CLIENT_IP");
+ }
+ if (clientIp == null || clientIp.isEmpty() || "unknown".equalsIgnoreCase(clientIp)) {
+ clientIp = request.getHeader("HTTP_X_FORWARDED_FOR");
+ }
+ if (clientIp == null || clientIp.isEmpty() || "unknown".equalsIgnoreCase(clientIp)) {
+ clientIp = request.getRemoteAddr();
+ }
+ if (clientIp != null && clientIp.contains(",")) {
+ clientIp = clientIp.split(",")[0];
+ }
+
+ return clientIp;
+ }
+}
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/AbstractScheduler.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/AbstractScheduler.scala
index de2b81b..8126ac8 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/AbstractScheduler.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/AbstractScheduler.scala
@@ -17,14 +17,15 @@
package org.apache.linkis.scheduler
-import org.apache.linkis.common.utils.Utils
+import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.scheduler.conf.SchedulerConfiguration
import org.apache.linkis.scheduler.errorcode.LinkisSchedulerErrorCodeSummary._
import org.apache.linkis.scheduler.exception.SchedulerErrorException
import org.apache.linkis.scheduler.queue.SchedulerEvent
import org.apache.commons.lang3.StringUtils
-abstract class AbstractScheduler extends Scheduler {
+abstract class AbstractScheduler extends Scheduler with Logging {
override def init(): Unit = {}
override def start(): Unit = {}
@@ -52,6 +53,14 @@
val group = getSchedulerContext.getOrCreateGroupFactory.getOrCreateGroup(event)
val consumer =
getSchedulerContext.getOrCreateConsumerManager.getOrCreateConsumer(group.getGroupName)
+ logger.info(
+ s"Consumer ${consumer.getGroup.getGroupName} running size ${consumer.getRunningSize} waiting size ${consumer.getWaitingSize}"
+ )
+ if (consumer.getWaitingSize >= SchedulerConfiguration.MAX_GROUP_ALTER_WAITING_SIZE) {
+ logger.warn(
+ s"Group waiting size exceed max alter waiting size ${consumer.getWaitingSize} group name ${consumer.getGroup.getGroupName}"
+ )
+ }
val index = consumer.getConsumeQueue.offer(event)
index.map(getEventId(_, group.getGroupName)).foreach(event.setId)
if (index.isEmpty) {
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala
index c282905..e3b76ac 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/conf/SchedulerConfiguration.scala
@@ -33,4 +33,7 @@
val FIFO_CONSUMER_IDLE_SCAN_INIT_TIME =
CommonVars("wds.linkis.fifo.consumer.idle.scan.init.time", new TimeType("1s"))
+ val MAX_GROUP_ALTER_WAITING_SIZE =
+ CommonVars("linkis.fifo.consumer.group.max.alter.waiting.size", 1000).getValue
+
}
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/ConsumeQueue.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/ConsumeQueue.scala
index 14c9061..7761a9f 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/ConsumeQueue.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/ConsumeQueue.scala
@@ -21,6 +21,8 @@
def remove(event: SchedulerEvent): Unit
def getWaitingEvents: Array[SchedulerEvent]
def size: Int
+
+ def waitingSize: Int
def isEmpty: Boolean
def isFull: Boolean
def clearAll(): Unit
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Consumer.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Consumer.scala
index 165a274..539a2a4 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Consumer.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Consumer.scala
@@ -38,6 +38,12 @@
def getRunningEvents: Array[SchedulerEvent]
+ def getMaxRunningEvents: Int
+
+ def getRunningSize: Int
+
+ def getWaitingSize: Int
+
def start(): Unit
def shutdown(): Unit = {
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala
index 7534f74..d513ecc 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Job.scala
@@ -196,15 +196,10 @@
): Unit = toState match {
case Inited =>
jobListener.foreach(_.onJobInited(this))
- // TODO Add event(加事件)
case Scheduled =>
jobListener.foreach(_.onJobScheduled(this))
- logListener.foreach(_.onLogUpdate(this, LogUtils.generateInfo("job is scheduled.")))
- // TODO Add event(加事件)
case Running =>
jobListener.foreach(_.onJobRunning(this))
- logListener.foreach(_.onLogUpdate(this, LogUtils.generateInfo("job is running.")))
- // TODO job start event
case WaitForRetry =>
jobListener.foreach(_.onJobWaitForRetry(this))
case _ =>
@@ -300,6 +295,7 @@
}
override def run(): Unit = {
+ Thread.currentThread().setName(s"Job_${toString}_Thread")
if (!isScheduled || interrupt) return
startTime = System.currentTimeMillis
Utils.tryAndWarn(transition(Running))
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LoopArrayQueue.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LoopArrayQueue.scala
index 8bea7e5..c18f18d 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LoopArrayQueue.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LoopArrayQueue.scala
@@ -109,7 +109,7 @@
max
}
- def waitingSize: Int = if (takeIndex <= realSize) size
+ override def waitingSize: Int = if (takeIndex <= realSize) size
else {
val length = size - takeIndex + realSize
if (length < 0) 0 else length
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala
index df296f8..b4ffbfa 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala
@@ -21,6 +21,7 @@
import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.scheduler.SchedulerContext
+import org.apache.linkis.scheduler.conf.SchedulerConfiguration
import org.apache.linkis.scheduler.errorcode.LinkisSchedulerErrorCodeSummary._
import org.apache.linkis.scheduler.exception.SchedulerErrorException
import org.apache.linkis.scheduler.executer.Executor
@@ -178,6 +179,9 @@
totalDuration
)
job.consumerFuture = null
+ logger.info(
+ s"FIFOUserConsumer ${getGroup.getGroupName} running size ${getRunningSize} waiting size ${getWaitingSize}"
+ )
executor.foreach { executor =>
job.setExecutor(executor)
job.future = executeService.submit(job)
@@ -255,4 +259,14 @@
this.queue.peek.isEmpty && !this.runningJobs.exists(job => job != null && !job.isCompleted)
}
+ override def getMaxRunningEvents: Int = this.maxRunningJobsNum
+
+ override def getRunningSize: Int = {
+ runningJobs.count(job => job != null && !job.isCompleted)
+ }
+
+ override def getWaitingSize: Int = {
+ queue.waitingSize
+ }
+
}
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala
index 1e753ea..c64158e 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManager.scala
@@ -27,6 +27,11 @@
import scala.collection.mutable
+/**
+ * @param maxParallelismUsers
+ * Consumer Thread pool size is:5 * maxParallelismUsers + 1
+ * @param schedulerName
+ */
class ParallelConsumerManager(maxParallelismUsers: Int, schedulerName: String)
extends ConsumerManager
with Logging {
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelSchedulerContextImpl.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelSchedulerContextImpl.scala
index 99fa57b..5b060a9 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelSchedulerContextImpl.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelSchedulerContextImpl.scala
@@ -25,8 +25,14 @@
extends FIFOSchedulerContextImpl(maxParallelismUsers)
with Logging {
+ /**
+ * Set the number of consumption groups supported The number of concurrency supported by each
+ * group is determined by
+ * org.apache.linkis.scheduler.queue.fifoqueue.FIFOGroupFactory#setDefaultMaxRunningJobs(int)
+ */
override protected def createGroupFactory(): GroupFactory = {
val groupFactory = new ParallelGroupFactory
+
groupFactory.setParallelism(maxParallelismUsers)
groupFactory
}
diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/CtxBuilder.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/CtxBuilder.java
index 07df8b7..cfa57d4 100644
--- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/CtxBuilder.java
+++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/CtxBuilder.java
@@ -73,11 +73,16 @@
ParseResult result = parser.parse(args);
ParsedTplValidator parsedTplValidator = new ParsedTplValidator();
+
parsedTplValidator.doValidation(result.getParsedTemplate());
Params params = result.getParams();
logger.debug("==========params============\n" + CliUtils.GSON.toJson(params));
+ /*
+ VarAccess for sys_prop, sys_env
+ */
+
Map<String, ClientProperties> propertiesMap = new HashMap<>();
LoggerManager.getInformationLogger()
diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/LinkisClientApplication.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/LinkisClientApplication.java
index 1fb2104..24ee3c8 100644
--- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/LinkisClientApplication.java
+++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/LinkisClientApplication.java
@@ -69,7 +69,7 @@
CmdTemplate template = CmdTemplateFactory.getTemplateOri(e.getCmdType());
if (template != null) {
HelpInfoModel model = new HelpInfoModel();
- model.buildModel(ctx.getTemplate());
+ model.buildModel(template);
new HelpPresenter().present(model);
}
LoggerManager.getInformationLogger().error("Failed to build CliCtx", e);
diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/option/BaseOption.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/option/BaseOption.java
index e497401..eee29c8 100644
--- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/option/BaseOption.java
+++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/option/BaseOption.java
@@ -93,7 +93,7 @@
}
public T getValue() {
- return this.value;
+ return this.value == null ? this.defaultValue : this.value;
}
public void setValue(T value) {
diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/option/Flag.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/option/Flag.java
index ee66a64..47af30b 100644
--- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/option/Flag.java
+++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/option/Flag.java
@@ -43,14 +43,9 @@
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("\t")
- .append(StringUtils.join(paramNames, "|"))
- .append(" <")
- .append(this.getDefaultValue().getClass().getSimpleName())
- .append(">")
- .append(System.lineSeparator());
+ sb.append("\t").append(StringUtils.join(paramNames, "|")).append(System.lineSeparator());
- sb.append("\t\t").append(this.getDefaultValue()).append(System.lineSeparator());
+ sb.append("\t\t").append(this.getDescription()).append(System.lineSeparator());
sb.append("\t\tdefault by: ").append(this.getDefaultValue()).append(System.lineSeparator());
diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/LinkisManagerClient.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/LinkisManagerClient.scala
index 45f3f49..bc1bb75 100644
--- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/LinkisManagerClient.scala
+++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/LinkisManagerClient.scala
@@ -24,7 +24,8 @@
EngineConnOperateAction,
GetEngineConnAction,
KillEngineConnAction,
- LinkisManagerAction
+ LinkisManagerAction,
+ ListEngineConnAction
}
import org.apache.linkis.computation.client.once.result.{
AskEngineConnResult,
@@ -32,7 +33,8 @@
EngineConnOperateResult,
GetEngineConnResult,
KillEngineConnResult,
- LinkisManagerResult
+ LinkisManagerResult,
+ ListEngineConnResult
}
import org.apache.linkis.httpclient.dws.DWSHttpClient
import org.apache.linkis.httpclient.request.Action
@@ -50,6 +52,8 @@
def killEngineConn(killEngineConnAction: KillEngineConnAction): KillEngineConnResult
+ def listEngineConn(listEngineConnAction: ListEngineConnAction): ListEngineConnResult
+
def executeEngineConnOperation(
engineConnOperateAction: EngineConnOperateAction
): EngineConnOperateResult
@@ -104,4 +108,8 @@
override def askEngineConn(askEngineConnAction: AskEngineConnAction): AskEngineConnResult =
execute(askEngineConnAction)
+ override def listEngineConn(listEngineConnAction: ListEngineConnAction): ListEngineConnResult = {
+ execute(listEngineConnAction)
+ }
+
}
diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/ListEngineConnAction.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/ListEngineConnAction.scala
new file mode 100644
index 0000000..c76a5e7
--- /dev/null
+++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/ListEngineConnAction.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.computation.client.once.action
+
+import org.apache.linkis.httpclient.request.GetAction
+import org.apache.linkis.ujes.client.exception.UJESClientBuilderException
+
+class ListEngineConnAction extends GetAction with LinkisManagerAction {
+ override def suffixURLs: Array[String] = Array("linkisManager", "listUserEngines")
+}
+
+object ListEngineConnAction {
+ def newBuilder(): Builder = new Builder
+
+ class Builder private[ListEngineConnAction] () {
+
+ private var user: String = _
+
+ def setUser(user: String): Builder = {
+ this.user = user
+ this
+ }
+
+ def build(): ListEngineConnAction = {
+ if (user == null) throw new UJESClientBuilderException("user is needed!")
+ val listEngineConnAction = new ListEngineConnAction
+ listEngineConnAction.setUser(user)
+ listEngineConnAction
+ }
+
+ }
+
+}
diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/GetEngineConnResult.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/GetEngineConnResult.scala
index e964cd7..b20923d 100644
--- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/GetEngineConnResult.scala
+++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/GetEngineConnResult.scala
@@ -17,6 +17,7 @@
package org.apache.linkis.computation.client.once.result
+import org.apache.linkis.common.ServiceInstance
import org.apache.linkis.httpclient.dws.annotation.DWSHttpMessageResult
import java.util
@@ -32,4 +33,41 @@
def getNodeInfo: util.Map[String, Any] = engineConnNode
+ protected def getAs[T](map: util.Map[String, Any], key: String): T =
+ map.get(key).asInstanceOf[T]
+
+ def getTicketId(): String = getAs(engineConnNode, "ticketId")
+
+ def getServiceInstance(): ServiceInstance =
+ engineConnNode.get("serviceInstance") match {
+ case serviceInstance: util.Map[String, Any] =>
+ ServiceInstance(
+ getAs(serviceInstance, "applicationName"),
+ getAs(serviceInstance, "instance")
+ )
+ case _ => null
+ }
+
+ def getNodeStatus(): String = getAs(engineConnNode, "nodeStatus")
+
+ def getECMServiceInstance(): ServiceInstance =
+ engineConnNode.get("ecmServiceInstance") match {
+ case serviceInstance: util.Map[String, Any] =>
+ ServiceInstance(
+ getAs(serviceInstance, "applicationName"),
+ getAs(serviceInstance, "instance")
+ )
+ case _ => null
+ }
+
+ def getManagerServiceInstance(): ServiceInstance =
+ engineConnNode.get("managerServiceInstance") match {
+ case serviceInstance: util.Map[String, Any] =>
+ ServiceInstance(
+ getAs(serviceInstance, "applicationName"),
+ getAs(serviceInstance, "instance")
+ )
+ case _ => null
+ }
+
}
diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/ListEngineConnResult.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/ListEngineConnResult.scala
new file mode 100644
index 0000000..c31ccf4
--- /dev/null
+++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/ListEngineConnResult.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.computation.client.once.result
+
+import org.apache.linkis.httpclient.dws.annotation.DWSHttpMessageResult
+
+import java.util
+
+@DWSHttpMessageResult("/api/rest_j/v\\d+/linkisManager/listUserEngines")
+class ListEngineConnResult extends LinkisManagerResult {
+
+ private var engines: util.List[util.Map[String, AnyRef]] = _
+
+ def setEngines(engines: util.List[util.Map[String, AnyRef]]): Unit = {
+ this.engines = engines
+ }
+
+ def getEngines: util.List[util.Map[String, AnyRef]] = engines
+
+}
diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/LinkisFSClient.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/LinkisFSClient.scala
new file mode 100644
index 0000000..3e7f675
--- /dev/null
+++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/LinkisFSClient.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ujes.client
+
+import org.apache.linkis.ujes.client.request.{
+ CreateNewDirAction,
+ IsPathExistAction,
+ UploadFileAction
+}
+import org.apache.linkis.ujes.client.response.{
+ CreateNewDirResult,
+ IsPathExistResult,
+ UploadFileResult
+}
+
+class LinkisFSClient(client: UJESClient) {
+
+ def isPathExist(isPathExistAction: IsPathExistAction): Boolean = {
+ val result = client.executeUJESJob(isPathExistAction).asInstanceOf[IsPathExistResult]
+ result.isExist
+ }
+
+ def createNewDir(makeDirAction: CreateNewDirAction): CreateNewDirResult = {
+ client.executeUJESJob(makeDirAction).asInstanceOf[CreateNewDirResult]
+ }
+
+ def upload(uploadFileAction: UploadFileAction): UploadFileResult = {
+ client.executeUJESJob(uploadFileAction).asInstanceOf[UploadFileResult]
+ }
+
+}
diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/UJESClient.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/UJESClient.scala
index 6431c47..6657b7e 100644
--- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/UJESClient.scala
+++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/UJESClient.scala
@@ -17,6 +17,7 @@
package org.apache.linkis.ujes.client
+import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.httpclient.authentication.AuthenticationStrategy
import org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
import org.apache.linkis.httpclient.dws.config.{DWSClientConfig, DWSClientConfigBuilder}
@@ -24,11 +25,13 @@
import org.apache.linkis.ujes.client.request._
import org.apache.linkis.ujes.client.request.JobExecIdAction.JobServiceType
import org.apache.linkis.ujes.client.response._
+import org.apache.linkis.ujes.client.utils.UJESClientUtils
import java.io.Closeable
+import java.util
import java.util.concurrent.TimeUnit
-abstract class UJESClient extends Closeable {
+abstract class UJESClient extends Closeable with Logging {
def execute(jobExecuteAction: JobExecuteAction): JobExecuteResult = executeUJESJob(
jobExecuteAction
@@ -37,7 +40,7 @@
def submit(jobSubmitAction: JobSubmitAction): JobSubmitResult =
executeUJESJob(jobSubmitAction).asInstanceOf[JobSubmitResult]
- protected[client] def executeUJESJob(ujesJobAction: UJESJobAction): Result
+ def executeUJESJob(ujesJobAction: UJESJobAction): Result
private def executeJobExecIdAction[T](
jobExecuteResult: JobExecuteResult,
@@ -52,12 +55,34 @@
executeUJESJob(jobExecIdAction).asInstanceOf[T]
}
+ /**
+ * only get the status of the cache Task status should be based on getJobInfo
+ * @param jobExecuteResult
+ * @return
+ */
def status(jobExecuteResult: JobExecuteResult): JobStatusResult =
executeJobExecIdAction(jobExecuteResult, JobServiceType.JobStatus)
+ /**
+ * IF exception return null progress result
+ * @param jobExecuteResult
+ * @return
+ */
def progress(jobExecuteResult: JobExecuteResult): JobProgressResult =
- executeJobExecIdAction(jobExecuteResult, JobServiceType.JobProgress)
+ Utils.tryCatch(executeJobExecIdAction(jobExecuteResult, JobServiceType.JobProgress)) { t =>
+ logger.warn("Failed to get progress, return empty progress.", t)
+ val result = new JobProgressResult
+ result.setProgress(0)
+ result
+ }
+ /**
+ * If exception return null log
+ * @param jobExecuteResult
+ * @param fromLine
+ * @param size
+ * @return
+ */
def log(jobExecuteResult: JobExecuteResult, fromLine: Int, size: Int): JobLogResult = {
val jobLogAction = JobLogAction
.builder()
@@ -66,13 +91,19 @@
.setFromLine(fromLine)
.setSize(size)
.build()
- executeUJESJob(jobLogAction).asInstanceOf[JobLogResult]
+
+ Utils.tryCatch(executeUJESJob(jobLogAction).asInstanceOf[JobLogResult]) { t =>
+ logger.warn("Failed to get Log, return empty log.", t)
+ null
+ }
}
- def list(jobListAction: JobListAction): JobListResult = {
- executeUJESJob(jobListAction).asInstanceOf[JobListResult]
- }
-
+ /**
+ * If exception return null log
+ * @param jobExecuteResult
+ * @param jobLogResult
+ * @return
+ */
def log(jobExecuteResult: JobExecuteResult, jobLogResult: JobLogResult): JobLogResult = {
val jobLogAction = JobLogAction
.builder()
@@ -80,13 +111,21 @@
.setUser(jobExecuteResult.getUser)
.setFromLine(jobLogResult.getFromLine)
.build()
- executeUJESJob(jobLogAction).asInstanceOf[JobLogResult]
+
+ Utils.tryCatch(executeUJESJob(jobLogAction).asInstanceOf[JobLogResult]) { t =>
+ logger.warn("Failed to get Log, return empty log.", t)
+ null
+ }
}
def openLog(openLogAction: OpenLogAction): OpenLogResult = {
executeUJESJob(openLogAction).asInstanceOf[OpenLogResult]
}
+ def list(jobListAction: JobListAction): JobListResult = {
+ executeUJESJob(jobListAction).asInstanceOf[JobListResult]
+ }
+
def kill(jobExecuteResult: JobExecuteResult): JobKillResult =
executeJobExecIdAction(jobExecuteResult, JobServiceType.JobKill)
diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/UJESClientImpl.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/UJESClientImpl.scala
index b173f53..0feabaa 100644
--- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/UJESClientImpl.scala
+++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/UJESClientImpl.scala
@@ -26,7 +26,7 @@
class UJESClientImpl(clientConfig: DWSClientConfig) extends UJESClient {
private val dwsHttpClient = new DWSHttpClient(clientConfig, "Linkis-Job-Execution-Thread")
- override protected[client] def executeUJESJob(ujesJobAction: UJESJobAction): Result =
+ override def executeUJESJob(ujesJobAction: UJESJobAction): Result =
ujesJobAction match {
case action: Action => dwsHttpClient.execute(action)
}
diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/CreateNewDirAction.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/CreateNewDirAction.scala
new file mode 100644
index 0000000..561bfc0
--- /dev/null
+++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/CreateNewDirAction.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ujes.client.request
+
+import org.apache.linkis.httpclient.dws.DWSHttpClient
+import org.apache.linkis.httpclient.request.POSTAction
+import org.apache.linkis.ujes.client.exception.UJESClientBuilderException
+
+class CreateNewDirAction extends POSTAction with UJESJobAction {
+
+ override def suffixURLs: Array[String] = Array("filesystem", "createNewDir")
+
+ override def getRequestPayload: String =
+ DWSHttpClient.jacksonJson.writeValueAsString(getRequestPayloads)
+
+}
+
+object CreateNewDirAction {
+ def builder(): Builder = new Builder
+
+ class Builder private[CreateNewDirAction] () {
+ private var user: String = _
+ private var path: String = _
+
+ def setUser(user: String): Builder = {
+ this.user = user
+ this
+ }
+
+ def setPath(path: String): Builder = {
+ this.path = path
+ this
+ }
+
+ def build(): CreateNewDirAction = {
+ val makeDirAction = new CreateNewDirAction
+ if (user == null) throw new UJESClientBuilderException("user is needed!")
+ if (path == null) throw new UJESClientBuilderException("path is needed!")
+ makeDirAction.setUser(user)
+ makeDirAction.addRequestPayload("path", path)
+ makeDirAction
+ }
+
+ }
+
+}
diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/IsPathExistAction.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/IsPathExistAction.scala
new file mode 100644
index 0000000..e9e74ed
--- /dev/null
+++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/IsPathExistAction.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ujes.client.request
+
+import org.apache.linkis.httpclient.request.GetAction
+import org.apache.linkis.ujes.client.exception.UJESClientBuilderException
+
+class IsPathExistAction extends GetAction with UJESJobAction {
+
+ override def suffixURLs: Array[String] = Array("filesystem", "isExist")
+}
+
+object IsPathExistAction {
+ def builder(): Builder = new Builder
+
+ class Builder private[IsPathExistAction] () {
+ private var user: String = _
+ private var path: String = _
+
+ def setUser(user: String): Builder = {
+ this.user = user
+ this
+ }
+
+ def setPath(path: String): Builder = {
+ this.path = path
+ this
+ }
+
+ def build(): IsPathExistAction = {
+ val isPathExistAction = new IsPathExistAction
+ if (user == null) throw new UJESClientBuilderException("user is needed!")
+ if (path == null) throw new UJESClientBuilderException("path is needed!")
+ isPathExistAction.setUser(user)
+ isPathExistAction.setParameter("path", path)
+ isPathExistAction
+ }
+
+ }
+
+}
diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/ResultSetAction.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/ResultSetAction.scala
index 9eb7486..45f0e8a 100644
--- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/ResultSetAction.scala
+++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/ResultSetAction.scala
@@ -38,6 +38,8 @@
// default value is :org.apache.linkis.storage.domain.Dolphin.LINKIS_NULL
private var nullValue: String = "LINKIS_NULL"
+ private var enableLimit: Boolean = false
+
def setUser(user: String): Builder = {
this.user = user
this
@@ -68,6 +70,11 @@
this
}
+ def setEnableLimit(enableLimit: Boolean): Builder = {
+ this.enableLimit = enableLimit
+ this
+ }
+
def build(): ResultSetAction = {
if (user == null) throw new UJESClientBuilderException("user is needed!")
if (path == null) throw new UJESClientBuilderException("path is needed!")
@@ -76,6 +83,7 @@
if (page > 0) resultSetAction.setParameter("page", page)
if (pageSize > 0) resultSetAction.setParameter("pageSize", pageSize)
resultSetAction.setParameter("charset", charset)
+ resultSetAction.setParameter("enableLimit", enableLimit)
resultSetAction.setParameter("nullValue", nullValue)
resultSetAction.setUser(user)
resultSetAction
diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/UploadFileAction.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/UploadFileAction.scala
new file mode 100644
index 0000000..4248a9c
--- /dev/null
+++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/UploadFileAction.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ujes.client.request
+
+import org.apache.linkis.httpclient.request.{BinaryBody, GetAction, UploadAction}
+import org.apache.linkis.ujes.client.exception.UJESClientBuilderException
+
+import org.apache.http.entity.ContentType
+
+import java.io.{File, FileInputStream}
+import java.util
+
+import scala.collection.JavaConverters._
+
+class UploadFileAction extends GetAction with UploadAction with UJESJobAction {
+ override def suffixURLs: Array[String] = Array("filesystem", "upload")
+
+ override val files: util.Map[String, String] = new util.HashMap[String, String]()
+
+ override val binaryBodies: util.List[BinaryBody] = new util.ArrayList[BinaryBody](0)
+
+}
+
+object UploadFileAction {
+ def builder(): Builder = new Builder
+
+ class Builder private[UploadFileAction] {
+ private var user: String = _
+ private var path: String = _
+ private var uploadFiles: util.List[File] = new util.ArrayList[File](0)
+
+ def setUser(user: String): Builder = {
+ this.user = user
+ this
+ }
+
+ def setPath(path: String): Builder = {
+ this.path = path
+ this
+ }
+
+ def addFile(file: File): Builder = {
+ this.uploadFiles.add(file)
+ this
+ }
+
+ def build(): UploadFileAction = {
+ val uploadFileAction = new UploadFileAction
+ if (user == null) throw new UJESClientBuilderException("user is needed!")
+ if (path == null) throw new UJESClientBuilderException("path is needed!")
+
+ uploadFileAction.setUser(user)
+ uploadFileAction.setParameter("path", path)
+ uploadFiles.asScala.foreach { file =>
+ println(String.format("=============== upload file ========== %s ", file.getAbsolutePath))
+ uploadFileAction.binaryBodies.add(
+ BinaryBody
+ .apply("file", new FileInputStream(file), file.getName, ContentType.MULTIPART_FORM_DATA)
+ )
+ }
+
+ uploadFileAction
+ }
+
+ }
+
+}
diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/CreateNewDirResult.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/CreateNewDirResult.scala
new file mode 100644
index 0000000..0871f40
--- /dev/null
+++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/CreateNewDirResult.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ujes.client.response
+
+import org.apache.linkis.httpclient.dws.annotation.DWSHttpMessageResult
+import org.apache.linkis.httpclient.dws.response.DWSResult
+import org.apache.linkis.ujes.client.request.UserAction
+
+@DWSHttpMessageResult("/api/rest_j/v\\d+/filesystem/createNewDir")
+class CreateNewDirResult extends DWSResult with UserAction {}
diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/IsPathExistResult.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/IsPathExistResult.scala
new file mode 100644
index 0000000..c87cd7d
--- /dev/null
+++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/IsPathExistResult.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ujes.client.response
+
+import org.apache.linkis.httpclient.dws.annotation.DWSHttpMessageResult
+import org.apache.linkis.httpclient.dws.response.DWSResult
+import org.apache.linkis.ujes.client.request.UserAction
+
+import scala.beans.BeanProperty
+
+@DWSHttpMessageResult("/api/rest_j/v\\d+/filesystem/isExist")
+class IsPathExistResult extends DWSResult with UserAction {
+ @BeanProperty var isExist: Boolean = _
+}
diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/ResultSetResult.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/ResultSetResult.scala
index 973573f..9051748 100644
--- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/ResultSetResult.scala
+++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/ResultSetResult.scala
@@ -20,6 +20,9 @@
import org.apache.linkis.httpclient.dws.annotation.DWSHttpMessageResult
import org.apache.linkis.httpclient.dws.response.DWSResult
import org.apache.linkis.ujes.client.request.UserAction
+import org.apache.linkis.ujes.client.utils.UJESClientUtils.evaluate
+
+import java.util
import scala.beans.BeanProperty
@@ -28,6 +31,31 @@
private var `type`: String = _
+ private var metadataList: util.List[util.Map[String, String]] = _
+
+ private var fileContentList: util.List[util.ArrayList[_]] = _
+
+ def getMetadataList: util.List[util.Map[String, String]] = {
+ metadata.asInstanceOf[util.List[util.Map[String, String]]]
+ }
+
+ def getRowList: util.List[util.ArrayList[Any]] = {
+ val metaData = metadata.asInstanceOf[util.List[util.Map[String, String]]]
+ val fileContentList = fileContent.asInstanceOf[util.List[util.ArrayList[Any]]]
+ for (metaDataColnum <- 1 to metaData.size()) {
+ val col = metaData.get(metaDataColnum - 1)
+ if (!col.get("dataType").equals("string")) {
+ for (cursor <- 1 to fileContentList.size()) {
+ val colDataList = fileContentList.get(cursor - 1)
+ var colData = colDataList.get(metaDataColnum - 1)
+ colData = evaluate(col.get("dataType"), colData.toString)
+ colDataList.set(metaDataColnum - 1, colData)
+ }
+ }
+ }
+ fileContentList
+ }
+
def setType(`type`: String): Unit = this.`type` = `type`
def getType: String = `type`
diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/UploadFileResult.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/UploadFileResult.scala
new file mode 100644
index 0000000..837399f
--- /dev/null
+++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/response/UploadFileResult.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ujes.client.response
+
+import org.apache.linkis.httpclient.dws.annotation.DWSHttpMessageResult
+import org.apache.linkis.httpclient.dws.response.DWSResult
+import org.apache.linkis.ujes.client.request.UserAction
+
+@DWSHttpMessageResult("/api/rest_j/v\\d+/filesystem/upload")
+class UploadFileResult extends DWSResult with UserAction {}
diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/utils/UJESClientUtils.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/utils/UJESClientUtils.scala
index 9615a89..28f4b46 100644
--- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/utils/UJESClientUtils.scala
+++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/utils/UJESClientUtils.scala
@@ -17,8 +17,14 @@
package org.apache.linkis.ujes.client.utils
+import org.apache.linkis.ujes.client.exception.UJESClientBuilderException
import org.apache.linkis.ujes.client.request.JobExecuteAction.{EngineType, RunType}
+import java.util
+import java.util.Locale
+
+import com.google.gson.{Gson, JsonObject}
+
object UJESClientUtils {
def toEngineType(engineType: String): EngineType = engineType match {
@@ -48,4 +54,33 @@
case _ => EngineType.SPARK.SQL
}
+ def evaluate(dataType: String, value: String): Any = {
+ if (value == null || value.equals("null") || value.equals("NULL") || value.equals("Null")) {
+ dataType.toLowerCase(Locale.getDefault) match {
+ case "string" | "char" | "varchar" | "nvarchar" => value
+ case _ => null
+ }
+ } else {
+ dataType.toLowerCase(Locale.getDefault) match {
+ case null => throw new UJESClientBuilderException("data is empty")
+ case "char" | "varchar" | "nvarchar" | "string" => value
+ case "short" => value.toShort
+ case "int" => value.toInt
+ case "long" => value.toLong
+ case "float" => value.toFloat
+ case "double" => value.toDouble
+ case "boolean" => value.toBoolean
+ case "byte" => value.toByte
+ case "timestamp" => value
+ case "date" => value
+ case "bigint" => value.toLong
+ case "decimal" => value.toDouble
+ case "array" => new Gson().fromJson(value, classOf[util.ArrayList[Object]])
+ case "map" => new Gson().fromJson(value, classOf[util.HashMap[Object, Object]])
+ case "struct" => new Gson().fromJson(value, classOf[JsonObject])
+ case _ => value
+ }
+ }
+ }
+
}
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/ec/ECConstants.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/ec/ECConstants.scala
index a94eadf..c418201 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/ec/ECConstants.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/ec/ECConstants.scala
@@ -74,4 +74,8 @@
val EC_OPERATE_STATUS = "status"
val YARN_APP_RESULT_LIST_KEY = "yarnAppResultList"
+
+ val HIVE_OPTS = "HIVE_OPTS"
+
+ val SPARK_SUBMIT_OPTS = "SPARK_SUBMIT_OPTS"
}
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/job/JobRequestConstants.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/job/JobRequestConstants.scala
index 8741c42..9f2879d 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/job/JobRequestConstants.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/job/JobRequestConstants.scala
@@ -34,4 +34,8 @@
val JOB_DETAIL_LIST = "jobDetailList"
+ val JOB_SOURCE_TAGS = "job.source.tags"
+
+ val LINKIS_JDBC_DEFAULT_DB = "linkis.jdbc.default.db"
+
}
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/conf/AcrossClusterConf.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/conf/AcrossClusterConf.scala
new file mode 100644
index 0000000..43d3c86
--- /dev/null
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/conf/AcrossClusterConf.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.governance.common.protocol.conf
+
+import org.apache.linkis.protocol.message.RequestProtocol
+
+trait AcrossClusterConf extends RequestProtocol
+
+case class AcrossClusterRequest(username: String) extends AcrossClusterConf
+
+case class AcrossClusterResponse(clusterName: String, queueName: String)
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/conf/DepartmentConf.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/conf/DepartmentConf.scala
new file mode 100644
index 0000000..dbfe3f7
--- /dev/null
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/conf/DepartmentConf.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.governance.common.protocol.conf
+
+import org.apache.linkis.protocol.message.RequestProtocol
+
+trait DepartmentConf extends RequestProtocol
+
+case class DepartmentRequest(user: String) extends DepartmentConf
+
+case class DepartmentResponse(user: String, departmentId: String, departmentName: String)
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/conf/TenantConf.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/conf/TenantConf.scala
index e8d1294..948501e 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/conf/TenantConf.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/conf/TenantConf.scala
@@ -23,4 +23,13 @@
case class TenantRequest(user: String, creator: String) extends TenantConf
-case class TenantResponse(user: String, creator: String, tenant: String)
+case class TenantResponse(user: String, creator: String, isValid: String, tenant: String)
+
+case class DepartTenantRequest(creator: String, departmentId: String) extends TenantConf
+
+case class DepartTenantResponse(
+ creator: String,
+ departmentId: String,
+ isValid: String,
+ tenant: String
+)
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/JobUtils.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/JobUtils.scala
index d328ebb..8c6522c 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/JobUtils.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/JobUtils.scala
@@ -19,8 +19,6 @@
import org.apache.linkis.governance.common.constant.job.JobRequestConstants
-import org.apache.commons.collections.MapUtils
-
import java.util;
object JobUtils {
@@ -36,7 +34,33 @@
}
def getJobIdFromStringMap(map: util.Map[String, String]): String = {
- if (MapUtils.isNotEmpty(map)) map.getOrDefault(JobRequestConstants.JOB_ID, null) else null
+ if (null != map && map.containsKey(JobRequestConstants.JOB_ID)) {
+ val value = map.get(JobRequestConstants.JOB_ID)
+ if (null != value) {
+ return value
+ }
+ }
+ null
+ }
+
+ def getJobSourceTagsFromStringMap(map: util.Map[String, String]): String = {
+ if (null != map && map.containsKey(JobRequestConstants.JOB_SOURCE_TAGS)) {
+ val value = map.get(JobRequestConstants.JOB_SOURCE_TAGS)
+ if (null != value) {
+ return value
+ }
+ }
+ null
+ }
+
+ def getJobSourceTagsFromObjectMap(map: util.Map[String, Object]): String = {
+ if (null != map && map.containsKey(JobRequestConstants.JOB_SOURCE_TAGS)) {
+ val value = map.get(JobRequestConstants.JOB_SOURCE_TAGS)
+ if (null != value) {
+ return value.toString
+ }
+ }
+ null
}
}
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/conf/ECMConfiguration.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/conf/ECMConfiguration.scala
index 8e5f9ad..4c02bff 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/conf/ECMConfiguration.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/conf/ECMConfiguration.scala
@@ -58,10 +58,9 @@
val ECM_MAX_CREATE_INSTANCES: Int =
CommonVars[Integer]("wds.linkis.ecm.engineconn.instances.max", 50).getValue
- val ECM_PROTECTED_MEMORY: Long = CommonVars[Long](
- "wds.linkis.ecm.protected.memory",
- ByteTimeUtils.byteStringAsBytes("4g")
- ).getValue
+ val ECM_PROTECTED_MEMORY: Long = ByteTimeUtils.byteStringAsBytes(
+ CommonVars[String]("wds.linkis.ecm.protected.memory", "10g").getValue
+ )
val ECM_PROTECTED_CPU_LOAD: Double =
CommonVars[Double]("wds.linkis.ecm.protected.cpu.load", 0.98d).getValue
@@ -81,7 +80,7 @@
GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME.getValue
val ECM_HEALTH_REPORT_PERIOD: Long =
- CommonVars("wds.linkis.ecm.health.report.period", 30).getValue
+ CommonVars("wds.linkis.ecm.health.report.period", 10).getValue
val ECM_HEALTH_REPORT_DELAY: Long =
CommonVars("wds.linkis.ecm.health.report.delay", 10).getValue
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/ECMListenerService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/ECMListenerService.scala
index 91f31e5..764a704 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/ECMListenerService.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/ECMListenerService.scala
@@ -17,7 +17,6 @@
package org.apache.linkis.ecm.server.service.impl
-import org.apache.linkis.DataWorkCloudApplication
import org.apache.linkis.common.utils.Logging
import org.apache.linkis.ecm.core.listener.{ECMEvent, ECMEventListener}
import org.apache.linkis.ecm.server.listener.EngineConnStopEvent
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/ECMUtils.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/ECMUtils.scala
index 08addb9..2a50b40 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/ECMUtils.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/util/ECMUtils.scala
@@ -102,8 +102,7 @@
// if enable estimate actual memory
if (ECM_STIMATE_ACTUAL_MEMORY_ENABLE) {
- // 90%
- val totalByte = (HardwareUtils.getMaxMemory() * 0.9).asInstanceOf[Long]
+ val totalByte = HardwareUtils.getMaxMemory()
val resultMemory = math.max(totalByte, ECM_PROTECTED_MEMORY)
// max of PhysicalMemory or ECM_PROTECTED_MEMORY
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java
index a84f581..21d28e2 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java
@@ -55,7 +55,7 @@
@Override
public void afterPropertiesSet() throws Exception {
- if ((Boolean) AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM().getValue()) {
+ if ((Boolean) (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM().getValue())) {
Utils.defaultScheduler()
.scheduleAtFixedRate(
this, 3 * 60 * 1000, MONITOR_INTERVAL.getValue().toLong(), TimeUnit.MILLISECONDS);
@@ -77,7 +77,7 @@
}
}
if (null == concurrentExecutor) {
- LOG.warn("shell executor can not is null");
+ LOG.warn("Executor can not is null");
return;
}
isAvailable = true;
@@ -96,7 +96,7 @@
} else {
if (concurrentExecutor.isIdle())
synchronized (EXECUTOR_STATUS_LOCKER) {
- LOG.info("monitor turn to executor status from busy to unlock");
+ LOG.info("monitor turn to executor status from unlock to busy");
concurrentExecutor.transition(NodeStatus.Busy);
}
}
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/executor/exception/HookExecuteException.java b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/executor/exception/HookExecuteException.java
new file mode 100644
index 0000000..4d1fbbf
--- /dev/null
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/executor/exception/HookExecuteException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconn.computation.executor.exception;
+
+import org.apache.linkis.common.exception.ExceptionLevel;
+import org.apache.linkis.common.exception.LinkisRuntimeException;
+
+public class HookExecuteException extends LinkisRuntimeException {
+
+ public HookExecuteException(int errCode, String desc) {
+ super(errCode, desc);
+ }
+
+ public HookExecuteException(int errCode, String desc, String ip, int port, String serviceKind) {
+ super(errCode, desc, ip, port, serviceKind);
+ }
+
+ @Override
+ public ExceptionLevel getLevel() {
+ return ExceptionLevel.ERROR;
+ }
+}
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/conf/EngineConnConf.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/conf/EngineConnConf.scala
index 43f7ab4..513eac0 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/conf/EngineConnConf.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/conf/EngineConnConf.scala
@@ -84,6 +84,10 @@
val SEATUNNEL_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX =
CommonVars("wds.linkis.seatunnel.engine.yarn.app.id.parse.regex", "(application_\\d{13}_\\d+)")
+ val JOB_YARN_TASK_URL = CommonVars("linkis.job.task.yarn.url", "");
+
+ val JOB_YARN_CLUSTER_TASK_URL = CommonVars("linkis.job.task.yarn.cluster.url", "");
+
def getWorkHome: String = System.getenv(ENGINE_CONN_LOCAL_PATH_PWD_KEY.getValue)
def getEngineTmpDir: String = System.getenv(ENGINE_CONN_LOCAL_TMP_DIR.getValue)
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/conf/EngineConnConstant.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/conf/EngineConnConstant.scala
index e74f5b7..cdc1459 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/conf/EngineConnConstant.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/conf/EngineConnConstant.scala
@@ -20,4 +20,12 @@
object EngineConnConstant {
val MAX_TASK_NUM = 10000
+
+ val SPRING_CONF_MAP_NAME = "SpringConfMap"
+
+ val MAX_EXECUTOR_ID_NAME = "MaxExecutorId"
+
+ var hiveLogReg = "The url to track the job: http://.*?/proxy/(application_[0-9]+_[0-9]+)/"
+
+ val YARN_LOG_URL = "INFO yarn application url:"
}
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/hook/ShutdownHook.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/hook/ShutdownHook.scala
index 86ab8a1..524f44c 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/hook/ShutdownHook.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/hook/ShutdownHook.scala
@@ -20,6 +20,7 @@
import org.apache.linkis.common.utils.Logging
import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
class ShutdownHook extends Logging {
@@ -35,6 +36,10 @@
// Guarded by "lock"
private var stopped: Boolean = false
+ private val tryStopTimes = new AtomicInteger(0)
+
+ private val maxTimes = 10;
+
def notifyError(e: Throwable): Unit = {
lock.lock()
try {
@@ -49,12 +54,17 @@
def notifyStop(): Unit = {
lock.lock()
+ val num = tryStopTimes.incrementAndGet()
try {
setExitCode(0)
stopped = true
condition.signalAll()
} finally {
lock.unlock()
+ if (num >= maxTimes) {
+ logger.error(s"try to stop with times:${num}, now do system exit!!!")
+ System.exit(0)
+ }
}
}
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/java/org/apache/linkis/engineconn/acessible/executor/log/SendAppender.java b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/java/org/apache/linkis/engineconn/acessible/executor/log/SendAppender.java
index cde56ca..b274642 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/java/org/apache/linkis/engineconn/acessible/executor/log/SendAppender.java
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/java/org/apache/linkis/engineconn/acessible/executor/log/SendAppender.java
@@ -18,9 +18,12 @@
package org.apache.linkis.engineconn.acessible.executor.log;
import org.apache.linkis.engineconn.acessible.executor.conf.AccessibleExecutorConfiguration;
+import org.apache.linkis.engineconn.common.conf.EngineConnConf;
+import org.apache.linkis.engineconn.common.conf.EngineConnConstant;
import org.apache.linkis.engineconn.executor.listener.EngineConnSyncListenerBus;
import org.apache.linkis.engineconn.executor.listener.ExecutorListenerBusContext;
+import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
@@ -34,6 +37,8 @@
import org.apache.logging.log4j.core.layout.PatternLayout;
import java.io.Serializable;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,6 +96,7 @@
}
}
if (!flag) {
+ logStr = matchLog(logStr);
logCache.cacheLog(logStr);
}
} else {
@@ -113,4 +119,17 @@
}
return new SendAppender(name, filter, layout, ignoreExceptions);
}
+
+ public String matchLog(String logLine) {
+ String yarnUrl = EngineConnConf.JOB_YARN_TASK_URL().getValue();
+ if (StringUtils.isNotBlank(yarnUrl)) {
+ Matcher hiveMatcher = Pattern.compile(EngineConnConstant.hiveLogReg()).matcher(logLine);
+ if (hiveMatcher.find()) {
+ logLine =
+ hiveMatcher.replaceAll(
+ EngineConnConstant.YARN_LOG_URL() + yarnUrl + hiveMatcher.group(1));
+ }
+ }
+ return logLine;
+ }
}
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
index 40cf314..0cebf5e 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
@@ -37,11 +37,14 @@
val ENGINECONN_LOG_SEND_SIZE = CommonVars[Int]("wds.linkis.engineconn.log.send.cache.size", 300)
val ENGINECONN_MAX_FREE_TIME =
- CommonVars("wds.linkis.engineconn.max.free.time", new TimeType("10m"))
+ CommonVars("wds.linkis.engineconn.max.free.time", new TimeType("5m"))
val ENGINECONN_LOCK_CHECK_INTERVAL =
CommonVars("wds.linkis.engineconn.lock.free.interval", new TimeType("3m"))
+ val ENGINECONN_ENABLED_LOCK_IDLE_TIME_OUT =
+ CommonVars("linkis.engineconn.enabled.lock.timeout.release", true)
+
val ENGINECONN_SUPPORT_PARALLELISM =
CommonVars("wds.linkis.engineconn.support.parallelism", false)
@@ -67,4 +70,10 @@
"Heartbeat status report repeated ignore, default 3ms,Negative numbers do not take effect"
).getValue
+ val ENGINECONN_AUTO_EXIT =
+ CommonVars("linkis.engineconn.support.auto.exit", false).getValue
+
+ val ENGINECONN_AUTO_EXIT_DAYS =
+ CommonVars("linkis.engineconn.auto.exit.days", 7).getValue
+
}
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
index 0bd7ece..6e6b8b5 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
@@ -20,6 +20,7 @@
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.engineconn.acessible.executor.conf.AccessibleExecutorConfiguration
import org.apache.linkis.engineconn.acessible.executor.entity.AccessibleExecutor
+import org.apache.linkis.engineconn.acessible.executor.service.ExecutorHeartbeatServiceHolder
import org.apache.linkis.engineconn.common.creation.EngineCreationContext
import org.apache.linkis.engineconn.common.engineconn.EngineConn
import org.apache.linkis.engineconn.common.execution.EngineConnExecution
@@ -40,6 +41,7 @@
import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.rpc.Sender
+import java.util.Random
import java.util.concurrent.TimeUnit
class AccessibleEngineConnExecution extends EngineConnExecution with Logging {
@@ -73,6 +75,9 @@
reportUsedResource(executor, engineCreationContext)
reportLabel(executor)
executorStatusChecker
+ if (AccessibleExecutorConfiguration.ENGINECONN_AUTO_EXIT) {
+ ecAutoExit()
+ }
afterReportToLinkisManager(executor, engineCreationContext, engineConn)
}
@@ -140,6 +145,39 @@
)
}
+ /**
+ * EC auto exit only support concurrent executor
+ */
+ private def ecAutoExit(): Unit = {
+ logger.info(s"ec auto exit start ${System.currentTimeMillis()}")
+ Utils.defaultScheduler.schedule(
+ new Runnable {
+ override def run(): Unit = Utils.tryAndWarn {
+ ExecutorManager.getInstance.getReportExecutor match {
+ case executor: ConcurrentExecutor =>
+ val rand = new Random
+ val minute = rand.nextInt(5) + 1
+ Thread.sleep(minute * 60000L)
+ if (executor.hasTaskRunning()) {
+ ExecutorHeartbeatServiceHolder
+ .getDefaultHeartbeatService()
+ .setSelfUnhealthy(s"EC running time exceed max time")
+ } else {
+ logger.warn(
+ s"Executor has no task running ${executor.getId}, will be to shutdown ec"
+ )
+ executor.tryShutdown()
+ }
+ case _ =>
+ logger.warn(s"Executor is not a ConcurrentExecutor, do noting")
+ }
+ }
+ },
+ AccessibleExecutorConfiguration.ENGINECONN_AUTO_EXIT_DAYS,
+ TimeUnit.DAYS
+ )
+ }
+
def requestManagerReleaseExecutor(msg: String, nodeStatus: NodeStatus): Unit = {
val engineReleaseRequest = new EngineConnReleaseRequest(
Sender.getThisServiceInstance,
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/info/NodeHealthyInfoManager.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/info/NodeHealthyInfoManager.scala
index 4365d58..98f8850 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/info/NodeHealthyInfoManager.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/info/NodeHealthyInfoManager.scala
@@ -20,7 +20,7 @@
import org.apache.linkis.common.utils.Logging
import org.apache.linkis.engineconn.acessible.executor.entity.AccessibleExecutor
import org.apache.linkis.engineconn.core.executor.ExecutorManager
-import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
+import org.apache.linkis.manager.common.entity.enumeration.{NodeHealthy, NodeStatus}
import org.apache.linkis.manager.common.entity.metrics.NodeHealthyInfo
import org.springframework.stereotype.Component
@@ -29,20 +29,45 @@
def getNodeHealthyInfo(): NodeHealthyInfo
+ def setNodeHealthy(healthy: NodeHealthy): Unit
+
+ def getNodeHealthy(): NodeHealthy
+
+ def setByManager(setByManager: Boolean): Unit
+
}
@Component
class DefaultNodeHealthyInfoManager extends NodeHealthyInfoManager with Logging {
+ private var healthy: NodeHealthy = NodeHealthy.Healthy
+
+ private var setByManager: Boolean = false
+
override def getNodeHealthyInfo(): NodeHealthyInfo = {
val nodeHealthyInfo = new NodeHealthyInfo
nodeHealthyInfo.setMsg("")
- nodeHealthyInfo.setNodeHealthy(
+
+ /** If it is actively set by the manager, then the manager setting shall prevail */
+ val newHealthy: NodeHealthy = if (this.setByManager) {
+ this.healthy
+ } else {
NodeStatus.isEngineNodeHealthy(
ExecutorManager.getInstance.getReportExecutor.asInstanceOf[AccessibleExecutor].getStatus
)
- )
+ }
+ logger.info("current node healthy status is {}", newHealthy)
+ nodeHealthyInfo.setNodeHealthy(newHealthy)
nodeHealthyInfo
}
+ override def setNodeHealthy(healthy: NodeHealthy): Unit = {
+ this.healthy = healthy
+ }
+
+ override def setByManager(setByManager: Boolean): Unit = {
+ this.setByManager = setByManager
+ }
+
+ override def getNodeHealthy(): NodeHealthy = this.healthy
}
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/lock/EngineConnTimedLock.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/lock/EngineConnTimedLock.scala
index af4d1eb..bb39545 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/lock/EngineConnTimedLock.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/lock/EngineConnTimedLock.scala
@@ -27,6 +27,7 @@
ExecutorStatusChangedEvent,
ExecutorUnLockEvent
}
+import org.apache.linkis.engineconn.core.EngineConnObject
import org.apache.linkis.engineconn.core.executor.ExecutorManager
import org.apache.linkis.engineconn.executor.entity.SensibleExecutor
import org.apache.linkis.engineconn.executor.listener.ExecutorListenerBusContext
@@ -44,6 +45,10 @@
var releaseTask: ScheduledFuture[_] = null
var lastLockTime: Long = 0
+ val idleTimeLockOut = AccessibleExecutorConfiguration.ENGINECONN_LOCK_CHECK_INTERVAL
+ .getValue(EngineConnObject.getEngineCreationContext.getOptions)
+ .toLong
+
override def acquire(executor: AccessibleExecutor): Unit = {
lock.acquire()
lastLockTime = System.currentTimeMillis()
@@ -105,7 +110,9 @@
isAcquired() && NodeStatus.Idle == reportExecutor.getStatus && isExpired()
) {
// unlockCallback depends on lockedBy, so lockedBy cannot be set null before unlockCallback
- logger.info(s"Lock : [${lock.toString} was released due to timeout.")
+ logger.info(
+ s"Lock : [${lock.toString} was released due to timeout. idleTimeLockOut $idleTimeLockOut"
+ )
release()
} else if (isAcquired() && NodeStatus.Busy == reportExecutor.getStatus) {
lastLockTime = System.currentTimeMillis()
@@ -116,7 +123,7 @@
}
},
3000,
- AccessibleExecutorConfiguration.ENGINECONN_LOCK_CHECK_INTERVAL.getValue.toLong,
+ idleTimeLockOut,
TimeUnit.MILLISECONDS
)
logger.info("Add scheduled timeout task.")
@@ -131,7 +138,11 @@
override def isExpired(): Boolean = {
if (lastLockTime == 0) return false
if (timeout <= 0) return false
- System.currentTimeMillis() - lastLockTime > timeout
+ if (AccessibleExecutorConfiguration.ENGINECONN_ENABLED_LOCK_IDLE_TIME_OUT.getValue) {
+ System.currentTimeMillis() - lastLockTime > idleTimeLockOut
+ } else {
+ System.currentTimeMillis() - lastLockTime > timeout
+ }
}
override def numOfPending(): Int = {
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultAccessibleService.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultAccessibleService.scala
index 8ef944f..97a9cab 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultAccessibleService.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultAccessibleService.scala
@@ -113,7 +113,6 @@
logger.info("Reported status shuttingDown to manager.")
Utils.tryQuietly(Thread.sleep(2000))
shutDownHooked = true
- ShutdownHook.getShutdownHook.notifyStop()
}
override def stopExecutor: Unit = {
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala
index ea3248b..ff8e666 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala
@@ -31,8 +31,13 @@
import org.apache.linkis.engineconn.executor.entity.{Executor, ResourceExecutor, SensibleExecutor}
import org.apache.linkis.engineconn.executor.listener.ExecutorListenerBusContext
import org.apache.linkis.engineconn.executor.service.ManagerService
-import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
-import org.apache.linkis.manager.common.protocol.node.{NodeHeartbeatMsg, NodeHeartbeatRequest}
+import org.apache.linkis.manager.common.entity.enumeration.{NodeHealthy, NodeStatus}
+import org.apache.linkis.manager.common.entity.metrics.NodeHealthyInfo
+import org.apache.linkis.manager.common.protocol.node.{
+ NodeHealthyRequest,
+ NodeHeartbeatMsg,
+ NodeHeartbeatRequest
+}
import org.apache.linkis.rpc.Sender
import org.apache.linkis.rpc.message.annotation.Receiver
@@ -61,6 +66,8 @@
private val asyncListenerBusContext =
ExecutorListenerBusContext.getExecutorListenerBusContext.getEngineConnAsyncListenerBus
+ private val healthyLock = new Object()
+
@PostConstruct
private def init(): Unit = {
asyncListenerBusContext.addListener(this)
@@ -95,6 +102,16 @@
nodeHeartbeatRequest: NodeHeartbeatRequest
): NodeHeartbeatMsg = generateHeartBeatMsg(null)
+ @Receiver
+ def dealNodeHealthyRequest(nodeHealthyRequest: NodeHealthyRequest): Unit =
+ healthyLock synchronized {
+ val toHealthy = nodeHealthyRequest.getNodeHealthy
+ val healthyInfo: NodeHealthyInfo = nodeHealthyInfoManager.getNodeHealthyInfo()
+ logger.info(s"engine nodeHealthy from ${healthyInfo.getNodeHealthy} to ${toHealthy}")
+ nodeHealthyInfoManager.setByManager(true)
+ nodeHealthyInfoManager.setNodeHealthy(toHealthy)
+ }
+
override def onNodeHealthyUpdate(nodeHealthyUpdateEvent: NodeHealthyUpdateEvent): Unit = {
logger.warn(s"node healthy update, tiger heartbeatReport")
// val executor = ExecutorManager.getInstance.getReportExecutor
@@ -139,4 +156,15 @@
nodeHeartbeatMsg
}
+ override def setSelfUnhealthy(reason: String): Unit = healthyLock synchronized {
+ logger.info(s"Set self to unhealthy to automatically exit, reason: $reason")
+ if (EngineConnObject.isReady) {
+ val nodeHealthyInfo = nodeHealthyInfoManager.getNodeHealthyInfo()
+ if (nodeHealthyInfo.getNodeHealthy != NodeHealthy.UnHealthy) {
+ nodeHealthyInfoManager.setNodeHealthy(NodeHealthy.UnHealthy)
+ nodeHealthyInfoManager.setByManager(true)
+ }
+ }
+ }
+
}
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultManagerService.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultManagerService.scala
index bc410c7..7d4dc3b 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultManagerService.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultManagerService.scala
@@ -76,6 +76,9 @@
override def heartbeatReport(nodeHeartbeatMsg: NodeHeartbeatMsg): Unit = {
getManagerSender.send(nodeHeartbeatMsg)
+ if (nodeHeartbeatMsg != null && nodeHeartbeatMsg.getHealthyInfo != null) {
+ logger.info("report engine healthy status: {}", nodeHeartbeatMsg.getHealthyInfo)
+ }
logger.info(
"success to send engine heartbeat report to {},status: {},msg: {}",
Array(
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/ExecutorHeartbeatService.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/ExecutorHeartbeatService.scala
index 7abcbe8..7734492 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/ExecutorHeartbeatService.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/ExecutorHeartbeatService.scala
@@ -33,6 +33,8 @@
def dealNodeHeartbeatRequest(nodeHeartbeatRequest: NodeHeartbeatRequest): NodeHeartbeatMsg
+ def setSelfUnhealthy(reason: String): Unit
+
}
object ExecutorHeartbeatServiceHolder {
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/utils/AccessableExecutorUtils.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/utils/AccessableExecutorUtils.scala
new file mode 100644
index 0000000..27dd91c
--- /dev/null
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/utils/AccessableExecutorUtils.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconn.acessible.executor.utils
+
+import org.apache.linkis.DataWorkCloudApplication.getApplicationContext
+import org.apache.linkis.engineconn.acessible.executor.info.DefaultNodeHealthyInfoManager
+import org.apache.linkis.manager.common.entity.enumeration.NodeHealthy
+
+object AccessibleExecutorUtils {
+
+ val manager: DefaultNodeHealthyInfoManager =
+ getApplicationContext.getBean(classOf[DefaultNodeHealthyInfoManager])
+
+ def currentEngineIsUnHealthy(): Boolean = {
+ manager != null && manager.getNodeHealthy() == NodeHealthy.UnHealthy
+ }
+
+}
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala
index f3235ff..f0bae00 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala
@@ -36,10 +36,8 @@
CommonVars[String]("HADOOP_CONF_DIR", "/appcom/config/hadoop-config").getValue
)
- val ENGINE_CONN_JARS = CommonVars("wds.linkis.engineConn.jars", "", "engineConn额外的Jars")
-
val ENGINE_CONN_CLASSPATH_FILES =
- CommonVars("wds.linkis.engineConn.files", "", "engineConn额外的配置文件")
+ CommonVars("linkis.engineConn.classpath.files", "", "engineConn额外的配置文件")
val MAX_METASPACE_SIZE = CommonVars("linkis.engineconn.metaspace.size.max", "256m")
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/JavaProcessEngineConnLaunchBuilder.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/JavaProcessEngineConnLaunchBuilder.scala
index 082b02a..e461265 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/JavaProcessEngineConnLaunchBuilder.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/JavaProcessEngineConnLaunchBuilder.scala
@@ -117,49 +117,27 @@
addPathToClassPath(environment, variable(HIVE_CONF_DIR))
}
// first, add engineconn conf dirs.
- addPathToClassPath(environment, Seq(variable(PWD), ENGINE_CONN_CONF_DIR_NAME))
+ addPathToClassPath(environment, buildPath(Seq(variable(PWD), ENGINE_CONN_CONF_DIR_NAME)))
// then, add LINKIS_CONF_DIR conf dirs.
- addPathToClassPath(environment, Seq(EnvConfiguration.LINKIS_CONF_DIR.getValue))
+ addPathToClassPath(environment, buildPath(Seq(EnvConfiguration.LINKIS_CONF_DIR.getValue)))
// then, add engineconn libs.
- addPathToClassPath(environment, Seq(variable(PWD), ENGINE_CONN_LIB_DIR_NAME + "/*"))
+ addPathToClassPath(environment, buildPath(Seq(variable(PWD), ENGINE_CONN_LIB_DIR_NAME + "/*")))
// then, add public modules.
if (!enablePublicModule) {
- addPathToClassPath(environment, Seq(LINKIS_PUBLIC_MODULE_PATH.getValue + "/*"))
+ addPathToClassPath(environment, buildPath(Seq(LINKIS_PUBLIC_MODULE_PATH.getValue + "/*")))
}
// finally, add the suitable properties key to classpath
- engineConnBuildRequest.engineConnCreationDesc.properties.asScala.foreach { case (key, value) =>
- if (
- key
- .startsWith("engineconn.classpath") || key.startsWith("wds.linkis.engineconn.classpath")
- ) {
- addPathToClassPath(environment, Seq(variable(PWD), new File(value).getName))
- }
+ val taskClassPathFiles = EnvConfiguration.ENGINE_CONN_CLASSPATH_FILES.getValue(
+ engineConnBuildRequest.engineConnCreationDesc.properties
+ )
+ if (StringUtils.isNotBlank(taskClassPathFiles)) {
+ taskClassPathFiles
+ .split(",")
+ .filter(StringUtils.isNotBlank(_))
+ .foreach(file => addPathToClassPath(environment, buildPath(Seq(file))))
}
- getExtraClassPathFile.foreach { file: String =>
- addPathToClassPath(environment, Seq(variable(PWD), new File(file).getName))
- }
- engineConnBuildRequest match {
- case richer: RicherEngineConnBuildRequest =>
- def addFiles(files: String): Unit = if (StringUtils.isNotBlank(files)) {
- files
- .split(",")
- .foreach(file =>
- addPathToClassPath(environment, Seq(variable(PWD), new File(file).getName))
- )
- }
-
- val configs: util.Map[String, String] =
- richer.getStartupConfigs.asScala
- .filter(_._2.isInstanceOf[String])
- .map { case (k, v: String) =>
- k -> v
- }
- .asJava
- val jars: String = EnvConfiguration.ENGINE_CONN_JARS.getValue(configs)
- addFiles(jars)
- val files: String = EnvConfiguration.ENGINE_CONN_CLASSPATH_FILES.getValue(configs)
- addFiles(files)
- case _ =>
+ getExtraClassPathFile.filter(StringUtils.isNotBlank(_)).foreach { file: String =>
+ addPathToClassPath(environment, buildPath(Seq(new File(file).getName)))
}
environment
}
@@ -198,7 +176,7 @@
) ++: engineConnResource.getOtherBmlResources.toList
}.asJava
- private implicit def buildPath(paths: Seq[String]): String =
+ private def buildPath(paths: Seq[String]): String =
Paths.get(paths.head, paths.tail: _*).toFile.getPath
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java
index d9b3382..ca19f4d 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java
@@ -27,8 +27,10 @@
import org.apache.linkis.entrance.log.WebSocketCacheLogReader;
import org.apache.linkis.entrance.log.WebSocketLogWriter;
import org.apache.linkis.entrance.persistence.PersistenceManager;
+import org.apache.linkis.entrance.utils.CommonLogPathUtils;
import org.apache.linkis.governance.common.conf.GovernanceCommonConf;
import org.apache.linkis.governance.common.constant.job.JobRequestConstants;
+import org.apache.linkis.governance.common.entity.job.JobRequest;
import org.apache.linkis.governance.common.protocol.task.RequestTask$;
import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.orchestrator.plans.ast.QueryParams$;
@@ -125,11 +127,12 @@
// add resultSet path root
Map<String, String> starupMapTmp = new HashMap<>();
Map<String, Object> starupMapOri = TaskUtils.getStartupMap(getParams());
+ JobRequest jobRequest = getJobRequest();
if (starupMapOri.isEmpty()) {
TaskUtils.addStartupMap(getParams(), starupMapOri);
}
if (!starupMapOri.containsKey(JobRequestConstants.JOB_REQUEST_LIST())) {
- starupMapOri.put(JobRequestConstants.JOB_ID(), String.valueOf(getJobRequest().getId()));
+ starupMapOri.put(JobRequestConstants.JOB_ID(), String.valueOf(jobRequest.getId()));
}
for (Map.Entry<String, Object> entry : starupMapOri.entrySet()) {
if (null != entry.getKey() && null != entry.getValue()) {
@@ -142,7 +145,7 @@
runtimeMapOri = TaskUtils.getRuntimeMap(getParams());
}
if (!runtimeMapOri.containsKey(JobRequestConstants.JOB_ID())) {
- runtimeMapOri.put(JobRequestConstants.JOB_ID(), String.valueOf(getJobRequest().getId()));
+ runtimeMapOri.put(JobRequestConstants.JOB_ID(), String.valueOf(jobRequest.getId()));
}
Map<String, String> runtimeMapTmp = new HashMap<>();
for (Map.Entry<String, Object> entry : runtimeMapOri.entrySet()) {
@@ -150,13 +153,21 @@
runtimeMapTmp.put(entry.getKey(), entry.getValue().toString());
}
}
+
String resultSetPathRoot = GovernanceCommonConf.RESULT_SET_STORE_PATH().getValue(runtimeMapTmp);
+
+ if (!runtimeMapTmp.containsKey(GovernanceCommonConf.RESULT_SET_STORE_PATH().key())) {
+ String resultParentPath = CommonLogPathUtils.getResultParentPath(jobRequest);
+ CommonLogPathUtils.buildCommonPath(resultParentPath);
+ resultSetPathRoot = CommonLogPathUtils.getResultPath(jobRequest);
+ }
+
Map<String, Object> jobMap = new HashMap<String, Object>();
jobMap.put(RequestTask$.MODULE$.RESULT_SET_STORE_PATH(), resultSetPathRoot);
runtimeMapOri.put(QueryParams$.MODULE$.JOB_KEY(), jobMap);
-
+ jobRequest.setResultLocation(resultSetPathRoot);
EntranceExecuteRequest executeRequest = new EntranceExecuteRequest(this);
- List<Label<?>> labels = new ArrayList<Label<?>>(getJobRequest().getLabels());
+ List<Label<?>> labels = new ArrayList<Label<?>>(jobRequest.getLabels());
executeRequest.setLabels(labels);
return executeRequest;
}
@@ -224,26 +235,32 @@
: "not submit to ec";
StringBuffer sb = new StringBuffer();
- sb.append("Task creation time(任务创建时间): ")
+ sb.append("Task time point information(任务时间节点信息):\n")
+ .append("[Task creation time(任务创建时间)] :")
.append(createTime)
- .append(", Task scheduling time(任务调度时间): ")
+ .append("\n")
+ .append("[Task scheduling time(任务调度时间)]:")
.append(scheduleTime)
- .append(", Task start time(任务开始时间): ")
+ .append("\n")
+ .append("[Task start time(任务开始时间)] :")
.append(startTime)
- .append(", Mission end time(任务结束时间): ")
+ .append("\n")
+ .append("[Task end time(任务结束时间)] :")
.append(endTime)
.append("\n")
.append(LogUtils.generateInfo(""))
- .append("Task submit to Orchestrator time:")
+ .append("[Task submit to Orchestrator time]:")
.append(jobToOrchestrator)
- .append(", Task request EngineConn time:")
+ .append("\n")
+ .append("[Task request EngineConn time] :")
.append(jobRequestEC)
- .append(", Task submit to EngineConn time:")
+ .append("\n")
+ .append("[Task submit to EngineConn time] :")
.append(jobSubmitToEC)
.append("\n")
.append(
LogUtils.generateInfo(
- "Your mission(您的任务) "
+ "Your task jobId(您的任务) "
+ this.getJobRequest().getId()
+ " The total time spent is(总耗时时间为): "
+ runTime));
@@ -269,4 +286,13 @@
logger.warn("Close logWriter and logReader failed. {}", e.getMessage(), e);
}
}
+
+ @Override
+ public void clear() {
+ super.clear();
+ this.setParams(null);
+ JobRequest jobRequest = this.getJobRequest();
+ jobRequest.setExecutionCode(null);
+ jobRequest.setMetrics(null);
+ }
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java
index f051b05..b912b58 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java
@@ -164,6 +164,7 @@
}
cliHeartbeatMonitor.unRegisterIfCliJob(job);
updateJobStatus(job);
+ job.clear();
}
private void updateJobStatus(Job job) {
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceLabelRestfulApi.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceLabelRestfulApi.java
index 0737e25..f755860 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceLabelRestfulApi.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceLabelRestfulApi.java
@@ -17,12 +17,15 @@
package org.apache.linkis.entrance.restful;
+import org.apache.linkis.DataWorkCloudApplication;
+import org.apache.linkis.common.ServiceInstance;
import org.apache.linkis.common.conf.Configuration;
import org.apache.linkis.entrance.EntranceServer;
import org.apache.linkis.entrance.scheduler.EntranceSchedulerContext;
import org.apache.linkis.instance.label.client.InstanceLabelClient;
import org.apache.linkis.manager.label.constant.LabelKeyConstant;
import org.apache.linkis.manager.label.constant.LabelValueConstant;
+import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.protocol.label.InsLabelRefreshRequest;
import org.apache.linkis.protocol.label.InsLabelRemoveRequest;
import org.apache.linkis.rpc.Sender;
@@ -30,12 +33,15 @@
import org.apache.linkis.server.Message;
import org.apache.linkis.server.utils.ModuleUserUtils;
+import org.apache.commons.collections.CollectionUtils;
+
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.databind.JsonNode;
@@ -129,7 +135,23 @@
@ApiOperation(value = "isOnline", notes = "entrance isOnline", response = Message.class)
@RequestMapping(path = "/isOnline", method = RequestMethod.GET)
public Message isOnline(HttpServletRequest req) {
- logger.info("Whether Entrance is online: {}", !offlineFlag);
- return Message.ok().data("isOnline", !offlineFlag);
+ String thisInstance = Sender.getThisInstance();
+ ServiceInstance mainInstance = DataWorkCloudApplication.getServiceInstance();
+ ServiceInstance serviceInstance = new ServiceInstance();
+ serviceInstance.setApplicationName(mainInstance.getApplicationName());
+ serviceInstance.setInstance(thisInstance);
+ List<Label<?>> labelFromInstance =
+ InstanceLabelClient.getInstance().getLabelFromInstance(serviceInstance);
+ boolean res = true;
+ String offline = "offline";
+ if (!CollectionUtils.isEmpty(labelFromInstance)) {
+ for (Label label : labelFromInstance) {
+ if (offline.equals(label.getValue())) {
+ res = false;
+ }
+ }
+ }
+ logger.info("Whether Entrance is online: {}", res);
+ return Message.ok().data("isOnline", res);
}
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
index dfab300..a17d8d8 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
@@ -41,6 +41,7 @@
import org.apache.linkis.server.conf.ServerConfiguration;
import org.apache.linkis.server.security.SecurityFilter;
import org.apache.linkis.server.utils.ModuleUserUtils;
+import org.apache.linkis.utils.LinkisSpringUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
@@ -100,7 +101,11 @@
@RequestMapping(path = "/execute", method = RequestMethod.POST)
public Message execute(HttpServletRequest req, @RequestBody Map<String, Object> json) {
Message message = null;
- logger.info("Begin to get an execID");
+ String operationUser = ModuleUserUtils.getOperationUser(req);
+ logger.info(
+ "Begin to get execute task for user {}, Client IP {}",
+ operationUser,
+ LinkisSpringUtils.getClientIP(req));
json.put(TaskConstant.EXECUTE_USER, ModuleUserUtils.getOperationUser(req));
json.put(TaskConstant.SUBMIT_USER, SecurityFilter.getLoginUsername(req));
HashMap<String, String> map = (HashMap<String, String>) json.get(TaskConstant.SOURCE);
@@ -127,7 +132,7 @@
+ jobReqId
+ " in "
+ Sender.getThisServiceInstance().toString()
- + ". Please wait it to be scheduled"),
+ + ". \n Please wait it to be scheduled(您的任务已经提交,进入排队中,如果一直没有更新日志,是任务并发达到了限制,可以在ITSM提Linkis参数修改单)"),
job);
message = Message.ok();
message.setMethod("/api/entrance/execute");
@@ -143,9 +148,25 @@
@RequestMapping(path = "/submit", method = RequestMethod.POST)
public Message submit(HttpServletRequest req, @RequestBody Map<String, Object> json) {
Message message = null;
- logger.info("Begin to get an execID");
- json.put(TaskConstant.EXECUTE_USER, ModuleUserUtils.getOperationUser(req));
+ String executeUser = ModuleUserUtils.getOperationUser(req);
+ logger.info(
+ "Begin to get execute task for user {}, Client IP {}",
+ executeUser,
+ LinkisSpringUtils.getClientIP(req));
json.put(TaskConstant.SUBMIT_USER, SecurityFilter.getLoginUsername(req));
+ String token = ModuleUserUtils.getToken(req);
+ Object tempExecuteUser = json.get(TaskConstant.EXECUTE_USER);
+ // check special admin token
+ if (StringUtils.isNotBlank(token) && tempExecuteUser != null) {
+ if (Configuration.isAdminToken(token)) {
+ logger.warn(
+ "ExecuteUser variable will be replaced by system value: {} -> {}",
+ tempExecuteUser,
+ executeUser);
+ executeUser = String.valueOf(tempExecuteUser);
+ }
+ }
+ json.put(TaskConstant.EXECUTE_USER, executeUser);
HashMap<String, String> map = (HashMap) json.get(TaskConstant.SOURCE);
if (map == null) {
map = new HashMap<>();
@@ -165,7 +186,7 @@
+ jobReqId
+ " in "
+ Sender.getThisServiceInstance().toString()
- + ". Please wait it to be scheduled"),
+ + ". \n Please wait it to be scheduled(您的任务已经提交,进入排队中,如果一直没有更新日志,是任务并发达到了限制,可以在ITSM提Linkis参数修改单)"),
job);
String execID =
ZuulEntranceUtils.generateExecID(
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
index 8e66758..9e09374 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
@@ -215,16 +215,19 @@
val GROUP_CACHE_EXPIRE_TIME = CommonVars("wds.linkis.consumer.group.expire.time", 50)
val CLIENT_MONITOR_CREATOR =
- CommonVars("wds.linkis.entrance.client.monitor.creator", "LINKISCLI")
+ CommonVars("wds.linkis.entrance.client.monitor.creator", "LINKISCLI,BdpClient")
val CREATOR_IP_SWITCH =
CommonVars("wds.linkis.entrance.user.creator.ip.interceptor.switch", false)
val TEMPLATE_CONF_SWITCH =
- CommonVars("linkis.entrance.template.conf.interceptor.switch", false)
+ CommonVars("wds.linkis.entrance.template.conf.interceptor.switch", false)
- val ENABLE_ENTRANCE_DIRTY_DATA_CLEAR =
- CommonVars("linkis.entrance.auto.clean.dirty.data.enable", false)
+ val TEMPLATE_CONF_ADD_ONCE_LABEL_ENABLE =
+ CommonVars("wds.linkis.entrance.template.add.once.label.enable", false)
+
+ val ENABLE_ENTRANCE_DIRTY_DATA_CLEAR: CommonVars[Boolean] =
+ CommonVars[Boolean]("linkis.entrance.auto.clean.dirty.data.enable", true)
val ENTRANCE_CREATOR_JOB_LIMIT: CommonVars[Int] =
CommonVars[Int](
@@ -288,4 +291,7 @@
val LINKIS_ENTRANCE_SKIP_ORCHESTRATOR =
CommonVars("linkis.entrance.skip.orchestrator", false).getValue
+ val ENABLE_HDFS_RES_DIR_PRIVATE =
+ CommonVars[Boolean]("linkis.entrance.enable.hdfs.res.dir.private", false).getValue
+
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
index 24c697c..0638ef5 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
@@ -291,7 +291,10 @@
val msg = s"JobRequest (${entranceExecuteRequest.jobId()}) was submitted to Orchestrator."
logger.info(msg)
entranceExecuteRequest.getJob.getLogListener.foreach(
- _.onLogUpdate(entranceExecuteRequest.getJob, LogUtils.generateInfo(msg))
+ _.onLogUpdate(
+ entranceExecuteRequest.getJob,
+ LogUtils.generateInfo(msg + "(您的任务已经提交给Orchestrator进行编排执行)")
+ )
)
if (entranceExecuteRequest.getJob.getJobRequest.getMetrics == null) {
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
index b762f54..50efcaf 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
@@ -160,7 +160,7 @@
getLogListener.foreach(
_.onLogUpdate(
this,
- LogUtils.generateInfo("Your job is Scheduled. Please wait it to run.")
+ LogUtils.generateInfo("Your job is Scheduled. Please wait it to run.(您的任务已经调度运行中)")
)
)
case WaitForRetry =>
@@ -174,7 +174,8 @@
getLogListener.foreach(
_.onLogUpdate(
this,
- LogUtils.generateInfo("Your job is Running now. Please wait it to complete.")
+ LogUtils
+ .generateInfo("Your job is Running now. Please wait it to complete.(您的任务已经在运行中)")
)
)
getJobRequest.getMetrics.put(
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/CustomVariableUtils.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/CustomVariableUtils.scala
index a40c3fa..d938647 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/CustomVariableUtils.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/CustomVariableUtils.scala
@@ -18,6 +18,7 @@
package org.apache.linkis.entrance.interceptor.impl
import org.apache.linkis.common.conf.Configuration
+import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{Logging, Utils, VariableUtils}
import org.apache.linkis.governance.common.entity.job.JobRequest
import org.apache.linkis.manager.label.utils.LabelUtil
@@ -42,7 +43,11 @@
* : requestPersistTask
* @return
*/
- def replaceCustomVar(jobRequest: JobRequest, runType: String): String = {
+ def replaceCustomVar(
+ jobRequest: JobRequest,
+ runType: String,
+ logAppender: java.lang.StringBuilder
+ ): String = {
val variables: util.Map[String, String] = new util.HashMap[String, String]()
val sender =
Sender.getSender(Configuration.CLOUD_CONSOLE_VARIABLE_SPRING_APPLICATION_NAME.getValue)
@@ -63,11 +68,28 @@
}
val variableMap = TaskUtils
.getVariableMap(jobRequest.getParams)
- .asInstanceOf[util.Map[String, String]]
+ .asInstanceOf[util.HashMap[String, String]]
variables.putAll(variableMap)
- if (!variables.containsKey("user")) {
- variables.put("user", jobRequest.getExecuteUser)
+ variables.put("user", jobRequest.getExecuteUser)
+ // User customization is not supported. If the user has customized it, add a warning log and replace it
+ if (variables.containsKey("submit_user")) {
+ logAppender.append(
+ LogUtils.generateInfo(
+ "submitUser variable will be replaced by system value:" + jobRequest.getSubmitUser + " -> " + variables
+ .get("submit_user") + "\n"
+ )
+ )
}
+ if (variables.containsKey("execute_user")) {
+ logAppender.append(
+ LogUtils.generateInfo(
+ "executeUser variable will be replaced by system value:" + jobRequest.getExecuteUser + " -> " + variables
+ .get("execute_user") + "\n"
+ )
+ )
+ }
+ variables.put("execute_user", jobRequest.getExecuteUser)
+ variables.put("submit_user", jobRequest.getSubmitUser)
VariableUtils.replace(jobRequest.getExecutionCode, runType, variables)
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/StorePathEntranceInterceptor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/StorePathEntranceInterceptor.scala
index d05dce4..50b23df 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/StorePathEntranceInterceptor.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/StorePathEntranceInterceptor.scala
@@ -18,6 +18,7 @@
package org.apache.linkis.entrance.interceptor.impl
import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.entrance.conf.EntranceConfiguration
import org.apache.linkis.entrance.exception.{EntranceErrorCode, EntranceErrorException}
import org.apache.linkis.entrance.interceptor.EntranceInterceptor
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
@@ -44,6 +45,34 @@
* @return
*/
override def apply(jobReq: JobRequest, logAppender: java.lang.StringBuilder): JobRequest = {
+ val paramsMap = if (null != jobReq.getParams) {
+ jobReq.getParams
+ } else {
+ new util.HashMap[String, AnyRef]()
+ }
+ var runtimeMap = TaskUtils.getRuntimeMap(paramsMap)
+ if (null == runtimeMap || runtimeMap.isEmpty) {
+ runtimeMap = new util.HashMap[String, AnyRef]()
+ }
+ if (runtimeMap.containsKey(GovernanceCommonConf.RESULT_SET_STORE_PATH.key)) {
+ return jobReq
+ }
+ if (EntranceConfiguration.ENABLE_HDFS_RES_DIR_PRIVATE) {
+ val parentPath = generateUserPrivateResDir(jobReq)
+ runtimeMap.put(GovernanceCommonConf.RESULT_SET_STORE_PATH.key, parentPath)
+ TaskUtils.addRuntimeMap(paramsMap, runtimeMap)
+ val params = new util.HashMap[String, AnyRef]()
+ paramsMap.asScala.foreach(kv => params.put(kv._1, kv._2))
+ jobReq.setResultLocation(parentPath)
+ jobReq.setParams(params)
+ jobReq
+ } else {
+ jobReq
+ }
+
+ }
+
+ private def generateUserPrivateResDir(jobReq: JobRequest): String = {
var parentPath: String = GovernanceCommonConf.RESULT_SET_STORE_PATH.getValue
if (!parentPath.endsWith("/")) parentPath += "/"
parentPath += jobReq.getExecuteUser
@@ -61,23 +90,7 @@
// multi linkis cluster should not use same root folder , in which case result file may be overwrite
parentPath += DateFormatUtils.format(System.currentTimeMillis, "yyyy-MM-dd/HHmmss") + "/" +
userCreator._2 + "/" + jobReq.getId
- val paramsMap = if (null != jobReq.getParams) {
- jobReq.getParams
- } else {
- new util.HashMap[String, AnyRef]()
- }
-
- var runtimeMap = TaskUtils.getRuntimeMap(paramsMap)
- if (null == runtimeMap || runtimeMap.isEmpty) {
- runtimeMap = new util.HashMap[String, AnyRef]()
- }
- runtimeMap.put(GovernanceCommonConf.RESULT_SET_STORE_PATH.key, parentPath)
- TaskUtils.addRuntimeMap(paramsMap, runtimeMap)
- val params = new util.HashMap[String, AnyRef]()
- paramsMap.asScala.foreach(kv => params.put(kv._1, kv._2))
- jobReq.setResultLocation(parentPath)
- jobReq.setParams(params)
- jobReq
+ parentPath
}
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/TemplateConfUtils.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/TemplateConfUtils.scala
index cdcbe01..99ae8b0 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/TemplateConfUtils.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/TemplateConfUtils.scala
@@ -237,12 +237,14 @@
// to remove metedata start param
TaskUtils.clearStartupMap(params)
- val onceLabel =
- LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(
- classOf[ExecuteOnceLabel]
- )
- logger.info("Add once label for task id:{}", requestPersistTask.getId.toString)
- requestPersistTask.getLabels.add(onceLabel)
+ if (EntranceConfiguration.TEMPLATE_CONF_ADD_ONCE_LABEL_ENABLE.getValue) {
+ val onceLabel =
+ LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(
+ classOf[ExecuteOnceLabel]
+ )
+ logger.info("Add once label for task id:{}", requestPersistTask.getId.toString)
+ requestPersistTask.getLabels.add(onceLabel)
+ }
}
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/VarSubstitutionInterceptor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/VarSubstitutionInterceptor.scala
index 72d4030..b761e20 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/VarSubstitutionInterceptor.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/VarSubstitutionInterceptor.scala
@@ -41,7 +41,7 @@
LogUtils.generateInfo("Program is substituting variables for you") + "\n"
)
val codeType = LabelUtil.getCodeType(jobRequest.getLabels)
- val realCode = CustomVariableUtils.replaceCustomVar(jobRequest, codeType)
+ val realCode = CustomVariableUtils.replaceCustomVar(jobRequest, codeType, logAppender)
jobRequest.setExecutionCode(realCode)
logAppender.append(
LogUtils.generateInfo("Variables substitution ended successfully") + "\n"
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/Cache.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/Cache.scala
index 44474ee..3c5173a 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/Cache.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/Cache.scala
@@ -18,7 +18,12 @@
package org.apache.linkis.entrance.log
class Cache(maxCapacity: Int) {
- val cachedLogs: LoopArray[String] = LoopArray[String](maxCapacity)
+ var cachedLogs: LoopArray[String] = LoopArray[String](maxCapacity)
+
+ def clearCachedLogs(): Unit = {
+ this.cachedLogs = null
+ }
+
}
object Cache {
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogReader.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogReader.scala
index 748f82d..406d43e 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogReader.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogReader.scala
@@ -68,21 +68,39 @@
}
override protected def readLog(deal: String => Unit, fromLine: Int, size: Int): Int = {
- if (!sharedCache.cachedLogs.nonEmpty) return super.readLog(deal, fromLine, size)
+ if (sharedCache.cachedLogs == null || sharedCache.cachedLogs.isEmpty) {
+ return super.readLog(deal, fromLine, size)
+ }
val min = sharedCache.cachedLogs.min
val max = sharedCache.cachedLogs.max
- if (fromLine > max) return 0
- val from = fromLine
- val to = if (fromLine >= min) {
- if (size >= 0 && max >= fromLine + size) fromLine + size else max + 1
- } else {
- // If you are getting it from a file, you don't need to read the cached data again. In this case, you can guarantee that the log will not be missing.
- val read = super.readLog(deal, fromLine, size)
- return read
- }
- (from until to) map sharedCache.cachedLogs.get foreach deal
- to - fromLine
+ val fakeClearEleNums = sharedCache.cachedLogs.fakeClearEleNums
+
+ if (fromLine > max) return 0
+
+ var from = fromLine
+ val end =
+ if (size >= 0 && max >= fromLine + size) {
+ fromLine + size
+ } else {
+ max + 1
+ }
+
+ var readNums = 0
+ // The log may have been refreshed to the log file regularly and cannot be determined based on min.
+ if (fromLine < fakeClearEleNums) {
+ // If you are getting it from a file, you don't need to read the cached data again. In this case, you can guarantee that the log will not be missing.
+ readNums = super.readLog(deal, fromLine, size)
+ if ((fromLine + size) < min) {
+ return readNums
+ } else {
+ from = from + readNums
+ }
+ } else {}
+
+ (from until end) map sharedCache.cachedLogs.get foreach deal
+ end - from + readNums
+
}
@throws[IOException]
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogWriter.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogWriter.scala
index b54dc75..8f1cea1 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogWriter.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogWriter.scala
@@ -33,22 +33,26 @@
def getCache: Option[Cache] = Some(sharedCache)
private def cache(msg: String): Unit = {
+ if (sharedCache.cachedLogs == null) {
+ return
+ }
this synchronized {
- val removed = sharedCache.cachedLogs.add(msg)
+ val isNextOneEmpty = sharedCache.cachedLogs.isNextOneEmpty
val currentTime = new Date(System.currentTimeMillis())
- if (removed != null || currentTime.after(pushTime)) {
+
+ if (isNextOneEmpty == false || currentTime.after(pushTime)) {
val logs = sharedCache.cachedLogs.toList
val sb = new StringBuilder
- if (removed != null) sb.append(removed).append("\n")
logs.filter(_ != null).foreach(log => sb.append(log).append("\n"))
// need append latest msg before clear
- sb.append(msg).append("\n")
+ sb.append(msg)
sharedCache.cachedLogs.fakeClear()
super.write(sb.toString())
pushTime.setTime(
currentTime.getTime + EntranceConfiguration.LOG_PUSH_INTERVAL_TIME.getValue
)
}
+ sharedCache.cachedLogs.add(msg)
}
}
@@ -65,10 +69,12 @@
override def flush(): Unit = {
val sb = new StringBuilder
- sharedCache.cachedLogs.toList
- .filter(StringUtils.isNotEmpty)
- .foreach(sb.append(_).append("\n"))
- sharedCache.cachedLogs.clear()
+ if (sharedCache.cachedLogs != null) {
+ sharedCache.cachedLogs.toList
+ .filter(StringUtils.isNotEmpty)
+ .foreach(sb.append(_).append("\n"))
+ sharedCache.cachedLogs.clear()
+ }
super.write(sb.toString())
super.flush()
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/HDFSCacheLogWriter.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/HDFSCacheLogWriter.scala
index 4f37ff1..ff04640 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/HDFSCacheLogWriter.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/HDFSCacheLogWriter.scala
@@ -100,33 +100,33 @@
def getCache: Option[Cache] = Some(sharedCache)
private def cache(msg: String): Unit = {
+ if (sharedCache.cachedLogs == null) {
+ return
+ }
WRITE_LOCKER synchronized {
- val removed = sharedCache.cachedLogs.add(msg)
+ val isNextOneEmpty = sharedCache.cachedLogs.isNextOneEmpty
val currentTime = new Date(System.currentTimeMillis())
- if (removed != null || currentTime.after(pushTime)) {
+ if (isNextOneEmpty == false || currentTime.after(pushTime)) {
val logs = sharedCache.cachedLogs.toList
val sb = new StringBuilder
- if (removed != null) sb.append(removed).append("\n")
logs.filter(_ != null).foreach(log => sb.append(log).append("\n"))
- // need append latest msg before fake clear
- sb.append(msg).append("\n")
sharedCache.cachedLogs.fakeClear()
writeToFile(sb.toString())
pushTime.setTime(
currentTime.getTime + EntranceConfiguration.LOG_PUSH_INTERVAL_TIME.getValue
)
}
+ sharedCache.cachedLogs.add(msg)
}
}
private def writeToFile(msg: String): Unit = WRITE_LOCKER synchronized {
- val log =
- if (!firstWrite) "\n" + msg
- else {
- logger.info(s"$toString write first one line log")
- firstWrite = false
- msg
- }
+ val log = msg
+ if (firstWrite) {
+ logger.info(s"$toString write first one line log")
+ firstWrite = false
+ msg
+ }
Utils.tryAndWarnMsg {
getOutputStream.write(log.getBytes(charset))
}(s"$toString error when write query log to outputStream.")
@@ -144,10 +144,12 @@
override def flush(): Unit = {
val sb = new StringBuilder
- sharedCache.cachedLogs.toList
- .filter(_ != null)
- .foreach(sb.append(_).append("\n"))
- sharedCache.cachedLogs.clear()
+ if (sharedCache.cachedLogs != null) {
+ sharedCache.cachedLogs.toList
+ .filter(_ != null)
+ .foreach(sb.append(_).append("\n"))
+ sharedCache.cachedLogs.clear()
+ }
writeToFile(sb.toString())
}
@@ -157,6 +159,7 @@
fileSystem.close()
fileSystem = null
}(s"$toString Error encounters when closing fileSystem")
+ sharedCache.clearCachedLogs()
}
override def toString: String = logPath
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogReader.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogReader.scala
index 1d5f0cb..da7f058 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogReader.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogReader.scala
@@ -115,8 +115,11 @@
}
protected def readLog(deal: String => Unit, fromLine: Int, size: Int = 100): Int = {
- val from = if (fromLine < 0) 0 else fromLine
- var line, read = 0
+
+ // fromline param with begin 1 ,if set 0 missing first line
+ val from = if (fromLine < 1) 1 else fromLine
+ var line = 1
+ var read = 0
val inputStream = getInputStream
val lineIterator = IOUtils.lineIterator(inputStream, charset)
Utils.tryFinally(while (lineIterator.hasNext && (read < size || size < 0)) {
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LoopArray.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LoopArray.scala
index 5b62a49..ff0dfbb 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LoopArray.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LoopArray.scala
@@ -23,21 +23,33 @@
def this() = this(32)
+ // realSize 游标之前的数据 已经被重写覆盖了
+ // The data before realSize cursor has been overwritten by rewriting
protected[this] var realSize = 0
- private var flag = 0
+
+ // the loop begin indx
+ private var front = 0
+
+ // the loop last index
+ // 尾部 下一个存储的游标
private var tail = 0
+ private var clearEleNums = 0
+
def add(event: T): T = {
var t = null.asInstanceOf[T]
eventQueue synchronized {
- val index = (tail + 1) % maxCapacity
- if (index == flag) {
- flag = (flag + 1) % maxCapacity
+ val nextIndex = (tail + 1) % maxCapacity
+ // 首尾相遇 第一次循环队列满了,后续所有add动作 nextIndex和front都是相等的 front指针不断往前循环移动
+ // When the first and last ends meet, the first circular queue is full, and all subsequent add actions nextIndex and front are equal.
+ // The front pointer continues to move forward in a circular motion.
+ if (nextIndex == front) {
+ front = (front + 1) % maxCapacity
realSize += 1
}
t = eventQueue(tail).asInstanceOf[T]
eventQueue(tail) = event
- tail = index
+ tail = nextIndex
}
t
}
@@ -51,18 +63,19 @@
} else if (index > _max) {
throw new IllegalArgumentException("The index " + index + " must be less than " + _max)
}
- val _index = (flag + (index - realSize + maxCapacity - 1)) % maxCapacity
+ val _index = (front + (index - realSize + maxCapacity - 1)) % maxCapacity
eventQueue(_index).asInstanceOf[T]
}
def clear(): Unit = eventQueue synchronized {
- flag = 0
+ front = 0
tail = 0
realSize = 0
(0 until maxCapacity).foreach(eventQueue(_) = null)
}
def fakeClear(): Unit = eventQueue synchronized {
+ clearEleNums = clearEleNums + size
(0 until maxCapacity).foreach(eventQueue(_) = null)
}
@@ -73,16 +86,34 @@
if (_size == 0) {
_size = 1
}
- realSize + _size - 1
+ realSize + _size
}
- private def filledSize = if (tail >= flag) tail - flag else tail + maxCapacity - flag
+ def fakeClearEleNums: Int = clearEleNums
+
+ private def filledSize = {
+ if (tail == front && tail == 0) {
+ 0
+ } else if (tail > front) {
+ tail - front
+ } else {
+ tail + maxCapacity - front
+ }
+ }
def size: Int = filledSize
def isFull: Boolean = filledSize == maxCapacity - 1
- def nonEmpty: Boolean = size > 0
+ // If it is not empty, it means that the loop queue is full this round.
+ // 不为空 说明本轮 循环队列满了
+ def isNextOneEmpty(): Boolean = {
+
+ eventQueue(tail).asInstanceOf[T] == null
+
+ }
+
+ def isEmpty: Boolean = size == 0
def toList: List[T] = toIndexedSeq.toList
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/CommonLogPathUtils.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/CommonLogPathUtils.scala
index 1311374..7467746 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/CommonLogPathUtils.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/CommonLogPathUtils.scala
@@ -19,10 +19,16 @@
import org.apache.linkis.common.io.FsPath
import org.apache.linkis.common.utils.Utils
+import org.apache.linkis.entrance.conf.EntranceConfiguration
+import org.apache.linkis.governance.common.entity.job.JobRequest
+import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.storage.FSFactory
import org.apache.linkis.storage.fs.FileSystem
import org.apache.linkis.storage.utils.{FileSystemUtils, StorageConfiguration, StorageUtils}
+import java.text.SimpleDateFormat
+import java.util.Date
+
object CommonLogPathUtils {
def buildCommonPath(commonPath: String): Unit = {
@@ -52,4 +58,35 @@
}
}
+ private val resPrefix = EntranceConfiguration.DEFAULT_LOGPATH_PREFIX.getValue
+
+ /**
+ * get result path parentPath: resPrefix + dateStr + result + creator subPath: parentPath +
+ * executeUser + taskid + filename
+ * @param jobRequest
+ * @return
+ */
+ def getResultParentPath(jobRequest: JobRequest): String = {
+ val resStb = new StringBuilder()
+ if (resStb.endsWith("/")) {
+ resStb.append(resPrefix)
+ } else {
+ resStb.append(resPrefix).append("/")
+ }
+ val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
+ val date = new Date(System.currentTimeMillis)
+ val dateString = dateFormat.format(date)
+ val userCreator = LabelUtil.getUserCreatorLabel(jobRequest.getLabels)
+ val creator =
+ if (null == userCreator) EntranceConfiguration.DEFAULT_CREATE_SERVICE
+ else userCreator.getCreator
+ resStb.append("result").append("/").append(dateString).append("/").append(creator)
+ resStb.toString()
+ }
+
+ def getResultPath(jobRequest: JobRequest): String = {
+ val parentPath = getResultParentPath(jobRequest)
+ parentPath + "/" + jobRequest.getExecuteUser + "/" + jobRequest.getId
+ }
+
}
diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/UJESSQLDriver.java b/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/UJESSQLDriver.java
index c12e279..dd12d14 100644
--- a/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/UJESSQLDriver.java
+++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/UJESSQLDriver.java
@@ -42,6 +42,7 @@
static String DB_NAME = "DBNAME";
static String PARAMS = "PARAMS";
static String ENGINE_TYPE = "EngineType";
+ static String ENGINE_VERSION = "EngineVersion";
static String USER = "user";
static String TOKEN_KEY = "key";
@@ -49,6 +50,8 @@
static String PASSWORD = "password";
static boolean TABLEAU_SERVER = false;
static String FIXED_SESSION = "fixedSession";
+
+ static String USE_SSL = "useSSL";
static String VERSION = "version";
static int DEFAULT_VERSION = 1;
static String MAX_CONNECTION_SIZE = "maxConnectionSize";
diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala
index 80298bc..f75db82 100644
--- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala
+++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala
@@ -18,7 +18,7 @@
package org.apache.linkis.ujes.jdbc
import org.apache.linkis.common.utils.{Logging, Utils}
-import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext
+import org.apache.linkis.governance.common.constant.job.JobRequestConstants
import org.apache.linkis.manager.label.constant.LabelKeyConstant
import org.apache.linkis.manager.label.entity.engine.{EngineType, EngineTypeLabel, RunType}
import org.apache.linkis.manager.label.utils.EngineTypeLabelCreator
@@ -87,9 +87,7 @@
tableauFlag
}
- private[jdbc] val dbName =
- if (StringUtils.isNotBlank(props.getProperty(DB_NAME))) props.getProperty(DB_NAME)
- else "default"
+ private[jdbc] val dbName = props.getProperty(DB_NAME)
private val runningSQLStatements = new util.LinkedList[Statement]
@@ -120,26 +118,30 @@
private val runtimeParams: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]
private[jdbc] def getEngineType: EngineTypeLabel = {
- val engineType: EngineTypeLabel =
- EngineTypeLabelCreator.createEngineTypeLabel(EngineType.TRINO.toString)
+
+ var engineType = EngineType.TRINO.toString
+ var engineVersion = ""
if (props.containsKey(PARAMS)) {
val params = props.getProperty(PARAMS)
if (params != null & params.length() > 0) {
params.split(PARAM_SPLIT).map(_.split(KV_SPLIT)).foreach {
- case Array(k, v) if k.equals(UJESSQLDriver.ENGINE_TYPE) =>
- if (v.contains('-')) {
- val factory = LabelBuilderFactoryContext.getLabelBuilderFactory
- val label = factory.createLabel(classOf[EngineTypeLabel])
- label.setStringValue(v)
- return label
- } else {
- return EngineTypeLabelCreator.createEngineTypeLabel(v)
+ case Array(k, v) =>
+ if (k.equals(UJESSQLDriver.ENGINE_TYPE)) {
+ engineType = v
+ } else if (k.equals(UJESSQLDriver.ENGINE_VERSION)) {
+ engineVersion = v
}
+
case _ =>
}
}
}
- engineType
+ if (StringUtils.isNotBlank(engineVersion)) {
+ EngineTypeLabelCreator.registerVersion(engineType, engineVersion)
+ }
+
+ EngineTypeLabelCreator.createEngineTypeLabel(engineType)
+
}
private[jdbc] def throwWhenClosed[T](op: => T): T =
@@ -150,10 +152,6 @@
val statement = op
runningSQLStatements.add(statement)
- if (!inited) {
- inited = true
- Utils.tryAndWarn(statement.execute(s"USE $dbName"))
- }
statement
}
@@ -473,6 +471,10 @@
logger.info("Fixed session is enable session id is {}", connectionId)
}
+ if (StringUtils.isNotBlank(dbName)) {
+ runtimeParams.put(JobRequestConstants.LINKIS_JDBC_DEFAULT_DB, dbName)
+ }
+
val jobSubmitAction = JobSubmitAction.builder
.addExecuteCode(code)
.setStartupParams(startupParams)
diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESClientFactory.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESClientFactory.scala
index bea2418..96132d5 100644
--- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESClientFactory.scala
+++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESClientFactory.scala
@@ -23,8 +23,10 @@
import org.apache.linkis.ujes.client.UJESClient
import org.apache.linkis.ujes.jdbc.UJESSQLDriverMain._
+import org.apache.commons.codec.binary.Hex
import org.apache.commons.lang3.StringUtils
+import java.nio.charset.StandardCharsets
import java.util
import java.util.Properties
@@ -36,29 +38,50 @@
val host = props.getProperty(HOST)
val port = props.getProperty(PORT)
val user = props.getProperty(USER)
- val serverUrl = if (StringUtils.isNotBlank(port)) s"http://$host:$port" else "http://" + host
- val uniqueKey = s"${serverUrl}_$user"
- if (ujesClients.containsKey(uniqueKey)) {
- logger.info("Clients with the same JDBC unique key({}) will get it directly", uniqueKey)
- ujesClients.get(uniqueKey)
+ val pwd = props.getProperty(PASSWORD)
+ val sslEnabled =
+ if (
+ props
+ .containsKey(USE_SSL) && "true".equalsIgnoreCase(props.getProperty(USE_SSL))
+ ) {
+ true
+ } else {
+ false
+ }
+ val prefix = if (sslEnabled) {
+ "https"
} else {
- uniqueKey.intern synchronized {
- if (ujesClients.containsKey(uniqueKey)) {
- logger.info("Clients with the same JDBC unique key({}) will get it directly", uniqueKey)
- return ujesClients.get(uniqueKey)
+ "http"
+ }
+ val serverUrl =
+ if (StringUtils.isNotBlank(port)) s"$prefix://$host:$port" else "$prefix://" + host
+ val uniqueKey = s"${serverUrl}_${user}_${pwd}"
+ val uniqueKeyDes = Hex.encodeHexString(uniqueKey.getBytes(StandardCharsets.UTF_8))
+ if (ujesClients.containsKey(uniqueKeyDes)) {
+ logger.info("Clients with the same JDBC unique key({}) will get it directly", serverUrl)
+ ujesClients.get(uniqueKeyDes)
+ } else {
+ uniqueKeyDes.intern synchronized {
+ if (ujesClients.containsKey(uniqueKeyDes)) {
+ logger.info("Clients with the same JDBC unique key({}) will get it directly", serverUrl)
+ return ujesClients.get(uniqueKeyDes)
}
logger.info(
"The same Client does not exist for the JDBC unique key({}), a new Client will be created",
- uniqueKey
+ serverUrl
)
- val ujesClient = createUJESClient(serverUrl, props)
- ujesClients.put(uniqueKey, ujesClient)
+ val ujesClient = createUJESClient(serverUrl, props, sslEnabled)
+ ujesClients.put(uniqueKeyDes, ujesClient)
ujesClient
}
}
}
- private def createUJESClient(serverUrl: String, props: Properties): UJESClient = {
+ private def createUJESClient(
+ serverUrl: String,
+ props: Properties,
+ sslEnabled: Boolean
+ ): UJESClient = {
val clientConfigBuilder = DWSClientConfigBuilder.newBuilder()
clientConfigBuilder.addServerUrl(serverUrl)
clientConfigBuilder.setAuthTokenKey(props.getProperty(USER))
@@ -89,6 +112,10 @@
}
}
if (!versioned) clientConfigBuilder.setDWSVersion("v" + DEFAULT_VERSION)
+
+ if (sslEnabled) {
+ clientConfigBuilder.setSSL(sslEnabled)
+ }
UJESClient(clientConfigBuilder.build())
}
diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLDriverMain.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLDriverMain.scala
index ab2f6dd..c162b8c 100644
--- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLDriverMain.scala
+++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLDriverMain.scala
@@ -75,6 +75,9 @@
case Array(FIXED_SESSION, value) =>
props.setProperty(FIXED_SESSION, value)
false
+ case Array(USE_SSL, value) =>
+ props.setProperty(USE_SSL, value)
+ false
case Array(key, _) =>
if (StringUtils.isBlank(key)) {
throw new LinkisSQLException(
@@ -139,6 +142,8 @@
val TABLEAU_SERVER = UJESSQLDriver.TABLEAU_SERVER
val FIXED_SESSION = UJESSQLDriver.FIXED_SESSION
+ val USE_SSL = UJESSQLDriver.USE_SSL
+
val VERSION = UJESSQLDriver.VERSION
val DEFAULT_VERSION = UJESSQLDriver.DEFAULT_VERSION
val MAX_CONNECTION_SIZE = UJESSQLDriver.MAX_CONNECTION_SIZE
diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala
index dd5ace5..39418a4 100644
--- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala
+++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala
@@ -160,11 +160,13 @@
return
}
metaData = resultSetResult.getMetadata.asInstanceOf[util.List[util.Map[String, String]]]
- for (cursor <- 1 to metaData.size()) {
- val col = metaData.get(cursor - 1)
- resultSetMetaData.setColumnNameProperties(cursor, col.get("columnName"))
- resultSetMetaData.setDataTypeProperties(cursor, col.get("dataType"))
- resultSetMetaData.setCommentPropreties(cursor, col.get("comment"))
+ if (null != metaData) {
+ for (cursor <- 1 to metaData.size()) {
+ val col = metaData.get(cursor - 1)
+ resultSetMetaData.setColumnNameProperties(cursor, col.get("columnName"))
+ resultSetMetaData.setDataTypeProperties(cursor, col.get("dataType"))
+ resultSetMetaData.setCommentPropreties(cursor, col.get("comment"))
+ }
}
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
index fbaea61..4856961 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
@@ -71,4 +71,6 @@
public static final String TEMPLATE_CONF_NAME_KEY = "ec.resource.name";
public static final String MANAGER_KEY = "manager";
+
+ public static final String DRIVER_TASK_KEY = "taskId";
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/cluster/ClusterLabel.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/cluster/ClusterLabel.java
index 05754c6..b07c77f 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/cluster/ClusterLabel.java
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/cluster/ClusterLabel.java
@@ -28,7 +28,8 @@
import static org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.CLUSTER_LABEL_VALUE_ERROR;
-public class ClusterLabel extends GenericLabel implements EngineNodeLabel, UserModifiable {
+public class ClusterLabel extends GenericLabel
+ implements EMNodeLabel, EngineNodeLabel, UserModifiable {
public ClusterLabel() {
setLabelKey(LabelKeyConstant.YARN_CLUSTER_KEY);
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/DriverTaskLabel.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/DriverTaskLabel.java
new file mode 100644
index 0000000..5f74e27
--- /dev/null
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/DriverTaskLabel.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.label.entity.engine;
+
+import org.apache.linkis.manager.label.constant.LabelKeyConstant;
+import org.apache.linkis.manager.label.entity.EngineNodeLabel;
+import org.apache.linkis.manager.label.entity.GenericLabel;
+import org.apache.linkis.manager.label.entity.annon.ValueSerialNum;
+
+import java.util.HashMap;
+
+public class DriverTaskLabel extends GenericLabel implements EngineNodeLabel {
+
+ public DriverTaskLabel() {
+ setLabelKey(LabelKeyConstant.DRIVER_TASK_KEY);
+ }
+
+ public String getTaskId() {
+ if (null == getValue()) {
+ return null;
+ }
+ return getValue().get(getLabelKey());
+ }
+
+ @ValueSerialNum(0)
+ public void setTaskId(String taskId) {
+ if (null == getValue()) {
+ setValue(new HashMap<>());
+ }
+ getValue().put(getLabelKey(), taskId);
+ }
+}
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala
index c58f642..e155016 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala
@@ -71,6 +71,8 @@
val IMPALA = Value("impala")
+ val JOBSERVER = Value("jobserver")
+
def mapFsTypeToEngineType(fsType: String): String = {
fsType match {
case "file" =>
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
index 986f130..fc58dbe 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
@@ -18,7 +18,7 @@
package org.apache.linkis.manager.label.utils
import org.apache.linkis.manager.label.constant.LabelValueConstant
-import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.manager.label.entity.{Label, TenantLabel}
import org.apache.linkis.manager.label.entity.engine.{
CodeLanguageLabel,
EngineConnModeLabel,
@@ -82,6 +82,20 @@
getLabelFromList[CodeLanguageLabel](labels)
}
+ def getTenantValue(labels: util.List[Label[_]]): String = {
+ if (null == labels) return ""
+ val tentantLabel = getTenantLabel(labels)
+ if (null != tentantLabel) {
+ tentantLabel.getTenant
+ } else {
+ ""
+ }
+ }
+
+ def getTenantLabel(labels: util.List[Label[_]]): TenantLabel = {
+ getLabelFromList[TenantLabel](labels)
+ }
+
def getEngingeConnRuntimeModeLabel(labels: util.List[Label[_]]): EngingeConnRuntimeModeLabel = {
getLabelFromList[EngingeConnRuntimeModeLabel](labels)
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/test/java/org/apache/linkis/manager/label/builder/factory/StdLabelBuilderFactoryTest.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/test/java/org/apache/linkis/manager/label/builder/factory/StdLabelBuilderFactoryTest.java
index 8d0b1e4..50f4cb4 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/test/java/org/apache/linkis/manager/label/builder/factory/StdLabelBuilderFactoryTest.java
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/test/java/org/apache/linkis/manager/label/builder/factory/StdLabelBuilderFactoryTest.java
@@ -19,11 +19,16 @@
import org.apache.linkis.manager.label.builder.CombinedLabelBuilder;
import org.apache.linkis.manager.label.builder.LabelBuilder;
+import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.manager.label.entity.em.EMInstanceLabel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.junit.jupiter.api.*;
/** StdLabelBuilderFactory Tester */
@@ -73,4 +78,14 @@
stdLabelBuilderFactory.createLabel("testLabelKey", null, EMInstanceLabel.class, null);
Assertions.assertTrue(emInstanceLabel1.getLabelKey().equals("emInstance"));
}
+
+ @Test
+ public void test() {
+ Map input = new HashMap<String, String>();
+ input.put("userCreator", "username-IDE");
+ input.put("yarnCluster", "bdp-test");
+ input.put("executeOnce", "true");
+ List<Label> res = stdLabelBuilderFactory.getLabels(input);
+ System.out.println(res);
+ }
}
diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala
index 1c1eee9..15e4947 100644
--- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala
+++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala
@@ -19,6 +19,7 @@
import org.apache.linkis.common.exception.ErrorException
import org.apache.linkis.common.utils.{ByteTimeUtils, Logging, Utils}
+import org.apache.linkis.engineconn.computation.executor.conf.ComputationExecutorConf
import org.apache.linkis.engineconn.computation.executor.execute.{
ConcurrentComputationExecutor,
EngineExecutionContext
@@ -223,9 +224,11 @@
var compileRet = -1
Utils.tryCatch {
compileRet = driver.compile(realCode)
- logger.info(s"driver compile realCode : ${realCode} finished, status : ${compileRet}")
+ logger.info(
+ s"driver compile realCode : \n ${realCode} \n finished, status : ${compileRet}"
+ )
if (0 != compileRet) {
- logger.warn(s"compile realCode : ${realCode} error status : ${compileRet}")
+ logger.warn(s"compile realCode : \n ${realCode} \n error status : ${compileRet}")
throw HiveQueryFailedException(
COMPILE_HIVE_QUERY_ERROR.getErrorCode,
COMPILE_HIVE_QUERY_ERROR.getErrorDesc
diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
index abe32d7..aac9cb7 100644
--- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
@@ -34,6 +34,7 @@
}
import org.apache.linkis.engineplugin.hive.exception.HiveQueryFailedException
import org.apache.linkis.engineplugin.hive.progress.HiveProgressHelper
+import org.apache.linkis.governance.common.constant.job.JobRequestConstants
import org.apache.linkis.governance.common.paser.SQLCodeParser
import org.apache.linkis.governance.common.utils.JobUtils
import org.apache.linkis.hadoop.common.conf.HadoopConf
@@ -144,49 +145,82 @@
val realCode = code.trim()
LOG.info(s"hive client begins to run hql code:\n ${realCode.trim}")
val jobId = JobUtils.getJobIdFromMap(engineExecutorContext.getProperties)
+
if (StringUtils.isNotBlank(jobId)) {
- LOG.info(s"set mapreduce.job.tags=LINKIS_$jobId")
- hiveConf.set("mapreduce.job.tags", s"LINKIS_$jobId")
+ val jobTags = JobUtils.getJobSourceTagsFromObjectMap(engineExecutorContext.getProperties)
+ val tags = if (StringUtils.isAsciiPrintable(jobTags)) {
+ s"LINKIS_$jobId,$jobTags"
+ } else {
+ s"LINKIS_$jobId"
+ }
+ LOG.info(s"set mapreduce.job.tags=$tags")
+ hiveConf.set("mapreduce.job.tags", tags)
}
+
if (realCode.trim.length > 500) {
engineExecutorContext.appendStdout(s"$getId >> ${realCode.trim.substring(0, 500)} ...")
} else engineExecutorContext.appendStdout(s"$getId >> ${realCode.trim}")
val tokens = realCode.trim.split("""\s+""")
SessionState.setCurrentSessionState(sessionState)
sessionState.setLastCommand(code)
+ if (
+ engineExecutorContext.getCurrentParagraph == 1 && engineExecutorContext.getProperties
+ .containsKey(JobRequestConstants.LINKIS_JDBC_DEFAULT_DB)
+ ) {
+ val defaultDB =
+ engineExecutorContext.getProperties
+ .get(JobRequestConstants.LINKIS_JDBC_DEFAULT_DB)
+ .asInstanceOf[String]
+ logger.info(s"set default DB to $defaultDB")
+ sessionState.setCurrentDatabase(defaultDB)
+ }
val proc = CommandProcessorFactory.get(tokens, hiveConf)
this.proc = proc
LOG.debug("ugi is " + ugi.getUserName)
- ugi.doAs(new PrivilegedExceptionAction[ExecuteResponse]() {
- override def run(): ExecuteResponse = {
- proc match {
- case any if HiveDriverProxy.isDriver(any) =>
- logger.info(s"driver is $any")
- thread = Thread.currentThread()
- driver = new HiveDriverProxy(any)
- executeHQL(realCode, driver)
- case _ =>
- val resp = proc.run(realCode.substring(tokens(0).length).trim)
- val result = new String(baos.toByteArray)
- logger.info("RESULT => {}", result)
- engineExecutorContext.appendStdout(result)
- baos.reset()
- if (resp.getResponseCode != 0) {
- clearCurrentProgress()
+ Utils.tryFinally {
+ ugi.doAs(new PrivilegedExceptionAction[ExecuteResponse]() {
+ override def run(): ExecuteResponse = {
+ proc match {
+ case any if HiveDriverProxy.isDriver(any) =>
+ logger.info(s"driver is $any")
+ thread = Thread.currentThread()
+ driver = new HiveDriverProxy(any)
+ executeHQL(realCode, driver)
+ case _ =>
+ val resp = proc.run(realCode.substring(tokens(0).length).trim)
+ val result = new String(baos.toByteArray)
+ logger.info("RESULT => {}", result)
+ engineExecutorContext.appendStdout(result)
+ baos.reset()
+ if (resp.getResponseCode != 0) {
+ clearCurrentProgress()
+ HiveProgressHelper.clearHiveProgress()
+ onComplete()
+ singleSqlProgressMap.clear()
+ HiveProgressHelper.storeSingleSQLProgress(0.0f)
+ throw resp.getException
+ }
HiveProgressHelper.clearHiveProgress()
+ HiveProgressHelper.storeSingleSQLProgress(0.0f)
onComplete()
singleSqlProgressMap.clear()
- HiveProgressHelper.storeSingleSQLProgress(0.0f)
- throw resp.getException
- }
- HiveProgressHelper.clearHiveProgress()
- HiveProgressHelper.storeSingleSQLProgress(0.0f)
- onComplete()
- singleSqlProgressMap.clear()
- SuccessExecuteResponse()
+ SuccessExecuteResponse()
+ }
+ }
+ })
+ } {
+ if (this.driver != null) {
+ Utils.tryQuietly {
+ driver.close()
+ this.driver = null
+ val ss = SessionState.get()
+ if (ss != null) {
+ ss.deleteTmpOutputFile()
+ ss.deleteTmpErrOutputFile()
+ }
}
}
- })
+ }
}
private def executeHQL(realCode: String, driver: HiveDriverProxy): ExecuteResponse = {
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SQLSession.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SQLSession.scala
index 6e302b4..01eccdb 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SQLSession.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SQLSession.scala
@@ -69,7 +69,6 @@
"Spark application sc has already stopped, please restart it."
)
}
-
val startTime = System.currentTimeMillis()
// sc.setJobGroup(jobGroup, "Get IDE-SQL Results.", false)
@@ -121,7 +120,7 @@
)
)
.toArray[Column]
- columns.foreach(c => logger.info(s"c is ${c.getColumnName()}, comment is ${c.getComment()}"))
+ columns.foreach(c => logger.info(s"c is ${c.columnName}, comment is ${c.comment}"))
if (columns == null || columns.isEmpty) return
val metaData = new TableMetaData(columns)
val writer =
@@ -147,11 +146,11 @@
)
}
val taken = ByteTimeUtils.msDurationToString(System.currentTimeMillis - startTime)
- logger.info(s"Time taken: ${taken}, Fetched $index row(s).")
+ logger.info(s"Time taken: ${taken}, Fetched $index row(s)")
// to register TempTable
// Utils.tryAndErrorMsg(CSTableRegister.registerTempTable(engineExecutorContext, writer, alias, columns))("Failed to register tmp table:")
engineExecutionContext.appendStdout(
- s"${EngineUtils.getName} >> Time taken: ${taken}, Fetched $index row(s)."
+ s"${EngineUtils.getName} >> Time taken: ${taken}, Fetched ${columns.length} col(s) : $index row(s)"
)
engineExecutionContext.sendResultSet(writer)
}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
index 1a6203a..5439629 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
@@ -102,7 +102,7 @@
override def init(): Unit = {
setCodeParser(new PythonCodeParser)
super.init()
- logger.info("spark sql executor start")
+ logger.info("spark python executor start")
}
override def killTask(taskID: String): Unit = {
@@ -113,7 +113,14 @@
}
override def close: Unit = {
- logger.info("python executor ready to close")
+
+ logger.info(s"To remove pyspark executor")
+ Utils.tryAndError(
+ ExecutorManager.getInstance.removeExecutor(getExecutorLabels().asScala.toArray)
+ )
+ logger.info(s"Finished remove pyspark executor")
+
+ logger.info("To kill pyspark process")
if (process != null) {
if (gatewayServer != null) {
Utils.tryAndError(gatewayServer.shutdown())
@@ -126,18 +133,14 @@
logger.info(s"Try to kill pyspark process with: [kill -15 ${p}]")
GovernanceUtils.killProcess(String.valueOf(p), s"kill pyspark process,pid: $pid", false)
})
- if (pid.isEmpty) {
- process.destroy()
- process = null
- }
+
+ Utils.tryQuietly(process.destroy())
+ process = null
+ logger.info("Finished kill pyspark process")
}("process close failed")
}
- logger.info(s"To delete python executor")
- Utils.tryAndError(
- ExecutorManager.getInstance.removeExecutor(getExecutorLabels().asScala.toArray)
- )
- logger.info(s"Finished to kill python")
- logger.info("python executor Finished to close")
+
+ logger.info("python executor finished to close")
}
override def getKind: Kind = PySpark()
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkScalaExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkScalaExecutor.scala
index 99627a5..523db9b 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkScalaExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkScalaExecutor.scala
@@ -102,11 +102,13 @@
Utils.newCachedExecutionContext(5, "Spark-Scala-REPL-Thread-", true)
override def init(): Unit = {
-
System.setProperty("scala.repl.name.line", ("$line" + this.hashCode).replace('-', '0'))
-
setCodeParser(new ScalaCodeParser)
+ super.init()
+ logger.info("spark scala executor start")
+ }
+ def lazyInitLoadILoop(): Unit = {
if (sparkILoop == null) {
synchronized {
if (sparkILoop == null) createSparkILoop
@@ -146,6 +148,7 @@
jobGroup: String
): ExecuteResponse = {
this.jobGroup.append(jobGroup)
+ lazyInitLoadILoop()
if (null != sparkILoop.intp && null != sparkILoop.intp.classLoader) {
Thread.currentThread().setContextClassLoader(sparkILoop.intp.classLoader)
}
@@ -153,13 +156,22 @@
lineOutputStream.reset(engineExecutionContext)
}
- lazyLoadILoop
+ doBindSparkSession()
+
lineOutputStream.ready()
if (sparkILoopInited) {
this.engineExecutionContextFactory.setEngineExecutionContext(engineExecutionContext)
}
var res: ExecuteResponse = null
+ if (Thread.currentThread().isInterrupted) {
+ logger.error("The thread of execution has been interrupted and the task should be terminated")
+ return ErrorExecuteResponse(
+ "The thread of execution has been interrupted and the task should be terminated",
+ null
+ )
+ }
+
Utils.tryCatch {
res = executeLine(code, engineExecutionContext)
} { case e: Exception =>
@@ -220,30 +232,43 @@
// error("incomplete code.")
IncompleteExecuteResponse(null)
case Results.Error =>
- lineOutputStream.flush()
- val output = lineOutputStream.toString
- IOUtils.closeQuietly(lineOutputStream)
- var errorMsg: String = null
- if (StringUtils.isNotBlank(output)) {
- errorMsg = Utils.tryCatch(EngineUtils.getResultStrByDolphinTextContent(output))(t =>
- t.getMessage
+ if (Thread.currentThread().isInterrupted) {
+ logger.error(
+ "The thread of execution has been interrupted and the task should be terminated"
)
- logger.error("Execute code error for " + errorMsg)
- engineExecutionContext.appendStdout("Execute code error for " + errorMsg)
- if (matchFatalLog(errorMsg)) {
- logger.error("engine log fatal logs now to set status to shutdown")
- ExecutorManager.getInstance.getReportExecutor.tryShutdown()
+ Utils.tryQuietly {
+ IOUtils.closeQuietly(lineOutputStream)
}
- } else {
- logger.error("No error message is captured, please see the detailed log")
- }
- ErrorExecuteResponse(
- errorMsg,
- ExecuteError(
- EXECUTE_SPARKSCALA_FAILED.getErrorCode,
- EXECUTE_SPARKSCALA_FAILED.getErrorDesc
+ ErrorExecuteResponse(
+ "The thread of execution has been interrupted and the task should be terminated",
+ null
)
- )
+ } else {
+ lineOutputStream.flush()
+ val output = lineOutputStream.toString
+ IOUtils.closeQuietly(lineOutputStream)
+ var errorMsg: String = null
+ if (StringUtils.isNotBlank(output)) {
+ errorMsg = Utils.tryCatch(EngineUtils.getResultStrByDolphinTextContent(output))(t =>
+ t.getMessage
+ )
+ logger.error("Execute code error for " + errorMsg)
+ engineExecutionContext.appendStdout("Execute code error for " + errorMsg)
+ if (matchFatalLog(errorMsg)) {
+ logger.error("engine log fatal logs now to set status to shutdown")
+ ExecutorManager.getInstance.getReportExecutor.tryShutdown()
+ }
+ } else {
+ logger.error("No error message is captured, please see the detailed log")
+ }
+ ErrorExecuteResponse(
+ errorMsg,
+ ExecuteError(
+ EXECUTE_SPARKSCALA_FAILED.getErrorCode,
+ EXECUTE_SPARKSCALA_FAILED.getErrorDesc
+ )
+ )
+ }
}
}
// reset the java stdout
@@ -275,11 +300,10 @@
}
}
- private def lazyLoadILoop = { // lazy loaded.
+ private def doBindSparkSession() = { // lazy loaded.
if (!bindFlag) {
bindSparkSession
}
-
}
private def initSparkILoop = {
@@ -418,6 +442,7 @@
}
override protected def getExecutorIdPreFix: String = "SparkScalaExecutor_"
+
}
class EngineExecutionContextFactory {
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSqlExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSqlExecutor.scala
index 1dc5b99..33b3dad 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSqlExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSqlExecutor.scala
@@ -23,6 +23,7 @@
import org.apache.linkis.engineplugin.spark.config.SparkConfiguration
import org.apache.linkis.engineplugin.spark.entity.SparkEngineSession
import org.apache.linkis.engineplugin.spark.utils.EngineUtils
+import org.apache.linkis.governance.common.constant.job.JobRequestConstants
import org.apache.linkis.governance.common.paser.SQLCodeParser
import org.apache.linkis.scheduler.executer.{
ErrorExecuteResponse,
@@ -53,6 +54,18 @@
jobGroup: String
): ExecuteResponse = {
+ if (
+ engineExecutionContext.getCurrentParagraph == 1 && engineExecutionContext.getProperties
+ .containsKey(JobRequestConstants.LINKIS_JDBC_DEFAULT_DB)
+ ) {
+ val defaultDB =
+ engineExecutionContext.getProperties
+ .get(JobRequestConstants.LINKIS_JDBC_DEFAULT_DB)
+ .asInstanceOf[String]
+ logger.info(s"set default DB to $defaultDB")
+ sparkEngineSession.sqlContext.sql(s"use $defaultDB")
+ }
+
logger.info("SQLExecutor run query: " + code)
engineExecutionContext.appendStdout(s"${EngineUtils.getName} >> $code")
val standInClassLoader = Thread.currentThread().getContextClassLoader
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/conf/AcrossClusterRuleKeys.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/conf/AcrossClusterRuleKeys.java
index f2fee2f..611c5e1 100644
--- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/conf/AcrossClusterRuleKeys.java
+++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/conf/AcrossClusterRuleKeys.java
@@ -19,25 +19,35 @@
public class AcrossClusterRuleKeys {
- public static final String KEY_QUEUE_SUFFIX = "suffix";
+ public static final String KEY_CROSS_QUEUE = "crossQueue";
- public static final String KEY_ACROSS_CLUSTER_QUEUE_SUFFIX = "bdap2bdp";
+ public static final String KEY_PRIORITY_CLUSTER = "priorityCluster";
public static final String KEY_START_TIME = "startTime";
public static final String KEY_END_TIME = "endTime";
- public static final String KEY_CPU_THRESHOLD = "CPUThreshold";
+ public static final String KEY_TARGET_CPU_THRESHOLD = "targetCPUThreshold";
- public static final String KEY_MEMORY_THRESHOLD = "MemoryThreshold";
+ public static final String KEY_TARGET_MEMORY_THRESHOLD = "targetMemoryThreshold";
- public static final String KEY_CPU_PERCENTAGE_THRESHOLD = "CPUPercentageThreshold";
+ public static final String KEY_TARGET_CPU_PERCENTAGE_THRESHOLD = "targetCPUPercentageThreshold";
- public static final String KEY_MEMORY_PERCENTAGE_THRESHOLD = "MemoryPercentageThreshold";
+ public static final String KEY_TARGET_MEMORY_PERCENTAGE_THRESHOLD =
+ "targetMemoryPercentageThreshold";
+
+ public static final String KEY_ORIGIN_CPU_PERCENTAGE_THRESHOLD = "originCPUPercentageThreshold";
+
+ public static final String KEY_ORIGIN_MEMORY_PERCENTAGE_THRESHOLD =
+ "originMemoryPercentageThreshold";
public static final String KEY_QUEUE_RULE = "queueRule";
public static final String KEY_TIME_RULE = "timeRule";
- public static final String KEY_THRESHOLD_RULE = "thresholdRule";
+ public static final String KEY_TARGET_CLUSTER_RULE = "targetClusterRule";
+
+ public static final String KEY_ORIGIN_CLUSTER_RULE = "originClusterRule";
+
+ public static final String KEY_PRIORITY_CLUSTER_RULE = "priorityClusterRule";
}
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/dao/AcrossClusterRuleMapper.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/dao/AcrossClusterRuleMapper.java
index 9dadcf9..55c5b1a 100644
--- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/dao/AcrossClusterRuleMapper.java
+++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/dao/AcrossClusterRuleMapper.java
@@ -25,13 +25,22 @@
public interface AcrossClusterRuleMapper {
- AcrossClusterRule getAcrossClusterRule(@Param("id") Long id);
+ AcrossClusterRule getAcrossClusterRule(@Param("id") Long id, @Param("username") String username);
- void deleteAcrossClusterRule(
- @Param("creator") String creator, @Param("username") String username);
+ void deleteAcrossClusterRule(@Param("id") Long id);
+
+ void deleteAcrossClusterRuleByBatch(@Param("ids") List<Long> ids);
+
+ void deleteAcrossClusterRuleByUsername(@Param("username") String username);
+
+ void deleteAcrossClusterRuleByCrossQueue(@Param("crossQueue") String crossQueue);
void updateAcrossClusterRule(@Param("acrossClusterRule") AcrossClusterRule acrossClusterRule);
+ void updateAcrossClusterRuleByBatch(
+ @Param("ids") List<Long> ids,
+ @Param("acrossClusterRule") AcrossClusterRule acrossClusterRule);
+
void insertAcrossClusterRule(@Param("acrossClusterRule") AcrossClusterRule acrossClusterRule);
List<AcrossClusterRule> queryAcrossClusterRuleList(
@@ -39,5 +48,17 @@
@Param("creator") String creator,
@Param("clusterName") String clusterName);
- void validAcrossClusterRule(@Param("isValid") String isValid, @Param("id") Long id);
+ void validAcrossClusterRule(
+ @Param("isValid") String isValid, @Param("id") Long id, @Param("username") String username);
+
+ /**
+ * Query across cluster resource rule by username.
+ *
+ * @param username
+ * @return
+ */
+ AcrossClusterRule queryAcrossClusterRuleByUserName(@Param("username") String username);
+
+ void validAcrossClusterRuleByBatch(
+ @Param("ids") List<Long> ids, @Param("isValid") String isValid);
}
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/dao/DepartmentMapper.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/dao/DepartmentMapper.java
new file mode 100644
index 0000000..195fef6
--- /dev/null
+++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/dao/DepartmentMapper.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.configuration.dao;
+
+import org.apache.linkis.configuration.entity.DepartmentVo;
+
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
+
+public interface DepartmentMapper {
+
+ DepartmentVo getDepartmentByUser(@Param("user") String user);
+
+ List<DepartmentVo> queryDepartmentList();
+}
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/dao/DepartmentTenantMapper.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/dao/DepartmentTenantMapper.java
new file mode 100644
index 0000000..336ad69
--- /dev/null
+++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/dao/DepartmentTenantMapper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.configuration.dao;
+
+import org.apache.linkis.configuration.entity.DepartmentTenantVo;
+
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
+
+public interface DepartmentTenantMapper {
+
+ void insertTenant(DepartmentTenantVo departmentTenantVo);
+
+ void updateTenant(DepartmentTenantVo departmentTenantVo);
+
+ List<DepartmentTenantVo> queryTenantList(
+ @Param("creator") String creator,
+ @Param("departmentId") String departmentId,
+ @Param("tenantValue") String tenantValue);
+
+ void deleteTenant(@Param("id") Integer id);
+
+ DepartmentTenantVo queryTenant(
+ @Param("creator") String creator, @Param("departmentId") String departmentId);
+}
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/entity/DepartmentTenantVo.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/entity/DepartmentTenantVo.java
new file mode 100644
index 0000000..41b4d3d
--- /dev/null
+++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/entity/DepartmentTenantVo.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.configuration.entity;
+
+import java.util.Date;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+@ApiModel
+public class DepartmentTenantVo {
+ @ApiModelProperty("id")
+ private String id;
+
+ @ApiModelProperty("creator")
+ private String creator;
+
+ @ApiModelProperty("department")
+ private String department;
+
+ @ApiModelProperty("departmentId")
+ private String departmentId;
+
+ @ApiModelProperty("tenantValue")
+ private String tenantValue;
+
+ @ApiModelProperty("createTime")
+ private Date createTime;
+
+ @ApiModelProperty("updateTime")
+ private Date updateTime;
+
+ @ApiModelProperty("createBy")
+ private String createBy;
+
+ @ApiModelProperty("isValid")
+ private String isValid;
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getCreator() {
+ return creator;
+ }
+
+ public void setCreator(String creator) {
+ this.creator = creator;
+ }
+
+ public String getDepartmentId() {
+ return departmentId;
+ }
+
+ public void setDepartmentId(String departmentId) {
+ this.departmentId = departmentId;
+ }
+
+ public String getTenantValue() {
+ return tenantValue;
+ }
+
+ public void setTenantValue(String tenantValue) {
+ this.tenantValue = tenantValue;
+ }
+
+ public Date getCreateTime() {
+ return createTime;
+ }
+
+ public void setCreateTime(Date createTime) {
+ this.createTime = createTime;
+ }
+
+ public Date getUpdateTime() {
+ return updateTime;
+ }
+
+ public void setUpdateTime(Date updateTime) {
+ this.updateTime = updateTime;
+ }
+
+ public String getCreateBy() {
+ return createBy;
+ }
+
+ public void setCreateBy(String createBy) {
+ this.createBy = createBy;
+ }
+
+ public String getDepartment() {
+ return department;
+ }
+
+ public void setDepartment(String department) {
+ this.department = department;
+ }
+
+ public String getIsValid() {
+ return isValid;
+ }
+
+ public void setIsValid(String isValid) {
+ this.isValid = isValid;
+ }
+
+ @Override
+ public String toString() {
+ return "DepartmentTenantVo{"
+ + "id='"
+ + id
+ + '\''
+ + ", creator='"
+ + creator
+ + '\''
+ + ", department='"
+ + department
+ + '\''
+ + ", departmentId='"
+ + departmentId
+ + '\''
+ + ", tenantValue='"
+ + tenantValue
+ + '\''
+ + ", createTime="
+ + createTime
+ + ", updateTime="
+ + updateTime
+ + ", bussinessUser='"
+ + createBy
+ + ", isValid="
+ + isValid
+ + '\''
+ + '}';
+ }
+}
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/entity/DepartmentVo.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/entity/DepartmentVo.java
new file mode 100644
index 0000000..197e360
--- /dev/null
+++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/entity/DepartmentVo.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.configuration.entity;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+@ApiModel
+public class DepartmentVo {
+
+ @ApiModelProperty("cluster_code")
+ private String clusterCode;
+
+ @ApiModelProperty("user_type")
+ private String userType;
+
+ @ApiModelProperty("user_name")
+ private String userName;
+
+ @ApiModelProperty("org_id")
+ private String orgId;
+
+ @ApiModelProperty("org_name")
+ private String orgName;
+
+ @ApiModelProperty("queue_name")
+ private String queueName;
+
+ @ApiModelProperty("db_name")
+ private String dbName;
+
+ @ApiModelProperty("interface_user")
+ private String interfaceUser;
+
+ @ApiModelProperty("is_union_analyse")
+ private String isUnionAnalyse;
+
+ @ApiModelProperty("create_time")
+ private String createTime;
+
+ @ApiModelProperty("user_itsm_no")
+ private String userItsmNo;
+
+ public String getClusterCode() {
+ return clusterCode;
+ }
+
+ public void setClusterCode(String clusterCode) {
+ this.clusterCode = clusterCode;
+ }
+
+ public String getUserType() {
+ return userType;
+ }
+
+ public void setUserType(String userType) {
+ this.userType = userType;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
+ public String getOrgId() {
+ return orgId;
+ }
+
+ public void setOrgId(String orgId) {
+ this.orgId = orgId;
+ }
+
+ public String getOrgName() {
+ return orgName;
+ }
+
+ public void setOrgName(String orgName) {
+ this.orgName = orgName;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public void setQueueName(String queueName) {
+ this.queueName = queueName;
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+
+ public void setDbName(String dbName) {
+ this.dbName = dbName;
+ }
+
+ public String getInterfaceUser() {
+ return interfaceUser;
+ }
+
+ public void setInterfaceUser(String interfaceUser) {
+ this.interfaceUser = interfaceUser;
+ }
+
+ public String getIsUnionAnalyse() {
+ return isUnionAnalyse;
+ }
+
+ public void setIsUnionAnalyse(String isUnionAnalyse) {
+ this.isUnionAnalyse = isUnionAnalyse;
+ }
+
+ public String getCreateTime() {
+ return createTime;
+ }
+
+ public void setCreateTime(String createTime) {
+ this.createTime = createTime;
+ }
+
+ public String getUserItsmNo() {
+ return userItsmNo;
+ }
+
+ public void setUserItsmNo(String userItsmNo) {
+ this.userItsmNo = userItsmNo;
+ }
+
+ @Override
+ public String toString() {
+ return "DepartmentVo{"
+ + "clusterCode='"
+ + clusterCode
+ + '\''
+ + ", userType='"
+ + userType
+ + '\''
+ + ", userName='"
+ + userName
+ + '\''
+ + ", orgId='"
+ + orgId
+ + '\''
+ + ", orgName='"
+ + orgName
+ + '\''
+ + ", queueName='"
+ + queueName
+ + '\''
+ + ", dbName='"
+ + dbName
+ + '\''
+ + ", interfaceUser='"
+ + interfaceUser
+ + '\''
+ + ", isUnionAnalyse='"
+ + isUnionAnalyse
+ + '\''
+ + ", createTime='"
+ + createTime
+ + '\''
+ + ", userItsmNo='"
+ + userItsmNo
+ + '\''
+ + '}';
+ }
+}
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/entity/TenantVo.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/entity/TenantVo.java
index c2399ff..903de73 100644
--- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/entity/TenantVo.java
+++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/entity/TenantVo.java
@@ -49,6 +49,9 @@
@ApiModelProperty("bussinessUser")
private String bussinessUser;
+ @ApiModelProperty("isValid")
+ private String isValid;
+
public String getId() {
return id;
}
@@ -113,6 +116,14 @@
this.bussinessUser = bussinessUser;
}
+ public String getIsValid() {
+ return isValid;
+ }
+
+ public void setIsValid(String isValid) {
+ this.isValid = isValid;
+ }
+
@Override
public String toString() {
return "TenantVo{"
@@ -137,6 +148,8 @@
+ '\''
+ ", bussinessUser='"
+ bussinessUser
+ + ", isValid="
+ + isValid
+ '\''
+ '}';
}
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/AcrossClusterRuleRestfulApi.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/AcrossClusterRuleRestfulApi.java
index 3a01c86..ea66c08 100644
--- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/AcrossClusterRuleRestfulApi.java
+++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/AcrossClusterRuleRestfulApi.java
@@ -31,6 +31,8 @@
import javax.servlet.http.HttpServletRequest;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import io.swagger.annotations.Api;
@@ -54,16 +56,15 @@
notes = "valid acrossClusterRule",
response = Message.class)
@ApiImplicitParams({
- @ApiImplicitParam(name = "req", dataType = "HttpServletRequest", value = "req"),
@ApiImplicitParam(name = "id", dataType = "Integer", value = "id"),
@ApiImplicitParam(name = "isValid", dataType = "String", value = "isValid"),
})
@RequestMapping(path = "/isValid", method = RequestMethod.PUT)
public Message isValidRule(HttpServletRequest req, @RequestBody Map<String, Object> json) {
String operationUser = ModuleUserUtils.getOperationUser(req, "execute valid acrossClusterRule");
+ String username = null;
if (!Configuration.isAdmin(operationUser)) {
- return Message.error(
- "Failed to valid acrossClusterRule List,msg: only administrators can configure");
+ username = operationUser;
}
Integer idInt = (Integer) json.get("id");
@@ -71,13 +72,53 @@
String isValid = (String) json.get("isValid");
if (StringUtils.isBlank(isValid)) {
- return Message.error("Failed to valid acrossClusterRule: Illegal Input Param");
+ return Message.error("Failed to valid acrossClusterRule, Illegal Input Param: isValid");
}
try {
- acrossClusterRuleService.validAcrossClusterRule(id, isValid);
+ acrossClusterRuleService.validAcrossClusterRule(id, isValid, username);
} catch (Exception e) {
- log.info("valid acrossClusterRule failed:" + e.getMessage());
+ log.info("valid acrossClusterRule failed: ", e);
+ return Message.error("valid acrossClusterRule failed");
+ }
+
+ return Message.ok();
+ }
+
+ @ApiOperation(
+ value = "valid acrossClusterRules by batch",
+ notes = "valid acrossClusterRules by batch",
+ response = Message.class)
+ @ApiImplicitParams({
+ @ApiImplicitParam(name = "ids", dataType = "List<Integer>", value = "ids"),
+ @ApiImplicitParam(name = "isValid", dataType = "String", value = "isValid"),
+ })
+ @RequestMapping(path = "/isValidByBatch", method = RequestMethod.PUT)
+ public Message isValidRulesByBatch(
+ HttpServletRequest req, @RequestBody Map<String, Object> json) {
+ String operationUser = ModuleUserUtils.getOperationUser(req, "execute valid acrossClusterRule");
+ if (!Configuration.isAdmin(operationUser)) {
+ return Message.error(
+ "Failed to valid acrossClusterRule batch, msg: only administrators can configure");
+ }
+
+ @SuppressWarnings("unchecked")
+ ArrayList<Integer> ids = (ArrayList<Integer>) json.get("ids");
+ List<Long> ruleIds = new ArrayList<>();
+ for (Integer id : ids) {
+ ruleIds.add(id.longValue());
+ }
+
+ String isValid = (String) json.get("isValid");
+
+ if (StringUtils.isBlank(isValid)) {
+ return Message.error("Failed to valid acrossClusterRule, Illegal Input Param: isValid");
+ }
+
+ try {
+ acrossClusterRuleService.validAcrossClusterRuleByBatch(ruleIds, isValid);
+ } catch (Exception e) {
+ log.info("valid acrossClusterRule failed: ", e);
return Message.error("valid acrossClusterRule failed");
}
@@ -89,10 +130,11 @@
notes = "query acrossClusterRule list",
response = Message.class)
@ApiImplicitParams({
- @ApiImplicitParam(name = "req", dataType = "HttpServletRequest", value = "req"),
@ApiImplicitParam(name = "creator", dataType = "String", value = "creator"),
@ApiImplicitParam(name = "username", dataType = "String", value = "username"),
@ApiImplicitParam(name = "clusterName", dataType = "String", value = "clusterName"),
+ @ApiImplicitParam(name = "pageNow", dataType = "Integer", value = "pageNow"),
+ @ApiImplicitParam(name = "pageSize", dataType = "Integer", value = "pageSize"),
})
@RequestMapping(path = "/list", method = RequestMethod.GET)
public Message queryAcrossClusterRuleList(
@@ -104,9 +146,13 @@
@RequestParam(value = "pageSize", required = false) Integer pageSize) {
String operationUser =
ModuleUserUtils.getOperationUser(req, "execute query acrossClusterRule List");
+
if (!Configuration.isAdmin(operationUser)) {
- return Message.error(
- "Failed to query acrossClusterRule List,msg: only administrators can configure");
+ if (StringUtils.isNotBlank(username) && !operationUser.equals(username)) {
+ username = "noexist";
+ } else {
+ username = operationUser;
+ }
}
if (StringUtils.isBlank(username)) username = null;
@@ -114,14 +160,13 @@
if (StringUtils.isBlank(clusterName)) clusterName = null;
if (null == pageNow) pageNow = 1;
if (null == pageSize) pageSize = 20;
-
Map<String, Object> resultMap = null;
try {
resultMap =
acrossClusterRuleService.queryAcrossClusterRuleList(
creator, username, clusterName, pageNow, pageSize);
} catch (Exception e) {
- log.info("query acrossClusterRule List failed:" + e.getMessage());
+ log.info("query acrossClusterRule List failed: ", e);
return Message.error("query acrossClusterRule List failed");
}
@@ -135,15 +180,11 @@
notes = "delete acrossClusterRule",
response = Message.class)
@ApiImplicitParams({
- @ApiImplicitParam(name = "req", dataType = "HttpServletRequest", value = "req"),
- @ApiImplicitParam(name = "creator", dataType = "String", value = "creator"),
- @ApiImplicitParam(name = "username", dataType = "String", value = "username"),
+ @ApiImplicitParam(name = "id", dataType = "Integer", value = "id"),
})
@RequestMapping(path = "/delete", method = RequestMethod.DELETE)
public Message deleteAcrossClusterRule(
- HttpServletRequest req,
- @RequestParam(value = "creator", required = false) String creator,
- @RequestParam(value = "username", required = false) String username) {
+ HttpServletRequest req, @RequestParam(value = "id", required = false) Integer id) {
String operationUser =
ModuleUserUtils.getOperationUser(req, "execute delete acrossClusterRule");
if (!Configuration.isAdmin(operationUser)) {
@@ -151,14 +192,44 @@
"Failed to delete acrossClusterRule,msg: only administrators can configure");
}
- if (StringUtils.isBlank(creator) || StringUtils.isBlank(username)) {
- return Message.error("Failed to delete acrossClusterRule: Illegal Input Param");
+ try {
+ acrossClusterRuleService.deleteAcrossClusterRule(id.longValue());
+ } catch (Exception e) {
+ log.info("delete acrossClusterRule failed: ", e);
+ return Message.error("delete acrossClusterRule failed");
+ }
+
+ return Message.ok();
+ }
+
+ @ApiOperation(
+ value = "delete acrossClusterRule by batch",
+ notes = "delete acrossClusterRule by batch",
+ response = Message.class)
+ @ApiImplicitParams({
+ @ApiImplicitParam(name = "ids", dataType = "List<Integer>", value = "ids"),
+ })
+ @RequestMapping(path = "/deleteByBatch", method = RequestMethod.PUT)
+ public Message deleteAcrossClusterRuleByBatch(
+ HttpServletRequest req, @RequestBody Map<String, Object> json) {
+ String operationUser =
+ ModuleUserUtils.getOperationUser(req, "execute delete acrossClusterRule");
+ if (!Configuration.isAdmin(operationUser)) {
+ return Message.error(
+ "Failed to delete acrossClusterRule,msg: only administrators can configure");
+ }
+
+ @SuppressWarnings("unchecked")
+ ArrayList<Integer> ids = (ArrayList<Integer>) json.get("ids");
+ List<Long> ruleIds = new ArrayList<>();
+ for (Integer id : ids) {
+ ruleIds.add(id.longValue());
}
try {
- acrossClusterRuleService.deleteAcrossClusterRule(creator, username);
+ acrossClusterRuleService.deleteAcrossClusterRuleByBatch(ruleIds);
} catch (Exception e) {
- log.info("delete acrossClusterRule failed:" + e.getMessage());
+ log.info("delete acrossClusterRule failed: ", e);
return Message.error("delete acrossClusterRule failed");
}
@@ -166,11 +237,75 @@
}
@ApiOperation(
+ value = "delete acrossClusterRule",
+ notes = "delete acrossClusterRule",
+ response = Message.class)
+ @ApiImplicitParams({
+ @ApiImplicitParam(name = "username", dataType = "String", value = "username"),
+ })
+ @RequestMapping(path = "/deleteByUsername", method = RequestMethod.DELETE)
+ public Message deleteAcrossClusterRuleByUsername(
+ HttpServletRequest req, @RequestParam(value = "username", required = false) String username) {
+ String operationUser =
+ ModuleUserUtils.getOperationUser(req, "execute delete acrossClusterRule");
+ if (!Configuration.isAdmin(operationUser)) {
+ return Message.error(
+ "Failed to delete acrossClusterRule,msg: only administrators can configure");
+ }
+
+ if (StringUtils.isBlank(username)) {
+ return Message.error("Failed to delete acrossClusterRule, Illegal Input Param: username");
+ }
+
+ try {
+ acrossClusterRuleService.deleteAcrossClusterRuleByUsername(username);
+ } catch (Exception e) {
+ log.info("delete acrossClusterRule failed:", e);
+ return Message.error("delete acrossClusterRule failed, username is: " + username);
+ }
+
+ return Message.ok();
+ }
+
+ @ApiOperation(
+ value = "delete acrossClusterRule",
+ notes = "delete acrossClusterRule",
+ response = Message.class)
+ @ApiImplicitParams({
+ @ApiImplicitParam(name = "crossQueue", dataType = "String", value = "crossQueue"),
+ })
+ @RequestMapping(path = "/deleteByCrossQueue", method = RequestMethod.DELETE)
+ public Message deleteAcrossClusterRuleByCrossQueue(
+ HttpServletRequest req,
+ @RequestParam(value = "crossQueue", required = false) String crossQueue) {
+ String operationUser =
+ ModuleUserUtils.getOperationUser(req, "execute delete acrossClusterRule");
+ if (!Configuration.isAdmin(operationUser)) {
+ return Message.error(
+ "Failed to delete acrossClusterRule,msg: only administrators can configure");
+ }
+
+ if (StringUtils.isBlank(crossQueue)) {
+ return Message.error(
+ "Failed to delete acrossClusterRule, Illegal Input Param: " + crossQueue);
+ }
+
+ try {
+ acrossClusterRuleService.deleteAcrossClusterRuleByCrossQueue(
+ CommonUtils.concatQueue(crossQueue));
+ } catch (Exception e) {
+ log.info("delete acrossClusterRule failed:", e);
+ return Message.error("delete acrossClusterRule failed, crossQueue is: " + crossQueue);
+ }
+
+ return Message.ok();
+ }
+
+ @ApiOperation(
value = "update acrossClusterRule",
notes = "update acrossClusterRule ",
response = Message.class)
@ApiImplicitParams({
- @ApiImplicitParam(name = "req", dataType = "HttpServletRequest", value = "req"),
@ApiImplicitParam(name = "id", dataType = "Integer", value = "id"),
@ApiImplicitParam(name = "clusterName", dataType = "String", value = "clusterName"),
@ApiImplicitParam(name = "creator", dataType = "String", value = "creator"),
@@ -178,16 +313,32 @@
@ApiImplicitParam(name = "isValid", dataType = "String", value = "isValid"),
@ApiImplicitParam(name = "startTime", dataType = "String", value = "startTime"),
@ApiImplicitParam(name = "endTime", dataType = "String", value = "endTime"),
- @ApiImplicitParam(name = "CPUThreshold", dataType = "String", value = "CPUThreshold"),
- @ApiImplicitParam(name = "MemoryThreshold", dataType = "String", value = "MemoryThreshold"),
+ @ApiImplicitParam(name = "crossQueue", dataType = "String", value = "crossQueue"),
+ @ApiImplicitParam(name = "priorityCluster", dataType = "String", value = "priorityCluster"),
@ApiImplicitParam(
- name = "CPUPercentageThreshold",
+ name = "targetCPUThreshold",
dataType = "String",
- value = "CPUPercentageThreshold"),
+ value = "targetCPUThreshold"),
@ApiImplicitParam(
- name = "MemoryPercentageThreshold",
+ name = "targetMemoryThreshold",
dataType = "String",
- value = "MemoryPercentageThreshold"),
+ value = "targetMemoryThreshold"),
+ @ApiImplicitParam(
+ name = "originCPUPercentageThreshold",
+ dataType = "String",
+ value = "originCPUPercentageThreshold"),
+ @ApiImplicitParam(
+ name = "originMemoryPercentageThreshold",
+ dataType = "String",
+ value = "originMemoryPercentageThreshold"),
+ @ApiImplicitParam(
+ name = "targetCPUPercentageThreshold",
+ dataType = "String",
+ value = "targetCPUPercentageThreshold"),
+ @ApiImplicitParam(
+ name = "targetMemoryPercentageThreshold",
+ dataType = "String",
+ value = "targetMemoryPercentageThreshold"),
})
@RequestMapping(path = "/update", method = RequestMethod.PUT)
public Message updateAcrossClusterRule(
@@ -207,21 +358,29 @@
String isValid = (String) json.get("isValid");
String startTime = (String) json.get("startTime");
String endTime = (String) json.get("endTime");
- String CPUThreshold = (String) json.get("CPUThreshold");
- String MemoryThreshold = (String) json.get("MemoryThreshold");
- String CPUPercentageThreshold = (String) json.get("CPUPercentageThreshold");
- String MemoryPercentageThreshold = (String) json.get("MemoryPercentageThreshold");
+ String crossQueue = (String) json.get("crossQueue");
+ String priorityCluster = (String) json.get("priorityCluster");
+ String targetCPUThreshold = (String) json.get("targetCPUThreshold");
+ String targetMemoryThreshold = (String) json.get("targetMemoryThreshold");
+ String targetCPUPercentageThreshold = (String) json.get("targetCPUPercentageThreshold");
+ String targetMemoryPercentageThreshold = (String) json.get("targetMemoryPercentageThreshold");
+ String originCPUPercentageThreshold = (String) json.get("originCPUPercentageThreshold");
+ String originMemoryPercentageThreshold = (String) json.get("originMemoryPercentageThreshold");
if (StringUtils.isBlank(clusterName)
|| StringUtils.isBlank(creator)
|| StringUtils.isBlank(username)
|| StringUtils.isBlank(isValid)
|| StringUtils.isBlank(startTime)
|| StringUtils.isBlank(endTime)
- || StringUtils.isBlank(CPUThreshold)
- || StringUtils.isBlank(MemoryThreshold)
- || StringUtils.isBlank(CPUPercentageThreshold)
- || StringUtils.isBlank(MemoryPercentageThreshold)) {
- return Message.error("Failed to add acrossClusterRule: Illegal Input Param");
+ || StringUtils.isBlank(crossQueue)
+ || StringUtils.isBlank(priorityCluster)
+ || StringUtils.isBlank(targetCPUThreshold)
+ || StringUtils.isBlank(targetMemoryThreshold)
+ || StringUtils.isBlank(targetCPUPercentageThreshold)
+ || StringUtils.isBlank(targetMemoryPercentageThreshold)
+ || StringUtils.isBlank(originCPUPercentageThreshold)
+ || StringUtils.isBlank(originMemoryPercentageThreshold)) {
+ return Message.error("Failed to add acrossClusterRule, Illegal Input Param");
}
try {
@@ -229,10 +388,14 @@
CommonUtils.ruleMap2String(
startTime,
endTime,
- CPUThreshold,
- MemoryThreshold,
- CPUPercentageThreshold,
- MemoryPercentageThreshold);
+ crossQueue,
+ priorityCluster,
+ targetCPUThreshold,
+ targetMemoryThreshold,
+ targetCPUPercentageThreshold,
+ targetMemoryPercentageThreshold,
+ originCPUPercentageThreshold,
+ originMemoryPercentageThreshold);
AcrossClusterRule acrossClusterRule = new AcrossClusterRule();
acrossClusterRule.setId(id);
acrossClusterRule.setClusterName(clusterName.toLowerCase());
@@ -243,8 +406,119 @@
acrossClusterRule.setIsValid(isValid);
acrossClusterRuleService.updateAcrossClusterRule(acrossClusterRule);
} catch (Exception e) {
- log.info("update acrossClusterRule failed:" + e.getMessage());
- return Message.error("update acrossClusterRule failed:history already exist");
+ log.info("update acrossClusterRule failed:", e);
+ return Message.error("update acrossClusterRule failed, rule already exits");
+ }
+ return Message.ok();
+ }
+
+ @ApiOperation(
+ value = "update acrossClusterRule by batch",
+ notes = "update acrossClusterRule by batch",
+ response = Message.class)
+ @ApiImplicitParams({
+ @ApiImplicitParam(name = "ids", dataType = "List<Integer>", value = "ids"),
+ @ApiImplicitParam(name = "clusterName", dataType = "String", value = "clusterName"),
+ @ApiImplicitParam(name = "creator", dataType = "String", value = "creator"),
+ @ApiImplicitParam(name = "isValid", dataType = "String", value = "isValid"),
+ @ApiImplicitParam(name = "startTime", dataType = "String", value = "startTime"),
+ @ApiImplicitParam(name = "endTime", dataType = "String", value = "endTime"),
+ @ApiImplicitParam(name = "crossQueue", dataType = "String", value = "crossQueue"),
+ @ApiImplicitParam(name = "priorityCluster", dataType = "String", value = "priorityCluster"),
+ @ApiImplicitParam(
+ name = "targetCPUThreshold",
+ dataType = "String",
+ value = "targetCPUThreshold"),
+ @ApiImplicitParam(
+ name = "targetMemoryThreshold",
+ dataType = "String",
+ value = "targetMemoryThreshold"),
+ @ApiImplicitParam(
+ name = "originCPUPercentageThreshold",
+ dataType = "String",
+ value = "originCPUPercentageThreshold"),
+ @ApiImplicitParam(
+ name = "originMemoryPercentageThreshold",
+ dataType = "String",
+ value = "originMemoryPercentageThreshold"),
+ @ApiImplicitParam(
+ name = "targetCPUPercentageThreshold",
+ dataType = "String",
+ value = "targetCPUPercentageThreshold"),
+ @ApiImplicitParam(
+ name = "targetMemoryPercentageThreshold",
+ dataType = "String",
+ value = "targetMemoryPercentageThreshold"),
+ })
+ @RequestMapping(path = "/updateByBatch", method = RequestMethod.PUT)
+ public Message updateAcrossClusterRuleByBatch(
+ HttpServletRequest req, @RequestBody Map<String, Object> json) {
+ String operationUser =
+ ModuleUserUtils.getOperationUser(req, "execute update acrossClusterRule");
+ if (!Configuration.isAdmin(operationUser)) {
+ return Message.error(
+ "Failed to update acrossClusterRule,msg: only administrators can configure");
+ }
+
+ @SuppressWarnings("unchecked")
+ ArrayList<Integer> ids = (ArrayList<Integer>) json.get("ids");
+ List<Long> ruleIds = new ArrayList<>();
+ for (Integer id : ids) {
+ ruleIds.add(id.longValue());
+ }
+
+ String clusterName = (String) json.get("clusterName");
+ String creator = (String) json.get("creator");
+ String isValid = (String) json.get("isValid");
+ String startTime = (String) json.get("startTime");
+ String endTime = (String) json.get("endTime");
+ String crossQueue = (String) json.get("crossQueue");
+ String priorityCluster = (String) json.get("priorityCluster");
+ String targetCPUThreshold = (String) json.get("targetCPUThreshold");
+ String targetMemoryThreshold = (String) json.get("targetMemoryThreshold");
+ String targetCPUPercentageThreshold = (String) json.get("targetCPUPercentageThreshold");
+ String targetMemoryPercentageThreshold = (String) json.get("targetMemoryPercentageThreshold");
+ String originCPUPercentageThreshold = (String) json.get("originCPUPercentageThreshold");
+ String originMemoryPercentageThreshold = (String) json.get("originMemoryPercentageThreshold");
+ if (StringUtils.isBlank(clusterName)
+ || StringUtils.isBlank(creator)
+ || StringUtils.isBlank(isValid)
+ || StringUtils.isBlank(startTime)
+ || StringUtils.isBlank(endTime)
+ || StringUtils.isBlank(crossQueue)
+ || StringUtils.isBlank(priorityCluster)
+ || StringUtils.isBlank(targetCPUThreshold)
+ || StringUtils.isBlank(targetMemoryThreshold)
+ || StringUtils.isBlank(targetCPUPercentageThreshold)
+ || StringUtils.isBlank(targetMemoryPercentageThreshold)
+ || StringUtils.isBlank(originCPUPercentageThreshold)
+ || StringUtils.isBlank(originMemoryPercentageThreshold)) {
+ return Message.error("Failed to add acrossClusterRule, Illegal Input Param");
+ }
+
+ try {
+ String rules =
+ CommonUtils.ruleMap2String(
+ startTime,
+ endTime,
+ crossQueue,
+ priorityCluster,
+ targetCPUThreshold,
+ targetMemoryThreshold,
+ targetCPUPercentageThreshold,
+ targetMemoryPercentageThreshold,
+ originCPUPercentageThreshold,
+ originMemoryPercentageThreshold);
+ AcrossClusterRule acrossClusterRule = new AcrossClusterRule();
+ acrossClusterRule.setClusterName(clusterName.toLowerCase());
+ acrossClusterRule.setCreator(creator);
+ acrossClusterRule.setUpdateBy(operationUser);
+ acrossClusterRule.setRules(rules);
+ acrossClusterRule.setIsValid(isValid);
+ acrossClusterRuleService.updateAcrossClusterRuleByBatch(ruleIds, acrossClusterRule);
+ } catch (Exception e) {
+ log.info("update acrossClusterRule failed:", e);
+ return Message.error("update acrossClusterRule failed, rule already exits");
}
return Message.ok();
}
@@ -254,23 +528,38 @@
notes = "add acrossClusterRule ",
response = Message.class)
@ApiImplicitParams({
- @ApiImplicitParam(name = "req", dataType = "HttpServletRequest", value = "req"),
@ApiImplicitParam(name = "clusterName", dataType = "String", value = "clusterName"),
@ApiImplicitParam(name = "creator", dataType = "String", value = "creator"),
@ApiImplicitParam(name = "username", dataType = "String", value = "username"),
@ApiImplicitParam(name = "isValid", dataType = "String", value = "isValid"),
@ApiImplicitParam(name = "startTime", dataType = "String", value = "startTime"),
@ApiImplicitParam(name = "endTime", dataType = "String", value = "endTime"),
- @ApiImplicitParam(name = "CPUThreshold", dataType = "String", value = "CPUThreshold"),
- @ApiImplicitParam(name = "MemoryThreshold", dataType = "String", value = "MemoryThreshold"),
+ @ApiImplicitParam(name = "crossQueue", dataType = "String", value = "crossQueue"),
+ @ApiImplicitParam(name = "priorityCluster", dataType = "String", value = "priorityCluster"),
@ApiImplicitParam(
- name = "CPUPercentageThreshold",
+ name = "targetCPUThreshold",
dataType = "String",
- value = "CPUPercentageThreshold"),
+ value = "targetCPUThreshold"),
@ApiImplicitParam(
- name = "MemoryPercentageThreshold",
+ name = "targetMemoryThreshold",
dataType = "String",
- value = "MemoryPercentageThreshold"),
+ value = "targetMemoryThreshold"),
+ @ApiImplicitParam(
+ name = "originCPUPercentageThreshold",
+ dataType = "String",
+ value = "originCPUPercentageThreshold"),
+ @ApiImplicitParam(
+ name = "originMemoryPercentageThreshold",
+ dataType = "String",
+ value = "originMemoryPercentageThreshold"),
+ @ApiImplicitParam(
+ name = "targetCPUPercentageThreshold",
+ dataType = "String",
+ value = "targetCPUPercentageThreshold"),
+ @ApiImplicitParam(
+ name = "targetMemoryPercentageThreshold",
+ dataType = "String",
+ value = "targetMemoryPercentageThreshold"),
})
@RequestMapping(path = "/add", method = RequestMethod.POST)
public Message insertAcrossClusterRule(
@@ -287,21 +576,29 @@
String isValid = (String) json.get("isValid");
String startTime = (String) json.get("startTime");
String endTime = (String) json.get("endTime");
- String CPUThreshold = (String) json.get("CPUThreshold");
- String MemoryThreshold = (String) json.get("MemoryThreshold");
- String CPUPercentageThreshold = (String) json.get("CPUPercentageThreshold");
- String MemoryPercentageThreshold = (String) json.get("MemoryPercentageThreshold");
+ String crossQueue = (String) json.get("crossQueue");
+ String priorityCluster = (String) json.get("priorityCluster");
+ String targetCPUThreshold = (String) json.get("targetCPUThreshold");
+ String targetMemoryThreshold = (String) json.get("targetMemoryThreshold");
+ String targetCPUPercentageThreshold = (String) json.get("targetCPUPercentageThreshold");
+ String targetMemoryPercentageThreshold = (String) json.get("targetMemoryPercentageThreshold");
+ String originCPUPercentageThreshold = (String) json.get("originCPUPercentageThreshold");
+ String originMemoryPercentageThreshold = (String) json.get("originMemoryPercentageThreshold");
if (StringUtils.isBlank(clusterName)
|| StringUtils.isBlank(creator)
|| StringUtils.isBlank(username)
|| StringUtils.isBlank(isValid)
|| StringUtils.isBlank(startTime)
|| StringUtils.isBlank(endTime)
- || StringUtils.isBlank(CPUThreshold)
- || StringUtils.isBlank(MemoryThreshold)
- || StringUtils.isBlank(CPUPercentageThreshold)
- || StringUtils.isBlank(MemoryPercentageThreshold)) {
- return Message.error("Failed to add acrossClusterRule: Illegal Input Param");
+ || StringUtils.isBlank(crossQueue)
+ || StringUtils.isBlank(priorityCluster)
+ || StringUtils.isBlank(targetCPUThreshold)
+ || StringUtils.isBlank(targetMemoryThreshold)
+ || StringUtils.isBlank(targetCPUPercentageThreshold)
+ || StringUtils.isBlank(targetMemoryPercentageThreshold)
+ || StringUtils.isBlank(originCPUPercentageThreshold)
+ || StringUtils.isBlank(originMemoryPercentageThreshold)) {
+ return Message.error("Failed to add acrossClusterRule, Illegal Input Param");
}
try {
@@ -309,10 +606,14 @@
CommonUtils.ruleMap2String(
startTime,
endTime,
- CPUThreshold,
- MemoryThreshold,
- CPUPercentageThreshold,
- MemoryPercentageThreshold);
+ crossQueue,
+ priorityCluster,
+ targetCPUThreshold,
+ targetMemoryThreshold,
+ targetCPUPercentageThreshold,
+ targetMemoryPercentageThreshold,
+ originCPUPercentageThreshold,
+ originMemoryPercentageThreshold);
AcrossClusterRule acrossClusterRule = new AcrossClusterRule();
acrossClusterRule.setClusterName(clusterName.toLowerCase());
acrossClusterRule.setCreator(creator);
@@ -323,8 +624,8 @@
acrossClusterRule.setIsValid(isValid);
acrossClusterRuleService.insertAcrossClusterRule(acrossClusterRule);
} catch (Exception e) {
- log.info("add acrossClusterRule failed:" + e.getMessage());
- return Message.error("add acrossClusterRule failed:history already exist");
+ log.info("add acrossClusterRule failed:", e);
+ return Message.error("add acrossClusterRule failed, rule already exits");
}
return Message.ok();
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/ConfigurationRestfulApi.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/ConfigurationRestfulApi.java
index 11dfee8..5586455 100644
--- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/ConfigurationRestfulApi.java
+++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/ConfigurationRestfulApi.java
@@ -142,7 +142,8 @@
ModuleUserUtils.getOperationUser(
req,
MessageFormat.format(
- "ConfigurationException,engineType:{0},version:{1}", engineType, version));
+ "getFullTreesByAppName,engineType:{0},version:{1},creator:{2}",
+ engineType, version, creator));
List labelList =
LabelEntityParser.generateUserCreatorEngineTypeLabelList(
username, creator, engineType, version);
@@ -173,7 +174,7 @@
ModuleUserUtils.getOperationUser(req, "getItemList with engineType:" + engineType);
// Adding * represents returning all configuration information
if ("*".equals(engineType)) {
- engineType = null;
+ engineType = "";
}
List<ConfigKey> result = configKeyService.getConfigKeyList(engineType);
List<Map<String, Object>> filterResult = new ArrayList<>();
@@ -497,7 +498,8 @@
@ApiImplicitParam(name = "version", required = true, dataType = "String", value = "version"),
@ApiImplicitParam(name = "creator", required = true, dataType = "String", value = "creator"),
@ApiImplicitParam(name = "configKey", required = true, dataType = "String"),
- @ApiImplicitParam(name = "configValue", required = true, dataType = "String")
+ @ApiImplicitParam(name = "configValue", required = true, dataType = "String"),
+ @ApiImplicitParam(name = "configKeyId", required = false, dataType = "String")
})
@ApiOperationSupport(ignoreParameters = {"json"})
@RequestMapping(path = "/keyvalue", method = RequestMethod.POST)
@@ -505,12 +507,13 @@
throws ConfigurationException {
Message message = Message.ok();
String username = ModuleUserUtils.getOperationUser(req, "saveKey");
- String engineType = (String) json.getOrDefault("engineType", "*");
- String user = (String) json.getOrDefault("user", "");
- String version = (String) json.getOrDefault("version", "*");
- String creator = (String) json.getOrDefault("creator", "*");
- String configKey = (String) json.get("configKey");
- String value = (String) json.get("configValue");
+ String engineType = ((String) json.getOrDefault("engineType", "*")).trim();
+ String user = ((String) json.getOrDefault("user", "")).trim();
+ String version = ((String) json.getOrDefault("version", "*")).trim();
+ String creator = ((String) json.getOrDefault("creator", "*")).trim();
+ String configKey = ((String) json.get("configKey")).trim();
+ String value = ((String) json.get("configValue")).trim();
+ String configKeyId = ((String) json.getOrDefault("configKeyId", "")).trim();
boolean force = Boolean.parseBoolean(json.getOrDefault("force", "false").toString());
if (!org.apache.linkis.common.conf.Configuration.isAdmin(username) && !username.equals(user)) {
return Message.error("Only admin can modify other user configuration data");
@@ -531,7 +534,9 @@
ConfigKeyValue configKeyValue = new ConfigKeyValue();
configKeyValue.setKey(configKey);
configKeyValue.setConfigValue(value);
-
+ if (StringUtils.isNotBlank(configKeyId)) {
+ configKeyValue.setId(Long.valueOf(configKeyId));
+ }
try {
configurationService.paramCheck(configKeyValue);
} catch (Exception e) {
@@ -562,10 +567,10 @@
public Message deleteKeyValue(HttpServletRequest req, @RequestBody Map<String, Object> json)
throws ConfigurationException {
String username = ModuleUserUtils.getOperationUser(req, "deleteKeyValue");
- String engineType = (String) json.getOrDefault("engineType", "*");
- String version = (String) json.getOrDefault("version", "*");
- String creator = (String) json.getOrDefault("creator", "*");
- String configKey = (String) json.get("configKey");
+ String engineType = ((String) json.getOrDefault("engineType", "*")).trim();
+ String version = ((String) json.getOrDefault("version", "*")).trim();
+ String creator = ((String) json.getOrDefault("creator", "*")).trim();
+ String configKey = ((String) json.get("configKey")).trim();
if (engineType.equals("*") && !version.equals("*")) {
return Message.error("When engineType is any engine, the version must also be any version");
}
@@ -690,10 +695,6 @@
throws ConfigurationException, InstantiationException, IllegalAccessException {
checkAdmin(ModuleUserUtils.getOperationUser(req, "saveBaseKeyValue"));
String key = configKey.getKey();
- String name = configKey.getName();
- String treeName = configKey.getTreeName();
- String description = configKey.getDescription();
- Integer boundaryType = configKey.getBoundaryType();
String defaultValue = configKey.getDefaultValue();
String validateType = configKey.getValidateType();
String validateRange = configKey.getValidateRange();
@@ -701,13 +702,14 @@
if (StringUtils.isBlank(key)) {
return Message.error("key cannot be empty");
}
- if (StringUtils.isBlank(name)) {
+ configKey.setKey(configKey.getKey().trim());
+ if (StringUtils.isBlank(configKey.getName())) {
return Message.error("name cannot be empty");
}
- if (StringUtils.isBlank(description)) {
+ if (StringUtils.isBlank(configKey.getDescription())) {
return Message.error("description cannot be empty");
}
- if (StringUtils.isBlank(treeName)) {
+ if (StringUtils.isBlank(configKey.getTreeName())) {
return Message.error("treeName cannot be empty");
}
if (StringUtils.isBlank(validateType)) {
@@ -716,7 +718,7 @@
if (!validateType.equals("None") && StringUtils.isBlank(validateRange)) {
return Message.error("validateRange cannot be empty");
}
- if (null == boundaryType) {
+ if (null == configKey.getBoundaryType()) {
return Message.error("boundaryType cannot be empty");
}
if (StringUtils.isNotEmpty(defaultValue)
@@ -730,6 +732,7 @@
key, validateType, validateRange, defaultValue);
throw new ConfigurationException(msg);
}
+ configKey.setDefaultValue(configKey.getDefaultValue().trim());
if (null == configKey.getId()) {
List<ConfigKey> configBykey =
configKeyService.getConfigBykey(engineType, key, req.getHeader("Content-Language"));
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/TemplateManagerRestfulApi.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/TemplateRestfulApi.java
similarity index 95%
rename from linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/TemplateManagerRestfulApi.java
rename to linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/TemplateRestfulApi.java
index e0d52d4..e6391c5 100644
--- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/TemplateManagerRestfulApi.java
+++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/TemplateRestfulApi.java
@@ -52,9 +52,9 @@
@Api(tags = "configuration template")
@RestController
@RequestMapping(path = "/configuration/template")
-public class TemplateManagerRestfulApi {
+public class TemplateRestfulApi {
- private static final Logger logger = LoggerFactory.getLogger(TemplateManagerRestfulApi.class);
+ private static final Logger logger = LoggerFactory.getLogger(TemplateRestfulApi.class);
@Autowired private TemplateConfigKeyService templateConfigKeyService;
@@ -86,7 +86,8 @@
// check special admin token
if (StringUtils.isNotBlank(token)) {
if (!Configuration.isAdminToken(token)) {
- return Message.error("Token has no permission to updateKeyMapping.");
+ logger.warn("Token:{} has no permission to updateKeyMapping.", token);
+ return Message.error("Token:" + token + " has no permission to updateKeyMapping.");
}
} else if (!Configuration.isAdmin(username)) {
logger.warn("User:{} has no permission to updateKeyMapping.", username);
@@ -159,7 +160,8 @@
// check special admin token
if (StringUtils.isNotBlank(token)) {
if (!Configuration.isAdminToken(token)) {
- return Message.error("Token has no permission to queryKeyInfoList.");
+ logger.warn("Token:{} has no permission to queryKeyInfoList.", token);
+ return Message.error("Token:" + token + " has no permission to queryKeyInfoList.");
}
} else if (!Configuration.isAdmin(username)) {
logger.warn("User:{} has no permission to queryKeyInfoList.", username);
@@ -210,7 +212,8 @@
// check special admin token
if (StringUtils.isNotBlank(token)) {
if (!Configuration.isAdminToken(token)) {
- return Message.error("Token has no permission to apply.");
+ logger.warn("Token:{} has no permission to apply.", token);
+ return Message.error("Token:" + token + " has no permission to apply.");
}
} else if (!Configuration.isAdmin(username)) {
logger.warn("User:{} has no permission to apply.", username);
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/TenantConfigrationRestfulApi.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/TenantConfigrationRestfulApi.java
index 3bd5b80..4ab7455 100644
--- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/TenantConfigrationRestfulApi.java
+++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/restful/api/TenantConfigrationRestfulApi.java
@@ -18,6 +18,7 @@
package org.apache.linkis.configuration.restful.api;
import org.apache.linkis.common.conf.Configuration;
+import org.apache.linkis.configuration.entity.DepartmentTenantVo;
import org.apache.linkis.configuration.entity.TenantVo;
import org.apache.linkis.configuration.exception.ConfigurationException;
import org.apache.linkis.configuration.service.TenantConfigService;
@@ -75,15 +76,15 @@
if (!Configuration.isAdmin(userName)) {
return Message.error("Failed to create-tenant,msg: only administrators can configure");
}
+ parameterVerification(tenantVo);
if (tenantConfigService.isExist(tenantVo.getUser(), tenantVo.getCreator())) {
throw new ConfigurationException("User-creator is existed");
}
- parameterVerification(tenantVo);
tenantConfigService.createTenant(tenantVo);
} catch (DuplicateKeyException e) {
return Message.error("Failed to create-tenant,msg:create user-creator is existed");
} catch (ConfigurationException e) {
- return Message.error("Failed to update-tenant,msg:" + e.getMessage());
+ return Message.error("Failed to create-tenant,msg:" + e.getMessage());
}
return Message.ok();
}
@@ -112,10 +113,6 @@
if (!Configuration.isAdmin(userName)) {
return Message.error("Failed to update-tenant,msg: only administrators can configure");
}
- // if (!tenantConfigService.checkUserCteator(tenantVo.getUser(), tenantVo.getCreator(),
- // null)) {
- // throw new ConfigurationException("User-creator is not existed");
- // }
parameterVerification(tenantVo);
tenantConfigService.updateTenant(tenantVo);
} catch (ConfigurationException e) {
@@ -267,4 +264,127 @@
throw new ConfigurationException("User && Creator cannot be both *");
}
}
+
+ @ApiImplicitParams({
+ @ApiImplicitParam(
+ paramType = "body",
+ dataType = "DepartmentTenantVo",
+ name = "departmentTenantVo",
+ value = "departmentTenantVo")
+ })
+ @ApiOperation(
+ value = "save-department-tenant",
+ notes = "save department tenant",
+ httpMethod = "POST",
+ response = Message.class)
+ @RequestMapping(path = "/save-department-tenant", method = RequestMethod.POST)
+ public Message saveDepartmentTenant(
+ HttpServletRequest req, @RequestBody DepartmentTenantVo departmentTenantVo)
+ throws ConfigurationException {
+ String userName = ModuleUserUtils.getOperationUser(req, "execute saveDepartmentTenant");
+ if (!Configuration.isAdmin(userName)) {
+ return Message.error("Failed to save-department-tenant,msg: only administrator users to use");
+ }
+ if (StringUtils.isBlank(departmentTenantVo.getTenantValue())) {
+ return Message.error("tenant can not be empty");
+ }
+ if (StringUtils.isBlank(departmentTenantVo.getCreator())) {
+ return Message.error("creator can not be empty");
+ }
+ if (StringUtils.isBlank(departmentTenantVo.getDepartmentId())) {
+ return Message.error("departmentId can not be empty");
+ }
+ // Query whether the data exists before executing the update
+ if (StringUtils.isBlank(departmentTenantVo.getId())) {
+ DepartmentTenantVo departTenant =
+ tenantConfigService.queryDepartTenant(
+ departmentTenantVo.getCreator(), departmentTenantVo.getDepartmentId());
+ if (null != departTenant) {
+ return Message.error("department creator is exist");
+ }
+ }
+ tenantConfigService.saveDepartmentTenant(departmentTenantVo);
+ return Message.ok();
+ }
+
+ @ApiImplicitParams({
+ @ApiImplicitParam(
+ paramType = "query",
+ dataType = "string",
+ name = "department",
+ value = "department"),
+ @ApiImplicitParam(
+ paramType = "query",
+ dataType = "string",
+ name = "creator",
+ value = "creator"),
+ @ApiImplicitParam(
+ paramType = "query",
+ dataType = "string",
+ name = "tenantValue",
+ value = "tenantValue"),
+ @ApiImplicitParam(paramType = "query", dataType = "int", name = "pageNow", value = "pageNow"),
+ @ApiImplicitParam(paramType = "query", dataType = "int", name = "pageSize", value = "pageSize")
+ })
+ @ApiOperation(
+ value = "query-department-tenant",
+ notes = "query department tenant list",
+ httpMethod = "GET",
+ response = Message.class)
+ @RequestMapping(path = "/query-department-tenant", method = RequestMethod.GET)
+ public Message queryDepartmentTenant(
+ HttpServletRequest req,
+ @RequestParam(value = "departmentId", required = false) String departmentId,
+ @RequestParam(value = "creator", required = false) String creator,
+ @RequestParam(value = "tenantValue", required = false) String tenantValue,
+ @RequestParam(value = "pageNow", required = false, defaultValue = "1") Integer pageNow,
+ @RequestParam(value = "pageSize", required = false, defaultValue = "20") Integer pageSize) {
+ String userName = ModuleUserUtils.getOperationUser(req, "execute queryDepartmentTenantList");
+ if (!Configuration.isAdmin(userName)) {
+ return Message.error("Failed to query-tenant-list,msg: only administrator users to use");
+ }
+ if (StringUtils.isBlank(departmentId)) departmentId = null;
+ if (StringUtils.isBlank(creator)) creator = null;
+ if (StringUtils.isBlank(tenantValue)) tenantValue = null;
+ Map<String, Object> resultMap =
+ tenantConfigService.queryDepartmentTenant(
+ departmentId, creator, tenantValue, pageNow, pageSize);
+ return Message.ok()
+ .data("tenantList", resultMap.get("tenantList"))
+ .data(JobRequestConstants.TOTAL_PAGE(), resultMap.get(JobRequestConstants.TOTAL_PAGE()));
+ }
+
+ @ApiImplicitParams({
+ @ApiImplicitParam(paramType = "query", dataType = "int", name = "id", value = "id")
+ })
+ @ApiOperation(
+ value = "delete-department-tenant",
+ notes = "delete department tenant",
+ httpMethod = "GET",
+ response = Message.class)
+ @RequestMapping(path = "/delete-department-tenant", method = RequestMethod.GET)
+ public Message deleteDepartmentTenant(
+ HttpServletRequest req, @RequestParam(value = "id") Integer id) {
+ String userName =
+ ModuleUserUtils.getOperationUser(req, "execute deleteDepartmentTenant,id: " + id);
+ if (!Configuration.isAdmin(userName)) {
+ return Message.error(
+ "Failed to delete-department-tenant,msg: only administrator users to use");
+ }
+ if (StringUtils.isBlank(id.toString())) {
+ return Message.error("id can not be empty");
+ }
+ tenantConfigService.deleteDepartmentTenant(id);
+ return Message.ok();
+ }
+
+ @ApiOperation(
+ value = "query department",
+ notes = "query department",
+ httpMethod = "GET",
+ response = Message.class)
+ @RequestMapping(path = "/query-department", method = RequestMethod.GET)
+ public Message queryDepartmentList() {
+ return Message.ok().data("departmentList", tenantConfigService.queryDepartmentList());
+ }
}
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/AcrossClusterRuleService.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/AcrossClusterRuleService.java
index 2fff11c..c38b1a9 100644
--- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/AcrossClusterRuleService.java
+++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/AcrossClusterRuleService.java
@@ -18,20 +18,38 @@
package org.apache.linkis.configuration.service;
import org.apache.linkis.configuration.entity.AcrossClusterRule;
+import org.apache.linkis.governance.common.protocol.conf.AcrossClusterRequest;
+import org.apache.linkis.governance.common.protocol.conf.AcrossClusterResponse;
+import org.apache.linkis.rpc.Sender;
+import java.util.List;
import java.util.Map;
public interface AcrossClusterRuleService {
- void deleteAcrossClusterRule(String creator, String username) throws Exception;
+ void deleteAcrossClusterRule(Long id) throws Exception;
+
+ void deleteAcrossClusterRuleByBatch(List<Long> ids) throws Exception;
+
+ void deleteAcrossClusterRuleByUsername(String username) throws Exception;
+
+ void deleteAcrossClusterRuleByCrossQueue(String crossQueue) throws Exception;
void updateAcrossClusterRule(AcrossClusterRule acrossClusterRule) throws Exception;
+ void updateAcrossClusterRuleByBatch(List<Long> ids, AcrossClusterRule acrossClusterRule)
+ throws Exception;
+
void insertAcrossClusterRule(AcrossClusterRule acrossClusterRule) throws Exception;
Map<String, Object> queryAcrossClusterRuleList(
String creator, String username, String clusterName, Integer pageNow, Integer pageSize)
throws Exception;
- void validAcrossClusterRule(Long id, String isValid) throws Exception;
+ void validAcrossClusterRule(Long id, String isValid, String username) throws Exception;
+
+ void validAcrossClusterRuleByBatch(List<Long> ids, String isValid) throws Exception;
+
+ AcrossClusterResponse getAcrossClusterRuleByUsername(
+ AcrossClusterRequest acrossClusterRequest, Sender sender) throws Exception;
}
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/DepartmentService.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/DepartmentService.java
new file mode 100644
index 0000000..eb97f21
--- /dev/null
+++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/DepartmentService.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.configuration.service;
+
+import org.apache.linkis.governance.common.protocol.conf.DepartmentRequest;
+import org.apache.linkis.governance.common.protocol.conf.DepartmentResponse;
+import org.apache.linkis.rpc.Sender;
+
+public interface DepartmentService {
+
+ DepartmentResponse getDepartmentByUser(DepartmentRequest departmentRequest, Sender sender);
+}
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/TenantConfigService.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/TenantConfigService.java
index 87b14a9..0baa30c 100644
--- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/TenantConfigService.java
+++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/TenantConfigService.java
@@ -17,9 +17,12 @@
package org.apache.linkis.configuration.service;
+import org.apache.linkis.configuration.entity.DepartmentTenantVo;
+import org.apache.linkis.configuration.entity.DepartmentVo;
import org.apache.linkis.configuration.entity.TenantVo;
import org.apache.linkis.configuration.exception.ConfigurationException;
+import java.util.List;
import java.util.Map;
public interface TenantConfigService {
@@ -36,4 +39,15 @@
Boolean isExist(String user, String creator) throws ConfigurationException;
TenantVo queryTenant(String user, String creator);
+
+ void saveDepartmentTenant(DepartmentTenantVo departmentTenantVo) throws ConfigurationException;
+
+ Map<String, Object> queryDepartmentTenant(
+ String departmentId, String creator, String tenantValue, Integer pageNow, Integer pageSize);
+
+ void deleteDepartmentTenant(Integer id);
+
+ DepartmentTenantVo queryDepartTenant(String creator, String departmentId);
+
+ List<DepartmentVo> queryDepartmentList();
}
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/TenantService.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/TenantService.java
index df88923..f7a06ae 100644
--- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/TenantService.java
+++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/TenantService.java
@@ -17,6 +17,8 @@
package org.apache.linkis.configuration.service;
+import org.apache.linkis.governance.common.protocol.conf.DepartTenantRequest;
+import org.apache.linkis.governance.common.protocol.conf.DepartTenantResponse;
import org.apache.linkis.governance.common.protocol.conf.TenantRequest;
import org.apache.linkis.governance.common.protocol.conf.TenantResponse;
import org.apache.linkis.rpc.Sender;
@@ -24,4 +26,6 @@
public interface TenantService {
TenantResponse getTenantData(TenantRequest request, Sender sender);
+
+ DepartTenantResponse getDepartTenantData(DepartTenantRequest departTenantRequest, Sender sender);
}
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/impl/AcrossClusterRuleServiceImpl.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/impl/AcrossClusterRuleServiceImpl.java
index a906ca2..487b0b5 100644
--- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/impl/AcrossClusterRuleServiceImpl.java
+++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/impl/AcrossClusterRuleServiceImpl.java
@@ -21,6 +21,11 @@
import org.apache.linkis.configuration.entity.AcrossClusterRule;
import org.apache.linkis.configuration.service.AcrossClusterRuleService;
import org.apache.linkis.governance.common.constant.job.JobRequestConstants;
+import org.apache.linkis.governance.common.protocol.conf.AcrossClusterRequest;
+import org.apache.linkis.governance.common.protocol.conf.AcrossClusterResponse;
+import org.apache.linkis.rpc.Sender;
+import org.apache.linkis.rpc.message.annotation.Receiver;
+import org.apache.linkis.server.BDPJettyServerHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -29,9 +34,13 @@
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
+import com.google.gson.Gson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.linkis.configuration.conf.AcrossClusterRuleKeys.KEY_CROSS_QUEUE;
+import static org.apache.linkis.configuration.conf.AcrossClusterRuleKeys.KEY_QUEUE_RULE;
+
@Service
public class AcrossClusterRuleServiceImpl implements AcrossClusterRuleService {
@@ -39,13 +48,33 @@
@Autowired private AcrossClusterRuleMapper ruleMapper;
@Override
- public void deleteAcrossClusterRule(String creator, String username) throws Exception {
- ruleMapper.deleteAcrossClusterRule(creator, username);
+ public void deleteAcrossClusterRule(Long id) throws Exception {
+ AcrossClusterRule beforeRule = ruleMapper.getAcrossClusterRule(id, null);
+ if (beforeRule == null) {
+ throw new Exception("acrossClusterRule not exit");
+ }
+
+ ruleMapper.deleteAcrossClusterRule(id);
+ }
+
+ @Override
+ public void deleteAcrossClusterRuleByBatch(List<Long> ids) throws Exception {
+ ruleMapper.deleteAcrossClusterRuleByBatch(ids);
+ }
+
+ @Override
+ public void deleteAcrossClusterRuleByUsername(String username) throws Exception {
+ ruleMapper.deleteAcrossClusterRuleByUsername(username);
+ }
+
+ @Override
+ public void deleteAcrossClusterRuleByCrossQueue(String crossQueue) throws Exception {
+ ruleMapper.deleteAcrossClusterRuleByCrossQueue(crossQueue);
}
@Override
public void updateAcrossClusterRule(AcrossClusterRule newRule) throws Exception {
- AcrossClusterRule beforeRule = ruleMapper.getAcrossClusterRule(newRule.getId());
+ AcrossClusterRule beforeRule = ruleMapper.getAcrossClusterRule(newRule.getId(), null);
if (beforeRule == null) {
throw new Exception("acrossClusterRule not exit");
}
@@ -59,6 +88,15 @@
}
@Override
+ public void updateAcrossClusterRuleByBatch(List<Long> ids, AcrossClusterRule newRule)
+ throws Exception {
+ Date time = new Date();
+ newRule.setUpdateTime(time);
+
+ ruleMapper.updateAcrossClusterRuleByBatch(ids, newRule);
+ }
+
+ @Override
public void insertAcrossClusterRule(AcrossClusterRule acrossClusterRule) throws Exception {
Date time = new Date();
acrossClusterRule.setCreateTime(time);
@@ -91,13 +129,45 @@
}
@Override
- public void validAcrossClusterRule(Long id, String isValid) throws Exception {
- AcrossClusterRule beforeRule = ruleMapper.getAcrossClusterRule(id);
-
+ public void validAcrossClusterRule(Long id, String isValid, String username) throws Exception {
+ AcrossClusterRule beforeRule = ruleMapper.getAcrossClusterRule(id, username);
if (beforeRule == null) {
throw new Exception("acrossClusterRule not exit");
}
- ruleMapper.validAcrossClusterRule(isValid, id);
+ ruleMapper.validAcrossClusterRule(isValid, id, username);
+ }
+
+ @Override
+ public void validAcrossClusterRuleByBatch(List<Long> ids, String isValid) throws Exception {
+ ruleMapper.validAcrossClusterRuleByBatch(ids, isValid);
+ }
+
+ @Receiver
+ @Override
+ public AcrossClusterResponse getAcrossClusterRuleByUsername(
+ AcrossClusterRequest acrossClusterRequest, Sender sender) throws Exception {
+ String username = acrossClusterRequest.username();
+ AcrossClusterRule acrossClusterRule = ruleMapper.queryAcrossClusterRuleByUserName(username);
+ if (acrossClusterRule == null) {
+ return null;
+ }
+ String clusterName = acrossClusterRule.getClusterName();
+ Map<String, Map<String, String>> rulesMap = new HashMap<>();
+ try {
+ Gson gson = BDPJettyServerHelper.gson();
+ rulesMap = gson.fromJson(acrossClusterRule.getRules(), rulesMap.getClass());
+ Map<String, String> queueRule = rulesMap.get(KEY_QUEUE_RULE);
+ String crossQueueName = queueRule.get(KEY_CROSS_QUEUE);
+ logger.info(
+ "{} configure across cluster name is {}, queue name is {}",
+ username,
+ acrossClusterRule.getClusterName(),
+ crossQueueName);
+ return new AcrossClusterResponse(clusterName, crossQueueName);
+ } catch (Exception e) {
+ logger.warn("Failed to parse rulesMap from rules");
+ }
+ return null;
}
}
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/impl/DepartmentServiceImpl.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/impl/DepartmentServiceImpl.java
new file mode 100644
index 0000000..45e607b
--- /dev/null
+++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/impl/DepartmentServiceImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.configuration.service.impl;
+
+import org.apache.linkis.configuration.dao.DepartmentMapper;
+import org.apache.linkis.configuration.entity.DepartmentVo;
+import org.apache.linkis.configuration.service.DepartmentService;
+import org.apache.linkis.governance.common.protocol.conf.DepartmentRequest;
+import org.apache.linkis.governance.common.protocol.conf.DepartmentResponse;
+import org.apache.linkis.rpc.Sender;
+import org.apache.linkis.rpc.message.annotation.Receiver;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Service
+public class DepartmentServiceImpl implements DepartmentService {
+
+ private static final Logger logger = LoggerFactory.getLogger(DepartmentServiceImpl.class);
+
+ @Autowired private DepartmentMapper departmentMapper;
+
+ @Receiver
+ @Override
+ public DepartmentResponse getDepartmentByUser(
+ DepartmentRequest departmentRequest, Sender sender) {
+ DepartmentVo departmentVo = departmentMapper.getDepartmentByUser(departmentRequest.user());
+ if (null == departmentVo) {
+ logger.warn(
+ "Department data loading failed user {},department cache will set '' ",
+ departmentRequest.user());
+ return new DepartmentResponse(departmentRequest.user(), "", "");
+ } else {
+ return new DepartmentResponse(
+ departmentRequest.user(), departmentVo.getOrgId(), departmentVo.getOrgName());
+ }
+ }
+}
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/impl/TenantConfigServiceImpl.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/impl/TenantConfigServiceImpl.java
index a5eb4c1..07668b0 100644
--- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/impl/TenantConfigServiceImpl.java
+++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/impl/TenantConfigServiceImpl.java
@@ -17,22 +17,28 @@
package org.apache.linkis.configuration.service.impl;
+import org.apache.linkis.configuration.dao.DepartmentMapper;
+import org.apache.linkis.configuration.dao.DepartmentTenantMapper;
import org.apache.linkis.configuration.dao.UserTenantMapper;
+import org.apache.linkis.configuration.entity.DepartmentTenantVo;
+import org.apache.linkis.configuration.entity.DepartmentVo;
import org.apache.linkis.configuration.entity.TenantVo;
import org.apache.linkis.configuration.exception.ConfigurationException;
import org.apache.linkis.configuration.service.TenantConfigService;
-import org.apache.linkis.configuration.util.HttpsUtil;
+import org.apache.linkis.configuration.util.ClientUtil;
import org.apache.linkis.governance.common.constant.job.JobRequestConstants;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
@@ -46,6 +52,10 @@
@Autowired private UserTenantMapper userTenantMapper;
+ @Autowired private DepartmentTenantMapper departmentTenantMapper;
+
+ @Autowired private DepartmentMapper departmentMapper;
+
/**
* * Querying the tenant configuration table
*
@@ -104,9 +114,9 @@
throw new ConfigurationException("id can't be empty ");
}
dataProcessing(tenantVo);
- TenantVo tenantVoLowerCase = toLowerCase(tenantVo);
- logger.info("updateTenant : {}", tenantVoLowerCase);
- userTenantMapper.updateTenant(tenantVoLowerCase);
+ tenantVo.setUpdateTime(new Date());
+ logger.info("updateTenant : {}", tenantVo);
+ userTenantMapper.updateTenant(tenantVo);
}
/**
@@ -117,23 +127,27 @@
@Override
public void createTenant(TenantVo tenantVo) throws ConfigurationException {
dataProcessing(tenantVo);
- TenantVo tenantVoLowerCase = toLowerCase(tenantVo);
- tenantVoLowerCase.setCreateTime(new Date());
- logger.info("createTenant : {}", tenantVoLowerCase);
+ tenantVo.setCreateTime(new Date());
+ tenantVo.setUpdateTime(new Date());
+ logger.info("createTenant : {}", tenantVo);
userTenantMapper.createTenant(tenantVo);
}
private void dataProcessing(TenantVo tenantVo) throws ConfigurationException {
+ // If tenant is set to invalid, skip ecm check
+ if (("N").equals(tenantVo.getIsValid())) {
+ return;
+ }
AtomicReference<Boolean> tenantResult = new AtomicReference<>(false);
// Obtain the tenant information of the ECM list
- Map<String, Object> ecmListResult = null;
+ Map<String, Object> ecmList = null;
try {
- ecmListResult = HttpsUtil.sendHttp(null, null);
- logger.info("Request ecm list response {}:", ecmListResult);
+ ecmList = ClientUtil.getEcmList();
+ logger.info("Request ecm list response {}:", ecmList);
} catch (IOException e) {
- logger.warn("failed to get ecmResource data");
+ logger.warn("failed to get ecmResource data", e);
}
- Map<String, List<Map<String, Object>>> data = MapUtils.getMap(ecmListResult, "data");
+ Map<String, List<Map<String, Object>>> data = MapUtils.getMap(ecmList, "data");
List<Map<String, Object>> emNodeVoList = data.get("EMs");
// Compare ECM list tenant labels for task
emNodeVoList.forEach(
@@ -143,8 +157,8 @@
.filter(labelmap -> labelmap.containsKey("tenant"))
.forEach(
map -> {
- String tenant = map.get("tenant").toString().toLowerCase();
- if (tenant.equals(tenantVo.getTenantValue().toLowerCase())) {
+ String tenant = map.get("tenant").toString();
+ if (tenant.equals(tenantVo.getTenantValue())) {
tenantResult.set(true);
}
});
@@ -152,10 +166,10 @@
// Compare the value of ecm tenant
if (!tenantResult.get())
throw new ConfigurationException("The ECM with the corresponding label was not found");
- if (!tenantVo.getCreator().equals("*")) {
+ if (!("*").equals(tenantVo.getCreator())) {
// The beginning of tenantValue needs to contain creator
- String creator = tenantVo.getCreator().toLowerCase();
- String[] tenantArray = tenantVo.getTenantValue().toLowerCase().split("_");
+ String creator = tenantVo.getCreator();
+ String[] tenantArray = tenantVo.getTenantValue().split("_");
if (tenantArray.length > 1 && !creator.equals(tenantArray[0])) {
throw new ConfigurationException("tenantValue should contain creator first");
}
@@ -165,8 +179,7 @@
@Override
public Boolean isExist(String user, String creator) {
boolean result = true;
- Map<String, Object> resultMap =
- queryTenantList(user.toLowerCase(), creator.toLowerCase(), null, 1, 20);
+ Map<String, Object> resultMap = queryTenantList(user, creator, null, 1, 20);
Object tenantList = resultMap.getOrDefault(JobRequestConstants.TOTAL_PAGE(), 0);
int total = Integer.parseInt(tenantList.toString());
if (total == 0) result = false;
@@ -178,11 +191,66 @@
return userTenantMapper.queryTenant(user, creator);
}
- public TenantVo toLowerCase(TenantVo tenantVo) {
- tenantVo.setTenantValue(tenantVo.getTenantValue().toLowerCase());
- tenantVo.setCreator(tenantVo.getCreator().toLowerCase());
- tenantVo.setUser(tenantVo.getUser().toLowerCase());
- tenantVo.setUpdateTime(new Date());
- return tenantVo;
+ @Override
+ public void saveDepartmentTenant(DepartmentTenantVo departmentTenantVo)
+ throws ConfigurationException {
+ TenantVo tenantVo = new TenantVo();
+ BeanUtils.copyProperties(departmentTenantVo, tenantVo);
+ dataProcessing(tenantVo);
+ departmentTenantVo.setUpdateTime(new Date());
+ if (StringUtils.isBlank(departmentTenantVo.getId())) {
+ departmentTenantVo.setCreateTime(new Date());
+ departmentTenantMapper.insertTenant(departmentTenantVo);
+ } else {
+ departmentTenantMapper.updateTenant(departmentTenantVo);
+ }
+ }
+
+ /**
+ * *
+ *
+ * @param departmentId
+ * @param creator
+ * @param tenantValue
+ * @param pageNow
+ * @param pageSize
+ * @return
+ */
+ @Override
+ public Map<String, Object> queryDepartmentTenant(
+ String departmentId, String creator, String tenantValue, Integer pageNow, Integer pageSize) {
+ Map<String, Object> result = new HashMap<>(2);
+ List<DepartmentTenantVo> tenantVos = null;
+ PageHelper.startPage(pageNow, pageSize);
+ try {
+ tenantVos = departmentTenantMapper.queryTenantList(creator, departmentId, tenantValue);
+ } finally {
+ PageHelper.clearPage();
+ }
+ PageInfo<DepartmentTenantVo> pageInfo = new PageInfo<>(tenantVos);
+ result.put("tenantList", tenantVos);
+ result.put(JobRequestConstants.TOTAL_PAGE(), pageInfo.getTotal());
+ return result;
+ }
+
+ public void deleteDepartmentTenant(Integer id) {
+ departmentTenantMapper.deleteTenant(id);
+ }
+
+ @Override
+ public DepartmentTenantVo queryDepartTenant(String creator, String departmentId) {
+ return departmentTenantMapper.queryTenant(creator, departmentId);
+ }
+
+ @Override
+ public List<DepartmentVo> queryDepartmentList() {
+ return new ArrayList<>(
+ departmentMapper.queryDepartmentList().stream()
+ .collect(
+ Collectors.toMap(
+ DepartmentVo::getOrgId,
+ department -> department,
+ (existing, replacement) -> existing))
+ .values());
}
}
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/impl/TenantServiceImpl.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/impl/TenantServiceImpl.java
index 25d272c..91523cc 100644
--- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/impl/TenantServiceImpl.java
+++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/service/impl/TenantServiceImpl.java
@@ -17,9 +17,12 @@
package org.apache.linkis.configuration.service.impl;
+import org.apache.linkis.configuration.entity.DepartmentTenantVo;
import org.apache.linkis.configuration.entity.TenantVo;
import org.apache.linkis.configuration.service.TenantConfigService;
import org.apache.linkis.configuration.service.TenantService;
+import org.apache.linkis.governance.common.protocol.conf.DepartTenantRequest;
+import org.apache.linkis.governance.common.protocol.conf.DepartTenantResponse;
import org.apache.linkis.governance.common.protocol.conf.TenantRequest;
import org.apache.linkis.governance.common.protocol.conf.TenantResponse;
import org.apache.linkis.rpc.Sender;
@@ -45,10 +48,36 @@
if (null == tenantVo) {
logger.warn(
"TenantCache user {} creator {} data loading failed", request.user(), request.creator());
- return new TenantResponse(request.user(), request.creator(), "");
+ return new TenantResponse(request.user(), request.creator(), "Y", "");
} else {
return new TenantResponse(
- tenantVo.getUser(), tenantVo.getCreator(), tenantVo.getTenantValue());
+ tenantVo.getUser(),
+ tenantVo.getCreator(),
+ tenantVo.getIsValid(),
+ tenantVo.getTenantValue());
+ }
+ }
+
+ @Receiver
+ @Override
+ public DepartTenantResponse getDepartTenantData(
+ DepartTenantRequest departTenantRequest, Sender sender) {
+ DepartmentTenantVo departmentTenantVo =
+ tenantConfigService.queryDepartTenant(
+ departTenantRequest.creator(), departTenantRequest.departmentId());
+ if (null == departmentTenantVo) {
+ logger.warn(
+ "DepartTenant data loading failed creator {} department {},departTenant cache will set '' ",
+ departTenantRequest.creator(),
+ departTenantRequest.departmentId());
+ return new DepartTenantResponse(
+ departTenantRequest.creator(), departTenantRequest.departmentId(), "Y", "");
+ } else {
+ return new DepartTenantResponse(
+ departmentTenantVo.getCreator(),
+ departmentTenantVo.getDepartmentId(),
+ departmentTenantVo.getIsValid(),
+ departmentTenantVo.getTenantValue());
}
}
}
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/util/HttpsUtil.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/util/ClientUtil.java
similarity index 93%
rename from linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/util/HttpsUtil.java
rename to linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/util/ClientUtil.java
index 57fd703..56e0caa 100644
--- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/util/HttpsUtil.java
+++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/util/ClientUtil.java
@@ -37,15 +37,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class HttpsUtil {
+public class ClientUtil {
- private static final Logger logger = LoggerFactory.getLogger(HttpsUtil.class);
+ private static final Logger logger = LoggerFactory.getLogger(ClientUtil.class);
public static DWSClientConfig dwsClientConfig = createClientConfig(null, null);
public static UJESClientImpl client = new UJESClientImpl(dwsClientConfig);
- public static Map<String, Object> sendHttp(String url, Map<String, Object> properties)
+ public static Map<String, Object> getEcmList(String url, Map<String, Object> properties)
throws IOException {
if (null == dwsClientConfig) {
dwsClientConfig = createClientConfig(url, properties);
@@ -58,6 +58,10 @@
return result.getResultMap();
}
+ public static Map<String, Object> getEcmList() throws IOException {
+ return getEcmList(null, null);
+ }
+
private static DWSClientConfig createClientConfig(String url, Map<String, Object> properties) {
String realUrl = "";
if (StringUtils.isBlank(url)) {
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/util/CommonUtils.java b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/util/CommonUtils.java
index 2d3f9b2..d689881 100644
--- a/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/util/CommonUtils.java
+++ b/linkis-public-enhancements/linkis-configuration/src/main/java/org/apache/linkis/configuration/util/CommonUtils.java
@@ -42,28 +42,45 @@
public static String ruleMap2String(
String startTime,
String endTime,
- String CPUThreshold,
- String MemoryThreshold,
- String CPUPercentageThreshold,
- String MemoryPercentageThreshold)
+ String crossQueue,
+ String priorityCluster,
+ String targetCPUThreshold,
+ String targetMemoryThreshold,
+ String targetCPUPercentageThreshold,
+ String targetMemoryPercentageThreshold,
+ String originCPUPercentageThreshold,
+ String originMemoryPercentageThreshold)
throws JsonProcessingException {
- Map<String, String> queueRuleMap = new HashMap<>();
Map<String, String> timeRuleMap = new HashMap<>();
- Map<String, String> thresholdRuleMap = new HashMap<>();
+ Map<String, String> queueRuleMap = new HashMap<>();
+ Map<String, String> targetClusterRuleMap = new HashMap<>();
+ Map<String, String> originClusterRuleMap = new HashMap<>();
+ Map<String, String> priorityClusterRuleMap = new HashMap<>();
Map<String, Object> ruleMap = new HashMap<>();
- queueRuleMap.put(KEY_QUEUE_SUFFIX, KEY_ACROSS_CLUSTER_QUEUE_SUFFIX);
timeRuleMap.put(KEY_START_TIME, startTime);
timeRuleMap.put(KEY_END_TIME, endTime);
- thresholdRuleMap.put(KEY_CPU_THRESHOLD, CPUThreshold);
- thresholdRuleMap.put(KEY_MEMORY_THRESHOLD, MemoryThreshold);
- thresholdRuleMap.put(KEY_CPU_PERCENTAGE_THRESHOLD, CPUPercentageThreshold);
- thresholdRuleMap.put(KEY_MEMORY_PERCENTAGE_THRESHOLD, MemoryPercentageThreshold);
- ruleMap.put(KEY_QUEUE_RULE, queueRuleMap);
+ queueRuleMap.put(KEY_CROSS_QUEUE, crossQueue);
+ targetClusterRuleMap.put(KEY_TARGET_CPU_THRESHOLD, targetCPUThreshold);
+ targetClusterRuleMap.put(KEY_TARGET_MEMORY_THRESHOLD, targetMemoryThreshold);
+ targetClusterRuleMap.put(KEY_TARGET_CPU_PERCENTAGE_THRESHOLD, targetCPUPercentageThreshold);
+ targetClusterRuleMap.put(
+ KEY_TARGET_MEMORY_PERCENTAGE_THRESHOLD, targetMemoryPercentageThreshold);
+ originClusterRuleMap.put(KEY_ORIGIN_CPU_PERCENTAGE_THRESHOLD, originCPUPercentageThreshold);
+ originClusterRuleMap.put(
+ KEY_ORIGIN_MEMORY_PERCENTAGE_THRESHOLD, originMemoryPercentageThreshold);
+ priorityClusterRuleMap.put(KEY_PRIORITY_CLUSTER, priorityCluster);
ruleMap.put(KEY_TIME_RULE, timeRuleMap);
- ruleMap.put(KEY_THRESHOLD_RULE, thresholdRuleMap);
+ ruleMap.put(KEY_QUEUE_RULE, queueRuleMap);
+ ruleMap.put(KEY_TARGET_CLUSTER_RULE, targetClusterRuleMap);
+ ruleMap.put(KEY_ORIGIN_CLUSTER_RULE, originClusterRuleMap);
+ ruleMap.put(KEY_PRIORITY_CLUSTER_RULE, priorityClusterRuleMap);
ObjectMapper map2Json = BDPJettyServerHelper.jacksonJson();
String rules = map2Json.writeValueAsString(ruleMap);
return rules;
}
+
+ public static String concatQueue(String crossQueue) {
+ return String.format("\"crossQueue\":\"%s\"", crossQueue);
+ }
}
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/resources/mapper/common/DepartmentMapper.xml b/linkis-public-enhancements/linkis-configuration/src/main/resources/mapper/common/DepartmentMapper.xml
new file mode 100644
index 0000000..9b7db1e
--- /dev/null
+++ b/linkis-public-enhancements/linkis-configuration/src/main/resources/mapper/common/DepartmentMapper.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
+
+<mapper namespace="org.apache.linkis.configuration.dao.DepartmentMapper">
+
+ <sql id="Department_Column_List">
+ cluster_code,user_type,user_name,org_id,org_name,queue_name,db_name,interface_user,is_union_analyse,create_time,user_itsm_no
+ </sql>
+
+
+ <select id="getDepartmentByUser" resultType="org.apache.linkis.configuration.entity.DepartmentVo">
+ select <include refid="Department_Column_List"/>
+ from linkis_org_user
+ <where>
+ <if test="user != null"> `user_name` = #{user}</if>
+ </where>
+ </select>
+ <select id="queryDepartmentList" resultType="org.apache.linkis.configuration.entity.DepartmentVo">
+ select <include refid="Department_Column_List"/> from linkis_org_user where org_id is NOT NULL
+ </select>
+</mapper>
\ No newline at end of file
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/resources/mapper/mysql/AcrossClusterRuleMapper.xml b/linkis-public-enhancements/linkis-configuration/src/main/resources/mapper/mysql/AcrossClusterRuleMapper.xml
index 2d6c189..adb6c31 100644
--- a/linkis-public-enhancements/linkis-configuration/src/main/resources/mapper/mysql/AcrossClusterRuleMapper.xml
+++ b/linkis-public-enhancements/linkis-configuration/src/main/resources/mapper/mysql/AcrossClusterRuleMapper.xml
@@ -47,14 +47,15 @@
acr.username, acr.create_time, acr.create_by,
acr.update_time, acr.update_by, acr.rules
FROM
- `linkis_ps_configutation_lm_across_cluster_rule` acr
+ `linkis_ps_configuration_across_cluster_rule` acr
WHERE
id = #{id}
+ <if test="username != null">and username = #{username}</if>
</select>
<select id="queryAcrossClusterRuleList" resultMap="acrossClusterRuleMap">
SELECT <include refid="AcrossClusterRule_Column_List"/>
- FROM `linkis_ps_configutation_lm_across_cluster_rule`
+ FROM `linkis_ps_configuration_across_cluster_rule`
<where>
<if test="username != null"> username = #{username}</if>
<if test="creator != null">and creator = #{creator}</if>
@@ -65,7 +66,7 @@
<insert id="insertAcrossClusterRule" parameterType="org.apache.linkis.configuration.entity.AcrossClusterRule">
INSERT INTO
- linkis_ps_configutation_lm_across_cluster_rule (<include refid="AcrossClusterRule_insert_Column_List"/>)
+ linkis_ps_configuration_across_cluster_rule (<include refid="AcrossClusterRule_insert_Column_List"/>)
VALUES
(#{acrossClusterRule.clusterName},#{acrossClusterRule.creator}, #{acrossClusterRule.username},
<choose>
@@ -91,14 +92,42 @@
<delete id="deleteAcrossClusterRule">
DELETE
FROM
- `linkis_ps_configutation_lm_across_cluster_rule`
+ `linkis_ps_configuration_across_cluster_rule`
WHERE
- creator = #{creator} AND username = #{username}
+ id = #{id}
+ </delete>
+
+ <delete id="deleteAcrossClusterRuleByBatch">
+ DELETE
+ FROM
+ `linkis_ps_configuration_across_cluster_rule`
+ WHERE
+ id IN
+ <foreach item="id" collection="ids" open="(" separator="," close=")">
+ #{id}
+ </foreach>
+ </delete>
+
+ <delete id="deleteAcrossClusterRuleByUsername">
+ DELETE
+ FROM
+ `linkis_ps_configuration_across_cluster_rule`
+ WHERE
+ username = #{username}
+ </delete>
+
+ <delete id="deleteAcrossClusterRuleByCrossQueue">
+ DELETE
+ FROM
+ `linkis_ps_configuration_across_cluster_rule`
+ WHERE
+ rules
+ LIKE CONCAT('%',#{crossQueue},'%')
</delete>
<update id="updateAcrossClusterRule" parameterType="org.apache.linkis.configuration.entity.AcrossClusterRule">
UPDATE
- `linkis_ps_configutation_lm_across_cluster_rule`
+ `linkis_ps_configuration_across_cluster_rule`
SET
cluster_name = #{acrossClusterRule.clusterName}, creator = #{acrossClusterRule.creator},
username=#{acrossClusterRule.username}, create_time=#{acrossClusterRule.createTime},
@@ -117,14 +146,58 @@
id = #{acrossClusterRule.id}
</update>
+ <update id="updateAcrossClusterRuleByBatch" parameterType="org.apache.linkis.configuration.entity.AcrossClusterRule">
+ UPDATE
+ `linkis_ps_configuration_across_cluster_rule`
+ SET
+ cluster_name = #{acrossClusterRule.clusterName}, creator = #{acrossClusterRule.creator},
+ <choose>
+ <when test="acrossClusterRule.updateTime != null">
+ update_time=#{acrossClusterRule.updateTime}
+ </when>
+ <otherwise>
+ update_time = now()
+ </otherwise>
+ </choose>,
+ update_By=#{acrossClusterRule.updateBy}, rules=#{acrossClusterRule.rules},
+ is_valid=#{acrossClusterRule.isValid}
+ WHERE
+ id IN
+ <foreach item="id" collection="ids" open="(" separator="," close=")">
+ #{id}
+ </foreach>
+ </update>
+
<update id="validAcrossClusterRule">
UPDATE
- `linkis_ps_configutation_lm_across_cluster_rule`
+ `linkis_ps_configuration_across_cluster_rule`
SET
is_valid = #{isValid}
WHERE
id = #{id}
+ <if test="username != null">and username = #{username}</if>
</update>
+ <update id="validAcrossClusterRuleByBatch">
+ UPDATE
+ `linkis_ps_configuration_across_cluster_rule`
+ SET
+ is_valid = #{isValid}
+ WHERE
+ id IN
+ <foreach item="id" collection="ids" open="(" separator="," close=")">
+ #{id}
+ </foreach>
+ </update>
+ <select id="queryAcrossClusterRuleByUserName" resultMap="acrossClusterRuleMap">
+ SELECT
+ acr.id, acr.cluster_name, acr.creator,
+ acr.username, acr.create_time, acr.create_by,
+ acr.update_time, acr.update_by, acr.rules
+ FROM
+ `linkis_ps_configuration_across_cluster_rule` acr
+ WHERE
+ username = #{username} limit 1
+ </select>
</mapper>
\ No newline at end of file
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/resources/mapper/mysql/UserTenantMapper.xml b/linkis-public-enhancements/linkis-configuration/src/main/resources/mapper/mysql/UserTenantMapper.xml
index 5d8099f..1494eae 100644
--- a/linkis-public-enhancements/linkis-configuration/src/main/resources/mapper/mysql/UserTenantMapper.xml
+++ b/linkis-public-enhancements/linkis-configuration/src/main/resources/mapper/mysql/UserTenantMapper.xml
@@ -29,14 +29,15 @@
<result column="update_time" property="updateTime" jdbcType="TIMESTAMP"/>
<result column="desc" property="desc" jdbcType="VARCHAR"/>
<result column="bussiness_user" property="bussinessUser" jdbcType="VARCHAR"/>
+ <result column="is_valid" property="isValid" jdbcType="VARCHAR"/>
</resultMap>
<sql id="Tenant_Column_List">
- id,user,creator,tenant_value,create_time,update_time,`desc`,bussiness_user
+ id,user,creator,tenant_value,create_time,update_time,`desc`,bussiness_user,is_valid
</sql>
<sql id="Tenant_insert_Column_List">
- `user`,creator,tenant_value,create_time,update_time,`desc`,bussiness_user
+ `user`,creator,tenant_value,create_time,update_time,`desc`,bussiness_user,is_valid
</sql>
<insert id="createTenant" parameterType="org.apache.linkis.configuration.entity.TenantVo">
@@ -47,7 +48,8 @@
#{createTime},
#{updateTime},
#{desc},
- #{bussinessUser}
+ #{bussinessUser},
+ #{isValid}
)
</insert>
@@ -68,22 +70,23 @@
<update id="updateTenant" parameterType="org.apache.linkis.configuration.entity.TenantVo">
UPDATE linkis_cg_tenant_label_config
- <set>
+ <trim prefix="set" suffixOverrides=",">
<if test="user != null ">user = #{user},</if>
<if test="creator != null">creator = #{creator},</if>
<if test="tenantValue != null">tenant_value = #{tenantValue},</if>
<if test="createTime != null">create_time = #{createTime},</if>
<if test="updateTime != null">update_time = #{updateTime},</if>
<if test="desc != null">`desc` = #{desc},</if>
- <if test="bussinessUser != null">bussiness_user = #{bussinessUser}</if>
- </set>
+ <if test="bussinessUser != null">bussiness_user = #{bussinessUser},</if>
+ <if test="isValid != null">is_valid = #{isValid}</if>
+ </trim>
WHERE id = #{id}
</update>
<select id="queryTenant" resultMap="TenantMap">
- SELECT <include refid="Tenant_Column_List"/>
- FROM linkis_cg_tenant_label_config
+ select <include refid="Tenant_Column_List"/>
+ from linkis_cg_tenant_label_config
<where>
<if test="user != null">`user` = #{user}</if>
<if test="creator != null">and `creator` = #{creator}</if>
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/scala/org/apache/linkis/configuration/conf/Configuration.scala b/linkis-public-enhancements/linkis-configuration/src/main/scala/org/apache/linkis/configuration/conf/Configuration.scala
index 963837a..b51d967 100644
--- a/linkis-public-enhancements/linkis-configuration/src/main/scala/org/apache/linkis/configuration/conf/Configuration.scala
+++ b/linkis-public-enhancements/linkis-configuration/src/main/scala/org/apache/linkis/configuration/conf/Configuration.scala
@@ -46,4 +46,7 @@
val REMOVE_APPLICATION_CACHE =
CommonVars.apply("linkis.configuration.remove.application.cache", "IDE").getValue
+ val USE_USER_DEFAULE_VALUE =
+ CommonVars.apply("wds.linkis.configuration.use.user.default.value", true).getValue
+
}
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/scala/org/apache/linkis/configuration/service/ConfigurationService.scala b/linkis-public-enhancements/linkis-configuration/src/main/scala/org/apache/linkis/configuration/service/ConfigurationService.scala
index 3f86697..e94c65b 100644
--- a/linkis-public-enhancements/linkis-configuration/src/main/scala/org/apache/linkis/configuration/service/ConfigurationService.scala
+++ b/linkis-public-enhancements/linkis-configuration/src/main/scala/org/apache/linkis/configuration/service/ConfigurationService.scala
@@ -339,7 +339,8 @@
val engineconfig =
defaultEngineConfigs.asScala.find(_.getKey.equals(creatorConfig.getKey))
if (engineconfig.isDefined) {
- engineconfig.get.setDefaultValue(creatorConfig.getConfigValue)
+ engineconfig.get.setDefaultValue(creatorConfig.getDefaultValue)
+ engineconfig.get.setConfigValue(creatorConfig.getConfigValue)
} else {
defaultEngineConfigs.add(creatorConfig)
}
@@ -375,30 +376,32 @@
}
var defaultEngineConfigs: util.List[ConfigKeyValue] = new util.ArrayList[ConfigKeyValue]()
var defaultCreatorConfigs: util.List[ConfigKeyValue] = new util.ArrayList[ConfigKeyValue]()
+ var defaultUserConfigs: util.List[ConfigKeyValue] = new util.ArrayList[ConfigKeyValue]()
if (useDefaultConfig) {
- val defaultCretorLabelList = LabelParameterParser.changeUserToDefault(labelList, false)
- val defaultCreatorCombinedLabel =
- combinedLabelBuilder.build("", defaultCretorLabelList).asInstanceOf[CombinedLabelImpl]
- val defaultCreatorLabel = labelMapper.getLabelByKeyValue(
- defaultCreatorCombinedLabel.getLabelKey,
- defaultCreatorCombinedLabel.getStringValue
+ // query *-ide default conf
+ val defaultCreatorLabel = getConfByLabelList(
+ LabelParameterParser.changeUserToDefault(labelList, false)
)
if (defaultCreatorLabel != null) {
defaultCreatorConfigs = getConfigByLabelId(defaultCreatorLabel.getId, language)
}
- val defaultEngineLabelList = LabelParameterParser.changeUserToDefault(labelList)
- val defaultEngineCombinedLabel =
- combinedLabelBuilder.build("", defaultEngineLabelList).asInstanceOf[CombinedLabelImpl]
- val defaultEngineLabel = labelMapper.getLabelByKeyValue(
- defaultEngineCombinedLabel.getLabelKey,
- defaultEngineCombinedLabel.getStringValue
+ // query user-* default conf
+ val defaultUserLabel = getConfByLabelList(
+ LabelParameterParser.changeUserToDefault(labelList, true, false)
+ )
+ if (defaultUserLabel != null) {
+ defaultUserConfigs = getConfigByLabelId(defaultUserLabel.getId, language)
+ }
+ // query *-* default conf
+ val defaultEngineLabel = getConfByLabelList(
+ LabelParameterParser.changeUserToDefault(labelList)
)
if (defaultEngineLabel != null) {
defaultEngineConfigs = getConfigByLabelId(defaultEngineLabel.getId, language)
}
if (CollectionUtils.isEmpty(defaultEngineConfigs)) {
logger.warn(
- s"The default configuration is empty. Please check the default configuration information in the database table(默认配置为空,请检查数据库表中关于标签${defaultEngineCombinedLabel.getStringValue}的默认配置信息是否完整)"
+ s"The default configuration is empty. Please check the default configuration information in the database table(默认配置为空,请检查数据库表中关于标签 *-* 的默认配置信息是否完整)"
)
}
val userCreatorLabel = labelList.asScala
@@ -408,6 +411,9 @@
if (Configuration.USE_CREATOR_DEFAULE_VALUE && userCreatorLabel.getCreator != "*") {
replaceCreatorToEngine(defaultCreatorConfigs, defaultEngineConfigs)
}
+ if (Configuration.USE_USER_DEFAULE_VALUE && userCreatorLabel.getUser != "*") {
+ replaceCreatorToEngine(defaultUserConfigs, defaultEngineConfigs)
+ }
}
// add special config limit info
@@ -441,6 +447,11 @@
(configs, defaultEngineConfigs)
}
+ private def getConfByLabelList(labelList: java.util.List[Label[_]]): ConfigLabel = {
+ val combinedLabel = combinedLabelBuilder.build("", labelList).asInstanceOf[CombinedLabelImpl]
+ labelMapper.getLabelByKeyValue(combinedLabel.getLabelKey, combinedLabel.getStringValue)
+ }
+
/**
* Priority: User Configuration-->Creator's Default Engine Configuration-->Default Engine
* Configuration For database initialization: you need to modify the linkis.dml file--associated
diff --git a/linkis-public-enhancements/linkis-configuration/src/main/scala/org/apache/linkis/configuration/util/LabelParameterParser.scala b/linkis-public-enhancements/linkis-configuration/src/main/scala/org/apache/linkis/configuration/util/LabelParameterParser.scala
index 67f14b5..19cdc3a 100644
--- a/linkis-public-enhancements/linkis-configuration/src/main/scala/org/apache/linkis/configuration/util/LabelParameterParser.scala
+++ b/linkis-public-enhancements/linkis-configuration/src/main/scala/org/apache/linkis/configuration/util/LabelParameterParser.scala
@@ -63,14 +63,16 @@
def changeUserToDefault(
labelList: java.util.List[Label[_]],
- withCreator: Boolean = true
+ withCreator: Boolean = true,
+ withUser: Boolean = true
): java.util.List[Label[_]] = {
val newList = new util.LinkedList[Label[_]]()
if (labelList != null) {
labelList.asScala.foreach(label => {
if (label.isInstanceOf[UserCreatorLabel]) {
val newLabel = labelBuilderFactory.createLabel(classOf[UserCreatorLabel])
- newLabel.setUser("*")
+ if (withUser) newLabel.setUser("*")
+ else newLabel.setUser(label.asInstanceOf[UserCreatorLabel].getUser)
if (withCreator) newLabel.setCreator("*")
else newLabel.setCreator(label.asInstanceOf[UserCreatorLabel].getCreator)
newList.addLast(newLabel)