/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.carbondata.datasource

import java.io.File
import java.text.SimpleDateFormat
import java.util.{Date, Random}

import scala.collection.JavaConverters._

import org.apache.commons.io.FileUtils
import org.apache.commons.lang.RandomStringUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.carbondata.datasource.TestUtil.{spark, _}
import org.apache.spark.util.SparkUtil
import org.scalatest.{BeforeAndAfterAll, FunSuite}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataFileFooterConverter}
import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
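
/**
 * Tests reading SDK (CarbonWriter) output through the carbon data source
 * (SparkCarbonFileFormat): creating tables with and without an explicit schema,
 * behaviour when index or carbondata files are missing, multi-blocklet min/max
 * flags, and long string (varchar) columns.
 */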
class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndAfterAll {
override def beforeAll(): Unit = {
spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
}
override def afterAll(): Unit = {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT,
CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT_DEFAULT)
spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
}
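// directory where the SDK CarbonWriter output is written, under the module's test resources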
var writerPath = new File(this.getClass.getResource("/").getPath
+
"../." +
"./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
.getCanonicalPath
//getCanonicalPath gives path with \, but the code expects /.
writerPath = writerPath.replace("\\", "/");
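/**
 * Writes 100 rows (name, age, height) to writerPath through the SDK CarbonWriter
 * using a JSON-defined schema.
 */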
def buildTestData(): Unit = {
FileUtils.deleteDirectory(new File(writerPath))
val schema = new StringBuilder()
.append("[ \n")
.append(" {\"name\":\"string\"},\n")
.append(" {\"age\":\"int\"},\n")
.append(" {\"height\":\"double\"}\n")
.append("]")
.toString()
try {
val builder = CarbonWriter.builder()
val writer =
builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).build()
var i = 0
while (i < 100) {
writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
i += 1
}
writer.close()
} catch {
  // surface writer failures instead of silently swallowing them
  case ex: Throwable => fail("failed to build test data: " + ex.getMessage)
}
}
def cleanTestData(): Unit = {
FileUtils.deleteDirectory(new File(writerPath))
}
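/**
 * Recursively deletes every file under the given path whose name ends with the given
 * extension, to simulate writer output with missing index or carbondata files.
 */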
def deleteIndexFile(path: String, extension: String) : Unit = {
val file: CarbonFile = FileFactory
.getCarbonFile(path, FileFactory.getFileType(path))
for (eachDir <- file.listFiles) {
if (!eachDir.isDirectory) {
if (eachDir.getName.endsWith(extension)) {
CarbonUtil.deleteFoldersAndFilesSilent(eachDir)
}
} else {
deleteIndexFile(eachDir.getPath, extension)
}
}
}
// TODO: remove the segment dependency and tableIdentifier dependency
test("read carbondata files (sdk Writer Output) using the SparkCarbonFileFormat ") {
buildTestData()
assert(new File(writerPath).exists())
spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
// create a data source table over the SDK writer output; the DDL differs by Spark version
if (SparkUtil.isSparkVersionEqualTo("2.1")) {
  spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """)
} else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
  spark.sql(
    s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
       |'$writerPath' """.stripMargin)
} else {
  // TODO: handle other Spark versions
}
spark.sql("Describe formatted sdkOutputTable").show(false)
spark.sql("select * from sdkOutputTable").show(false)
spark.sql("select * from sdkOutputTable limit 3").show(false)
spark.sql("select name from sdkOutputTable").show(false)
spark.sql("select age from sdkOutputTable").show(false)
spark.sql("select * from sdkOutputTable where age > 2 and age < 8").show(200,false)
spark.sql("select * from sdkOutputTable where name = 'robot3'").show(200,false)
spark.sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200,false)
spark.sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200,false)
spark.sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200,false)
spark.sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200,false)
spark.sql("select count(*) from sdkOutputTable").show(200,false)
spark.sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
assert(new File(writerPath).exists())
cleanTestData()
}
test("Running SQL directly and read carbondata files (sdk Writer Output) using the SparkCarbonFileFormat ") {
buildTestData()
assert(new File(writerPath).exists())
spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
// create a data source table over the SDK writer output; the DDL differs by Spark version
if (SparkUtil.isSparkVersionEqualTo("2.1")) {
  spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """)
} else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
  spark.sql(
    s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
       |'$writerPath' """.stripMargin)
} else {
  // TODO: handle other Spark versions
}
val directSQL = spark.sql(s"""select * from carbon.`$writerPath`""")
directSQL.show(false)
TestUtil.checkAnswer(spark.sql("select * from sdkOutputTable"), directSQL)
spark.sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
assert(new File(writerPath).exists())
cleanTestData()
}
// TODO: Make the sparkCarbonFileFormat to work without index file
test("Read sdk writer output file without Carbondata file should fail") {
buildTestData()
deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
assert(new File(writerPath).exists())
spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
val exception = intercept[Exception] {
// creating the table should fail since the carbondata (fact) files were deleted
if (SparkUtil.isSparkVersionEqualTo("2.1")) {
  spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """)
} else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
  spark.sql(
    s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
       |'$writerPath' """.stripMargin)
} else {
  // TODO: handle other Spark versions
}
}
assert(exception.getMessage()
.contains("CarbonData file is not present in the table location"))
// drop table should not delete the files
assert(new File(writerPath).exists())
cleanTestData()
}
test("Read sdk writer output file without any file should fail") {
buildTestData()
deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
assert(new File(writerPath).exists())
spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
val exception = intercept[Exception] {
// both the index and carbondata files were deleted, so create/read should fail
if (SparkUtil.isSparkVersionEqualTo("2.1")) {
  spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """)
} else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
  spark.sql(
    s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
       |'$writerPath' """.stripMargin)
} else {
  // TODO: handle other Spark versions
}
spark.sql("select * from sdkOutputTable").show(false)
}
assert(exception.getMessage()
.contains("CarbonData file is not present in the table location"))
// drop table should not delete the files
assert(new File(writerPath).exists())
cleanTestData()
}
test("Read sdk writer output file withSchema") {
buildTestData()
assert(new File(writerPath).exists())
spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
// create a data source table over the SDK writer output; the DDL differs by Spark version
if (SparkUtil.isSparkVersionEqualTo("2.1")) {
  spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """)
} else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
  spark.sql(
    s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
       |'$writerPath' """.stripMargin)
} else {
  // TODO: handle other Spark versions
}
spark.sql("Describe formatted sdkOutputTable").show(false)
spark.sql("select * from sdkOutputTable").show(false)
spark.sql("select * from sdkOutputTable limit 3").show(false)
spark.sql("select name from sdkOutputTable").show(false)
spark.sql("select age from sdkOutputTable").show(false)
spark.sql("select * from sdkOutputTable where age > 2 and age < 8").show(200, false)
spark.sql("select * from sdkOutputTable where name = 'robot3'").show(200, false)
spark.sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200, false)
spark.sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200, false)
spark.sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200, false)
spark.sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200, false)
spark.sql("select count(*) from sdkOutputTable").show(200, false)
spark.sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
assert(new File(writerPath).exists())
cleanTestData()
}
test("Read sdk writer output file without index file should not fail") {
buildTestData()
deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
assert(new File(writerPath).exists())
spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
if (SparkUtil.isSparkVersionEqualTo("2.1")) {
  spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """)
} else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
  spark.sql(
    s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
       |'$writerPath' """.stripMargin)
} else {
  // TODO: handle other Spark versions
}
// must not throw: org.apache.spark.SparkException: Index file not present to read the carbondata file
assert(spark.sql("select * from sdkOutputTable").collect().length == 100)
assert(spark.sql("select * from sdkOutputTable where name='robot0'").collect().length == 1)
spark.sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
assert(new File(writerPath).exists())
cleanTestData()
}
test("Read data having multi blocklet and validate min max flag") {
buildTestDataMultiBlocklet(750000, 50000)
assert(new File(writerPath).exists())
spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
if (SparkUtil.isSparkVersionEqualTo("2.1")) {
  spark.sql(s"""CREATE TABLE sdkOutputTable USING carbon OPTIONS (PATH '$writerPath') """)
} else {
  spark.sql(
    s"""CREATE TABLE sdkOutputTable USING carbon LOCATION
       |'$writerPath' """.stripMargin)
}
checkAnswer(spark.sql("select count(*) from sdkOutputTable"),Seq(Row(800000)))
checkAnswer(spark
.sql(
"select count(*) from sdkOutputTable where from_email='Email for testing min max for " +
"allowed chars'"),
Seq(Row(50000)))
// expected min/max flags: there should be 2 blocklets, with one blocklet having the min/max
// flag as false for the email column and the other having it as true
val blocklet1MinMaxFlag = Array(true, true, true, true, true, false, true, true, true)
val blocklet2MinMaxFlag = Array(true, true, true, true, true, true, true, true, true)
val expectedMinMaxFlag = Array(blocklet1MinMaxFlag, blocklet2MinMaxFlag)
validateMinMaxFlag(expectedMinMaxFlag, 2)
spark.sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
assert(new File(writerPath).exists())
clearDataMapCache()
cleanTestData()
}
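/**
 * Writes two batches of rows with a reduced blocklet size so the output contains more
 * than one blocklet; the batches use different from_email values so the min/max flag
 * can differ between blocklets.
 */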
def buildTestDataMultiBlocklet(recordsInBlocklet1: Int, recordsInBlocklet2: Int): Unit = {
  FileUtils.deleteDirectory(new File(writerPath))
  val fields = new Array[Field](8)
  fields(0) = new Field("myid", DataTypes.INT)
  fields(1) = new Field("event_id", DataTypes.STRING)
  fields(2) = new Field("eve_time", DataTypes.DATE)
  fields(3) = new Field("ingestion_time", DataTypes.TIMESTAMP)
  fields(4) = new Field("alldate", DataTypes.createArrayType(DataTypes.DATE))
  fields(5) = new Field("subject", DataTypes.STRING)
  fields(6) = new Field("from_email", DataTypes.STRING)
  fields(7) = new Field("sal", DataTypes.DOUBLE)
  val emailDataBlocklet1 = "FromEmail"
  val emailDataBlocklet2 = "Email for testing min max for allowed chars"
  try {
    val options = Map("bad_records_action" -> "FORCE", "complex_delimiter_level_1" -> "$").asJava
    val writer = CarbonWriter.builder().outputPath(writerPath).withBlockletSize(16)
      .sortBy(Array("myid", "ingestion_time", "event_id")).withLoadOptions(options)
      .withCsvInput(new Schema(fields)).build()
    val timeF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val date_F = new SimpleDateFormat("yyyy-MM-dd")
    for (i <- 1 to recordsInBlocklet1) {
      val time = new Date(System.currentTimeMillis())
      writer.write(Array("" + i, "event_" + i, "" + date_F.format(time), "" + timeF.format(time),
        "" + date_F.format(time) + "$" + date_F.format(time), "Subject_0", emailDataBlocklet1,
        "" + new Random().nextDouble()))
    }
    for (i <- 1 to recordsInBlocklet2) {
      val time = new Date(System.currentTimeMillis())
      writer.write(Array("" + i, "event_" + i, "" + date_F.format(time), "" + timeF.format(time),
        "" + date_F.format(time) + "$" + date_F.format(time), "Subject_0", emailDataBlocklet2,
        "" + new Random().nextDouble()))
    }
    writer.close()
  } catch {
    // surface writer failures instead of silently producing no data
    case ex: Exception => fail("failed to build multi-blocklet test data: " + ex.getMessage)
  }
}
/**
 * Reads the carbon index file and validates the min/max flag written for each blocklet.
 *
 * @param expectedMinMaxFlag expected per-column min/max flags, one array per blocklet
 * @param numBlocklets       expected number of blocklets in the index
 */
private def validateMinMaxFlag(expectedMinMaxFlag: Array[Array[Boolean]],
numBlocklets: Int): Unit = {
val carbonFiles: Array[File] = new File(writerPath).listFiles()
val carbonIndexFile = carbonFiles.filter(file => file.getName.endsWith(".carbonindex"))(0)
val converter: DataFileFooterConverter = new DataFileFooterConverter(spark.sessionState
.newHadoopConf())
val carbonIndexFilePath = FileFactory.getUpdatedFilePath(carbonIndexFile.getCanonicalPath)
val indexMetadata: List[DataFileFooter] = converter
.getIndexInfo(carbonIndexFilePath, null, false).asScala.toList
assert(indexMetadata.size == numBlocklets)
indexMetadata.zipWithIndex.foreach { case (fileFooter, blockletIndex) =>
  val isMinMaxSet: Array[Boolean] = fileFooter.getBlockletIndex.getMinMaxIndex.getIsMinMaxSet
  assert(isMinMaxSet.sameElements(expectedMinMaxFlag(blockletIndex)))
}
}
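/**
 * Drops the cached DataMaps for the table at writerPath and verifies that the cache
 * actually shrank; skipped on Spark 2.1.
 */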
private def clearDataMapCache(): Unit = {
if (!spark.sparkContext.version.startsWith("2.1")) {
val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
DataMapStoreManager.getInstance()
.clearDataMaps(AbsoluteTableIdentifier.from(writerPath))
assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
}
}
test("Test with long string columns") {
FileUtils.deleteDirectory(new File(writerPath))
// here we specify the long string column as varchar
val schema = new StringBuilder()
.append("[ \n")
.append(" {\"name\":\"string\"},\n")
.append(" {\"address\":\"varchar\"},\n")
.append(" {\"age\":\"int\"},\n")
.append(" {\"note\":\"varchar\"}\n")
.append("]")
.toString()
val builder = CarbonWriter.builder()
val writer = builder.outputPath(writerPath).withCsvInput(Schema.parseJson(schema)).build()
val totalRecordsNum = 3
for (i <- 0 until totalRecordsNum) {
// write a varchar with 75,000 length
writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(75000),
  i.toString, RandomStringUtils.randomAlphabetic(75000)))
}
writer.close()
//--------------- data source external table with schema ---------------------------
spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
if (spark.sparkContext.version.startsWith("2.1")) {
//data source file format
spark.sql(
s"""CREATE TABLE sdkOutputTable (name string, address string, age int, note string)
|USING carbon OPTIONS (PATH '$writerPath', "long_String_columns" "address, note") """
.stripMargin)
} else {
//data source file format
spark.sql(
s"""CREATE TABLE sdkOutputTable (name string, address string, age int, note string) USING carbon
|OPTIONS("long_String_columns"="address, note") LOCATION
|'$writerPath' """.stripMargin)
}
checkAnswer(spark.sql("select count(*) from sdkOutputTable where age = 0"), Seq(Row(1)))
checkAnswer(spark.sql("SELECT COUNT(*) FROM (select address,age,note from sdkOutputTable where length(address)=75000 and length(note)=75000)"),
Seq(Row(totalRecordsNum)))
spark.sql("DROP TABLE sdkOutputTable")
//--------------- data source external table without schema ---------------------------
spark.sql("DROP TABLE IF EXISTS sdkOutputTableWithoutSchema")
if (spark.sparkContext.version.startsWith("2.1")) {
//data source file format
spark
.sql(
s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS (PATH
|'$writerPath', "long_String_columns" "address, note") """.stripMargin)
} else {
//data source file format
spark.sql(
s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS
|("long_String_columns"="address, note") LOCATION '$writerPath' """.stripMargin)
}
checkAnswer(spark.sql("select count(*) from sdkOutputTableWithoutSchema where age = 0"), Seq(Row(1)))
checkAnswer(spark.sql("SELECT COUNT(*) FROM (select address,age,note from sdkOutputTableWithoutSchema where length(address)=75000 and length(note)=75000)"),
Seq(Row(totalRecordsNum)))
spark.sql("DROP TABLE sdkOutputTableWithoutSchema")
clearDataMapCache()
cleanTestData()
}
}