blob: 87eaf2259d80767509ee1b1708d569dce0d498e9 [file] [log] [blame]
import typing
import dask.dataframe as dd
import numpy as np
import pandas as pd
import pytest
from dask.delayed import delayed
from distributed import Client
from hamilton import driver
from hamilton.plugins import h_dask
from .resources import example_module, smoke_screen_module
@pytest.fixture(scope="session")
def client():
    """Session-scoped distributed Client shared by every test in this module.

    The client is not installed as the global default; it is only made the
    "current" client for the duration of the session.
    """
    dask_client = Client(set_as_default=False)
    with dask_client.as_current() as current:
        yield current
def test_dask_graph_adapter_simple(client):
    """End-to-end check of DaskGraphAdapter over the example module.

    Inputs are provided as dask delayed objects to confirm lazy column
    inputs flow through the adapter correctly.
    """
    # NOTE: individual columns could also be loaded lazily from a parquet
    # file, e.g. via delayed(lambda: pd.read_parquet(path, columns=[col])[col])()
    inputs = {
        "signups": delayed(lambda: pd.Series([1, 10, 50, 100, 200, 400]))(),
        "spend": delayed(lambda: pd.Series([10, 10, 20, 40, 40, 50]))(),
    }
    adapter = h_dask.DaskGraphAdapter(client)
    dr = driver.Driver(inputs, example_module, adapter=adapter)
    requested = ["spend", "signups", "avg_3wk_spend", "spend_per_signup"]
    result_df = dr.execute(requested)
    assert set(result_df) == set(requested)
    expected = pd.Series(
        [0.0, 0.0, 13.33333, 23.33333, 33.33333, 43.33333], name="avg_3wk_spend"
    )
    # fillna(0.0) sidesteps NaN-vs-NaN comparison in the rolling-average head
    pd.testing.assert_series_equal(result_df.avg_3wk_spend.fillna(0.0), expected)
    # TODO: do some more asserting?
def test_smoke_screen_module(client):
    """Runs the smoke-screen module through DaskGraphAdapter and spot-checks results.

    Column means are compared against known-good values within a small tolerance;
    the date-range series is checked exactly on its first element.
    """
    dr = driver.Driver(
        {"region": "US"}, smoke_screen_module, adapter=h_dask.DaskGraphAdapter(client)
    )
    requested = [
        "raw_acquisition_cost",
        "pessimistic_net_acquisition_cost",
        "neutral_net_acquisition_cost",
        "optimistic_net_acquisition_cost",
        "series_with_start_date_end_date",
    ]
    result_df = dr.execute(
        inputs={"date_range": {"start_date": "20200101", "end_date": "20220801"}},
        final_vars=requested,
    )
    tolerance = 0.00001
    expected_means = {
        "raw_acquisition_cost": 0.393808,
        "pessimistic_net_acquisition_cost": 0.420769,
        "neutral_net_acquisition_cost": 0.405582,
        "optimistic_net_acquisition_cost": 0.399363,
    }
    for column, expected_mean in expected_means.items():
        assert abs(result_df[column].mean() - expected_mean) < tolerance
    assert result_df["series_with_start_date_end_date"].iloc[0] == "date_20200101_date_20220801"
