WIP

This doesn't work because:

1. Daft doesn't support column-level operations.
2. We still need to implement `with_columns`.
diff --git a/examples/daft/hello_world/functions.py b/examples/daft/hello_world/functions.py
new file mode 100644
index 0000000..4d7e1fa
--- /dev/null
+++ b/examples/daft/hello_world/functions.py
@@ -0,0 +1,56 @@
+import daft
+from daft import DataType, col, udf
+
+from hamilton.function_modifiers import extract_columns
+
+
+# @extract_columns("signups", "spend")
+def base_df(base_df_location: str) -> daft.DataFrame:
+    """Loads base dataframe of data.
+
+    :param base_df_location: unused; just showing that we could load this from a file...
+    :return: a Daft dataframe with "signups" and "spend" columns.
+    """
+    return daft.from_pydict(
+        {
+            "signups": [1, 10, 50, 100, 200, 400],
+            "spend": [10, 10, 20, 40, 40, 50],
+        }
+    )
+
+
+def avg_3wk_spend(base_df: daft.DataFrame) -> daft.DataFrame:
+    """Computes rolling 3 week average spend. TODO: not implemented yet -- currently returns None."""
+    # Abandoned attempt: base_df.with_column("avg_3wk_spend", (col("spend").agg("sum")), "sum")]
+    # Intended logic (column-level form): return spend.rolling_mean(3)
+
+
+def spend_per_signup(base_df: daft.DataFrame) -> daft.DataFrame:
+    """Returns base_df with an added "spend_per_signup" column computed as spend / signups."""
+
+    return base_df.with_column("spend_per_signup", base_df["spend"] / base_df["signups"])
+
+
+def spend_mean(base_df: daft.DataFrame) -> daft.DataFrame:
+    """Shows function creating a scalar: the mean of the spend column, broadcast as a "spend_mean" column."""
+    return base_df.with_column("spend_mean", daft.lit(base_df.agg([(col("spend"), "mean")]).to_pydict()["spend"][0]))
+
+
+def spend_zero_mean(spend_mean: daft.DataFrame) -> daft.DataFrame:
+    """Adds "spend_zero_mean" = spend - spend_mean; the input must carry both of those columns."""
+    return spend_mean.with_column("spend_zero_mean", spend_mean["spend"] - spend_mean["spend_mean"])
+
+
+def spend_std_dev(base_df: daft.DataFrame) -> daft.DataFrame:
+    """Adds the standard deviation of spend as "spend_std_dev". NOTE(review): confirm daft exposes .std()."""
+    return base_df.with_column("spend_std_dev", base_df["spend"].std())
+
+
+def spend_zero_mean_unit_variance(
+    spend_zero_mean: daft.DataFrame, spend_std_dev: daft.DataFrame
+) -> daft.DataFrame:
+    """Adds "spend_zero_mean_unit_variance" = spend_zero_mean / std dev taken from the spend_std_dev frame."""
+    std_dev = spend_std_dev.to_pydict()["spend_std_dev"][0]  # constant column -> plain scalar
+    return spend_zero_mean.with_column(
+        "spend_zero_mean_unit_variance", spend_zero_mean["spend_zero_mean"] / daft.lit(std_dev)
+    )
diff --git a/examples/daft/hello_world/run.py b/examples/daft/hello_world/run.py
new file mode 100644
index 0000000..f48ea80
--- /dev/null
+++ b/examples/daft/hello_world/run.py
@@ -0,0 +1,70 @@
+import logging
+import sys
+from typing import Any, Dict, Type, Union
+
+import daft
+
+from hamilton import base, driver
+
+logging.basicConfig(stream=sys.stdout)
+
+
+class DaftDataFrameResult(base.ResultMixin):
+    def build_result(
+        self, **outputs: Dict[str, Union[daft.dataframe.dataframe.DataFrame, Any]]
+    ) -> daft.dataframe.dataframe.DataFrame:
+        """This is the method that Hamilton will call to build the final result. It will pass in the results
+        of the requested outputs that you passed in to the execute() method.
+
+        A single DataFrame output is returned as is; a single scalar/object becomes a one-row DataFrame.
+
+        :param outputs: The results of the requested outputs.
+        :return: a daft DataFrame.
+        """
+        # TODO:
+        # do dataframe to dataframe
+        # cannot pull expressions
+        # do single column dataframes
+        #
+        if len(outputs) == 1:
+            (value,) = outputs.values()  # this works because it's length 1.
+            if isinstance(value, daft.dataframe.dataframe.DataFrame):  # it's a dataframe
+                return value
+            elif not isinstance(
+                value, daft.expressions.expressions.Expression
+            ):  # it's a single scalar/object
+                key, value = outputs.popitem()
+                return daft.from_pydict({key: [value]})  # same constructor as the multi-output case
+            else:  # a bare expression has no source dataframe to evaluate against
+                raise ValueError("Cannot build a daft DataFrame from a bare Expression output.")
+        # TODO: check for length of outputs and determine what should
+        # happen for mixed outputs that include scalars for example.
+        return daft.from_pydict(outputs)
+
+    def output_type(self) -> Type:
+        return daft.dataframe.dataframe.DataFrame
+
+
+# Create a driver instance that assembles results with the daft result builder defined above.
+adapter = base.SimplePythonGraphAdapter(result_builder=DaftDataFrameResult())
+config = {
+    "base_df_location": "dummy_value",
+}
+import functions  # where our functions are defined
+
+dr = driver.Driver(config, functions, adapter=adapter)
+# note -- currently the result builder does not handle mixed outputs, e.g. dataframes and scalars.
+output_columns = [
+    "base_df",
+    # "signups",
+    "avg_3wk_spend",
+    "spend_per_signup",
+    "spend_zero_mean_unit_variance",
+]
+# let's create the dataframe!
+df = dr.execute(output_columns)
+print(df)
+
+# To visualize do `pip install "sf-hamilton[visualization]"` if you want these to work
+# dr.visualize_execution(output_columns, './daft', {"format": "png"})
+# dr.display_all_functions('./my_full_dag.dot')
diff --git a/hamilton/function_modifiers/base.py b/hamilton/function_modifiers/base.py
index baa5803..f8eab74 100644
--- a/hamilton/function_modifiers/base.py
+++ b/hamilton/function_modifiers/base.py
@@ -33,6 +33,7 @@
         "xgboost",
         "lightgbm",
         "sklearn_plot",
