package org.apache.carbondata.geo
import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedIndexCommandException}
import org.apache.carbondata.core.constants.CarbonCommonConstants
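
// Test suite for CarbonData geo tables: validates spatial ('geohash') index table properties,
// data loading, IN_POLYGON filtering, inserts, block pruning and partitioning.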
class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
val table1 = "geoTable1"
val table2 = "geotable2"
  val result = Seq(Row(116187332, 39979316),
    Row(116362699, 39942444),
    Row(116288955, 39999101),
    Row(116325378, 39963129),
    Row(116337069, 39951887),
    Row(116285807, 40084087))

  override def beforeAll(): Unit = {
    drop()
  }

test("Invalid spatial index property") {
// Index name must not match with table column name. Fails to create table.
var exception = intercept[MalformedCarbonCommandException](sql(
s"""
| CREATE TABLE malformed(timevalue BIGINT, longitude LONG, latitude LONG)
| COMMENT "This is a malformed table"
| STORED AS carbondata
| TBLPROPERTIES ('SPATIAL_INDEX'='longitude')
""".stripMargin))
assert(exception.getMessage.contains(
"index: longitude must not match with any other column name in the table"))
// Type property is not configured. Fails to create table.
exception = intercept[MalformedCarbonCommandException](sql(
s"""
| CREATE TABLE malformed(timevalue BIGINT, longitude LONG, latitude LONG)
| COMMENT "This is a malformed table"
| STORED AS carbondata
| TBLPROPERTIES ('SPATIAL_INDEX'='mygeohash')
""".stripMargin))
assert(exception.getMessage.contains(
s"${CarbonCommonConstants.SPATIAL_INDEX}.mygeohash.type property must be specified"))
// Source columns are not configured. Fails to create table.
exception = intercept[MalformedCarbonCommandException](sql(
s"""
| CREATE TABLE malformed(timevalue BIGINT, longitude LONG, latitude LONG)
| COMMENT "This is a malformed table"
| STORED AS carbondata
| TBLPROPERTIES ('SPATIAL_INDEX'='mygeohash', 'SPATIAL_INDEX.mygeohash.type'='geohash')
""".stripMargin))
assert(exception.getMessage.contains(
s"${CarbonCommonConstants.SPATIAL_INDEX}.mygeohash.sourcecolumns property must be " +
s"specified."))
// Source columns must be present in the table. Fails to create table.
exception = intercept[MalformedCarbonCommandException](sql(
s"""
| CREATE TABLE malformed(timevalue BIGINT, longitude LONG, latitude LONG)
| COMMENT "This is a malformed table"
| STORED AS carbondata
| TBLPROPERTIES ('SPATIAL_INDEX'='mygeohash', 'SPATIAL_INDEX.mygeohash.type'='geohash',
| 'SPATIAL_INDEX.mygeohash.sourcecolumns'='unknown1, unknown2')
""".stripMargin))
assert(exception.getMessage.contains(
s"Source column: unknown1 in property " +
s"${CarbonCommonConstants.SPATIAL_INDEX}.mygeohash.sourcecolumns must be a column in the " +
s"table."))
}
test("test geo table with invalid table properties") {
var exception = intercept[MalformedCarbonCommandException](
createTable(table1, " 'RANGE_COLUMN'='timevalue', 'COLUMN_META_CACHE' = 'mygeohash', "))
assert(exception.getMessage.contains(
s"mygeohash is a spatial index column and is not allowed for " +
s"the option(s): column_meta_cache"))
exception = intercept[MalformedCarbonCommandException](
createTable(table1, " 'NO_INVERTED_INDEX'='mygeohash', "))
assert(exception.getMessage.contains(
s"mygeohash is a spatial index column and is not allowed for " +
s"the option(s): no_inverted_index"))
exception = intercept[MalformedCarbonCommandException](
createTable(table1,
" 'SORT_COLUMNS'='mygeohash, timevalue ', 'INVERTED_INDEX'='mygeohash', "))
assert(exception.getMessage.contains(
s"mygeohash is a spatial index column and is not allowed for " +
s"the option(s): inverted_index"))
exception = intercept[MalformedCarbonCommandException](
createTable(table1, " 'RANGE_COLUMN'='mygeohash', "))
assert(exception.getMessage.contains(
s"mygeohash is a spatial index column and is not allowed for " +
s"the option(s): range_column"))
exception = intercept[MalformedCarbonCommandException](
createTable(table1, " 'BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='mygeohash', "))
assert(exception.getMessage.contains(
s"mygeohash is a spatial index column and is not allowed for " +
s"the option(s): bucket_columns"))
}
test("test alter table with invalid table properties") {
createTable()
var exception = intercept[RuntimeException](
sql(s"ALTER TABLE $table1 SET TBLPROPERTIES('SORT_COLUMNS'='mygeohash, timevalue ', " +
s"'INVERTED_INDEX'='mygeohash')"))
assert(exception.getMessage.contains(
s"mygeohash is a spatial index column and is not allowed for " +
s"the option(s): inverted_index"))
exception = intercept[RuntimeException](
sql(s"ALTER TABLE $table1 SET TBLPROPERTIES('NO_INVERTED_INDEX'='mygeohash')"))
assert(exception.getMessage.contains(
s"mygeohash is a spatial index column and is not allowed for " +
s"the option(s): no_inverted_index"))
exception = intercept[RuntimeException](
sql(s"ALTER TABLE $table1 SET TBLPROPERTIES('COLUMN_META_CACHE' = 'mygeohash')"))
assert(exception.getMessage.contains(
s"mygeohash is a spatial index column and is not allowed for " +
s"the option(s): column_meta_cache"))
}
test("test materialized view with spatial column") {
createTable()
loadData()
val exception = intercept[MalformedCarbonCommandException](sql(
s"CREATE MATERIALIZED VIEW view1 AS SELECT longitude, mygeohash FROM $table1"))
assert(exception.getMessage.contains(
s"mygeohash is a spatial index column and is not allowed for " +
s"the option(s): MATERIALIZED VIEW"))
}
test("test geo table create index on spatial column") {
createTable()
loadData()
val exception = intercept[MalformedIndexCommandException](sql(
s"""
| CREATE INDEX bloom_index ON TABLE $table1 (mygeohash)
| AS 'bloomfilter'
| PROPERTIES('BLOOM_SIZE'='640000', 'BLOOM_FPP'='0.00001')
""".stripMargin))
assert(exception.getMessage.contains(
s"Spatial Index column is not supported, column 'mygeohash' is spatial column"))
}
test("test geo table create and load and check describe formatted") {
createTable()
loadData()
// Test if spatial index column is added as a sort column
val descTable = sql(s"describe formatted $table1").collect
descTable.find(_.get(0).toString.contains("Sort Scope")) match {
case Some(row) => assert(row.get(1).toString.contains("LOCAL_SORT"))
case None => assert(false)
}
descTable.find(_.get(0).toString.contains("Sort Columns")) match {
case Some(row) => assert(row.get(1).toString.contains("mygeohash"))
case None => assert(false)
}
}
test("test geo table drop spatial index column") {
createTable()
loadData()
val exception = intercept[MalformedCarbonCommandException](
sql(s"alter table $table1 drop columns(mygeohash)"))
assert(exception.getMessage.contains(
s"Columns present in ${ CarbonCommonConstants.SPATIAL_INDEX } " +
s"table property cannot be altered/updated"))
}
test("test geo table alter spatial index column") {
createTable()
loadData()
val exception = intercept[MalformedCarbonCommandException](
sql(s"update $table1 set (mygeohash)=(111111) where longitude=116285807 "))
assert(exception.getMessage.contains(
s"Columns present in ${ CarbonCommonConstants.SPATIAL_INDEX } " +
s"table property cannot be altered/updated"))
}
test("test geo table filter by geo spatial index column") {
createTable()
loadData()
checkAnswer(sql(s"select *from $table1 where mygeohash = '2196036'"),
Seq(Row(2196036, 1575428400000L, 116337069, 39951887)))
}
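  // IN_POLYGON takes a closed ring of comma-separated "longitude latitude" vertex pairs;
  // the first vertex is repeated at the end to close the polygon.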
test("test polygon query") {
createTable()
loadData()
checkAnswer(
sql(s"select longitude, latitude from $table1 where IN_POLYGON('116.321011 40.123503, " +
s"116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')"),
result)
}
test("test insert into table select from another table") {
val sourceTable = table1;
val targetTable = table2;
createTable(sourceTable)
loadData(sourceTable)
createTable(targetTable)
sql(s"insert into $targetTable select * from $sourceTable")
checkAnswer(
sql(s"select longitude, latitude from $targetTable where IN_POLYGON('116.321011 40.123503, " +
s"116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')"),
result)
}
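  // The source table declares its spatial index under a different name ('spatial') and a
  // different gridSize (60) than the helper-created target table ('mygeohash', gridSize 50).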
test("insert into table select from another geo table with different properties") {
val sourceTable = table1;
val targetTable = table2;
sql(
s"""
| CREATE TABLE $sourceTable(
| timevalue BIGINT,
| longitude LONG,
| latitude LONG) COMMENT "This is a GeoTable"
| STORED AS carbondata
| TBLPROPERTIES ('SPATIAL_INDEX'='spatial',
| 'SPATIAL_INDEX.spatial.type'='geohash',
| 'SPATIAL_INDEX.spatial.sourcecolumns'='longitude, latitude',
| 'SPATIAL_INDEX.spatial.originLatitude'='39.832277',
| 'SPATIAL_INDEX.spatial.gridSize'='60',
| 'SPATIAL_INDEX.spatial.minLongitude'='115.811865',
| 'SPATIAL_INDEX.spatial.maxLongitude'='116.782233',
| 'SPATIAL_INDEX.spatial.minLatitude'='39.832277',
| 'SPATIAL_INDEX.spatial.maxLatitude'='40.225281',
| 'SPATIAL_INDEX.spatial.conversionRatio'='1000000')
""".stripMargin)
loadData(sourceTable)
createTable(targetTable)
sql(s"insert into $targetTable select * from $sourceTable")
checkAnswer(sql(s"select *from $targetTable where mygeohash = '2196036'"),
Seq(Row(2196036, 1575428400000L, 116337069, 39951887)))
}
test("test insert into non-geo table select from geo table") {
val sourceTable = table1;
val targetTable = table2;
createTable(sourceTable)
loadData(sourceTable)
sql(
s"""
CREATE TABLE IF NOT EXISTS $targetTable
(spatial Long, time Bigint, longitude Long, latitude Long)
STORED AS carbondata
""")
sql(s"insert into $targetTable select * from $sourceTable")
checkAnswer(
sql(s"select * from $targetTable where spatial='2196036'"),
Seq(Row(2196036, 1575428400000L, 116337069, 39951887)))
}
test("test insert into table select from another table with target table sort scope as global") {
val sourceTable = table1;
val targetTable = table2;
createTable(sourceTable)
loadData(sourceTable)
createTable(targetTable, "'SORT_SCOPE'='GLOBAL_SORT',")
sql(s"insert into $targetTable select * from $sourceTable")
checkAnswer(
sql(s"select longitude, latitude from $targetTable where IN_POLYGON('116.321011 40.123503, " +
s"116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')"),
result)
}
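  // Each single-row insert below creates its own segment; the polygon filter is expected to
  // prune the scan to 6 of the 10 blocks (one Spark partition per surviving block).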
test("test block pruning for polygon query") {
createTable()
sql(s"insert into $table1 select 0,1575428400000,116285807,40084087")
sql(s"insert into $table1 select 0,1575428400000,116372142,40129503")
sql(s"insert into $table1 select 0,1575428400000,116187332,39979316")
sql(s"insert into $table1 select 0,1575428400000,116337069,39951887")
sql(s"insert into $table1 select 0,1575428400000,116359102,40154684")
sql(s"insert into $table1 select 0,1575428400000,116736367,39970323")
sql(s"insert into $table1 select 0,1575428400000,116362699,39942444")
sql(s"insert into $table1 select 0,1575428400000,116325378,39963129")
sql(s"insert into $table1 select 0,1575428400000,116302895,39930753")
sql(s"insert into $table1 select 0,1575428400000,116288955,39999101")
val df = sql(s"select * from $table1 where IN_POLYGON('116.321011 " +
s"40.123503, 116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')")
assert(df.rdd.getNumPartitions == 6)
checkAnswer(df, Seq(Row(733215, 1575428400000L, 116187332, 39979316),
Row(2160019, 1575428400000L, 116362699, 39942444),
Row(2170151, 1575428400000L, 116288955, 39999101),
Row(2174509, 1575428400000L, 116325378, 39963129),
Row(2196036, 1575428400000L, 116337069, 39951887),
Row(2361256, 1575428400000L, 116285807, 40084087)))
}
test("test insert into on table partitioned by timevalue column") {
sql(
s"""
| CREATE TABLE $table1(
| longitude LONG,
| latitude LONG) COMMENT "This is a GeoTable" PARTITIONED BY (timevalue BIGINT)
| STORED AS carbondata
| TBLPROPERTIES ('SPATIAL_INDEX'='mygeohash',
| 'SPATIAL_INDEX.mygeohash.type'='geohash',
| 'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
| 'SPATIAL_INDEX.mygeohash.originLatitude'='39.832277',
| 'SPATIAL_INDEX.mygeohash.gridSize'='50',
| 'SPATIAL_INDEX.mygeohash.minLongitude'='115.811865',
| 'SPATIAL_INDEX.mygeohash.maxLongitude'='116.782233',
| 'SPATIAL_INDEX.mygeohash.minLatitude'='39.832277',
| 'SPATIAL_INDEX.mygeohash.maxLatitude'='40.225281',
| 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000')
""".stripMargin)
sql(s"insert into $table1 select 0, 116337069, 39951887, 1575428400000")
checkAnswer(
sql(s"select * from $table1 where mygeohash = '2196036'"),
Seq(Row(2196036, 116337069, 39951887, 1575428400000L)))
}
test("test polygon query on table partitioned by timevalue column") {
sql(s"""
| CREATE TABLE $table1(
| longitude LONG,
| latitude LONG) COMMENT "This is a GeoTable" PARTITIONED BY (timevalue BIGINT)
| STORED AS carbondata
| TBLPROPERTIES ('SPATIAL_INDEX'='mygeohash',
| 'SPATIAL_INDEX.mygeohash.type'='geohash',
| 'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
| 'SPATIAL_INDEX.mygeohash.originLatitude'='39.832277',
| 'SPATIAL_INDEX.mygeohash.gridSize'='50',
| 'SPATIAL_INDEX.mygeohash.minLongitude'='115.811865',
| 'SPATIAL_INDEX.mygeohash.maxLongitude'='116.782233',
| 'SPATIAL_INDEX.mygeohash.minLatitude'='39.832277',
| 'SPATIAL_INDEX.mygeohash.maxLatitude'='40.225281',
| 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000')
""".stripMargin)
loadData()
checkAnswer(
sql(s"select longitude, latitude from $table1 where IN_POLYGON('116.321011 40.123503, " +
s"116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')"),
result)
}
  override def afterEach(): Unit = {
    drop()
  }

  override def afterAll(): Unit = {
    drop()
  }

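  // Drops both test tables if they exist; called from beforeAll, afterEach and afterAll.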
  def drop(): Unit = {
    sql(s"drop table if exists $table1")
    sql(s"drop table if exists $table2")
  }

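  // Creates a geo table with a 'mygeohash' geohash spatial index generated from the
  // longitude/latitude columns; extra TBLPROPERTIES can be passed via customProperties.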
def createTable(tableName : String = table1, customProperties : String = ""): Unit = {
sql(s"""
| CREATE TABLE $tableName(
| timevalue BIGINT,
| longitude LONG,
| latitude LONG) COMMENT "This is a GeoTable"
| STORED AS carbondata
| TBLPROPERTIES ($customProperties 'SPATIAL_INDEX'='mygeohash',
| 'SPATIAL_INDEX.mygeohash.type'='geohash',
| 'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
| 'SPATIAL_INDEX.mygeohash.originLatitude'='39.832277',
| 'SPATIAL_INDEX.mygeohash.gridSize'='50',
| 'SPATIAL_INDEX.mygeohash.minLongitude'='115.811865',
| 'SPATIAL_INDEX.mygeohash.maxLongitude'='116.782233',
| 'SPATIAL_INDEX.mygeohash.minLatitude'='39.832277',
| 'SPATIAL_INDEX.mygeohash.maxLatitude'='40.225281',
| 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000')
""".stripMargin)
}
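  // Loads the geodata.csv test resource into the given table.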
  def loadData(tableName : String = table1): Unit = {
    sql(s"""LOAD DATA local inpath '$resourcesPath/geodata.csv' INTO TABLE $tableName OPTIONS
           |('DELIMITER'= ',')""".stripMargin)
  }
}