blob: 7701d32e7344d3a131fe1a2613d8557271bad423 [file] [log] [blame]
"""
This script runs a Hamilton DAG whose intent is to create some features for input to a model.
The code is set up to run in a single mode:
1. It uses Ray to parallelize computation. Use {"execution": "normal"} for this mode.
The model fitting steps are not represented here, just the feature ingestion and transformation logic.
Use (1) if your data fits in memory and you want to parallelize execution.
With (1) you can scale beyond what fits on a laptop - you'll just be limited by the memory of each individual
machine in the Ray cluster you are running on.
To run:
> python run_ray.py
"""
import logging
import sys
import data_loaders
# we need to tell hamilton where to load function definitions from
import feature_logic
import ray
from hamilton import base, driver
from hamilton.experimental import h_ray
# Hamilton node names to materialize as columns of the final dataframe.
# A tuple so the module-level constant cannot be mutated by callers.
OUTPUT_COLUMNS = (
    "age",
    "age_zero_mean_unit_variance",
    "has_children",
    "is_summer",
    "has_pet",
    "day_of_the_week_2",
    "day_of_the_week_3",
    "day_of_the_week_4",
    "day_of_the_week_5",
    "day_of_the_week_6",
    "seasons_1",
    "seasons_2",
    "seasons_3",
    "seasons_4",
    "absenteeism_time_in_hours",
)


def main() -> None:
    """Run the feature DAG on a local Ray cluster and print the first rows.

    Ray is started once and is always shut down, even if constructing the
    driver or executing the DAG raises.
    """
    logging.basicConfig(stream=sys.stdout)
    # Set up a local cluster. By default this sets up 1 worker per core.
    ray.init()
    try:
        # Passing in execution to help set up the right nodes for the DAG.
        config = {"location": "Absenteeism_at_work.csv", "execution": "normal"}
        rga = h_ray.RayGraphAdapter(result_builder=base.PandasDataFrameResult())
        # Hamilton loads function definitions from these modules; more can be passed.
        dr = driver.Driver(config, data_loaders, feature_logic, adapter=rga)
        # To visualize do `pip install "sf-hamilton[visualization]"` if you want these to work
        # dr.visualize_execution(list(OUTPUT_COLUMNS), './my_dag.dot', {},
        #                        graphviz_kwargs=dict(graph_attr={'ratio': "1"}))
        # dr.display_all_functions('./my_full_dag.dot')
        # Let's create the dataframe! Pass a fresh list, which is the type execute() is given in the original.
        df = dr.execute(list(OUTPUT_COLUMNS))
        print(df.head().to_string())
    finally:
        # Release the local Ray cluster even when execution fails.
        ray.shutdown()


if __name__ == "__main__":
    main()