package org.apache.carbondata.examples
import scala.util.Random
import org.apache.spark.sql.{CarbonContext, DataFrame, Row, SaveMode, SQLContext}
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.carbondata.examples.PerfTest._
import org.apache.carbondata.examples.util.ExampleUtils
// scalastyle:off println
/**
 * Represents one benchmark query.
 *
 * @param queryType category of the query, printed together with the timings
 * @param queryNo index of the query within its category
 * @param sqlString SQL text; the first occurrence of `tableName` is replaced with the
 *                  datasource-specific table name before execution
 */
class Query(val queryType: String, val queryNo: Int, val sqlString: String) {

  /**
   * Runs the query in a batch and calculates the average time.
   *
   * The first execution is timed on its own; the average is taken over the remaining
   * `runs - 1` executions.
   *
   * @param sqlContext context to run the query
   * @param runs total number of executions, must be at least 1
   * @param datasource datasource to run against
   * @return rows of the last execution together with the average and first-run times,
   *         both in nanoseconds
   */
  def run(sqlContext: SQLContext, runs: Int, datasource: String): QueryResult = {
    require(runs >= 1)
    val sqlToRun = makeSQLString(datasource)

    // withTime only accepts a Unit block, so the rows are captured through a local var.
    var result: Array[Row] = null
    val firstTime = withTime {
      result = sqlContext.sql(sqlToRun).collect
    }

    val repeatRuns = runs - 1
    val totalTime = (1 to repeatRuns).map { _ =>
      withTime {
        result = sqlContext.sql(sqlToRun).collect
      }
    }.sum

    // With a single run there is nothing to average, and dividing by `runs - 1` would
    // throw an ArithmeticException, so the first run doubles as the average.
    val avgTime = if (repeatRuns > 0) totalTime / repeatRuns else firstTime
    QueryResult(datasource, result, avgTime, firstTime)
  }

  /** Substitutes the datasource-specific table name for the `tableName` placeholder. */
  private def makeSQLString(datasource: String): String =
    sqlString.replaceFirst("tableName", PerfTest.makeTableName(datasource))
}
/**
 * Performance result of one query, or one data load, against one datasource.
 *
 * @param datasource name of the datasource the measurement belongs to
 * @param result rows returned by the query; null when the entry describes a data load
 * @param avgTime average elapsed time in nanoseconds
 * @param firstTime elapsed time of the first run in nanoseconds
 */
