| #!/usr/bin/env python3 |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| import inspect |
| import re |
| import sys |
| |
| |
class SpatialBenchBenchmark:
    """A benchmark for the performance of analytical spatial queries on a spatial dataset.

    These queries are written in the Sedona/Spark SQL dialect. Because spatial functions are not as standardized as
    other analytical functions, many engines need specific implementations of a couple of these queries where dialects
    vary slightly.

    To deal with these differences, engine-specific implementations of this benchmark subclass this class and
    override only the queries that need to be changed. Each query lives in a method named ``q<N>`` that takes no
    arguments and returns the SQL text; the first line of that SQL is a comment describing the query.
    """

    def queries(self) -> dict[str, str]:
        """Collect the SQL text of every query method available on this benchmark.

        A query method is any callable attribute whose name is ``q`` followed by digits (``q1``, ``q2``, ...),
        including ones inherited from a base class. Static methods, instance methods and class methods are all
        accepted, as long as they take no arguments other than the implicit ``self``/``cls``. Each query method is
        called once and its return value is used as the query text.

        Returns:
            dict[str, str]: Mapping of query name to SQL text, ordered numerically by query number
            (so ``q2`` comes before ``q10``).

        Raises:
            ValueError: If a query method requires arguments.
        """
        collected = {}
        for name in dir(type(self)):
            if not re.fullmatch(r"q\d+", name):
                continue
            query_method = getattr(self, name)
            if not callable(query_method):
                continue
            # Looking the attribute up on the instance yields a bound method for regular/class methods, whose
            # signature already omits self/cls, so every kind of query method is checked by the same rule.
            if inspect.signature(query_method).parameters:
                raise ValueError(
                    f"Query methods must not take any arguments: {type(self).__name__}.{name}"
                )
            collected[name] = query_method()

        # Sort numerically (not lexically) by the number after the leading 'q'.
        return dict(sorted(collected.items(), key=lambda item: int(item[0][1:])))

    def dialect(self) -> str:
        """Return the dialect of the benchmark."""
        return "SedonaSpark"

    @staticmethod
    def q1() -> str:
        return """
-- Q1: Find trips starting within 50km of Sedona city center, ordered by distance
SELECT
t.t_tripkey, ST_X(ST_GeomFromWKB(t.t_pickuploc)) AS pickup_lon, ST_Y(ST_GeomFromWKB(t.t_pickuploc)) AS pickup_lat, t.t_pickuptime,
ST_Distance(ST_GeomFromWKB(t.t_pickuploc), ST_GeomFromText('POINT (-111.7610 34.8697)')) AS distance_to_center
FROM trip t
WHERE ST_DWithin(ST_GeomFromWKB(t.t_pickuploc), ST_GeomFromText('POINT (-111.7610 34.8697)'), 0.45) -- 50km radius around Sedona center
ORDER BY distance_to_center ASC, t.t_tripkey ASC
"""

    @staticmethod
    def q2() -> str:
        return """
-- Q2: Count trips starting within Coconino County (Arizona) zone
SELECT COUNT(*) AS trip_count_in_coconino_county
FROM trip t
WHERE ST_Intersects(ST_GeomFromWKB(t.t_pickuploc), (SELECT ST_GeomFromWKB(z.z_boundary) FROM zone z WHERE z.z_name = 'Coconino County' LIMIT 1))
"""

    @staticmethod
    def q3() -> str:
        return """
-- Q3: Monthly trip statistics within 15km radius of Sedona city center (10km base + 5km buffer)
SELECT
DATE_TRUNC('month', t.t_pickuptime) AS pickup_month, COUNT(t.t_tripkey) AS total_trips,
AVG(t.t_distance) AS avg_distance, AVG(t.t_dropofftime - t.t_pickuptime) AS avg_duration,
AVG(t.t_fare) AS avg_fare
FROM trip t
WHERE ST_DWithin(
ST_GeomFromWKB(t.t_pickuploc),
ST_GeomFromText('POLYGON((-111.9060 34.7347, -111.6160 34.7347, -111.6160 35.0047, -111.9060 35.0047, -111.9060 34.7347))'), -- 10km bounding box around Sedona
0.045 -- Additional 5km buffer
)
GROUP BY pickup_month
ORDER BY pickup_month
"""

    @staticmethod
    def q4() -> str:
        return """
-- Q4: Zone distribution of top 1000 trips by tip amount
SELECT z.z_zonekey, z.z_name, COUNT(*) AS trip_count
FROM
zone z
JOIN (
SELECT t.t_pickuploc
FROM trip t
ORDER BY t.t_tip DESC, t.t_tripkey ASC
LIMIT 1000 -- Replace 1000 with x (how many top tips you want)
) top_trips ON ST_Within(ST_GeomFromWKB(top_trips.t_pickuploc), ST_GeomFromWKB(z.z_boundary))
GROUP BY z.z_zonekey, z.z_name
ORDER BY trip_count DESC, z.z_zonekey ASC
"""

    @staticmethod
    def q5() -> str:
        return """
-- Q5: Monthly travel patterns for repeat customers (convex hull of dropoff locations)
SELECT
c.c_custkey, c.c_name AS customer_name,
DATE_TRUNC('month', t.t_pickuptime) AS pickup_month,
ST_Area(ST_ConvexHull(ST_Collect(ARRAY_AGG(ST_GeomFromWKB(t.t_dropoffloc))))) AS monthly_travel_hull_area,
COUNT(*) as dropoff_count
FROM trip t JOIN customer c ON t.t_custkey = c.c_custkey
GROUP BY c.c_custkey, c.c_name, pickup_month
HAVING dropoff_count > 5 -- Only include repeat customers for meaningful hulls
ORDER BY dropoff_count DESC, c.c_custkey ASC
"""

    @staticmethod
    def q6() -> str:
        # NOTE(review): the avg_distance column below is computed from t.t_totalamount, whereas q10 computes
        # avg_distance from t.t_distance. Confirm which is intended before changing it: every dialect inherits this
        # query, so an edit here changes benchmark results for all engines.
        return """
-- Q6: Zone statistics for trips within 50km radius of Sedona city center
SELECT
z.z_zonekey, z.z_name,
COUNT(t.t_tripkey) AS total_pickups, AVG(t.t_totalamount) AS avg_distance,
AVG(t.t_dropofftime - t.t_pickuptime) AS avg_duration
FROM trip t, zone z
WHERE ST_Contains(ST_GeomFromText('POLYGON((-112.2110 34.4197, -111.3110 34.4197, -111.3110 35.3197, -112.2110 35.3197, -112.2110 34.4197))'), ST_GeomFromWKB(z.z_boundary)) -- 50km bounding box around Sedona
AND ST_Within(ST_GeomFromWKB(t.t_pickuploc), ST_GeomFromWKB(z.z_boundary))
GROUP BY z.z_zonekey, z.z_name
ORDER BY total_pickups DESC, z.z_zonekey ASC
"""

    @staticmethod
    def q7() -> str:
        return """
-- Q7: Detect potential route detours by comparing reported vs. geometric distances
WITH trip_lengths AS (
SELECT
t.t_tripkey,
t.t_distance AS reported_distance_m,
ST_Length(
ST_MakeLine(
ST_GeomFromWKB(t.t_pickuploc),
ST_GeomFromWKB(t.t_dropoffloc)
)
) / 0.000009 AS line_distance_m -- 1 meter = 0.000009 degree
FROM trip t
)
SELECT
t.t_tripkey,
t.reported_distance_m,
t.line_distance_m,
t.reported_distance_m / NULLIF(t.line_distance_m, 0) AS detour_ratio
FROM trip_lengths t
ORDER BY detour_ratio DESC NULLS LAST, reported_distance_m DESC, t_tripkey ASC
"""

    @staticmethod
    def q8() -> str:
        return """
-- Q8: Count nearby pickups for each building within 500m radius
SELECT b.b_buildingkey, b.b_name, COUNT(*) AS nearby_pickup_count
FROM trip t JOIN building b ON ST_DWithin(ST_GeomFromWKB(t.t_pickuploc), ST_GeomFromWKB(b.b_boundary), 0.0045) -- ~500m
GROUP BY b.b_buildingkey, b.b_name
ORDER BY nearby_pickup_count DESC, b.b_buildingkey ASC
"""

    @staticmethod
    def q9() -> str:
        return """
-- Q9: Building Conflation (duplicate/overlap detection via IoU), deterministic order
WITH b1 AS (
SELECT b_buildingkey AS id, ST_GeomFromWKB(b_boundary) AS geom
FROM building
),
b2 AS (
SELECT b_buildingkey AS id, ST_GeomFromWKB(b_boundary) AS geom
FROM building
),
pairs AS (
SELECT
b1.id AS building_1,
b2.id AS building_2,
ST_Area(b1.geom) AS area1,
ST_Area(b2.geom) AS area2,
ST_Area(ST_Intersection(b1.geom, b2.geom)) AS overlap_area
FROM b1
JOIN b2
ON b1.id < b2.id
AND ST_Intersects(b1.geom, b2.geom)
)
SELECT
building_1,
building_2,
area1,
area2,
overlap_area,
CASE
WHEN overlap_area = 0 THEN 0.0
WHEN (area1 + area2 - overlap_area) = 0 THEN 1.0
ELSE overlap_area / (area1 + area2 - overlap_area)
END AS iou
FROM pairs
ORDER BY iou DESC, building_1 ASC, building_2 ASC
"""

    @staticmethod
    def q10() -> str:
        return """
-- Q10: Zone statistics for trips starting within each zone
SELECT
z.z_zonekey, z.z_name AS pickup_zone, AVG(t.t_dropofftime - t.t_pickuptime) AS avg_duration,
AVG(t.t_distance) AS avg_distance, COUNT(t.t_tripkey) AS num_trips
FROM zone z LEFT JOIN trip t ON ST_Within(ST_GeomFromWKB(t.t_pickuploc), ST_GeomFromWKB(z.z_boundary))
GROUP BY z.z_zonekey, z.z_name
ORDER BY avg_duration DESC NULLS LAST, z.z_zonekey ASC
"""

    @staticmethod
    def q11() -> str:
        return """
-- Q11: Count trips that cross between different zones
SELECT COUNT(*) AS cross_zone_trip_count
FROM
trip t
JOIN zone pickup_zone ON ST_Within(ST_GeomFromWKB(t.t_pickuploc), ST_GeomFromWKB(pickup_zone.z_boundary))
JOIN zone dropoff_zone ON ST_Within(ST_GeomFromWKB(t.t_dropoffloc), ST_GeomFromWKB(dropoff_zone.z_boundary))
WHERE pickup_zone.z_zonekey != dropoff_zone.z_zonekey
"""

    @staticmethod
    def q12() -> str:
        # There is some odd bug with missing columns in EMR. Using CTEs to work around it.
        return """
-- Q12: Find 5 nearest buildings to each trip pickup location using KNN join
WITH trip_with_geom AS (
SELECT t_tripkey, t_pickuploc, ST_GeomFromWKB(t_pickuploc) as pickup_geom
FROM trip
),
building_with_geom AS (
SELECT b_buildingkey, b_name, b_boundary, ST_GeomFromWKB(b_boundary) as boundary_geom
FROM building
)
SELECT
t.t_tripkey,
t.t_pickuploc,
b.b_buildingkey,
b.b_name AS building_name,
ST_Distance(t.pickup_geom, b.boundary_geom) AS distance_to_building
FROM trip_with_geom t JOIN building_with_geom b
ON ST_KNN(t.pickup_geom, b.boundary_geom, 5, FALSE)
ORDER BY distance_to_building ASC, b.b_buildingkey ASC
"""
| |
| |
class DatabricksSpatialBenchBenchmark(SpatialBenchBenchmark):
    """SpatialBench queries adapted to Databricks SQL.

    Only the queries whose Sedona/Spark form does not run on Databricks are overridden here;
    every other query is inherited from the base benchmark unchanged.
    """

    def dialect(self) -> str:
        """Name of the SQL dialect targeted by these queries."""
        dialect_name = "Databricks"
        return dialect_name

    @staticmethod
    def q5() -> str:
        """Q5 using ST_Union_Agg in place of ST_Collect(ARRAY_AGG(...)), which Databricks lacks."""
        sql = """
-- Q5 (Databricks): NO ST_Collect function, using ST_Union_Agg instead. This is more expensive, but should give the same results.
SELECT
c.c_custkey, c.c_name AS customer_name,
DATE_TRUNC('month', t.t_pickuptime) AS pickup_month,
ST_Area(ST_ConvexHull(ST_Union_Agg(ST_GeomFromWKB(t.t_dropoffloc)))) AS monthly_travel_hull_area,
COUNT(*) as dropoff_count
FROM trip t JOIN customer c ON t.t_custkey = c.c_custkey
GROUP BY c.c_custkey, c.c_name, pickup_month
HAVING dropoff_count > 5 -- Only include repeat customers for meaningful hulls
ORDER BY dropoff_count DESC, c.c_custkey ASC
"""
        return sql

    @staticmethod
    def q7() -> str:
        """Q7 passing the two endpoints to ST_MakeLine as a single array argument."""
        sql = """
-- Q7 (Databricks): ST_MakeLine takes an array of points rather than varargs
WITH trip_lengths AS (
SELECT
t.t_tripkey,
t.t_distance AS reported_distance_m,
ST_Length(
ST_MakeLine(
Array(
ST_GeomFromWKB(t.t_pickuploc),
ST_GeomFromWKB(t.t_dropoffloc)
)
)
) / 0.000009 AS line_distance_m -- 1 meter = 0.000009 degree
FROM trip t
)
SELECT
t.t_tripkey,
t.reported_distance_m,
t.line_distance_m,
t.reported_distance_m / NULLIF(t.line_distance_m, 0) AS detour_ratio
FROM trip_lengths t
ORDER BY detour_ratio DESC NULLS LAST, reported_distance_m DESC, t_tripkey ASC
"""
        return sql

    @staticmethod
    def q12() -> str:
        """Q12 emulating the 5-nearest-buildings KNN join with a cross join ranked by ROW_NUMBER()."""
        sql = """
-- Q12 (Databricks): No KNN join, using cross join + ROW_NUMBER() window function instead.
-- Note: Databricks doesn't have cross join lateral support.
SELECT
t_tripkey,
t_pickuploc,
b_buildingkey,
building_name,
distance_to_building
FROM (
SELECT
t.t_tripkey,
t.t_pickuploc,
b.b_buildingkey,
b.b_name AS building_name,
ST_Distance(ST_GeomFromWKB(t.t_pickuploc), ST_GeomFromWKB(b.b_boundary)) AS distance_to_building,
ROW_NUMBER() OVER (
PARTITION BY t.t_tripkey
ORDER BY ST_Distance(ST_GeomFromWKB(t.t_pickuploc), ST_GeomFromWKB(b.b_boundary)) ASC
) AS rn
FROM trip t
JOIN building b
) AS ranked_buildings
WHERE rn <= 5
ORDER BY distance_to_building ASC, b_buildingkey ASC
"""
        return sql
