Customizing Execution
----------------------------------
The Driver
----------------------------------
The Hamilton Driver by default has the following behaviors:
#. It is single-threaded and runs on the machine you call ``execute()`` from.
#. It is limited to the memory available on your machine.
#. ``execute()`` by default returns a pandas DataFrame.
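For example, here is a minimal sketch of the default behavior (the module and output names are hypothetical):

.. code-block:: python

    from hamilton import driver

    import my_functions  # hypothetical module containing your Hamilton functions

    config = {}  # any configuration your functions need
    dr = driver.Driver(config, my_functions)  # single-threaded, in-memory
    # By default Hamilton tries to assemble the requested outputs into a pandas DataFrame.
    df = dr.execute(["output_column_a", "output_column_b"])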
To change these behaviors, we need to introduce two concepts:
#. A Result Builder -- this is how we tell Hamilton what kind of object we want to return when we call ``execute()``.
#. A Graph Adapter -- this is how we tell Hamilton where and how functions should be executed.
Result Builders
###############
In effect, this is a class with a static function that takes a dictionary of computed results and turns it into
something.
.. code-block:: python

    class ResultMixin(object):
        """Base class housing the static function.

        Why a static function? That's because certain frameworks can only pickle a static function, not an entire
        object.
        """

        @staticmethod
        @abc.abstractmethod
        def build_result(**outputs: typing.Dict[str, typing.Any]) -> typing.Any:
            """This function builds the result given the computed values."""
            pass
We have a few implementations; see :doc:`../../reference/result-builders/index` for the list.
To use one, it needs to be paired with a Graph Adapter -- on to the next section!
Graph Adapters
##############
Graph Adapters *adapt* the Hamilton DAG and change how it is executed. They all implement a single interface called
``base.HamiltonGraphAdapter``, and they are called internally by Hamilton at the right points in time to make execution
work. The link with Result Builders is that Graph Adapters need to implement a ``build_result()`` function
themselves.
.. code-block:: python

    class HamiltonGraphAdapter(ResultMixin):
        """Any GraphAdapters should implement this interface to adapt the HamiltonGraph for that particular context.

        Note since it inherits ResultMixin -- HamiltonGraphAdapters need a `build_result` function too.
        """

        # four functions not shown
The default Graph Adapter is ``base.SimplePythonDataFrameGraphAdapter``, which makes Hamilton try to build
a ``pandas.DataFrame`` when ``.execute()`` is called.
If you want to tell Hamilton to return something else, we suggest starting with ``base.SimplePythonGraphAdapter``,
writing a simple class with a ``build_result()`` function that implements the ``base.ResultMixin`` interface, and passing an instance of it in. See
:doc:`../reference/graph-adapters/index` and
:doc:`../reference/result-builders/index` for options.
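For example, here is a sketch of a custom result builder -- the class, module, and output names are illustrative, not part of the library:

.. code-block:: python

    import typing

    from hamilton import base, driver

    import my_functions  # hypothetical module containing your Hamilton functions

    class TupleResult(base.ResultMixin):
        """Illustrative result builder that returns the computed outputs as a tuple."""

        @staticmethod
        def build_result(**outputs: typing.Any) -> tuple:
            return tuple(outputs.values())

    adapter = base.SimplePythonGraphAdapter(TupleResult())
    dr = driver.Driver({}, my_functions, adapter=adapter)
    result = dr.execute(["a", "b"])  # a tuple with the computed values for "a" and "b"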
Otherwise, let's quickly walk through some options on how to execute a Hamilton DAG.
Local Execution
***************
You have two options:
#. Do nothing -- and you'll get ``base.SimplePythonDataFrameGraphAdapter`` by default.
#. Use ``base.SimplePythonGraphAdapter`` and pass in a subclass of ``base.ResultMixin`` (you can create your own), and then pass that to the constructor of the Driver.
e.g.
.. code-block:: python

    adapter = base.SimplePythonGraphAdapter(base.DictResult())
    dr = driver.Driver(..., adapter=adapter)
By passing in ``base.DictResult()`` we are telling Hamilton that the result of ``execute()`` should be a dictionary
mapping output name to computed result.
Note that the above is the most common method of executing Hamilton DAGs. You can also use ``base.DefaultAdapter``
to get a ``SimplePythonGraphAdapter`` with a ``DictResult``.
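For example (reusing the hypothetical ``my_functions`` module from earlier):

.. code-block:: python

    adapter = base.DefaultAdapter()  # equivalent to base.SimplePythonGraphAdapter(base.DictResult())
    dr = driver.Driver({}, my_functions, adapter=adapter)
    result = dr.execute(["a", "b"])  # -> {"a": ..., "b": ...}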
Dynamic DAGs/Parallel Execution
----------------------------------
Hamilton now has pluggable execution, which allows for the following:
1. Grouping of nodes into "tasks" (discrete execution units between serialization boundaries)
2. Executing the tasks in parallel, using any executor of your choice
You can set this up using the ``Builder``, a utility class that allows you to build a driver piece by piece.
Note that you currently have to call ``enable_dynamic_execution(allow_experimental_mode=True)``,
which will toggle the driver to use the ``V2`` executor. Then, you can:
1. Add task executors to specify how to run the tasks
2. Add node grouping strategies
3. Add modules to crawl for functions
4. Add a results builder to shape the results
Either constructing the ``Driver`` directly, or using the builder and *not* calling ``enable_dynamic_execution``, will give you the standard executor.
We highly recommend you use the builder pattern -- while the constructor of the ``Driver`` will remain fully
backwards compatible according to the rules of semantic versioning, we may change it in the future (for 2.0).
Note that the new executor is required to handle dynamic creation of nodes (e.g. using ``Parallelizable[]`` and ``Collect[]``).
Let's look at an example of the driver:
.. code-block:: python

    from my_code import foo_module

    from hamilton import driver
    from hamilton.execution import executors

    dr = (
        driver.Builder()
        .with_modules(foo_module)
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_config({"config_key": "config_value"})
        .with_local_executor(executors.SynchronousLocalTaskExecutor())
        .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))
        .build()
    )
    dr.execute(["my_variable"], inputs={...}, overrides={...})
Note that we set both a ``remote`` executor and a ``local`` executor. You can bypass these and instead set an ``execution_manager``
in the builder call (see :doc:`../reference/drivers/Driver` for documentation on the ``Builder``). These executors go along with the default grouping strategy,
which is to place each node in its own group, except for
dynamically generated (``Parallelizable[]``) blocks, which are each made into one group and run on the remote executor, while the remaining nodes run on the local executor.
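If you want more control over which executor each task goes to, you can pass an execution manager instead. This is a hedged sketch -- ``DefaultExecutionManager`` and the ``with_execution_manager`` builder method are our assumptions about the API here; check the ``Builder`` reference above for the exact names and signatures:

.. code-block:: python

    from hamilton import driver
    from hamilton.execution import executors

    # Assumed API: an execution manager that routes each task to a local or remote executor.
    execution_manager = executors.DefaultExecutionManager(
        local_executor=executors.SynchronousLocalTaskExecutor(),
        remote_executor=executors.MultiProcessingExecutor(max_tasks=5),
    )
    dr = (
        driver.Builder()
        .with_modules(foo_module)  # as in the example above
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_execution_manager(execution_manager)  # replaces with_local_executor/with_remote_executor
        .build()
    )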
Thus, with the default grouping strategy, when you write a DAG like this (a simple map-reduce pattern):
.. code-block:: python

    from hamilton.htypes import Collect, Parallelizable

    def url() -> Parallelizable[str]:
        for url_ in _list_all_urls():
            yield url_

    def url_loaded(url: str) -> str:
        return _load(url)

    def counts(url_loaded: str) -> int:
        return len(url_loaded.split(" "))

    def total_words(counts: Collect[int]) -> int:
        return sum(counts)
The block containing ``url_loaded`` and ``counts`` will get marked as one task, repeated for each URL yielded by ``url``,
and run on the remote executor (which in this case is the ``MultiProcessingExecutor``).
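To run this end to end, you would build the driver as before (the module name is hypothetical):

.. code-block:: python

    from hamilton import driver
    from hamilton.execution import executors

    import word_count_module  # hypothetical module containing url, url_loaded, counts, and total_words

    dr = (
        driver.Builder()
        .with_modules(word_count_module)
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))
        .build()
    )
    result = dr.execute(["total_words"])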
Note that we currently have the following caveats:
1. No nested ``Parallelizable[]``/``Collect[]`` blocks -- we only allow one level of parallelization.
2. Serialization for multiprocessing is suboptimal -- we currently use the default ``pickle`` serializer, which breaks in certain cases. Ray, Dask, etc... all work well, and we plan to add support for joblib + cloudpickle serialization.
3. ``Collect[]`` input types are limited to one per function -- this is another caveat that we intend to get rid of, but for now you'll want to concat/put everything into one function before collecting (see the sketch below).
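For the third caveat, the workaround is to bundle everything you need into a single node before collecting. A sketch, using illustrative function names on top of the example above:

.. code-block:: python

    from hamilton.htypes import Collect

    def url_stats(counts: int, url_loaded: str) -> dict:
        """Bundles the per-URL values into one object, so only a single node needs to be collected."""
        return {"words": counts, "characters": len(url_loaded)}

    def all_stats(url_stats: Collect[dict]) -> list:
        """Collects the bundled per-URL stats into one list."""
        return list(url_stats)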