// blob: d23b18f082d91b2731482f7118a72053259184aa
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit
import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.execution.command.ProjectForDeleteCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.carbondata.core.constants.CarbonCommonConstants
/**
* Insert into carbon table from other source
*/
/**
 * Analysis rule that intercepts an [[InsertIntoTable]] whose target is a
 * Carbon datasource relation and rewrites it into Carbon's own
 * InsertIntoCarbonTable command.
 */
object CarbonPreInsertionCasts extends Rule[LogicalPlan] {

  def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
    // Wait until children are resolved before rewriting anything.
    case unresolved: LogicalPlan if !unresolved.childrenResolved => unresolved
    case insert @ InsertIntoTable(target: LogicalRelation, _, source, _, _)
      if target.relation.isInstanceOf[CarbonDatasourceRelation] =>
      castChildOutput(insert, target.relation.asInstanceOf[CarbonDatasourceRelation], source)
  }

  /**
   * Validates column counts and builds the Carbon-specific insert command.
   *
   * @param p        the original insert plan (partition spec, overwrite and
   *                 ifNotExists flags are carried over unchanged)
   * @param relation the Carbon relation being inserted into
   * @param child    the logical plan producing the rows to insert
   * @return an InsertIntoCarbonTable command, or raises via sys.error when
   *         the column-count constraints are violated
   */
  def castChildOutput(p: InsertIntoTable, relation: CarbonDatasourceRelation, child: LogicalPlan)
    : LogicalPlan = {
    val targetColumnCount = relation.carbonRelation.output.size
    // Carbon enforces an upper bound on the number of columns per table.
    if (targetColumnCount > CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS) {
      sys.error("Maximum supported column by carbon is:" +
        CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS)
    }
    // NOTE(review): the source is allowed to have MORE columns than the
    // target (>=), presumably to tolerate implicit columns — confirm.
    if (child.output.size >= targetColumnCount) {
      InsertIntoCarbonTable(relation, p.partition, p.child, p.overwrite, p.ifNotExists)
    } else {
      sys.error("Cannot insert into target table because column number are different")
    }
  }
}
/**
 * Analysis rule that rewrites Carbon UPDATE and DELETE statements
 * (UpdateTable / DeleteRecords placeholders) into executable plans
 * (ProjectForUpdate / ProjectForDeleteCommand).
 *
 * NOTE(review): apply() dereferences sqlContext, so init() must have been
 * called first — confirm against the session/rule registration path.
 */
object CarbonIUDAnalysisRule extends Rule[LogicalPlan] {

  // Set once via init(); used to re-parse the raw SQL text that the
  // update/delete placeholders carry as strings.
  var sqlContext: SQLContext = _

  def init(sqlContext: SQLContext) {
    this.sqlContext = sqlContext
  }

