WIP: placeholder places for hooks
diff --git a/hamilton/driver.py b/hamilton/driver.py
index b9f0652..b5f5379 100644
--- a/hamilton/driver.py
+++ b/hamilton/driver.py
@@ -160,6 +160,7 @@
) -> Dict[str, Any]:
"""Basic executor for a function graph. Does no task-based execution, just does a DFS
and executes the graph in order, in memory."""
+ print("OnTaskStart(run_id, task_id, nodes, inputs, overrides]")
memoized_computation = dict() # memoized storage
nodes = [fg.nodes[node_name] for node_name in final_vars]
fg.execute(nodes, memoized_computation, overrides, inputs)
@@ -167,6 +168,8 @@
final_var: memoized_computation[final_var] for final_var in final_vars
} # only want request variables in df.
del memoized_computation # trying to cleanup some memory
+ print("OnTaskEnd(run_id, task_id, status)") # we skip adapter
+ # TODO: try catch to capture life cycle hook? (or do that elsewhere?)
return outputs
@@ -288,6 +291,7 @@
self.graph_modules = modules
try:
self.graph = graph.FunctionGraph.from_modules(*modules, config=config, adapter=adapter)
+ print("OnGraphBuild(self.graph, modules, config)")
self.adapter = adapter
except Exception as e:
error = telemetry.sanitize_error(*sys.exc_info())
@@ -403,6 +407,9 @@
error = None
_final_vars = self._create_final_vars(final_vars)
try:
+ print(
+ "OnStartExecution(run_id, final_vars, overrides, inputs) # run tags via other means"
+ )
outputs = self.raw_execute(_final_vars, overrides, display_graph, inputs=inputs)
result = self.adapter.build_result(**outputs)
return result
@@ -413,6 +420,7 @@
raise e
finally:
duration = time.time() - start_time
+ print("OnEndExecution(run_id, status)")
self.capture_execute_telemetry(
error, _final_vars, inputs, overrides, run_successful, duration
)
diff --git a/hamilton/execution/executors.py b/hamilton/execution/executors.py
index e98bc37..7541c63 100644
--- a/hamilton/execution/executors.py
+++ b/hamilton/execution/executors.py
@@ -90,6 +90,7 @@
:param task: task to execute.
:return: a diciontary of the results of all the nodes in that task's nodes to compute.
"""
+ print("OnTaskStart(run_id, task_id)")
# We do this as an edge case to force the callable to return a list if it is an expand,
# and would normally return a generator. That said, we will likely remove this in the future --
# its an implementation detail, and a true queuing system between nodes/controller would mean
@@ -114,6 +115,7 @@
for key, value in out.items()
if key in task.outputs_to_compute or key in task.overrides
}
+ print("OnTaskEnd(run_id, task_id, status)")
return final_retval
@@ -344,8 +346,10 @@
task_executor = execution_manager.get_executor_for_task(next_task)
if task_executor.can_submit_task():
try:
+ print("OnTaskCreate(run_id, task_id, nodes)")
submitted = task_executor.submit_task(next_task)
except Exception as e:
+ print("OnTaskEnd(run_id, task_id, status=Failure)")
logger.exception(
f"Exception submitting task {next_task.task_id}, with nodes: "
f"{[item.name for item in next_task.nodes]}"
@@ -364,6 +368,7 @@
result = task_future.get_result()
execution_state.update_task_state(task_name, state, result)
if TaskState.is_terminal(state):
+ print("OnTaskEnd(run_id, task_id, status=?)")
del task_futures[task_name]
logger.info(f"Graph is done, graph state is {execution_state.get_graph_state()}")
finally:
diff --git a/hamilton/execution/graph_functions.py b/hamilton/execution/graph_functions.py
index fcdd91e..c6bf7f5 100644
--- a/hamilton/execution/graph_functions.py
+++ b/hamilton/execution/graph_functions.py
@@ -144,12 +144,17 @@
if dependency.name in computed:
kwargs[dependency.name] = computed[dependency.name]
try:
+ print("OnNodeStart(run_id, node, kwargs)")
+ print("compute_node hook.")
value = adapter.execute_node(node_, kwargs)
+ print("OnNodeEnd(run_id, node, value, status=SUCCESS)")
except Exception:
message = f"> Node {node_.name} encountered an error <"
border = "*" * len(message)
logger.exception("\n" + border + "\n" + message + "\n" + border)
+ print("OnNodeEnd(run_id, node, status=Failure)")
raise
+
computed[node_.name] = value
# > pruning the graph
# This doesn't narrow it down to the entire space of the graph
diff --git a/hamilton/graph.py b/hamilton/graph.py
index ac9fc17..e72b44c 100644
--- a/hamilton/graph.py
+++ b/hamilton/graph.py
@@ -854,6 +854,7 @@
if inputs is None:
inputs = {}
inputs = combine_config_and_inputs(self.config, inputs)
+ # prune graph
return execute_subdag(
nodes=nodes,
inputs=inputs,