blob: 335ca5687f978534c62bb63b7c5733d2ad3f4e79 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from shapely.geometry import Polygon
from tests import csv_point_input_location, union_polygon_input_location
from tests.test_base import TestBase
class TestConstructors(TestBase):
def test_st_envelope_aggr(self):
point_csv_df = (
self.spark.read.format("csv")
.option("delimiter", ",")
.option("header", "false")
.load(csv_point_input_location)
)
point_csv_df.createOrReplaceTempView("pointtable")
point_df = self.spark.sql(
"select ST_Point(cast(pointtable._c0 as Decimal(24,20)), cast(pointtable._c1 as Decimal(24,20))) as arealandmark from pointtable"
)
point_df.createOrReplaceTempView("pointdf")
boundary = self.spark.sql(
"select ST_Envelope_Aggr(pointdf.arealandmark) from pointdf"
)
coordinates = [
(1.1, 101.1),
(1.1, 1100.1),
(1000.1, 1100.1),
(1000.1, 101.1),
(1.1, 101.1),
]
polygon = Polygon(coordinates)
assert boundary.take(1)[0][0] == polygon
def test_st_union_aggr(self):
polygon_csv_df = (
self.spark.read.format("csv")
.option("delimiter", ",")
.option("header", "false")
.load(union_polygon_input_location)
)
polygon_csv_df.createOrReplaceTempView("polygontable")
polygon_csv_df.show()
polygon_df = self.spark.sql(
"select ST_PolygonFromEnvelope(cast(polygontable._c0 as Decimal(24,20)),cast(polygontable._c1 as Decimal(24,20)), cast(polygontable._c2 as Decimal(24,20)), cast(polygontable._c3 as Decimal(24,20))) as polygonshape from polygontable"
)
polygon_df.createOrReplaceTempView("polygondf")
polygon_df.show()
union = self.spark.sql(
"select ST_Union_Aggr(polygondf.polygonshape) from polygondf"
)
assert union.take(1)[0][0].area == 10100