/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.storage.resultset

import java.io.{IOException, OutputStream}

import com.webank.wedatasphere.linkis.common.io.resultset.{ResultSerializer, ResultSet, ResultSetWriter}
import com.webank.wedatasphere.linkis.common.io.{Fs, FsPath, MetaData, Record}
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.storage.FSFactory
import com.webank.wedatasphere.linkis.storage.domain.Dolphin
import com.webank.wedatasphere.linkis.storage.utils.{FileSystemUtils, StorageUtils}

import scala.collection.mutable.ArrayBuffer

/**
* Created by johnnwang on 10/17/18.
*/
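/**
 * Writes a result set to storage in the Dolphin serialization format. Serialized
 * metadata and records are cached in an in-memory buffer; the target file is only
 * created once the buffer outgrows `maxCacheSize`, or when `flush()`/`close()` is
 * called, so small result sets never touch the file system until they are flushed.
 *
 * A minimal usage sketch (the `resultSet`, `metaData` and `records` values are
 * assumptions here; concrete instances come from the result-set implementations
 * elsewhere in this package):
 * {{{
 *   val writer = new StorageResultSetWriter(resultSet, 1024 * 1024,
 *     new FsPath("hdfs:///tmp/linkis/result.dolphin"))
 *   writer.setProxyUser("hadoop")       // optional; defaults to the JVM user
 *   writer.addMetaData(metaData)        // writes the result-set header and metadata
 *   records.foreach(writer.addRecord)   // each record is serialized and buffered
 *   writer.close()                      // flushes remaining bytes and closes the stream
 * }}}
 */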
class StorageResultSetWriter[K <: MetaData, V <: Record](resultSet: ResultSet[K, V], maxCacheSize: Long, storePath: FsPath)
  extends ResultSetWriter[K, V](resultSet = resultSet, maxCacheSize = maxCacheSize, storePath = storePath) with Logging {

  private val serializer: ResultSerializer = resultSet.createResultSetSerializer
  private var moveToWriteRow = false // true once metadata has been written and rows may follow
  private var outputStream: OutputStream = _
  private var rowCount = 0
  private val buffer = new ArrayBuffer[Byte]() // in-memory cache, drained to outputStream on flush
  private var fs: Fs = _
  private var rMetaData: MetaData = _
  private var proxyUser: String = StorageUtils.getJvmUser

  def setProxyUser(proxyUser: String): Unit = {
    this.proxyUser = proxyUser
  }

  def isEmpty: Boolean = rMetaData == null && buffer.length <= Dolphin.FILE_EMPTY

  def init(): Unit = {
    writeLine(resultSet.getResultSetHeader)
  }

  def createNewFile(): Unit = {
    if (storePath != null && outputStream == null) {
      fs = FSFactory.getFsByProxyUser(storePath, proxyUser)
      fs.init(null)
      FileSystemUtils.createNewFile(storePath, proxyUser, true)
      outputStream = fs.write(storePath, false)
      info(s"Succeeded to create a new file: $storePath")
    }
  }

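  // Lines are cached in memory until the buffer outgrows maxCacheSize; only then is
  // the file created (lazily) and subsequent bytes written straight to the stream.
  // This keeps small result sets entirely in memory until flush()/close().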
  def writeLine(bytes: Array[Byte]): Unit = {
    if (buffer.length > maxCacheSize) {
      if (outputStream == null) {
        createNewFile()
      }
      flush()
      outputStream.write(bytes)
    } else {
      buffer.appendAll(bytes)
    }
  }

  override def toString: String = {
    if (outputStream == null) {
      if (isEmpty) return ""
      new String(buffer.toArray, Dolphin.CHAR_SET)
    } else {
      storePath.getSchemaPath
    }
  }

  override def toFSPath: FsPath = storePath

  override def addMetaDataAndRecordString(content: String): Unit = {
    if (!moveToWriteRow) {
      val bytes = content.getBytes(Dolphin.CHAR_SET)
      writeLine(bytes)
    }
    moveToWriteRow = true
  }

  // No-op: serialized records are written through addRecord instead.
  override def addRecordString(content: String): Unit = {}

  @scala.throws[IOException]
  override def addMetaData(metaData: MetaData): Unit = {
    if (!moveToWriteRow) {
      rMetaData = metaData
      init()
      writeLine(serializer.metaDataToBytes(metaData))
      moveToWriteRow = true
    }
  }

  @scala.throws[IOException]
  override def addRecord(record: Record): Unit = {
    if (moveToWriteRow) {
      rowCount += 1
      writeLine(serializer.recordToBytes(record))
    }
  }

  def closeFs(): Unit = {
    if (fs != null) {
      fs.close()
    }
  }

  override def close(): Unit = {
    Utils.tryFinally(if (outputStream != null) flush()) {
      closeFs()
      if (outputStream != null) {
        outputStream.close()
      }
    }
  }

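  // Drains the in-memory buffer to storage, lazily creating the target file first.
  // When storePath is null no file is ever created, so flush() is a no-op and the
  // buffered content remains available through toString.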
  override def flush(): Unit = {
    createNewFile()
    if (outputStream != null) {
      if (buffer.nonEmpty) {
        outputStream.write(buffer.toArray)
        buffer.clear()
      }
      outputStream.flush()
    }
  }
}