# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import pyarrow as pa
import pyarrow.csv
import pytest

from datafusion import functions as f
from datafusion import DataFrame, SessionContext, column, literal, udf


@pytest.fixture
def ctx():
return SessionContext()


@pytest.fixture
def df():
ctx = SessionContext()
# create a RecordBatch and a new DataFrame from it
batch = pa.RecordBatch.from_arrays(
[pa.array([1, 2, 3]), pa.array([4, 5, 6]), pa.array([8, 5, 8])],
names=["a", "b", "c"],
)
return ctx.create_dataframe([[batch]])


@pytest.fixture
def struct_df():
ctx = SessionContext()
# create a RecordBatch and a new DataFrame from it
batch = pa.RecordBatch.from_arrays(
[pa.array([{"c": 1}, {"c": 2}, {"c": 3}]), pa.array([4, 5, 6])],
names=["a", "b"],
)
return ctx.create_dataframe([[batch]])


@pytest.fixture
def aggregate_df():
ctx = SessionContext()
ctx.register_csv("test", "testing/data/csv/aggregate_test_100.csv")
return ctx.sql("select c1, sum(c2) from test group by c1")


def test_select(df):
df = df.select(
column("a") + column("b"),
column("a") - column("b"),
)
# execute and collect the first (and only) batch
result = df.collect()[0]
assert result.column(0) == pa.array([5, 7, 9])
assert result.column(1) == pa.array([-3, -3, -3])


def test_select_columns(df):
df = df.select_columns("b", "a")
# execute and collect the first (and only) batch
result = df.collect()[0]
assert result.column(0) == pa.array([4, 5, 6])
assert result.column(1) == pa.array([1, 2, 3])


def test_filter(df):
df = df.select(
column("a") + column("b"),
column("a") - column("b"),
).filter(column("a") > literal(2))
# execute and collect the first (and only) batch
result = df.collect()[0]
assert result.column(0) == pa.array([9])
assert result.column(1) == pa.array([-3])


def test_sort(df):
df = df.sort(column("b").sort(ascending=False))
table = pa.Table.from_batches(df.collect())
expected = {"a": [3, 2, 1], "b": [6, 5, 4], "c": [8, 5, 8]}
assert table.to_pydict() == expected


def test_limit(df):
df = df.limit(1)
# execute and collect the first (and only) batch
result = df.collect()[0]
assert len(result.column(0)) == 1
assert len(result.column(1)) == 1


def test_with_column(df):
df = df.with_column("c", column("a") + column("b"))
# execute and collect the first (and only) batch
result = df.collect()[0]
assert result.schema.field(0).name == "a"
assert result.schema.field(1).name == "b"
assert result.schema.field(2).name == "c"
assert result.column(0) == pa.array([1, 2, 3])
assert result.column(1) == pa.array([4, 5, 6])
assert result.column(2) == pa.array([5, 7, 9])


def test_with_column_renamed(df):
df = df.with_column("c", column("a") + column("b")).with_column_renamed(
"c", "sum"
)
result = df.collect()[0]
assert result.schema.field(0).name == "a"
assert result.schema.field(1).name == "b"
assert result.schema.field(2).name == "sum"


def test_udf(df):
    # is_null is a pyarrow Array method; wrap it as a scalar UDF
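    # udf() takes the callable, a list of input Arrow types, the return type,
    # and a volatility hint; "immutable" declares that equal inputs always
    # produce equal outputs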
is_null = udf(
lambda x: x.is_null(),
[pa.int64()],
pa.bool_(),
volatility="immutable",
)
df = df.select(is_null(column("a")))
result = df.collect()[0].column(0)
assert result == pa.array([False, False, False])


def test_join():
ctx = SessionContext()
batch = pa.RecordBatch.from_arrays(
[pa.array([1, 2, 3]), pa.array([4, 5, 6])],
names=["a", "b"],
)
df = ctx.create_dataframe([[batch]])
batch = pa.RecordBatch.from_arrays(
[pa.array([1, 2]), pa.array([8, 10])],
names=["a", "c"],
)
df1 = ctx.create_dataframe([[batch]])
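    # join_keys is a pair of (left column names, right column names)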
df = df.join(df1, join_keys=(["a"], ["a"]), how="inner")
df = df.sort(column("a").sort(ascending=True))
table = pa.Table.from_batches(df.collect())
expected = {"a": [1, 2], "c": [8, 10], "b": [4, 5]}
assert table.to_pydict() == expected


def test_distinct():
ctx = SessionContext()
batch = pa.RecordBatch.from_arrays(
[pa.array([1, 2, 3, 1, 2, 3]), pa.array([4, 5, 6, 4, 5, 6])],
names=["a", "b"],
)
df_a = (
ctx.create_dataframe([[batch]])
.distinct()
.sort(column("a").sort(ascending=True))
)
batch = pa.RecordBatch.from_arrays(
[pa.array([1, 2, 3]), pa.array([4, 5, 6])],
names=["a", "b"],
)
df_b = ctx.create_dataframe([[batch]]).sort(
column("a").sort(ascending=True)
)
assert df_a.collect() == df_b.collect()


def test_window_functions(df):
df = df.select(
column("a"),
column("b"),
column("c"),
f.alias(
f.window("row_number", [], order_by=[f.order_by(column("c"))]),
"row",
),
f.alias(
f.window("rank", [], order_by=[f.order_by(column("c"))]),
"rank",
),
f.alias(
f.window("dense_rank", [], order_by=[f.order_by(column("c"))]),
"dense_rank",
),
f.alias(
f.window("percent_rank", [], order_by=[f.order_by(column("c"))]),
"percent_rank",
),
f.alias(
f.window("cume_dist", [], order_by=[f.order_by(column("b"))]),
"cume_dist",
),
f.alias(
f.window(
"ntile", [literal(2)], order_by=[f.order_by(column("c"))]
),
"ntile",
),
f.alias(
f.window("lag", [column("b")], order_by=[f.order_by(column("b"))]),
"previous",
),
f.alias(
f.window(
"lead", [column("b")], order_by=[f.order_by(column("b"))]
),
"next",
),
f.alias(
f.window(
"first_value",
[column("a")],
order_by=[f.order_by(column("b"))],
),
"first_value",
),
f.alias(
f.window(
"last_value", [column("b")], order_by=[f.order_by(column("b"))]
),
"last_value",
),
f.alias(
f.window(
"nth_value",
[column("b"), literal(2)],
order_by=[f.order_by(column("b"))],
),
"2nd_value",
),
)
table = pa.Table.from_batches(df.collect())
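    # ordered by c, the row with c=5 ranks first; the two rows tied at c=8
    # share rank 2, row_number breaks that tie, and dense_rank leaves no gaps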
expected = {
"a": [1, 2, 3],
"b": [4, 5, 6],
"c": [8, 5, 8],
"row": [2, 1, 3],
"rank": [2, 1, 2],
"dense_rank": [2, 1, 2],
"percent_rank": [0.5, 0, 0.5],
"cume_dist": [0.3333333333333333, 0.6666666666666666, 1.0],
"ntile": [1, 1, 2],
"next": [5, 6, None],
"previous": [None, 4, 5],
"first_value": [1, 1, 1],
"last_value": [4, 5, 6],
"2nd_value": [None, 5, 5],
}
assert table.sort_by("a").to_pydict() == expected


def test_get_dataframe(tmp_path):
ctx = SessionContext()
path = tmp_path / "test.csv"
table = pa.Table.from_arrays(
[
[1, 2, 3, 4],
["a", "b", "c", "d"],
[1.1, 2.2, 3.3, 4.4],
],
names=["int", "str", "float"],
)
pa.csv.write_csv(table, path)
ctx.register_csv("csv", path)
df = ctx.table("csv")
assert isinstance(df, DataFrame)


def test_struct_select(struct_df):
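    # column("a")["c"] selects the nested field c from the struct column a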
df = struct_df.select(
column("a")["c"] + column("b"),
column("a")["c"] - column("b"),
)
# execute and collect the first (and only) batch
result = df.collect()[0]
assert result.column(0) == pa.array([5, 7, 9])
assert result.column(1) == pa.array([-3, -3, -3])


def test_explain(df):
df = df.select(
column("a") + column("b"),
column("a") - column("b"),
)
df.explain()


def test_logical_plan(aggregate_df):
plan = aggregate_df.logical_plan()
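    # display() shows only the root plan node; display_indent() shows the
    # whole plan tree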
expected = "Projection: test.c1, SUM(test.c2)"
assert expected == plan.display()
expected = (
"Projection: test.c1, SUM(test.c2)\n"
" Aggregate: groupBy=[[test.c1]], aggr=[[SUM(test.c2)]]\n"
" TableScan: test"
)
assert expected == plan.display_indent()


def test_optimized_logical_plan(aggregate_df):
plan = aggregate_df.optimized_logical_plan()
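    # optimization pushes the projection down into the scan, so TableScan
    # reads only columns c1 and c2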
expected = "Projection: test.c1, SUM(test.c2)"
assert expected == plan.display()
expected = (
"Projection: test.c1, SUM(test.c2)\n"
" Aggregate: groupBy=[[test.c1]], aggr=[[SUM(test.c2)]]\n"
" TableScan: test projection=[c1, c2]"
)
assert expected == plan.display_indent()


def test_execution_plan(aggregate_df):
plan = aggregate_df.execution_plan()
expected = (
"ProjectionExec: expr=[c1@0 as c1, SUM(test.c2)@1 as SUM(test.c2)]\n"
)
assert expected == plan.display()
    # the indented plan embeds the absolute path to the CSV file, which
    # differs per machine, so just check that the expected operators appear
    indent = plan.display_indent()
assert "ProjectionExec:" in indent
assert "AggregateExec:" in indent
assert "CoalesceBatchesExec:" in indent
assert "RepartitionExec:" in indent
assert "CsvExec:" in indent
ctx = SessionContext()
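    # execute partition 0 of the physical plan and stream its record batches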
stream = ctx.execute(plan, 0)
# get the one and only batch
batch = stream.next()
assert batch is not None
# there should be no more batches
batch = stream.next()
assert batch is None


def test_repartition(df):
df.repartition(2)


def test_repartition_by_hash(df):
df.repartition_by_hash(column("a"), num=2)


def test_intersect():
ctx = SessionContext()
batch = pa.RecordBatch.from_arrays(
[pa.array([1, 2, 3]), pa.array([4, 5, 6])],
names=["a", "b"],
)
df_a = ctx.create_dataframe([[batch]])
batch = pa.RecordBatch.from_arrays(
[pa.array([3, 4, 5]), pa.array([6, 7, 8])],
names=["a", "b"],
)
df_b = ctx.create_dataframe([[batch]])
batch = pa.RecordBatch.from_arrays(
[pa.array([3]), pa.array([6])],
names=["a", "b"],
)
df_c = ctx.create_dataframe([[batch]]).sort(
column("a").sort(ascending=True)
)
df_a_i_b = df_a.intersect(df_b).sort(column("a").sort(ascending=True))
assert df_c.collect() == df_a_i_b.collect()


def test_except_all():
ctx = SessionContext()
batch = pa.RecordBatch.from_arrays(
[pa.array([1, 2, 3]), pa.array([4, 5, 6])],
names=["a", "b"],
)
df_a = ctx.create_dataframe([[batch]])
batch = pa.RecordBatch.from_arrays(
[pa.array([3, 4, 5]), pa.array([6, 7, 8])],
names=["a", "b"],
)
df_b = ctx.create_dataframe([[batch]])
batch = pa.RecordBatch.from_arrays(
[pa.array([1, 2]), pa.array([4, 5])],
names=["a", "b"],
)
df_c = ctx.create_dataframe([[batch]]).sort(
column("a").sort(ascending=True)
)
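    # except_all keeps the rows of df_a that do not appear in df_b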
df_a_e_b = df_a.except_all(df_b).sort(column("a").sort(ascending=True))
assert df_c.collect() == df_a_e_b.collect()


def test_collect_partitioned():
ctx = SessionContext()
batch = pa.RecordBatch.from_arrays(
[pa.array([1, 2, 3]), pa.array([4, 5, 6])],
names=["a", "b"],
)
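    # collect_partitioned returns one list of record batches per partition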
assert [[batch]] == ctx.create_dataframe([[batch]]).collect_partitioned()


def test_union(ctx):
batch = pa.RecordBatch.from_arrays(
[pa.array([1, 2, 3]), pa.array([4, 5, 6])],
names=["a", "b"],
)
df_a = ctx.create_dataframe([[batch]])
batch = pa.RecordBatch.from_arrays(
[pa.array([3, 4, 5]), pa.array([6, 7, 8])],
names=["a", "b"],
)
df_b = ctx.create_dataframe([[batch]])
batch = pa.RecordBatch.from_arrays(
[pa.array([1, 2, 3, 3, 4, 5]), pa.array([4, 5, 6, 6, 7, 8])],
names=["a", "b"],
)
df_c = ctx.create_dataframe([[batch]]).sort(
column("a").sort(ascending=True)
)
df_a_u_b = df_a.union(df_b).sort(column("a").sort(ascending=True))
assert df_c.collect() == df_a_u_b.collect()


def test_union_distinct(ctx):
batch = pa.RecordBatch.from_arrays(
[pa.array([1, 2, 3]), pa.array([4, 5, 6])],
names=["a", "b"],
)
df_a = ctx.create_dataframe([[batch]])
batch = pa.RecordBatch.from_arrays(
[pa.array([3, 4, 5]), pa.array([6, 7, 8])],
names=["a", "b"],
)
df_b = ctx.create_dataframe([[batch]])
batch = pa.RecordBatch.from_arrays(
[pa.array([1, 2, 3, 4, 5]), pa.array([4, 5, 6, 7, 8])],
names=["a", "b"],
)
df_c = ctx.create_dataframe([[batch]]).sort(
column("a").sort(ascending=True)
)
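    # the second argument requests a distinct union, so duplicates are removed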
df_a_u_b = df_a.union(df_b, True).sort(column("a").sort(ascending=True))
    assert df_c.collect() == df_a_u_b.collect()


def test_cache(df):
assert df.cache().collect() == df.collect()


def test_count(df):
# Get number of rows
assert df.count() == 3


def test_to_pandas(df):
# Skip test if pandas is not installed
pd = pytest.importorskip("pandas")
    # Convert the DataFusion DataFrame to a pandas DataFrame
    pandas_df = df.to_pandas()
    assert isinstance(pandas_df, pd.DataFrame)
assert pandas_df.shape == (3, 3)
assert set(pandas_df.columns) == {"a", "b", "c"}