blob: 5c5e28a5ee6e70a9ecb8e6de957017bebf7124f4 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.storage.resultset.table
import com.webank.wedatasphere.linkis.common.io.resultset.ResultDeserializer
import com.webank.wedatasphere.linkis.storage.domain.{Column, DataType, Dolphin}
import com.webank.wedatasphere.linkis.storage.exception.StorageErrorException
import scala.collection.mutable.ArrayBuffer
class TableResultDeserializer extends ResultDeserializer[TableMetaData, TableRecord]{
var metaData: TableMetaData = _
import DataType._
override def createMetaData(bytes: Array[Byte]): TableMetaData = {
val colByteLen = Dolphin.getString(bytes, 0, Dolphin.INT_LEN).toInt
val colString = Dolphin.getString(bytes, Dolphin.INT_LEN, colByteLen)
val colArray = if(colString.endsWith(Dolphin.COL_SPLIT)) colString.substring(0, colString.length -1).split(Dolphin.COL_SPLIT) else colString.split(Dolphin.COL_SPLIT)
var index = Dolphin.INT_LEN + colByteLen
if(colArray.length % 3 != 0) throw new StorageErrorException(52001,"Parsing metadata failed(解析元数据失败)")
val columns = new ArrayBuffer[Column]()
for(i <- 0 until (colArray.length, 3)){
var len = colArray(i).toInt
val colName = Dolphin.getString(bytes, index, len)
index += len
len = colArray(i + 1).toInt
val colType = Dolphin.getString(bytes, index, len)
index += len
len = colArray(i + 2).toInt
val colComment = Dolphin.getString(bytes, index, len)
index += len
columns += Column(colName, colType, colComment)
}
metaData = new TableMetaData(columns.toArray)
metaData
}
/**
* colByteLen:All column fields are long(所有列字段长 记录的长度)
* colString:Obtain column length(获得列长):10,20,21
* colArray:Column length array(列长数组)
* Get data by column length(通过列长获得数据)
* @param bytes
* @return
*/
override def createRecord(bytes: Array[Byte]): TableRecord = {
val colByteLen = Dolphin.getString(bytes, 0, Dolphin.INT_LEN).toInt
val colString = Dolphin.getString(bytes, Dolphin.INT_LEN, colByteLen)
val colArray = if(colString.endsWith(Dolphin.COL_SPLIT)) colString.substring(0, colString.length -1).split(Dolphin.COL_SPLIT) else colString.split(Dolphin.COL_SPLIT)
var index = Dolphin.INT_LEN + colByteLen
val data = colArray.indices.map { i =>
val len = colArray(i).toInt
val res = Dolphin.getString(bytes, index, len)
index += len
if(i >= metaData.columns.length) res
else
toValue(metaData.columns(i).dataType,res)
}.toArray
new TableRecord(data)
}
}