package org.apache.spark.sql.hive
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit
import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.execution.command.ProjectForDeleteCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.carbondata.core.constants.CarbonCommonConstants
// Insert into a Carbon table from another source.
object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
/**
 * Rewrites the plan: an `InsertIntoTable` whose target is a `LogicalRelation` wrapping a
 * `CarbonDatasourceRelation` is handed to `castChildOutput`.
 *
 * @param plan logical plan to transform
 * @return the transformed plan
 */
def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
// Wait until children are resolved.
case p: LogicalPlan if !p.childrenResolved => p
// Only inserts whose relation is backed by a CarbonDatasourceRelation are rewritten.
case p @ InsertIntoTable(relation: LogicalRelation, _, child, _, _)
if relation.relation.isInstanceOf[CarbonDatasourceRelation] =>
castChildOutput(p, relation.relation.asInstanceOf[CarbonDatasourceRelation], child)
/**
 * Validates column counts and builds the Carbon-specific insert node.
 *
 * @param p        the original insert node; its partition, child, overwrite and ifNotExists
 *                 values are carried over to the new node
 * @param relation the target Carbon relation
 * @param child    the plan producing the rows to insert
 * @return an `InsertIntoCarbonTable` node; fails via `sys.error` when the child has fewer
 *         output columns than the target relation
 */
