blob: f62aa2f214fe9c1ddb186b58523e27cc7226a76b [file]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from itertools import product
import numpy as np
import pandas as pd
from pyspark import pandas as ps
from pyspark.errors import PySparkValueError
from pyspark.testing.pandasutils import PandasOnSparkTestCase
class SeriesComputeMixin:
    """Tests for computation-oriented ``Series`` methods of pandas-on-Spark.

    Each test builds a pandas object and its pandas-on-Spark counterpart, applies the
    same operation to both, and compares the results with ``self.assert_eq``.

    This is a mixin: it declares no base class and relies on the host test case to
    supply ``assert_eq``, ``check_error`` and the ``unittest`` assertion methods.
    """

    def test_duplicated(self):
        """``Series.duplicated`` matches pandas for every ``keep`` mode."""
        for pser in [
            pd.Series(["beetle", None, "beetle", None, "lama", "beetle"], name="objects"),
            pd.Series([1, np.nan, 1, np.nan], name="numbers"),
            pd.Series(
                [
                    pd.Timestamp("2022-01-01"),
                    pd.Timestamp("2022-02-02"),
                    pd.Timestamp("2022-01-01"),
                    pd.Timestamp("2022-02-02"),
                ],
                name="times",
            ),
        ]:
            psser = ps.from_pandas(pser)
            # Default `keep` first, then each explicit mode.
            self.assert_eq(psser.duplicated().sort_index(), pser.duplicated())
            for keep in ("first", "last", False):
                self.assert_eq(
                    psser.duplicated(keep=keep).sort_index(), pser.duplicated(keep=keep)
                )

        # Also works on a derived (unnamed-plan) Series such as the result of arithmetic.
        pser = pd.Series([1, 2, 1, 2, 3], name="numbers")
        psser = ps.from_pandas(pser)
        self.assert_eq((psser + 1).duplicated().sort_index(), (pser + 1).duplicated())

    def test_drop_duplicates(self):
        """``Series.drop_duplicates`` matches pandas, including the in-place variant."""
        pdf = pd.DataFrame({"animal": ["lama", "cow", "lama", "beetle", "lama", "hippo"]})
        psdf = ps.from_pandas(pdf)

        pser = pdf.animal
        psser = psdf.animal

        self.assert_eq(psser.drop_duplicates().sort_index(), pser.drop_duplicates().sort_index())
        self.assert_eq(
            psser.drop_duplicates(keep="last").sort_index(),
            pser.drop_duplicates(keep="last").sort_index(),
        )

        # inplace
        psser.drop_duplicates(keep=False, inplace=True)
        pser.drop_duplicates(keep=False, inplace=True)
        self.assert_eq(psser.sort_index(), pser.sort_index())
        # The parent frames must still agree after the in-place change on the column.
        self.assert_eq(psdf, pdf)

    def test_clip(self):
        """``Series.clip`` matches pandas and rejects list-like bounds."""
        pdf = pd.DataFrame({"x": [0, 2, 4]}, index=np.random.rand(3))
        psdf = ps.from_pandas(pdf)
        pser, psser = pdf.x, psdf.x

        # Assert list-like values are not accepted for 'lower' and 'upper'.
        # assertRaisesRegex is required to actually verify the message:
        # assertRaises(..., msg=...) only customizes the text shown when nothing is raised.
        msg = "List-like value are not supported for 'lower' and 'upper' at the moment"
        with self.assertRaisesRegex(TypeError, msg):
            psser.clip(lower=[1])
        with self.assertRaisesRegex(TypeError, msg):
            psser.clip(upper=[1])

        # Assert no lower or upper
        self.assert_eq(psser.clip(), pser.clip())
        # Assert lower only
        self.assert_eq(psser.clip(1), pser.clip(1))
        # Assert upper only
        self.assert_eq(psser.clip(upper=3), pser.clip(upper=3))
        # Assert lower and upper
        self.assert_eq(psser.clip(1, 3), pser.clip(1, 3))
        self.assert_eq((psser + 1).clip(1, 3), (pser + 1).clip(1, 3))

        # Assert inplace is True
        pser.clip(1, 3, inplace=True)
        psser.clip(1, 3, inplace=True)
        self.assert_eq(psser, pser)
        self.assert_eq(psdf, pdf)

        # Assert behavior on string values: the Series is expected to come back unchanged.
        str_psser = ps.Series(["a", "b", "c"])
        self.assert_eq(str_psser.clip(1, 3), str_psser)

    def test_compare(self):
        """``Series.compare`` matches pandas; comparing a Series with itself is empty."""
        pser = pd.Series([1, 2])
        psser = ps.from_pandas(pser)

        res_psdf = psser.compare(psser)
        self.assertTrue(res_psdf.empty)
        self.assert_eq(res_psdf.columns, pd.Index(["self", "other"]))

        self.assert_eq(pser.compare(pser + 1).sort_index(), psser.compare(psser + 1).sort_index())

        # Same check with a string index.
        pser = pd.Series([1, 2], index=["x", "y"])
        psser = ps.from_pandas(pser)
        self.assert_eq(pser.compare(pser + 1).sort_index(), psser.compare(psser + 1).sort_index())

    def test_concat(self):
        """``ps.concat`` of Series matches ``pd.concat``, with and without ``ignore_index``."""
        pser1 = pd.Series([1, 2, 3], name="0")
        pser2 = pd.Series([4, 5, 6], name="0")
        pser3 = pd.Series([4, 5, 6], index=[3, 4, 5], name="0")
        psser1 = ps.from_pandas(pser1)
        psser2 = ps.from_pandas(pser2)
        psser3 = ps.from_pandas(pser3)

        # pser1/pser2 share index labels; pser1/pser3 have disjoint ones.
        self.assert_eq(ps.concat([psser1, psser2]), pd.concat([pser1, pser2]))
        self.assert_eq(ps.concat([psser1, psser3]), pd.concat([pser1, pser3]))
        self.assert_eq(
            ps.concat([psser1, psser2], ignore_index=True),
            pd.concat([pser1, pser2], ignore_index=True),
        )

    def test_shift(self):
        """``Series.shift`` matches pandas and rejects non-integer ``periods``."""
        pser = pd.Series([10, 20, 15, 30, 45], name="x")
        psser = ps.Series(pser)

        self.assert_eq(psser.shift(2), pser.shift(2))
        self.assert_eq(psser.shift().shift(-1), pser.shift().shift(-1))
        self.assert_eq(psser.shift().sum(), pser.shift().sum())

        self.assert_eq(psser.shift(periods=2, fill_value=0), pser.shift(periods=2, fill_value=0))

        with self.assertRaisesRegex(TypeError, "periods should be an int; however"):
            psser.shift(periods=1.5)

        self.assert_eq(psser.shift(periods=0), pser.shift(periods=0))

    def test_diff(self):
        """``Series.diff`` matches pandas, including chained and negative periods."""
        pser = pd.Series([10, 20, 15, 30, 45], name="x")
        psser = ps.Series(pser)

        self.assert_eq(psser.diff(2), pser.diff(2))
        self.assert_eq(psser.diff().diff(-1), pser.diff().diff(-1))
        self.assert_eq(psser.diff().sum(), pser.diff().sum())

    def test_aggregate(self):
        """``Series.aggregate`` rejects dict arguments and lists containing callables."""
        pser = pd.Series([10, 20, 15, 30, 45], name="x")
        psser = ps.Series(pser)

        msg = "func must be a string or list of strings"
        with self.assertRaisesRegex(TypeError, msg):
            psser.aggregate({"x": ["min", "max"]})

        msg = "If the given function is a list, it should only contains function names as strings."
        with self.assertRaisesRegex(ValueError, msg):
            # `max` here is the builtin callable, not the string "max".
            psser.aggregate(["min", max])

    def test_drop(self):
        """``Series.drop`` matches pandas for flat and MultiIndex labels, plus error cases."""
        pdf = pd.DataFrame({"x": [10, 20, 15, 30, 45]})
        psdf = ps.from_pandas(pdf)
        pser, psser = pdf.x, psdf.x

        self.assert_eq(psser.drop(1), pser.drop(1))
        self.assert_eq(psser.drop([1, 4]), pser.drop([1, 4]))
        self.assert_eq(psser.drop(columns=1), pser.drop(columns=1))
        self.assert_eq(psser.drop(columns=[1, 4]), pser.drop(columns=[1, 4]))

        msg = "Need to specify at least one of 'labels', 'index' or 'columns'"
        with self.assertRaisesRegex(ValueError, msg):
            psser.drop()
        self.assertRaises(KeyError, lambda: psser.drop((0, 1)))

        psser.drop([2, 3], inplace=True)
        pser.drop([2, 3], inplace=True)
        self.assert_eq(psser, pser)
        self.assert_eq(psdf, pdf)

        # In-place drop on a derived Series must not leak back into the source Series.
        n_pser, n_psser = pser + 1, psser + 1
        n_psser.drop([1, 4], inplace=True)
        n_pser.drop([1, 4], inplace=True)
        self.assert_eq(n_psser, n_pser)
        self.assert_eq(psser, pser)

        # For MultiIndex
        midx = pd.MultiIndex(
            [["lama", "cow", "falcon"], ["speed", "weight", "length"]],
            [[0, 0, 0, 1, 1, 1, 2, 2, 2], [0, 1, 2, 0, 1, 2, 0, 1, 2]],
        )
        pdf = pd.DataFrame({"x": [45, 200, 1.2, 30, 250, 1.5, 320, 1, 0.3]}, index=midx)
        psdf = ps.from_pandas(pdf)
        psser, pser = psdf.x, pdf.x

        self.assert_eq(psser.drop("lama"), pser.drop("lama"))
        self.assert_eq(psser.drop(labels="weight", level=1), pser.drop(labels="weight", level=1))
        self.assert_eq(psser.drop(("lama", "weight")), pser.drop(("lama", "weight")))
        self.assert_eq(
            psser.drop([("lama", "speed"), ("falcon", "weight")]),
            pser.drop([("lama", "speed"), ("falcon", "weight")]),
        )
        self.assert_eq(psser.drop({"lama": "speed"}), pser.drop({"lama": "speed"}))

        msg = "'level' should be less than the number of indexes"
        with self.assertRaisesRegex(ValueError, msg):
            psser.drop(labels="weight", level=2)

        msg = (
            "If the given index is a list, it "
            "should only contains names as all tuples or all non tuples "
            "that contain index names"
        )
        with self.assertRaisesRegex(ValueError, msg):
            psser.drop(["lama", ["cow", "falcon"]])

        msg = "Cannot specify both 'labels' and 'index'/'columns'"
        with self.assertRaisesRegex(ValueError, msg):
            psser.drop("lama", index="cow")

        with self.assertRaisesRegex(ValueError, msg):
            psser.drop("lama", columns="cow")

        # NOTE(review): a 3-element key is dropped from a 2-level index here, yet the
        # expected text reads "(2) exceeds ... (3)" -- the reverse of test_pop's message
        # for the same key shape. Presumably this mirrors the library's current wording;
        # confirm before changing either side.
        msg = r"'Key length \(2\) exceeds index depth \(3\)'"
        with self.assertRaisesRegex(KeyError, msg):
            psser.drop(("lama", "speed", "x"))

        psser.drop({"lama": "speed"}, inplace=True)
        pser.drop({"lama": "speed"}, inplace=True)
        self.assert_eq(psser, pser)
        self.assert_eq(psdf, pdf)

    def test_drop_with_errors(self):
        """``Series.drop`` honours ``errors="ignore"`` and rejects unknown ``errors`` values."""
        pser = pd.Series([10, 20, 30], index=["a", "b", "c"])
        psser = ps.from_pandas(pser)

        # errors='ignore' with missing index label
        self.assert_eq(
            psser.drop(["a", "x"], errors="ignore"),
            pser.drop(["a", "x"], errors="ignore"),
        )

        # errors='ignore' with all-missing index labels
        self.assert_eq(
            psser.drop(["x", "y"], errors="ignore"),
            pser.drop(["x", "y"], errors="ignore"),
        )

        # errors='ignore' with columns (no-op for Series)
        self.assert_eq(
            psser.drop(columns=["a"], errors="ignore"),
            pser.drop(columns=["a"], errors="ignore"),
        )

        # Invalid errors value
        self.assertRaises(
            ValueError,
            lambda: psser.drop("a", errors="invalid"),
        )

    def test_pop(self):
        """``Series.pop`` returns and removes the same item as pandas; bad keys raise."""
        midx = pd.MultiIndex(
            [["lama", "cow", "falcon"], ["speed", "weight", "length"]],
            [[0, 0, 0, 1, 1, 1, 2, 2, 2], [0, 1, 2, 0, 1, 2, 0, 1, 2]],
        )
        pdf = pd.DataFrame({"x": [45, 200, 1.2, 30, 250, 1.5, 320, 1, 0.3]}, index=midx)
        psdf = ps.from_pandas(pdf)

        pser = pdf.x
        psser = psdf.x

        # Both the popped value and the remaining Series/frame must agree.
        self.assert_eq(psser.pop(("lama", "speed")), pser.pop(("lama", "speed")))
        self.assert_eq(psser, pser)
        self.assert_eq(psdf, pdf)

        msg = r"'Key length \(3\) exceeds index depth \(2\)'"
        with self.assertRaisesRegex(KeyError, msg):
            psser.pop(("lama", "speed", "x"))

        msg = "'key' should be string or tuple that contains strings"
        with self.assertRaisesRegex(TypeError, msg):
            psser.pop(["lama", "speed"])

        # Categorical dtype, popping by integer label.
        pser = pd.Series(["a", "b", "c", "a"], dtype="category")
        psser = ps.from_pandas(pser)

        self.assert_eq(psser.pop(0), pser.pop(0))
        self.assert_eq(psser, pser)

        self.assert_eq(psser.pop(3), pser.pop(3))
        self.assert_eq(psser, pser)

    def test_duplicates(self):
        """``Series.drop_duplicates`` matches pandas for text/number data and every ``keep``."""
        psers = {
            "test on texts": pd.Series(
                ["lama", "cow", "lama", "beetle", "lama", "hippo"], name="animal"
            ),
            "test on numbers": pd.Series([1, 1, 2, 4, 3]),
        }
        keeps = ["first", "last", False]

        # One sub-test per (dataset, keep) combination so failures are reported separately.
        for (msg, pser), keep in product(psers.items(), keeps):
            with self.subTest(msg, keep=keep):
                psser = ps.Series(pser)

                self.assert_eq(
                    pser.drop_duplicates(keep=keep).sort_values(),
                    psser.drop_duplicates(keep=keep).sort_values(),
                )

    def test_truncate(self):
        """``Series.truncate`` matches pandas on sorted indexes and validates its inputs."""
        # pser1 has an increasing index, pser2 a decreasing one.
        pser1 = pd.Series([10, 20, 30, 40, 50, 60, 70], index=[1, 2, 3, 4, 5, 6, 7])
        psser1 = ps.Series(pser1)
        pser2 = pd.Series([10, 20, 30, 40, 50, 60, 70], index=[7, 6, 5, 4, 3, 2, 1])
        psser2 = ps.Series(pser2)

        self.assert_eq(psser1.truncate(), pser1.truncate())
        self.assert_eq(psser1.truncate(before=2), pser1.truncate(before=2))
        self.assert_eq(psser1.truncate(after=5), pser1.truncate(after=5))
        self.assert_eq(psser1.truncate(copy=False), pser1.truncate(copy=False))
        self.assert_eq(psser1.truncate(2, 5, copy=False), pser1.truncate(2, 5, copy=False))
        self.assert_eq(psser2.truncate(4, 6), pser2.truncate(4, 6))
        self.assert_eq(psser2.truncate(4, 6, copy=False), pser2.truncate(4, 6, copy=False))

        # An index that is neither increasing nor decreasing is rejected.
        psser = ps.Series([10, 20, 30, 40, 50, 60, 70], index=[1, 2, 3, 4, 3, 2, 1])
        msg = "truncate requires a sorted index"
        with self.assertRaisesRegex(ValueError, msg):
            psser.truncate()

        # `before` greater than `after` is rejected.
        psser = ps.Series([10, 20, 30, 40, 50, 60, 70], index=[1, 2, 3, 4, 5, 6, 7])
        msg = "Truncate: 2 must be after 5"
        with self.assertRaisesRegex(ValueError, msg):
            psser.truncate(5, 2)

    def test_unstack(self):
        """``Series.unstack`` matches pandas for every valid level; invalid input raises."""
        pser = pd.Series(
            [10, -2, 4, 7],
            index=pd.MultiIndex.from_tuples(
                [("one", "a", "z"), ("one", "b", "x"), ("two", "a", "c"), ("two", "b", "v")],
                names=["A", "B", "C"],
            ),
        )
        psser = ps.from_pandas(pser)

        # Every positive and negative position of the 3-level index.
        for level in [-3, -2, -1, 0, 1, 2]:
            pandas_result = pser.unstack(level=level)
            pandas_on_spark_result = psser.unstack(level=level).sort_index()
            self.assert_eq(pandas_result, pandas_on_spark_result)
            self.assert_eq(pandas_result.index.names, pandas_on_spark_result.index.names)
            self.assert_eq(pandas_result.columns.names, pandas_on_spark_result.columns.names)

        # non-numeric datatypes
        pser = pd.Series(
            list("abcd"), index=pd.MultiIndex.from_product([["one", "two"], ["a", "b"]])
        )
        psser = ps.from_pandas(pser)

        for level in [-2, -1, 0, 1]:
            pandas_result = pser.unstack(level=level)
            pandas_on_spark_result = psser.unstack(level=level).sort_index()
            self.assert_eq(pandas_result, pandas_on_spark_result)
            self.assert_eq(pandas_result.index.names, pandas_on_spark_result.index.names)
            self.assert_eq(pandas_result.columns.names, pandas_on_spark_result.columns.names)

        # Exceeding the range of level (psser is the 2-level Series at this point)
        self.assertRaises(IndexError, lambda: psser.unstack(level=3))
        self.assertRaises(IndexError, lambda: psser.unstack(level=-4))
        # Only support for MultiIndex
        psser = ps.Series([10, -2, 4, 7])
        self.assertRaises(ValueError, lambda: psser.unstack())

    def test_abs(self):
        """Builtin ``abs`` and ``np.abs`` on a Series match pandas."""
        pser = pd.Series([-2, -1, 0, 1])
        psser = ps.from_pandas(pser)

        self.assert_eq(abs(psser), abs(pser))
        self.assert_eq(np.abs(psser), np.abs(pser))

    def _assert_factorize_matches(self, pser, psser, **kwargs):
        """Assert ``psser.factorize(**kwargs)`` equals ``pser.factorize(sort=True, **kwargs)``.

        pandas is asked for sorted uniques explicitly, while pandas-on-Spark is called
        without ``sort``; codes are compared as plain lists and uniques via ``assert_eq``.
        """
        pcodes, puniques = pser.factorize(sort=True, **kwargs)
        kcodes, kuniques = psser.factorize(**kwargs)
        self.assert_eq(pcodes.tolist(), kcodes.to_list())
        self.assert_eq(puniques, kuniques)

    def test_factorize(self):
        """``Series.factorize`` matches sorted pandas output, including nulls and sentinels."""
        pser = pd.Series(["a", "b", "a", "b"])
        self._assert_factorize_matches(pser, ps.from_pandas(pser))

        # Derived Series (result of arithmetic).
        pser = pd.Series([5, 1, 5, 1])
        psser = ps.from_pandas(pser)
        self._assert_factorize_matches(pser + 1, psser + 1)

        # Named Series with a string index, then a MultiIndex.
        for pser in [
            pd.Series(["a", "b", "a", "b"], name="ser", index=["w", "x", "y", "z"]),
            pd.Series(
                ["a", "b", "a", "b"], index=pd.MultiIndex.from_arrays([[4, 3, 2, 1], [1, 2, 3, 4]])
            ),
        ]:
            self._assert_factorize_matches(pser, ps.from_pandas(pser))

        #
        # Deals with None and np.nan
        #
        for pser in [
            pd.Series(["a", "b", "a", np.nan]),
            pd.Series([1, None, 3, 2, 1]),
            pd.Series(["a", None, "a"]),
        ]:
            self._assert_factorize_matches(pser, ps.from_pandas(pser))

        # All-null input: codes are compared directly and uniques must be empty.
        for pser in [pd.Series([None, np.nan]), pd.Series([np.nan, np.nan])]:
            psser = ps.from_pandas(pser)
            pcodes, _ = pser.factorize()
            kcodes, kuniques = psser.factorize()
            self.assert_eq(pcodes, kcodes.to_list())
            # pandas: Index([], dtype='float64')
            self.assert_eq(pd.Index([]), kuniques)

        #
        # Deals with na_sentinel
        #
        pser = pd.Series(["a", "b", "a", np.nan, None])
        psser = ps.from_pandas(pser)

        self._assert_factorize_matches(pser, psser, use_na_sentinel=-2)
        self._assert_factorize_matches(pser, psser, use_na_sentinel=2)

        pcodes, _ = pser.factorize(sort=True, use_na_sentinel=None)
        kcodes, kuniques = psser.factorize(use_na_sentinel=None)
        self.assert_eq(pcodes.tolist(), kcodes.to_list())
        # puniques is Index(['a', 'b', nan], dtype='object')
        self.assert_eq(ps.Index(["a", "b", None]), kuniques)

        psser = ps.Series([1, 2, np.nan, 4, 5])  # Arrow takes np.nan as null
        psser.loc[3] = np.nan  # Spark takes np.nan as NaN
        # The pandas reference is derived from the pandas-on-Spark Series itself here.
        self._assert_factorize_matches(psser._to_pandas(), psser, use_na_sentinel=None)

    def test_explode(self):
        """``Series.explode`` matches pandas for list values, MultiIndex and scalar Series."""
        pser = pd.Series([[1, 2, 3], [], None, [3, 4]])
        psser = ps.from_pandas(pser)
        self.assert_eq(pser.explode(), psser.explode(), almost=True)

        # MultiIndex
        pser.index = pd.MultiIndex.from_tuples([("a", "w"), ("b", "x"), ("c", "y"), ("d", "z")])
        psser = ps.from_pandas(pser)
        self.assert_eq(pser.explode(), psser.explode(), almost=True)

        # non-array type Series
        pser = pd.Series([1, 2, 3, 4])
        psser = ps.from_pandas(pser)
        self.assert_eq(pser.explode(), psser.explode())

    def test_between(self):
        """``Series.between`` matches pandas for each ``inclusive`` option; bad values raise."""
        pser = pd.Series([np.nan, 1, 2, 3, 4])
        psser = ps.from_pandas(pser)

        self.assert_eq(psser.between(1, 4), pser.between(1, 4))
        for inclusive in ("both", "neither", "left", "right"):
            self.assert_eq(
                psser.between(1, 4, inclusive=inclusive),
                pser.between(1, 4, inclusive=inclusive),
            )

        expected_err_msg = (
            "Inclusive has to be either string of 'both','left', 'right', or 'neither'"
        )
        with self.assertRaisesRegex(ValueError, expected_err_msg):
            psser.between(1, 4, inclusive="middle")

        # Test for backward compatibility
        # NOTE(review): these repeat the string-valued checks above; presumably they once
        # exercised a legacy form of `inclusive` -- confirm whether they are still needed.
        for inclusive in ("both", "neither"):
            self.assert_eq(
                psser.between(1, 4, inclusive=inclusive),
                pser.between(1, 4, inclusive=inclusive),
            )

    def test_between_time(self):
        """``Series.between_time`` matches pandas across index names and ``inclusive`` options."""
        idx = pd.date_range("2018-04-09", periods=4, freq="1D20min")
        pser = pd.Series([1, 2, 3, 4], index=idx)

        # Unnamed index first, then two explicit index names.
        for index_name in (None, "ts", "index"):
            pser.index.name = index_name
            psser = ps.from_pandas(pser)
            self.assert_eq(
                pser.between_time("0:15", "0:45").sort_index(),
                psser.between_time("0:15", "0:45").sort_index(),
            )

        # The remaining checks run against the Series whose index is named "index".
        for inclusive in ("neither", "left", "right"):
            self.assert_eq(
                pser.between_time("0:15", "0:45", inclusive=inclusive).sort_index(),
                psser.between_time("0:15", "0:45", inclusive=inclusive).sort_index(),
            )

        with self.assertRaises(PySparkValueError) as ctx:
            psser.between_time("0:15", "0:45", inclusive="")

        # ctx.exception is only populated once the `with` block has exited.
        self.check_error(
            exception=ctx.exception,
            errorClass="VALUE_NOT_ALLOWED",
            messageParameters={
                "arg_name": "inclusive",
                "allowed_values": str(["left", "right", "both", "neither"]),
            },
        )

    def test_at_time(self):
        """``Series.at_time`` matches pandas for unnamed and named datetime indexes."""
        idx = pd.date_range("2018-04-09", periods=4, freq="1D20min")
        pser = pd.Series([1, 2, 3, 4], index=idx)

        # Unnamed index first, then two explicit index names.
        for index_name in (None, "ts", "index"):
            pser.index.name = index_name
            psser = ps.from_pandas(pser)
            self.assert_eq(
                pser.at_time("0:20").sort_index(),
                psser.at_time("0:20").sort_index(),
            )
class SeriesComputeTests(SeriesComputeMixin, PandasOnSparkTestCase):
    """Runs the ``SeriesComputeMixin`` tests on top of ``PandasOnSparkTestCase``."""
if __name__ == "__main__":
    # Executed directly as a script: hand control to PySpark's test entry point.
    from pyspark.testing import main as run_tests

    run_tests()