blob: d86ad74f1ce0a07d73a45b450cd9aa5bf324ae0a [file] [log] [blame]
import abc
import inspect
import sys
import typing
from collections import defaultdict
from types import ModuleType
from typing import Any, Callable, Collection, Dict, List, Optional, Tuple, Type, TypedDict, Union
_sys_version_info = sys.version_info
_version_tuple = (_sys_version_info.major, _sys_version_info.minor, _sys_version_info.micro)
if _version_tuple < (3, 11, 0):
from typing_extensions import NotRequired
else:
from typing import NotRequired
# Copied this over from function_graph
# TODO -- determine the best place to put this code
from hamilton import graph_utils, node
from hamilton.function_modifiers import base, dependencies
from hamilton.function_modifiers.base import InvalidDecoratorException, NodeTransformer
from hamilton.function_modifiers.dependencies import (
LiteralDependency,
ParametrizedDependency,
UpstreamDependency,
)
def assign_namespace(node_name: str, namespace: str) -> str:
    """Qualify ``node_name`` with ``namespace``, separated by a dot.

    :param node_name: The un-namespaced node name.
    :param namespace: The namespace to place the node under.
    :return: The qualified name, e.g. ``"ns.node"``.
    """
    return "{}.{}".format(namespace, node_name)
def derive_type(dependency: dependencies.LiteralDependency):
    """Derive the type of a literal (static) dependency from the value it wraps.

    This is a quick heuristic: it simply uses the runtime type of the value. A better
    approach might consider the type declared by the function that consumes the dependency,
    or reuse the subclass checking in function_graph (although that would first need to be
    moved out of there).

    :param dependency: Literal dependency whose value's type is wanted.
    :return: ``type(dependency.value)``.
    """
    wrapped_value = dependency.value
    return type(wrapped_value)
def create_identity_node(
    from_: str, typ: Type[Type], name: str, namespace: Tuple[str, ...], tags: Dict[str, Any]
) -> node.Node:
    """Create a pass-through node that returns exactly the value produced by ``from_``.

    :param from_: Name of the upstream node to read from (the node's single input).
    :param typ: Type of the forwarded value; used as this node's output type and as the
        declared type of its single input.
    :param name: Name of the node to create.
    :param namespace: Namespace of the node.
    :param tags: Tags to attach to the created node.
    :return: A node that simply forwards the value of ``from_``.
    """

    def identity(**kwargs):
        # Only one input (``from_``) is declared, so forward the first (and only) value.
        return tuple(kwargs.values())[0]

    return node.Node(
        name=name,
        typ=typ,
        doc_string="",
        callabl=identity,
        input_types={from_: typ},
        namespace=namespace,
        tags=tags,
    )
def extract_all_known_types(nodes: Collection[node.Node]) -> Dict[str, Type[Type]]:
    """Map every dependency name seen across ``nodes`` to the type it is declared with.

    Upstream node types are not known at this point, so we infer them from how downstream
    nodes declare their dependencies. This is only used to guess the type of identity nodes;
    a dedicated "pass-through" dependency type would be a cleaner long-term solution.

    If several nodes declare the same dependency, the declaration seen last wins.

    :param nodes: Nodes whose ``input_types`` are scanned.
    :return: Dictionary of dependency name -> declared type.
    """
    return {
        dependency_name: dependency_type
        for node_ in nodes
        for dependency_name, (dependency_type, _) in node_.input_types.items()
    }
def create_static_node(
    typ: Type, name: str, value: Any, namespace: Tuple[str, ...], tags: Dict[str, Any]
) -> node.Node:
    """Create a node with no inputs that always returns ``value``.

    Such nodes bridge fixed values (literals, config entries) into a subdag.

    :param typ: Output type of the node.
    :param name: Name of the node.
    :param value: The value the node's function returns on every call.
    :param namespace: Namespace of the node.
    :param tags: Tags to attach to the node.
    :return: The instantiated static node.
    """

    # ``value`` is bound as a default argument so the function takes no required inputs.
    def node_fn(_value=value):
        return _value

    return node.Node(
        name=name,
        typ=typ,
        callabl=node_fn,
        input_types={},
        namespace=namespace,
        tags=tags,
    )
