blob: dd278c6f16eb8ae5fa626a5bd7178d781df1be0e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::{io::Write, sync::Arc, vec};
use arrow_array::builder::BinaryBuilder;
use datafusion_common::error::Result;
use datafusion_common::exec_err;
use datafusion_common::DataFusionError;
use datafusion_expr::{
scalar_doc_sections::DOC_SECTION_OTHER, ColumnarValue, Documentation, Volatility,
};
use geo_traits::{CoordTrait, GeometryTrait, LineStringTrait, MultiPointTrait, PointTrait};
use sedona_common::sedona_internal_err;
use sedona_expr::scalar_udf::{SedonaScalarKernel, SedonaScalarUDF};
use sedona_geometry::wkb_factory::write_wkb_linestring_header;
use sedona_schema::datatypes::WKB_GEOGRAPHY;
use sedona_schema::{
datatypes::{SedonaType, WKB_GEOMETRY},
matchers::ArgMatcher,
};
use crate::executor::WkbExecutor;
/// ST_MakeLine() scalar UDF implementation
///
/// Native implementation to create geometries from coordinates.
/// See [`st_geogline_udf`] for the corresponding geography constructor.
pub fn st_makeline_udf() -> SedonaScalarUDF {
SedonaScalarUDF::new(
"st_makeline",
vec![
Arc::new(STMakeLine {
out_type: WKB_GEOMETRY,
}),
Arc::new(STMakeLine {
out_type: WKB_GEOGRAPHY,
}),
],
Volatility::Immutable,
Some(doc()),
)
}
fn doc() -> Documentation {
Documentation::builder(
DOC_SECTION_OTHER,
"Construct a line".to_string(),
"ST_MakeLine (g1: Geometry or Geography, g2: Geometry or Geography)".to_string(),
)
.with_argument("g1", "Geometry or Geography: The first point or geometry")
.with_argument("g2", "Geometry or Geography: The second point or geometry")
.with_sql_example("SELECT ST_MakeLine(ST_Point(0, 1), ST_Point(2, 3)) as geom")
.build()
}
#[derive(Debug)]
struct STMakeLine {
out_type: SedonaType,
}
impl SedonaScalarKernel for STMakeLine {
fn return_type(&self, args: &[SedonaType]) -> Result<Option<SedonaType>> {
let match_geom = ArgMatcher::is_geometry();
let match_geog = ArgMatcher::is_geography();
let arg_matchers = if match_geom.match_type(&self.out_type) {
vec![match_geom.clone(), match_geom]
} else if match_geog.match_type(&self.out_type) {
vec![match_geog.clone(), match_geog]
} else {
return sedona_internal_err!("Unexpected ST_MakeLine() output");
};
let matcher = ArgMatcher::new(arg_matchers, self.out_type.clone());
matcher.match_args(args)
}
fn invoke_batch(
&self,
arg_types: &[SedonaType],
args: &[ColumnarValue],
) -> Result<ColumnarValue> {
let executor = WkbExecutor::new(arg_types, args);
let min_segment_bytes = 1 + 4 + 4 + 16 + 16;
let mut builder = BinaryBuilder::with_capacity(
executor.num_iterations(),
min_segment_bytes * executor.num_iterations(),
);
let mut coords = Vec::new();
executor.execute_wkb_wkb_void(|lhs, rhs| {
match (lhs, rhs) {
(Some(lhs), Some(rhs)) => {
invoke_scalar(lhs, rhs, &mut coords, &mut builder)?;
builder.append_value([]);
}
_ => builder.append_null(),
};
Ok(())
})?;
executor.finish(Arc::new(builder.finish()))
}
}
fn invoke_scalar(
g1: &impl GeometryTrait<T = f64>,
g2: &impl GeometryTrait<T = f64>,
coords: &mut Vec<f64>,
out: &mut impl Write,
) -> Result<()> {
if g1.dim() != g2.dim() {
return exec_err!("Can't ST_MakeLine() with mismatched dimensions");
}
coords.clear();
let coord_size = g1.dim().size();
// Add the first item
add_coords(g1, coords, coord_size, None)?;
// If there were any coordinates in the second item, pull the last few items and
// pass to add_coords() so it can deduplicate
if coords.len() >= coord_size {
let mut last_coord = [0.0; 4];
last_coord[0..coord_size]
.copy_from_slice(&coords[(coords.len() - coord_size)..coords.len()]);
add_coords(g2, coords, coord_size, Some(&last_coord[0..coord_size]))?;
} else {
add_coords(g2, coords, coord_size, None)?;
}
let n_coords = coords.len() / coord_size;
write_wkb_linestring_header(out, g1.dim(), n_coords)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
for ord in coords {
out.write_all(&ord.to_le_bytes())?;
}
Ok(())
}
fn add_coords(
geom: &impl GeometryTrait<T = f64>,
coords: &mut Vec<f64>,
coord_size: usize,
last_coord: Option<&[f64]>,
) -> Result<()> {
match geom.as_type() {
geo_traits::GeometryType::Point(pt) => {
if let Some(coord) = pt.coord() {
for j in 0..coord_size {
coords.push(unsafe { coord.nth_unchecked(j) });
}
}
}
geo_traits::GeometryType::LineString(ls) => {
for (i, coord) in ls.coords().enumerate() {
// Deduplicate the first point of any appended linestring
if i == 0 {
let mut tmp = Vec::new();
for j in 0..coord_size {
tmp.push(unsafe { coord.nth_unchecked(j) });
}
if last_coord != Some(tmp.as_slice()) {
for item in tmp {
coords.push(item);
}
}
} else {
for j in 0..coord_size {
coords.push(unsafe { coord.nth_unchecked(j) });
}
}
}
}
geo_traits::GeometryType::MultiPoint(mp) => {
for pt in mp.points() {
add_coords(&pt, coords, coord_size, None)?;
}
}
_ => {
return exec_err!(
"ST_MakeLine() only supports Point, LineString, and MultiPoint as input"
)
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use datafusion_expr::ScalarUDF;
use rstest::rstest;
use sedona_schema::datatypes::{WKB_GEOGRAPHY, WKB_VIEW_GEOGRAPHY, WKB_VIEW_GEOMETRY};
use sedona_testing::{
testers::ScalarUdfTester,
{compare::assert_array_equal, create::create_array},
};
#[test]
fn udf_metadata() {
let geom_from_point: ScalarUDF = st_makeline_udf().into();
assert_eq!(geom_from_point.name(), "st_makeline");
assert!(geom_from_point.documentation().is_some());
}
#[rstest]
fn udf_invoke(#[values(WKB_GEOMETRY, WKB_VIEW_GEOMETRY)] sedona_type: SedonaType) {
let tester = ScalarUdfTester::new(
st_makeline_udf().into(),
vec![sedona_type.clone(), sedona_type.clone()],
);
tester.assert_return_type(WKB_GEOMETRY);
// Basic usage
let result = tester
.invoke_scalar_scalar("POINT (0 1)", "POINT (2 3)")
.unwrap();
tester.assert_scalar_result_equals(result, "LINESTRING (0 1, 2 3)");
// Deduplicating the first point of a linestring
let result = tester
.invoke_scalar_scalar("POINT (0 1)", "LINESTRING (0 1, 2 3)")
.unwrap();
tester.assert_scalar_result_equals(result, "LINESTRING (0 1, 2 3)");
// Two linestrings should work as well
let result = tester
.invoke_scalar_scalar("LINESTRING (0 1, 2 3)", "LINESTRING (4 5, 6 7)")
.unwrap();
tester.assert_scalar_result_equals(result, "LINESTRING (0 1, 2 3, 4 5, 6 7)");
// Also multipoints
let result = tester
.invoke_scalar_scalar("MULTIPOINT (0 1, 2 3)", "MULTIPOINT (4 5, 6 7)")
.unwrap();
tester.assert_scalar_result_equals(result, "LINESTRING (0 1, 2 3, 4 5, 6 7)");
// Mismatched dimensions or unsupported types should error
let err = tester
.invoke_scalar_scalar("POINT (0 1)", "POINT Z (1 2 3)")
.unwrap_err();
assert_eq!(
err.message(),
"Can't ST_MakeLine() with mismatched dimensions"
);
let err = tester
.invoke_scalar_scalar("POINT (0 1)", "POLYGON EMPTY")
.unwrap_err();
assert_eq!(
err.message(),
"ST_MakeLine() only supports Point, LineString, and MultiPoint as input"
);
// Arrays, nulls, and dimensions
let array0 = create_array(
&[
Some("POINT (0 1)"),
Some("POINT Z (0 1 2)"),
Some("POINT M (0 1 3)"),
Some("POINT ZM (0 1 2 3)"),
Some("POINT (0 0)"),
None,
None,
],
&sedona_type,
);
let array1 = create_array(
&[
Some("POINT (10 11)"),
Some("POINT Z (10 11 12)"),
Some("POINT M (10 11 13)"),
Some("POINT ZM (10 11 12 13)"),
None,
Some("POINT (0 0)"),
None,
],
&sedona_type,
);
let expected = create_array(
&[
Some("LINESTRING (0 1, 10 11)"),
Some("LINESTRING Z (0 1 2, 10 11 12)"),
Some("LINESTRING M (0 1 3, 10 11 13)"),
Some("LINESTRING ZM (0 1 2 3, 10 11 12 13)"),
None,
None,
None,
],
&WKB_GEOMETRY,
);
let result = tester.invoke_array_array(array0, array1).unwrap();
assert_array_equal(&result, &expected);
}
#[rstest]
fn udf_invoke_geog(#[values(WKB_GEOGRAPHY, WKB_VIEW_GEOGRAPHY)] sedona_type: SedonaType) {
let tester = ScalarUdfTester::new(
st_makeline_udf().into(),
vec![sedona_type.clone(), sedona_type.clone()],
);
tester.assert_return_type(WKB_GEOGRAPHY);
// Basic usage
let result = tester
.invoke_scalar_scalar("POINT (0 1)", "POINT (2 3)")
.unwrap();
tester.assert_scalar_result_equals(result, "LINESTRING (0 1, 2 3)");
}
}