blob: 05e556003a80991201882f407d06dc49aa7156e2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.cluster.sdv.generated
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, File, InputStream}
import java.sql.Timestamp
import scala.collection.mutable
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, Encoder}
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.common.util.QueryTest
import org.apache.spark.sql.test.TestQueryExecutor
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.sdk.file.CarbonWriter
/**
* Test Class for ComplexDataTypeTestCase to verify all scenerios
*/
class ComplexDataTypeTestCase extends QueryTest with BeforeAndAfterAll {
val filePath = TestQueryExecutor.integrationPath + "/spark/src/test/resources"
val writerPath =
s"${ resourcesPath }" + "/SparkCarbonFileFormat/WriterOutputComplex/"
override def beforeAll(): Unit = {
FileUtils.deleteDirectory(new File(writerPath))
sql("DROP TABLE IF EXISTS complexcarbontable")
sql("DROP TABLE IF EXISTS test")
sql("DROP TABLE IF EXISTS sdkOutputTable")
}
override def afterAll(): Unit = {
FileUtils.deleteDirectory(new File(writerPath))
sql("DROP TABLE IF EXISTS complexcarbontable")
sql("DROP TABLE IF EXISTS test")
sql("DROP TABLE IF EXISTS sdkOutputTable")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
.addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
}
// check create table with complex data type and insert into complex table
test("test Complex_DataType-001") {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
.addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")
sql("DROP TABLE IF EXISTS test")
sql(
"create table test(person struct<detail:struct<id:int,name:string,height:double," +
"status:boolean,dob:date,dobt:timestamp>>) STORED AS carbondata")
sql("insert into test values(named_struct('detail', named_struct('id', 1, 'name', 'abc', 'height', 4.30, 'status', true, 'dob', '2017-08-09', 'dobt', '2017-08-09 00:00:00.0')))")
checkAnswer(sql("select * from test"),
Seq(Row(Row(Row(1, "abc", 4.3, true, java.sql.Date.valueOf("2017-08-09"),
Timestamp.valueOf("2017-08-09 00:00:00.0"))))))
sql("DROP TABLE IF EXISTS test")
sql(
"create table test(p1 array<int>,p2 array<string>,p3 array<double>,p4 array<boolean>,p5 " +
"array<date>,p6 array<timestamp>) STORED AS carbondata")
sql("insert into test values(array(1,2,3), array('abc','def','mno'), array(4.30,4.60,5.20), array(true,true,false), array('2017-08-09','2017-08-09','2017-07-07'), array('2017-08-09 00:00:00.0','2017-08-09 00:00:00.0','2017-07-07 00:00:00.0'))")
checkAnswer(sql("select * from test"),
Seq(Row(mutable.WrappedArray.make(Array(1, 2, 3)),
mutable.WrappedArray.make(Array("abc", "def", "mno")),
mutable.WrappedArray.make(Array(4.3, 4.6, 5.2)),
mutable.WrappedArray.make(Array(true, true, false)),
mutable.WrappedArray
.make(Array(java.sql.Date.valueOf("2017-08-09"),
java.sql.Date.valueOf("2017-08-09"),
java.sql.Date.valueOf("2017-07-07"))),
mutable.WrappedArray
.make(Array(Timestamp.valueOf("2017-08-09 00:00:00.0"),
Timestamp.valueOf("2017-08-09 00:00:00.0"),
Timestamp.valueOf("2017-07-07 00:00:00.0"))))))
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
.addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
}
// check create table with complex data type and load data into complex table
test("test Complex_DataType-002") {
sql("drop table if exists complexcarbontable")
sql("create table complexcarbontable(deviceInformationId int, channelsId string," +
"ROMSize string, purchasedate string, mobile struct<imei:string, imsi:string>," +
"MAC array<string>, locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, " +
"ActiveProvince:string, Activecity:string, ActiveDistrict:string, ActiveStreet:string>>," +
"proddate struct<productionDate:string,activeDeactivedate:array<string>>, gamePointId " +
"double,contractNumber double) " +
"STORED AS carbondata")
sql(
s"LOAD DATA local inpath '$filePath/complexdata.csv' INTO table " +
"complexcarbontable " +
"OPTIONS('DELIMITER'=',', 'QUOTECHAR'='\"', 'FILEHEADER'='deviceInformationId,channelsId," +
"ROMSize,purchasedate,mobile,MAC,locationinfo,proddate,gamePointId,contractNumber'," +
"'COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':')")
checkAnswer(sql("select * from complexcarbontable where deviceInformationId = 1"),
Seq(Row(1, "109", "4ROM size", "29-11-2015", Row("1AA1", "2BB1"),
mutable.WrappedArray.make(Array("MAC1", "MAC2", "MAC3")),
mutable.WrappedArray
.make(Array(Row(7, "Chinese", "Hubei Province", "yichang", "yichang", "yichang"),
Row(7, "India", "New Delhi", "delhi", "delhi", "delhi"))),
Row("29-11-2015", mutable.WrappedArray.make(Array("29-11-2015", "29-11-2015"))),
109.0, 2738.562)))
}
// check create table with complex data type and insert into
// into complex table
test("test Complex_DataType-003") {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
.addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")
sql("DROP TABLE IF EXISTS test")
sql(
"create table test(person struct<detail:struct<id:int,name:string,height:double," +
"status:boolean,dob:date,dobt:timestamp>>) STORED AS carbondata ")
sql("insert into test values(named_struct('detail', named_struct('id', 1, 'name', 'abc', 'height', 4.30, 'status', true, 'dob', '2017-08-09', 'dobt', '2017-08-09 00:00:00.0')))")
checkAnswer(sql("select * from test"),
Seq(Row(Row(Row(1,
"abc", 4.3, true, java.sql.Date.valueOf("2017-08-09"),
Timestamp.valueOf("2017-08-09 00:00:00.0"))))))
sql("DROP TABLE IF EXISTS test")
sql(
"create table test(p1 array<int>,p2 array<string>,p3 array<double>,p4 array<boolean>,p5 " +
"array<date>,p6 array<timestamp>) STORED AS carbondata ")
sql("insert into test values(array(1,2,3), array('abc','def','mno'), array(4.30,4.60,5.20), array(true,true,false), array('2017-08-09','2017-08-09','2017-07-07'), array('2017-08-09 00:00:00.0','2017-08-09 00:00:00.0','2017-07-07 00:00:00.0'))")
checkAnswer(sql("select * from test"),
Seq(Row(mutable.WrappedArray.make(Array(1, 2, 3)),
mutable.WrappedArray.make(Array("abc", "def", "mno")),
mutable.WrappedArray.make(Array(4.3, 4.6, 5.2)),
mutable.WrappedArray.make(Array(true, true, false)),
mutable.WrappedArray
.make(Array(java.sql.Date.valueOf("2017-08-09"),
java.sql.Date.valueOf("2017-08-09"),
java.sql.Date.valueOf("2017-07-07"))),
mutable.WrappedArray
.make(Array(Timestamp.valueOf("2017-08-09 00:00:00.0"),
Timestamp.valueOf("2017-08-09 00:00:00.0"),
Timestamp.valueOf("2017-07-07 00:00:00.0"))))))
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
.addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
}
// check ctas with complex datatype table
test("test Complex_DataType-004") {
sql("drop table if exists complexcarbontable")
sql("create table complexcarbontable(deviceInformationId int, channelsId string," +
"ROMSize string, purchasedate string, mobile struct<imei:string, imsi:string>," +
"MAC array<string>, locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, " +
"ActiveProvince:string, Activecity:string, ActiveDistrict:string, ActiveStreet:string>>," +
"proddate struct<productionDate:string,activeDeactivedate:array<string>>, gamePointId " +
"double,contractNumber double) " +
"STORED AS carbondata")
sql(
s"LOAD DATA local inpath '$filePath/complexdata.csv' INTO table " +
"complexcarbontable " +
"OPTIONS('DELIMITER'=',', 'QUOTECHAR'='\"', 'FILEHEADER'='deviceInformationId,channelsId," +
"ROMSize,purchasedate,mobile,MAC,locationinfo,proddate,gamePointId,contractNumber'," +
"'COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':')")
checkAnswer(sql("select count(*) from complexcarbontable"), Seq(Row(100)))
sql("DROP TABLE IF EXISTS test")
sql("create table test STORED AS carbondata as select * from complexcarbontable")
checkAnswer(sql("select count(*) from test"), Seq(Row(100)))
}
//check projection pushdown with complex- STRUCT data type
test("test Complex_DataType-005") {
sql("DROP TABLE IF EXISTS complexcarbontable")
sql(
"create table complexcarbontable (roll int,a struct<b:int,c:string,d:int,e:string," +
"f:struct<g:int,h:string,i:int>,j:int>) " +
"STORED AS carbondata")
sql("insert into complexcarbontable values(1, named_struct('b', 1, 'c', 'abc', 'd', 2, 'e', 'efg', 'f', named_struct('g', 3, 'h', 'mno', 'i', 4), 'j', 5))")
sql("insert into complexcarbontable values(2, named_struct('b', 1, 'c', 'abc', 'd', 2, 'e', 'efg', 'f', named_struct('g', 3, 'h', 'mno', 'i', 4), 'j', 5))")
sql("insert into complexcarbontable values(3, named_struct('b', 1, 'c', 'abc', 'd', 2, 'e', 'efg', 'f', named_struct('g', 3, 'h', 'mno', 'i', 4), 'j', 5))")
checkAnswer(sql("select a.b from complexcarbontable"), Seq(Row(1), Row(1), Row(1)))
checkAnswer(sql("select a.c from complexcarbontable"), Seq(Row("abc"), Row("abc"), Row("abc")))
checkAnswer(sql("select a.d from complexcarbontable"), Seq(Row(2), Row(2), Row(2)))
checkAnswer(sql("select a.e from complexcarbontable"), Seq(Row("efg"), Row("efg"), Row("efg")))
checkAnswer(sql("select a.f from complexcarbontable"),
Seq(Row(Row(3, "mno", 4)), Row(Row(3, "mno", 4)), Row(Row(3, "mno", 4))))
checkAnswer(sql("select a.f.g from complexcarbontable"), Seq(Row(3), Row(3), Row(3)))
checkAnswer(sql("select a.f.h from complexcarbontable"),
Seq(Row("mno"), Row("mno"), Row("mno")))
checkAnswer(sql("select a.f.i from complexcarbontable"), Seq(Row(4), Row(4), Row(4)))
checkAnswer(sql("select a.f.g,a.f.h,a.f.i from complexcarbontable"),
Seq(Row(3, "mno", 4), Row(3, "mno", 4), Row(3, "mno", 4)))
checkAnswer(sql("select a.b,a.f from complexcarbontable"),
Seq(Row(1, Row(3, "mno", 4)), Row(1, Row(3, "mno", 4)), Row(1, Row(3, "mno", 4))))
checkAnswer(sql("select a.c,a.f from complexcarbontable"),
Seq(Row("abc", Row(3, "mno", 4)), Row("abc", Row(3, "mno", 4)), Row("abc", Row(3, "mno", 4))))
checkAnswer(sql("select a.d,a.f from complexcarbontable"),
Seq(Row(2, Row(3, "mno", 4)), Row(2, Row(3, "mno", 4)), Row(2, Row(3, "mno", 4))))
checkAnswer(sql("select a.j from complexcarbontable"), Seq(Row(5), Row(5), Row(5)))
checkAnswer(sql("select * from complexcarbontable"),
Seq(Row(1, Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5)),
Row(2, Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5)),
Row(3, Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5))))
checkAnswer(sql("select *,a from complexcarbontable"),
Seq(Row(1,
Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5),
Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5)),
Row(2,
Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5),
Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5)),
Row(3,
Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5),
Row(1, "abc", 2, "efg", Row(3, "mno", 4), 5))))
}
// check create table with complex datatype columns and insert into table and apply filters
test("test Complex_DataType-006") {
sql("DROP TABLE IF EXISTS test")
sql("create table test(id int,a struct<b:int,c:int>) STORED AS carbondata")
sql("insert into test values(1, named_struct('b', 2, 'c', 3))")
sql("insert into test values(3, named_struct('b', 5, 'c', 3))")
sql("insert into test values(2, named_struct('b', 4, 'c', 5))")
checkAnswer(sql("select a.b from test where id=3"), Seq(Row(5)))
checkAnswer(sql("select a.b from test where a.c!=3"), Seq(Row(4)))
checkAnswer(sql("select a.b from test where a.c=3"), Seq(Row(5), Row(2)))
checkAnswer(sql("select a.b from test where id=1 or !a.c=3"), Seq(Row(4), Row(2)))
checkAnswer(sql("select a.b from test where id=3 or a.c=3"), Seq(Row(5), Row(2)))
}
// check create table with complex datatype columns and perform insertoverwrite
test("test Complex_DataType-007") {
sql("drop table if exists complexcarbontable")
sql("create table complexcarbontable(deviceInformationId int, channelsId string," +
"ROMSize string, purchasedate string, mobile struct<imei:string, imsi:string>," +
"MAC array<string>, locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, " +
"ActiveProvince:string, Activecity:string, ActiveDistrict:string, ActiveStreet:string>>," +
"proddate struct<productionDate:string,activeDeactivedate:array<string>>, gamePointId " +
"double,contractNumber double) " +
"STORED AS carbondata")
sql(
s"LOAD DATA local inpath '$filePath/complexdata.csv' INTO table " +
"complexcarbontable " +
"OPTIONS('DELIMITER'=',', 'QUOTECHAR'='\"', 'FILEHEADER'='deviceInformationId,channelsId," +
"ROMSize,purchasedate,mobile,MAC,locationinfo,proddate,gamePointId,contractNumber'," +
"'COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':')")
checkAnswer(sql("select count(*) from complexcarbontable"), Seq(Row(100)))
sql("DROP TABLE IF EXISTS test")
sql("create table test(deviceInformationId int, channelsId string," +
"ROMSize string, purchasedate string, mobile struct<imei:string, imsi:string>," +
"MAC array<string>, locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, " +
"ActiveProvince:string, Activecity:string, ActiveDistrict:string, ActiveStreet:string>>," +
"proddate struct<productionDate:string,activeDeactivedate:array<string>>, gamePointId " +
"double,contractNumber double) " +
"STORED AS carbondata")
sql("insert overwrite table test select * from complexcarbontable")
checkAnswer(sql("select count(*) from test"), Seq(Row(100)))
}
// check create complex table and insert null values
test("test Complex_DataType-008") {
sql("drop table if exists complexcarbontable")
sql(
"create table complexcarbontable(roll int, student struct<id:int,name:string," +
"marks:array<int>>) " +
"STORED AS carbondata")
sql("insert into complexcarbontable values(1, named_struct('id', 1, 'name', 'abc', 'marks', array(1,null,null)))")
checkAnswer(sql("select * from complexcarbontable"),
Seq(Row(1, Row(1, "abc", mutable.WrappedArray.make(Array(1, null, null))))))
}
//check create table with complex double and insert bigger value and check
test("test Complex_DataType-009") {
sql("Drop table if exists complexcarbontable")
sql(
"create table complexcarbontable(struct_dbl struct<double1:double,double2:double," +
"double3:double>) " +
"STORED AS carbondata")
sql("insert into complexcarbontable values(named_struct('double1', 10000000, 'double2', 300000, 'double3', 3000))")
checkExistence(sql("select * from complexcarbontable"), true, "1.0E7,300000.0,3000.0")
sql("Drop table if exists complexcarbontable")
sql(
"create table complexcarbontable(struct_arr struct<array_db1:array<double>>) " +
"STORED AS carbondata")
sql("insert into complexcarbontable values(named_struct('array_db1', array(5555555.9559,12345678991234567,3444.999)))")
checkExistence(sql("select * from complexcarbontable"),
true,
"5555555.9559, 1.2345678991234568E16, 3444.999")
}
// check create table with complex data type through SDK
test("test Complex_DataType-010") {
val mySchema =
""" {
| "name": "address",
| "type": "record",
| "fields": [
| {
| "name": "name",
| "type": "string"
| },
| {
| "name": "age",
| "type": "int"
| },
| {
| "name" :"my_address",
| "type" :{
| "name": "my_address",
| "type": "record",
| "fields": [
| {
| "name": "Temperaturetest",
| "type": "double"
| }
| ]
| }
| }
| ]
|} """.stripMargin
val jsonvalue =
"""{
|"name" :"abcde",
|"age" :34,
|"my_address" :{ "Temperaturetest" :100 }
|}
""".stripMargin
val pschema = org.apache.avro.Schema.parse(mySchema)
val records = jsonToAvro(jsonvalue, mySchema)
val writer = CarbonWriter.builder().outputPath(writerPath).withAvroInput(pschema)
.writtenBy("ComplexDataTypeTestCase").build()
writer.write(records)
writer.close()
sql("DROP TABLE IF EXISTS sdkOutputTable")
sql(
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED AS carbondata LOCATION
|'$writerPath' """.stripMargin)
checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("abcde", 34, Row(100.0))))
}
def jsonToAvro(json: String, avroSchema: String): GenericRecord = {
var input: InputStream = null
var writer: DataFileWriter[GenericRecord] = null
var output: ByteArrayOutputStream = null
try {
val schema = new org.apache.avro.Schema.Parser().parse(avroSchema)
val reader = new GenericDatumReader[GenericRecord](schema)
input = new ByteArrayInputStream(json.getBytes())
output = new ByteArrayOutputStream()
val din = new DataInputStream(input)
writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord]())
writer.create(schema, output)
val decoder = DecoderFactory.get().jsonDecoder(schema, din)
var datum: GenericRecord = reader.read(null, decoder)
return datum
} finally {
input.close()
writer.close()
}
}
}