blob: db528c27feadf09da1db990792729d155fe5914e [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import unittest
from typing import cast
from pyspark.sql.types import (
ArrayType,
IntegerType,
MapType,
StringType,
StructType,
Row,
)
from pyspark.testing.sqlutils import (
have_pandas,
have_pyarrow,
pandas_requirement_message,
pyarrow_requirement_message,
)
if have_pandas:
import pandas as pd
import numpy as np
from pandas.testing import assert_series_equal
from pyspark.sql.pandas.types import _create_converter_from_pandas, _create_converter_to_pandas
if have_pyarrow:
import pyarrow as pa # noqa: F401
@unittest.skipIf(
not have_pandas or not have_pyarrow,
cast(str, pandas_requirement_message or pyarrow_requirement_message),
)
class ConverterTests(unittest.TestCase):
def test_converter_to_pandas_array(self):
# _element_conv is None
conv = _create_converter_to_pandas(ArrayType(IntegerType()))
pser = pd.Series([[1, 2, 3, None], np.array([4, 5, None]), None])
self.assertIs(conv(pser), pser)
# _element_conv is not None
conv = _create_converter_to_pandas(
ArrayType(StructType().add("a", IntegerType())), struct_in_pandas="dict"
)
pser = pd.Series([[Row(a=1), Row(a=2), None], np.array([{"a": 3}, None]), None])
assert_series_equal(
conv(pser), pd.Series([[{"a": 1}, {"a": 2}, None], np.array([{"a": 3}, None]), None])
)
# ndarray_as_list=True
# _element_conv is None
conv = _create_converter_to_pandas(ArrayType(IntegerType()), ndarray_as_list=True)
pser = pd.Series([[1, 2, 3, None], np.array([4, 5, None]), None])
assert_series_equal(conv(pser), pd.Series([[1, 2, 3, None], [4, 5, None], None]))
# _element_conv is not None
conv = _create_converter_to_pandas(
ArrayType(StructType().add("a", IntegerType())),
struct_in_pandas="dict",
ndarray_as_list=True,
)
pser = pd.Series([[Row(a=1), Row(a=2), None], np.array([{"a": 3}, None]), None])
assert_series_equal(
conv(pser), pd.Series([[{"a": 1}, {"a": 2}, None], [{"a": 3}, None], None])
)
def test_converter_from_pandas_array(self):
# _element_conv is None
conv = _create_converter_from_pandas(ArrayType(IntegerType()))
pser = pd.Series([[1, 2, 3, None], np.array([4, 5, None]), None])
assert_series_equal(
conv(pser),
pd.Series([[1, 2, 3, None], [4, 5, None], None]),
)
# _element_conv is not None
conv = _create_converter_from_pandas(ArrayType(StructType().add("a", IntegerType())))
pser = pd.Series(
[[{"a": 1}, {"a": 2}, {"a": 3}, None], np.array([{"a": 4}, {"a": 5}, None]), None]
)
assert_series_equal(
conv(pser),
pd.Series([[{"a": 1}, {"a": 2}, {"a": 3}, None], [{"a": 4}, {"a": 5}, None], None]),
)
# ignore_unexpected_complex_type_values=True
# _element_conv is None
conv = _create_converter_from_pandas(
ArrayType(IntegerType()), ignore_unexpected_complex_type_values=True
)
pser = pd.Series([[1, 2, 3, None], np.array([4, 5, None]), None, 100])
assert_series_equal(conv(pser), pd.Series([[1, 2, 3, None], [4, 5, None], None, 100]))
# _element_conv is not None
conv = _create_converter_from_pandas(
ArrayType(StructType().add("a", IntegerType())),
ignore_unexpected_complex_type_values=True,
)
pser = pd.Series(
[[{"a": 1}, {"a": 2}, {"a": 3}, None], np.array([{"a": 4}, {"a": 5}, None]), None, 100]
)
assert_series_equal(
conv(pser),
pd.Series(
[[{"a": 1}, {"a": 2}, {"a": 3}, None], [{"a": 4}, {"a": 5}, None], None, 100]
),
)
def test_converter_to_pandas_map(self):
# _key_conv is None and _value_conv is None
conv = _create_converter_to_pandas(MapType(StringType(), IntegerType()))
pser = pd.Series(
[[("x", 1), ("y", 2), ("z", None), (None, 3)], {"x": 4, "y": None, None: 5}, None]
)
assert_series_equal(
conv(pser),
pd.Series([{"x": 1, "y": 2, "z": None, None: 3}, {"x": 4, "y": None, None: 5}, None]),
)
# _key_conv is None and _value_conv is not None
conv = _create_converter_to_pandas(
MapType(StringType(), StructType().add("a", IntegerType())), struct_in_pandas="row"
)
pser = pd.Series(
[
[("x", Row(a=1)), ("y", Row(a=2)), ("z", None), (None, Row(a=3))],
{"x": Row(a=4), "y": None, None: Row(a=5)},
None,
]
)
assert_series_equal(
conv(pser),
pd.Series(
[
{"x": Row(a=1), "y": Row(a=2), "z": None, None: Row(a=3)},
{"x": Row(a=4), "y": None, None: Row(a=5)},
None,
]
),
)
# _key_conv is not None and _value_conv is None
conv = _create_converter_to_pandas(
MapType(StructType().add("a", StringType()), IntegerType()), struct_in_pandas="row"
)
pser = pd.Series(
[
[(Row(a="x"), 1), (Row(a="y"), 2), (Row(a="z"), None), (None, 3)],
{Row(a="x"): 4, Row(a="y"): None, None: 5},
None,
]
)
assert_series_equal(
conv(pser),
pd.Series(
[
{Row(a="x"): 1, Row(a="y"): 2, Row(a="z"): None, None: 3},
{Row(a="x"): 4, Row(a="y"): None, None: 5},
None,
]
),
)
def test_converter_from_pandas_map(self):
# _key_conv is None and _value_conv is None
conv = _create_converter_from_pandas(MapType(StringType(), IntegerType()))
pser = pd.Series([{"x": 1, "y": 2, "z": None, None: 3}, {"x": 4, "y": None, None: 5}, None])
assert_series_equal(
conv(pser),
pd.Series(
[
[("x", 1), ("y", 2), ("z", None), (None, 3)],
[("x", 4), ("y", None), (None, 5)],
None,
]
),
)
# _key_conv is None and _value_conv is not None
conv = _create_converter_from_pandas(
MapType(StringType(), StructType().add("a", IntegerType()))
)
pser = pd.Series(
[
{"x": Row(a=1), "y": Row(a=2), "z": None, None: Row(a=3)},
{"x": Row(a=4), "y": None, None: Row(a=5)},
None,
]
)
assert_series_equal(
conv(pser),
pd.Series(
[
[("x", {"a": 1}), ("y", {"a": 2}), ("z", None), (None, {"a": 3})],
[("x", {"a": 4}), ("y", None), (None, {"a": 5})],
None,
]
),
)
# _key_conv is None and _value_conv is not None
conv = _create_converter_from_pandas(
MapType(StructType().add("a", StringType()), IntegerType())
)
pser = pd.Series(
[
{Row(a="x"): 1, Row(a="y"): 2, Row(a="z"): None, None: 3},
{Row(a="x"): 4, Row(a="y"): None, None: 5},
None,
]
)
assert_series_equal(
conv(pser),
pd.Series(
[
[({"a": "x"}, 1), ({"a": "y"}, 2), ({"a": "z"}, None), (None, 3)],
[({"a": "x"}, 4), ({"a": "y"}, None), (None, 5)],
None,
]
),
)
# ignore_unexpected_complex_type_values=True
# _key_conv is None and _value_conv is None
conv = _create_converter_from_pandas(
MapType(StringType(), IntegerType()), ignore_unexpected_complex_type_values=True
)
pser = pd.Series(
[{"x": 1, "y": 2, "z": None, None: 3}, {"x": 4, "y": None, None: 5}, None, 100]
)
assert_series_equal(
conv(pser),
pd.Series(
[
[("x", 1), ("y", 2), ("z", None), (None, 3)],
[("x", 4), ("y", None), (None, 5)],
None,
100,
]
),
)
# _key_conv is None and _value_conv is not None
conv = _create_converter_from_pandas(
MapType(StringType(), StructType().add("a", IntegerType())),
ignore_unexpected_complex_type_values=True,
)
pser = pd.Series(
[
{"x": Row(a=1), "y": Row(a=2), "z": None, None: Row(a=3)},
{"x": Row(a=4), "y": None, None: Row(a=5)},
None,
100,
]
)
assert_series_equal(
conv(pser),
pd.Series(
[
[("x", {"a": 1}), ("y", {"a": 2}), ("z", None), (None, {"a": 3})],
[("x", {"a": 4}), ("y", None), (None, {"a": 5})],
None,
100,
]
),
)
# _key_conv is None and _value_conv is not None
conv = _create_converter_from_pandas(
MapType(StructType().add("a", StringType()), IntegerType()),
ignore_unexpected_complex_type_values=True,
)
pser = pd.Series(
[
{Row(a="x"): 1, Row(a="y"): 2, Row(a="z"): None, None: 3},
{Row(a="x"): 4, Row(a="y"): None, None: 5},
None,
100,
]
)
assert_series_equal(
conv(pser),
pd.Series(
[
[({"a": "x"}, 1), ({"a": "y"}, 2), ({"a": "z"}, None), (None, 3)],
[({"a": "x"}, 4), ({"a": "y"}, None), (None, 5)],
None,
100,
]
),
)
def test_converter_to_pandas_struct(self):
# struct_in_pandas="row"
# all the convs are None
conv = _create_converter_to_pandas(
StructType().add("x", StringType()).add("y", IntegerType()), struct_in_pandas="row"
)
pser = pd.Series(
[
Row(x="a", y=1),
Row(x="b", y=2),
Row(x="c", y=None),
Row(x=None, y=3),
{"x": "d", "y": 4},
{"x": "e"},
{"y": 5},
None,
]
)
assert_series_equal(
conv(pser),
pd.Series(
[
Row(x="a", y=1),
Row(x="b", y=2),
Row(x="c", y=None),
Row(x=None, y=3),
Row(x="d", y=4),
Row(x="e", y=None),
Row(x=None, y=5),
None,
]
),
)
# one of the convs is not None
conv = _create_converter_to_pandas(
StructType().add("x", StringType()).add("y", StructType().add("i", IntegerType())),
struct_in_pandas="row",
)
pser = pd.Series(
[
Row(x="a", y=Row(i=1)),
Row(x="b", y={"i": 2}),
Row(x="c", y=None),
Row(x=None, y=Row(i=3)),
{"x": "d", "y": Row(i=4)},
{"x": "e"},
{"y": {"i": 5}},
None,
]
)
assert_series_equal(
conv(pser),
pd.Series(
[
Row(x="a", y=Row(i=1)),
Row(x="b", y=Row(i=2)),
Row(x="c", y=None),
Row(x=None, y=Row(i=3)),
Row(x="d", y=Row(i=4)),
Row(x="e", y=None),
Row(x=None, y=Row(i=5)),
None,
]
),
)
# struct_in_pandas="dict"
# all the convs are None
conv = _create_converter_to_pandas(
StructType().add("x", StringType()).add("y", IntegerType()), struct_in_pandas="dict"
)
pser = pd.Series(
[
Row(x="a", y=1),
Row(x="b", y=2),
Row(x="c", y=None),
Row(x=None, y=3),
{"x": "d", "y": 4},
{"x": "e"},
{"y": 5},
None,
]
)
assert_series_equal(
conv(pser),
pd.Series(
[
{"x": "a", "y": 1},
{"x": "b", "y": 2},
{"x": "c", "y": None},
{"x": None, "y": 3},
{"x": "d", "y": 4},
{"x": "e", "y": None},
{"x": None, "y": 5},
None,
]
),
)
# one of the convs is not None
conv = _create_converter_to_pandas(
StructType().add("x", StringType()).add("y", StructType().add("i", IntegerType())),
struct_in_pandas="dict",
)
pser = pd.Series(
[
Row(x="a", y=Row(i=1)),
Row(x="b", y={"i": 2}),
Row(x="c", y=None),
Row(x=None, y=Row(i=3)),
{"x": "d", "y": Row(i=4)},
{"x": "e"},
{"y": {"i": 5}},
None,
]
)
assert_series_equal(
conv(pser),
pd.Series(
[
{"x": "a", "y": {"i": 1}},
{"x": "b", "y": {"i": 2}},
{"x": "c", "y": None},
{"x": None, "y": {"i": 3}},
{"x": "d", "y": {"i": 4}},
{"x": "e", "y": None},
{"x": None, "y": {"i": 5}},
None,
]
),
)
def test_converter_from_pandas_struct(self):
# all the convs are None
conv = _create_converter_from_pandas(
StructType().add("x", StringType()).add("y", IntegerType())
)
pser = pd.Series(
[
Row(x="a", y=1),
Row(x="b", y=2),
Row(x="c", y=None),
Row(x=None, y=3),
{"x": "d", "y": 4},
{"x": "e"},
{"y": 5},
None,
]
)
assert_series_equal(
conv(pser),
pd.Series(
[
{"x": "a", "y": 1},
{"x": "b", "y": 2},
{"x": "c", "y": None},
{"x": None, "y": 3},
{"x": "d", "y": 4},
{"x": "e", "y": None},
{"x": None, "y": 5},
None,
]
),
)
# one of the convs is not None
conv = _create_converter_from_pandas(
StructType().add("x", StringType()).add("y", StructType().add("i", IntegerType()))
)
pser = pd.Series(
[
Row(x="a", y=Row(i=1)),
Row(x="b", y={"i": 2}),
Row(x="c", y=None),
Row(x=None, y=Row(i=3)),
{"x": "d", "y": Row(i=4)},
{"x": "e"},
{"y": {"i": 5}},
None,
]
)
assert_series_equal(
conv(pser),
pd.Series(
[
{"x": "a", "y": {"i": 1}},
{"x": "b", "y": {"i": 2}},
{"x": "c", "y": None},
{"x": None, "y": {"i": 3}},
{"x": "d", "y": {"i": 4}},
{"x": "e", "y": None},
{"x": None, "y": {"i": 5}},
None,
]
),
)
# ignore_unexpected_complex_type_values=True
# all the convs are None
conv = _create_converter_from_pandas(
StructType().add("x", StringType()).add("y", IntegerType()),
ignore_unexpected_complex_type_values=True,
)
pser = pd.Series(
[
Row(x="a", y=1),
Row(x="b", y=2),
Row(x="c", y=None),
Row(x=None, y=3),
{"x": "d", "y": 4},
{"x": "e"},
{"y": 5},
None,
100,
]
)
assert_series_equal(
conv(pser),
pd.Series(
[
{"x": "a", "y": 1},
{"x": "b", "y": 2},
{"x": "c", "y": None},
{"x": None, "y": 3},
{"x": "d", "y": 4},
{"x": "e", "y": None},
{"x": None, "y": 5},
None,
100,
]
),
)
# one of the convs is not None
conv = _create_converter_from_pandas(
StructType().add("x", StringType()).add("y", StructType().add("i", IntegerType())),
ignore_unexpected_complex_type_values=True,
)
pser = pd.Series(
[
Row(x="a", y=Row(i=1)),
Row(x="b", y={"i": 2}),
Row(x="c", y=None),
Row(x=None, y=Row(i=3)),
{"x": "d", "y": Row(i=4)},
{"x": "e"},
{"y": {"i": 5}},
None,
100,
]
)
assert_series_equal(
conv(pser),
pd.Series(
[
{"x": "a", "y": {"i": 1}},
{"x": "b", "y": {"i": 2}},
{"x": "c", "y": None},
{"x": None, "y": {"i": 3}},
{"x": "d", "y": {"i": 4}},
{"x": "e", "y": None},
{"x": None, "y": {"i": 5}},
None,
100,
]
),
)
if __name__ == "__main__":
from pyspark.sql.tests.pandas.test_converter import * # noqa: F401
try:
import xmlrunner
testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)