blob: 5163a6701ee046f0c7e754f8449b44dfc8e44813 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Spark-compatible function bindings.
These functions mirror the semantics of their Apache Spark counterparts
exactly. Some override DataFusion built-ins (``substring`` is 1-indexed,
``concat`` propagates NULL, ``round`` uses HALF_UP rounding, etc.), which is
why they live in a separate namespace rather than replacing the defaults.
For DataFrame use, import this module and call functions directly. For SQL
use, call :py:meth:`datafusion.SessionContext.enable_spark_functions` to
register the Spark UDFs by name (overriding any built-ins with matching
names) before issuing SQL queries.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
import pyarrow as pa
from datafusion._internal import functions as _functions
from datafusion.expr import (
Expr,
_to_raw_expr,
coerce_to_expr,
sort_list_to_raw_sort_list,
)
if TYPE_CHECKING:
from datafusion.common import NullTreatment
from datafusion.expr import SortKey
# Spark function namespace of the internal bindings; every wrapper in this
# module forwards to an attribute of this object.
_f = _functions.spark
# Reused int32 literal so optional-arg defaults don't rebuild it per call.
# It is the zero default for omitted integer parts of the interval builders.
_ZERO_I32 = Expr.literal(pa.scalar(0, type=pa.int32()))
def _filter_raw(filter: Expr | None) -> Any:
    """Unwrap an optional filter expression for the raw aggregate bindings.

    Args:
        filter: Filter expression, or ``None`` when no filter applies.

    Returns:
        ``filter.expr`` when a filter is given, otherwise ``None``.
    """
    if filter is None:
        return None
    return filter.expr
def _coerce_i32(value: Expr | int | None) -> Expr | None:
    """Turn a native ``int`` into an int32 literal; leave ``Expr``/``None`` as is.

    Several Spark datetime and interval builders require 32-bit integer
    inputs, so a bare ``int`` must become an int32 literal rather than the
    int64 default that :meth:`Expr.literal` would produce.

    Args:
        value: A native ``int``, an existing :class:`Expr`, or ``None``.

    Returns:
        ``value`` unchanged when it is ``None`` or an :class:`Expr`;
        otherwise an int32 literal expression holding ``value``.
    """
    if value is None:
        return value
    if isinstance(value, Expr):
        return value
    as_int32 = pa.scalar(value, type=pa.int32())
    return Expr.literal(as_int32)
# ---------------------------------------------------------------------------
# Aggregate functions
# ---------------------------------------------------------------------------
def avg(
    col: Expr,
    distinct: bool | None = None,
    filter: Expr | None = None,
    order_by: list[SortKey] | SortKey | None = None,
    null_treatment: NullTreatment | None = None,
) -> Expr:
    """Spark ``avg``: mean of a numeric column.

    Args:
        col: Expression to average.
        distinct: Forwarded to the binding as ``distinct``.
        filter: Optional filter expression; its raw form is forwarded.
        order_by: Optional sort key(s), converted to a raw sort list.
        null_treatment: Forwarded to the binding as ``null_treatment``.

    Returns:
        Expression wrapping the Spark ``avg`` aggregate.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"a": [1.0, 2.0, 3.0]})
        >>> r = df.aggregate(
        ...     [], [dfn.functions.spark.avg(dfn.col("a")).alias("v")])
        >>> r.collect_column("v")[0].as_py()
        2.0
    """
    raw_col = col.expr
    raw_filter = None if filter is None else filter.expr
    raw_order = sort_list_to_raw_sort_list(order_by)
    aggregate = _f.avg(
        raw_col,
        distinct=distinct,
        filter=raw_filter,
        order_by=raw_order,
        null_treatment=null_treatment,
    )
    return Expr(aggregate)
def try_sum(
    col: Expr,
    distinct: bool | None = None,
    filter: Expr | None = None,
    order_by: list[SortKey] | SortKey | None = None,
    null_treatment: NullTreatment | None = None,
) -> Expr:
    """Spark ``try_sum``: sum that yields NULL on overflow.

    Args:
        col: Expression to sum.
        distinct: Forwarded to the binding as ``distinct``.
        filter: Optional filter expression; its raw form is forwarded.
        order_by: Optional sort key(s), converted to a raw sort list.
        null_treatment: Forwarded to the binding as ``null_treatment``.

    Returns:
        Expression wrapping the Spark ``try_sum`` aggregate.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"a": [1, 2, 3]})
        >>> r = df.aggregate(
        ...     [], [dfn.functions.spark.try_sum(dfn.col("a")).alias("v")])
        >>> r.collect_column("v")[0].as_py()
        6
    """
    raw_col = col.expr
    raw_filter = None if filter is None else filter.expr
    raw_order = sort_list_to_raw_sort_list(order_by)
    aggregate = _f.try_sum(
        raw_col,
        distinct=distinct,
        filter=raw_filter,
        order_by=raw_order,
        null_treatment=null_treatment,
    )
    return Expr(aggregate)
def collect_list(
    col: Expr,
    distinct: bool | None = None,
    filter: Expr | None = None,
    order_by: list[SortKey] | SortKey | None = None,
    null_treatment: NullTreatment | None = None,
) -> Expr:
    """Spark ``collect_list``: gather values into an array, keeping duplicates.

    Args:
        col: Expression whose values are collected.
        distinct: Forwarded to the binding as ``distinct``.
        filter: Optional filter expression; its raw form is forwarded.
        order_by: Optional sort key(s), converted to a raw sort list.
        null_treatment: Forwarded to the binding as ``null_treatment``.

    Returns:
        Expression wrapping the Spark ``collect_list`` aggregate.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"a": [1, 2, 2]})
        >>> r = df.aggregate(
        ...     [], [dfn.functions.spark.collect_list(dfn.col("a")).alias("v")])
        >>> sorted(r.collect_column("v")[0].as_py())
        [1, 2, 2]
    """
    raw_col = col.expr
    raw_filter = None if filter is None else filter.expr
    raw_order = sort_list_to_raw_sort_list(order_by)
    aggregate = _f.collect_list(
        raw_col,
        distinct=distinct,
        filter=raw_filter,
        order_by=raw_order,
        null_treatment=null_treatment,
    )
    return Expr(aggregate)
def collect_set(
    col: Expr,
    distinct: bool | None = None,
    filter: Expr | None = None,
    order_by: list[SortKey] | SortKey | None = None,
    null_treatment: NullTreatment | None = None,
) -> Expr:
    """Spark ``collect_set``: gather the distinct values into an array.

    Args:
        col: Expression whose values are collected.
        distinct: Forwarded to the binding as ``distinct``.
        filter: Optional filter expression; its raw form is forwarded.
        order_by: Optional sort key(s), converted to a raw sort list.
        null_treatment: Forwarded to the binding as ``null_treatment``.

    Returns:
        Expression wrapping the Spark ``collect_set`` aggregate.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"a": [1, 2, 2, 3]})
        >>> r = df.aggregate(
        ...     [], [dfn.functions.spark.collect_set(dfn.col("a")).alias("v")])
        >>> sorted(r.collect_column("v")[0].as_py())
        [1, 2, 3]
    """
    raw_col = col.expr
    raw_filter = None if filter is None else filter.expr
    raw_order = sort_list_to_raw_sort_list(order_by)
    aggregate = _f.collect_set(
        raw_col,
        distinct=distinct,
        filter=raw_filter,
        order_by=raw_order,
        null_treatment=null_treatment,
    )
    return Expr(aggregate)
# ---------------------------------------------------------------------------
# Array functions
# ---------------------------------------------------------------------------
def array_contains(col: Expr, value: Expr | Any) -> Expr:
    """Spark ``array_contains``: whether the array holds the given element.

    Args:
        col: Array expression to search.
        value: Element to look for; a native Python literal or an
            :class:`Expr`.

    Returns:
        Expression wrapping the Spark ``array_contains`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.array_contains(
        ...         dfn.functions.spark.array(dfn.lit(1), dfn.lit(2)), 1
        ...     ).alias("v")
        ... )
        >>> r.collect_column("v")[0].as_py()
        True
    """
    haystack = col.expr
    needle = coerce_to_expr(value).expr
    return Expr(_f.array_contains(haystack, needle))
