blob: 9554dd68e175808b1327d3a7d2e947dff48447e6 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! FunctionRegistry trait
use crate::expr_rewriter::FunctionRewrite;
use crate::planner::ExprPlanner;
use crate::{AggregateUDF, ScalarUDF, UserDefinedLogicalNode, WindowUDF};
use datafusion_common::{not_impl_err, plan_datafusion_err, HashMap, Result};
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::Arc;
/// A registry knows how to build logical expressions out of user-defined function' names
pub trait FunctionRegistry {
/// Returns names of all available scalar user defined functions.
fn udfs(&self) -> HashSet<String>;
/// Returns names of all available aggregate user defined functions.
fn udafs(&self) -> HashSet<String>;
/// Returns names of all available window user defined functions.
fn udwfs(&self) -> HashSet<String>;
/// Returns a reference to the user defined scalar function (udf) named
/// `name`.
fn udf(&self, name: &str) -> Result<Arc<ScalarUDF>>;
/// Returns a reference to the user defined aggregate function (udaf) named
/// `name`.
fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>>;
/// Returns a reference to the user defined window function (udwf) named
/// `name`.
fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>>;
/// Registers a new [`ScalarUDF`], returning any previously registered
/// implementation.
///
/// Returns an error (the default) if the function can not be registered,
/// for example if the registry is read only.
fn register_udf(&mut self, _udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> {
not_impl_err!("Registering ScalarUDF")
}
/// Registers a new [`AggregateUDF`], returning any previously registered
/// implementation.
///
/// Returns an error (the default) if the function can not be registered,
/// for example if the registry is read only.
fn register_udaf(
&mut self,
_udaf: Arc<AggregateUDF>,
) -> Result<Option<Arc<AggregateUDF>>> {
not_impl_err!("Registering AggregateUDF")
}
/// Registers a new [`WindowUDF`], returning any previously registered
/// implementation.
///
/// Returns an error (the default) if the function can not be registered,
/// for example if the registry is read only.
fn register_udwf(&mut self, _udaf: Arc<WindowUDF>) -> Result<Option<Arc<WindowUDF>>> {
not_impl_err!("Registering WindowUDF")
}
/// Deregisters a [`ScalarUDF`], returning the implementation that was
/// deregistered.
///
/// Returns an error (the default) if the function can not be deregistered,
/// for example if the registry is read only.
fn deregister_udf(&mut self, _name: &str) -> Result<Option<Arc<ScalarUDF>>> {
not_impl_err!("Deregistering ScalarUDF")
}
/// Deregisters a [`AggregateUDF`], returning the implementation that was
/// deregistered.
///
/// Returns an error (the default) if the function can not be deregistered,
/// for example if the registry is read only.
fn deregister_udaf(&mut self, _name: &str) -> Result<Option<Arc<AggregateUDF>>> {
not_impl_err!("Deregistering AggregateUDF")
}
/// Deregisters a [`WindowUDF`], returning the implementation that was
/// deregistered.
///
/// Returns an error (the default) if the function can not be deregistered,
/// for example if the registry is read only.
fn deregister_udwf(&mut self, _name: &str) -> Result<Option<Arc<WindowUDF>>> {
not_impl_err!("Deregistering WindowUDF")
}
/// Registers a new [`FunctionRewrite`] with the registry.
///
/// `FunctionRewrite` rules are used to rewrite certain / operators in the
/// logical plan to function calls. For example `a || b` might be written to
/// `array_concat(a, b)`.
///
/// This allows the behavior of operators to be customized by the user.
fn register_function_rewrite(
&mut self,
_rewrite: Arc<dyn FunctionRewrite + Send + Sync>,
) -> Result<()> {
not_impl_err!("Registering FunctionRewrite")
}
/// Set of all registered [`ExprPlanner`]s
fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>>;
/// Registers a new [`ExprPlanner`] with the registry.
fn register_expr_planner(
&mut self,
_expr_planner: Arc<dyn ExprPlanner>,
) -> Result<()> {
not_impl_err!("Registering ExprPlanner")
}
}
/// Serializer and deserializer registry for extensions like [UserDefinedLogicalNode].
pub trait SerializerRegistry: Debug + Send + Sync {
/// Serialize this node to a byte array. This serialization should not include
/// input plans.
fn serialize_logical_plan(
&self,
node: &dyn UserDefinedLogicalNode,
) -> Result<Vec<u8>>;
/// Deserialize user defined logical plan node ([UserDefinedLogicalNode]) from
/// bytes.
fn deserialize_logical_plan(
&self,
name: &str,
bytes: &[u8],
) -> Result<Arc<dyn UserDefinedLogicalNode>>;
}
/// A [`FunctionRegistry`] that uses in memory [`HashMap`]s
#[derive(Default, Debug)]
pub struct MemoryFunctionRegistry {
/// Scalar Functions
udfs: HashMap<String, Arc<ScalarUDF>>,
/// Aggregate Functions
udafs: HashMap<String, Arc<AggregateUDF>>,
/// Window Functions
udwfs: HashMap<String, Arc<WindowUDF>>,
}
impl MemoryFunctionRegistry {
pub fn new() -> Self {
Self::default()
}
}
impl FunctionRegistry for MemoryFunctionRegistry {
fn udfs(&self) -> HashSet<String> {
self.udfs.keys().cloned().collect()
}
fn udf(&self, name: &str) -> Result<Arc<ScalarUDF>> {
self.udfs
.get(name)
.cloned()
.ok_or_else(|| plan_datafusion_err!("Function {name} not found"))
}
fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
self.udafs
.get(name)
.cloned()
.ok_or_else(|| plan_datafusion_err!("Aggregate Function {name} not found"))
}
fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
self.udwfs
.get(name)
.cloned()
.ok_or_else(|| plan_datafusion_err!("Window Function {name} not found"))
}
fn register_udf(&mut self, udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> {
Ok(self.udfs.insert(udf.name().to_string(), udf))
}
fn register_udaf(
&mut self,
udaf: Arc<AggregateUDF>,
) -> Result<Option<Arc<AggregateUDF>>> {
Ok(self.udafs.insert(udaf.name().into(), udaf))
}
fn register_udwf(&mut self, udaf: Arc<WindowUDF>) -> Result<Option<Arc<WindowUDF>>> {
Ok(self.udwfs.insert(udaf.name().into(), udaf))
}
fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
vec![]
}
fn udafs(&self) -> HashSet<String> {
self.udafs.keys().cloned().collect()
}
fn udwfs(&self) -> HashSet<String> {
self.udwfs.keys().cloned().collect()
}
}