"""
A basic script to run the pipeline defined in `doc_pipeline.py`.
By default this runs parts of the pipeline in parallel using threads or processes.
To choose threads or processes, uncomment the appropriate line in the `Builder` below.
To scale processing here, see `run_ray.py`, `run_dask.py`, and `spark/spark_pipeline.py`.
"""
import doc_pipeline
from hamilton import driver
from hamilton.execution import executors

if __name__ == "__main__":
    dr = (
        driver.Builder()
        .with_modules(doc_pipeline)
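        # Dynamic execution (experimental) is required to run the
        # Parallelizable/Collect parts of doc_pipeline.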
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_config({})
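        # The local executor runs the sequential (non-parallel) nodes in-process.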
        .with_local_executor(executors.SynchronousLocalTaskExecutor())
        # Choose a backend to process the parallel parts of the pipeline
        .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=5))
        # .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))
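        # Note: the multiprocessing executor requires task inputs and outputs to
        # be picklable, so prefer threads if your objects don't serialize cleanly.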
        .build()
    )
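    # Save a visualization of the full DAG to pipeline.png (requires graphviz).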
    dr.display_all_functions("pipeline.png")
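    # Request the final collected node; chunk_size and chunk_overlap are fed
    # into the chunking nodes as pipeline inputs.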
    result = dr.execute(
        ["collect_chunked_url_text"],
        inputs={"chunk_size": 256, "chunk_overlap": 32},
    )
    # do something with the result...
    import pprint

    for chunk in result["collect_chunked_url_text"]:
        pprint.pprint(chunk)