blob: ee34e2485a05c0004be068641def68d40899f899 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.storage.source
import java.util
import com.webank.wedatasphere.linkis.common.io.{FsReader, FsWriter, MetaData, Record}
import com.webank.wedatasphere.linkis.storage.resultset.table.{TableMetaData, TableRecord}
import com.webank.wedatasphere.linkis.storage.script.{ScriptMetaData, VariableParser}
import com.webank.wedatasphere.linkis.storage.{LineMetaData, LineRecord}
import org.apache.commons.io.IOUtils
import org.apache.commons.math3.util.Pair
import scala.collection.JavaConversions._
/**
* Created by johnnwang on 2020/1/15.
*/
abstract class AbstractFileSource extends FileSource {
protected var start: Int = 0
protected var end: Int = -1
protected var count: Int = 0
protected var totalLine = 0
protected var shuffler: Record => Record = r => r
protected var pageTrigger: Boolean = false
protected var params: util.Map[String, String] = new util.HashMap[String, String]
protected var fsReader: FsReader[_ <: MetaData, _ <: Record] = _
protected var `type`: String = "script/text"
def setType(`type`: String): AbstractFileSource = {
params += "type" -> `type`
this.`type` = `type`
this
}
override def shuffle(s: Record => Record): AbstractFileSource = {
this.shuffler = s
this
}
override def page(page: Int, pageSize: Int): AbstractFileSource = {
if (pageTrigger) return this
start = (page - 1) * pageSize
end = pageSize * page - 1
pageTrigger = true
this
}
override def addParams(params: util.Map[String, String]): AbstractFileSource = {
this.params.putAll(params)
this
}
override def addParams(key: String, value: String): FileSource = {
this.params += key -> value
this
}
override def getParams(): util.Map[String, String] = this.params
def setFsReader[K <: MetaData, V <: Record](fsReader: FsReader[K, V]): AbstractFileSource = {
this.fsReader = fsReader
this
}
override def write[K <: MetaData, V <: Record](fsWriter: FsWriter[K, V]): Unit = {
`while`(fsWriter.addMetaData, fsWriter.addRecord)
}
def `while`[M](m: MetaData => M, r: Record => Unit): M = {
val metaData = fsReader.getMetaData
val t = m(metaData)
while (fsReader.hasNext && ifContinueRead) {
if (ifStartRead) {
r(shuffler(fsReader.getRecord))
totalLine += 1
}
count += 1
}
t
}
override def close(): Unit = IOUtils.closeQuietly(this.fsReader)
def collectRecord(record: Record): Array[String] = {
record match {
case t: TableRecord => t.row.map(_.toString)
case l: LineRecord => Array(l.getLine)
}
}
override def collect(): Pair[Object, util.ArrayList[Array[String]]] = {
val record = new util.ArrayList[Array[String]]
val metaData = `while`(collectMetaData, r => record.add(collectRecord(r)))
this.params += "totalLine" -> totalLine.toString
new Pair(metaData, record)
}
//如果不分页,则一直读,如果分页,则 count需要小于count
def ifContinueRead: Boolean = !pageTrigger || count <= end
def ifStartRead: Boolean = !pageTrigger || count >= start
def collectMetaData(metaData: MetaData): Object = {
//script/text ,tableResultset,lineResultSet
metaData match {
case s: ScriptMetaData => VariableParser.getMap(s.getMetaData)
case l: LineMetaData => l.getMetaData
case t: TableMetaData => t.columns.map(_.toString)
}
}
}