| { |
| "cells": [ |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "# Execute this cell to install dependencies\n", |
| "%pip install sf-hamilton[visualization] dlt[duckdb]>=0.3.12 ibis-framework[duckdb] openai pandas polars" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "# Slack Summaries [](https://colab.research.google.com/github/dagworks-inc/hamilton/blob/main/examples/dlt/notebook.ipynb) [](https://github.com/dagworks-inc/hamilton/blob/main/examples/dlt/notebook.ipynb)\n", |
| "\n", |
| "This notebook shows how to ingest Slack messages and generate threads summaries.\n", |
| "\n", |
| "NOTE. This notebook uses data generated on a dummy Slack server, so messages, replies, and summaries aren't really insightful.\n", |
| "\n", |
| "## Pre-requisites\n", |
| "The first step is to `dlt init` the Slack pipeline, but this is already done for this repository. All you need to do is configure your Slack application to get a User OAuth Token (see [docs](https://dlthub.com/docs/dlt-ecosystem/verified-sources/slack)). Then, you can it add it to `.dlt/secrets.toml` as follow:\n", |
| "\n", |
| "```toml\n", |
| "[sources.slack]\n", |
| "access_token = \"xoxb-...\"\n", |
| "```" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## Extract & Load (dlt)\n", |
| "The \"extract\" and \"load\" steps consist of getting the raw data to destination for further transformations.\n", |
| "\n", |
| "### Set up\n", |
| "Define your dlt `Pipeline` and `Source`" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "import dlt\n", |
| "import slack\n", |
| "\n", |
| "slack_pipeline = dlt.pipeline(\n", |
| " pipeline_name=\"slack\",\n", |
| " destination='duckdb',\n", |
| " dataset_name=\"slack_data\",\n", |
| " full_refresh=True,\n", |
| ")\n", |
| "\n", |
| "dlt_source = slack.slack_source(\n", |
| " selected_channels=[\"general\", \"dlt\"],\n", |
| " replies=True,\n", |
| ")\n", |
| "\n", |
| "print(f\"\"\"{slack_pipeline.dataset_name=:}\n", |
| "{slack_pipeline.pipeline_name=:}\"\"\")" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "### Execution\n", |
| "Run your dlt pipeline. `load_info` will contain a lot metadata about execution." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "load_info = slack_pipeline.run(dlt_source)\n", |
| "print(load_info)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## Transform (Hamilton + Ibis)\n", |
| "\n", |
| "Now that raw data is loaded (in DuckDB), we use Hamilton to organize a dataflow of Ibis data transformations. " |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## Define\n", |
| "\n", |
| "In the next cells, we use Python functions to define a Hamilton dataflow. \n", |
| "\n", |
| "By using the Hamilton Jupyter plugin, we can define the dataflow interactively. Adding and removing functions to the cell will change its shape. " |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "%load_ext hamilton.plugins.jupyter_magic\n", |
| "from IPython.display import display" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "%%cell_to_module -m jupyter_transform -d\n", |
| "\n", |
| "import textwrap\n", |
| "\n", |
| "import dlt\n", |
| "import ibis\n", |
| "import ibis.expr.types as ir\n", |
| "import openai\n", |
| "from hamilton.function_modifiers import pipe, source, step\n", |
| "from hamilton.htypes import Parallelizable, Collect\n", |
| "\n", |
| "\n", |
| "def db_con(pipeline: dlt.Pipeline) -> ibis.BaseBackend:\n", |
| " backend = ibis.connect(f\"{pipeline.pipeline_name}.duckdb\")\n", |
| " ibis.set_backend(backend)\n", |
| " return backend\n", |
| "\n", |
| "\n", |
| "def channel(selected_channels: list[str]) -> Parallelizable[str]:\n", |
| " for channel in selected_channels:\n", |
| " yield channel\n", |
| "\n", |
| "\n", |
| "def _epoch_microseconds(timestamp: ir.Column) -> ir.Column:\n", |
| " seconds_from_epoch = timestamp.epoch_seconds()\n", |
| " microseconds = timestamp.microsecond() / int(10e5)\n", |
| " return seconds_from_epoch + microseconds\n", |
| " \n", |
| "\n", |
| "def channel_message(\n", |
| " channel: str,\n", |
| " db_con: ibis.BaseBackend, \n", |
| " pipeline: dlt.Pipeline,\n", |
| ") -> ir.Table:\n", |
| " \"\"\"Load table containing parent messages of a channel.\n", |
| " the timestamps `thread_ts` and `ts` are converted to strings.\n", |
| " `thread_ts` is not None if the message has replies / started a thread. Otherwise,\n", |
| " `thread_ts` == `ts`. Coalesce is used to fill these None values with `ts`\n", |
| "\n", |
| " Slack reference: https://api.slack.com/messaging/retrieving#finding_threads\n", |
| " \"\"\"\n", |
| " return (\n", |
| " db_con.table(\n", |
| " f\"{channel}_message\",\n", |
| " schema=pipeline.dataset_name,\n", |
| " database=pipeline.pipeline_name,\n", |
| " )\n", |
| " .mutate(\n", |
| " thread_ts=_epoch_microseconds(ibis._.thread_ts).cast(str),\n", |
| " ts=_epoch_microseconds(ibis._.ts).cast(str),\n", |
| " )\n", |
| " .mutate(thread_ts=ibis.coalesce(ibis._.thread_ts, ibis._.ts))\n", |
| " )\n", |
| "\n", |
| "\n", |
| "def channel_replies(\n", |
| " channel: str,\n", |
| " db_con: ibis.BaseBackend, \n", |
| " pipeline: dlt.Pipeline,\n", |
| ") -> ir.Table:\n", |
| " \"\"\"Create table for replies\"\"\"\n", |
| " return db_con.table(\n", |
| " f\"{channel}_replies_message\",\n", |
| " schema=pipeline.dataset_name,\n", |
| " database=pipeline.pipeline_name,\n", |
| " )\n", |
| " \n", |
| "\n", |
| "def channel_threads(channel_message: ir.Table, channel_replies: ir.Table) -> ir.Table:\n", |
| " \"\"\"Union of parent messages and replies. Sort by thread start, then message timestamp\"\"\"\n", |
| " columns = [\"channel\", \"thread_ts\", \"ts\", \"user\", \"text\", \"_dlt_load_id\", \"_dlt_id\"]\n", |
| " return (\n", |
| " ibis.union(\n", |
| " channel_message.select(columns),\n", |
| " channel_replies.select(columns),\n", |
| " )\n", |
| " .order_by([ibis._.thread_ts, ibis._.ts])\n", |
| " )\n", |
| "\n", |
| "\n", |
| "def channels_collection(channel_threads: Collect[ir.Table]) -> ir.Table:\n", |
| " \"\"\"Collect `channel_threads` for all channels\"\"\"\n", |
| " return ibis.union(*channel_threads)\n", |
| "\n", |
| "\n", |
| "def _format_messages(threads: ir.Table) -> ir.Table:\n", |
| " \"\"\"Assign a user id per thread and prefix messages with it\"\"\"\n", |
| " thread_user_id_expr = (ibis.dense_rank().over(order_by=\"user\") + 1).cast(str)\n", |
| " return threads.group_by(\"thread_ts\").mutate(\n", |
| " message=thread_user_id_expr.concat(\": \", ibis._.text)\n", |
| " )\n", |
| "\n", |
| "\n", |
| "def _aggregate_thread(threads: ir.Table) -> ir.Table:\n", |
| " \"\"\"Create threads as a single string by concatenating messages\n", |
| "\n", |
| " Functions decorates with `@ibis.udf` are loaded by the Ibis backend.\n", |
| " They aren't meant to be called directly.\n", |
| " ref: https://ibis-project.org/how-to/extending/builtin\n", |
| " \"\"\"\n", |
| "\n", |
| " @ibis.udf.agg.builtin(name=\"string_agg\")\n", |
| " def _string_agg(arg, sep: str = \"\\n \") -> str:\n", |
| " raise NotImplementedError\n", |
| "\n", |
| " @ibis.udf.agg.builtin(name=\"array_agg\")\n", |
| " def _array_agg(arg) -> list[str]:\n", |
| " raise NotImplementedError\n", |
| "\n", |
| " return threads.group_by(\"thread_ts\").agg(\n", |
| " thread=_string_agg(ibis._.message),\n", |
| " num_messages=ibis._.count(),\n", |
| " users=_array_agg(ibis._.user).unique(),\n", |
| " _dlt_load_id=ibis._._dlt_load_id.max(),\n", |
| " _dlt_id=_array_agg(ibis._._dlt_id),\n", |
| " )\n", |
| "\n", |
| "\n", |
| "def summary_prompt() -> str:\n", |
| " \"\"\"LLM prompt to summarize Slack thread\"\"\"\n", |
| " return textwrap.dedent(\n", |
| " \"\"\"Hamilton is an open source library to write dataflows in Python. It is used by developers for data engineering, data science, machine learning, and LLM workflows.\n", |
| " Next is a discussion thread about Hamilton started by User1. Complete these tasks: identify the issue raised by User1, summarize the discussion, indicate if you think the issue was resolved.\n", |
| "\n", |
| " DISCUSSION THREAD\n", |
| " {text}\n", |
| " \"\"\"\n", |
| " )\n", |
| "\n", |
| "def _summary(threads: ir.Table, prompt: str) -> ir.Table:\n", |
| " \"\"\"Generate a summary for each thread.\n", |
| " Uses a scalar Python UDF executed by the backend.\n", |
| " \"\"\"\n", |
| " # Ibis requires `str` type hint even if None is allowed\n", |
| " @ibis.udf.scalar.python\n", |
| " def _openai_completion_udf(text: str, prompt_template: str) -> str:\n", |
| " \"\"\"Fill `prompt` with `text` and use OpenAI chat completion.\n", |
| " Returns None if:\n", |
| " - `text` is empty\n", |
| " - `content` is too long\n", |
| " - OpenAI call fails\n", |
| " \"\"\"\n", |
| " if len(text) == 0:\n", |
| " return None\n", |
| "\n", |
| " content = prompt_template.format(text=text)\n", |
| "\n", |
| " if len(content) // 4 > 8191:\n", |
| " return None\n", |
| "\n", |
| " client = openai.OpenAI()\n", |
| "\n", |
| " response = client.chat.completions.create(\n", |
| " model=\"gpt-3.5-turbo\",\n", |
| " messages=[{\"role\": \"user\", \"content\": content}],\n", |
| " )\n", |
| " try:\n", |
| " output = response.choices[0].message.content\n", |
| " except Exception:\n", |
| " output = None\n", |
| "\n", |
| " return output\n", |
| " \n", |
| " return threads.mutate(summary=_openai_completion_udf(threads.thread, prompt))\n", |
| "\n", |
| "\n", |
| "# @pipe operator facilitates managing the function/node namespace\n", |
| "@pipe(\n", |
| " step(_format_messages), step(_aggregate_thread), step(_summary, prompt=source(\"summary_prompt\"))\n", |
| ")\n", |
| "def threads(channels_collection: ir.Table) -> ir.Table:\n", |
| " \"\"\"Create `threads` table by formatting, aggregating messages,\n", |
| " and generating summaries.\n", |
| " \"\"\"\n", |
| " return channels_collection\n", |
| "\n", |
| "\n", |
| "def insert_threads(threads: ir.Table) -> int:\n", |
| " \"\"\"Save `threads` table and return row count.\"\"\"\n", |
| " db_con = ibis.get_backend()\n", |
| " threads_table = db_con.create_table(\"threads\", threads)\n", |
| " db_con.insert(\"threads\", threads)\n", |
| " return int(threads_table.count().execute())" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## Execute\n", |
| "Create a Hamilton Driver with the defined dataflow module and request nodes to compute. " |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "### Driver definition\n", |
| "\n", |
| "We pass the module `jupyter_transform` that we defined in this notebook, but it could be modules stored in `.py` files. The `.enable_dynamic_execution()` statement is required because we're using Hamilton's `Parallelizable/Collect` feature." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "from hamilton import driver\n", |
| "import jupyter_transform\n", |
| "\n", |
| "dr = (\n", |
| " driver.Builder()\n", |
| " .enable_dynamic_execution(allow_experimental_mode=True)\n", |
| " .with_modules(jupyter_transform)\n", |
| " .build()\n", |
| ")" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "### Driver execution\n", |
| "We create a dictionary to hold the inputs `pipeline` and `selected_channels` (the same passed to the dlt pipeline) required by the Hamilton dataflow.\n", |
| "\n", |
| "Then, we call `dr.execute()` with the list of nodes to compute and the inputs. This returns a dictionary of results." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "inputs = dict(\n", |
| " pipeline=slack_pipeline,\n", |
| " selected_channels=[\"general\", \"dlt\"],\n", |
| ") \n", |
| "final_vars = [\"threads.with_aggregate_thread\"]\n", |
| "\n", |
| "results = dr.execute(final_vars, inputs=inputs)\n", |
| "# `threads.with_aggregate_thread` is an ibis expr, execute it to return a pandas DataFrame\n", |
| "df = results[\"threads.with_aggregate_thread\"].to_pandas()\n", |
| "\n", |
| "display(dr.visualize_execution(final_vars=final_vars, inputs=inputs), df)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "The next cells require your OpenAI key to execute the full dataflow and compute the summaries. By requesting `insert_threads`, we could directly save the Ibis results without leaving DuckDB!" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "import os \n", |
| "import getpass\n", |
| "\n", |
| "os.environ[\"OPENAI_API_KEY\"] = getpass.getpass(\"Enter your OpenAI key\")" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "final_vars = [\"threads\"] # replace by `\"insert_threads\"` to directly store results\n", |
| "results = dr.execute(final_vars, inputs=inputs)\n", |
| "df2 = results[\"threads\"].to_pandas()\n", |
| "\n", |
| "display(dr.visualize_execution(final_vars=final_vars, inputs=inputs), df2)" |
| ] |
| } |
| ], |
| "metadata": { |
| "kernelspec": { |
| "display_name": "venv", |
| "language": "python", |
| "name": "python3" |
| }, |
| "language_info": { |
| "codemirror_mode": { |
| "name": "ipython", |
| "version": 3 |
| }, |
| "file_extension": ".py", |
| "mimetype": "text/x-python", |
| "name": "python", |
| "nbconvert_exporter": "python", |
| "pygments_lexer": "ipython3", |
| "version": "3.10.9" |
| } |
| }, |
| "nbformat": 4, |
| "nbformat_minor": 2 |
| } |