#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import datetime
import unittest
import numpy as np
import pandas as pd
from pyspark import pandas as ps
from pyspark.pandas.exceptions import SparkPandasNotImplementedError
from pyspark.testing.pandasutils import PandasOnSparkTestCase, compare_both
from pyspark.testing.sqlutils import SQLTestUtils
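# Tests for advanced indexing on pandas-on-Spark objects: .at and .iat access,
# __getitem__ with labels, lists, boolean Series and slices, and .iloc, each
# compared against the equivalent pandas behavior.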
class IndexingAdvMixin:
@property
def pdf(self):
return pd.DataFrame(
{"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [4, 5, 6, 3, 2, 1, 0, 0, 0]},
index=[0, 1, 3, 5, 6, 8, 9, 9, 9],
)
@property
def pdf2(self):
return pd.DataFrame(
{0: [1, 2, 3, 4, 5, 6, 7, 8, 9], 1: [4, 5, 6, 3, 2, 1, 0, 0, 0]},
index=[0, 1, 3, 5, 6, 8, 9, 9, 9],
)
@property
def psdf(self):
return ps.from_pandas(self.pdf)
@property
def psdf2(self):
return ps.from_pandas(self.pdf2)
def test_at(self):
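        # .at on DataFrames and Series: a unique row label returns a scalar, a
        # duplicated label returns an ndarray of the matching values, malformed
        # keys raise TypeError, missing labels raise KeyError, and assignment
        # through .at is not supported.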
pdf = self.pdf
psdf = self.psdf
        # Create the equivalent of pdf.loc[3] as a pandas-on-Spark Series
        # This is necessary because .loc[n] does not currently work with
        # pandas-on-Spark DataFrames (Koalas #383)
test_series = ps.Series([3, 6], index=["a", "b"], name="3")
        # Assert invalid signatures raise TypeError
with self.assertRaises(TypeError, msg="Use DataFrame.at like .at[row_index, column_name]"):
psdf.at[3]
with self.assertRaises(TypeError, msg="Use DataFrame.at like .at[row_index, column_name]"):
psdf.at["ab"] # 'ab' is of length 2 but str type instead of tuple
with self.assertRaises(TypeError, msg="Use Series.at like .at[column_name]"):
test_series.at[3, "b"]
# Assert .at for DataFrames
self.assertEqual(psdf.at[3, "b"], 6)
self.assertEqual(psdf.at[3, "b"], pdf.at[3, "b"])
self.assert_eq(psdf.at[9, "b"], np.array([0, 0, 0]))
self.assert_eq(psdf.at[9, "b"], pdf.at[9, "b"])
# Assert .at for Series
self.assertEqual(test_series.at["b"], 6)
self.assertEqual(test_series.at["b"], pdf.loc[3].at["b"])
# Assert multi-character indices
self.assertEqual(
ps.Series([0, 1], index=["ab", "cd"]).at["ab"],
pd.Series([0, 1], index=["ab", "cd"]).at["ab"],
)
        # Assert invalid column or index names result in a KeyError, as in pandas
with self.assertRaises(KeyError, msg="x"):
psdf.at[3, "x"]
with self.assertRaises(KeyError, msg=99):
psdf.at[99, "b"]
with self.assertRaises(ValueError):
psdf.at[(3, 6), "b"]
with self.assertRaises(KeyError):
psdf.at[3, ("x", "b")]
# Assert setting values fails
with self.assertRaises(TypeError):
psdf.at[3, "b"] = 10
# non-string column names
pdf = self.pdf2
psdf = self.psdf2
# Assert .at for DataFrames
self.assertEqual(psdf.at[3, 1], 6)
self.assertEqual(psdf.at[3, 1], pdf.at[3, 1])
self.assert_eq(psdf.at[9, 1], np.array([0, 0, 0]))
self.assert_eq(psdf.at[9, 1], pdf.at[9, 1])
def test_at_multiindex(self):
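        # .at with a MultiIndex row key: a full tuple addressing a unique row
        # returns a scalar, a partial or duplicated tuple returns an ndarray,
        # and a bare scalar key raises ValueError.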
psdf = self.psdf.set_index("b", append=True)
self.assert_eq(psdf.at[(3, 6), "a"], 3)
self.assert_eq(psdf.at[(3,), "a"], np.array([3]))
self.assert_eq(list(psdf.at[(9, 0), "a"]), [7, 8, 9])
self.assert_eq(list(psdf.at[(9,), "a"]), [7, 8, 9])
with self.assertRaises(ValueError):
psdf.at[3, "a"]
def test_at_multiindex_columns(self):
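        # .at with MultiIndex columns requires the full column-key tuple; a
        # single-level column label raises KeyError. Also covered below with
        # non-string column names.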
arrays = [np.array(["bar", "bar", "baz", "baz"]), np.array(["one", "two", "one", "two"])]
pdf = pd.DataFrame(np.random.randn(3, 4), index=["A", "B", "C"], columns=arrays)
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf.at["B", ("bar", "one")], pdf.at["B", ("bar", "one")])
with self.assertRaises(KeyError):
psdf.at["B", "bar"]
# non-string column names
arrays = [np.array([0, 0, 1, 1]), np.array([1, 2, 1, 2])]
pdf = pd.DataFrame(np.random.randn(3, 4), index=["A", "B", "C"], columns=arrays)
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf.at["B", (0, 1)], pdf.at["B", (0, 1)])
def test_iat(self):
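        # .iat with integer positions on DataFrames and Series: single-key or
        # label-based access raises TypeError or ValueError, out-of-range
        # positions raise KeyError, tuple positions raise ValueError, and
        # assignment through .iat is not supported.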
pdf = self.pdf
psdf = self.psdf
        # Create the equivalent of pdf.loc[3] as a pandas-on-Spark Series
        # This is necessary because .loc[n] does not currently work with
        # pandas-on-Spark DataFrames (Koalas #383)
test_series = ps.Series([3, 6], index=["a", "b"], name="3")
        # Assert invalid signatures raise TypeError
with self.assertRaises(
TypeError,
msg="Use DataFrame.at like .iat[row_interget_position, column_integer_position]",
):
psdf.iat[3]
with self.assertRaises(
ValueError, msg="iAt based indexing on multi-index can only have tuple values"
):
psdf.iat[3, "b"] # 'ab' is of length 2 but str type instead of tuple
with self.assertRaises(TypeError, msg="Use Series.iat like .iat[row_integer_position]"):
test_series.iat[3, "b"]
# Assert .iat for DataFrames
self.assertEqual(psdf.iat[7, 0], 8)
self.assertEqual(psdf.iat[7, 0], pdf.iat[7, 0])
# Assert .iat for Series
self.assertEqual(test_series.iat[1], 6)
self.assertEqual(test_series.iat[1], pdf.loc[3].iat[1])
        # Assert invalid row or column integer positions result in a KeyError, as in pandas
with self.assertRaises(KeyError, msg=99):
psdf.iat[0, 99]
with self.assertRaises(KeyError, msg=99):
psdf.iat[99, 0]
with self.assertRaises(ValueError):
psdf.iat[(1, 1), 1]
with self.assertRaises(ValueError):
psdf.iat[1, (1, 1)]
# Assert setting values fails
with self.assertRaises(TypeError):
psdf.iat[4, 1] = 10
def test_iat_multiindex(self):
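        # .iat stays purely positional with a MultiIndex; a label used as the
        # column position raises ValueError.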
pdf = self.pdf.set_index("b", append=True)
psdf = self.psdf.set_index("b", append=True)
self.assert_eq(psdf.iat[7, 0], pdf.iat[7, 0])
with self.assertRaises(ValueError):
psdf.iat[3, "a"]
def test_iat_multiindex_columns(self):
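        # .iat with MultiIndex columns resolves positions against the flat
        # column order; out-of-range positions raise KeyError.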
arrays = [np.array(["bar", "bar", "baz", "baz"]), np.array(["one", "two", "one", "two"])]
pdf = pd.DataFrame(np.random.randn(3, 4), index=["A", "B", "C"], columns=arrays)
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf.iat[1, 3], pdf.iat[1, 3])
with self.assertRaises(KeyError):
psdf.iat[0, 99]
with self.assertRaises(KeyError):
psdf.iat[99, 0]
def test_getitem(self):
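        # __getitem__ with a single column label, a list of labels, and a
        # boolean Series matches pandas; missing labels raise KeyError and
        # missing attributes raise AttributeError. Also covered below with
        # integer column names.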
pdf = pd.DataFrame(
{
"A": [1, 2, 3, 4, 5, 6, 7, 8, 9],
"B": [9, 8, 7, 6, 5, 4, 3, 2, 1],
"C": [True, False, True] * 3,
},
columns=list("ABC"),
)
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf["A"], pdf["A"])
self.assert_eq(psdf[["A", "B"]], pdf[["A", "B"]])
self.assert_eq(psdf[psdf.C], pdf[pdf.C])
self.assertRaises(KeyError, lambda: psdf["X"])
self.assertRaises(KeyError, lambda: psdf[["A", "X"]])
self.assertRaises(AttributeError, lambda: psdf.X)
# not str/unicode
pdf = pd.DataFrame(np.random.randn(10, 5))
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf[0], pdf[0])
self.assert_eq(psdf[[1, 2]], pdf[[1, 2]])
        self.assertRaises(KeyError, lambda: psdf[8])
        self.assertRaises(KeyError, lambda: psdf[[1, 8]])
# non-string column names
pdf = pd.DataFrame(
{
10: [1, 2, 3, 4, 5, 6, 7, 8, 9],
20: [9, 8, 7, 6, 5, 4, 3, 2, 1],
30: [True, False, True] * 3,
}
)
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf[10], pdf[10])
self.assert_eq(psdf[[10, 20]], pdf[[10, 20]])
def test_getitem_slice(self):
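        # Label-based slicing through __getitem__ is endpoint-inclusive, as in
        # pandas.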
pdf = pd.DataFrame(
{
"A": [1, 2, 3, 4, 5, 6, 7, 8, 9],
"B": [9, 8, 7, 6, 5, 4, 3, 2, 1],
"C": [True, False, True] * 3,
},
index=list("abcdefghi"),
)
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf["a":"e"], pdf["a":"e"])
self.assert_eq(psdf["a":"b"], pdf["a":"b"])
self.assert_eq(psdf["f":], pdf["f":])
@unittest.skip("TODO?: the behavior of slice for datetime")
def test_getitem_timestamp_str(self):
pdf = pd.DataFrame(
{"A": np.random.randn(100), "B": np.random.randn(100)},
index=pd.date_range("2011-01-01", freq="H", periods=100),
)
psdf = ps.from_pandas(pdf)
# partial string slice
# TODO?: self.assert_eq(pdf['2011-01-02'],
# TODO?: psdf['2011-01-02'])
self.assert_eq(pdf["2011-01-02":"2011-01-05"], psdf["2011-01-02":"2011-01-05"])
pdf = pd.DataFrame(
{"A": np.random.randn(100), "B": np.random.randn(100)},
index=pd.date_range("2011-01-01", freq="M", periods=100),
)
psdf = ps.from_pandas(pdf)
# TODO?: self.assert_eq(pdf['2011-01'], psdf['2011-01'])
# TODO?: self.assert_eq(pdf['2011'], psdf['2011'])
self.assert_eq(pdf["2011-01":"2012-05"], psdf["2011-01":"2012-05"])
self.assert_eq(pdf["2011":"2015"], psdf["2011":"2015"])
@unittest.skip("TODO?: period index can't convert to DataFrame correctly")
def test_getitem_period_str(self):
pdf = pd.DataFrame(
{"A": np.random.randn(100), "B": np.random.randn(100)},
index=pd.period_range("2011-01-01", freq="H", periods=100),
)
psdf = ps.from_pandas(pdf)
# partial string slice
# TODO?: self.assert_eq(pdf['2011-01-02'],
# TODO?: psdf['2011-01-02'])
self.assert_eq(pdf["2011-01-02":"2011-01-05"], psdf["2011-01-02":"2011-01-05"])
pdf = pd.DataFrame(
{"A": np.random.randn(100), "B": np.random.randn(100)},
index=pd.period_range("2011-01-01", freq="M", periods=100),
)
psdf = ps.from_pandas(pdf)
# TODO?: self.assert_eq(pdf['2011-01'], psdf['2011-01'])
# TODO?: self.assert_eq(pdf['2011'], psdf['2011'])
self.assert_eq(pdf["2011-01":"2012-05"], psdf["2011-01":"2012-05"])
self.assert_eq(pdf["2011":"2015"], psdf["2011":"2015"])
def test_iloc(self):
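        # .iloc column indexers may be integers, integer lists, boolean lists,
        # or slices; a non-numeric row indexer such as a string raises
        # SparkPandasNotImplementedError.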
pdf = pd.DataFrame({"A": [1, 2], "B": [3, 4], "C": [5, 6]})
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf.iloc[0, 0], pdf.iloc[0, 0])
for indexer in [0, [0], [0, 1], [1, 0], [False, True, True], slice(0, 1)]:
self.assert_eq(psdf.iloc[:, indexer], pdf.iloc[:, indexer])
self.assert_eq(psdf.iloc[:1, indexer], pdf.iloc[:1, indexer])
self.assert_eq(psdf.iloc[:-1, indexer], pdf.iloc[:-1, indexer])
# self.assert_eq(psdf.iloc[psdf.index == 2, indexer], pdf.iloc[pdf.index == 2, indexer])
self.assertRaisesRegex(
SparkPandasNotImplementedError,
".iloc requires numeric slice, conditional boolean",
lambda: ps.range(10).iloc["a", :],
)
def test_index_operator_datetime(self):
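        # Exercises []-based indexing on a DatetimeIndex against pandas:
        # integer slices and positions, datetime keys, and datetime slices.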
dates = pd.date_range("20130101", periods=6)
pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list("ABCD"))
psdf = ps.from_pandas(pdf)
# Positional iloc search
self.assert_eq(psdf[:4], pdf[:4], almost=True)
self.assert_eq(psdf[:3], pdf[:3], almost=True)
self.assert_eq(psdf[3:], pdf[3:], almost=True)
self.assert_eq(psdf[2:], pdf[2:], almost=True)
self.assert_eq(psdf[2:3], pdf[2:3], almost=True)
self.assert_eq(psdf[2:-1], pdf[2:-1], almost=True)
self.assert_eq(psdf[10:3], pdf[10:3], almost=True)
# Index loc search
self.assert_eq(psdf.A[4], pdf.A[4])
self.assert_eq(psdf.A[3], pdf.A[3])
# Positional iloc search
self.assert_eq(psdf.A[:4], pdf.A[:4], almost=True)
self.assert_eq(psdf.A[:3], pdf.A[:3], almost=True)
self.assert_eq(psdf.A[3:], pdf.A[3:], almost=True)
self.assert_eq(psdf.A[2:], pdf.A[2:], almost=True)
self.assert_eq(psdf.A[2:3], pdf.A[2:3], almost=True)
self.assert_eq(psdf.A[2:-1], pdf.A[2:-1], almost=True)
self.assert_eq(psdf.A[10:3], pdf.A[10:3], almost=True)
dt1 = datetime.datetime.strptime("2013-01-02", "%Y-%m-%d")
dt2 = datetime.datetime.strptime("2013-01-04", "%Y-%m-%d")
# Index loc search
self.assert_eq(psdf[:dt2], pdf[:dt2], almost=True)
self.assert_eq(psdf[dt1:], pdf[dt1:], almost=True)
self.assert_eq(psdf[dt1:dt2], pdf[dt1:dt2], almost=True)
self.assert_eq(psdf.A[dt2], pdf.A[dt2], almost=True)
self.assert_eq(psdf.A[:dt2], pdf.A[:dt2], almost=True)
self.assert_eq(psdf.A[dt1:], pdf.A[dt1:], almost=True)
self.assert_eq(psdf.A[dt1:dt2], pdf.A[dt1:dt2], almost=True)
def test_index_operator_int(self):
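        # Exercises []-based indexing on a non-consecutive integer index:
        # positional slices, label-based scalar keys, and the unsupported
        # duplicated row selection through .iloc.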
pdf = pd.DataFrame(np.random.randn(6, 4), index=[1, 3, 5, 7, 9, 11], columns=list("ABCD"))
psdf = ps.from_pandas(pdf)
# Positional iloc search
self.assert_eq(psdf[:4], pdf[:4])
self.assert_eq(psdf[:3], pdf[:3])
self.assert_eq(psdf[3:], pdf[3:])
self.assert_eq(psdf[2:], pdf[2:])
self.assert_eq(psdf[2:3], pdf[2:3])
self.assert_eq(psdf[2:-1], pdf[2:-1])
self.assert_eq(psdf[10:3], pdf[10:3])
# Index loc search
self.assert_eq(psdf.A[5], pdf.A[5])
self.assert_eq(psdf.A[3], pdf.A[3])
with self.assertRaisesRegex(
NotImplementedError, "Duplicated row selection is not currently supported"
):
psdf.iloc[[1, 1]]
class IndexingAdvTests(
IndexingAdvMixin,
PandasOnSparkTestCase,
SQLTestUtils,
):
pass
if __name__ == "__main__":
from pyspark.pandas.tests.indexes.test_indexing_adv import * # noqa: F401
try:
import xmlrunner
testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)