blob: ff511c0438f27025c0595aa67e2a80010a1acc53 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from datetime import datetime
import unittest
import numpy as np
import pandas as pd
from pyspark import pandas as ps
from pyspark.testing.pandasutils import PandasOnSparkTestCase
from pyspark.testing.sqlutils import SQLTestUtils
# This file contains test cases for 'Attributes and underlying data'
# https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/frame.html#attributes-and-underlying-data
class FrameAttrsMixin:
@property
def pdf(self):
return pd.DataFrame(
{"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [4, 5, 6, 3, 2, 1, 0, 0, 0]},
index=np.random.rand(9),
)
@property
def psdf(self):
return ps.from_pandas(self.pdf)
@property
def df_pair(self):
pdf = self.pdf
psdf = ps.from_pandas(pdf)
return pdf, psdf
def test_column_names(self):
pdf, psdf = self.df_pair
self.assert_eq(psdf.columns, pdf.columns)
self.assert_eq(psdf[["b", "a"]].columns, pdf[["b", "a"]].columns)
self.assert_eq(psdf["a"].name, pdf["a"].name)
self.assert_eq((psdf["a"] + 1).name, (pdf["a"] + 1).name)
self.assert_eq((psdf.a + psdf.b).name, (pdf.a + pdf.b).name)
self.assert_eq((psdf.a + psdf.b.rename("a")).name, (pdf.a + pdf.b.rename("a")).name)
self.assert_eq((psdf.a + psdf.b.rename()).name, (pdf.a + pdf.b.rename()).name)
self.assert_eq((psdf.a.rename() + psdf.b).name, (pdf.a.rename() + pdf.b).name)
self.assert_eq(
(psdf.a.rename() + psdf.b.rename()).name, (pdf.a.rename() + pdf.b.rename()).name
)
def test_rename_columns(self):
pdf = pd.DataFrame(
{"a": [1, 2, 3, 4, 5, 6, 7], "b": [7, 6, 5, 4, 3, 2, 1]}, index=np.random.rand(7)
)
psdf = ps.from_pandas(pdf)
psdf.columns = ["x", "y"]
pdf.columns = ["x", "y"]
self.assert_eq(psdf.columns, pd.Index(["x", "y"]))
self.assert_eq(psdf, pdf)
self.assert_eq(psdf._internal.data_spark_column_names, ["x", "y"])
self.assert_eq(psdf.to_spark().columns, ["x", "y"])
self.assert_eq(psdf.to_spark(index_col="index").columns, ["index", "x", "y"])
columns = pdf.columns
columns.name = "lvl_1"
psdf.columns = columns
self.assert_eq(psdf.columns.names, ["lvl_1"])
self.assert_eq(psdf, pdf)
msg = "Length mismatch: Expected axis has 2 elements, new values have 4 elements"
with self.assertRaisesRegex(ValueError, msg):
psdf.columns = [1, 2, 3, 4]
# Multi-index columns
pdf = pd.DataFrame(
{("A", "0"): [1, 2, 2, 3], ("B", "1"): [1, 2, 3, 4]}, index=np.random.rand(4)
)
psdf = ps.from_pandas(pdf)
columns = pdf.columns
self.assert_eq(psdf.columns, columns)
self.assert_eq(psdf, pdf)
pdf.columns = ["x", "y"]
psdf.columns = ["x", "y"]
self.assert_eq(psdf.columns, pd.Index(["x", "y"]))
self.assert_eq(psdf, pdf)
self.assert_eq(psdf._internal.data_spark_column_names, ["x", "y"])
self.assert_eq(psdf.to_spark().columns, ["x", "y"])
self.assert_eq(psdf.to_spark(index_col="index").columns, ["index", "x", "y"])
pdf.columns = columns
psdf.columns = columns
self.assert_eq(psdf.columns, columns)
self.assert_eq(psdf, pdf)
self.assert_eq(psdf._internal.data_spark_column_names, ["(A, 0)", "(B, 1)"])
self.assert_eq(psdf.to_spark().columns, ["(A, 0)", "(B, 1)"])
self.assert_eq(psdf.to_spark(index_col="index").columns, ["index", "(A, 0)", "(B, 1)"])
columns.names = ["lvl_1", "lvl_2"]
psdf.columns = columns
self.assert_eq(psdf.columns.names, ["lvl_1", "lvl_2"])
self.assert_eq(psdf, pdf)
self.assert_eq(psdf._internal.data_spark_column_names, ["(A, 0)", "(B, 1)"])
self.assert_eq(psdf.to_spark().columns, ["(A, 0)", "(B, 1)"])
self.assert_eq(psdf.to_spark(index_col="index").columns, ["index", "(A, 0)", "(B, 1)"])
def test_multi_index_dtypes(self):
# SPARK-36930: Support ps.MultiIndex.dtypes
arrays = [[1, 1, 2, 2], ["red", "blue", "red", "blue"]]
pmidx = pd.MultiIndex.from_arrays(arrays, names=("number", "color"))
psmidx = ps.from_pandas(pmidx)
self.assert_eq(psmidx.dtypes, pmidx.dtypes)
# multiple labels
pmidx = pd.MultiIndex.from_arrays(arrays, names=[("zero", "first"), ("one", "second")])
psmidx = ps.from_pandas(pmidx)
self.assert_eq(psmidx.dtypes, pmidx.dtypes)
def test_multi_index_dtypes_not_unique_name(self):
# Regression test for https://github.com/pandas-dev/pandas/issues/45174
pmidx = pd.MultiIndex.from_arrays([[1], [2]], names=[1, 1])
psmidx = ps.from_pandas(pmidx)
self.assert_eq(psmidx.dtypes, pmidx.dtypes)
def test_dtype(self):
pdf = pd.DataFrame(
{
"a": list("abc"),
"b": list(range(1, 4)),
"c": np.arange(3, 6).astype("i1"),
"d": np.arange(4.0, 7.0, dtype="float64"),
"e": [True, False, True],
"f": pd.date_range("20130101", periods=3),
},
index=np.random.rand(3),
)
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf, pdf)
self.assertTrue((psdf.dtypes == pdf.dtypes).all())
# multi-index columns
columns = pd.MultiIndex.from_tuples(zip(list("xxxyyz"), list("abcdef")))
pdf.columns = columns
psdf.columns = columns
self.assertTrue((psdf.dtypes == pdf.dtypes).all())
def test_axes(self):
pdf = self.pdf
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.axes, psdf.axes)
# multi-index columns
columns = pd.MultiIndex.from_tuples([("x", "a"), ("y", "b")])
pdf.columns = columns
psdf.columns = columns
self.assert_eq(pdf.axes, psdf.axes)
def test_inplace(self):
pdf, psdf = self.df_pair
pser = pdf.a
psser = psdf.a
pdf["a"] = pdf["a"] + 10
psdf["a"] = psdf["a"] + 10
self.assert_eq(psdf, pdf)
self.assert_eq(psser, pser)
def test_dataframe_multiindex_columns(self):
pdf = pd.DataFrame(
{
("x", "a", "1"): [1, 2, 3],
("x", "b", "2"): [4, 5, 6],
("y.z", "c.d", "3"): [7, 8, 9],
("x", "b", "4"): [10, 11, 12],
},
index=np.random.rand(3),
)
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf, pdf)
self.assert_eq(psdf["x"], pdf["x"])
self.assert_eq(psdf["y.z"], pdf["y.z"])
self.assert_eq(psdf["x"]["b"], pdf["x"]["b"])
self.assert_eq(psdf["x"]["b"]["2"], pdf["x"]["b"]["2"])
self.assert_eq(psdf.x, pdf.x)
self.assert_eq(psdf.x.b, pdf.x.b)
self.assert_eq(psdf.x.b["2"], pdf.x.b["2"])
self.assertRaises(KeyError, lambda: psdf["z"])
self.assertRaises(AttributeError, lambda: psdf.z)
self.assert_eq(psdf[("x",)], pdf[("x",)])
self.assert_eq(psdf[("x", "a")], pdf[("x", "a")])
self.assert_eq(psdf[("x", "a", "1")], pdf[("x", "a", "1")])
def test_dataframe_column_level_name(self):
column = pd.Index(["A", "B", "C"], name="X")
pdf = pd.DataFrame([[1, 2, 3], [4, 5, 6]], columns=column, index=np.random.rand(2))
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf, pdf)
self.assert_eq(psdf.columns.names, pdf.columns.names)
self.assert_eq(psdf._to_pandas().columns.names, pdf.columns.names)
def test_dataframe_multiindex_names_level(self):
columns = pd.MultiIndex.from_tuples(
[("X", "A", "Z"), ("X", "B", "Z"), ("Y", "C", "Z"), ("Y", "D", "Z")],
names=["lvl_1", "lvl_2", "lv_3"],
)
pdf = pd.DataFrame(
[[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16], [17, 18, 19, 20]],
columns=columns,
index=np.random.rand(5),
)
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf.columns.names, pdf.columns.names)
self.assert_eq(psdf._to_pandas().columns.names, pdf.columns.names)
psdf1 = ps.from_pandas(pdf)
self.assert_eq(psdf1.columns.names, pdf.columns.names)
self.assertRaises(
AssertionError,
lambda: ps.DataFrame(psdf1._internal.copy(column_label_names=("level",))),
)
self.assert_eq(psdf["X"], pdf["X"])
self.assert_eq(psdf["X"].columns.names, pdf["X"].columns.names)
self.assert_eq(psdf["X"]._to_pandas().columns.names, pdf["X"].columns.names)
self.assert_eq(psdf["X"]["A"], pdf["X"]["A"])
self.assert_eq(psdf["X"]["A"].columns.names, pdf["X"]["A"].columns.names)
self.assert_eq(psdf["X"]["A"]._to_pandas().columns.names, pdf["X"]["A"].columns.names)
self.assert_eq(psdf[("X", "A")], pdf[("X", "A")])
self.assert_eq(psdf[("X", "A")].columns.names, pdf[("X", "A")].columns.names)
self.assert_eq(psdf[("X", "A")]._to_pandas().columns.names, pdf[("X", "A")].columns.names)
self.assert_eq(psdf[("X", "A", "Z")], pdf[("X", "A", "Z")])
def test_repr_cache_invalidation(self):
# If there is any cache, inplace operations should invalidate it.
df = ps.range(10)
df.__repr__()
df["a"] = df["id"]
self.assertEqual(df.__repr__(), df._to_pandas().__repr__())
def test_repr_html_cache_invalidation(self):
# If there is any cache, inplace operations should invalidate it.
df = ps.range(10)
df._repr_html_()
df["a"] = df["id"]
self.assertEqual(df._repr_html_(), df._to_pandas()._repr_html_())
def test_assign(self):
pdf, psdf = self.df_pair
psdf["w"] = 1.0
pdf["w"] = 1.0
self.assert_eq(psdf, pdf)
psdf.w = 10.0
pdf.w = 10.0
self.assert_eq(psdf, pdf)
psdf[1] = 1.0
pdf[1] = 1.0
self.assert_eq(psdf, pdf)
psdf = psdf.assign(a=psdf["a"] * 2)
pdf = pdf.assign(a=pdf["a"] * 2)
self.assert_eq(psdf, pdf)
# multi-index columns
columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y", "w"), ("y", "v")])
pdf.columns = columns
psdf.columns = columns
psdf[("a", "c")] = "def"
pdf[("a", "c")] = "def"
self.assert_eq(psdf, pdf)
psdf = psdf.assign(Z="ZZ")
pdf = pdf.assign(Z="ZZ")
self.assert_eq(psdf, pdf)
psdf["x"] = "ghi"
pdf["x"] = "ghi"
self.assert_eq(psdf, pdf)
def test_attributes(self):
psdf = self.psdf
self.assertIn("a", dir(psdf))
self.assertNotIn("foo", dir(psdf))
self.assertRaises(AttributeError, lambda: psdf.foo)
psdf = ps.DataFrame({"a b c": [1, 2, 3]})
self.assertNotIn("a b c", dir(psdf))
psdf = ps.DataFrame({"a": [1, 2], 5: [1, 2]})
self.assertIn("a", dir(psdf))
self.assertNotIn(5, dir(psdf))
def test_empty_timestamp(self):
pdf = pd.DataFrame(
{
"t": [
datetime(2019, 1, 1, 0, 0, 0),
datetime(2019, 1, 2, 0, 0, 0),
datetime(2019, 1, 3, 0, 0, 0),
]
},
index=np.random.rand(3),
)
psdf = ps.from_pandas(pdf)
self.assert_eq(psdf[psdf["t"] != psdf["t"]], pdf[pdf["t"] != pdf["t"]])
self.assert_eq(psdf[psdf["t"] != psdf["t"]].dtypes, pdf[pdf["t"] != pdf["t"]].dtypes)
class FrameAttrsTests(
FrameAttrsMixin,
PandasOnSparkTestCase,
SQLTestUtils,
):
pass
if __name__ == "__main__":
from pyspark.pandas.tests.frame.test_attrs import * # noqa: F401
try:
import xmlrunner
testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)