#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import unittest

import pandas as pd

from pyspark import pandas as ps
from pyspark.testing.pandasutils import PandasOnSparkTestCase
from pyspark.testing.sqlutils import SQLTestUtils

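# Shared test cases for the uniqueness-related Index APIs in pandas-on-Spark:
# unique, is_unique, nunique, and has_duplicates.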
class UniqueMixin:
    @property
    def pdf(self):
        return pd.DataFrame(
            {"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [4, 5, 6, 3, 2, 1, 0, 0, 0]},
            index=[0, 1, 3, 5, 6, 8, 9, 9, 9],
        )

    @property
    def psdf(self):
        return ps.from_pandas(self.pdf)

    def test_index_unique(self):
        psidx = self.psdf.index

        # unique() on pandas-on-Spark does not preserve the same ordering as pandas,
        # so compare against the sorted values.
        expected = [0, 1, 3, 5, 6, 8, 9]

        self.assert_eq(expected, sorted(psidx.unique()._to_pandas()))
        self.assert_eq(expected, sorted(psidx.unique(level=0)._to_pandas()))

        expected = [1, 2, 4, 6, 7, 9, 10]
        self.assert_eq(expected, sorted((psidx + 1).unique()._to_pandas()))

        with self.assertRaisesRegex(IndexError, "Too many levels*"):
            psidx.unique(level=1)

        with self.assertRaisesRegex(KeyError, "Requested level (hi)*"):
            psidx.unique(level="hi")

    def test_unique(self):
        pidx = pd.Index(["a", "b", "a"])
        psidx = ps.from_pandas(pidx)

        # unique() is asserted twice; repeated calls should return the same result.
        self.assert_eq(psidx.unique().sort_values(), pidx.unique().sort_values())
        self.assert_eq(psidx.unique().sort_values(), pidx.unique().sort_values())

        pmidx = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("x", "a")])
        psmidx = ps.from_pandas(pmidx)

        self.assert_eq(psmidx.unique().sort_values(), pmidx.unique().sort_values())
        self.assert_eq(psmidx.unique().sort_values(), pmidx.unique().sort_values())

        with self.assertRaisesRegex(
            IndexError, "Too many levels: Index has only 1 level, -2 is not a valid level number"
        ):
            psidx.unique(level=-2)

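    # is_unique should be False whenever the index contains repeated labels.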
    def test_index_is_unique(self):
        indexes = [("a", "b", "c"), ("a", "a", "c"), (1, 3, 3), (1, 2, 3)]
        names = [None, "ks", "ks", None]
        is_uniq = [True, False, False, True]

        for idx, name, expected in zip(indexes, names, is_uniq):
            pdf = pd.DataFrame({"a": [1, 2, 3]}, index=pd.Index(idx, name=name))
            psdf = ps.from_pandas(pdf)

            self.assertEqual(psdf.index.is_unique, expected)

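    # A MultiIndex is unique only when the full label tuples are unique,
    # even if individual levels contain repeated values.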
    def test_multiindex_is_unique(self):
        indexes = [
            [list("abc"), list("edf")],
            [list("aac"), list("edf")],
            [list("aac"), list("eef")],
            [[1, 4, 4], [4, 6, 6]],
        ]
        is_uniq = [True, True, False, False]

        for idx, expected in zip(indexes, is_uniq):
            pdf = pd.DataFrame({"a": [1, 2, 3]}, index=idx)
            psdf = ps.from_pandas(pdf)

            self.assertEqual(psdf.index.is_unique, expected)

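    # nunique should match pandas on an index containing a missing value,
    # with and without an explicit dropna argument.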
    def test_index_nunique(self):
        pidx = pd.Index([1, 1, 2, None])
        psidx = ps.from_pandas(pidx)

        self.assert_eq(pidx.nunique(), psidx.nunique())
        self.assert_eq(pidx.nunique(dropna=True), psidx.nunique(dropna=True))

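    # notnull() is not defined for MultiIndex in pandas-on-Spark and should raise.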
    def test_multiindex_nunique(self):
        psidx = ps.MultiIndex.from_tuples([("a", "x", 1), ("b", "y", 2), ("c", "z", 3)])
        with self.assertRaisesRegex(NotImplementedError, "notna is not defined for MultiIndex"):
            psidx.notnull()

    def test_multi_index_nunique(self):
        tuples = [(1, "red"), (1, "blue"), (2, "red"), (2, "green")]
        pmidx = pd.MultiIndex.from_tuples(tuples)
        psmidx = ps.from_pandas(pmidx)

        with self.assertRaisesRegex(NotImplementedError, "nunique is not defined for MultiIndex"):
            psmidx.nunique()

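    # has_duplicates should be True whenever the index contains repeated labels.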
    def test_index_has_duplicates(self):
        indexes = [("a", "b", "c"), ("a", "a", "c"), (1, 3, 3), (1, 2, 3)]
        names = [None, "ks", "ks", None]
        has_dup = [False, True, True, False]

        for idx, name, expected in zip(indexes, names, has_dup):
            pdf = pd.DataFrame({"a": [1, 2, 3]}, index=pd.Index(idx, name=name))
            psdf = ps.from_pandas(pdf)

            self.assertEqual(psdf.index.has_duplicates, expected)

    def test_multiindex_has_duplicates(self):
        indexes = [
            [list("abc"), list("edf")],
            [list("aac"), list("edf")],
            [list("aac"), list("eef")],
            [[1, 4, 4], [4, 6, 6]],
        ]
        has_dup = [False, False, True, True]

        for idx, expected in zip(indexes, has_dup):
            pdf = pd.DataFrame({"a": [1, 2, 3]}, index=idx)
            psdf = ps.from_pandas(pdf)

            self.assertEqual(psdf.index.has_duplicates, expected)


class UniqueTests(
    UniqueMixin,
    PandasOnSparkTestCase,
    SQLTestUtils,
):
    pass


if __name__ == "__main__":
    from pyspark.pandas.tests.indexes.test_unique import *  # noqa: F401

    try:
        import xmlrunner

        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
    except ImportError:
        testRunner = None
    unittest.main(testRunner=testRunner, verbosity=2)