Sedona 的空间算子完全支持 Apache SparkSQL 的查询优化器。它具备以下查询优化特性:
!!! tip Sedona 连接的性能受分区数影响很大。如果连接性能不理想,请在创建原始 DataFrame 之后执行 df.repartition(XXX) 以增加分区数。
简介:从 A 与 B 中查找满足某个谓词的几何对象配对。SedonaSQL 支持的大多数谓词都可以触发范围连接。
SQL 示例
SELECT * FROM polygondf, pointdf WHERE ST_Contains(polygondf.polygonshape,pointdf.pointshape)
SELECT * FROM polygondf, pointdf WHERE ST_Intersects(polygondf.polygonshape,pointdf.pointshape)
SELECT * FROM pointdf, polygondf WHERE ST_Within(pointdf.pointshape, polygondf.polygonshape)
SELECT * FROM pointdf, polygondf WHERE ST_DWithin(pointdf.pointshape, polygondf.polygonshape, 10.0)
Spark SQL 物理计划:
== Physical Plan == RangeJoin polygonshape#20: geometry, pointshape#43: geometry, false :- Project [st_polygonfromenvelope(cast(_c0#0 as decimal(24,20)), cast(_c1#1 as decimal(24,20)), cast(_c2#2 as decimal(24,20)), cast(_c3#3 as decimal(24,20)), mypolygonid) AS polygonshape#20] : +- *FileScan csv +- Project [st_point(cast(_c0#31 as decimal(24,20)), cast(_c1#32 as decimal(24,20)), myPointId) AS pointshape#43] +- *FileScan csv
!!!note SedonaSQL 中所有的连接查询都是内连接
简介:从 A 与 B 中查找彼此之间的距离小于或等于某个阈值的几何对象配对。支持平面欧氏距离计算器 ST_Distance、ST_HausdorffDistance、ST_FrechetDistance,以及基于米制的大地测量距离计算器 ST_DistanceSpheroid 与 ST_DistanceSphere。
平面欧氏距离的 Spark SQL 示例:
仅考虑==完全位于某个距离之内==
SELECT * FROM pointdf1, pointdf2 WHERE ST_Distance(pointdf1.pointshape1,pointdf2.pointshape2) < 2
SELECT * FROM pointDf, polygonDF WHERE ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape, 0.3) < 2
SELECT * FROM pointDf, polygonDF WHERE ST_FrechetDistance(pointDf.pointshape, polygonDf.polygonshape) < 2
考虑==与某个距离范围相交==
SELECT * FROM pointdf1, pointdf2 WHERE ST_Distance(pointdf1.pointshape1,pointdf2.pointshape2) <= 2
SELECT * FROM pointDf, polygonDF WHERE ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape) <= 2
SELECT * FROM pointDf, polygonDF WHERE ST_FrechetDistance(pointDf.pointshape, polygonDf.polygonshape) <= 2
Spark SQL 物理计划:
== Physical Plan == DistanceJoin pointshape1#12: geometry, pointshape2#33: geometry, 2.0, true :- Project [st_point(cast(_c0#0 as decimal(24,20)), cast(_c1#1 as decimal(24,20)), myPointId) AS pointshape1#12] : +- *FileScan csv +- Project [st_point(cast(_c0#21 as decimal(24,20)), cast(_c1#22 as decimal(24,20)), myPointId) AS pointshape2#33] +- *FileScan csv
!!!warning 如果使用 ST_Distance、ST_HausdorffDistance 或 ST_FrechetDistance 等平面欧氏距离函数作为谓词,Sedona 不会管理距离的单位(度或米),它与几何对象保持一致。如果坐标位于经纬度系统下,distance 的单位应是度,而不是米或英里。若想更换几何对象的单位,请将坐标参考系转换到基于米的坐标系。参见 ST_Transform。如果不想转换数据,请考虑使用 ST_DistanceSpheroid 或 ST_DistanceSphere。
基于米制大地测量距离 ST_DistanceSpheroid 的 Spark SQL 示例(对 ST_DistanceSphere 同样适用):
==小于某个距离==
SELECT * FROM pointdf1, pointdf2 WHERE ST_DistanceSpheroid(pointdf1.pointshape1,pointdf2.pointshape2) < 2
==小于等于某个距离==
SELECT * FROM pointdf1, pointdf2 WHERE ST_DistanceSpheroid(pointdf1.pointshape1,pointdf2.pointshape2) <= 2
!!!warning 如果使用 ST_DistanceSpheroid 或 ST_DistanceSphere 作为谓词,距离单位为米。目前,使用大地测量距离计算器的距离连接对点数据效果最好。对于非点数据,仅会考虑它们的质心。
简介:以范围连接或距离连接的空间性能来执行左连接。 这样既可以找到 A 与 B 中满足连接条件的几何对象配对,同时还能保留 A 中那些在 B 中找不到任何匹配几何的记录。
范围连接与距离连接==不支持==如下所示的 LEFT JOIN:
SELECT a.*, b.* FROM a LEFT JOIN b ON ST_INTERSECTS(a.geometry, b.geometry)
这会导致使用 BroadcastIndexJoin,在两个大数据集上会非常低效。 否则就会触发 BroadcastNestedLoopJoin,这是最慢的选项。
为了利用 Sedona 的空间连接性能,可以通过将一次 INNER JOIN 与一次 LEFT JOIN 组合,来生成左连接的结果。
WITH inner_join AS ( SELECT dfA.a_id , dfB.b_id FROM dfA, dfB WHERE ST_INTERSECTS(dfA.geometry, dfB.geometry) ) SELECT dfA.*, inner_join.b_id FROM dfA LEFT JOIN inner_join ON dfA.a_id = inner_join.a_id;
!!!note 可以将这种策略定义为一个存储过程或 DBT 宏,以避免重复编写相同的代码。
简介:执行范围连接或距离连接,但将其中一侧广播。这样可以保留非广播侧的分区,避免 shuffle。
Sedona 会在被广播的表上构建空间索引。
只有当正确的一侧带有 broadcast 提示时,Sedona 才会使用广播连接。 支持的连接类型——广播侧组合如下:
pointDf.alias("pointDf").join(broadcast(polygonDf).alias("polygonDf"), expr("ST_Contains(polygonDf.polygonshape, pointDf.pointshape)"))
在 SQL 中指定 broadcast 提示,使用以下语法:
SELECT /*+ BROADCAST(polygonDf) */ pointDf.*, polygonDf.* FROM pointDf JOIN polygonDf ON ST_Contains(polygonDf.polygonshape, pointDf.pointshape);
Spark SQL 物理计划:
== Physical Plan ==
BroadcastIndexJoin pointshape#52: geometry, BuildRight, BuildRight, false ST_Contains(polygonshape#30, pointshape#52)
:- Project [st_point(cast(_c0#48 as decimal(24,20)), cast(_c1#49 as decimal(24,20))) AS pointshape#52]
: +- FileScan csv
+- SpatialIndex polygonshape#30: geometry, QUADTREE, [id=#62]
+- Project [st_polygonfromenvelope(cast(_c0#22 as decimal(24,20)), cast(_c1#23 as decimal(24,20)), cast(_c2#24 as decimal(24,20)), cast(_c3#25 as decimal(24,20))) AS polygonshape#30]
+- FileScan csv
这同样适用于使用 ST_Distance、ST_DistanceSpheroid、ST_DistanceSphere、ST_HausdorffDistance 或 ST_FrechetDistance 的距离连接:
pointDf1.alias("pointDf1").join(broadcast(pointDf2).alias("pointDf2"), expr("ST_Distance(pointDf1.pointshape, pointDf2.pointshape) <= 2"))
Spark SQL 物理计划:
== Physical Plan ==
BroadcastIndexJoin pointshape#52: geometry, BuildRight, BuildLeft, true, 2.0 ST_Distance(pointshape#52, pointshape#415) <= 2.0
:- Project [st_point(cast(_c0#48 as decimal(24,20)), cast(_c1#49 as decimal(24,20))) AS pointshape#52]
: +- FileScan csv
+- SpatialIndex pointshape#415: geometry, QUADTREE, [id=#1068]
+- Project [st_point(cast(_c0#48 as decimal(24,20)), cast(_c1#49 as decimal(24,20))) AS pointshape#415]
+- FileScan csv
注意:如果 distance 是一个表达式,它只会在 ST_Distance 的第一个参数(上例中的 pointDf1)上求值。
当参与空间连接的某张表小于阈值时,Sedona 会自动选择广播索引连接而不是 Sedona 优化连接。当前阈值由 sedona.join.autoBroadcastJoinThreshold 控制,默认与 spark.sql.autoBroadcastJoinThreshold 保持一致。
空间连接的优化同样适用于栅格谓词,例如 RS_Intersects、RS_Contains 与 RS_Within。
SQL 示例:
-- Raster-geometry join SELECT df1.id, df2.id, RS_Value(df1.rast, df2.geom) FROM df1 JOIN df2 ON RS_Intersects(df1.rast, df2.geom) -- Raster-raster join SELECT df1.id, df2.id FROM df1 JOIN df2 ON RS_Intersects(df1.rast, df2.rast)
这些查询可被规划为 RangeJoin 或 BroadcastIndexJoin。下面是一个使用 RangeJoin 的物理计划示例:
== Physical Plan == *(1) Project [id#14, id#25] +- RangeJoin rast#13: raster, geom#24: geometry, INTERSECTS, **org.apache.spark.sql.sedona_sql.expressions.RS_Intersects** :- LocalTableScan [rast#13, id#14] +- LocalTableScan [geom#24, id#25]
如果 Sedona 优化连接性能不理想(可能由复杂且相互重叠的几何对象导致),可以借助 Sedona 内置的、基于 Google S2 的近似等值连接。该等值连接利用 Spark 内部的等值连接算法,并且如果你愿意牺牲一些查询精度跳过精化步骤,性能可能更佳。
请按以下步骤操作:
使用 ST_S2CellIDs 生成 cell ID。每个几何对象可能产生一个或多个 ID。
SELECT id, geom, name, explode(ST_S2CellIDs(geom, 15)) as cellId FROM lefts
SELECT id, geom, name, explode(ST_S2CellIDs(geom, 15)) as cellId FROM rights
通过两张表的 S2 cellId 进行连接
SELECT lcs.id as lcs_id, lcs.geom as lcs_geom, lcs.name as lcs_name, rcs.id as rcs_id, rcs.geom as rcs_geom, rcs.name as rcs_name FROM lcs JOIN rcs ON lcs.cellId = rcs.cellId
由于 S2 Cellid 的特性,依据所选 S2 层级的不同,等值连接结果中可能会有少量误报。层级越小,cell 越大,膨胀(exploded)后的行数越少,但误报越多。
为保证正确性,可以使用 空间谓词 之一来过滤掉误报。使用下面的查询替换第 2 步中的查询。
SELECT lcs.id as lcs_id, lcs.geom as lcs_geom, lcs.name as lcs_name, rcs.id as rcs_id, rcs.geom as rcs_geom, rcs.name as rcs_name FROM lcs, rcs WHERE lcs.cellId = rcs.cellId AND ST_Contains(lcs.geom, rcs.geom)
如你所见,相比第 2 步的查询,我们额外加了一个过滤条件 ST_Contains,用于去除误报。也可以使用 ST_Intersects 等其他谓词。
!!!tip 如果不需要 100% 的精度并希望获得更快的查询速度,可以跳过该步骤。
由于在生成 S2 Cell Ids 时使用了 explode 函数,结果 DataFrame 中可能会有若干重复的 <lcs_geom, rcs_geom> 匹配。可以通过 GroupBy 查询移除它们。
SELECT lcs_id, rcs_id, first(lcs_geom), first(lcs_name), first(rcs_geom), first(rcs_name) FROM joinresult GROUP BY (lcs_id, rcs_id)
first 函数用于在一组重复值中取第一个。
如果你没有每个几何对象的唯一 id,也可以按几何对象本身分组。见下:
SELECT lcs_geom, rcs_geom, first(lcs_name), first(rcs_name) FROM joinresult GROUP BY (lcs_geom, rcs_geom)
!!!note 如果你做的是 point-in-polygon 连接,则不存在此问题,可以放心忽略。该问题仅在 polygon-polygon、polygon-linestring、linestring-linestring 连接中出现。
这也适用于距离连接。你需要先用 ST_Buffer(geometry, distance) 包装其中一个原始几何列。如果原始几何列是点,这次 ST_Buffer 会将它们变成半径为 distance 的圆。
由于坐标位于经纬度系统下,distance 的单位应是度而非米或英里。可以通过 METER_DISTANCE/111000.0 得到近似值,然后过滤掉误报。注意,当数据接近极点或反子午线时,这种做法可能导致不准确的结果。
简而言之,在第 1 步之前先对左表运行下面的查询。请将 METER_DISTANCE 替换为以米为单位的距离。在第 1 步中,基于 buffered_geom 列生成 S2 ID。然后在原始的 geom 列上运行第 2、3、4 步。
SELECT id, geom, ST_Buffer(geom, METER_DISTANCE/111000.0) as buffered_geom, name FROM lefts
简介:当同一个 WHERE 子句中同时包含一个连接查询和一个谓词时,先将谓词作为过滤条件执行,再执行连接查询。
SQL 示例
SELECT * FROM polygondf, pointdf WHERE ST_Contains(polygondf.polygonshape,pointdf.pointshape) AND ST_Contains(ST_PolygonFromEnvelope(1.0,101.0,501.0,601.0), polygondf.polygonshape)
Spark SQL 物理计划:
== Physical Plan == RangeJoin polygonshape#20: geometry, pointshape#43: geometry, false :- Project [st_polygonfromenvelope(cast(_c0#0 as decimal(24,20)), cast(_c1#1 as decimal(24,20)), cast(_c2#2 as decimal(24,20)), cast(_c3#3 as decimal(24,20)), mypolygonid) AS polygonshape#20] : +- Filter **org.apache.spark.sql.sedona_sql.expressions.ST_Contains$** : +- *FileScan csv +- Project [st_point(cast(_c0#31 as decimal(24,20)), cast(_c1#32 as decimal(24,20)), myPointId) AS pointshape#43] +- *FileScan csv
Sedona 支持对 GeoParquet 文件进行空间谓词下推。当对由 GeoParquet 文件支撑的 dataframe 应用空间过滤时,Sedona 会利用 元数据中的 bbox 属性 来判断该文件中的全部数据是否会被该空间谓词过滤掉。当 GeoParquet 数据集按空间临近性进行分区时, 这种优化可以减少需要扫描的文件数量。
为了最大化 Sedona 对 GeoParquet 的过滤下推性能,建议按几何对象的 geohash 值(参见 ST_GeoHash)对数据排序后再保存为 GeoParquet 文件。示例如下:
SELECT col1, col2, geom, ST_GeoHash(geom, 5) as geohash FROM spatialDf ORDER BY geohash
下图展示了一个 GeoParquet 数据集的可视化。所有 GeoParquet 文件的 bbox 以蓝色矩形绘制,查询窗口以红色矩形绘制。Sedona 只会扫描 6 个文件中的 1 个就能 回答类似 SELECT * FROM geoparquet_dataset WHERE ST_Intersects(geom, <query window>) 的查询,因此只需要扫描浅绿色矩形覆盖的部分数据。
我们可以对比启用与不启用空间谓词时查询 GeoParquet 数据集的指标,可以观察到使用空间谓词时扫描的行数明显减少。
| 不使用空间谓词 | 使用空间谓词 |
|---|---|
将空间谓词下推到 GeoParquet 默认启用。用户可以通过将 Spark 配置 spark.sedona.geoparquet.spatialFilterPushDown 设置为 false 来手动禁用。