blob: 81c86ac982aa1cb76d2793d341a0c281e667ca11 [file] [log] [blame]
package org.apache.carbondata.geo
import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfter {
override def beforeAll(): Unit = {
drop()
}
test("Invalid geo index handler property") {
// Handler name must not match with table column name. Fails to create table.
var exception = intercept[MalformedCarbonCommandException](sql(
s"""
| CREATE TABLE malformed(timevalue BIGINT, longitude LONG, latitude LONG)
| COMMENT "This is a malformed table"
| STORED AS carbondata
| TBLPROPERTIES ('INDEX_HANDLER'='longitude')
""".stripMargin))
assert(exception.getMessage.contains(
"handler: longitude must not match with any other column name in the table"))
// Type property is not configured. Fails to create table.
exception = intercept[MalformedCarbonCommandException](sql(
s"""
| CREATE TABLE malformed(timevalue BIGINT, longitude LONG, latitude LONG)
| COMMENT "This is a malformed table"
| STORED AS carbondata
| TBLPROPERTIES ('INDEX_HANDLER'='mygeohash')
""".stripMargin))
assert(exception.getMessage.contains(
s"${CarbonCommonConstants.INDEX_HANDLER}.mygeohash.type property must be specified"))
// Source columns are not configured. Fails to create table.
exception = intercept[MalformedCarbonCommandException](sql(
s"""
| CREATE TABLE malformed(timevalue BIGINT, longitude LONG, latitude LONG)
| COMMENT "This is a malformed table"
| STORED AS carbondata
| TBLPROPERTIES ('INDEX_HANDLER'='mygeohash', 'INDEX_HANDLER.mygeohash.type'='geohash')
""".stripMargin))
assert(exception.getMessage.contains(
s"${CarbonCommonConstants.INDEX_HANDLER}.mygeohash.sourcecolumns property must be " +
s"specified."))
// Source columns must be present in the table. Fails to create table.
exception = intercept[MalformedCarbonCommandException](sql(
s"""
| CREATE TABLE malformed(timevalue BIGINT, longitude LONG, latitude LONG)
| COMMENT "This is a malformed table"
| STORED AS carbondata
| TBLPROPERTIES ('INDEX_HANDLER'='mygeohash', 'INDEX_HANDLER.mygeohash.type'='geohash',
| 'INDEX_HANDLER.mygeohash.sourcecolumns'='unknown1, unknown2')
""".stripMargin))
assert(exception.getMessage.contains(
s"Source column: unknown1 in property " +
s"${CarbonCommonConstants.INDEX_HANDLER}.mygeohash.sourcecolumns must be a column in the " +
s"table."))
}
test("test geo table create and load and check describe formatted") {
createTable()
loadData()
// Test if index handler column is added as a sort column
val descTable = sql("describe formatted geotable").collect
descTable.find(_.get(0).toString.contains("Sort Scope")) match {
case Some(row) => assert(row.get(1).toString.contains("LOCAL_SORT"))
case None => assert(false)
}
descTable.find(_.get(0).toString.contains("Sort Columns")) match {
case Some(row) => assert(row.get(1).toString.contains("mygeohash"))
case None => assert(false)
}
}
test("test polygon query") {
createTable()
loadData()
checkAnswer(
sql(s"select longitude, latitude from geotable where IN_POLYGON('116.321011 40.123503, " +
s"116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')"),
Seq(Row(116187332, 39979316),
Row(116362699, 39942444),
Row(116288955, 39999101),
Row(116325378, 39963129),
Row(116337069, 39951887),
Row(116285807, 40084087)))
}
after {
drop()
}
override def afterAll(): Unit = {
drop()
}
def drop(): Unit = {
sql("drop table if exists geotable")
}
def createTable(): Unit = {
sql(s"""
| CREATE TABLE geotable(
| timevalue BIGINT,
| longitude LONG,
| latitude LONG) COMMENT "This is a GeoTable"
| STORED AS carbondata
| TBLPROPERTIES ('INDEX_HANDLER'='mygeohash',
| 'INDEX_HANDLER.mygeohash.type'='geohash',
| 'INDEX_HANDLER.mygeohash.sourcecolumns'='longitude, latitude',
| 'INDEX_HANDLER.mygeohash.originLatitude'='39.832277',
| 'INDEX_HANDLER.mygeohash.gridSize'='50',
| 'INDEX_HANDLER.mygeohash.minLongitude'='115.811865',
| 'INDEX_HANDLER.mygeohash.maxLongitude'='116.782233',
| 'INDEX_HANDLER.mygeohash.minLatitude'='39.832277',
| 'INDEX_HANDLER.mygeohash.maxLatitude'='40.225281',
| 'INDEX_HANDLER.mygeohash.conversionRatio'='1000000')
""".stripMargin)
}
def loadData(): Unit = {
sql(s"""LOAD DATA local inpath '$resourcesPath/geodata.csv' INTO TABLE geotable OPTIONS
|('DELIMITER'= ',')""".stripMargin)
}
}