blob: bf9ac6b23de4fd82da0852805378b66a9b0eef89 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.view
import java.io.File
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
import org.apache.carbondata.spark.exception.ProcessMetaDataException
/**
 * Tests for materialized views (MV) built on `fact_table` and on copies of it
 * created with different storage formats.
 *
 * Most tests check two things: that the optimized plan of a query reads the MV
 * table (see `isTableAppearedInPlan`), and that the query result matches the same
 * query executed against a table that has no MV.
 */
class MVTest extends QueryTest with BeforeAndAfterAll {

  // MV definition shared by every test that builds `mv1` on top of the `source` table.
  private val createMVOnSource =
    "create materialized view mv1 as select empname, deptname, avg(salary) " +
    "from source group by empname, deptname"

  // Query on `source` that the tests expect to be answered from `mv1`.
  private val avgQueryOnSource = "select empname, avg(salary) from source group by empname"

  // Same aggregation on the table without an MV; used as the expected answer.
  private val avgQueryOnFactTable =
    "select empname, avg(salary) from fact_table group by empname"

  /**
   * Recreates `fact_table`, sets the timestamp format to `yyyy/MM/dd` and loads
   * `data_big.csv` into the table twice.
   */
  override def beforeAll(): Unit = {
    drop()
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
    // NOTE(review): these paths are local to beforeAll. The tests below reference
    // `resourcesPath` as well, which must resolve to a member inherited from an
    // enclosing scope (presumably QueryTest) - confirm both point to the same directory.
    val projectPath = new File(this.getClass.getResource("/").getPath + "../../../../")
      .getCanonicalPath.replaceAll("\\\\", "/")
    val integrationPath = s"$projectPath/integration"
    val resourcesPath = s"$integrationPath/spark/src/test/resources"
    sql(
      """
        | CREATE TABLE fact_table (empname String, designation String, doj Timestamp,
        | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
        | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
        | utilization int,salary int)
        | STORED AS carbondata
      """.stripMargin)
    loadBigData(resourcesPath, "fact_table")
    loadBigData(resourcesPath, "fact_table")
  }

  test("test create mv on hive table") {
    checkMVOnSource("create table source as select * from fact_table")
  }

  test("test disable mv with carbonproperties and sessionparam") {
    sql("drop materialized view if exists mv1")
    sql("drop table if exists source")
    try {
      // 1. Prepare the source table and MV, make sure the MV is enabled
      sql("create table source as select * from fact_table")
      sql(createMVOnSource)
      assert(isMVUsed(avgQueryOnSource))

      // 2. test disable mv with carbon.properties
      // 2.1 disable MV when set carbon.enable.mv = false in the carbonproperties
      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_MV, "false")
      assert(!isMVUsed(avgQueryOnSource))
      // 2.2 enable MV when configured value is invalid
      CarbonProperties.getInstance()
        .addProperty(CarbonCommonConstants.CARBON_ENABLE_MV, "invalidvalue")
      assert(isMVUsed(avgQueryOnSource))
      // 2.3 enable mv when set carbon.enable.mv = true in the carbonproperties.
      // The property has to be set before the query is planned, otherwise the
      // assertion still runs under the "invalidvalue" setting from 2.2.
      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_MV, "true")
      assert(isMVUsed(avgQueryOnSource))

