blob: e65c20473e87af1df5a9f0fba3a57cd3e86f3784 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.enginemanager.hook
import com.webank.wedatasphere.linkis.common.utils.Logging
import com.webank.wedatasphere.linkis.enginemanager.{Engine, EngineHook}
import com.webank.wedatasphere.linkis.enginemanager.conf.EngineManagerConfiguration.ENGINE_UDF_APP_NAME
import com.webank.wedatasphere.linkis.protocol.engine.RequestEngine
import com.webank.wedatasphere.linkis.rpc.Sender
import com.webank.wedatasphere.linkis.udf.api.rpc.{RequestUdfTree, ResponseUdfTree}
import com.webank.wedatasphere.linkis.udf.entity.{UDFInfo, UDFTree}
import org.apache.commons.collections.CollectionUtils
import org.apache.commons.io.FileUtils
import org.apache.commons.lang.StringUtils
import org.codehaus.jackson.map.ObjectMapper
import scala.collection.JavaConversions._
import scala.collection.mutable
class JarLoaderEngineHook extends EngineHook with Logging{
override def beforeCreateSession(requestEngine: RequestEngine): RequestEngine = {
info("start loading UDFs")
val udfInfos = extractUdfInfos(requestEngine).filter{info => info.getUdfType == 0 && info.getExpire == false && StringUtils.isNotBlank(info.getPath) && isJarExists(info) && info.getLoad == true }
// add to class path
val jars = new mutable.HashSet[String]()
udfInfos.foreach{udfInfo => jars.add("file://" + udfInfo.getPath)}
val jarPaths = jars.mkString(",")
if(StringUtils.isBlank(requestEngine.properties.get("jars"))){
requestEngine.properties.put("jars", jarPaths)
} else {
requestEngine.properties.put("jars", requestEngine.properties.get("jars") + "," + jarPaths)
}
info("added jars: " + jarPaths)
//jars.foreach(fetchRemoteFile)
//info("copied jars.")
info("end loading UDFs")
requestEngine
}
override def afterCreatedSession(engine: Engine, requestEngine: RequestEngine): Unit = {
}
protected def isJarExists(udfInfo: UDFInfo) : Boolean = {
true
// if(FileUtils.getFile(udfInfo.getPath).exists()){
// true
// } else {
// info(s"The jar file [${udfInfo.getPath}] of UDF [${udfInfo.getUdfName}] doesn't exist, ignore it.")
// false
// }
}
protected def extractUdfInfos(requestEngine: RequestEngine): mutable.ArrayBuffer[UDFInfo] = {
val udfInfoBuilder = new mutable.ArrayBuffer[UDFInfo]
val userName = requestEngine.user
val udfTree = queryUdfRpc(userName)
extractUdfInfos(udfInfoBuilder, udfTree, userName)
udfInfoBuilder
}
protected def extractUdfInfos(udfInfoBuilder: mutable.ArrayBuffer[UDFInfo], udfTree: UDFTree, userName: String) : Unit = {
if(CollectionUtils.isNotEmpty(udfTree.getUdfInfos)){
for(udfInfo <- udfTree.getUdfInfos){
udfInfoBuilder.append(udfInfo)
}
}
if(CollectionUtils.isNotEmpty(udfTree.getChildrens)){
for(child <- udfTree.getChildrens){
var childInfo = child
if(TreeType.specialTypes.contains(child.getUserName)){
childInfo = queryUdfRpc(userName, child.getId, child.getUserName)
} else {
childInfo = queryUdfRpc(userName, child.getId, TreeType.SELF)
}
extractUdfInfos(udfInfoBuilder, childInfo, userName)
}
}
}
private def queryUdfRpc(userName: String, treeId: Long = -1, treeType: String = "self"): UDFTree = {
val udfTree = Sender.getSender(ENGINE_UDF_APP_NAME.getValue)
.ask(RequestUdfTree(userName, treeType, treeId, "udf"))
.asInstanceOf[ResponseUdfTree]
.udfTree
//info("got udf tree:" + new ObjectMapper().writer().withDefaultPrettyPrinter().writeValueAsString(udfTree))
udfTree
}
}