# blob: 876dc3f307b55fd254f2b76c790f3375178d2a81 [file] [log] [blame]
import os
import dlt
import slack
import transform
from hamilton import driver
def main(
    selected_channels: list[str],
    ingestion_full_refresh: bool = False,
) -> None:
    """Run the ELT pipeline that loads Slack messages and replies,
    then reassembles threads and uses an LLM to summarize them.

    dlt does "Extract, Load"; Hamilton does "Transform".

    Args:
        selected_channels: Slack channels handed to the dlt Slack source and
            forwarded as an input to the Hamilton dataflow.
        ingestion_full_refresh: Forwarded to ``dlt.pipeline`` as its
            ``full_refresh`` argument.

    Raises:
        KeyError: If the ``OPENAI_API_KEY`` environment variable is not set.
    """
    # Fail fast: check the key before any ingestion work, so a missing key is
    # reported immediately instead of after the whole Slack load has run.
    # (Presumably the key is needed by the LLM step of the transform module.)
    if os.environ.get("OPENAI_API_KEY") is None:
        raise KeyError("OPENAI_API_KEY wasn't set.")

    # dlt
    slack_pipeline = dlt.pipeline(
        pipeline_name="slack",
        destination="duckdb",
        dataset_name="slack_data",
        full_refresh=ingestion_full_refresh,
    )

    dlt_source = slack.slack_source(
        selected_channels=selected_channels,
        replies=True,
    )

    load_info = slack_pipeline.run(dlt_source)
    print(load_info)

    # hamilton
    dr = (
        driver.Builder()
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_modules(transform)
        .build()
    )
    # Render the dataflow built from the `transform` module to an image file.
    dr.display_all_functions("dag_transform.png", orient="TB")

    # The dlt pipeline object itself is passed to the dataflow as an input.
    inputs = dict(
        selected_channels=selected_channels,
        pipeline=slack_pipeline,
    )
    dr.execute(["insert_threads"], inputs=inputs)
if __name__ == "__main__":
    import argparse

    # Command-line entry point: one or more positional channel names, plus an
    # opt-in flag that is forwarded to main() as `ingestion_full_refresh`.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "channels",
        nargs="+",
        type=str,
        help="Slack channels to load.",
    )
    cli.add_argument(
        "--full-refresh",
        action="store_true",
        help="Reload all Slack data.",
    )
    options = cli.parse_args()

    main(
        selected_channels=options.channels,
        ingestion_full_refresh=options.full_refresh,
    )