blob: 05195b485fb0af168183ff6398b72d7ae49f8cf7 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import unittest
import numpy as np
import pandas as pd
from pyspark import pandas as ps
from pyspark.pandas.config import option_context
from pyspark.testing.pandasutils import PandasOnSparkTestCase
from pyspark.testing.sqlutils import SQLTestUtils
# This file contains test cases for 'Reshaping, sorting, transposing'
# https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/frame.html#reshaping-sorting-transposing
class FrameReshapingMixin:
    """Compare pandas-on-Spark reshaping/sorting/transposing results with pandas.

    Every test builds a pandas DataFrame, converts it with ``ps.from_pandas`` and
    asserts that the same call on both objects yields equal results.

    The mixin defines no assertion helpers itself; ``assert_eq`` and the
    ``assertRaises*`` methods must be provided by the test-case classes it is
    combined with.
    """

    @property
    def pdf(self):
        """Return a new 9-row pandas DataFrame with columns ``"a"`` and ``"b"``.

        The index is drawn from ``np.random.rand(9)`` on every access, so each
        access produces a frame with a different index.
        """
        return pd.DataFrame(
            {"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [4, 5, 6, 3, 2, 1, 0, 0, 0]},
            index=np.random.rand(9),
        )

    @property
    def df_pair(self):
        """Return ``(pdf, psdf)``: one ``self.pdf`` frame and its pandas-on-Spark conversion."""
        pdf = self.pdf
        psdf = ps.from_pandas(pdf)
        return pdf, psdf

    def test_sort_values(self):
        # DataFrame.sort_values: single/multiple keys, ascending, na_position,
        # ignore_index, inplace, multi-index rows/columns and non-string labels.
        pdf = pd.DataFrame(
            {"a": [1, 2, 3, 4, 5, None, 7], "b": [7, 6, 5, 4, 3, 2, 1]}, index=np.random.rand(7)
        )
        psdf = ps.from_pandas(pdf)
        self.assert_eq(psdf.sort_values("b"), pdf.sort_values("b"))
        self.assert_eq(
            psdf.sort_values("b", ignore_index=True), pdf.sort_values("b", ignore_index=True)
        )

        # Column "a" contains a missing value, so na_position matters here.
        for ascending in [True, False]:
            for na_position in ["first", "last"]:
                self.assert_eq(
                    psdf.sort_values("a", ascending=ascending, na_position=na_position),
                    pdf.sort_values("a", ascending=ascending, na_position=na_position),
                )

        self.assert_eq(psdf.sort_values(["a", "b"]), pdf.sort_values(["a", "b"]))
        self.assert_eq(
            psdf.sort_values(["a", "b"], ignore_index=True),
            pdf.sort_values(["a", "b"], ignore_index=True),
        )
        self.assert_eq(
            psdf.sort_values(["a", "b"], ascending=[False, True]),
            pdf.sort_values(["a", "b"], ascending=[False, True]),
        )

        # Two sort keys but only one `ascending` flag.
        self.assertRaises(ValueError, lambda: psdf.sort_values(["b", "a"], ascending=[False]))

        self.assert_eq(
            psdf.sort_values(["a", "b"], na_position="first"),
            pdf.sort_values(["a", "b"], na_position="first"),
        )

        self.assertRaises(ValueError, lambda: psdf.sort_values(["b", "a"], na_position="invalid"))

        # In-place sort: compare the return values of both calls, then the mutated
        # frames, then the Series that were taken before the sort.
        pserA = pdf.a
        psserA = psdf.a
        self.assert_eq(psdf.sort_values("b", inplace=True), pdf.sort_values("b", inplace=True))
        self.assert_eq(psdf, pdf)
        self.assert_eq(psserA, pserA)

        # Same in-place checks, additionally resetting the index.
        pdf = pd.DataFrame(
            {"a": [1, 2, 3, 4, 5, None, 7], "b": [7, 6, 5, 4, 3, 2, 1]}, index=np.random.rand(7)
        )
        psdf = ps.from_pandas(pdf)
        pserA = pdf.a
        psserA = psdf.a
        self.assert_eq(
            psdf.sort_values("b", inplace=True, ignore_index=True),
            pdf.sort_values("b", inplace=True, ignore_index=True),
        )
        self.assert_eq(psdf, pdf)
        self.assert_eq(psserA, pserA)

        # multi-index indexes
        pdf = pd.DataFrame(
            {"a": [1, 2, 3, 4, 5, None, 7], "b": [7, 6, 5, 4, 3, 2, 1]},
            index=pd.MultiIndex.from_tuples(
                [
                    ("bar", "one"),
                    ("bar", "two"),
                    ("baz", "one"),
                    ("baz", "two"),
                    ("foo", "one"),
                    ("foo", "two"),
                    ("qux", "one"),
                ]
            ),
        )
        psdf = ps.from_pandas(pdf)
        self.assert_eq(psdf.sort_values("b"), pdf.sort_values("b"))
        self.assert_eq(
            psdf.sort_values("b", ignore_index=True), pdf.sort_values("b", ignore_index=True)
        )

        # multi-index columns
        pdf = pd.DataFrame(
            {("X", 10): [1, 2, 3, 4, 5, None, 7], ("X", 20): [7, 6, 5, 4, 3, 2, 1]},
            index=np.random.rand(7),
        )
        psdf = ps.from_pandas(pdf)
        self.assert_eq(psdf.sort_values(("X", 20)), pdf.sort_values(("X", 20)))
        self.assert_eq(
            psdf.sort_values([("X", 20), ("X", 10)]), pdf.sort_values([("X", 20), ("X", 10)])
        )

        # A partial (first-level only) label is rejected for multi-index columns.
        self.assertRaisesRegex(
            ValueError,
            "For a multi-index, the label must be a tuple with elements",
            lambda: psdf.sort_values(["X"]),
        )

        # non-string names
        pdf = pd.DataFrame(
            {10: [1, 2, 3, 4, 5, None, 7], 20: [7, 6, 5, 4, 3, 2, 1]}, index=np.random.rand(7)
        )
        psdf = ps.from_pandas(pdf)
        self.assert_eq(psdf.sort_values(20), pdf.sort_values(20))
        self.assert_eq(psdf.sort_values([20, 10]), pdf.sort_values([20, 10]))

    def test_sort_index(self):
        # DataFrame.sort_index: invalid arguments, ordering options, inplace,
        # multi-index rows and multi-index columns.
        pdf = pd.DataFrame(
            {"A": [2, 1, np.nan], "B": [np.nan, 0, np.nan]}, index=["b", "a", np.nan]
        )
        psdf = ps.from_pandas(pdf)

        # Assert invalid parameters
        self.assertRaises(NotImplementedError, lambda: psdf.sort_index(axis=1))
        self.assertRaises(NotImplementedError, lambda: psdf.sort_index(kind="mergesort"))
        self.assertRaises(ValueError, lambda: psdf.sort_index(na_position="invalid"))

        # Assert default behavior without parameters
        self.assert_eq(psdf.sort_index(), pdf.sort_index())
        # Assert ignoring index
        self.assert_eq(psdf.sort_index(ignore_index=True), pdf.sort_index(ignore_index=True))
        # Assert sorting descending
        self.assert_eq(psdf.sort_index(ascending=False), pdf.sort_index(ascending=False))
        # Assert sorting NA indices first
        self.assert_eq(psdf.sort_index(na_position="first"), pdf.sort_index(na_position="first"))
        # Assert sorting descending and NA indices first
        self.assert_eq(
            psdf.sort_index(ascending=False, na_position="first"),
            pdf.sort_index(ascending=False, na_position="first"),
        )

        # Assert sorting inplace
        pserA = pdf.A
        psserA = psdf.A
        self.assertEqual(psdf.sort_index(inplace=True), pdf.sort_index(inplace=True))
        self.assert_eq(psdf, pdf)
        self.assert_eq(psserA, pserA)

        # Assert sorting inplace, descending and with the index reset
        pserA = pdf.A
        psserA = psdf.A
        self.assertEqual(
            psdf.sort_index(inplace=True, ascending=False, ignore_index=True),
            pdf.sort_index(inplace=True, ascending=False, ignore_index=True),
        )
        self.assert_eq(psdf, pdf)
        self.assert_eq(psserA, pserA)

        # Assert multi-indices
        pdf = pd.DataFrame(
            {"A": range(4), "B": range(4)[::-1]}, index=[["b", "b", "a", "a"], [1, 0, 1, 0]]
        )
        psdf = ps.from_pandas(pdf)
        self.assert_eq(psdf.sort_index(), pdf.sort_index())
        self.assert_eq(psdf.sort_index(level=[1, 0]), pdf.sort_index(level=[1, 0]))
        self.assert_eq(psdf.reset_index().sort_index(), pdf.reset_index().sort_index())
        # Assert ignoring index
        self.assert_eq(psdf.sort_index(ignore_index=True), pdf.sort_index(ignore_index=True))

        # Assert with multi-index columns
        columns = pd.MultiIndex.from_tuples([("X", "A"), ("X", "B")])
        pdf.columns = columns
        psdf.columns = columns
        self.assert_eq(psdf.sort_index(), pdf.sort_index())

    def test_nlargest(self):
        # DataFrame.nlargest: one/many columns, tie handling via `keep`, and the
        # errors raised for unsupported or invalid `keep` values.
        pdf = pd.DataFrame(
            {"a": [1, 2, 3, 4, 5, None, 7], "b": [7, 6, 5, 4, 3, 2, 1], "c": [1, 1, 2, 2, 3, 3, 3]},
            index=np.random.rand(7),
        )
        psdf = ps.from_pandas(pdf)
        self.assert_eq(psdf.nlargest(5, columns="a"), pdf.nlargest(5, columns="a"))
        self.assert_eq(psdf.nlargest(5, columns=["a", "b"]), pdf.nlargest(5, columns=["a", "b"]))
        self.assert_eq(psdf.nlargest(5, columns=["c"]), pdf.nlargest(5, columns=["c"]))
        self.assert_eq(
            psdf.nlargest(5, columns=["c"], keep="first"),
            pdf.nlargest(5, columns=["c"], keep="first"),
        )
        self.assert_eq(
            psdf.nlargest(5, columns=["c"], keep="last"),
            pdf.nlargest(5, columns=["c"], keep="last"),
        )

        msg = "`keep`=all is not implemented yet."
        with self.assertRaisesRegex(NotImplementedError, msg):
            psdf.nlargest(5, columns=["c"], keep="all")

        msg = 'keep must be either "first", "last" or "all".'
        with self.assertRaisesRegex(ValueError, msg):
            psdf.nlargest(5, columns=["c"], keep="xx")

    def test_nsmallest(self):
        # DataFrame.nsmallest: one/many columns, tie handling via `keep`, and the
        # errors raised for unsupported or invalid `keep` values.
        pdf = pd.DataFrame(
            {"a": [1, 2, 3, 4, 5, None, 7], "b": [7, 6, 5, 4, 3, 2, 1], "c": [1, 1, 2, 2, 3, 3, 3]},
            index=np.random.rand(7),
        )
        psdf = ps.from_pandas(pdf)
        self.assert_eq(psdf.nsmallest(n=5, columns="a"), pdf.nsmallest(5, columns="a"))
        self.assert_eq(
            psdf.nsmallest(n=5, columns=["a", "b"]), pdf.nsmallest(5, columns=["a", "b"])
        )
        self.assert_eq(psdf.nsmallest(n=5, columns=["c"]), pdf.nsmallest(5, columns=["c"]))
        self.assert_eq(
            psdf.nsmallest(n=5, columns=["c"], keep="first"),
            pdf.nsmallest(5, columns=["c"], keep="first"),
        )
        self.assert_eq(
            psdf.nsmallest(n=5, columns=["c"], keep="last"),
            pdf.nsmallest(5, columns=["c"], keep="last"),
        )

        # Call nsmallest (not nlargest) so that this test covers nsmallest's own
        # `keep` validation.
        msg = "`keep`=all is not implemented yet."
        with self.assertRaisesRegex(NotImplementedError, msg):
            psdf.nsmallest(5, columns=["c"], keep="all")

        msg = 'keep must be either "first", "last" or "all".'
        with self.assertRaisesRegex(ValueError, msg):
            psdf.nsmallest(5, columns=["c"], keep="xx")

    def test_stack(self):
        # DataFrame.stack with single-level columns, named/unnamed multi-level
        # columns, unsorted boolean multi-level columns and an empty selection.
        # Results are index-sorted before comparing.
        pdf_single_level_cols = pd.DataFrame(
            [[0, 1], [2, 3]], index=["cat", "dog"], columns=["weight", "height"]
        )
        psdf_single_level_cols = ps.from_pandas(pdf_single_level_cols)

        self.assert_eq(
            psdf_single_level_cols.stack().sort_index(), pdf_single_level_cols.stack().sort_index()
        )

        multicol1 = pd.MultiIndex.from_tuples(
            [("weight", "kg"), ("weight", "pounds")], names=["x", "y"]
        )
        pdf_multi_level_cols1 = pd.DataFrame(
            [[1, 2], [2, 4]], index=["cat", "dog"], columns=multicol1
        )
        psdf_multi_level_cols1 = ps.from_pandas(pdf_multi_level_cols1)

        self.assert_eq(
            psdf_multi_level_cols1.stack().sort_index(), pdf_multi_level_cols1.stack().sort_index()
        )

        multicol2 = pd.MultiIndex.from_tuples([("weight", "kg"), ("height", "m")])
        pdf_multi_level_cols2 = pd.DataFrame(
            [[1.0, 2.0], [3.0, 4.0]], index=["cat", "dog"], columns=multicol2
        )
        psdf_multi_level_cols2 = ps.from_pandas(pdf_multi_level_cols2)

        self.assert_eq(
            psdf_multi_level_cols2.stack().sort_index(), pdf_multi_level_cols2.stack().sort_index()
        )

        pdf = pd.DataFrame(
            {
                ("y", "c"): [True, True],
                ("x", "b"): [False, False],
                ("x", "c"): [True, False],
                ("y", "a"): [False, True],
            }
        )
        psdf = ps.from_pandas(pdf)

        self.assert_eq(psdf.stack().sort_index(), pdf.stack().sort_index())
        # Stacking a frame from which no columns were selected.
        self.assert_eq(psdf[[]].stack().sort_index(), pdf[[]].stack().sort_index(), almost=True)

    def test_unstack(self):
        # DataFrame.unstack (once and twice) on a frame with a two-level row index.
        pdf = pd.DataFrame(
            np.random.randn(3, 3),
            index=pd.MultiIndex.from_tuples([("rg1", "x"), ("rg1", "y"), ("rg2", "z")]),
        )
        psdf = ps.from_pandas(pdf)

        self.assert_eq(psdf.unstack().sort_index(), pdf.unstack().sort_index(), almost=True)
        self.assert_eq(
            psdf.unstack().unstack().sort_index(), pdf.unstack().unstack().sort_index(), almost=True
        )

    def test_explode(self):
        # DataFrame.explode on a list column ("A") and a scalar column ("B"), with
        # a plain index, a MultiIndex and MultiIndex columns.
        pdf = pd.DataFrame(
            {"A": [[-1.0, np.nan], [0.0, np.inf], [1.0, -np.inf]], "B": 1}, index=["a", "b", "c"]
        )
        pdf.index.name = "index"
        pdf.columns.name = "columns"
        psdf = ps.from_pandas(pdf)

        expected_result1, result1 = pdf.explode("A"), psdf.explode("A")
        expected_result2, result2 = pdf.explode("B"), psdf.explode("B")
        expected_result3, result3 = pdf.explode("A", ignore_index=True), psdf.explode(
            "A", ignore_index=True
        )

        self.assert_eq(result1, expected_result1, almost=True)
        self.assert_eq(result2, expected_result2, check_exact=False)
        self.assert_eq(result1.index.name, expected_result1.index.name)
        self.assert_eq(result1.columns.name, expected_result1.columns.name)
        self.assert_eq(result3, expected_result3, almost=True)
        self.assert_eq(result3.index, expected_result3.index)

        # A list of columns is rejected by the pandas-on-Spark implementation.
        self.assertRaises(TypeError, lambda: psdf.explode(["A", "B"]))

        # MultiIndex
        midx = pd.MultiIndex.from_tuples(
            [("x", "a"), ("x", "b"), ("y", "c")], names=["index1", "index2"]
        )
        pdf.index = midx
        psdf = ps.from_pandas(pdf)

        expected_result1, result1 = pdf.explode("A"), psdf.explode("A")
        expected_result2, result2 = pdf.explode("B"), psdf.explode("B")
        expected_result3, result3 = pdf.explode("A", ignore_index=True), psdf.explode(
            "A", ignore_index=True
        )

        self.assert_eq(result1, expected_result1, almost=True)
        self.assert_eq(result2, expected_result2, check_exact=False)
        self.assert_eq(result1.index.names, expected_result1.index.names)
        self.assert_eq(result1.columns.name, expected_result1.columns.name)
        self.assert_eq(result3, expected_result3, almost=True)
        self.assert_eq(result3.index, expected_result3.index)

        self.assertRaises(TypeError, lambda: psdf.explode(["A", "B"]))

        # MultiIndex columns
        columns = pd.MultiIndex.from_tuples([("A", "Z"), ("B", "X")], names=["column1", "column2"])
        pdf.columns = columns
        psdf.columns = columns

        expected_result1, result1 = pdf.explode(("A", "Z")), psdf.explode(("A", "Z"))
        expected_result2, result2 = pdf.explode(("B", "X")), psdf.explode(("B", "X"))
        expected_result3, result3 = pdf.A.explode("Z"), psdf.A.explode("Z")

        self.assert_eq(result1, expected_result1, almost=True)
        self.assert_eq(result2, expected_result2, check_exact=False)
        self.assert_eq(result1.index.names, expected_result1.index.names)
        self.assert_eq(result1.columns.names, expected_result1.columns.names)
        self.assert_eq(result3, expected_result3, almost=True)

        self.assertRaises(TypeError, lambda: psdf.explode(["A", "B"]))
        # A first-level-only label is not a valid column here.
        self.assertRaises(ValueError, lambda: psdf.explode("A"))

    def test_transpose(self):
        # DataFrame.transpose with the default "compute.max_rows" setting and with
        # that option set to None, for flat and multi-index frames.
        # TODO: also cover frames that use a random (non-default) index.
        pdf1 = pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]}, columns=["col1", "col2"])
        psdf1 = ps.from_pandas(pdf1)

        pdf2 = pd.DataFrame(
            data={"score": [9, 8], "kids": [0, 0], "age": [12, 22]},
            columns=["score", "kids", "age"],
        )
        psdf2 = ps.from_pandas(pdf2)

        self.assert_eq(pdf1.transpose().sort_index(), psdf1.transpose().sort_index())
        self.assert_eq(pdf2.transpose().sort_index(), psdf2.transpose().sort_index())

        with option_context("compute.max_rows", None):
            self.assert_eq(pdf1.transpose().sort_index(), psdf1.transpose().sort_index())
            self.assert_eq(pdf2.transpose().sort_index(), psdf2.transpose().sort_index())

        pdf3 = pd.DataFrame(
            {
                ("cg1", "a"): [1, 2, 3],
                ("cg1", "b"): [4, 5, 6],
                ("cg2", "c"): [7, 8, 9],
                ("cg3", "d"): [9, 9, 9],
            },
            index=pd.MultiIndex.from_tuples([("rg1", "x"), ("rg1", "y"), ("rg2", "z")]),
        )
        psdf3 = ps.from_pandas(pdf3)

        self.assert_eq(pdf3.transpose().sort_index(), psdf3.transpose().sort_index())

        with option_context("compute.max_rows", None):
            self.assert_eq(pdf3.transpose().sort_index(), psdf3.transpose().sort_index())

    def test_assign_list(self):
        # Assigning a Python list as a new column, and the error raised when the
        # list length differs from the number of rows.
        pdf, psdf = self.df_pair

        # Series taken before the assignment are compared after it as well.
        pser = pdf.a
        psser = psdf.a

        pdf["x"] = [10, 20, 30, 40, 50, 60, 70, 80, 90]
        psdf["x"] = [10, 20, 30, 40, 50, 60, 70, 80, 90]

        self.assert_eq(psdf.sort_index(), pdf.sort_index())
        self.assert_eq(psser, pser)

        # Eight values for a nine-row frame.
        with self.assertRaisesRegex(ValueError, "Length of values does not match length of index"):
            psdf["z"] = [10, 20, 30, 40, 50, 60, 70, 80]

    def test_squeeze(self):
        # DataFrame.squeeze for every accepted axis spelling, over frames of
        # different shapes with flat and MultiIndex columns.
        axes = [None, 0, 1, "rows", "index", "columns"]

        # Multiple columns
        pdf = pd.DataFrame([[1, 2], [3, 4]], columns=["a", "b"], index=["x", "y"])
        psdf = ps.from_pandas(pdf)
        for axis in axes:
            self.assert_eq(pdf.squeeze(axis), psdf.squeeze(axis))

        # Multiple columns with MultiIndex columns
        columns = pd.MultiIndex.from_tuples([("A", "Z"), ("B", "X")])
        pdf.columns = columns
        psdf.columns = columns
        for axis in axes:
            self.assert_eq(pdf.squeeze(axis), psdf.squeeze(axis))

        # Single column with single value
        pdf = pd.DataFrame([[1]], columns=["a"], index=["x"])
        psdf = ps.from_pandas(pdf)
        for axis in axes:
            self.assert_eq(pdf.squeeze(axis), psdf.squeeze(axis))

        # Single column with single value with MultiIndex column
        columns = pd.MultiIndex.from_tuples([("A", "Z")])
        pdf.columns = columns
        psdf.columns = columns
        for axis in axes:
            self.assert_eq(pdf.squeeze(axis), psdf.squeeze(axis))

        # Single column with multiple values
        pdf = pd.DataFrame([1, 2, 3, 4], columns=["a"])
        psdf = ps.from_pandas(pdf)
        for axis in axes:
            self.assert_eq(pdf.squeeze(axis), psdf.squeeze(axis))

        # Single column with multiple values with MultiIndex column
        # (reuses the single-entry `columns` MultiIndex built above)
        pdf.columns = columns
        psdf.columns = columns
        for axis in axes:
            self.assert_eq(pdf.squeeze(axis), psdf.squeeze(axis))
class FrameReshapingTests(FrameReshapingMixin, PandasOnSparkTestCase, SQLTestUtils):
    """Combine FrameReshapingMixin with the PandasOnSparkTestCase and SQLTestUtils bases.

    The class adds no members of its own; everything is inherited.
    """
if __name__ == "__main__":
    from pyspark.pandas.tests.frame.test_reshaping import *  # noqa: F401

    # Default to unittest's built-in runner; switch to XML reports only when the
    # optional `xmlrunner` package can be imported.
    testRunner = None
    try:
        import xmlrunner

        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
    except ImportError:
        # xmlrunner is optional: keep the default runner selected above.
        pass

    unittest.main(testRunner=testRunner, verbosity=2)