{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Execute this cell to install dependencies\n",
"%pip install sf-hamilton[visualization] dlt[duckdb]>=0.3.12 ibis-framework[duckdb] openai pandas polars"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Slack Summaries [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/dagworks-inc/hamilton/blob/main/examples/dlt/notebook.ipynb) [![GitHub badge](https://img.shields.io/badge/github-view_source-2b3137?logo=github)](https://github.com/dagworks-inc/hamilton/blob/main/examples/dlt/notebook.ipynb)\n",
"\n",
"This notebook shows how to ingest Slack messages and generate threads summaries.\n",
"\n",
"NOTE. This notebook uses data generated on a dummy Slack server, so messages, replies, and summaries aren't really insightful.\n",
"\n",
"## Pre-requisites\n",
"The first step is to `dlt init` the Slack pipeline, but this is already done for this repository. All you need to do is configure your Slack application to get a User OAuth Token (see [docs](https://dlthub.com/docs/dlt-ecosystem/verified-sources/slack)). Then, you can it add it to `.dlt/secrets.toml` as follow:\n",
"\n",
"```toml\n",
"[sources.slack]\n",
"access_token = \"xoxb-...\"\n",
"```"
]
},
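{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you prefer not to edit `.dlt/secrets.toml` (e.g., when running in Colab), dlt can also read secrets from environment variables. A minimal sketch, assuming dlt's standard environment-variable naming for the `sources.slack.access_token` key:\n",
"\n",
"```python\n",
"import os\n",
"\n",
"# same token value you would put in secrets.toml\n",
"os.environ[\"SOURCES__SLACK__ACCESS_TOKEN\"] = \"xoxb-...\"\n",
"```"
]
},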
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Extract & Load (dlt)\n",
"The \"extract\" and \"load\" steps consist of getting the raw data to destination for further transformations.\n",
"\n",
"### Set up\n",
"Define your dlt `Pipeline` and `Source`"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dlt\n",
"import slack\n",
"\n",
"slack_pipeline = dlt.pipeline(\n",
" pipeline_name=\"slack\",\n",
" destination='duckdb',\n",
" dataset_name=\"slack_data\",\n",
" full_refresh=True,\n",
")\n",
"\n",
"dlt_source = slack.slack_source(\n",
" selected_channels=[\"general\", \"dlt\"],\n",
" replies=True,\n",
")\n",
"\n",
"print(f\"\"\"{slack_pipeline.dataset_name=:}\n",
"{slack_pipeline.pipeline_name=:}\"\"\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Execution\n",
"Run your dlt pipeline. `load_info` will contain a lot metadata about execution."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"load_info = slack_pipeline.run(dlt_source)\n",
"print(load_info)"
]
},
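{
"cell_type": "markdown",
"metadata": {},
"source": [
"To sanity-check what was loaded, you can open the DuckDB file directly. A small sketch, assuming the destination file is named after the pipeline (`slack.duckdb`), which is also what the transform code below relies on:\n",
"\n",
"```python\n",
"import duckdb\n",
"\n",
"con = duckdb.connect(f\"{slack_pipeline.pipeline_name}.duckdb\")\n",
"print(con.sql(\"SHOW ALL TABLES\"))  # schemas and tables created by dlt\n",
"con.close()\n",
"```"
]
},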
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Transform (Hamilton + Ibis)\n",
"\n",
"Now that raw data is loaded (in DuckDB), we use Hamilton to organize a dataflow of Ibis data transformations. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Define\n",
"\n",
"In the next cells, we use Python functions to define a Hamilton dataflow. \n",
"\n",
"By using the Hamilton Jupyter plugin, we can define the dataflow interactively. Adding and removing functions to the cell will change its shape. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%load_ext hamilton.plugins.jupyter_magic\n",
"from IPython.display import display"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%cell_to_module -m jupyter_transform -d\n",
"\n",
"import textwrap\n",
"\n",
"import dlt\n",
"import ibis\n",
"import ibis.expr.types as ir\n",
"import openai\n",
"from hamilton.function_modifiers import pipe, source, step\n",
"from hamilton.htypes import Parallelizable, Collect\n",
"\n",
"\n",
"def db_con(pipeline: dlt.Pipeline) -> ibis.BaseBackend:\n",
" backend = ibis.connect(f\"{pipeline.pipeline_name}.duckdb\")\n",
" ibis.set_backend(backend)\n",
" return backend\n",
"\n",
"\n",
"def channel(selected_channels: list[str]) -> Parallelizable[str]:\n",
" for channel in selected_channels:\n",
" yield channel\n",
"\n",
"\n",
"def _epoch_microseconds(timestamp: ir.Column) -> ir.Column:\n",
" seconds_from_epoch = timestamp.epoch_seconds()\n",
" microseconds = timestamp.microsecond() / int(10e5)\n",
" return seconds_from_epoch + microseconds\n",
" \n",
"\n",
"def channel_message(\n",
" channel: str,\n",
" db_con: ibis.BaseBackend, \n",
" pipeline: dlt.Pipeline,\n",
") -> ir.Table:\n",
" \"\"\"Load table containing parent messages of a channel.\n",
" the timestamps `thread_ts` and `ts` are converted to strings.\n",
" `thread_ts` is not None if the message has replies / started a thread. Otherwise,\n",
" `thread_ts` == `ts`. Coalesce is used to fill these None values with `ts`\n",
"\n",
" Slack reference: https://api.slack.com/messaging/retrieving#finding_threads\n",
" \"\"\"\n",
" return (\n",
" db_con.table(\n",
" f\"{channel}_message\",\n",
" schema=pipeline.dataset_name,\n",
" database=pipeline.pipeline_name,\n",
" )\n",
" .mutate(\n",
" thread_ts=_epoch_microseconds(ibis._.thread_ts).cast(str),\n",
" ts=_epoch_microseconds(ibis._.ts).cast(str),\n",
" )\n",
" .mutate(thread_ts=ibis.coalesce(ibis._.thread_ts, ibis._.ts))\n",
" )\n",
"\n",
"\n",
"def channel_replies(\n",
" channel: str,\n",
" db_con: ibis.BaseBackend, \n",
" pipeline: dlt.Pipeline,\n",
") -> ir.Table:\n",
" \"\"\"Create table for replies\"\"\"\n",
" return db_con.table(\n",
" f\"{channel}_replies_message\",\n",
" schema=pipeline.dataset_name,\n",
" database=pipeline.pipeline_name,\n",
" )\n",
" \n",
"\n",
"def channel_threads(channel_message: ir.Table, channel_replies: ir.Table) -> ir.Table:\n",
" \"\"\"Union of parent messages and replies. Sort by thread start, then message timestamp\"\"\"\n",
" columns = [\"channel\", \"thread_ts\", \"ts\", \"user\", \"text\", \"_dlt_load_id\", \"_dlt_id\"]\n",
" return (\n",
" ibis.union(\n",
" channel_message.select(columns),\n",
" channel_replies.select(columns),\n",
" )\n",
" .order_by([ibis._.thread_ts, ibis._.ts])\n",
" )\n",
"\n",
"\n",
"def channels_collection(channel_threads: Collect[ir.Table]) -> ir.Table:\n",
" \"\"\"Collect `channel_threads` for all channels\"\"\"\n",
" return ibis.union(*channel_threads)\n",
"\n",
"\n",
"def _format_messages(threads: ir.Table) -> ir.Table:\n",
" \"\"\"Assign a user id per thread and prefix messages with it\"\"\"\n",
" thread_user_id_expr = (ibis.dense_rank().over(order_by=\"user\") + 1).cast(str)\n",
" return threads.group_by(\"thread_ts\").mutate(\n",
" message=thread_user_id_expr.concat(\": \", ibis._.text)\n",
" )\n",
"\n",
"\n",
"def _aggregate_thread(threads: ir.Table) -> ir.Table:\n",
" \"\"\"Create threads as a single string by concatenating messages\n",
"\n",
" Functions decorates with `@ibis.udf` are loaded by the Ibis backend.\n",
" They aren't meant to be called directly.\n",
" ref: https://ibis-project.org/how-to/extending/builtin\n",
" \"\"\"\n",
"\n",
" @ibis.udf.agg.builtin(name=\"string_agg\")\n",
" def _string_agg(arg, sep: str = \"\\n \") -> str:\n",
" raise NotImplementedError\n",
"\n",
" @ibis.udf.agg.builtin(name=\"array_agg\")\n",
" def _array_agg(arg) -> list[str]:\n",
" raise NotImplementedError\n",
"\n",
" return threads.group_by(\"thread_ts\").agg(\n",
" thread=_string_agg(ibis._.message),\n",
" num_messages=ibis._.count(),\n",
" users=_array_agg(ibis._.user).unique(),\n",
" _dlt_load_id=ibis._._dlt_load_id.max(),\n",
" _dlt_id=_array_agg(ibis._._dlt_id),\n",
" )\n",
"\n",
"\n",
"def summary_prompt() -> str:\n",
" \"\"\"LLM prompt to summarize Slack thread\"\"\"\n",
" return textwrap.dedent(\n",
" \"\"\"Hamilton is an open source library to write dataflows in Python. It is used by developers for data engineering, data science, machine learning, and LLM workflows.\n",
" Next is a discussion thread about Hamilton started by User1. Complete these tasks: identify the issue raised by User1, summarize the discussion, indicate if you think the issue was resolved.\n",
"\n",
" DISCUSSION THREAD\n",
" {text}\n",
" \"\"\"\n",
" )\n",
"\n",
"def _summary(threads: ir.Table, prompt: str) -> ir.Table:\n",
" \"\"\"Generate a summary for each thread.\n",
" Uses a scalar Python UDF executed by the backend.\n",
" \"\"\"\n",
" # Ibis requires `str` type hint even if None is allowed\n",
" @ibis.udf.scalar.python\n",
" def _openai_completion_udf(text: str, prompt_template: str) -> str:\n",
" \"\"\"Fill `prompt` with `text` and use OpenAI chat completion.\n",
" Returns None if:\n",
" - `text` is empty\n",
" - `content` is too long\n",
" - OpenAI call fails\n",
" \"\"\"\n",
" if len(text) == 0:\n",
" return None\n",
"\n",
" content = prompt_template.format(text=text)\n",
"\n",
" if len(content) // 4 > 8191:\n",
" return None\n",
"\n",
" client = openai.OpenAI()\n",
"\n",
" response = client.chat.completions.create(\n",
" model=\"gpt-3.5-turbo\",\n",
" messages=[{\"role\": \"user\", \"content\": content}],\n",
" )\n",
" try:\n",
" output = response.choices[0].message.content\n",
" except Exception:\n",
" output = None\n",
"\n",
" return output\n",
" \n",
" return threads.mutate(summary=_openai_completion_udf(threads.thread, prompt))\n",
"\n",
"\n",
"# @pipe operator facilitates managing the function/node namespace\n",
"@pipe(\n",
" step(_format_messages), step(_aggregate_thread), step(_summary, prompt=source(\"summary_prompt\"))\n",
")\n",
"def threads(channels_collection: ir.Table) -> ir.Table:\n",
" \"\"\"Create `threads` table by formatting, aggregating messages,\n",
" and generating summaries.\n",
" \"\"\"\n",
" return channels_collection\n",
"\n",
"\n",
"def insert_threads(threads: ir.Table) -> int:\n",
" \"\"\"Save `threads` table and return row count.\"\"\"\n",
" db_con = ibis.get_backend()\n",
" threads_table = db_con.create_table(\"threads\", threads)\n",
" db_con.insert(\"threads\", threads)\n",
" return int(threads_table.count().execute())"
]
},
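{
"cell_type": "markdown",
"metadata": {},
"source": [
"The helper functions above are plain Python functions returning Ibis expressions, so you can try them out on their own. A minimal sketch with a made-up in-memory table (`fake_threads` is purely illustrative):\n",
"\n",
"```python\n",
"import ibis\n",
"import pandas as pd\n",
"\n",
"import jupyter_transform as jt\n",
"\n",
"fake_threads = ibis.memtable(\n",
"    pd.DataFrame(\n",
"        {\n",
"            \"thread_ts\": [\"1\", \"1\", \"2\"],\n",
"            \"user\": [\"U1\", \"U2\", \"U1\"],\n",
"            \"text\": [\"hello\", \"hi there\", \"bye\"],\n",
"        }\n",
"    )\n",
")\n",
"\n",
"# each message gets prefixed with a per-thread user id, e.g. \"1: hello\"\n",
"jt._format_messages(fake_threads).to_pandas()\n",
"```"
]
},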
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Execute\n",
"Create a Hamilton Driver with the defined dataflow module and request nodes to compute. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Driver definition\n",
"\n",
"We pass the module `jupyter_transform` that we defined in this notebook, but it could be modules stored in `.py` files. The `.enable_dynamic_execution()` statement is required because we're using Hamilton's `Parallelizable/Collect` feature."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from hamilton import driver\n",
"import jupyter_transform\n",
"\n",
"dr = (\n",
" driver.Builder()\n",
" .enable_dynamic_execution(allow_experimental_mode=True)\n",
" .with_modules(jupyter_transform)\n",
" .build()\n",
")"
]
},
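{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before executing, you can inspect what the Driver built from the module using standard `Driver` methods:\n",
"\n",
"```python\n",
"# every node, including the `threads.with_...` steps created by @pipe\n",
"dr.list_available_variables()\n",
"\n",
"# render the full dataflow graph (uses the `visualization` extra installed above)\n",
"dr.display_all_functions()\n",
"```"
]
},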
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Driver execution\n",
"We create a dictionary to hold the inputs `pipeline` and `selected_channels` (the same passed to the dlt pipeline) required by the Hamilton dataflow.\n",
"\n",
"Then, we call `dr.execute()` with the list of nodes to compute and the inputs. This returns a dictionary of results."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"inputs = dict(\n",
" pipeline=slack_pipeline,\n",
" selected_channels=[\"general\", \"dlt\"],\n",
") \n",
"final_vars = [\"threads.with_aggregate_thread\"]\n",
"\n",
"results = dr.execute(final_vars, inputs=inputs)\n",
"# `threads.with_aggregate_thread` is an ibis expr, execute it to return a pandas DataFrame\n",
"df = results[\"threads.with_aggregate_thread\"].to_pandas()\n",
"\n",
"display(dr.visualize_execution(final_vars=final_vars, inputs=inputs), df)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The next cells require your OpenAI key to execute the full dataflow and compute the summaries. By requesting `insert_threads`, we could directly save the Ibis results without leaving DuckDB!"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os \n",
"import getpass\n",
"\n",
"os.environ[\"OPENAI_API_KEY\"] = getpass.getpass(\"Enter your OpenAI key\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"final_vars = [\"threads\"] # replace by `\"insert_threads\"` to directly store results\n",
"results = dr.execute(final_vars, inputs=inputs)\n",
"df2 = results[\"threads\"].to_pandas()\n",
"\n",
"display(dr.visualize_execution(final_vars=final_vars, inputs=inputs), df2)"
]
}
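,
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you execute `insert_threads` instead, the summarized threads are persisted in DuckDB and can be read back with Ibis. A short sketch, assuming the dataflow ran in this session so that `db_con` registered the DuckDB connection as the default backend:\n",
"\n",
"```python\n",
"import ibis\n",
"\n",
"backend = ibis.get_backend()  # the connection registered by `db_con`\n",
"threads_table = backend.table(\"threads\")  # table created by `insert_threads`\n",
"threads_table.limit(5).to_pandas()\n",
"```"
]
}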
],
"metadata": {
"kernelspec": {
"display_name": "venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.9"
}
},
"nbformat": 4,
"nbformat_minor": 2
}