| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.carbondata.spark.testsuite.standardpartition |
| |
| import java.util |
| import java.util.concurrent.{Callable, ExecutorService, Executors} |
| |
| import org.apache.spark.sql.Row |
| import org.apache.spark.sql.test.util.QueryTest |
| import org.apache.spark.util.SparkUtil |
| import org.scalatest.BeforeAndAfterAll |
| |
| import org.apache.carbondata.common.constants.LoggerAction |
| import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException |
| import org.apache.carbondata.core.constants.CarbonCommonConstants |
| import org.apache.carbondata.core.util.CarbonProperties |
| |
| class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterAll { |
| var executorService: ExecutorService = _ |
| override def beforeAll { |
| dropTable |
| CarbonProperties.getInstance() |
| .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") |
| sql( |
| """ |
| | CREATE TABLE originTable (empno int, empname String, designation String, doj Timestamp, |
| | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, |
| | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, |
| | utilization int,salary int) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| |
| sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") |
| } |
| |
| test("data loading for global sort partition table for one partition column") { |
| sql( |
| """ |
| | CREATE TABLE partitionone (empname String, designation String, doj Timestamp, |
| | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, |
| | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, |
| | utilization int,salary int) |
| | PARTITIONED BY (empno int) |
| | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') |
| """.stripMargin) |
| sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'GLOBAL_SORT_PARTITIONS'='1')""") |
| |
| checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionone order by empno"), |
| sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno")) |
| |
| } |
| |
| test("data loading for global partition table for two partition column") { |
| sql( |
| """ |
| | CREATE TABLE partitiontwo (empno int, designation String, |
| | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, |
| | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, |
| | utilization int,salary int) |
| | PARTITIONED BY (doj Timestamp, empname String) |
| | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') |
| """.stripMargin) |
| sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiontwo OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") |
| |
| checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitiontwo order by empno"), |
| sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno")) |
| |
| } |
| |
| test("data loading for global sort partition table for one static partition column") { |
| sql( |
| """ |
| | CREATE TABLE staticpartitionone (empname String, designation String, doj Timestamp, |
| | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, |
| | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, |
| | utilization int,salary int) |
| | PARTITIONED BY (empno int) |
| | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') |
| """.stripMargin) |
| sql(s"""insert into staticpartitionone PARTITION(empno='1') select empname,designation,doj,workgroupcategory,workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate,attendance,utilization,salary from originTable""") |
| |
| } |
| |
| test("single pass loading for global sort partition table for one partition column") { |
| sql( |
| """ |
| | CREATE TABLE singlepasspartitionone (empname String, doj Timestamp, |
| | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, |
| | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, |
| | utilization int,salary int) |
| | PARTITIONED BY (designation String) |
| | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') |
| """.stripMargin) |
| sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE singlepasspartitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'SINGLE_PASS'='true')""") |
| |
| } |
| |
| test("data loading for global sort partition table for one static partition column with load syntax") { |
| sql( |
| """ |
| | CREATE TABLE loadstaticpartitionone (empname String, designation String, doj Timestamp, |
| | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, |
| | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, |
| | utilization int,salary int) |
| | PARTITIONED BY (empno int) |
| | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') |
| """.stripMargin) |
| |
| sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitionone PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") |
| |
| checkAnswer(sql("select distinct empno from loadstaticpartitionone"), Seq(Row(1))) |
| } |
| |
| test("overwrite global sort partition table for one static partition column with load syntax") { |
| sql( |
| """ |
| | CREATE TABLE loadstaticpartitiononeoverwrite (empname String, designation String, doj Timestamp, |
| | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, |
| | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, |
| | utilization int,salary int) |
| | PARTITIONED BY (empno int) |
| | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') |
| """.stripMargin) |
| |
| sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitiononeoverwrite PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") |
| val rows = sql("select count(*) from loadstaticpartitiononeoverwrite").collect() |
| |
| sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitiononeoverwrite PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") |
| sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitiononeoverwrite PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") |
| |
| sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE loadstaticpartitiononeoverwrite PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") |
| |
| checkAnswer(sql("select count(*) from loadstaticpartitiononeoverwrite"), rows) |
| } |
| |
| test("test global sort partition column with special characters") { |
| sql( |
| """ |
| | CREATE TABLE loadpartitionwithspecialchar (empno int, designation String, doj Timestamp, |
| | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, |
| | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, |
| | utilization int,salary int) |
| | PARTITIONED BY (empname String) |
| | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') |
| """.stripMargin) |
| |
| sql(s"""LOAD DATA local inpath '$resourcesPath/data_with_special_char.csv' INTO TABLE loadpartitionwithspecialchar OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") |
| |
| checkAnswer(sql("select count(*) from loadpartitionwithspecialchar"), Seq(Row(10))) |
| checkAnswer(sql("select count(*) from loadpartitionwithspecialchar where empname='sibi=56'"), Seq(Row(1))) |
| checkAnswer(sql("select count(*) from loadpartitionwithspecialchar where empname='arvind,ss'"), Seq(Row(1))) |
| } |
| |
| ignore("concurrent global sort partition table load test") { |
| executorService = Executors.newCachedThreadPool() |
| sql( |
| """ |
| | CREATE TABLE partitionmultiplethreeconcurrent (empno int, doj Timestamp, |
| | workgroupcategoryname String, deptno int, deptname String, |
| | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, |
| | utilization int,salary int,workgroupcategory int,designation String) |
| | PARTITIONED BY (empname String) |
| | STORED BY 'org.apache.carbondata.format' |
| | TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') |
| """.stripMargin) |
| |
| val tasks = new util.ArrayList[Callable[String]]() |
| var i = 0 |
| val count = 5 |
| while (i < count) { |
| tasks.add(new QueryTask(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmultiplethreeconcurrent partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")) |
| i = i + 1 |
| } |
| val results = executorService.invokeAll(tasks) |
| for (i <- 0 until tasks.size()) { |
| val res = results.get(i).get |
| assert("PASS".equals(res)) |
| } |
| executorService.shutdown() |
| checkAnswer(sql("select count(*) from partitionmultiplethreeconcurrent"), Seq(Row(10 * count))) |
| assert(sql("show segments for table partitionmultiplethreeconcurrent").count() == count) |
| } |
| |
| class QueryTask(query: String) extends Callable[String] { |
| override def call(): String = { |
| var result = "PASS" |
| try { |
| LOGGER.info("Executing :" + Thread.currentThread().getName) |
| sql(query) |
| } catch { |
| case ex: Exception => |
| ex.printStackTrace() |
| result = "FAIL" |
| } |
| result |
| } |
| } |
| |
| test("global sort bad record test with null values") { |
| sql(s"""CREATE TABLE IF NOT EXISTS emp1 (emp_no int,ename string,job string,mgr_id int,date_of_joining string,salary int,bonus int) partitioned by (dept_no int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')""") |
| sql(s"""LOAD DATA INPATH '$resourcesPath/emp.csv' overwrite INTO TABLE emp1 OPTIONS('DELIMITER'=',', 'QUOTECHAR'= '\')""") |
| val rows = sql(s"select count(*) from emp1").collect() |
| sql(s"""LOAD DATA INPATH '$resourcesPath/emp.csv' overwrite INTO TABLE emp1 OPTIONS('DELIMITER'=',', 'QUOTECHAR'= '\','BAD_RECORDS_ACTION'='FORCE')""") |
| checkAnswer(sql(s"select count(*) from emp1"), rows) |
| } |
| |
| test("global sort badrecords on partition column") { |
| sql("create table badrecordsPartition(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')") |
| sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartition options('bad_records_action'='force')") |
| sql("select count(*) from badrecordsPartition").show() |
| checkAnswer(sql("select count(*) cnt from badrecordsPartition where intfield2 is null"), Seq(Row(9))) |
| checkAnswer(sql("select count(*) cnt from badrecordsPartition where intfield2 is not null"), Seq(Row(2))) |
| } |
| |
| test("global sort badrecords fail on partition column") { |
| sql("create table badrecordsPartitionfail(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')") |
| intercept[Exception] { |
| sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartitionfail options('bad_records_action'='fail')") |
| |
| } |
| } |
| |
| test("global sort badrecords ignore on partition column") { |
| sql("create table badrecordsPartitionignore(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')") |
| sql("create table badrecordsignore(intField1 int,intField2 int, stringField1 string) stored by 'carbondata' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')") |
| sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartitionignore options('bad_records_action'='ignore')") |
| sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsignore options('bad_records_action'='ignore')") |
| checkAnswer(sql("select count(*) cnt from badrecordsPartitionignore where intfield2 is null"), sql("select count(*) cnt from badrecordsignore where intfield2 is null")) |
| checkAnswer(sql("select count(*) cnt from badrecordsPartitionignore where intfield2 is not null"), sql("select count(*) cnt from badrecordsignore where intfield2 is not null")) |
| } |
| |
| |
| test("global sort test partition fails on int null partition") { |
| sql("create table badrecordsPartitionintnull(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')") |
| sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartitionintnull options('bad_records_action'='force')") |
| checkAnswer(sql("select count(*) cnt from badrecordsPartitionintnull where intfield2 = 13"), Seq(Row(1))) |
| } |
| |
| test("global sort test partition fails on int null partition read alternate") { |
| CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT, "false") |
| sql("create table badrecordsPartitionintnullalt(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')") |
| sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartitionintnullalt options('bad_records_action'='force')") |
| checkAnswer(sql("select count(*) cnt from badrecordsPartitionintnullalt where intfield2 = 13"), Seq(Row(1))) |
| CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT, CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT_DEFAULT) |
| } |
| |
| test("global sort static column partition with load command") { |
| sql( |
| """ |
| | CREATE TABLE staticpartitionload (empno int, designation String, |
| | workgroupcategory int, workgroupcategoryname String, deptno int, |
| | projectjoindate Timestamp,attendance int, |
| | deptname String,projectcode int, |
| | utilization int,salary int,projectenddate Date,doj Timestamp) |
| | PARTITIONED BY (empname String) |
| | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') |
| """.stripMargin) |
| sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionload partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") |
| val frame = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionload") |
| checkExistence(sql(s"""SHOW PARTITIONS staticpartitionload"""), true, "empname=ravi") |
| } |
| |
| test("overwriting global sort static partition table for date partition column on insert query") { |
| sql( |
| """ |
| | CREATE TABLE staticpartitiondateinsert (empno int, empname String, designation String, |
| | workgroupcategory int, workgroupcategoryname String, deptno int, |
| | projectjoindate Timestamp,attendance int, |
| | deptname String,projectcode int, |
| | utilization int,salary int) |
| | PARTITIONED BY (projectenddate Date,doj Timestamp) |
| | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') |
| """.stripMargin) |
| sql(s"""insert into staticpartitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") |
| sql(s"""insert into staticpartitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") |
| sql(s"""insert into staticpartitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") |
| sql(s"""insert into staticpartitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") |
| sql(s"""insert overwrite table staticpartitiondateinsert PARTITION(projectenddate='2016-06-29',doj='2010-12-29 00:00:00') select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary from originTable where projectenddate=cast('2016-06-29' as timestamp)""") |
| checkAnswer(sql("select * from staticpartitiondateinsert where projectenddate=cast('2016-06-29' as Date)"), |
| sql("select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,cast(projectenddate as date),doj from originTable where projectenddate=cast('2016-06-29' as timestamp)")) |
| } |
| |
| |
| test("dynamic and static global sort partition table with load syntax") { |
| sql( |
| """ |
| | CREATE TABLE loadstaticpartitiondynamic (designation String, doj Timestamp, |
| | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, |
| | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, |
| | utilization int,salary int) |
| | PARTITIONED BY (empno int, empname String) |
| | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') |
| """.stripMargin) |
| |
| sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitiondynamic PARTITION(empno='1', empname) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") |
| checkAnswer(sql(s"select count(*) from loadstaticpartitiondynamic where empno=1"), sql(s"select count(*) from loadstaticpartitiondynamic")) |
| } |
| |
| test("dynamic and static global sort partition table with overwrite ") { |
| sql( |
| """ |
| | CREATE TABLE insertstaticpartitiondynamic (designation String, doj Timestamp,salary int) |
| | PARTITIONED BY (empno int, empname String) |
| | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') |
| """.stripMargin) |
| sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE insertstaticpartitiondynamic PARTITION(empno, empname) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") |
| val rows = sql(s"select count(*) from insertstaticpartitiondynamic").collect() |
| sql("""insert overwrite table insertstaticpartitiondynamic PARTITION(empno='1', empname) select designation, doj, salary, empname from insertstaticpartitiondynamic""") |
| |
| checkAnswer(sql(s"select count(*) from insertstaticpartitiondynamic where empno=1"), rows) |
| |
| intercept[Exception] { |
| sql("""insert overwrite table insertstaticpartitiondynamic PARTITION(empno, empname='ravi') select designation, doj, salary, empname from insertstaticpartitiondynamic""") |
| } |
| |
| } |
| |
| test("overwriting global sort all partition on table and do compaction") { |
| sql( |
| """ |
| | CREATE TABLE partitionallcompaction (empno int, empname String, designation String, |
| | workgroupcategory int, workgroupcategoryname String, deptno int, |
| | projectjoindate Timestamp, projectenddate Date,attendance int, |
| | utilization int,salary int) |
| | PARTITIONED BY (deptname String,doj Timestamp,projectcode int) |
| | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') |
| """.stripMargin) |
| sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") |
| sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction PARTITION(deptname='Learning', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"') """) |
| sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction PARTITION(deptname='configManagement', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") |
| sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction PARTITION(deptname='network', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") |
| sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction PARTITION(deptname='protocol', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") |
| sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction PARTITION(deptname='security', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") |
| sql("ALTER TABLE partitionallcompaction COMPACT 'MAJOR'").collect() |
| checkExistence(sql(s"""SHOW segments for table partitionallcompaction"""), true, "Marked for Delete") |
| } |
| |
| test("Test global sort overwrite static partition ") { |
| sql( |
| """ |
| | CREATE TABLE weather6 (type String) |
| | PARTITIONED BY (year int, month int, day int) |
| | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') |
| """.stripMargin) |
| |
| sql("insert into weather6 partition(year=2014, month=5, day=25) select 'rainy'") |
| sql("insert into weather6 partition(year=2014, month=4, day=23) select 'cloudy'") |
| sql("insert overwrite table weather6 partition(year=2014, month=5, day=25) select 'sunny'") |
| checkExistence(sql("select * from weather6"), true, "sunny") |
| checkAnswer(sql("select count(*) from weather6"), Seq(Row(2))) |
| } |
| |
| test("Test global sort overwrite static partition with wrong int value") { |
| sql( |
| """ |
| | CREATE TABLE weather7 (type String) |
| | PARTITIONED BY (year int, month int, day int) |
| | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') |
| """.stripMargin) |
| |
| sql("insert into weather7 partition(year=2014, month=05, day=25) select 'rainy'") |
| sql("insert into weather7 partition(year=2014, month=04, day=23) select 'cloudy'") |
| sql("insert overwrite table weather7 partition(year=2014, month=05, day=25) select 'sunny'") |
| checkExistence(sql("select * from weather7"), true, "sunny") |
| checkAnswer(sql("select count(*) from weather7"), Seq(Row(2))) |
| sql("insert into weather7 partition(year=2014, month, day) select 'rainy1',06,25") |
| sql("insert into weather7 partition(year=2014, month=01, day) select 'rainy2',27") |
| sql("insert into weather7 partition(year=2014, month=01, day=02) select 'rainy3'") |
| checkAnswer(sql("select count(*) from weather7 where month=1"), Seq(Row(2))) |
| } |
| |
| |
| test("test overwrite missed scenarios") { |
| sql(s"""create table carbon_test( |
| id string, |
| name string |
| ) |
| PARTITIONED BY(record_date int) |
| STORED BY 'org.apache.carbondata.format' |
| TBLPROPERTIES('SORT_COLUMNS'='id')""") |
| sql(s"""create table carbon_test_hive( |
| id string, |
| name string |
| ) |
| PARTITIONED BY(record_date int)""") |
| sql(s"""set hive.exec.dynamic.partition.mode=nonstrict""") |
| sql(s"""insert overwrite table carbon_test partition(record_date) select '1','kim',unix_timestamp('2018-02-05','yyyy-MM-dd') as record_date""") |
| sql(s"""insert overwrite table carbon_test_hive partition(record_date) select '1','kim',unix_timestamp('2018-02-05','yyyy-MM-dd') as record_date""") |
| |
| checkAnswer(sql(s"""select * from carbon_test where record_date=1517817600"""), sql(s"""select * from carbon_test_hive where record_date=1517817600""")) |
| sql(s"""insert overwrite table carbon_test partition(record_date) select '1','kim1',unix_timestamp('2018-02-06','yyyy-MM-dd') as record_date """) |
| sql(s"""insert overwrite table carbon_test_hive partition(record_date) select '1','kim1',unix_timestamp('2018-02-06','yyyy-MM-dd') as record_date """) |
| |
| checkAnswer(sql(s"""select * from carbon_test where record_date=1517817600"""), sql(s"""select * from carbon_test_hive where record_date=1517817600""")) |
| checkAnswer(sql(s"""select * from carbon_test where record_date=1517904000"""), sql(s"""select * from carbon_test_hive where record_date=1517904000""")) |
| sql(s"""insert overwrite table carbon_test partition(record_date) select '1','kim2',unix_timestamp('2018-02-07','yyyy-MM-dd') as record_date""") |
| sql(s"""insert overwrite table carbon_test_hive partition(record_date) select '1','kim2',unix_timestamp('2018-02-07','yyyy-MM-dd') as record_date""") |
| |
| checkAnswer(sql(s"""select * from carbon_test where record_date=1517817600"""), sql(s"""select * from carbon_test_hive where record_date=1517817600""")) |
| checkAnswer(sql(s"""select * from carbon_test where record_date=1517904000"""), sql(s"""select * from carbon_test_hive where record_date=1517904000""")) |
| checkAnswer(sql(s"""select * from carbon_test where record_date=1517990400"""), sql(s"""select * from carbon_test_hive where record_date=1517990400""")) |
| } |
| |
| test("test overwrite with timestamp partition column") { |
| CarbonProperties.getInstance() |
| .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") |
| sql("DROP TABLE IF EXISTS origintablenew") |
| sql( |
| """ |
| | CREATE TABLE origintablenew |
| | (id Int, |
| | vin String, |
| | logdate Timestamp, |
| | phonenumber Long, |
| | country String, |
| | area String, |
| | salary Int) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| |
| sql( |
| s""" |
| LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' into table origintablenew |
| """) |
| |
| sql("DROP TABLE IF EXISTS partitiontable0") |
| sql("DROP TABLE IF EXISTS partitiontable0_hive") |
| sql( |
| """ |
| | CREATE TABLE partitiontable0 |
| | (id Int, |
| | vin String, |
| | phonenumber Long, |
| | country String, |
| | area String, |
| | salary Int) |
| | PARTITIONED BY (logdate Timestamp) |
| | STORED BY 'org.apache.carbondata.format' |
| | TBLPROPERTIES('SORT_COLUMNS'='id,vin') |
| """.stripMargin) |
| sql( |
| """ |
| | CREATE TABLE partitiontable0_hive |
| | (id Int, |
| | vin String, |
| | phonenumber Long, |
| | country String, |
| | area String, |
| | salary Int) |
| | PARTITIONED BY (logdate Timestamp) |
| """.stripMargin) |
| sql(s"""set hive.exec.dynamic.partition.mode=nonstrict""") |
| |
| sql( |
| s""" |
| LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' into table partitiontable0 |
| """) |
| |
| sql( |
| s""" |
| insert into partitiontable0_hive select * from partitiontable0 |
| """) |
| |
| checkAnswer(sql( |
| s""" |
| | SELECT logdate,id,vin,phonenumber,country,area,salary |
| | FROM partitiontable0 where logdate = '2016-02-12' |
| """.stripMargin), sql( |
| s""" |
| | SELECT logdate,id,vin,phonenumber,country,area,salary |
| | FROM partitiontable0_hive where logdate = '2016-02-12' |
| """.stripMargin)) |
| |
| sql("insert into table partitiontable0 partition(logdate='2018-02-15 00:00:00') " + |
| "select id,vin,phonenumber,country,area,salary from origintablenew") |
| sql("insert into table partitiontable0_hive partition(logdate='2018-02-15 00:00:00') " + |
| "select id,vin,phonenumber,country,area,salary from origintablenew") |
| checkAnswer(sql( |
| s""" |
| | SELECT logdate,id,vin,phonenumber,country,area,salary |
| | FROM partitiontable0 where logdate = '2018-02-15' |
| """.stripMargin), sql( |
| s""" |
| | SELECT logdate,id,vin,phonenumber,country,area,salary |
| | FROM partitiontable0_hive where logdate = '2018-02-15' |
| """.stripMargin)) |
| |
| checkAnswer(sql( |
| s""" |
| | SELECT count(*) FROM partitiontable0""".stripMargin), sql( |
| s""" |
| | SELECT count(*) FROM partitiontable0_hive""".stripMargin)) |
| |
| CarbonProperties.getInstance() |
| .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") |
| } |
| |
| test("test overwrite with date partition column") { |
| CarbonProperties.getInstance() |
| .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd") |
| sql("DROP TABLE IF EXISTS origintablenew") |
| sql( |
| """ |
| | CREATE TABLE origintablenew |
| | (id Int, |
| | vin String, |
| | logdate date, |
| | phonenumber Long, |
| | country String, |
| | area String, |
| | salary Int) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| |
| sql( |
| s""" |
| LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' into table origintablenew |
| """) |
| |
| sql("DROP TABLE IF EXISTS partitiontable0") |
| sql("DROP TABLE IF EXISTS partitiontable0_hive") |
| sql( |
| """ |
| | CREATE TABLE partitiontable0 |
| | (id Int, |
| | vin String, |
| | phonenumber Long, |
| | country String, |
| | area String, |
| | salary Int) |
| | PARTITIONED BY (logdate date) |
| | STORED BY 'org.apache.carbondata.format' |
| | TBLPROPERTIES('SORT_COLUMNS'='id,vin') |
| """.stripMargin) |
| sql( |
| """ |
| | CREATE TABLE partitiontable0_hive |
| | (id Int, |
| | vin String, |
| | phonenumber Long, |
| | country String, |
| | area String, |
| | salary Int) |
| | PARTITIONED BY (logdate date) |
| """.stripMargin) |
| sql(s"""set hive.exec.dynamic.partition.mode=nonstrict""") |
| |
| sql( |
| s""" |
| LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' into table partitiontable0 |
| """) |
| |
| sql( |
| s""" |
| insert into partitiontable0_hive select * from partitiontable0 |
| """) |
| |
| checkAnswer(sql( |
| s""" |
| | SELECT logdate,id,vin,phonenumber,country,area,salary |
| | FROM partitiontable0 where logdate = '2016-02-12' |
| """.stripMargin), sql( |
| s""" |
| | SELECT logdate,id,vin,phonenumber,country,area,salary |
| | FROM partitiontable0_hive where logdate = '2016-02-12' |
| """.stripMargin)) |
| |
| sql("insert into table partitiontable0 partition(logdate='2018-02-15') " + |
| "select id,vin,phonenumber,country,area,salary from origintablenew") |
| sql("insert into table partitiontable0_hive partition(logdate='2018-02-15') " + |
| "select id,vin,phonenumber,country,area,salary from origintablenew") |
| checkAnswer(sql( |
| s""" |
| | SELECT logdate,id,vin,phonenumber,country,area,salary |
| | FROM partitiontable0 where logdate = '2018-02-15' |
| """.stripMargin), sql( |
| s""" |
| | SELECT logdate,id,vin,phonenumber,country,area,salary |
| | FROM partitiontable0_hive where logdate = '2018-02-15' |
| """.stripMargin)) |
| |
| checkAnswer(sql( |
| s""" |
| | SELECT count(*) FROM partitiontable0""".stripMargin), sql( |
| s""" |
| | SELECT count(*) FROM partitiontable0_hive""".stripMargin)) |
| |
| CarbonProperties.getInstance() |
| .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy") |
| } |
| |
test("partition with date column issue") {
  // Runs with the bad-records action switched to FAIL; the finally block resets it to FORCE.
  try {
    val carbonProps = CarbonProperties.getInstance()
    carbonProps.addProperty(
      CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FAIL.name())
    val table = "partdatecarb"
    sql(s"drop table if exists $table")
    sql(s"create table $table(name string, age Int) partitioned by(dob date) " +
      "stored by 'carbondata'")

    sql(s"insert into $table partition(dob='2016-06-28') select 'name1',121")
    // The static date partition value must read back unchanged when cast to string.
    val expectedRows = Seq(Row("name1", 121, "2016-06-28"))
    checkAnswer(sql(s"select name,age,cast(dob as string) from $table"), expectedRows)
  } finally {
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FORCE.name())
  }
}
| |
test("partition with time column issue") {
  // Same as the date case, but the partition column is a timestamp.
  try {
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FAIL.name())
    val tableName = "partdatecarb1"
    sql(s"drop table if exists $tableName")
    sql(s"create table $tableName(name string, age Int) partitioned by(dob timestamp) " +
      "stored by 'carbondata'")

    sql(s"insert into $tableName partition(dob='2016-06-28 00:00:00') select 'name1',121")
    val actual = sql(s"select name,age,cast(dob as string) from $tableName")
    checkAnswer(actual, Seq(Row("name1", 121, "2016-06-28 00:00:00")))
  } finally {
    // Restore the suite-wide bad-records action.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FORCE.name())
  }
}
| |
test("partition with int issue and dictionary exclude") {
  // Int partition column marked DICTIONARY_EXCLUDE, loaded with bad-records action FAIL.
  try {
    val props = CarbonProperties.getInstance()
    props.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FAIL.name())
    sql("drop table if exists partdatecarb2")
    val createDdl =
      "create table partdatecarb2(name string, dob string) partitioned by(age Int) " +
        "stored by 'carbondata' TBLPROPERTIES('DICTIONARY_EXCLUDE'='age')"
    sql(createDdl)

    sql("insert into partdatecarb2 partition(age='12') select 'name1','2016-06-28'")
    // The string partition value '12' must come back as the Int 12.
    checkAnswer(
      sql("select name,age,cast(dob as string) from partdatecarb2"),
      Seq(Row("name1", 12, "2016-06-28")))
  } finally {
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FORCE.name())
  }
}
| |
test("partition with int issue and dictionary include") {
  sql("drop table if exists partdatecarb3")
  // Declaring the partition column in DICTIONARY_INCLUDE is expected to make CREATE TABLE throw.
  val createDdl =
    "create table partdatecarb3(name string, dob string) partitioned by(age Int) " +
      "stored by 'carbondata' TBLPROPERTIES('DICTIONARY_INCLUDE'='age')"
  intercept[Exception](sql(createDdl))
}
| |
test("data loading for all dimensions with table for two partition column") {
  val tableName = "partitiontwoalldims"
  // All regular columns are String/Timestamp; doj and empname are the two partition columns.
  val createDdl =
    s"""
       | CREATE TABLE $tableName (empno String, designation String,
       | workgroupcategory String, workgroupcategoryname String, deptno String, deptname String,
       | projectcode String, projectjoindate Timestamp, projectenddate Timestamp,attendance String,
       | utilization String,salary String)
       | PARTITIONED BY (doj Timestamp, empname String)
       | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
     """.stripMargin
  sql(s"drop table if exists $tableName")
  sql(createDdl)
  sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE $tableName OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
  checkAnswer(sql(s"select count(*) from $tableName"), Seq(Row(10)))
}
| |
test("partition with different order column issue") {
  // Inserts into a carbon partition table must bind dynamic partition values by the order they
  // are listed in the PARTITION clause; the result is compared against an equivalent Hive table.
  try {
    CarbonProperties.getInstance().addProperty(
      CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FAIL.name())

    // Reference Hive table holding two identical rows (state='KA', city='BGLR').
    sql("drop table if exists partdatecarb4_hive")
    sql(
      "create table partdatecarb4_hive(name string, age Int) partitioned by(country string, state string, city string)")

    sql("insert into partdatecarb4_hive partition(state,city,country='india') select 'name1',12,'KA', 'BGLR'")
    sql("insert into partdatecarb4_hive partition(state,city,country='india') select 'name1',12,'KA', 'BGLR'")

    sql("drop table if exists partdatecarb4")
    sql(
      "create table partdatecarb4(name string, age Int) partitioned by(country string, state string, city string) stored by 'carbondata'")

    sql("insert into partdatecarb4 partition(state,city,country='india') select 'name1',12,'KA', 'BGLR'")
    // Same logical row, but the dynamic columns are listed as (city,state) and the values swapped
    // accordingly; it must land in the same partition as the first insert.
    sql("insert into partdatecarb4 partition(city,state,country='india') select 'name1',12, 'BGLR','KA'")
    checkAnswer(sql("select * from partdatecarb4"), sql("select * from partdatecarb4_hive"))

    // A spec with a static value (city='3') between dynamic columns is expected to be rejected.
    intercept[Exception] {
      sql(
        "insert into partdatecarb4 partition(state,city='3',country) select 'name1',12,'cc', 'dd'")
    }
  } finally {
    CarbonProperties.getInstance().addProperty(
      CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FORCE.name())
  }
}
| |
test("data loading for decimal column partition table") {
  val createDdl =
    """
      | CREATE TABLE partitiondecimal (empno int, designation String,
      | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
      | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
      | utilization int, doj Timestamp, empname String)
      | PARTITIONED BY (salary decimal)
      | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
    """.stripMargin
  sql(createDdl)
  sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondecimal OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")

  // Same projection and ordering against the partitioned table and the reference originTable.
  val projection =
    "empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, " +
      "deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary"
  val partitioned = sql(s"select $projection from partitiondecimal order by empno")
  val reference = sql(s"select $projection from originTable order by empno")
  checkAnswer(partitioned, reference)
}
| |
test("data loading for decimal column static partition table") {
  val tableName = "partitiondecimalstatic"
  sql(
    s"""
       | CREATE TABLE $tableName (empno int, designation String,
       | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
       | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
       | utilization int, doj Timestamp, empname String)
       | PARTITIONED BY (salary decimal)
       | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
     """.stripMargin)
  // Every row is routed to the single static partition salary='1.0'.
  sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE $tableName partition(salary='1.0') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")

  val salaryCount = sql(s"select count(salary) from $tableName")
  checkAnswer(salaryCount, Seq(Row(10)))
}
| |
test("query after select on partition table") {
  // NOTE(review): despite its name, this test checks that a DELETE filtered on the timestamp
  // partition column leaves no matching rows behind.
  sql(
    """
      | CREATE TABLE partitiondatadelete (designation String,
      | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
      | projectcode int,empno int, projectenddate Timestamp,attendance int,
      | utilization int, doj Timestamp, empname String,salary int)
      | PARTITIONED BY (projectjoindate Timestamp)
      | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
    """.stripMargin)
  sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondatadelete OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")

  sql("delete from partitiondatadelete where projectjoindate='2012-11-14 00:00:00'")
  // Single WHERE clause: the verification query must filter on the same partition value.
  checkAnswer(
    sql("select count(*) from partitiondatadelete where projectjoindate='2012-11-14 00:00:00'"),
    Seq(Row(0)))
}
| |
test("partition colunm test without partition column in fileheader of load command") {
  val table = "partitiontablewithoutpartcolumninfileheader"
  sql(s"DROP TABLE IF EXISTS $table")

  sql(s"CREATE TABLE $table (CUST_ID int,ACTIVE_EMUI_VERSION string, DOB timestamp, DOJ timestamp, " +
    "BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10), " +
    "DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) " +
    "partitioned by(CUST_NAME String) STORED BY 'org.apache.carbondata.format' " +
    "TBLPROPERTIES('DICTIONARY_INCLUDE'='CUST_ID,ACTIVE_EMUI_VERSION,DOB,DOJ,BIGINT_COLUMN1," +
    "BIGINT_COLUMN2,DECIMAL_COLUMN1,DECIMAL_COLUMN2,Double_COLUMN1,Double_COLUMN2,INTEGER_COLUMN1') ")

  // The FILEHEADER names the second CSV column CUST_NAME1, so the partition column cust_name
  // is supplied only by the static partition spec.
  val fileHeader = "CUST_ID,CUST_NAME1,ACTIVE_EMUI_VERSION,DOB,DOJ,BIGINT_COLUMN1,BIGINT_COLUMN2," +
    "DECIMAL_COLUMN1,DECIMAL_COLUMN2,Double_COLUMN1,Double_COLUMN2,INTEGER_COLUMN1"
  sql(s"""LOAD DATA INPATH '$resourcesPath/data_with_all_types.csv' into table $table partition(cust_name='ravi') OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"','BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='$fileHeader')""")

  checkAnswer(sql(s"select count(*) from $table"), Seq(Row(10)))
  sql(s"DROP TABLE IF EXISTS $table")
}
| |
test("data loading with wrong format in static partition table") {
  sql("DROP TABLE IF EXISTS partitionwrongformat")
  sql(
    """
      | CREATE TABLE partitionwrongformat (empno int, designation String,
      | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
      | projectcode int, projectenddate Timestamp,attendance int,
      | utilization int, doj Timestamp, empname String)
      | PARTITIONED BY (projectjoindate Timestamp, salary decimal)
      | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
    """.stripMargin)

  // Each spec carries one malformed static value: a non-numeric decimal, then an
  // incomplete timestamp. Both loads must be rejected, in this order.
  val malformedSpecs = Seq(
    "projectjoindate='2016-12-01',salary='gg'",
    "projectjoindate='2016',salary='1.0'")
  malformedSpecs.foreach { spec =>
    intercept[MalformedCarbonCommandException] {
      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionwrongformat partition($spec) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
    }
  }
}
| |
test("data loading with default partition in static partition table") {
  val defaultPartition = "__HIVE_DEFAULT_PARTITION__"
  sql("DROP TABLE IF EXISTS partitiondefaultpartition")
  val createDdl =
    """
      | CREATE TABLE partitiondefaultpartition (empno int, designation String,
      | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
      | projectcode int, projectenddate Timestamp,attendance int,
      | utilization int, doj Timestamp, empname String)
      | PARTITIONED BY (projectjoindate Timestamp, salary decimal)
      | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
    """.stripMargin
  sql(createDdl)
  // projectjoindate is given the Hive default-partition marker as its static value.
  sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondefaultpartition partition(projectjoindate='$defaultPartition',salary='1.0') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
  checkAnswer(sql("select count(salary) from partitiondefaultpartition"), Seq(Row(10)))
  checkExistence(sql("show partitions partitiondefaultpartition"), true, defaultPartition)
}
| |
test("data loading with default partition in static partition table with fail badrecord") {
  val table = "partitiondefaultpartitionfail"
  sql(s"DROP TABLE IF EXISTS $table")
  sql(
    s"""
       | CREATE TABLE $table (empno int, designation String,
       | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
       | projectcode int, projectenddate Timestamp,attendance int,
       | utilization int, doj Timestamp, empname String)
       | PARTITIONED BY (projectjoindate Timestamp, salary decimal)
       | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
     """.stripMargin)

  // Loads data.csv into the given static partition spec with bad-records action 'fail'.
  def loadInto(partitionSpec: String) =
    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE $table partition($partitionSpec) OPTIONS('bad_records_logger_enable'='true', 'bad_records_action'='fail','DELIMITER'= ',', 'QUOTECHAR'= '"')""")

  // First load: default partition on the timestamp column.
  loadInto("projectjoindate='__HIVE_DEFAULT_PARTITION__',salary='1.0'")
  checkAnswer(sql(s"select count(*) from $table"), Seq(Row(10)))
  checkExistence(sql(s"show partitions $table"), true, "__HIVE_DEFAULT_PARTITION__")

  // Second load: default partition on the decimal column instead.
  loadInto("projectjoindate='2016-12-01',salary='__HIVE_DEFAULT_PARTITION__'")
  checkAnswer(sql(s"select count(*) from $table"), Seq(Row(20)))
}
| |
test("data loading with int partition issue") {
  val table = "intissue"
  sql(s"DROP TABLE IF EXISTS $table")
  sql(s"create table $table(a int) partitioned by (b int) stored by 'carbondata'")
  sql(s"insert into $table values(1,1)")
  // Both the data column and the int partition column must read back as 1.
  val expected = Seq(Row(1, 1))
  checkAnswer(sql(s"select * from $table"), expected)
}
| |
test("data loading with int partition issue with global sort") {
  val table = "intissuesort"
  sql(s"DROP TABLE IF EXISTS $table")
  // Same as the plain int partition case, but with GLOBAL_SORT.
  sql(s"create table $table(a int) partitioned by (b int) stored by 'carbondata' " +
    "TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')")
  sql(s"insert into $table values(1,1)")
  checkAnswer(sql(s"select * from $table"), Seq(Row(1, 1)))
}
| |
test("data loading with decimal column fail issue") {
  sql("DROP TABLE IF EXISTS partitiondecimalfailissue")
  sql("CREATE TABLE IF NOT EXISTS partitiondecimalfailissue (ID Int, date Timestamp, country String, name String, phonetype String, serialname String) partitioned by (salary Decimal(17,2)) STORED BY 'org.apache.carbondata.format'")
  sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/decimalDataWithHeader.csv' into table partitiondecimalfailissue")

  // Previously this test only printed the table; assert on row counts instead so a failed
  // conversion of the static decimal partition value is actually detected.
  val rowsAfterLoad =
    sql("select count(*) from partitiondecimalfailissue").collect().head.getLong(0)
  assert(rowsAfterLoad > 0, "the initial load should produce rows")

  // Re-insert every existing row into a static partition whose value has more fractional digits
  // than the column's scale (2); the insert must succeed and add one row per source row.
  sql("insert into partitiondecimalfailissue partition(salary='13000000.7878788') select ID,date,country,name,phonetype,serialname from partitiondecimalfailissue")
  checkAnswer(
    sql("select count(*) from partitiondecimalfailissue"),
    Seq(Row(rowsAfterLoad * 2)))
}
| |
test("data loading with decimalissue partition issue") {
  val table = "decimalissue"
  sql(s"DROP TABLE IF EXISTS $table")
  sql(s"create table $table(a int) partitioned by (b decimal(2,2)) stored by 'carbondata'")
  // 21.2 cannot be represented as decimal(2,2), so the partition value is expected to read as null.
  sql(s"insert into $table values(23,21.2)")
  checkAnswer(sql(s"select * from $table"), Seq(Row(23, null)))
}
| |
test("data loading scalar query partition issue") {
  sql("DROP TABLE IF EXISTS scalarissue")
  sql("create table scalarissue(a int) partitioned by (salary double) stored by 'carbondata'")
  sql("insert into scalarissue values(23,21.2)")
  sql("DROP TABLE IF EXISTS scalarissue_hive")
  sql("create table scalarissue_hive(a int,salary double) using parquet partitioned by (salary) ")
  sql("set hive.exec.dynamic.partition.mode=nonstrict")
  sql("insert into scalarissue_hive values(23,21.2)")

  // Filters a table on its partition column using a scalar subquery over the same table.
  def maxSalaryQuery(table: String): String =
    s"select * from $table where salary = (select max(salary) from $table)"

  val olderSpark =
    SparkUtil.isSparkVersionEqualTo("2.1") || SparkUtil.isSparkVersionEqualTo("2.2")
  if (!olderSpark) {
    checkAnswer(sql(maxSalaryQuery("scalarissue_hive")), Seq(Row(23, 21.2)))
    checkAnswer(sql(maxSalaryQuery("scalarissue")), Seq(Row(23, 21.2)))
  } else {
    // On Spark 2.1/2.2 the query is expected to throw for both the parquet and carbon table.
    intercept[Exception] {
      sql(maxSalaryQuery("scalarissue_hive")).show()
    }
    intercept[Exception] {
      sql(maxSalaryQuery("scalarissue")).show()
    }
  }
}
| |
test("global sort badrecords fail on partition column message") {
  sql("DROP TABLE IF EXISTS badrecordsPartitionfailmessage")
  sql("create table badrecordsPartitionfailmessage(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')")
  val ex = intercept[Exception] {
    sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartitionfailmessage options('bad_records_action'='fail')")
  }
  // Assert (rather than print) that the failure is reported as a bad-record data load failure.
  // `contains` tolerates a wrapping prefix; getMessage may be null for some exception types.
  val message = Option(ex.getMessage).getOrElse("")
  assert(message.contains("Data load failed due to bad record"),
    s"unexpected failure message: $message")
}
| |
test("multiple compaction on partition table") {
  sql("DROP TABLE IF EXISTS comp_dt2")
  sql("create table comp_dt2(id int,name string) partitioned by (dt date,c4 int) stored by 'carbondata'")

  // The visible row count must stay stable across every compaction and clean-files step.
  def rowCount: Int = sql("select * from comp_dt2").collect().length
  def segmentCount: Int = sql("show segments for table comp_dt2").collect().length

  sql("insert into comp_dt2 select 1,'A','2001-01-01',1")
  sql("insert into comp_dt2 select 2,'B','2001-01-01',1")
  sql("insert into comp_dt2 select 3,'C','2002-01-01',2")
  sql("insert into comp_dt2 select 4,'D','2002-01-01',null")
  assert(rowCount == 4)
  sql("Alter table comp_dt2 compact 'minor'")
  assert(rowCount == 4)
  sql("clean files for table comp_dt2")
  assert(rowCount == 4)

  sql("insert into comp_dt2 select 5,'E','2003-01-01',3")
  sql("insert into comp_dt2 select 6,'F','2003-01-01',3")
  sql("insert into comp_dt2 select 7,'G','2003-01-01',4")
  sql("insert into comp_dt2 select 8,'H','2004-01-01',''")
  assert(rowCount == 8)
  sql("Alter table comp_dt2 compact 'minor'")
  // Check after compaction and again after clean files, as in the other stanzas.
  assert(rowCount == 8)
  sql("clean files for table comp_dt2")
  assert(rowCount == 8)

  sql("insert into comp_dt2 select 9,'H','2001-01-01',1")
  sql("insert into comp_dt2 select 10,'I','2002-01-01',null")
  sql("insert into comp_dt2 select 11,'J','2003-01-01',4")
  sql("insert into comp_dt2 select 12,'K','2003-01-01',5")
  assert(rowCount == 12)
  sql("Alter table comp_dt2 compact 'minor'")
  assert(segmentCount == 8)
  assert(rowCount == 12)
  sql("clean files for table comp_dt2")
  assert(rowCount == 12)

  sql("insert into comp_dt2 select 13,'L','2004-01-01', 6")
  assert(rowCount == 13)
  sql("Alter table comp_dt2 compact 'major'")
  assert(rowCount == 13)
  assert(segmentCount == 3)
  sql("clean files for table comp_dt2")
  // After clean files only the major-compacted segment is expected to remain.
  assert(segmentCount == 1)
  assert(rowCount == 13)
}
| |
test("test insert into partition column which does not exists") {
  sql("drop table if exists partitionNoColumn")
  sql("create table partitionNoColumn (name string, dob date) partitioned by(year int,month int) " +
    "stored by 'carbondata'")
  // `day` is not one of the declared partition columns (year, month).
  val thrown = intercept[Exception] {
    sql("insert into partitionNoColumn partition(year=2014,month=01,day=01) select 'martin','2014-04-07'")
  }
  val expectedFragment = "day is not a valid partition column in table default.partitionnocolumn"
  assert(thrown.getMessage.contains(expectedFragment))
}
| |
test("data loading with default partition in static partition table with local_sort") {
  val table = "partitiondefaultlocalsort"
  val sortScope = "local_sort"
  sql(s"DROP TABLE IF EXISTS $table")
  sql(
    s"""
       | CREATE TABLE $table (empno int, designation String,
       | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
       | projectcode int, projectenddate Timestamp,attendance int,
       | utilization int, doj Timestamp, empname String)
       | PARTITIONED BY (projectjoindate Timestamp, salary decimal)
       | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='$sortScope')
     """.stripMargin)
  // NOTE(review): no PARTITION clause is given, so partition values come from the CSV rows.
  sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE $table OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
  checkAnswer(sql(s"select count(*) from $table"), Seq(Row(10)))
}
| |
test("data loading with default partition in static partition table with nosort") {
  val table = "partitiondefaultnosort"
  sql(s"DROP TABLE IF EXISTS $table")
  val createDdl =
    s"""
       | CREATE TABLE $table (empno int, designation String,
       | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
       | projectcode int, projectenddate Timestamp,attendance int,
       | utilization int, doj Timestamp, empname String)
       | PARTITIONED BY (projectjoindate Timestamp, salary decimal)
       | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='NO_SORT')
     """.stripMargin
  sql(createDdl)
  // Same dynamic-partition load as the local_sort case, with NO_SORT.
  sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE $table OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
  val total = sql(s"select count(*) from $table")
  checkAnswer(total, Seq(Row(10)))
}
| |
test("data loading with default partition in static partition table with rename") {
  val oldName = "partitiondefaultrename"
  val newName = s"${oldName}_new"
  sql(s"DROP TABLE IF EXISTS $oldName")
  sql(s"DROP TABLE IF EXISTS $newName")
  sql(
    s"""
       | CREATE TABLE $oldName (empno int, designation String,
       | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
       | projectcode int, projectenddate Timestamp,attendance int,
       | utilization int, doj Timestamp, empname String)
       | PARTITIONED BY (projectjoindate Timestamp, salary decimal)
       | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
     """.stripMargin)

  // Loads data.csv into the named table.
  def loadCsv(table: String) =
    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE $table OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
  // Asserts the table's total row count.
  def assertCount(table: String, expected: Int): Unit =
    checkAnswer(sql(s"select count(*) from $table"), Seq(Row(expected)))

  loadCsv(oldName)
  assertCount(oldName, 10)
  // Data loaded before the rename must stay visible, and new loads must append to it.
  sql(s"alter table $oldName rename to $newName")
  assertCount(newName, 10)
  loadCsv(newName)
  assertCount(newName, 20)
}
| |
test("data loading with default partition in static partition table with rename first") {
  val original = "partitiondefaultrenamefirst"
  val renamed = original + "_new"
  Seq(original, renamed).foreach(name => sql(s"DROP TABLE IF EXISTS $name"))
  sql(
    s"""
       | CREATE TABLE $original (empno int, designation String,
       | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
       | projectcode int, projectenddate Timestamp,attendance int,
       | utilization int, doj Timestamp, empname String)
       | PARTITIONED BY (projectjoindate Timestamp, salary decimal)
       | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
     """.stripMargin)
  // Rename while the table is still empty, then load only through the new name.
  sql(s"alter table $original rename to $renamed")
  sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE $renamed OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
  checkAnswer(sql(s"select count(*) from $renamed"), Seq(Row(10)))
}
| |
test("data loading for global partition table for two partition column with no columns in csv") {
  val table = "partitiontwonocolumns"
  sql(s"DROP TABLE IF EXISTS $table")
  sql(
    s"""
       | CREATE TABLE $table (empno int, designation String,
       | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
       | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
       | utilization int,salary int,doj Timestamp, empname String)
       | PARTITIONED BY (newcol1 date, newcol2 int)
       | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
     """.stripMargin)
  // Per the test name, newcol1/newcol2 are absent from the CSV and come only from the static spec.
  sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE $table partition(newcol1='2016-08-09', newcol2='20') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")

  val projection =
    "empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, " +
      "deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary"
  checkAnswer(
    sql(s"select $projection from $table order by empno"),
    sql(s"select $projection from originTable order by empno"))

  checkAnswer(sql(s"select distinct cast(newcol1 as string) from $table"), Seq(Row("2016-08-09")))
}
| |
/**
 * Restores the Carbon properties changed by this suite, drops the suite's tables and shuts
 * down the executor service if one was created. The executor is shut down in a finally block
 * so that a failure while dropping tables cannot leak its threads.
 */
override def afterAll(): Unit = {
  CarbonProperties.getInstance()
    .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
  CarbonProperties.getInstance()
    .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
  CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
    CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
  try {
    dropTable
  } finally {
    if (executorService != null && !executorService.isShutdown) {
      executorService.shutdownNow()
    }
  }
}
| |
/**
 * Drops every table this suite may create. Called before the suite (to clear stale state)
 * and after it (to avoid leaking tables into other suites).
 */
def dropTable: Unit = {
  Seq(
    "originTable",
    "originMultiLoads",
    "partitionone",
    "partitiontwo",
    "partitionthree",
    "partitionmultiplethree",
    "insertpartitionthree",
    "staticpartitionone",
    "singlepasspartitionone",
    "loadstaticpartitionone",
    "loadstaticpartitiononeoverwrite",
    "streamingpartitionedtable",
    "mergeindexpartitionthree",
    "loadstaticpartitiononeissue",
    "partitionmultiplethreeconcurrent",
    "loadpartitionwithspecialchar",
    "emp1",
    "restorepartition",
    "casesensitivepartition",
    "badrecordsPartition",
    "staticpartitionload",
    "badrecordsPartitionignore",
    "badrecordsPartitionfail",
    "badrecordsignore",
    "badrecordsPartitionintnull",
    "badrecordsPartitionintnullalt",
    "partitiondateinsert",
    "staticpartitiondateinsert",
    "loadstaticpartitiondynamic",
    "insertstaticpartitiondynamic",
    "partitionallcompaction",
    "weather6",
    "weather7",
    "uniqdata_hive_static",
    "uniqdata_hive_dynamic",
    "uniqdata_string_static",
    "uniqdata_string_dynamic",
    "partitionLoadTable",
    "noLoadTable",
    "carbon_test",
    "carbon_test_hive",
    "partitiondecimal",
    "partitiondecimalstatic",
    "partitiondatadelete",
    "comp_dt2",
    "partitionNoColumn",
    "partitiondefaultlocalsort",
    // Tables created by later tests in this suite that previously leaked after it finished.
    "origintablenew",
    "partitiontable0",
    "partitiontable0_hive",
    "partdatecarb",
    "partdatecarb1",
    "partdatecarb2",
    "partdatecarb3",
    "partdatecarb4",
    "partdatecarb4_hive",
    "partitiontwoalldims",
    "partitiontablewithoutpartcolumninfileheader",
    "partitionwrongformat",
    "partitiondefaultpartition",
    "partitiondefaultpartitionfail",
    "intissue",
    "intissuesort",
    "partitiondecimalfailissue",
    "decimalissue",
    "scalarissue",
    "scalarissue_hive",
    "badrecordsPartitionfailmessage",
    "partitiondefaultnosort",
    "partitiondefaultrename",
    "partitiondefaultrename_new",
    "partitiondefaultrenamefirst",
    "partitiondefaultrenamefirst_new",
    "partitiontwonocolumns"
  ).foreach(table => sql(s"drop table if exists $table"))
}
| } |