def array(*cols: Expr) -> Expr:
    """Spark ``array``: build an array out of the given element expressions.

    Args:
        *cols: Element expressions, in array order.

    Returns:
        Expression wrapping the Spark ``array`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.array(
        ...         dfn.lit(1), dfn.lit(2), dfn.lit(3)
        ...     ).alias("v")
        ... )
        >>> r.collect_column("v")[0].as_py()
        [1, 2, 3]
    """
    raw_elements = [element.expr for element in cols]
    return Expr(_f.array(*raw_elements))
def shuffle(col: Expr, seed: int | None = None) -> Expr:
    """Spark ``shuffle``: random permutation of the input array.

    ``seed`` is accepted for pyspark parity but is not yet wired through the
    Rust binding.

    Args:
        col: Array expression to permute.
        seed: Must be ``None``; any other value is rejected.

    Returns:
        Expression wrapping the Spark ``shuffle`` call.

    Raises:
        NotImplementedError: If ``seed`` is not ``None``.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.shuffle(
        ...         dfn.functions.spark.array(dfn.lit(1), dfn.lit(2), dfn.lit(3))
        ...     ).alias("v")
        ... )
        >>> sorted(r.collect_column("v")[0].as_py())
        [1, 2, 3]
    """
    if seed is None:
        return Expr(_f.shuffle(col.expr))
    # The seed is rejected before ``col`` is touched at all.
    msg = "shuffle(seed=...) is not yet supported by the Spark UDF binding"
    raise NotImplementedError(msg)
def array_repeat(col: Expr, count: Expr | int) -> Expr:
    """Spark ``array_repeat``: array holding ``col`` repeated ``count`` times.

    Args:
        col: Element expression to repeat.
        count: Number of repetitions; a native ``int`` or an :class:`Expr`.

    Returns:
        Expression wrapping the Spark ``array_repeat`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.array_repeat(dfn.lit("a"), 3).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        ['a', 'a', 'a']
    """
    element_raw = col.expr
    count_raw = coerce_to_expr(count).expr
    return Expr(_f.array_repeat(element_raw, count_raw))
def slice(x: Expr, start: Expr | int, length: Expr | int) -> Expr:
    """Spark ``slice``: ``length`` elements of the array from 1-indexed ``start``.

    Negative ``start`` counts from the end.

    Args:
        x: Array expression to slice.
        start: 1-indexed start position; a native ``int`` or an :class:`Expr`.
        length: Number of elements; a native ``int`` or an :class:`Expr`.

    Returns:
        Expression wrapping the Spark ``slice`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.slice(
        ...         dfn.functions.spark.array(
        ...             dfn.lit(1), dfn.lit(2), dfn.lit(3), dfn.lit(4)),
        ...         2, 2,
        ...     ).alias("v")
        ... )
        >>> r.collect_column("v")[0].as_py()
        [2, 3]
    """
    array_raw = x.expr
    start_raw = coerce_to_expr(start).expr
    length_raw = coerce_to_expr(length).expr
    return Expr(_f.slice(array_raw, start_raw, length_raw))
# ---------------------------------------------------------------------------
# Bitmap functions
# ---------------------------------------------------------------------------
def bitmap_count(col: Expr) -> Expr:
    r"""Spark ``bitmap_count``: how many bits are set in a bitmap.

    Args:
        col: Bitmap expression.

    Returns:
        Expression wrapping the Spark ``bitmap_count`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.bitmap_count(dfn.lit(b"\xff")).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        8
    """
    raw = _f.bitmap_count(col.expr)
    return Expr(raw)
def bitmap_bit_position(col: Expr) -> Expr:
    """Spark ``bitmap_bit_position``: bit position for the child expression.

    Args:
        col: Child expression.

    Returns:
        Expression wrapping the Spark ``bitmap_bit_position`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.bitmap_bit_position(dfn.lit(15)).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        14
    """
    raw = _f.bitmap_bit_position(col.expr)
    return Expr(raw)
def bitmap_bucket_number(col: Expr) -> Expr:
    """Spark ``bitmap_bucket_number``: bucket number for the child expression.

    Args:
        col: Child expression.

    Returns:
        Expression wrapping the Spark ``bitmap_bucket_number`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.bitmap_bucket_number(dfn.lit(15)).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        1
    """
    raw = _f.bitmap_bucket_number(col.expr)
    return Expr(raw)
# ---------------------------------------------------------------------------
# Bitwise functions
# ---------------------------------------------------------------------------
def bit_get(col: Expr, pos: Expr | str) -> Expr:
    """Spark ``bit_get``: the bit (0 or 1) found at ``pos``.

    A bare ``str`` ``pos`` is treated as a column name (matching pyspark),
    not a literal; pass :func:`~datafusion.lit` for a literal position.

    Args:
        col: Integer expression to inspect.
        pos: Bit position, as an :class:`Expr` or a column name.

    Returns:
        Expression wrapping the Spark ``bit_get`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.bit_get(dfn.lit(5), dfn.lit(0)).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        1
    """
    value_raw = col.expr
    pos_raw = _to_raw_expr(pos)
    return Expr(_f.bit_get(value_raw, pos_raw))
def bit_count(col: Expr) -> Expr:
    """Spark ``bit_count``: count of set bits in the integer's binary form.

    Args:
        col: Integer expression.

    Returns:
        Expression wrapping the Spark ``bit_count`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(dfn.functions.spark.bit_count(dfn.lit(7)).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        3
    """
    raw = _f.bit_count(col.expr)
    return Expr(raw)
def bitwise_not(col: Expr) -> Expr:
    """Spark ``~``: bitwise NOT of the input.

    Args:
        col: Integer expression to invert.

    Returns:
        Expression wrapping the Spark ``bitwise_not`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(dfn.functions.spark.bitwise_not(dfn.lit(0)).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        -1
    """
    raw = _f.bitwise_not(col.expr)
    return Expr(raw)
def shiftleft(col: Expr, numBits: Expr | int) -> Expr:  # noqa: N803
    """Spark ``shiftleft``: ``col`` shifted left by ``numBits`` bits.

    Args:
        col: Value expression to shift.
        numBits: Shift distance; a native ``int`` or an :class:`Expr`.

    Returns:
        Expression wrapping the Spark ``shiftleft`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.shiftleft(dfn.lit(1), 3).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        8
    """
    value_raw = col.expr
    shift_raw = coerce_to_expr(numBits).expr
    return Expr(_f.shiftleft(value_raw, shift_raw))
def shiftright(col: Expr, numBits: Expr | int) -> Expr:  # noqa: N803
    """Spark ``shiftright``: arithmetic right shift of ``col`` by ``numBits``.

    Args:
        col: Value expression to shift.
        numBits: Shift distance; a native ``int`` or an :class:`Expr`.

    Returns:
        Expression wrapping the Spark ``shiftright`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.shiftright(dfn.lit(8), 2).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        2
    """
    value_raw = col.expr
    shift_raw = coerce_to_expr(numBits).expr
    return Expr(_f.shiftright(value_raw, shift_raw))
def shiftrightunsigned(col: Expr, numBits: Expr | int) -> Expr:  # noqa: N803
    """Spark ``shiftrightunsigned``: logical (unsigned) right shift.

    Args:
        col: Value expression to shift.
        numBits: Shift distance; a native ``int`` or an :class:`Expr`.

    Returns:
        Expression wrapping the Spark ``shiftrightunsigned`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.shiftrightunsigned(dfn.lit(8), 2).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        2
    """
    value_raw = col.expr
    shift_raw = coerce_to_expr(numBits).expr
    return Expr(_f.shiftrightunsigned(value_raw, shift_raw))
# ---------------------------------------------------------------------------
# Collection / Conditional / Conversion
# ---------------------------------------------------------------------------
def size(col: Expr) -> Expr:
    """Spark ``size``: number of entries in an array or map.

    Args:
        col: Array or map expression.

    Returns:
        Expression wrapping the Spark ``size`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.size(
        ...         dfn.functions.spark.array(dfn.lit(1), dfn.lit(2), dfn.lit(3))
        ...     ).alias("v")
        ... )
        >>> r.collect_column("v")[0].as_py()
        3
    """
    raw = _f.size(col.expr)
    return Expr(raw)
