| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.mv.timeseries |
| |
| import java.util.concurrent.{Callable, Executors, TimeUnit} |
| |
| import org.apache.spark.sql.test.util.CarbonQueryTest |
| import org.scalatest.BeforeAndAfterAll |
| |
| import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException |
| import org.apache.carbondata.core.constants.CarbonCommonConstants |
| import org.apache.carbondata.core.util.CarbonProperties |
| import org.apache.carbondata.mv.rewrite.TestUtil |
| |
/**
 * Tests for creating MV datamaps whose query uses the `timeseries(column, granularity)` UDF.
 *
 * Covers successful creation (including several granularities on the same column and a
 * Date-typed timeseries column) and the validation errors raised for lazy rebuild, mixed
 * granularities, mixed event columns, invalid granularity / column type, a parent-table
 * mismatch, and creation while a load is running on the parent table.
 */
class TestMVTimeSeriesCreateDataMapCommand extends CarbonQueryTest with BeforeAndAfterAll {

  // Timestamp format that was configured before this suite ran; restored in afterAll.
  private val timestampFormat = CarbonProperties.getInstance()
    .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT)

  override def beforeAll(): Unit = {
    // NOTE(review): presumably matches the timestamp layout in data_big.csv; the filter
    // literal '29-06-2008 00:00:00.0' used below is written in this format.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
    drop()
    sql("CREATE TABLE maintable (empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
      "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
    sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE maintable OPTIONS
         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
  }

  /** Drops every table this suite may create; used before and after the suite. */
  def drop(): Unit = {
    sql("drop table if exists products")
    sql("drop table IF EXISTS main_table")
    sql("drop table IF EXISTS maintable")
    // Created by the Date-column tests; dropped here too in case one of them failed midway.
    sql("drop table IF EXISTS maintable_new")
  }

  /** Names (lower-cased) of the datamaps reported by `show datamap on table <table>`. */
  private def showDataMapNames(table: String): Seq[String] =
    sql(s"show datamap on table $table").collect().map(_.get(0).toString.toLowerCase).toSeq

  test("test mv_timeseries create datamap") {
    sql("drop datamap if exists datamap1")
    sql(
      "create datamap datamap1 on table maintable using 'mv'" +
      " as select timeseries(projectjoindate,'second'), sum(projectcode) from maintable group by timeseries(projectjoindate,'second')")
    val result = sql("show datamap on table maintable").collectAsList()
    assert(result.get(0).get(0).toString.equalsIgnoreCase("datamap1"))
    assert(result.get(0).get(4).toString.equalsIgnoreCase("ENABLED"))
    val df = sql("select timeseries(projectjoindate,'second'), sum(projectcode) from maintable group by timeseries(projectjoindate,'second')")
    assert(TestUtil.verifyMVDataMap(df.queryExecution.analyzed, "datamap1"))
    sql("drop datamap if exists datamap1")
  }

  test("test mv_timeseries create lazy datamap") {
    sql("drop datamap if exists datamap1")
    val ex = intercept[MalformedCarbonCommandException] {
      sql(
        "create datamap datamap1 on table maintable using 'mv' with deferred rebuild as " +
        "select timeseries(projectjoindate,'second') from maintable group by timeseries(projectjoindate,'second')")
    }
    assert(ex.getMessage.contains("MV TimeSeries queries does not support Lazy Rebuild"))
  }

  test("test mv_timeseries create datamap with multiple granularity") {
    sql("drop datamap if exists datamap1")
    val ex = intercept[MalformedCarbonCommandException] {
      sql(
        "create datamap datamap1 on table maintable using 'mv' as " +
        "select timeseries(projectjoindate,'second'), timeseries(projectjoindate,'hour') from maintable")
    }
    assert(ex.getMessage.contains(
      "Multiple timeseries udf functions are defined in Select statement with different granularities"))
  }

  test("test mv_timeseries create datamap with date type as timeseries_column") {
    sql("drop table IF EXISTS maintable_new")
    sql("CREATE TABLE maintable_new (projectcode int, projectjoindate date, projectenddate Timestamp,attendance int) " +
      "STORED BY 'org.apache.carbondata.format'")
    sql("drop datamap if exists datamap1")
    sql(
      "create datamap datamap1 on table maintable_new using 'mv' as " +
      "select timeseries(projectjoindate,'day') from maintable_new")
    val result = sql("show datamap on table maintable_new").collectAsList()
    assert(result.get(0).get(0).toString.equalsIgnoreCase("datamap1"))
    assert(result.get(0).get(4).toString.equalsIgnoreCase("ENABLED"))
    sql("drop table IF EXISTS maintable_new")
  }

  test("test mv_timeseries create datamap with date type as timeseries_column with incorrect granularity") {
    sql("drop table IF EXISTS maintable_new")
    sql("CREATE TABLE maintable_new (projectcode int, projectjoindate date, projectenddate Timestamp,attendance int) " +
      "STORED BY 'org.apache.carbondata.format'")
    sql("drop datamap if exists datamap1")
    // Sub-day granularities must all be rejected for a Date-typed timeseries column.
    Seq("second", "five_minute", "hour").foreach { granularity =>
      val ex = intercept[MalformedCarbonCommandException] {
        sql(
          "create datamap datamap1 on table maintable_new using 'mv' as " +
          s"select timeseries(projectjoindate,'$granularity') from maintable_new")
      }
      assert(ex.getMessage.contains(
        "Granularity should be of DAY/WEEK/MONTH/YEAR, for timeseries column of Date type"))
    }
    sql("drop table IF EXISTS maintable_new")
  }

  test("test mv_timeseries create datamap - Parent table name is different in Create and Select Statement") {
    sql("drop table if exists main_table")
    sql("CREATE TABLE main_table (empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, " +
      "deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
    sql("drop datamap if exists datamap1")
    // The datamap is declared on main_table but its query selects from maintable.
    val ex = intercept[MalformedCarbonCommandException] {
      sql(
        "create datamap datamap1 on table main_table using 'mv' as " +
        "select timeseries(projectjoindate,'second'), sum(projectcode) from maintable group by timeseries(projectjoindate,'second')")
    }
    assert(ex.getMessage.contains("Parent table name is different in Create and Select Statement"))
    sql("drop table if exists main_table")
  }

  test("test mv_timeseries for same event_column with different granularities") {
    def dropDataMaps(): Unit =
      (1 to 5).foreach(i => sql(s"drop datamap if exists datamap$i"))

    dropDataMaps()
    sql(
      "create datamap datamap1 on table maintable using 'mv' as " +
      "select timeseries(projectjoindate,'second'), sum(projectcode) from maintable group by timeseries(projectjoindate,'second')")
    sql(
      "create datamap datamap2 on table maintable using 'mv' as " +
      "select timeseries(projectjoindate,'hour'), sum(projectcode) from maintable group by timeseries(projectjoindate,'hour')")
    sql(
      "create datamap datamap3 on table maintable using 'mv' as " +
      "select timeseries(projectjoindate,'minute'), sum(projectcode) from maintable group by timeseries(projectjoindate,'minute')")
    sql(
      "create datamap datamap4 on table maintable using 'mv' as " +
      "select timeseries(projectjoindate,'day'), sum(projectcode) from maintable group by timeseries(projectjoindate,'day')")
    sql(
      "create datamap datamap5 on table maintable using 'mv' as " +
      "select timeseries(projectjoindate,'year'), sum(projectcode) from maintable group by timeseries(projectjoindate,'year')")
    // All five datamaps on the same event column must coexist.
    val names = showDataMapNames("maintable")
    (1 to 5).foreach(i => assert(names.contains(s"datamap$i")))
    dropDataMaps()
  }

  test("test mv_timeseries create datamap with more event_columns") {
    sql("drop datamap if exists datamap1")
    val ex = intercept[MalformedCarbonCommandException] {
      sql(
        "create datamap datamap1 on table maintable using 'mv' as " +
        "select timeseries(projectjoindate,'hour'), timeseries(projectenddate,'hour') from maintable")
    }
    assert(ex.getMessage.contains(
      "Multiple timeseries udf functions are defined in Select statement with different timestamp columns"))
  }

  test("test mv_timeseries create datamap with same granularity and different ctas") {
    sql("drop datamap if exists datamap1")
    sql(
      "create datamap datamap1 on table maintable using 'mv' as " +
      "select timeseries(projectjoindate,'second'), sum(projectcode) from maintable group by timeseries(projectjoindate,'second')")
    sql("drop datamap if exists datamap2")
    sql(
      "create datamap datamap2 on table maintable using 'mv' as " +
      "select timeseries(projectjoindate,'second'), sum(projectcode) from maintable where projectjoindate='29-06-2008 00:00:00.0' " +
      "group by timeseries(projectjoindate,'second')")
    // Same granularity with a different query must not clash with the first datamap.
    val names = showDataMapNames("maintable")
    assert(names.contains("datamap1") && names.contains("datamap2"))
    sql("drop datamap if exists datamap1")
    sql("drop datamap if exists datamap2")
  }

  test("insert and create datamap in progress") {
    sql("drop datamap if exists datamap1")
    val query = s"LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE maintable " +
      s"OPTIONS('DELIMITER'= ',')"
    val executorService = Executors.newSingleThreadExecutor()
    try {
      // Start a load in the background so the datamap creation below overlaps with it.
      // NOTE(review): timing-dependent — if the load has not started or has already finished
      // when the create runs, no exception is thrown and intercept fails.
      executorService.submit(new QueryTask(query))
      val ex = intercept[UnsupportedOperationException] {
        sql(
          "create datamap datamap1 on table maintable using 'mv' as " +
          "select timeseries(projectjoindate,'year'), sum(projectcode) from maintable group by timeseries(projectjoindate,'year')")
      }
      assert(ex.getMessage
        .contains("Cannot create mv datamap table when insert is in progress on parent table: maintable"))
    } finally {
      // Always release the worker thread and let the background load finish,
      // even when the assertions above fail.
      executorService.shutdown()
      executorService.awaitTermination(2, TimeUnit.HOURS)
      sql("drop datamap if exists datamap1")
    }
  }

  test("test create datamap with incorrect timeseries_column and granularity") {
    sql("drop datamap if exists datamap2")
    val invalidGranularity = intercept[MalformedCarbonCommandException] {
      sql(
        "create datamap datamap2 on table maintable using 'mv' as " +
        "select timeseries(projectjoindate,'time'), sum(projectcode) from maintable group by timeseries(projectjoindate,'time')")
    }
    assert(invalidGranularity.getMessage.contains("Granularity time is invalid"))
    // empname is a String column, so it cannot be a timeseries column.
    val invalidColumn = intercept[MalformedCarbonCommandException] {
      sql(
        "create datamap datamap2 on table maintable using 'mv' as " +
        "select timeseries(empname,'second'), sum(projectcode) from maintable group by timeseries(empname,'second')")
    }
    assert(invalidColumn.getMessage.contains("MV Timeseries is only supported on Timestamp/Date column"))
  }

  /**
   * Runs `query` and collects its result, for execution on a background thread.
   *
   * @return "PASS" if the query completed, "FAIL" if it threw; failures are logged,
   *         not propagated.
   */
  class QueryTask(query: String) extends Callable[String] {
    override def call(): String = {
      try {
        sql(query).collect()
        "PASS"
      } catch {
        case exception: Exception =>
          LOGGER.error(exception.getMessage, exception)
          "FAIL"
      }
    }
  }

  override def afterAll(): Unit = {
    drop()
    // Restore the pre-suite format. If none was configured, fall back to Carbon's default
    // rather than leaving this suite's "dd-MM-yyyy" in the global properties.
    CarbonProperties.getInstance().addProperty(
      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
      Option(timestampFormat).getOrElse(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
  }
}