# blob: 2e968a3fe8633da0a451ccf485effa8ce80c1989 [file] [log] [blame]
from dagster import AssetExecutionContext, MetadataValue, asset, MaterializeResult
@asset
def topstory_ids() -> None:
    """Fetch the IDs of the current top Hacker News stories and cache them.

    Downloads the top-stories listing, keeps the first 100 IDs, and writes
    them as a JSON array to ``data/topstory_ids.json``, creating the
    ``data`` directory if it does not exist yet.

    Raises:
        requests.HTTPError: If the API responds with an error status.
        requests.Timeout: If the API does not respond within the timeout.
    """
    topstories_url = "https://hacker-news.firebaseio.com/v0/topstories.json"

    # A timeout keeps a stalled connection from hanging the run indefinitely.
    response = requests.get(topstories_url, timeout=30)
    # Fail loudly on an HTTP error instead of slicing and saving an error body.
    response.raise_for_status()
    top_story_ids = response.json()[:100]

    os.makedirs("data", exist_ok=True)
    with open("data/topstory_ids.json", "w") as f:
        json.dump(top_story_ids, f)
@asset(deps=[topstory_ids])
def topstories(context: AssetExecutionContext) -> MaterializeResult:
    """Download every cached top story and store them as a CSV file.

    Reads the story IDs from ``data/topstory_ids.json``, fetches each item
    from the Hacker News API, and writes the collected items to
    ``data/topstories.csv``.

    Args:
        context: Dagster execution context; its logger reports progress
            every 20 collected items.

    Returns:
        A ``MaterializeResult`` whose metadata holds the number of records
        and a markdown preview of the first rows.

    Raises:
        requests.HTTPError: If any item request returns an error status.
        requests.Timeout: If any item request exceeds the timeout.
    """
    with open("data/topstory_ids.json", "r") as f:
        story_ids = json.load(f)

    results = []
    # A single session reuses the connection across the per-item requests.
    with requests.Session() as session:
        for item_id in story_ids:
            response = session.get(
                f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json",
                timeout=30,
            )
            response.raise_for_status()
            item = response.json()
            if item is None:
                # A JSON ``null`` body carries no story data; appending None
                # would corrupt the list of records handed to DataFrame.
                continue
            results.append(item)

            if len(results) % 20 == 0:
                context.log.info(f"Got {len(results)} items so far.")

    df = pd.DataFrame(results)
    df.to_csv("data/topstories.csv")

    return MaterializeResult(
        metadata={
            "num_records": len(df),
            "preview": MetadataValue.md(df.head().to_markdown()),
        }
    )
@asset(deps=[topstories])
def most_frequent_words() -> MaterializeResult:
    """Count the most frequent words in the stored story titles and plot them.

    Reads ``data/topstories.csv``, lower-cases and tokenises each title on
    whitespace, strips surrounding punctuation, drops stopwords and empty
    tokens, and keeps the 25 most frequent words. The counts are written to
    ``data/most_frequent_words.json`` and rendered as a bar chart.

    Returns:
        A ``MaterializeResult`` whose ``plot`` metadata is a markdown image
        embedding the bar chart as a base64-encoded PNG.
    """
    from collections import Counter

    stopwords = {"a", "the", "an", "of", "to", "in",
                 "for", "and", "with", "on", "is"}

    stories_df = pd.read_csv("data/topstories.csv")

    word_counts = Counter()
    # Rows without a title may be read back as NaN, which has no ``.lower()``;
    # drop them and coerce the rest to str before tokenising.
    for raw_title in stories_df["title"].dropna().astype(str):
        for word in raw_title.lower().split():
            cleaned_word = word.strip(".,-!?:;()[]'\"-")
            # Skip tokens that were pure punctuation (now empty) and stopwords.
            if cleaned_word and cleaned_word not in stopwords:
                word_counts[cleaned_word] += 1

    # most_common orders by count descending, like the sorted(...)[:25] idiom.
    top_words = dict(word_counts.most_common(25))

    fig = plt.figure(figsize=(10, 6))
    try:
        plt.bar(list(top_words.keys()), list(top_words.values()))
        plt.xticks(rotation=45, ha="right")
        plt.title("Top 25 Words in Hacker News Titles")
        plt.tight_layout()

        buffer = BytesIO()
        plt.savefig(buffer, format="png")
    finally:
        # pyplot keeps figures alive until closed; release it so repeated
        # materializations in one process do not accumulate figures.
        plt.close(fig)

    image_data = base64.b64encode(buffer.getvalue())
    md_content = f"![img](data:image/png;base64,{image_data.decode()})"

    with open("data/most_frequent_words.json", "w") as f:
        json.dump(top_words, f)

    return MaterializeResult(
        metadata={"plot": MetadataValue.md(md_content)}
    )
@asset
def signups(hackernews_api: DataGeneratorResource) -> MaterializeResult:
    """Store the signups returned by the API resource as a CSV file.

    Args:
        hackernews_api: Resource whose ``get_signups()`` result is loaded
            into a DataFrame.

    Returns:
        A ``MaterializeResult`` whose metadata holds the record count, a
        markdown preview of the first rows, and the minimum and maximum of
        the ``registered_at`` column.
    """
    signups_df = pd.DataFrame(hackernews_api.get_signups())

    # This asset declares no upstream assets, so nothing guarantees that the
    # output directory was already created by another asset.
    os.makedirs("data", exist_ok=True)
    signups_df.to_csv("data/signups.csv")

    registered_at = signups_df["registered_at"]
    return MaterializeResult(
        metadata={
            "Record Count": len(signups_df),
            "Preview": MetadataValue.md(signups_df.head().to_markdown()),
            "Earliest Signup": registered_at.min(),
            "Latest Signup": registered_at.max(),
        }
    )