import logging
import sqlite3
import sys
import pandas as pd
# Optional: add the project directory to your path before the hamilton import
# project_dir = "### ADD PATH HERE ###"
# sys.path.append(project_dir)
from hamilton import base, driver
from hamilton.io.materialization import to
logging.basicConfig(stream=sys.stdout)
initial_columns = {  # load from actuals or wherever -- this is the initial data we use as input.
    # Note: these values don't all have to be series; some could be scalars.
    "signups": pd.Series([1, 10, 50, 100, 200, 400]),
    "spend": pd.Series([10, 10, 20, 40, 40, 50]),
}
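# e.g. you might instead load the inputs from a file (hypothetical path):
# actuals = pd.read_csv("actuals.csv")
# initial_columns = {"signups": actuals["signups"], "spend": actuals["spend"]}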
# we need to tell hamilton where to load function definitions from
# programmatic code to load modules:
# import importlib
# module_name = "my_functions"
# my_functions = importlib.import_module(module_name)
# or import module(s) directly:
import my_functions
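# For reference, a minimal sketch of what my_functions might define -- the real
# module ships alongside this script, and these names are inferred from
# output_columns below. Each function becomes a node named after it, and its
# parameter names declare its upstream dependencies:
#
# import pandas as pd
#
# def avg_3wk_spend(spend: pd.Series) -> pd.Series:
#     """Rolling 3-week average of spend."""
#     return spend.rolling(3).mean()
#
# def spend_per_signup(spend: pd.Series, signups: pd.Series) -> pd.Series:
#     """Marketing spend per signup."""
#     return spend / signups
#
# def spend_zero_mean_unit_variance(spend: pd.Series) -> pd.Series:
#     """Spend standardized to zero mean and unit variance."""
#     return (spend - spend.mean()) / spend.std()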
df_builder = base.PandasDataFrameResult()  # combines the requested outputs into a single pandas dataframe
dr = driver.Driver({}, my_functions)  # first argument is the config dict (empty here); you can pass in multiple modules
# we need to specify what we want in the final dataframe; these can be string names or function references.
output_columns = [
    "spend",
    "signups",
    "avg_3wk_spend",
    "spend_per_signup",
    "spend_zero_mean_unit_variance",
]
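# e.g. per the note above, the function reference my_functions.avg_3wk_spend
# could stand in for the string "avg_3wk_spend".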
# set up db connection for sql materializer below
conn = sqlite3.connect("df.db")
# remove any previous instance of the 'test' table that will be created next
conn.cursor().execute("DROP TABLE IF EXISTS test;")
conn.commit()
materializers = [
    # materialize the dataframe to a pickle file
    to.pickle(
        dependencies=output_columns,
        id="df_to_pickle",
        path="./df.pkl",
        combine=df_builder,
    ),
    to.json(
        dependencies=output_columns,
        id="df_to_json",
        filepath_or_buffer="./df.json",
        combine=df_builder,
    ),
    to.sql(
        dependencies=output_columns,
        id="df_to_sql",
        table_name="test",
        db_connection=conn,
        combine=df_builder,
    ),
    to.xml(
        dependencies=output_columns,
        id="df_to_xml",
        path_or_buffer="./df.xml",
        combine=df_builder,
    ),
    to.html(
        dependencies=output_columns,
        id="df_to_html",
        buf="./df.html",
        combine=df_builder,
    ),
    to.stata(
        dependencies=output_columns,
        id="df_to_stata",
        path="./df.dta",
        combine=df_builder,
    ),
    to.feather(
        dependencies=output_columns,
        id="df_to_feather",
        path="./df.feather",
        combine=df_builder,
    ),
    to.parquet(
        dependencies=output_columns,
        id="df_to_parquet",
        path="./df.parquet.gzip",
        combine=df_builder,
    ),
    to.csv(
        dependencies=output_columns,
        id="df_to_csv",
        path="./df.csv",
        combine=df_builder,
    ),
    to.orc(
        dependencies=output_columns,
        id="df_to_orc",
        path="./df.orc",
        combine=df_builder,
    ),
    to.excel(
        dependencies=output_columns,
        id="df_to_excel",
        path="./df.xlsx",
        combine=df_builder,
    ),
]
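# Note: beyond dependencies/id/combine, each materializer's remaining kwargs are
# passed through to the matching pandas DataFrame.to_* writer -- hence the
# varying argument names above (path, filepath_or_buffer, buf, path_or_buffer).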
# Visualize what is happening
dr.visualize_materialization(
    *materializers,
    additional_vars=output_columns,
    output_file_path="./dag",
    render_kwargs={},
    inputs=initial_columns,
)
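# this renders the combined execution + materialization DAG to ./dag (with an
# image alongside it), assuming the graphviz package and binaries are installed.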
# Materialize a result, i.e. execute the DAG!
materialization_results, additional_outputs = dr.materialize(
    *materializers,
    additional_vars=[
        "df_to_pickle_build_result",
        "df_to_json_build_result",
        "df_to_sql_build_result",
        "df_to_xml_build_result",
        "df_to_html_build_result",
        "df_to_stata_build_result",
        "df_to_feather_build_result",
        "df_to_parquet_build_result",
        "df_to_csv_build_result",
        "df_to_orc_build_result",
        "df_to_excel_build_result",
    ],  # because combine is used, each combined dataframe is also exposed as an <id>_build_result node.
    inputs=initial_columns,
)
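# materialization_results maps each materializer id to metadata about what it
# wrote (e.g. file paths); additional_outputs holds the extra variables we
# requested via additional_vars -- here, one combined dataframe per materializer.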
print(materialization_results)
for build_result in additional_outputs.values():
    print(build_result)
conn.close()
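# After a successful run you should find df.pkl, df.json, df.xml, df.html,
# df.dta, df.feather, df.parquet.gzip, df.csv, df.orc, and df.xlsx on disk,
# plus a 'test' table in the df.db sqlite database.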