#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import unittest

from pyspark.errors import AnalysisException
from pyspark.sql.tests.arrow.test_arrow import ArrowTestsMixin
from pyspark.testing.connectutils import ReusedConnectTestCase
from pyspark.testing.pandasutils import PandasOnSparkTestUtils


class ArrowParityTests(ArrowTestsMixin, ReusedConnectTestCase, PandasOnSparkTestUtils):
    """Re-runs the Arrow conversion tests from pyspark.sql.tests.arrow.test_arrow
    against Spark Connect."""

@unittest.skip("Spark Connect does not support fallback.")
def test_createDataFrame_fallback_disabled(self):
super().test_createDataFrame_fallback_disabled()
@unittest.skip("Spark Connect does not support fallback.")
def test_createDataFrame_fallback_enabled(self):
super().test_createDataFrame_fallback_enabled()
def test_createDataFrame_pandas_with_map_type(self):
self.check_createDataFrame_pandas_with_map_type(True)
def test_createDataFrame_pandas_with_struct_type(self):
self.check_createDataFrame_pandas_with_struct_type(True)
def test_createDataFrame_with_ndarray(self):
self.check_createDataFrame_with_ndarray(True)
@unittest.skip("Spark Connect does not support RDD but the tests depend on them.")
def test_no_partition_frame(self):
super().test_no_partition_frame()
@unittest.skip("Spark Connect does not support RDD but the tests depend on them.")
def test_no_partition_toPandas(self):
super().test_no_partition_toPandas()
    def test_pandas_self_destruct(self):
        # The self-destruct option releases Arrow memory eagerly while converting
        # to pandas; the result must match the conversion without it.
        df = self.spark.range(100).select("id", "id", "id")
        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
            self_destruct_pdf = df.toPandas()
        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": False}):
            no_self_destruct_pdf = df.toPandas()
        self.assert_eq(self_destruct_pdf, no_self_destruct_pdf)
@unittest.skip("Spark Connect does not support RDD but the tests depend on them.")
def test_toPandas_batch_order(self):
super().test_toPandas_batch_order()
def test_toPandas_empty_df_arrow_enabled(self):
self.check_toPandas_empty_df_arrow_enabled(True)
def test_create_data_frame_to_pandas_timestamp_ntz(self):
self.check_create_data_frame_to_pandas_timestamp_ntz(True)
def test_create_data_frame_to_pandas_day_time_internal(self):
self.check_create_data_frame_to_pandas_day_time_internal(True)
def test_createDataFrame_pandas_respect_session_timezone(self):
self.check_createDataFrame_pandas_respect_session_timezone(True)
def test_toPandas_respect_session_timezone(self):
self.check_toPandas_respect_session_timezone(True)
def test_cached_local_relation_changing_values(self):
self.check_cached_local_relation_changing_values()
def test_large_cached_local_relation_same_values(self):
self.check_large_cached_local_relation_same_values()
    def test_large_local_relation_size_limit_exceeded(self):
        import random
        import string

        # Set the local relation size limit to 50 MiB, build a local relation
        # larger than that, and verify that the expected error is raised.
        conf_key = "spark.sql.session.localRelationSizeLimit"
        original_limit = self.spark.conf.get(conf_key)
        try:
            new_limit = 50 * 1024 * 1024
            self.spark.conf.set(conf_key, new_limit)

            row_size = 1000
            row_count = 64 * 1000
            suffix = "abcdef"
            str_value = (
                "".join(random.choices(string.ascii_letters + string.digits, k=row_size)) + suffix
            )
            data = [(i, str_value) for i in range(row_count)]
            with self.assertRaisesRegex(
                AnalysisException, f"LOCAL_RELATION_SIZE_LIMIT_EXCEEDED.*{new_limit}"
            ):
                df = self.spark.createDataFrame(data, ["col1", "col2"])
                df.count()
        finally:
            self.spark.conf.set(conf_key, original_limit)
    def test_toPandas_with_array_type(self):
        self.check_toPandas_with_array_type(True)

    @unittest.skip("Spark Connect does not support fallback.")
    def test_toPandas_fallback_disabled(self):
        super().test_toPandas_fallback_disabled()

    @unittest.skip("Spark Connect does not support fallback.")
    def test_toPandas_fallback_enabled(self):
        super().test_toPandas_fallback_enabled()

    def test_toPandas_with_map_type(self):
        self.check_toPandas_with_map_type(True)

    def test_toPandas_with_map_type_nulls(self):
        self.check_toPandas_with_map_type_nulls(True)

    def test_createDataFrame_pandas_with_array_type(self):
        self.check_createDataFrame_pandas_with_array_type(True)

    def test_createDataFrame_pandas_with_int_col_names(self):
        self.check_createDataFrame_pandas_with_int_col_names(True)

    def test_timestamp_nat(self):
        self.check_timestamp_nat(True)

    def test_toPandas_error(self):
        self.check_toPandas_error(True)

    def test_toPandas_duplicate_field_names(self):
        self.check_toPandas_duplicate_field_names(True)

    def test_createDataFrame_pandas_duplicate_field_names(self):
        self.check_createDataFrame_pandas_duplicate_field_names(True)

    def test_toPandas_empty_rows(self):
        self.check_toPandas_empty_rows(True)

    def test_toPandas_empty_columns(self):
        self.check_toPandas_empty_columns(True)

    def test_createDataFrame_pandas_nested_timestamp(self):
        self.check_createDataFrame_pandas_nested_timestamp(True)

    def test_toPandas_nested_timestamp(self):
        self.check_toPandas_nested_timestamp(True)

    def test_toPandas_timestmap_tzinfo(self):
        self.check_toPandas_timestmap_tzinfo(True)

    def test_createDataFrame_udt(self):
        self.check_createDataFrame_udt(True)

    def test_toPandas_udt(self):
        self.check_toPandas_udt(True)

    def test_create_dataframe_namedtuples(self):
        self.check_create_dataframe_namedtuples(True)


if __name__ == "__main__":
    from pyspark.testing import main

    main()