| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain id copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.spark.testsuite.standardpartition |
| |
| import scala.collection.JavaConverters._ |
| |
| import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan |
| import org.apache.spark.sql.execution.datasources.LogicalRelation |
| import org.apache.spark.sql.hive.CarbonRelation |
| import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, Row} |
| import org.apache.spark.sql.test.util.QueryTest |
| import org.scalatest.BeforeAndAfterAll |
| |
| class StandardPartitionWithPreaggregateTestCase extends QueryTest with BeforeAndAfterAll { |
| |
| val testData = s"$resourcesPath/sample.csv" |
| |
| override def beforeAll(): Unit = { |
| sql("drop database if exists partition_preaggregate cascade") |
| sql("create database partition_preaggregate") |
| sql("use partition_preaggregate") |
| sql( |
| """ |
| | CREATE TABLE par(id INT, name STRING, age INT) PARTITIONED BY(city STRING) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| sql( |
| """ |
| | CREATE TABLE maintable(id int, name string, city string) partitioned by (age int) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") |
| } |
| |
| override def afterAll(): Unit = { |
| sql("drop database if exists partition_preaggregate cascade") |
| sql("use default") |
| } |
| |
| // Create aggregate table on partition with partition column in aggregation only. |
| test("test preaggregate table creation on partition table with partition col as aggregation") { |
| sql("create datamap p1 on table par using 'preaggregate' as select id, sum(city) from par group by id") |
| assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p1")(sqlContext.sparkSession).isHivePartitionTable) |
| } |
| |
| // Create aggregate table on partition with partition column in projection and aggregation only. |
| test("test preaggregate table creation on partition table with partition col as projection") { |
| sql("create datamap p2 on table par using 'preaggregate' as select id, city, min(city) from par group by id,city ") |
| assert(CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p2")(sqlContext.sparkSession).isHivePartitionTable) |
| } |
| |
| // Create aggregate table on partition with partition column as group by. |
| test("test preaggregate table creation on partition table with partition col as group by") { |
| sql("create datamap p3 on table par using 'preaggregate' as select id, max(city) from par group by id,city ") |
| assert(CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p3")(sqlContext.sparkSession).isHivePartitionTable) |
| } |
| |
| // Create aggregate table on partition without partition column. |
| test("test preaggregate table creation on partition table without partition column") { |
| sql("create datamap p4 on table par using 'preaggregate' as select name, count(id) from par group by name ") |
| assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p4")(sqlContext.sparkSession).isHivePartitionTable) |
| } |
| |
| test("test data correction in aggregate table when partition column is used") { |
| sql("create datamap p1 on table maintable using 'preaggregate' as select id, sum(age) from maintable group by id, age") |
| checkAnswer(sql("select * from maintable_p1"), |
| Seq(Row(1,31,31), |
| Row(2,27,27), |
| Row(3,70,35), |
| Row(4,26,26), |
| Row(4,29,29))) |
| preAggTableValidator(sql("select id, sum(age) from maintable group by id, age").queryExecution.analyzed, "maintable_p1") |
| sql("drop datamap p1 on table maintable") |
| } |
| |
| test("test data correction in aggregate table when partition column is not used") { |
| sql("create datamap p2 on table maintable using 'preaggregate' as select id, max(age) from maintable group by id") |
| checkAnswer(sql("select * from maintable_p2"), |
| Seq(Row(1,31), |
| Row(2,27), |
| Row(3,35), |
| Row(4,29))) |
| preAggTableValidator(sql("select id, max(age) from maintable group by id").queryExecution.analyzed, "maintable_p2") |
| sql("drop datamap p2 on table maintable") |
| } |
| |
| test("test data correction with insert overwrite") { |
| sql("drop table if exists partitionone") |
| sql( |
| """ |
| | CREATE TABLE if not exists partitionone (empname String) |
| | PARTITIONED BY (year int, month int,day int) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month,day") |
| sql("insert into partitionone values('k',2014,1,1)") |
| sql("insert overwrite table partitionone values('v',2014,1,1)") |
| checkAnswer(sql("select * from partitionone"), Seq(Row("v",2014,1,1))) |
| checkAnswer(sql("select * from partitionone_p1"), Seq(Row("v",2014,2014,1,1))) |
| } |
| |
| test("test data correction with insert overwrite on different value") { |
| sql("drop table if exists partitionone") |
| sql( |
| """ |
| | CREATE TABLE if not exists partitionone (empname String) |
| | PARTITIONED BY (year int, month int,day int) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month,day") |
| sql("insert into partitionone values('k',2014,1,1)") |
| sql("insert overwrite table partitionone values('v',2015,1,1)") |
| checkAnswer(sql("select * from partitionone"), Seq(Row("k",2014,1,1), Row("v",2015,1,1))) |
| checkAnswer(sql("select * from partitionone_p1"), Seq(Row("k",2014,2014,1,1), Row("v",2015,2015,1,1))) |
| } |
| |
| test("test to check column ordering in parent and child table") { |
| sql("drop table if exists partitionone") |
| sql( |
| """ |
| | CREATE TABLE if not exists partitionone (empname String) |
| | PARTITIONED BY (year int, month int,day int) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, month, year,day") |
| val parentTable = CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "partitionone")(sqlContext.sparkSession) |
| val childTable = CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "partitionone_p1")(sqlContext.sparkSession) |
| val parentPartitionColumns = parentTable.getPartitionInfo.getColumnSchemaList |
| val childPartitionColumns = childTable.getPartitionInfo.getColumnSchemaList |
| assert(parentPartitionColumns.asScala.zip(childPartitionColumns.asScala).forall { |
| case (a,b) => |
| a.getColumnName.equalsIgnoreCase(b.getParentColumnTableRelations.get(0).getColumnName) |
| }) |
| } |
| |
| test("test data after minor compaction on partition table with pre-aggregate") { |
| sql("drop table if exists partitionone") |
| sql( |
| """ |
| | CREATE TABLE if not exists partitionone (empname String) |
| | PARTITIONED BY (year int, month int,day int) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month,day") |
| sql("insert into partitionone values('k',2014,1,1)") |
| sql("insert into partitionone values('k',2014,1,2)") |
| sql("insert into partitionone values('k',2014,1,1)") |
| sql("insert into partitionone values('k',2015,2,1)") |
| sql("alter table partitionone compact 'minor'") |
| val showSegments = sql("show segments for table partitionone").collect().map{a=> (a.get(0), a.get(1))} |
| assert(showSegments.count(_._2 == "Success") == 1) |
| assert(showSegments.count(_._2 == "Compacted") == 4) |
| } |
| |
| test("test data after major compaction on partition table with pre-aggregate") { |
| sql("drop table if exists partitionone") |
| sql( |
| """ |
| | CREATE TABLE if not exists partitionone (empname String) |
| | PARTITIONED BY (year int, month int,day int) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month,day") |
| sql("insert into partitionone values('k',2014,1,1)") |
| sql("insert into partitionone values('k',2014,1,2)") |
| sql("insert into partitionone values('k',2014,1,1)") |
| sql("insert into partitionone values('k',2015,2,1)") |
| sql("insert into partitionone values('k',2015,2,1)") |
| sql("alter table partitionone compact 'major'") |
| val showSegments = sql("show segments for table partitionone").collect().map{a=> (a.get(0), a.get(1))} |
| assert(showSegments.count(_._2 == "Success") == 1) |
| assert(showSegments.count(_._2 == "Compacted") == 5) |
| } |
| |
| test("test drop partition 1") { |
| sql("drop table if exists partitionone") |
| sql( |
| """ |
| | CREATE TABLE if not exists partitionone (empname String) |
| | PARTITIONED BY (year int, month int,day int) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month, day") |
| sql("insert into partitionone values('k',2014,1,1)") |
| sql("insert into partitionone values('k',2014,1,2)") |
| sql("insert into partitionone values('k',2014,1,1)") |
| sql("insert into partitionone values('k',2015,2,1)") |
| sql("insert into partitionone values('k',2015,2,1)") |
| sql("alter table partitionone drop partition(day=1)") |
| checkAnswer(sql("select * from partitionone"), Seq(Row("k",2014,1,2))) |
| checkAnswer(sql("select * from partitionone_p1"), Seq(Row("k",2014,2014,1,2))) |
| } |
| |
| test("test drop partition 2") { |
| sql("drop table if exists partitionone") |
| sql( |
| """ |
| | CREATE TABLE if not exists partitionone (empname String) |
| | PARTITIONED BY (year int, month int,day int) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month, day") |
| sql("insert into partitionone values('k',2014,1,1)") |
| sql("insert into partitionone values('k',2014,1,2)") |
| sql("insert into partitionone values('k',2014,1,1)") |
| sql("insert into partitionone values('k',2015,2,3)") |
| sql("insert into partitionone values('k',2015,2,1)") |
| sql("alter table partitionone drop partition(day=1)") |
| checkAnswer(sql("select * from partitionone"), Seq(Row("k",2014,1,2), Row("k",2015,2,3))) |
| checkAnswer(sql("select * from partitionone_p1"), Seq(Row("k",2014,2014,1,2), Row("k",2015,2015,2,3))) |
| } |
| |
| test("test data with filter query") { |
| sql("drop table if exists partitionone") |
| sql( |
| """ |
| | CREATE TABLE if not exists partitionone (empname String) |
| | PARTITIONED BY (year int, month int,day int) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month, day") |
| sql("insert into partitionone values('k',2014,1,1)") |
| sql("insert into partitionone values('k',2014,1,2)") |
| sql("insert into partitionone values('k',2014,1,1)") |
| sql("insert into partitionone values('k',2015,2,3)") |
| sql("insert into partitionone values('k',2015,2,1)") |
| sql("alter table partitionone drop partition(day=1)") |
| checkAnswer(sql("select empname, sum(year) from partitionone where day=3 group by empname, year, month, day"), Seq(Row("k",2015))) |
| checkAnswer(sql("select * from partitionone_p1"), Seq(Row("k",2014,2014,1,2), Row("k",2015,2015,2,3))) |
| } |
| |
| test("test drop partition 3") { |
| sql("drop table if exists partitionone") |
| sql( |
| """ |
| | CREATE TABLE if not exists partitionone (empname String) |
| | PARTITIONED BY (year int, month int,day int) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month, day") |
| sql("insert into partitionone values('k',2014,1,1)") |
| sql("insert into partitionone values('k',2014,1,2)") |
| sql("insert into partitionone values('k',2014,1,1)") |
| sql("insert into partitionone values('k',2015,2,3)") |
| sql("insert into partitionone values('k',2015,2,1)") |
| sql("alter table partitionone drop partition(day=1,month=1)") |
| checkAnswer(sql("select * from partitionone"), Seq(Row("k",2014,1,2), Row("k",2015, 2,3), Row("k",2015, 2,1))) |
| checkAnswer(sql("select * from partitionone_p1"), Seq(Row("k",2014,2014,1,2), Row("k",2015,2015,2,3), Row("k",2015,2015,2,1))) |
| } |
| |
| test("test drop partition 4") { |
| sql("drop table if exists partitionone") |
| sql( |
| """ |
| | CREATE TABLE if not exists partitionone (empname String) |
| | PARTITIONED BY (year int, month int,day int) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month, day") |
| sql("insert into partitionone values('k',2014,1,1)") |
| sql("insert into partitionone values('k',2014,1,2)") |
| sql("insert into partitionone values('k',2014,1,1)") |
| sql("insert into partitionone values('k',2015,2,3)") |
| sql("insert into partitionone values('k',2015,2,1)") |
| sql("alter table partitionone drop partition(year=2014,day=1)") |
| checkAnswer(sql("select * from partitionone"), Seq(Row("k",2014,1,2), Row("k",2015, 2,3), Row("k",2015, 2,1))) |
| checkAnswer(sql("select * from partitionone_p1"), Seq(Row("k",2014,2014,1,2), Row("k",2015,2015, 2,3), Row("k",2015,2015, 2,1))) |
| } |
| |
| test("test drop partition 5") { |
| sql("drop table if exists partitionone") |
| sql( |
| """ |
| | CREATE TABLE if not exists partitionone (empname String) |
| | PARTITIONED BY (year int, month int,day int) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month, day") |
| sql("insert into partitionone values('k',2014,1,1)") |
| sql("insert into partitionone values('k',2014,1,2)") |
| sql("insert into partitionone values('k',2014,1,1)") |
| sql("insert into partitionone values('k',2015,2,3)") |
| sql("insert into partitionone values('k',2015,2,1)") |
| sql("alter table partitionone drop partition(year=2014,month=1, day=1)") |
| |
| checkAnswer(sql("select * from partitionone"), Seq(Row("k",2014,1,2), Row("k",2015, 2,3), Row("k",2015, 2,1))) |
| checkAnswer(sql("select * from partitionone_p1"), Seq(Row("k",2014,2014,1,2), Row("k",2015,2015,2,3), Row("k",2015,2015,2,1))) |
| sql("show partitions partitionone_p1").show() |
| } |
| |
| test("test drop partition 6") { |
| sql("drop table if exists partitionone") |
| sql( |
| """ |
| | CREATE TABLE if not exists partitionone (empname String) |
| | PARTITIONED BY (year int, month int,day int) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month, day") |
| sql("insert into partitionone values('k',2014,1,1)") |
| sql("insert into partitionone values('k',2014,1,2)") |
| sql("insert into partitionone values('k',2014,1,1)") |
| sql("insert into partitionone values('k',2015,2,3)") |
| sql("insert into partitionone values('k',2015,2,1)") |
| sql("alter table partitionone drop partition(year=2014,month=1, day=1)") |
| checkAnswer(sql("select * from partitionone"), Seq(Row("k",2014,1,2), Row("k",2015, 2,3), Row("k",2015, 2,1))) |
| checkAnswer(sql("select * from partitionone_p1"), Seq(Row("k",2014,2014,1,2), Row("k",2015,2015,2,3), Row("k",2015,2015,2,1))) |
| } |
| |
| test("test drop partition 7") { |
| sql("drop table if exists partitionone") |
| sql( |
| """ |
| | CREATE TABLE if not exists partitionone (empname String) |
| | PARTITIONED BY (year int, month int,day int) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| sql( |
| "create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, day") |
| sql( |
| "create datamap p2 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, month") |
| sql("insert into partitionone values('k',2014,1,1)") |
| sql("insert into partitionone values('k',2014,1,2)") |
| val exceptionMessage = intercept[Exception] { |
| sql("alter table partitionone drop partition(year=2014,month=1, day=1)") |
| }.getMessage |
| assert(exceptionMessage.contains("Cannot drop partition as one of the partition")) |
| assert(exceptionMessage.contains("p2")) |
| assert(exceptionMessage.contains("p1")) |
| } |
| |
| test("test drop partition 8") { |
| sql("drop table if exists partitionone") |
| sql( |
| """ |
| | CREATE TABLE if not exists partitionone (empname String) |
| | PARTITIONED BY (year int, month int,day int) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| sql( |
| "create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month, day") |
| sql( |
| "create datamap p2 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, month, day") |
| sql( |
| "create datamap p3 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month") |
| sql("insert into partitionone values('k',2014,1,1)") |
| sql("insert into partitionone values('k',2014,1,2)") |
| val exceptionMessage = intercept[Exception] { |
| sql("alter table partitionone drop partition(year=2014,month=1, day=1)") |
| }.getMessage |
| assert(exceptionMessage.contains("Cannot drop partition as one of the partition")) |
| assert(exceptionMessage.contains("p2")) |
| assert(exceptionMessage.contains("p3")) |
| assert(!exceptionMessage.contains("p1")) |
| } |
| |
| test("test drop partition 9") { |
| sql("drop table if exists partitionone") |
| sql( |
| """ |
| | CREATE TABLE if not exists partitionone (empname String) |
| | PARTITIONED BY (year int, month int,day int) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| sql( |
| "create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname") |
| sql("insert into partitionone values('k',2014,1,1)") |
| sql("insert into partitionone values('k',2014,1,2)") |
| val exceptionMessage = intercept[Exception] { |
| sql("alter table partitionone drop partition(year=2014,month=1, day=1)") |
| }.getMessage |
| assert(exceptionMessage.contains("Cannot drop partition as one of the partition")) |
| assert(exceptionMessage.contains("p1")) |
| } |
| |
| test("test drop partition 10") { |
| sql("drop table if exists partitionone") |
| sql( |
| """ |
| | CREATE TABLE if not exists partitionone (empname String) |
| | PARTITIONED BY (year int, month int,day int) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| sql( |
| "create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname") |
| sql( |
| "create datamap p2 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month, day") |
| sql("insert into partitionone values('k',2014,1,1)") |
| sql("insert into partitionone values('k',2014,1,2)") |
| val exceptionMessage = intercept[Exception] { |
| sql("alter table partitionone drop partition(year=2014,month=1, day=1)") |
| }.getMessage |
| assert(exceptionMessage.contains("Cannot drop partition as one of the partition")) |
| assert(exceptionMessage.contains("p1")) |
| sql("drop datamap p1 on table partitionone") |
| sql("alter table partitionone drop partition(year=2014,month=1, day=1)") |
| } |
| |
| test("test drop partition 11") { |
| sql("drop table if exists partitionone") |
| sql( |
| """ |
| | CREATE TABLE if not exists partitionone (empname String) |
| | PARTITIONED BY (year int, month int,day int) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| sql( |
| "create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year") |
| sql("insert into partitionone values('k',2014,1,1)") |
| val exceptionMessage = intercept[Exception] { |
| sql("alter table partitionone_p1 drop partition(partitionone_year=1)") |
| }.getMessage |
| assert(exceptionMessage.contains("Cannot drop partition directly on aggregate table")) |
| } |
| |
| test("test drop partition 12") { |
| sql("drop table if exists partitionone") |
| sql( |
| """ |
| | CREATE TABLE if not exists partitionone (empname String) |
| | PARTITIONED BY (year int, month int,day int) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| sql( |
| "create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname") |
| sql("insert into partitionone values('k',2014,1,1)") |
| val exceptionMessage = intercept[Exception] { |
| sql("alter table partitionone_p1 drop partition(year=2014)") |
| }.getMessage |
| assert(exceptionMessage.contains("operation failed for partition_preaggregate.partitionone_p1: Not a partitioned table")) |
| } |
| |
| test("test add partition on aggregate table") { |
| sql("drop table if exists partitionone") |
| sql( |
| """ |
| | CREATE TABLE if not exists partitionone (empname String) |
| | PARTITIONED BY (year int, month int,day int) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| sql( |
| "create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname") |
| assert(intercept[Exception] { |
| sql("alter table partitionone_p1 add partition(c=1)") |
| }.getMessage.equals("Cannot add partition directly on non partitioned table")) |
| } |
| |
| test("test if alter rename is blocked on partition table with preaggregate") { |
| sql("drop table if exists partitionone") |
| sql( |
| """ |
| | CREATE TABLE if not exists partitionone (empname String, id int) |
| | PARTITIONED BY (year int, month int,day int) |
| | STORED BY 'org.apache.carbondata.format' |
| """.stripMargin) |
| sql( |
| "create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname") |
| intercept[Exception] { |
| sql("alter table partitionone rename to p") |
| } |
| } |
| |
| test("test dropping partition which has already been deleted") { |
| sql("drop table if exists partitiontable") |
| sql("create table partitiontable(id int,name string) partitioned by (email string) " + |
| "stored by 'carbondata' tblproperties('sort_scope'='global_sort')") |
| sql("insert into table partitiontable select 1,'huawei','abc'") |
| sql("create datamap ag1 on table partitiontable using 'preaggregate' as select count(email),id" + |
| " from partitiontable group by id") |
| sql("create datamap ag2 on table partitiontable using 'preaggregate' as select sum(email),name" + |
| " from partitiontable group by name") |
| sql("create datamap ag3 on table partitiontable using 'preaggregate' as select max(email),name" + |
| " from partitiontable group by name") |
| sql("create datamap ag4 on table partitiontable using 'preaggregate' as select min(email),name" + |
| " from partitiontable group by name") |
| sql("create datamap ag5 on table partitiontable using 'preaggregate' as select avg(email),name" + |
| " from partitiontable group by name") |
| sql("alter table partitiontable add partition (email='def')") |
| sql("insert into table partitiontable select 1,'huawei','def'") |
| sql("drop datamap ag1 on table partitiontable") |
| sql("drop datamap ag2 on table partitiontable") |
| sql("drop datamap ag3 on table partitiontable") |
| sql("drop datamap ag4 on table partitiontable") |
| sql("drop datamap ag5 on table partitiontable") |
| sql("alter table partitiontable drop partition(email='def')") |
| assert(intercept[Exception] { |
| sql("alter table partitiontable drop partition(email='def')") |
| }.getMessage.contains("No partition is dropped. One partition spec 'Map(email -> def)' does not exist in table 'partitiontable' database 'partition_preaggregate'")) |
| } |
| |
| test("test Pre-Aggregate table creation with count(*) on Partition table") { |
| sql("drop table if exists partitiontable") |
| sql("create table partitiontable(id int,name string) partitioned by (email string) " + |
| "stored by 'carbondata' tblproperties('sort_scope'='global_sort')") |
| sql("insert into table partitiontable select 1,'huawei','abc'") |
| sql("create datamap ag1 on table partitiontable using 'preaggregate' as select count(*),id" + |
| " from partitiontable group by id") |
| sql("insert into table partitiontable select 1,'huawei','def'") |
| assert(sql("show datamap on table partitiontable").collect().head.get(0).toString.equalsIgnoreCase("ag1")) |
| sql("drop datamap ag1 on table partitiontable") |
| } |
| |
| test("test blocking partitioning of Pre-Aggregate table") { |
| sql("drop table if exists updatetime_8") |
| sql("create table updatetime_8" + |
| "(countryid smallint,hs_len smallint,minstartdate string,startdate string,newdate string,minnewdate string) partitioned by (imex smallint) stored by 'carbondata' tblproperties('sort_scope'='global_sort','sort_columns'='countryid,imex,hs_len,minstartdate,startdate,newdate,minnewdate','table_blocksize'='256')") |
| sql("create datamap ag on table updatetime_8 using 'preaggregate' dmproperties('partitioning'='false') as select imex,sum(hs_len) from updatetime_8 group by imex") |
| val carbonTable = CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "updatetime_8_ag")(sqlContext.sparkSession) |
| assert(!carbonTable.isHivePartitionTable) |
| sql("drop table if exists updatetime_8") |
| } |
| |
| test("Test data updation in Aggregate query after compaction on Partitioned table with Pre-Aggregate table") { |
| sql("drop table if exists updatetime_8") |
| sql("create table updatetime_8" + |
| "(countryid smallint,hs_len smallint,minstartdate string,startdate string,newdate string,minnewdate string) partitioned by (imex smallint) stored by 'carbondata' tblproperties('sort_scope'='global_sort','sort_columns'='countryid,imex,hs_len,minstartdate,startdate,newdate,minnewdate','table_blocksize'='256')") |
| sql("create datamap ag on table updatetime_8 using 'preaggregate' as select sum(hs_len) from updatetime_8 group by imex") |
| sql("insert into updatetime_8 select 21,20,'fbv','gbv','wvsw','vwr',23") |
| sql("insert into updatetime_8 select 21,20,'fbv','gbv','wvsw','vwr',24") |
| sql("insert into updatetime_8 select 21,20,'fbv','gbv','wvsw','vwr',23") |
| sql("insert into updatetime_8 select 21,21,'fbv','gbv','wvsw','vwr',24") |
| sql("insert into updatetime_8 select 21,21,'fbv','gbv','wvsw','vwr',24") |
| sql("insert into updatetime_8 select 21,21,'fbv','gbv','wvsw','vwr',24") |
| sql("insert into updatetime_8 select 21,21,'fbv','gbv','wvsw','vwr',25") |
| sql("insert into updatetime_8 select 21,21,'fbv','gbv','wvsw','vwr',25") |
| sql("alter table updatetime_8 compact 'minor'") |
| sql("alter table updatetime_8 compact 'minor'") |
| checkAnswer(sql("select sum(hs_len) from updatetime_8 group by imex"),Seq(Row(40),Row(42),Row(83))) |
| } |
| |
| def preAggTableValidator(plan: LogicalPlan, actualTableName: String) : Unit = { |
| var isValidPlan = false |
| plan.transform { |
| // first check if any preaTable1 scala function is applied it is present is in plan |
| // then call is from create preaTable1regate table class so no need to transform the query plan |
| case ca:CarbonRelation => |
| if (ca.isInstanceOf[CarbonDatasourceHadoopRelation]) { |
| val relation = ca.asInstanceOf[CarbonDatasourceHadoopRelation] |
| if(relation.carbonTable.getTableName.equalsIgnoreCase(actualTableName)) { |
| isValidPlan = true |
| } |
| } |
| ca |
| case logicalRelation:LogicalRelation => |
| if(logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) { |
| val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation] |
| if(relation.carbonTable.getTableName.equalsIgnoreCase(actualTableName)) { |
| isValidPlan = true |
| } |
| } |
| logicalRelation |
| } |
| assert(isValidPlan) |
| } |
| |
| } |