def castChildOutput(p: InsertIntoTable, relation: CarbonDatasourceRelation, child: LogicalPlan)
: LogicalPlan = {
// Compares the relation's column count with a limit taken from CarbonCommonConstants and
// reports an error when it is exceeded.
// NOTE(review): the constant name and the receiver of `.error` are not visible here - confirm.
if (relation.carbonRelation.output.size > CarbonCommonConstants
.error("Maximum supported column by carbon is:" + CarbonCommonConstants
// The child must supply at least as many columns as the target relation.
// NOTE(review): the guard is `>=`, so a child with MORE columns is accepted, although the
// error message below says the column counts are "different" - confirm which is intended.
if (child.output.size >= relation.carbonRelation.output.size ) {
InsertIntoCarbonTable(relation, p.partition, p.child, p.overwrite, p.ifNotExists)
} else {
sys.error("Cannot insert into target table because column number are different")
object CarbonIUDAnalysisRule extends Rule[LogicalPlan] {
// SQLContext passed to SQLParser.parse and getDB.getDatabaseName by the methods below.
// It holds the default value (null) until init is called.
var sqlContext: SQLContext = _
/**
 * Stores the SQLContext this rule uses.
 *
 * @param sqlContext the context to keep in the `sqlContext` field
 */
def init(sqlContext: SQLContext) {
this.sqlContext = sqlContext
/**
 * Builds the un-analyzed logical plan for an update statement.
 *
 * The select statement is parsed and rewritten so that it also projects every column of the
 * target table plus a `tupleId` column, the updated expressions are aliased with an
 * "-updatedColumn" suffix, and the result is wrapped in `ProjectForUpdate`.
 *
 * @param table      the relation being updated; its alias and table identifier are used
 * @param columns    destination column names; must match the select projection in size
 * @param selectStmt SQL text of the select supplying the new values
 * @param filter     filter text appended to a generated "select * from ..." statement when
 *                   it is non-empty
 * @return a `ProjectForUpdate` over the destination table, the columns and the final plan
 */
private def processUpdateQuery(
table: UnresolvedRelation,
columns: List[String],
selectStmt: String,
filter: String): LogicalPlan = {
// One-shot flags mutated inside the transform closures below, so each rewrite is applied
// only to the first matching node.
var includedDestColumns = false
var includedDestRelation = false
var addedTupleId = false
// Wraps the given relation in a Subquery that projects all columns of the target table
// (star qualified by table.alias) plus getTupleId() aliased as "tupleId". The subquery is
// named after the table alias, or "" when there is none.
def prepareTargetReleation(relation: UnresolvedRelation): Subquery = {
val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
Seq.empty, isDistinct = false), "tupleId")())
val projList = Seq(
UnresolvedAlias(UnresolvedStar(table.alias)), tupleId)
// include tuple id and rest of the required columns in subquery
Subquery(table.alias.getOrElse(""), Project(projList, relation))
// get the un-analyzed logical plan
val targetTable = prepareTargetReleation(table)
val selectPlan = org.apache.spark.sql.SQLParser.parse(selectStmt, sqlContext) transform {
// First Project seen: check the column count, rename the updated expressions and prepend
// the target table's columns.
case Project(projectList, child) if (!includedDestColumns) =>
includedDestColumns = true
if (projectList.size != columns.size) {
sys.error("Number of source and destination columns are not matching")
// Each un-aliased projection is aliased as "<col>-updatedColumn"; anything else is kept.
// NOTE(review): the expression supplying the (attr, col) pairs is not visible here - confirm.
val renamedProjectList ={ case(attr, col) =>
attr match {
case UnresolvedAlias(child) =>
UnresolvedAlias(Alias(child, col + "-updatedColumn")())
case _ => attr
val list = Seq(
UnresolvedAlias(UnresolvedStar(table.alias))) ++ renamedProjectList
Project(list, child)
// First Filter seen while the target is not yet joined: join its child with the target table.
case Filter(cond, child) if (!includedDestRelation) =>
includedDestRelation = true
Filter(cond, Join(child, targetTable, Inner, None))
// First relation other than the target table, when no Filter did the join: join it here.
case r @ UnresolvedRelation(t, a) if (!includedDestRelation &&
t != table.tableIdentifier) =>
includedDestRelation = true
Join(r, targetTable, Inner, None)
// If the target table was never joined in above, the select reads only the target itself.
val updatedSelectPlan = if (!includedDestRelation) {
// special case to handle self join queries
// Eg. update tableName SET (column1) = (column1+1)
selectPlan transform {
// Matches only the first occurrence of the target table in the plan.
// NOTE(review): the replacement built for this case is not visible here - confirm.
case relation: UnresolvedRelation if (table.tableIdentifier == relation.tableIdentifier &&
addedTupleId == false) =>
addedTupleId = true
} else {
// A non-empty filter is applied by parsing a generated statement over the target table and
// replacing the first matching aliased relation with the select plan built above.
// NOTE(review): the filter text is concatenated verbatim into the SQL string.
val finalPlan = if (filter.length > 0) {
val alias = table.alias.getOrElse("")
var transformed: Boolean = false
// Create a dummy projection to include filter conditions
SQLParser.parse("select * from " +
table.tableIdentifier.mkString(".") + " " + alias + " " + filter, sqlContext) transform {
case UnresolvedRelation(t, Some(a)) if (
!transformed && t == table.tableIdentifier && a == alias) =>
transformed = true
// Add the filter condition of update statement on destination table
Subquery(alias, updatedSelectPlan)
} else {
// Build the destination relation from the database name returned by getDB.getDatabaseName
// and the table name.
val tid = CarbonTableIdentifierImplicit.toTableIdentifier(table.tableIdentifier)
val tidSeq = Seq(getDB.getDatabaseName(tid.database, sqlContext), tid.table)
val destinationTable = UnresolvedRelation(tidSeq, table.alias)
ProjectForUpdate(destinationTable, columns, Seq(finalPlan))
/**
 * Builds the un-analyzed plan for a delete statement: the select statement is parsed and the
 * first occurrence of the target table is replaced by a projection of all its columns plus a
 * `tupleId` column.
 *
 * NOTE(review): the tail of this method, including its result expression, is not visible
 * here - confirm how `selectPlan` and `tidSeq` are combined.
 *
 * @param selectStmt SQL text of the select identifying the rows to delete
 * @param table      the relation rows are deleted from
 */
def processDeleteRecordsQuery(selectStmt: String, table: UnresolvedRelation): LogicalPlan = {
// Table identifier built from the database name returned by getDB.getDatabaseName and the
// table name.
val tid = CarbonTableIdentifierImplicit.toTableIdentifier(table.tableIdentifier)
val tidSeq = Seq(getDB.getDatabaseName(tid.database, sqlContext), tid.table)
// One-shot flag so that only the first matching relation is rewritten.
var addedTupleId = false
val selectPlan = SQLParser.parse(selectStmt, sqlContext) transform {
case relation: UnresolvedRelation if (table.tableIdentifier == relation.tableIdentifier &&
addedTupleId == false) =>
addedTupleId = true
val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
Seq.empty, isDistinct = false), "tupleId")())
val projList = Seq(
UnresolvedAlias(UnresolvedStar(table.alias)), tupleId)
// include tuple id in subquery
Project(projList, relation)
/**
 * Rewrites update and delete nodes in the plan.
 *
 * @param logicalplan the plan to transform
 * @return the plan with `UpdateTable` and `DeleteRecords` nodes replaced by the results of
 *         `processUpdateQuery` and `processDeleteRecordsQuery` respectively
 */
override def apply(logicalplan: LogicalPlan): LogicalPlan = {
logicalplan transform {
case UpdateTable(t, cols, sel, where) => processUpdateQuery(t, cols, sel, where)
case DeleteRecords(statement, table) => processDeleteRecordsQuery(statement, table)