| |
| |
class DuckDBSpatialBenchBenchmark(SpatialBenchBenchmark):
    """SpatialBench queries adapted to DuckDB's spatial extension.

    Only queries that DuckDB cannot run in their Sedona/Spark form are overridden;
    the rest are inherited from the base benchmark as-is.
    """

    def dialect(self) -> str:
        """Name of the SQL dialect targeted by these queries."""
        dialect_name = "DuckDB"
        return dialect_name

    @staticmethod
    def q12() -> str:
        """Q12 emulating the KNN join with CROSS JOIN LATERAL and a per-trip LIMIT 5 subquery."""
        sql = """
-- Q12 (DuckDB): No KNN join, using cross join lateral instead.
SELECT
t.t_tripkey,
t.t_pickuploc,
nb.b_buildingkey,
nb.building_name,
nb.distance_to_building
FROM trip t
CROSS JOIN LATERAL (
SELECT
b.b_buildingkey,
b.b_name AS building_name,
ST_Distance(ST_GeomFromWKB(t.t_pickuploc), ST_GeomFromWKB(b.b_boundary)) AS distance_to_building
FROM building b
ORDER BY distance_to_building
LIMIT 5
) AS nb
ORDER BY nb.distance_to_building, nb.b_buildingkey
"""
        return sql
