blob: 35beff19c981d980da4750d2d9ea4f07a22effc6 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Callable, Dict, List, Optional, Union, overload
from pyspark.errors import PySparkTypeError
from pyspark.pipelines.graph_element_registry import get_active_graph_element_registry
from pyspark.pipelines.type_error_utils import validate_optional_list_of_str_arg
from pyspark.pipelines.flow import Flow, QueryFunction
from pyspark.pipelines.source_code_location import (
get_caller_source_code_location,
)
from pyspark.pipelines.dataset import (
MaterializedView,
StreamingTable,
TemporaryView,
)
from pyspark.sql.types import StructType
def append_flow(
*,
target: str,
name: Optional[str] = None,
spark_conf: Optional[Dict[str, str]] = None,
) -> Callable[[QueryFunction], None]:
"""
Return a decorator on a query function to define a flow in a pipeline.
:param name: The name of the flow. If unspecified, the query function's name will be used.
:param target: The name of the dataset this flow writes to. Must be specified.
:param spark_conf: A dict whose keys are the conf names and values are the conf values. \
These confs will be set when the flow is executed; they can override confs set for the \
destination, for the pipeline, or on the cluster.
"""
if name is not None and type(name) is not str:
raise PySparkTypeError(
errorClass="NOT_STR",
messageParameters={"arg_name": "name", "arg_type": type(name).__name__},
)
source_code_location = get_caller_source_code_location(stacklevel=1)
if spark_conf is None:
spark_conf = {}
def outer(func: QueryFunction) -> None:
query_name = name if name is not None else func.__name__
flow = Flow(
name=query_name,
target=target,
spark_conf=spark_conf,
source_code_location=source_code_location,
func=func,
)
get_active_graph_element_registry().register_flow(flow)
return outer
def _validate_stored_dataset_args(
name: Optional[str],
table_properties: Optional[Dict[str, str]],
partition_cols: Optional[List[str]],
) -> None:
if name is not None and type(name) is not str:
raise PySparkTypeError(
errorClass="NOT_STR",
messageParameters={"arg_name": "name", "arg_type": type(name).__name__},
)
if table_properties is not None and not isinstance(table_properties, dict):
raise PySparkTypeError(
errorClass="NOT_DICT",
messageParameters={
"arg_name": "table_properties",
"arg_type": type(table_properties).__name__,
},
)
validate_optional_list_of_str_arg(arg_name="partition_cols", arg_value=partition_cols)
@overload
def table(query_function: QueryFunction) -> None:
...
@overload
def table(
*,
query_function: None = None,
name: Optional[str] = None,
comment: Optional[str] = None,
spark_conf: Optional[Dict[str, str]] = None,
table_properties: Optional[Dict[str, str]] = None,
partition_cols: Optional[List[str]] = None,
schema: Optional[Union[StructType, str]] = None,
) -> Callable[[QueryFunction], None]:
...
def table(
query_function: Optional[QueryFunction] = None,
*,
name: Optional[str] = None,
comment: Optional[str] = None,
spark_conf: Optional[Dict[str, str]] = None,
table_properties: Optional[Dict[str, str]] = None,
partition_cols: Optional[List[str]] = None,
schema: Optional[Union[StructType, str]] = None,
format: Optional[str] = None,
) -> Union[Callable[[QueryFunction], None], None]:
"""
(Return a) decorator to define a table in the pipeline and mark a function as the table's query
function.
@table can be used with or without parameters. If called without parameters, Python will
implicitly pass the decorated query function as the query_function param. If called with
parameters, @table will return a decorator that is applied on the decorated query function.
:param query_function: The table's query function. This parameter should not be explicitly \
passed by users. This is passed implicitly by Python if the decorator is called without \
parameters.
:param name: The name of the dataset. If unspecified, the query function's name will be used.
:param comment: Description of the dataset.
:param spark_conf: A dict whose keys are the conf names and values are the conf values. \
These confs will be set when the query for the dataset is executed and they can override \
confs set for the pipeline or on the cluster.
:param table_properties: A dict where the keys are the property names and the values are the \
property values. These properties will be set on the table.
:param partition_cols: A list containing the column names of the partition columns.
:param schema: Explicit Spark SQL schema to materialize this table with. Supports either a \
Pyspark StructType or a SQL DDL string, such as "a INT, b STRING".
:param format: The format of the table, e.g. "parquet".
"""
_validate_stored_dataset_args(name, table_properties, partition_cols)
source_code_location = get_caller_source_code_location(stacklevel=1)
def outer(
decorated: QueryFunction,
) -> None:
_validate_decorated(decorated, "table")
resolved_name = name or decorated.__name__
registry = get_active_graph_element_registry()
registry.register_dataset(
StreamingTable(
comment=comment,
name=resolved_name,
table_properties=table_properties or {},
partition_cols=partition_cols,
schema=schema,
source_code_location=source_code_location,
format=format,
)
)
registry.register_flow(
Flow(
name=resolved_name,
target=resolved_name,
spark_conf=spark_conf or {},
source_code_location=source_code_location,
func=decorated,
)
)
if query_function is not None:
# Case where the decorator is called without parameters, e.g.:
# @table
# def query_fn():
# return ...
outer(query_function)
return None
else:
# Case where the decorator is called with parameters, e.g.:
# @table(name="tbl")
# def query_fn():
# return ...
return outer
@overload
def materialized_view(query_function: QueryFunction) -> None:
...
@overload
def materialized_view(
*,
query_function: None = None,
name: Optional[str] = None,
comment: Optional[str] = None,
spark_conf: Optional[Dict[str, str]] = None,
table_properties: Optional[Dict[str, str]] = None,
partition_cols: Optional[List[str]] = None,
schema: Optional[Union[StructType, str]] = None,
) -> Callable[[QueryFunction], None]:
...
def materialized_view(
query_function: Optional[QueryFunction] = None,
*,
name: Optional[str] = None,
comment: Optional[str] = None,
spark_conf: Optional[Dict[str, str]] = None,
table_properties: Optional[Dict[str, str]] = None,
partition_cols: Optional[List[str]] = None,
schema: Optional[Union[StructType, str]] = None,
format: Optional[str] = None,
) -> Union[Callable[[QueryFunction], None], None]:
"""
(Return a) decorator to define a materialized view in the pipeline and mark a function as the
materialized view's query function.
@materialized_view can be used with or without parameters. If called without parameters, Python
will implicitly pass the decorated query function as the query_function param. If called with
parameters, it will return a decorator that is applied on the decorated query function.
:param query_function: The table's query function. This parameter should not be explicitly \
passed by users. This is passed implicitly by Python if the decorator is called without \
parameters.
:param name: The name of the dataset. If unspecified, the query function's name will be used.
:param comment: Description of the dataset.
:param spark_conf: A dict whose keys are the conf names and values are the conf values. \
These confs will be set when the query for the dataset is executed and they can override \
confs set for the pipeline or on the cluster.
:param table_properties: A dict where the keys are the property names and the values are the \
property values. These properties will be set on the table.
:param partition_cols: A list containing the column names of the partition columns.
:param schema: Explicit Spark SQL schema to materialize this table with. Supports either a \
Pyspark StructType or a SQL DDL string, such as "a INT, b STRING".
:param format: The format of the table, e.g. "parquet".
"""
_validate_stored_dataset_args(name, table_properties, partition_cols)
source_code_location = get_caller_source_code_location(stacklevel=1)
def outer(
decorated: QueryFunction,
) -> None:
_validate_decorated(decorated, "materialized_view")
resolved_name = name or decorated.__name__
registry = get_active_graph_element_registry()
registry.register_dataset(
MaterializedView(
comment=comment,
name=resolved_name,
table_properties=table_properties or {},
partition_cols=partition_cols,
schema=schema,
source_code_location=source_code_location,
format=format,
)
)
registry.register_flow(
Flow(
name=resolved_name,
target=resolved_name,
spark_conf=spark_conf or {},
source_code_location=source_code_location,
func=decorated,
)
)
if query_function is not None:
# Case where the decorator is called without parameters, e.g.:
# @materialized_view
# def query_fn():
# return ...
outer(query_function)
return None
else:
# Case where the decorator is called with parameters, e.g.:
# @materialized_view(name="tbl")
# def query_fn():
# return ...
return outer
@overload
def temporary_view(
query_function: QueryFunction,
) -> None:
...
@overload
def temporary_view(
*,
query_function: None = None,
name: Optional[str] = None,
comment: Optional[str] = None,
spark_conf: Optional[Dict[str, str]] = None,
) -> Callable[[QueryFunction], None]:
...
def temporary_view(
query_function: Optional[QueryFunction] = None,
*,
name: Optional[str] = None,
comment: Optional[str] = None,
spark_conf: Optional[Dict[str, str]] = None,
) -> Union[Callable[[QueryFunction], None], None]:
"""
(Return a) decorator to define a view in the pipeline and mark a function as the view's query
function.
@view can be used with or without parameters. If called without parameters, Python will
implicitly pass the decorated query function as the query_function param. If called with
parameters, @view will return a decorator that is applied on the decorated query function.
:param query_function: The view's query function. This parameter should not be explicitly \
passed by users. This is passed implicitly by Python if the decorator is called without \
parameters.
:param name: The name of the dataset. If unspecified, the query function's name will be used.
:param comment: Description of the dataset.
:param spark_conf: A dict whose keys are the conf names and values are the conf values. \
These confs will be set when the query for the dataset is executed and they can override \
confs set for the pipeline or on the cluster.
"""
if name is not None and type(name) is not str:
raise PySparkTypeError(
errorClass="NOT_STR",
messageParameters={"arg_name": "name", "arg_type": type(name).__name__},
)
source_code_location = get_caller_source_code_location(stacklevel=1)
def outer(decorated: QueryFunction) -> None:
_validate_decorated(decorated, "temporary_view")
resolved_name = name or decorated.__name__
registry = get_active_graph_element_registry()
registry.register_dataset(
TemporaryView(
comment=comment,
name=resolved_name,
source_code_location=source_code_location,
)
)
registry.register_flow(
Flow(
target=resolved_name,
func=decorated,
spark_conf=spark_conf or {},
name=resolved_name,
source_code_location=source_code_location,
)
)
if query_function is not None:
# Case where the decorator is called without parameters, e.g.:
# @temporary_view
# def query_fn():
# return ...
outer(query_function)
return None
else:
# Case where the decorator is called with parameters, e.g.:
# @temporary_view(name="tbl")
# def query_fn():
# return ...
return outer
def _validate_decorated(decorated: QueryFunction, decorator_name: str) -> None:
if not callable(decorated):
raise PySparkTypeError(
errorClass="DECORATOR_ARGUMENT_NOT_CALLABLE",
messageParameters={
"decorator_name": decorator_name,
"example_usage": f"@{decorator_name}(name='{decorator_name}_a')",
},
)
def create_streaming_table(
name: str,
*,
comment: Optional[str] = None,
table_properties: Optional[Dict[str, str]] = None,
partition_cols: Optional[List[str]] = None,
schema: Optional[Union[StructType, str]] = None,
format: Optional[str] = None,
) -> None:
"""
Creates a table that can be targeted by append flows.
Example:
create_streaming_table("target")
:param name: The name of the table.
:param comment: Description of the table.
:param table_properties: A dict where the keys are the property names and the values are the \
property values. These properties will be set on the table.
:param partition_cols: A list containing the column names of the partition columns.
:param schema Explicit Spark SQL schema to materialize this table with. Supports either a \
Pyspark StructType or a SQL DDL string, such as "a INT, b STRING".
:param format: The format of the table, e.g. "parquet".
"""
if type(name) is not str:
raise PySparkTypeError(
errorClass="NOT_STR",
messageParameters={"arg_name": "name", "arg_type": type(name).__name__},
)
if table_properties is not None and not isinstance(table_properties, dict):
raise PySparkTypeError(
errorClass="NOT_DICT",
messageParameters={
"arg_name": "table_properties",
"arg_type": type(table_properties).__name__,
},
)
validate_optional_list_of_str_arg(arg_name="partition_cols", arg_value=partition_cols)
source_code_location = get_caller_source_code_location(stacklevel=1)
table = StreamingTable(
name=name,
comment=comment,
source_code_location=source_code_location,
table_properties=table_properties or {},
partition_cols=partition_cols,
schema=schema,
format=format,
)
get_active_graph_element_registry().register_dataset(table)