// blob: 55aa264fa978daea8b5cc08d3aebc50822d374ff [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.integration.spark.testsuite.timeseries
import java.sql.Timestamp
import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Row}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.util.SparkUtil4Test
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
/**
 * Checks which timeseries datamap is picked for a minute-level `SUM(age)` query on `mainTable`
 * when several datamaps with the same granularity but different aggregate columns exist.
 *
 * Every test runs the same query; the tests differ only in which datamaps exist and in the
 * order in which they were created.
 */
class TestTimeSeriesMatchStrategySuite
  extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {

  /** Timestamp format that was configured before the suite started; restored in `afterAll`. */
  var timestampFormat: String = _

  override def beforeAll: Unit = {
    // Remember the current setting so that afterAll can put it back.
    timestampFormat = CarbonProperties.getInstance()
      .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
    SparkUtil4Test.createTaskMockUp(sqlContext)
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
  }

  /** Recreates and reloads `mainTable` before every test. */
  override protected def beforeEach(): Unit = {
    sql("drop table if exists mainTable")
    sql(
      """
        | CREATE TABLE mainTable(
        | mytime TIMESTAMP,
        | name STRING,
        | age INT)
        | STORED BY 'org.apache.carbondata.format'
      """.stripMargin)
    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' INTO TABLE mainTable")
  }

  /** Provider name used in every `CREATE DATAMAP ... USING` statement of this suite. */
  val timeSeries = "TIMESERIES"

  // Aggregate lists of the three datamaps, from widest to narrowest.
  private val bigAggregates = "SUM(age), count(age), max(age), min(age)"
  private val middleAggregates = "SUM(age), count(age)"
  private val smallAggregates = "SUM(age)"

  /** The single row every minute-level query in this suite is expected to return. */
  private def expectedMinuteSum: Seq[Row] =
    Seq(Row(Timestamp.valueOf("2016-02-23 09:01:00"), 60))

  /**
   * Creates a minute-granularity timeseries datamap on `mainTable`.
   *
   * @param dataMapName name of the datamap to create
   * @param aggregates  comma separated aggregate expressions selected next to `mytime`
   */
  private def createMinuteDataMap(dataMapName: String, aggregates: String): Unit = {
    sql(
      s"""
         | CREATE DATAMAP $dataMapName ON TABLE mainTable
         | USING '$timeSeries'
         | DMPROPERTIES (
         | 'event_time'='mytime',
         | 'MINUTE_GRANULARITY'='1')
         | AS SELECT mytime, $aggregates
         | FROM mainTable
         | GROUP BY mytime
      """.stripMargin)
  }

  /** Runs the minute-level `SUM(age)` query shared by all tests and returns its DataFrame. */
  private def queryMinuteLevelSum(): org.apache.spark.sql.DataFrame = {
    sql(
      """
        | SELECT
        | timeseries(mytime,'minute') AS minuteLevel,
        | SUM(age) AS SUM
        | FROM mainTable
        | WHERE
        | timeseries(mytime,'minute')<'2016-02-23 09:02:00' and
        | timeseries(mytime,'minute')>='2016-02-23 09:01:00'
        | GROUP BY
        | timeseries(mytime,'minute')
        | ORDER BY
        | timeseries(mytime,'minute')
      """.stripMargin)
  }

  /**
   * Runs the shared query, checks its result and checks that its analyzed plan reads from
   * `expectedTable`.
   *
   * @param expectedTable table name the analyzed plan must reference
   * @return the DataFrame, so callers can re-inspect its plan later
   */
  private def assertMinuteSumServedBy(expectedTable: String): org.apache.spark.sql.DataFrame = {
    val df = queryMinuteLevelSum()
    checkAnswer(df, expectedMinuteSum)
    preAggTableValidator(df.queryExecution.analyzed, expectedTable)
    df
  }

  test("test timeseries match 1: select small one when create big_agg and then create small_agg") {
    dropDataMaps("maintable", "big_agg", "small_agg")

    createMinuteDataMap("big_agg", bigAggregates)
    assertMinuteSumServedBy("maintable_big_agg")

    createMinuteDataMap("small_agg", smallAggregates)
    assertMinuteSumServedBy("maintable_small_agg")
  }

  test("test timeseries match 2: select small one when create small_agg and then create big_agg") {
    dropDataMaps("maintable", "big_agg", "small_agg")

    createMinuteDataMap("small_agg", smallAggregates)
    assertMinuteSumServedBy("maintable_small_agg")

    createMinuteDataMap("big_agg", bigAggregates)
    val df1 = queryMinuteLevelSum()
    checkAnswer(df1, expectedMinuteSum)
    // With both datamaps present the plan must not reference big_agg ...
    intercept[Exception] {
      preAggTableValidator(df1.queryExecution.analyzed, "maintable_big_agg")
    }
    // ... it must keep referencing small_agg.
    preAggTableValidator(df1.queryExecution.analyzed, "maintable_small_agg")
  }

  test("test timeseries match 3: select small one when create big_agg and then create small_agg") {
    dropDataMaps("maintable", "big_agg", "small_agg")

    createMinuteDataMap("big_agg", bigAggregates)
    val df1 = assertMinuteSumServedBy("maintable_big_agg")

    createMinuteDataMap("small_agg", smallAggregates)
    assertMinuteSumServedBy("maintable_small_agg")

    // df1 was built before small_agg existed; its already analyzed plan is expected to still
    // reference big_agg and not small_agg.
    intercept[Exception] {
      preAggTableValidator(df1.queryExecution.analyzed, "maintable_small_agg")
    }
    preAggTableValidator(df1.queryExecution.analyzed, "maintable_big_agg")
  }

  test("test timeseries match 4: select small one when create big_agg, small_agg, and middle_agg") {
    dropDataMaps("maintable", "big_agg", "small_agg", "middle_agg")

    createMinuteDataMap("big_agg", bigAggregates)
    val df1 = assertMinuteSumServedBy("maintable_big_agg")

    createMinuteDataMap("small_agg", smallAggregates)
    assertMinuteSumServedBy("maintable_small_agg")

    // df1 was built before small_agg existed; its already analyzed plan is expected to still
    // reference big_agg and not small_agg.
    intercept[Exception] {
      preAggTableValidator(df1.queryExecution.analyzed, "maintable_small_agg")
    }
    preAggTableValidator(df1.queryExecution.analyzed, "maintable_big_agg")

    // Adding a datamap whose aggregate list lies between the other two must not change the
    // choice: the query is still expected to reference small_agg.
    createMinuteDataMap("middle_agg", middleAggregates)
    val df3 = queryMinuteLevelSum()
    preAggTableValidator(df3.queryExecution.analyzed, "maintable_small_agg")
  }

  /**
   * Returns the carbon table name behind `relation` when it is a
   * [[CarbonDatasourceHadoopRelation]], `None` for anything else.
   */
  private def carbonTableName(relation: Any): Option[String] = relation match {
    case carbon: CarbonDatasourceHadoopRelation => Some(carbon.carbonTable.getTableName)
    case _ => None
  }

  /**
   * Asserts that `plan` contains a carbon relation whose table name equals `actualTableName`
   * (compared case-insensitively).
   *
   * @param plan            logical plan to inspect, typically `df.queryExecution.analyzed`
   * @param actualTableName table name that must appear in the plan
   */
  def preAggTableValidator(plan: LogicalPlan, actualTableName: String) : Unit = {
    // Gather the table name of every carbon relation in the plan. The same two node types as
    // before are inspected; nothing in the plan is rewritten.
    val scannedCarbonTables: Seq[String] = plan.collect {
      case ca: CarbonRelation => carbonTableName(ca)
      case logicalRelation: LogicalRelation => carbonTableName(logicalRelation.relation)
    }.flatten

    assert(
      scannedCarbonTables.exists(_.equalsIgnoreCase(actualTableName)),
      s"Expected the plan to read from '$actualTableName', but it reads from: " +
        scannedCarbonTables.mkString("[", ", ", "]"))
  }

  override def afterAll: Unit = {
    // Drop the datamaps this suite creates (big_agg, small_agg, middle_agg) before the table.
    dropDataMaps("maintable", "big_agg", "small_agg", "middle_agg")
    sql("drop table if exists mainTable")
    // Restore the timestamp format captured in beforeAll.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, timestampFormat)
  }
}