def _validate_config_inputs(config: Dict[str, Any], inputs: Dict[str, Any]):
"""Validates that the inputs specified in the config are valid.
:param original_config: Original configuration
:return: None
"""
# TODO -- implement this
shared_keys = set(config.keys()).intersection(set(inputs.keys()))
if shared_keys:
raise InvalidDecoratorException(
f"Config keys {shared_keys} are shared with inputs. This is not allowed."
f"Instead, please specify the inputs you need *just* as part of the config. "
f"That way, you only write them once! Or, if you don't need them as a config item,"
f"just use them in inputs."
)
for key, value in inputs.items():
if not isinstance(value, (UpstreamDependency, LiteralDependency)):
raise InvalidDecoratorException(
f"Input {key} must be either an UpstreamDependency or a LiteralDependency ,"
f" not {type(value)}."
)
def _resolve_subdag_configuration(
configuration: Dict[str, Any], fields: Dict[str, Any], function_name: str
) -> Dict[str, Any]:
"""Resolves the configuration for a subdag.
:param configuration: the Hamilton configuration
:param fields: the fields passed to the subdag decorator
:return: resolved configuration to use for this subdag.
"""
sources_to_map = {}
values_to_include = {}
for key, value in fields.items():
if isinstance(value, dependencies.ConfigDependency):
sources_to_map[key] = value.source
elif isinstance(value, dependencies.LiteralDependency):
values_to_include[key] = value.value
elif isinstance(value, (dependencies.GroupedDependency, dependencies.SingleDependency)):
raise InvalidDecoratorException(
f"`{value}` is not allowed in the config= part of the subdag decorator. "
"Please use `configuration()` or `value()` or literal python values."
)
plain_configs = {
k: v for k, v in fields.items() if k not in sources_to_map and k not in values_to_include
}
resolved_config = dict(configuration, **plain_configs, **values_to_include)
# override any values from sources
for key, source in sources_to_map.items():
try:
resolved_config[key] = resolved_config[source]
except KeyError as e:
raise InvalidDecoratorException(
f"Source {source} was not found in the configuration. This is required for the {function_name} subdag."
) from e
return resolved_config
# Tags merged onto every node a subdag generates except its final node (see
# subdag.collect_nodes and subdag._create_additional_static_nodes), flagging them with
# NodeTransformer.NON_FINAL_TAG. NOTE(review): presumably NodeTransformer uses this tag to
# skip intermediate subdag nodes -- confirm in function_modifiers.base.
NON_FINAL_TAGS = {NodeTransformer.NON_FINAL_TAG: True}
class subdag(base.NodeCreator):
    """The `@subdag` decorator enables you to rerun components of your DAG with varying parameters.
    That is, it enables you to "chain" what you could express with a driver into a single DAG.

    That is, instead of using Hamilton within itself:

    .. code-block:: python

        def feature_engineering(source_path: str) -> pd.DataFrame:
            '''You could recursively use Hamilton within itself.'''
            dr = driver.Driver({}, feature_modules)
            df = dr.execute(["feature_df"], inputs={"path": source_path})
            return df

    You instead can use the `@subdag` decorator to do the same thing, with the added benefit of visibility into the\
    whole DAG:

    .. code-block:: python

        @subdag(
            feature_modules,
            inputs={"path": source("source_path")},
            config={}
        )
        def feature_engineering(feature_df: pd.DataFrame) -> pd.DataFrame:
            return feature_df

    Note that this is immensely powerful -- if we draw analogies from Hamilton to standard procedural programming \
    paradigms, we might have the following correspondence:

    - `config.when` + friends -- `if/else` statements
    - `parameterize`/`extract_columns` -- `for` loop
    - `does` -- effectively macros

    And so on. `@subdag` takes this one step further:

    - `@subdag` -- subroutine definition

    E.G. take a certain set of nodes, and run them with specified parameters.

    @subdag declares parameters that are outputs of its subdags. Note that, if you want to use outputs of other
    components of the DAG, you can use the `external_inputs` parameter to declare the parameters that do *not* come
    from the subDAG.

    Why might you want to use this? Let's take a look at some examples:

    1. You have a feature engineering pipeline that you want to run on multiple datasets. If it's exactly the same, \
    this is perfect. If not, this works perfectly as well, you just have to utilize different functions in each or \
    the `config.when` + `config` parameter to rerun it.
    2. You want to train multiple models in the same DAG that share some logic (in features or training) -- this \
    allows you to reuse and continually add more.
    3. You want to combine multiple similar DAGs (e.g. one for each business line) into one so you can build a \
    cross-business line model.

    This basically bridges the gap between the flexibility of non-declarative pipelining frameworks with the \
    readability/maintainability of declarative ones.
    """

    def __init__(
        self,
        *load_from: Union[ModuleType, Callable],
        inputs: Dict[str, ParametrizedDependency] = None,
        config: Dict[str, Any] = None,
        namespace: str = None,
        final_node_name: str = None,
        external_inputs: List[str] = None,
    ):
        """Adds a subDAG to the main DAG.

        :param load_from: The functions (or modules containing them) that will be used to generate
            this subDAG.
        :param inputs: Parameterized dependencies to inject into all sources of this subDAG.
            This should *not* be an intermediate node in the subDAG.
        :param config: A configuration dictionary for *just* this subDAG. Note that this passed in
            value takes precedence over the DAG's config.
        :param namespace: Namespace with which to prefix nodes. This is optional -- if not included,
            this will default to the function name.
        :param final_node_name: Name of the final node in the subDAG. This is optional -- if not included,
            this will default to the function name.
        :param external_inputs: Parameters in the function that are not produced by the functions
            passed to the subdag. This is useful if you want to perform some logic with other inputs
            in the subdag's processing function. Note that this is currently required to
            differentiate and clarify the inputs to the subdag.
        :raises ValueError: if ``load_from`` is empty.
        :raises InvalidDecoratorException: if ``config`` and ``inputs`` share keys, or an input is
            neither an ``UpstreamDependency`` nor a ``LiteralDependency``.
        """
        self.subdag_functions = subdag.collect_functions(load_from)
        self.inputs = inputs if inputs is not None else {}
        self.config = config if config is not None else {}
        self.external_inputs = external_inputs if external_inputs is not None else []
        _validate_config_inputs(self.config, self.inputs)
        self.namespace = namespace
        self.final_node_name = final_node_name

    @staticmethod
    def collect_functions(
        load_from: Union[Collection[ModuleType], Collection[Callable]],
    ) -> List[Callable]:
        """Utility function to collect functions from a list of callables/modules.

        Callables are used as-is; anything else (i.e. a module) is scanned with
        ``graph_utils.find_functions``.

        :param load_from: A list of callables or modules to load from
        :return: a list of callables to use to create a DAG.
        :raises ValueError: if ``load_from`` is empty.
        """
        if len(load_from) == 0:
            raise ValueError(f"No functions were passed to {subdag.__name__}(load_from=...)")
        out = []
        for item in load_from:
            if isinstance(item, Callable):
                out.append(item)
            else:
                out.extend(
                    function for _, function in graph_utils.find_functions(function_module=item)
                )
        return out

    @staticmethod
    def collect_nodes(config: Dict[str, Any], subdag_functions: List[Callable]) -> List[node.Node]:
        """Resolves each subdag function into its nodes (via ``base.resolve_nodes``).

        :param config: Configuration to resolve the functions with.
        :param subdag_functions: Functions making up the subdag.
        :return: All resulting nodes, each with ``NON_FINAL_TAGS`` merged onto its own tags.
        """
        return [
            node_.copy_with(tags={**node_.tags, **NON_FINAL_TAGS})
            for fn in subdag_functions
            for node_ in base.resolve_nodes(fn, config)
        ]

    def _create_additional_static_nodes(
        self,
        nodes: Collection[node.Node],
        namespace: str,
        config: Optional[Dict[str, Any]] = None,
    ) -> Collection[node.Node]:
        """Creates the nodes that feed ``self.inputs`` and the subdag config into the subdag.

        - literal (``value()``) inputs become static nodes returning the literal;
        - upstream (``source()``) inputs become identity nodes reading from the upstream node,
          but only when some subdag node actually depends on that input;
        - every config entry becomes a static node returning the config value.

        :param nodes: The subdag's nodes; these already carry the namespace.
        :param namespace: Namespace of the subdag.
        :param config: Config values to materialize as static nodes. Defaults to ``self.config``.
            Callers should pass *resolved* values so that ``value()``/``configuration()``
            entries do not become nodes returning the dependency objects themselves.
        :return: The newly created nodes.
        """
        node_types = extract_all_known_types(nodes)
        out = []
        for key, value in self.inputs.items():
            # TODO -- fix type derivation. Currently we don't use the specified type as we don't
            # really know what it should be...
            dependency_source = value.get_dependency_type()
            if dependency_source == dependencies.ParametrizedDependencySource.LITERAL:
                out.append(
                    create_static_node(
                        typ=derive_type(value),
                        name=key,
                        value=value.value,
                        namespace=(namespace,),
                        # copy so nodes never alias (and can't mutate) the module-level dict
                        tags=dict(NON_FINAL_TAGS),
                    )
                )
            elif dependency_source == dependencies.ParametrizedDependencySource.UPSTREAM:
                new_node_name = assign_namespace(key, namespace)
                if new_node_name not in node_types:
                    # No subdag node consumes this input, so there is nothing to wire up.
                    continue
                out.append(
                    create_identity_node(
                        from_=value.source,
                        typ=node_types[new_node_name],
                        name=key,
                        namespace=(namespace,),
                        tags=dict(NON_FINAL_TAGS),
                    )
                )
        config_values = self.config if config is None else config
        for key, value in config_values.items():
            out.append(
                create_static_node(
                    typ=type(value),
                    name=key,
                    value=value,
                    namespace=(namespace,),
                    tags=dict(NON_FINAL_TAGS),
                )
            )
        return out

    @staticmethod
    def add_namespace(
        nodes: List[node.Node],
        namespace: str,
        inputs: Dict[str, Any] = None,
        config: Dict[str, Any] = None,
    ) -> List[node.Node]:
        """Returns copies of ``nodes`` renamed into ``namespace``, with dependencies rewired.

        Every node name, and every key of ``inputs`` and ``config``, is mapped to
        ``"<namespace>.<name>"``. Each node's dependencies are renamed with that mapping
        (dependencies matching none of those names keep their name). Each node's callable is
        wrapped so that the namespaced keyword arguments it now receives are translated back
        to the argument names the original callable expects; coroutine functions get an async
        wrapper.

        :param nodes: Nodes to namespace.
        :param namespace: Namespace to prefix names with.
        :param inputs: Subdag inputs whose names should also be namespaced.
        :param config: Subdag config whose keys should also be namespaced.
        :return: The namespaced copies of ``nodes``.
        """
        inputs = inputs if inputs is not None else {}
        config = config if config is not None else {}
        # First pass: collect the new names so dependencies can be rewritten.
        new_name_map = {}
        for node_ in nodes:
            new_name_map[node_.name] = assign_namespace(node_.name, namespace)
        # Inputs and config keys get namespaced names too, so every subdag gets its own
        # uniquely-named parameter nodes (several inputs may share one source via source()).
        for dep in inputs:
            new_name_map[dep] = assign_namespace(dep, namespace)
        for dep in config:
            new_name_map[dep] = assign_namespace(dep, namespace)

        new_nodes = []
        for node_ in nodes:
            is_async = inspect.iscoroutinefunction(node_.callable)
            new_name = new_name_map[node_.name]
            # namespaced dependency name -> argument name the original callable expects
            kwarg_mapping = {new_name_map.get(key, key): key for key in node_.input_types}

            # Loop variables are bound as defaults to avoid late-binding closures.
            def fn(
                _callabl=node_.callable,
                _kwarg_mapping=dict(kwarg_mapping),  # noqa: B006
                **kwargs,
            ):
                new_kwargs = {_kwarg_mapping[kwarg]: value for kwarg, value in kwargs.items()}
                return _callabl(**new_kwargs)

            async def async_fn(
                _callabl=node_.callable,
                _kwarg_mapping=dict(kwarg_mapping),  # noqa: B006
                **kwargs,
            ):
                new_kwargs = {_kwarg_mapping[kwarg]: value for kwarg, value in kwargs.items()}
                return await _callabl(**new_kwargs)

            new_input_types = {
                dep: node_.input_types[original_dep] for dep, original_dep in kwarg_mapping.items()
            }
            fn_to_use = async_fn if is_async else fn
            new_nodes.append(
                node_.copy_with(input_types=new_input_types, name=new_name, callabl=fn_to_use)
            )
        return new_nodes

    def add_final_node(self, fn: Callable, node_name: str, namespace: str):
        """Creates the node for the decorated function, which consumes the subdag's outputs.

        Parameters of ``fn`` listed in ``self.external_inputs`` keep their own names; every
        other parameter is read from the namespaced subdag node of the same name.

        :param fn: The decorated function.
        :param node_name: Name to give the final node.
        :param namespace: Namespace of the subdag whose nodes ``fn`` consumes.
        :return: The final node.
        """
        is_async = inspect.iscoroutinefunction(fn)
        node_ = node.Node.from_fn(fn)
        external_inputs = set(self.external_inputs)

        def dependency_name(param: str) -> str:
            # External inputs come from the outer DAG; everything else comes from the subdag.
            return param if param in external_inputs else assign_namespace(param, namespace)

        namespaced_input_map = {dependency_name(key): key for key in node_.input_types}
        new_input_types = {
            dependency_name(key): value for key, value in node_.input_types.items()
        }

        def new_function(**kwargs):
            # Translate back to the parameter names ``fn`` expects.
            kwargs_without_namespace = {
                namespaced_input_map[key]: value for key, value in kwargs.items()
            }
            return fn(**kwargs_without_namespace)

        async def async_function(**kwargs):
            # For a coroutine ``fn``, ``new_function`` returns the coroutine; await it here.
            return await new_function(**kwargs)

        fn_to_use = async_function if is_async else new_function
        return node_.copy_with(name=node_name, input_types=new_input_types, callabl=fn_to_use)

    def _derive_namespace(self, fn: Callable) -> str:
        """Utility function to derive a namespace from a function.

        :param fn: Function we're decorating.
        :return: ``self.namespace`` if set, otherwise the function's name.
        """
        return fn.__name__ if self.namespace is None else self.namespace

    def _derive_name(self, fn: Callable) -> str:
        """Utility function to derive the final node's name from a function.

        :param fn: Function we're decorating.
        :return: ``self.final_node_name`` if set, otherwise the function's name.
        """
        return fn.__name__ if self.final_node_name is None else self.final_node_name

    def generate_nodes(self, fn: Callable, configuration: Dict[str, Any]) -> Collection[node.Node]:
        """Generates all nodes for this subdag.

        These are the namespaced subdag nodes, the nodes feeding its inputs/config, and the
        final node for ``fn``.

        :param fn: The decorated function.
        :param configuration: The global Hamilton configuration.
        :return: All generated nodes.
        """
        # self.config may contain configuration() / value() entries, so resolve it first.
        resolved_config = _resolve_subdag_configuration(configuration, self.config, fn.__name__)
        nodes = self.collect_nodes(config=resolved_config, subdag_functions=self.subdag_functions)
        namespace = self._derive_namespace(fn)
        final_node_name = self._derive_name(fn)
        # Rename them all to have the right namespace
        nodes = self.add_namespace(nodes, namespace, self.inputs, self.config)
        # Static config nodes must carry the *resolved* values, not the raw dependency objects.
        subdag_config_values = {key: resolved_config[key] for key in self.config}
        nodes += self._create_additional_static_nodes(nodes, namespace, subdag_config_values)
        # Add the final node that does the translation
        nodes += [self.add_final_node(fn, final_node_name, namespace)]
        return nodes

    def _validate_parameterization(self):
        """Ensures every input is a ``ParametrizedDependency``.

        :raises ValueError: listing all offending values.
        """
        invalid_values = [
            value
            for value in self.inputs.values()
            if not isinstance(value, dependencies.ParametrizedDependency)
        ]
        if invalid_values:
            raise ValueError(
                f"Parameterization using the following values is not permitted -- "
                f"must be either source() or value(): {invalid_values}"
            )

    def validate(self, fn):
        """Validates everything we can before we create the subdag.

        :param fn: Function that this decorates
        :raises ValueError: if an input is not a source() or value()
        """
        self._validate_parameterization()

    def required_config(self) -> Optional[List[str]]:
        """Currently we do not filter for subdag as we do not *statically* know what configuration
        is required. This is because we need to parse the function so that we can figure it out,
        and that is not available at the time that we call required_config. We need to think about
        the best way to do this, but it's likely that we'll want to allow required_config to consume
        the function itself, and pass it in when it's called with that.

        That said, we don't have sufficient justification to do that yet, so we're just going to
        return None for now, meaning that it has access to all configuration variables.

        :return: None
        """
        return None
