# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import shutil
import tempfile
from shapely.geometry import (
Point,
LineString,
Polygon,
GeometryCollection,
MultiPoint,
MultiLineString,
MultiPolygon,
LinearRing,
box,
)
import shapely
from sedona.spark.geopandas import GeoDataFrame, GeoSeries
from tests.geopandas.test_geopandas_base import TestGeopandasBase
import pyspark.pandas as ps
import pandas as pd
import geopandas as gpd
import sedona.spark.geopandas as sgpd
import pytest
from pandas.testing import assert_frame_equal, assert_series_equal
from packaging.version import parse as parse_version
@pytest.mark.skipif(
parse_version(shapely.__version__) < parse_version("2.0.0"),
reason=f"Tests require shapely>=2.0.0, but found v{shapely.__version__}",
)
class TestGeoDataFrame(TestGeopandasBase):
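    """Tests for Sedona's geopandas-compatible GeoDataFrame, checked against geopandas behavior."""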
@pytest.mark.parametrize(
"obj",
[
[Point(x, x) for x in range(3)],
{"geometry": [Point(x, x) for x in range(3)]},
pd.DataFrame([Point(x, x) for x in range(3)]),
gpd.GeoDataFrame([Point(x, x) for x in range(3)]),
pd.Series([Point(x, x) for x in range(3)]),
gpd.GeoSeries([Point(x, x) for x in range(3)]),
],
)
def test_constructor(self, obj):
sgpd_df = GeoDataFrame(obj)
check_geodataframe(sgpd_df)
def test_construct_from_geopandas(self):
gpd_df = gpd.GeoDataFrame(
{"geometry1": [Point(0, 0)]}, geometry="geometry1", crs="EPSG:3857"
)
with ps.option_context("compute.ops_on_diff_frames", True):
sgpd_df = GeoDataFrame(gpd_df)
assert sgpd_df.crs == "EPSG:3857"
assert sgpd_df.geometry.crs == "EPSG:3857"
assert sgpd_df.geometry.name == "geometry1"
@pytest.mark.parametrize(
"obj",
[
pd.DataFrame(
{
"non-geom": [1, 2, 3],
"geometry": [
Polygon([(0, 0), (1, 0), (1, 1), (0, 1)]) for _ in range(3)
],
}
),
gpd.GeoDataFrame(
{
"geom2": [Point(x, x) for x in range(3)],
"non-geom": [4, 5, 6],
"geometry": [
Polygon([(0, 0), (1, 0), (1, 1), (0, 1)]) for _ in range(3)
],
}
),
],
)
def test_complex_df(self, obj):
sgpd_df = GeoDataFrame(obj)
name = "geometry"
sgpd_df.set_geometry(name, inplace=True)
check_geodataframe(sgpd_df)
result = sgpd_df.area
expected = pd.Series([1.0, 1.0, 1.0])
self.check_pd_series_equal(result, expected)
    # These inputs must be constructed inside the test function to ensure Sedona's geometry UDTs have already been registered
def test_constructor_pandas_on_spark(self):
for obj in [
ps.DataFrame([Point(x, x) for x in range(3)]),
ps.Series([Point(x, x) for x in range(3)]),
GeoSeries([Point(x, x) for x in range(3)]),
GeoDataFrame([Point(x, x) for x in range(3)]),
]:
sgpd_df = GeoDataFrame(obj)
check_geodataframe(sgpd_df)
@pytest.mark.parametrize(
"obj",
[
[0, 1, 2],
["x", "y", "z"],
{"a": [0, 1, 2], 1: [4, 5, 6]},
{"a": ["x", "y", "z"], 1: ["a", "b", "c"]},
pd.Series([0, 1, 2]),
pd.Series(["x", "y", "z"]),
pd.DataFrame({"x": ["x", "y", "z"]}),
gpd.GeoDataFrame({"x": [0, 1, 2]}),
ps.DataFrame({"x": ["x", "y", "z"]}),
],
)
def test_non_geometry(self, obj):
pd_df = pd.DataFrame(obj)
        # pd.DataFrame(obj) does not handle pandas-on-Spark DataFrames correctly, so we convert with to_pandas() instead.
if isinstance(obj, ps.DataFrame):
pd_df = obj.to_pandas()
sgpd_df = sgpd.GeoDataFrame(obj)
assert_frame_equal(pd_df, sgpd_df.to_pandas())
def test_to_geopandas(self):
from geopandas.testing import assert_geodataframe_equal
data = {"geometry": [Point(x, x) for x in range(3)], "id": [1, 2, 3]}
index = [1, 2, 3]
crs = "EPSG:3857"
# TODO: try to optimize this away
with ps.option_context("compute.ops_on_diff_frames", True):
result = GeoDataFrame(data, index=index, crs=crs).to_geopandas()
gpd_df = gpd.GeoDataFrame(data, index=index, crs=crs)
assert_geodataframe_equal(result, gpd_df)
def test_to_spark_pandas(self):
data = {"geometry": [Point(x, x) for x in range(3)], "id": [1, 2, 3]}
index = [1, 2, 3]
result = GeoDataFrame(data, index=index).to_spark_pandas()
ps_df = ps.DataFrame(data, index=index)
assert_frame_equal(result.to_pandas(), ps_df.to_pandas())
def test_getitem(self):
geoms = [Point(x, x) for x in range(3)]
ids = [1, 2, 3]
values = ["a", "b", "c"]
crs = "EPSG:3857"
with ps.option_context("compute.ops_on_diff_frames", True):
df = GeoDataFrame({"geometry": geoms, "id": ids, "value": values}, crs=crs)
# get a single non-geometry series
result = df["id"]
expected = pd.Series(ids, name="id")
self.check_pd_series_equal(result, expected)
# get a single geometry series
result = df["geometry"]
expected = gpd.GeoSeries(geoms, name="geometry", crs=crs)
self.check_sgpd_equals_gpd(result, expected)
# get multiple columns
result = df[["id", "value"]]
# no crs because no geometry column
expected = gpd.GeoDataFrame({"id": ids, "value": values})
self.check_sgpd_df_equals_gpd_df(result, expected)
# get numerical slice
result = df[:2]
expected = gpd.GeoDataFrame(
{"geometry": geoms[:2], "id": ids[:2], "value": values[:2]}, crs=crs
)
self.check_sgpd_df_equals_gpd_df(result, expected)
def test_plot(self):
# Just make sure it doesn't error
df = GeoDataFrame(
{
"value1": ["a", "b", "c"],
"geometry": [
Point(0, 0),
Polygon([(0, 0), (1, 0), (1, 1), (0, 1)]),
LineString([(0, 0), (1, 1)]),
],
"value2": [1, 2, 3],
}
)
df.plot()
def test_psdf(self):
        # Sanity check that the Spark session works with the pandas-on-Spark API
psdf = ps.DataFrame(
{
"a": [1, 2, 3, 4, 5, 6],
"b": [100, 200, 300, 400, 500, 600],
"c": ["one", "two", "three", "four", "five", "six"],
},
index=[10, 20, 30, 40, 50, 60],
)
assert psdf.count().count() == 3
def test_type_single_geometry_column(self):
# Create a GeoDataFrame with a single geometry column and additional attributes
points = [Point(x, x) for x in range(3)]
data = {"geometry1": points, "id": [1, 2, 3], "value": ["a", "b", "c"]}
df = GeoDataFrame(data)
# Verify the GeoDataFrame type
assert type(df) is GeoDataFrame
# Check the underlying Spark DataFrame schema
schema = df._internal.spark_frame.schema
# Assert the geometry column has the correct type and is not nullable
geometry_field = schema["geometry1"]
assert (
geometry_field.dataType.typeName() == "geometrytype"
or geometry_field.dataType.typeName() == "binary"
)
assert not geometry_field.nullable
# Assert non-geometry columns are present with correct types
assert schema["id"].dataType.typeName().startswith("long")
assert schema["value"].dataType.typeName().startswith("string")
        # Verify the field count: 3 data columns plus pandas-on-Spark's internal index and ordering columns
assert len(schema.fields) == 5
def test_type_multiple_geometry_columns(self):
# Create points for two geometry columns
points1 = [Point(x, x) for x in range(3)]
points2 = [Point(x + 5, x + 5) for x in range(3)]
# Create a dictionary with two geometry columns
data = {"geometry1": points1, "geometry2": points2, "attribute": [1, 2, 3]}
df = GeoDataFrame(data)
assert type(df) is GeoDataFrame
schema = df._internal.spark_frame.schema
# Assert both geometry columns have the correct type
geometry_field1 = schema["geometry1"]
assert (
geometry_field1.dataType.typeName() == "geometrytype"
or geometry_field1.dataType.typeName() == "binary"
)
assert not geometry_field1.nullable
geometry_field2 = schema["geometry2"]
assert (
geometry_field2.dataType.typeName() == "geometrytype"
or geometry_field2.dataType.typeName() == "binary"
)
assert not geometry_field2.nullable
# Check non-geometry column
attribute_field = schema["attribute"]
assert (
attribute_field.dataType.typeName() != "geometrytype"
and attribute_field.dataType.typeName() != "binary"
)
def test_copy(self):
df = GeoDataFrame(
{"test": [Point(x, x) for x in range(3)]}, index=[1, 2, 3], geometry="test"
)
result = df.copy()
self.check_sgpd_df_equals_gpd_df(result, df.to_geopandas())
self.check_sgpd_df_equals_gpd_df(df, result.to_geopandas())
def test_set_crs(self):
sgpd_df = sgpd.GeoDataFrame({"geometry": [Point(0, 0), Point(1, 1)]})
with ps.option_context("compute.ops_on_diff_frames", True):
sgpd_df.crs = 4326
assert sgpd_df.crs.to_epsg() == 4326
with ps.option_context("compute.ops_on_diff_frames", True):
sgpd_df.set_crs(3857, inplace=True, allow_override=True)
assert sgpd_df.crs.to_epsg() == 3857
with ps.option_context("compute.ops_on_diff_frames", True):
sgpd_df = sgpd_df.set_crs(None, allow_override=True)
assert isinstance(sgpd_df, GeoDataFrame)
assert sgpd_df.crs is None
with ps.option_context("compute.ops_on_diff_frames", True):
result = sgpd_df.set_crs(4326, allow_override=True)
assert result.crs.to_epsg() == 4326
assert isinstance(result, GeoDataFrame)
        # Ensure set_crs without inplace returns a copy and does not modify the current df
assert sgpd_df.crs is None
def test_to_crs(self):
from pyproj import CRS
with ps.option_context("compute.ops_on_diff_frames", True):
gdf = sgpd.GeoDataFrame(
{"geometry": [Point(1, 1), Point(2, 2), Point(3, 3)]}, crs=4326
)
assert isinstance(gdf.crs, CRS) and gdf.crs.to_epsg() == 4326
with ps.option_context("compute.ops_on_diff_frames", True):
result = gdf.to_crs(3857)
assert isinstance(result.crs, CRS) and result.crs.to_epsg() == 3857
# Ensure original df is not modified
assert gdf.crs.to_epsg() == 4326
expected = gpd.GeoSeries(
[
Point(111319.49079327356, 111325.14286638486),
Point(222638.98158654712, 222684.20850554455),
Point(333958.4723798207, 334111.1714019597),
],
name="geometry",
crs=3857,
)
self.check_sgpd_equals_gpd(result.geometry, expected)
def test_set_geometry(self):
from sedona.spark.geopandas.geodataframe import MissingGeometryColumnError
points1 = [Point(x, x) for x in range(3)]
points2 = [Point(x + 5, x + 5) for x in range(3)]
data = {"geometry1": points1, "geometry2": points2, "attribute": [1, 2, 3]}
sgpd_df = sgpd.GeoDataFrame(data)
# No geometry column set yet
with pytest.raises(MissingGeometryColumnError):
_ = sgpd_df.geometry
sgpd_df.set_geometry("geometry1", inplace=True)
assert sgpd_df.geometry.name == "geometry1"
result = sgpd_df.set_geometry("geometry2")
assert result.geometry.name == "geometry2"
# Ensure original df is not modified
assert sgpd_df.geometry.name == "geometry1"
        # Verify the geometry values match by comparing an area calculation
self.check_pd_series_equal(result.area, sgpd_df["geometry2"].area.to_pandas())
# unknown column
with pytest.raises(ValueError):
sgpd_df.set_geometry("nonexistent-column")
geom = GeoSeries(
[Point(x, y) for x, y in zip(range(5), range(5))], name="geometry2"
)
# new crs - setting should default to GeoSeries' crs
gs = GeoSeries(geom, crs="epsg:3857")
with ps.option_context("compute.ops_on_diff_frames", True):
new_df = sgpd_df.set_geometry(gs)
assert new_df.crs == "epsg:3857"
# explicit crs overrides self and dataframe
with ps.option_context("compute.ops_on_diff_frames", True):
new_df = sgpd_df.set_geometry(gs, crs="epsg:26909")
assert new_df.crs == "epsg:26909"
assert new_df.geometry.crs == "epsg:26909"
# Series should use dataframe's crs
with ps.option_context("compute.ops_on_diff_frames", True):
new_df = sgpd_df.set_geometry(geom.values)
assert new_df.crs == sgpd_df.crs
assert new_df.geometry.crs == sgpd_df.crs
def test_set_geometry_crs(self):
df = GeoDataFrame({"geometry1": [Point(0, 0)]})
with ps.option_context("compute.ops_on_diff_frames", True):
df.set_geometry("geometry1", crs="EPSG:3857", inplace=True)
assert df.crs == "EPSG:3857"
assert df.geometry.crs == "EPSG:3857"
with ps.option_context("compute.ops_on_diff_frames", True):
df = GeoDataFrame(
{"geometry1": [Point(0, 0)]}, geometry="geometry1", crs="EPSG:3857"
)
assert df.crs == "EPSG:3857"
assert df.geometry.crs == "EPSG:3857"
def test_active_geometry_name(self):
if parse_version(gpd.__version__) < parse_version("1.0.0"):
return
points1 = [Point(x, x) for x in range(3)]
points2 = [Point(x + 5, x + 5) for x in range(3)]
data = {"geometry1": points1, "geometry2": points2, "attribute": [1, 2, 3]}
df = GeoDataFrame(data)
df = df.set_geometry("geometry1")
assert df.geometry.name == df.active_geometry_name == "geometry1"
df.set_geometry("geometry2", inplace=True)
assert df.geometry.name == df.active_geometry_name == "geometry2"
def test_rename_geometry(self):
points1 = [Point(x, x) for x in range(3)]
points2 = [Point(x + 5, x + 5) for x in range(3)]
data = {"geometry1": points1, "geometry2": points2, "attribute": [1, 2, 3]}
df = GeoDataFrame(data)
df = df.set_geometry("geometry1")
assert df.geometry.name == "geometry1"
df = df.rename_geometry("geometry3")
assert df.geometry.name == "geometry3"
# test inplace rename
df.rename_geometry("geometry4", inplace=True)
assert df.geometry.name == "geometry4"
def test_area(self):
# Create a GeoDataFrame with polygons to test area calculation
# Create polygons with known areas (1.0 and 4.0 square units)
poly1 = Polygon([(0, 0), (1, 0), (1, 1), (0, 1)]) # 1 square unit
poly2 = Polygon([(0, 0), (2, 0), (2, 2), (0, 2)]) # 4 square units
data = {"geometry1": [poly1, poly2], "id": [1, 2], "value": ["a", "b"]}
df = GeoDataFrame(data)
df.set_geometry("geometry1", inplace=True)
area_series = df.area
assert type(area_series) is ps.Series
# Check the actual area values
area_values = area_series.to_list()
assert len(area_series) == 2
self.assert_almost_equal(area_values[0], 1.0)
self.assert_almost_equal(area_values[1], 4.0)
def test_buffer(self):
point = Point(0, 0)
square = Polygon([(0, 0), (1, 0), (1, 1), (0, 1)])
data = {"geometry": [point, square], "id": [1, 2], "value": ["a", "b"]}
df = GeoDataFrame(data)
result = df.buffer(0.5)
assert type(result) is GeoSeries
# Calculate areas to verify buffer was applied correctly
# Point buffer with radius 0.5 should have area approximately π * 0.5² ≈ 0.785
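        # (the default segment approximation yields slightly less than the exact circle area)
        assert result.area[0] > 0.7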
# Square buffer with radius 0.5 should expand the 1x1 square to 2x2 square with rounded corners
# Check that square buffer area is greater than original (1.0)
assert result.area[1] > 1.0
def test_to_parquet(self):
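        # Placeholder. A minimal round-trip sketch (assuming GeoDataFrame.to_parquet and
        # sgpd.read_parquet mirror the geopandas GeoParquet API) could look like:
        #
        #   tmp_dir = tempfile.mkdtemp()
        #   try:
        #       df = GeoDataFrame({"geometry": [Point(x, x) for x in range(3)]})
        #       df.to_parquet(f"{tmp_dir}/df.parquet")
        #       check_geodataframe(sgpd.read_parquet(f"{tmp_dir}/df.parquet"))
        #   finally:
        #       shutil.rmtree(tmp_dir)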
pass
def test_from_arrow(self):
if parse_version(gpd.__version__) < parse_version("1.0.0"):
return
import pyarrow as pa
table = pa.table({"a": [0, 1, 2], "b": [0.1, 0.2, 0.3]})
with pytest.raises(ValueError, match="No geometry column found"):
GeoDataFrame.from_arrow(table)
gdf = gpd.GeoDataFrame(
{
"col": [1, 2, 3, 4],
"geometry": [
LineString([(0, 0), (1, 1)]),
box(0, 0, 10, 10),
Polygon([(0, 0), (1, 0), (1, 1), (0, 1)]),
Point(1, 1),
],
}
)
result = GeoDataFrame.from_arrow(gdf.to_arrow())
self.check_sgpd_df_equals_gpd_df(result, gdf)
gdf = gpd.GeoDataFrame(
{
"col": ["a", "b", "c", "d"],
"geometry": [
Point(1, 1),
Polygon(),
LineString([(0, 0), (1, 1)]),
None,
],
}
)
result = GeoDataFrame.from_arrow(gdf.to_arrow())
self.check_sgpd_df_equals_gpd_df(result, gdf)
def test_to_json(self):
import json
d = {"col1": ["name1", "name2"], "geometry": [Point(1, 2), Point(2, 1)]}
# Currently, adding the crs information later requires us to join across partitions
with ps.option_context("compute.ops_on_diff_frames", True):
gdf = GeoDataFrame(d, crs="EPSG:3857")
result = gdf.to_json()
obj = json.loads(result)
assert obj["type"] == "FeatureCollection"
assert obj["features"][0]["geometry"]["type"] == "Point"
assert obj["features"][0]["geometry"]["coordinates"] == [1.0, 2.0]
assert obj["features"][1]["geometry"]["type"] == "Point"
assert obj["features"][1]["geometry"]["coordinates"] == [2.0, 1.0]
assert obj["crs"]["type"] == "name"
assert obj["crs"]["properties"]["name"] == "urn:ogc:def:crs:EPSG::3857"
        expected = (
            '{"type": "FeatureCollection", "features": [{"id": "0", "type": "Feature", '
            '"properties": {"col1": "name1"}, "geometry": {"type": "Point", "coordinates": '
            '[1.0, 2.0]}}, {"id": "1", "type": "Feature", "properties": {"col1": "name2"}, '
            '"geometry": {"type": "Point", "coordinates": [2.0, 1.0]}}], "crs": {"type": '
            '"name", "properties": {"name": "urn:ogc:def:crs:EPSG::3857"}}}'
        )
assert result == expected, f"Expected {expected}, but got {result}"
def test_to_arrow(self):
if parse_version(gpd.__version__) < parse_version("1.0.0"):
return
import pyarrow as pa
from geopandas.testing import assert_geodataframe_equal
data = {"col1": ["name1", "name2"], "geometry": [Point(1, 2), Point(2, 1)]}
# Ensure index is not preserved for index=False
sgpd_df = GeoDataFrame(data, index=pd.Index([1, 2]))
result = pa.table(sgpd_df.to_arrow(index=False))
expected = gpd.GeoDataFrame(data)
        # Ensure we can read it back using geopandas
gpd_df = gpd.GeoDataFrame.from_arrow(result)
assert_geodataframe_equal(gpd_df, expected)
# Ensure we can read it using sedona geopandas
sgpd_df = GeoDataFrame.from_arrow(result)
self.check_sgpd_df_equals_gpd_df(sgpd_df, expected)
# Ensure index is preserved for index=True
sgpd_df = GeoDataFrame(data, index=pd.Index([1, 2]))
result = pa.table(sgpd_df.to_arrow(index=True))
        expected = gpd.GeoDataFrame(data, index=pd.Index([1, 2]))
gpd_df = gpd.GeoDataFrame.from_arrow(result)
assert_geodataframe_equal(gpd_df, expected)
# -----------------------------------------------------------------------------
# # Utils
# -----------------------------------------------------------------------------
def check_geodataframe(df):
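    """Shared helper: assert that the constructed object is a Sedona GeoDataFrame."""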
assert isinstance(df, GeoDataFrame)