/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.util

import java.util.Objects

import scala.collection.JavaConverters._

import org.apache.spark.sql.types
import org.apache.spark.sql.types._

import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension, ColumnSchema}
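
/**
 * Converts CarbonData table and column metadata into the equivalent Spark SQL types.
 *
 * Illustrative usage (assumes `table` is an already-resolved [[CarbonTable]] with
 * columns named "id" and "name"):
 * {{{
 *   val schema: StructType = SparkTypeConverter.createSparkSchema(table, Seq("id", "name"))
 * }}}
 */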
object SparkTypeConverter {
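
  /**
   * Builds a Spark [[StructType]] for the given subset of table columns, preserving
   * the order of the requested column names. Column lookup is case-insensitive;
   * an unknown column name raises [[IllegalArgumentException]].
   */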
  def createSparkSchema(table: CarbonTable, columns: Seq[String]): StructType = {
    Objects.requireNonNull(table)
    Objects.requireNonNull(columns)
    if (columns.isEmpty) {
      throw new IllegalArgumentException("column list is empty")
    }
    val fields = new java.util.ArrayList[StructField](columns.size)
    val allColumns = table.getTableInfo.getFactTable.getListOfColumns.asScala
    // find each requested column and add it to the fields list
    columns.foreach { column =>
      val col = allColumns.find(_.getColumnName.equalsIgnoreCase(column)).getOrElse(
        throw new IllegalArgumentException(column + " does not exist")
      )
      fields.add(StructField(col.getColumnName, convertCarbonToSparkDataType(col, table)))
    }
    StructType(fields)
  }
  /**
   * Converts a Carbon data type to the corresponding Spark data type.
   */
  def convertCarbonToSparkDataType(
      columnSchema: ColumnSchema,
      table: CarbonTable): types.DataType = {
    if (CarbonDataTypes.isDecimal(columnSchema.getDataType)) {
      val scale = columnSchema.getScale
      val precision = columnSchema.getPrecision
      if (scale == 0 && precision == 0) {
        // precision and scale were not recorded in the schema: fall back to a default decimal
        DecimalType(18, 2)
      } else {
        DecimalType(precision, scale)
      }
    } else if (CarbonDataTypes.isArrayType(columnSchema.getDataType)) {
      CarbonMetastoreTypes
        .toDataType(s"array<${ getArrayChildren(table, columnSchema.getColumnName) }>")
    } else if (CarbonDataTypes.isStructType(columnSchema.getDataType)) {
      CarbonMetastoreTypes
        .toDataType(s"struct<${ getStructChildren(table, columnSchema.getColumnName) }>")
    } else if (CarbonDataTypes.isMapType(columnSchema.getDataType)) {
      CarbonMetastoreTypes
        .toDataType(s"map<${ getMapChildren(table, columnSchema.getColumnName) }>")
    } else {
      columnSchema.getDataType match {
        case CarbonDataTypes.STRING => StringType
        case CarbonDataTypes.SHORT => ShortType
        case CarbonDataTypes.INT => IntegerType
        case CarbonDataTypes.LONG => LongType
        case CarbonDataTypes.DOUBLE => DoubleType
        case CarbonDataTypes.FLOAT => FloatType
        case CarbonDataTypes.BYTE => ByteType
        case CarbonDataTypes.BINARY => BinaryType
        case CarbonDataTypes.BOOLEAN => BooleanType
        case CarbonDataTypes.TIMESTAMP => TimestampType
        case CarbonDataTypes.DATE => DateType
        case CarbonDataTypes.VARCHAR => StringType
        // fail fast with a descriptive error rather than a bare scala.MatchError
        case other =>
          throw new UnsupportedOperationException(s"unsupported data type: $other")
      }
    }
  }
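
  /**
   * Returns the comma-separated child type string of an array dimension
   * (e.g. "int" for array&lt;int&gt;), recursing into nested complex types.
   */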
  def getArrayChildren(table: CarbonTable, dimName: String): String = {
    table.getChildren(dimName).asScala.map { childDim =>
      childDim.getDataType.getName.toLowerCase match {
        case "array" => s"array<${ getArrayChildren(table, childDim.getColName) }>"
        case "struct" => s"struct<${ getStructChildren(table, childDim.getColName) }>"
        case "map" => s"map<${ getMapChildren(table, childDim.getColName) }>"
        case dType => addDecimalScaleAndPrecision(childDim, dType)
      }
    }.mkString(",")
  }
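
  /**
   * Returns the comma-separated "fieldName:fieldType" pairs of a struct dimension.
   * Child column names are stored fully qualified ("parent.child"), so the parent
   * prefix is stripped before emitting the field name.
   */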
  def getStructChildren(table: CarbonTable, dimName: String): String = {
    table.getChildren(dimName).asScala.map { childDim =>
      val fieldName = childDim.getColName.substring(dimName.length + 1)
      childDim.getDataType.getName.toLowerCase match {
        case "array" => s"$fieldName:array<${ getArrayChildren(table, childDim.getColName) }>"
        case "struct" => s"$fieldName:struct<${
          table.getChildren(childDim.getColName).asScala
            .map(f => recursiveMethod(table, childDim.getColName, f)).mkString(",")
        }>"
        case "map" => s"$fieldName:map<${ getMapChildren(table, childDim.getColName) }>"
        case dType => s"$fieldName:${ addDecimalScaleAndPrecision(childDim, dType) }"
      }
    }.mkString(",")
  }
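
  /**
   * Returns the "keyType,valueType" string of a map dimension by reading through
   * the intermediate struct child (see the comment in the body).
   */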
  def getMapChildren(table: CarbonTable, dimName: String): String = {
    table.getChildren(dimName).asScala.flatMap { childDim =>
      // Map<String, String> is stored internally as Map<Struct<String, String>> in the carbon
      // schema and as Array<Struct<String, String>> in the actual data storage. So while parsing
      // the map data type we can skip the intermediate struct child and directly take its
      // children, which are the actual key and value types of the map.
      val structChildren = table.getChildren(childDim.getColName).asScala
      structChildren.map { structChild =>
        structChild.getDataType.getName.toLowerCase match {
          case "array" => s"array<${ getArrayChildren(table, structChild.getColName) }>"
          case "struct" => s"struct<${ getStructChildren(table, structChild.getColName) }>"
          case "map" => s"map<${ getMapChildren(table, structChild.getColName) }>"
          case dType => addDecimalScaleAndPrecision(structChild, dType)
        }
      }
    }.mkString(",")
  }
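
  /**
   * Appends "(precision,scale)" to the type string when the column is a decimal;
   * otherwise returns the type string unchanged.
   */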
  def addDecimalScaleAndPrecision(dimval: CarbonColumn, dataType: String): String = {
    if (CarbonDataTypes.isDecimal(dimval.getDataType)) {
      s"$dataType(${ dimval.getColumnSchema.getPrecision },${ dimval.getColumnSchema.getScale })"
    } else {
      dataType
    }
  }
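
  /**
   * Renders a single struct field nested inside another struct as
   * "fieldName:fieldType", recursing into complex children.
   */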
  private def recursiveMethod(
      table: CarbonTable, dimName: String, childDim: CarbonDimension): String = {
    val fieldName = childDim.getColName.substring(dimName.length + 1)
    childDim.getDataType.getName.toLowerCase match {
      case "array" => s"$fieldName:array<${ getArrayChildren(table, childDim.getColName) }>"
      case "struct" => s"$fieldName:struct<${ getStructChildren(table, childDim.getColName) }>"
      case "map" => s"$fieldName:map<${ getMapChildren(table, childDim.getColName) }>"
      case dType => s"$fieldName:${ addDecimalScaleAndPrecision(childDim, dType) }"
    }
  }
}