| |
| |
class SedonaDBSpatialBenchBenchmark(SpatialBenchBenchmark):
    """SpatialBench queries adapted to SedonaDB.

    Only queries whose Sedona/Spark form does not run on SedonaDB are overridden;
    everything else is inherited from the base benchmark.
    """

    def dialect(self) -> str:
        """Name of the SQL dialect targeted by these queries."""
        dialect_name = "SedonaDB"
        return dialect_name

    @staticmethod
    def q5() -> str:
        """Q5 calling ST_Collect directly as an aggregate instead of wrapping ARRAY_AGG."""
        sql = """
-- Q5 (SedonaDB): In SedonaDB ST_Collect is an aggregate function so no need to use ARRAY_AGG first.
-- ST_Collect does not accept an array as input so we cannot use the query with ARRAY_AGG.
SELECT
c.c_custkey, c.c_name AS customer_name,
DATE_TRUNC('month', t.t_pickuptime) AS pickup_month,
ST_Area(ST_ConvexHull(ST_Collect(ST_GeomFromWKB(t.t_dropoffloc)))) AS monthly_travel_hull_area,
COUNT(*) as dropoff_count
FROM trip t JOIN customer c ON t.t_custkey = c.c_custkey
GROUP BY c.c_custkey, c.c_name, pickup_month
HAVING dropoff_count > 5 -- Only include repeat customers for meaningful hulls
ORDER BY dropoff_count DESC, c.c_custkey ASC
"""
        return sql
