/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.sql.commands

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.{CarbonEnv, Row}
import org.apache.spark.sql.test.util.QueryTest
import org.junit.Assert
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.cache.CacheProvider
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory

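/**
 * Tests for the SHOW METACACHE command, which reports the driver-side cache
 * usage (index files, dictionaries and datamaps) per database and per table.
 */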
class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
  override protected def beforeAll(): Unit = {
    // start from a clean state: recreate the test databases and drop leftover tables
    sql("drop database if exists cache_db cascade").collect()
    sql("drop database if exists cache_empty_db cascade").collect()
    sql("create database cache_db").collect()
    sql("create database cache_empty_db").collect()
    dropTable()
    sql("use cache_db").collect()
    sql(
      """
        | CREATE TABLE cache_db.cache_1
        | (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
        | workgroupcategoryname String, deptno int, deptname String, projectcode int,
        | projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
        | salary int)
        | STORED BY 'org.apache.carbondata.format'
      """.stripMargin)
    // bloomfilter datamap on cache_1
    sql("CREATE DATAMAP IF NOT EXISTS cache_1_bloom ON TABLE cache_db.cache_1 " +
        "USING 'bloomfilter' DMPROPERTIES('INDEX_COLUMNS'='deptno')")
    sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE cache_1")
    sql(
      """
        | CREATE TABLE cache_2
        | (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
        | workgroupcategoryname String, deptno int, deptname String, projectcode int,
        | projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
        | salary int)
        | STORED BY 'org.apache.carbondata.format'
      """.stripMargin)
    sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE cache_db.cache_2")
    sql("insert into table cache_2 select * from cache_1").collect()
    sql(
      """
        | CREATE TABLE cache_3
        | (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
        | workgroupcategoryname String, deptno int, deptname String, projectcode int,
        | projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
        | salary int)
        | STORED BY 'org.apache.carbondata.format'
      """.stripMargin)
    sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE cache_3")
    // use default database
    sql("use default").collect()
    sql(
      """
        | CREATE TABLE cache_4
        | (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
        | workgroupcategoryname String, deptno int, deptname String, projectcode int,
        | projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
        | salary int)
        | STORED BY 'org.apache.carbondata.format'
      """.stripMargin)
    sql("insert into table cache_4 select * from cache_db.cache_2").collect()
    // standard partition table
    sql(
      """
        | CREATE TABLE cache_5
        | (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
        | workgroupcategoryname String, deptname String, projectcode int,
        | projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
        | salary int)
        | PARTITIONED BY (deptno int)
        | STORED BY 'org.apache.carbondata.format'
      """.stripMargin)
    sql(
      "insert into table cache_5 select empno,empname,designation,doj,workgroupcategory," +
      "workgroupcategoryname,deptname,projectcode,projectjoindate,projectenddate,attendance," +
      "utilization,salary,deptno from cache_4").collect()
    // run queries so that the index files of each table get loaded into the cache
    sql("select max(deptname) from cache_db.cache_1").collect()
    sql("SELECT deptno FROM cache_db.cache_1 where deptno=10").collect()
    sql("select count(*) from cache_db.cache_2").collect()
    sql("select count(*) from cache_4").collect()
    sql("select count(*) from cache_5").collect()
    sql("select workgroupcategoryname,count(empname) as count from cache_4 " +
        "group by workgroupcategoryname").collect()
  }

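  // Deleting a segment should cause the next query to evict that segment's index
  // files from the cache (the counters drop 3/3 -> 2/2 -> 1/1 below).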
test("test drop cache invalidation in case of invalid segments"){
sql(s"CREATE TABLE empTable(empno int, empname String, designation String, " +
s"doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
s"deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp," +
s"attendance int, utilization int, salary int) stored by 'carbondata'")
sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE empTable")
sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE empTable")
sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE empTable")
sql("select count(*) from empTable").show()
var showCache = sql("SHOW METACACHE on table empTable").collect()
assert(showCache(0).get(2).toString.equalsIgnoreCase("3/3 index files cached"))
sql("delete from table empTable where segment.id in(0)").show()
// check whether count(*) query invalidates the cache for the invalid segments
sql("select count(*) from empTable").show()
showCache = sql("SHOW METACACHE on table empTable").collect()
assert(showCache(0).get(2).toString.equalsIgnoreCase("2/2 index files cached"))
sql("delete from table empTable where segment.id in(1)").show()
// check whether select * query invalidates the cache for the invalid segments
sql("select * from empTable").show()
showCache = sql("SHOW METACACHE on table empTable").collect()
assert(showCache(0).get(2).toString.equalsIgnoreCase("1/1 index files cached"))
}
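  // An external table created over a segment directory is reported with an
  // "(external table)" suffix in the index-files column.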
test("test external table show cache") {
sql(s"CREATE TABLE employeeTable(empno int, empname String, designation String, " +
s"doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
s"deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp," +
s"attendance int, utilization int, salary int) stored by 'carbondata'")
sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE employeeTable")
val table = CarbonEnv.getCarbonTable(Some("default"), "employeeTable")(sqlContext.sparkSession)
val location = FileFactory
.getUpdatedFilePath(
table.getTablePath + CarbonCommonConstants.FILE_SEPARATOR + "/Fact/Part0/Segment_0")
sql(s"CREATE EXTERNAL TABLE extTable stored as carbondata LOCATION '${location}'")
sql("select * from extTable").show()
val rows = sql("SHOW METACACHE ON TABLE extTable").collect()
var isPresent = false
rows.foreach(row => {
if (row.getString(2).equalsIgnoreCase("1/1 index files cached (external table)")){
isPresent = true
}
})
Assert.assertTrue(isPresent)
}
  override protected def afterAll(): Unit = {
    sql("use default").collect()
    dropTable()
  }

  private def dropTable(): Unit = {
    sql("DROP TABLE IF EXISTS cache_db.cache_1")
    sql("DROP TABLE IF EXISTS cache_db.cache_2")
    sql("DROP TABLE IF EXISTS cache_db.cache_3")
    sql("DROP TABLE IF EXISTS default.cache_4")
    sql("DROP TABLE IF EXISTS default.cache_5")
    sql("DROP TABLE IF EXISTS empTable")
    sql("DROP TABLE IF EXISTS employeeTable")
    sql("DROP TABLE IF EXISTS extTable")
  }

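  // SHOW METACACHE (without ON TABLE) summarises cache usage per database; an
  // empty database is expected to report only "0 B" sizes.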
test("show cache") {
// Empty database
sql("use cache_empty_db").collect()
val result1 = sql("show metacache").collect()
assertResult(2)(result1.length)
assertResult(Row("cache_empty_db", "ALL", "0 B", "0 B", "0 B", "DRIVER"))(result1(1))
// Database with 3 tables but only 2 are in cache
sql("use cache_db").collect()
val result2 = sql("show metacache").collect()
assertResult(4)(result2.length)
// Make sure PreAgg tables are not in SHOW METADATA
sql("use default").collect()
val result3 = sql("show metacache").collect()
val dataMapCacheInfo = result3
.map(row => row.getString(1))
.filter(table => table.equals("cache_4_cache_4_count"))
assertResult(0)(dataMapCacheInfo.length)
}
test("show metacache on table") {
sql("use cache_db").collect()
// Table with Index, Dictionary & Bloom filter
val result1 = sql("show metacache on table cache_1").collect()
assertResult(3)(result1.length)
assertResult("1/1 index files cached")(result1(0).getString(2))
assertResult("bloomfilter")(result1(2).getString(2))
// Table with Index and Dictionary
val result2 = sql("show metacache on table cache_db.cache_2").collect()
assertResult(2)(result2.length)
assertResult("2/2 index files cached")(result2(0).getString(2))
assertResult("0 B")(result2(1).getString(1))
// Table not in cache
checkAnswer(sql("show metacache on table cache_db.cache_3"),
Seq(Row("Index", "0 B", "0/1 index files cached", "DRIVER"),
Row("Dictionary", "0 B", "", "DRIVER")))
sql("use default").collect()
// Table with 5 index files
val result5 = sql("show metacache on table cache_5").collect()
assertResult(2)(result5.length)
assertResult("5/5 index files cached")(result5(0).getString(2))
}
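  // A filter on the partition column should load only the index files of the
  // matching partitions, visible both in the cache map keys and in the
  // SHOW METACACHE counter.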
test("test index files cached for table with single partition") {
sql("drop table if exists partitionTable")
sql("create table partitionTable(col1 int, col2 string) partitioned by (col3 string) stored as carbondata")
sql("insert into partitionTable values(1,'aa','bb'),(1,'aa1','bb1')")
sql("insert into partitionTable values(1,'cc','dd')")
sql("insert into partitionTable values(2,'aa','bb')")
sql("insert into partitionTable values(1,'aa','ee')")
checkAnswer(sql("select * from partitionTable where col3='bb'"), Seq(Row(1,"aa","bb"),Row(2,"aa","bb")))
var showCache = sql("SHOW METACACHE on table partitionTable").collect()
val tableIdentifier = new TableIdentifier("partitionTable", Some("default"))
val carbonTablePath = CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession).getTablePath
var result = CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet().asScala.filter(index => index.startsWith(carbonTablePath))
assert(result.exists(index => index.startsWith(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + "col3=bb")) && result.size == 2)
assert(showCache(0).get(2).toString.equalsIgnoreCase("2/5 index files cached"))
checkAnswer(sql("select * from partitionTable where col3='ee'"), Seq(Row(1,"aa","ee")))
showCache = sql("SHOW METACACHE on table partitionTable").collect()
result = CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet().asScala.filter(index => index.startsWith(carbonTablePath))
assert(result.exists(index =>
index.startsWith(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + "col3=bb") ||
index.startsWith(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + "col3=ee") &&
result.size == 3))
assert(showCache(0).get(2).toString.equalsIgnoreCase("3/5 index files cached"))
sql("drop table if exists partitionTable")
}
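  // With multiple partition columns the cache keys contain the full partition
  // path (e.g. col3=bb/col4=cc), so a narrower filter caches fewer index files.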
test("test index files cached for table with multiple partition") {
sql("drop table if exists partitionTable")
sql("create table partitionTable(col1 int, col2 string) partitioned by (col3 string, col4 string, col5 int) stored as carbondata")
sql("insert into partitionTable values(1,'aa','bb','cc',1),(1,'aa1','bb1','ff',3)")
sql("insert into partitionTable values(1,'cc','dd','ff',3)")
sql("insert into partitionTable values(2,'aa','bb','gg',2)")
sql("insert into partitionTable values(1,'aa','ee','kk',4)")
checkAnswer(sql("select * from partitionTable where col3='bb' and col4='cc'"), Seq(Row(1,"aa","bb","cc",1)))
var showCache = sql("SHOW METACACHE on table partitionTable").collect()
val tableIdentifier = new TableIdentifier("partitionTable", Some("default"))
val carbonTablePath = CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession).getTablePath
var result = CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet().asScala.filter(index => index.startsWith(carbonTablePath))
assert(result.exists(index => index.startsWith(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + "col3=bb/col4=cc")) && result.size == 1)
assert(showCache(0).get(2).toString.equalsIgnoreCase("1/5 index files cached"))
checkAnswer(sql("select * from partitionTable where col3='bb'"), Seq(Row(1,"aa","bb","cc",1),Row(2,"aa","bb","gg",2)))
showCache = sql("SHOW METACACHE on table partitionTable").collect()
result = CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet().asScala.filter(index => index.startsWith(carbonTablePath))
assert(result.exists(index => index.startsWith(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + "col3=bb/col4=cc")||
index.startsWith(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + "col3=bb/col4=gg")) && result.size == 2)
assert(showCache(0).get(2).toString.equalsIgnoreCase("2/5 index files cached"))
sql("drop table if exists partitionTable")
}
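  // Without a partition filter, a full scan reads every partition, so all five
  // index files end up cached.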
test("test index files cached for table with partition without filter") {
sql("drop table if exists partitionTable")
sql("create table partitionTable(col1 int, col2 string) partitioned by (col3 string) stored as carbondata")
sql("insert into partitionTable values(1,'aa','bb'),(1,'aa1','bb1')")
sql("insert into partitionTable values(1,'cc','dd')")
sql("insert into partitionTable values(2,'aa','bb')")
sql("insert into partitionTable values(1,'aa','ee')")
checkAnswer(sql("select * from partitionTable where col3='bb'"), Seq(Row(1,"aa","bb"),Row(2,"aa","bb")))
var showCache = sql("SHOW METACACHE on table partitionTable").collect()
val tableIdentifier = new TableIdentifier("partitionTable", Some("default"))
val carbonTablePath = CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession).getTablePath
var result = CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet().asScala.filter(index => index.startsWith(carbonTablePath))
assert(result.exists(index => index.startsWith(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + "col3=bb")) && result.size == 2)
assert(showCache(0).get(2).toString.equalsIgnoreCase("2/5 index files cached"))
sql("select * from partitionTable").collect()
showCache = sql("SHOW METACACHE on table partitionTable").collect()
assert(showCache(0).get(2).toString.equalsIgnoreCase("5/5 index files cached"))
sql("drop table if exists partitionTable")
}
}