blob: b1e41ff920ba8c6698706234b902b1a49d64deff [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbondata.sql.commands
import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.{CarbonEnv, Row}
import org.apache.spark.sql.test.util.QueryTest
import org.junit.Assert
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.cache.CacheProvider
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
/**
 * Test suite for the SHOW METACACHE command: it issues `show metacache` (database-wide and
 * per table) and checks the reported number of cached index files.
 */
class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll {
/**
 * Builds the fixtures shared by the tests in this suite: recreates the `cache_db` and
 * `cache_empty_db` databases, sets up cache_1, cache_2 and cache_3 in `cache_db` and
 * cache_4 and cache_5 in `default`, populates them, and then queries cache_1, cache_2,
 * cache_4 and cache_5. cache_3 is loaded but never queried here.
 */
override protected def beforeAll(): Unit = {
// start from freshly created databases and make cache_db the current one
sql("drop database if exists cache_db cascade").collect()
sql("drop database if exists cache_empty_db cascade").collect()
sql("create database cache_db").collect()
sql("create database cache_empty_db").collect()
sql("use cache_db").collect()
| CREATE TABLE cache_db.cache_1
| (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
| workgroupcategoryname String, deptno int, deptname String, projectcode int,
| projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
| salary int)
| STORED AS carbondata
// bloom filter index on cache_1.deptno, created before the data is loaded
sql("CREATE INDEX IF NOT EXISTS cache_1_bloom ON TABLE cache_db.cache_1(deptno) AS 'bloomfilter' ")
sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE cache_1 ")
| CREATE TABLE cache_2
| (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
| workgroupcategoryname String, deptno int, deptname String, projectcode int,
| projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
| salary int)
| STORED AS carbondata
// cache_2 is written twice: one CSV load plus one insert copied from cache_1
sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE cache_db.cache_2 ")
sql("insert into table cache_2 select * from cache_1").collect()
| CREATE TABLE cache_3
| (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
| workgroupcategoryname String, deptno int, deptname String, projectcode int,
| projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
| salary int)
| STORED AS carbondata
sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE cache_3 ")
// use default database for cache_4 and cache_5
sql("use default").collect()
| CREATE TABLE cache_4
| (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
| workgroupcategoryname String, deptno int, deptname String, projectcode int,
| projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
| salary int)
| STORED AS carbondata
sql("insert into table cache_4 select * from cache_db.cache_2").collect()
// standard partition table, partitioned by deptno
| CREATE TABLE cache_5
| (empno int, empname String, designation String, doj Timestamp, workgroupcategory int,
| workgroupcategoryname String, deptname String, projectcode int,
| projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,
| salary int)
| PARTITIONED BY (deptno int)
| STORED AS carbondata
"insert into table cache_5 select empno,empname,designation,doj,workgroupcategory," +
"workgroupcategoryname,deptname,projectcode,projectjoindate,projectenddate,attendance," +
"utilization,salary,deptno from cache_4").collect()
// query the tables so that their index files get cached; cache_3 is deliberately left out
sql("select max(deptname) from cache_db.cache_1").collect()
sql("SELECT deptno FROM cache_db.cache_1 where deptno=10").collect()
sql("select count(*) from cache_db.cache_2").collect()
sql("select count(*) from cache_4").collect()
sql("select count(*) from cache_5").collect()
sql("select workgroupcategoryname,count(empname) as count from cache_4 group by workgroupcategoryname").collect()
// Checks that the index cache entries of deleted segments are dropped by later queries:
// after three loads SHOW METACACHE must report 3/3 cached index files, then 2/2 once one
// segment has been deleted and count(*) has run, then 1/1 once another segment has been
// deleted and select * has run.
test("test drop cache invalidation in case of invalid segments"){
  sql(s"CREATE TABLE empTable(empno int, empname String, designation String, " +
    s"doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
    s"deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp," +
    s"attendance int, utilization int, salary int) STORED AS carbondata")
  // three identical loads of the same CSV file
  sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE empTable")
  sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE empTable")
  sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE empTable")
  sql("select count(*) from empTable").show()
  var showCache = sql("SHOW METACACHE on table empTable").collect()
  assert(showCache(0).get(2).toString.equalsIgnoreCase("3/3 index files cached"))
  // DELETE FROM TABLE selects the segments to drop through the SEGMENT.ID predicate
  sql("delete from table empTable where segment.id in(0)").show()
  // check whether count(*) query invalidates the cache for the invalid segments
  sql("select count(*) from empTable").show()
  showCache = sql("SHOW METACACHE on table empTable").collect()
  assert(showCache(0).get(2).toString.equalsIgnoreCase("2/2 index files cached"))
  sql("delete from table empTable where segment.id in(1)").show()
  // check whether select * query invalidates the cache for the invalid segments
  sql("select * from empTable").show()
  showCache = sql("SHOW METACACHE on table empTable").collect()
  assert(showCache(0).get(2).toString.equalsIgnoreCase("1/1 index files cached"))
// Creates employeeTable, loads it once, then defines the external table extTable over a
// location derived from the table path plus "/Fact/Part0/Segment_0". After querying extTable
// it scans the SHOW METACACHE rows and records in `isPresent` whether any row's third column
// reads "1/1 index files cached (external table)".
test("test external table show cache") {
sql(s"CREATE TABLE employeeTable(empno int, empname String, designation String, " +
s"doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
s"deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp," +
s"attendance int, utilization int, salary int) STORED AS carbondata")
sql(s"LOAD DATA INPATH '$resourcesPath/data.csv' INTO TABLE employeeTable")
val table = CarbonEnv.getCarbonTable(Some("default"), "employeeTable")(sqlContext.sparkSession)
val location = FileFactory
table.getTablePath + CarbonCommonConstants.FILE_SEPARATOR + "/Fact/Part0/Segment_0")
sql(s"CREATE EXTERNAL TABLE extTable stored as carbondata LOCATION '${location}'")
// query the external table before inspecting the cache
sql("select * from extTable").show()
val rows = sql("SHOW METACACHE ON TABLE extTable").collect()
// flipped to true as soon as one row carries the expected external-table status text
var isPresent = false
rows.foreach(row => {
if (row.getString(2).equalsIgnoreCase("1/1 index files cached (external table)")){
isPresent = true
// Runs once after all tests: switches the session back to the `default` database.
override protected def afterAll(): Unit = {
sql("use default").collect()
// Drops the tables used by this suite. Every statement carries IF EXISTS, so tables that
// were never created are simply skipped.
private def dropTable = {
  Seq(
    "cache_db.cache_1",
    "cache_db.cache_2",
    "cache_db.cache_3",
    "default.cache_4",
    "default.cache_5",
    "empTable",
    "employeeTable",
    "extTable"
  ).foreach(tableName => sql(s"DROP TABLE IF EXISTS $tableName"))
  // issued separately and last, exactly as before, so it stays the final expression
  sql("drop table if exists carbonTable")
// Runs a database-wide `show metacache` in cache_empty_db, in cache_db and in default, and
// picks out of the default-database result the entries whose second column equals
// "cache_4_cache_4_count".
test("show cache") {
// Empty database
sql("use cache_empty_db").collect()
val result1 = sql("show metacache").collect()
// Database with 3 tables but only 2 are in cache
sql("use cache_db").collect()
val result2 = sql("show metacache").collect()
// PreAgg tables should not be listed by SHOW METACACHE; collect any row named after the
// cache_4 pre-aggregate (cache_4_cache_4_count)
sql("use default").collect()
val result3 = sql("show metacache").collect()
val indexCacheInfo = result3
.map(row => row.getString(1))
.filter(table => table.equals("cache_4_cache_4_count"))
// Checks the per-table SHOW METACACHE output for the suite's fixture tables: cache_1 and
// cache_2 (queried during setup), cache_3 (never queried) and the partitioned cache_5.
test("show metacache on table") {
  // Third column of the first row returned by SHOW METACACHE for the given table.
  def firstRowStatus(tableName: String): String =
    sql(s"show metacache on table $tableName").collect()(0).getString(2)

  sql("use cache_db").collect()
  // table with index and bloom filter
  assertResult("1/1 index files cached")(firstRowStatus("cache_1"))
  // table with index only
  assertResult("2/2 index files cached")(firstRowStatus("cache_db.cache_2"))
  // table that is not in the cache: the whole result is compared
  val uncachedRows = Seq(Row("Index", "0 B", "0/1 index files cached", "DRIVER"))
  checkAnswer(sql("show metacache on table cache_db.cache_3"), uncachedRows)
  sql("use default").collect()
  // table with 5 index files
  assertResult("5/5 index files cached")(firstRowStatus("cache_5"))
// Checks that only the index files of the queried partitions are cached for a table with a
// single partition column: a filter on col3='bb' is expected to cache 2 of the 5 index
// files, and a further filter on col3='ee' is expected to raise that to 3 of 5. Both the
// SHOW METACACHE text and the cache keys under the table path are verified.
test("test index files cached for table with single partition") {
  sql("drop table if exists partitionTable")
  sql("create table partitionTable(col1 int, col2 string) partitioned by (col3 string) stored as carbondata")
  sql("insert into partitionTable values(1,'aa','bb'),(1,'aa1','bb1')")
  sql("insert into partitionTable values(1,'cc','dd')")
  sql("insert into partitionTable values(2,'aa','bb')")
  sql("insert into partitionTable values(1,'aa','ee')")
  checkAnswer(sql("select * from partitionTable where col3='bb'"), Seq(Row(1,"aa","bb"),Row(2,"aa","bb")))
  var showCache = sql("SHOW METACACHE on table partitionTable").collect()
  val tableIdentifier = new TableIdentifier("partitionTable", Some("default"))
  val carbonTablePath = CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession).getTablePath
  // common prefix of every partition directory of this table
  val partitionRoot = carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR
  var result = CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet().asScala.filter(index => index.startsWith(carbonTablePath))
  assert(result.exists(index => index.startsWith(partitionRoot + "col3=bb")) && result.size == 2)
  assert(showCache(0).get(2).toString.equalsIgnoreCase("2/5 index files cached"))
  checkAnswer(sql("select * from partitionTable where col3='ee'"), Seq(Row(1,"aa","ee")))
  showCache = sql("SHOW METACACHE on table partitionTable").collect()
  result = CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet().asScala.filter(index => index.startsWith(carbonTablePath))
  // The size check must sit outside the exists predicate: written as `a || b && size == 3`
  // inside the lambda, `&&` binds tighter than `||`, so any col3=bb key satisfied the whole
  // assertion and the size was never verified.
  assert(result.exists(index =>
    index.startsWith(partitionRoot + "col3=bb") ||
    index.startsWith(partitionRoot + "col3=ee")) && result.size == 3)
  assert(showCache(0).get(2).toString.equalsIgnoreCase("3/5 index files cached"))
  sql("drop table if exists partitionTable")
// Partition pruning of the index cache with three partition columns: a filter on col3 and
// col4 is expected to cache a single index file (1/5); widening the filter to col3 alone is
// expected to bring the count to two (2/5).
test("test index files cached for table with multiple partition") {
  // Cache keys that start with the given table path.
  def cachedKeysUnder(tablePath: String) =
    CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet().asScala
      .filter(key => key.startsWith(tablePath))

  sql("drop table if exists partitionTable")
  sql("create table partitionTable(col1 int, col2 string) partitioned by (col3 string, col4 string, col5 int) stored as carbondata")
  sql("insert into partitionTable values(1,'aa','bb','cc',1),(1,'aa1','bb1','ff',3)")
  sql("insert into partitionTable values(1,'cc','dd','ff',3)")
  sql("insert into partitionTable values(2,'aa','bb','gg',2)")
  sql("insert into partitionTable values(1,'aa','ee','kk',4)")

  // first query: filter on two partition columns
  checkAnswer(
    sql("select * from partitionTable where col3='bb' and col4='cc'"),
    Seq(Row(1, "aa", "bb", "cc", 1)))
  val statusAfterFirstQuery = sql("SHOW METACACHE on table partitionTable").collect()
  val tablePath = CarbonEnv
    .getCarbonTable(new TableIdentifier("partitionTable", Some("default")))(sqlContext.sparkSession)
    .getTablePath
  val partitionRoot = tablePath + CarbonCommonConstants.FILE_SEPARATOR
  val keysAfterFirstQuery = cachedKeysUnder(tablePath)
  assert(keysAfterFirstQuery.exists(key => key.startsWith(partitionRoot + "col3=bb/col4=cc")) &&
    keysAfterFirstQuery.size == 1)
  assert(statusAfterFirstQuery(0).get(2).toString.equalsIgnoreCase("1/5 index files cached"))

  // second query: filter on the first partition column only
  checkAnswer(
    sql("select * from partitionTable where col3='bb'"),
    Seq(Row(1, "aa", "bb", "cc", 1), Row(2, "aa", "bb", "gg", 2)))
  val statusAfterSecondQuery = sql("SHOW METACACHE on table partitionTable").collect()
  val keysAfterSecondQuery = cachedKeysUnder(tablePath)
  val touchesQueriedPartition = keysAfterSecondQuery.exists { key =>
    key.startsWith(partitionRoot + "col3=bb/col4=cc") ||
      key.startsWith(partitionRoot + "col3=bb/col4=gg")
  }
  assert(touchesQueriedPartition && keysAfterSecondQuery.size == 2)
  assert(statusAfterSecondQuery(0).get(2).toString.equalsIgnoreCase("2/5 index files cached"))
  sql("drop table if exists partitionTable")
// A filtered query on col3='bb' is expected to cache 2 of the 5 index files; a following
// unfiltered select is expected to cache all 5.
test("test index files cached for table with partition without filter") {
  sql("drop table if exists partitionTable")
  sql("create table partitionTable(col1 int, col2 string) partitioned by (col3 string) stored as carbondata")
  Seq(
    "insert into partitionTable values(1,'aa','bb'),(1,'aa1','bb1')",
    "insert into partitionTable values(1,'cc','dd')",
    "insert into partitionTable values(2,'aa','bb')",
    "insert into partitionTable values(1,'aa','ee')"
  ).foreach(insertStatement => sql(insertStatement))

  // filtered query: only the col3=bb partition is read
  val expectedBbRows = Seq(Row(1, "aa", "bb"), Row(2, "aa", "bb"))
  checkAnswer(sql("select * from partitionTable where col3='bb'"), expectedBbRows)
  val filteredStatus = sql("SHOW METACACHE on table partitionTable").collect()
  val partitionTableId = new TableIdentifier("partitionTable", Some("default"))
  val tablePath = CarbonEnv.getCarbonTable(partitionTableId)(sqlContext.sparkSession).getTablePath
  val keysOfTable = CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet().asScala
    .filter(key => key.startsWith(tablePath))
  val bbPartitionPrefix = tablePath + CarbonCommonConstants.FILE_SEPARATOR + "col3=bb"
  assert(keysOfTable.exists(key => key.startsWith(bbPartitionPrefix)) && keysOfTable.size == 2)
  assert(filteredStatus(0).get(2).toString.equalsIgnoreCase("2/5 index files cached"))

  // unfiltered query: every partition is read
  sql("select * from partitionTable").collect()
  val unfilteredStatus = sql("SHOW METACACHE on table partitionTable").collect()
  assert(unfilteredStatus(0).get(2).toString.equalsIgnoreCase("5/5 index files cached"))
  sql("drop table if exists partitionTable")
// Creates carbonTable with index_cache_expiration_seconds set to 1, caches its index through
// a count(*) query, and checks that SHOW METACACHE first reports 1/1 and afterwards 0/1
// cached index files.
// NOTE(review): no wait is visible between the two SHOW METACACHE calls; the 0/1 expectation
// presumably depends on the 1-second expiration having elapsed — confirm.
test("test cache expiration using expiringMap") {
sql("drop table if exists carbonTable")
sql("create table carbonTable(col1 int, col2 string,col3 string) stored as carbondata tblproperties('index_cache_expiration_seconds'='1')")
sql("insert into carbonTable select 1, 'ab', 'vf'")
// the query loads the index into the cache
checkAnswer(sql("select count(*) from carbonTable"), Seq(Row(1)))
var showCache = sql("show metacache on table carbonTable").collect()
assert(showCache(0).get(2).toString.equalsIgnoreCase("1/1 index files cached"))
showCache = sql("show metacache on table carbonTable").collect()
assert(showCache(0).get(2).toString.equalsIgnoreCase("0/1 index files cached"))
sql("drop table if exists carbonTable")
// Cache expiration for a table that also carries a bloom filter index on col3: after a
// filter query on col3 SHOW METACACHE is expected to report 1/1 cached index files, and on
// the later call exactly one row reporting 0/1.
// NOTE(review): presumably the table is created with a short
// index_cache_expiration_seconds value — confirm.
test("test cache expiration using expiringMap with bloom") {
sql("drop table if exists carbonTable")
sql("create table carbonTable(col1 int, col2 string,col3 string) stored as carbondata " +
sql("insert into carbonTable select 1, 'ab', 'vf'")
// recreate the bloom filter index on col3
sql("drop index if exists cache_2_bloom on carbonTable")
sql("CREATE INDEX IF NOT EXISTS cache_2_bloom ON TABLE carbonTable (col3) AS 'bloomfilter' ")
checkAnswer(sql("select count(*) from carbonTable where col3='vf'"), Seq(Row(1)))
var showCache = sql("show metacache on table carbonTable").collect()
assert(showCache(0).get(2).toString.equalsIgnoreCase("1/1 index files cached"))
showCache = sql("show metacache on table carbonTable").collect()
// only a single row is expected in the SHOW METACACHE output at this point
assert(showCache.length == 1)
assert(showCache(0).get(2).toString.equalsIgnoreCase("0/1 index files cached"))
sql("drop table if exists carbonTable")
// Expects the CREATE TABLE statement to be rejected with MalformedCarbonCommandException.
// NOTE(review): the Boolean returned by `getMessage.contains(...)` is not passed to any
// assertion, so the message text is not actually verified — consider wrapping it in assert.
test("test cache expiration using expiringMap with invalid cache expiration time") {
sql("drop table if exists carbonTable")
intercept[MalformedCarbonCommandException] {
sql("create table carbonTable(col1 int, col2 string,col3 string) stored as carbondata " +
}.getMessage.contains("Invalid cache_expiration_time value found: ab")
// Walks carbonTable through three phases: its initial table properties, then
// index_cache_expiration_seconds set to 3 through ALTER TABLE ... SET, then the property
// removed through ALTER TABLE ... UNSET. In the first two phases the cached index file count
// is expected to fall back to 0; after UNSET it is expected to stay at 2/2.
test("test cache expiration using expiringMap with alter set and unset tblproperties") {
sql("drop table if exists carbonTable")
sql("create table carbonTable(col1 int, col2 string,col3 string) stored as carbondata " +
sql("insert into carbonTable select 1, 'ab', 'vf'")
// check cache expiration with 10 seconds
checkAnswer(sql("select count(*) from carbonTable"), Seq(Row(1)))
var showCache = sql("show metacache on table carbonTable").collect()
assert(showCache(0).get(2).toString.equalsIgnoreCase("1/1 index files cached"))
showCache = sql("show metacache on table carbonTable").collect()
assert(showCache(0).get(2).toString.equalsIgnoreCase("0/1 index files cached"))
// check cache expiration with 3 seconds with alter set
sql("alter table carbontable set tblproperties('index_cache_expiration_seconds'='3')")
// a second insert, so two index files are expected from here on
sql("insert into carbonTable select 1, 'ab', 'vf'")
checkAnswer(sql("select count(*) from carbonTable"), Seq(Row(2)))
showCache = sql("show metacache on table carbonTable").collect()
assert(showCache(0).get(2).toString.equalsIgnoreCase("2/2 index files cached"))
showCache = sql("show metacache on table carbonTable").collect()
assert(showCache(0).get(2).toString.equalsIgnoreCase("0/2 index files cached"))
// revert cache expiration property
sql("ALTER TABLE carbonTable UNSET TBLPROPERTIES('index_cache_expiration_seconds')")
checkAnswer(sql("select count(*) from carbonTable"), Seq(Row(2)))
showCache = sql("show metacache on table carbonTable").collect()
assert(showCache(0).get(2).toString.equalsIgnoreCase("2/2 index files cached"))
// with the property unset, the second check expects the entries to still be cached
showCache = sql("show metacache on table carbonTable").collect()
assert(showCache(0).get(2).toString.equalsIgnoreCase("2/2 index files cached"))
sql("drop table if exists carbonTable")
// Uses two tables and checks, after querying both, that SHOW METACACHE reports 0/1 cached
// index files for carbonTable2 while carbonTable1 still reports 1/1.
// NOTE(review): presumably the two tables are created with different
// index_cache_expiration_seconds values — confirm.
test("test cache expiration using expiringMap more than one table") {
sql("drop table if exists carbonTable1")
sql("create table carbonTable1(col1 int, col2 string,col3 string) stored as carbondata " +
sql("drop table if exists carbonTable2")
sql("create table carbonTable2(col1 int, col2 string,col3 string) stored as carbondata " +
sql("insert into carbonTable1 select 1, 'ab', 'vf'")
sql("insert into carbonTable2 select 1, 'ab', 'vf'")
// query both tables so that each one gets its index cached
checkAnswer(sql("select count(*) from carbonTable1"), Seq(Row(1)))
checkAnswer(sql("select count(*) from carbonTable2"), Seq(Row(1)))
var showCache = sql("show metacache on table carbonTable2").collect()
assert(showCache(0).get(2).toString.equalsIgnoreCase("0/1 index files cached"))
showCache = sql("show metacache on table carbonTable1").collect()
assert(showCache(0).get(2).toString.equalsIgnoreCase("1/1 index files cached"))
sql("drop table if exists carbonTable1")
sql("drop table if exists carbonTable2")