blob: a7511fdf0ff8001a0cb884b2e04b7ec27def593b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.integration.spark.testsuite.preaggregate
import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Row}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll {
override def beforeAll: Unit = {
sql("DROP TABLE IF EXISTS mainTable")
sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
}
test("test pre agg create table with expression 1") {
sql(
s"""
| CREATE DATAMAP agg0 ON TABLE mainTable USING 'preaggregate' AS
| SELECT name,
| count(age)
| FROM mainTable GROUP BY name
| """.stripMargin)
checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0"), true, "maintable_age_count")
}
test("test pre agg create table with expression 2") {
sql(
s"""
| CREATE DATAMAP agg1 ON TABLE mainTable USING 'preaggregate' AS
| SELECT name,
| sum(CASE WHEN age=35 THEN id ELSE 0 END)
| FROM mainTable GROUP BY name
| """.stripMargin)
checkExistence(sql("DESCRIBE FORMATTED mainTable_agg1"), true, "maintable_column_0_sum")
}
test("test pre agg create table with expression 3") {
sql(
s"""
| CREATE DATAMAP agg2 ON TABLE mainTable USING 'preaggregate' AS
| SELECT name,
| sum(CASE WHEN age=35 THEN id ELSE 0 END),
| city
| FROM mainTable GROUP BY name,city
| """.stripMargin)
checkExistence(sql("DESCRIBE FORMATTED mainTable_agg2"), true, "maintable_column_0_sum")
}
test("test pre agg create table with expression 4") {
sql(
s"""
| CREATE DATAMAP agg3 ON TABLE mainTable USING 'preaggregate' AS
| SELECT name,
| sum(CASE WHEN age=27 THEN id ELSE 0 END)
| FROM mainTable GROUP BY name
| """.stripMargin)
checkExistence(sql("DESCRIBE FORMATTED mainTable_agg3"), true, "maintable_column_0_sum")
}
test("test pre agg create table with expression 5") {
sql(
s"""
| CREATE DATAMAP agg4 ON TABLE mainTable USING 'preaggregate' AS
| SELECT name,
| sum(CASE WHEN age=27 THEN id ELSE 0 END),
| SUM(CASE WHEN age=35 THEN id ELSE 0 END)
| FROM mainTable GROUP BY name
| """.stripMargin)
checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_0_sum")
checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_1_sum")
}
test("test pre agg create table with expression 6") {
sql(
s"""
| CREATE DATAMAP agg5 ON TABLE mainTable USING 'preaggregate' AS
| SELECT name,
| COUNT(CASE WHEN age=27 THEN(CASE WHEN name='eason' THEN id ELSE 0 END) ELSE 0 END)
| FROM mainTable GROUP BY name
| """.stripMargin)
checkExistence(sql("DESCRIBE FORMATTED mainTable_agg5"), true, "maintable_column_0_count")
}
test("test pre agg table selection with expression 1") {
val df = sql("select name as NewName, count(age) as sum from mainTable group by name order by name")
preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
}
test("test pre agg table selection with expression 2") {
val df = sql("select name as NewName, sum(case when age=35 then id else 0 end) as sum from mainTable group by name order by name")
preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
}
test("test pre agg table selection with expression 3") {
val df = sql("select sum(case when age=35 then id else 0 end) from maintable")
preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
checkAnswer(df, Seq(Row(6.0)))
}
test("test pre agg table selection with expression 4") {
val df = sql("select sum(case when age=27 then id else 0 end) from maintable")
preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3")
checkAnswer(df, Seq(Row(2.0)))
}
test("test pre agg table selection with expression 5") {
val df = sql("select sum(case when age=27 then id else 0 end), sum(case when age=35 then id else 0 end) from maintable")
preAggTableValidator(df.queryExecution.analyzed, "maintable_agg4")
checkAnswer(df, Seq(Row(2.0,6.0)))
}
/**
* Below method will be used to validate the table name is present in the plan or not
*
* @param plan query plan
* @param actualTableName table name to be validated
*/
def preAggTableValidator(plan: LogicalPlan, actualTableName: String) : Unit ={
var isValidPlan = false
plan.transform {
// first check if any preaTable1 scala function is applied it is present is in plan
// then call is from create preaTable1regate table class so no need to transform the query plan
case ca:CarbonRelation =>
if (ca.isInstanceOf[CarbonDatasourceHadoopRelation]) {
val relation = ca.asInstanceOf[CarbonDatasourceHadoopRelation]
if(relation.carbonTable.getTableName.equalsIgnoreCase(actualTableName)) {
isValidPlan = true
}
}
ca
case logicalRelation:LogicalRelation =>
if(logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) {
val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
if(relation.carbonTable.getTableName.equalsIgnoreCase(actualTableName)) {
isValidPlan = true
}
}
logicalRelation
}
if(!isValidPlan) {
assert(false)
} else {
assert(true)
}
}
test("Test Pre_aggregate with decimal column with order by") {
sql("drop table if exists maintable")
sql("create table maintable(name string, decimal_col decimal(30,16)) stored by 'carbondata'")
sql("insert into table maintable select 'abc',452.564")
sql(
"create datamap ag1 on table maintable using 'preaggregate' as select name,avg(decimal_col)" +
" from maintable group by name")
checkAnswer(sql("select avg(decimal_col) from maintable group by name order by name"),
Seq(Row(452.56400000000000000000)))
}
override def afterAll: Unit = {
sql("DROP TABLE IF EXISTS mainTable")
}
}