/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.csv

import java.util

import com.webank.wedatasphere.linkis.engine.configuration.SparkConfiguration
import com.webank.wedatasphere.linkis.storage.{domain => wds}
import com.webank.wedatasphere.linkis.storage.resultset.table.{TableMetaData, TableRecord}
import com.webank.wedatasphere.linkis.storage.resultset.ResultSetReader

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._
/**
 * Loads a Linkis dolphin result set into Spark as a temporary view: the stored rows are
 * read into a DataFrame whose schema is derived from the Linkis column metadata.
 *
 * Created by allenlliu on 2019/1/26.
 */
object DolphinToSpark {
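
  // Fixed precision and scale applied to every decimal column when building the Spark schema.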
  val bigDecimalPrecision = 20
  val bigDecimalScale = 10
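
  /**
   * Registers the result set stored at `res` as a temporary view named `tableName`,
   * keeping any existing temp view with that name.
   */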
  def createTempView(spark: SparkSession, tableName: String, res: String): Unit = {
    createTempView(spark, tableName, res, false)
  }
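
  /**
   * Registers the result set stored at `res` as a temporary view named `tableName`.
   * An existing temp view with the same name is kept unless `forceReplace` is set.
   * At most `SparkConfiguration.DOLPHIN_LIMIT_LEN` rows are read from the result set.
   *
   * A minimal usage sketch (the resource path below is hypothetical):
   * {{{
   *   DolphinToSpark.createTempView(spark, "tmp_result", "hdfs:///tmp/linkis/result/_0.dolphin", true)
   *   spark.sql("SELECT * FROM tmp_result").show()
   * }}}
   */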
  def createTempView(spark: SparkSession, tableName: String, res: String, forceReplace: Boolean): Unit = {
    if (forceReplace || spark.sessionState.catalog.getTempView(tableName).isEmpty) {
      val reader = ResultSetReader.getTableResultReader(res)
      val metadata = reader.getMetaData.asInstanceOf[TableMetaData]
      val rowList = new util.ArrayList[Row]()
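      // Cap the number of rows buffered on the driver at DOLPHIN_LIMIT_LEN.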
      var len = SparkConfiguration.DOLPHIN_LIMIT_LEN.getValue
      while (reader.hasNext && len > 0) {
        rowList.add(Row.fromSeq(reader.getRecord.asInstanceOf[TableRecord].row))
        len -= 1
      }
      val df: DataFrame = spark.createDataFrame(rowList, metadataToSchema(metadata))
      df.createOrReplaceTempView(tableName)
    }
  }
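
  /** Builds a Spark SQL schema from the Linkis table metadata, one `StructField` per column. */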
  def metadataToSchema(metaData: TableMetaData): StructType = {
    new StructType(metaData.columns.map(field => StructField(field.columnName, toSparkType(field.dataType))))
  }
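
  /**
   * Maps a Linkis storage data type to its Spark SQL counterpart. Types without a direct
   * mapping (string, char, varchar, struct, list, array, map, ...) fall back to `StringType`.
   */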
  def toSparkType(dataType: wds.DataType): DataType = dataType match {
    case wds.NullType => NullType
    // case wds.StringType | wds.CharType | wds.VarcharType | wds.StructType | wds.ListType | wds.ArrayType | wds.MapType => StringType
    case wds.BooleanType => BooleanType
    case wds.ShortIntType => ShortType
    case wds.IntType => IntegerType
    case wds.LongType => LongType
    case wds.FloatType => FloatType
    case wds.DoubleType => DoubleType
    case wds.DecimalType => DecimalType(bigDecimalPrecision, bigDecimalScale)
    case wds.DateType => DateType
    // case wds.TimestampType => TimestampType
    case wds.BinaryType => BinaryType
    case _ => StringType
  }
}