  /**
   * Rewrites an UPDATE statement into a ProjectForUpdate plan.
   *
   * @param table      destination table of the update
   * @param columns    destination column names being assigned
   * @param selectStmt raw SQL text of the select producing the new values
   * @param filter     raw WHERE-clause text ("" / empty when absent)
   * @return an unresolved ProjectForUpdate over the database-qualified
   *         destination table
   */
  private def processUpdateQuery(
      table: UnresolvedRelation,
      columns: List[String],
      selectStmt: String,
      filter: String): LogicalPlan = {
    // One-shot flags: each transform case below must fire at most once,
    // since transform visits every matching node in the plan.
    var includedDestColumns = false
    var includedDestRelation = false
    var addedTupleId = false

    // Wraps the destination relation in a subquery projecting all of its
    // columns plus a synthetic "tupleId" column (the getTupleId function —
    // presumably a per-row identifier used to locate rows to rewrite;
    // verify against its UDF registration).
    // (Name keeps the historical "Releation" typo; renaming is out of
    // scope for a comment-only change.)
    def prepareTargetReleation(relation: UnresolvedRelation): Subquery = {
      val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
        Seq.empty, isDistinct = false), "tupleId")())
      val projList = Seq(
        UnresolvedAlias(UnresolvedStar(table.alias)), tupleId)
      // include tuple id and rest of the required columns in subqury
      Subquery(table.alias.getOrElse(""), Project(projList, relation))
    }

    // get the un-analyzed logical plan of the source select
    val targetTable = prepareTargetReleation(table)
    val selectPlan = org.apache.spark.sql.SQLParser.parse(selectStmt, sqlContext) transform {
      // First Project encountered: rename each assigned expression to
      // "<destCol>-updatedColumn" and prepend the destination's columns.
      case Project(projectList, child) if (!includedDestColumns) =>
        includedDestColumns = true
        if (projectList.size != columns.size) {
          sys.error("Number of source and destination columns are not matching")
        }
        val renamedProjectList = projectList.zip(columns).map{ case(attr, col) =>
          attr match {
            case UnresolvedAlias(child) =>
              UnresolvedAlias(Alias(child, col + "-updatedColumn")())
            case _ => attr
          }
        }
        val list = Seq(
          UnresolvedAlias(UnresolvedStar(table.alias))) ++ renamedProjectList
        Project(list, child)
      // Source select has a filter: join the destination table in
      // underneath it so the filter also constrains the update target.
      case Filter(cond, child) if (!includedDestRelation) =>
        includedDestRelation = true
        Filter(cond, Join(child, targetTable, Inner, None))
      // Source references a DIFFERENT relation: join the destination
      // table alongside it.
      case r @ UnresolvedRelation(t, a) if (!includedDestRelation &&
        t != table.tableIdentifier) =>
        includedDestRelation = true
        Join(r, targetTable, Inner, None)
    }
    val updatedSelectPlan = if (!includedDestRelation) {
      // special case to handle self join queries
      // Eg. update tableName SET (column1) = (column1+1)
      // Replace the (single) self-reference with the tupleId-augmented
      // destination subquery.
      selectPlan transform {
        case relation: UnresolvedRelation if (table.tableIdentifier == relation.tableIdentifier &&
          addedTupleId == false) =>
          addedTupleId = true
          targetTable
      }
    } else {
      selectPlan
    }
    val finalPlan = if (filter.length > 0) {
      val alias = table.alias.getOrElse("")
      var transformed: Boolean = false
      // Create a dummy projection to include filter conditions: re-parse
      // "select * from <table> <alias> <filter>" and splice the plan built
      // above in place of the relation it references.
      SQLParser.parse("select * from " +
        table.tableIdentifier.mkString(".") + " " + alias + " " + filter, sqlContext) transform {
        case UnresolvedRelation(t, Some(a)) if (
          !transformed && t == table.tableIdentifier && a == alias) =>
          transformed = true
          // Add the filter condition of update statement on destination table
          Subquery(alias, updatedSelectPlan)
      }
    } else {
      updatedSelectPlan
    }
    // Qualify the destination with its database name (current database
    // when the identifier carries none) before building the command.
    val tid = CarbonTableIdentifierImplicit.toTableIdentifier(table.tableIdentifier)
    val tidSeq = Seq(getDB.getDatabaseName(tid.database, sqlContext), tid.table)
    val destinationTable = UnresolvedRelation(tidSeq, table.alias)
    ProjectForUpdate(destinationTable, columns, Seq(finalPlan))
  }

  /**
   * Rewrites a DELETE statement into a ProjectForDeleteCommand.
   *
   * @param selectStmt raw SQL text selecting the rows to delete
   * @param table      the table rows are deleted from
   * @return a ProjectForDeleteCommand over the augmented select plan
   */
  def processDeleteRecordsQuery(selectStmt: String, table: UnresolvedRelation): LogicalPlan = {
    val tid = CarbonTableIdentifierImplicit.toTableIdentifier(table.tableIdentifier)
    val tidSeq = Seq(getDB.getDatabaseName(tid.database, sqlContext), tid.table)
    var addedTupleId = false
    val selectPlan = SQLParser.parse(selectStmt, sqlContext) transform {
      // Project the synthetic tupleId over the target relation (once only)
      // so the rows to delete can be identified downstream.
      case relation: UnresolvedRelation if (table.tableIdentifier == relation.tableIdentifier &&
        addedTupleId == false) =>
        addedTupleId = true
        val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
          Seq.empty, isDistinct = false), "tupleId")())
        val projList = Seq(
          UnresolvedAlias(UnresolvedStar(table.alias)), tupleId)
        // include tuple id in subqury
        Project(projList, relation)
    }
    // Current time string presumably tags this delete operation (e.g. as a
    // delta/segment id) — verify in ProjectForDeleteCommand.
    ProjectForDeleteCommand(
      selectPlan,
      tidSeq,
      System.currentTimeMillis().toString)
  }

  // Entry point: dispatch UPDATE and DELETE placeholder nodes to their
  // respective rewrites; all other plans pass through untouched.
  override def apply(logicalplan: LogicalPlan): LogicalPlan = {
    logicalplan transform {
      case UpdateTable(t, cols, sel, where) => processUpdateQuery(t, cols, sel, where)
      case DeleteRecords(statement, table) => processDeleteRecordsQuery(statement, table)
    }
  }
}