class SubdagParams(TypedDict):
    """Per-subdag settings passed as keyword values to ``parameterized_subdag``.

    Every key is optional (``NotRequired``).
    """

    # Inputs for this subdag; entries override same-named decorator-level ``inputs``.
    inputs: NotRequired[Dict[str, ParametrizedDependency]]
    # Config for this subdag; entries override same-named decorator-level ``config``.
    config: NotRequired[Dict[str, Any]]
    # External inputs for this subdag; replaces (does not merge with) the decorator-level list.
    external_inputs: NotRequired[List[str]]
class parameterized_subdag(base.NodeCreator):
    """parameterized subdag is when you want to create multiple subdags at one time.
    Why might you want to do this?

    1. You have multiple data sets you want to run the same feature engineering pipeline on.
    2. You want to run some sort of optimization routine with a variety of results
    3. You want to run some sort of pipeline over slightly different configuration (E.G. region/business line)

    Note that this really is just syntactic sugar for creating multiple subdags, just as `@parameterize`
    is syntactic sugar for creating multiple nodes from a function. That said, it is common that you
    won't know what you want until compile time (E.G. when you have the config available), so this
    decorator along with the `@resolve` decorator is a good way to make that feasible. Note that
    we are getting into *advanced* Hamilton here -- we don't recommend starting with this. In fact,
    we generally recommend repeating subdags multiple times if you don't have too many. That said,
    that can get cumbersome if you have a lot, so this decorator is a good way to help with that.

    Let's take a look at an example:

    .. code-block:: python

        @parameterized_subdag(
            feature_modules,
            from_datasource_1={"inputs" : {"data" : value("datasource_1.csv")}},
            from_datasource_2={"inputs" : {"data" : value("datasource_2.csv")}},
            from_datasource_3={
                "inputs" : {"data" : value("datasource_3.csv")},
                "config" : {"filter" : "only_even_client_ids"}
            }
        )
        def feature_engineering(feature_df: pd.DataFrame) -> pd.DataFrame:
            return feature_df

    This is (obviously) contrived, but what it does is create three subdags, each with a different
    data source. The third one also applies a configuration to that subdag. Each keyword
    (``from_datasource_1``, ...) becomes both the namespace of its subdag and the name of its
    final node. Note that we can also pass in inputs/config to the decorator itself, which will be
    applied to all subdags.

    This is effectively the same as the example above.

    .. code-block:: python

        @parameterized_subdag(
            feature_modules,
            inputs={"data" : value("datasource_1.csv")},
            from_datasource_1={},
            from_datasource_2={
                "inputs" : {"data" : value("datasource_2.csv")}
            },
            from_datasource_3={
                "inputs" : {"data" : value("datasource_3.csv")},
                "config" : {"filter" : "only_even_client_ids"},
            }
        )
        def feature_engineering(feature_df: pd.DataFrame) -> pd.DataFrame:
            return feature_df

    Again, think about whether this feature is really the one you want -- often times, verbose,
    static DAGs are far more readable than very concise, highly parameterized DAGs.
    """

    def __init__(
        self,
        *load_from: Union[ModuleType, Callable],
        inputs: Dict[
            str, Union[dependencies.ParametrizedDependency, dependencies.LiteralDependency]
        ] = None,
        config: Dict[str, Any] = None,
        external_inputs: List[str] = None,
        **parameterization: SubdagParams,
    ):
        """Initializes a parameterized_subdag decorator.

        :param load_from: Modules (or functions) to load the subdag functions from.
        :param inputs: Inputs shared by every generated subdag.
        :param config: Config shared by every generated subdag.
        :param external_inputs: External inputs shared by every generated subdag. Note that
            if a parameterization specifies its own ``external_inputs``, that list replaces this
            one (it is not merged).
        :param parameterization: One entry per subdag to generate. The keyword is used as both the
            subdag's namespace and its final node name. Each value may contain ``inputs``,
            ``config`` and ``external_inputs``; ``inputs``/``config`` entries override
            same-named entries passed to the decorator itself.
            Furthermore, note the following:

            1. The parameterizations passed to the constructor are \\*\\*kwargs, so you are not
            allowed to name these `load_from`, `inputs`, `config`, or `external_inputs`. That's a
            good thing, as these are not good names for variables anyway.
            2. Any key omitted from a parameterization falls back to the decorator-level value
            (``inputs``/``config`` are empty dicts, and ``external_inputs`` an empty list, if not
            passed to the decorator either).
        """
        self.load_from = load_from
        self.inputs = inputs if inputs is not None else {}
        self.config = config if config is not None else {}
        self.parameterization = parameterization
        self.external_inputs = external_inputs if external_inputs is not None else []

    def _gather_subdag_generators(self) -> List[subdag]:
        """Creates one ``subdag`` decorator per parameterization.

        :return: The subdag decorators, in parameterization order.
        """
        return [
            subdag(
                *self.load_from,
                # ``or {}`` also tolerates an explicit ``None`` in the parameterization.
                inputs={**self.inputs, **(parameterization.get("inputs") or {})},
                config={**self.config, **(parameterization.get("config") or {})},
                external_inputs=parameterization.get("external_inputs", self.external_inputs),
                namespace=key,
                final_node_name=key,
            )
            for key, parameterization in self.parameterization.items()
        ]

    def generate_nodes(self, fn: Callable, config: Dict[str, Any]) -> List[node.Node]:
        """Generates the nodes of every parameterized subdag.

        :param fn: The decorated function.
        :param config: The global Hamilton configuration.
        :return: The nodes of all subdags, concatenated in parameterization order.
        """
        generated_nodes = []
        for subdag_generator in self._gather_subdag_generators():
            generated_nodes.extend(subdag_generator.generate_nodes(fn, config))
        return generated_nodes

    def validate(self, fn: Callable):
        """Validates every parameterized subdag.

        :param fn: The decorated function.
        """
        for subdag_generator in self._gather_subdag_generators():
            subdag_generator.validate(fn)

    def required_config(self) -> Optional[List[str]]:
        """See note for subdag.required_config -- this is the same pattern.

        :return: None, meaning all configuration is available.
        """
        return None
