| from dagster import AssetExecutionContext, MetadataValue, asset, MaterializeResult |
| |
@asset
def topstory_ids() -> None:
    """Fetch the current Hacker News top-story IDs and cache them on disk.

    Downloads the ``topstories.json`` listing, keeps the first 100 entries
    and writes them as JSON to ``data/topstory_ids.json``. The ``data``
    directory is created when it does not exist yet.

    Raises:
        requests.HTTPError: If the API answers with an error status code.
        requests.Timeout: If the API does not answer within the timeout.
    """
    topstories_url = "https://hacker-news.firebaseio.com/v0/topstories.json"

    # Without a timeout a stalled connection would block the run forever.
    response = requests.get(topstories_url, timeout=30)
    # Fail loudly on an HTTP error instead of slicing an error payload as if
    # it were the list of IDs.
    response.raise_for_status()
    top_story_ids = response.json()[:100]

    os.makedirs("data", exist_ok=True)
    with open("data/topstory_ids.json", "w") as f:
        json.dump(top_story_ids, f)
| |
@asset(deps=[topstory_ids])
def topstories(context: AssetExecutionContext) -> MaterializeResult:
    """Download every cached top story and store the result as a CSV file.

    Reads the ID list from ``data/topstory_ids.json``, fetches each item from
    the Hacker News API, collects the items into a DataFrame and writes it to
    ``data/topstories.csv``.

    Args:
        context: Execution context; only its logger is used, to report
            progress after every 20 collected items.

    Returns:
        A ``MaterializeResult`` whose metadata holds the number of records
        and a markdown preview of the first rows.

    Raises:
        requests.HTTPError: If an item request answers with an error status.
        requests.Timeout: If an item request does not answer in time.
    """
    # Named ``story_ids`` so the local does not shadow the upstream asset
    # function ``topstory_ids``.
    with open("data/topstory_ids.json", "r") as f:
        story_ids = json.load(f)

    results = []
    # One session reuses the underlying connection across all item requests.
    with requests.Session() as session:
        for item_id in story_ids:
            response = session.get(
                f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json",
                timeout=30,  # never hang indefinitely on a single item
            )
            response.raise_for_status()
            item = response.json()
            if item is None:
                # A JSON ``null`` body decodes to None and carries no story
                # fields; skip it so only real records reach the DataFrame.
                continue
            results.append(item)

            if len(results) % 20 == 0:
                context.log.info(f"Got {len(results)} items so far.")

    df = pd.DataFrame(results)
    df.to_csv("data/topstories.csv")

    return MaterializeResult(
        metadata={
            "num_records": len(df),
            "preview": MetadataValue.md(df.head().to_markdown()),
        }
    )
| |
@asset(deps=[topstories])
def most_frequent_words() -> MaterializeResult:
    """Count the most frequent title words and attach a bar chart of them.

    Reads ``data/topstories.csv``, lower-cases and tokenizes each title on
    whitespace, strips surrounding punctuation, ignores stop words and empty
    tokens, and keeps the 25 most frequent words. The counts are written to
    ``data/most_frequent_words.json`` and rendered as a bar chart.

    Returns:
        A ``MaterializeResult`` whose ``plot`` metadata entry is a markdown
        image embedding the chart as a base64-encoded PNG data URI.
    """
    stopwords = {"a", "the", "an", "of", "to", "in",
                 "for", "and", "with", "on", "is"}
    stories = pd.read_csv("data/topstories.csv")

    word_counts = {}
    # Rows without a title are read back as NaN (a float), which has no
    # ``lower``; drop them before tokenizing.
    for raw_title in stories["title"].dropna():
        for word in raw_title.lower().split():
            cleaned_word = word.strip(".,-!?:;()[]'\"-")
            # Tokens made only of punctuation become empty after stripping.
            if not cleaned_word or cleaned_word in stopwords:
                continue

            word_counts[cleaned_word] = word_counts.get(cleaned_word, 0) + 1

    # Stable descending sort by count; ties keep first-seen order.
    top_words = dict(
        sorted(word_counts.items(), key=lambda pair: pair[1], reverse=True)[:25]
    )

    fig = plt.figure(figsize=(10, 6))
    plt.bar(list(top_words.keys()), list(top_words.values()))
    plt.xticks(rotation=45, ha="right")
    plt.title("Top 25 Words in Hacker News Titles")
    plt.tight_layout()

    buffer = BytesIO()
    plt.savefig(buffer, format="png")
    # Release the figure; pyplot keeps figures alive until they are closed.
    plt.close(fig)
    image_data = base64.b64encode(buffer.getvalue())

    # Embed the PNG inline as a markdown image backed by a data URI.
    md_content = f"![img](data:image/png;base64,{image_data.decode()})"

    with open("data/most_frequent_words.json", "w") as f:
        json.dump(top_words, f)

    return MaterializeResult(
        metadata={"plot": MetadataValue.md(md_content)}
    )
| |
@asset
def signups(hackernews_api: DataGeneratorResource) -> MaterializeResult:
    """Load signup records from the API resource and store them as CSV.

    Builds a DataFrame from ``hackernews_api.get_signups()`` and writes it to
    ``data/signups.csv``, creating the ``data`` directory when needed.

    Args:
        hackernews_api: Resource providing ``get_signups()``.

    Returns:
        A ``MaterializeResult`` whose metadata holds the record count, a
        markdown preview of the first rows, and the minimum and maximum of
        the ``registered_at`` column.
    """
    # Function-scope import keeps this block self-contained.
    import os

    # Named ``signups_df`` so the local does not shadow this function's name.
    signups_df = pd.DataFrame(hackernews_api.get_signups())

    # This asset declares no upstream dependency that creates ``data``, so
    # make sure the directory exists before writing into it.
    os.makedirs("data", exist_ok=True)
    signups_df.to_csv("data/signups.csv")

    return MaterializeResult(
        metadata={
            "Record Count": len(signups_df),
            "Preview": MetadataValue.md(signups_df.head().to_markdown()),
            "Earliest Signup": signups_df["registered_at"].min(),
            "Latest Signup": signups_df["registered_at"].max(),
        }
    )