Merge pull request #163 from yangzhiyue/master
close #162, I contributed the bml-engine-hook
diff --git a/bml/bml-engine-hook/pom.xml b/bml/bml-engine-hook/pom.xml
new file mode 100644
index 0000000..037a4ab
--- /dev/null
+++ b/bml/bml-engine-hook/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>linkis</artifactId>
+ <groupId>com.webank.wedatasphere.linkis</groupId>
+ <version>0.9.1</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>linkis-bml-hook</artifactId>
+ <version>${linkis.version}</version>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.webank.wedatasphere.linkis</groupId>
+ <artifactId>linkis-bmlclient</artifactId>
+ <version>${linkis.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.webank.wedatasphere.linkis</groupId>
+ <artifactId>linkis-bmlcommon</artifactId>
+ <version>${linkis.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.webank.wedatasphere.linkis</groupId>
+ <artifactId>linkis-ujes-engine</artifactId>
+ <version>${linkis.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>${basedir}/src/main/resources</directory>
+ </resource>
+ </resources>
+ </build>
+
+
+</project>
\ No newline at end of file
diff --git a/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/conf/BmlHookConf.scala b/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/conf/BmlHookConf.scala
new file mode 100644
index 0000000..a8d04c8
--- /dev/null
+++ b/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/conf/BmlHookConf.scala
@@ -0,0 +1,11 @@
+package com.webank.wedatasphere.linkis.bml.conf
+
+import com.webank.wedatasphere.linkis.common.conf.CommonVars
+
+/**
+ * created by cooperyang on 2019/9/23
+ * Description: configuration entries for the BML engine hook (key "wds.linkis.bml.work.dir", default value "user.dir").
+ */
+object BmlHookConf {
+ val WORK_DIR_STR = CommonVars("wds.linkis.bml.work.dir", "user.dir")
+}
diff --git a/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/exception/BmlHookDownloadException.scala b/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/exception/BmlHookDownloadException.scala
new file mode 100644
index 0000000..b67b065
--- /dev/null
+++ b/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/exception/BmlHookDownloadException.scala
@@ -0,0 +1,9 @@
+package com.webank.wedatasphere.linkis.bml.exception
+
+import com.webank.wedatasphere.linkis.common.exception.ErrorException
+
+/**
+ * created by cooperyang on 2019/9/25
+ * Description: error for a failed BML resource download; passes error code 50046 and the given message to ErrorException.
+ */
+case class BmlHookDownloadException(errMsg:String) extends ErrorException(50046, errMsg)
diff --git a/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/hook/BmlEnginePreExecuteHook.scala b/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/hook/BmlEnginePreExecuteHook.scala
new file mode 100644
index 0000000..28f3cc0
--- /dev/null
+++ b/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/hook/BmlEnginePreExecuteHook.scala
@@ -0,0 +1,78 @@
+package com.webank.wedatasphere.linkis.bml.hook
+
+import java.io.File
+import java.util
+
+import com.webank.wedatasphere.linkis.bml.client.{BmlClient, BmlClientFactory}
+import com.webank.wedatasphere.linkis.bml.exception.BmlHookDownloadException
+import com.webank.wedatasphere.linkis.bml.utils.BmlHookUtils
+import com.webank.wedatasphere.linkis.common.exception.ErrorException
+import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
+import com.webank.wedatasphere.linkis.engine.ResourceExecuteRequest
+import com.webank.wedatasphere.linkis.engine.execute.EngineExecutorContext
+import com.webank.wedatasphere.linkis.engine.extension.EnginePreExecuteHook
+import com.webank.wedatasphere.linkis.scheduler.executer.ExecuteRequest
+import org.apache.commons.lang.StringUtils
+
+import scala.collection.JavaConversions._
+/**
+ * created by cooperyang on 2019/9/23
+ * Description: pre-execute hook that downloads the resources listed on a ResourceExecuteRequest into the working directory.
+ */
+class BmlEnginePreExecuteHook extends EnginePreExecuteHook with Logging{
+  override val hookName: String = "BmlEnginePreExecuteHook"
+
+  // Keys looked up in each resource map (RESOURCES_STR is not referenced inside this class).
+  val RESOURCES_STR = "resources"
+  val RESOURCE_ID_STR = "resourceId"
+  val VERSION_STR = "version"
+  val FILE_NAME_STR = "fileName"
+
+  // Value of the user.name system property; passed as the user of every download call.
+  val processUser:String = System.getProperty("user.name")
+  // User the client is created for when user.name is empty.
+  val defaultUser:String = "hadoop"
+
+  val bmlClient:BmlClient = if (StringUtils.isNotEmpty(processUser))
+    BmlClientFactory.createBmlClient(processUser) else BmlClientFactory.createBmlClient(defaultUser)
+
+  val seperator:String = File.separator
+
+  // Prefix prepended to the local download target path.
+  val pathType:String = "file://"
+
+  // Downloads every resource map of a ResourceExecuteRequest to <workDir>/<fileName>; other request types are ignored.
+  // Rethrows an ErrorException unchanged and wraps any other download failure in a BmlHookDownloadException.
+  override def callPreExecuteHook(engineExecutorContext: EngineExecutorContext, executeRequest: ExecuteRequest): Unit = {
+    // Intended steps: 1. delete earlier resource files from the working directory (not done here); 2. download the resources into it.
+    val workDir = BmlHookUtils.getCurrentWorkDir
+    val jobId = engineExecutorContext.getJobId
+    executeRequest match {
+      case resourceExecuteRequest:ResourceExecuteRequest => val resources = resourceExecuteRequest.resources
+        resources foreach {
+          case resource:util.Map[String, Object] => val fileName = resource.get(FILE_NAME_STR).toString
+            val resourceId = resource.get(RESOURCE_ID_STR).toString
+            val version = resource.get(VERSION_STR).toString
+            val fullPath = if (workDir.endsWith(seperator)) pathType + workDir + fileName else
+              pathType + workDir + seperator + fileName
+            val response = Utils.tryCatch{
+              bmlClient.downloadResource(processUser, resourceId, version, fullPath, true)
+            }{
+              case error:ErrorException => logger.error(s"download resource for $jobId failed", error)
+                throw error
+              case t:Throwable => logger.error(s"download resource for $jobId failed", t)
+                val e1 = BmlHookDownloadException(t.getMessage)
+                e1.initCause(t)
+                throw e1 // throw the wrapper (original failure attached as cause) instead of the raw throwable
+            }
+            if (response.isSuccess){
+              logger.info(s"for job $jobId resourceId $resourceId version $version download to path $fullPath ok")
+            }else{
+              logger.warn(s"for job $jobId resourceId $resourceId version $version download to path $fullPath Failed")
+            }
+          case _ => logger.warn("job resource cannot download")
+        }
+      case _ =>
+    }
+  }
+}
diff --git a/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/hook/BmlResourceParser.scala b/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/hook/BmlResourceParser.scala
new file mode 100644
index 0000000..632e9f9
--- /dev/null
+++ b/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/hook/BmlResourceParser.scala
@@ -0,0 +1,29 @@
+package com.webank.wedatasphere.linkis.bml.hook
+
+/**
+ * created by cooperyang on 2019/9/23
+ * Description: resource parser types for the BML hook; ResourceVersion pairs a resource id with a version string.
+ */
+
+case class ResourceVersion(resourceId:String, version:String)
+
+
+trait BmlResourceParser {
+ /**
+ * Returns the resource versions obtained from the passed-in code.
+ * @param code input code string (NOTE(review): presumably the job's script text - confirm)
+ * @return array of ResourceVersion entries
+ */
+ def getResource(code:String):Array[ResourceVersion]
+}
+
+
+object DefaultBmlResourceParser extends BmlResourceParser{
+ /**
+ * Default implementation: ignores the passed-in code.
+ *
+ * @param code input code string; not used by this implementation
+ * @return always an empty array
+ */
+ override def getResource(code: String): Array[ResourceVersion] = Array.empty
+}
\ No newline at end of file
diff --git a/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/utils/BmlHookUtils.scala b/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/utils/BmlHookUtils.scala
new file mode 100644
index 0000000..d82b46b
--- /dev/null
+++ b/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/utils/BmlHookUtils.scala
@@ -0,0 +1,20 @@
+package com.webank.wedatasphere.linkis.bml.utils
+
+import com.webank.wedatasphere.linkis.common.utils.Utils
+
+/**
+ * created by cooperyang on 2019/9/24
+ * Description: helpers for the BML hook; getCurrentWorkDir returns the value of the "user.dir" system property.
+ */
+object BmlHookUtils {
+ val WORK_DIR_STR = "user.dir"
+ def getCurrentWorkDir:String = System.getProperty(WORK_DIR_STR)
+
+ // Placeholder: the body below is empty, so calling this currently does nothing with workDir.
+ def deleteAllFiles(workDir:String):Unit = {
+
+ }
+
+
+
+}
diff --git a/pom.xml b/pom.xml
index f6f0d0b..e22161c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,8 +77,10 @@
<module>publicService/configuration</module>
<module>publicService/variable</module>
<module>publicService/workspace</module>
+ <module>publicService/workspace/client/workspace-httpclient</module>
<module>metadata</module>
<module>ujes/engine</module>
+ <module>bml/bml-engine-hook</module>
<module>ujes/enginemanager</module>
<module>ujes/entrance</module>
<module>ujes/definedEngines/spark/engine</module>
diff --git a/ujes/definedEngines/python/engine/pom.xml b/ujes/definedEngines/python/engine/pom.xml
index eadecd3..c615032 100644
--- a/ujes/definedEngines/python/engine/pom.xml
+++ b/ujes/definedEngines/python/engine/pom.xml
@@ -65,6 +65,11 @@
<artifactId>scalatest_2.11</artifactId>
<version>2.2.6</version>
</dependency>
+ <dependency>
+ <groupId>com.webank.wedatasphere.linkis</groupId>
+ <artifactId>linkis-bml-hook</artifactId>
+ <version>${linkis.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git a/ujes/definedEngines/spark/engine/pom.xml b/ujes/definedEngines/spark/engine/pom.xml
index fcf86d5..1981f34 100644
--- a/ujes/definedEngines/spark/engine/pom.xml
+++ b/ujes/definedEngines/spark/engine/pom.xml
@@ -57,6 +57,11 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>com.webank.wedatasphere.linkis</groupId>
+ <artifactId>linkis-bml-hook</artifactId>
+ <version>${linkis.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
diff --git a/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/ResourceExecuteRequest.scala b/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/ResourceExecuteRequest.scala
new file mode 100644
index 0000000..52d2998
--- /dev/null
+++ b/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/ResourceExecuteRequest.scala
@@ -0,0 +1,9 @@
+package com.webank.wedatasphere.linkis.engine
+
+/**
+ * created by cooperyang on 2019/11/29
+ * Description: request trait exposing a list of resource objects; BmlEnginePreExecuteHook reads each entry as a java.util.Map.
+ */
+trait ResourceExecuteRequest {
+ val resources:java.util.List[Object]
+}
diff --git a/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/conf/EngineConfiguration.scala b/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/conf/EngineConfiguration.scala
index 2512a10..3de772f 100644
--- a/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/conf/EngineConfiguration.scala
+++ b/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/conf/EngineConfiguration.scala
@@ -59,5 +59,5 @@
val ENGINE_PUSH_PROGRESS_TO_ENTRANCE = CommonVars("wds.linkis.engine.push.progress.enable", true)
-
+ val ENGINE_PRE_EXECUTE_HOOK_CLASSES = CommonVars("wds.linkis.engine.pre.hook.class", "com.webank.wedatasphere.linkis.bml.hook.BmlEnginePreExecuteHook") // comma-separated class names; EngineExecutor splits this value on ","
}
diff --git a/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/execute/EngineExecutor.scala b/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/execute/EngineExecutor.scala
index dd192e8..95c4107 100644
--- a/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/execute/EngineExecutor.scala
+++ b/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/execute/EngineExecutor.scala
@@ -18,13 +18,17 @@
import com.webank.wedatasphere.linkis.common.log.LogUtils
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
+import com.webank.wedatasphere.linkis.engine.conf.EngineConfiguration
import com.webank.wedatasphere.linkis.engine.exception.EngineErrorException
+import com.webank.wedatasphere.linkis.engine.extension.EnginePreExecuteHook
import com.webank.wedatasphere.linkis.resourcemanager.Resource
import com.webank.wedatasphere.linkis.scheduler.executer._
import com.webank.wedatasphere.linkis.storage.resultset.ResultSetFactory
import org.apache.commons.lang.StringUtils
import org.apache.commons.lang.exception.ExceptionUtils
+import scala.collection.mutable.ArrayBuffer
+
/**
* Created by enjoyyin on 2018/9/17.
*/
@@ -41,6 +45,24 @@
private var succeedNum = 0
+ // Hooks run before each request, built once from the comma-separated class list; blank entries are skipped so an empty value disables hooks.
+ private val enginePreExecuteHooks:Array[EnginePreExecuteHook] = {
+   val hooks = new ArrayBuffer[EnginePreExecuteHook]()
+   EngineConfiguration.ENGINE_PRE_EXECUTE_HOOK_CLASSES.getValue.split(",").map(_.trim).filter(_.nonEmpty) foreach {
+     hookStr => Utils.tryCatch{
+       Class.forName(hookStr).newInstance() match {
+         case hook:EnginePreExecuteHook => hooks += hook
+         case obj => logger.warn(s"obj is not a engineHook obj is ${obj.getClass}")
+       }
+     }{
+       // Linkage errors from class loading are not Exceptions, so match every Throwable handed to this handler.
+       case t:Throwable => logger.error(s"failed to load class $hookStr", t)
+     }
+   }
+   hooks.toArray
+ }
+
+
def setCodeParser(codeParser: CodeParser) = this.codeParser = Some(codeParser)
def setResultSetListener(resultSetListener: ResultSetListener) = this.resultSetListener = Some(resultSetListener)
def getResultSetListener = resultSetListener
@@ -95,6 +117,15 @@
else if(isSupportParallelism) whenAvailable(f) else ensureIdle(f)
ensureOp {
val engineExecutorContext = createEngineExecutorContext(executeRequest)
+ Utils.tryCatch{ // hooks are best-effort: a failure is logged with its stack trace and execution continues
+   enginePreExecuteHooks foreach { hook =>
+     logger.info(s"${hook.hookName} begins to do a hook")
+     hook.callPreExecuteHook(engineExecutorContext, executeRequest)
+     logger.info(s"${hook.hookName} ends to do a hook")
+   }
+ }{
+   case e:Exception => logger.error("failed to do with hook", e)
+ }
var response: ExecuteResponse = null
val incomplete = new StringBuilder
val codes = Utils.tryCatch(codeParser.map(_.parse(executeRequest.code, engineExecutorContext)).getOrElse(Array(executeRequest.code))){
diff --git a/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/extension/EnginePreExecuteHook.scala b/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/extension/EnginePreExecuteHook.scala
new file mode 100644
index 0000000..1bbbfcb
--- /dev/null
+++ b/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/extension/EnginePreExecuteHook.scala
@@ -0,0 +1,13 @@
+package com.webank.wedatasphere.linkis.engine.extension
+
+import com.webank.wedatasphere.linkis.engine.execute.EngineExecutorContext
+import com.webank.wedatasphere.linkis.scheduler.executer.ExecuteRequest
+
+/**
+ * created by cooperyang on 2019/11/29
+ * Description: hook invoked by EngineExecutor before an execute request runs; hookName identifies the hook in log messages.
+ */
+trait EnginePreExecuteHook {
+  val hookName:String
+  def callPreExecuteHook(engineExecutorContext:EngineExecutorContext, executeRequest: ExecuteRequest): Unit
+}
\ No newline at end of file