WIP
This doesn't work yet because:
1. Daft doesn't support column-level operations.
2. We need to implement support for `with_columns`.
diff --git a/examples/daft/hello_world/functions.py b/examples/daft/hello_world/functions.py
new file mode 100644
index 0000000..4d7e1fa
--- /dev/null
+++ b/examples/daft/hello_world/functions.py
@@ -0,0 +1,56 @@
+import daft
+from daft import DataType, col, udf
+
+from hamilton.function_modifiers import extract_columns
+
+
+# @extract_columns("signups", "spend")
+def base_df(base_df_location: str) -> daft.DataFrame:
+ """Loads base dataframe of data.
+
+ :param base_df_location: just showing that we could load this from a file...
+    :return: a Daft dataframe
+ """
+ return daft.from_pydict(
+ {
+ "signups": [1, 10, 50, 100, 200, 400],
+ "spend": [10, 10, 20, 40, 40, 50],
+ }
+ )
+
+
+def avg_3wk_spend(base_df: daft.DataFrame) -> daft.DataFrame:
+ """Computes rolling 3 week average spend."""
+ # return base_df.with_column("avg_3wk_spend", (col("spend").agg("sum")), "sum")]
+ # return spend.rolling_mean(3)
+
+
+def spend_per_signup(base_df: daft.DataFrame) -> daft.DataFrame:
+ """Computes cost per signup in relation to spend."""
+
+ return base_df.with_column("spend_per_signup", base_df["spend"] / base_df["signups"])
+
+
+def spend_mean(base_df: daft.DataFrame) -> daft.DataFrame:
+    """Shows function creating a scalar. NOTE(review): docstring says mean but the code aggregates with "sum" — WIP."""
+ return base_df.with_column("spend_mean", base_df.agg([(col("spend"), "sum")]))
+
+
+def spend_zero_mean(spend_mean: daft.DataFrame) -> daft.DataFrame:
+ """Shows function that takes a scalar. In this case to zero mean spend."""
+ return spend_mean.with_column("spend_zero_mean", spend_mean["spend"] - spend_mean["spend_mean"])
+
+
+def spend_std_dev(base_df: daft.DataFrame) -> daft.DataFrame:
+ """Computes the standard deviation of the spend column."""
+ return base_df.with_column("spend_std_dev", base_df["spend"].std())
+
+
+def spend_zero_mean_unit_variance(
+ spend_zero_mean: daft.DataFrame, spend_std_dev: daft.DataFrame
+) -> daft.DataFrame:
+ """Shows one way to make spend have zero mean and unit variance."""
+ return spend_zero_mean.with_column(
+ "spend_zero_mean_unit_variance",
+ spend_zero_mean["spend_zero_mean"] / spend_zero_mean["spend_std_dev"],
+ )
diff --git a/examples/daft/hello_world/run.py b/examples/daft/hello_world/run.py
new file mode 100644
index 0000000..f48ea80
--- /dev/null
+++ b/examples/daft/hello_world/run.py
@@ -0,0 +1,70 @@
+import logging
+import sys
+from typing import Any, Dict, Type, Union
+
+import daft
+
+from hamilton import base, driver
+
+logging.basicConfig(stream=sys.stdout)
+
+
+class DaftDataFrameResult(base.ResultMixin):
+ def build_result(
+ self, **outputs: Dict[str, Union[daft.dataframe.dataframe.DataFrame, Any]]
+ ) -> daft.dataframe.dataframe.DataFrame:
+ """This is the method that Hamilton will call to build the final result. It will pass in the results
+ of the requested outputs that you passed in to the execute() method.
+
+ Note: this function could do smarter things; looking for contributions here!
+
+ :param outputs: The results of the requested outputs.
+        :return: a Daft DataFrame.
+ """
+ # TODO:
+ # do dataframe to dataframe
+ # cannot pull expressions
+ # do single column dataframes
+ #
+ if len(outputs) == 1:
+ (value,) = outputs.values() # this works because it's length 1.
+ if isinstance(value, daft.dataframe.dataframe.DataFrame): # it's a dataframe
+ return value
+ elif not isinstance(
+ value, daft.expressions.expressions.Expression
+ ): # it's a single scalar/object
+ key, value = outputs.popitem()
+ return daft.dataframe.dataframe.DataFrame({key: [value]})
+ else: # it's a series
+ return daft.dataframe.dataframe.DataFrame(outputs)
+ # TODO: check for length of outputs and determine what should
+ # happen for mixed outputs that include scalars for example.
+ return daft.from_pydict(outputs)
+
+ def output_type(self) -> Type:
+ return daft.dataframe.dataframe.DataFrame
+
+
+# Create a driver instance.
+adapter = base.SimplePythonGraphAdapter(result_builder=DaftDataFrameResult())
+config = {
+ "base_df_location": "dummy_value",
+}
+import functions # where our functions are defined
+
+dr = driver.Driver(config, functions, adapter=adapter)
+# note -- currently the result builder does not handle mixed outputs, e.g. Series and scalars.
+output_columns = [
+ "base_df",
+ # "signups",
+ "avg_3wk_spend",
+ "spend_per_signup",
+ "spend_zero_mean_unit_variance",
+]
+# let's create the dataframe!
+df = dr.execute(output_columns)
+print(df)
+
+# To visualize do `pip install "sf-hamilton[visualization]"` if you want these to work
+# dr.visualize_execution(output_columns, './daft', {"format": "png"})
+# dr.display_all_functions('./my_full_dag.dot')
diff --git a/hamilton/function_modifiers/base.py b/hamilton/function_modifiers/base.py
index baa5803..f8eab74 100644
--- a/hamilton/function_modifiers/base.py
+++ b/hamilton/function_modifiers/base.py
@@ -33,6 +33,7 @@
"xgboost",
"lightgbm",
"sklearn_plot",
+ "daft",
]
for plugin_module in plugins_modules:
try:
diff --git a/hamilton/function_modifiers/expanders.py b/hamilton/function_modifiers/expanders.py
index b0fdfea..b24f228 100644
--- a/hamilton/function_modifiers/expanders.py
+++ b/hamilton/function_modifiers/expanders.py
@@ -663,7 +663,11 @@
column_to_extract: str = column, **kwargs
) -> Any: # avoiding problems with closures
df = kwargs[node_.name]
- if column_to_extract not in df:
+ if hasattr(df, "column_names"):
+ clause = column_to_extract not in df.column_names
+ else:
+ clause = column_to_extract not in df
+ if clause:
raise base.InvalidDecoratorException(
f"No such column: {column_to_extract} produced by {node_.name}. "
f"It only produced {str(df.columns)}"
diff --git a/hamilton/plugins/daft_extensions.py b/hamilton/plugins/daft_extensions.py
new file mode 100644
index 0000000..0e91ca0
--- /dev/null
+++ b/hamilton/plugins/daft_extensions.py
@@ -0,0 +1,56 @@
+import dataclasses
+from io import BytesIO, IOBase, TextIOWrapper
+from pathlib import Path
+from typing import (
+ Any,
+ BinaryIO,
+ Collection,
+ Dict,
+ List,
+ Mapping,
+ Optional,
+ Sequence,
+ TextIO,
+ Tuple,
+ Type,
+ Union,
+)
+
+try:
+ import daft
+except ImportError:
+ raise NotImplementedError("Daft is not installed.")
+
+
+from hamilton import registry
+from hamilton.io import utils
+from hamilton.io.data_adapters import DataLoader, DataSaver
+
+DATAFRAME_TYPE = daft.dataframe.dataframe.DataFrame
+COLUMN_TYPE = daft.expressions.expressions.Expression
+
+
+@registry.get_column.register(daft.dataframe.dataframe.DataFrame)
+def get_column_daft(
+ df: daft.dataframe.dataframe.DataFrame, column_name: str
+) -> daft.dataframe.dataframe.DataFrame:
+ return df.select(column_name)
+
+
+@registry.fill_with_scalar.register(daft.dataframe.dataframe.DataFrame)
+def fill_with_scalar_daft(
+ df: daft.dataframe.dataframe.DataFrame, column_name: str, scalar_value: Any
+) -> daft.dataframe.dataframe.DataFrame:
+ if not isinstance(scalar_value, daft.expressions.expressions.Expression):
+ scalar_value = [scalar_value]
+ return df.with_column(
+ daft.expressions.expressions.Expression(name=column_name, values=scalar_value)
+ )
+
+
+def register_types():
+ """Function to register the types for this extension."""
+ registry.register_types("daft", DATAFRAME_TYPE, COLUMN_TYPE)
+
+
+register_types()
diff --git a/hamilton/plugins/h_daft.py b/hamilton/plugins/h_daft.py
new file mode 100644
index 0000000..2331f15
--- /dev/null
+++ b/hamilton/plugins/h_daft.py
@@ -0,0 +1,53 @@
+from typing import Any, Dict, Type, Union
+
+import daft
+
+from hamilton import base
+
+
+class DaftDataFrameResult(base.ResultMixin):
+    """A ResultBuilder that produces a Daft dataframe.
+
+    Use this when you want to create a Daft dataframe from the outputs. Caveat: you need to ensure that the length
+ of the outputs is the same, otherwise you will get an error; mixed outputs aren't that well handled.
+
+ To use:
+
+ .. code-block:: python
+
+ from hamilton import base, driver
+        from hamilton.plugins import h_daft
+        daft_builder = h_daft.DaftDataFrameResult()
+        adapter = base.SimplePythonGraphAdapter(daft_builder)
+ dr = driver.Driver(config, *modules, adapter=adapter)
+        df = dr.execute([...], inputs=...) # returns daft dataframe
+
+    Note: this is just a first attempt at something for Daft. Think it should handle more? Come chat/open a PR!
+ """
+
+ def build_result(
+ self, **outputs: Dict[str, Union[daft.dataframe.dataframe.DataFrame, Any]]
+ ) -> daft.DataFrame:
+ """This is the method that Hamilton will call to build the final result. It will pass in the results
+ of the requested outputs that you passed in to the execute() method.
+
+ Note: this function could do smarter things; looking for contributions here!
+
+ :param outputs: The results of the requested outputs.
+        :return: a Daft DataFrame.
+ """
+ if len(outputs) == 1:
+ (value,) = outputs.values() # this works because it's length 1.
+ if isinstance(value, daft.DataFrame): # it's a dataframe
+ return value
+ elif not isinstance(value, daft.DataFrame): # it's a single scalar/object
+ key, value = outputs.popitem()
+ return daft.DataFrame({key: [value]})
+ else: # it's a series
+ return daft.DataFrame(outputs)
+ # TODO: check for length of outputs and determine what should
+ # happen for mixed outputs that include scalars for example.
+ return daft.DataFrame(outputs)
+
+ def output_type(self) -> Type:
+ return daft.DataFrame