| { |
| "cells": [ |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "cb3cf1fc", |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Execute this cell to install dependencies\n", |
| "%pip install sf-hamilton[sdk,visualization] falkordb openai" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "4682d46e", |
| "metadata": {}, |
| "source": [ |
| "# Ingestion Notebook [](https://colab.research.google.com/github/dagworks-inc/hamilton/blob/main/examples/LLM_Workflows/GraphRAG/ingest_notebook.ipynb) [](https://github.com/dagworks-inc/hamilton/blob/main/examples/LLM_Workflows/GraphRAG/ingest_notebook.ipynb)\n", |
| "\n", |
| "\n", |
| "In this notebook we see how to load data into FalkorDB using [Hamilton](https://github.com/dagWorks-Inc/hamilton)." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 4, |
| "id": "9c81ff55", |
| "metadata": { |
| "ExecuteTime": { |
| "end_time": "2024-05-26T05:05:26.275402Z", |
| "start_time": "2024-05-26T05:05:25.579866Z" |
| } |
| }, |
| "outputs": [], |
| "source": [ |
| "# load jupyter magic to help display Hamilton in a notebook\n", |
| "%load_ext hamilton.plugins.jupyter_magic" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "ee990a80", |
| "metadata": {}, |
| "source": [ |
| "## Fighter ingestion pipeline\n", |
| "\n", |
| "We define loading a CSV, then loading each row/fighter into FalkorDB.\n", |
| "\n", |
| "We use Hamilton to parallelize & iterate over each fighter to keep our code more unit testable and manageable. " |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 5, |
| "id": "cd97754d", |
| "metadata": { |
| "ExecuteTime": { |
| "end_time": "2024-05-26T05:12:04.660817Z", |
| "start_time": "2024-05-26T05:12:04.295564Z" |
| } |
| }, |
| "outputs": [], |
| "source": [ |
| "%%cell_to_module -m load_fighters --display\n", |
| "\n", |
| "import pandas as pd\n", |
| "from hamilton.htypes import Parallelizable, Collect\n", |
| "import falkordb\n", |
| "import utils\n", |
| "\n", |
| "def raw_fighter_details() -> pd.DataFrame:\n", |
| " \"\"\"Load fighter data\"\"\"\n", |
| " _df = pd.read_csv('../data/raw_fighter_details.csv')\n", |
| " return _df\n", |
| "\n", |
| "def fighter(raw_fighter_details: pd.DataFrame) -> Parallelizable[pd.Series]:\n", |
| " \"\"\"We then want to do something for each record. That's what this code sets up\"\"\"\n", |
| " for idx, row in raw_fighter_details.iterrows():\n", |
| " yield row\n", |
| "\n", |
| "def record(fighter: pd.Series) -> dict:\n", |
| " \"\"\"Given a single fighter record, process it into a dictionary.\"\"\"\n", |
| " attrs = {}\n", |
| " attrs['Name'] = fighter.fighter_name\n", |
| " \n", |
| " if isinstance(fighter.Height, str) and fighter.Height != \"\":\n", |
| " attrs['Height'] = utils.height_to_cm(fighter.Height)\n", |
| " \n", |
| " if isinstance(fighter.Weight, str) and fighter.Weight != \"\":\n", |
| " Weight = int(fighter.Weight.replace(' lbs.', ''))\n", |
| " attrs['Weight'] = Weight\n", |
| " \n", |
| " if isinstance(fighter.Reach, str) and fighter.Reach != \"\":\n", |
| " attrs['Reach'] = utils.reach_to_cm(fighter.Reach)\n", |
| " \n", |
| " if isinstance(fighter.Stance, str) and fighter.Stance != \"\":\n", |
| " attrs['Stance'] = fighter.Stance\n", |
| " \n", |
| " if isinstance(fighter.DOB, str) and fighter.DOB != \"\":\n", |
| " attrs['DOB'] = utils.date_to_timestamp(fighter.DOB)\n", |
| " \n", |
| " # Significant Strikes Landed per Minute\n", |
| " attrs['SLpM'] = float(fighter.SLpM)\n", |
| " \n", |
| " # Strike accuracy\n", |
| " attrs['Str_Acc'] = utils.percentage_to_float(fighter.Str_Acc)\n", |
| " \n", |
| " # Significant Strikes Absorbed per Minute.\n", |
| " attrs['SApM'] = float(fighter.SApM)\n", |
| " \n", |
| " # strikes defended\n", |
| " attrs['Str_Def'] = utils.percentage_to_float(fighter.Str_Def)\n", |
| " \n", |
| " # Takedown average\n", |
| " attrs['TD_Avg'] = float(fighter.TD_Avg)\n", |
| " \n", |
| " # Takedown accuracy\n", |
| " attrs['TD_Acc'] = utils.percentage_to_float(fighter.TD_Acc)\n", |
| " \n", |
| " # Takedown defense\n", |
| " attrs['TD_Def'] = utils.percentage_to_float(fighter.TD_Def)\n", |
| " \n", |
| " # Submission average\n", |
| " attrs['Sub_Avg'] = float(fighter.Sub_Avg)\n", |
| " return attrs\n", |
| "\n", |
| "def write_to_graph(record: Collect[dict], graph: falkordb.Graph) -> int:\n", |
| " \"\"\"Take all records and then push to the DB\"\"\"\n", |
| " records = list(record)\n", |
| " # Load all fighters in one go.\n", |
| " q = \"UNWIND $fighters as fighter CREATE (f:Fighter) SET f = fighter\"\n", |
| " graph.query(q, {'fighters': records})\n", |
| " return len(records)\n" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "f56e3fa6", |
| "metadata": {}, |
| "source": [ |
| "# Fights ingestion pipeline\n", |
| "\n", |
| "This code then takes the fight data and creates the graph structure required by\n", |
| "FalkorDB to link things together.\n", |
| "\n", |
| "It then uses the Hamilton construct to iterate over each fight and push that individually\n", |
| "to FalkorDB." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 6, |
| "id": "c1ae988c", |
| "metadata": { |
| "ExecuteTime": { |
| "end_time": "2024-05-26T05:44:01.899490Z", |
| "start_time": "2024-05-26T05:44:01.430713Z" |
| } |
| }, |
| "outputs": [], |
| "source": [ |
| "%%cell_to_module -m load_fights --display\n", |
| "\n", |
| "import pandas as pd\n", |
| "import utils\n", |
| "from hamilton.htypes import Parallelizable, Collect\n", |
| "import falkordb\n", |
| "\n", |
| "def raw_total_fight_data() -> pd.DataFrame:\n", |
| " \"\"\"Loads the raw fight data\"\"\"\n", |
| " _df = pd.read_csv('../data/raw_total_fight_data.csv', delimiter=\";\")\n", |
| " return _df\n", |
| "\n", |
| "def transformed_data(raw_total_fight_data: pd.DataFrame) -> pd.DataFrame:\n", |
| " \"\"\"Does some light transformation on the data and adds some columns\"\"\"\n", |
| " _df = raw_total_fight_data\n", |
| " _df.last_round = _df.last_round.astype(int)\n", |
| " _df.last_round_time = _df.last_round_time.apply(lambda x: utils.time_to_seconds(x))\n", |
| " _df.date = _df.date.apply(lambda x: utils.date_to_timestamp(x))\n", |
| " _df[\"Loser\"] = _df.apply(\n", |
| " lambda x: x[\"B_fighter\"] if x[\"Winner\"] == x[\"R_fighter\"] else x[\"R_fighter\"], axis=1)\n", |
| " return _df\n", |
| "\n", |
| "\n", |
| "def columns_of_interest() -> list[str]:\n", |
| " \"\"\"Returns the columns that we're interested in processing\"\"\"\n", |
| " return [\n", |
| " \"R_fighter\", \"B_fighter\", \n", |
| " # R_KD, B_KD, R_SIG_STR.,B_SIG_STR.,\n", |
| " # R_SIG_STR_pct, B_SIG_STR_pct, R_TOTAL_STR., B_TOTAL_STR.,\n", |
| " # R_TD, B_TD, R_TD_pct, B_TD_pct, R_SUB_ATT, B_SUB_ATT,\n", |
| " # R_REV, B_REV, R_CTRL, B_CTRL, R_HEAD, B_HEAD, R_BODY,\n", |
| " # B_BODY, R_LEG, B_LEG, R_DISTANCE, B_DISTANCE, R_CLINCH,\n", |
| " # B_CLINCH, R_GROUND, B_GROUND, win_by, \n", |
| " \"last_round\",\n", |
| " \"last_round_time\", \"Format\", \"Referee\", \"date\", \"location\",\n", |
| " \"Fight_type\", \"Winner\", \"Loser\"\n", |
| " ]\n", |
| "\n", |
| "\n", |
| "def fight(transformed_data: pd.DataFrame, \n", |
| " columns_of_interest: list[str],\n", |
| " ) -> Parallelizable[pd.Series]:\n", |
| " \"\"\"Enables us to process each fight. We pass in a client along with each row\"\"\"\n", |
| " for _idx, _row in transformed_data[columns_of_interest].iterrows():\n", |
| " yield _row\n", |
| "\n", |
| "def write_to_graph(fight: pd.Series, graph: falkordb.Graph) -> str:\n", |
| " _row, _graph = fight, graph\n", |
| " # create referee\n", |
| " q = \"MERGE (:Referee {Name: $name})\"\n", |
| " _graph.query(q, {'name': _row.Referee if isinstance(_row.Referee, str) else \"\"})\n", |
| "\n", |
| " # create card\n", |
| " q = \"MERGE (c:Card {Date: $date, Location: $location})\"\n", |
| " _graph.query(q, {'date': _row.date, 'location': _row.location})\n", |
| "\n", |
| " # create fight\n", |
| " q = \"\"\"MATCH (c:Card {Date: $date, Location: $location})\n", |
| " MATCH (ref:Referee {Name: $referee})\n", |
| " MATCH (r:Fighter {Name:$R_fighter})\n", |
| " MATCH (b:Fighter {Name:$B_fighter})\n", |
| " CREATE (f:Fight)-[:PART_OF]->(c)\n", |
| " SET f = $fight\n", |
| " CREATE (f)-[:RED]->(r)\n", |
| " CREATE (f)-[:BLUE]->(b)\n", |
| " CREATE (ref)-[:REFEREED]->(f)\n", |
| " RETURN ID(f)\n", |
| " \"\"\"\n", |
| " f_id = _graph.query(q, {'date': _row.date, 'location': _row.location,\n", |
| " 'referee': _row.Referee if isinstance(_row.Referee, str) else \"\", 'R_fighter': _row.R_fighter,\n", |
| " 'B_fighter': _row.B_fighter, 'fight': {'Last_round': _row.last_round,\n", |
| " 'Last_round_time': _row.last_round_time, 'Format': _row.Format,\n", |
| " 'Fight_type': _row.Fight_type}\n", |
| " }).result_set[0][0]\n", |
| " \n", |
| " # mark winner & loser\n", |
| " q = \"\"\"MATCH (f:Fight) WHERE ID(f) = $fight_id\n", |
| " MATCH (l:Fighter {Name:$loser})\n", |
| " MATCH (w:Fighter {Name:$winner})\n", |
| " CREATE (w)-[:WON]->(f), (l)-[:LOST]->(f)\n", |
| " \"\"\"\n", |
| " _graph.query(q, {'fight_id': f_id, 'loser': _row.Loser, 'winner': _row.Winner if isinstance(_row.Winner, str) else \"\"})\n", |
| " return \"success\"\n", |
| "\n", |
| "def collect_writes(write_to_graph: Collect[str]) -> int:\n", |
| " return len(list(write_to_graph))" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "8b1a2966", |
| "metadata": {}, |
| "source": [ |
| "# Exercise the pipelines\n", |
| "\n", |
| "We set up the requisite objects and clients to run the pipelines." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 7, |
| "id": "9c7fdd81", |
| "metadata": { |
| "ExecuteTime": { |
| "end_time": "2024-05-26T05:44:05.459158Z", |
| "start_time": "2024-05-26T05:44:05.448476Z" |
| } |
| }, |
| "outputs": [], |
| "source": [ |
| "import falkordb\n", |
| "from hamilton import driver\n", |
| "from hamilton.execution import executors\n", |
| "\n", |
| "db = falkordb.FalkorDB(host='localhost', port=6379)\n", |
| "g = db.select_graph(\"UFC\")\n", |
| "\n", |
| "# Clear previous graph\n", |
| "if \"UFC\" in db.list_graphs():\n", |
| " g.delete()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "id": "0836b087", |
| "metadata": {}, |
| "source": [ |
| "### Note about the Hamilton UI\n", |
| "Hamilton comes with an [open source UI](https://hamilton.dagworks.io/en/latest/hamilton-ui/) that you can\n", |
| "surface information about your Hamilton executions. If you pull the docker containers\n", |
| "locally and uncomment adding the HamiltonTracker, then you'll see runs logged to it." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 8, |
| "id": "f5443a7f", |
| "metadata": { |
| "ExecuteTime": { |
| "end_time": "2024-05-26T05:44:33.832688Z", |
| "start_time": "2024-05-26T05:44:06.038046Z" |
| } |
| }, |
| "outputs": [], |
| "source": [ |
| "# if you have the Hamilton UI you can see progress:\n", |
| "# from hamilton_sdk import adapters\n", |
| "# tracker = adapters.HamiltonTracker(\n", |
| "# project_id=44, # modify this as needed\n", |
| "# username=\"elijah@dagworks.io\",\n", |
| "# dag_name=\"load_fighters\",\n", |
| "# tags={\"environment\": \"DEV\", \"team\": \"MY_TEAM\", \"version\": \"X\"}\n", |
| "# )\n", |
| "\n", |
| "\n", |
| "# build the hamilton Driver\n", |
| "fighter_loader = (\n", |
| " driver.Builder()\n", |
| " .with_modules(load_fighters)\n", |
| " .enable_dynamic_execution(allow_experimental_mode=True)\n", |
| " .with_remote_executor(executors.MultiThreadingExecutor(5))\n", |
| " # .with_adapters(tracker) # <--- uncomment this to add tracker.\n", |
| " .build()\n", |
| ")\n", |
| "\n", |
| "fighter_loader.execute([\"write_to_graph\"], inputs={\"graph\": g})" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 9, |
| "id": "3107f89b", |
| "metadata": { |
| "ExecuteTime": { |
| "end_time": "2024-05-26T05:49:42.579219Z", |
| "start_time": "2024-05-26T05:44:33.835479Z" |
| } |
| }, |
| "outputs": [], |
| "source": [ |
| "# if you have the Hamilton UI you can see progress:\n", |
| "# from hamilton_sdk import adapters\n", |
| "# tracker = adapters.HamiltonTracker(\n", |
| "# project_id=44, # modify this as needed\n", |
| "# username=\"elijah@dagworks.io\",\n", |
| "# dag_name=\"load_fights\",\n", |
| "# tags={\"environment\": \"DEV\", \"team\": \"MY_TEAM\", \"version\": \"X\"}\n", |
| "# )\n", |
| "\n", |
| "fights_loader = (\n", |
| " driver.Builder()\n", |
| " .with_modules(load_fights)\n", |
| " .enable_dynamic_execution(allow_experimental_mode=True)\n", |
| " .with_remote_executor(executors.MultiThreadingExecutor(5)) # this will do 5 concurrent inserts\n", |
| " # .with_adapters(tracker) # <--- uncomment this to add tracker.\n", |
| " .build()\n", |
| ")\n", |
| "fights_loader.execute([\"collect_writes\"], inputs={\"graph\": g})" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "b0a0d9fe", |
| "metadata": { |
| "ExecuteTime": { |
| "end_time": "2024-05-26T05:49:42.591409Z", |
| "start_time": "2024-05-26T05:49:42.580724Z" |
| } |
| }, |
| "outputs": [], |
| "source": [] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "id": "4c83c11d", |
| "metadata": {}, |
| "outputs": [], |
| "source": [] |
| } |
| ], |
| "metadata": { |
| "kernelspec": { |
| "display_name": "Python 3 (ipykernel)", |
| "language": "python", |
| "name": "python3" |
| }, |
| "language_info": { |
| "codemirror_mode": { |
| "name": "ipython", |
| "version": 3 |
| }, |
| "file_extension": ".py", |
| "mimetype": "text/x-python", |
| "name": "python", |
| "nbconvert_exporter": "python", |
| "pygments_lexer": "ipython3", |
| "version": "3.10.13" |
| } |
| }, |
| "nbformat": 4, |
| "nbformat_minor": 5 |
| } |