Adds read_after_write flag
With PySpark, rather than calling persist() on a result, this lets you tell Spark
to read the result back from the cache as soon as it has been written.
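
For illustration, a minimal usage sketch with PySpark; the parquet reader/writer pair
and the cache path below are hypothetical and only follow the callable signatures
shown in this diff, they are not part of the patch itself:

```python
# Hypothetical sketch: register a parquet reader/writer pair and enable
# read_after_write so each cached result is immediately read back, giving
# Spark a plan rooted at the cached files rather than the upstream lineage
# (similar in spirit to persist()).
from pyspark.sql import DataFrame, SparkSession

from hamilton.experimental.h_cache import CachingGraphAdapter

spark = SparkSession.builder.getOrCreate()


def write_parquet(data: DataFrame, filepath: str, name: str) -> None:
    # Matches the writer signature Callable[[Any, str, str], None] from the diff.
    data.write.mode("overwrite").parquet(filepath)


def read_parquet(data: DataFrame, filepath: str) -> DataFrame:
    # Matches the reader signature Callable[[Any, str], Any] from the diff;
    # returning the freshly read DataFrame replaces the lazily built one.
    return spark.read.parquet(filepath)


adapter = CachingGraphAdapter(
    "/tmp/hamilton_cache",  # hypothetical cache path
    writers={"parquet": write_parquet},
    readers={"parquet": read_parquet},
    read_after_write=True,  # the flag added in this diff
)
```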
diff --git a/hamilton/experimental/h_cache.py b/hamilton/experimental/h_cache.py
index 18b24ba..ba0a74c 100644
--- a/hamilton/experimental/h_cache.py
+++ b/hamilton/experimental/h_cache.py
@@ -298,6 +298,7 @@
writers: Optional[Dict[str, Callable[[Any, str, str], None]]] = None,
readers: Optional[Dict[str, Callable[[Any, str], Any]]] = None,
read_kwargs: Optional[Dict[str, Any]] = None,
+ read_after_write: bool = False,
**kwargs,
):
"""Constructs the adapter.
@@ -318,7 +319,7 @@
self.readers = readers or {}
self.read_kwargs = read_kwargs or {}
-
+ self.read_after_write = read_after_write
self._init_default_readers_writers()
def _init_default_readers_writers(self):
@@ -385,6 +386,10 @@
cache_format,
)
self._write_cache(cache_format, result, filepath, node.name)
+ if self.read_after_write:
+ # Re-reading the cached value can be useful for delayed-execution frameworks
+ # (e.g. Spark) as a way to reset any state they have set up internally.
+ result = self._read_cache(cache_format, result, filepath)
self.computed_nodes.add(node.name)
return result
empty_expected_type = self._get_empty_expected_type(node.type)