def if_(condition: Expr, if_true: Expr | Any, if_false: Expr | Any) -> Expr:
    """Spark ``if``: ``if_true`` when ``condition`` holds, otherwise ``if_false``.

    Exposed as ``if_`` because ``if`` is a Python keyword.

    Args:
        condition: Boolean expression to test.
        if_true: Result when the condition is true; a native Python literal
            or an :class:`Expr`.
        if_false: Result otherwise; a native Python literal or an
            :class:`Expr`.

    Returns:
        Expression wrapping the Spark ``if`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.if_(
        ...         dfn.lit(2) > dfn.lit(1), "big", "small"
        ...     ).alias("v")
        ... )
        >>> r.collect_column("v")[0].as_py()
        'big'
    """
    condition_raw = condition.expr
    true_raw = coerce_to_expr(if_true).expr
    false_raw = coerce_to_expr(if_false).expr
    return Expr(_f.if_(condition_raw, true_raw, false_raw))
def spark_cast(arg: Expr, type_str: Expr | str) -> Expr:
    """Spark ``cast``: convert ``arg`` to the type named by ``type_str``.

    Uses Spark cast semantics (e.g. overflow returns NULL, not error).
    Currently only supports casting numeric values to ``"timestamp"``.

    Args:
        arg: Expression to cast.
        type_str: Target type name; a native ``str`` or an :class:`Expr`.

    Returns:
        Expression wrapping the Spark ``spark_cast`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.spark_cast(
        ...         dfn.lit(1579098645), "timestamp"
        ...     ).alias("v")
        ... )
        >>> r.collect_column("v")[0].as_py().isoformat()
        '2020-01-15T14:30:45+00:00'
    """
    value_raw = arg.expr
    type_raw = coerce_to_expr(type_str).expr
    return Expr(_f.spark_cast(value_raw, type_raw))
# ---------------------------------------------------------------------------
# Datetime functions
# ---------------------------------------------------------------------------
def add_months(start: Expr, months: Expr | int) -> Expr:
    """Spark ``add_months``: date plus N months.

    Args:
        start: Date expression to offset.
        months: Month count; a native ``int`` (sent as an int32 literal) or
            an :class:`Expr`.

    Returns:
        Expression wrapping the Spark ``add_months`` call.

    Examples:
        >>> import pyarrow as pa
        >>> from datetime import date
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> d = dfn.lit(pa.scalar(date(2020, 1, 15), type=pa.date32()))
        >>> r = df.select(dfn.functions.spark.add_months(d, 2).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        datetime.date(2020, 3, 15)
    """
    start_raw = start.expr
    months_raw = _coerce_i32(months).expr
    return Expr(_f.add_months(start_raw, months_raw))
def date_add(start: Expr, days: Expr | int) -> Expr:
    """Spark ``date_add``: date plus N days.

    Args:
        start: Date expression to offset.
        days: Day count; a native ``int`` (sent as an int32 literal) or an
            :class:`Expr`.

    Returns:
        Expression wrapping the Spark ``date_add`` call.

    Examples:
        >>> import pyarrow as pa
        >>> from datetime import date
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> d = dfn.lit(pa.scalar(date(2020, 1, 15), type=pa.date32()))
        >>> r = df.select(dfn.functions.spark.date_add(d, 5).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        datetime.date(2020, 1, 20)
    """
    start_raw = start.expr
    days_raw = _coerce_i32(days).expr
    return Expr(_f.date_add(start_raw, days_raw))
def date_sub(start: Expr, days: Expr | int) -> Expr:
    """Spark ``date_sub``: date minus N days.

    Args:
        start: Date expression to offset.
        days: Day count; a native ``int`` (sent as an int32 literal) or an
            :class:`Expr`.

    Returns:
        Expression wrapping the Spark ``date_sub`` call.

    Examples:
        >>> import pyarrow as pa
        >>> from datetime import date
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> d = dfn.lit(pa.scalar(date(2020, 1, 15), type=pa.date32()))
        >>> r = df.select(dfn.functions.spark.date_sub(d, 5).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        datetime.date(2020, 1, 10)
    """
    start_raw = start.expr
    days_raw = _coerce_i32(days).expr
    return Expr(_f.date_sub(start_raw, days_raw))
def hour(col: Expr) -> Expr:
    """Spark ``hour``: hour component of a timestamp.

    Args:
        col: Timestamp expression.

    Returns:
        Expression wrapping the Spark ``hour`` call.

    Examples:
        >>> import pyarrow as pa
        >>> from datetime import datetime
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> ts = dfn.lit(
        ...     pa.scalar(datetime(2020, 1, 15, 14, 30, 45),
        ...               type=pa.timestamp('us')))
        >>> r = df.select(dfn.functions.spark.hour(ts).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        14
    """
    raw = _f.hour(col.expr)
    return Expr(raw)
def minute(col: Expr) -> Expr:
    """Spark ``minute``: minute component of a timestamp.

    Args:
        col: Timestamp expression.

    Returns:
        Expression wrapping the Spark ``minute`` call.

    Examples:
        >>> import pyarrow as pa
        >>> from datetime import datetime
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> ts = dfn.lit(
        ...     pa.scalar(datetime(2020, 1, 15, 14, 30, 45),
        ...               type=pa.timestamp('us')))
        >>> r = df.select(dfn.functions.spark.minute(ts).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        30
    """
    raw = _f.minute(col.expr)
    return Expr(raw)
def second(col: Expr) -> Expr:
    """Spark ``second``: second component of a timestamp.

    Args:
        col: Timestamp expression.

    Returns:
        Expression wrapping the Spark ``second`` call.

    Examples:
        >>> import pyarrow as pa
        >>> from datetime import datetime
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> ts = dfn.lit(
        ...     pa.scalar(datetime(2020, 1, 15, 14, 30, 45),
        ...               type=pa.timestamp('us')))
        >>> r = df.select(dfn.functions.spark.second(ts).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        45
    """
    raw = _f.second(col.expr)
    return Expr(raw)
def last_day(col: Expr) -> Expr:
    """Spark ``last_day``: final day of the month that contains the date.

    Args:
        col: Date expression.

    Returns:
        Expression wrapping the Spark ``last_day`` call.

    Examples:
        >>> import pyarrow as pa
        >>> from datetime import date
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> d = dfn.lit(pa.scalar(date(2020, 1, 15), type=pa.date32()))
        >>> r = df.select(dfn.functions.spark.last_day(d).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        datetime.date(2020, 1, 31)
    """
    raw = _f.last_day(col.expr)
    return Expr(raw)
def make_dt_interval(
    days: Expr | int | None = None,
    hours: Expr | int | None = None,
    mins: Expr | int | None = None,
    secs: Expr | float | None = None,
) -> Expr:
    """Spark ``make_dt_interval``: day-time interval built from its parts.

    All parts are optional; omitted parts default to zero, matching pyspark.

    Args:
        days: Day count; a native ``int`` (sent as an int32 literal) or an
            :class:`Expr`.
        hours: Hour count; same accepted forms as ``days``.
        mins: Minute count; same accepted forms as ``days``.
        secs: Seconds; a native ``float`` or an :class:`Expr`.

    Returns:
        Expression wrapping the Spark ``make_dt_interval`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(dfn.functions.spark.make_dt_interval().alias("v"))
        >>> r.collect_column("v")[0].as_py()
        datetime.timedelta(0)
        >>> r = df.select(
        ...     dfn.functions.spark.make_dt_interval(
        ...         days=1, hours=2, mins=3, secs=4.5
        ...     ).alias("v")
        ... )
        >>> r.collect_column("v")[0].as_py()
        datetime.timedelta(days=1, seconds=7384, microseconds=500000)
    """
    # Omitted integer parts reuse the shared int32 zero literal.
    days_raw = (_ZERO_I32 if days is None else _coerce_i32(days)).expr
    hours_raw = (_ZERO_I32 if hours is None else _coerce_i32(hours)).expr
    mins_raw = (_ZERO_I32 if mins is None else _coerce_i32(mins)).expr
    secs_raw = (Expr.literal(0.0) if secs is None else coerce_to_expr(secs)).expr
    return Expr(_f.make_dt_interval(days_raw, hours_raw, mins_raw, secs_raw))
