{
"cells": [
{
"cell_type": "markdown",
"id": "9b28f411",
"metadata": {},
"source": [
"# Hamilton + Prefect\n",
"\n",
"\n",
"#### Requirements:\n",
"\n",
"- Set up Prefect \n",
"\n",
"- Install dependencies (listed in `requirements.txt`)\n",
"\n",
"More details on how to set up your environment can be found [here](https://github.com/DAGWorks-Inc/hamilton/blob/main/examples/prefect/README.md#prefect-setup).\n",
"\n",
"***\n",
"\n",
"Uncomment and run the cell below if you are in a Google Colab environment. It will:\n",
"1. Mount google drive. You will be asked to authenticate and give permissions.\n",
"2. Change directory to google drive.\n",
"3. Make a directory \"hamilton-tutorials\"\n",
"4. Change directory to it.\n",
"5. Clone this repository to your google drive\n",
"6. Move your current directory to the hello_world example\n",
"7. Install requirements.\n",
"\n",
"This means that any modifications will be saved, and you won't lose them if you close your browser."
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "234ea58e",
"metadata": {},
"outputs": [],
"source": [
"## 1. Mount google drive\n",
"# from google.colab import drive\n",
"# drive.mount('/content/drive')\n",
"## 2. Change directory to google drive.\n",
"# %cd /content/drive/MyDrive\n",
"## 3. Make a directory \"hamilton-tutorials\"\n",
"# !mkdir hamilton-tutorials\n",
"## 4. Change directory to it.\n",
"# %cd hamilton-tutorials\n",
"## 5. Clone this repository to your google drive\n",
"# !git clone https://github.com/DAGWorks-Inc/hamilton/\n",
"## 6. Move your current directory to the hello_world example\n",
"# %cd hamilton/examples/hello_world\n",
"## 7. Install requirements.\n",
"# %pip install -r requirements.txt\n",
"# clear_output() # optionally clear outputs\n",
"# To check your current working directory you can type `!pwd` in a cell and run it."
]
},
{
"cell_type": "markdown",
"id": "b34cd35f",
"metadata": {},
"source": [
"***\n",
"\n",
"In this example, were going to show how to run a simple `data preprocessing -> model training -> model evaluation` workflow using Hamilton within Prefect tasks.\n",
"\n",
"The functions that support this workflow are logically groupped in the modules `prepare_data`, `train_model`, and `evaluate_model` imported below.\n",
"\n",
"***"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "d80a32fb",
"metadata": {},
"outputs": [],
"source": [
"# We use the autoreload extension that comes with ipython to automatically reload modules when\n",
"# the code in them changes.\n",
"\n",
"# import the jupyter extension\n",
"%load_ext autoreload\n",
"# set it to only reload the modules imported\n",
"%autoreload 1\n",
"# import the function modules you want to reload when they change.\n",
"# i.e. these should be your modules you write your functions in. As you change them,\n",
"# they will be reimported without you having to do anything.\n",
"%aimport prepare_data\n",
"%aimport train_model\n",
"%aimport evaluate_model\n",
"\n",
"import pandas as pd\n",
"from prefect import flow, task\n",
"from hamilton import base, driver"
]
},