blob: e0fece3215c124dc1c6436f21f4d243ad787d759 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.storage.resultset
import java.io.{ByteArrayInputStream, IOException, InputStream}
import com.webank.wedatasphere.linkis.common.io.resultset.{ResultSet, ResultSetReader}
import com.webank.wedatasphere.linkis.common.io.{MetaData, Record}
import com.webank.wedatasphere.linkis.common.utils.Logging
import com.webank.wedatasphere.linkis.storage.domain.Dolphin
import com.webank.wedatasphere.linkis.storage.exception.StorageWarnException
import com.webank.wedatasphere.linkis.storage.utils.StorageUtils
import scala.collection.mutable.ArrayBuffer
/**
 * Sequential reader for a result set stored in an input stream.
 *
 * As consumed by this class, the stream starts with a type marker (read through
 * `Dolphin.getType`), followed by length-prefixed lines: the first line is handed to the
 * deserializer as metadata, every later line as a record.
 *
 * Typical usage: call `hasNext` (which loads the metadata on first use and advances to the
 * next record) and then `getRecord` to obtain the record that was just read.
 *
 * Created by johnnwang on 10/17/18.
 *
 * @param resultSet   result set definition; supplies the deserializer and the expected type
 * @param inputStream stream to read from; closed by `close()`
 */
class StorageResultSetReader[K <: MetaData, V <: Record](resultSet: ResultSet[K, V], inputStream: InputStream) extends ResultSetReader[K, V](resultSet, inputStream) with Logging {

  private val deserializer = resultSet.createResultSetDeserializer
  // Loaded once by getMetaData; null until then.
  private var metaData: K = _
  // Record produced by the most recent successful hasNext call.
  private var row: Record = _
  // Number of lines returned by readLine so far (the metadata line is counted too).
  private var rowCount = 0
  // Upper bound on the number of bytes requested from the stream per read call.
  private val READ_CACHE = 1024
  // Scratch buffer reused by every readLine call.
  private val bytes = new Array[Byte](READ_CACHE)

  /**
   * Builds a reader over an in-memory string, encoded with `Dolphin.CHAR_SET`.
   *
   * @param resultSet result set definition
   * @param value     complete result set content
   */
  def this(resultSet: ResultSet[K, V], value: String) = {
    this(resultSet, new ByteArrayInputStream(value.getBytes(Dolphin.CHAR_SET)))
  }

  /**
   * Reads the type marker from the stream and checks it against the type of `resultSet`.
   *
   * @throws IOException if the type found in the stream differs from `resultSet.resultSetType`
   */
  def init(): Unit = {
    val resType = Dolphin.getType(inputStream)
    if (resultSet.resultSetType != resType) throw new IOException("File type does not match(文件类型不匹配): " + ResultSetFactory.resultSetType.getOrElse(resType, "TABLE"))
  }

  /**
   * Reads one length-prefixed line.
   *
   * The line length is read first; then exactly that many bytes are collected, at most
   * `READ_CACHE` bytes per read call. Splitting the line into columns is left to the
   * deserializer.
   *
   * @return the bytes of the line (without the length prefix), or `null` when reading the
   *         length raises a `StorageWarnException`, which is treated as the end of the data.
   *         If the stream ends in the middle of a line, the bytes read so far are returned.
   */
  def readLine(): Array[Byte] = {
    var remaining =
      try Dolphin.readInt(inputStream)
      catch {
        case _: StorageWarnException =>
          info(s"Read finished(读取完毕)")
          return null
      }
    val rowBuffer = new java.io.ByteArrayOutputStream()
    var exhausted = false
    // Read the entire line, except for the line-length prefix that was consumed above.
    while (remaining > 0 && !exhausted) {
      val read = StorageUtils.readBytes(inputStream, bytes, math.min(remaining, READ_CACHE))
      if (read > 0) {
        remaining -= read
        rowBuffer.write(bytes, 0, read)
      } else {
        // No progress is possible any more: stop instead of spinning forever on a truncated stream.
        warn(s"Stream ended with $remaining byte(s) of the current line still unread, returning a truncated line")
        exhausted = true
      }
    }
    rowCount += 1
    rowBuffer.toByteArray
  }

  /**
   * Returns the record loaded by the most recent successful `hasNext` call.
   *
   * @throws IOException if the metadata has not been read yet or no record has been loaded
   */
  @scala.throws[IOException]
  override def getRecord: Record = {
    if (metaData == null) throw new IOException("Must read metadata first(必须先读取metadata)")
    if (row == null) throw new IOException("Can't get the value of the field, maybe the IO stream has been read or has been closed!(拿不到字段的值,也许IO流已读取完毕或已被关闭!)")
    row
  }

  /**
   * Returns the metadata of the result set.
   *
   * The first call validates the type marker (`init`) and reads the metadata line; later calls
   * return the cached value and do not touch the stream, so calling this after records have
   * been read no longer consumes a record line.
   *
   * @throws IOException if the type marker does not match
   */
  @scala.throws[IOException]
  override def getMetaData: MetaData = {
    if (metaData == null) {
      init()
      metaData = deserializer.createMetaData(readLine())
    }
    metaData
  }

  /**
   * Skips `recordNum` records without deserializing them. Metadata is loaded first if needed.
   *
   * NOTE(review): skipped records are not added to the counter behind `getPosition` — confirm
   * whether callers expect them to be.
   *
   * @param recordNum number of records to skip
   * @return `recordNum` on success; `-1` if `recordNum` is negative or a record could not be
   *         skipped (for example because the stream ended)
   */
  @scala.throws[IOException]
  override def skip(recordNum: Int): Int = {
    if (recordNum < 0) return -1
    if (metaData == null) getMetaData
    try {
      var pending = recordNum
      while (pending > 0) {
        skipFully(Dolphin.readInt(inputStream))
        pending -= 1
      }
      recordNum
    } catch {
      // Fatal errors (OutOfMemoryError, InterruptedException, ...) are deliberately not swallowed.
      case scala.util.control.NonFatal(_) => -1
    }
  }

  /**
   * Skips exactly `byteCount` bytes. `InputStream.skip` may skip fewer bytes than requested,
   * so it is called repeatedly; when it makes no progress a single byte is read to tell a
   * slow stream apart from the end of the stream.
   */
  private def skipFully(byteCount: Int): Unit = {
    var remaining = byteCount.toLong
    while (remaining > 0) {
      val skipped = inputStream.skip(remaining)
      if (skipped > 0) remaining -= skipped
      else if (inputStream.read() >= 0) remaining -= 1
      else throw new IOException("Unexpected end of stream while skipping a record")
    }
  }

  /** Number of lines read through `readLine` so far, including the metadata line. */
  @scala.throws[IOException]
  override def getPosition: Long = rowCount

  /**
   * Advances to the next record, loading the metadata first if necessary.
   *
   * Every call consumes one line from the stream, so call it exactly once per record.
   *
   * @return `true` if a record was read and can be fetched with `getRecord`; `false` if no
   *         line was left or the deserializer produced no record
   */
  @scala.throws[IOException]
  override def hasNext: Boolean = {
    if (metaData == null) getMetaData
    val line = readLine()
    if (line == null) false
    else {
      row = deserializer.createRecord(line)
      row != null
    }
  }

  /** Delegates to `InputStream.available` of the underlying stream. */
  @scala.throws[IOException]
  override def available: Long = inputStream.available()

  /** Closes the underlying stream. */
  override def close(): Unit = inputStream.close()
}