blob: 789e4cec38207c471fabb0f1b7d07f679df55c85 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.jobhistory.util
import java.io.{InputStream, OutputStream}
import java.util.Date
import com.webank.wedatasphere.linkis.common.conf.CommonVars
import com.webank.wedatasphere.linkis.common.io.FsPath
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.jobhistory.entity.QueryTask
import com.webank.wedatasphere.linkis.protocol.query.RequestInsertTask
import com.webank.wedatasphere.linkis.storage.FSFactory
import com.webank.wedatasphere.linkis.storage.fs.FileSystem
import com.webank.wedatasphere.linkis.storage.utils.{FileSystemUtils, StorageUtils}
import org.apache.commons.io.IOUtils
import org.apache.commons.lang.time.DateFormatUtils
/**
* Created by patinousward on 2019/2/25.
*/
object QueryUtils extends Logging {
private val CODE_STORE_PREFIX = CommonVars("bdp.dataworkcloud.query.store.prefix", "hdfs:///tmp/bdp-ide/")
private val CODE_STORE_SUFFIX = CommonVars("bdp.dataworkcloud.query.store.suffix", "")
private val CHARSET = "utf-8"
private val CODE_SPLIT = ";"
private val LENGTH_SPLIT = "#"
def storeExecutionCode(requestInsertTask: RequestInsertTask): Unit = {
if (requestInsertTask.getExecutionCode.length < 60000) return
val user: String = requestInsertTask.getUmUser
val path: String = getCodeStorePath(user)
val fsPath: FsPath = new FsPath(path)
val fileSystem = FSFactory.getFsByProxyUser(fsPath, user).asInstanceOf[FileSystem]
fileSystem.init(null)
var os: OutputStream = null
var position = 0L
val codeBytes = requestInsertTask.getExecutionCode.getBytes(CHARSET)
path.intern() synchronized {
Utils.tryFinally {
if (!fileSystem.exists(fsPath)) FileSystemUtils.createNewFile(fsPath, user, true)
os = fileSystem.write(fsPath, false)
position = fileSystem.get(path).getLength
IOUtils.write(codeBytes, os)
} {
IOUtils.closeQuietly(os)
if (fileSystem != null) fileSystem.close()
}
}
val length = codeBytes.length
requestInsertTask.setExecutionCode(path + CODE_SPLIT + position + LENGTH_SPLIT + length)
}
def exchangeExecutionCode(queryTask: QueryTask): Unit = {
import scala.util.control.Breaks._
if (queryTask.getExecutionCode == null || !queryTask.getExecutionCode.startsWith(StorageUtils.HDFS_SCHEMA)) return
val codePath = queryTask.getExecutionCode
val path = codePath.substring(0, codePath.lastIndexOf(CODE_SPLIT))
val codeInfo = codePath.substring(codePath.lastIndexOf(CODE_SPLIT) + 1)
val infos: Array[String] = codeInfo.split(LENGTH_SPLIT)
val position = infos(0).toInt
var lengthLeft = infos(1).toInt
val tub = new Array[Byte](1024)
val executionCode: StringBuilder = new StringBuilder
val fsPath: FsPath = new FsPath(path)
val fileSystem = FSFactory.getFsByProxyUser(fsPath, queryTask.getUmUser).asInstanceOf[FileSystem]
fileSystem.init(null)
var is: InputStream = null
if (!fileSystem.exists(fsPath)) return
Utils.tryFinally {
is = fileSystem.read(fsPath)
if (position > 0) is.skip(position)
breakable {
while (lengthLeft > 0) {
val readed = is.read(tub)
val useful = Math.min(readed, lengthLeft)
if (useful < 0) break()
lengthLeft -= useful
executionCode.append(new String(tub, 0, useful, CHARSET))
}
}
} {
if (fileSystem != null) fileSystem.close()
IOUtils.closeQuietly(is)
}
queryTask.setExecutionCode(executionCode.toString())
}
private def getCodeStorePath(user: String): String = {
val date: String = DateFormatUtils.format(new Date, "yyyyMMdd")
s"${CODE_STORE_PREFIX.getValue}${user}${CODE_STORE_SUFFIX.getValue}/executionCode/${date}/_scripts"
}
}