def make_interval(
    years: Expr | int | None = None,
    months: Expr | int | None = None,
    weeks: Expr | int | None = None,
    days: Expr | int | None = None,
    hours: Expr | int | None = None,
    mins: Expr | int | None = None,
    secs: Expr | float | None = None,
) -> Expr:
    """Spark ``make_interval``: interval built from year through second parts.

    All parts are optional; omitted parts default to zero, matching pyspark.

    Args:
        years: Year count; a native ``int`` (sent as an int32 literal) or an
            :class:`Expr`.
        months: Month count; same accepted forms as ``years``.
        weeks: Week count; same accepted forms as ``years``.
        days: Day count; same accepted forms as ``years``.
        hours: Hour count; same accepted forms as ``years``.
        mins: Minute count; same accepted forms as ``years``.
        secs: Seconds; a native ``float`` or an :class:`Expr`.

    Returns:
        Expression wrapping the Spark ``make_interval`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(dfn.functions.spark.make_interval().alias("v"))
        >>> r.collect_column("v")[0].as_py().months
        0
        >>> r = df.select(dfn.functions.spark.make_interval(years=1).alias("v"))
        >>> r.collect_column("v")[0].as_py().months
        12
    """
    # Integer parts, in the positional order the binding expects; omitted
    # parts reuse the shared int32 zero literal.
    integer_parts = [
        (_ZERO_I32 if part is None else _coerce_i32(part)).expr
        for part in (years, months, weeks, days, hours, mins)
    ]
    secs_raw = (Expr.literal(0.0) if secs is None else coerce_to_expr(secs)).expr
    return Expr(_f.make_interval(*integer_parts, secs_raw))
def next_day(date: Expr, dayOfWeek: Expr | str) -> Expr:  # noqa: N803
    """Spark ``next_day``: first date after ``date`` that falls on ``dayOfWeek``.

    Args:
        date: Date expression to start from.
        dayOfWeek: Day-of-week name; a native ``str`` or an :class:`Expr`.

    Returns:
        Expression wrapping the Spark ``next_day`` call.

    Examples:
        >>> import pyarrow as pa
        >>> from datetime import date
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> d = dfn.lit(pa.scalar(date(2020, 1, 15), type=pa.date32()))
        >>> r = df.select(dfn.functions.spark.next_day(d, "Mon").alias("v"))
        >>> r.collect_column("v")[0].as_py()
        datetime.date(2020, 1, 20)
    """
    date_raw = date.expr
    weekday_raw = coerce_to_expr(dayOfWeek).expr
    return Expr(_f.next_day(date_raw, weekday_raw))
def date_diff(end: Expr, start: Expr) -> Expr:
    """Spark ``date_diff``: number of days from ``start`` to ``end``.

    Args:
        end: End date expression.
        start: Start date expression.

    Returns:
        Expression wrapping the Spark ``date_diff`` call.

    Examples:
        >>> import pyarrow as pa
        >>> from datetime import date
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> d = dfn.lit(pa.scalar(date(2020, 1, 15), type=pa.date32()))
        >>> end = dfn.lit(pa.scalar(date(2020, 1, 20), type=pa.date32()))
        >>> r = df.select(dfn.functions.spark.date_diff(end, d).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        5
    """
    end_raw = end.expr
    start_raw = start.expr
    return Expr(_f.date_diff(end_raw, start_raw))
def date_trunc(format: Expr | str, timestamp: Expr) -> Expr:
    """Spark ``date_trunc``: truncate ``timestamp`` to the unit ``format``.

    Args:
        format: Truncation unit; a native ``str`` or an :class:`Expr`.
        timestamp: Timestamp expression to truncate.

    Returns:
        Expression wrapping the Spark ``date_trunc`` call.

    Examples:
        >>> import pyarrow as pa
        >>> from datetime import datetime
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> ts = dfn.lit(
        ...     pa.scalar(datetime(2020, 1, 15, 14, 30, 45),
        ...               type=pa.timestamp('us')))
        >>> r = df.select(
        ...     dfn.functions.spark.date_trunc("month", ts).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        datetime.datetime(2020, 1, 1, 0, 0)
    """
    unit_raw = coerce_to_expr(format).expr
    timestamp_raw = timestamp.expr
    return Expr(_f.date_trunc(unit_raw, timestamp_raw))
def time_trunc(unit: Expr | str, time: Expr) -> Expr:
    """Spark ``time_trunc``: truncate a time value to ``unit``.

    A bare ``str`` ``unit`` is treated as a column name (matching pyspark),
    not a literal; pass :func:`~datafusion.lit` for a literal unit.

    Args:
        unit: Truncation unit, as an :class:`Expr` or a column name.
        time: Time expression to truncate.

    Returns:
        Expression wrapping the Spark ``time_trunc`` call.

    Examples:
        >>> import pyarrow as pa
        >>> from datetime import time
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> t = dfn.lit(pa.scalar(time(14, 30, 45), type=pa.time64('us')))
        >>> r = df.select(
        ...     dfn.functions.spark.time_trunc(dfn.lit("hour"), t).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        datetime.time(14, 0)
    """
    unit_raw = _to_raw_expr(unit)
    time_raw = time.expr
    return Expr(_f.time_trunc(unit_raw, time_raw))
def trunc(date: Expr, format: Expr | str) -> Expr:
    """Spark ``trunc``: truncate ``date`` to the unit ``format``.

    Args:
        date: Date expression to truncate.
        format: Truncation unit; a native ``str`` or an :class:`Expr`.

    Returns:
        Expression wrapping the Spark ``trunc`` call.

    Examples:
        >>> import pyarrow as pa
        >>> from datetime import date
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> d = dfn.lit(pa.scalar(date(2020, 1, 15), type=pa.date32()))
        >>> r = df.select(dfn.functions.spark.trunc(d, "YEAR").alias("v"))
        >>> r.collect_column("v")[0].as_py()
        datetime.date(2020, 1, 1)
    """
    date_raw = date.expr
    unit_raw = coerce_to_expr(format).expr
    return Expr(_f.trunc(date_raw, unit_raw))
def date_part(field: Expr | str, source: Expr) -> Expr:
    """Spark ``date_part``: pull ``field`` out of a date/time/timestamp.

    Args:
        field: Name of the part to extract; a native ``str`` or an
            :class:`Expr`.
        source: Date, time or timestamp expression to read from.

    Returns:
        Expression wrapping the Spark ``date_part`` call.

    Examples:
        >>> import pyarrow as pa
        >>> from datetime import date
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> d = dfn.lit(pa.scalar(date(2020, 1, 15), type=pa.date32()))
        >>> r = df.select(
        ...     dfn.functions.spark.date_part("year", d).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        2020
    """
    field_raw = coerce_to_expr(field).expr
    source_raw = source.expr
    return Expr(_f.date_part(field_raw, source_raw))
def from_utc_timestamp(timestamp: Expr, tz: Expr | str) -> Expr:
    """Spark ``from_utc_timestamp``: read ``timestamp`` as UTC, convert to ``tz``.

    Args:
        timestamp: Timestamp expression interpreted as UTC.
        tz: Target time zone; a native ``str`` or an :class:`Expr`.

    Returns:
        Expression wrapping the Spark ``from_utc_timestamp`` call.

    Examples:
        >>> import pyarrow as pa
        >>> from datetime import datetime
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> ts = dfn.lit(
        ...     pa.scalar(datetime(2020, 1, 15, 14, 30, 45),
        ...               type=pa.timestamp('us')))
        >>> r = df.select(
        ...     dfn.functions.spark.from_utc_timestamp(ts, "UTC").alias("v"))
        >>> r.collect_column("v")[0].as_py()
        datetime.datetime(2020, 1, 15, 14, 30, 45)
    """
    timestamp_raw = timestamp.expr
    zone_raw = coerce_to_expr(tz).expr
    return Expr(_f.from_utc_timestamp(timestamp_raw, zone_raw))
