/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive

import java.util.LinkedHashSet

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.{CarbonMetastoreTypes, SparkTypeConverter}

import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.path.CarbonTablePath

/**
 * Represents the logical plan for one CarbonData table. This leaf node exposes
 * the table's dimensions and measures to Spark SQL as attribute references.
 */
case class CarbonRelation(
databaseName: String,
tableName: String,
var metaData: CarbonMetaData,
carbonTable: CarbonTable)
extends LeafNode with MultiInstanceRelation {
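
  // MultiInstanceRelation contract: return a fresh copy so the same table can
  // appear more than once in a plan (e.g. a self-join) with distinct node identity.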
override def newInstance(): LogicalPlan = {
CarbonRelation(databaseName, tableName, metaData, carbonTable)
.asInstanceOf[this.type]
}
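
  /**
   * Dimension columns of the table as Spark attributes. Complex types
   * (array/struct/map) are expanded into their metastore string form before
   * being converted to a Spark [[DataType]].
   */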
  val dimensionsAttr: Seq[AttributeReference] = {
    val dimensions = new LinkedHashSet(carbonTable.getVisibleDimensions).asScala.toSeq
    dimensions.map { dim =>
      val dimval = metaData.carbonTable.getDimensionByName(dim.getColName)
      val output: DataType = dimval.getDataType.getName.toLowerCase match {
        case "array" =>
          CarbonMetastoreTypes.toDataType(
            s"array<${SparkTypeConverter.getArrayChildren(carbonTable, dim.getColName)}>")
        case "struct" =>
          CarbonMetastoreTypes.toDataType(
            s"struct<${SparkTypeConverter.getStructChildren(carbonTable, dim.getColName)}>")
        case "map" =>
          CarbonMetastoreTypes.toDataType(
            s"map<${SparkTypeConverter.getMapChildren(carbonTable, dim.getColName)}>")
        case dType =>
          val dataType = addDecimalScaleAndPrecision(dimval, dType)
          CarbonMetastoreTypes.toDataType(dataType)
      }
      AttributeReference(
        dim.getColName,
        output,
        nullable = true)()
    }
  }
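
  /**
   * Measure columns of the table as Spark attributes. Decimal measures carry
   * their precision and scale into the Spark type.
   */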
  val measureAttr: Seq[AttributeReference] = {
    new LinkedHashSet(carbonTable.getVisibleMeasures).asScala.toSeq.map { measure =>
      val metastoreType = metaData.carbonTable.getMeasureByName(measure.getColName)
        .getDataType.getName.toLowerCase match {
        case "decimal" => s"decimal(${measure.getPrecision},${measure.getScale})"
        case others => others
      }
      AttributeReference(
        measure.getColName,
        CarbonMetastoreTypes.toDataType(metastoreType),
        nullable = true)()
    }
  }
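
  /**
   * Full output schema in CREATE TABLE column order, with partition columns
   * moved to the end and invisible (internal) columns filtered out.
   */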
  override val output: Seq[AttributeReference] = {
    val columns = carbonTable.getCreateOrderColumn().asScala
    val partitionColumnSchemas = if (carbonTable.getPartitionInfo != null) {
      carbonTable.getPartitionInfo.getColumnSchemaList.asScala
    } else {
      Nil
    }
    val otherColumns = columns.filterNot(a => partitionColumnSchemas.contains(a.getColumnSchema))
    val partitionColumns = columns.filter(a => partitionColumnSchemas.contains(a.getColumnSchema))
    // convert each visible column to an Attribute, keeping partition columns last
    (otherColumns ++ partitionColumns).filter(!_.isInvisible).map { column: CarbonColumn =>
      if (column.isDimension()) {
        val output: DataType = column.getDataType.getName.toLowerCase match {
          case "array" =>
            CarbonMetastoreTypes.toDataType(
              s"array<${SparkTypeConverter.getArrayChildren(carbonTable, column.getColName)}>")
          case "struct" =>
            CarbonMetastoreTypes.toDataType(
              s"struct<${SparkTypeConverter.getStructChildren(carbonTable, column.getColName)}>")
          case "map" =>
            CarbonMetastoreTypes.toDataType(
              s"map<${SparkTypeConverter.getMapChildren(carbonTable, column.getColName)}>")
          case dType =>
            val dataType = SparkTypeConverter.addDecimalScaleAndPrecision(column, dType)
            CarbonMetastoreTypes.toDataType(dataType)
        }
        AttributeReference(column.getColName, output, nullable = true)(
          qualifier = Option(tableName + "." + column.getColName))
      } else {
        val output = CarbonMetastoreTypes.toDataType {
          column.getDataType.getName.toLowerCase match {
            case "decimal" =>
              s"decimal(${column.getColumnSchema.getPrecision},${column.getColumnSchema.getScale})"
            case others => others
          }
        }
        AttributeReference(column.getColName, output, nullable = true)(
          qualifier = Option(tableName + "." + column.getColName))
      }
    }
  }
  // TODO: use size data from the segment footers.
  // TODO for Spark 2.1:
  // override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes)
  // TODO for Spark 2.2:
  // override def computeStats(conf: SQLConf): Statistics =
  //   Statistics(sizeInBytes = this.sizeInBytes)
  override def equals(other: Any): Boolean = {
    other match {
      case p: CarbonRelation =>
        p.databaseName == databaseName && p.output == output && p.tableName == tableName
      case _ => false
    }
  }

  // keep hashCode consistent with the custom equals above, which compares only
  // the database name, table name and output schema
  override def hashCode(): Int = {
    (databaseName, tableName, output).hashCode()
  }
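
  /**
   * Appends "(precision,scale)" to a decimal type name so the metastore type
   * string keeps the full decimal metadata, e.g. "decimal" -> "decimal(10,2)".
   */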
def addDecimalScaleAndPrecision(dimval: CarbonDimension, dataType: String): String = {
var dType = dataType
if (DataTypes.isDecimal(dimval.getDataType)) {
dType +=
"(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")"
}
dType
}
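
  // Cached table size, recomputed only when the table status file has been
  // modified since the last computation (see sizeInBytes below).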
private var tableStatusLastUpdateTime = 0L
private var sizeInBytesLocalValue = 0L
def sizeInBytes: Long = {
if (carbonTable.isExternalTable) {
val tablePath = carbonTable.getTablePath
if (FileFactory.isFileExist(tablePath)) {
sizeInBytesLocalValue = FileFactory.getDirectorySize(tablePath)
}
} else {
val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime(
carbonTable.getAbsoluteTableIdentifier)
if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
val allSegments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
.getValidAndInvalidSegments(carbonTable.isChildTableForMV)
if (allSegments.getValidSegments.isEmpty) {
sizeInBytesLocalValue = 0L
} else {
val tablePath = carbonTable.getTablePath
if (FileFactory.isFileExist(tablePath)) {
// get the valid segments
val segments = allSegments.getValidSegments.asScala
var size = 0L
// for each segment calculate the size
segments.foreach { validSeg =>
// for older store
if (null != validSeg.getLoadMetadataDetails.getDataSize &&
null != validSeg.getLoadMetadataDetails.getIndexSize) {
size = size + validSeg.getLoadMetadataDetails.getDataSize.toLong +
validSeg.getLoadMetadataDetails.getIndexSize.toLong
} else {
size = size + FileFactory.getDirectorySize(
CarbonTablePath.getSegmentPath(tablePath, validSeg.getSegmentNo))
}
}
// update the new table status time
tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime
// update the new size
sizeInBytesLocalValue = size
}
}
}
}
sizeInBytesLocalValue
}
}
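
// ---------------------------------------------------------------------------
// Usage sketch (illustrative only, not part of this file): how a resolved
// CarbonRelation is typically built and inspected. `table` is assumed to be a
// CarbonTable already loaded from the metastore and `meta` a CarbonMetaData
// built for it; both names are placeholders, not APIs defined here.
//
//   val relation = CarbonRelation(
//     databaseName = table.getDatabaseName,
//     tableName = table.getTableName,
//     metaData = meta,
//     carbonTable = table)
//
//   // schema exposed to Spark's analyzer, in CREATE TABLE order
//   relation.output.foreach(attr => println(s"${attr.name}: ${attr.dataType}"))
//
//   // approximate on-disk footprint, cached against table-status updates
//   val bytes: Long = relation.sizeInBytes
// ---------------------------------------------------------------------------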