blob: cd68f91590d88b7ed45d6984a2b72847225641df [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.filesystem.reader
import java.util
import com.webank.wedatasphere.linkis.common.io.{FsPath, FsReader, MetaData, Record}
import com.webank.wedatasphere.linkis.storage.fs.FileSystem
import com.webank.wedatasphere.linkis.storage.resultset.table.{TableMetaData, TableRecord}
import com.webank.wedatasphere.linkis.storage.resultset.{ResultSetFactory, ResultSetReader}
import com.webank.wedatasphere.linkis.storage.{LineMetaData, LineRecord}
/**
 * File reader for result-set files.
 *
 * The result set is resolved from the file path, a matching result-set reader is opened on the
 * file, and the metadata it reports decides whether the content is exposed as a table
 * ([[TableMetaData]]) or as plain lines ([[LineMetaData]]).
 *
 * The underlying reader is closed at the end of `getBody`.
 *
 * Created by johnnwang on 2019/4/16.
 */
class RSReader extends FileReader {

  /**
   * Creates the reader and immediately initialises it via `init`.
   *
   * @param fileSystem file system used to open `fsPath`
   * @param fsPath     path of the result-set file
   * @param params     extra parameters; forwarded to `init`
   */
  def this(fileSystem: FileSystem, fsPath: FsPath, params: util.HashMap[String, String]) = {
    this()
    init(fileSystem, fsPath, params)
  }

  /** Maximum number of table rows returned by `InnerTableReader.getBody`. */
  private final val MaxTableRows = 5000

  /** Reader over the result-set file; assigned in `init`. */
  override protected var fsReader: FsReader[MetaData, Record] = _

  /** Result-set type reported by the resolved result set; assigned in `init`. */
  override protected var kind: String = _

  /** Strategy that renders head/body for the detected metadata type; assigned in `init`. */
  private var rsType: InnerParent = _

  /**
   * Resolves the result set for `fsPath`, opens a reader on it and picks the head/body strategy
   * from the metadata the reader reports.
   *
   * @param fileSystem file system used to open `fsPath`
   * @param fsPath     path of the result-set file
   * @param params     not referenced by this implementation
   * @throws MatchError if the metadata is neither [[TableMetaData]] nor [[LineMetaData]];
   *                    the reader opened here is closed before the error is thrown
   */
  override def init(fileSystem: FileSystem, fsPath: FsPath, params: util.HashMap[String, String]): Unit = {
    val resultSet = ResultSetFactory.getInstance.getResultSetByPath(fsPath)
    kind = resultSet.resultSetType()
    fsReader = ResultSetReader
      .getResultSetReader(resultSet, fileSystem.read(fsPath))
      .asInstanceOf[FsReader[MetaData, Record]]
    rsType = fsReader.getMetaData match {
      case table: TableMetaData => new InnerTableReader(table)
      case line: LineMetaData => new InnerLineReader(line)
      case other =>
        // Unsupported metadata: release the reader we just opened, then fail exactly as a
        // non-exhaustive match would have.
        fsReader.close()
        throw new MatchError(other)
    }
  }

  /** Head of the result set, as produced by the strategy chosen in `init`. */
  override def getHead: util.HashMap[String, Object] = rsType.getHead

  /** Body of the result set, as produced by the strategy chosen in `init`. */
  override def getBody: util.ArrayList[util.ArrayList[String]] = rsType.getBody

  /** Base type of the head/body strategies; both defaults return `null`. */
  class InnerParent {
    def getHead: util.HashMap[String, Object] = null

    def getBody: util.ArrayList[util.ArrayList[String]] = null
  }

  /**
   * Strategy for line-oriented result sets.
   *
   * @param metaData metadata of the result set; cast to [[LineMetaData]] when the head is built
   */
  class InnerLineReader(private val metaData: MetaData) extends InnerParent {

    /** @return a map with `"metadata"` (the line metadata value) and `"type"` (the result-set type) */
    override def getHead: util.HashMap[String, Object] = {
      val head = new util.HashMap[String, Object]
      head.put("metadata", metaData.asInstanceOf[LineMetaData].getMetaData)
      head.put("type", kind)
      head
    }

    /**
     * Reads every remaining record as a line.
     *
     * @return a list holding exactly one inner list that contains all lines in read order
     */
    override def getBody: util.ArrayList[util.ArrayList[String]] = {
      val lines = new util.ArrayList[String]
      try {
        while (fsReader.hasNext) {
          lines.add(fsReader.getRecord.asInstanceOf[LineRecord].getLine)
        }
      } finally {
        // Close even when reading fails so the reader is not leaked.
        fsReader.close()
      }
      val body = new util.ArrayList[util.ArrayList[String]]
      body.add(lines)
      body
    }
  }

  /**
   * Strategy for table result sets.
   *
   * @param metaData metadata of the result set; cast to [[TableMetaData]] when the head is built
   */
  class InnerTableReader(private val metaData: MetaData) extends InnerParent {

    /**
     * @return a map with `"metadata"` (the `toString` of every column, in column order) and
     *         `"type"` (the result-set type)
     */
    override def getHead: util.HashMap[String, Object] = {
      val columnDescriptions = new util.ArrayList[String]()
      metaData.asInstanceOf[TableMetaData].columns.foreach(column => columnDescriptions.add(column.toString()))
      val head = new util.HashMap[String, Object]
      head.put("metadata", columnDescriptions)
      head.put("type", kind)
      head
    }

    /**
     * Reads at most `MaxTableRows` (5000) rows.
     *
     * Each row becomes a list of cell strings; a `null` cell is rendered as the string `"NULL"`.
     *
     * @return the rows read, in read order
     */
    override def getBody: util.ArrayList[util.ArrayList[String]] = {
      val body = new util.ArrayList[util.ArrayList[String]]
      try {
        var rowCount = 0
        // The limit is tested first so no further record is requested once it is reached.
        while (rowCount < MaxTableRows && fsReader.hasNext) {
          val cells = fsReader.getRecord.asInstanceOf[TableRecord].row
          val renderedRow = new util.ArrayList[String]()
          cells.foreach { cell =>
            renderedRow.add(if (cell == null) "NULL" else cell.toString)
          }
          body.add(renderedRow)
          rowCount += 1
        }
      } finally {
        // Close even when reading fails so the reader is not leaked.
        fsReader.close()
      }
      body
    }
  }
}