def to_utc_timestamp(timestamp: Expr, tz: Expr | str) -> Expr:
    """Spark ``to_utc_timestamp``: read ``timestamp`` as ``tz``, convert to UTC.

    Args:
        timestamp: Timestamp expression interpreted in ``tz``.
        tz: Source time zone; a native ``str`` or an :class:`Expr`.

    Returns:
        Expression wrapping the Spark ``to_utc_timestamp`` call.

    Examples:
        >>> import pyarrow as pa
        >>> from datetime import datetime
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> ts = dfn.lit(
        ...     pa.scalar(datetime(2020, 1, 15, 14, 30, 45),
        ...               type=pa.timestamp('us')))
        >>> r = df.select(
        ...     dfn.functions.spark.to_utc_timestamp(ts, "UTC").alias("v"))
        >>> r.collect_column("v")[0].as_py()
        datetime.datetime(2020, 1, 15, 14, 30, 45)
    """
    timestamp_raw = timestamp.expr
    zone_raw = coerce_to_expr(tz).expr
    return Expr(_f.to_utc_timestamp(timestamp_raw, zone_raw))
def unix_date(col: Expr) -> Expr:
    """Spark ``unix_date``: days since 1970-01-01 for the date in ``col``.

    Args:
        col: Date expression.

    Returns:
        Expression wrapping the Spark ``unix_date`` call.

    Examples:
        >>> import pyarrow as pa
        >>> from datetime import date
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> d = dfn.lit(pa.scalar(date(2020, 1, 15), type=pa.date32()))
        >>> r = df.select(dfn.functions.spark.unix_date(d).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        18276
    """
    raw = _f.unix_date(col.expr)
    return Expr(raw)
def unix_micros(col: Expr) -> Expr:
    """Spark ``unix_micros``: microseconds since epoch for the timestamp.

    Args:
        col: Timestamp expression.

    Returns:
        Expression wrapping the Spark ``unix_micros`` call.

    Examples:
        >>> import pyarrow as pa
        >>> from datetime import datetime
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> ts = dfn.lit(
        ...     pa.scalar(datetime(2020, 1, 15, 14, 30, 45),
        ...               type=pa.timestamp('us')))
        >>> r = df.select(dfn.functions.spark.unix_micros(ts).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        1579098645000000
    """
    raw = _f.unix_micros(col.expr)
    return Expr(raw)
def unix_millis(col: Expr) -> Expr:
    """Spark ``unix_millis``: milliseconds since epoch for the timestamp.

    Args:
        col: Timestamp expression.

    Returns:
        Expression wrapping the Spark ``unix_millis`` call.

    Examples:
        >>> import pyarrow as pa
        >>> from datetime import datetime
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> ts = dfn.lit(
        ...     pa.scalar(datetime(2020, 1, 15, 14, 30, 45),
        ...               type=pa.timestamp('us')))
        >>> r = df.select(dfn.functions.spark.unix_millis(ts).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        1579098645000
    """
    raw = _f.unix_millis(col.expr)
    return Expr(raw)
def unix_seconds(col: Expr) -> Expr:
    """Spark ``unix_seconds``: seconds since epoch for the timestamp.

    Args:
        col: Timestamp expression.

    Returns:
        Expression wrapping the Spark ``unix_seconds`` call.

    Examples:
        >>> import pyarrow as pa
        >>> from datetime import datetime
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> ts = dfn.lit(
        ...     pa.scalar(datetime(2020, 1, 15, 14, 30, 45),
        ...               type=pa.timestamp('us')))
        >>> r = df.select(dfn.functions.spark.unix_seconds(ts).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        1579098645
    """
    raw = _f.unix_seconds(col.expr)
    return Expr(raw)
# ---------------------------------------------------------------------------
# Hash functions
# ---------------------------------------------------------------------------
def crc32(col: Expr) -> Expr:
    """Spark ``crc32``: cyclic redundancy check value, as a bigint.

    Args:
        col: Expression to checksum.

    Returns:
        Expression wrapping the Spark ``crc32`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"s": ["ABC"]})
        >>> r = df.select(dfn.functions.spark.crc32(dfn.col("s")).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        2743272264
    """
    raw = _f.crc32(col.expr)
    return Expr(raw)
def sha1(col: Expr) -> Expr:
    """Spark ``sha1``: SHA-1 digest rendered as a hex string.

    Args:
        col: Expression to hash.

    Returns:
        Expression wrapping the Spark ``sha1`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"s": ["hello"]})
        >>> r = df.select(dfn.functions.spark.sha1(dfn.col("s")).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        'aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d'
    """
    raw = _f.sha1(col.expr)
    return Expr(raw)
def sha2(col: Expr, numBits: Expr | int) -> Expr:  # noqa: N803
    """Spark ``sha2``: SHA-2 family hash (224, 256, 384, 512); 0 means 256.

    Args:
        col: Expression to hash.
        numBits: Digest bit length; a native ``int`` or an :class:`Expr`.

    Returns:
        Expression wrapping the Spark ``sha2`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"s": ["hello"]})
        >>> r = df.select(
        ...     dfn.functions.spark.sha2(dfn.col("s"), 256).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        '2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824'
    """
    input_raw = col.expr
    bits_raw = coerce_to_expr(numBits).expr
    return Expr(_f.sha2(input_raw, bits_raw))
def xxhash64(*cols: Expr) -> Expr:
    """Spark ``xxhash64``: 64-bit xxHash computed over the arguments.

    Args:
        *cols: Expressions to hash, in order.

    Returns:
        Expression wrapping the Spark ``xxhash64`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.xxhash64(dfn.lit("hello")).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        -4367754540140381902
    """
    raw_inputs = [item.expr for item in cols]
    return Expr(_f.xxhash64(*raw_inputs))
# ---------------------------------------------------------------------------
# JSON functions
# ---------------------------------------------------------------------------
def json_tuple(col: Expr, *fields: Expr | str) -> Expr:
    """Spark ``json_tuple``: pull top-level fields out of a JSON string.

    Args:
        col: JSON string expression.
        *fields: Field names to extract; each a native ``str`` or an
            :class:`Expr`.

    Returns:
        Expression wrapping the Spark ``json_tuple`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.json_tuple(
        ...         dfn.lit('{"a":1,"b":"x"}'), "a", "b"
        ...     ).alias("v")
        ... )
        >>> r.collect_column("v")[0].as_py()
        {'c0': '1', 'c1': 'x'}
    """
    json_raw = col.expr
    field_raws = [coerce_to_expr(name).expr for name in fields]
    return Expr(_f.json_tuple(json_raw, *field_raws))
# ---------------------------------------------------------------------------
# Map functions
# ---------------------------------------------------------------------------
def map_from_arrays(col1: Expr, col2: Expr) -> Expr:
    """Spark ``map_from_arrays``: map built from parallel key/value arrays.

    Args:
        col1: Array expression supplying the keys.
        col2: Array expression supplying the values.

    Returns:
        Expression wrapping the Spark ``map_from_arrays`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> keys = dfn.functions.spark.array(dfn.lit("a"), dfn.lit("b"))
        >>> vals = dfn.functions.spark.array(dfn.lit(1), dfn.lit(2))
        >>> r = df.select(
        ...     dfn.functions.spark.map_from_arrays(keys, vals).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        [('a', 1), ('b', 2)]
    """
    keys_raw = col1.expr
    values_raw = col2.expr
    return Expr(_f.map_from_arrays(keys_raw, values_raw))
def map_from_entries(col: Expr) -> Expr:
    """Spark ``map_from_entries``: map built from an array of key/value structs.

    Args:
        col: Array expression whose elements are two-field structs; the first
            field becomes the map key and the second the value.

    Returns:
        Expression wrapping the Spark ``map_from_entries`` call.

    Examples:
        >>> import pyarrow as pa
        >>> ctx = dfn.SessionContext()
        >>> entry_type = pa.list_(
        ...     pa.struct([("key", pa.string()), ("value", pa.int64())]))
        >>> entries = pa.array(
        ...     [[{"key": "a", "value": 1}, {"key": "b", "value": 2}]],
        ...     type=entry_type)
        >>> df = ctx.from_arrow(pa.record_batch([entries], names=["e"]))
        >>> r = df.select(
        ...     dfn.functions.spark.map_from_entries(dfn.col("e")).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        [('a', 1), ('b', 2)]
    """
    raw = _f.map_from_entries(col.expr)
    return Expr(raw)
