/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.spark.testsuite.addsegment

import java.io.File
import java.nio.file.{Files, Paths}

import scala.io.Source

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row}
import org.junit.Assert
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.common.Strings
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport
import org.apache.carbondata.sdk.file.{CarbonReader, CarbonWriter, Field, Schema}

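/**
 * Test cases for mounting pre-existing data as segments of a carbon table through
 * ALTER TABLE ... ADD SEGMENT, covering carbon/parquet/orc formats, mixed-format
 * tables, partitioned tables and segments written by the SDK.
 */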
class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
override def beforeAll(): Unit = {
dropTable()
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy")
}
test("Test add segment ") {
createCarbonTable()
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
sql("select count(*) from addsegment1").show()
val table = CarbonEnv.getCarbonTable(None, "addsegment1") (sqlContext.sparkSession)
val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
val newPath = storeLocation + "/" + "addsegtest"
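// copy segment 1 out of the table path, drop and clean it from the table,
// then mount the copy back as an external carbon segment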
copy(path, newPath)
sql("delete from table addsegment1 where segment.id in (1)")
sql("clean files for table addsegment1")
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
sql(s"alter table addsegment1 add segment options('path'='$newPath', 'format'='carbon')").show()
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
FileFactory.deleteAllFilesOfDir(new File(newPath))
}
test("Test added segment drop") {
createCarbonTable()
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
sql("select count(*) from addsegment1").show()
val table = CarbonEnv.getCarbonTable(None, "addsegment1") (sqlContext.sparkSession)
val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
val newPath = storeLocation + "/" + "addsegtest"
copy(path, newPath)
sql("delete from table addsegment1 where segment.id in (1)")
sql("clean files for table addsegment1")
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
sql(s"alter table addsegment1 add segment options('path'='$newPath', 'format'='carbon')").show()
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
sql("delete from table addsegment1 where segment.id in (2)")
sql("clean files for table addsegment1")
val oldFolder = FileFactory.getCarbonFile(newPath)
assert(oldFolder.listFiles.length == 2,
"Added segment path should not be deleted physically when clean files is called")
FileFactory.deleteAllFilesOfDir(new File(newPath))
}
test("Test compact on added segment") {
createCarbonTable()
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
sql("select count(*) from addsegment1").show()
val table = CarbonEnv.getCarbonTable(None, "addsegment1") (sqlContext.sparkSession)
val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
val newPath = storeLocation + "/" + "addsegtest"
copy(path, newPath)
sql("delete from table addsegment1 where segment.id in (1)")
sql("clean files for table addsegment1")
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
sql(s"alter table addsegment1 add segment options('path'='$newPath', 'format'='carbon')").show()
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
sql("alter table addsegment1 compact 'major'").show()
checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
sql("clean files for table addsegment1")
val oldFolder = FileFactory.getCarbonFile(newPath)
assert(oldFolder.listFiles.length == 2,
"Added segment path should not be deleted physically when clean files is called")
FileFactory.deleteAllFilesOfDir(new File(newPath))
}
test("Test compact on multiple added segments") {
createCarbonTable()
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
sql("select count(*) from addsegment1").show()
val table = CarbonEnv.getCarbonTable(None, "addsegment1") (sqlContext.sparkSession)
val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
val newPath = storeLocation + "/" + "addsegtest"
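// make ten copies of segment 1; each copy is added back as a separate external segment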
for (i <- 0 until 10) {
copy(path, newPath + i)
}
sql("delete from table addsegment1 where segment.id in (1)")
sql("clean files for table addsegment1")
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
for (i <- 0 until 10) {
sql(s"alter table addsegment1 add segment options('path'='${newPath + i}', 'format'='carbon')").show()
}
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(110)))
checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(110)))
sql("alter table addsegment1 compact 'minor'").show()
checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(110)))
sql("clean files for table addsegment1")
val oldFolder = FileFactory.getCarbonFile(newPath)
assert(oldFolder.listFiles.length == 0, "Added segment path should be deleted when clean files is called")
for (i <- 0 until 10) {
FileFactory.deleteAllFilesOfDir(new File(newPath + i))
}
}
test("Test update on added segment") {
createCarbonTable()
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
sql("select count(*) from addsegment1").show()
val table = CarbonEnv.getCarbonTable(None, "addsegment1") (sqlContext.sparkSession)
val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
val newPath = storeLocation + "/" + "addsegtest"
copy(path, newPath)
sql("delete from table addsegment1 where segment.id in (1)")
sql("clean files for table addsegment1")
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
sql(s"alter table addsegment1 add segment options('path'='$newPath', 'format'='carbon')").show()
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
sql("""update addsegment1 d set (d.empname) = ('ravi') where d.empname = 'arvind'""").show()
checkAnswer(sql("select count(*) from addsegment1 where empname='ravi'"), Seq(Row(2)))
checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
FileFactory.deleteAllFilesOfDir(new File(newPath))
}
test("Test validation on added segment") {
sql("drop table if exists addsegment2")
createCarbonTable()
sql(
"""
| CREATE TABLE addsegment2 (empname String, designation String, doj Timestamp,
| workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
| projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
| utilization int,salary int)
| STORED AS carbondata
""".stripMargin)
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment2 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
sql("select count(*) from addsegment1").show()
val table = CarbonEnv.getCarbonTable(None, "addsegment2") (sqlContext.sparkSession)
val path = CarbonTablePath.getSegmentPath(table.getTablePath, "0")
val newPath = storeLocation + "/" + "addsegtest"
copy(path, newPath)
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
val ex = intercept[Exception] {
sql(s"alter table addsegment1 add segment options('path'='$newPath', 'format'='carbon')").show()
}
assert(ex.getMessage.contains("Schema is not same"))
FileFactory.deleteAllFilesOfDir(new File(newPath))
}
test("Test added segment with different format") {
createCarbonTable()
createParquetTable()
sql("select * from addsegment2").show()
val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
.getTableMetadata(TableIdentifier("addsegment2"))
val path = table.location
val newPath = storeLocation + "/" + "addsegtest"
FileFactory.deleteAllFilesOfDir(new File(newPath))
copy(path.toString, newPath)
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
sql(s"alter table addsegment1 add segment options('path'='$newPath', 'format'='parquet')").show()
assert(sql("select empname, designation, doj, workgroupcategory , workgroupcategoryname from addsegment1").collect().length == 20)
checkAnswer(sql("select empname from addsegment1 where empname='arvind'"), Seq(Row("arvind"),Row("arvind")))
checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
sql("show segments for table addsegment1").show(100, false)
val showSeg = sql("show segments for table addsegment1").collectAsList()
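// as asserted below, show segments reports the copied folder's data size for the
// external segment, and NA for index size since a parquet segment has no carbon index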
val size = getDataSize(newPath)
assert(showSeg.get(0).get(6).toString.equalsIgnoreCase(size))
assert(showSeg.get(0).get(7).toString.equalsIgnoreCase("NA"))
FileFactory.deleteAllFilesOfDir(new File(newPath))
}
test("Test update/delete blocking on mixed format segments") {
createCarbonTable()
createParquetTable()
sql("select * from addsegment2").show()
val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
.getTableMetadata(TableIdentifier("addsegment2"))
val path = table.location
val newPath = storeLocation + "/" + "addsegtest"
FileFactory.deleteAllFilesOfDir(new File(newPath))
copy(path.toString, newPath)
sql(s"alter table addsegment1 add segment options('path'='$newPath', 'format'='parquet')")
val exception1 = intercept[MalformedCarbonCommandException](sql(
"""update addsegment1 d set (d.empname) = ('ravi') where d.empname = 'arvind'""").show())
assertResult("Unsupported update operation on table containing mixed format segments")(
exception1.getMessage())
val exception2 = intercept[MalformedCarbonCommandException](sql(
"delete from addsegment1 where deptno = 10"))
assertResult("Unsupported delete operation on table containing mixed format segments")(
exception2.getMessage())
FileFactory.deleteAllFilesOfDir(new File(newPath))
}
test("Test delete by id for added segment") {
createCarbonTable()
createParquetTable()
sql("select * from addsegment2").show()
val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
.getTableMetadata(TableIdentifier("addsegment2"))
val path = table.location
val newPath = storeLocation + "/" + "addsegtest"
FileFactory.deleteAllFilesOfDir(new File(newPath))
copy(path.toString, newPath)
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
sql(s"alter table addsegment1 add segment options('path'='$newPath', 'format'='parquet')").show()
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(40)))
sql("show segments for table addsegment1").show(100, false)
sql("delete from table addsegment1 where segment.id in(3)")
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30)))
sql("show segments for table addsegment1").show(100, false)
sql("delete from table addsegment1 where segment.id in(2)")
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
sql("show segments for table addsegment1").show(100, false)
sql("delete from table addsegment1 where segment.id in(0,1)")
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(0)))
sql("clean files for table addsegment1")
FileFactory.deleteAllFilesOfDir(new File(newPath))
}
test("Test added segment with different format more than two") {
createCarbonTable()
createParquetTable()
createOrcTable()
val newPath1 = copyseg("addsegment2", "addsegtest1")
val newPath2 = copyseg("addsegment3", "addsegtest2")
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
sql(s"alter table addsegment1 add segment options('path'='$newPath1', 'format'='parquet')").show()
sql(s"alter table addsegment1 add segment options('path'='$newPath2', 'format'='orc')").show()
assert(sql("select empname, designation, doj, workgroupcategory , workgroupcategoryname from addsegment1").collect().length == 30)
checkAnswer(sql("select empname from addsegment1 where empname='arvind'"), Seq(Row("arvind"),Row("arvind"),Row("arvind")))
checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(30)))
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30)))
assert(sql("select deptname, deptno from addsegment1 where empname = 'arvind'")
.collect().length == 3)
FileFactory.deleteAllFilesOfDir(new File(newPath1))
FileFactory.deleteAllFilesOfDir(new File(newPath2))
}
test("Test added segment with different format more than two and use set segment") {
createCarbonTable()
createParquetTable()
createOrcTable()
val newPath1 = copyseg("addsegment2", "addsegtest1")
val newPath2 = copyseg("addsegment3", "addsegtest2")
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
sql(s"alter table addsegment1 add segment options('path'='$newPath1', 'format'='parquet')").show()
sql(s"alter table addsegment1 add segment options('path'='$newPath2', 'format'='orc')").show()
assert(sql("select empname, designation, doj, workgroupcategory , workgroupcategoryname from addsegment1").collect().length == 30)
sql("SET carbon.input.segments.default.addsegment1 = 0")
checkAnswer(sql("select empname from addsegment1 where empname='arvind'"), Seq(Row("arvind")))
checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(10)))
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
sql("SET carbon.input.segments.default.addsegment1 = 0,1")
checkAnswer(sql("select empname from addsegment1 where empname='arvind'"), Seq(Row("arvind"),Row("arvind")))
checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
sql("SET carbon.input.segments.default.addsegment1 = *")
checkAnswer(sql("select empname from addsegment1 where empname='arvind'"), Seq(Row("arvind"),Row("arvind"),Row("arvind")))
checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(30)))
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30)))
FileFactory.deleteAllFilesOfDir(new File(newPath1))
FileFactory.deleteAllFilesOfDir(new File(newPath2))
}
test("Test added segment with different format and test compaction") {
createCarbonTable()
createParquetTable()
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
.getTableMetadata(TableIdentifier("addsegment2"))
val path = table.location
val newPath = storeLocation + "/" + "addsegtest"
FileFactory.deleteAllFilesOfDir(new File(newPath))
copy(path.toString, newPath)
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30)))
sql(s"alter table addsegment1 add segment options('path'='$newPath', 'format'='parquet')").show()
sql("alter table addsegment1 compact 'major'")
assert(sql("select empname, designation, doj, workgroupcategory , workgroupcategoryname from addsegment1").collect().length == 40)
checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(40)))
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(40)))
FileFactory.deleteAllFilesOfDir(new File(newPath))
}
test("test filter queries on mixed formats table") {
createCarbonTable()
createParquetTable()
val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
.getTableMetadata(TableIdentifier("addsegment2"))
val path = table.location
val newPath = storeLocation + "/" + "addsegtest"
FileFactory.deleteAllFilesOfDir(new File(newPath))
copy(path.toString, newPath)
val res1 = sql("select empname, deptname from addsegment1 where deptno=10")
sql(s"alter table addsegment1 add segment options('path'='$newPath', 'format'='parquet')")
val res2 = sql("select * from addsegment1 where deptno=10")
assert(res1.collect().length == 6)
assert(res2.collect().length == 6)
assert(sql("select empname, deptname, deptno from addsegment1 where empname = 'arvind'")
.collect().length == 2)
// For testing filter columns not in projection list
assert(sql("select deptname, deptno from addsegment1 where empname = 'arvind'")
.collect().length == 2)
assert(sql("select deptname, sum(salary) from addsegment1 where empname = 'arvind' group by deptname").collect().length == 1)
FileFactory.deleteAllFilesOfDir(new File(newPath))
}
test("Test show segments for added segment with different format") {
createCarbonTable()
createParquetTable()
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
.getTableMetadata(TableIdentifier("addsegment2"))
val path = table.location
val newPath = storeLocation + "/" + "addsegtest"
FileFactory.deleteAllFilesOfDir(new File(newPath))
copy(path.toString, newPath)
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30)))
sql(s"alter table addsegment1 add segment options('path'='$newPath', 'format'='PARQUET')").show()
checkExistence(sql(s"show segments for table addsegment1"), true, "spark/target/warehouse/addsegtest")
checkExistence(sql(s"show history segments for table addsegment1"), true, "spark/target/warehouse/addsegtest")
FileFactory.deleteAllFilesOfDir(new File(newPath))
}
test("test parquet table") {
sql("drop table if exists addSegCar")
sql("drop table if exists addSegPar")
sql("drop table if exists addSegParless")
sql("drop table if exists addSegParmore")
sql("create table addSegCar(a int, b string) STORED AS carbondata")
sql("create table addSegPar(a int, b string) using parquet")
sql("create table addSegParless(a int) using parquet")
sql("create table addSegParmore(a int, b string, c string) using parquet")
sql("insert into addSegCar values (1,'a')")
sql("insert into addSegPar values (2,'b')")
sql("insert into addSegParless values (3)")
sql("insert into addSegParmore values (4,'c', 'x')")
val table1 = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
.getTableMetadata(TableIdentifier("addSegPar"))
val table2 = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
.getTableMetadata(TableIdentifier("addSegParless"))
val table3 = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
.getTableMetadata(TableIdentifier("addSegParmore"))
sql(s"alter table addSegCar add segment options('path'='${table1.location}', 'format'='parquet')")
intercept[Exception] {
sql(s"alter table addSegCar add segment options('path'='${table2.location}', 'format'='parquet')")
}
sql(s"alter table addSegCar add segment options('path'='${table3.location}', 'format'='parquet')")
assert(sql("select * from addSegCar").collect().length == 3)
sql("drop table if exists addSegCar")
sql("drop table if exists addSegPar")
sql("drop table if exists addSegParless")
sql("drop table if exists addSegParmore")
}
test("test add segment partition table") {
sql("drop table if exists parquet_table")
sql("drop table if exists carbon_table")
sql("drop table if exists orc_table")
sql("create table parquet_table(value int, name string, age int) using parquet partitioned by (name, age)")
sql("create table carbon_table(value int) partitioned by (name string, age int) stored as carbondata")
sql("insert into parquet_table values (30, 'amy', 12), (40, 'bob', 13)")
sql("insert into parquet_table values (30, 'amy', 20), (10, 'bob', 13)")
sql("insert into parquet_table values (30, 'cat', 12), (40, 'dog', 13)")
sql("select * from parquet_table").show
val parquetRootPath = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
.getTableMetadata(TableIdentifier("parquet_table")).location
// add data from parquet table to carbon table
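// the 'partition' option declares the partition columns encoded in the
// source directory layout (name=.../age=... folders)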
sql(s"alter table carbon_table add segment options ('path'='$parquetRootPath', 'format'='parquet', 'partition'='name:string,age:int')")
checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
// load new data into carbon table
sql("insert into carbon_table select * from parquet_table")
checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table union all select * from parquet_table"))
// add another data from orc table to carbon table
sql("create table orc_table(value int, name string, age int) using orc partitioned by (name, age)")
sql("insert into orc_table values (30, 'orc', 50), (40, 'orc', 13)")
sql("insert into orc_table values (30, 'fast', 10), (10, 'fast', 13)")
val orcRootPath = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
.getTableMetadata(TableIdentifier("orc_table")).location
sql(s"alter table carbon_table add segment options ('path'='$orcRootPath', 'format'='orc', 'partition'='name:string,age:int')")
checkAnswer(sql("select * from carbon_table"),
sql("select * from parquet_table " +
"union all select * from parquet_table " +
"union all select * from orc_table"))
// filter query on partition column
checkAnswer(sql("select count(*) from carbon_table where name = 'amy'"), Row(4))
// do compaction
sql("alter table carbon_table compact 'major'")
checkAnswer(sql("select * from carbon_table"),
sql("select * from parquet_table " +
"union all select * from parquet_table " +
"union all select * from orc_table"))
sql("drop table if exists parquet_table")
sql("drop table if exists carbon_table")
sql("drop table if exists orc_table")
}
test("show segment after add segment to partition table") {
sql("drop table if exists parquet_table")
sql("drop table if exists carbon_table")
sql("create table parquet_table(value int, name string, age int) using parquet partitioned by (name, age)")
sql("create table carbon_table(value int) partitioned by (name string, age int) stored as carbondata")
sql("insert into parquet_table values (30, 'amy', 12), (40, 'bob', 13)")
sql("insert into parquet_table values (30, 'amy', 20), (10, 'bob', 13)")
sql("insert into parquet_table values (30, 'cat', 12), (40, 'dog', 13)")
sql("select * from parquet_table").show
val parquetRootPath = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
.getTableMetadata(TableIdentifier("parquet_table")).location
// add data from parquet table to carbon table
sql(s"alter table carbon_table add segment options ('path'='$parquetRootPath', 'format'='parquet', 'partition'='name:string,age:int')")
checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table"))
// test show segment
checkExistence(sql(s"show segments for table carbon_table"), true, "spark/target/warehouse/parquet_table")
checkExistence(sql(s"show history segments for table carbon_table"), true, "spark/target/warehouse/parquet_table")
sql("drop table if exists parquet_table")
sql("drop table if exists carbon_table")
}
test("test add segment partition table, missing partition option") {
sql("drop table if exists parquet_table")
sql("drop table if exists carbon_table")
sql("create table parquet_table(value int, name string, age int) using parquet partitioned by (name, age)")
sql("create table carbon_table(value int) partitioned by (name string, age int) stored as carbondata")
sql("insert into parquet_table values (30, 'amy', 12), (40, 'bob', 13)")
sql("insert into parquet_table values (30, 'amy', 20), (10, 'bob', 13)")
sql("insert into parquet_table values (30, 'cat', 12), (40, 'dog', 13)")
sql("select * from parquet_table").show
val parquetRootPath = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
.getTableMetadata(TableIdentifier("parquet_table")).location
// add data from parquet table to carbon table
val exception = intercept[AnalysisException](
sql(s"alter table carbon_table add segment options ('path'='$parquetRootPath', 'format'='parquet')")
)
assert(exception.message.contains("partition option is required"))
sql("drop table if exists parquet_table")
sql("drop table if exists carbon_table")
}
test("test add segment partition table, unmatched partition") {
sql("drop table if exists parquet_table")
sql("drop table if exists carbon_table")
sql("create table parquet_table(value int, name string, age int) using parquet partitioned by (name)")
sql("create table carbon_table(value int) partitioned by (name string, age int) stored as carbondata")
sql("insert into parquet_table values (30, 'amy', 12), (40, 'bob', 13)")
sql("insert into parquet_table values (30, 'amy', 20), (10, 'bob', 13)")
sql("insert into parquet_table values (30, 'cat', 12), (40, 'dog', 13)")
sql("select * from parquet_table").show
val parquetRootPath = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
.getTableMetadata(TableIdentifier("parquet_table")).location
// add data from parquet table to carbon table
// unmatched partition
var exception = intercept[AnalysisException](
sql(s"alter table carbon_table add segment options ('path'='$parquetRootPath', 'format'='parquet', 'partition'='name:string')")
)
assert(exception.message.contains("Partition is not same"))
// incorrect partition option
exception = intercept[AnalysisException](
sql(s"alter table carbon_table add segment options ('path'='$parquetRootPath', 'format'='parquet', 'partition'='name:string,age:int')")
)
assert(exception.message.contains("input segment path does not comply to partitions in carbon table"))
sql("drop table if exists parquet_table")
sql("drop table if exists carbon_table")
}
test("test add segment partition table, incorrect partition") {
sql("drop table if exists parquet_table")
sql("drop table if exists carbon_table")
sql("create table parquet_table(value int) using parquet")
sql("create table carbon_table(value int) partitioned by (name string, age int) stored as carbondata")
sql("insert into parquet_table values (30), (40)")
sql("select * from parquet_table").show
val parquetRootPath = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
.getTableMetadata(TableIdentifier("parquet_table")).location
// add data from parquet table to carbon table
// incorrect partition option
val exception = intercept[RuntimeException](
sql(s"alter table carbon_table add segment options ('path'='$parquetRootPath', 'format'='parquet', 'partition'='name:string,age:int')")
)
assert(exception.getMessage.contains("invalid partition path"))
sql("drop table if exists parquet_table")
sql("drop table if exists carbon_table")
}
private def copyseg(tableName: String, pathName: String): String = {
val table1 = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
.getTableMetadata(TableIdentifier(tableName))
val path1 = table1.location
val newPath1 = storeLocation + "/" + pathName
FileFactory.deleteAllFilesOfDir(new File(newPath1))
copy(path1.toString, newPath1)
newPath1
}
test("Test add segment by carbon written by sdk") {
val tableName = "add_segment_test"
sql(s"drop table if exists $tableName")
sql(
s"""
| CREATE TABLE $tableName (empno int, empname string, designation String, doj Timestamp,
| workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
| projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
| utilization int,salary int)
| STORED AS carbondata
|""".stripMargin)
val externalSegmentPath = storeLocation + "/" + "external_segment"
FileFactory.deleteAllFilesOfDir(new File(externalSegmentPath))
// write into external segment folder
val schemaFilePath = s"$storeLocation/$tableName/Metadata/schema"
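// reuse the schema file of the target table so the SDK writes a segment with a matching schema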
val writer = CarbonWriter.builder
.outputPath(externalSegmentPath)
.withSchemaFile(schemaFilePath)
.writtenBy("AddSegmentTestCase")
.withCsvInput()
.build()
val source = Source.fromFile(s"$resourcesPath/data.csv")
var count = 0
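// stream data.csv into the writer, skipping the header line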
for (line <- source.getLines()) {
if (count != 0) {
writer.write(line.split(","))
}
count = count + 1
}
writer.close()
sql(s"alter table $tableName add segment options('path'='$externalSegmentPath', 'format'='carbon')").show()
checkAnswer(sql(s"select count(*) from $tableName"), Seq(Row(10)))
sql(s"select * from $tableName").show()
expectSameResultBySchema(externalSegmentPath, schemaFilePath, tableName)
expectSameResultInferSchema(externalSegmentPath, tableName)
FileFactory.deleteAllFilesOfDir(new File(externalSegmentPath))
sql(s"drop table $tableName")
}
/**
* use sdk to read the specified path using the specified schema file
* and compare the result with select * from tableName
*/
def expectSameResultBySchema(pathToRead: String, schemaFilePath: String, tableName: String): Unit = {
// the SDK reader resolves the schema from the carbon files under pathToRead,
// so the schema file itself is not handed to the reader here
compareSdkReadWithTable(pathToRead, tableName)
}
/**
* use sdk to read the specified path by inferring the schema
* and compare the result with select * from tableName
*/
def expectSameResultInferSchema(pathToRead: String, tableName: String): Unit = {
compareSdkReadWithTable(pathToRead, tableName)
}
// shared body of the two expectations above: read every row through the SDK
// and compare it cell by cell with the result of select * from tableName
private def compareSdkReadWithTable(pathToRead: String, tableName: String): Unit = {
val tableRows = sql(s"select * from $tableName").collectAsList()
val projection = Seq("empno", "empname", "designation", "doj",
"workgroupcategory", "workgroupcategoryname", "deptno", "deptname",
"projectcode", "projectjoindate", "projectenddate", "attendance",
"utilization", "salary").toArray
val reader = CarbonReader.builder(pathToRead)
.withRowRecordReader()
.withReadSupport(classOf[CarbonRowReadSupport])
.projection(projection)
.build()
var count = 0
while (reader.hasNext) {
val row = reader.readNextRow.asInstanceOf[CarbonRow]
val tableRow = tableRows.get(count)
var columnIndex = 0
for (column <- row.getData) {
val tableRowColumn = tableRow.get(columnIndex)
Assert.assertEquals(s"cell[$count, $columnIndex] not equal", tableRowColumn.toString, column.toString)
columnIndex = columnIndex + 1
}
count += 1
}
reader.close()
}
test("Test add segment by carbon written by sdk, and 1 load") {
val tableName = "add_segment_test"
sql(s"drop table if exists $tableName")
sql(
s"""
| CREATE TABLE $tableName (empno int, empname string, designation String, doj Timestamp,
| workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
| projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
| utilization int,salary int)
| STORED AS carbondata
|""".stripMargin)
sql(
s"""
|LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE $tableName
|OPTIONS('DELIMITER'=',', 'QUOTECHAR'='"')
|""".stripMargin)
val externalSegmentPath = storeLocation + "/" + "external_segment"
FileFactory.deleteAllFilesOfDir(new File(externalSegmentPath))
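// build an SDK schema that mirrors the table DDL above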
val fields: Array[Field] = new Array[Field](14)
fields(0) = new Field("empno", DataTypes.INT)
fields(1) = new Field("empname", DataTypes.STRING)
fields(2) = new Field("designation", DataTypes.STRING)
fields(3) = new Field("doj", DataTypes.TIMESTAMP)
fields(4) = new Field("workgroupcategory", DataTypes.INT)
fields(5) = new Field("workgroupcategoryname", DataTypes.STRING)
fields(6) = new Field("deptno", DataTypes.INT)
fields(7) = new Field("deptname", DataTypes.STRING)
fields(8) = new Field("projectcode", DataTypes.INT)
fields(9) = new Field("projectjoindate", DataTypes.TIMESTAMP)
fields(10) = new Field("projectenddate", DataTypes.DATE)
fields(11) = new Field("attendance", DataTypes.INT)
fields(12) = new Field("utilization", DataTypes.INT)
fields(13) = new Field("salary", DataTypes.INT)
// write into external segment folder
val writer = CarbonWriter.builder
.outputPath(externalSegmentPath)
.writtenBy("AddSegmentTestCase")
.withCsvInput(new Schema(fields))
.build()
val source = Source.fromFile(s"$resourcesPath/data.csv")
var count = 0
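// stream data.csv into the writer, skipping the header line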
for (line <- source.getLines()) {
if (count != 0) {
writer.write(line.split(","))
}
count = count + 1
}
writer.close()
sql(s"alter table $tableName add segment options('path'='$externalSegmentPath', 'format'='carbon')").show()
checkAnswer(sql(s"select count(*) from $tableName"), Seq(Row(20)))
checkAnswer(sql(s"select count(*) from $tableName where empno = 11"), Seq(Row(2)))
checkAnswer(sql(s"select sum(empno) from $tableName where empname = 'arvind' "), Seq(Row(22)))
FileFactory.deleteAllFilesOfDir(new File(externalSegmentPath))
sql(s"drop table $tableName")
}
def copy(oldLoc: String, newLoc: String): Unit = {
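// shallow copy: segment folders hold only files, so no recursion is needed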
val oldFolder = FileFactory.getCarbonFile(oldLoc)
FileFactory.mkdirs(newLoc, FileFactory.getConfiguration)
val oldFiles = oldFolder.listFiles
for (file <- oldFiles) {
Files.copy(Paths.get(file.getParentFile.getPath, file.getName), Paths.get(newLoc, file.getName))
}
}
def getDataSize(path: String): String = {
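// sum the sizes of the parquet files in the folder and format the total
// the same human-readable way show segments does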
val allFiles = FileFactory.getCarbonFile(path).listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = {
file.getName.endsWith(CarbonCommonConstants.PARQUET_FILE_EXT)
}
})
var size: Long = 0
for (file <- allFiles) {
size += file.getSize
}
Strings.formatSize(size.toFloat)
}
def createCarbonTable() = {
sql("drop table if exists addsegment1")
sql(
"""
| CREATE TABLE addsegment1 (empname String, designation String, doj Timestamp,
| workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
| projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
| utilization int,salary int, empno int)
| STORED AS carbondata
""".stripMargin)
sql(
s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS
|('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
}
def createParquetTable() = {
sql("drop table if exists addsegment2")
sql(
"""
| CREATE TABLE addsegment2 (empname String, designation String, doj Timestamp,
| workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
| projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
| utilization int,salary int, empno int) using parquet
""".stripMargin)
sql(s"""insert into addsegment2 select * from addsegment1""")
}
def createOrcTable() = {
sql("drop table if exists addsegment3")
sql(
"""
| CREATE TABLE addsegment3 (empname String, designation String, doj Timestamp,
| workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
| projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
| utilization int,salary int, empno int) using orc
""".stripMargin)
sql(s"""insert into addsegment3 select * from addsegment1""")
}
override def afterAll(): Unit = {
defaultConfig()
sqlContext.sparkSession.conf.unset("carbon.input.segments.default.addsegment1")
// restore both properties that beforeAll overrode
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
dropTable()
}
def dropTable(): Unit = {
sql("drop table if exists addsegment1")
sql("drop table if exists addsegment2")
sql("drop table if exists addsegment3")
sql("drop table if exists addSegCar")
sql("drop table if exists addSegPar")
sql("drop table if exists addSegParless")
sql("drop table if exists addSegParmore")
}
}