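"""Tests for the Hamilton dlt plugin's DltDestinationSaver and DltResourceLoader."""
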
from pathlib import Path

import dlt
import pandas as pd
import pyarrow as pa
import pytest
from dlt.destinations import filesystem

from hamilton.plugins.dlt_extensions import DltDestinationSaver, DltResourceLoader
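

# Data builders covering the three input formats exercised by the saver test:
# a pandas DataFrame, a plain iterable of dicts, and a pyarrow Table.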
def pandas_df():
    return pd.DataFrame({"a": [1, 2], "b": [1, 2]})


def iterable():
    return [{"a": 1, "b": 3}, {"a": 2, "b": 4}]


def pyarrow_table():
    col_a = pa.array([1, 2])
    col_b = pa.array([3, 4])
    return pa.Table.from_arrays([col_a, col_b], names=["a", "b"])


@pytest.mark.parametrize("data", [iterable(), pandas_df(), pyarrow_table()])
def test_dlt_destination_saver(data, tmp_path):
    # Point dlt at a local filesystem destination so the test needs no credentials.
    save_pipe = dlt.pipeline(destination=filesystem(bucket_url=tmp_path.as_uri()))
    saver = DltDestinationSaver(pipeline=save_pipe, table_name="test_table")

    metadata = saver.save_data(data)

    # One save should produce exactly one completed load package, and the job
    # file it reports must actually exist on disk.
    assert len(metadata["dlt_metadata"]["load_packages"]) == 1
    assert metadata["dlt_metadata"]["load_packages"][0]["state"] == "loaded"
    assert Path(metadata["dlt_metadata"]["load_packages"][0]["jobs"][0]["file_path"]).exists()


def test_dlt_source_loader():
    resource = dlt.resource([{"a": 1, "b": 3}, {"a": 2, "b": 4}], name="mock_resource")
    loader = DltResourceLoader(resource=resource)

    loaded_data, metadata = loader.load_data(pd.DataFrame)

    # Every row of the resource should round-trip, and dlt should attach its
    # load-id column to the resulting DataFrame.
    assert len(loaded_data) == len(list(resource))
    assert "_dlt_load_id" in loaded_data.columns