"""This module contains base constructs for executing a hamilton graph.
It should only import hamilton.node and third-party libraries (e.g. numpy, pandas).
It cannot import hamilton.graph or hamilton.driver.
"""
import abc
import collections
import inspect
import logging
from typing import Any, Dict, List, Tuple, Type, Union
import numpy as np
import pandas as pd
import typing_inspect
from pandas.core.indexes import extension as pd_extension
try:
from . import node
except ImportError:
import node
logger = logging.getLogger(__name__)
class ResultMixin(object):
"""Abstract base class housing the static function.
Why a static function? That's because certain frameworks can only pickle a static function, not an entire
object.
All result builders should inherit from this class and implement the build_result function.
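    For example, a minimal sketch of a custom result builder (``TupleResult`` is a
    hypothetical name, not something this module provides):

    .. code-block:: python

        class TupleResult(ResultMixin):
            @staticmethod
            def build_result(**outputs: Dict[str, Any]) -> tuple:
                # hypothetical example: collapse the outputs into a tuple
                return tuple(outputs.values())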
"""
@staticmethod
@abc.abstractmethod
def build_result(**outputs: Dict[str, Any]) -> Any:
"""This function builds the result given the computed values."""
pass
class DictResult(ResultMixin):
"""Simple function that returns the dict of column -> value results.
It returns the results as a dictionary, where the keys map to outputs requested,
and values map to what was computed for those values.
Use this when you want to:
1. debug dataflows.
2. have heterogeneous return types.
3. Want to manually transform the result into something of your choosing.
.. code-block:: python
from hamilton import base, driver
dict_builder = base.DictResult()
adapter = base.SimplePythonGraphAdapter(dict_builder)
dr = driver.Driver(config, *modules, adapter=adapter)
dict_result = dr.execute([...], inputs=...)
"""
@staticmethod
def build_result(**outputs: Dict[str, Any]) -> Dict:
"""This function builds a simple dict of output -> computed values."""
return outputs
class PandasDataFrameResult(ResultMixin):
"""Mixin for building a pandas dataframe from the result.
    It returns the results as a Pandas DataFrame, where the columns map to the outputs requested, and the values map\
    to what was computed for those outputs. Note: this only works if the computed values are pandas Series or scalar values.
Use this when you want to create a pandas dataframe.
Example:
.. code-block:: python
from hamilton import base, driver
default_builder = base.PandasDataFrameResult()
adapter = base.SimplePythonGraphAdapter(default_builder)
dr = driver.Driver(config, *modules, adapter=adapter)
df = dr.execute([...], inputs=...)
"""
@staticmethod
def pandas_index_types(
outputs: Dict[str, Any]
) -> Tuple[Dict[str, List[str]], Dict[str, List[str]], Dict[str, List[str]]]:
"""This function creates three dictionaries according to whether there is an index type or not.
The three dicts we create are:
1. Dict of index type to list of outputs that match it.
2. Dict of time series / categorical index types to list of outputs that match it.
3. Dict of `no-index` key to list of outputs with no index type.
:param outputs: the dict we're trying to create a result from.
        :return: dict of all index types, dict of time series/categorical index types, dict of outputs with no index.
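        For example, a sketch of the keys produced (the exact key strings depend on your pandas version):

        .. code-block:: python

            outputs = {"a": pd.Series([1.0, 2.0]), "b": 5}
            all_types, time_idx, no_idx = PandasDataFrameResult.pandas_index_types(outputs)
            # all_types -> {"RangeIndex:::int64": ["a"], "no-index": ["b"]}
            # time_idx  -> {}
            # no_idx    -> {"no-index": ["b"]}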
"""
all_index_types = collections.defaultdict(list)
time_indexes = collections.defaultdict(list)
no_indexes = collections.defaultdict(list)
def index_key_name(pd_object: Union[pd.DataFrame, pd.Series]) -> str:
"""Creates a string helping identify the index and it's type.
Useful for disambiguating time related indexes."""
return f"{pd_object.index.__class__.__name__}:::{pd_object.index.dtype}"
def get_parent_time_index_type():
"""Helper to pull the right time index parent class."""
if hasattr(pd_extension, "NDArrayBackedExtensionIndex"):
index_type = pd_extension.NDArrayBackedExtensionIndex
else:
index_type = None # weird case, but not worth breaking for.
return index_type
        parent_time_index_type = get_parent_time_index_type()
        for output_name, output_value in outputs.items():
            if isinstance(
                output_value, (pd.DataFrame, pd.Series)
            ):  # if it has an index -- let's grab its type
                dict_key = index_key_name(output_value)
                if parent_time_index_type is not None and isinstance(
                    output_value.index, parent_time_index_type
                ):
                    # it's a time index -- these will produce garbage if not aligned properly.
                    time_indexes[dict_key].append(output_name)
            elif isinstance(
                output_value, pd.Index
            ):  # a pd.Index output has no index of its own -- key it as a default integer index.
                int_index = pd.Series(
                    [1, 2, 3], index=[0, 1, 2]
                )  # dummy series to get the right key string.
                dict_key = index_key_name(int_index)
else:
dict_key = "no-index"
no_indexes[dict_key].append(output_name)
all_index_types[dict_key].append(output_name)
return all_index_types, time_indexes, no_indexes
@staticmethod
def check_pandas_index_types_match(
all_index_types: Dict[str, List[str]],
time_indexes: Dict[str, List[str]],
no_indexes: Dict[str, List[str]],
) -> bool:
"""Checks that pandas index types match.
        This only logs warnings; if debug logging is enabled, it also logs the index types encountered.
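        For example, a sketch (index key strings are illustrative):

        .. code-block:: python

            all_types = {"RangeIndex:::int64": ["a"], "no-index": ["b"]}
            no_idx = {"no-index": ["b"]}
            PandasDataFrameResult.check_pandas_index_types_match(all_types, {}, no_idx)
            # logs a warning but returns True -- one real index plus index-less outputs is fine.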
"""
no_index_length = len(no_indexes)
time_indexes_length = len(time_indexes)
all_indexes_length = len(all_index_types)
number_with_indexes = all_indexes_length - no_index_length
types_match = True # default to True
# if there is more than one time index
if time_indexes_length > 1:
logger.warning(
"WARNING: Time/Categorical index type mismatches detected - check output to ensure Pandas "
"is doing what you intend to do. Else change the index types to match. Set logger to debug "
"to see index types."
)
types_match = False
        # if there is more than one index type and it's not explained by the time indexes
if number_with_indexes > 1 and all_indexes_length > time_indexes_length:
logger.warning(
"WARNING: Multiple index types detected - check output to ensure Pandas is "
"doing what you intend to do. Else change the index types to match. Set logger to debug to "
"see index types."
)
types_match = False
elif number_with_indexes == 1 and no_index_length > 0:
logger.warning(
f"WARNING: a single pandas index was found, but there are also {len(no_indexes['no-index'])} "
"outputs without an index. Please check whether the dataframe created matches what what you "
"expect to happen."
)
# Strictly speaking the index types match -- there is only one -- so setting to True.
types_match = True
        # if none of the outputs had an index at all
elif no_index_length == all_indexes_length:
logger.warning(
"It appears no Pandas index type was detected (ignore this warning if you're using DASK for now.) "
"Please check whether the dataframe created matches what what you expect to happen."
)
types_match = False
if logger.isEnabledFor(logging.DEBUG):
import pprint
pretty_string = pprint.pformat(dict(all_index_types))
logger.debug(f"Index types encountered:\n{pretty_string}.")
return types_match
@staticmethod
def build_result(**outputs: Dict[str, Any]) -> pd.DataFrame:
"""Builds a Pandas DataFrame from the outputs.
This function will check the index types of the outputs, and log warnings if they don't match.
        The behavior of pd.DataFrame(outputs) is that it does an outer join based on the indexes of the Series passed in.
:param outputs: the outputs to build a dataframe from.
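        For example, a sketch of the implicit outer join on index:

        .. code-block:: python

            s1 = pd.Series([1, 2], index=[0, 1])
            s2 = pd.Series([3, 4], index=[1, 2])
            df = PandasDataFrameResult.build_result(x=s1, y=s2)
            # df has index [0, 1, 2]; x is NaN at index 2 and y is NaN at index 0.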
"""
# TODO check inputs are pd.Series, arrays, or scalars -- else error
output_index_type_tuple = PandasDataFrameResult.pandas_index_types(outputs)
        # This next line just logs warnings; we don't actually care about the result
        # since this is the current default behavior.
PandasDataFrameResult.check_pandas_index_types_match(*output_index_type_tuple)
if len(outputs) == 1:
(value,) = outputs.values() # this works because it's length 1.
if isinstance(value, pd.DataFrame):
return value
if not any(pd.api.types.is_list_like(value) for value in outputs.values()):
# If we're dealing with all values that don't have any "index" that could be created
# (i.e. scalars, objects) coerce the output to a single-row, multi-column dataframe.
return pd.DataFrame([outputs])
contains_df = any(isinstance(value, pd.DataFrame) for value in outputs.values())
if contains_df:
# build the dataframe from the outputs
return PandasDataFrameResult.build_dataframe_with_dataframes(outputs)
# don't do anything special if dataframes aren't in the output.
return pd.DataFrame(outputs) # this does an implicit outer join based on index.
@staticmethod
def build_dataframe_with_dataframes(outputs: Dict[str, Any]) -> pd.DataFrame:
"""Builds a dataframe from the outputs in an "outer join" manner based on index.
The behavior of pd.Dataframe(outputs) is that it will do an outer join based on indexes of the Series passed in.
To handle dataframes, we unpack the dataframe into a dict of series, check to ensure that no columns are
redefined in a rolling fashion going in order of the outputs requested. This then results in an "enlarged"
outputs dict that is then passed to pd.Dataframe(outputs) to get the final dataframe.
:param outputs: The outputs to build the dataframe from.
:return: A dataframe with the outputs.
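        For example, a sketch of how dataframe columns get prefixed:

        .. code-block:: python

            df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
            outputs = {"my_df": df, "my_series": pd.Series([5, 6])}
            result = PandasDataFrameResult.build_dataframe_with_dataframes(outputs)
            # result columns: ["my_df.a", "my_df.b", "my_series"]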
"""
def get_output_name(output_name: str, column_name: str) -> str:
"""Add function prefix to columns.
Note this means that they stop being valid python identifiers due to the `.` in the string.
"""
return f"{output_name}.{column_name}"
flattened_outputs = {}
for name, output in outputs.items():
if isinstance(output, pd.DataFrame):
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
f"Unpacking dataframe {name} into dict of series with columns {list(output.columns)}."
)
df_dict = {
get_output_name(name, col_name): col_value
for col_name, col_value in output.to_dict(orient="series").items()
}
flattened_outputs.update(df_dict)
elif isinstance(output, pd.Series):
if name in flattened_outputs:
raise ValueError(
f"Series {name} already exists in the output. "
f"Please rename the series to avoid this error, or determine from where the initial series is "
f"being added; it may be coming from a dataframe that is being unpacked."
)
flattened_outputs[name] = output
else:
if name in flattened_outputs:
raise ValueError(
f"Non series output {name} already exists in the output. "
f"Please rename this output to avoid this error, or determine from where the initial value is "
f"being added; it may be coming from a dataframe that is being unpacked."
)
flattened_outputs[name] = output
return pd.DataFrame(flattened_outputs)
class StrictIndexTypePandasDataFrameResult(PandasDataFrameResult):
"""A ResultBuilder that produces a dataframe only if the index types match exactly.
Note: If there is no index type on some outputs, e.g. the value is a scalar, as long as there exists a single \
pandas index type, no error will be thrown, because a dataframe can be easily created.
Use this when you want to create a pandas dataframe from the outputs, but you want to ensure that the index types \
match exactly.
To use:
.. code-block:: python
from hamilton import base, driver
strict_builder = base.StrictIndexTypePandasDataFrameResult()
adapter = base.SimplePythonGraphAdapter(strict_builder)
dr = driver.Driver(config, *modules, adapter=adapter)
df = dr.execute([...], inputs=...) # this will now error if index types mismatch.
"""
@staticmethod
def build_result(**outputs: Dict[str, Any]) -> pd.DataFrame:
# TODO check inputs are pd.Series, arrays, or scalars -- else error
output_index_type_tuple = PandasDataFrameResult.pandas_index_types(outputs)
indexes_match = PandasDataFrameResult.check_pandas_index_types_match(
*output_index_type_tuple
)
if not indexes_match:
import pprint
pretty_string = pprint.pformat(dict(output_index_type_tuple[0]))
raise ValueError(
"Error: pandas index types did not match exactly. "
f"Found the following indexes:\n{pretty_string}"
)
return PandasDataFrameResult.build_result(**outputs)
class NumpyMatrixResult(ResultMixin):
"""Mixin for building a Numpy Matrix from the result of walking the graph.
All inputs to the build_result function are expected to be numpy arrays.
.. code-block:: python
from hamilton import base, driver
adapter = base.SimplePythonGraphAdapter(base.NumpyMatrixResult())
dr = driver.Driver(config, *modules, adapter=adapter)
numpy_matrix = dr.execute([...], inputs=...)
"""
@staticmethod
def build_result(**outputs: Dict[str, Any]) -> np.matrix:
"""Builds a numpy matrix from the passed in, inputs.
Note: this does not check that the inputs are all numpy arrays/array like things.
:param outputs: function_name -> np.array.
:return: numpy matrix
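        For example, a sketch of how scalars get broadcast to the common column length:

        .. code-block:: python

            mat = NumpyMatrixResult.build_result(a=np.array([1, 2, 3]), b=7)
            # mat is a 3x2 np.matrix: column `a` is [1, 2, 3], column `b` is [7, 7, 7].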
"""
# TODO check inputs are all numpy arrays/array like things -- else error
num_rows = -1
columns_with_lengths = collections.OrderedDict()
for col, val in outputs.items(): # assumption is fixed order
if isinstance(val, (int, float)): # TODO add more things here
columns_with_lengths[(col, 1)] = val
else:
length = len(val)
if num_rows == -1:
num_rows = length
elif length == num_rows:
# we're good
pass
else:
                    raise ValueError(
                        f"Error: got a non-scalar result whose length does not match the other vectors. "
                        f"Got {length} for {col} instead of {num_rows}."
                    )
                columns_with_lengths[(col, num_rows)] = val
        if num_rows == -1:
            num_rows = 1  # every output was a scalar -- treat each as a single-row column.
        list_of_columns = []
        for (col, length), val in columns_with_lengths.items():
            if not pd.api.types.is_list_like(val):
                list_of_columns.append([val] * num_rows)  # broadcast single values into a full column
            elif length == num_rows:
                list_of_columns.append(list(val))
            else:
                raise ValueError(
                    f"Do not know how to make column {col} with length {length} have {num_rows} rows."
                )
# Create the matrix with columns as rows and then transpose
return np.asmatrix(list_of_columns).T
class HamiltonGraphAdapter(ResultMixin):
"""Any GraphAdapters should implement this interface to adapt the HamiltonGraph for that particular context.
Note since it inherits ResultMixin -- HamiltonGraphAdapters need a `build_result` function too.
"""
@staticmethod
@abc.abstractmethod
def check_input_type(node_type: Type, input_value: Any) -> bool:
"""Used to check whether the user inputs match what the execution strategy & functions can handle.
:param node_type: The type of the node.
:param input_value: An actual value that we want to inspect matches our expectation.
        :return: True if the input value matches the node type, False otherwise.
"""
pass
@staticmethod
@abc.abstractmethod
def check_node_type_equivalence(node_type: Type, input_type: Type) -> bool:
"""Used to check whether two types are equivalent.
This is used when the function graph is being created and we're statically type checking the annotations
for compatibility.
:param node_type: The type of the node.
:param input_type: The type of the input that would flow into the node.
        :return: True if the types are considered equivalent, False otherwise.
"""
pass
@abc.abstractmethod
def execute_node(self, node: node.Node, kwargs: Dict[str, Any]) -> Any:
"""Given a node that represents a hamilton function, execute it.
Note, in some adapters this might just return some type of "future".
:param node: the Hamilton Node
:param kwargs: the kwargs required to exercise the node function.
:return: the result of exercising the node.
"""
pass
class SimplePythonDataFrameGraphAdapter(HamiltonGraphAdapter, PandasDataFrameResult):
"""This is the default (original Hamilton) graph adapter. It uses plain python and builds a dataframe result.
This executes the Hamilton dataflow locally on a machine in a single threaded, single process fashion. It assumes\
a pandas dataframe as a result.
Use this when you want to execute on a single machine, without parallelization, and you want a pandas dataframe \
as output.
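    For example, a sketch of how the input type check behaves (values are illustrative):

    .. code-block:: python

        adapter = SimplePythonDataFrameGraphAdapter()
        adapter.check_input_type(pd.Series, pd.Series([1.0]))  # True -- plain isinstance check
        adapter.check_input_type(Dict[str, int], {"a": 1})  # True -- the generic's origin matches dict
        adapter.check_input_type(int, "hello")  # False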
"""
@staticmethod
def check_input_type(node_type: Type, input_value: Any) -> bool:
if node_type == Any:
return True
        # In the case of dict[str, Any] (or equivalent) in Python 3.9+,
        # we need to double-check that it's not generic, as the isinstance check below would break on it.
elif (
inspect.isclass(node_type)
and not typing_inspect.is_generic_type(node_type)
and isinstance(input_value, node_type)
):
return True
elif typing_inspect.is_typevar(node_type): # skip runtime comparison for now.
return True
elif typing_inspect.is_generic_type(node_type) and typing_inspect.get_origin(
node_type
) == type(input_value):
return True
elif typing_inspect.is_union_type(node_type):
union_types = typing_inspect.get_args(node_type)
            return any(
                SimplePythonDataFrameGraphAdapter.check_input_type(ut, input_value)
                for ut in union_types
            )
elif node_type == type(input_value):
return True
return False
@staticmethod
def check_node_type_equivalence(node_type: Type, input_type: Type) -> bool:
return node_type == input_type
def execute_node(self, node: node.Node, kwargs: Dict[str, Any]) -> Any:
return node.callable(**kwargs)
class SimplePythonGraphAdapter(SimplePythonDataFrameGraphAdapter):
"""This class allows you to swap out the build_result very easily.
This executes the Hamilton dataflow locally on a machine in a single threaded, single process fashion. It allows\
you to specify a ResultBuilder to control the return type of what ``execute()`` returns.
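    To use:

    .. code-block:: python

        from hamilton import base, driver
        adapter = base.SimplePythonGraphAdapter(base.DictResult())
        dr = driver.Driver(config, *modules, adapter=adapter)
        result = dr.execute([...], inputs=...)  # returns whatever the result builder builds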
"""
def __init__(self, result_builder: ResultMixin):
"""Allows you to swap out the build_result very easily.
:param result_builder: A ResultMixin object that will be used to build the result.
"""
        if result_builder is None:
            raise ValueError("You must provide a ResultMixin object for `result_builder`.")
        self.result_builder = result_builder
def build_result(self, **outputs: Dict[str, Any]) -> Any:
"""Delegates to the result builder function supplied."""
return self.result_builder.build_result(**outputs)