
{
"cell_type": "markdown",
"id": "9ad8f61e",
"metadata": {},
"source": [
"***\n",
"The Prefect workflow has 2 tasks: `prepare_data_task` and `train_and_evaluate_model_task` that defines how/where our modular functions should be executed.\n",
"***"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "78de12b1",
"metadata": {},
"outputs": [],
"source": [
"# use the @task to define Prefect tasks, which adds logging, retries, etc.\n",
"# the function parameters define the config and inputs needed by Hamilton\n",
"@task\n",
"def prepare_data_task(\n",
" raw_data_location: str,\n",
" hamilton_config: dict,\n",
" label: str,\n",
" results_dir: str,\n",
") -> str:\n",
" \"\"\"Load external data, preprocess dataset, and store cleaned data\"\"\"\n",
" raw_df = pd.read_csv(raw_data_location, sep=\";\")\n",
"\n",
" dr = driver.Driver(hamilton_config, prepare_data)\n",
"\n",
" # prepare_data.ALL_FEATURES is a constant defined in the module\n",
" features_df = dr.execute(\n",
" final_vars=prepare_data.ALL_FEATURES + [label],\n",
" inputs={\"raw_df\": raw_df},\n",
" )\n",
" \n",
" # uncomment these lines to produce a local DAG visualization file:\n",
" # dr.visualize_execution(\n",
" # final_vars=prepare_data.ALL_FEATURES + [label],\n",
" # inputs={\"raw_df\": raw_df},\n",
" # output_file_path=\"hamilton_dag\",\n",
" # render_kwargs={\"format\": \"png\"},\n",
" # )\n",
"\n",
" # save results to local file; for prod, save to an S3 bucket instead\n",
" features_path = f\"{results_dir}/features.csv\"\n",
" features_df.to_csv(features_path)\n",
"\n",
" return features_path\n",
"\n",
"\n",
"@task\n",
"def train_and_evaluate_model_task(\n",
" features_path: str,\n",
" hamilton_config: str,\n",
" label: str,\n",
" feature_set: list[str],\n",
" validation_user_ids: list[str],\n",
") -> None:\n",
" \"\"\"Train and evaluate machine learning model\"\"\"\n",
" dr = driver.Driver(\n",
" hamilton_config,\n",
" train_model,\n",
" evaluate_model,\n",
" adapter=base.SimplePythonGraphAdapter(base.DictResult()),\n",
" )\n",
"\n",
" dr.execute(\n",
" final_vars=[\"save_validation_preds\", \"model_results\"],\n",
" inputs=dict(\n",
" features_path=features_path,\n",
" label=label,\n",
" feature_set=feature_set,\n",
" validation_user_ids=validation_user_ids,\n",
" ),\n",
" )"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "2b3e0434",
"metadata": {},
"outputs": [],
"source": [
"# use @flow to define the Prefect flow.\n",
"# the function parameters define the config and inputs needed by all tasks\n",
"# this way, we prevent having constants being hardcoded in the flow or task body\n",
"@flow(\n",
" name=\"hamilton-absenteeism-prediction\",\n",
" description=\"Predict absenteeism using Hamilton and Prefect\",\n",
")\n",
"def absenteeism_prediction_flow(\n",
" raw_data_location: str = \"./data/Absenteeism_at_work.csv\",\n",
" feature_set: list[str] = [\n",
" \"age_zero_mean_unit_variance\",\n",
" \"has_children\",\n",
" \"has_pet\",\n",
" \"is_summer\",\n",
" \"service_time\",\n",
" ],\n",
" label: str = \"absenteeism_time_in_hours\",\n",
" validation_user_ids: list[str] = [\n",
" \"1\",\n",
" \"2\",\n",
" \"4\",\n",
" \"15\",\n",
" \"17\",\n",
" \"24\",\n",
" \"36\",\n",
" ],\n",
"):\n",
" \"\"\"Predict absenteeism using Hamilton and Prefect\n",
"\n",
" The workflow is composed of 2 tasks, each with its own Hamilton driver.\n",
" Notice that the task `prepare_data_task` relies on the Python module `prepare_data.py`,\n",
" while the task `train_and_evaluate_model_task` relies on two Python modules\n",
" `train_model.py` and `evaluate_model.py`.\n",
" \"\"\"\n",
"\n",
" # the task returns the string value `features_path`, by passing this value\n",
" # to the next task, Prefect is able to generate the dependencies graph\n",
" features_path = prepare_data_task(\n",
" raw_data_location=raw_data_location,\n",
" hamilton_config=dict(\n",
" development_flag=True,\n",
" ),\n",
" label=label,\n",
" results_dir=\"./data\",\n",
" )\n",
"\n",
" train_and_evaluate_model_task(\n",
" features_path=features_path,\n",
" hamilton_config=dict(\n",
" development_flag=True,\n",
" task=\"binary_classification\",\n",
" pred_path=\"./data/predictions.csv\",\n",
" model_config={},\n",
" scorer_name=\"accuracy\",\n",
" bootstrap_iter=1000,\n",
" ),\n",
" label=label,\n",
" feature_set=feature_set,\n",
" validation_user_ids=validation_user_ids,\n",
" )"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "be88402d",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\">05:33:15.077 | <span style=\"color: #008080; text-decoration-color: #008080\">INFO</span> | prefect.engine - Created flow run<span style=\"color: #800080; text-decoration-color: #800080\"> 'khaki-griffin'</span> for flow<span style=\"color: #800080; text-decoration-color: #800080; font-weight: bold\"> 'hamilton-absenteeism-prediction'</span>\n",
"</pre>\n"
],
"text/plain": [
"05:33:15.077 | \u001b[36mINFO\u001b[0m | prefect.engine - Created flow run\u001b[35m 'khaki-griffin'\u001b[0m for flow\u001b[1;35m 'hamilton-absenteeism-prediction'\u001b[0m\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\">05:33:15.090 | <span style=\"color: #008080; text-decoration-color: #008080\">INFO</span> | Flow run<span style=\"color: #800080; text-decoration-color: #800080\"> 'khaki-griffin'</span> - View at <span style=\"color: #0000ff; text-decoration-color: #0000ff\">https://app.prefect.cloud/account/c40f6d89-af0f-4c26-9dc3-3c31718ed274/workspace/af0b793e-5fc7-465b-b2e1-fca738c69108/flow-runs/flow-run/511e17c9-4aaf-4524-92b3-189c1279be60</span>\n",
"</pre>\n"
],
"text/plain": [
"05:33:15.090 | \u001b[36mINFO\u001b[0m | Flow run\u001b[35m 'khaki-griffin'\u001b[0m - View at \u001b[94mhttps://app.prefect.cloud/account/c40f6d89-af0f-4c26-9dc3-3c31718ed274/workspace/af0b793e-5fc7-465b-b2e1-fca738c69108/flow-runs/flow-run/511e17c9-4aaf-4524-92b3-189c1279be60\u001b[0m\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\">05:33:15.658 | <span style=\"color: #008080; text-decoration-color: #008080\">INFO</span> | Flow run<span style=\"color: #800080; text-decoration-color: #800080\"> 'khaki-griffin'</span> - Created task run 'prepare_data_task-0' for task 'prepare_data_task'\n",
"</pre>\n"
],
"text/plain": [
"05:33:15.658 | \u001b[36mINFO\u001b[0m | Flow run\u001b[35m 'khaki-griffin'\u001b[0m - Created task run 'prepare_data_task-0' for task 'prepare_data_task'\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\">05:33:15.663 | <span style=\"color: #008080; text-decoration-color: #008080\">INFO</span> | Flow run<span style=\"color: #800080; text-decoration-color: #800080\"> 'khaki-griffin'</span> - Executing 'prepare_data_task-0' immediately...\n",
"</pre>\n"
],
"text/plain": [
"05:33:15.663 | \u001b[36mINFO\u001b[0m | Flow run\u001b[35m 'khaki-griffin'\u001b[0m - Executing 'prepare_data_task-0' immediately...\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\">05:33:16.280 | <span style=\"color: #d7d700; text-decoration-color: #d7d700\">WARNING</span> | hamilton.telemetry - Note: Hamilton collects completely anonymous data about usage. This will help us improve Hamilton over time. See <span style=\"color: #0000ff; text-decoration-color: #0000ff\">https://github.com/dagworks-inc/hamilton#usage-analytics--data-privacy</span> for details.\n",
"</pre>\n"
],
"text/plain": [
"05:33:16.280 | \u001b[38;5;184mWARNING\u001b[0m | hamilton.telemetry - Note: Hamilton collects completely anonymous data about usage. This will help us improve Hamilton over time. See \u001b[94mhttps://github.com/dagworks-inc/hamilton#usage-analytics--data-privacy\u001b[0m for details.\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\">05:33:16.494 | <span style=\"color: #008080; text-decoration-color: #008080\">INFO</span> | Task run 'prepare_data_task-0' - Finished in state <span style=\"color: #008000; text-decoration-color: #008000\">Completed</span>()\n",
"</pre>\n"
],
"text/plain": [
"05:33:16.494 | \u001b[36mINFO\u001b[0m | Task run 'prepare_data_task-0' - Finished in state \u001b[32mCompleted\u001b[0m()\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\">05:33:16.720 | <span style=\"color: #008080; text-decoration-color: #008080\">INFO</span> | Flow run<span style=\"color: #800080; text-decoration-color: #800080\"> 'khaki-griffin'</span> - Created task run 'train_and_evaluate_model_task-0' for task 'train_and_evaluate_model_task'\n",
"</pre>\n"
],
"text/plain": [
"05:33:16.720 | \u001b[36mINFO\u001b[0m | Flow run\u001b[35m 'khaki-griffin'\u001b[0m - Created task run 'train_and_evaluate_model_task-0' for task 'train_and_evaluate_model_task'\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\">05:33:16.725 | <span style=\"color: #008080; text-decoration-color: #008080\">INFO</span> | Flow run<span style=\"color: #800080; text-decoration-color: #800080\"> 'khaki-griffin'</span> - Executing 'train_and_evaluate_model_task-0' immediately...\n",
"</pre>\n"
],
"text/plain": [
"05:33:16.725 | \u001b[36mINFO\u001b[0m | Flow run\u001b[35m 'khaki-griffin'\u001b[0m - Executing 'train_and_evaluate_model_task-0' immediately...\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\">05:33:17.660 | <span style=\"color: #008080; text-decoration-color: #008080\">INFO</span> | Task run 'train_and_evaluate_model_task-0' - Finished in state <span style=\"color: #008000; text-decoration-color: #008000\">Completed</span>()\n",
"</pre>\n"
],
"text/plain": [
"05:33:17.660 | \u001b[36mINFO\u001b[0m | Task run 'train_and_evaluate_model_task-0' - Finished in state \u001b[32mCompleted\u001b[0m()\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\">05:33:17.867 | <span style=\"color: #008080; text-decoration-color: #008080\">INFO</span> | Flow run<span style=\"color: #800080; text-decoration-color: #800080\"> 'khaki-griffin'</span> - Finished in state <span style=\"color: #008000; text-decoration-color: #008000\">Completed</span>('All states completed.')\n",
"</pre>\n"
],
"text/plain": [
"05:33:17.867 | \u001b[36mINFO\u001b[0m | Flow run\u001b[35m 'khaki-griffin'\u001b[0m - Finished in state \u001b[32mCompleted\u001b[0m('All states completed.')\n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `str`')),\n",
" Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`'))]"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"absenteeism_prediction_flow()"
]
},
{
"cell_type": "markdown",
"id": "39201250",
"metadata": {},
"source": [
"***\n",
"For more tips on how to work with Hamilton and Prefect, you can read more [here](https://github.com/DAGWorks-Inc/hamilton/blob/main/examples/prefect/README.md#tips)."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "hamilton",
"language": "python",
"name": "hamilton"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.3"
}
},
"nbformat": 4,
"nbformat_minor": 5
}