WIP: add Ponder + Hamilton example

Should probably rename the example directory to ponder rather than modin.
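
A sketch of the intended end-to-end run, reusing the names defined in run.py
below (not part of this diff; it assumes PONDER_API_KEY is set,
my_service_account_key.json is present, and an sf-hamilton release recent
enough to ship the materialization API). run.py currently only renders the
DAG; actually executing the pipeline and writing the result back to BigQuery
would look roughly like:

    # swap the placeholder for the real BigQuery DB-API connection
    _db_con = db_connection()
    # run the dataflow and hand final_table to the registered BigQuery saver
    results = dr.materialize(
        to.bigquery(
            dependencies=["final_table"],
            id="final_table_to_bigquery",
            table="lending_club_cleaned",
            db_connection=_db_con,
        ),
        inputs={"db_connection": _db_con, "tablename": "LOANS.ACCEPTED"},
    )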
diff --git a/examples/modin/dataflow.png.png b/examples/modin/dataflow.png.png
new file mode 100644
index 0000000..3f804a2
--- /dev/null
+++ b/examples/modin/dataflow.png.png
Binary files differ
diff --git a/examples/modin/dataflow.py b/examples/modin/dataflow.py
new file mode 100644
index 0000000..465e56c
--- /dev/null
+++ b/examples/modin/dataflow.py
@@ -0,0 +1,32 @@
+import modin.pandas as pd
+
+
+def base_table(tablename: str, db_connection: object) -> pd.DataFrame:
+    # select the source table from the configured DB connection
+ df = pd.read_sql(tablename, con=db_connection)
+ return df
+
+
+def filtered_table(base_table: pd.DataFrame) -> pd.DataFrame:
+ # keep columns that have more than 50% non-null values
+ keep = base_table.columns[
+        (base_table.isnull().sum() / base_table.shape[0] * 100 < 50).values
+ ]
+ return base_table[keep]
+
+
+def cleaned_table(filtered_table: pd.DataFrame) -> pd.DataFrame:
+ upper_bound = filtered_table.annual_inc.quantile(0.95)
+ lower_bound = filtered_table.annual_inc.quantile(0.05)
+
+ no_outliers = filtered_table[
+ (filtered_table.annual_inc < upper_bound) & (filtered_table.annual_inc > lower_bound)
+ ]
+
+ print("{} outliers removed".format(filtered_table.shape[0] - no_outliers.shape[0]))
+
+ return no_outliers
+
+
+def final_table(cleaned_table: pd.DataFrame) -> pd.DataFrame:
+    return pd.get_dummies(cleaned_table, columns=["grade"])  # one-hot encode the loan grade
diff --git a/examples/modin/requirements.txt b/examples/modin/requirements.txt
new file mode 100644
index 0000000..f7ced9d
--- /dev/null
+++ b/examples/modin/requirements.txt
@@ -0,0 +1,2 @@
+ponder[bigquery]
+sf-hamilton
diff --git a/examples/modin/run.py b/examples/modin/run.py
new file mode 100644
index 0000000..0d89245
--- /dev/null
+++ b/examples/modin/run.py
@@ -0,0 +1,76 @@
+import dataclasses
+import os
+from datetime import datetime, timezone
+from typing import Any, Collection, Dict, Type
+
+import dataflow
+import modin.pandas as pd
+import ponder
+
+from hamilton import driver, registry
+from hamilton.io.data_adapters import DataSaver
+from hamilton.io.materialization import to
+
+ponder.init(os.environ["PONDER_API_KEY"])
+
+
+def db_connection() -> object:
+ import json
+
+ from google.cloud import bigquery
+ from google.cloud.bigquery import dbapi
+ from google.oauth2 import service_account
+
+ db_con = dbapi.Connection(
+ bigquery.Client(
+ credentials=service_account.Credentials.from_service_account_info(
+ json.loads(open("my_service_account_key.json").read()),
+ scopes=["https://www.googleapis.com/auth/bigquery"],
+ )
+ )
+ )
+ ponder.configure(default_connection=db_con)
+ return db_con
+
+
+@dataclasses.dataclass
+class BigQuery(DataSaver):
+    table: str  # destination table name in BigQuery
+    db_connection: object  # DB-API connection used by DataFrame.to_sql
+
+ def save_data(self, data: pd.DataFrame) -> Dict[str, Any]:
+ data.to_sql(self.table, con=self.db_connection, index=False)
+ return {
+ "table": self.table,
+ "db_type": "bigquery",
+            "timestamp": datetime.now(timezone.utc).timestamp(),
+ }
+
+ @classmethod
+ def applicable_types(cls) -> Collection[Type]:
+ return [pd.DataFrame]
+
+ @classmethod
+ def name(cls) -> str:
+ return "bigquery"
+
+
+for adapter in [BigQuery]:
+    registry.register_adapter(adapter)  # makes to.bigquery(...) available to the materialization API
+
+_db_con = object()  # placeholder so the DAG can be rendered without a live BigQuery connection; use db_connection() for a real run
+
+dr = driver.Driver({}, dataflow)
+dr.display_all_functions("./dataflow.png")
+
+dr.visualize_materialization(
+ to.bigquery(
+ dependencies=["final_table"],
+ id="final_table_to_bigquery",
+ table="lending_club_cleaned",
+ db_connection=_db_con,
+ ),
+ inputs={"db_connection": _db_con, "tablename": "LOANS.ACCEPTED"},
+ output_file_path="./dataflow.png",
+ render_kwargs={"format": "png"},
+)
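
Usage note (a sketch, reusing names from run.py above; final_df is a
hypothetical modin DataFrame, e.g. the output of dataflow.final_table): the
registered BigQuery saver can also be exercised directly, which is handy for
testing the write path without going through the Hamilton graph.

    saver = BigQuery(table="lending_club_cleaned", db_connection=db_connection())
    metadata = saver.save_data(final_df)  # writes via DataFrame.to_sql and returns the metadata dict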