      // 3. test disable mv with sessionparam
      // 3.1 disable MV when set carbon.enable.mv = false in the sessionparam
      sql("set carbon.enable.mv = false")
      assert(!isMVUsed(avgQueryOnSource))
      // 3.2 validate configured sessionparam
      val exMessage = intercept[Exception] {
        sql("set carbon.enable.mv = invalidvalue")
      }
      assert(exMessage.getMessage.contains("Invalid value invalidvalue for key carbon.enable.mv"))
      // 3.3 enable mv when set carbon.enable.mv = true in the sessionparam
      sql("set carbon.enable.mv = true")
      assert(isMVUsed(avgQueryOnSource))
    } finally {
      // Restore the shared configuration even when an assertion fails, so that a
      // disabled MV setting cannot leak into the following tests.
      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_MV, "true")
      Option(ThreadLocalSessionInfo.getCarbonSessionInfo)
        .foreach(_.getSessionParams.removeProperty(CarbonCommonConstants.CARBON_ENABLE_MV))
      sql("drop materialized view if exists mv1")
      sql("drop table if exists source")
    }
  }

  test("test create mv on orc table") {
    checkMVOnSource("create table source using orc as select * from fact_table")
  }

  test("test create mv on parquet table") {
    checkMVOnSource("create table source using parquet as select * from fact_table")
  }

  test("test create mv on carbon table") {
    checkMVOnSource("create table source using carbondata as select * from fact_table")
  }

  test("test create mv fail because of name used") {
    sql("drop table if exists mv1")
    sql("drop materialized view if exists mv1")
    try {
      sql("create table mv1 using orc as select * from fact_table")
      // Creating an MV whose name is already taken by a table must be rejected.
      // intercept fails the test when no exception is thrown, which a bare
      // try/catch would silently accept.
      intercept[TableAlreadyExistsException] {
        sql(
          "create materialized view mv1 as select empname, deptname, avg(salary) " +
          "from fact_table group by empname, deptname")
      }
    } finally {
      sql("drop table if exists mv1")
      sql("drop materialized view if exists mv1")
    }
  }

  test("test drop mv") {
    sql("drop materialized view if exists mv1")
    sql("drop table if exists source")
    try {
      sql("create table source using carbondata as select * from fact_table")
      sql(createMVOnSource)
      // An MV must not be removable through plain `drop table`.
      intercept[ProcessMetaDataException] {
        sql("drop table mv1")
      }
      sql("drop materialized view mv1")
    } finally {
      sql("drop materialized view if exists mv1")
      sql("drop table if exists source")
    }
  }

  test("test refresh mv on manual") {
    sql("drop materialized view if exists mv1")
    sql("drop table if exists source")
    sql("drop table if exists fact_table_compare")
    try {
      sql("create table fact_table_compare using carbondata as select * from fact_table")
      sql("create table source using carbondata as select * from fact_table")
      val ctasQuery =
        "select empname, deptname, count(salary) from source group by empname, deptname"
      val testQuery = "select empname, count(salary) from source group by empname"
      val compareQuery = "select empname, count(salary) from fact_table_compare group by empname"
      sql(
        s"create materialized view mv1 properties('refresh_trigger_mode'='on_manual') " +
        s"as $ctasQuery")

      // Freshly created MV: it is used and returns the same rows as the compare table.
      val beforeLoad = sql(testQuery)
      assert(isTableAppearedInPlan(beforeLoad.queryExecution.optimizedPlan, "mv1"))
      checkAnswer(beforeLoad, sql(compareQuery))

      // After loading into the source without a refresh the MV must not be used.
      loadBigData(resourcesPath, "source")
      assert(!isMVUsed(testQuery))

      // After a manual refresh the MV is used again and reflects the new load.
      sql("refresh materialized view mv1")
      loadBigData(resourcesPath, "fact_table_compare")
      val afterRefresh = sql(testQuery)
      assert(isTableAppearedInPlan(afterRefresh.queryExecution.optimizedPlan, "mv1"))
      checkAnswer(afterRefresh, sql(compareQuery))
    } finally {
      // `if exists` keeps a setup failure from being masked by a cleanup failure.
      sql("drop materialized view if exists mv1")
      sql("drop table if exists fact_table_compare")
      sql("drop table if exists source")
    }
  }

  test("test incremental refresh mv") {
    sql("drop materialized view if exists mv1")
    sql("drop table if exists source")
    sql("drop table if exists fact_table_compare")
    try {
      sql("create table fact_table_compare using carbondata as select * from fact_table")
      sql("create table source using carbondata as select * from fact_table")
      val query = "select empname, deptname, salary from source where empname='arvind'"
      val compareQuery =
        "select empname, deptname, salary from fact_table_compare where empname='arvind'"
      sql(s"create materialized view mv1 as $query")

      val beforeLoad = sql(query)
      assert(isTableAppearedInPlan(beforeLoad.queryExecution.optimizedPlan, "mv1"))
      checkAnswer(beforeLoad, sql(compareQuery))

      // No explicit refresh here: the MV is expected to stay usable and up to date
      // after new data is loaded into the source table.
      loadBigData(resourcesPath, "fact_table_compare")
      loadBigData(resourcesPath, "source")
      val afterLoad = sql(query)
      assert(isTableAppearedInPlan(afterLoad.queryExecution.optimizedPlan, "mv1"))
      checkAnswer(afterLoad, sql(compareQuery))
    } finally {
      sql("drop table if exists fact_table_compare")
      sql("drop materialized view if exists mv1")
      sql("drop table if exists source")
    }
  }

  /** Drops `fact_table` and restores the default timestamp format. */
  override def afterAll(): Unit = {
    drop()
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
  }

  /** Drops the table shared by all tests, if it exists. */
  def drop(): Unit = {
    sql("drop table IF EXISTS fact_table")
  }

  /**
   * Checks whether a table is read by the given plan.
   *
   * @param logicalPlan plan to inspect
   * @param tableName   table name to look for, compared case-insensitively
   * @return true if a `LogicalRelation` with catalog metadata or a `HiveTableRelation`
   *         in the plan refers to a table with that name
   */
  def isTableAppearedInPlan(logicalPlan: LogicalPlan, tableName: String): Boolean = {
    val tableNames = logicalPlan.collect {
      // A LogicalRelation may carry no catalog table; such relations are skipped
      // instead of failing on Option.get.
      case relation: LogicalRelation => relation.catalogTable.map(_.identifier.table)
      case relation: HiveTableRelation => Some(relation.tableMeta.identifier.table)
    }.flatten
    tableNames.exists(_.equalsIgnoreCase(tableName))
  }

  /** Plans `query` and reports whether its optimized plan reads `mv1`. */
  private def isMVUsed(query: String): Boolean = {
    isTableAppearedInPlan(sql(query).queryExecution.optimizedPlan, "mv1")
  }

  /** Loads `data_big.csv` from `resourceDir` into `table`. */
  private def loadBigData(resourceDir: String, table: String): Unit = {
    sql(s"""LOAD DATA local inpath '$resourceDir/data_big.csv' INTO TABLE $table OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
  }

  /**
   * Creates the `source` table with the given statement, builds `mv1` on it and
   * verifies that the aggregation on `source` is served by the MV and returns the
   * same rows as the aggregation on `fact_table`.
   *
   * @param createSourceSql statement that creates the `source` table
   */
  private def checkMVOnSource(createSourceSql: String): Unit = {
    sql("drop materialized view if exists mv1")
    sql("drop table if exists source")
    try {
      sql(createSourceSql)
      sql(createMVOnSource)
      val df = sql(avgQueryOnSource)
      assert(isTableAppearedInPlan(df.queryExecution.optimizedPlan, "mv1"))
      checkAnswer(df, sql(avgQueryOnFactTable))
    } finally {
      sql("drop materialized view if exists mv1")
      sql("drop table if exists source")
    }
  }
}