+        "daft",
     ]
     for plugin_module in plugins_modules:
         try:
diff --git a/hamilton/function_modifiers/expanders.py b/hamilton/function_modifiers/expanders.py
index b0fdfea..b24f228 100644
--- a/hamilton/function_modifiers/expanders.py
+++ b/hamilton/function_modifiers/expanders.py
@@ -663,7 +663,11 @@
                     column_to_extract: str = column, **kwargs
                 ) -> Any:  # avoiding problems with closures
                     df = kwargs[node_.name]
-                    if column_to_extract not in df:
+                    if hasattr(df, "column_names"):
+                        clause = column_to_extract not in df.column_names
+                    else:
+                        clause = column_to_extract not in df
+                    if clause:
                         raise base.InvalidDecoratorException(
                             f"No such column: {column_to_extract} produced by {node_.name}. "
                             f"It only produced {str(df.columns)}"
diff --git a/hamilton/plugins/daft_extensions.py b/hamilton/plugins/daft_extensions.py
new file mode 100644
index 0000000..0e91ca0
--- /dev/null
+++ b/hamilton/plugins/daft_extensions.py
@@ -0,0 +1,56 @@
+import dataclasses
+from io import BytesIO, IOBase, TextIOWrapper
+from pathlib import Path
+from typing import (
+    Any,
+    BinaryIO,
+    Collection,
+    Dict,
+    List,
+    Mapping,
+    Optional,
+    Sequence,
+    TextIO,
+    Tuple,
+    Type,
+    Union,
+)
+
+try:
+    import daft
+except ImportError:
+    raise NotImplementedError("Daft is not installed.")
+
+
+from hamilton import registry
+from hamilton.io import utils
+from hamilton.io.data_adapters import DataLoader, DataSaver
+
+DATAFRAME_TYPE = daft.dataframe.dataframe.DataFrame
+COLUMN_TYPE = daft.expressions.expressions.Expression
+
+
+@registry.get_column.register(daft.dataframe.dataframe.DataFrame)
+def get_column_daft(
+    df: daft.dataframe.dataframe.DataFrame, column_name: str
+) -> daft.dataframe.dataframe.DataFrame:
+    return df.select(column_name)  # the "column" is a DataFrame restricted to column_name
+
+
+@registry.fill_with_scalar.register(daft.dataframe.dataframe.DataFrame)
+def fill_with_scalar_daft(
+    df: daft.dataframe.dataframe.DataFrame, column_name: str, scalar_value: Any
+) -> daft.dataframe.dataframe.DataFrame:
+    """Returns df with column_name set to scalar_value (an Expression is used as is) on every row."""
+    if not isinstance(scalar_value, daft.expressions.expressions.Expression):
+        # Wrap plain Python values in a literal expression so daft can broadcast them.
+        scalar_value = daft.lit(scalar_value)
+    return df.with_column(column_name, scalar_value)
+
+
+def register_types():
+    """Registers DATAFRAME_TYPE and COLUMN_TYPE with the Hamilton registry under the "daft" key."""
+    registry.register_types("daft", DATAFRAME_TYPE, COLUMN_TYPE)
+
+
+register_types()
diff --git a/hamilton/plugins/h_daft.py b/hamilton/plugins/h_daft.py
new file mode 100644
index 0000000..2331f15
--- /dev/null
+++ b/hamilton/plugins/h_daft.py
@@ -0,0 +1,53 @@
+from typing import Any, Dict, Type, Union
+
+import daft
+
+from hamilton import base
+
+
+class DaftDataFrameResult(base.ResultMixin):
+    """A ResultBuilder that produces a daft dataframe.
+
+    Use this when you want to create a daft dataframe from the outputs. Caveat: you need to ensure that the length
+    of the outputs is the same, otherwise you will get an error; mixed outputs aren't that well handled.
+
+    To use:
+
+    .. code-block:: python
+
+        from hamilton import base, driver
+        from hamilton.plugins import h_daft
+        daft_builder = h_daft.DaftDataFrameResult()
+        adapter = base.SimplePythonGraphAdapter(daft_builder)
+        dr =  driver.Driver(config, *modules, adapter=adapter)
+        df = dr.execute([...], inputs=...)  # returns daft dataframe
+
+    Note: this is just a first attempt at something for Daft. Think it should handle more? Come chat/open a PR!
+    """
+
+    def build_result(
+        self, **outputs: Dict[str, Union[daft.dataframe.dataframe.DataFrame, Any]]
+    ) -> daft.DataFrame:
+        """This is the method that Hamilton will call to build the final result. It will pass in the results
+        of the requested outputs that you passed in to the execute() method.
+
+        A single DataFrame output is returned as is; a single scalar/object becomes a one-row DataFrame.
+
+        :param outputs: The results of the requested outputs.
+        :return: a daft DataFrame.
+        """
+        if len(outputs) == 1:
+            (value,) = outputs.values()  # this works because it's length 1.
+            if isinstance(value, daft.DataFrame):  # it's a dataframe
+                return value
+            elif not isinstance(value, daft.expressions.expressions.Expression):  # a scalar/object
+                key, value = outputs.popitem()
+                return daft.from_pydict({key: [value]})
+            else:  # a bare expression has no source dataframe to evaluate against
+                raise ValueError("Cannot build a daft DataFrame from a bare Expression output.")
+        # TODO: check for length of outputs and determine what should
+        # happen for mixed outputs that include scalars for example.
+        return daft.from_pydict(outputs)
+
+    def output_type(self) -> Type:
+        return daft.DataFrame