/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.benchmark

import java.io.File
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SaveMode, SparkSession}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.spark.util.DataGenerator

// scalastyle:off println
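/**
 * Benchmark that runs the same set of queries against a parquet (or orc) table
 * and a CarbonData table, compares the results, and prints per-query response
 * times as JSON lines for later analysis. Assumes a Query(sqlText, queryType,
 * desc) case class is available in this package.
 */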
object SimpleQueryBenchmark {
def parquetTableName: String = "comparetest_parquet"
def orcTableName: String = "comparetest_orc"
def carbonTableName(version: String): String = s"comparetest_carbonV$version"
// Performance test queries, designed to exercise various data access patterns.
// The $table placeholder in each sqlText is replaced with the actual table name at run time.
val queries: Array[Query] = Array(
// ===========================================================================
// == FULL SCAN AGGREGATION ==
// ===========================================================================
Query(
"select sum(m1) from $table",
"full scan",
"full scan query, 1 aggregate"
),
Query(
"select sum(m1), sum(m2) from $table",
"full scan",
"full scan query, 2 aggregate"
),
Query(
"select sum(m1), sum(m2), sum(m3) from $table",
"full scan",
"full scan query, 3 aggregate"
),
Query(
"select sum(m1), sum(m2), sum(m3), sum(m4) from $table",
"full scan",
"full scan query, 4 aggregate"
),
Query(
"select sum(m1), sum(m2), sum(m3), sum(m4), avg(m5) from $table",
"full scan",
"full scan query, 5 aggregate"
),
Query(
"select count(distinct id) from $table",
"full scan",
"full scan and count distinct of high card column"
),
Query(
"select count(distinct country) from $table",
"full scan",
"full scan and count distinct of medium card column"
),
Query(
"select count(distinct city) from $table",
"full scan",
"full scan and count distinct of low card column"
),
// ===========================================================================
// == FULL SCAN GROUP BY AGGREGATE ==
// ===========================================================================
Query(
"select country, sum(m1) as metric from $table group by country order by metric",
"aggregate",
"group by on big data, on medium card column, medium result set,"
),
Query(
"select city, sum(m1) as metric from $table group by city order by metric",
"aggregate",
"group by on big data, on low card column, small result set,"
),
Query(
"select id, sum(m1) as metric from $table group by id order by metric desc limit 100",
"topN",
"top N on high card column"
),
Query(
"select country,sum(m1) as metric from $table group by country order by metric desc limit 10",
"topN",
"top N on medium card column"
),
Query(
"select city,sum(m1) as metric from $table group by city order by metric desc limit 10",
"topN",
"top N on low card column"
),
// ===========================================================================
// == FILTER SCAN GROUP BY AGGREGATION ==
// ===========================================================================
Query(
"select country, sum(m1) as metric from $table where city='city8' group by country " +
"order by metric",
"filter scan and aggregate",
"group by on large data, small result set"
),
Query(
"select id, sum(m1) as metric from $table where planet='planet10' group by id " +
"order by metric",
"filter scan and aggregate",
"group by on medium data, large result set"
),
Query(
"select city, sum(m1) as metric from $table where country='country12' group by city " +
"order by metric",
"filter scan and aggregate",
"group by on medium data, small result set"
),
// ===========================================================================
// == FILTER SCAN ==
// ===========================================================================
Query(
"select * from $table where city = 'city3' limit 10000",
"filter scan",
"filter on low card dimension, limit, medium result set, fetch all columns"
),
Query(
"select * from $table where country = 'country9' ",
"filter scan",
"filter on low card dimension, medium result set, fetch all columns"
),
Query(
"select * from $table where planet = 'planet101' ",
"filter scan",
"filter on medium card dimension, small result set, fetch all columns"
),
Query(
"select * from $table where id = '408938' ",
"filter scan",
"filter on high card dimension"
),
Query(
"select * from $table where country='country10000' ",
"filter scan",
"filter on low card dimension, not exist"
),
Query(
"select * from $table where country='country2' and city ='city8' ",
"filter scan",
"filter on 2 dimensions, small result set, fetch all columns"
),
Query(
"select * from $table where city='city1' and country='country2' and planet ='planet3' ",
"filter scan",
"filter on 3 dimensions, small result set, fetch all columns"
),
Query(
"select * from $table where m1 < 3",
"filter scan",
"filter on measure, small result set, fetch all columns"
),
Query(
"select * from $table where id like '1%' ",
"fuzzy filter scan",
"like filter, big result set"
),
Query(
"select * from $table where id like '%111'",
"fuzzy filter scan",
"like filter, medium result set"
),
Query(
"select * from $table where id like 'xyz%' ",
"fuzzy filter scan",
"like filter, full scan but not exist"
)
)
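// Write the input dataframe as a partitioned parquet table and register it as a
// temp view; returns the load time in seconds.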
private def loadParquetTable(spark: SparkSession, input: DataFrame, table: String)
: Double = time {
// partitioned by the last digit of the id column
val dfWithPartition = input.withColumn("partitionCol", input.col("id").%(10))
dfWithPartition.write
.partitionBy("partitionCol")
.mode(SaveMode.Overwrite)
.parquet(table)
spark.read.parquet(table).createOrReplaceTempView(table)
}
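// Write the input dataframe as an orc table and register it as a temp view;
// returns the load time in seconds.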
private def loadOrcTable(spark: SparkSession, input: DataFrame, table: String): Double = time {
// unlike the parquet table, the orc table is written unpartitioned
input.write
.mode(SaveMode.Overwrite)
.orc(table)
spark.read.orc(table).createOrReplaceTempView(table)
}
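// Write the input dataframe as a carbon table using the V3 file format and a
// 32 MB table block size; returns the load time in seconds.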
private def loadCarbonTable(spark: SparkSession, input: DataFrame, tableName: String): Double = {
CarbonProperties.getInstance().addProperty(
CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
"V3"
)
spark.sql(s"drop table if exists $tableName")
time {
input.write
.format("carbondata")
.option("tableName", tableName)
.option("table_blocksize", "32")
.mode(SaveMode.Overwrite)
.save()
}
}
// load the same generated data into table1 (parquet or orc) and table2 (carbon)
private def prepareTable(spark: SparkSession, table1: String, table2: String): Unit = {
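// generate 100,000 rows of test data and cache them so both loads read identical input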
val df = DataGenerator.generateDataFrame(spark, totalNum = 10 * 10 * 1000).cache
println(s"loading ${df.count} records, schema: ${df.schema}")
val table1Time = if (table1.endsWith("parquet")) {
loadParquetTable(spark, df, table1)
} else if (table1.endsWith("orc")) {
loadOrcTable(spark, df, table1)
} else {
sys.error("invalid table: " + table1)
}
val table2Time = loadCarbonTable(spark, df, table2)
println(s"load completed, time: $table1Time, $table2Time")
df.unpersist()
}
// Run all queries against the specified table, returning (elapsed seconds, result rows) for each
private def runQueries(spark: SparkSession, tableName: String): Array[(Double, Array[Row])] = {
println(s"start running queries for $tableName...")
var result: Array[Row] = null
queries.zipWithIndex.map { case (query, index) =>
val sqlText = query.sqlText.replace("$table", tableName)
print(s"running query ${index + 1}: $sqlText ")
val rt = time {
result = spark.sql(sqlText).collect()
}
println(s"=> $rt sec")
(rt, result)
}
}
private def printErrorIfNotMatch(index: Int, table1: String, result1: Array[Row],
table2: String, result2: Array[Row]): Unit = {
// compare result sizes instead of result values, because some test cases
// aggregate double columns, which can produce slightly different values
// since carbon records are sorted
if (result1.length != result2.length) {
val num = index + 1
println(s"$table1 result for query $num: ")
println(s"""${result1.mkString(",")}""")
println(s"$table2 result for query $num: ")
println(s"""${result2.mkString(",")}""")
sys.error(s"result not matching for query $num (${queries(index).desc})")
}
}
// run test cases and print comparison result
private def runTest(spark: SparkSession, table1: String, table2: String): Unit = {
val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val date = new Date
// run queries on both tables
val table1Result: Array[(Double, Array[Row])] = runQueries(spark, table1)
// trigger GC and sleep briefly before running the next table
System.gc()
Thread.sleep(1000)
System.gc()
Thread.sleep(1000)
val table2Result: Array[(Double, Array[Row])] = runQueries(spark, table2)
// verify correctness by comparing the output of the two tables
table1Result.zipWithIndex.foreach { case (result, index) =>
printErrorIfNotMatch(index, table1, result._2, table2, table2Result(index)._2)
}
// print all response time in JSON format, so that it can be analyzed later
queries.zipWithIndex.foreach { case (query, index) =>
println("{" +
s""""query":"${index + 1}", """ +
s""""$table1 time":${table1Result(index)._1}, """ +
s""""$table2 time":${table2Result(index)._1}, """ +
s""""fetched":${table1Result(index)._2.length}, """ +
s""""type":"${query.queryType}", """ +
s""""desc":"${query.desc}", """ +
s""""date": "${formatter.format(date)}" """ +
"}"
)
}
}
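// Entry point: configure carbon properties, build the SparkSession, load the
// generated data into both tables, run the comparison, then clean up.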
def main(args: Array[String]): Unit = {
CarbonProperties.getInstance()
.addProperty("carbon.enable.vector.reader", "true")
.addProperty("enable.unsafe.sort", "true")
.addProperty("carbon.blockletgroup.size.in.mb", "32")
.addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true")
val rootPath = new File(this.getClass.getResource("/").getPath
+ "../../../..").getCanonicalPath
val storeLocation = s"$rootPath/examples/spark2/target/store"
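// resolve the spark master: -Dspark.master, then the MASTER env variable, then local[8]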
val master = Option(System.getProperty("spark.master"))
.orElse(sys.env.get("MASTER"))
.getOrElse("local[8]")
val spark = SparkSession
.builder()
.master(master)
.enableHiveSupport()
.config("spark.driver.host", "127.0.0.1")
.config("spark.sql.extensions", "org.apache.spark.sql.CarbonExtensions")
.getOrCreate()
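// initialize the carbon environment for this session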
CarbonEnv.getInstance(spark)
spark.sparkContext.setLogLevel("warn")
val table1 = parquetTableName
val table2 = carbonTableName("3")
prepareTable(spark, table1, table2)
runTest(spark, table1, table2)
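// clean up: delete the parquet output folder and drop the carbon table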
CarbonUtil.deleteFoldersAndFiles(new File(table1))
spark.sql(s"drop table if exists $table2")
spark.close()
}
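// Measure the wall-clock time of evaluating the by-name block, in seconds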
def time(code: => Unit): Double = {
val start = System.currentTimeMillis()
code
// return elapsed time in seconds
(System.currentTimeMillis() - start).toDouble / 1000
}
}
// scalastyle:on println