# blob: 7cc7af6e06955422fdceb2e399f3bb8a7893cdec [file] [log] [blame]
import click
exec("from hamilton.plugins import h_spark")
import csv_data_loaders
import pyspark
import query_1
import query_8
import query_12
from hamilton import base, driver
# Registry of runnable queries: maps the name accepted on the command line
# to the imported module that is handed to the Hamilton driver.
QUERIES = {
    "query_1": query_1,
    "query_8": query_8,
    "query_12": query_12,
}
def run_query(query: str, data_dir: str, visualize: bool = True):
    """Execute one registered query through a Hamilton driver and print the result.

    :param query: Key into ``QUERIES`` selecting the query module to load.
    :param data_dir: Forwarded to the dataflow as the ``data_dir`` input.
    :param visualize: When truthy, ``visualize_execution`` is called with
        ``./dag-<query>`` as its path argument before the query is executed.
    """
    # Assemble the driver step by step: the chosen query module plus the CSV
    # loaders, using Hamilton's default adapter.
    builder = driver.Builder()
    builder = builder.with_modules(QUERIES[query], csv_data_loaders)
    builder = builder.with_adapter(base.DefaultAdapter())
    dr = builder.build()

    spark = pyspark.sql.SparkSession.builder.getOrCreate()

    # The same requested output and runtime inputs are used for both the
    # visualization and the actual execution.
    targets = ["final_data"]
    runtime_inputs = {"data_dir": data_dir, "spark": spark}

    if visualize:
        dr.visualize_execution(targets, f"./dag-{query}", {}, inputs=runtime_inputs)

    results = dr.execute(targets, inputs=runtime_inputs)
    print(results["final_data"])
@click.command()
@click.option("--data-dir", type=str, help="Base directory for data", required=True)
@click.option(
    "--which", type=click.Choice(list(QUERIES)), help="Which query to run", required=True
)
@click.option(
    "--visualize",
    type=bool,
    help="Whether to visualize the execution",
    is_flag=True,
)
def run_tpch_query(data_dir: str, which: str, visualize: bool):
    """Run the selected TPC-H query on the data in --data-dir and print the result."""
    # NOTE: click uses the docstring above as this command's --help text, so
    # it is kept short and user-facing; each parameter is described by the
    # ``help=`` string of its option.
    click.echo(f"Running TPCH query: {which}")
    run_query(which, data_dir, visualize)
# Script entry point: only invoke the click command when this file is run
# directly, not when it is imported.
if __name__ == "__main__":
    run_tpch_query()