# The following test data exercises DaskDataFrameResult.build_result, which
# combines heterogeneous outputs (scalars, series, dataframes, numpy arrays,
# lists, tuples) into one dask dataframe. Each case is a pair of
# (outputs-kwargs, expected pandas frame). As the expectations show, a
# dataframe output's columns are flattened to "<output_name>.<column>" names,
# and scalars are broadcast down the column.
dd_test_cases = [
    # single_scalar
    ({"a": 1}, pd.DataFrame({"a": [1]})),
    # single_series
    ({"a": pd.Series([1, 2, 3])}, pd.DataFrame({"a": [1, 2, 3]})),
    # single_dataframe
    ({"a": pd.DataFrame({"a": [1, 2, 3]})}, pd.DataFrame({"a.a": [1, 2, 3]})),
    # multiple_series
    (
        {"a": pd.Series([1, 2, 3]), "b": pd.Series([4, 5, 6])},
        pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}),
    ),
    # multiple_dataframe
    (
        {"a": pd.DataFrame({"a": [1, 2, 3]}), "b": pd.DataFrame({"b": [4, 5, 6]})},
        pd.DataFrame({"a.a": [1, 2, 3], "b.b": [4, 5, 6]}),
    ),
    # series_and_dataframe
    (
        {"a": pd.Series([1, 2, 3]), "b": pd.DataFrame({"b": [4, 5, 6]})},
        pd.DataFrame({"a": [1, 2, 3], "b.b": [4, 5, 6]}),
    ),
    # series_and_scalar
    (
        {"b": pd.Series([4, 5, 6]), "a": 1, "c": "string"},
        pd.DataFrame({"b": [4, 5, 6], "a": [1, 1, 1], "c": ["string", "string", "string"]}),
    ),
    # dataframe_and_scalar
    (
        {"a": pd.DataFrame({"a": [1, 2, 3]}), "b": 1},
        pd.DataFrame({"a.a": [1, 2, 3], "b": [1, 1, 1]}),
    ),
    # dataframe_and_series
    (
        {
            "a": pd.Series([1, 2, 3]),
            "b": pd.DataFrame({"b": [1, 2, 3], "c": [1, 1, 1]}),
        },
        pd.DataFrame({"a": [1, 2, 3], "b.b": [1, 2, 3], "b.c": [1, 1, 1]}),
    ),
    # multiple_series_and_scalar
    (
        {"a": pd.Series([1, 2, 3]), "b": pd.Series([4, 5, 6]), "c": 12},
        pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6], "c": [12, 12, 12]}),
    ),
    # multiple_dataframes_and_scalar
    (
        {
            "a": pd.DataFrame({"a": [1, 2, 3]}),
            "b": pd.DataFrame({"b": [4, 5, 6], "d": [2, 2, 2]}),
            "c": 12,
        },
        pd.DataFrame({"a.a": [1, 2, 3], "b.b": [4, 5, 6], "b.d": [2, 2, 2], "c": [12, 12, 12]}),
    ),
    # multiple_series_and_dataframe
    (
        {
            "d": pd.Series([11, 22, 33]),
            "e": pd.Series([44, 55, 66]),
            "c": pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}),
        },
        pd.DataFrame(
            {
                "d": [11, 22, 33],
                "e": [44, 55, 66],
                "c.a": [1, 2, 3],
                "c.b": [4, 5, 6],
            }
        ),
    ),
    # multiple_series_and_dataframe_and_scalar
    (
        {
            "d": pd.Series([11, 22, 33]),
            "e": pd.Series([44, 55, 66]),
            "c": pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}),
            "f": 12,
        },
        pd.DataFrame(
            {
                "d": [11, 22, 33],
                "e": [44, 55, 66],
                "c.a": [1, 2, 3],
                "c.b": [4, 5, 6],
                "f": [12, 12, 12],
            }
        ),
    ),
    # numpy_arrays
    (
        {"a": np.array([1, 2, 3]), "b": np.array([4, 5, 6])},
        pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}),
    ),
    # lists
    ({"a": [1, 2, 3], "b": [4, 5, 6]}, pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})),
    # tuples
    ({"a": (1, 2, 3), "b": (4, 5, 6)}, pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})),
]
# Human-readable pytest ids — one per entry in dd_test_cases; order must stay
# aligned with the list above.
dd_test_case_ids = [
    "single_scalar",
    "single_series",
    "single_dataframe",
    "multiple_series",
    "multiple_dataframes",
    "series_and_dataframe",
    "series_and_scalar",
    "dataframe_and_scalar",
    "dataframe_and_series",
    "multiple_series_and_scalar",
    "multiple_dataframes_and_scalar",
    "multiple_series_and_dataframe",
    "multiple_series_and_dataframe_and_scalar",
    "numpy_arrays",
    "lists",
    "tuples",
]
@pytest.mark.parametrize("outputs, expected", dd_test_cases, ids=dd_test_case_ids)
def test_DDFR_build_result_pandas(
    client, outputs: typing.Dict[str, typing.Any], expected: dd.DataFrame
):
    """Tests using pandas objects works"""
    result = h_dask.DaskDataFrameResult.build_result(**outputs)
    # Normalize both sides through the pyarrow dtype backend before comparing.
    pd.testing.assert_frame_equal(
        result.compute().convert_dtypes(dtype_backend="pyarrow"),
        expected.convert_dtypes(dtype_backend="pyarrow"),
    )
@pytest.mark.parametrize("outputs, expected", dd_test_cases, ids=dd_test_case_ids)
def test_DDFR_build_result_dask(
    client, outputs: typing.Dict[str, typing.Any], expected: dd.DataFrame
):
    """Tests that using dask objects works."""
    # Convert each pandas series/frame into a 2-partition dask collection;
    # everything else (scalars, arrays, lists, tuples) passes through unchanged.
    dask_outputs = {
        name: (
            dd.from_pandas(value, npartitions=2)
            if isinstance(value, (pd.DataFrame, pd.Series))
            else value
        )
        for name, value in outputs.items()
    }
    result = h_dask.DaskDataFrameResult.build_result(**dask_outputs)
    pd.testing.assert_frame_equal(
        result.compute().convert_dtypes(dtype_backend="pyarrow"),
        expected.convert_dtypes(dtype_backend="pyarrow"),
    )
