# blob: 3ccc9697070fc6c506e2fbe4c790819ce35c5471 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import unittest
from datetime import datetime
import pandas as pd
from pyspark import pandas as ps
from pyspark.testing.pandasutils import PandasOnSparkTestCase
from pyspark.pandas.utils import (
SPARK_CONF_ARROW_ENABLED,
SPARK_CONF_PANDAS_STRUCT_MODE,
)
from pyspark.testing.sqlutils import SQLTestUtils
from pyspark.testing.utils import is_ansi_mode_test
class ConversionMixin:
    """Index construction and conversion tests for pandas API on Spark.

    Every test builds the same object with pandas and with ``pyspark.pandas``
    and compares the two. The host test case must supply ``assert_eq``,
    ``sql_conf`` and the standard ``unittest`` assertion methods.
    """

    @property
    def pdf(self):
        """Return a new pandas DataFrame with columns ``a``/``b`` and a non-unique index."""
        return pd.DataFrame(
            {"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [4, 5, 6, 3, 2, 1, 0, 0, 0]},
            index=[0, 1, 3, 5, 6, 8, 9, 9, 9],
        )

    @property
    def psdf(self):
        """Return ``self.pdf`` converted with ``ps.from_pandas``."""
        return ps.from_pandas(self.pdf)

    def test_index_from_series(self):
        """``ps.Index`` / ``ps.DatetimeIndex`` built from a Series match pandas."""
        pser = pd.Series([1, 2, 3], name="a", index=[10, 20, 30])
        psser = ps.from_pandas(pser)

        self.assert_eq(ps.Index(psser), pd.Index(pser))
        self.assert_eq(ps.Index(psser, dtype="float"), pd.Index(pser, dtype="float"))
        self.assert_eq(ps.Index(psser, name="x"), pd.Index(pser, name="x"))

        self.assert_eq(ps.Index(psser, dtype="int64"), pd.Index(pser, dtype="int64"))
        self.assert_eq(ps.Index(psser, dtype="float64"), pd.Index(pser, dtype="float64"))

        # Datetime values
        pser = pd.Series([datetime(2021, 3, 1), datetime(2021, 3, 2)], name="x", index=[10, 20])
        psser = ps.from_pandas(pser)

        self.assert_eq(ps.Index(psser), pd.Index(pser))
        self.assert_eq(ps.DatetimeIndex(psser), pd.DatetimeIndex(pser))

    def test_index_from_index(self):
        """``ps.Index`` / ``ps.DatetimeIndex`` built from an Index match pandas."""
        pidx = pd.Index([1, 2, 3], name="a")
        psidx = ps.from_pandas(pidx)

        self.assert_eq(ps.Index(psidx), pd.Index(pidx))
        self.assert_eq(ps.Index(psidx, dtype="float"), pd.Index(pidx, dtype="float"))
        self.assert_eq(ps.Index(psidx, name="x"), pd.Index(pidx, name="x"))
        self.assert_eq(ps.Index(psidx, copy=True), pd.Index(pidx, copy=True))

        self.assert_eq(ps.Index(psidx, dtype="int64"), pd.Index(pidx, dtype="int64"))
        self.assert_eq(ps.Index(psidx, dtype="float64"), pd.Index(pidx, dtype="float64"))

        # DatetimeIndex input
        pidx = pd.DatetimeIndex(["2021-03-01", "2021-03-02"])
        psidx = ps.from_pandas(pidx)

        self.assert_eq(ps.Index(psidx), pd.Index(pidx))
        self.assert_eq(ps.DatetimeIndex(psidx), pd.DatetimeIndex(pidx))

    def test_multiindex_from_arrays(self):
        """``MultiIndex.from_arrays`` matches pandas."""
        arrays = [["a", "a", "b", "b"], ["red", "blue", "red", "blue"]]
        pidx = pd.MultiIndex.from_arrays(arrays)
        psidx = ps.MultiIndex.from_arrays(arrays)

        self.assert_eq(pidx, psidx)

    def test_multiindex_from_tuples(self):
        """``MultiIndex.from_tuples`` matches pandas."""
        tuples = [(1, "red"), (1, "blue"), (2, "red"), (2, "blue")]
        pidx = pd.MultiIndex.from_tuples(tuples)
        psidx = ps.MultiIndex.from_tuples(tuples)

        self.assert_eq(pidx, psidx)

    def test_multiindex_from_product(self):
        """``MultiIndex.from_product`` matches pandas."""
        iterables = [[0, 1, 2], ["green", "purple"]]
        pidx = pd.MultiIndex.from_product(iterables)
        psidx = ps.MultiIndex.from_product(iterables)

        self.assert_eq(pidx, psidx)

    def test_multi_index_from_index(self):
        """``ps.Index`` given a list of tuples yields a ``ps.MultiIndex`` equal to pandas."""
        tuples = [(1, "red"), (1, "blue"), (2, "red"), (2, "blue")]
        pmidx = pd.Index(tuples)
        psmidx = ps.Index(tuples)

        self.assertIsInstance(psmidx, ps.MultiIndex)
        self.assert_eq(pmidx, psmidx)

        # Specify the `names`
        pmidx = pd.Index(tuples)
        pmidx.names = ["Hello", "Koalas"]
        psmidx = ps.Index(tuples)
        psmidx.names = ["Hello", "Koalas"]

        self.assertIsInstance(psmidx, ps.MultiIndex)
        self.assert_eq(pmidx, psmidx)

    def test_multiindex_from_frame(self):
        """``MultiIndex.from_frame`` matches pandas for several ``names`` forms and bad input."""
        pdf = pd.DataFrame(
            [["HI", "Temp"], ["HI", "Precip"], ["NJ", "Temp"], ["NJ", "Precip"]], columns=["a", "b"]
        )
        psdf = ps.from_pandas(pdf)
        pidx = pd.MultiIndex.from_frame(pdf)
        psidx = ps.MultiIndex.from_frame(psdf)

        self.assert_eq(pidx, psidx)

        # Specify `names`
        pidx = pd.MultiIndex.from_frame(pdf, names=["state", "observation"])
        psidx = ps.MultiIndex.from_frame(psdf, names=["state", "observation"])
        self.assert_eq(pidx, psidx)

        pidx = pd.MultiIndex.from_frame(pdf, names=("state", "observation"))
        psidx = ps.MultiIndex.from_frame(psdf, names=("state", "observation"))
        self.assert_eq(pidx, psidx)

        # MultiIndex columns
        pidx = pd.MultiIndex.from_tuples([("a", "w"), ("b", "x")])
        pdf.columns = pidx
        psdf = ps.from_pandas(pdf)

        pidx = pd.MultiIndex.from_frame(pdf)
        psidx = ps.MultiIndex.from_frame(psdf)
        self.assert_eq(pidx, psidx)

        # tuples for names
        pidx = pd.MultiIndex.from_frame(pdf, names=[("a", "w"), ("b", "x")])
        psidx = ps.MultiIndex.from_frame(psdf, names=[("a", "w"), ("b", "x")])
        self.assert_eq(pidx, psidx)

        # A plain dict is not accepted as input.
        err_msg = "Input must be a DataFrame"
        with self.assertRaisesRegex(TypeError, err_msg):
            ps.MultiIndex.from_frame({"a": [1, 2, 3], "b": [4, 5, 6]})

        # A bare string is not accepted for `names`.
        with self.assertRaises(TypeError):
            ps.MultiIndex.from_frame(psdf, names="ab")

        # non-string names
        self.assert_eq(
            ps.MultiIndex.from_frame(psdf, names=[0, 1]),
            pd.MultiIndex.from_frame(pdf, names=[0, 1]),
        )
        self.assert_eq(
            ps.MultiIndex.from_frame(psdf, names=[("x", 0), ("y", 1)]),
            pd.MultiIndex.from_frame(pdf, names=[("x", 0), ("y", 1)]),
        )

        # Frame created without explicit column labels
        pdf = pd.DataFrame([["HI", "Temp"], ["HI", "Precip"], ["NJ", "Temp"], ["NJ", "Precip"]])
        psdf = ps.from_pandas(pdf)
        self.assert_eq(ps.MultiIndex.from_frame(psdf), pd.MultiIndex.from_frame(pdf))

    def test_to_series(self):
        """``to_series`` matches pandas for unnamed, named, tuple-named and multi-level indexes."""
        pidx = self.pdf.index
        psidx = self.psdf.index

        self.assert_eq(psidx.to_series(), pidx.to_series())
        self.assert_eq(psidx.to_series(name="a"), pidx.to_series(name="a"))

        # With name
        pidx.name = "Koalas"
        psidx.name = "Koalas"
        self.assert_eq(psidx.to_series(), pidx.to_series())
        self.assert_eq(psidx.to_series(name=("x", "a")), pidx.to_series(name=("x", "a")))

        # With tupled name
        pidx.name = ("x", "a")
        psidx.name = ("x", "a")
        self.assert_eq(psidx.to_series(), pidx.to_series())
        self.assert_eq(psidx.to_series(name="a"), pidx.to_series(name="a"))

        self.assert_eq((psidx + 1).to_series(), (pidx + 1).to_series())

        # Multiindex
        arrays = [[1, 2], ["red", "blue"]]
        pidx = pd.MultiIndex.from_arrays(arrays, names=("number", "color"))
        psidx = ps.from_pandas(pidx)
        if is_ansi_mode_test:
            # With the ANSI test flag set, compare against pandas using struct mode "row".
            with self.sql_conf(
                {
                    SPARK_CONF_PANDAS_STRUCT_MODE: "row",
                }
            ):
                self.assert_eq(
                    list(psidx.to_series().values),
                    list(pidx.to_series().values),
                )
        else:
            # Otherwise the values are expected as lists of strings.
            self.assert_eq(list(psidx.to_series().values), [["1", "red"], ["2", "blue"]])

        pidx = self.pdf.set_index("b", append=True).index
        psidx = self.psdf.set_index("b", append=True).index

        # Compare with Arrow disabled and struct mode "row"; values are checked inexactly.
        with self.sql_conf({SPARK_CONF_ARROW_ENABLED: False, SPARK_CONF_PANDAS_STRUCT_MODE: "row"}):
            self.assert_eq(psidx.to_series(), pidx.to_series(), check_exact=False)
            self.assert_eq(psidx.to_series(name="a"), pidx.to_series(name="a"), check_exact=False)

        # A list is not hashable and must be rejected as a Series name.
        expected_error_message = "Series.name must be a hashable type"
        with self.assertRaisesRegex(TypeError, expected_error_message):
            psidx.to_series(name=["x", "a"])

    def test_to_frame(self):
        """``to_frame`` matches pandas for single and multi-level indexes and ``name`` forms."""
        pidx = self.pdf.index
        psidx = self.psdf.index

        self.assert_eq(psidx.to_frame(), pidx.to_frame())
        self.assert_eq(psidx.to_frame(index=False), pidx.to_frame(index=False))

        pidx.name = "a"
        psidx.name = "a"

        self.assert_eq(psidx.to_frame(), pidx.to_frame())
        self.assert_eq(psidx.to_frame(index=False), pidx.to_frame(index=False))

        self.assert_eq(psidx.to_frame(name="x"), pidx.to_frame(name="x"))
        self.assert_eq(psidx.to_frame(index=False, name="x"), pidx.to_frame(index=False, name="x"))

        # A list is rejected as `name` for a single-level index.
        with self.assertRaises(TypeError):
            psidx.to_frame(name=["x"])

        # non-string name
        self.assert_eq(psidx.to_frame(name=10), pidx.to_frame(name=10))
        self.assert_eq(psidx.to_frame(name=("x", 10)), pidx.to_frame(name=("x", 10)))

        pidx = self.pdf.set_index("b", append=True).index
        psidx = self.psdf.set_index("b", append=True).index

        self.assert_eq(psidx.to_frame(), pidx.to_frame())
        self.assert_eq(psidx.to_frame(index=False), pidx.to_frame(index=False))

        self.assert_eq(psidx.to_frame(name=["x", "y"]), pidx.to_frame(name=["x", "y"]))
        self.assert_eq(psidx.to_frame(name=("x", "y")), pidx.to_frame(name=("x", "y")))
        self.assert_eq(
            psidx.to_frame(index=False, name=["x", "y"]),
            pidx.to_frame(index=False, name=["x", "y"]),
        )

        # For a two-level index, a bare string or a one-element list is rejected as `name`.
        with self.assertRaises(TypeError):
            psidx.to_frame(name="x")
        with self.assertRaises(ValueError):
            psidx.to_frame(name=["x"])

        # non-string names
        self.assert_eq(psidx.to_frame(name=[10, 20]), pidx.to_frame(name=[10, 20]))
        self.assert_eq(psidx.to_frame(name=("x", 10)), pidx.to_frame(name=("x", 10)))

        # Since pandas 1.5.0, the result is changed as below:
        #      (x, 10)  (y, 20)
        #   b
        # 0 4        0        4
        # 1 5        1        5
        # 3 6        3        6
        # 5 3        5        3
        # 6 2        6        2
        # 8 1        8        1
        # 9 0        9        0
        #   0        9        0
        #   0        9        0
        #
        # The columns should be `Index([('x', 10), ('y', 20)], dtype='object')`,
        # but pandas API on Spark doesn't support such a way for creating Index.
        # So, we currently cannot follow the behavior of pandas.
        expected_result = ps.DataFrame(
            {("x", 10): [0, 1, 3, 5, 6, 8, 9, 9, 9], ("y", 20): [4, 5, 6, 3, 2, 1, 0, 0, 0]},
            index=ps.MultiIndex.from_tuples(
                [(0, 4), (1, 5), (3, 6), (5, 3), (6, 2), (8, 1), (9, 0), (9, 0), (9, 0)],
                names=[None, "b"],
            ),
        )
        self.assert_eq(psidx.to_frame(name=[("x", 10), ("y", 20)]), expected_result)

    def test_to_list(self):
        """``tolist`` matches pandas for both Index and MultiIndex."""
        # Index
        pidx = pd.Index([1, 2, 3, 4, 5])
        psidx = ps.from_pandas(pidx)

        # MultiIndex
        tuples = [(1, "red"), (1, "blue"), (2, "red"), (2, "green")]
        pmidx = pd.MultiIndex.from_tuples(tuples)
        psmidx = ps.from_pandas(pmidx)

        self.assert_eq(psidx.tolist(), pidx.tolist())
        self.assert_eq(psmidx.tolist(), pmidx.tolist())

    def test_to_numpy(self):
        """``to_numpy(copy=True)`` matches pandas."""
        pidx = pd.Index([1, 2, 3, 4])
        psidx = ps.from_pandas(pidx)

        self.assert_eq(pidx.to_numpy(copy=True), psidx.to_numpy(copy=True))
class ConversionTests(ConversionMixin, PandasOnSparkTestCase, SQLTestUtils):
    """Concrete test case combining ConversionMixin with PandasOnSparkTestCase and SQLTestUtils."""
if __name__ == "__main__":
    from pyspark.pandas.tests.indexes.test_conversion import *  # noqa: F401

    # Default to None so unittest picks its own runner; switch to xmlrunner,
    # writing reports under target/test-reports, only when it can be imported.
    testRunner = None
    try:
        import xmlrunner

        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
    except ImportError:
        pass
    unittest.main(testRunner=testRunner, verbosity=2)