def prune_nodes(nodes: List[node.Node], select: Optional[List[str]] = None) -> List[node.Node]:
    """Prunes the nodes to the ``select`` nodes and everything upstream of them.

    Conducts a depth-first search over each node's ``input_types``.
    If ``select`` is None, all nodes are returned unchanged.

    :param nodes: Full set of nodes
    :param select: Names of the nodes to keep (together with their upstream dependencies)
    :return: Pruned set of nodes, in deterministic traversal order
    :raises ValueError: if any name in ``select`` is not the name of one of ``nodes``.
    """
    if select is None:
        return nodes
    node_name_map = {node_.name: node_ for node_ in nodes}
    missing = [name for name in select if name not in node_name_map]
    if missing:
        raise ValueError(
            "At least one of the selected nodes is not in the DAG. "
            f"You selected: {select}, but we only found nodes: {list(node_name_map)}. "
            f"Missing: {missing}."
        )
    seen_nodes = set(select)
    # dict.fromkeys de-duplicates while preserving the caller's order, making output deterministic
    # (and avoiding the need for nodes to be hashable).
    stack = [node_name_map[name] for name in dict.fromkeys(select)]
    output = []
    while stack:
        node_ = stack.pop()
        output.append(node_)
        for dep in node_.input_types:
            if dep not in seen_nodes and dep in node_name_map:
                seen_nodes.add(dep)
                stack.append(node_name_map[dep])
    return output
