blob: a30213831bf794c6d24dc44aa92d4a172edd912f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.testsuite.createTable
import java.io.File
import java.util
import org.apache.avro
import org.apache.commons.io.FileUtils
import org.apache.commons.lang.CharEncoding
import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.junit.Assert
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.sdk.file.CarbonWriter
class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with BeforeAndAfterAll {
var writerPath = new File(this.getClass.getResource("/").getPath
+
"../." +
"./target/SparkCarbonFileFormat/WriterOutput/")
.getCanonicalPath
//getCanonicalPath gives path with \, but the code expects /.
writerPath = writerPath.replace("\\", "/")
def cleanTestData() = {
FileUtils.deleteDirectory(new File(writerPath))
}
override def beforeAll(): Unit = {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
sql("DROP TABLE IF EXISTS sdkOutputTable")
}
override def afterAll(): Unit = {
sql("DROP TABLE IF EXISTS sdkOutputTable")
}
private def WriteFilesWithAvroWriter(rows: Int, mySchema: String, json: String, isLocalDictionary: Boolean): Unit = {
// conversion to GenericData.Record
val nn = new avro.Schema.Parser().parse(mySchema)
val record = testUtil.jsonToAvro(json, mySchema)
try {
val writer = if (isLocalDictionary) {
CarbonWriter.builder
.outputPath(writerPath).enableLocalDictionary(true)
.localDictionaryThreshold(2000)
.uniqueIdentifier(System.currentTimeMillis()).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithComplexType").build()
} else {
CarbonWriter.builder
.outputPath(writerPath)
.uniqueIdentifier(System.currentTimeMillis()).withAvroInput(nn).writtenBy("TestNonTransactionalCarbonTableWithComplexType").build()
}
var i = 0
while (i < rows) {
writer.write(record)
i = i + 1
}
writer.close()
}
catch {
case e: Exception => {
e.printStackTrace()
Assert.fail(e.getMessage)
}
}
}
// test multi level -- 4 levels [array of array of array of struct]
def buildAvroTestDataMultiLevel4(rows: Int, options: util.Map[String, String], isLocalDictionary: Boolean): Any = {
FileUtils.deleteDirectory(new File(writerPath))
val mySchema = """ {
| "name": "address",
| "type": "record",
| "fields": [
| {
| "name": "name",
| "type": "string"
| },
| {
| "name": "age",
| "type": "int"
| },
| {
| "name": "BuildNum",
| "type": {
| "type": "array",
| "items": {
| "name": "FloorNum",
| "type": "array",
| "items": {
| "name": "doorNum",
| "type": "array",
| "items": {
| "name": "my_address",
| "type": "record",
| "fields": [
| {
| "name": "street",
| "type": "string"
| },
| {
| "name": "city",
| "type": "string"
| },
| {
| "name": "Temperature",
| "type": "double"
| },
| {
| "name": "WindSpeed",
| "type": "string"
| },
| {
| "name": "year",
| "type": "string"
| }
| ]
| }
| }
| }
| }
| }
| ]
|} """.stripMargin
val json =
""" {
| "name": "bob",
| "age": 10,
| "BuildNum": [
| [
| [
| {"street":"abc", "city":"city1", "Temperature":12.6, "WindSpeed":"1234.56", "year":"2018-05-10"},
| {"street":"def", "city":"city2", "Temperature":13.6, "WindSpeed":"1234.56", "year":"2018-05-10"},
| {"street":"cfg", "city":"city3", "Temperature":14.6, "WindSpeed":"1234.56", "year":"2018-05-10"}
| ],
| [
| {"street":"abc1", "city":"city3", "Temperature":12.6, "WindSpeed":"1234.56", "year":"2018-05-10"},
| {"street":"def1", "city":"city4", "Temperature":12.6, "WindSpeed":"1234.56", "year":"2018-05-10"},
| {"street":"cfg1", "city":"city5", "Temperature":12.6, "WindSpeed":"1234.56", "year":"2018-05-10"}
| ]
| ],
| [
| [
| {"street":"abc2", "city":"cityx", "Temperature":12.6, "WindSpeed":"1234.56", "year":"2018-05-10"},
| {"street":"abc3", "city":"cityy", "Temperature":12.6, "WindSpeed":"1234.56", "year":"2018-05-10"},
| {"street":"abc4", "city":"cityz", "Temperature":12.6, "WindSpeed":"1234.56", "year":"2018-05-10"}
| ],
| [
| {"street":"a1bc", "city":"cityA", "Temperature":12.6, "WindSpeed":"1234.56", "year":"2018-05-10"},
| {"street":"a1bc", "city":"cityB", "Temperature":12.6, "WindSpeed":"1234.56", "year":"2018-05-10"},
| {"street":"a1bc", "city":"cityc", "Temperature":12.6, "WindSpeed":"1234.56", "year":"2018-05-10"}
| ]
| ]
| ]
|} """.stripMargin
WriteFilesWithAvroWriter(rows, mySchema, json, isLocalDictionary)
}
def buildAvroTestDataMultiLevel4Type(isLocalDictionary: Boolean): Any = {
FileUtils.deleteDirectory(new File(writerPath))
buildAvroTestDataMultiLevel4(3, null, isLocalDictionary)
}
// test multi level -- 4 levels [array of array of array of struct]
test("test multi level support : array of array of array of struct") {
buildAvroTestDataMultiLevel4Type(false)
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS sdkOutputTable")
sql(
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED AS carbondata LOCATION
|'$writerPath' """.stripMargin)
sql("select * from sdkOutputTable").show(false)
// TODO: Add a validation
sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
cleanTestData()
}
test("test local dictionary for complex datatype") {
buildAvroTestDataMultiLevel4Type(true)
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS localComplex")
sql(
s"""CREATE EXTERNAL TABLE localComplex STORED AS carbondata LOCATION
|'$writerPath' """.stripMargin)
assert(FileFactory.getCarbonFile(writerPath).exists())
assert(testUtil.checkForLocalDictionary(testUtil.getDimRawChunk(0,writerPath)))
sql("describe formatted localComplex").show(30, false)
val descLoc = sql("describe formatted localComplex").collect
descLoc.find(_.get(0).toString.contains("Local Dictionary Enabled")) match {
case Some(row) => assert(row.get(1).toString.contains("true"))
case None => assert(false)
}
// TODO: Add a validation
sql("DROP TABLE localComplex")
// drop table should not delete the files
cleanTestData()
}
test("test multi level support : array of array of array of with Double data type") {
cleanTestData()
val mySchema = """ {
| "name": "address",
| "type": "record",
| "fields": [
| {
| "name": "name",
| "type": "string"
| },
| {
| "name": "age",
| "type": "int"
| },
| {
| "name" :"my_address",
| "type" :{
| "name": "my_address",
| "type": "record",
| "fields": [
| {
| "name": "Temperaturetest",
| "type": "double"
| }
| ]
| }
| }
| ]
|} """.stripMargin
val jsonvalue=
"""{
|"name" :"babu",
|"age" :12,
|"my_address" :{ "Temperaturetest" :123 }
|}
""".stripMargin
val pschema= org.apache.avro.Schema.parse(mySchema)
val records = testUtil.jsonToAvro(jsonvalue, mySchema)
val writer = CarbonWriter.builder().outputPath(writerPath).withAvroInput(pschema).writtenBy("TestNonTransactionalCarbonTableWithComplexType").build()
writer.write(records)
writer.close()
sql("DROP TABLE IF EXISTS sdkOutputTable")
sql(
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED AS carbondata LOCATION
|'$writerPath' """.stripMargin)
sql("select * from sdkOutputTable").show(false)
// TODO: Add a validation
sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
cleanTestData()
}
// test multi level -- 4 levels [array of array of array of struct]
test("test ComplexDataType projection for array of array of array of struct") {
buildAvroTestDataMultiLevel4Type(false)
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS sdkOutputTable")
sql(
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED AS carbondata LOCATION
|'$writerPath' """.stripMargin)
checkAnswer(sql("select BuildNum[0][0][0].street from sdkOutputTable"),
Seq(Row("abc"), Row("abc"), Row("abc")))
checkAnswer(sql("select BuildNum[1][0][0].street from sdkOutputTable"),
Seq(Row("abc2"), Row("abc2"), Row("abc2")))
sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
cleanTestData()
}
def buildAvroTestDataMultiLevel6Type(isLocalDictionary: Boolean): Any = {
FileUtils.deleteDirectory(new File(writerPath))
buildAvroTestDataMultiLevel6(1, null, isLocalDictionary)
}
// test multi level -- 6 levels
def buildAvroTestDataMultiLevel6(rows: Int, options: util.Map[String, String], isLocalDictionary: Boolean): Any = {
FileUtils.deleteDirectory(new File(writerPath))
val mySchema =
""" {
|"type": "record",
| "name": "UserInfo",
| "namespace": "com.apache.schema.schemalevel6_struct",
| "fields": [
| {
| "name": "username",
| "type": "string",
| "default": "NONE"
| },
| {
| "name": "age",
| "type": "int",
| "default": -1
| },
| {
| "name": "phone",
| "type": "string",
| "default": "NONE"
| },
| {
| "name": "housenum",
| "type": "string",
| "default": "NONE"
| },
| {
| "name": "address",
| "type": {
| "type": "record",
| "name": "Mailing_Address",
| "fields": [
| {
| "name": "Address_Detail",
| "type": {
| "type": "record",
| "name": "Address_Detail",
| "fields": [
| {
| "name": "Building_Detail",
| "type": {
| "type": "record",
| "name": "Building_Address",
| "fields": [
| {
| "name": "Society_name",
| "type": "string"
| },
| {
| "name": "building_no",
| "type": "string"
| },
| {
| "name": "house_no",
| "type": "int"
| },
| {
| "name": "Building_Type",
| "type": {
| "type": "record",
| "name": "Building_Type",
| "fields": [
| {
| "name":"Buildingname",
| "type":"string"
| },
| {
| "name":"buildingArea",
| "type":"int"
| },
| {
| "name":"Building_Criteria",
| "type":{
| "type":"record",
| "name":"BuildDet",
| "fields":[
| {
| "name":"f1",
| "type":"int"
| },
| {
| "name":"f2",
| "type":"string"
| },
| {
| "name":"BuildDetInner",
| "type":
| {
| "type":"record",
| "name":"BuildInner",
| "fields":[
| {
| "name": "duplex",
| "type": "boolean"
| },
| {
| "name": "Price",
| "type": "int"
| },
| {
| "name": "TotalCost",
| "type": "int"
| },
| {
| "name": "Floor",
| "type": "int"
| },
| {
| "name": "PhoneNo",
| "type": "long"
| },
| {
| "name": "value",
| "type": "string"
| }
| ]
| }
| }
| ]
| }
| }
| ]
| }
| }
| ]
| }
| }
| ]
| }
| }
| ]
| }
| }
| ]
|} """.stripMargin
val json =
""" {
|"username": "DON",
|"age": 21,
|"phone": "9888",
|"housenum": "44",
|"address": {
|"Address_Detail": {
|"Building_Detail": {
|"Society_name": "TTTT",
|"building_no": "5",
|"house_no": 78,
|"Building_Type": {
|"Buildingname": "Amaranthus",
|"buildingArea": 34,
|"Building_Criteria": {
|"f1": 23,
|"f2": "RRR",
|"BuildDetInner": {
|"duplex": true,
|"Price": 3434,
|"TotalCost": 7777,
|"Floor": 4,
|"PhoneNo": 5656,
|"value":"Value"
|}
|}
|}
|}
|}
|}
|} """.stripMargin
WriteFilesWithAvroWriter(rows, mySchema, json, isLocalDictionary)
}
test("test ComplexDataType projection for struct of struct -6 levels") {
buildAvroTestDataMultiLevel6Type(false)
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS sdkOutputTable")
sql(
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED AS carbondata LOCATION
|'$writerPath' """.stripMargin)
checkAnswer(sql("select * from sdkOutputTable"),
Seq(Row("DON", 21, "9888", "44", Row(Row(Row("TTTT", "5", 78, Row("Amaranthus", 34,
Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value")))))))))
checkAnswer(sql("select address from sdkOutputTable"),
Seq(Row(Row(Row(Row("TTTT", "5", 78, Row("Amaranthus", 34, Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value")))))))))
checkAnswer(sql("select address.Address_Detail from sdkOutputTable"),
Seq(Row(Row(Row("TTTT", "5", 78, Row("Amaranthus", 34, Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value"))))))))
checkAnswer(sql("select address.Address_Detail.Building_Detail from sdkOutputTable"),
Seq(Row(Row("TTTT", "5", 78, Row("Amaranthus", 34, Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value")))))))
checkAnswer(sql("select address.Address_Detail.Building_Detail.Building_Type from sdkOutputTable"),
Seq(Row(Row("Amaranthus", 34, Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value"))))))
checkAnswer(sql(
"select address.Address_Detail.Building_Detail.Building_Type.Building_Criteria from " +
"sdkOutputTable"), Seq(Row(Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value")))))
checkAnswer(sql(
"select address.Address_Detail.Building_Detail.Building_Type.Building_Criteria" +
".BuildDetInner.duplex from sdkOutputTable"), Seq(Row(true)))
checkAnswer(sql(
"select address.Address_Detail.Building_Detail.Building_Type.Building_Criteria" +
".BuildDetInner.price from sdkOutputTable"), Seq(Row(3434)))
checkAnswer(sql(
"select address.Address_Detail.Building_Detail.Building_Type.Building_Criteria" +
".BuildDetInner.totalcost from sdkOutputTable"), Seq(Row(7777)))
checkAnswer(sql(
"select address.Address_Detail.Building_Detail.Building_Type.Building_Criteria" +
".BuildDetInner.floor from sdkOutputTable"), Seq(Row(4)))
checkAnswer(sql(
"select address.Address_Detail.Building_Detail.Building_Type.Building_Criteria" +
".BuildDetInner.phoneNo from sdkOutputTable"), Seq(Row(5656)))
checkAnswer(sql(
"select address.Address_Detail.Building_Detail.Building_Type.Building_Criteria" +
".BuildDetInner.value from sdkOutputTable"), Seq(Row("Value")))
checkAnswer(sql("select address,address.Address_Detail from sdkOutputTable"),
Seq(Row(Row(Row(Row("TTTT", "5", 78, Row("Amaranthus", 34, Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value"))))))
, Row(Row("TTTT", "5", 78, Row("Amaranthus", 34, Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value"))))))))
checkAnswer(sql("select address.Address_Detail.Building_Detail.Building_Type,address.Address_Detail.Building_Detail.Building_Type.Building_Criteria from sdkOutputTable"), Seq(Row(Row("Amaranthus", 34, Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value"))),Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value")))))
checkAnswer(sql("select address.Address_Detail,address.Address_Detail.Building_Detail.Building_Type.Building_Criteria from sdkOutputTable"),Seq(Row(Row(Row("TTTT", "5", 78, Row("Amaranthus", 34, Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value"))))),Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value")))))
checkAnswer(sql("select address.Address_Detail,address.Address_Detail.Building_Detail.Society_name,address.Address_Detail.Building_Detail.Building_Type.Building_Criteria.f1 from sdkOutputTable"),
Seq(Row(Row(Row("TTTT", "5", 78, Row("Amaranthus", 34, Row(23, "RRR", Row(true, 3434, 7777, 4, 5656, "Value"))))),"TTTT",23)))
checkAnswer(sql("select address.Address_Detail.Building_Detail.Society_name,address.Address_Detail.Building_Detail.building_no from sdkOutputTable"),Seq(Row("TTTT","5")))
sql("select address.Address_Detail.Building_Detail.Society_name,address.Address_Detail.Building_Detail.building_no from sdkOutputTable where address.Address_Detail.Building_Detail.Society_name ='TTTT'").show(false)
sql("DROP TABLE sdkOutputTable")
cleanTestData()
}
}