case class QueryResult(datasource: String, result: Array[Row], avgTime: Long, firstTime: Long)
// Loads `dataFrame` into every entry of `datasources` and runs benchmark queries against
// each of them through `sqlContext`.
class QueryRunner(sqlContext: SQLContext, dataFrame: DataFrame, datasources: Seq[String]) {
/**
 * Runs a query on each datasource and verifies that all datasources agree on the result.
 *
 * @param query query to execute
 * @param runs number of executions per datasource, passed through to Query.run
 * @return one QueryResult per datasource, in the order of `datasources`
 */
def run(query: Query, runs: Int): Seq[QueryResult] = {
  val results = datasources.map { datasource =>
    query.run(sqlContext, runs, datasource)
  }
  // Timings are only comparable when every datasource returned the same rows.
  checkResult(results)
  results
}
/**
 * Verifies that every datasource returned the same rows for a query.
 *
 * Row order is not significant: each result is sorted by the string form of its rows
 * before it is compared with its neighbour. Fails through `sys.error` on the first
 * pair of datasources whose rows differ.
 *
 * @param results results of one query, one per datasource; may be empty
 */
private def checkResult(results: Seq[QueryResult]): Unit = {
  // Sort each result once instead of re-sorting it for every comparison.
  val sorted = results.map(r => (r.datasource, r.result.sortBy(_.toString())))
  // Compare adjacent pairs; with zero or one result there is nothing to compare.
  sorted.zip(sorted.drop(1)).foreach { case ((lastName, lastRows), (curName, curRows)) =>
    if (!lastRows.sameElements(curRows)) {
      sys.error(s"result is not the same between $lastName and $curName")
    }
  }
}
// Resolves the save path for the given datasource and logs that loading is starting.
// NOTE(review): no statement that writes or reads the data is visible after the println,
// and the closing brace is not visible — the rest of the body may be missing; confirm.
private def loadToNative(datasource: String): Unit = {
val savePath = PerfTest.savePath(datasource)
println(s"loading data into $datasource, path: $savePath")
/** Loads the data into each datasource. */
// Handles every entry of `datasources` in turn, timing the per-datasource work with withTime.
// Each timing is appended to `results` as a QueryResult whose rows are null and whose
// avgTime and firstTime both hold the measured time.
// NOTE(review): several statements and all closing braces appear to be missing from this
// body (see the notes below) — confirm against the original file.
def loadData: Seq[QueryResult] = {
// load data into all datasources
var results = Seq[QueryResult]()
datasources.foreach { datasource =>
val time = withTime {
datasource match {
case "parquet" =>
// select snappy compression through the SQL configuration
// NOTE(review): only configuration is visible for this case; presumably the data is
// written afterwards (loadToNative?) — confirm.
dataFrame.sqlContext.setConf(s"spark.sql.$datasource.compression.codec", "snappy")
case "orc" =>
// select SNAPPY compression through the Hadoop configuration
dataFrame.sqlContext.sparkContext.hadoopConfiguration.set("orc.compress", "SNAPPY")
case "carbon" =>
// drop any existing table of the same name first
sqlContext.sql(s"DROP TABLE IF EXISTS ${PerfTest.makeTableName(datasource)}")
// NOTE(review): the println below ends with a dangling `+` and the following `.option`
// has no visible receiver — lines appear to be missing here; confirm.
println(s"loading data into $datasource, path: " +
.option("tableName", PerfTest.makeTableName(datasource))
case _ => sys.error("unsupported data source")
// withTime reports nanoseconds; divide by 1000000 to print milliseconds
println(s"load data into $datasource completed, time taken ${time/1000000}ms")
// a load collects no rows, so the result field is null
results :+= QueryResult(datasource, null, time, time)
/**
 * Drops the carbon table and removes the files saved for the parquet and orc datasources.
 *
 * Fails through `sys.error` when `datasources` contains an unsupported name.
 */
def shutDown(): Unit = {
  // java.io.File#delete removes a directory only when it is empty and reports failure
  // through its return value. The save path is presumably a directory of part files
  // (TODO confirm), so children are removed first and failures are reported, not dropped.
  def deleteRecursively(f: java.io.File): Unit = {
    // listFiles returns null for anything that is not a readable directory
    Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    if (f.exists() && !f.delete()) {
      println(s"failed to delete ${f.getAbsolutePath}")
    }
  }

  // drop all tables and temp files
  datasources.foreach {
    case datasource @ ("parquet" | "orc") =>
      deleteRecursively(new java.io.File(PerfTest.savePath(datasource)))
    case "carbon" =>
      sqlContext.sql(s"DROP TABLE IF EXISTS ${PerfTest.makeTableName("carbon")}")
    case _ => sys.error("unsupported data source")
  }
}
/**
 * Template for table data generation.
 *
 * @param dimension groups of dimension columns, each given as
 *                  (number of columns, cardinality of every column in the group)
 * @param measure number of measure columns
 */
case class TableTemplate(dimension: Seq[(Int, Int)], measure: Int)
/** Utility to generate random data according to a template. */
// sqlContext supplies the SparkContext that parallelizes the generated rows and is used
// to create the resulting DataFrame.
class TableGenerator(sqlContext: SQLContext) {
/** Generates a DataFrame from random data. */
// Builds a DataFrame of `rows` rows following `template`: string dimension columns followed
// by integer measure columns, named c1, c2, ... in that order and declared non-nullable.
// NOTE(review): parts of this body (measure value generation, the return expression and the
// closing braces) are not visible — confirm against the original file.
def genDataFrame(template: TableTemplate, rows: Int): DataFrame = {
val measures = template.measure
// total number of dimension columns: sum of the column counts of all groups
val dimensions = template.dimension.foldLeft(0) {(x, y) => x + y._1}
// per-column cardinality: each group's cardinality repeated once per column of the group
val cardinality = template.dimension.foldLeft(Seq[Int]()) {(x, y) =>
x ++ (1 to y._1).map(z => y._2)
print(s"generating data: $rows rows of $dimensions dimensions and $measures measures. ")
println("cardinality for each dimension: " + cardinality.mkString(", "))
// dimension columns take the first `dimensions` column numbers and are strings
val dimensionFields = (1 to dimensions).map { id =>
DataTypes.createStructField(s"c$id", DataTypes.StringType, false)
// measure columns continue the numbering after the dimensions and are integers
val measureFields = (dimensions + 1 to dimensions + measures).map { id =>
DataTypes.createStructField(s"c$id", DataTypes.IntegerType, false)
val schema = StructType(dimensionFields ++ measureFields)
// one generated Row per element of 1 to rows
val data = sqlContext.sparkContext.parallelize(1 to rows).map { x =>
val random = new Random()
// dimension value is "P<column>_<n>", n drawn from [0, cardinality of that column)
val dimSeq = (1 to dimensions).map { y =>
s"P${y}_${random.nextInt(cardinality(y - 1))}"
// NOTE(review): no expression producing the measure values is visible in this map — confirm.
val msrSeq = (1 to measures).map { y =>
Row.fromSeq(dimSeq ++ msrSeq)
val df = sqlContext.createDataFrame(data, schema)
// Benchmark driver: holds the query workloads, the main entry point and the helpers
// (makeTableName, savePath, withTime) used by the classes in this file.
object PerfTest {
// OLAP-style workload: filter on dimension columns, group by other dimensions and
// aggregate measures. `tableName` is a placeholder that Query.makeSQLString replaces
// with the datasource-specific table name.
// NOTE(review): the closing parenthesis of this Seq is not visible — confirm.
private val olap: Seq[String] = Seq(
"""SELECT c3, c4, sum(c8) FROM tableName
|WHERE c1 = 'P1_23' and c2 = 'P2_43'
|GROUP BY c3, c4""".stripMargin,
"""SELECT c2, c3, sum(c9) FROM tableName
|WHERE c1 = 'P1_432' and c4 = 'P4_3' and c5 = 'P5_2'
|GROUP by c2, c3 """.stripMargin,
"""SELECT c2, count(distinct c1), sum(c8) FROM tableName
|WHERE c3="P3_4" and c5="P5_4"
|GROUP BY c2 """.stripMargin,
"""SELECT c2, c5, count(distinct c1), sum(c7) FROM tableName
|WHERE c4="P4_4" and c5="P5_7" and c8>4
|GROUP BY c2, c5 """.stripMargin
// Point-query workload: select a few columns for rows matching an equality filter on c1,
// optionally combined with one more condition. `tableName` is a placeholder that
// Query.makeSQLString replaces with the datasource-specific table name.
// NOTE(review): the closing parenthesis of this Seq is not visible — confirm.
private val point: Seq[String] = Seq(
"""SELECT c4 FROM tableName
|WHERE c1="P1_43" """.stripMargin,
"""SELECT c3 FROM tableName
|WHERE c1="P1_542" and c2="P2_23" """.stripMargin,
"""SELECT c3, c5 FROM tableName
|WHERE c1="P1_52" and c7=4""".stripMargin,
"""SELECT c4, c9 FROM tableName
|WHERE c1="P1_43" and c8<3""".stripMargin
// Filter workload: select all columns of the rows matching equality filters on one or two
// dimension columns. `tableName` is a placeholder that Query.makeSQLString replaces with
// the datasource-specific table name.
// NOTE(review): the closing parenthesis of this Seq is not visible — confirm.
private val filter: Seq[String] = Seq(
"""SELECT * FROM tableName
|WHERE c2="P2_43" """.stripMargin,
"""SELECT * FROM tableName
|WHERE c3="P3_3" """.stripMargin,
"""SELECT * FROM tableName
|WHERE c2="P2_32" and c3="P3_23" """.stripMargin,
"""SELECT * FROM tableName
|WHERE c3="P3_28" and c4="P4_3" """.stripMargin
// Scan workload: aggregate measure columns over the whole table, optionally restricted by
// a filter on one or two columns. `tableName` is a placeholder that Query.makeSQLString
// replaces with the datasource-specific table name.
private val scan: Seq[String] = Seq(
  """SELECT sum(c7), sum(c8), avg(c9), max(c10) FROM tableName """.stripMargin,
  """SELECT sum(c7) FROM tableName
    |WHERE c2="P2_32" """.stripMargin,
  // sum(c9), not sum(9): the latter sums the integer literal 9 instead of column c9
  """SELECT sum(c7), sum(c8), sum(c9), sum(c10) FROM tableName
    |WHERE c4="P4_4" """.stripMargin,
  """SELECT sum(c7), sum(c8), sum(c9), sum(c10) FROM tableName
    |WHERE c2="P2_75" and c6<5 """.stripMargin
)
// Entry point: builds the query workload, generates random data, loads it into the
// parquet, orc and carbon datasources, then runs every query and prints the timings.
// NOTE(review): some lines below look truncated and the closing braces are not visible —
// confirm against the original file.
def main(args: Array[String]) {
val cc = ExampleUtils.createCarbonContext("PerfTest")
// prepare performance queries
var workload = Seq[Query]()
// each query is numbered by its position within its own category
olap.zipWithIndex.foreach(x => workload :+= new Query("OLAP Query", x._2, x._1))
point.zipWithIndex.foreach(x => workload :+= new Query("Point Query", x._2, x._1))
filter.zipWithIndex.foreach(x => workload :+= new Query("Filter Query", x._2, x._1))
scan.zipWithIndex.foreach(x => workload :+= new Query("Scan Query", x._2, x._1))
// prepare data
val rows = 3 * 1000 * 1000
val dimension = Seq((1, 1 * 1000), (1, 100), (1, 50), (2, 10)) // cardinality for each column
val measure = 5 // number of measure
val template = TableTemplate(dimension, measure)
val df = new TableGenerator(cc).genDataFrame(template, rows)
println("generate data completed")
// run all queries against all data sources
val datasource = Seq("parquet", "orc", "carbon")
val runner = new QueryRunner(cc, df, datasource)
val results = runner.loadData
// NOTE(review): the interpolated expression below starts with `/`; its left-hand side
// appears to be missing — confirm.
println(s"load performance: ${ / 1000000L).mkString(", ")}")
// running totals of avgTime per datasource
var parquetTime: Double = 0
var orcTime: Double = 0
var carbonTime: Double = 0
println(s"query id: ${datasource.mkString(", ")}, result in millisecond")
workload.foreach { query =>
// run 4 times each round, will print performance of first run and avg time of last 3 runs
print(s"${query.queryType} ${query.queryNo}: ")
// NOTE(review): the right-hand side of the next line and the interpolated expression on
// the line after it appear truncated — confirm.
val results =, 4)
print(s"${ / 1000000L).mkString(", ")} ")
println(s"[sql: ${query.sqlString.replace('\n', ' ')}]")
// indices 0, 1, 2 presumably follow the order of the `datasource` sequence above
parquetTime += results(0).avgTime
orcTime += results(1).avgTime
carbonTime += results(2).avgTime
// totals are divided by 1000000 for display, followed by the parquet/orc and
// parquet/carbon ratios
println(s"Total time: ${parquetTime / 1000000}, ${orcTime / 1000000}, " +
s"${carbonTime / 1000000} = 1 : ${parquetTime / orcTime} : ${parquetTime / carbonTime}")
// Returns the table name used for the given datasource; Query.makeSQLString substitutes it
// for the `tableName` placeholder and QueryRunner uses it when dropping tables.
// NOTE(review): the body of this method is not visible here — confirm against the original.
def makeTableName(datasource: String): String = {
// Returns the path used for the given datasource's saved data; QueryRunner logs it in
// loadToNative and deletes it in shutDown.
// NOTE(review): the right-hand side of this definition is not visible here — confirm.
def savePath(datasource: String): String =
/**
 * Evaluates `body` once and measures how long it takes.
 *
 * @param body block to time; it is passed by name and evaluated exactly once
 * @return elapsed time in nanoseconds, measured with System.nanoTime
 */
def withTime(body: => Unit): Long = {
  val start = System.nanoTime()
  // A by-name argument only runs when it is referenced; without this line the block
  // would never execute and the measured time would be meaningless.
  body
  System.nanoTime() - start
}
// scalastyle:on println