def _default_inject_parameter(fn: Callable, target_dataframe: str = None) -> str:
if target_dataframe is not None:
inject_parameter = target_dataframe
else:
# If we don't have a specified dataframe we assume it's the first argument
function_parameters = list(inspect.signature(fn).parameters.values())
if function_parameters:
inject_parameter = function_parameters[0].name
else:
raise ValueError(
f"Function {fn.__qualname__} has no parameters, but was "
f"decorated with with_columns. with_columns requires the first "
f"parameter to be a dataframe or using the on_input argument."
)
return inject_parameter
class with_columns_base(base.NodeInjector, abc.ABC):
    """Factory for with_columns operation on a dataframe. This is used when you want to extract some
    columns out of the dataframe, perform operations on them and then append to the original dataframe.

    This is an internal class that is meant to be extended by each individual dataframe library implementing
    the following abstract methods:

    - get_initial_nodes
    - get_subdag_nodes
    - chain_subdag_nodes
    - validate
    """

    # TODO: if we rename the column nodes into something smarter this can be avoided and
    # can also modify columns in place
    @staticmethod
    def contains_duplicates(nodes_: List[node.Node]) -> bool:
        """Ensures that we don't run into name clashing of columns and group operations.

        In the case when we extract columns for the user, because ``columns_to_pass`` was used, we want
        to safeguard against nameclashing with functions that are passed into ``with_columns`` - i.e.
        there are no functions that have the same name as the columns. This effectively means that
        using ``columns_to_pass`` will only append new columns to the dataframe and for changing
        existing columns ``pass_dataframe_as`` or ``on_input`` needs to be used.

        :param nodes_: Nodes to check.
        :return: True if any two nodes share a name.
        """
        seen_names = set()
        for node_ in nodes_:
            if node_.name in seen_names:
                return True
            seen_names.add(node_.name)
        return False

    @staticmethod
    def validate_dataframe(
        fn: Callable, inject_parameter: str, params: Dict[str, Type[Type]], required_type: Type
    ) -> None:
        """Validates that ``inject_parameter`` is a dependency of ``fn`` annotated as ``required_type``.

        An annotation equal to ``required_type``, or a subclass of it, is accepted.

        :param fn: The decorated function.
        :param inject_parameter: Name of the dataframe parameter.
        :param params: The function's dependencies, keyed by parameter name.
        :param required_type: The dataframe type the library implementation expects.
        :raises InvalidDecoratorException: if the parameter is missing or has the wrong type.
        """
        input_types = typing.get_type_hints(fn)
        if inject_parameter not in params:
            raise InvalidDecoratorException(
                f"Function: {fn.__name__} does not have the parameter {inject_parameter} as a dependency. "
                f"@with_columns requires the parameter names to match the function parameters. "
                f"If you do not wish to use the first argument, please use ``pass_dataframe_as`` or ``on_input`` option. "
                f"It might not be compatible with some other decorators."
            )
        # The annotation is a *type*, so compare types (isinstance against it would never match).
        # An unannotated parameter yields None and is reported rather than raising KeyError.
        annotated_type = input_types.get(inject_parameter)
        is_required_type = annotated_type == required_type or (
            isinstance(annotated_type, type)
            and isinstance(required_type, type)
            and issubclass(annotated_type, required_type)
        )
        if not is_required_type:
            raise InvalidDecoratorException(
                "The selected dataframe parameter is not the correct dataframe type. "
                f"You selected a parameter of type {annotated_type}, but we expect to get {required_type}"
            )

    def __init__(
        self,
        *load_from: Union[Callable, ModuleType],
        columns_to_pass: List[str] = None,
        pass_dataframe_as: str = None,
        on_input: str = None,
        select: List[str] = None,
        namespace: str = None,
        config_required: List[str] = None,
        dataframe_type: Type = None,
    ):
        """Instantiates a ``@with_columns`` decorator.

        :param load_from: The functions or modules that will be used to generate the group of map operations.
        :param columns_to_pass: The initial schema of the dataframe. This is used to determine which
            upstream inputs should be taken from the dataframe, and which shouldn't. This cannot be
            used in conjunction with ``pass_dataframe_as`` or ``on_input``.
        :param pass_dataframe_as: The name of the dataframe that we're modifying, as known to the subdag.
            If you pass this in, you are responsible for extracting columns out. If not provided, you have
            to pass columns_to_pass in, and we will extract the columns out for you.
        :param on_input: the dataframe parameter that we are applying with_columns on. By default we
            will assume the first parameter is the corresponding dataframe.
        :param select: The end nodes that represent columns to be appended to the original dataframe
            via with_columns. Existing columns will be overridden.
        :param namespace: The namespace of the nodes, so they don't clash with the global namespace
            and so this can be reused. If it's left out, the decorated function's name is used as
            the namespace.
        :param config_required: the list of config keys that are required to resolve any functions. Pass in None\
            if you want the functions/modules to have access to all possible config.
        :param dataframe_type: The dataframe type of the specific library; required.
        :raises ValueError: if more than one of ``columns_to_pass``, ``pass_dataframe_as`` and
            ``on_input`` is specified, or ``load_from`` is empty.
        :raises InvalidDecoratorException: if ``dataframe_type`` is not provided.
        """
        self.subdag_functions = subdag.collect_functions(load_from)
        self.select = select
        # This is here to restrict to using either pass_dataframe_as or on_input or columns_to_pass
        # TODO: decouple columns_to_pass, pass_dataframe_as and on_input
        # For spark, we always perform with_columns on first parameter and use pass_dataframe_as;
        # for pandas/polars, we can select which dataframe with on_input, but columns_to_pass, will always only work on first parameter
        # We can decouple it so that on_input selects the target dataframe parameter that will inject into the next node
        # pass_dataframe_as selects the original dataframe we want to extract columns from
        # columns_to_pass is optional helper that can be toggled on/off so no need to raise this error.
        specified_count = sum(
            option is not None for option in (columns_to_pass, pass_dataframe_as, on_input)
        )
        if specified_count > 1:
            raise ValueError(
                "You must specify only one of ``columns_to_pass``, ``pass_dataframe_as``, and ``on_input``. "
                "This is because specifying ``pass_dataframe_as`` or ``on_input`` injects into "
                "the set of columns, allowing you to perform your own extraction "
                "from the dataframe. We then execute all columns in the subdag "
                "in order, passing in that initial dataframe. If you want "
                "to reference columns in your code, you'll have to specify "
                "the set of initial columns, and allow the subdag decorator "
                "to inject the dataframe through. The initial columns tell "
                "us which parameters to take from that dataframe, so we can "
                "feed the right data into the right columns."
            )
        self.initial_schema = columns_to_pass
        self.dataframe_subdag_param = pass_dataframe_as
        self.target_dataframe = on_input
        self.namespace = namespace
        self.config_required = config_required
        if dataframe_type is None:
            raise InvalidDecoratorException(
                "Please provide the dataframe type for this specific library."
            )
        self.dataframe_type = dataframe_type

    def required_config(self) -> Optional[List[str]]:
        """Returns the config keys needed to resolve the functions (None means all config).

        :return: ``config_required`` as passed to the constructor.
        """
        return self.config_required

    @abc.abstractmethod
    def get_initial_nodes(
        self, fn: Callable, params: Dict[str, Type[Type]]
    ) -> Tuple[str, Collection[node.Node]]:
        """Preparation stage where columns get extracted into nodes. In case `pass_dataframe_as` or `on_input` is
        used, this should return an empty list (no column nodes) since the users will extract it
        themselves.

        :param fn: the function we are decorating. By using the inspect library you can get information
            about what arguments it has / find out the dataframe argument.
        :param params: Dictionary of all the type names one wants to inject.
        :return: name of the dataframe parameter and list of nodes representing the extracted columns (can be empty).
        """
        pass

    @abc.abstractmethod
    def get_subdag_nodes(self, fn: Callable, config: Dict[str, Any]) -> Collection[node.Node]:
        """Creates subdag from the passed in module / functions.

        :param fn: the function we are decorating.
        :param config: Configuration with which the DAG was constructed.
        :return: the subdag as a list of nodes.
        """
        pass

    @abc.abstractmethod
    def chain_subdag_nodes(
        self, fn: Callable, inject_parameter: str, generated_nodes: Collection[node.Node]
    ) -> Tuple[List[node.Node], str]:
        """Combines the original dataframe with selected columns. This should produce a
        dataframe output that is injected into the decorated function with new columns
        appended and existing columns overridden.

        :param fn: the function we are decorating.
        :param inject_parameter: the name of the original dataframe parameter.
        :param generated_nodes: the (pruned) column and subdag nodes.
        :return: the output nodes, and the (un-namespaced) name of the node that produces the new
            dataframe with the columns appended / overwritten.
        """
        pass

    def inject_nodes(
        self, params: Dict[str, Type[Type]], config: Dict[str, Any], fn: Callable
    ) -> Tuple[List[node.Node], Dict[str, str]]:
        """Builds the with_columns nodes and rewires ``fn``'s dataframe parameter to their output.

        The steps are:

        1. Extract the initial column nodes and build the subdag.
        2. Reject duplicate node names.
        3. Prune to ``select``.
        4. Chain the selected columns onto the dataframe.
        5. Namespace everything.

        :param params: The decorated function's dependency types, keyed by parameter name.
        :param config: Configuration with which the DAG was constructed.
        :param fn: The decorated function.
        :return: The injected nodes, and a mapping from ``fn``'s dataframe parameter to the
            namespaced node that now supplies it.
        :raises ValueError: on duplicate node names, an unknown ``select`` name, or if pruning
            leaves no nodes.
        """
        namespace = fn.__name__ if self.namespace is None else self.namespace

        inject_parameter, initial_nodes = self.get_initial_nodes(fn=fn, params=params)
        subdag_nodes = self.get_subdag_nodes(fn=fn, config=config)

        # Subclasses may return any Collection, so normalize before concatenating.
        generated_nodes = list(initial_nodes) + list(subdag_nodes)

        # TODO: for now we restrict that if user wants to change columns that already exist, he needs to
        # pass the dataframe and extract them himself. If we add namespace to initial nodes and rewire the
        # initial node names with the ongoing ones that have a column argument, we can also allow in place
        # changes when using columns_to_pass
        if with_columns_base.contains_duplicates(generated_nodes):
            raise ValueError(
                "You can only specify columns once. You used `columns_to_pass` and we "
                "extract the columns for you. In this case they cannot be overwritten -- only new columns get "
                "appended. If you want to modify in-place columns pass in a dataframe and "
                "extract + modify the columns and afterwards select them."
            )

        pruned_nodes = prune_nodes(nodes=generated_nodes, select=self.select)
        if len(pruned_nodes) == 0:
            raise ValueError(
                f"No nodes found upstream from select columns: {self.select} for function: "
                f"{fn.__qualname__}"
            )
        # Node combining columns and dataframe might need info about prior nodes
        output_nodes, current_param = self.chain_subdag_nodes(
            fn=fn, inject_parameter=inject_parameter, generated_nodes=pruned_nodes
        )
        output_nodes = subdag.add_namespace(output_nodes, namespace)
        return output_nodes, {inject_parameter: assign_namespace(current_param, namespace)}