# blob: 7d38771c020c4c93841f6f8e3dd3fdc0cf72c59f [file] [log] [blame]
import base64
import json
import os
from io import BytesIO
import matplotlib.pyplot as plt
import pandas as pd
import requests
from dagster import AssetExecutionContext, MaterializeResult, MetadataValue, asset
from .resources import DataGeneratorResource
@asset
def topstory_ids() -> None:
    """Fetch the current Hacker News top-story IDs and save the first 100.

    The IDs are written as a JSON list to ``data/topstory_ids.json``; the
    ``data`` directory is created when it does not exist yet.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            server answers with an HTTP error status.
    """
    topstories_url = "https://hacker-news.firebaseio.com/v0/topstories.json"
    # Without a timeout, requests waits indefinitely on a stalled connection.
    response = requests.get(topstories_url, timeout=10)
    # Fail loudly on an HTTP error instead of persisting an error payload.
    response.raise_for_status()
    top_story_ids = response.json()[:100]

    os.makedirs("data", exist_ok=True)
    with open("data/topstory_ids.json", "w") as f:
        json.dump(top_story_ids, f)
@asset(deps=[topstory_ids])
def topstories(context: AssetExecutionContext) -> MaterializeResult:
    """Download every story listed in ``data/topstory_ids.json`` into a CSV.

    Each ID is fetched from the Hacker News item endpoint, the collected
    items are loaded into a DataFrame and written to ``data/topstories.csv``.
    Progress is logged every 20 collected items.

    Args:
        context: Execution context; only its logger is used here.

    Returns:
        A MaterializeResult carrying the record count and a markdown preview
        of the first rows.

    Raises:
        requests.RequestException: If any request fails, times out, or
            returns an HTTP error status.
    """
    with open("data/topstory_ids.json", "r") as f:
        story_ids = json.load(f)

    results = []
    # A session reuses the underlying connection across the per-item requests.
    with requests.Session() as session:
        for item_id in story_ids:
            response = session.get(
                f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json",
                # Without a timeout a single stalled request blocks the run.
                timeout=10,
            )
            response.raise_for_status()
            item = response.json()
            # A JSON ``null`` body decodes to None (presumably an item that is
            # no longer available -- TODO confirm); it carries no fields, so
            # it is skipped rather than fed to the DataFrame constructor.
            if item is None:
                continue
            results.append(item)

            if len(results) % 20 == 0:
                context.log.info(f"Got {len(results)} items so far.")

    df = pd.DataFrame(results)
    df.to_csv("data/topstories.csv")

    return MaterializeResult(
        metadata={
            "num_records": len(df),
            "preview": MetadataValue.md(df.head().to_markdown()),
        }
    )
def _top_title_words(titles, stopwords, limit=25):
    """Return the ``limit`` most frequent non-stopword words in ``titles``.

    Words are lower-cased and stripped of surrounding punctuation before
    counting. The result maps word -> count, most frequent first; ties keep
    first-seen order.
    """
    from collections import Counter

    counts = Counter()
    for title in titles:
        for word in title.lower().split():
            cleaned_word = word.strip(".,-!?:;()[]'\"-")
            if cleaned_word and cleaned_word not in stopwords:
                counts[cleaned_word] += 1
    return dict(counts.most_common(limit))


@asset(deps=[topstories])
def most_frequent_words() -> MaterializeResult:
    """Compute and chart the 25 most frequent words in the story titles.

    Reads ``data/topstories.csv``, counts title words (ignoring a small
    stopword set), writes the top words to ``data/most_frequent_words.json``
    and renders a bar chart.

    Returns:
        A MaterializeResult whose ``plot`` metadata is markdown embedding the
        chart as a base64-encoded PNG.
    """
    # A set gives constant-time membership tests inside the counting loop.
    stopwords = {"a", "the", "an", "of", "to", "in", "for", "and", "with", "on", "is"}

    topstories = pd.read_csv("data/topstories.csv")
    # Rows without a title are read back as NaN (a float), which has no
    # ``.lower()``; drop them and normalise the remainder to str.
    titles = topstories["title"].dropna().astype(str)
    top_words = _top_title_words(titles, stopwords, limit=25)

    fig = plt.figure(figsize=(10, 6))
    try:
        plt.bar(list(top_words.keys()), list(top_words.values()))
        plt.xticks(rotation=45, ha="right")
        plt.title("Top 25 Words in Hacker News Titles")
        plt.tight_layout()

        buffer = BytesIO()
        plt.savefig(buffer, format="png")
    finally:
        # pyplot keeps every figure alive until it is closed explicitly;
        # close it so repeated materializations do not accumulate figures.
        plt.close(fig)

    image_data = base64.b64encode(buffer.getvalue())
    md_content = f"![img](data:image/png;base64,{image_data.decode()})"

    with open("data/most_frequent_words.json", "w") as f:
        json.dump(top_words, f)

    return MaterializeResult(metadata={"plot": MetadataValue.md(md_content)})
@asset
def signups(hackernews_api: DataGeneratorResource) -> MaterializeResult:
    """Fetch signup records from the resource and save them as a CSV.

    The records returned by ``hackernews_api.get_signups()`` are loaded into
    a DataFrame and written to ``data/signups.csv``.

    Args:
        hackernews_api: Resource that supplies the signup records.

    Returns:
        A MaterializeResult with the record count, a markdown preview of the
        first rows, and the minimum and maximum ``registered_at`` values.
    """
    signups = pd.DataFrame(hackernews_api.get_signups())

    # This asset declares no upstream dependency, so it may run before any
    # other asset has created the output directory.
    os.makedirs("data", exist_ok=True)
    signups.to_csv("data/signups.csv")

    return MaterializeResult(
        metadata={
            "Record Count": len(signups),
            "Preview": MetadataValue.md(signups.head().to_markdown()),
            "Earliest Signup": signups["registered_at"].min(),
            "Latest Signup": signups["registered_at"].max(),
        }
    )