| { |
| "cells": [ |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "## A basic notebook to run the pipeline defined in `doc_pipeline.py`.\n", |
| "\n", |
| "By default this runs parts of the pipeline in parallel using threads or processes.\n", |
| "\n", |
| "To scale processing here look at all the subsequent cells that show how to run on \n", |
| " ray or dask. For spark see spark/notebook.ipynb." |
| ], |
| "metadata": { |
| "collapsed": false |
| }, |
| "id": "c174ce5a23eed9a1" |
| }, |
| { |
| "cell_type": "code", |
| "outputs": [], |
| "source": [ |
| "import doc_pipeline\n", |
| "\n", |
| "from hamilton import driver\n", |
| "from hamilton.execution import executors\n", |
| "\n", |
| "dr = (\n", |
| " driver.Builder()\n", |
| " .with_modules(doc_pipeline)\n", |
| " .enable_dynamic_execution(allow_experimental_mode=True)\n", |
| " .with_config({})\n", |
| " .with_local_executor(executors.SynchronousLocalTaskExecutor())\n", |
| " # Choose a backend to process the parallel parts of the pipeline\n", |
| " .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=5))\n", |
| " # .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))\n", |
| " .build()\n", |
| ")\n", |
| "dag = dr.display_all_functions()\n", |
| "result = dr.execute(\n", |
| " [\"collect_chunked_url_text\"],\n", |
| " inputs={\"chunk_size\": 256, \"chunk_overlap\": 32},\n", |
| ")\n", |
| "# do something with the result...\n", |
| "import pprint\n", |
| "\n", |
| "for chunk in result[\"collect_chunked_url_text\"]:\n", |
| " pprint.pprint(chunk)\n", |
| "dag" |
| ], |
| "metadata": { |
| "collapsed": true |
| }, |
| "id": "initial_id", |
| "execution_count": 0 |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "# Ray" |
| ], |
| "metadata": { |
| "collapsed": false |
| }, |
| "id": "7bc40e6914aed330" |
| }, |
| { |
| "cell_type": "code", |
| "outputs": [], |
| "source": [ |
| "import logging\n", |
| "\n", |
| "import doc_pipeline\n", |
| "import ray\n", |
| "\n", |
| "from hamilton import driver, log_setup\n", |
| "from hamilton.plugins import h_ray\n", |
| "\n", |
| "log_setup.setup_logging(logging.INFO)\n", |
| "ray.init()\n", |
| "\n", |
| "dr = (\n", |
| " driver.Builder()\n", |
| " .with_modules(doc_pipeline)\n", |
| " .enable_dynamic_execution(allow_experimental_mode=True)\n", |
| " .with_config({})\n", |
| " # Choose a backend to process the parallel parts of the pipeline\n", |
| " # .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=5))\n", |
| " # .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))\n", |
| " .with_remote_executor(\n", |
| " h_ray.RayTaskExecutor()\n", |
| " ) # be sure to run ray.init() or pass in config.\n", |
| " .build()\n", |
| ")\n", |
| "dag = dr.display_all_functions()\n", |
| "result = dr.execute(\n", |
| " [\"collect_chunked_url_text\"],\n", |
| " inputs={\"chunk_size\": 256, \"chunk_overlap\": 32},\n", |
| ")\n", |
| "# do something with the result...\n", |
| "import pprint\n", |
| "\n", |
| "for chunk in result[\"collect_chunked_url_text\"]:\n", |
| " pprint.pprint(chunk)\n", |
| "\n", |
| "ray.shutdown()\n", |
| "dag" |
| ], |
| "metadata": { |
| "collapsed": false |
| }, |
| "id": "a4df6e50283f68ab" |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "# Dask" |
| ], |
| "metadata": { |
| "collapsed": false |
| }, |
| "id": "46aa4763a337dcb1" |
| }, |
| { |
| "cell_type": "code", |
| "outputs": [], |
| "source": [ |
| "import logging\n", |
| "\n", |
| "import doc_pipeline\n", |
| "from dask import distributed\n", |
| "\n", |
| "from hamilton import driver, log_setup\n", |
| "from hamilton.plugins import h_dask\n", |
| "\n", |
| "log_setup.setup_logging(logging.INFO)\n", |
| "\n", |
| "cluster = distributed.LocalCluster()\n", |
| "client = distributed.Client(cluster)\n", |
| "remote_executor = h_dask.DaskExecutor(client=client)\n", |
| "\n", |
| "dr = (\n", |
| " driver.Builder()\n", |
| " .with_modules(doc_pipeline)\n", |
| " .enable_dynamic_execution(allow_experimental_mode=True)\n", |
| " .with_config({})\n", |
| " # Choose a backend to process the parallel parts of the pipeline\n", |
| " # .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=5))\n", |
| " # .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))\n", |
| " .with_remote_executor(h_dask.DaskExecutor(client=client))\n", |
| " .build()\n", |
| ")\n", |
| "dag = dr.display_all_functions()\n", |
| "result = dr.execute(\n", |
| " [\"collect_chunked_url_text\"],\n", |
| " inputs={\"chunk_size\": 256, \"chunk_overlap\": 32},\n", |
| ")\n", |
| "# do something with the result...\n", |
| "import pprint\n", |
| "\n", |
| "for chunk in result[\"collect_chunked_url_text\"]:\n", |
| " pprint.pprint(chunk)\n", |
| "\n", |
| "client.shutdown()\n", |
| "dag" |
| ], |
| "metadata": { |
| "collapsed": false |
| }, |
| "id": "103824eec22810fe" |
| } |
| ], |
| "metadata": { |
| "kernelspec": { |
| "display_name": "Python 3", |
| "language": "python", |
| "name": "python3" |
| }, |
| "language_info": { |
| "codemirror_mode": { |
| "name": "ipython", |
| "version": 2 |
| }, |
| "file_extension": ".py", |
| "mimetype": "text/x-python", |
| "name": "python", |
| "nbconvert_exporter": "python", |
| "pygments_lexer": "ipython2", |
| "version": "2.7.6" |
| } |
| }, |
| "nbformat": 4, |
| "nbformat_minor": 5 |
| } |