| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import pyarrow as pa |
| import pytest |
| |
| from datafusion import functions as f |
| from datafusion import DataFrame, SessionContext, column, literal, udf |
| |
| |
| @pytest.fixture |
| def ctx(): |
| return SessionContext() |
| |
| |
| @pytest.fixture |
| def df(): |
| ctx = SessionContext() |
| |
| # create a RecordBatch and a new DataFrame from it |
| batch = pa.RecordBatch.from_arrays( |
| [pa.array([1, 2, 3]), pa.array([4, 5, 6])], |
| names=["a", "b"], |
| ) |
| |
| return ctx.create_dataframe([[batch]]) |
| |
| |
| @pytest.fixture |
| def struct_df(): |
| ctx = SessionContext() |
| |
| # create a RecordBatch and a new DataFrame from it |
| batch = pa.RecordBatch.from_arrays( |
| [pa.array([{"c": 1}, {"c": 2}, {"c": 3}]), pa.array([4, 5, 6])], |
| names=["a", "b"], |
| ) |
| |
| return ctx.create_dataframe([[batch]]) |
| |
| |
| def test_select(df): |
| df = df.select( |
| column("a") + column("b"), |
| column("a") - column("b"), |
| ) |
| |
| # execute and collect the first (and only) batch |
| result = df.collect()[0] |
| |
| assert result.column(0) == pa.array([5, 7, 9]) |
| assert result.column(1) == pa.array([-3, -3, -3]) |
| |
| |
| def test_select_columns(df): |
| df = df.select_columns("b", "a") |
| |
| # execute and collect the first (and only) batch |
| result = df.collect()[0] |
| |
| assert result.column(0) == pa.array([4, 5, 6]) |
| assert result.column(1) == pa.array([1, 2, 3]) |
| |
| |
| def test_filter(df): |
| df = df.select( |
| column("a") + column("b"), |
| column("a") - column("b"), |
| ).filter(column("a") > literal(2)) |
| |
| # execute and collect the first (and only) batch |
| result = df.collect()[0] |
| |
| assert result.column(0) == pa.array([9]) |
| assert result.column(1) == pa.array([-3]) |
| |
| |
| def test_sort(df): |
| df = df.sort(column("b").sort(ascending=False)) |
| |
| table = pa.Table.from_batches(df.collect()) |
| expected = {"a": [3, 2, 1], "b": [6, 5, 4]} |
| |
| assert table.to_pydict() == expected |
| |
| |
| def test_limit(df): |
| df = df.limit(1) |
| |
| # execute and collect the first (and only) batch |
| result = df.collect()[0] |
| |
| assert len(result.column(0)) == 1 |
| assert len(result.column(1)) == 1 |
| |
| |
| def test_with_column(df): |
| df = df.with_column("c", column("a") + column("b")) |
| |
| # execute and collect the first (and only) batch |
| result = df.collect()[0] |
| |
| assert result.schema.field(0).name == "a" |
| assert result.schema.field(1).name == "b" |
| assert result.schema.field(2).name == "c" |
| |
| assert result.column(0) == pa.array([1, 2, 3]) |
| assert result.column(1) == pa.array([4, 5, 6]) |
| assert result.column(2) == pa.array([5, 7, 9]) |
| |
| |
| def test_with_column_renamed(df): |
| df = df.with_column("c", column("a") + column("b")).with_column_renamed( |
| "c", "sum" |
| ) |
| |
| result = df.collect()[0] |
| |
| assert result.schema.field(0).name == "a" |
| assert result.schema.field(1).name == "b" |
| assert result.schema.field(2).name == "sum" |
| |
| |
| def test_udf(df): |
| # is_null is a pa function over arrays |
| is_null = udf( |
| lambda x: x.is_null(), |
| [pa.int64()], |
| pa.bool_(), |
| volatility="immutable", |
| ) |
| |
| df = df.select(is_null(column("a"))) |
| result = df.collect()[0].column(0) |
| |
| assert result == pa.array([False, False, False]) |
| |
| |
| def test_join(): |
| ctx = SessionContext() |
| |
| batch = pa.RecordBatch.from_arrays( |
| [pa.array([1, 2, 3]), pa.array([4, 5, 6])], |
| names=["a", "b"], |
| ) |
| df = ctx.create_dataframe([[batch]]) |
| |
| batch = pa.RecordBatch.from_arrays( |
| [pa.array([1, 2]), pa.array([8, 10])], |
| names=["a", "c"], |
| ) |
| df1 = ctx.create_dataframe([[batch]]) |
| |
| df = df.join(df1, join_keys=(["a"], ["a"]), how="inner") |
| df = df.sort(column("a").sort(ascending=True)) |
| table = pa.Table.from_batches(df.collect()) |
| |
| expected = {"a": [1, 2], "c": [8, 10], "b": [4, 5]} |
| assert table.to_pydict() == expected |
| |
| |
| def test_distinct(): |
| ctx = SessionContext() |
| |
| batch = pa.RecordBatch.from_arrays( |
| [pa.array([1, 2, 3, 1, 2, 3]), pa.array([4, 5, 6, 4, 5, 6])], |
| names=["a", "b"], |
| ) |
| df_a = ( |
| ctx.create_dataframe([[batch]]) |
| .distinct() |
| .sort(column("a").sort(ascending=True)) |
| ) |
| |
| batch = pa.RecordBatch.from_arrays( |
| [pa.array([1, 2, 3]), pa.array([4, 5, 6])], |
| names=["a", "b"], |
| ) |
| df_b = ctx.create_dataframe([[batch]]).sort( |
| column("a").sort(ascending=True) |
| ) |
| |
| assert df_a.collect() == df_b.collect() |
| |
| |
| def test_window_lead(df): |
| df = df.select( |
| column("a"), |
| f.alias( |
| f.window( |
| "lead", [column("b")], order_by=[f.order_by(column("b"))] |
| ), |
| "a_next", |
| ), |
| ) |
| |
| table = pa.Table.from_batches(df.collect()) |
| |
| expected = {"a": [1, 2, 3], "a_next": [5, 6, None]} |
| assert table.to_pydict() == expected |
| |
| |
| def test_get_dataframe(tmp_path): |
| ctx = SessionContext() |
| |
| path = tmp_path / "test.csv" |
| table = pa.Table.from_arrays( |
| [ |
| [1, 2, 3, 4], |
| ["a", "b", "c", "d"], |
| [1.1, 2.2, 3.3, 4.4], |
| ], |
| names=["int", "str", "float"], |
| ) |
| pa.csv.write_csv(table, path) |
| |
| ctx.register_csv("csv", path) |
| |
| df = ctx.table("csv") |
| assert isinstance(df, DataFrame) |
| |
| |
| def test_struct_select(struct_df): |
| df = struct_df.select( |
| column("a")["c"] + column("b"), |
| column("a")["c"] - column("b"), |
| ) |
| |
| # execute and collect the first (and only) batch |
| result = df.collect()[0] |
| |
| assert result.column(0) == pa.array([5, 7, 9]) |
| assert result.column(1) == pa.array([-3, -3, -3]) |
| |
| |
| def test_explain(df): |
| df = df.select( |
| column("a") + column("b"), |
| column("a") - column("b"), |
| ) |
| df.explain() |
| |
| |
| def test_repartition(df): |
| df.repartition(2) |
| |
| |
| def test_repartition_by_hash(df): |
| df.repartition_by_hash(column("a"), num=2) |
| |
| |
| def test_intersect(): |
| ctx = SessionContext() |
| |
| batch = pa.RecordBatch.from_arrays( |
| [pa.array([1, 2, 3]), pa.array([4, 5, 6])], |
| names=["a", "b"], |
| ) |
| df_a = ctx.create_dataframe([[batch]]) |
| |
| batch = pa.RecordBatch.from_arrays( |
| [pa.array([3, 4, 5]), pa.array([6, 7, 8])], |
| names=["a", "b"], |
| ) |
| df_b = ctx.create_dataframe([[batch]]) |
| |
| batch = pa.RecordBatch.from_arrays( |
| [pa.array([3]), pa.array([6])], |
| names=["a", "b"], |
| ) |
| df_c = ctx.create_dataframe([[batch]]).sort( |
| column("a").sort(ascending=True) |
| ) |
| |
| df_a_i_b = df_a.intersect(df_b).sort(column("a").sort(ascending=True)) |
| |
| assert df_c.collect() == df_a_i_b.collect() |
| |
| |
| def test_except_all(): |
| ctx = SessionContext() |
| |
| batch = pa.RecordBatch.from_arrays( |
| [pa.array([1, 2, 3]), pa.array([4, 5, 6])], |
| names=["a", "b"], |
| ) |
| df_a = ctx.create_dataframe([[batch]]) |
| |
| batch = pa.RecordBatch.from_arrays( |
| [pa.array([3, 4, 5]), pa.array([6, 7, 8])], |
| names=["a", "b"], |
| ) |
| df_b = ctx.create_dataframe([[batch]]) |
| |
| batch = pa.RecordBatch.from_arrays( |
| [pa.array([1, 2]), pa.array([4, 5])], |
| names=["a", "b"], |
| ) |
| df_c = ctx.create_dataframe([[batch]]).sort( |
| column("a").sort(ascending=True) |
| ) |
| |
| df_a_e_b = df_a.except_all(df_b).sort(column("a").sort(ascending=True)) |
| |
| assert df_c.collect() == df_a_e_b.collect() |
| |
| |
| def test_collect_partitioned(): |
| ctx = SessionContext() |
| |
| batch = pa.RecordBatch.from_arrays( |
| [pa.array([1, 2, 3]), pa.array([4, 5, 6])], |
| names=["a", "b"], |
| ) |
| |
| assert [[batch]] == ctx.create_dataframe([[batch]]).collect_partitioned() |
| |
| |
| def test_union(ctx): |
| batch = pa.RecordBatch.from_arrays( |
| [pa.array([1, 2, 3]), pa.array([4, 5, 6])], |
| names=["a", "b"], |
| ) |
| df_a = ctx.create_dataframe([[batch]]) |
| |
| batch = pa.RecordBatch.from_arrays( |
| [pa.array([3, 4, 5]), pa.array([6, 7, 8])], |
| names=["a", "b"], |
| ) |
| df_b = ctx.create_dataframe([[batch]]) |
| |
| batch = pa.RecordBatch.from_arrays( |
| [pa.array([1, 2, 3, 3, 4, 5]), pa.array([4, 5, 6, 6, 7, 8])], |
| names=["a", "b"], |
| ) |
| df_c = ctx.create_dataframe([[batch]]).sort( |
| column("a").sort(ascending=True) |
| ) |
| |
| df_a_u_b = df_a.union(df_b).sort(column("a").sort(ascending=True)) |
| |
| assert df_c.collect() == df_a_u_b.collect() |
| |
| |
| def test_union_distinct(ctx): |
| batch = pa.RecordBatch.from_arrays( |
| [pa.array([1, 2, 3]), pa.array([4, 5, 6])], |
| names=["a", "b"], |
| ) |
| df_a = ctx.create_dataframe([[batch]]) |
| |
| batch = pa.RecordBatch.from_arrays( |
| [pa.array([3, 4, 5]), pa.array([6, 7, 8])], |
| names=["a", "b"], |
| ) |
| df_b = ctx.create_dataframe([[batch]]) |
| |
| batch = pa.RecordBatch.from_arrays( |
| [pa.array([1, 2, 3, 4, 5]), pa.array([4, 5, 6, 7, 8])], |
| names=["a", "b"], |
| ) |
| df_c = ctx.create_dataframe([[batch]]).sort( |
| column("a").sort(ascending=True) |
| ) |
| |
| df_a_u_b = df_a.union(df_b, True).sort(column("a").sort(ascending=True)) |
| |
| assert df_c.collect() == df_a_u_b.collect() |
| assert df_c.collect() == df_a_u_b.collect() |