// blob: 4de0761309445eaf3b4e4accaf3d5147bd31ac4f [file] [log] [blame]
package org.apache.carbondata.spark.testsuite.segmentreading
import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
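/**
 * Tests for the `carbon.input.segments.<database_name>.<table_name>` property, which limits
 * the segments a query on that table reads (`*` selects every segment).
 *
 * Created by rahul on 19/9/17.
 */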
class TestSegmentReading extends QueryTest with BeforeAndAfterAll {
/**
 * Drops leftovers from earlier runs, creates `carbon_table` and loads data.csv followed by
 * data1.csv into it, one LOAD statement per file.
 */
override def beforeAll(): Unit = {
  cleanAllTable()
  val createCarbonTable =
    "create table carbon_table(empno int, empname String, designation String, doj Timestamp," +
    "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String," +
    "projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int," +
    "utilization int,salary int) STORED AS carbondata"
  sql(createCarbonTable)
  // The tests below address the results of these two loads as segments 0 and 1.
  Seq("data.csv", "data1.csv").foreach { csvFile =>
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/$csvFile' INTO TABLE carbon_table OPTIONS
         |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
  }
}
/** Drops every table this suite creates; used both before and after the test run. */
private def cleanAllTable(): Unit = {
  val suiteTables = Seq(
    "carbon_table",
    "carbon_table_join",
    "carbon_table_update",
    "carbon_table_delete",
    "carbon_table_show_seg",
    "carbon_table_compact",
    "carbon_table_alter",
    "carbon_table_alter_new",
    "carbon_table_recreate")
  suiteTables.foreach { tableName =>
    sql(s"drop table if exists $tableName")
  }
}
/** Drops the suite's tables and clears the segment setting once all tests have run. */
override def afterAll(): Unit = {
  cleanAllTable()
  // reset (defaultConfig is defined outside this file — presumably restores default properties)
  defaultConfig()
  val segmentsKey = "carbon.input.segments.default.carbon_table"
  sqlContext.sparkSession.conf.unset(segmentsKey)
}
// Checks that the `SET -v` listing contains the description of the segment reading property.
test("test SET -V for segment reading property") {
  val expectedDescription = "Property to configure the list of segments to query."
  // Print the listing (up to 200 rows, untruncated) before asserting on it.
  sql("SET -v").show(200, false)
  try {
    val listing = sql("SET -v")
    checkExistence(listing, true, expectedDescription)
  } finally {
    sql("SET carbon.input.segments.default.carbon_table=*")
  }
}
// With only segment 1 selected, count(*) is expected to see that segment's 10 rows.
test("test count(*) for segment reading property") {
  val segmentsKey = "carbon.input.segments.default.carbon_table"
  try {
    sql(s"SET $segmentsKey=1")
    val counted = sql("select count(*) from carbon_table")
    checkAnswer(counted, Seq(Row(10)))
  } finally {
    // Restore "all segments" for the following tests.
    sql(s"SET $segmentsKey=*")
  }
}
// `SET <key>` without a value is expected to echo back the key and the value set just before.
test("test SET propertyname for segment reading property") {
  val segmentsKey = "carbon.input.segments.default.carbon_table"
  try {
    sql(s"SET $segmentsKey=1")
    val expected = Seq(Row(segmentsKey, "1"))
    checkAnswer(sql(s"SET $segmentsKey"), expected)
  } finally {
    sql(s"SET $segmentsKey=*")
  }
}
// Switches between single segments and `*` and checks the visible row count after each switch.
test("set valid segments and query from table") {
  val segmentsKey = "carbon.input.segments.default.carbon_table"

  def assertRowCount(expected: Int): Unit = {
    checkAnswer(sql("select count(*) from carbon_table"), Seq(Row(expected)))
  }

  try {
    // No segment list applied yet: both loads are visible.
    assertRowCount(20)
    Seq("1" -> 10, "*" -> 20, "0" -> 10).foreach { case (segmentIds, expectedRows) =>
      sql(s"SET $segmentsKey=$segmentIds")
      assertRowCount(expectedRows)
    }
  } finally {
    sql(s"SET $segmentsKey=*")
  }
}
// Re-sets the segment list several times (including duplicates and a non-loaded id) and then
// checks the error messages produced by a malformed and by an empty list.
test("test Multiple times set segment") {
  val segmentsKey = "carbon.input.segments.default.carbon_table"
  try {
    val validCases = Seq("0" -> 10, "1" -> 10, "1,0,1" -> 20, "2,0" -> 10)
    validCases.foreach { case (segmentIds, expectedRows) =>
      sql(s"SET $segmentsKey=$segmentIds")
      checkAnswer(sql("select count(*) from carbon_table"), Seq(Row(expectedRows)))
    }

    // Both invalid SETs are issued first; their messages are verified afterwards.
    val invalidRange = intercept[Exception] {
      sql(s"SET $segmentsKey=2,a")
    }
    val emptyValue = intercept[Exception] {
      sql(s"SET $segmentsKey=,")
    }
    val rangeMessage =
      "carbon.input.segments.<database_name>.<table_name> value range is not valid"
    val emptyMessage =
      "carbon.input.segments.<database_name>.<table_name> value can't be empty."
    assert(invalidRange.getMessage.equalsIgnoreCase(rangeMessage))
    assert(emptyValue.getMessage.equalsIgnoreCase(emptyMessage))
  } finally {
    sql(s"SET $segmentsKey=*")
  }
}
// A filtered count is expected to drop from 2 to 1 once only segment 1 is read.
test("test filter with segment reading"){
  val segmentsKey = "carbon.input.segments.default.carbon_table"
  val filteredCount = "select count(empno) from carbon_table where empno = 15"
  try {
    Seq("*" -> 2, "1" -> 1).foreach { case (segmentIds, expectedMatches) =>
      sql(s"SET $segmentsKey=$segmentIds")
      checkAnswer(sql(filteredCount), Seq(Row(expectedMatches)))
    }
  } finally {
    sql(s"SET $segmentsKey=*")
  }
}
// Same check as the filter test, but through a GROUP BY aggregation.
test("test group by with segment reading") {
  val segmentsKey = "carbon.input.segments.default.carbon_table"
  val groupedCount =
    "select empno,count(empname) from carbon_table " +
    "where empno = 15 group by empno"
  try {
    sql(s"SET $segmentsKey=*")
    checkAnswer(sql(groupedCount), Seq(Row(15, 2)))
    sql(s"SET $segmentsKey=1")
    checkAnswer(sql(groupedCount), Seq(Row(15, 1)))
  } finally {
    sql(s"SET $segmentsKey=*")
  }
}
// Joins carbon_table against a full copy of itself; restricting carbon_table to segment 1 is
// expected to halve the number of joined rows.
test("test join with segment reading"){
  val segmentsKey = "carbon.input.segments.default.carbon_table"
  val joinedCount =
    "select count(a.empno) from carbon_table a " +
    "inner join carbon_table_join b on a.empno = b.empno"
  try {
    sql(s"SET $segmentsKey=*")
    sql("drop table if exists carbon_table_join")
    val createJoinTable =
      "create table carbon_table_join(empno int, empname String, designation String, doj Timestamp," +
      "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String," +
      "projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int," +
      "utilization int,salary int) STORED AS carbondata"
    sql(createJoinTable)
    // Copy every row of carbon_table while all of its segments are still selected.
    sql("insert into carbon_table_join select * from carbon_table").show()
    checkAnswer(sql(joinedCount), Seq(Row(22)))
    sql(s"SET $segmentsKey=1")
    checkAnswer(sql(joinedCount), Seq(Row(11)))
  } finally {
    sql(s"SET $segmentsKey=*")
  }
}
// sum(empno) over all segments versus over segment 1 only.
test("test aggregation with segment reading") {
  val segmentsKey = "carbon.input.segments.default.carbon_table"
  val sumOfEmpno = "select sum(empno) from carbon_table"
  try {
    Seq("*" -> 1411, "1" -> 1256).foreach { case (segmentIds, expectedSum) =>
      sql(s"SET $segmentsKey=$segmentIds")
      checkAnswer(sql(sumOfEmpno), Seq(Row(expectedSum)))
    }
  } finally {
    sql(s"SET $segmentsKey=*")
  }
}
// Verifies that UPDATE is rejected with MalformedCarbonCommandException while a segment list is
// set on the table read by the sub-query (carbon_table) or on the updated table
// (carbon_table_update), and that it succeeds once both properties are back to `*`.
test("test update query with segment reading"){
  val sourceSegments = "carbon.input.segments.default.carbon_table"
  val targetSegments = "carbon.input.segments.default.carbon_table_update"
  val updateFromSource =
    "update carbon_table_update a set(a.empname) = " +
    "(select b.empname from carbon_table b where a.empno=b.empno)"
  try {
    sql("drop table if exists carbon_table_update")
    sql(
      "create table carbon_table_update(empno int, empname String, designation String, doj Timestamp," +
      "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String," +
      "projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int," +
      "utilization int,salary int) STORED AS carbondata")
    Seq("data.csv", "data1.csv").foreach { csvFile =>
      sql(
        s"""LOAD DATA local inpath '$resourcesPath/$csvFile' INTO TABLE carbon_table_update OPTIONS
           |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
    }

    // Case 1: segment list set on the table used in the sub-query.
    sql(s"SET $sourceSegments=1")
    intercept[MalformedCarbonCommandException]{
      sql(updateFromSource).show()
    }

    // Case 2: segment list set only on the table being updated.
    sql(s"SET $sourceSegments=*")
    sql(s"SET $targetSegments=1")
    intercept[MalformedCarbonCommandException]{
      sql(updateFromSource).show()
    }

    // Case 3: no segment list anywhere, so the update goes through and touches all 20 rows.
    sql(s"SET $sourceSegments=*")
    sql(s"SET $targetSegments=*")
    checkAnswer(sql("select count(*) from carbon_table_update where empname='rahul'"), Seq(Row(0)))
    sql("update carbon_table_update a set(a.empname) = ('rahul')").show()
    checkAnswer(sql("select count(*) from carbon_table_update where empname='rahul'"), Seq(Row(20)))
  }
  finally {
    sql(s"SET $sourceSegments=*")
    // Also clear the target table's list: if an assertion fails while it is still `1`, the
    // setting would otherwise stay in the session.
    sql(s"SET $targetSegments=*")
  }
}
// Verifies that DELETE is rejected with MalformedCarbonCommandException while a segment list is
// set on the table read by the sub-query (carbon_table) or on the table deleted from
// (carbon_table_delete), and that it succeeds once both properties are back to `*`.
test("test delete query with segment reading"){
  val sourceSegments = "carbon.input.segments.default.carbon_table"
  val targetSegments = "carbon.input.segments.default.carbon_table_delete"
  val deleteAyushi =
    "delete from carbon_table_delete where empno IN " +
    "(select empno from carbon_table where empname='ayushi')"
  try {
    sql("drop table if exists carbon_table_delete")
    sql(
      "create table carbon_table_delete(empno int, empname String, designation String, doj Timestamp," +
      "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String," +
      "projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int," +
      "utilization int,salary int) STORED AS carbondata")
    // data.csv is loaded twice, giving 20 rows in total.
    (1 to 2).foreach { _ =>
      sql(
        s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE carbon_table_delete OPTIONS
           |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
    }

    // Case 1: segment list set on the table used in the sub-query.
    sql(s"SET $sourceSegments=1")
    intercept[MalformedCarbonCommandException]{
      sql(deleteAyushi).show()
    }

    // Case 2: segment list set only on the table being deleted from. The source list is cleared
    // first (as the update test does) so this intercept cannot pass merely because the
    // sub-query table is still restricted.
    sql(s"SET $sourceSegments=*")
    sql(s"SET $targetSegments=1")
    intercept[MalformedCarbonCommandException]{
      sql(deleteAyushi).show()
    }

    // Case 3: no segment list anywhere, so the delete goes through and removes 2 of the 20 rows.
    sql(s"SET $sourceSegments=*")
    sql(s"SET $targetSegments=*")
    checkAnswer(sql("select count(*) from carbon_table_delete"), Seq(Row(20)))
    sql(deleteAyushi).show()
    checkAnswer(sql("select count(*) from carbon_table_delete"), Seq(Row(18)))
  }
  finally {
    sql(s"SET $sourceSegments=*")
    // Also clear the target table's list so a failed assertion cannot leave it set to `1`.
    sql(s"SET $targetSegments=*")
  }
}
// Loads twice, runs a major compaction, loads once more, then checks three columns
// (indices 0, 1 and 7) of the SHOW SEGMENTS output for every listed segment.
test("test show segments"){
  def loadIntoShowSeg(): Unit = {
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE carbon_table_show_seg OPTIONS
         |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
  }

  try {
    sql("drop table if exists carbon_table_show_seg")
    val createShowSegTable =
      "create table carbon_table_show_seg(empno int, empname String, designation String, doj Timestamp," +
      "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String," +
      "projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int," +
      "utilization int,salary int) STORED AS carbondata"
    sql(createShowSegTable)
    loadIntoShowSeg()
    loadIntoShowSeg()
    sql("alter table carbon_table_show_seg compact 'major'")
    loadIntoShowSeg()

    val segments = sql("SHOW SEGMENTS for table carbon_table_show_seg as select * from carbon_table_show_seg_segments")
    val actual = segments.collect().map { segment =>
      Row(segment.getString(0), segment.getString(1), segment.getString(7))
    }.toSeq
    // Expected listing: segments 0 and 1 compacted into 0.1, segment 2 from the last load.
    val expected = Seq(
      Row("2", "Success", "NA"),
      Row("1", "Compacted", "0.1"),
      Row("0.1", "Success", "NA"),
      Row("0", "Compacted", "0.1"))
    assert(actual.equals(expected))
  } finally {
    sql("SET carbon.input.segments.default.carbon_table=*")
  }
}
// Loads twice, runs a major compaction, loads once more, then checks that selecting the
// compacted segment 0.1 exposes the 20 rows of the two compacted loads (out of 30 in total).
test("test segment reading after compaction"){
  val segmentsKey = "carbon.input.segments.default.carbon_table_compact"

  def loadIntoCompact(): Unit = {
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE carbon_table_compact OPTIONS
         |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
  }

  try {
    sql("drop table if exists carbon_table_compact")
    sql(
      "create table carbon_table_compact(empno int, empname String, designation String, doj Timestamp," +
      "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String," +
      "projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int," +
      "utilization int,salary int) STORED AS carbondata")
    loadIntoCompact()
    loadIntoCompact()
    sql("alter table carbon_table_compact compact 'major'")
    loadIntoCompact()
    checkAnswer(sql("select count(*) from carbon_table_compact"),Seq(Row(30)))
    sql(s" SET $segmentsKey=0.1")
    checkAnswer(sql("select count(*) from carbon_table_compact"),Seq(Row(20)))
  } finally {
    // The original never cleared this property; reset it like every other test in the suite so
    // the restriction does not outlive the test.
    sql(s"SET $segmentsKey=*")
  }
}
// Sets a segment list on carbon_table_alter, renames the table, and checks that a query on the
// new name sees all 20 rows, i.e. the list set under the old name is not applied to it.
test("set segment id then alter table name and check select query") {
  val segmentsKey = "carbon.input.segments.default.carbon_table_alter"
  try {
    sql("drop table if exists carbon_table_alter")
    sql("drop table if exists carbon_table_alter_new")
    sql(
      "create table carbon_table_alter(empno int, empname String, designation String, doj " +
      "Timestamp," +
      "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String," +
      "projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int," +
      "utilization int,salary int) STORED AS carbondata")
    // data.csv is loaded twice, giving 20 rows in two loads.
    (1 to 2).foreach { _ =>
      sql(
        s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE carbon_table_alter OPTIONS
           |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
    }
    checkAnswer(sql("select count(*) from carbon_table_alter"), Seq(Row(20)))
    sql(s"SET $segmentsKey=1")
    checkAnswer(sql("select count(*) from carbon_table_alter"), Seq(Row(10)))
    sql("alter table carbon_table_alter rename to carbon_table_alter_new")
    checkAnswer(sql("select count(*) from carbon_table_alter_new"), Seq(Row(20)))
  }
  finally {
    // Reset the property this test actually set. The original reset the unrelated
    // carbon_table property and left carbon_table_alter=1 behind in the session.
    sql(s"SET $segmentsKey=*")
  }
}
// Sets a segment list on carbon_table_recreate, drops and recreates the table with the same
// name and data, and checks that a query still sees only 10 rows, i.e. the list set before the
// drop is still applied to the recreated table.
test("drop and recreate table to check segment reading") {
  val segmentsKey = "carbon.input.segments.default.carbon_table_recreate"

  // Drops any existing table, creates it afresh and loads data.csv twice (20 rows in two loads).
  def recreateAndLoad(): Unit = {
    sql("drop table if exists carbon_table_recreate")
    sql(
      "create table carbon_table_recreate(empno int, empname String, designation String, doj " +
      "Timestamp," +
      "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String," +
      "projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int," +
      "utilization int,salary int) STORED AS carbondata")
    (1 to 2).foreach { _ =>
      sql(
        s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE carbon_table_recreate OPTIONS
           |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
    }
  }

  try {
    recreateAndLoad()
    checkAnswer(sql("select count(*) from carbon_table_recreate"), Seq(Row(20)))
    sql(s"SET $segmentsKey=1")
    checkAnswer(sql("select count(*) from carbon_table_recreate"), Seq(Row(10)))
    recreateAndLoad()
    checkAnswer(sql("select count(*) from carbon_table_recreate"), Seq(Row(10)))
  }
  finally {
    // Reset the property this test actually set. The original reset the unrelated
    // carbon_table property and left carbon_table_recreate=1 behind in the session.
    sql(s"SET $segmentsKey=*")
  }
}
// Runs segment-restricted counts with spark.sql.adaptive.enabled=true, including a segment id
// that was never loaded (expected to yield 0 rows).
test("test with the adaptive execution") {
  val segmentsKey = "carbon.input.segments.default.carbon_table"

  def assertRowCount(expected: Int): Unit = {
    checkAnswer(sql("select count(*) from carbon_table"), Seq(Row(expected)))
  }

  try {
    sql("set spark.sql.adaptive.enabled=true")
    sql(s"SET $segmentsKey=1")
    assertRowCount(10)
    // segment doesn't exist
    sql(s"SET $segmentsKey=5")
    assertRowCount(0)
    sql(s"SET $segmentsKey=1")
    assertRowCount(10)
  } finally {
    // Restore both settings even when an assertion fails. The original switched adaptive
    // execution off only on success and left the segment list at `1`.
    sql("set spark.sql.adaptive.enabled=false")
    sql(s"SET $segmentsKey=*")
  }
}
}