/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.examples

import java.io.File
import java.util.Random

import org.apache.spark.sql.SparkSession

import org.apache.carbondata.examples.util.ExampleUtils

/**
 * This example demonstrates how to create, rebuild and query Materialized View (MV)
 * datamaps, and compares query performance with and without an MV.
 */
object MVDataMapExample {

  def main(args: Array[String]): Unit = {
    val spark = ExampleUtils.createCarbonSession("MVDataMapExample")
    exampleBody(spark)
    performanceTest(spark)
    spark.close()
  }
  def exampleBody(spark: SparkSession): Unit = {
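    // Resolve the repository root from the classpath so the bundled sample.csv
    // can be located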
    val rootPath = new File(this.getClass.getResource("/").getPath
      + "../../../..").getCanonicalPath
    val testData = s"$rootPath/integration/spark-common-test/src/test/resources/sample.csv"
spark.sql("DROP TABLE IF EXISTS mainTable")
spark.sql("DROP TABLE IF EXISTS dimtable")
spark.sql(
"""
| CREATE TABLE mainTable
| (id Int,
| name String,
| city String,
| age Int)
| STORED BY 'org.apache.carbondata.format'
""".stripMargin)
spark.sql(
"""
| CREATE TABLE dimtable
| (name String,
| address String)
| STORED BY 'org.apache.carbondata.format'
""".stripMargin)
spark.sql(s"""LOAD DATA LOCAL INPATH '$testData' into table mainTable""")
spark.sql(s"""insert into dimtable select name, concat(city, ' street1') as address from
|mainTable group by name, address""".stripMargin)
    // 1. Create a simple sub-projection MV datamap, so that queries selecting
    // a subset of its columns can be rewritten to hit it
    spark.sql(
      s"""create datamap simple_sub_projection using 'mv' as
         | select id, name from mainTable""".stripMargin)
    spark.sql(s"""rebuild datamap simple_sub_projection""")
    // Both physical plans below should scan the MV table
    // simple_sub_projection_table instead of mainTable
    spark.sql(s"""select id from mainTable""").explain(true)
    spark.sql(s"""select sum(id) from mainTable""").explain(true)
    // 2. Create an aggregate MV datamap, so that matching aggregate queries
    // can be rewritten to hit it
    spark.sql(
      s"""create datamap simple_agg using 'mv' as
         | select id, sum(age) from mainTable group by id""".stripMargin)
    spark.sql(s"""rebuild datamap simple_agg""")
    // Check the physical plan, which should use the MV table simple_agg_table
    // instead of mainTable
    spark.sql(s"""select id, sum(age) from mainTable group by id""").explain(true)
    // The rewrite also applies when the aggregation appears in a subquery
    spark.sql(
      s"""select sub.id from (select id, sum(age) from mainTable group by id) sub
         | where sub.id = 4""".stripMargin).explain(true)
    // 3. Create an MV datamap containing a join and an aggregation, so that
    // matching join + aggregate queries can be rewritten to hit it
    spark.sql(
      s"""create datamap simple_agg_with_join using 'mv' as
         | select id, address, sum(age) from mainTable
         | inner join dimtable on mainTable.name = dimtable.name
         | group by id, address""".stripMargin)
    spark.sql(s"""rebuild datamap simple_agg_with_join""")
    // Both physical plans below should use the MV table simple_agg_with_join_table
    // instead of joining mainTable and dimtable; a filter on a group-by column
    // (id = 1) does not prevent the rewrite
    spark.sql(
      s"""select id, address, sum(age) from mainTable
         | inner join dimtable on mainTable.name = dimtable.name
         | group by id, address""".stripMargin).explain(true)
    spark.sql(
      s"""select id, address, sum(age) from mainTable
         | inner join dimtable on mainTable.name = dimtable.name
         | where id = 1 group by id, address""".stripMargin).explain(true)
    // Show datamaps
    spark.sql("show datamap").show(false)
    // Drop datamap
    spark.sql("drop datamap if exists simple_agg_with_join")
    spark.sql("DROP TABLE IF EXISTS mainTable")
    spark.sql("DROP TABLE IF EXISTS dimtable")
  }
  private def performanceTest(spark: SparkSession): Unit = {
    spark.sql("DROP TABLE IF EXISTS employee_salary")
    spark.sql("DROP TABLE IF EXISTS employee_salary_without_mv")
    spark.sql("DROP TABLE IF EXISTS emp_address")
    createFactTable(spark, "employee_salary")
    createFactTable(spark, "employee_salary_without_mv")
    spark.sql(
      """
        | CREATE TABLE emp_address
        | (name String,
        | address String)
        | STORED BY 'org.apache.carbondata.format'
      """.stripMargin)
    spark.sql(
      s"""insert into emp_address select name, concat(city, ' street1') as address
         | from employee_salary group by name, address""".stripMargin)

    // The MV must contain every column referenced by the benchmark queries
    // (id, name, sum(salary)) for the optimizer to rewrite them to the MV table
    spark.sql(
      s"""create datamap simple_agg_employee using 'mv' as
         | select id, name, sum(salary) from employee_salary group by id, name""".stripMargin)
    spark.sql(s"""rebuild datamap simple_agg_employee""")
    // Compare the time of an aggregate query with and without the MV datamap
    val timeWithOutMv = time(spark
      .sql("select id, name, sum(salary) from employee_salary_without_mv group by id, name")
      .collect())
    val timeWithMv = time(spark
      .sql("select id, name, sum(salary) from employee_salary group by id, name").collect())
    // scalastyle:off
    println(s"Time of aggregate query with MV: $timeWithMv s, without MV: $timeWithOutMv s")
    // scalastyle:on
    // Compare again with a filter on a group-by column
    val timeWithOutMvFilter = time(spark
      .sql(
        "select id, name, sum(salary) from employee_salary_without_mv " +
          "where name = 'name10' group by id, name")
      .collect())
    val timeWithMvFilter = time(spark
      .sql(
        "select id, name, sum(salary) from employee_salary " +
          "where name = 'name10' group by id, name")
      .collect())
    // scalastyle:off
    println(s"Time of filtered aggregate query with MV: $timeWithMvFilter s, " +
      s"without MV: $timeWithOutMvFilter s")
    // scalastyle:on
    // Compare the time of an aggregate-with-join query with and without the MV
    spark.sql(
      s"""create datamap simple_join_agg_employee using 'mv' as
         | select id, address, sum(salary) from employee_salary f join emp_address d
         | on f.name = d.name group by id, address""".stripMargin)
    spark.sql(s"""rebuild datamap simple_join_agg_employee""")
    val timeWithMVJoin =
      time(spark.sql(
        s"""select id, address, sum(salary) from employee_salary f join emp_address d
           | on f.name = d.name group by id, address""".stripMargin).collect())
    val timeWithOutMVJoin =
      time(spark.sql(
        s"""select id, address, sum(salary) from employee_salary_without_mv f
           | join emp_address d on f.name = d.name group by id, address""".stripMargin).collect())
    // scalastyle:off
    println(s"Time of join query with MV: $timeWithMVJoin s, without MV: $timeWithOutMVJoin s")
    // scalastyle:on
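    // NOTE: these single-run wall-clock timings are indicative only; JVM warm-up
    // and caching can skew individual runs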
spark.sql("DROP TABLE IF EXISTS employee_salary")
spark.sql("DROP TABLE IF EXISTS emp_address")
spark.sql("DROP TABLE IF EXISTS employee_salary_without_mv")
}
  private def createFactTable(spark: SparkSession, tableName: String): Unit = {
    import spark.implicits._
    val rand = new Random()
    // Generate 1,000,000 rows: 1000 distinct ids and names, 100 distinct cities,
    // and random salaries
    val df = spark.sparkContext.parallelize(1 to 1000000)
      .map(x => (x % 1000, "name" + x % 1000, "city" + x % 100, rand.nextInt()))
      .toDF("id", "name", "city", "salary")
    df.write
      .format("carbondata")
      .option("tableName", tableName)
      .save()
  }
  // Measure the wall-clock time of evaluating `code`, returned in seconds
  private def time(code: => Unit): Double = {
    val start = System.currentTimeMillis()
    code
    (System.currentTimeMillis() - start).toDouble / 1000
  }
}