| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.examples |
| |
| import java.io.File |
| |
| import org.apache.spark.sql.{SaveMode, SparkSession} |
| |
| import org.apache.carbondata.examples.util.ExampleUtils |
| |
| /** |
| * This example is for pre-aggregate tables. |
| */ |
| |
object PreAggregateDataMapExample {

  /**
   * Entry point: creates a Carbon-enabled SparkSession, runs the example
   * body, then closes the session.
   */
  def main(args: Array[String]): Unit = {
    val spark = ExampleUtils.createCarbonSession("PreAggregateTableExample")
    exampleBody(spark)
    spark.close()
  }

  /**
   * Walks through pre-aggregate datamap usage:
   *  1. create pre-aggregate tables (sum/avg/count/min/max/case) on a main
   *     table, and run queries that can be rewritten to hit them;
   *  2. compare query time on a table with a pre-aggregate datamap against
   *     an identical table without one;
   *  3. show that a datamap defined with avg() also serves sum() queries
   *     (avg is stored as sum()/count()), but not the other way round.
   */
  def exampleBody(spark: SparkSession): Unit = {
    // Locate the sample CSV shipped in the source tree, resolved relative to
    // the compiled classes directory.
    val rootPath = new File(this.getClass.getResource("/").getPath
      + "../../../..").getCanonicalPath
    val testData = s"$rootPath/integration/spark-common-test/src/test/resources/sample.csv"

    // 1. simple usage for pre-aggregate tables creation and query
    spark.sql("DROP TABLE IF EXISTS mainTable")
    spark.sql("DROP TABLE IF EXISTS mainTable_other")
    spark.sql(
      """
        | CREATE TABLE mainTable
        | (id Int,
        | name String,
        | city String,
        | age Int)
        | STORED BY 'org.apache.carbondata.format'
      """.stripMargin)

    spark.sql(
      """
        | CREATE TABLE mainTable_other
        | (id Int,
        | name String,
        | city String,
        | age Int)
        | STORED BY 'org.apache.carbondata.format'
      """.stripMargin)

    spark.sql(s"""
       LOAD DATA LOCAL INPATH '$testData' into table mainTable
       """)

    spark.sql(s"""
       LOAD DATA LOCAL INPATH '$testData' into table mainTable_other
       """)

    // create pre-aggregate tables on mainTable by datamap
    // (no interpolation needed, so plain triple-quoted strings are used)

    // sum() will be hit
    spark.sql(
      """create datamap preagg_sum on table mainTable using 'preaggregate' as
        | select id,sum(age) from mainTable group by id""".stripMargin)

    // avg() and sum() both will be hit, because avg = sum()/count()
    spark.sql(
      """create datamap preagg_avg on table mainTable using 'preaggregate' as
        | select id,avg(age) from mainTable group by id""".stripMargin)

    // count() will be hit
    spark.sql(
      """create datamap preagg_count_age on table mainTable using 'preaggregate' as
        | select id,count(age) from mainTable group by id""".stripMargin)

    // min() will be hit
    spark.sql(
      """create datamap preagg_min on table mainTable using 'preaggregate' as
        | select id,min(age) from mainTable group by id""".stripMargin)

    // max() will be hit
    spark.sql(
      """create datamap preagg_max on table mainTable using 'preaggregate' as
        | select id,max(age) from mainTable group by id""".stripMargin)

    // show datamaps
    spark.sql("show datamap on table mainTable").show(false)

    // drop one datamap and show the remaining ones
    spark.sql("drop datamap preagg_count_age on table mainTable").show()
    spark.sql("show datamap on table mainTable").show(false)

    // a datamap over a CASE expression inside an aggregate
    spark.sql(
      """
        | create datamap preagg_case on table mainTable using 'preaggregate' as
        | select name,sum(case when age=35 then id else 0 end) from mainTable group by name
      """.stripMargin)

    spark.sql(
      """create datamap preagg_count on table mainTable using 'preaggregate' as
        | select name, count(*) from mainTable group by name""".stripMargin)

    spark.sql("show datamap on table mainTable").show(false)

    // queries below can be rewritten to hit the pre-aggregate tables
    spark.sql(
      """
        | SELECT id,max(age)
        | FROM mainTable group by id
      """.stripMargin).show()

    spark.sql(
      """
        | select name, count(*) from
        | mainTable group by name
      """.stripMargin).show()

    spark.sql(
      """
        | select name as NewName,
        | sum(case when age=35 then id else 0 end) as sum
        | from mainTable group by name order by name
      """.stripMargin).show()

    spark.sql(
      """
        | select t1.name,t1.city from mainTable_other t1 join
        | (select name as newnewname,sum(age) as sum
        | from mainTable group by name) t2 on t1.name=t2.newnewname
      """.stripMargin).show()

    // 2. compare the performance: with pre-aggregate VS main table

    // build the test data; increase the volume for a more obvious comparison
    // (data larger than ~100M will take 10+ minutes to process)
    import spark.implicits._

    import scala.util.Random
    val r = new Random()
    val df = spark.sparkContext.parallelize(1 to 10 * 10 * 1000)
      .map(x => ("No." + r.nextInt(100000), "name" + x % 8, "city" + x % 50, x % 60))
      .toDF("ID", "name", "city", "age")

    // create table with pre-aggregate
    spark.sql("DROP TABLE IF EXISTS personTable")
    spark.sql("DROP TABLE IF EXISTS personTableWithoutAgg")
    df.write.format("carbondata")
      .option("tableName", "personTable")
      .option("compress", "true")
      .mode(SaveMode.Overwrite).save()

    // create table without pre-aggregate
    df.write.format("carbondata")
      .option("tableName", "personTableWithoutAgg")
      .option("compress", "true")
      .mode(SaveMode.Overwrite).save()

    // create pre-aggregate table by datamap
    // (margin marker added on the first SQL line so stripMargin strips all
    // lines uniformly)
    spark.sql(
      """
        | CREATE datamap preagg_avg on table personTable using 'preaggregate' as
        | select id,avg(age) from personTable group by id
      """.stripMargin)

    // measures wall-clock time of `code`, returned in seconds
    def time(code: => Unit): Double = {
      val start = System.currentTimeMillis()
      code
      // return time in seconds
      (System.currentTimeMillis() - start).toDouble / 1000
    }

    val time_without_aggTable = time {
      spark.sql(
        """
          | SELECT id, avg(age)
          | FROM personTableWithoutAgg group by id
        """.stripMargin).count()
    }

    val time_with_aggTable = time {
      spark.sql(
        """
          | SELECT id, avg(age)
          | FROM personTable group by id
        """.stripMargin).count()
    }
    // scalastyle:off
    println("time for query on table with pre-aggregate table:" + time_with_aggTable.toString)
    println("time for query on table without pre-aggregate table:" + time_without_aggTable.toString)
    // scalastyle:on

    // 3. if avg function is defined for a column, sum also can be used on
    // that; but not the other way round
    val time_without_aggTable_sum = time {
      spark.sql(
        """
          | SELECT id, sum(age)
          | FROM personTableWithoutAgg group by id
        """.stripMargin).count()
    }

    val time_with_aggTable_sum = time {
      spark.sql(
        """
          | SELECT id, sum(age)
          | FROM personTable group by id
        """.stripMargin).count()
    }
    // scalastyle:off
    println("time for query with function sum on table with pre-aggregate table:" +
      time_with_aggTable_sum.toString)
    println("time for query with function sum on table without pre-aggregate table:" +
      time_without_aggTable_sum.toString)
    // scalastyle:on

    // clean up every table created by this example
    spark.sql("DROP TABLE IF EXISTS mainTable")
    spark.sql("DROP TABLE IF EXISTS mainTable_other")
    spark.sql("DROP TABLE IF EXISTS personTable")
    spark.sql("DROP TABLE IF EXISTS personTableWithoutAgg")
  }
}