#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import unittest
from distutils.version import LooseVersion
from itertools import product
import numpy as np
import pandas as pd
from pyspark import pandas as ps
from pyspark.testing.pandasutils import PandasOnSparkTestCase, TestUtils


class GroupBySlowTest(PandasOnSparkTestCase, TestUtils):
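    """GroupBy tests (slow group), comparing pandas-on-Spark results against pandas."""
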
def test_split_apply_combine_on_series(self):
pdf = pd.DataFrame(
{
"a": [1, 2, 6, 4, 4, 6, 4, 3, 7],
"b": [4, 2, 7, 3, 3, 1, 1, 1, 2],
"c": [4, 2, 7, 3, None, 1, 1, 1, 2],
"d": list("abcdefght"),
},
index=[0, 1, 3, 5, 6, 8, 9, 9, 9],
)
psdf = ps.from_pandas(pdf)
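        # Pair (check_exact, almost) comparison flags with groups of aggregation functions,
        # then flatten into (check_exact, almost, func) triples used by the loops below.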
funcs = [
((True, False), ["sum", "min", "max", "count", "first", "last"]),
((True, True), ["mean"]),
((False, False), ["var", "std", "skew"]),
]
funcs = [(check_exact, almost, f) for (check_exact, almost), fs in funcs for f in fs]
for as_index in [True, False]:
if as_index:
def sort(df):
return df.sort_index()
else:
def sort(df):
return df.sort_values(list(df.columns)).reset_index(drop=True)
for check_exact, almost, func in funcs:
for kkey, pkey in [("b", "b"), (psdf.b, pdf.b)]:
with self.subTest(as_index=as_index, func=func, key=pkey):
if as_index is True or func != "std":
self.assert_eq(
sort(getattr(psdf.groupby(kkey, as_index=as_index).a, func)()),
sort(getattr(pdf.groupby(pkey, as_index=as_index).a, func)()),
check_exact=check_exact,
almost=almost,
)
self.assert_eq(
sort(getattr(psdf.groupby(kkey, as_index=as_index), func)()),
sort(getattr(pdf.groupby(pkey, as_index=as_index), func)()),
check_exact=check_exact,
almost=almost,
)
else:
                            # Seems to be a pandas bug when as_index=False and func == "std";
                            # compare against as_index=True + reset_index() instead.
self.assert_eq(
sort(getattr(psdf.groupby(kkey, as_index=as_index).a, func)()),
sort(pdf.groupby(pkey, as_index=True).a.std().reset_index()),
check_exact=check_exact,
almost=almost,
)
self.assert_eq(
sort(getattr(psdf.groupby(kkey, as_index=as_index), func)()),
sort(pdf.groupby(pkey, as_index=True).std().reset_index()),
check_exact=check_exact,
almost=almost,
)
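                # Also test non-column keys: a derived Series and a Series from a copied frame.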
for kkey, pkey in [(psdf.b + 1, pdf.b + 1), (psdf.copy().b, pdf.copy().b)]:
with self.subTest(as_index=as_index, func=func, key=pkey):
self.assert_eq(
sort(getattr(psdf.groupby(kkey, as_index=as_index).a, func)()),
sort(getattr(pdf.groupby(pkey, as_index=as_index).a, func)()),
check_exact=check_exact,
almost=almost,
)
self.assert_eq(
sort(getattr(psdf.groupby(kkey, as_index=as_index), func)()),
sort(getattr(pdf.groupby(pkey, as_index=as_index), func)()),
check_exact=check_exact,
almost=almost,
)
for check_exact, almost, func in funcs:
for i in [0, 4, 7]:
with self.subTest(as_index=as_index, func=func, i=i):
self.assert_eq(
sort(getattr(psdf.groupby(psdf.b > i, as_index=as_index).a, func)()),
sort(getattr(pdf.groupby(pdf.b > i, as_index=as_index).a, func)()),
check_exact=check_exact,
almost=almost,
)
self.assert_eq(
sort(getattr(psdf.groupby(psdf.b > i, as_index=as_index), func)()),
sort(getattr(pdf.groupby(pdf.b > i, as_index=as_index), func)()),
check_exact=check_exact,
almost=almost,
)
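            # SeriesGroupBy: group a single Series by column, derived, copied, and unnamed keys.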
for check_exact, almost, func in funcs:
for kkey, pkey in [
(psdf.b, pdf.b),
(psdf.b + 1, pdf.b + 1),
(psdf.copy().b, pdf.copy().b),
(psdf.b.rename(), pdf.b.rename()),
]:
with self.subTest(func=func, key=pkey):
self.assert_eq(
getattr(psdf.a.groupby(kkey), func)().sort_index(),
getattr(pdf.a.groupby(pkey), func)().sort_index(),
check_exact=check_exact,
almost=almost,
)
self.assert_eq(
getattr((psdf.a + 1).groupby(kkey), func)().sort_index(),
getattr((pdf.a + 1).groupby(pkey), func)().sort_index(),
check_exact=check_exact,
almost=almost,
)
self.assert_eq(
getattr((psdf.b + 1).groupby(kkey), func)().sort_index(),
getattr((pdf.b + 1).groupby(pkey), func)().sort_index(),
check_exact=check_exact,
almost=almost,
)
self.assert_eq(
getattr(psdf.a.rename().groupby(kkey), func)().sort_index(),
getattr(pdf.a.rename().groupby(pkey), func)().sort_index(),
check_exact=check_exact,
almost=almost,
)

    def test_aggregate(self):
pdf = pd.DataFrame(
{"A": [1, 1, 2, 2], "B": [1, 2, 3, 4], "C": [0.362, 0.227, 1.267, -0.562]}
)
psdf = ps.from_pandas(pdf)
for as_index in [True, False]:
if as_index:
def sort(df):
return df.sort_index()
else:
def sort(df):
return df.sort_values(list(df.columns)).reset_index(drop=True)
for kkey, pkey in [("A", "A"), (psdf.A, pdf.A)]:
with self.subTest(as_index=as_index, key=pkey):
self.assert_eq(
sort(psdf.groupby(kkey, as_index=as_index).agg("sum")),
sort(pdf.groupby(pkey, as_index=as_index).agg("sum")),
)
self.assert_eq(
sort(psdf.groupby(kkey, as_index=as_index).agg({"B": "min", "C": "sum"})),
sort(pdf.groupby(pkey, as_index=as_index).agg({"B": "min", "C": "sum"})),
)
self.assert_eq(
sort(
psdf.groupby(kkey, as_index=as_index).agg(
{"B": ["min", "max"], "C": "sum"}
)
),
sort(
pdf.groupby(pkey, as_index=as_index).agg(
{"B": ["min", "max"], "C": "sum"}
)
),
)
if as_index:
self.assert_eq(
sort(psdf.groupby(kkey, as_index=as_index).agg(["sum"])),
sort(pdf.groupby(pkey, as_index=as_index).agg(["sum"])),
)
else:
                        # Seems to be a pandas bug when as_index=False and func_or_funcs is a list;
                        # compare against as_index=True + reset_index() instead.
self.assert_eq(
sort(psdf.groupby(kkey, as_index=as_index).agg(["sum"])),
sort(pdf.groupby(pkey, as_index=True).agg(["sum"]).reset_index()),
)
for kkey, pkey in [(psdf.A + 1, pdf.A + 1), (psdf.copy().A, pdf.copy().A)]:
with self.subTest(as_index=as_index, key=pkey):
self.assert_eq(
sort(psdf.groupby(kkey, as_index=as_index).agg("sum")),
sort(pdf.groupby(pkey, as_index=as_index).agg("sum")),
)
self.assert_eq(
sort(psdf.groupby(kkey, as_index=as_index).agg({"B": "min", "C": "sum"})),
sort(pdf.groupby(pkey, as_index=as_index).agg({"B": "min", "C": "sum"})),
)
self.assert_eq(
sort(
psdf.groupby(kkey, as_index=as_index).agg(
{"B": ["min", "max"], "C": "sum"}
)
),
sort(
pdf.groupby(pkey, as_index=as_index).agg(
{"B": ["min", "max"], "C": "sum"}
)
),
)
self.assert_eq(
sort(psdf.groupby(kkey, as_index=as_index).agg(["sum"])),
sort(pdf.groupby(pkey, as_index=as_index).agg(["sum"])),
)
expected_error_message = (
r"aggs must be a dict mapping from column name to aggregate functions "
r"\(string or list of strings\)."
)
with self.assertRaisesRegex(ValueError, expected_error_message):
psdf.groupby("A", as_index=as_index).agg(0)
# multi-index columns
columns = pd.MultiIndex.from_tuples([(10, "A"), (10, "B"), (20, "C")])
pdf.columns = columns
psdf.columns = columns
for as_index in [True, False]:
stats_psdf = psdf.groupby((10, "A"), as_index=as_index).agg(
{(10, "B"): "min", (20, "C"): "sum"}
)
stats_pdf = pdf.groupby((10, "A"), as_index=as_index).agg(
{(10, "B"): "min", (20, "C"): "sum"}
)
self.assert_eq(
stats_psdf.sort_values(by=[(10, "B"), (20, "C")]).reset_index(drop=True),
stats_pdf.sort_values(by=[(10, "B"), (20, "C")]).reset_index(drop=True),
)
stats_psdf = psdf.groupby((10, "A")).agg({(10, "B"): ["min", "max"], (20, "C"): "sum"})
stats_pdf = pdf.groupby((10, "A")).agg({(10, "B"): ["min", "max"], (20, "C"): "sum"})
self.assert_eq(
stats_psdf.sort_values(
by=[(10, "B", "min"), (10, "B", "max"), (20, "C", "sum")]
).reset_index(drop=True),
stats_pdf.sort_values(
by=[(10, "B", "min"), (10, "B", "max"), (20, "C", "sum")]
).reset_index(drop=True),
)
# non-string names
pdf.columns = [10, 20, 30]
psdf.columns = [10, 20, 30]
for as_index in [True, False]:
stats_psdf = psdf.groupby(10, as_index=as_index).agg({20: "min", 30: "sum"})
stats_pdf = pdf.groupby(10, as_index=as_index).agg({20: "min", 30: "sum"})
self.assert_eq(
stats_psdf.sort_values(by=[20, 30]).reset_index(drop=True),
stats_pdf.sort_values(by=[20, 30]).reset_index(drop=True),
)
stats_psdf = psdf.groupby(10).agg({20: ["min", "max"], 30: "sum"})
stats_pdf = pdf.groupby(10).agg({20: ["min", "max"], 30: "sum"})
self.assert_eq(
stats_psdf.sort_values(by=[(20, "min"), (20, "max"), (30, "sum")]).reset_index(
drop=True
),
stats_pdf.sort_values(by=[(20, "min"), (20, "max"), (30, "sum")]).reset_index(
drop=True
),
)

    def test_aggregate_func_str_list(self):
        # Test the cases where only a string or a list is passed as the aggregation function.
pdf = pd.DataFrame(
{
"kind": ["cat", "dog", "cat", "dog"],
"height": [9.1, 6.0, 9.5, 34.0],
"weight": [7.9, 7.5, 9.9, 198.0],
}
)
psdf = ps.from_pandas(pdf)
agg_funcs = ["max", "min", ["min", "max"]]
for aggfunc in agg_funcs:
            # Since the row order in a pandas-on-Spark groupby might differ from pandas,
            # sort by index to ensure both produce the same output.
sorted_agg_psdf = psdf.groupby("kind").agg(aggfunc).sort_index()
sorted_agg_pdf = pdf.groupby("kind").agg(aggfunc).sort_index()
self.assert_eq(sorted_agg_psdf, sorted_agg_pdf)
        # test the multi-index column case
pdf = pd.DataFrame(
{"A": [1, 1, 2, 2], "B": [1, 2, 3, 4], "C": [0.362, 0.227, 1.267, -0.562]}
)
psdf = ps.from_pandas(pdf)
columns = pd.MultiIndex.from_tuples([("X", "A"), ("X", "B"), ("Y", "C")])
pdf.columns = columns
psdf.columns = columns
for aggfunc in agg_funcs:
sorted_agg_psdf = psdf.groupby(("X", "A")).agg(aggfunc).sort_index()
sorted_agg_pdf = pdf.groupby(("X", "A")).agg(aggfunc).sort_index()
self.assert_eq(sorted_agg_psdf, sorted_agg_pdf)

    def test_aggregate_relabel(self):
        # This tests named aggregation in groupby.
pdf = pd.DataFrame({"group": ["a", "a", "b", "b"], "A": [0, 1, 2, 3], "B": [5, 6, 7, 8]})
psdf = ps.from_pandas(pdf)
# different agg column, same function
agg_pdf = pdf.groupby("group").agg(a_max=("A", "max"), b_max=("B", "max")).sort_index()
agg_psdf = psdf.groupby("group").agg(a_max=("A", "max"), b_max=("B", "max")).sort_index()
self.assert_eq(agg_pdf, agg_psdf)
# same agg column, different functions
agg_pdf = pdf.groupby("group").agg(b_max=("B", "max"), b_min=("B", "min")).sort_index()
agg_psdf = psdf.groupby("group").agg(b_max=("B", "max"), b_min=("B", "min")).sort_index()
self.assert_eq(agg_pdf, agg_psdf)
# test on NamedAgg
agg_pdf = (
pdf.groupby("group").agg(b_max=pd.NamedAgg(column="B", aggfunc="max")).sort_index()
)
agg_psdf = (
psdf.groupby("group").agg(b_max=ps.NamedAgg(column="B", aggfunc="max")).sort_index()
)
self.assert_eq(agg_psdf, agg_pdf)
        # test NamedAgg with multi-column aggregation
agg_pdf = (
pdf.groupby("group")
.agg(
b_max=pd.NamedAgg(column="B", aggfunc="max"),
b_min=pd.NamedAgg(column="B", aggfunc="min"),
)
.sort_index()
)
agg_psdf = (
psdf.groupby("group")
.agg(
b_max=ps.NamedAgg(column="B", aggfunc="max"),
b_min=ps.NamedAgg(column="B", aggfunc="min"),
)
.sort_index()
)
self.assert_eq(agg_psdf, agg_pdf)

    def test_dropna(self):
pdf = pd.DataFrame(
{"A": [None, 1, None, 1, 2], "B": [1, 2, 3, None, None], "C": [4, 5, 6, 7, None]}
)
psdf = ps.from_pandas(pdf)
        # The dropna parameter of pd.DataFrame.groupby has been supported since pandas 1.1.0.
if LooseVersion(pd.__version__) >= LooseVersion("1.1.0"):
for dropna in [True, False]:
for as_index in [True, False]:
if as_index:
def sort(df):
return df.sort_index()
else:
def sort(df):
return df.sort_values("A").reset_index(drop=True)
self.assert_eq(
sort(psdf.groupby("A", as_index=as_index, dropna=dropna).std()),
sort(pdf.groupby("A", as_index=as_index, dropna=dropna).std()),
)
self.assert_eq(
sort(psdf.groupby("A", as_index=as_index, dropna=dropna).B.std()),
sort(pdf.groupby("A", as_index=as_index, dropna=dropna).B.std()),
)
self.assert_eq(
sort(psdf.groupby("A", as_index=as_index, dropna=dropna)["B"].std()),
sort(pdf.groupby("A", as_index=as_index, dropna=dropna)["B"].std()),
)
self.assert_eq(
sort(
psdf.groupby("A", as_index=as_index, dropna=dropna).agg(
{"B": "min", "C": "std"}
)
),
sort(
pdf.groupby("A", as_index=as_index, dropna=dropna).agg(
{"B": "min", "C": "std"}
)
),
)
for dropna in [True, False]:
for as_index in [True, False]:
if as_index:
def sort(df):
return df.sort_index()
else:
def sort(df):
return df.sort_values(["A", "B"]).reset_index(drop=True)
self.assert_eq(
sort(
psdf.groupby(["A", "B"], as_index=as_index, dropna=dropna).agg(
{"C": ["min", "std"]}
)
),
sort(
pdf.groupby(["A", "B"], as_index=as_index, dropna=dropna).agg(
{"C": ["min", "std"]}
)
),
almost=True,
)
# multi-index columns
columns = pd.MultiIndex.from_tuples([("X", "A"), ("X", "B"), ("Y", "C")])
pdf.columns = columns
psdf.columns = columns
for dropna in [True, False]:
for as_index in [True, False]:
if as_index:
def sort(df):
return df.sort_index()
else:
def sort(df):
return df.sort_values(("X", "A")).reset_index(drop=True)
sorted_stats_psdf = sort(
psdf.groupby(("X", "A"), as_index=as_index, dropna=dropna).agg(
{("X", "B"): "min", ("Y", "C"): "std"}
)
)
sorted_stats_pdf = sort(
pdf.groupby(("X", "A"), as_index=as_index, dropna=dropna).agg(
{("X", "B"): "min", ("Y", "C"): "std"}
)
)
self.assert_eq(sorted_stats_psdf, sorted_stats_pdf)
else:
# Testing dropna=True (pandas default behavior)
for as_index in [True, False]:
if as_index:
def sort(df):
return df.sort_index()
else:
def sort(df):
return df.sort_values("A").reset_index(drop=True)
self.assert_eq(
sort(psdf.groupby("A", as_index=as_index, dropna=True)["B"].min()),
sort(pdf.groupby("A", as_index=as_index)["B"].min()),
)
if as_index:
def sort(df):
return df.sort_index()
else:
def sort(df):
return df.sort_values(["A", "B"]).reset_index(drop=True)
self.assert_eq(
sort(
psdf.groupby(["A", "B"], as_index=as_index, dropna=True).agg(
{"C": ["min", "std"]}
)
),
sort(pdf.groupby(["A", "B"], as_index=as_index).agg({"C": ["min", "std"]})),
almost=True,
)
# Testing dropna=False
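            # With dropna=False, rows whose group key is NaN form their own group,
            # which is why NaN appears in the expected index below.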
index = pd.Index([1.0, 2.0, np.nan], name="A")
expected = pd.Series([2.0, np.nan, 1.0], index=index, name="B")
result = psdf.groupby("A", as_index=True, dropna=False)["B"].min().sort_index()
self.assert_eq(expected, result)
expected = pd.DataFrame({"A": [1.0, 2.0, np.nan], "B": [2.0, np.nan, 1.0]})
result = (
psdf.groupby("A", as_index=False, dropna=False)["B"]
.min()
.sort_values("A")
.reset_index(drop=True)
)
self.assert_eq(expected, result)
index = pd.MultiIndex.from_tuples(
[(1.0, 2.0), (1.0, None), (2.0, None), (None, 1.0), (None, 3.0)], names=["A", "B"]
)
expected = pd.DataFrame(
{
("C", "min"): [5.0, 7.0, np.nan, 4.0, 6.0],
("C", "std"): [np.nan, np.nan, np.nan, np.nan, np.nan],
},
index=index,
)
result = (
psdf.groupby(["A", "B"], as_index=True, dropna=False)
.agg({"C": ["min", "std"]})
.sort_index()
)
self.assert_eq(expected, result)
expected = pd.DataFrame(
{
("A", ""): [1.0, 1.0, 2.0, np.nan, np.nan],
("B", ""): [2.0, np.nan, np.nan, 1.0, 3.0],
("C", "min"): [5.0, 7.0, np.nan, 4.0, 6.0],
("C", "std"): [np.nan, np.nan, np.nan, np.nan, np.nan],
}
)
result = (
psdf.groupby(["A", "B"], as_index=False, dropna=False)
.agg({"C": ["min", "std"]})
.sort_values(["A", "B"])
.reset_index(drop=True)
)
self.assert_eq(expected, result)

    def test_describe(self):
        # Numeric types are supported, but string types are not supported yet.
datas = []
datas.append({"a": [1, 1, 3], "b": [4, 5, 6], "c": [7, 8, 9]})
datas.append({"a": [-1, -1, -3], "b": [-4, -5, -6], "c": [-7, -8, -9]})
datas.append({"a": [0, 0, 0], "b": [0, 0, 0], "c": [0, 8, 0]})
        # It is okay to use a string-typed column as the group key.
datas.append({"a": ["a", "a", "c"], "b": [4, 5, 6], "c": [7, 8, 9]})
percentiles = [0.25, 0.5, 0.75]
formatted_percentiles = ["25%", "50%", "75%"]
non_percentile_stats = ["count", "mean", "std", "min", "max"]
for data in datas:
pdf = pd.DataFrame(data)
psdf = ps.from_pandas(pdf)
describe_pdf = pdf.groupby("a").describe().sort_index()
describe_psdf = psdf.groupby("a").describe().sort_index()
            # Since the results of the percentile columns differ slightly from pandas,
            # check them separately: non-percentile columns & percentile columns.
# 1. Check that non-percentile columns are equal.
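            # _agg_columns is an internal attribute holding the columns being aggregated
            # (i.e. every column except the group key).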
agg_cols = [col.name for col in psdf.groupby("a")._agg_columns]
self.assert_eq(
describe_psdf.drop(columns=list(product(agg_cols, formatted_percentiles))),
describe_pdf.drop(columns=formatted_percentiles, level=1),
check_exact=False,
)
# 2. Check that percentile columns are equal.
# The interpolation argument is yet to be implemented in Koalas.
quantile_pdf = pdf.groupby("a").quantile(percentiles, interpolation="nearest")
quantile_pdf = quantile_pdf.unstack(level=1).astype(float)
self.assert_eq(
describe_psdf.drop(columns=list(product(agg_cols, non_percentile_stats))),
quantile_pdf.rename(columns="{:.0%}".format, level=1),
)
        # String types are not supported yet.
datas = []
datas.append({"a": ["a", "a", "c"], "b": ["d", "e", "f"], "c": ["g", "h", "i"]})
datas.append({"a": ["a", "a", "c"], "b": [4, 0, 1], "c": ["g", "h", "i"]})
for data in datas:
pdf = pd.DataFrame(data)
psdf = ps.from_pandas(pdf)
self.assertRaises(
NotImplementedError, lambda: psdf.groupby("a").describe().sort_index()
)
# multi-index columns
pdf = pd.DataFrame({("x", "a"): [1, 1, 3], ("x", "b"): [4, 5, 6], ("y", "c"): [7, 8, 9]})
psdf = ps.from_pandas(pdf)
describe_pdf = pdf.groupby(("x", "a")).describe().sort_index()
describe_psdf = psdf.groupby(("x", "a")).describe().sort_index()
# 1. Check that non-percentile columns are equal.
agg_column_labels = [col._column_label for col in psdf.groupby(("x", "a"))._agg_columns]
self.assert_eq(
describe_psdf.drop(
columns=[
tuple(list(label) + [s])
for label, s in product(agg_column_labels, formatted_percentiles)
]
),
describe_pdf.drop(columns=formatted_percentiles, level=2),
check_exact=False,
)
# 2. Check that percentile columns are equal.
# The interpolation argument is yet to be implemented in Koalas.
quantile_pdf = pdf.groupby(("x", "a")).quantile(percentiles, interpolation="nearest")
quantile_pdf = quantile_pdf.unstack(level=1).astype(float)
self.assert_eq(
describe_psdf.drop(
columns=[
tuple(list(label) + [s])
for label, s in product(agg_column_labels, non_percentile_stats)
]
),
quantile_pdf.rename(columns="{:.0%}".format, level=2),
)

    def test_aggregate_relabel_multiindex(self):
pdf = pd.DataFrame({"A": [0, 1, 2, 3], "B": [5, 6, 7, 8], "group": ["a", "a", "b", "b"]})
pdf.columns = pd.MultiIndex.from_tuples([("y", "A"), ("y", "B"), ("x", "group")])
psdf = ps.from_pandas(pdf)
agg_pdf = pdf.groupby(("x", "group")).agg(a_max=(("y", "A"), "max")).sort_index()
agg_psdf = psdf.groupby(("x", "group")).agg(a_max=(("y", "A"), "max")).sort_index()
self.assert_eq(agg_pdf, agg_psdf)
# same column, different methods
agg_pdf = (
pdf.groupby(("x", "group"))
.agg(a_max=(("y", "A"), "max"), a_min=(("y", "A"), "min"))
.sort_index()
)
agg_psdf = (
psdf.groupby(("x", "group"))
.agg(a_max=(("y", "A"), "max"), a_min=(("y", "A"), "min"))
.sort_index()
)
self.assert_eq(agg_pdf, agg_psdf)
# different column, different methods
agg_pdf = (
pdf.groupby(("x", "group"))
.agg(a_max=(("y", "B"), "max"), a_min=(("y", "A"), "min"))
.sort_index()
)
agg_psdf = (
psdf.groupby(("x", "group"))
.agg(a_max=(("y", "B"), "max"), a_min=(("y", "A"), "min"))
.sort_index()
)
self.assert_eq(agg_pdf, agg_psdf)

    def test_all_any(self):
pdf = pd.DataFrame(
{
"A": [1, 1, 2, 2, 3, 3, 4, 4, 5, 5],
"B": [True, True, True, False, False, False, None, True, None, False],
}
)
psdf = ps.from_pandas(pdf)
for as_index in [True, False]:
if as_index:
def sort(df):
return df.sort_index()
else:
def sort(df):
return df.sort_values("A").reset_index(drop=True)
self.assert_eq(
sort(psdf.groupby("A", as_index=as_index).all()),
sort(pdf.groupby("A", as_index=as_index).all()),
)
self.assert_eq(
sort(psdf.groupby("A", as_index=as_index).any()),
sort(pdf.groupby("A", as_index=as_index).any()),
)
self.assert_eq(
sort(psdf.groupby("A", as_index=as_index).all()).B,
sort(pdf.groupby("A", as_index=as_index).all()).B,
)
self.assert_eq(
sort(psdf.groupby("A", as_index=as_index).any()).B,
sort(pdf.groupby("A", as_index=as_index).any()).B,
)
self.assert_eq(
psdf.B.groupby(psdf.A).all().sort_index(), pdf.B.groupby(pdf.A).all().sort_index()
)
self.assert_eq(
psdf.B.groupby(psdf.A).any().sort_index(), pdf.B.groupby(pdf.A).any().sort_index()
)
# multi-index columns
columns = pd.MultiIndex.from_tuples([("X", "A"), ("Y", "B")])
pdf.columns = columns
psdf.columns = columns
for as_index in [True, False]:
if as_index:
def sort(df):
return df.sort_index()
else:
def sort(df):
return df.sort_values(("X", "A")).reset_index(drop=True)
self.assert_eq(
sort(psdf.groupby(("X", "A"), as_index=as_index).all()),
sort(pdf.groupby(("X", "A"), as_index=as_index).all()),
)
self.assert_eq(
sort(psdf.groupby(("X", "A"), as_index=as_index).any()),
sort(pdf.groupby(("X", "A"), as_index=as_index).any()),
)
# Test skipna
pdf = pd.DataFrame({"A": [True, True], "B": [1, np.nan], "C": [True, None]})
pdf.name = "x"
psdf = ps.from_pandas(pdf)
self.assert_eq(
psdf.groupby("A").all(skipna=False).sort_index(),
pdf.groupby("A").all(skipna=False).sort_index(),
)
self.assert_eq(
psdf.groupby("A").all(skipna=True).sort_index(),
pdf.groupby("A").all(skipna=True).sort_index(),
)

    def test_raises(self):
psdf = ps.DataFrame(
{"a": [1, 2, 6, 4, 4, 6, 4, 3, 7], "b": [4, 2, 7, 3, 3, 1, 1, 1, 2]},
index=[0, 1, 3, 5, 6, 8, 9, 9, 9],
)
        # Test that invalid or missing keys raise an error.
self.assertRaises(ValueError, lambda: psdf.groupby([]))
self.assertRaises(KeyError, lambda: psdf.groupby("x"))
self.assertRaises(KeyError, lambda: psdf.groupby(["a", "x"]))
self.assertRaises(KeyError, lambda: psdf.groupby("a")["x"])
self.assertRaises(KeyError, lambda: psdf.groupby("a")["b", "x"])
self.assertRaises(KeyError, lambda: psdf.groupby("a")[["b", "x"]])

    def test_nunique(self):
pdf = pd.DataFrame(
{"a": [1, 1, 1, 1, 1, 0, 0, 0, 0, 0], "b": [2, 2, 2, 3, 3, 4, 4, 5, 5, 5]}
)
psdf = ps.from_pandas(pdf)
self.assert_eq(
psdf.groupby("a").agg({"b": "nunique"}).sort_index(),
pdf.groupby("a").agg({"b": "nunique"}).sort_index(),
)
if LooseVersion(pd.__version__) < LooseVersion("1.1.0"):
expected = ps.DataFrame({"b": [2, 2]}, index=pd.Index([0, 1], name="a"))
self.assert_eq(psdf.groupby("a").nunique().sort_index(), expected)
self.assert_eq(
psdf.groupby("a").nunique(dropna=False).sort_index(),
expected,
)
else:
self.assert_eq(
psdf.groupby("a").nunique().sort_index(), pdf.groupby("a").nunique().sort_index()
)
self.assert_eq(
psdf.groupby("a").nunique(dropna=False).sort_index(),
pdf.groupby("a").nunique(dropna=False).sort_index(),
)
self.assert_eq(
psdf.groupby("a")["b"].nunique().sort_index(),
pdf.groupby("a")["b"].nunique().sort_index(),
)
self.assert_eq(
psdf.groupby("a")["b"].nunique(dropna=False).sort_index(),
pdf.groupby("a")["b"].nunique(dropna=False).sort_index(),
)
nunique_psdf = psdf.groupby("a", as_index=False).agg({"b": "nunique"})
nunique_pdf = pdf.groupby("a", as_index=False).agg({"b": "nunique"})
self.assert_eq(
nunique_psdf.sort_values(["a", "b"]).reset_index(drop=True),
nunique_pdf.sort_values(["a", "b"]).reset_index(drop=True),
)
# multi-index columns
columns = pd.MultiIndex.from_tuples([("x", "a"), ("y", "b")])
pdf.columns = columns
psdf.columns = columns
if LooseVersion(pd.__version__) < LooseVersion("1.1.0"):
expected = ps.DataFrame({("y", "b"): [2, 2]}, index=pd.Index([0, 1], name=("x", "a")))
self.assert_eq(
psdf.groupby(("x", "a")).nunique().sort_index(),
expected,
)
self.assert_eq(
psdf.groupby(("x", "a")).nunique(dropna=False).sort_index(),
expected,
)
else:
self.assert_eq(
psdf.groupby(("x", "a")).nunique().sort_index(),
pdf.groupby(("x", "a")).nunique().sort_index(),
)
self.assert_eq(
psdf.groupby(("x", "a")).nunique(dropna=False).sort_index(),
pdf.groupby(("x", "a")).nunique(dropna=False).sort_index(),
)

    def test_unique(self):
for pdf in [
pd.DataFrame(
{"a": [1, 1, 1, 1, 1, 0, 0, 0, 0, 0], "b": [2, 2, 2, 3, 3, 4, 4, 5, 5, 5]}
),
pd.DataFrame(
{
"a": [1, 1, 1, 1, 1, 0, 0, 0, 0, 0],
"b": ["w", "w", "w", "x", "x", "y", "y", "z", "z", "z"],
}
),
]:
with self.subTest(pdf=pdf):
psdf = ps.from_pandas(pdf)
actual = psdf.groupby("a")["b"].unique().sort_index()._to_pandas()
expect = pdf.groupby("a")["b"].unique().sort_index()
self.assert_eq(len(actual), len(expect))
for act, exp in zip(actual, expect):
self.assertTrue(sorted(act) == sorted(exp))

    def test_value_counts(self):
pdf = pd.DataFrame(
{"A": [np.nan, 2, 2, 3, 3, 3], "B": [1, 1, 2, 3, 3, np.nan]}, columns=["A", "B"]
)
psdf = ps.from_pandas(pdf)
self.assert_eq(
psdf.groupby("A")["B"].value_counts().sort_index(),
pdf.groupby("A")["B"].value_counts().sort_index(),
)
self.assert_eq(
psdf.groupby("A")["B"].value_counts(dropna=False).sort_index(),
pdf.groupby("A")["B"].value_counts(dropna=False).sort_index(),
)
self.assert_eq(
psdf.groupby("A", dropna=False)["B"].value_counts(dropna=False).sort_index(),
pdf.groupby("A", dropna=False)["B"].value_counts(dropna=False).sort_index(),
            # The results are the same in terms of values and types;
            # disable check_exact so that assert_eq passes.
check_exact=False,
)
self.assert_eq(
psdf.groupby("A")["B"].value_counts(sort=True, ascending=False).sort_index(),
pdf.groupby("A")["B"].value_counts(sort=True, ascending=False).sort_index(),
)
self.assert_eq(
psdf.groupby("A")["B"]
.value_counts(sort=True, ascending=False, dropna=False)
.sort_index(),
pdf.groupby("A")["B"]
.value_counts(sort=True, ascending=False, dropna=False)
.sort_index(),
)
self.assert_eq(
psdf.groupby("A")["B"]
.value_counts(sort=True, ascending=True, dropna=False)
.sort_index(),
pdf.groupby("A")["B"]
.value_counts(sort=True, ascending=True, dropna=False)
.sort_index(),
)
self.assert_eq(
psdf.B.rename().groupby(psdf.A).value_counts().sort_index(),
pdf.B.rename().groupby(pdf.A).value_counts().sort_index(),
)
self.assert_eq(
psdf.B.rename().groupby(psdf.A, dropna=False).value_counts().sort_index(),
pdf.B.rename().groupby(pdf.A, dropna=False).value_counts().sort_index(),
            # The results are the same in terms of values and types;
            # disable check_exact so that assert_eq passes.
check_exact=False,
)
self.assert_eq(
psdf.B.groupby(psdf.A.rename()).value_counts().sort_index(),
pdf.B.groupby(pdf.A.rename()).value_counts().sort_index(),
)
self.assert_eq(
psdf.B.rename().groupby(psdf.A.rename()).value_counts().sort_index(),
pdf.B.rename().groupby(pdf.A.rename()).value_counts().sort_index(),
)

    def test_size(self):
pdf = pd.DataFrame({"A": [1, 2, 2, 3, 3, 3], "B": [1, 1, 2, 3, 3, 3]})
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf.groupby("A").size().sort_index(), pdf.groupby("A").size().sort_index())
self.assert_eq(
psdf.groupby("A")["B"].size().sort_index(), pdf.groupby("A")["B"].size().sort_index()
)
self.assert_eq(
psdf.groupby("A")[["B"]].size().sort_index(),
pdf.groupby("A")[["B"]].size().sort_index(),
)
self.assert_eq(
psdf.groupby(["A", "B"]).size().sort_index(),
pdf.groupby(["A", "B"]).size().sort_index(),
)
# multi-index columns
columns = pd.MultiIndex.from_tuples([("X", "A"), ("Y", "B")])
pdf.columns = columns
psdf.columns = columns
self.assert_eq(
psdf.groupby(("X", "A")).size().sort_index(),
pdf.groupby(("X", "A")).size().sort_index(),
)
self.assert_eq(
psdf.groupby([("X", "A"), ("Y", "B")]).size().sort_index(),
pdf.groupby([("X", "A"), ("Y", "B")]).size().sort_index(),
)

    def test_diff(self):
pdf = pd.DataFrame(
{
"a": [1, 2, 3, 4, 5, 6] * 3,
"b": [1, 1, 2, 3, 5, 8] * 3,
"c": [1, 4, 9, 16, 25, 36] * 3,
}
)
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf.groupby("b").diff().sort_index(), pdf.groupby("b").diff().sort_index())
self.assert_eq(
psdf.groupby(["a", "b"]).diff().sort_index(),
pdf.groupby(["a", "b"]).diff().sort_index(),
)
self.assert_eq(
psdf.groupby(["b"])["a"].diff().sort_index(),
pdf.groupby(["b"])["a"].diff().sort_index(),
)
self.assert_eq(
psdf.groupby(["b"])[["a", "b"]].diff().sort_index(),
pdf.groupby(["b"])[["a", "b"]].diff().sort_index(),
)
self.assert_eq(
psdf.groupby(psdf.b // 5).diff().sort_index(),
pdf.groupby(pdf.b // 5).diff().sort_index(),
)
self.assert_eq(
psdf.groupby(psdf.b // 5)["a"].diff().sort_index(),
pdf.groupby(pdf.b // 5)["a"].diff().sort_index(),
)
self.assert_eq(psdf.groupby("b").diff().sum(), pdf.groupby("b").diff().sum().astype(int))
self.assert_eq(psdf.groupby(["b"])["a"].diff().sum(), pdf.groupby(["b"])["a"].diff().sum())
# multi-index columns
columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y", "c")])
pdf.columns = columns
psdf.columns = columns
self.assert_eq(
psdf.groupby(("x", "b")).diff().sort_index(),
pdf.groupby(("x", "b")).diff().sort_index(),
)
self.assert_eq(
psdf.groupby([("x", "a"), ("x", "b")]).diff().sort_index(),
pdf.groupby([("x", "a"), ("x", "b")]).diff().sort_index(),
)

    def test_rank(self):
pdf = pd.DataFrame(
{
"a": [1, 2, 3, 4, 5, 6] * 3,
"b": [1, 1, 2, 3, 5, 8] * 3,
"c": [1, 4, 9, 16, 25, 36] * 3,
},
index=np.random.rand(6 * 3),
)
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf.groupby("b").rank().sort_index(), pdf.groupby("b").rank().sort_index())
self.assert_eq(
psdf.groupby(["a", "b"]).rank().sort_index(),
pdf.groupby(["a", "b"]).rank().sort_index(),
)
self.assert_eq(
psdf.groupby(["b"])["a"].rank().sort_index(),
pdf.groupby(["b"])["a"].rank().sort_index(),
)
self.assert_eq(
psdf.groupby(["b"])[["a", "c"]].rank().sort_index(),
pdf.groupby(["b"])[["a", "c"]].rank().sort_index(),
)
self.assert_eq(
psdf.groupby(psdf.b // 5).rank().sort_index(),
pdf.groupby(pdf.b // 5).rank().sort_index(),
)
self.assert_eq(
psdf.groupby(psdf.b // 5)["a"].rank().sort_index(),
pdf.groupby(pdf.b // 5)["a"].rank().sort_index(),
)
self.assert_eq(psdf.groupby("b").rank().sum(), pdf.groupby("b").rank().sum())
self.assert_eq(psdf.groupby(["b"])["a"].rank().sum(), pdf.groupby(["b"])["a"].rank().sum())
# multi-index columns
columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y", "c")])
pdf.columns = columns
psdf.columns = columns
self.assert_eq(
psdf.groupby(("x", "b")).rank().sort_index(),
pdf.groupby(("x", "b")).rank().sort_index(),
)
self.assert_eq(
psdf.groupby([("x", "a"), ("x", "b")]).rank().sort_index(),
pdf.groupby([("x", "a"), ("x", "b")]).rank().sort_index(),
)


if __name__ == "__main__":
    from pyspark.pandas.tests.test_groupby_slow import *  # noqa: F401

    try:
        import xmlrunner

        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
    except ImportError:
        testRunner = None
    unittest.main(testRunner=testRunner, verbosity=2)