| /* |
| |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| * |
| http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| package org.apache.carbondata.spark.testsuite.createTable.TestCreateDDLForComplexMapType |
| |
| import java.io.{BufferedWriter, File, FileWriter} |
| import java.util |
| |
| import au.com.bytecode.opencsv.CSVWriter |
| import org.apache.hadoop.conf.Configuration |
| import org.apache.spark.sql.{AnalysisException, Row} |
| import org.apache.spark.sql.test.util.QueryTest |
| import org.scalatest.BeforeAndAfterAll |
| |
| import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk |
| import scala.collection.JavaConversions._ |
| |
| class TestCreateDDLForComplexMapType extends QueryTest with BeforeAndAfterAll { |
| private val conf: Configuration = new Configuration(false) |
| |
| val rootPath = new File(this.getClass.getResource("/").getPath |
| + "../../../..").getCanonicalPath |
| |
| val path = s"$rootPath/integration/spark-common-test/src/test/resources/maptest2.csv" |
| |
| private def checkForLocalDictionary(dimensionRawColumnChunks: util |
| .List[DimensionRawColumnChunk]): Boolean = { |
| var isLocalDictionaryGenerated = false |
| import scala.collection.JavaConversions._ |
| isLocalDictionaryGenerated = dimensionRawColumnChunks |
| .filter(dimensionRawColumnChunk => dimensionRawColumnChunk.getDataChunkV3 |
| .isSetLocal_dictionary).size > 0 |
| isLocalDictionaryGenerated |
| } |
| |
| def createCSVFile(): Unit = { |
| val out = new BufferedWriter(new FileWriter(path)); |
| val writer = new CSVWriter(out); |
| |
| val employee1 = Array("1\u0002Nalla\u00012\u0002Singh\u00011\u0002Gupta\u00014\u0002Kumar") |
| |
| val employee2 = Array("10\u0002Nallaa\u000120\u0002Sissngh\u0001100\u0002Gusspta\u000140" + |
| "\u0002Kumar") |
| |
| var listOfRecords = List(employee1, employee2) |
| |
| writer.writeAll(listOfRecords) |
| out.close() |
| } |
| |
| override def beforeAll(): Unit = { |
| createCSVFile() |
| sql("DROP TABLE IF EXISTS carbon") |
| } |
| |
| override def afterAll(): Unit = { |
| new File(path).delete() |
| } |
| |
| test("Single Map One Level") { |
| sql("DROP TABLE IF EXISTS carbon") |
| sql( |
| s""" |
| | CREATE TABLE carbon( |
| | mapField map<STRING,STRING> |
| | ) |
| | STORED BY 'carbondata' |
| | """ |
| .stripMargin) |
| val desc = sql( |
| s""" |
| | Describe Formatted |
| | carbon |
| | """.stripMargin).collect() |
| assert(desc(0).get(1).asInstanceOf[String].trim.equals("map<string,string>")) |
| } |
| |
| test("Single Map with Two Nested Level") { |
| sql("DROP TABLE IF EXISTS carbon") |
| sql( |
| s""" |
| | CREATE TABLE carbon( |
| | mapField map<STRING,map<INT,STRING>> |
| | ) |
| | STORED BY |
| |'carbondata' |
| |""" |
| .stripMargin) |
| val desc = sql( |
| s""" |
| | Describe Formatted |
| | carbon |
| | """.stripMargin).collect() |
| assert(desc(0).get(1).asInstanceOf[String].trim.equals("map<string,map<int,string>>")) |
| } |
| |
| test("Map Type with array type as value") { |
| sql("DROP TABLE IF EXISTS carbon") |
| sql( |
| s""" |
| | CREATE TABLE carbon( |
| | mapField map<STRING,array<INT>> |
| | ) |
| | STORED BY 'carbondata' |
| | |
| """ |
| .stripMargin) |
| val desc = sql( |
| s""" |
| | Describe Formatted |
| | carbon |
| | """.stripMargin).collect() |
| assert(desc(0).get(1).asInstanceOf[String].trim.equals("map<string,array<int>>")) |
| } |
| |
| test("Map Type with struct type as value") { |
| sql("DROP TABLE IF EXISTS carbon") |
| sql( |
| s""" |
| | CREATE TABLE carbon( |
| | mapField map<STRING,struct<key:INT,val:INT>> |
| | ) |
| | STORED BY |
| | 'carbondata' |
| | """ |
| .stripMargin) |
| val desc = sql( |
| s""" |
| | Describe Formatted |
| | carbon |
| | """.stripMargin).collect() |
| assert(desc(0).get(1).asInstanceOf[String].trim |
| .equals("map<string,struct<key:int,val:int>>")) |
| } |
| |
| test("Map Type as child to struct type") { |
| sql("DROP TABLE IF EXISTS carbon") |
| sql( |
| s""" |
| | CREATE TABLE carbon( |
| | mapField struct<key:INT,val:map<INT,INT>> |
| | ) |
| | STORED BY |
| |'carbondata' """ |
| .stripMargin) |
| val desc = sql( |
| s""" |
| | Describe Formatted |
| | carbon |
| | """.stripMargin).collect() |
| assert(desc(0).get(1).asInstanceOf[String].trim |
| .equals("struct<key:int,val:map<int,int>>")) |
| } |
| |
| test("Map Type as child to array type") { |
| sql("DROP TABLE IF EXISTS carbon") |
| sql( |
| s""" |
| | CREATE TABLE carbon( |
| | mapField array<map<INT,INT>> |
| | ) |
| | STORED BY 'carbondata' |
| | """ |
| .stripMargin) |
| val desc = sql( |
| s""" |
| | Describe Formatted |
| | carbon """.stripMargin).collect() |
| assert(desc(0).get(1).asInstanceOf[String].trim.equals("array<map<int,int>>")) |
| sql("insert into carbon values(array(map(1,2,2,3), map(100,200,200,300)))") |
| sql("select * from carbon").show(false) |
| } |
| |
| test("Test Load data in map") { |
| sql("DROP TABLE IF EXISTS carbon") |
| sql( |
| s""" |
| | CREATE TABLE carbon( |
| | mapField map<INT,STRING> |
| | ) |
| | STORED BY 'carbondata' |
| | """ |
| .stripMargin) |
| val desc = sql( |
| s""" |
| | Describe Formatted |
| | carbon |
| | """.stripMargin).collect() |
| sql("insert into carbon values(map(1,'Nalla',2,'Singh',3,'Gupta',4,'Kumar'))") |
| checkAnswer(sql("select * from carbon"), Seq( |
| Row(Map(1 -> "Nalla", 2 -> "Singh", 3 -> "Gupta", 4 -> "Kumar")))) |
| } |
| |
| test("Test Load data in map with empty value") { |
| sql("DROP TABLE IF EXISTS carbon") |
| sql( |
| s""" |
| | CREATE TABLE carbon( |
| | mapField map<INT,STRING> |
| | ) |
| | STORED BY 'carbondata' |
| | """ |
| .stripMargin) |
| val desc = sql( |
| s""" |
| | Describe Formatted |
| | carbon |
| | """.stripMargin).collect() |
| sql("insert into carbon values(map(1,'Nalla',2,'',3,'Gupta',4,'Kumar'))") |
| checkAnswer(sql("select * from carbon"), Seq( |
| Row(Map(1 -> "Nalla", 2 -> "", 3 -> "Gupta", 4 -> "Kumar")))) |
| } |
| |
| // Global Dictionary for Map type |
| test("Test Load data in map with dictionary include") { |
| sql("DROP TABLE IF EXISTS carbon") |
| sql( |
| s""" |
| | CREATE TABLE carbon( |
| | mapField map<STRING,STRING> |
| | ) |
| | STORED BY 'carbondata' |
| | TBLPROPERTIES('DICTIONARY_INCLUDE'='mapField') |
| | """ |
| .stripMargin) |
| sql("insert into carbon values(map('vi','Nalla','sh','Singh','al','Gupta'))") |
| sql("select * from carbon").show(false) |
| checkAnswer(sql("select * from carbon"), Seq( |
| Row(Map("vi" -> "Nalla", "sh" -> "Singh", "al" -> "Gupta")))) |
| } |
| |
| test("Test Load data in map with partition columns") { |
| sql("DROP TABLE IF EXISTS carbon") |
| val exception = intercept[AnalysisException]( |
| sql( |
| s""" |
| | CREATE TABLE carbon( |
| | a INT, |
| | mapField array<STRING>, |
| | b STRING |
| | ) |
| | PARTITIONED BY (mp map<int,string>) |
| | STORED BY 'carbondata' |
| | """ |
| .stripMargin) |
| ) |
| assertResult("Cannot use map<int,string> for partition column;")(exception.getMessage()) |
| } |
| |
| test("Test IUD in map columns") { |
| sql("DROP TABLE IF EXISTS carbon") |
| sql( |
| s""" |
| | CREATE TABLE carbon( |
| | a INT, |
| | mapField map<INT,STRING> |
| | ) |
| | STORED BY 'carbondata' |
| | """ |
| .stripMargin) |
| sql("insert into carbon values(1, map(1,'Nalla',2,'Singh',3,'Gupta',4,'Kumar'))") |
| sql("insert into carbon values(2, map(1,'abc',2,'xyz',3,'hello',4,'mno'))") |
| val exception = intercept[UnsupportedOperationException]( |
| sql("update carbon set(mapField)=('1,haha') where a=1").show(false)) |
| assertResult("Unsupported operation on Complex data type")(exception.getMessage()) |
| sql("delete from carbon where mapField[1]='abc'") |
| checkAnswer(sql("select * from carbon"), Seq( |
| Row(1, Map(1 -> "Nalla", 2 -> "Singh", 3 -> "Gupta", 4 -> "Kumar")))) |
| |
| } |
| |
| test("Test Load duplicate keys data in map") { |
| sql("DROP TABLE IF EXISTS carbon") |
| sql( |
| s""" |
| | CREATE TABLE carbon( |
| | mapField map<INT,STRING> |
| | ) |
| | STORED BY 'carbondata' |
| | """ |
| .stripMargin) |
| val desc = sql( |
| s""" |
| | Describe Formatted |
| | carbon |
| | """.stripMargin).collect() |
| sql("insert into carbon values(map(1,'Nalla',2,'Singh',1,'Gupta',4,'Kumar'))") |
| checkAnswer(sql("select * from carbon"), Seq( |
| Row(Map(1 -> "Gupta", 2 -> "Singh", 4 -> "Kumar")))) |
| } |
| |
| test("Test Load data in map of map") { |
| sql("DROP TABLE IF EXISTS carbon") |
| sql( |
| s""" |
| | CREATE TABLE carbon( |
| | mapField map<STRING,map<INT,STRING>> |
| | ) |
| | STORED BY |
| |'carbondata' """ |
| .stripMargin) |
| sql("insert into carbon values(map('manish', map(1,'nalla',2,'gupta'), 'kunal', map(1, 'kapoor', 2, 'sharma')))") |
| checkAnswer(sql("select * from carbon"), Seq( |
| Row(Map("manish" -> Map(1 -> "nalla", 2 -> "gupta"), |
| "kunal" -> Map(1 -> "kapoor", 2 -> "sharma"))))) |
| } |
| |
| test("Test Load duplicate keys data in map of map") { |
| sql("DROP TABLE IF EXISTS carbon") |
| sql( |
| s""" |
| | CREATE TABLE carbon( |
| | mapField map<STRING,map<INT,STRING>> |
| | ) |
| | STORED BY |
| |'carbondata' |
| |""" |
| .stripMargin) |
| sql("insert into carbon values(map('manish', map(1,'nalla',1,'gupta'), 'kunal', map(1, 'kapoor', 2, 'sharma')))") |
| checkAnswer(sql("select * from carbon"), Seq( |
| Row(Map("manish" -> Map(1 -> "gupta"), |
| "kunal" -> Map(1 -> "kapoor", 2 -> "sharma"))))) |
| } |
| |
| test("Test Create table as select with map") { |
| sql("DROP TABLE IF EXISTS carbon") |
| sql("DROP TABLE IF EXISTS carbon1") |
| sql( |
| s""" |
| | CREATE TABLE carbon( |
| | mapField map<INT,STRING> |
| | ) |
| | STORED BY 'carbondata' |
| | """ |
| .stripMargin) |
| sql("insert into carbon values(map(1,'Nalla',2,'Singh',3,'Gupta',4,'Kumar'))") |
| sql( |
| s""" |
| | CREATE TABLE carbon1 |
| | AS |
| | Select * |
| | From carbon |
| | """ |
| .stripMargin) |
| checkAnswer(sql("select * from carbon1"), Seq( |
| Row(Map(1 -> "Nalla", 2 -> "Singh", 3 -> "Gupta", 4 -> "Kumar")))) |
| } |
| |
| test("Test Create table with double datatype in map") { |
| sql("DROP TABLE IF EXISTS carbon") |
| sql( |
| s""" |
| | CREATE TABLE carbon( |
| | mapField map<DOUBLE,STRING> |
| | ) |
| | STORED BY 'carbondata' |
| | """ |
| .stripMargin) |
| sql("insert into carbon values(map(1.23,'Nalla',2.34,'Singh',3.67676,'Gupta',3.67676,'Kumar'))") |
| checkAnswer(sql("select * from carbon"), Seq( |
| Row(Map(1.23 -> "Nalla", 2.34 -> "Singh", 3.67676 -> "Kumar")))) |
| } |
| |
| test("Load Map data from CSV File") { |
| sql("DROP TABLE IF EXISTS carbon") |
| sql( |
| s""" |
| | CREATE TABLE carbon( |
| | mapField map<INT,STRING> |
| | ) |
| | STORED BY 'carbondata' |
| | """ |
| .stripMargin) |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$path' |
| | INTO TABLE carbon OPTIONS( |
| | 'header' = 'false') |
| """.stripMargin) |
| checkAnswer(sql("select * from carbon"), Seq( |
| Row(Map(1 -> "Gupta", 2 -> "Singh", 4 -> "Kumar")), |
| Row(Map(10 -> "Nallaa", 20 -> "Sissngh", 100 -> "Gusspta", 40 -> "Kumar")) |
| )) |
| } |
| |
| test("test compaction with map data type") { |
| sql("DROP TABLE IF EXISTS carbon") |
| sql( |
| s""" |
| | CREATE TABLE carbon( |
| | mapField map<INT,STRING> |
| | ) |
| | STORED BY 'carbondata' |
| | """ |
| .stripMargin) |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$path' |
| | INTO TABLE carbon OPTIONS( |
| | 'header' = 'false') |
| """.stripMargin) |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$path' |
| | INTO TABLE carbon OPTIONS( |
| | 'header' = 'false') |
| """.stripMargin) |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$path' |
| | INTO TABLE carbon OPTIONS( |
| | 'header' = 'false') |
| """.stripMargin) |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$path' |
| | INTO TABLE carbon OPTIONS( |
| | 'header' = 'false') |
| """.stripMargin) |
| sql("alter table carbon compact 'minor'") |
| sql("show segments for table carbon").show(false) |
| checkAnswer(sql("select * from carbon"), Seq( |
| Row(Map(1 -> "Gupta", 2 -> "Singh", 4 -> "Kumar")), |
| Row(Map(10 -> "Nallaa", 20 -> "Sissngh", 100 -> "Gusspta", 40 -> "Kumar")), |
| Row(Map(1 -> "Gupta", 2 -> "Singh", 4 -> "Kumar")), |
| Row(Map(10 -> "Nallaa", 20 -> "Sissngh", 100 -> "Gusspta", 40 -> "Kumar")), |
| Row(Map(1 -> "Gupta", 2 -> "Singh", 4 -> "Kumar")), |
| Row(Map(10 -> "Nallaa", 20 -> "Sissngh", 100 -> "Gusspta", 40 -> "Kumar")), |
| Row(Map(1 -> "Gupta", 2 -> "Singh", 4 -> "Kumar")), |
| Row(Map(10 -> "Nallaa", 20 -> "Sissngh", 100 -> "Gusspta", 40 -> "Kumar")) |
| )) |
| sql("DROP TABLE IF EXISTS carbon") |
| } |
| |
| test("Sort Column table property blocking for Map type") { |
| sql("DROP TABLE IF EXISTS carbon") |
| val exception1 = intercept[Exception] { |
| sql( |
| s""" |
| | CREATE TABLE carbon( |
| | mapField map<STRING,STRING> |
| | ) |
| | STORED BY 'carbondata' |
| | TBLPROPERTIES('SORT_COLUMNS'='mapField') |
| | """ |
| .stripMargin) |
| } |
| assert(exception1.getMessage |
| .contains( |
| "sort_columns is unsupported for map datatype column: mapField")) |
| } |
| |
| test("Data Load Fail Issue") { |
| sql("DROP TABLE IF EXISTS carbon") |
| sql( |
| s""" |
| | CREATE TABLE carbon( |
| | mapField map<INT,STRING> |
| | ) |
| | STORED BY 'carbondata' |
| | """ |
| .stripMargin) |
| sql( |
| s""" |
| | LOAD DATA LOCAL INPATH '$path' |
| | INTO TABLE carbon OPTIONS( |
| | 'header' = 'false') |
| """.stripMargin) |
| sql("INSERT INTO carbon SELECT * FROM carbon") |
| checkAnswer(sql("select * from carbon"), Seq( |
| Row(Map(1 -> "Gupta", 2 -> "Singh", 4 -> "Kumar")), |
| Row(Map(1 -> "Gupta", 2 -> "Singh", 4 -> "Kumar")), |
| Row(Map(10 -> "Nallaa", 20 -> "Sissngh", 100 -> "Gusspta", 40 -> "Kumar")), |
| Row(Map(10 -> "Nallaa", 20 -> "Sissngh", 100 -> "Gusspta", 40 -> "Kumar")) |
| )) |
| } |
| |
| test("Struct inside map") { |
| sql("DROP TABLE IF EXISTS carbon") |
| sql( |
| s""" |
| | CREATE TABLE carbon( |
| | mapField map<INT,struct<kk:STRING,mm:STRING>> |
| | ) |
| | STORED BY 'carbondata' |
| | """ |
| .stripMargin) |
| sql("INSERT INTO carbon values(map(1, named_struct('kk', 'man', 'mm', 'nan'), 2, named_struct('kk', 'kands', 'mm', 'dsnknd')))") |
| sql("INSERT INTO carbon SELECT * FROM carbon") |
| checkAnswer(sql("SELECT * FROM carbon limit 1"), |
| Seq(Row(Map(1 -> Row("man", "nan"), (2 -> Row("kands", "dsnknd")))))) |
| } |
| |
| test("Struct inside map pushdown") { |
| sql("DROP TABLE IF EXISTS carbon") |
| sql( |
| s""" |
| | CREATE TABLE carbon( |
| | mapField map<INT,struct<kk:STRING,mm:STRING>> |
| | ) |
| | STORED BY 'carbondata' |
| | """ |
| .stripMargin) |
| sql("INSERT INTO carbon values(map(1, named_struct('kk', 'man', 'mm', 'nan'), 2, named_struct('kk', 'kands', 'mm', 'dsnknd')))") |
| checkAnswer(sql("SELECT mapField[1].kk FROM carbon"), Row("man")) |
| } |
| |
| test("Map inside struct") { |
| sql("DROP TABLE IF EXISTS carbon") |
| sql( |
| s""" |
| | CREATE TABLE carbon( |
| | structField struct<intVal:INT,map1:MAP<STRING,STRING>> |
| | ) |
| | STORED BY 'carbondata' |
| | """ |
| .stripMargin) |
| sql("INSERT INTO carbon values(named_struct('intVal', 1, 'map1', map('man','nan','kands','dsnknd')))") |
| val res = sql("SELECT structField.intVal FROM carbon").show(false) |
| checkAnswer(sql("SELECT structField.intVal FROM carbon"), Seq(Row(1))) |
| } |
| |
| } |