blob: b60963577b5ed45b4f172c27b26399ad75b902bd [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import geoarrow.pyarrow as ga
import geoarrow.types as gat
import geopandas.testing
import pandas as pd
from pathlib import Path
import pyarrow as pa
import pytest
import sedonadb
import tempfile
def test_dataframe_from_dataframe(con):
# DataFrame from DataFrame on the same context with no schema
# should be just a Python reference
df = con.sql("SELECT ST_Point(0, 1) as geom")
assert con.create_data_frame(df) is df
# On a separate context the table should still be collected the same
# but should be a separate Python reference. This also has the effect
# of testing the __datafusion_table_provider__ interface.
new_con = sedonadb.connect()
new_df = new_con.create_data_frame(df)
assert new_df is not df
pd.testing.assert_frame_equal(df.to_pandas(), new_df.to_pandas())
def test_dataframe_from_table(con):
tab = pa.table(
{
"not_geom": [1, 2, 3],
"geom": ga.array(["POINT (0 1)", "POINT (2 3)", "POINT (4 5)"]),
}
)
df = con.create_data_frame(tab)
# Ensure that we can collect once
assert df.to_arrow_table() == tab
# ...and ensure we can collect again
assert df.to_arrow_table() == tab
def test_dataframe_from_pandas(con):
pd_df = pd.DataFrame({"col1": [1, 2, 3]})
df = con.create_data_frame(pd_df)
# Ensure that we can collect once
pd.testing.assert_frame_equal(df.to_pandas(), pd_df)
# ...and ensure we can collect again
pd.testing.assert_frame_equal(df.to_pandas(), pd_df)
def test_dataframe_from_geopandas(con):
gpd_df = geopandas.GeoDataFrame(
{"geometry": geopandas.GeoSeries.from_wkt(["POINT (0 1)"], crs="OGC:CRS84")}
)
df = con.create_data_frame(gpd_df)
# Ensure that we can collect once
geopandas.testing.assert_geodataframe_equal(df.to_pandas(), gpd_df)
# ...and ensure we can collect again
geopandas.testing.assert_geodataframe_equal(df.to_pandas(), gpd_df)
def test_dataframe_from_polars(con):
pl = pytest.importorskip("polars")
pl_df = pl.DataFrame({"col1": [1, 2, 3]})
df = con.create_data_frame(pl_df)
# Ensure that we can collect once
pd.testing.assert_frame_equal(df.to_pandas(), pd.DataFrame({"col1": [1, 2, 3]}))
# ...and ensure we can collect again
pd.testing.assert_frame_equal(df.to_pandas(), pd.DataFrame({"col1": [1, 2, 3]}))
def test_dataframe_from_array_stream(con):
tab = pa.table(
{
"not_geom": [1, 2, 3],
"geom": ga.array(["POINT (0 1)", "POINT (2 3)", "POINT (4 5)"]),
}
)
# Ensure we're working from a stateful reader that can only be consumed
# exactly once
one_way_stream = pa.RecordBatchReader.from_stream(tab)
df = con.create_data_frame(one_way_stream)
assert pa.schema(df) == tab.schema
# Ensure that we can collect once
assert df.to_arrow_table() == tab
# Ensure that we exhausted the reader
assert list(one_way_stream) == []
with pytest.raises(
sedonadb._lib.SedonaError,
match="Can't scan RecordBatchReader provider more than once.",
):
# Ensure we can't collect again
df.to_arrow_table()
def test_schema(con):
df = con.sql("SELECT 1 as one, ST_GeomFromText('POINT (0 1)') as geom")
# Non-geometry field accessor
assert df.schema.field(0).name == "one"
assert df.schema.field("one").name == "one"
assert repr(df.schema.field(0).type) == "SedonaType int64<Int64>"
assert df.schema.field(0).type.edge_type is None
assert df.schema.field(0).type.crs is None
# Geometry field accessor
assert df.schema.field(1).name == "geom"
assert df.schema.field("geom").name == "geom"
assert repr(df.schema.field(1).type) == "SedonaType geometry<Wkb>"
assert df.schema.field(1).type.edge_type == gat.EdgeType.PLANAR
assert df.schema.field(1).type.crs is None
# Arrow export
assert pa.schema(df.schema) == pa.schema(
[pa.field("one", pa.int64(), False), pa.field("geom", ga.wkb())]
)
assert pa.field(df.schema.field(0)) == pa.field("one", pa.int64(), False)
assert pa.field(df.schema.field(0).type) == pa.field("", pa.int64(), True)
with pytest.raises(IndexError):
df.schema.field(100)
with pytest.raises(KeyError):
df.schema.field("foofy")
with pytest.raises(TypeError):
df.schema.field({})
def test_schema_non_null_crs(con):
tab = pa.table({"geom": ga.with_crs(ga.as_wkb(["POINT (0 1)"]), gat.OGC_CRS84)})
df = con.create_data_frame(tab)
assert df.schema.field("geom").type.crs == gat.OGC_CRS84
def test_to_memtable(con):
df = con.sql("SELECT 1 as one")
pd.testing.assert_frame_equal(df.to_memtable().to_pandas(), df.to_pandas())
def test_to_view(con):
try:
df = con.sql("SELECT 1 as one")
df.to_view("foofy")
pd.testing.assert_frame_equal(
con.sql("SELECT * FROM foofy").to_pandas(), df.to_pandas()
)
new_df = con.sql("SELECT 2 as two")
with pytest.raises(
sedonadb._lib.SedonaError, match="The table foofy already exist"
):
new_df.to_view("foofy")
new_df.to_view("foofy", overwrite=True)
pd.testing.assert_frame_equal(
con.sql("SELECT * FROM foofy").to_pandas(), new_df.to_pandas()
)
finally:
con.drop_view("foofy")
def test_head_limit(con):
df = con.sql("SELECT * FROM (VALUES ('one'), ('two'), ('three')) AS t(val)")
pd.testing.assert_frame_equal(
df.head(1).to_pandas(), pd.DataFrame({"val": ["one"]})
)
pd.testing.assert_frame_equal(
df.limit(1).to_pandas(), pd.DataFrame({"val": ["one"]})
)
pd.testing.assert_frame_equal(
df.limit(1, offset=2).to_pandas(), pd.DataFrame({"val": ["three"]})
)
pd.testing.assert_frame_equal(
df.limit(None, offset=1).to_pandas(), pd.DataFrame({"val": ["two", "three"]})
)
def test_execute(con):
df = con.sql("SELECT * FROM (VALUES ('one'), ('two'), ('three')) AS t(val)")
assert df.execute() == 3
df = con.sql("CREATE OR REPLACE VIEW temp_view AS SELECT 1 as one")
assert df.execute() == 0
assert con.view("temp_view").count() == 1
con.drop_view("temp_view")
def test_count(con):
df = con.sql("SELECT * FROM (VALUES ('one'), ('two'), ('three')) AS t(val)")
assert df.count() == 3
def test_dataframe_to_arrow(con):
df = con.sql("SELECT 1 as one, ST_GeomFromWKT('POINT (0 1)') as geom")
expected_schema = pa.schema(
[pa.field("one", pa.int64(), nullable=False), pa.field("geom", ga.wkb())]
)
assert pa.schema(df) == expected_schema
assert (
df.to_arrow_table().columns
== pa.table(
{"one": [1], "geom": ga.as_wkb(["POINT (0 1)"])}, schema=expected_schema
).columns
)
# Make sure we can request a schema if the schema is identical
assert (
df.to_arrow_table(schema=expected_schema).columns == df.to_arrow_table().columns
)
# ...but not otherwise (yet)
with pytest.raises(
sedonadb._lib.SedonaError,
match="Requested schema != DataFrame schema not yet supported",
):
df.to_arrow_table(schema=pa.schema({}))
def test_dataframe_to_pandas(con):
# Check with a geometry column
df_with_geo = con.sql("SELECT 1 as one, ST_GeomFromWKT('POINT (0 1)') as geom")
geopandas.testing.assert_geodataframe_equal(
df_with_geo.to_pandas(),
geopandas.GeoDataFrame(
{"one": [1], "geom": geopandas.GeoSeries.from_wkt(["POINT (0 1)"])}
).set_geometry("geom"),
)
# Check with more than one geometry column
df_with_multi_geo = con.sql(
"SELECT ST_GeomFromWKT('POINT (0 1)') as geom1, ST_GeomFromWKT('POINT (2 3)') as geom2"
)
geodf_with_multi_geo = geopandas.GeoDataFrame(
{
"geom1": geopandas.GeoSeries.from_wkt(["POINT (0 1)"]),
"geom2": geopandas.GeoSeries.from_wkt(["POINT (2 3)"]),
}
)
geopandas.testing.assert_geodataframe_equal(
df_with_multi_geo.to_pandas(geometry="geom1"),
geodf_with_multi_geo.set_geometry("geom1"),
)
geopandas.testing.assert_geodataframe_equal(
df_with_multi_geo.to_pandas(geometry="geom2"),
geodf_with_multi_geo.set_geometry("geom2"),
)
# Check without geometry column
df_without_geo = con.sql("SELECT 1 as one")
pd.testing.assert_frame_equal(
df_without_geo.to_pandas(), pd.DataFrame({"one": [1]})
)
def test_dataframe_to_parquet(con):
df = con.sql(
"SELECT * FROM (VALUES ('one', 1), ('two', 2), ('three', 3)) AS t(a, b)"
)
with tempfile.TemporaryDirectory() as td:
# Defaults with a path that ends with .parquet (single file)
tmp_parquet_file = Path(td) / "tmp.parquet"
df.to_parquet(tmp_parquet_file)
assert tmp_parquet_file.exists()
assert tmp_parquet_file.is_file()
pd.testing.assert_frame_equal(
pd.read_parquet(tmp_parquet_file),
pd.DataFrame({"a": ["one", "two", "three"], "b": [1, 2, 3]}),
)
# Defaults with a path that doesn't end in .parquet (directory)
tmp_parquet_dir = Path(td) / "tmp"
df.to_parquet(tmp_parquet_dir)
assert tmp_parquet_dir.exists()
assert tmp_parquet_dir.is_dir()
pd.testing.assert_frame_equal(
pd.read_parquet(tmp_parquet_dir),
pd.DataFrame({"a": ["one", "two", "three"], "b": [1, 2, 3]}),
)
# With partition_by
tmp_parquet_dir = Path(td) / "tmp_partitioned"
df.to_parquet(tmp_parquet_dir, partition_by=["a"])
assert tmp_parquet_dir.exists()
assert tmp_parquet_dir.is_dir()
pd.testing.assert_frame_equal(
pd.read_parquet(tmp_parquet_dir).sort_values("b").reset_index(drop=True),
pd.DataFrame(
{"b": [1, 2, 3], "a": pd.Categorical(["one", "two", "three"])}
),
)
# With order_by
tmp_parquet = Path(td) / "tmp_ordered.parquet"
df.to_parquet(tmp_parquet, sort_by=["a"])
pd.testing.assert_frame_equal(
pd.read_parquet(tmp_parquet),
pd.DataFrame({"a": ["one", "three", "two"], "b": [1, 3, 2]}),
)
def test_show(con, capsys):
con.sql("SELECT 1 as one").show()
expected = """
┌───────┐
│ one │
│ int64 │
╞═══════╡
│ 1 │
└───────┘
""".strip()
assert capsys.readouterr().out.strip() == expected
con.sql("SELECT 1 as one").show(ascii=True)
expected = """
+-------+
| one |
| int64 |
+-------+
| 1 |
+-------+
""".strip()
assert capsys.readouterr().out.strip() == expected
# Make sure width parameter can be specified
con.sql("SELECT 123456789 as col1, 2 as a_very_long_column_name").show(width=10)
expected = """
┌───────────┬───┐
│ col1 ┆ … │
│ int64 ┆ │
╞═══════════╪═══╡
│ 123456789 ┆ … │
└───────────┴───┘
""".strip()
assert capsys.readouterr().out.strip() == expected
def test_show_explained(con, capsys):
con.sql("EXPLAIN SELECT 1 as one").show()
expected = """
┌───────────────┬─────────────────────────────────┐
│ plan_type ┆ plan │
│ utf8 ┆ utf8 │
╞═══════════════╪═════════════════════════════════╡
│ logical_plan ┆ Projection: Int64(1) AS one │
│ ┆ EmptyRelation │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ physical_plan ┆ ProjectionExec: expr=[1 as one] │
│ ┆ PlaceholderRowExec │
│ ┆ │
└───────────────┴─────────────────────────────────┘
""".strip()
assert capsys.readouterr().out.strip() == expected
def test_explain(con, capsys):
con.sql("SELECT 1 as one").explain().show()
expected = """
┌───────────────┬─────────────────────────────────┐
│ plan_type ┆ plan │
│ utf8 ┆ utf8 │
╞═══════════════╪═════════════════════════════════╡
│ logical_plan ┆ Projection: Int64(1) AS one │
│ ┆ EmptyRelation │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ physical_plan ┆ ProjectionExec: expr=[1 as one] │
│ ┆ PlaceholderRowExec │
│ ┆ │
└───────────────┴─────────────────────────────────┘
""".strip()
assert capsys.readouterr().out.strip() == expected
con.sql("SELECT 1 as one").explain(format="tree").show()
expected = """
┌───────────────┬───────────────────────────────┐
│ plan_type ┆ plan │
│ utf8 ┆ utf8 │
╞═══════════════╪═══════════════════════════════╡
│ physical_plan ┆ ┌───────────────────────────┐ │
│ ┆ │ ProjectionExec │ │
│ ┆ │ -------------------- │ │
│ ┆ │ one: 1 │ │
│ ┆ └─────────────┬─────────────┘ │
│ ┆ ┌─────────────┴─────────────┐ │
│ ┆ │ PlaceholderRowExec │ │
│ ┆ └───────────────────────────┘ │
│ ┆ │
└───────────────┴───────────────────────────────┘
""".strip()
assert capsys.readouterr().out.strip() == expected
query_plan = con.sql("SELECT 1 as one").explain(type="analyze").to_pandas()
assert query_plan.iloc[0, 0] == "Plan with Metrics"
query_plan = con.sql("SELECT 1 as one").explain(type="extended").to_pandas()
assert query_plan.iloc[0, 0] == "initial_logical_plan"
assert len(query_plan) > 10
def test_repr(con):
assert repr(con.sql("SELECT 1 as one")).startswith(
"<sedonadb.dataframe.DataFrame object"
)
try:
con.options.interactive = True
repr_interactive = repr(con.sql("SELECT 1 as one"))
expected = """
┌───────┐
│ one │
│ int64 │
╞═══════╡
│ 1 │
└───────┘
""".strip()
assert repr_interactive == expected
finally:
con.options.interactive = False