blob: 70868128e5717c92fe2fbf127fda55a2ba3063a9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.spark.sql.sedona_sql.strategy.join
import org.apache.sedona.core.spatialRDD.SpatialRDD
import org.apache.sedona.core.utils.SedonaConf
import org.apache.sedona.sql.utils.{GeometrySerializer, RasterSerializer}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Expression, UnsafeRow}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.sedona_sql.UDT.RasterUDT
import org.locationtech.jts.geom.Geometry
/**
 * Helpers mixed into spatial join physical plans. They convert the rows produced by a child plan
 * into Sedona [[SpatialRDD]]s and spatially partition the two sides of a join.
 */
trait TraitJoinQueryBase {
  self: SparkPlan =>

  /**
   * Converts both sides of a join into spatial RDDs.
   *
   * When at least one of the shape expressions has a [[RasterUDT]] data type, both sides are
   * converted with [[toWGS84EnvelopeRDD]]; otherwise both sides are converted with
   * [[toSpatialRDD]].
   *
   * @param leftRdd rows of the left side
   * @param leftShapeExpr expression producing the serialized shape of a left row
   * @param rightRdd rows of the right side
   * @param rightShapeExpr expression producing the serialized shape of a right row
   * @return the (left, right) pair of spatial RDDs
   */
  def toSpatialRddPair(leftRdd: RDD[UnsafeRow],
                       leftShapeExpr: Expression,
                       rightRdd: RDD[UnsafeRow],
                       rightShapeExpr: Expression): (SpatialRDD[Geometry], SpatialRDD[Geometry]) = {
    val rasterInvolved =
      Seq(leftShapeExpr, rightShapeExpr).exists(_.dataType.isInstanceOf[RasterUDT])
    if (!rasterInvolved) {
      val left = toSpatialRDD(leftRdd, leftShapeExpr)
      val right = toSpatialRDD(rightRdd, rightShapeExpr)
      (left, right)
    } else {
      val left = toWGS84EnvelopeRDD(leftRdd, leftShapeExpr)
      val right = toWGS84EnvelopeRDD(rightRdd, rightShapeExpr)
      (left, right)
    }
  }

  /**
   * Deserializes the geometry of every row and wraps the result in a [[SpatialRDD]].
   *
   * Each geometry carries a copy of its originating row as user data.
   *
   * @param rdd rows to convert
   * @param shapeExpression expression evaluated per row; its result is treated as the serialized
   *                        geometry bytes
   * @return a spatial RDD whose raw RDD holds the deserialized geometries
   */
  def toSpatialRDD(rdd: RDD[UnsafeRow], shapeExpression: Expression): SpatialRDD[Geometry] = {
    val geometries: RDD[Geometry] = rdd.map { row =>
      val bytes = shapeExpression.eval(row).asInstanceOf[Array[Byte]]
      val geometry = GeometrySerializer.deserialize(bytes)
      // NOTE(review): the row is copied, presumably because Spark may reuse the row object - confirm.
      geometry.setUserData(row.copy())
      geometry
    }
    val result = new SpatialRDD[Geometry]
    result.setRawSpatialRDD(geometries.toJavaRDD())
    result
  }

  /**
   * Builds the spatial RDD used for raster-geometry and raster-raster joins.
   *
   * Such joins need an implicit CRS transformation on both sides, so every row is represented by
   * an expanded WGS84 envelope and the join on these envelopes is only a coarse-grained one.
   * Each envelope carries a copy of its originating row as user data.
   *
   * @param rdd rows to convert
   * @param shapeExpression expression evaluated per row; its result is deserialized as a raster
   *                        when the expression's data type is [[RasterUDT]], and as a geometry
   *                        otherwise
   * @return a spatial RDD whose raw RDD holds the WGS84 envelopes
   */
  def toWGS84EnvelopeRDD(rdd: RDD[UnsafeRow], shapeExpression: Expression): SpatialRDD[Geometry] = {
    // The data type is inspected once here, outside the per-row closures.
    val envelopes: RDD[Geometry] = shapeExpression.dataType match {
      case _: RasterUDT =>
        rdd.map { row =>
          val bytes = shapeExpression.eval(row).asInstanceOf[Array[Byte]]
          val envelope = JoinedGeometryRaster.rasterToWGS84Envelope(RasterSerializer.deserialize(bytes))
          envelope.setUserData(row.copy())
          envelope
        }
      case _ =>
        rdd.map { row =>
          val bytes = shapeExpression.eval(row).asInstanceOf[Array[Byte]]
          val envelope = JoinedGeometryRaster.geometryToWGS84Envelope(GeometrySerializer.deserialize(bytes))
          envelope.setUserData(row.copy())
          envelope
        }
    }
    val result = new SpatialRDD[Geometry]
    result.setRawSpatialRDD(envelopes.toJavaRDD())
    result
  }

  /**
   * Builds a spatial RDD of envelopes expanded by a per-row distance.
   *
   * For every row the geometry is deserialized, the distance is obtained by evaluating
   * `boundRadius` on the same row, and both are handed to
   * `JoinedGeometry.geometryToExpandedEnvelope`. Each resulting envelope carries a copy of its
   * originating row as user data.
   *
   * @param rdd rows to convert
   * @param shapeExpression expression evaluated per row; its result is treated as the serialized
   *                        geometry bytes
   * @param boundRadius expression evaluated per row; its result is cast to `Double`
   * @param isGeography forwarded unchanged to `JoinedGeometry.geometryToExpandedEnvelope`
   * @return a spatial RDD whose raw RDD holds the expanded envelopes
   */
  def toExpandedEnvelopeRDD(rdd: RDD[UnsafeRow],
                            shapeExpression: Expression,
                            boundRadius: Expression,
                            isGeography: Boolean): SpatialRDD[Geometry] = {
    val envelopes: RDD[Geometry] = rdd.map { row =>
      val bytes = shapeExpression.eval(row).asInstanceOf[Array[Byte]]
      val geometry = GeometrySerializer.deserialize(bytes)
      val radius = boundRadius.eval(row).asInstanceOf[Double]
      val envelope = JoinedGeometry.geometryToExpandedEnvelope(geometry, radius, isGeography)
      envelope.setUserData(row.copy())
      envelope
    }
    val result = new SpatialRDD[Geometry]
    result.setRawSpatialRDD(envelopes.toJavaRDD())
    result
  }

  /**
   * Spatially partitions both sides of a join with the same partitioner.
   *
   * The dominant side is partitioned with the grid type taken from `sedonaConf`, and the follower
   * side then reuses the dominant side's partitioner. Nothing happens unless
   * `dominantShapes.approximateTotalCount` is positive.
   *
   * @param dominantShapes side that determines the partitioning
   * @param followerShapes side partitioned with the dominant side's partitioner
   * @param numPartitions number of partitions requested for the dominant side
   * @param sedonaConf configuration supplying the join grid type
   */
  def doSpatialPartitioning(dominantShapes: SpatialRDD[Geometry],
                            followerShapes: SpatialRDD[Geometry],
                            numPartitions: Integer,
                            sedonaConf: SedonaConf): Unit = {
    val dominantHasData = dominantShapes.approximateTotalCount > 0
    if (dominantHasData) {
      val gridType = sedonaConf.getJoinGridType
      dominantShapes.spatialPartitioning(gridType, numPartitions)
      val sharedPartitioner = dominantShapes.getPartitioner
      followerShapes.spatialPartitioning(sharedPartitioner)
    }
  }
}