| |
| |
def main(argv=None):
    """Print the SQL text of every SpatialBench query for one dialect.

    Usage: ``<script> <dialect>``, where ``<dialect>`` is one of the keys of ``query_classes`` below.
    The queries are printed to stdout in numeric order (q1, q2, ...), so the output can be redirected
    straight into a SQL file or client.

    Args:
        argv: Command-line arguments with the program name first. Defaults to ``sys.argv``; accepting
            it explicitly lets callers and tests drive the entry point without patching ``sys``.

    Exits with status 1 (after printing usage to stderr) when the dialect is missing or unknown, and
    with status 0 for "Geopandas", which has no SQL queries.
    """
    if argv is None:
        argv = sys.argv

    query_classes = {
        "SedonaSpark": SpatialBenchBenchmark,
        "Databricks": DatabricksSpatialBenchBenchmark,
        "DuckDB": DuckDBSpatialBenchBenchmark,
        "SedonaDB": SedonaDBSpatialBenchBenchmark,
        "Geopandas": None,  # Special case, we will catch this below
    }
    available = ", ".join(query_classes)
    # Guard against an empty argv so the usage message itself cannot raise IndexError.
    prog = argv[0] if argv else "<script>"

    # All diagnostics go to stderr so stdout carries nothing but query text.
    if len(argv) < 2:
        print(f"Usage: {prog} <dialect>", file=sys.stderr)
        print(f"Available dialects: {available}", file=sys.stderr)
        sys.exit(1)

    dialect_arg = argv[1]

    if dialect_arg == "Geopandas":
        print(
            "Geopandas does not support SQL queries directly. Please use the provided Python script geopandas.py.",
            file=sys.stderr,
        )
        sys.exit(0)

    if dialect_arg not in query_classes:
        print(f"Unknown dialect: {dialect_arg}", file=sys.stderr)
        print(f"Available dialects: {available}", file=sys.stderr)
        sys.exit(1)

    queries = query_classes[dialect_arg]().queries()

    for query in queries.values():
        print(query)


if __name__ == "__main__":
    main()