blob: b58bfddb4b99685547bf79e190a1689070b29df8 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import unittest
import numpy as np
import pandas as pd
from pyspark import pandas as ps
from pyspark.pandas.groupby import is_multi_agg_with_relabel, SeriesGroupBy
from pyspark.testing.pandasutils import PandasOnSparkTestCase, TestUtils
class GroupByTestsMixin:
def test_groupby_simple(self):
pdf = pd.DataFrame(
{
"a": [1, 2, 6, 4, 4, 6, 4, 3, 7],
"b": [4, 2, 7, 3, 3, 1, 1, 1, 2],
"c": [4, 2, 7, 3, None, 1, 1, 1, 2],
"d": list("abcdefght"),
"e": [True, False, True, False, True, False, True, False, True],
},
index=[0, 1, 3, 5, 6, 8, 9, 9, 9],
)
psdf = ps.from_pandas(pdf)
for as_index in [True, False]:
if as_index:
def sort(df):
return df.sort_index()
else:
def sort(df):
return df.sort_values("a").reset_index(drop=True)
self.assert_eq(
sort(psdf.groupby("a", as_index=as_index).sum()),
sort(pdf.groupby("a", as_index=as_index).sum()),
)
self.assert_eq(
sort(psdf.groupby("a", as_index=as_index).b.sum()),
sort(pdf.groupby("a", as_index=as_index).b.sum()),
)
self.assert_eq(
sort(psdf.groupby("a", as_index=as_index)["b"].sum()),
sort(pdf.groupby("a", as_index=as_index)["b"].sum()),
)
self.assert_eq(
sort(psdf.groupby("a", as_index=as_index)[["b", "c"]].sum()),
sort(pdf.groupby("a", as_index=as_index)[["b", "c"]].sum()),
)
self.assert_eq(
sort(psdf.groupby("a", as_index=as_index)[[]].sum()),
sort(pdf.groupby("a", as_index=as_index)[[]].sum()),
)
self.assert_eq(
sort(psdf.groupby("a", as_index=as_index)["c"].sum()),
sort(pdf.groupby("a", as_index=as_index)["c"].sum()),
)
self.assert_eq(
psdf.groupby("a").a.sum().sort_index(), pdf.groupby("a").a.sum().sort_index()
)
self.assert_eq(
psdf.groupby("a")["a"].sum().sort_index(), pdf.groupby("a")["a"].sum().sort_index()
)
self.assert_eq(
psdf.groupby("a")[["a"]].sum().sort_index(), pdf.groupby("a")[["a"]].sum().sort_index()
)
self.assert_eq(
psdf.groupby("a")[["a", "c"]].sum().sort_index(),
pdf.groupby("a")[["a", "c"]].sum().sort_index(),
)
self.assert_eq(
psdf.a.groupby(psdf.b).sum().sort_index(), pdf.a.groupby(pdf.b).sum().sort_index()
)
for axis in [0, "index"]:
self.assert_eq(
psdf.groupby("a", axis=axis).a.sum().sort_index(),
pdf.groupby("a", axis=axis).a.sum().sort_index(),
)
self.assert_eq(
psdf.groupby("a", axis=axis)["a"].sum().sort_index(),
pdf.groupby("a", axis=axis)["a"].sum().sort_index(),
)
self.assert_eq(
psdf.groupby("a", axis=axis)[["a"]].sum().sort_index(),
pdf.groupby("a", axis=axis)[["a"]].sum().sort_index(),
)
self.assert_eq(
psdf.groupby("a", axis=axis)[["a", "c"]].sum().sort_index(),
pdf.groupby("a", axis=axis)[["a", "c"]].sum().sort_index(),
)
self.assert_eq(
psdf.a.groupby(psdf.b, axis=axis).sum().sort_index(),
pdf.a.groupby(pdf.b, axis=axis).sum().sort_index(),
)
self.assertRaises(ValueError, lambda: psdf.groupby("a", as_index=False).a)
self.assertRaises(ValueError, lambda: psdf.groupby("a", as_index=False)["a"])
self.assertRaises(ValueError, lambda: psdf.groupby("a", as_index=False)[["a"]])
self.assertRaises(ValueError, lambda: psdf.groupby("a", as_index=False)[["a", "c"]])
self.assertRaises(KeyError, lambda: psdf.groupby("z", as_index=False)[["a", "c"]])
self.assertRaises(KeyError, lambda: psdf.groupby(["z"], as_index=False)[["a", "c"]])
self.assertRaises(TypeError, lambda: psdf.a.groupby(psdf.b, as_index=False))
self.assertRaises(NotImplementedError, lambda: psdf.groupby("a", axis=1))
self.assertRaises(NotImplementedError, lambda: psdf.groupby("a", axis="columns"))
self.assertRaises(ValueError, lambda: psdf.groupby("a", "b"))
self.assertRaises(TypeError, lambda: psdf.a.groupby(psdf.a, psdf.b))
# we can't use column name/names as a parameter `by` for `SeriesGroupBy`.
self.assertRaises(KeyError, lambda: psdf.a.groupby(by="a"))
self.assertRaises(KeyError, lambda: psdf.a.groupby(by=["a", "b"]))
self.assertRaises(KeyError, lambda: psdf.a.groupby(by=("a", "b")))
self.assertRaises(KeyError, lambda: psdf.a.groupby(by=[("a", "b")]))
# we can't use DataFrame as a parameter `by` for `DataFrameGroupBy`/`SeriesGroupBy`.
self.assertRaises(ValueError, lambda: psdf.groupby(psdf))
self.assertRaises(ValueError, lambda: psdf.a.groupby(psdf))
self.assertRaises(ValueError, lambda: psdf.a.groupby((psdf,)))
with self.assertRaisesRegex(ValueError, "Grouper for 'list' not 1-dimensional"):
psdf.groupby(by=[["a", "b"]])
# non-string names
pdf = pd.DataFrame(
{
10: [1, 2, 6, 4, 4, 6, 4, 3, 7],
20: [4, 2, 7, 3, 3, 1, 1, 1, 2],
30: [4, 2, 7, 3, None, 1, 1, 1, 2],
40: list("abcdefght"),
},
index=[0, 1, 3, 5, 6, 8, 9, 9, 9],
)
psdf = ps.from_pandas(pdf)
for as_index in [True, False]:
if as_index:
def sort(df):
return df.sort_index()
else:
def sort(df):
return df.sort_values(10).reset_index(drop=True)
self.assert_eq(
sort(psdf.groupby(10, as_index=as_index).sum()),
sort(pdf.groupby(10, as_index=as_index).sum()),
)
self.assert_eq(
sort(psdf.groupby(10, as_index=as_index)[20].sum()),
sort(pdf.groupby(10, as_index=as_index)[20].sum()),
)
self.assert_eq(
sort(psdf.groupby(10, as_index=as_index)[[20, 30]].sum()),
sort(pdf.groupby(10, as_index=as_index)[[20, 30]].sum()),
)
def test_shift(self):
pdf = pd.DataFrame(
{
"a": [1, 1, 2, 2, 3, 3] * 3,
"b": [1, 1, 2, 2, 3, 4] * 3,
"c": [1, 4, 9, 16, 25, 36] * 3,
},
index=np.random.rand(6 * 3),
)
psdf = ps.from_pandas(pdf)
self.assert_eq(
psdf.groupby("a").shift().sort_index(), pdf.groupby("a").shift().sort_index()
)
self.assert_eq(
psdf.groupby(["a", "b"]).shift(periods=-1, fill_value=0).sort_index(),
pdf.groupby(["a", "b"]).shift(periods=-1, fill_value=0).sort_index(),
)
self.assert_eq(
psdf.groupby(["b"])["a"].shift().sort_index(),
pdf.groupby(["b"])["a"].shift().sort_index(),
)
self.assert_eq(
psdf.groupby(["a", "b"])["c"].shift().sort_index(),
pdf.groupby(["a", "b"])["c"].shift().sort_index(),
)
self.assert_eq(
psdf.groupby(psdf.b // 5).shift().sort_index(),
pdf.groupby(pdf.b // 5).shift().sort_index(),
)
self.assert_eq(
psdf.groupby(psdf.b // 5)["a"].shift().sort_index(),
pdf.groupby(pdf.b // 5)["a"].shift().sort_index(),
)
self.assert_eq(
psdf.a.rename().groupby(psdf.b).shift().sort_index(),
pdf.a.rename().groupby(pdf.b).shift().sort_index(),
)
self.assert_eq(
psdf.a.groupby(psdf.b.rename()).shift().sort_index(),
pdf.a.groupby(pdf.b.rename()).shift().sort_index(),
)
self.assert_eq(
psdf.a.rename().groupby(psdf.b.rename()).shift().sort_index(),
pdf.a.rename().groupby(pdf.b.rename()).shift().sort_index(),
)
self.assert_eq(psdf.groupby("a").shift().sum(), pdf.groupby("a").shift().sum().astype(int))
self.assert_eq(
psdf.a.rename().groupby(psdf.b).shift().sum(),
pdf.a.rename().groupby(pdf.b).shift().sum(),
)
# multi-index columns
columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y", "c")])
pdf.columns = columns
psdf.columns = columns
self.assert_eq(
psdf.groupby(("x", "a")).shift().sort_index(),
pdf.groupby(("x", "a")).shift().sort_index(),
)
self.assert_eq(
psdf.groupby([("x", "a"), ("x", "b")]).shift(periods=-1, fill_value=0).sort_index(),
pdf.groupby([("x", "a"), ("x", "b")]).shift(periods=-1, fill_value=0).sort_index(),
)
@staticmethod
def test_is_multi_agg_with_relabel():
assert is_multi_agg_with_relabel(a="max") is False
assert is_multi_agg_with_relabel(a_min=("a", "max"), a_max=("a", "min")) is True
def test_all_any(self):
pdf = pd.DataFrame(
{
"A": [1, 1, 2, 2, 3, 3, 4, 4, 5, 5],
"B": [True, True, True, False, False, False, None, True, None, False],
}
)
psdf = ps.from_pandas(pdf)
for as_index in [True, False]:
if as_index:
def sort(df):
return df.sort_index()
else:
def sort(df):
return df.sort_values("A").reset_index(drop=True)
self.assert_eq(
sort(psdf.groupby("A", as_index=as_index).all()),
sort(pdf.groupby("A", as_index=as_index).all()),
)
self.assert_eq(
sort(psdf.groupby("A", as_index=as_index).any()),
sort(pdf.groupby("A", as_index=as_index).any()),
)
self.assert_eq(
sort(psdf.groupby("A", as_index=as_index).all()).B,
sort(pdf.groupby("A", as_index=as_index).all()).B,
)
self.assert_eq(
sort(psdf.groupby("A", as_index=as_index).any()).B,
sort(pdf.groupby("A", as_index=as_index).any()).B,
)
self.assert_eq(
psdf.B.groupby(psdf.A).all().sort_index(), pdf.B.groupby(pdf.A).all().sort_index()
)
self.assert_eq(
psdf.B.groupby(psdf.A).any().sort_index(), pdf.B.groupby(pdf.A).any().sort_index()
)
# multi-index columns
columns = pd.MultiIndex.from_tuples([("X", "A"), ("Y", "B")])
pdf.columns = columns
psdf.columns = columns
for as_index in [True, False]:
if as_index:
def sort(df):
return df.sort_index()
else:
def sort(df):
return df.sort_values(("X", "A")).reset_index(drop=True)
self.assert_eq(
sort(psdf.groupby(("X", "A"), as_index=as_index).all()),
sort(pdf.groupby(("X", "A"), as_index=as_index).all()),
)
self.assert_eq(
sort(psdf.groupby(("X", "A"), as_index=as_index).any()),
sort(pdf.groupby(("X", "A"), as_index=as_index).any()),
)
# Test skipna
pdf = pd.DataFrame({"A": [True, True], "B": [1, np.nan], "C": [True, None]})
pdf.name = "x"
psdf = ps.from_pandas(pdf)
self.assert_eq(
psdf.groupby("A").all(skipna=False).sort_index(),
pdf.groupby("A").all(skipna=False).sort_index(),
)
self.assert_eq(
psdf.groupby("A").all(skipna=True).sort_index(),
pdf.groupby("A").all(skipna=True).sort_index(),
)
def test_nunique(self):
pdf = pd.DataFrame(
{"a": [1, 1, 1, 1, 1, 0, 0, 0, 0, 0], "b": [2, 2, 2, 3, 3, 4, 4, 5, 5, 5]}
)
psdf = ps.from_pandas(pdf)
self.assert_eq(
psdf.groupby("a").agg({"b": "nunique"}).sort_index(),
pdf.groupby("a").agg({"b": "nunique"}).sort_index(),
)
self.assert_eq(
psdf.groupby("a").nunique().sort_index(), pdf.groupby("a").nunique().sort_index()
)
self.assert_eq(
psdf.groupby("a").nunique(dropna=False).sort_index(),
pdf.groupby("a").nunique(dropna=False).sort_index(),
)
self.assert_eq(
psdf.groupby("a")["b"].nunique().sort_index(),
pdf.groupby("a")["b"].nunique().sort_index(),
)
self.assert_eq(
psdf.groupby("a")["b"].nunique(dropna=False).sort_index(),
pdf.groupby("a")["b"].nunique(dropna=False).sort_index(),
)
nunique_psdf = psdf.groupby("a", as_index=False).agg({"b": "nunique"})
nunique_pdf = pdf.groupby("a", as_index=False).agg({"b": "nunique"})
self.assert_eq(
nunique_psdf.sort_values(["a", "b"]).reset_index(drop=True),
nunique_pdf.sort_values(["a", "b"]).reset_index(drop=True),
)
# multi-index columns
columns = pd.MultiIndex.from_tuples([("x", "a"), ("y", "b")])
pdf.columns = columns
psdf.columns = columns
self.assert_eq(
psdf.groupby(("x", "a")).nunique().sort_index(),
pdf.groupby(("x", "a")).nunique().sort_index(),
)
self.assert_eq(
psdf.groupby(("x", "a")).nunique(dropna=False).sort_index(),
pdf.groupby(("x", "a")).nunique(dropna=False).sort_index(),
)
def test_unique(self):
for pdf in [
pd.DataFrame(
{"a": [1, 1, 1, 1, 1, 0, 0, 0, 0, 0], "b": [2, 2, 2, 3, 3, 4, 4, 5, 5, 5]}
),
pd.DataFrame(
{
"a": [1, 1, 1, 1, 1, 0, 0, 0, 0, 0],
"b": ["w", "w", "w", "x", "x", "y", "y", "z", "z", "z"],
}
),
]:
with self.subTest(pdf=pdf):
psdf = ps.from_pandas(pdf)
actual = psdf.groupby("a")["b"].unique().sort_index()._to_pandas()
expect = pdf.groupby("a")["b"].unique().sort_index()
self.assert_eq(len(actual), len(expect))
for act, exp in zip(actual, expect):
self.assertTrue(sorted(act) == sorted(exp))
def test_diff(self):
pdf = pd.DataFrame(
{
"a": [1, 2, 3, 4, 5, 6] * 3,
"b": [1, 1, 2, 3, 5, 8] * 3,
"c": [1, 4, 9, 16, 25, 36] * 3,
}
)
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf.groupby("b").diff().sort_index(), pdf.groupby("b").diff().sort_index())
self.assert_eq(
psdf.groupby(["a", "b"]).diff().sort_index(),
pdf.groupby(["a", "b"]).diff().sort_index(),
)
self.assert_eq(
psdf.groupby(["b"])["a"].diff().sort_index(),
pdf.groupby(["b"])["a"].diff().sort_index(),
)
self.assert_eq(
psdf.groupby(["b"])[["a", "b"]].diff().sort_index(),
pdf.groupby(["b"])[["a", "b"]].diff().sort_index(),
)
self.assert_eq(
psdf.groupby(psdf.b // 5).diff().sort_index(),
pdf.groupby(pdf.b // 5).diff().sort_index(),
)
self.assert_eq(
psdf.groupby(psdf.b // 5)["a"].diff().sort_index(),
pdf.groupby(pdf.b // 5)["a"].diff().sort_index(),
)
self.assert_eq(psdf.groupby("b").diff().sum(), pdf.groupby("b").diff().sum().astype(int))
self.assert_eq(psdf.groupby(["b"])["a"].diff().sum(), pdf.groupby(["b"])["a"].diff().sum())
# multi-index columns
columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y", "c")])
pdf.columns = columns
psdf.columns = columns
self.assert_eq(
psdf.groupby(("x", "b")).diff().sort_index(),
pdf.groupby(("x", "b")).diff().sort_index(),
)
self.assert_eq(
psdf.groupby([("x", "a"), ("x", "b")]).diff().sort_index(),
pdf.groupby([("x", "a"), ("x", "b")]).diff().sort_index(),
)
def test_aggregate_relabel_index_false(self):
pdf = pd.DataFrame(
{
"A": [0, 0, 1, 1, 1],
"B": ["a", "a", "b", "a", "b"],
"C": [10, 15, 10, 20, 30],
}
)
psdf = ps.from_pandas(pdf)
self.assert_eq(
pdf.groupby(["B", "A"], as_index=False)
.agg(C_MAX=("C", "max"))
.sort_values(["B", "A"])
.reset_index(drop=True),
psdf.groupby(["B", "A"], as_index=False)
.agg(C_MAX=("C", "max"))
.sort_values(["B", "A"])
.reset_index(drop=True),
)
class GroupByTests(
GroupByTestsMixin,
PandasOnSparkTestCase,
TestUtils,
):
pass
if __name__ == "__main__":
from pyspark.pandas.tests.groupby.test_groupby import * # noqa: F401
try:
import xmlrunner
testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)