/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
import scala.collection.JavaConverters._
import scala.language.implicitConversions
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.hive.{CarbonMetaData, CarbonMetastoreTypes}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.processing.merger.TableMeta
import org.apache.carbondata.spark.{CarbonOption, _}
/**
 * Carbon relation provider compliant with the data source API.
 * Creates carbon relations.
*/
class CarbonSource extends RelationProvider
with CreatableRelationProvider with HadoopFsRelationProvider with DataSourceRegister {
override def shortName(): String = "carbondata"
/**
* Returns a new base relation with the given parameters.
* Note: the parameters' keywords are case insensitive and this insensitivity is enforced
* by the Map that is passed to the function.
*/
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
    // If 'path' is provided we can directly create a Hadoop relation,
    // otherwise create a datasource relation
parameters.get("path") match {
case Some(path) => CarbonDatasourceHadoopRelation(sqlContext, Array(path), parameters, None)
case _ =>
val options = new CarbonOption(parameters)
val tableIdentifier = options.tableIdentifier.split("""\.""").toSeq
val identifier = tableIdentifier match {
case Seq(name) => TableIdentifier(name, None)
case Seq(db, name) => TableIdentifier(name, Some(db))
}
CarbonDatasourceRelation(identifier, None)(sqlContext)
}
}
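  // Illustrative usage (not part of this source): this read path is what a user
  // triggers through the data source API; the table name "t1" below is hypothetical.
  //
  //   val df = sqlContext.read
  //     .format("carbondata")          // resolved through shortName()
  //     .option("tableName", "t1")     // no "path" given, so a datasource relation is created
  //     .load()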
override def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    // To avoid the Derby problem, the DataFrame needs to be written and read using CarbonContext
require(sqlContext.isInstanceOf[CarbonContext], "Error in saving dataframe to carbon file, " +
"must use CarbonContext to save dataframe")
    // User should not specify 'path' since only one store is supported in carbon currently;
    // after multi-store is supported, this limitation can be removed
require(!parameters.contains("path"), "'path' should not be specified, " +
"the path to store carbon file is the 'storePath' specified when creating CarbonContext")
val options = new CarbonOption(parameters)
val storePath = CarbonContext.getInstance(sqlContext.sparkContext).storePath
val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName)
val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
.exists(tablePath)
val (doSave, doAppend) = (mode, isExists) match {
case (SaveMode.ErrorIfExists, true) =>
        sys.error(s"ErrorIfExists mode, path $tablePath already exists.")
case (SaveMode.Overwrite, true) =>
val cc = CarbonContext.getInstance(sqlContext.sparkContext)
cc.sql(s"DROP TABLE IF EXISTS ${ options.dbName }.${ options.tableName }")
(true, false)
case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) =>
(true, false)
case (SaveMode.Append, _) =>
(false, true)
case (SaveMode.Ignore, exists) =>
(!exists, false)
}
if (doSave) {
      // create the table and save the data (Overwrite, or the table does not exist yet)
new CarbonDataFrameWriter(data).saveAsCarbonFile(parameters)
} else if (doAppend) {
new CarbonDataFrameWriter(data).appendToCarbonFile(parameters)
}
createRelation(sqlContext, parameters)
}
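  // Illustrative usage (not part of this source): the write path above is what a
  // DataFrame save triggers. Requires a CarbonContext; the table name is hypothetical.
  //
  //   df.write
  //     .format("carbondata")
  //     .option("tableName", "t1")
  //     .mode(SaveMode.Overwrite)
  //     .save()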
override def createRelation(sqlContext: SQLContext,
paths: Array[String],
dataSchema: Option[StructType],
partitionColumns: Option[StructType],
parameters: Map[String, String]): HadoopFsRelation = {
CarbonDatasourceHadoopRelation(sqlContext, paths, parameters, dataSchema)
}
}
/**
 * Creates a carbon relation compliant with the data source API.
 * This relation is stored in the Hive metastore.
*/
private[sql] case class CarbonDatasourceRelation(
tableIdentifier: TableIdentifier,
alias: Option[String])
(@transient context: SQLContext)
extends BaseRelation with Serializable {
lazy val carbonRelation: CarbonRelation = {
CarbonEnv.get
.carbonMetastore.lookupRelation1(tableIdentifier, None)(sqlContext)
.asInstanceOf[CarbonRelation]
}
def getDatabaseName(): String = tableIdentifier.database.getOrElse("default")
def getTable(): String = tableIdentifier.table
def schema: StructType = carbonRelation.schema
def sqlContext: SQLContext = context
override def sizeInBytes: Long = carbonRelation.sizeInBytes
}
/**
 * Represents the logical plan for one carbon table.
*/
case class CarbonRelation(
databaseName: String,
tableName: String,
var metaData: CarbonMetaData,
tableMeta: TableMeta,
alias: Option[String])(@transient sqlContext: SQLContext)
extends LeafNode with MultiInstanceRelation {
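  // The helpers below render nested (array/struct) dimensions as Hive-metastore
  // style type strings, which CarbonMetastoreTypes.toDataType parses back into
  // Spark SQL DataTypes.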
def recursiveMethod(dimName: String, childDim: CarbonDimension): String = {
childDim.getDataType.getName.toLowerCase match {
case "array" => s"${
childDim.getColName.substring(dimName.length + 1)
}:array<${ getArrayChildren(childDim.getColName) }>"
case "struct" => s"${
childDim.getColName.substring(dimName.length + 1)
}:struct<${ getStructChildren(childDim.getColName) }>"
case dType => s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }"
}
}
def getArrayChildren(dimName: String): String = {
metaData.carbonTable.getChildren(dimName).asScala.map(childDim => {
childDim.getDataType.getName.toLowerCase match {
case "array" => s"array<${ getArrayChildren(childDim.getColName) }>"
case "struct" => s"struct<${ getStructChildren(childDim.getColName) }>"
case dType => addDecimalScaleAndPrecision(childDim, dType)
}
}).mkString(",")
}
def getStructChildren(dimName: String): String = {
metaData.carbonTable.getChildren(dimName).asScala.map(childDim => {
childDim.getDataType.getName.toLowerCase match {
case "array" => s"${
childDim.getColName.substring(dimName.length + 1)
}:array<${ getArrayChildren(childDim.getColName) }>"
case "struct" => s"${
childDim.getColName.substring(dimName.length + 1)
}:struct<${ metaData.carbonTable.getChildren(childDim.getColName)
.asScala.map(f => s"${ recursiveMethod(childDim.getColName, f) }").mkString(",")
}>"
case dType => s"${ childDim.getColName
.substring(dimName.length() + 1) }:${ addDecimalScaleAndPrecision(childDim, dType) }"
}
}).mkString(",")
}
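  // Illustrative example (hypothetical schema): for a struct dimension `info` with
  // children `info.age` (int) and `info.tags` (array of string), getStructChildren("info")
  // yields "age:int,tags:array<string>", so the dimension is exposed as
  // struct<age:int,tags:array<string>>.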
override def newInstance(): LogicalPlan = {
CarbonRelation(databaseName, tableName, metaData, tableMeta, alias)(sqlContext)
.asInstanceOf[this.type]
}
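  // Attribute references for the table's dimensions: invisible dimensions are dropped
  // unless they are implicit dimensions, and complex (array/struct) types are converted
  // to Spark SQL DataTypes via the helpers above.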
val dimensionsAttr = {
val sett = new java.util.LinkedHashSet(tableMeta.carbonTable
.getDimensionByTableName(tableMeta.carbonTableIdentifier.getTableName).asScala.asJava)
sett.asScala.toSeq.filter(dim => !dim.isInvisible ||
(dim.isInvisible && dim.isInstanceOf[CarbonImplicitDimension]))
.map(dim => {
val dimval = metaData.carbonTable
.getDimensionByName(metaData.carbonTable.getFactTableName, dim.getColName)
val output: DataType = dimval.getDataType.getName.toLowerCase match {
case "array" =>
CarbonMetastoreTypes.toDataType(s"array<${ getArrayChildren(dim.getColName) }>")
case "struct" =>
CarbonMetastoreTypes.toDataType(s"struct<${ getStructChildren(dim.getColName) }>")
case dType =>
val dataType = addDecimalScaleAndPrecision(dimval, dType)
CarbonMetastoreTypes.toDataType(dataType)
}
AttributeReference(
dim.getColName,
output,
nullable = true)(qualifiers = tableName +: alias.toSeq)
})
}
val measureAttr = {
val factTable = tableMeta.carbonTable.getFactTableName
new java.util.LinkedHashSet(
tableMeta.carbonTable.
getMeasureByTableName(tableMeta.carbonTable.getFactTableName).
asScala.asJava).asScala.toSeq.filter(!_.getColumnSchema.isInvisible)
.map(x => AttributeReference(x.getColName, CarbonMetastoreTypes.toDataType(
metaData.carbonTable.getMeasureByName(factTable, x.getColName).getDataType.getName
.toLowerCase match {
case "float" => "double"
case "decimal" => "decimal(" + x.getPrecision + "," + x.getScale + ")"
case others => others
}),
nullable = true)(qualifiers = tableName +: alias.toSeq))
}
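  // Output attributes in table creation order, covering both dimensions and measures;
  // float measures are widened to double and decimal columns keep their precision and scale.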
override val output = {
val columns = tableMeta.carbonTable.getCreateOrderColumn(tableMeta.carbonTable.getFactTableName)
.asScala
columns.filter(!_.isInvisible).map { column =>
if (column.isDimension()) {
val output: DataType = column.getDataType.getName.toLowerCase match {
case "array" =>
CarbonMetastoreTypes.toDataType(s"array<${getArrayChildren(column.getColName)}>")
case "struct" =>
CarbonMetastoreTypes.toDataType(s"struct<${getStructChildren(column.getColName)}>")
case dType =>
val dataType = addDecimalScaleAndPrecision(column, dType)
CarbonMetastoreTypes.toDataType(dataType)
}
AttributeReference(column.getColName, output,
nullable = true
)(qualifiers = tableName +: alias.toSeq)
} else {
AttributeReference(column.getColName, CarbonMetastoreTypes.toDataType(
column.getDataType.getName.toLowerCase match {
case "float" => "double"
case "decimal" => "decimal(" + column.getColumnSchema.getPrecision + "," + column
.getColumnSchema.getScale + ")"
case others => others
}
),
nullable = true
)(qualifiers = tableName +: alias.toSeq)
}
}
}
// TODO: Use data from the footers.
override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes)
override def equals(other: Any): Boolean = {
other match {
case p: CarbonRelation =>
p.databaseName == databaseName && p.output == output && p.tableName == tableName
case _ => false
}
}
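  // Appends "(precision,scale)" to the type string of decimal columns, e.g. a
  // hypothetical decimal column with precision 10 and scale 2 yields "decimal(10,2)";
  // other types are returned unchanged.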
def addDecimalScaleAndPrecision(dimval: CarbonColumn, dataType: String): String = {
var dType = dataType
if (dimval.getDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DECIMAL) {
dType +=
"(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")"
}
dType
}
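  // Cached table size in bytes; recomputed only when the table status file's
  // last-modified time changes.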
private var tableStatusLastUpdateTime = 0L
private var sizeInBytesLocalValue = 0L
def sizeInBytes: Long = {
val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime(
tableMeta.carbonTable.getAbsoluteTableIdentifier)
if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
val tablePath = CarbonStorePath.getCarbonTablePath(
tableMeta.storePath,
tableMeta.carbonTableIdentifier).getPath
val fileType = FileFactory.getFileType(tablePath)
      if (FileFactory.isFileExist(tablePath, fileType)) {
tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime
sizeInBytesLocalValue = FileFactory.getDirectorySize(tablePath)
}
}
sizeInBytesLocalValue
}
}