def str_to_map(
    text: Expr,
    pairDelim: Expr | str | None = None,  # noqa: N803
    keyValueDelim: Expr | str | None = None,  # noqa: N803
) -> Expr:
    """Spark ``str_to_map``: split text into key/value pairs on delimiters.

    Delimiters default to ``","`` and ``":"`` when omitted, matching pyspark.
    Parameter names match ``pyspark.sql.functions.str_to_map``; pyspark types
    the delimiters as column-or-name, so a bare ``str`` is treated as a column
    name. Pass :func:`~datafusion.lit` for a literal delimiter.

    Args:
        text: String expression to split.
        pairDelim: Delimiter between pairs, as an :class:`Expr` or a column
            name; ``None`` selects the literal ``","``.
        keyValueDelim: Delimiter between a key and its value, as an
            :class:`Expr` or a column name; ``None`` selects the literal
            ``":"``.

    Returns:
        Expression wrapping the Spark ``str_to_map`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.str_to_map(dfn.lit("a:1,b:2")).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        [('a', '1'), ('b', '2')]
        >>> r = df.select(
        ...     dfn.functions.spark.str_to_map(
        ...         dfn.lit("a=1;b=2"),
        ...         pairDelim=dfn.lit(";"),
        ...         keyValueDelim=dfn.lit("="),
        ...     ).alias("v")
        ... )
        >>> r.collect_column("v")[0].as_py()
        [('a', '1'), ('b', '2')]
    """
    if pairDelim is None:
        pair_raw = Expr.literal(",").expr
    else:
        pair_raw = _to_raw_expr(pairDelim)

    if keyValueDelim is None:
        key_value_raw = Expr.literal(":").expr
    else:
        key_value_raw = _to_raw_expr(keyValueDelim)

    return Expr(_f.str_to_map(text.expr, pair_raw, key_value_raw))
# ---------------------------------------------------------------------------
# Math functions
# ---------------------------------------------------------------------------
def abs(col: Expr) -> Expr:
    """Spark ``abs``: absolute value of ``col``.

    Args:
        col: Input expression.

    Returns:
        An :class:`Expr` wrapping the Spark ``abs`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(dfn.functions.spark.abs(dfn.lit(-5)).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        5
    """
    raw_result = _f.abs(col.expr)
    return Expr(raw_result)
def ceil(col: Expr) -> Expr:
    """Spark ``ceil``: smallest integer ≥ ``col``.

    Args:
        col: Input expression.

    Returns:
        An :class:`Expr` wrapping the Spark ``ceil`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(dfn.functions.spark.ceil(dfn.lit(1.2)).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        2
    """
    operand = col.expr
    return Expr(_f.ceil(operand))
def expm1(col: Expr) -> Expr:
    """Spark ``expm1``: ``exp(col) - 1``.

    Args:
        col: Input expression.

    Returns:
        An :class:`Expr` wrapping the Spark ``expm1`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(dfn.functions.spark.expm1(dfn.lit(0.0)).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        0.0
    """
    raw_result = _f.expm1(col.expr)
    return Expr(raw_result)
def factorial(col: Expr) -> Expr:
    """Spark ``factorial``: ``n!`` for ``n`` in ``[0..20]``, NULL otherwise.

    Args:
        col: Input expression providing ``n``.

    Returns:
        An :class:`Expr` wrapping the Spark ``factorial`` call.

    Examples:
        >>> import pyarrow as pa
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.factorial(
        ...         dfn.lit(pa.scalar(5, type=pa.int32()))
        ...     ).alias("v")
        ... )
        >>> r.collect_column("v")[0].as_py()
        120
    """
    operand = col.expr
    return Expr(_f.factorial(operand))
def floor(col: Expr) -> Expr:
    """Spark ``floor``: largest integer ≤ ``col``.

    Args:
        col: Input expression.

    Returns:
        An :class:`Expr` wrapping the Spark ``floor`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(dfn.functions.spark.floor(dfn.lit(1.8)).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        1
    """
    raw_result = _f.floor(col.expr)
    return Expr(raw_result)
def hex(col: Expr) -> Expr:
    """Spark ``hex``: hexadecimal representation of ``col``.

    Args:
        col: Input expression.

    Returns:
        An :class:`Expr` wrapping the Spark ``hex`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(dfn.functions.spark.hex(dfn.lit(255)).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        'FF'
    """
    operand = col.expr
    return Expr(_f.hex(operand))
def modulus(dividend: Expr | float, divisor: Expr | float) -> Expr:
    """Spark ``mod``: remainder of ``dividend / divisor`` (sign follows dividend).

    Args:
        dividend: Native number or :class:`Expr` to be divided.
        divisor: Native number or :class:`Expr` to divide by.

    Returns:
        An :class:`Expr` wrapping the Spark ``modulus`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.modulus(dfn.lit(10), dfn.lit(3)).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        1
    """
    # Native numbers are promoted to expressions before unwrapping.
    numerator = coerce_to_expr(dividend).expr
    denominator = coerce_to_expr(divisor).expr
    return Expr(_f.modulus(numerator, denominator))
def pmod(dividend: Expr | float, divisor: Expr | float) -> Expr:
    """Spark ``pmod``: positive remainder of ``dividend / divisor``.

    Args:
        dividend: Native number or :class:`Expr` to be divided.
        divisor: Native number or :class:`Expr` to divide by.

    Returns:
        An :class:`Expr` wrapping the Spark ``pmod`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.pmod(dfn.lit(-1), dfn.lit(3)).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        2
    """
    # Native numbers are promoted to expressions before unwrapping.
    numerator = coerce_to_expr(dividend).expr
    denominator = coerce_to_expr(divisor).expr
    return Expr(_f.pmod(numerator, denominator))
def rint(col: Expr) -> Expr:
    """Spark ``rint``: nearest mathematical integer, returned as a double.

    Args:
        col: Input expression.

    Returns:
        An :class:`Expr` wrapping the Spark ``rint`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(dfn.functions.spark.rint(dfn.lit(2.5)).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        2.0
    """
    raw_result = _f.rint(col.expr)
    return Expr(raw_result)
def round(col: Expr, scale: Expr | int | None = None) -> Expr:
    """Spark ``round``: round to ``scale`` decimal places using HALF_UP rounding.

    Args:
        col: Expression to round.
        scale: Number of decimal places, as a native ``int`` or an
            :class:`Expr`. Defaults to zero when omitted, matching pyspark.

    Returns:
        An :class:`Expr` wrapping the Spark ``round`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(dfn.functions.spark.round(dfn.lit(2.5)).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        3.0
        >>> r = df.select(
        ...     dfn.functions.spark.round(dfn.lit(2.345), scale=2).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        2.35
    """
    if scale is None:
        # Reuse the shared int32 zero literal instead of building a new one.
        scale_expr = _ZERO_I32
    else:
        scale_expr = coerce_to_expr(scale)
    return Expr(_f.round(col.expr, scale_expr.expr))
def unhex(col: Expr) -> Expr:
    r"""Spark ``unhex``: decode a hexadecimal string into binary.

    Args:
        col: Input expression holding the hexadecimal string.

    Returns:
        An :class:`Expr` wrapping the Spark ``unhex`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(dfn.functions.spark.unhex(dfn.lit("FF")).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        b'\xff'
    """
    operand = col.expr
    return Expr(_f.unhex(operand))
def width_bucket(
    v: Expr,
    min: Expr,
    max: Expr,
    numBucket: Expr | int,  # noqa: N803
) -> Expr:
    """Spark ``width_bucket``: bucket number for ``v`` in an equi-width histogram.

    Args:
        v: Expression whose bucket is computed.
        min: Expression for the lower end of the histogram range.
        max: Expression for the upper end of the histogram range.
        numBucket: Number of buckets, as a native ``int`` or an :class:`Expr`.

    Returns:
        An :class:`Expr` wrapping the Spark ``width_bucket`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.width_bucket(
        ...         dfn.lit(5.0), dfn.lit(0.0), dfn.lit(10.0), 5
        ...     ).alias("v")
        ... )
        >>> r.collect_column("v")[0].as_py()
        3
    """
    raw_args = (v.expr, min.expr, max.expr, coerce_to_expr(numBucket).expr)
    return Expr(_f.width_bucket(*raw_args))
def csc(col: Expr) -> Expr:
    """Spark ``csc``: cosecant of ``col``.

    Args:
        col: Input expression.

    Returns:
        An :class:`Expr` wrapping the Spark ``csc`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(dfn.functions.spark.csc(dfn.lit(1.5708)).alias("v"))
        >>> f"{r.collect_column('v')[0].as_py():.4f}"
        '1.0000'
    """
    raw_result = _f.csc(col.expr)
    return Expr(raw_result)
