# Source blob: 97a464c1c9bb25e3af81a8d65727e37e9aa75053
import os
from hamilton import driver
from hamilton.io.materialization import to
from hamilton.plugins import matplotlib_extensions
import dataflow # import module with dataflow definition
from mock_api import DataGeneratorResource
def main():
    """Build the Hamilton driver, visualize the materialization plan, and run it.

    Reads the optional ``HACKERNEWS_NUM_DAYS_WINDOW`` environment variable and
    passes it (as an integer) to the mocked HackerNews API resource. The
    resulting nodes are written as JSON, CSV and PNG files under ``data/``,
    and a picture of the materialization plan is written to ``dataflow.png``.

    Raises:
        ValueError: if ``HACKERNEWS_NUM_DAYS_WINDOW`` is set to something that
            cannot be parsed as an integer.
    """
    dr = (
        driver.Builder()
        .with_modules(dataflow)  # pass the module
        .build()
    )

    # load environment variable
    # Environment values are always strings, so convert before handing the
    # value over as a day count. When the variable is unset (or empty) keep
    # passing None, as before.
    # NOTE(review): presumably DataGeneratorResource expects an int (or None
    # to fall back to its own default) -- confirm against mock_api.
    raw_num_days = os.environ.get("HACKERNEWS_NUM_DAYS_WINDOW")
    num_days = int(raw_num_days) if raw_num_days else None

    inputs = dict(  # mock an API connection
        hackernews_api=DataGeneratorResource(num_days=num_days),
    )

    # Every materializer below targets a path under data/; make sure the
    # directory exists, since the savers may not create parent directories.
    os.makedirs("data", exist_ok=True)

    # define I/O operations; decoupled from dataflow def
    materializers = [
        to.json(  # JSON file type
            id="most_frequent_words.json",
            dependencies=["most_frequent_words"],
            path="data/most_frequent_words.json",
        ),
        to.csv(  # CSV file type
            id="topstories.csv",
            dependencies=["topstories"],
            path="data/topstories.csv",
        ),
        to.csv(
            id="signups.csv",
            dependencies=["signups"],
            path="data/signups.csv",
        ),
        to.plt(  # Use matplotlib.pyplot to render
            id="top_25_words_plot.plt",
            dependencies=["top_25_words_plot"],
            path="data/top_25_words_plot.png",
        ),
    ]

    # visualize materialization plan without executing code
    dr.visualize_materialization(
        *materializers,
        inputs=inputs,
        output_file_path="dataflow.png",
    )

    # pass I/O operations and inputs to materialize dataflow
    dr.materialize(*materializers, inputs=inputs)
# Run the dataflow only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()