blob: e7d63672a4abcc0d08f9f0dad0e58c2d8ba40c3c [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.bml.client.impl
import java.io.{File, IOException, InputStream}
import java.util
import com.webank.wedatasphere.linkis.bml.client.AbstractBmlClient
import com.webank.wedatasphere.linkis.bml.common._
import com.webank.wedatasphere.linkis.bml.conf.BmlConfiguration
import com.webank.wedatasphere.linkis.bml.http.HttpConf
import com.webank.wedatasphere.linkis.bml.protocol._
import com.webank.wedatasphere.linkis.bml.request._
import com.webank.wedatasphere.linkis.bml.response._
import com.webank.wedatasphere.linkis.common.io.FsPath
import com.webank.wedatasphere.linkis.httpclient.authentication.AuthenticationStrategy
import com.webank.wedatasphere.linkis.httpclient.config.{ClientConfig, ClientConfigBuilder}
import com.webank.wedatasphere.linkis.httpclient.dws.DWSHttpClient
import com.webank.wedatasphere.linkis.httpclient.dws.authentication.{StaticAuthenticationStrategy, TokenAuthenticationStrategy}
import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfig
import com.webank.wedatasphere.linkis.storage.FSFactory
import org.apache.commons.io.IOUtils
import org.apache.commons.lang.StringUtils
import org.slf4j.{Logger, LoggerFactory}
/**
* created by cooperyang on 2019/5/23
* Description:
*/
class HttpBmlClient extends AbstractBmlClient{
private val logger:Logger = LoggerFactory.getLogger(classOf[HttpBmlClient])
val serverUrl:String = HttpConf.gatewayInstance
val maxConnection:Int = 10
val readTimeout:Int = 10000
val authenticationStrategy:AuthenticationStrategy = new TokenAuthenticationStrategy()
val clientConfig:ClientConfig = ClientConfigBuilder.newBuilder().addUJESServerUrl(serverUrl)
.connectionTimeout(30000).discoveryEnabled(false)
.loadbalancerEnabled(false).maxConnectionSize(maxConnection)
.retryEnabled(false).readTimeout(readTimeout)
.setAuthenticationStrategy(authenticationStrategy).setAuthTokenKey(BmlConfiguration.AUTH_TOKEN_KEY.getValue)
.setAuthTokenValue(BmlConfiguration.AUTH_TOKEN_VALUE.getValue).build()
val dwsClientConfig:DWSClientConfig = new DWSClientConfig(clientConfig)
dwsClientConfig.setDWSVersion(BmlConfiguration.DWS_VERSION.getValue)
val dwsClientName:String = "BML-Client"
val dwsClient:DWSHttpClient = new DWSHttpClient(dwsClientConfig, dwsClientName)
val FIRST_VERSION:String = "v000001"
override def downloadResource(user:String, resourceID: String): BmlDownloadResponse = {
downloadResource(user, resourceID, "")
}
override def downloadResource(user: String, resourceId: String, version: String): BmlDownloadResponse = {
val bmlDownloadAction = BmlDownloadAction()
import scala.collection.JavaConversions._
bmlDownloadAction.getParameters +="resourceId"->resourceId
// TODO: 不能放非空的参数
if(version != null)bmlDownloadAction.getParameters +="version"->version
bmlDownloadAction.setUser(user)
val result = dwsClient.execute(bmlDownloadAction)
new BmlDownloadResponse(true,bmlDownloadAction.getInputStream,resourceId,version,null)
/* result match {
case downloadResult:BmlResourceDownloadResult => val isSuccess = if (downloadResult.getStatusCode == 0) true else false
if (isSuccess){
downloadResult.setInputStream(bmlDownloadAction.getInputStream)
BmlDownloadResponse(isSuccess, downloadResult.inputStream, downloadResult.getResourceId, downloadResult.getVersion, "")
}else{
logger.error(s"user ${user} download resource $resourceId version $version failed, status code is ${ downloadResult.getStatusCode}")
BmlDownloadResponse(isSuccess, null, null, null, null)
}
case r:BmlResult => logger.error(s"result type ${r.getResultType} not match BmlResourceDownloadResult")
throw POSTResultNotMatchException()
case _ => throw POSTResultNotMatchException()
}*/
}
/**
* 下载资源到指定的path中
* @param user 用户名
* @param resourceId 资源ID
* @param version 版本信息
* @param path 指定的目录,前面要加schema share:// local:// 等
* @param overwrite 是否是追加
* @return 返回的inputStream已经被全部读完,所以返回一个null,另外的fullFileName是整个文件的名字
*/
override def downloadResource(user: String, resourceId: String, version: String, path: String, overwrite:Boolean = false): BmlDownloadResponse = {
//1检查目录是否存在,包括path的schema
//2检查文件是否存在,如果文件存在,并且overwrite是false,则报错
//3获取downloaded_file_name 拼成一个完整的filePath
//4获取inputStream,然后写入到filePath中
val fsPath = new FsPath(path)
val fileSystem = FSFactory.getFsByProxyUser(fsPath, user)
fileSystem.init(new util.HashMap[String, String]())
// if (fileSystem.exists(fsPath)){
// logger.error(s"path $path not exists")
// throw IllegalPathException()
// }
// val getBasicAction = BmlGetBasicAction(resourceId)
// val getBasicResult = dwsClient.execute(getBasicAction) match{
// case result:BmlGetBasicResult => result
// case _ => throw GetResultNotMatchException()
// }
// val fileName:StringBuilder = new StringBuilder
// fileName.append(path).append(if (path.endsWith("/")) "" else "/")
// if (getBasicResult != null && getBasicResult.getStatusCode == 0){
// val downloadedFileName = getBasicResult.downloadedFileName
// if (StringUtils.isNotEmpty(downloadedFileName)){
// fileName.append(downloadedFileName)
// }else{
// throw BmlResponseErrorException("返回的downloadedFileName参数为空")
// }
// }else{
// logger.error(s"获取 $resourceId 资源失败, BmlServer的返回码是 ${getBasicResult.getStatusCode}")
// throw BmlResponseErrorException("通过http方式获取")
// }
val fullFileName = path
val downloadAction = BmlDownloadAction() // TODO: 这里暂时还没改
import scala.collection.JavaConversions._
downloadAction.getParameters += "resourceId" -> resourceId
// TODO: 不能放非空的参数
if(version != null) downloadAction.getParameters += "version" -> version
downloadAction.setUser(user)
val downloadResult = dwsClient.execute(downloadAction)
val fullFilePath = new FsPath(fullFileName)
if (downloadResult != null){
val inputStream = downloadAction.getInputStream
val outputStream = fileSystem.write(fullFilePath, overwrite)
try{
IOUtils.copy(inputStream, outputStream)
}catch{
case e:IOException => logger.error("inputStream和outputStream流copy失败", e)
val exception = BmlClientFailException("inputStream和outputStream流copy失败")
exception.initCause(e)
throw e
case t:Throwable => logger.error("流复制失败",t)
throw t
}finally{
IOUtils.closeQuietly(inputStream)
IOUtils.closeQuietly(outputStream)
}
BmlDownloadResponse(true, null, resourceId, version, fullFileName)
}else{
BmlDownloadResponse(false, null, null, null, null)
}
}
/**
* 更新资源信息
*
* @param resourceID 资源id
* @param filePath 目标文件路径
* @return resourceId 新的版本信息
*/
override def updateResource(user:String, resourceID: String, filePath: String): BmlUpdateResponse = {
val inputStream:InputStream = getInputStream(filePath)
updateResource(user, resourceID, filePath, inputStream)
}
override def updateResource(user:String, resourceID: String, filePath: String, inputStream: InputStream): BmlUpdateResponse = {
val _inputStreams = new util.HashMap[String, InputStream]()
_inputStreams.put("file", inputStream)
val bmlUpdateAction = BmlUpdateAction(null, _inputStreams)
bmlUpdateAction.setUser(user)
bmlUpdateAction.inputStreamNames.put("file", pathToName(filePath))
bmlUpdateAction.getParameters.put("resourceId",resourceID)
val result = dwsClient.execute(bmlUpdateAction)
result match{
case updateResult:BmlUpdateResult => val isSuccess= if (updateResult.getStatus == 0) true else false
if (isSuccess){
val resourceId = updateResult.getResourceId
val version = updateResult.getVersion
BmlUpdateResponse(isSuccess, resourceId, version)
}else{
logger.error(s"user $user update resource failed, status code is ${updateResult.getStatusCode}")
BmlUpdateResponse(isSuccess, null, null)
}
case r:BmlResult => logger.error(s"result type ${r.getResultType} not match BmlResourceDownloadResult")
throw POSTResultNotMatchException()
case _ => throw POSTResultNotMatchException()
}
}
/**
* relateResource方法将targetFilePath路径的文件关联到resourceID下面
* targetFilePath需要包括schema,如果不包含schema,默认是hdfs
*
* @param resourceID resourceID
* @param targetFilePath 指定文件目录
* @return BmlRelateResult 包含resourceId和新的version
*/
override def relateResource(resourceID: String, targetFilePath: String): BmlRelateResponse = {
null
}
/**
* 获取resourceid 对应资源的所有版本
* @param user 用户名
* @param resourceId 资源Id
* @return resourceId对应下的所有版本信息
*/
override def getVersions(user: String, resourceId: String): BmlResourceVersionsResponse = {
val getVersionsAction = BmlGetVersionsAction(user, resourceId)
val result = dwsClient.execute(getVersionsAction)
result match{
case _result:BmlResourceVersionResult => val isSuccess= if (_result.getStatus == 0) true else false
if (isSuccess){
val resourceId = _result.getResourceId
val resourceVersions = _result.getResourceVersions
BmlResourceVersionsResponse(isSuccess,resourceId, resourceVersions)
}else{
logger.error(s"user $user get versions failed, status code is ${_result.getStatusCode}")
BmlResourceVersionsResponse(isSuccess, null, null)
}
case r:BmlResult => logger.error(s"result type ${r.getResultType} not match BmlResourceDownloadResult")
throw POSTResultNotMatchException()
case _ => throw POSTResultNotMatchException()
}
}
/**
* 上传文件,用户指定文件路径,客户端自动获取输入流
* @param user 用户名
* @param filePath 文件路径
* @return 包含resourceId和version
*/
override def uploadResource(user: String, filePath: String): BmlUploadResponse = {
val inputStream:InputStream = getInputStream(filePath)
uploadResource(user, filePath, inputStream)
}
private def pathToName(filePath:String):String = new File(filePath).getName
/**
* 上传资源
*
* @param user 用户名
* @param filePath 上传的资源的路径
* @param inputStream 上传资源的输入流
* @return
*/
override def uploadResource(user: String, filePath: String, inputStream: InputStream): BmlUploadResponse = {
val _inputStreams = new util.HashMap[String, InputStream]()
_inputStreams.put("file", inputStream)
val uploadAction = BmlUploadAction(null, _inputStreams)
uploadAction.inputStreamNames.put("file", pathToName(filePath))
uploadAction.setUser(user)
val result = dwsClient.execute(uploadAction)
result match {
case bmlUploadResult:BmlUploadResult => val isSuccess = if(bmlUploadResult.getStatus == 0) true else false
if (isSuccess){
val resourceId = bmlUploadResult.getResourceId
val version = bmlUploadResult.getVersion
BmlUploadResponse(isSuccess, resourceId,version)
}else{
logger.error(s"user $user upload resource failed, status code is ${bmlUploadResult.getStatusCode}")
BmlUploadResponse(isSuccess, null, null)
}
case r:BmlResult => logger.error(s"result type ${r.getResultType} not match BmlResourceDownloadResult")
throw POSTResultNotMatchException()
case _ => throw POSTResultNotMatchException()
}
}
/**
*
*/
override def deleteResource(user: String, resourceId: String, version: String): BmlDeleteResponse = {
null
}
override def deleteResource(user: String, resourceId: String): BmlDeleteResponse = {
val deleteAction = BmlDeleteAction(resourceId)
deleteAction.getParameters.put("resourceId", resourceId)
val result = dwsClient.execute(deleteAction)
result match {
case bmlDeleteResult: BmlDeleteResult => val isSuccess= if (bmlDeleteResult.getStatus == 0) true else false
if (isSuccess){
BmlDeleteResponse(isSuccess)
}else{
logger.error(s"user $user update resource failed, status code is ${bmlDeleteResult.getStatusCode}")
BmlDeleteResponse(isSuccess)
}
case r:BmlResult => logger.error(s"result type ${r.getResultType} not match BmlResourceDownloadResult")
throw POSTResultNotMatchException()
case _ => throw POSTResultNotMatchException()
}
}
//todo 现在是为了通过编译
private def getInputStream(str: String):InputStream = {
null
}
}