blob: 941364c51d086780ca72c9ea605733bc0438ab86 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
*
http://www.apache.org/licenses/LICENSE-2.0
*
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.apache.carbondata.spark.testsuite.createTable.TestCreateDDLForComplexMapType
import java.io.{BufferedWriter, File, FileWriter}
import java.util
import au.com.bytecode.opencsv.CSVWriter
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk
import scala.collection.JavaConversions._
class TestCreateDDLForComplexMapType extends QueryTest with BeforeAndAfterAll {
private val conf: Configuration = new Configuration(false)
val rootPath = new File(this.getClass.getResource("/").getPath
+ "../../../..").getCanonicalPath
val path = s"$rootPath/integration/spark-common-test/src/test/resources/maptest2.csv"
private def checkForLocalDictionary(dimensionRawColumnChunks: util
.List[DimensionRawColumnChunk]): Boolean = {
var isLocalDictionaryGenerated = false
import scala.collection.JavaConversions._
isLocalDictionaryGenerated = dimensionRawColumnChunks
.filter(dimensionRawColumnChunk => dimensionRawColumnChunk.getDataChunkV3
.isSetLocal_dictionary).size > 0
isLocalDictionaryGenerated
}
def createCSVFile(): Unit = {
val out = new BufferedWriter(new FileWriter(path));
val writer = new CSVWriter(out);
val employee1 = Array("1\u0002Nalla\u00012\u0002Singh\u00011\u0002Gupta\u00014\u0002Kumar")
val employee2 = Array("10\u0002Nallaa\u000120\u0002Sissngh\u0001100\u0002Gusspta\u000140" +
"\u0002Kumar")
var listOfRecords = List(employee1, employee2)
writer.writeAll(listOfRecords)
out.close()
}
override def beforeAll(): Unit = {
createCSVFile()
sql("DROP TABLE IF EXISTS carbon")
}
override def afterAll(): Unit = {
new File(path).delete()
}
test("Single Map One Level") {
sql("DROP TABLE IF EXISTS carbon")
sql(
s"""
| CREATE TABLE carbon(
| mapField map<STRING,STRING>
| )
| STORED BY 'carbondata'
| """
.stripMargin)
val desc = sql(
s"""
| Describe Formatted
| carbon
| """.stripMargin).collect()
assert(desc(0).get(1).asInstanceOf[String].trim.equals("map<string,string>"))
}
test("Single Map with Two Nested Level") {
sql("DROP TABLE IF EXISTS carbon")
sql(
s"""
| CREATE TABLE carbon(
| mapField map<STRING,map<INT,STRING>>
| )
| STORED BY
|'carbondata'
|"""
.stripMargin)
val desc = sql(
s"""
| Describe Formatted
| carbon
| """.stripMargin).collect()
assert(desc(0).get(1).asInstanceOf[String].trim.equals("map<string,map<int,string>>"))
}
test("Map Type with array type as value") {
sql("DROP TABLE IF EXISTS carbon")
sql(
s"""
| CREATE TABLE carbon(
| mapField map<STRING,array<INT>>
| )
| STORED BY 'carbondata'
|
"""
.stripMargin)
val desc = sql(
s"""
| Describe Formatted
| carbon
| """.stripMargin).collect()
assert(desc(0).get(1).asInstanceOf[String].trim.equals("map<string,array<int>>"))
}
test("Map Type with struct type as value") {
sql("DROP TABLE IF EXISTS carbon")
sql(
s"""
| CREATE TABLE carbon(
| mapField map<STRING,struct<key:INT,val:INT>>
| )
| STORED BY
| 'carbondata'
| """
.stripMargin)
val desc = sql(
s"""
| Describe Formatted
| carbon
| """.stripMargin).collect()
assert(desc(0).get(1).asInstanceOf[String].trim
.equals("map<string,struct<key:int,val:int>>"))
}
test("Map Type as child to struct type") {
sql("DROP TABLE IF EXISTS carbon")
sql(
s"""
| CREATE TABLE carbon(
| mapField struct<key:INT,val:map<INT,INT>>
| )
| STORED BY
|'carbondata' """
.stripMargin)
val desc = sql(
s"""
| Describe Formatted
| carbon
| """.stripMargin).collect()
assert(desc(0).get(1).asInstanceOf[String].trim
.equals("struct<key:int,val:map<int,int>>"))
}
test("Map Type as child to array type") {
sql("DROP TABLE IF EXISTS carbon")
sql(
s"""
| CREATE TABLE carbon(
| mapField array<map<INT,INT>>
| )
| STORED BY 'carbondata'
| """
.stripMargin)
val desc = sql(
s"""
| Describe Formatted
| carbon """.stripMargin).collect()
assert(desc(0).get(1).asInstanceOf[String].trim.equals("array<map<int,int>>"))
sql("insert into carbon values('1\0032\0022\0033\001100\003200\002200\003300')")
sql("select * from carbon").show(false)
}
test("Test Load data in map") {
sql("DROP TABLE IF EXISTS carbon")
sql(
s"""
| CREATE TABLE carbon(
| mapField map<INT,STRING>
| )
| STORED BY 'carbondata'
| """
.stripMargin)
val desc = sql(
s"""
| Describe Formatted
| carbon
| """.stripMargin).collect()
sql("insert into carbon values('1\002Nalla\0012\002Singh\0013\002Gupta\0014\002Kumar')")
checkAnswer(sql("select * from carbon"), Seq(
Row(Map(1 -> "Nalla", 2 -> "Singh", 3 -> "Gupta", 4 -> "Kumar"))))
}
test("Test Load data in map with empty value") {
sql("DROP TABLE IF EXISTS carbon")
sql(
s"""
| CREATE TABLE carbon(
| mapField map<INT,STRING>
| )
| STORED BY 'carbondata'
| """
.stripMargin)
val desc = sql(
s"""
| Describe Formatted
| carbon
| """.stripMargin).collect()
sql("insert into carbon values('1\002Nalla\0012\002\0013\002Gupta\0014\002Kumar')")
checkAnswer(sql("select * from carbon"), Seq(
Row(Map(1 -> "Nalla", 2 -> "", 3 -> "Gupta", 4 -> "Kumar"))))
}
// Support this for Map type
test("Test Load data in map with dictionary include") {
sql("DROP TABLE IF EXISTS carbon")
sql(
s"""
| CREATE TABLE carbon(
| mapField map<int,STRING>
| )
| STORED BY 'carbondata'
| TBLPROPERTIES('DICTIONARY_INCLUDE'='mapField')
| """
.stripMargin)
sql("insert into carbon values('1\002Nalla\0012\002Singh\0013\002Gupta')")
sql("select * from carbon").show(false)
//checkAnswer(sql("select * from carbon"), Seq(
//Row(Map(1 -> "Nalla", 2 -> "Singh", 3 -> "Gupta", 4 -> "Kumar"))))
}
test("Test Load data in map with partition columns") {
sql("DROP TABLE IF EXISTS carbon")
val exception = intercept[AnalysisException](
sql(
s"""
| CREATE TABLE carbon(
| a INT,
| mapField array<STRING>,
| b STRING
| )
| PARTITIONED BY (mp map<int,string>)
| STORED BY 'carbondata'
| """
.stripMargin)
)
assertResult("Cannot use map<int,string> for partition column;")(exception.getMessage())
}
test("Test IUD in map columns") {
sql("DROP TABLE IF EXISTS carbon")
sql(
s"""
| CREATE TABLE carbon(
| a INT,
| mapField map<INT,STRING>
| )
| STORED BY 'carbondata'
| """
.stripMargin)
sql("insert into carbon values(1,'1\002Nalla\0012\002Singh\0013\002Gupta\0014\002Kumar')")
sql("insert into carbon values(2,'1\002abc\0012\002xyz\0013\002hello\0014\002mno')")
val exception = intercept[UnsupportedOperationException](
sql("update carbon set(mapField)=('1,haha') where a=1").show(false))
assertResult("Unsupported operation on Complex data type")(exception.getMessage())
sql("delete from carbon where mapField[1]='abc'")
checkAnswer(sql("select * from carbon"), Seq(
Row(1, Map(1 -> "Nalla", 2 -> "Singh", 3 -> "Gupta", 4 -> "Kumar"))))
}
test("Test Compaction blocking") {
sql("DROP TABLE IF EXISTS carbon")
sql(
s"""
| CREATE TABLE carbon(
| a INT,
| mapField map<INT,STRING>
| )
| STORED BY 'carbondata'
| """
.stripMargin)
val exception = intercept[UnsupportedOperationException](
sql("ALTER table carbon compact 'minor'")
)
assertResult("Compaction is unsupported for Table containing Map Columns")(exception
.getMessage())
}
test("Test Load duplicate keys data in map") {
sql("DROP TABLE IF EXISTS carbon")
sql(
s"""
| CREATE TABLE carbon(
| mapField map<INT,STRING>
| )
| STORED BY 'carbondata'
| """
.stripMargin)
val desc = sql(
s"""
| Describe Formatted
| carbon
| """.stripMargin).collect()
sql("insert into carbon values('1\002Nalla\0012\002Singh\0011\002Gupta\0014\002Kumar')")
checkAnswer(sql("select * from carbon"), Seq(
Row(Map(1 -> "Nalla", 2 -> "Singh", 4 -> "Kumar"))))
}
test("Test Load data in map of map") {
sql("DROP TABLE IF EXISTS carbon")
sql(
s"""
| CREATE TABLE carbon(
| mapField map<STRING,map<INT,STRING>>
| )
| STORED BY
|'carbondata' """
.stripMargin)
sql(
"insert into carbon values('manish\0021\004nalla\0032\004gupta\001kunal\0021\004kapoor\0032" +
"\004sharma')")
checkAnswer(sql("select * from carbon"), Seq(
Row(Map("manish" -> Map(1 -> "nalla", 2 -> "gupta"),
"kunal" -> Map(1 -> "kapoor", 2 -> "sharma")))))
}
test("Test Load duplicate keys data in map of map") {
sql("DROP TABLE IF EXISTS carbon")
sql(
s"""
| CREATE TABLE carbon(
| mapField map<STRING,map<INT,STRING>>
| )
| STORED BY
|'carbondata'
|"""
.stripMargin)
sql(
"insert into carbon values('manish\0021\004nalla\0031\004gupta\001kunal\0021\004kapoor\0032" +
"\004sharma')")
checkAnswer(sql("select * from carbon"), Seq(
Row(Map("manish" -> Map(1 -> "nalla"),
"kunal" -> Map(1 -> "kapoor", 2 -> "sharma")))))
}
test("Test Create table as select with map") {
sql("DROP TABLE IF EXISTS carbon")
sql("DROP TABLE IF EXISTS carbon1")
sql(
s"""
| CREATE TABLE carbon(
| mapField map<INT,STRING>
| )
| STORED BY 'carbondata'
| """
.stripMargin)
sql("insert into carbon values('1\002Nalla\0012\002Singh\0013\002Gupta\0014\002Kumar')")
sql(
s"""
| CREATE TABLE carbon1
| AS
| Select *
| From carbon
| """
.stripMargin)
checkAnswer(sql("select * from carbon1"), Seq(
Row(Map(1 -> "Nalla", 2 -> "Singh", 3 -> "Gupta", 4 -> "Kumar"))))
}
test("Test Create table with double datatype in map") {
sql("DROP TABLE IF EXISTS carbon")
sql(
s"""
| CREATE TABLE carbon(
| mapField map<DOUBLE,STRING>
| )
| STORED BY 'carbondata'
| """
.stripMargin)
sql(
"insert into carbon values('1.23\002Nalla\0012.34\002Singh\0013.67676\002Gupta\0013.67676" +
"\002Kumar')")
checkAnswer(sql("select * from carbon"), Seq(
Row(Map(1.23 -> "Nalla", 2.34 -> "Singh", 3.67676 -> "Gupta"))))
}
test("Load Map data from CSV File") {
sql("DROP TABLE IF EXISTS carbon")
sql(
s"""
| CREATE TABLE carbon(
| mapField map<INT,STRING>
| )
| STORED BY 'carbondata'
| """
.stripMargin)
sql(
s"""
| LOAD DATA LOCAL INPATH '$path'
| INTO TABLE carbon OPTIONS(
| 'header' = 'false')
""".stripMargin)
checkAnswer(sql("select * from carbon"), Seq(
Row(Map(1 -> "Nalla", 2 -> "Singh", 4 -> "Kumar")),
Row(Map(10 -> "Nallaa", 20 -> "Sissngh", 100 -> "Gusspta", 40 -> "Kumar"))
))
}
test("Sort Column table property blocking for Map type") {
sql("DROP TABLE IF EXISTS carbon")
val exception1 = intercept[Exception] {
sql(
s"""
| CREATE TABLE carbon(
| mapField map<STRING,STRING>
| )
| STORED BY 'carbondata'
| TBLPROPERTIES('SORT_COLUMNS'='mapField')
| """
.stripMargin)
}
assert(exception1.getMessage
.contains(
"sort_columns is unsupported for map datatype column: mapfield"))
}
}