def sec(col: Expr) -> Expr:
    """Spark ``sec``: secant of ``col``.

    Args:
        col: Input expression.

    Returns:
        An :class:`Expr` wrapping the Spark ``sec`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(dfn.functions.spark.sec(dfn.lit(0.0)).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        1.0
    """
    operand = col.expr
    return Expr(_f.sec(operand))
def negative(col: Expr) -> Expr:
    """Spark ``negative``: unary minus applied to ``col``.

    Args:
        col: Input expression.

    Returns:
        An :class:`Expr` wrapping the Spark ``negative`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(dfn.functions.spark.negative(dfn.lit(3)).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        -3
    """
    raw_result = _f.negative(col.expr)
    return Expr(raw_result)
def bin(col: Expr) -> Expr:
    """Spark ``bin``: binary string representation of a long.

    Args:
        col: Input expression.

    Returns:
        An :class:`Expr` wrapping the Spark ``bin`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(dfn.functions.spark.bin(dfn.lit(7)).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        '111'
    """
    operand = col.expr
    return Expr(_f.bin(operand))
# ---------------------------------------------------------------------------
# String functions
# ---------------------------------------------------------------------------
def ascii(col: Expr) -> Expr:
    """Spark ``ascii``: code point of the first character of ``col``.

    Args:
        col: Input expression.

    Returns:
        An :class:`Expr` wrapping the Spark ``ascii`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(dfn.functions.spark.ascii(dfn.lit("A")).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        65
    """
    raw_result = _f.ascii(col.expr)
    return Expr(raw_result)
def base64(col: Expr) -> Expr:
    """Spark ``base64``: encode binary data as a base64 string.

    Args:
        col: Input expression holding the binary data.

    Returns:
        An :class:`Expr` wrapping the Spark ``base64`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(dfn.functions.spark.base64(dfn.lit(b"hi")).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        'aGk='
    """
    operand = col.expr
    return Expr(_f.base64(operand))
def char(col: Expr) -> Expr:
    """Spark ``char``: ASCII character for a code point (mod 256).

    Args:
        col: Input expression holding the code point.

    Returns:
        An :class:`Expr` wrapping the Spark ``char`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(dfn.functions.spark.char(dfn.lit(65)).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        'A'
    """
    raw_result = _f.char(col.expr)
    return Expr(raw_result)
def concat(*cols: Expr) -> Expr:
    """Spark ``concat``: concatenate strings; the result is NULL if any input is.

    Args:
        *cols: Expressions to concatenate, in order.

    Returns:
        An :class:`Expr` wrapping the Spark ``concat`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.concat(dfn.lit("a"), dfn.lit("b")).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        'ab'
    """
    raw_args = tuple(piece.expr for piece in cols)
    return Expr(_f.concat(*raw_args))
def elt(*inputs: Expr) -> Expr:
    """Spark ``elt``: return the n-th input (1-indexed).

    Args:
        *inputs: Expressions forwarded to Spark ``elt``. In the example below
            the first one is the index and the rest are the candidates.

    Returns:
        An :class:`Expr` wrapping the Spark ``elt`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.elt(
        ...         dfn.lit(2), dfn.lit("a"), dfn.lit("b")
        ...     ).alias("v")
        ... )
        >>> r.collect_column("v")[0].as_py()
        'b'
    """
    raw_args = tuple(item.expr for item in inputs)
    return Expr(_f.elt(*raw_args))
def ilike(
    str: Expr,
    pattern: Expr | str,
    escapeChar: str | None = None,  # noqa: N803
) -> Expr:
    """Spark ``ilike``: case-insensitive pattern match.

    Args:
        str: Expression to test against ``pattern``.
        pattern: Pattern to match. A bare ``str`` is treated as a column name
            (matching pyspark), not a literal; pass :func:`~datafusion.lit`
            for a literal pattern.
        escapeChar: Accepted for pyspark parity only; it is not yet wired
            through the Rust binding.

    Returns:
        An :class:`Expr` wrapping the Spark ``ilike`` call.

    Raises:
        NotImplementedError: If ``escapeChar`` is not ``None``.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.ilike(dfn.lit("HELLO"), dfn.lit("h%")).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        True
    """
    if escapeChar is None:
        return Expr(_f.ilike(str.expr, _to_raw_expr(pattern)))

    msg = "ilike(escapeChar=...) is not yet supported by the Spark UDF binding"
    raise NotImplementedError(msg)
def length(col: Expr) -> Expr:
    """Spark ``length``: character length of a string, or byte length of binary.

    Args:
        col: Input expression.

    Returns:
        An :class:`Expr` wrapping the Spark ``length`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(dfn.functions.spark.length(dfn.lit("hello")).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        5
    """
    operand = col.expr
    return Expr(_f.length(operand))
def like(
    str: Expr,
    pattern: Expr | str,
    escapeChar: str | None = None,  # noqa: N803
) -> Expr:
    """Spark ``like``: case-sensitive pattern match.

    Args:
        str: Expression to test against ``pattern``.
        pattern: Pattern to match. A bare ``str`` is treated as a column name
            (matching pyspark), not a literal; pass :func:`~datafusion.lit`
            for a literal pattern.
        escapeChar: Accepted for pyspark parity only; it is not yet wired
            through the Rust binding.

    Returns:
        An :class:`Expr` wrapping the Spark ``like`` call.

    Raises:
        NotImplementedError: If ``escapeChar`` is not ``None``.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.like(dfn.lit("hello"), dfn.lit("h%")).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        True
    """
    if escapeChar is None:
        return Expr(_f.like(str.expr, _to_raw_expr(pattern)))

    msg = "like(escapeChar=...) is not yet supported by the Spark UDF binding"
    raise NotImplementedError(msg)
def luhn_check(col: Expr) -> Expr:
    """Spark ``luhn_check``: true if the digit string passes the Luhn check.

    Args:
        col: Input expression holding the digit string.

    Returns:
        An :class:`Expr` wrapping the Spark ``luhn_check`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.luhn_check(
        ...         dfn.lit("4111111111111111")
        ...     ).alias("v")
        ... )
        >>> r.collect_column("v")[0].as_py()
        True
    """
    raw_result = _f.luhn_check(col.expr)
    return Expr(raw_result)
def format_string(format: str | Expr, *cols: Expr) -> Expr:
    """Spark ``format_string``: printf-style string formatting.

    Args:
        format: The printf-style template. A plain ``str`` is promoted to a
            literal expression; an :class:`Expr` is used as given.
        *cols: Expressions whose values are substituted into the template.

    Returns:
        An :class:`Expr` wrapping the Spark ``format_string`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.format_string(
        ...         "%d-%s", dfn.lit(42), dfn.lit("hi")
        ...     ).alias("v")
        ... )
        >>> r.collect_column("v")[0].as_py()
        '42-hi'
    """
    if isinstance(format, Expr):
        template = format
    else:
        template = Expr.literal(format)

    template_raw = template.expr
    value_args = tuple(value.expr for value in cols)
    return Expr(_f.format_string(template_raw, *value_args))
def space(col: Expr | int) -> Expr:
    """Spark ``space``: string of n spaces.

    Args:
        col: Number of spaces, as a native ``int`` or an :class:`Expr`. The
            value is passed through the module's ``_coerce_i32`` helper before
            being handed to the binding.

    Returns:
        An :class:`Expr` wrapping the Spark ``space`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(dfn.functions.spark.space(3).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        '   '
    """
    return Expr(_f.space(_coerce_i32(col).expr))
def substring(str: Expr, pos: Expr | int, len: Expr | int) -> Expr:
    """Spark ``substring``: 1-indexed substring of ``len`` starting at ``pos``.

    Args:
        str: Expression to take the substring from.
        pos: 1-indexed start position, as a native ``int`` or an
            :class:`Expr`. A negative value counts from the end.
        len: Length of the substring, as a native ``int`` or an :class:`Expr`.

    Returns:
        An :class:`Expr` wrapping the Spark ``substring`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.substring(dfn.lit("hello"), 1, 3).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        'hel'
    """
    raw_args = (str.expr, coerce_to_expr(pos).expr, coerce_to_expr(len).expr)
    return Expr(_f.substring(*raw_args))
