blob: 2e7eec8f0a1b3c492f54470024daa393e93d5234 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import decimal
from datetime import datetime
from distutils.version import LooseVersion
import sys
import unittest
from io import StringIO
from typing import List
import numpy as np
import pandas as pd
from pandas.tseries.offsets import DateOffset
from pyspark import StorageLevel
from pyspark.ml.linalg import SparseVector
from pyspark.sql.types import StructType
from pyspark import pandas as ps
from pyspark.pandas.config import option_context
from pyspark.pandas.frame import CachedDataFrame
from pyspark.testing.pandasutils import (
have_tabulate,
ComparisonTestBase,
SPARK_CONF_ARROW_ENABLED,
tabulate_requirement_message,
)
from pyspark.testing.sqlutils import SQLTestUtils
class DataFrameSlowTest(ComparisonTestBase, SQLTestUtils):
@property
def pdf(self):
return pd.DataFrame(
{"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [4, 5, 6, 3, 2, 1, 0, 0, 0]},
index=np.random.rand(9),
)
@property
def df_pair(self):
pdf = self.pdf
psdf = ps.from_pandas(pdf)
return pdf, psdf
def test_rank(self):
pdf = pd.DataFrame(
data={"col1": [1, 2, 3, 1], "col2": [3, 4, 3, 1]},
columns=["col1", "col2"],
index=np.random.rand(4),
)
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.rank().sort_index(), psdf.rank().sort_index())
self.assert_eq(pdf.rank().sum(), psdf.rank().sum())
self.assert_eq(
pdf.rank(ascending=False).sort_index(), psdf.rank(ascending=False).sort_index()
)
self.assert_eq(pdf.rank(method="min").sort_index(), psdf.rank(method="min").sort_index())
self.assert_eq(pdf.rank(method="max").sort_index(), psdf.rank(method="max").sort_index())
self.assert_eq(
pdf.rank(method="first").sort_index(), psdf.rank(method="first").sort_index()
)
self.assert_eq(
pdf.rank(method="dense").sort_index(), psdf.rank(method="dense").sort_index()
)
msg = "method must be one of 'average', 'min', 'max', 'first', 'dense'"
with self.assertRaisesRegex(ValueError, msg):
psdf.rank(method="nothing")
# multi-index columns
columns = pd.MultiIndex.from_tuples([("x", "col1"), ("y", "col2")])
pdf.columns = columns
psdf.columns = columns
self.assert_eq(pdf.rank().sort_index(), psdf.rank().sort_index())
# non-numeric columns
pdf = pd.DataFrame(
data={"col1": [1, 2, 3, 1], "col2": ["a", "b", "c", "d"]},
index=np.random.rand(4),
)
psdf = ps.from_pandas(pdf)
self.assert_eq(
pdf.rank(numeric_only=True).sort_index(), psdf.rank(numeric_only=True).sort_index()
)
self.assert_eq(
pdf.rank(numeric_only=False).sort_index(), psdf.rank(numeric_only=False).sort_index()
)
self.assert_eq(
pdf.rank(numeric_only=None).sort_index(), psdf.rank(numeric_only=None).sort_index()
)
self.assert_eq(
pdf[["col2"]].rank(numeric_only=True),
psdf[["col2"]].rank(numeric_only=True),
)
def test_round(self):
pdf = pd.DataFrame(
{
"A": [0.028208, 0.038683, 0.877076],
"B": [0.992815, 0.645646, 0.149370],
"C": [0.173891, 0.577595, 0.491027],
},
columns=["A", "B", "C"],
index=np.random.rand(3),
)
psdf = ps.from_pandas(pdf)
pser = pd.Series([1, 0, 2], index=["A", "B", "C"])
psser = ps.Series([1, 0, 2], index=["A", "B", "C"])
self.assert_eq(pdf.round(2), psdf.round(2))
self.assert_eq(pdf.round({"A": 1, "C": 2}), psdf.round({"A": 1, "C": 2}))
self.assert_eq(pdf.round({"A": 1, "D": 2}), psdf.round({"A": 1, "D": 2}))
self.assert_eq(pdf.round(pser), psdf.round(psser))
msg = "decimals must be an integer, a dict-like or a Series"
with self.assertRaisesRegex(TypeError, msg):
psdf.round(1.5)
# multi-index columns
columns = pd.MultiIndex.from_tuples([("X", "A"), ("X", "B"), ("Y", "C")])
pdf.columns = columns
psdf.columns = columns
pser = pd.Series([1, 0, 2], index=columns)
psser = ps.Series([1, 0, 2], index=columns)
self.assert_eq(pdf.round(2), psdf.round(2))
self.assert_eq(
pdf.round({("X", "A"): 1, ("Y", "C"): 2}), psdf.round({("X", "A"): 1, ("Y", "C"): 2})
)
self.assert_eq(pdf.round({("X", "A"): 1, "Y": 2}), psdf.round({("X", "A"): 1, "Y": 2}))
self.assert_eq(pdf.round(pser), psdf.round(psser))
# non-string names
pdf = pd.DataFrame(
{
10: [0.028208, 0.038683, 0.877076],
20: [0.992815, 0.645646, 0.149370],
30: [0.173891, 0.577595, 0.491027],
},
index=np.random.rand(3),
)
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.round({10: 1, 30: 2}), psdf.round({10: 1, 30: 2}))
def test_shift(self):
pdf = pd.DataFrame(
{
"Col1": [10, 20, 15, 30, 45],
"Col2": [13, 23, 18, 33, 48],
"Col3": [17, 27, 22, 37, 52],
},
index=np.random.rand(5),
)
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.shift(3), psdf.shift(3))
self.assert_eq(pdf.shift().shift(-1), psdf.shift().shift(-1))
self.assert_eq(pdf.shift().sum().astype(int), psdf.shift().sum())
# Need the expected result since pandas 0.23 does not support `fill_value` argument.
pdf1 = pd.DataFrame(
{"Col1": [0, 0, 0, 10, 20], "Col2": [0, 0, 0, 13, 23], "Col3": [0, 0, 0, 17, 27]},
index=pdf.index,
)
self.assert_eq(pdf1, psdf.shift(periods=3, fill_value=0))
msg = "should be an int"
with self.assertRaisesRegex(TypeError, msg):
psdf.shift(1.5)
# multi-index columns
columns = pd.MultiIndex.from_tuples([("x", "Col1"), ("x", "Col2"), ("y", "Col3")])
pdf.columns = columns
psdf.columns = columns
self.assert_eq(pdf.shift(3), psdf.shift(3))
self.assert_eq(pdf.shift().shift(-1), psdf.shift().shift(-1))
self.assert_eq(pdf.shift(0), psdf.shift(0))
def test_diff(self):
pdf = pd.DataFrame(
{"a": [1, 2, 3, 4, 5, 6], "b": [1, 1, 2, 3, 5, 8], "c": [1, 4, 9, 16, 25, 36]},
index=np.random.rand(6),
)
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.diff(), psdf.diff())
self.assert_eq(pdf.diff().diff(-1), psdf.diff().diff(-1))
self.assert_eq(pdf.diff().sum().astype(int), psdf.diff().sum())
msg = "should be an int"
with self.assertRaisesRegex(TypeError, msg):
psdf.diff(1.5)
msg = 'axis should be either 0 or "index" currently.'
with self.assertRaisesRegex(NotImplementedError, msg):
psdf.diff(axis=1)
# multi-index columns
columns = pd.MultiIndex.from_tuples([("x", "Col1"), ("x", "Col2"), ("y", "Col3")])
pdf.columns = columns
psdf.columns = columns
self.assert_eq(pdf.diff(), psdf.diff())
def test_duplicated(self):
pdf = pd.DataFrame(
{"a": [1, 1, 2, 3], "b": [1, 1, 1, 4], "c": [1, 1, 1, 5]}, index=np.random.rand(4)
)
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.duplicated().sort_index(), psdf.duplicated().sort_index())
self.assert_eq(
pdf.duplicated(keep="last").sort_index(),
psdf.duplicated(keep="last").sort_index(),
)
self.assert_eq(
pdf.duplicated(keep=False).sort_index(),
psdf.duplicated(keep=False).sort_index(),
)
self.assert_eq(
pdf.duplicated(subset="b").sort_index(),
psdf.duplicated(subset="b").sort_index(),
)
self.assert_eq(
pdf.duplicated(subset=["b"]).sort_index(),
psdf.duplicated(subset=["b"]).sort_index(),
)
with self.assertRaisesRegex(ValueError, "'keep' only supports 'first', 'last' and False"):
psdf.duplicated(keep="false")
with self.assertRaisesRegex(KeyError, "'d'"):
psdf.duplicated(subset=["d"])
pdf.index.name = "x"
psdf.index.name = "x"
self.assert_eq(pdf.duplicated().sort_index(), psdf.duplicated().sort_index())
# multi-index
self.assert_eq(
pdf.set_index("a", append=True).duplicated().sort_index(),
psdf.set_index("a", append=True).duplicated().sort_index(),
)
self.assert_eq(
pdf.set_index("a", append=True).duplicated(keep=False).sort_index(),
psdf.set_index("a", append=True).duplicated(keep=False).sort_index(),
)
self.assert_eq(
pdf.set_index("a", append=True).duplicated(subset=["b"]).sort_index(),
psdf.set_index("a", append=True).duplicated(subset=["b"]).sort_index(),
)
# mutli-index columns
columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y", "c")])
pdf.columns = columns
psdf.columns = columns
self.assert_eq(pdf.duplicated().sort_index(), psdf.duplicated().sort_index())
self.assert_eq(
pdf.duplicated(subset=("x", "b")).sort_index(),
psdf.duplicated(subset=("x", "b")).sort_index(),
)
self.assert_eq(
pdf.duplicated(subset=[("x", "b")]).sort_index(),
psdf.duplicated(subset=[("x", "b")]).sort_index(),
)
# non-string names
pdf = pd.DataFrame(
{10: [1, 1, 2, 3], 20: [1, 1, 1, 4], 30: [1, 1, 1, 5]}, index=np.random.rand(4)
)
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.duplicated().sort_index(), psdf.duplicated().sort_index())
self.assert_eq(
pdf.duplicated(subset=10).sort_index(),
psdf.duplicated(subset=10).sort_index(),
)
def test_ffill(self):
idx = np.random.rand(6)
pdf = pd.DataFrame(
{
"x": [np.nan, 2, 3, 4, np.nan, 6],
"y": [1, 2, np.nan, 4, np.nan, np.nan],
"z": [1, 2, 3, 4, np.nan, np.nan],
},
index=idx,
)
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf.ffill(), pdf.ffill())
self.assert_eq(psdf.ffill(limit=1), pdf.ffill(limit=1))
pser = pdf.y
psser = psdf.y
psdf.ffill(inplace=True)
pdf.ffill(inplace=True)
self.assert_eq(psdf, pdf)
self.assert_eq(psser, pser)
self.assert_eq(psser[idx[2]], pser[idx[2]])
def test_bfill(self):
idx = np.random.rand(6)
pdf = pd.DataFrame(
{
"x": [np.nan, 2, 3, 4, np.nan, 6],
"y": [1, 2, np.nan, 4, np.nan, np.nan],
"z": [1, 2, 3, 4, np.nan, np.nan],
},
index=idx,
)
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf.bfill(), pdf.bfill())
self.assert_eq(psdf.bfill(limit=1), pdf.bfill(limit=1))
pser = pdf.x
psser = psdf.x
psdf.bfill(inplace=True)
pdf.bfill(inplace=True)
self.assert_eq(psdf, pdf)
self.assert_eq(psser, pser)
self.assert_eq(psser[idx[0]], pser[idx[0]])
def test_filter(self):
pdf = pd.DataFrame(
{
"aa": ["aa", "bd", "bc", "ab", "ce"],
"ba": [1, 2, 3, 4, 5],
"cb": [1.0, 2.0, 3.0, 4.0, 5.0],
"db": [1.0, np.nan, 3.0, np.nan, 5.0],
}
)
pdf = pdf.set_index("aa")
psdf = ps.from_pandas(pdf)
self.assert_eq(
psdf.filter(items=["ab", "aa"], axis=0).sort_index(),
pdf.filter(items=["ab", "aa"], axis=0).sort_index(),
)
with option_context("compute.isin_limit", 0):
self.assert_eq(
psdf.filter(items=["ab", "aa"], axis=0).sort_index(),
pdf.filter(items=["ab", "aa"], axis=0).sort_index(),
)
self.assert_eq(
psdf.filter(items=["ba", "db"], axis=1).sort_index(),
pdf.filter(items=["ba", "db"], axis=1).sort_index(),
)
self.assert_eq(psdf.filter(like="b", axis="index"), pdf.filter(like="b", axis="index"))
self.assert_eq(psdf.filter(like="c", axis="columns"), pdf.filter(like="c", axis="columns"))
self.assert_eq(
psdf.filter(regex="b.*", axis="index"), pdf.filter(regex="b.*", axis="index")
)
self.assert_eq(
psdf.filter(regex="b.*", axis="columns"), pdf.filter(regex="b.*", axis="columns")
)
pdf = pdf.set_index("ba", append=True)
psdf = ps.from_pandas(pdf)
self.assert_eq(
psdf.filter(items=[("aa", 1), ("bd", 2)], axis=0).sort_index(),
pdf.filter(items=[("aa", 1), ("bd", 2)], axis=0).sort_index(),
)
with self.assertRaisesRegex(TypeError, "Unsupported type list"):
psdf.filter(items=[["aa", 1], ("bd", 2)], axis=0)
with self.assertRaisesRegex(ValueError, "The item should not be empty."):
psdf.filter(items=[(), ("bd", 2)], axis=0)
self.assert_eq(psdf.filter(like="b", axis=0), pdf.filter(like="b", axis=0))
self.assert_eq(psdf.filter(regex="b.*", axis=0), pdf.filter(regex="b.*", axis=0))
with self.assertRaisesRegex(ValueError, "items should be a list-like object"):
psdf.filter(items="b")
with self.assertRaisesRegex(ValueError, "No axis named"):
psdf.filter(regex="b.*", axis=123)
with self.assertRaisesRegex(TypeError, "Must pass either `items`, `like`"):
psdf.filter()
with self.assertRaisesRegex(TypeError, "mutually exclusive"):
psdf.filter(regex="b.*", like="aaa")
# multi-index columns
pdf = pd.DataFrame(
{
("x", "aa"): ["aa", "ab", "bc", "bd", "ce"],
("x", "ba"): [1, 2, 3, 4, 5],
("y", "cb"): [1.0, 2.0, 3.0, 4.0, 5.0],
("z", "db"): [1.0, np.nan, 3.0, np.nan, 5.0],
}
)
pdf = pdf.set_index(("x", "aa"))
psdf = ps.from_pandas(pdf)
self.assert_eq(
psdf.filter(items=["ab", "aa"], axis=0).sort_index(),
pdf.filter(items=["ab", "aa"], axis=0).sort_index(),
)
self.assert_eq(
psdf.filter(items=[("x", "ba"), ("z", "db")], axis=1).sort_index(),
pdf.filter(items=[("x", "ba"), ("z", "db")], axis=1).sort_index(),
)
self.assert_eq(psdf.filter(like="b", axis="index"), pdf.filter(like="b", axis="index"))
self.assert_eq(psdf.filter(like="c", axis="columns"), pdf.filter(like="c", axis="columns"))
self.assert_eq(
psdf.filter(regex="b.*", axis="index"), pdf.filter(regex="b.*", axis="index")
)
self.assert_eq(
psdf.filter(regex="b.*", axis="columns"), pdf.filter(regex="b.*", axis="columns")
)
def test_pipe(self):
psdf = ps.DataFrame(
{"category": ["A", "A", "B"], "col1": [1, 2, 3], "col2": [4, 5, 6]},
columns=["category", "col1", "col2"],
)
self.assertRaisesRegex(
ValueError,
"arg is both the pipe target and a keyword argument",
lambda: psdf.pipe((lambda x: x, "arg"), arg="1"),
)
def test_transform(self):
pdf = pd.DataFrame(
{
"a": [1, 2, 3, 4, 5, 6] * 100,
"b": [1.0, 1.0, 2.0, 3.0, 5.0, 8.0] * 100,
"c": [1, 4, 9, 16, 25, 36] * 100,
},
columns=["a", "b", "c"],
index=np.random.rand(600),
)
psdf = ps.DataFrame(pdf)
self.assert_eq(
psdf.transform(lambda x: x + 1).sort_index(),
pdf.transform(lambda x: x + 1).sort_index(),
)
self.assert_eq(
psdf.transform(lambda x, y: x + y, y=2).sort_index(),
pdf.transform(lambda x, y: x + y, y=2).sort_index(),
)
with option_context("compute.shortcut_limit", 500):
self.assert_eq(
psdf.transform(lambda x: x + 1).sort_index(),
pdf.transform(lambda x: x + 1).sort_index(),
)
self.assert_eq(
psdf.transform(lambda x, y: x + y, y=1).sort_index(),
pdf.transform(lambda x, y: x + y, y=1).sort_index(),
)
with self.assertRaisesRegex(AssertionError, "the first argument should be a callable"):
psdf.transform(1)
with self.assertRaisesRegex(
NotImplementedError, 'axis should be either 0 or "index" currently.'
):
psdf.transform(lambda x: x + 1, axis=1)
# multi-index columns
columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y", "c")])
pdf.columns = columns
psdf.columns = columns
self.assert_eq(
psdf.transform(lambda x: x + 1).sort_index(),
pdf.transform(lambda x: x + 1).sort_index(),
)
with option_context("compute.shortcut_limit", 500):
self.assert_eq(
psdf.transform(lambda x: x + 1).sort_index(),
pdf.transform(lambda x: x + 1).sort_index(),
)
def test_apply(self):
pdf = pd.DataFrame(
{
"a": [1, 2, 3, 4, 5, 6] * 100,
"b": [1.0, 1.0, 2.0, 3.0, 5.0, 8.0] * 100,
"c": [1, 4, 9, 16, 25, 36] * 100,
},
columns=["a", "b", "c"],
index=np.random.rand(600),
)
psdf = ps.DataFrame(pdf)
self.assert_eq(
psdf.apply(lambda x: x + 1).sort_index(), pdf.apply(lambda x: x + 1).sort_index()
)
self.assert_eq(
psdf.apply(lambda x, b: x + b, args=(1,)).sort_index(),
pdf.apply(lambda x, b: x + b, args=(1,)).sort_index(),
)
self.assert_eq(
psdf.apply(lambda x, b: x + b, b=1).sort_index(),
pdf.apply(lambda x, b: x + b, b=1).sort_index(),
)
with option_context("compute.shortcut_limit", 500):
self.assert_eq(
psdf.apply(lambda x: x + 1).sort_index(), pdf.apply(lambda x: x + 1).sort_index()
)
self.assert_eq(
psdf.apply(lambda x, b: x + b, args=(1,)).sort_index(),
pdf.apply(lambda x, b: x + b, args=(1,)).sort_index(),
)
self.assert_eq(
psdf.apply(lambda x, b: x + b, b=1).sort_index(),
pdf.apply(lambda x, b: x + b, b=1).sort_index(),
)
# returning a Series
self.assert_eq(
psdf.apply(lambda x: len(x), axis=1).sort_index(),
pdf.apply(lambda x: len(x), axis=1).sort_index(),
)
self.assert_eq(
psdf.apply(lambda x, c: len(x) + c, axis=1, c=100).sort_index(),
pdf.apply(lambda x, c: len(x) + c, axis=1, c=100).sort_index(),
)
with option_context("compute.shortcut_limit", 500):
self.assert_eq(
psdf.apply(lambda x: len(x), axis=1).sort_index(),
pdf.apply(lambda x: len(x), axis=1).sort_index(),
)
self.assert_eq(
psdf.apply(lambda x, c: len(x) + c, axis=1, c=100).sort_index(),
pdf.apply(lambda x, c: len(x) + c, axis=1, c=100).sort_index(),
)
with self.assertRaisesRegex(AssertionError, "the first argument should be a callable"):
psdf.apply(1)
with self.assertRaisesRegex(TypeError, "The given function.*1 or 'column'; however"):
def f1(_) -> ps.DataFrame[int]:
pass
psdf.apply(f1, axis=0)
with self.assertRaisesRegex(TypeError, "The given function.*0 or 'index'; however"):
def f2(_) -> ps.Series[int]:
pass
psdf.apply(f2, axis=1)
# multi-index columns
columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y", "c")])
pdf.columns = columns
psdf.columns = columns
self.assert_eq(
psdf.apply(lambda x: x + 1).sort_index(), pdf.apply(lambda x: x + 1).sort_index()
)
with option_context("compute.shortcut_limit", 500):
self.assert_eq(
psdf.apply(lambda x: x + 1).sort_index(), pdf.apply(lambda x: x + 1).sort_index()
)
# returning a Series
self.assert_eq(
psdf.apply(lambda x: len(x), axis=1).sort_index(),
pdf.apply(lambda x: len(x), axis=1).sort_index(),
)
with option_context("compute.shortcut_limit", 500):
self.assert_eq(
psdf.apply(lambda x: len(x), axis=1).sort_index(),
pdf.apply(lambda x: len(x), axis=1).sort_index(),
)
def test_apply_with_type(self):
pdf = self.pdf
psdf = ps.from_pandas(pdf)
def identify1(x) -> ps.DataFrame[int, int]:
return x
# Type hints set the default column names, and we use default index for
# pandas API on Spark. Here we ignore both diff.
actual = psdf.apply(identify1, axis=1)
expected = pdf.apply(identify1, axis=1)
self.assert_eq(sorted(actual["c0"].to_numpy()), sorted(expected["a"].to_numpy()))
self.assert_eq(sorted(actual["c1"].to_numpy()), sorted(expected["b"].to_numpy()))
def identify2(x) -> ps.DataFrame[slice("a", int), slice("b", int)]: # noqa: F405
return x
actual = psdf.apply(identify2, axis=1)
expected = pdf.apply(identify2, axis=1)
self.assert_eq(sorted(actual["a"].to_numpy()), sorted(expected["a"].to_numpy()))
self.assert_eq(sorted(actual["b"].to_numpy()), sorted(expected["b"].to_numpy()))
def test_apply_batch(self):
pdf = pd.DataFrame(
{
"a": [1, 2, 3, 4, 5, 6] * 100,
"b": [1.0, 1.0, 2.0, 3.0, 5.0, 8.0] * 100,
"c": [1, 4, 9, 16, 25, 36] * 100,
},
columns=["a", "b", "c"],
index=np.random.rand(600),
)
psdf = ps.DataFrame(pdf)
self.assert_eq(
psdf.pandas_on_spark.apply_batch(lambda pdf, a: pdf + a, args=(1,)).sort_index(),
(pdf + 1).sort_index(),
)
with option_context("compute.shortcut_limit", 500):
self.assert_eq(
psdf.pandas_on_spark.apply_batch(lambda pdf: pdf + 1).sort_index(),
(pdf + 1).sort_index(),
)
self.assert_eq(
psdf.pandas_on_spark.apply_batch(lambda pdf, b: pdf + b, b=1).sort_index(),
(pdf + 1).sort_index(),
)
with self.assertRaisesRegex(AssertionError, "the first argument should be a callable"):
psdf.pandas_on_spark.apply_batch(1)
with self.assertRaisesRegex(TypeError, "The given function.*frame as its type hints"):
def f2(_) -> ps.Series[int]:
pass
psdf.pandas_on_spark.apply_batch(f2)
with self.assertRaisesRegex(ValueError, "The given function should return a frame"):
psdf.pandas_on_spark.apply_batch(lambda pdf: 1)
# multi-index columns
columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y", "c")])
pdf.columns = columns
psdf.columns = columns
self.assert_eq(
psdf.pandas_on_spark.apply_batch(lambda x: x + 1).sort_index(), (pdf + 1).sort_index()
)
with option_context("compute.shortcut_limit", 500):
self.assert_eq(
psdf.pandas_on_spark.apply_batch(lambda x: x + 1).sort_index(),
(pdf + 1).sort_index(),
)
def test_apply_batch_with_type(self):
pdf = self.pdf
psdf = ps.from_pandas(pdf)
def identify1(x) -> ps.DataFrame[int, int]:
return x
# Type hints set the default column names, and we use default index for
# pandas API on Spark. Here we ignore both diff.
actual = psdf.pandas_on_spark.apply_batch(identify1)
expected = pdf
self.assert_eq(sorted(actual["c0"].to_numpy()), sorted(expected["a"].to_numpy()))
self.assert_eq(sorted(actual["c1"].to_numpy()), sorted(expected["b"].to_numpy()))
def identify2(x) -> ps.DataFrame[slice("a", int), slice("b", int)]: # noqa: F405
return x
actual = psdf.pandas_on_spark.apply_batch(identify2)
expected = pdf
self.assert_eq(sorted(actual["a"].to_numpy()), sorted(expected["a"].to_numpy()))
self.assert_eq(sorted(actual["b"].to_numpy()), sorted(expected["b"].to_numpy()))
pdf = pd.DataFrame(
{"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [[e] for e in [4, 5, 6, 3, 2, 1, 0, 0, 0]]},
index=np.random.rand(9),
)
psdf = ps.from_pandas(pdf)
def identify3(x) -> ps.DataFrame[float, [int, List[int]]]:
return x
actual = psdf.pandas_on_spark.apply_batch(identify3)
actual.columns = ["a", "b"]
self.assert_eq(actual, pdf)
# For NumPy typing, NumPy version should be 1.21+ and Python version should be 3.8+
if sys.version_info >= (3, 8) and LooseVersion(np.__version__) >= LooseVersion("1.21"):
import numpy.typing as ntp
psdf = ps.from_pandas(pdf)
def identify4(
x,
) -> ps.DataFrame[float, [int, ntp.NDArray[int]]]:
return x
actual = psdf.pandas_on_spark.apply_batch(identify4)
actual.columns = ["a", "b"]
self.assert_eq(actual, pdf)
arrays = [[1, 2, 3, 4, 5, 6, 7, 8, 9], ["a", "b", "c", "d", "e", "f", "g", "h", "i"]]
idx = pd.MultiIndex.from_arrays(arrays, names=("number", "color"))
pdf = pd.DataFrame(
{"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [[e] for e in [4, 5, 6, 3, 2, 1, 0, 0, 0]]},
index=idx,
)
psdf = ps.from_pandas(pdf)
def identify4(x) -> ps.DataFrame[[int, str], [int, List[int]]]:
return x
actual = psdf.pandas_on_spark.apply_batch(identify4)
actual.index.names = ["number", "color"]
actual.columns = ["a", "b"]
self.assert_eq(actual, pdf)
def identify5(
x,
) -> ps.DataFrame[
[("number", int), ("color", str)], [("a", int), ("b", List[int])] # noqa: F405
]:
return x
actual = psdf.pandas_on_spark.apply_batch(identify5)
self.assert_eq(actual, pdf)
def test_transform_batch(self):
pdf = pd.DataFrame(
{
"a": [1, 2, 3, 4, 5, 6] * 100,
"b": [1.0, 1.0, 2.0, 3.0, 5.0, 8.0] * 100,
"c": [1, 4, 9, 16, 25, 36] * 100,
},
columns=["a", "b", "c"],
index=np.random.rand(600),
)
psdf = ps.DataFrame(pdf)
self.assert_eq(
psdf.pandas_on_spark.transform_batch(lambda pdf: pdf.c + 1).sort_index(),
(pdf.c + 1).sort_index(),
)
self.assert_eq(
psdf.pandas_on_spark.transform_batch(lambda pdf, a: pdf + a, 1).sort_index(),
(pdf + 1).sort_index(),
)
self.assert_eq(
psdf.pandas_on_spark.transform_batch(lambda pdf, a: pdf.c + a, a=1).sort_index(),
(pdf.c + 1).sort_index(),
)
with option_context("compute.shortcut_limit", 500):
self.assert_eq(
psdf.pandas_on_spark.transform_batch(lambda pdf: pdf + 1).sort_index(),
(pdf + 1).sort_index(),
)
self.assert_eq(
psdf.pandas_on_spark.transform_batch(lambda pdf: pdf.b + 1).sort_index(),
(pdf.b + 1).sort_index(),
)
self.assert_eq(
psdf.pandas_on_spark.transform_batch(lambda pdf, a: pdf + a, 1).sort_index(),
(pdf + 1).sort_index(),
)
self.assert_eq(
psdf.pandas_on_spark.transform_batch(lambda pdf, a: pdf.c + a, a=1).sort_index(),
(pdf.c + 1).sort_index(),
)
with self.assertRaisesRegex(AssertionError, "the first argument should be a callable"):
psdf.pandas_on_spark.transform_batch(1)
with self.assertRaisesRegex(ValueError, "The given function should return a frame"):
psdf.pandas_on_spark.transform_batch(lambda pdf: 1)
with self.assertRaisesRegex(
ValueError, "transform_batch cannot produce aggregated results"
):
psdf.pandas_on_spark.transform_batch(lambda pdf: pd.Series(1))
# multi-index columns
columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y", "c")])
pdf.columns = columns
psdf.columns = columns
self.assert_eq(
psdf.pandas_on_spark.transform_batch(lambda x: x + 1).sort_index(),
(pdf + 1).sort_index(),
)
with option_context("compute.shortcut_limit", 500):
self.assert_eq(
psdf.pandas_on_spark.transform_batch(lambda x: x + 1).sort_index(),
(pdf + 1).sort_index(),
)
def test_transform_batch_with_type(self):
pdf = self.pdf
psdf = ps.from_pandas(pdf)
def identify1(x) -> ps.DataFrame[int, int]:
return x
# Type hints set the default column names, and we use default index for
# pandas API on Spark. Here we ignore both diff.
actual = psdf.pandas_on_spark.transform_batch(identify1)
expected = pdf
self.assert_eq(sorted(actual["c0"].to_numpy()), sorted(expected["a"].to_numpy()))
self.assert_eq(sorted(actual["c1"].to_numpy()), sorted(expected["b"].to_numpy()))
def identify2(x) -> ps.DataFrame[slice("a", int), slice("b", int)]: # noqa: F405
return x
actual = psdf.pandas_on_spark.transform_batch(identify2)
expected = pdf
self.assert_eq(sorted(actual["a"].to_numpy()), sorted(expected["a"].to_numpy()))
self.assert_eq(sorted(actual["b"].to_numpy()), sorted(expected["b"].to_numpy()))
def test_transform_batch_same_anchor(self):
psdf = ps.range(10)
psdf["d"] = psdf.pandas_on_spark.transform_batch(lambda pdf: pdf.id + 1)
self.assert_eq(
psdf,
pd.DataFrame({"id": list(range(10)), "d": list(range(1, 11))}, columns=["id", "d"]),
)
psdf = ps.range(10)
def plus_one(pdf) -> ps.Series[np.int64]:
return pdf.id + 1
psdf["d"] = psdf.pandas_on_spark.transform_batch(plus_one)
self.assert_eq(
psdf,
pd.DataFrame({"id": list(range(10)), "d": list(range(1, 11))}, columns=["id", "d"]),
)
psdf = ps.range(10)
def plus_one(ser) -> ps.Series[np.int64]:
return ser + 1
psdf["d"] = psdf.id.pandas_on_spark.transform_batch(plus_one)
self.assert_eq(
psdf,
pd.DataFrame({"id": list(range(10)), "d": list(range(1, 11))}, columns=["id", "d"]),
)
def test_empty_timestamp(self):
pdf = pd.DataFrame(
{
"t": [
datetime(2019, 1, 1, 0, 0, 0),
datetime(2019, 1, 2, 0, 0, 0),
datetime(2019, 1, 3, 0, 0, 0),
]
},
index=np.random.rand(3),
)
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf[psdf["t"] != psdf["t"]], pdf[pdf["t"] != pdf["t"]])
self.assert_eq(psdf[psdf["t"] != psdf["t"]].dtypes, pdf[pdf["t"] != pdf["t"]].dtypes)
def test_to_spark(self):
psdf = ps.from_pandas(self.pdf)
with self.assertRaisesRegex(ValueError, "'index_col' cannot be overlapped"):
psdf.to_spark(index_col="a")
with self.assertRaisesRegex(ValueError, "length of index columns.*1.*3"):
psdf.to_spark(index_col=["x", "y", "z"])
def test_keys(self):
pdf = pd.DataFrame(
[[1, 2], [4, 5], [7, 8]],
index=["cobra", "viper", "sidewinder"],
columns=["max_speed", "shield"],
)
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf.keys(), pdf.keys())
def test_quantile(self):
pdf, psdf = self.df_pair
self.assert_eq(psdf.quantile(0.5), pdf.quantile(0.5))
self.assert_eq(psdf.quantile([0.25, 0.5, 0.75]), pdf.quantile([0.25, 0.5, 0.75]))
self.assert_eq(psdf.loc[[]].quantile(0.5), pdf.loc[[]].quantile(0.5))
self.assert_eq(
psdf.loc[[]].quantile([0.25, 0.5, 0.75]), pdf.loc[[]].quantile([0.25, 0.5, 0.75])
)
with self.assertRaisesRegex(
NotImplementedError, 'axis should be either 0 or "index" currently.'
):
psdf.quantile(0.5, axis=1)
with self.assertRaisesRegex(TypeError, "accuracy must be an integer; however"):
psdf.quantile(accuracy="a")
with self.assertRaisesRegex(TypeError, "q must be a float or an array of floats;"):
psdf.quantile(q="a")
with self.assertRaisesRegex(TypeError, "q must be a float or an array of floats;"):
psdf.quantile(q=["a"])
with self.assertRaisesRegex(
ValueError, r"percentiles should all be in the interval \[0, 1\]"
):
psdf.quantile(q=[1.1])
self.assert_eq(
psdf.quantile(0.5, numeric_only=False), pdf.quantile(0.5, numeric_only=False)
)
self.assert_eq(
psdf.quantile([0.25, 0.5, 0.75], numeric_only=False),
pdf.quantile([0.25, 0.5, 0.75], numeric_only=False),
)
# multi-index column
columns = pd.MultiIndex.from_tuples([("x", "a"), ("y", "b")])
pdf.columns = columns
psdf.columns = columns
self.assert_eq(psdf.quantile(0.5), pdf.quantile(0.5))
self.assert_eq(psdf.quantile([0.25, 0.5, 0.75]), pdf.quantile([0.25, 0.5, 0.75]))
pdf = pd.DataFrame({"x": ["a", "b", "c"]})
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf.quantile(0.5), pdf.quantile(0.5))
self.assert_eq(psdf.quantile([0.25, 0.5, 0.75]), pdf.quantile([0.25, 0.5, 0.75]))
with self.assertRaisesRegex(TypeError, "Could not convert object \\(string\\) to numeric"):
psdf.quantile(0.5, numeric_only=False)
with self.assertRaisesRegex(TypeError, "Could not convert object \\(string\\) to numeric"):
psdf.quantile([0.25, 0.5, 0.75], numeric_only=False)
def test_pct_change(self):
pdf = pd.DataFrame(
{"a": [1, 2, 3, 2], "b": [4.0, 2.0, 3.0, 1.0], "c": [300, 200, 400, 200]},
index=np.random.rand(4),
)
pdf.columns = pd.MultiIndex.from_tuples([("a", "x"), ("b", "y"), ("c", "z")])
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf.pct_change(2), pdf.pct_change(2), check_exact=False)
self.assert_eq(psdf.pct_change().sum(), pdf.pct_change().sum(), check_exact=False)
def test_where(self):
pdf, psdf = self.df_pair
# pandas requires `axis` argument when the `other` is Series.
# `axis` is not fully supported yet in pandas-on-Spark.
self.assert_eq(
psdf.where(psdf > 2, psdf.a + 10, axis=0), pdf.where(pdf > 2, pdf.a + 10, axis=0)
)
with self.assertRaisesRegex(TypeError, "type of cond must be a DataFrame or Series"):
psdf.where(1)
with self.assertRaisesRegex(
NotImplementedError, 'axis should be either 0 or "index" currently.'
):
psdf.where(psdf > 2, psdf.a + 10, axis=1)
def test_mask(self):
psdf = ps.from_pandas(self.pdf)
with self.assertRaisesRegex(TypeError, "type of cond must be a DataFrame or Series"):
psdf.mask(1)
def test_query(self):
pdf = pd.DataFrame({"A": range(1, 6), "B": range(10, 0, -2), "C": range(10, 5, -1)})
psdf = ps.from_pandas(pdf)
exprs = ("A > B", "A < C", "C == B")
for expr in exprs:
self.assert_eq(psdf.query(expr), pdf.query(expr))
# test `inplace=True`
for expr in exprs:
dummy_psdf = psdf.copy()
dummy_pdf = pdf.copy()
pser = dummy_pdf.A
psser = dummy_psdf.A
dummy_pdf.query(expr, inplace=True)
dummy_psdf.query(expr, inplace=True)
self.assert_eq(dummy_psdf, dummy_pdf)
self.assert_eq(psser, pser)
# invalid values for `expr`
invalid_exprs = (1, 1.0, (exprs[0],), [exprs[0]])
for expr in invalid_exprs:
with self.assertRaisesRegex(
TypeError,
"expr must be a string to be evaluated, {} given".format(type(expr).__name__),
):
psdf.query(expr)
# invalid values for `inplace`
invalid_inplaces = (1, 0, "True", "False")
for inplace in invalid_inplaces:
with self.assertRaisesRegex(
TypeError,
'For argument "inplace" expected type bool, received type {}.'.format(
type(inplace).__name__
),
):
psdf.query("a < b", inplace=inplace)
# doesn't support for MultiIndex columns
columns = pd.MultiIndex.from_tuples([("A", "Z"), ("B", "X"), ("C", "C")])
psdf.columns = columns
with self.assertRaisesRegex(TypeError, "Doesn't support for MultiIndex columns"):
psdf.query("('A', 'Z') > ('B', 'X')")
def test_take(self):
pdf = pd.DataFrame(
{"A": range(0, 50000), "B": range(100000, 0, -2), "C": range(100000, 50000, -1)}
)
psdf = ps.from_pandas(pdf)
# axis=0 (default)
self.assert_eq(psdf.take([1, 2]).sort_index(), pdf.take([1, 2]).sort_index())
self.assert_eq(psdf.take([-1, -2]).sort_index(), pdf.take([-1, -2]).sort_index())
self.assert_eq(
psdf.take(range(100, 110)).sort_index(), pdf.take(range(100, 110)).sort_index()
)
self.assert_eq(
psdf.take(range(-110, -100)).sort_index(), pdf.take(range(-110, -100)).sort_index()
)
self.assert_eq(
psdf.take([10, 100, 1000, 10000]).sort_index(),
pdf.take([10, 100, 1000, 10000]).sort_index(),
)
self.assert_eq(
psdf.take([-10, -100, -1000, -10000]).sort_index(),
pdf.take([-10, -100, -1000, -10000]).sort_index(),
)
# axis=1
self.assert_eq(
psdf.take([1, 2], axis=1).sort_index(), pdf.take([1, 2], axis=1).sort_index()
)
self.assert_eq(
psdf.take([-1, -2], axis=1).sort_index(), pdf.take([-1, -2], axis=1).sort_index()
)
self.assert_eq(
psdf.take(range(1, 3), axis=1).sort_index(),
pdf.take(range(1, 3), axis=1).sort_index(),
)
self.assert_eq(
psdf.take(range(-1, -3), axis=1).sort_index(),
pdf.take(range(-1, -3), axis=1).sort_index(),
)
self.assert_eq(
psdf.take([2, 1], axis=1).sort_index(),
pdf.take([2, 1], axis=1).sort_index(),
)
self.assert_eq(
psdf.take([-1, -2], axis=1).sort_index(),
pdf.take([-1, -2], axis=1).sort_index(),
)
# MultiIndex columns
columns = pd.MultiIndex.from_tuples([("A", "Z"), ("B", "X"), ("C", "C")])
psdf.columns = columns
pdf.columns = columns
# MultiIndex columns with axis=0 (default)
self.assert_eq(psdf.take([1, 2]).sort_index(), pdf.take([1, 2]).sort_index())
self.assert_eq(psdf.take([-1, -2]).sort_index(), pdf.take([-1, -2]).sort_index())
self.assert_eq(
psdf.take(range(100, 110)).sort_index(), pdf.take(range(100, 110)).sort_index()
)
self.assert_eq(
psdf.take(range(-110, -100)).sort_index(), pdf.take(range(-110, -100)).sort_index()
)
self.assert_eq(
psdf.take([10, 100, 1000, 10000]).sort_index(),
pdf.take([10, 100, 1000, 10000]).sort_index(),
)
self.assert_eq(
psdf.take([-10, -100, -1000, -10000]).sort_index(),
pdf.take([-10, -100, -1000, -10000]).sort_index(),
)
# axis=1
self.assert_eq(
psdf.take([1, 2], axis=1).sort_index(), pdf.take([1, 2], axis=1).sort_index()
)
self.assert_eq(
psdf.take([-1, -2], axis=1).sort_index(), pdf.take([-1, -2], axis=1).sort_index()
)
self.assert_eq(
psdf.take(range(1, 3), axis=1).sort_index(),
pdf.take(range(1, 3), axis=1).sort_index(),
)
self.assert_eq(
psdf.take(range(-1, -3), axis=1).sort_index(),
pdf.take(range(-1, -3), axis=1).sort_index(),
)
self.assert_eq(
psdf.take([2, 1], axis=1).sort_index(),
pdf.take([2, 1], axis=1).sort_index(),
)
self.assert_eq(
psdf.take([-1, -2], axis=1).sort_index(),
pdf.take([-1, -2], axis=1).sort_index(),
)
# Checking the type of indices.
self.assertRaises(TypeError, lambda: psdf.take(1))
self.assertRaises(TypeError, lambda: psdf.take("1"))
self.assertRaises(TypeError, lambda: psdf.take({1, 2}))
self.assertRaises(TypeError, lambda: psdf.take({1: None, 2: None}))
def test_axes(self):
pdf = self.pdf
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.axes, psdf.axes)
# multi-index columns
columns = pd.MultiIndex.from_tuples([("x", "a"), ("y", "b")])
pdf.columns = columns
psdf.columns = columns
self.assert_eq(pdf.axes, psdf.axes)
def test_udt(self):
sparse_values = {0: 0.1, 1: 1.1}
sparse_vector = SparseVector(len(sparse_values), sparse_values)
pdf = pd.DataFrame({"a": [sparse_vector], "b": [10]})
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf, pdf)
def test_eval(self):
pdf = pd.DataFrame({"A": range(1, 6), "B": range(10, 0, -2)})
psdf = ps.from_pandas(pdf)
# operation between columns (returns Series)
self.assert_eq(pdf.eval("A + B"), psdf.eval("A + B"))
self.assert_eq(pdf.eval("A + A"), psdf.eval("A + A"))
# assignment (returns DataFrame)
self.assert_eq(pdf.eval("C = A + B"), psdf.eval("C = A + B"))
self.assert_eq(pdf.eval("A = A + A"), psdf.eval("A = A + A"))
# operation between scalars (returns scalar)
self.assert_eq(pdf.eval("1 + 1"), psdf.eval("1 + 1"))
# complicated operations with assignment
self.assert_eq(
pdf.eval("B = A + B // (100 + 200) * (500 - B) - 10.5"),
psdf.eval("B = A + B // (100 + 200) * (500 - B) - 10.5"),
)
# inplace=True (only support for assignment)
pdf.eval("C = A + B", inplace=True)
psdf.eval("C = A + B", inplace=True)
self.assert_eq(pdf, psdf)
pser = pdf.A
psser = psdf.A
pdf.eval("A = B + C", inplace=True)
psdf.eval("A = B + C", inplace=True)
self.assert_eq(pdf, psdf)
# Skip due to pandas bug: https://github.com/pandas-dev/pandas/issues/47449
if not (LooseVersion("1.4.0") <= LooseVersion(pd.__version__) <= LooseVersion("1.4.3")):
self.assert_eq(pser, psser)
# doesn't support for multi-index columns
columns = pd.MultiIndex.from_tuples([("x", "a"), ("y", "b"), ("z", "c")])
psdf.columns = columns
self.assertRaises(TypeError, lambda: psdf.eval("x.a + y.b"))
@unittest.skipIf(not have_tabulate, tabulate_requirement_message)
def test_to_markdown(self):
pdf = pd.DataFrame(data={"animal_1": ["elk", "pig"], "animal_2": ["dog", "quetzal"]})
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.to_markdown(), psdf.to_markdown())
def test_cache(self):
pdf = pd.DataFrame(
[(0.2, 0.3), (0.0, 0.6), (0.6, 0.0), (0.2, 0.1)], columns=["dogs", "cats"]
)
psdf = ps.from_pandas(pdf)
with psdf.spark.cache() as cached_df:
self.assert_eq(isinstance(cached_df, CachedDataFrame), True)
self.assert_eq(
repr(cached_df.spark.storage_level), repr(StorageLevel(True, True, False, True))
)
def test_persist(self):
pdf = pd.DataFrame(
[(0.2, 0.3), (0.0, 0.6), (0.6, 0.0), (0.2, 0.1)], columns=["dogs", "cats"]
)
psdf = ps.from_pandas(pdf)
storage_levels = [
StorageLevel.DISK_ONLY,
StorageLevel.MEMORY_AND_DISK,
StorageLevel.MEMORY_ONLY,
StorageLevel.OFF_HEAP,
]
for storage_level in storage_levels:
with psdf.spark.persist(storage_level) as cached_df:
self.assert_eq(isinstance(cached_df, CachedDataFrame), True)
self.assert_eq(repr(cached_df.spark.storage_level), repr(storage_level))
self.assertRaises(TypeError, lambda: psdf.spark.persist("DISK_ONLY"))
def test_squeeze(self):
axises = [None, 0, 1, "rows", "index", "columns"]
# Multiple columns
pdf = pd.DataFrame([[1, 2], [3, 4]], columns=["a", "b"], index=["x", "y"])
psdf = ps.from_pandas(pdf)
for axis in axises:
self.assert_eq(pdf.squeeze(axis), psdf.squeeze(axis))
# Multiple columns with MultiIndex columns
columns = pd.MultiIndex.from_tuples([("A", "Z"), ("B", "X")])
pdf.columns = columns
psdf.columns = columns
for axis in axises:
self.assert_eq(pdf.squeeze(axis), psdf.squeeze(axis))
# Single column with single value
pdf = pd.DataFrame([[1]], columns=["a"], index=["x"])
psdf = ps.from_pandas(pdf)
for axis in axises:
self.assert_eq(pdf.squeeze(axis), psdf.squeeze(axis))
# Single column with single value with MultiIndex column
columns = pd.MultiIndex.from_tuples([("A", "Z")])
pdf.columns = columns
psdf.columns = columns
for axis in axises:
self.assert_eq(pdf.squeeze(axis), psdf.squeeze(axis))
# Single column with multiple values
pdf = pd.DataFrame([1, 2, 3, 4], columns=["a"])
psdf = ps.from_pandas(pdf)
for axis in axises:
self.assert_eq(pdf.squeeze(axis), psdf.squeeze(axis))
# Single column with multiple values with MultiIndex column
pdf.columns = columns
psdf.columns = columns
for axis in axises:
self.assert_eq(pdf.squeeze(axis), psdf.squeeze(axis))
def test_rfloordiv(self):
pdf = pd.DataFrame(
{"angles": [0, 3, 4], "degrees": [360, 180, 360]},
index=["circle", "triangle", "rectangle"],
columns=["angles", "degrees"],
)
psdf = ps.from_pandas(pdf)
expected_result = pdf.rfloordiv(10)
self.assert_eq(psdf.rfloordiv(10), expected_result)
def test_truncate(self):
pdf1 = pd.DataFrame(
{
"A": ["a", "b", "c", "d", "e", "f", "g"],
"B": ["h", "i", "j", "k", "l", "m", "n"],
"C": ["o", "p", "q", "r", "s", "t", "u"],
},
index=[-500, -20, -1, 0, 400, 550, 1000],
)
psdf1 = ps.from_pandas(pdf1)
pdf2 = pd.DataFrame(
{
"A": ["a", "b", "c", "d", "e", "f", "g"],
"B": ["h", "i", "j", "k", "l", "m", "n"],
"C": ["o", "p", "q", "r", "s", "t", "u"],
},
index=[1000, 550, 400, 0, -1, -20, -500],
)
psdf2 = ps.from_pandas(pdf2)
self.assert_eq(psdf1.truncate(), pdf1.truncate())
self.assert_eq(psdf1.truncate(before=-20), pdf1.truncate(before=-20))
self.assert_eq(psdf1.truncate(after=400), pdf1.truncate(after=400))
self.assert_eq(psdf1.truncate(copy=False), pdf1.truncate(copy=False))
self.assert_eq(psdf1.truncate(-20, 400, copy=False), pdf1.truncate(-20, 400, copy=False))
# The bug for these tests has been fixed in pandas 1.1.0.
if LooseVersion(pd.__version__) >= LooseVersion("1.1.0"):
self.assert_eq(psdf2.truncate(0, 550), pdf2.truncate(0, 550))
self.assert_eq(psdf2.truncate(0, 550, copy=False), pdf2.truncate(0, 550, copy=False))
else:
expected_psdf = ps.DataFrame(
{"A": ["b", "c", "d"], "B": ["i", "j", "k"], "C": ["p", "q", "r"]},
index=[550, 400, 0],
)
self.assert_eq(psdf2.truncate(0, 550), expected_psdf)
self.assert_eq(psdf2.truncate(0, 550, copy=False), expected_psdf)
# axis = 1
self.assert_eq(psdf1.truncate(axis=1), pdf1.truncate(axis=1))
self.assert_eq(psdf1.truncate(before="B", axis=1), pdf1.truncate(before="B", axis=1))
self.assert_eq(psdf1.truncate(after="A", axis=1), pdf1.truncate(after="A", axis=1))
self.assert_eq(psdf1.truncate(copy=False, axis=1), pdf1.truncate(copy=False, axis=1))
self.assert_eq(psdf2.truncate("B", "C", axis=1), pdf2.truncate("B", "C", axis=1))
self.assert_eq(
psdf1.truncate("B", "C", copy=False, axis=1),
pdf1.truncate("B", "C", copy=False, axis=1),
)
# MultiIndex columns
columns = pd.MultiIndex.from_tuples([("A", "Z"), ("B", "X"), ("C", "Z")])
pdf1.columns = columns
psdf1.columns = columns
pdf2.columns = columns
psdf2.columns = columns
self.assert_eq(psdf1.truncate(), pdf1.truncate())
self.assert_eq(psdf1.truncate(before=-20), pdf1.truncate(before=-20))
self.assert_eq(psdf1.truncate(after=400), pdf1.truncate(after=400))
self.assert_eq(psdf1.truncate(copy=False), pdf1.truncate(copy=False))
self.assert_eq(psdf1.truncate(-20, 400, copy=False), pdf1.truncate(-20, 400, copy=False))
# The bug for these tests has been fixed in pandas 1.1.0.
if LooseVersion(pd.__version__) >= LooseVersion("1.1.0"):
self.assert_eq(psdf2.truncate(0, 550), pdf2.truncate(0, 550))
self.assert_eq(psdf2.truncate(0, 550, copy=False), pdf2.truncate(0, 550, copy=False))
else:
expected_psdf.columns = columns
self.assert_eq(psdf2.truncate(0, 550), expected_psdf)
self.assert_eq(psdf2.truncate(0, 550, copy=False), expected_psdf)
# axis = 1
self.assert_eq(psdf1.truncate(axis=1), pdf1.truncate(axis=1))
self.assert_eq(psdf1.truncate(before="B", axis=1), pdf1.truncate(before="B", axis=1))
self.assert_eq(psdf1.truncate(after="A", axis=1), pdf1.truncate(after="A", axis=1))
self.assert_eq(psdf1.truncate(copy=False, axis=1), pdf1.truncate(copy=False, axis=1))
self.assert_eq(psdf2.truncate("B", "C", axis=1), pdf2.truncate("B", "C", axis=1))
self.assert_eq(
psdf1.truncate("B", "C", copy=False, axis=1),
pdf1.truncate("B", "C", copy=False, axis=1),
)
# Exceptions
psdf = ps.DataFrame(
{
"A": ["a", "b", "c", "d", "e", "f", "g"],
"B": ["h", "i", "j", "k", "l", "m", "n"],
"C": ["o", "p", "q", "r", "s", "t", "u"],
},
index=[-500, 100, 400, 0, -1, 550, -20],
)
msg = "truncate requires a sorted index"
with self.assertRaisesRegex(ValueError, msg):
psdf.truncate()
psdf = ps.DataFrame(
{
"A": ["a", "b", "c", "d", "e", "f", "g"],
"B": ["h", "i", "j", "k", "l", "m", "n"],
"C": ["o", "p", "q", "r", "s", "t", "u"],
},
index=[-500, -20, -1, 0, 400, 550, 1000],
)
msg = "Truncate: -20 must be after 400"
with self.assertRaisesRegex(ValueError, msg):
psdf.truncate(400, -20)
msg = "Truncate: B must be after C"
with self.assertRaisesRegex(ValueError, msg):
psdf.truncate("C", "B", axis=1)
def test_explode(self):
pdf = pd.DataFrame(
{"A": [[-1.0, np.nan], [0.0, np.inf], [1.0, -np.inf]], "B": 1}, index=["a", "b", "c"]
)
pdf.index.name = "index"
pdf.columns.name = "columns"
psdf = ps.from_pandas(pdf)
expected_result1, result1 = pdf.explode("A"), psdf.explode("A")
expected_result2, result2 = pdf.explode("B"), psdf.explode("B")
expected_result3, result3 = pdf.explode("A", ignore_index=True), psdf.explode(
"A", ignore_index=True
)
self.assert_eq(result1, expected_result1, almost=True)
self.assert_eq(result2, expected_result2)
self.assert_eq(result1.index.name, expected_result1.index.name)
self.assert_eq(result1.columns.name, expected_result1.columns.name)
self.assert_eq(result3, expected_result3, almost=True)
self.assert_eq(result3.index, expected_result3.index)
self.assertRaises(TypeError, lambda: psdf.explode(["A", "B"]))
# MultiIndex
midx = pd.MultiIndex.from_tuples(
[("x", "a"), ("x", "b"), ("y", "c")], names=["index1", "index2"]
)
pdf.index = midx
psdf = ps.from_pandas(pdf)
expected_result1, result1 = pdf.explode("A"), psdf.explode("A")
expected_result2, result2 = pdf.explode("B"), psdf.explode("B")
expected_result3, result3 = pdf.explode("A", ignore_index=True), psdf.explode(
"A", ignore_index=True
)
self.assert_eq(result1, expected_result1, almost=True)
self.assert_eq(result2, expected_result2)
self.assert_eq(result1.index.names, expected_result1.index.names)
self.assert_eq(result1.columns.name, expected_result1.columns.name)
self.assert_eq(result3, expected_result3, almost=True)
self.assert_eq(result3.index, expected_result3.index)
self.assertRaises(TypeError, lambda: psdf.explode(["A", "B"]))
# MultiIndex columns
columns = pd.MultiIndex.from_tuples([("A", "Z"), ("B", "X")], names=["column1", "column2"])
pdf.columns = columns
psdf.columns = columns
expected_result1, result1 = pdf.explode(("A", "Z")), psdf.explode(("A", "Z"))
expected_result2, result2 = pdf.explode(("B", "X")), psdf.explode(("B", "X"))
expected_result3, result3 = pdf.A.explode("Z"), psdf.A.explode("Z")
self.assert_eq(result1, expected_result1, almost=True)
self.assert_eq(result2, expected_result2)
self.assert_eq(result1.index.names, expected_result1.index.names)
self.assert_eq(result1.columns.names, expected_result1.columns.names)
self.assert_eq(result3, expected_result3, almost=True)
self.assertRaises(TypeError, lambda: psdf.explode(["A", "B"]))
self.assertRaises(ValueError, lambda: psdf.explode("A"))
def test_spark_schema(self):
psdf = ps.DataFrame(
{
"a": list("abc"),
"b": list(range(1, 4)),
"c": np.arange(3, 6).astype("i1"),
"d": np.arange(4.0, 7.0, dtype="float64"),
"e": [True, False, True],
"f": pd.date_range("20130101", periods=3),
},
columns=["a", "b", "c", "d", "e", "f"],
)
actual = psdf.spark.schema()
expected = (
StructType()
.add("a", "string", False)
.add("b", "long", False)
.add("c", "byte", False)
.add("d", "double", False)
.add("e", "boolean", False)
.add("f", "timestamp", False)
)
self.assertEqual(actual, expected)
actual = psdf.spark.schema("index")
expected = (
StructType()
.add("index", "long", False)
.add("a", "string", False)
.add("b", "long", False)
.add("c", "byte", False)
.add("d", "double", False)
.add("e", "boolean", False)
.add("f", "timestamp", False)
)
self.assertEqual(actual, expected)
def test_print_schema(self):
psdf = ps.DataFrame(
{"a": list("abc"), "b": list(range(1, 4)), "c": np.arange(3, 6).astype("i1")},
columns=["a", "b", "c"],
)
prev = sys.stdout
try:
out = StringIO()
sys.stdout = out
psdf.spark.print_schema()
actual = out.getvalue().strip()
self.assertTrue("a: string" in actual, actual)
self.assertTrue("b: long" in actual, actual)
self.assertTrue("c: byte" in actual, actual)
out = StringIO()
sys.stdout = out
psdf.spark.print_schema(index_col="index")
actual = out.getvalue().strip()
self.assertTrue("index: long" in actual, actual)
self.assertTrue("a: string" in actual, actual)
self.assertTrue("b: long" in actual, actual)
self.assertTrue("c: byte" in actual, actual)
finally:
sys.stdout = prev
def test_explain_hint(self):
psdf1 = ps.DataFrame(
{"lkey": ["foo", "bar", "baz", "foo"], "value": [1, 2, 3, 5]},
columns=["lkey", "value"],
)
psdf2 = ps.DataFrame(
{"rkey": ["foo", "bar", "baz", "foo"], "value": [5, 6, 7, 8]},
columns=["rkey", "value"],
)
merged = psdf1.merge(psdf2.spark.hint("broadcast"), left_on="lkey", right_on="rkey")
prev = sys.stdout
try:
out = StringIO()
sys.stdout = out
merged.spark.explain()
actual = out.getvalue().strip()
self.assertTrue("Broadcast" in actual, actual)
finally:
sys.stdout = prev
def test_mad(self):
pdf = pd.DataFrame(
{
"A": [1, 2, None, 4, np.nan],
"B": [-0.1, 0.2, -0.3, np.nan, 0.5],
"C": ["a", "b", "c", "d", "e"],
}
)
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf.mad(), pdf.mad())
self.assert_eq(psdf.mad(axis=1), pdf.mad(axis=1))
with self.assertRaises(ValueError):
psdf.mad(axis=2)
# MultiIndex columns
columns = pd.MultiIndex.from_tuples([("A", "X"), ("A", "Y"), ("A", "Z")])
pdf.columns = columns
psdf.columns = columns
self.assert_eq(psdf.mad(), pdf.mad())
self.assert_eq(psdf.mad(axis=1), pdf.mad(axis=1))
pdf = pd.DataFrame({"A": [True, True, False, False], "B": [True, False, False, True]})
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf.mad(), pdf.mad())
self.assert_eq(psdf.mad(axis=1), pdf.mad(axis=1))
def test_mode(self):
pdf = pd.DataFrame(
{
"A": [1, 2, None, 4, 5, 4, 2],
"B": [-0.1, 0.2, -0.3, np.nan, 0.5, -0.1, -0.1],
"C": ["d", "b", "c", "c", "e", "a", "a"],
"D": [np.nan, np.nan, np.nan, np.nan, 0.1, -0.1, -0.1],
"E": [np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan],
}
)
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf.mode(), pdf.mode())
self.assert_eq(psdf.mode(numeric_only=True), pdf.mode(numeric_only=True))
self.assert_eq(psdf.mode(dropna=False), pdf.mode(dropna=False))
# dataframe with single column
for c in ["A", "B", "C", "D", "E"]:
self.assert_eq(psdf[[c]].mode(), pdf[[c]].mode())
with self.assertRaises(ValueError):
psdf.mode(axis=2)
def f(index, iterator):
return ["3", "3", "3", "3", "4"] if index == 3 else ["0", "1", "2", "3", "4"]
rdd = self.spark.sparkContext.parallelize(
[
1,
],
4,
).mapPartitionsWithIndex(f)
df = self.spark.createDataFrame(rdd, schema="string")
psdf = df.pandas_api()
self.assert_eq(psdf.mode(), psdf._to_pandas().mode())
def test_abs(self):
pdf = pd.DataFrame({"a": [-2, -1, 0, 1]})
psdf = ps.from_pandas(pdf)
self.assert_eq(abs(psdf), abs(pdf))
self.assert_eq(np.abs(psdf), np.abs(pdf))
def test_corrwith(self):
df1 = ps.DataFrame(
{"A": [1, np.nan, 7, 8], "B": [False, True, True, False], "C": [10, 4, 9, 3]}
)
df2 = df1[["A", "C"]]
df3 = df1[["B", "C"]]
self._test_corrwith(df1, df2)
self._test_corrwith(df1, df3)
self._test_corrwith((df1 + 1), df2.A)
self._test_corrwith((df1 + 1), df3.B)
self._test_corrwith((df1 + 1), (df2.C + 2))
self._test_corrwith((df1 + 1), (df3.B + 2))
with self.assertRaisesRegex(TypeError, "unsupported type"):
df1.corrwith(123)
with self.assertRaisesRegex(NotImplementedError, "only works for axis=0"):
df1.corrwith(df1.A, axis=1)
with self.assertRaisesRegex(ValueError, "Invalid method"):
df1.corrwith(df1.A, method="cov")
df_bool = ps.DataFrame({"A": [True, True, False, False], "B": [True, False, False, True]})
self._test_corrwith(df_bool, df_bool.A)
self._test_corrwith(df_bool, df_bool.B)
def _test_corrwith(self, psdf, psobj):
pdf = psdf._to_pandas()
pobj = psobj._to_pandas()
# There was a regression in pandas 1.5.0
# when other is Series and method is "pearson" or "spearman", and fixed in pandas 1.5.1
# Therefore, we only test the pandas 1.5.0 in different way.
# See https://github.com/pandas-dev/pandas/issues/48826 for the reported issue,
# and https://github.com/pandas-dev/pandas/pull/46174 for the initial PR that causes.
if LooseVersion(pd.__version__) == LooseVersion("1.5.0") and isinstance(pobj, pd.Series):
methods = ["kendall"]
else:
methods = ["pearson", "spearman", "kendall"]
for method in methods:
for drop in [True, False]:
p_corr = pdf.corrwith(pobj, drop=drop, method=method)
ps_corr = psdf.corrwith(psobj, drop=drop, method=method)
self.assert_eq(p_corr.sort_index(), ps_corr.sort_index(), almost=True)
def test_iteritems(self):
pdf = pd.DataFrame(
{"species": ["bear", "bear", "marsupial"], "population": [1864, 22000, 80000]},
index=["panda", "polar", "koala"],
columns=["species", "population"],
)
psdf = ps.from_pandas(pdf)
for (p_name, p_items), (k_name, k_items) in zip(pdf.iteritems(), psdf.iteritems()):
self.assert_eq(p_name, k_name)
self.assert_eq(p_items, k_items)
def test_tail(self):
pdf = pd.DataFrame({"x": range(1000)})
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.tail(), psdf.tail())
self.assert_eq(pdf.tail(10), psdf.tail(10))
self.assert_eq(pdf.tail(-990), psdf.tail(-990))
self.assert_eq(pdf.tail(0), psdf.tail(0))
self.assert_eq(pdf.tail(-1001), psdf.tail(-1001))
self.assert_eq(pdf.tail(1001), psdf.tail(1001))
self.assert_eq((pdf + 1).tail(), (psdf + 1).tail())
self.assert_eq((pdf + 1).tail(10), (psdf + 1).tail(10))
self.assert_eq((pdf + 1).tail(-990), (psdf + 1).tail(-990))
self.assert_eq((pdf + 1).tail(0), (psdf + 1).tail(0))
self.assert_eq((pdf + 1).tail(-1001), (psdf + 1).tail(-1001))
self.assert_eq((pdf + 1).tail(1001), (psdf + 1).tail(1001))
with self.assertRaisesRegex(TypeError, "bad operand type for unary -: 'str'"):
psdf.tail("10")
def test_last_valid_index(self):
pdf = pd.DataFrame(
{"a": [1, 2, 3, None], "b": [1.0, 2.0, 3.0, None], "c": [100, 200, 400, None]},
index=["Q", "W", "E", "R"],
)
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.last_valid_index(), psdf.last_valid_index())
self.assert_eq(pdf[[]].last_valid_index(), psdf[[]].last_valid_index())
# MultiIndex columns
pdf.columns = pd.MultiIndex.from_tuples([("a", "x"), ("b", "y"), ("c", "z")])
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.last_valid_index(), psdf.last_valid_index())
# Empty DataFrame
pdf = pd.Series([]).to_frame()
psdf = ps.Series([]).to_frame()
self.assert_eq(pdf.last_valid_index(), psdf.last_valid_index())
def test_last(self):
index = pd.date_range("2018-04-09", periods=4, freq="2D")
pdf = pd.DataFrame([1, 2, 3, 4], index=index)
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.last("1D"), psdf.last("1D"))
self.assert_eq(pdf.last(DateOffset(days=1)), psdf.last(DateOffset(days=1)))
with self.assertRaisesRegex(TypeError, "'last' only supports a DatetimeIndex"):
ps.DataFrame([1, 2, 3, 4]).last("1D")
def test_first(self):
index = pd.date_range("2018-04-09", periods=4, freq="2D")
pdf = pd.DataFrame([1, 2, 3, 4], index=index)
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.first("1D"), psdf.first("1D"))
self.assert_eq(pdf.first(DateOffset(days=1)), psdf.first(DateOffset(days=1)))
with self.assertRaisesRegex(TypeError, "'first' only supports a DatetimeIndex"):
ps.DataFrame([1, 2, 3, 4]).first("1D")
def test_first_valid_index(self):
pdf = pd.DataFrame(
{"a": [None, 2, 3, 2], "b": [None, 2.0, 3.0, 1.0], "c": [None, 200, 400, 200]},
index=["Q", "W", "E", "R"],
)
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.first_valid_index(), psdf.first_valid_index())
self.assert_eq(pdf[[]].first_valid_index(), psdf[[]].first_valid_index())
# MultiIndex columns
pdf.columns = pd.MultiIndex.from_tuples([("a", "x"), ("b", "y"), ("c", "z")])
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.first_valid_index(), psdf.first_valid_index())
# Empty DataFrame
pdf = pd.Series([]).to_frame()
psdf = ps.Series([]).to_frame()
self.assert_eq(pdf.first_valid_index(), psdf.first_valid_index())
pdf = pd.DataFrame(
{"a": [None, 2, 3, 2], "b": [None, 2.0, 3.0, 1.0], "c": [None, 200, 400, 200]},
index=[
datetime(2021, 1, 1),
datetime(2021, 2, 1),
datetime(2021, 3, 1),
datetime(2021, 4, 1),
],
)
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.first_valid_index(), psdf.first_valid_index())
def test_product(self):
pdf = pd.DataFrame(
{"A": [1, 2, 3, 4, 5], "B": [10, 20, 30, 40, 50], "C": ["a", "b", "c", "d", "e"]}
)
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.prod(), psdf.prod().sort_index())
# Named columns
pdf.columns.name = "Koalas"
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.prod(), psdf.prod().sort_index())
# MultiIndex columns
pdf.columns = pd.MultiIndex.from_tuples([("a", "x"), ("b", "y"), ("c", "z")])
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.prod(), psdf.prod().sort_index())
# Named MultiIndex columns
pdf.columns.names = ["Hello", "Koalas"]
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.prod(), psdf.prod().sort_index())
# No numeric columns
pdf = pd.DataFrame({"key": ["a", "b", "c"], "val": ["x", "y", "z"]})
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.prod(), psdf.prod().sort_index())
# No numeric named columns
pdf.columns.name = "Koalas"
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.prod(), psdf.prod().sort_index(), almost=True)
# No numeric MultiIndex columns
pdf.columns = pd.MultiIndex.from_tuples([("a", "x"), ("b", "y")])
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.prod(), psdf.prod().sort_index(), almost=True)
# No numeric named MultiIndex columns
pdf.columns.names = ["Hello", "Koalas"]
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.prod(), psdf.prod().sort_index(), almost=True)
# All NaN columns
pdf = pd.DataFrame(
{
"A": [np.nan, np.nan, np.nan, np.nan, np.nan],
"B": [10, 20, 30, 40, 50],
"C": ["a", "b", "c", "d", "e"],
}
)
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.prod(), psdf.prod().sort_index(), check_exact=False)
# All NaN named columns
pdf.columns.name = "Koalas"
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.prod(), psdf.prod().sort_index(), check_exact=False)
# All NaN MultiIndex columns
pdf.columns = pd.MultiIndex.from_tuples([("a", "x"), ("b", "y"), ("c", "z")])
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.prod(), psdf.prod().sort_index(), check_exact=False)
# All NaN named MultiIndex columns
pdf.columns.names = ["Hello", "Koalas"]
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.prod(), psdf.prod().sort_index(), check_exact=False)
def test_from_dict(self):
data = {"row_1": [3, 2, 1, 0], "row_2": [10, 20, 30, 40]}
pdf = pd.DataFrame.from_dict(data)
psdf = ps.DataFrame.from_dict(data)
self.assert_eq(pdf, psdf)
pdf = pd.DataFrame.from_dict(data, dtype="int8")
psdf = ps.DataFrame.from_dict(data, dtype="int8")
self.assert_eq(pdf, psdf)
pdf = pd.DataFrame.from_dict(data, orient="index", columns=["A", "B", "C", "D"])
psdf = ps.DataFrame.from_dict(data, orient="index", columns=["A", "B", "C", "D"])
self.assert_eq(pdf, psdf)
def test_pad(self):
pdf = pd.DataFrame(
{
"A": [None, 3, None, None],
"B": [2, 4, None, 3],
"C": [None, None, None, 1],
"D": [0, 1, 5, 4],
},
columns=["A", "B", "C", "D"],
)
psdf = ps.from_pandas(pdf)
if LooseVersion(pd.__version__) >= LooseVersion("1.1"):
self.assert_eq(pdf.pad(), psdf.pad())
# Test `inplace=True`
pdf.pad(inplace=True)
psdf.pad(inplace=True)
self.assert_eq(pdf, psdf)
else:
expected = ps.DataFrame(
{
"A": [None, 3, 3, 3],
"B": [2.0, 4.0, 4.0, 3.0],
"C": [None, None, None, 1],
"D": [0, 1, 5, 4],
},
columns=["A", "B", "C", "D"],
)
self.assert_eq(expected, psdf.pad())
# Test `inplace=True`
psdf.pad(inplace=True)
self.assert_eq(expected, psdf)
def test_backfill(self):
pdf = pd.DataFrame(
{
"A": [None, 3, None, None],
"B": [2, 4, None, 3],
"C": [None, None, None, 1],
"D": [0, 1, 5, 4],
},
columns=["A", "B", "C", "D"],
)
psdf = ps.from_pandas(pdf)
if LooseVersion(pd.__version__) >= LooseVersion("1.1"):
self.assert_eq(pdf.backfill(), psdf.backfill())
# Test `inplace=True`
pdf.backfill(inplace=True)
psdf.backfill(inplace=True)
self.assert_eq(pdf, psdf)
else:
expected = ps.DataFrame(
{
"A": [3.0, 3.0, None, None],
"B": [2.0, 4.0, 3.0, 3.0],
"C": [1.0, 1.0, 1.0, 1.0],
"D": [0, 1, 5, 4],
},
columns=["A", "B", "C", "D"],
)
self.assert_eq(expected, psdf.backfill())
# Test `inplace=True`
psdf.backfill(inplace=True)
self.assert_eq(expected, psdf)
def test_align(self):
pdf1 = pd.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]}, index=[10, 20, 30])
psdf1 = ps.from_pandas(pdf1)
for join in ["outer", "inner", "left", "right"]:
for axis in [None, 0, 1]:
psdf_l, psdf_r = psdf1.align(psdf1[["b"]], join=join, axis=axis)
pdf_l, pdf_r = pdf1.align(pdf1[["b"]], join=join, axis=axis)
self.assert_eq(psdf_l, pdf_l)
self.assert_eq(psdf_r, pdf_r)
psdf_l, psdf_r = psdf1[["a"]].align(psdf1[["b", "a"]], join=join, axis=axis)
pdf_l, pdf_r = pdf1[["a"]].align(pdf1[["b", "a"]], join=join, axis=axis)
self.assert_eq(psdf_l, pdf_l)
self.assert_eq(psdf_r, pdf_r)
psdf_l, psdf_r = psdf1[["b", "a"]].align(psdf1[["a"]], join=join, axis=axis)
pdf_l, pdf_r = pdf1[["b", "a"]].align(pdf1[["a"]], join=join, axis=axis)
self.assert_eq(psdf_l, pdf_l)
self.assert_eq(psdf_r, pdf_r)
psdf_l, psdf_r = psdf1.align(psdf1["b"], axis=0)
pdf_l, pdf_r = pdf1.align(pdf1["b"], axis=0)
self.assert_eq(psdf_l, pdf_l)
self.assert_eq(psdf_r, pdf_r)
psdf_l, psser_b = psdf1[["a"]].align(psdf1["b"], axis=0)
pdf_l, pser_b = pdf1[["a"]].align(pdf1["b"], axis=0)
self.assert_eq(psdf_l, pdf_l)
self.assert_eq(psser_b, pser_b)
self.assertRaises(ValueError, lambda: psdf1.align(psdf1, join="unknown"))
self.assertRaises(ValueError, lambda: psdf1.align(psdf1["b"]))
self.assertRaises(TypeError, lambda: psdf1.align(["b"]))
self.assertRaises(NotImplementedError, lambda: psdf1.align(psdf1["b"], axis=1))
pdf2 = pd.DataFrame({"a": [4, 5, 6], "d": ["d", "e", "f"]}, index=[10, 11, 12])
psdf2 = ps.from_pandas(pdf2)
for join in ["outer", "inner", "left", "right"]:
psdf_l, psdf_r = psdf1.align(psdf2, join=join, axis=1)
pdf_l, pdf_r = pdf1.align(pdf2, join=join, axis=1)
self.assert_eq(psdf_l.sort_index(), pdf_l.sort_index())
self.assert_eq(psdf_r.sort_index(), pdf_r.sort_index())
def test_between_time(self):
idx = pd.date_range("2018-04-09", periods=4, freq="1D20min")
pdf = pd.DataFrame({"A": [1, 2, 3, 4]}, index=idx)
psdf = ps.from_pandas(pdf)
self.assert_eq(
pdf.between_time("0:15", "0:45").sort_index(),
psdf.between_time("0:15", "0:45").sort_index(),
)
pdf.index.name = "ts"
psdf = ps.from_pandas(pdf)
self.assert_eq(
pdf.between_time("0:15", "0:45").sort_index(),
psdf.between_time("0:15", "0:45").sort_index(),
)
# Column label is 'index'
pdf.columns = pd.Index(["index"])
psdf = ps.from_pandas(pdf)
self.assert_eq(
pdf.between_time("0:15", "0:45").sort_index(),
psdf.between_time("0:15", "0:45").sort_index(),
)
# Both index name and column label are 'index'
pdf.index.name = "index"
psdf = ps.from_pandas(pdf)
self.assert_eq(
pdf.between_time("0:15", "0:45").sort_index(),
psdf.between_time("0:15", "0:45").sort_index(),
)
# Index name is 'index', column label is ('X', 'A')
pdf.columns = pd.MultiIndex.from_arrays([["X"], ["A"]])
psdf = ps.from_pandas(pdf)
self.assert_eq(
pdf.between_time("0:15", "0:45").sort_index(),
psdf.between_time("0:15", "0:45").sort_index(),
)
with self.assertRaisesRegex(
NotImplementedError, "between_time currently only works for axis=0"
):
psdf.between_time("0:15", "0:45", axis=1)
psdf = ps.DataFrame({"A": [1, 2, 3, 4]})
with self.assertRaisesRegex(TypeError, "Index must be DatetimeIndex"):
psdf.between_time("0:15", "0:45")
def test_at_time(self):
idx = pd.date_range("2018-04-09", periods=4, freq="1D20min")
pdf = pd.DataFrame({"A": [1, 2, 3, 4]}, index=idx)
psdf = ps.from_pandas(pdf)
psdf.at_time("0:20")
self.assert_eq(
pdf.at_time("0:20").sort_index(),
psdf.at_time("0:20").sort_index(),
)
# Index name is 'ts'
pdf.index.name = "ts"
psdf = ps.from_pandas(pdf)
self.assert_eq(
pdf.at_time("0:20").sort_index(),
psdf.at_time("0:20").sort_index(),
)
# Index name is 'ts', column label is 'index'
pdf.columns = pd.Index(["index"])
psdf = ps.from_pandas(pdf)
self.assert_eq(
pdf.at_time("0:40").sort_index(),
psdf.at_time("0:40").sort_index(),
)
# Both index name and column label are 'index'
pdf.index.name = "index"
psdf = ps.from_pandas(pdf)
self.assert_eq(
pdf.at_time("0:40").sort_index(),
psdf.at_time("0:40").sort_index(),
)
# Index name is 'index', column label is ('X', 'A')
pdf.columns = pd.MultiIndex.from_arrays([["X"], ["A"]])
psdf = ps.from_pandas(pdf)
self.assert_eq(
pdf.at_time("0:40").sort_index(),
psdf.at_time("0:40").sort_index(),
)
with self.assertRaisesRegex(NotImplementedError, "'asof' argument is not supported"):
psdf.at_time("0:15", asof=True)
with self.assertRaisesRegex(NotImplementedError, "at_time currently only works for axis=0"):
psdf.at_time("0:15", axis=1)
psdf = ps.DataFrame({"A": [1, 2, 3, 4]})
with self.assertRaisesRegex(TypeError, "Index must be DatetimeIndex"):
psdf.at_time("0:15")
def test_astype(self):
psdf = self.psdf
msg = "Only a column name can be used for the key in a dtype mappings argument."
with self.assertRaisesRegex(KeyError, msg):
psdf.astype({"c": float})
def test_describe(self):
pdf, psdf = self.df_pair
# numeric columns
self.assert_eq(psdf.describe(), pdf.describe())
psdf.a += psdf.a
pdf.a += pdf.a
self.assert_eq(psdf.describe(), pdf.describe())
# string columns
psdf = ps.DataFrame({"A": ["a", "b", "b", "c"], "B": ["d", "e", "f", "f"]})
pdf = psdf._to_pandas()
self.assert_eq(psdf.describe(), pdf.describe().astype(str))
psdf.A += psdf.A
pdf.A += pdf.A
self.assert_eq(psdf.describe(), pdf.describe().astype(str))
# timestamp columns
psdf = ps.DataFrame(
{
"A": [
pd.Timestamp("2020-10-20"),
pd.Timestamp("2021-06-02"),
pd.Timestamp("2021-06-02"),
pd.Timestamp("2022-07-11"),
],
"B": [
pd.Timestamp("2021-11-20"),
pd.Timestamp("2023-06-02"),
pd.Timestamp("2026-07-11"),
pd.Timestamp("2026-07-11"),
],
}
)
pdf = psdf._to_pandas()
# NOTE: Set `datetime_is_numeric=True` for pandas:
# FutureWarning: Treating datetime data as categorical rather than numeric in
# `.describe` is deprecated and will be removed in a future version of pandas.
# Specify `datetime_is_numeric=True` to silence this
# warning and adopt the future behavior now.
# NOTE: Compare the result except percentiles, since we use approximate percentile
# so the result is different from pandas.
if LooseVersion(pd.__version__) >= LooseVersion("1.1.0"):
self.assert_eq(
psdf.describe().loc[["count", "mean", "min", "max"]],
pdf.describe(datetime_is_numeric=True)
.astype(str)
.loc[["count", "mean", "min", "max"]],
)
else:
self.assert_eq(
psdf.describe(),
ps.DataFrame(
{
"A": [
"4",
"2021-07-16 18:00:00",
"2020-10-20 00:00:00",
"2020-10-20 00:00:00",
"2021-06-02 00:00:00",
"2021-06-02 00:00:00",
"2022-07-11 00:00:00",
],
"B": [
"4",
"2024-08-02 18:00:00",
"2021-11-20 00:00:00",
"2021-11-20 00:00:00",
"2023-06-02 00:00:00",
"2026-07-11 00:00:00",
"2026-07-11 00:00:00",
],
},
index=["count", "mean", "min", "25%", "50%", "75%", "max"],
),
)
# String & timestamp columns
psdf = ps.DataFrame(
{
"A": ["a", "b", "b", "c"],
"B": [
pd.Timestamp("2021-11-20"),
pd.Timestamp("2023-06-02"),
pd.Timestamp("2026-07-11"),
pd.Timestamp("2026-07-11"),
],
}
)
pdf = psdf._to_pandas()
if LooseVersion(pd.__version__) >= LooseVersion("1.1.0"):
self.assert_eq(
psdf.describe().loc[["count", "mean", "min", "max"]],
pdf.describe(datetime_is_numeric=True)
.astype(str)
.loc[["count", "mean", "min", "max"]],
)
psdf.A += psdf.A
pdf.A += pdf.A
self.assert_eq(
psdf.describe().loc[["count", "mean", "min", "max"]],
pdf.describe(datetime_is_numeric=True)
.astype(str)
.loc[["count", "mean", "min", "max"]],
)
else:
expected_result = ps.DataFrame(
{
"B": [
"4",
"2024-08-02 18:00:00",
"2021-11-20 00:00:00",
"2021-11-20 00:00:00",
"2023-06-02 00:00:00",
"2026-07-11 00:00:00",
"2026-07-11 00:00:00",
]
},
index=["count", "mean", "min", "25%", "50%", "75%", "max"],
)
self.assert_eq(
psdf.describe(),
expected_result,
)
psdf.A += psdf.A
self.assert_eq(
psdf.describe(),
expected_result,
)
# Numeric & timestamp columns
psdf = ps.DataFrame(
{
"A": [1, 2, 2, 3],
"B": [
pd.Timestamp("2021-11-20"),
pd.Timestamp("2023-06-02"),
pd.Timestamp("2026-07-11"),
pd.Timestamp("2026-07-11"),
],
}
)
pdf = psdf._to_pandas()
if LooseVersion(pd.__version__) >= LooseVersion("1.1.0"):
pandas_result = pdf.describe(datetime_is_numeric=True)
pandas_result.B = pandas_result.B.astype(str)
self.assert_eq(
psdf.describe().loc[["count", "mean", "min", "max"]],
pandas_result.loc[["count", "mean", "min", "max"]],
)
psdf.A += psdf.A
pdf.A += pdf.A
pandas_result = pdf.describe(datetime_is_numeric=True)
pandas_result.B = pandas_result.B.astype(str)
self.assert_eq(
psdf.describe().loc[["count", "mean", "min", "max"]],
pandas_result.loc[["count", "mean", "min", "max"]],
)
else:
self.assert_eq(
psdf.describe(),
ps.DataFrame(
{
"A": [4, 2, 1, 1, 2, 2, 3, 0.816497],
"B": [
"4",
"2024-08-02 18:00:00",
"2021-11-20 00:00:00",
"2021-11-20 00:00:00",
"2023-06-02 00:00:00",
"2026-07-11 00:00:00",
"2026-07-11 00:00:00",
"None",
],
},
index=["count", "mean", "min", "25%", "50%", "75%", "max", "std"],
),
)
psdf.A += psdf.A
self.assert_eq(
psdf.describe(),
ps.DataFrame(
{
"A": [4, 4, 2, 2, 4, 4, 6, 1.632993],
"B": [
"4",
"2024-08-02 18:00:00",
"2021-11-20 00:00:00",
"2021-11-20 00:00:00",
"2023-06-02 00:00:00",
"2026-07-11 00:00:00",
"2026-07-11 00:00:00",
"None",
],
},
index=["count", "mean", "min", "25%", "50%", "75%", "max", "std"],
),
)
# Include None column
psdf = ps.DataFrame(
{
"a": [1, 2, 3],
"b": [pd.Timestamp(1), pd.Timestamp(1), pd.Timestamp(1)],
"c": [None, None, None],
}
)
pdf = psdf._to_pandas()
if LooseVersion(pd.__version__) >= LooseVersion("1.1.0"):
pandas_result = pdf.describe(datetime_is_numeric=True)
pandas_result.b = pandas_result.b.astype(str)
self.assert_eq(
psdf.describe().loc[["count", "mean", "min", "max"]],
pandas_result.loc[["count", "mean", "min", "max"]],
)
else:
self.assert_eq(
psdf.describe(),
ps.DataFrame(
{
"a": [3.0, 2.0, 1.0, 1.0, 2.0, 3.0, 3.0, 1.0],
"b": [
"3",
"1970-01-01 00:00:00.000001",
"1970-01-01 00:00:00.000001",
"1970-01-01 00:00:00.000001",
"1970-01-01 00:00:00.000001",
"1970-01-01 00:00:00.000001",
"1970-01-01 00:00:00.000001",
"None",
],
},
index=["count", "mean", "min", "25%", "50%", "75%", "max", "std"],
),
)
msg = r"Percentiles should all be in the interval \[0, 1\]"
with self.assertRaisesRegex(ValueError, msg):
psdf.describe(percentiles=[1.1])
psdf = ps.DataFrame()
msg = "Cannot describe a DataFrame without columns"
with self.assertRaisesRegex(ValueError, msg):
psdf.describe()
def test_describe_empty(self):
# Empty DataFrame
psdf = ps.DataFrame(columns=["A", "B"])
pdf = psdf._to_pandas()
self.assert_eq(
psdf.describe(),
pdf.describe().astype(float),
)
# Explicit empty DataFrame numeric only
psdf = ps.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
pdf = psdf._to_pandas()
self.assert_eq(
psdf[psdf.a != psdf.a].describe(),
pdf[pdf.a != pdf.a].describe(),
)
# Explicit empty DataFrame string only
psdf = ps.DataFrame({"a": ["a", "b", "c"], "b": ["q", "w", "e"]})
pdf = psdf._to_pandas()
self.assert_eq(
psdf[psdf.a != psdf.a].describe(),
pdf[pdf.a != pdf.a].describe().astype(float),
)
# Explicit empty DataFrame timestamp only
psdf = ps.DataFrame(
{
"a": [pd.Timestamp(1), pd.Timestamp(1), pd.Timestamp(1)],
"b": [pd.Timestamp(1), pd.Timestamp(1), pd.Timestamp(1)],
}
)
pdf = psdf._to_pandas()
# For timestamp type, we should convert NaT to None in pandas result
# since pandas API on Spark doesn't support the NaT for object type.
if LooseVersion(pd.__version__) >= LooseVersion("1.1.0"):
pdf_result = pdf[pdf.a != pdf.a].describe(datetime_is_numeric=True)
self.assert_eq(
psdf[psdf.a != psdf.a].describe(),
pdf_result.where(pdf_result.notnull(), None).astype(str),
)
else:
self.assert_eq(
psdf[psdf.a != psdf.a].describe(),
ps.DataFrame(
{
"a": [
"0",
"None",
"None",
"None",
"None",
"None",
"None",
],
"b": [
"0",
"None",
"None",
"None",
"None",
"None",
"None",
],
},
index=["count", "mean", "min", "25%", "50%", "75%", "max"],
),
)
# Explicit empty DataFrame numeric & timestamp
psdf = ps.DataFrame(
{"a": [1, 2, 3], "b": [pd.Timestamp(1), pd.Timestamp(1), pd.Timestamp(1)]}
)
pdf = psdf._to_pandas()
if LooseVersion(pd.__version__) >= LooseVersion("1.1.0"):
pdf_result = pdf[pdf.a != pdf.a].describe(datetime_is_numeric=True)
pdf_result.b = pdf_result.b.where(pdf_result.b.notnull(), None).astype(str)
self.assert_eq(
psdf[psdf.a != psdf.a].describe(),
pdf_result,
)
else:
self.assert_eq(
psdf[psdf.a != psdf.a].describe(),
ps.DataFrame(
{
"a": [
0,
None,
None,
None,
None,
None,
None,
None,
],
"b": [
"0",
"None",
"None",
"None",
"None",
"None",
"None",
"None",
],
},
index=["count", "mean", "min", "25%", "50%", "75%", "max", "std"],
),
)
# Explicit empty DataFrame numeric & string
psdf = ps.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]})
pdf = psdf._to_pandas()
self.assert_eq(
psdf[psdf.a != psdf.a].describe(),
pdf[pdf.a != pdf.a].describe(),
)
# Explicit empty DataFrame string & timestamp
psdf = ps.DataFrame(
{"a": ["a", "b", "c"], "b": [pd.Timestamp(1), pd.Timestamp(1), pd.Timestamp(1)]}
)
pdf = psdf._to_pandas()
if LooseVersion(pd.__version__) >= LooseVersion("1.1.0"):
pdf_result = pdf[pdf.a != pdf.a].describe(datetime_is_numeric=True)
self.assert_eq(
psdf[psdf.a != psdf.a].describe(),
pdf_result.where(pdf_result.notnull(), None).astype(str),
)
else:
self.assert_eq(
psdf[psdf.a != psdf.a].describe(),
ps.DataFrame(
{
"b": [
"0",
"None",
"None",
"None",
"None",
"None",
"None",
],
},
index=["count", "mean", "min", "25%", "50%", "75%", "max"],
),
)
def test_getitem_with_none_key(self):
psdf = self.psdf
with self.assertRaisesRegex(KeyError, "none key"):
psdf[None]
def test_iter_dataframe(self):
pdf, psdf = self.df_pair
for value_psdf, value_pdf in zip(psdf, pdf):
self.assert_eq(value_psdf, value_pdf)
def test_combine_first(self):
pdf = pd.DataFrame(
{("X", "A"): [None, 0], ("X", "B"): [4, None], ("Y", "C"): [3, 3], ("Y", "B"): [1, 1]}
)
pdf1, pdf2 = pdf["X"], pdf["Y"]
psdf = ps.from_pandas(pdf)
psdf1, psdf2 = psdf["X"], psdf["Y"]
if LooseVersion(pd.__version__) >= LooseVersion("1.2.0"):
self.assert_eq(pdf1.combine_first(pdf2), psdf1.combine_first(psdf2))
else:
# pandas < 1.2.0 returns unexpected dtypes,
# please refer to https://github.com/pandas-dev/pandas/issues/28481 for details
expected_pdf = pd.DataFrame({"A": [None, 0], "B": [4.0, 1.0], "C": [3, 3]})
self.assert_eq(expected_pdf, psdf1.combine_first(psdf2))
def test_multi_index_dtypes(self):
# SPARK-36930: Support ps.MultiIndex.dtypes
arrays = [[1, 1, 2, 2], ["red", "blue", "red", "blue"]]
pmidx = pd.MultiIndex.from_arrays(arrays, names=("number", "color"))
psmidx = ps.from_pandas(pmidx)
if LooseVersion(pd.__version__) >= LooseVersion("1.3"):
self.assert_eq(psmidx.dtypes, pmidx.dtypes)
else:
expected = pd.Series([np.dtype("int64"), np.dtype("O")], index=["number", "color"])
self.assert_eq(psmidx.dtypes, expected)
# multiple labels
pmidx = pd.MultiIndex.from_arrays(arrays, names=[("zero", "first"), ("one", "second")])
psmidx = ps.from_pandas(pmidx)
if LooseVersion(pd.__version__) >= LooseVersion("1.3"):
if LooseVersion(pd.__version__) not in (LooseVersion("1.4.1"), LooseVersion("1.4.2")):
self.assert_eq(psmidx.dtypes, pmidx.dtypes)
else:
expected = pd.Series(
[np.dtype("int64"), np.dtype("O")],
index=pd.Index([("zero", "first"), ("one", "second")]),
)
self.assert_eq(psmidx.dtypes, expected)
def test_multi_index_dtypes_not_unique_name(self):
# Regression test for https://github.com/pandas-dev/pandas/issues/45174
pmidx = pd.MultiIndex.from_arrays([[1], [2]], names=[1, 1])
psmidx = ps.from_pandas(pmidx)
if LooseVersion(pd.__version__) < LooseVersion("1.4"):
expected = pd.Series(
[np.dtype("int64"), np.dtype("int64")],
index=[1, 1],
)
self.assert_eq(psmidx.dtypes, expected)
else:
self.assert_eq(psmidx.dtypes, pmidx.dtypes)
def test_cov(self):
# SPARK-36396: Implement DataFrame.cov
# int
pdf = pd.DataFrame([(1, 2), (0, 3), (2, 0), (1, 1)], columns=["a", "b"])
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.cov(), psdf.cov(), almost=True)
self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4), almost=True)
self.assert_eq(pdf.cov(min_periods=5), psdf.cov(min_periods=5))
# ddof
with self.assertRaisesRegex(TypeError, "ddof must be integer"):
psdf.cov(ddof="ddof")
for ddof in [-1, 0, 2]:
self.assert_eq(pdf.cov(ddof=ddof), psdf.cov(ddof=ddof), almost=True)
self.assert_eq(
pdf.cov(min_periods=4, ddof=ddof), psdf.cov(min_periods=4, ddof=ddof), almost=True
)
self.assert_eq(pdf.cov(min_periods=5, ddof=ddof), psdf.cov(min_periods=5, ddof=ddof))
# bool
pdf = pd.DataFrame(
{
"a": [1, np.nan, 3, 4],
"b": [True, False, False, True],
"c": [True, True, False, True],
}
)
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.cov(), psdf.cov(), almost=True)
self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4), almost=True)
self.assert_eq(pdf.cov(min_periods=5), psdf.cov(min_periods=5))
# extension dtype
if LooseVersion(pd.__version__) >= LooseVersion("1.2"):
numeric_dtypes = ["Int8", "Int16", "Int32", "Int64", "Float32", "Float64", "float"]
boolean_dtypes = ["boolean", "bool"]
else:
numeric_dtypes = ["Int8", "Int16", "Int32", "Int64", "float"]
boolean_dtypes = ["boolean", "bool"]
sers = [pd.Series([1, 2, 3, None], dtype=dtype) for dtype in numeric_dtypes]
sers += [pd.Series([True, False, True, None], dtype=dtype) for dtype in boolean_dtypes]
sers.append(pd.Series([decimal.Decimal(1), decimal.Decimal(2), decimal.Decimal(3), None]))
pdf = pd.concat(sers, axis=1)
pdf.columns = [dtype for dtype in numeric_dtypes + boolean_dtypes] + ["decimal"]
psdf = ps.from_pandas(pdf)
if LooseVersion(pd.__version__) >= LooseVersion("1.2"):
self.assert_eq(pdf.cov(), psdf.cov(), almost=True)
self.assert_eq(pdf.cov(min_periods=3), psdf.cov(min_periods=3), almost=True)
self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4))
else:
test_types = [
"Int8",
"Int16",
"Int32",
"Int64",
"float",
"boolean",
"bool",
]
expected = pd.DataFrame(
data=[
[1.0, 1.0, 1.0, 1.0, 1.0, 0.0000000, 0.0000000],
[1.0, 1.0, 1.0, 1.0, 1.0, 0.0000000, 0.0000000],
[1.0, 1.0, 1.0, 1.0, 1.0, 0.0000000, 0.0000000],
[1.0, 1.0, 1.0, 1.0, 1.0, 0.0000000, 0.0000000],
[1.0, 1.0, 1.0, 1.0, 1.0, 0.0000000, 0.0000000],
[0.0, 0.0, 0.0, 0.0, 0.0, 0.3333333, 0.3333333],
[0.0, 0.0, 0.0, 0.0, 0.0, 0.3333333, 0.3333333],
],
index=test_types,
columns=test_types,
)
self.assert_eq(expected, psdf.cov(), almost=True)
# string column
pdf = pd.DataFrame(
[(1, 2, "a", 1), (0, 3, "b", 1), (2, 0, "c", 9), (1, 1, "d", 1)],
columns=["a", "b", "c", "d"],
)
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.cov(), psdf.cov(), almost=True)
self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4), almost=True)
self.assert_eq(pdf.cov(min_periods=5), psdf.cov(min_periods=5))
# nan
np.random.seed(42)
pdf = pd.DataFrame(np.random.randn(20, 3), columns=["a", "b", "c"])
pdf.loc[pdf.index[:5], "a"] = np.nan
pdf.loc[pdf.index[5:10], "b"] = np.nan
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.cov(min_periods=11), psdf.cov(min_periods=11), almost=True)
self.assert_eq(pdf.cov(min_periods=10), psdf.cov(min_periods=10), almost=True)
# return empty DataFrame
pdf = pd.DataFrame([("1", "2"), ("0", "3"), ("2", "0"), ("1", "1")], columns=["a", "b"])
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.cov(), psdf.cov())
@unittest.skipIf(
LooseVersion(pd.__version__) < LooseVersion("1.3.0"),
"pandas support `Styler.to_latex` since 1.3.0",
)
def test_style(self):
# Currently, the `style` function returns a pandas object `Styler` as it is,
# processing only the number of rows declared in `compute.max_rows`.
# So it's a bit vague to test, but we are doing minimal tests instead of not testing at all.
pdf = pd.DataFrame(np.random.randn(10, 4), columns=["A", "B", "C", "D"])
psdf = ps.from_pandas(pdf)
def style_negative(v, props=""):
return props if v < 0 else None
def check_style():
# If the value is negative, the text color will be displayed as red.
pdf_style = pdf.style.applymap(style_negative, props="color:red;")
psdf_style = psdf.style.applymap(style_negative, props="color:red;")
# Test whether the same shape as pandas table is created including the color.
self.assert_eq(pdf_style.to_latex(), psdf_style.to_latex())
check_style()
with ps.option_context("compute.max_rows", None):
check_style()
if __name__ == "__main__":
from pyspark.pandas.tests.test_dataframe_slow import * # noqa: F401
try:
import xmlrunner
testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)