def test_DDFR_build_result_mixed(client):
    """Tests that using dask & pandas objects works."""
    # Deliberately interleaves dask and pandas series/frames plus a plain scalar.
    # NOTE: the insertion order of this dict is part of what's being tested.
    outputs = {
        "d": dd.from_pandas(pd.Series([11, 22, 33]), npartitions=2),
        "e": pd.Series([44, 55, 66]),
        "c": pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}),
        "g": dd.from_pandas(pd.DataFrame({"a2": [1, 2, 3], "b2": [4, 5, 6]}), npartitions=2),
        "f": 12,
    }
    expected = pd.DataFrame(
        {
            "d": [11, 22, 33],
            "e": [44, 55, 66],
            "c.a": [1, 2, 3],
            "c.b": [4, 5, 6],
            "g.a2": [1, 2, 3],
            "g.b2": [4, 5, 6],
            "f": [12, 12, 12],
        }
    )
    result = h_dask.DaskDataFrameResult.build_result(**outputs)
    pd.testing.assert_frame_equal(
        result.compute().convert_dtypes(dtype_backend="pyarrow"),
        expected.convert_dtypes(dtype_backend="pyarrow"),
    )
def test_DDFR_build_result_scalar_first_edge_case(client):
    """Tests the scalar edge case where we don't know how long to make the series from it."""
    outputs = {"a": 1, "b": dd.from_pandas(pd.Series([4, 5, 6]), npartitions=1)}
    result = h_dask.DaskDataFrameResult.build_result(**outputs)
    # The scalar arrives first, so it only occupies a single row; the rest are NA.
    expected = pd.DataFrame({"a": [1, pd.NA, pd.NA], "b": [4, 5, 6]})
    pd.testing.assert_frame_equal(
        result.compute().convert_dtypes(dtype_backend="pyarrow"),
        expected.convert_dtypes(dtype_backend="pyarrow"),
    )
def test_DDFR_build_result_varied_index_lengths(client):
    """Tests what happens when the index lengths differ between outputs.

    This enshrines the current behavior for mixing scalars and series/dataframes
    of different lengths: shorter columns are padded with NA. Smarter handling is
    conceivable (e.g. sizing scalars to max(len(...))), but for now the caller
    controls the result via the order of the requested outputs — re-arranging the
    outputs dictionary yields a different (desired) result.
    """
    # NOTE: insertion order of this dict is load-bearing (see docstring).
    outputs = {
        "a": 1,
        "b": dd.from_pandas(pd.Series([4, 5, 6]), npartitions=1),
        "c": dd.from_pandas(pd.Series([10, 6]), npartitions=1),
        "d": pd.DataFrame({"a": [11, 22, 33, 44], "b": [44, 55, 66, 77]}),
        "e": "a",
    }
    expected = pd.DataFrame(
        {
            "a": [1, pd.NA, pd.NA, pd.NA],
            "b": [4, 5, 6, pd.NA],
            "c": [10, 6, pd.NA, pd.NA],
            "d.a": [11, 22, 33, 44],
            "d.b": [44, 55, 66, 77],
            "e": ["a", "a", "a", pd.NA],
        }
    )
    result = h_dask.DaskDataFrameResult.build_result(**outputs)
    pd.testing.assert_frame_equal(
        result.compute().convert_dtypes(dtype_backend="pyarrow"),
        expected.convert_dtypes(dtype_backend="pyarrow"),
    )
def test_DDFR_build_result_dd_scalar(client):
    """Tests that dask scalars work as expected."""
    series_a = dd.from_pandas(pd.Series([4, 5, 6]), npartitions=1)
    series_b = dd.from_pandas(pd.Series([10, 6, 3]), npartitions=1)
    # "c" is a lazy dask scalar (sum of "a"); per the expectation it fills the
    # whole column, unlike the plain-scalar edge case above.
    outputs = {"a": series_a, "b": series_b, "c": series_a.sum()}
    result = h_dask.DaskDataFrameResult.build_result(**outputs)
    expected = pd.DataFrame(
        {
            "a": [4, 5, 6],
            "b": [10, 6, 3],
            "c": [15, 15, 15],
        }
    )
    pd.testing.assert_frame_equal(
        result.compute().convert_dtypes(dtype_backend="pyarrow"),
        expected.convert_dtypes(dtype_backend="pyarrow"),
    )