blob: af12be909316c4cf00a2a34907d9a06f38b94e95 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.integration.spark.testsuite.preaggregate
import org.apache.spark.sql.CarbonDatasourceHadoopRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
class TestPreAggregateWithSubQuery extends QueryTest with BeforeAndAfterAll {
override def beforeAll: Unit = {
sql("DROP TABLE IF EXISTS mainTable")
sql("DROP TABLE IF EXISTS mainTable1")
sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
sql("CREATE TABLE mainTable1(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
sql("create datamap agg0 on table mainTable using 'preaggregate' as select name,sum(age) from mainTable group by name")
sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable1")
}
test("test sub query PreAggregate table selection 1") {
val df = sql(
"""
| SELECT t2.newnewname AS newname
| FROM mainTable1 t1
| JOIN (
| select
| name AS newnewname,
| sum(age) AS sum
| FROM mainTable
| GROUP BY name ) t2
| ON t1.name = t2.newnewname
| GROUP BY t2.newnewname
""".stripMargin)
matchTable(collectLogicalRelation(df.queryExecution.analyzed), "maintable_agg0")
}
test("test sub query PreAggregate table selection 2") {
val df = sql("select t1.name,t1.city from mainTable1 t1 join (select name as newnewname,sum(age) as sum from mainTable group by name )t2 on t1.name=t2.newnewname")
matchTable(collectLogicalRelation(df.queryExecution.analyzed), "maintable_agg0")
}
test("test sub query PreAggregate table selection 3") {
val df = sql("select t1.name,t2.sum from mainTable1 t1 join (select name as newnewname,sum(age) as sum from mainTable group by name )t2 on t1.name=t2.newnewname")
matchTable(collectLogicalRelation(df.queryExecution.analyzed), "maintable_agg0")
}
test("test sub query PreAggregate table selection 4") {
val df = sql("select t1.name,t2.sum from mainTable1 t1 join (select name,sum(age) as sum from mainTable group by name )t2 on t1.name=t2.name group by t1.name, t2.sum")
matchTable(collectLogicalRelation(df.queryExecution.analyzed), "maintable_agg0")
}
/**
* Below method will be used to collect all the logical relation from logical plan
* @param logicalPlan
* query logical plan
* @return all the logical relation
*/
def collectLogicalRelation(logicalPlan: LogicalPlan) : Seq[LogicalRelation] = {
logicalPlan.collect{
case l:LogicalRelation => l
}
}
/**
* Below method will be used to match the logical relation
* @param logicalRelations
* all logical relation
* @param tableName
* table name
*/
def matchTable(logicalRelations: Seq[LogicalRelation], tableName: String) {
assert(logicalRelations.exists {
case l:LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.getTableName.
equalsIgnoreCase(tableName)
})
}
override def afterAll: Unit = {
sql("DROP TABLE IF EXISTS mainTable")
sql("DROP TABLE IF EXISTS mainTable1")
}
}