def unbase64(col: Expr) -> Expr:
    """Spark ``unbase64``: decode a base64 string into binary.

    Args:
        col: Input expression holding the base64 string.

    Returns:
        An :class:`Expr` wrapping the Spark ``unbase64`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.unbase64(dfn.lit("aGk=")).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        b'hi'
    """
    operand = col.expr
    return Expr(_f.unbase64(operand))
def soundex(col: Expr) -> Expr:
    """Spark ``soundex``: Soundex phonetic code of ``col``.

    Args:
        col: Input expression.

    Returns:
        An :class:`Expr` wrapping the Spark ``soundex`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(dfn.functions.spark.soundex(dfn.lit("Robert")).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        'R163'
    """
    raw_result = _f.soundex(col.expr)
    return Expr(raw_result)
def is_valid_utf8(str: Expr) -> Expr:
    """Spark ``is_valid_utf8``: true if the string is valid UTF-8.

    Args:
        str: Input expression to validate.

    Returns:
        An :class:`Expr` wrapping the Spark ``is_valid_utf8`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.is_valid_utf8(dfn.lit("hello")).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        True
    """
    operand = str.expr
    return Expr(_f.is_valid_utf8(operand))
def make_valid_utf8(str: Expr) -> Expr:
    """Spark ``make_valid_utf8``: replace invalid UTF-8 bytes with U+FFFD.

    Args:
        str: Input expression to sanitize.

    Returns:
        An :class:`Expr` wrapping the Spark ``make_valid_utf8`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.make_valid_utf8(dfn.lit("hello")).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        'hello'
    """
    raw_result = _f.make_valid_utf8(str.expr)
    return Expr(raw_result)
# ---------------------------------------------------------------------------
# URL functions
# ---------------------------------------------------------------------------
def parse_url(
    url: Expr,
    partToExtract: Expr | str,  # noqa: N803
    key: Expr | str | None = None,
) -> Expr:
    """Spark ``parse_url``: extract a part from a URL; errors on invalid URLs.

    Bare ``str`` values for ``partToExtract``/``key`` are treated as column
    names (matching pyspark); pass :func:`~datafusion.lit` for a literal.

    Args:
        url: Expression holding the URL.
        partToExtract: One of ``"HOST"``, ``"PATH"``, ``"QUERY"``, ``"REF"``,
            ``"PROTOCOL"``, ``"FILE"``, ``"AUTHORITY"``, ``"USERINFO"``.
        key: Pass only with ``"QUERY"`` to extract a single parameter. When
            omitted, the binding is called without a key argument.

    Returns:
        An :class:`Expr` wrapping the Spark ``parse_url`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.parse_url(
        ...         dfn.lit("http://example.com/path?q=1"), dfn.lit("HOST")
        ...     ).alias("v")
        ... )
        >>> r.collect_column("v")[0].as_py()
        'example.com'
        >>> r = df.select(
        ...     dfn.functions.spark.parse_url(
        ...         dfn.lit("http://example.com/path?q=1"),
        ...         dfn.lit("QUERY"),
        ...         key=dfn.lit("q"),
        ...     ).alias("v")
        ... )
        >>> r.collect_column("v")[0].as_py()
        '1'
    """
    raw_args = [url.expr, _to_raw_expr(partToExtract)]
    if key is not None:
        # The key is forwarded only when supplied, so the binding sees either
        # two or three positional arguments.
        raw_args.append(_to_raw_expr(key))
    return Expr(_f.parse_url(*raw_args))
def try_parse_url(
    url: Expr,
    partToExtract: Expr | str,  # noqa: N803
    key: Expr | str | None = None,
) -> Expr:
    """Spark ``try_parse_url``: like ``parse_url`` but returns NULL on invalid URLs.

    Bare ``str`` values for ``partToExtract``/``key`` are treated as column
    names (matching pyspark); pass :func:`~datafusion.lit` for a literal.

    Args:
        url: Expression holding the URL.
        partToExtract: The URL part to extract.
        key: Optional key. When omitted, the binding is called without a key
            argument.

    Returns:
        An :class:`Expr` wrapping the Spark ``try_parse_url`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.try_parse_url(
        ...         dfn.lit("http://example.com/"), dfn.lit("HOST")
        ...     ).alias("v")
        ... )
        >>> r.collect_column("v")[0].as_py()
        'example.com'
    """
    raw_args = [url.expr, _to_raw_expr(partToExtract)]
    if key is not None:
        # The key is forwarded only when supplied, so the binding sees either
        # two or three positional arguments.
        raw_args.append(_to_raw_expr(key))
    return Expr(_f.try_parse_url(*raw_args))
def url_decode(str: Expr) -> Expr:
    """Spark ``url_decode``: decode an application/x-www-form-urlencoded string.

    Args:
        str: Input expression holding the encoded string.

    Returns:
        An :class:`Expr` wrapping the Spark ``url_decode`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.url_decode(dfn.lit("a%20b")).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        'a b'
    """
    operand = str.expr
    return Expr(_f.url_decode(operand))
def try_url_decode(str: Expr) -> Expr:
    """Spark ``try_url_decode``: like ``url_decode``; returns NULL on invalid input.

    Args:
        str: Input expression holding the encoded string.

    Returns:
        An :class:`Expr` wrapping the Spark ``try_url_decode`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.try_url_decode(dfn.lit("a%20b")).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        'a b'
    """
    raw_result = _f.try_url_decode(str.expr)
    return Expr(raw_result)
def url_encode(str: Expr) -> Expr:
    """Spark ``url_encode``: encode a string as application/x-www-form-urlencoded.

    Args:
        str: Input expression holding the string to encode.

    Returns:
        An :class:`Expr` wrapping the Spark ``url_encode`` call.

    Examples:
        >>> ctx = dfn.SessionContext()
        >>> df = ctx.from_pydict({"x": [1]})
        >>> r = df.select(
        ...     dfn.functions.spark.url_encode(dfn.lit("a b")).alias("v"))
        >>> r.collect_column("v")[0].as_py()
        'a+b'
    """
    operand = str.expr
    return Expr(_f.url_encode(operand))
# Public API of this module. Names are kept in alphabetical order, so each
# category comment below only tags the single name it precedes (presumably the
# first function of that section of the file — TODO confirm); it does NOT
# group the entries that follow it.
__all__ = [
    # Math
    "abs",
    # Datetime
    "add_months",
    "array",
    # Array
    "array_contains",
    "array_repeat",
    # String
    "ascii",
    # Aggregate
    "avg",
    "base64",
    "bin",
    "bit_count",
    # Bitwise
    "bit_get",
    "bitmap_bit_position",
    "bitmap_bucket_number",
    # Bitmap
    "bitmap_count",
    "bitwise_not",
    "ceil",
    "char",
    "collect_list",
    "collect_set",
    "concat",
    # Hash
    "crc32",
    "csc",
    "date_add",
    "date_diff",
    "date_part",
    "date_sub",
    "date_trunc",
    "elt",
    "expm1",
    "factorial",
    "floor",
    "format_string",
    "from_utc_timestamp",
    "hex",
    "hour",
    "if_",
    "ilike",
    "is_valid_utf8",
    # JSON
    "json_tuple",
    "last_day",
    "length",
    "like",
    "luhn_check",
    "make_dt_interval",
    "make_interval",
    "make_valid_utf8",
    # Map
    "map_from_arrays",
    "map_from_entries",
    "minute",
    "modulus",
    "negative",
    "next_day",
    # URL
    "parse_url",
    "pmod",
    "rint",
    "round",
    "sec",
    "second",
    "sha1",
    "sha2",
    "shiftleft",
    "shiftright",
    "shiftrightunsigned",
    "shuffle",
    # Collection / Conditional / Conversion
    "size",
    "slice",
    "soundex",
    "space",
    "spark_cast",
    "str_to_map",
    "substring",
    "time_trunc",
    "to_utc_timestamp",
    "trunc",
    "try_parse_url",
    "try_sum",
    "try_url_decode",
    "unbase64",
    "unhex",
    "unix_date",
    "unix_micros",
    "unix_millis",
    "unix_seconds",
    "url_decode",
    "url_encode",
    "width_bucket",
    "xxhash64",
]