blob: a82c98ef7924ac909909d3ce33f6d7ed39731bad [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import pandas as pd
import numpy as np
from pyspark import pandas as ps
from pyspark.pandas.config import set_option, reset_option
from pyspark.testing.pandasutils import PandasOnSparkTestCase
from pyspark.testing.sqlutils import SQLTestUtils
class DiffFramesBasicSlowMixin:
    """Slow pandas-on-Spark tests for operations that combine different frames.

    Each test builds pandas objects and their pandas-on-Spark counterparts, applies
    the same operation to both and compares the results with ``assert_eq``. The
    mixin has to be combined with a test case class that supplies ``assert_eq`` and
    the ``setUpClass``/``tearDownClass`` chain.
    """

    # Fixed seed for the random float index shared by the shift/diff/rank tests, so
    # that a failing run can be reproduced exactly.
    _INDEX_SEED = 42

    @classmethod
    def setUpClass(cls):
        """Run the parent setup, then enable ``compute.ops_on_diff_frames``."""
        super().setUpClass()
        set_option("compute.ops_on_diff_frames", True)

    @classmethod
    def tearDownClass(cls):
        """Reset ``compute.ops_on_diff_frames`` and always run the parent teardown."""
        try:
            reset_option("compute.ops_on_diff_frames")
        finally:
            # The parent teardown must run even when resetting the option fails.
            super().tearDownClass()

    @classmethod
    def _random_index_pdf(cls):
        """Return the three-column pandas frame used by the shift/diff/rank tests.

        The index holds five floats drawn from a generator seeded with
        ``_INDEX_SEED``: the values are arbitrary and unsorted, but identical on
        every call, and NumPy's global random state is left untouched.
        """
        rng = np.random.RandomState(cls._INDEX_SEED)
        return pd.DataFrame(
            {
                "Col1": [10, 20, 15, 30, 45],
                "Col2": [13, 23, 18, 33, 48],
                "Col3": [17, 27, 22, 37, 52],
            },
            index=rng.rand(5),
        )

    def test_update(self):
        # Series.update with a Series from another frame is compared against pandas,
        # first for a column of a DataFrame, then for Series with missing values.
        pdf = pd.DataFrame({"x": [1, 2, 3], "y": [10, 20, 30]})
        psdf = ps.from_pandas(pdf)

        pser = pdf.x
        psser = psdf.x
        pser.update(pd.Series([4, 5, 6]))
        psser.update(ps.Series([4, 5, 6]))
        self.assert_eq(psser.sort_index(), pser.sort_index())
        # The parent frames are compared as well after the column was updated.
        self.assert_eq(psdf.sort_index(), pdf.sort_index())

        pser1 = pd.Series([None, 2, 3, 4, 5, 6, 7, 8, None])
        pser2 = pd.Series([None, 5, None, 3, 2, 1, None, 0, 0])
        psser1 = ps.from_pandas(pser1)
        psser2 = ps.from_pandas(pser2)

        pser1.update(pser2)
        psser1.update(psser2)
        self.assert_eq(psser1.sort_index(), pser1)

    def test_where(self):
        # DataFrame.where with a boolean condition computed from a different frame.
        pdf1 = pd.DataFrame({"A": [0, 1, 2, 3, 4], "B": [100, 200, 300, 400, 500]})
        pdf2 = pd.DataFrame({"A": [0, -1, -2, -3, -4], "B": [-100, -200, -300, -400, -500]})
        psdf1 = ps.from_pandas(pdf1)
        psdf2 = ps.from_pandas(pdf2)

        self.assert_eq(pdf1.where(pdf2 > 100), psdf1.where(psdf2 > 100).sort_index())

        pdf1 = pd.DataFrame({"A": [-1, -2, -3, -4, -5], "B": [-100, -200, -300, -400, -500]})
        pdf2 = pd.DataFrame({"A": [-10, -20, -30, -40, -50], "B": [-5, -4, -3, -2, -1]})
        psdf1 = ps.from_pandas(pdf1)
        psdf2 = ps.from_pandas(pdf2)

        self.assert_eq(pdf1.where(pdf2 < -250), psdf1.where(psdf2 < -250).sort_index())

        # multi-index columns
        pdf1 = pd.DataFrame({("X", "A"): [0, 1, 2, 3, 4], ("X", "B"): [100, 200, 300, 400, 500]})
        pdf2 = pd.DataFrame(
            {("X", "A"): [0, -1, -2, -3, -4], ("X", "B"): [-100, -200, -300, -400, -500]}
        )
        psdf1 = ps.from_pandas(pdf1)
        psdf2 = ps.from_pandas(pdf2)

        self.assert_eq(pdf1.where(pdf2 > 100), psdf1.where(psdf2 > 100).sort_index())

    def test_mask(self):
        # DataFrame.mask with a boolean condition computed from a different frame.
        pdf1 = pd.DataFrame({"A": [0, 1, 2, 3, 4], "B": [100, 200, 300, 400, 500]})
        pdf2 = pd.DataFrame({"A": [0, -1, -2, -3, -4], "B": [-100, -200, -300, -400, -500]})
        psdf1 = ps.from_pandas(pdf1)
        psdf2 = ps.from_pandas(pdf2)

        self.assert_eq(pdf1.mask(pdf2 < 100), psdf1.mask(psdf2 < 100).sort_index())

        pdf1 = pd.DataFrame({"A": [-1, -2, -3, -4, -5], "B": [-100, -200, -300, -400, -500]})
        pdf2 = pd.DataFrame({"A": [-10, -20, -30, -40, -50], "B": [-5, -4, -3, -2, -1]})
        psdf1 = ps.from_pandas(pdf1)
        psdf2 = ps.from_pandas(pdf2)

        self.assert_eq(pdf1.mask(pdf2 > -250), psdf1.mask(psdf2 > -250).sort_index())

        # multi-index columns
        pdf1 = pd.DataFrame({("X", "A"): [0, 1, 2, 3, 4], ("X", "B"): [100, 200, 300, 400, 500]})
        pdf2 = pd.DataFrame(
            {("X", "A"): [0, -1, -2, -3, -4], ("X", "B"): [-100, -200, -300, -400, -500]}
        )
        psdf1 = ps.from_pandas(pdf1)
        psdf2 = ps.from_pandas(pdf2)

        self.assert_eq(pdf1.mask(pdf2 < 100), psdf1.mask(psdf2 < 100).sort_index())

    def test_to_series_comparison(self):
        # Series created from two separate but equal indexes compare equal
        # element-wise, both without and with an index name.
        psidx1 = ps.Index([1, 2, 3, 4, 5])
        psidx2 = ps.Index([1, 2, 3, 4, 5])

        self.assert_eq((psidx1.to_series() == psidx2.to_series()).all(), True)

        psidx1.name = "koalas"
        psidx2.name = "koalas"

        self.assert_eq((psidx1.to_series() == psidx2.to_series()).all(), True)

    def test_pow_and_rpow(self):
        # pow, the ** operator and rpow between two Series that contain NaN.
        pser = pd.Series([1, 2, np.nan])
        psser = ps.from_pandas(pser)
        pser_other = pd.Series([np.nan, 2, 3])
        psser_other = ps.from_pandas(pser_other)

        self.assert_eq(pser.pow(pser_other), psser.pow(psser_other).sort_index())
        self.assert_eq(pser**pser_other, (psser**psser_other).sort_index())
        self.assert_eq(pser.rpow(pser_other), psser.rpow(psser_other).sort_index())

    def test_shift(self):
        # shift() on a frame and on a column, filtered by a condition on another column.
        pdf = self._random_index_pdf()
        psdf = ps.from_pandas(pdf)

        self.assert_eq(
            pdf.shift().loc[pdf["Col1"] == 20].astype(int), psdf.shift().loc[psdf["Col1"] == 20]
        )
        self.assert_eq(
            pdf["Col2"].shift().loc[pdf["Col1"] == 20].astype(int),
            psdf["Col2"].shift().loc[psdf["Col1"] == 20],
        )

    def test_diff(self):
        # diff() on a frame and on a column, filtered by a condition on another column.
        pdf = self._random_index_pdf()
        psdf = ps.from_pandas(pdf)

        self.assert_eq(
            pdf.diff().loc[pdf["Col1"] == 20].astype(int), psdf.diff().loc[psdf["Col1"] == 20]
        )
        self.assert_eq(
            pdf["Col2"].diff().loc[pdf["Col1"] == 20].astype(int),
            psdf["Col2"].diff().loc[psdf["Col1"] == 20],
        )

    def test_rank(self):
        # rank() on a frame and on a column, filtered by a condition on another column.
        pdf = self._random_index_pdf()
        psdf = ps.from_pandas(pdf)

        self.assert_eq(pdf.rank().loc[pdf["Col1"] == 20], psdf.rank().loc[psdf["Col1"] == 20])
        self.assert_eq(
            pdf["Col2"].rank().loc[pdf["Col1"] == 20], psdf["Col2"].rank().loc[psdf["Col1"] == 20]
        )
class DiffFramesBasicSlowTests(DiffFramesBasicSlowMixin, PandasOnSparkTestCase, SQLTestUtils):
    """Concrete test case that runs the ``DiffFramesBasicSlowMixin`` tests."""
if __name__ == "__main__":
    # Script entry point: run this module's tests, writing XML reports when the
    # optional ``xmlrunner`` package can be imported.
    import unittest
    from pyspark.pandas.tests.diff_frames_ops.test_basic_slow import *  # noqa: F401

    # Default to unittest's own runner; replaced below if xmlrunner is available.
    runner = None
    try:
        import xmlrunner

        runner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
    except ImportError:
        # xmlrunner is optional; keep the default runner.
        pass

    unittest.main(testRunner=runner, verbosity=2)