| /* |
| * Copyright 2019 WeBank |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package com.webank.wedatasphere.linkis.storage.resultset.table |
| |
| import com.webank.wedatasphere.linkis.common.io.resultset.ResultSerializer |
| import com.webank.wedatasphere.linkis.common.io.{MetaData, Record} |
| import com.webank.wedatasphere.linkis.storage.domain.Dolphin |
| |
| import scala.collection.mutable.ArrayBuffer |
| |
| |
| class TableResultSerializer extends ResultSerializer{ |
| |
| |
| override def metaDataToBytes(metaData: MetaData): Array[Byte] = { |
| val tableMetaData = metaData.asInstanceOf[TableMetaData] |
| lineToBytes(tableMetaData.columns.map(_.toArray).reduce((a1, a2) => a1 ++ a2)) |
| } |
| |
| override def recordToBytes(record: Record): Array[Byte] = { |
| val tableRecord = record.asInstanceOf[TableRecord] |
| lineToBytes(tableRecord.row) |
| } |
| |
| /** |
| * Convert a row of data to an array of Bytes |
| * Convert the data to byte and get the corresponding total byte length to write to the file |
| * Data write format: line length (fixed length) column length (fixed length) field index comma segmentation real data |
| * For example: 000000004900000000116,10,3,4,5,peace1johnnwang1101true11.51 |
| * The length of the line does not include its own length |
| * 将一行数据转换为Bytes的数组 |
| * 对数据转换为byte,并获取相应的总byte长度写入文件 |
| * 数据写入格式:行长(固定长度) 列长(固定长度) 字段索引逗号分割 真实数据 |
| * 如:000000004900000000116,10,3,4,5,peace1johnnwang1101true11.51 |
| * 其中行长不包括自身长度 |
| * @param line |
| */ |
| def lineToBytes(line: Array[Any]): Array[Byte] = { |
| //Data cache(数据缓存) |
| val dataBytes = ArrayBuffer[Array[Byte]]() |
| //Column cache(列缓存) |
| val colIndex = ArrayBuffer[Array[Byte]]() |
| var colByteLen = 0 |
| var length = 0 |
| line.foreach { data => |
| val bytes = if(data == null ) Dolphin.NULL_BYTES else Dolphin.getBytes(data) |
| dataBytes += bytes |
| val colBytes = Dolphin.getBytes(bytes.length) |
| colIndex += colBytes += Dolphin.COL_SPLIT_BYTES |
| colByteLen += colBytes.length + Dolphin.COL_SPLIT_LEN |
| length += bytes.length |
| } |
| length += colByteLen + Dolphin.INT_LEN |
| toByteArray(length, colByteLen, colIndex, dataBytes) |
| } |
| |
| /** |
| * Splice a row of data into a byte array(将一行的数据拼接成byte数组) |
| * @param length The total length of the line data byte, excluding its own length(行数据byte总长度,不包括自身的长度) |
| * @param colByteLen Record field index byte column length(记录字段索引byte的列长) |
| * @param colIndex Field index, including separator comma(字段索引,包括分割符逗号) |
| * @param dataBytes Byte of real data(真实数据的byte) |
| * @return |
| */ |
| def toByteArray(length: Int, colByteLen: Int, colIndex: ArrayBuffer[Array[Byte]], dataBytes: ArrayBuffer[Array[Byte]]): Array[Byte] = { |
| val row = ArrayBuffer[Byte]() |
| colIndex ++= dataBytes |
| row.appendAll(Dolphin.getIntBytes(length)) |
| row.appendAll(Dolphin.getIntBytes(colByteLen)) |
| colIndex.foreach(row.appendAll(_)) |
| row.toArray |
| } |
| |
| |
| |
| |
| } |