#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import sys
import unittest
import warnings
import numpy as np
import pandas as pd
import pytest
from parameterized import parameterized
import apache_beam as beam
from apache_beam.dataframe import expressions
from apache_beam.dataframe import frame_base
from apache_beam.dataframe import frames
from apache_beam.dataframe.convert import to_dataframe
from apache_beam.dataframe.doctests import teststring
from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.runners.interactive.testing.mock_env import isolated_env
# Get major, minor version
PD_VERSION = tuple(map(int, pd.__version__.split('.')[0:2]))
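# e.g. pd.__version__ == '1.5.3' -> PD_VERSION == (1, 5)
# Shared 100-row test fixture used by the groupby tests below: a two-valued
# 'group' key, numeric columns with NaNs sprinkled in to exercise skipna
# handling, plus bool and str columns.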
GROUPBY_DF = pd.DataFrame({
'group': ['a' if i % 5 == 0 or i % 3 == 0 else 'b' for i in range(100)],
'foo': [None if i % 11 == 0 else i for i in range(100)],
'bar': [None if i % 7 == 0 else 99 - i for i in range(100)],
'baz': [None if i % 13 == 0 else i * 2 for i in range(100)],
'bool': [i % 17 == 0 for i in range(100)],
'str': [str(i) for i in range(100)],
})
if PD_VERSION < (2, 0):
# All these are things that are fixed in the Pandas 2 transition.
pytestmark = pytest.mark.filterwarnings("ignore::FutureWarning")
def _get_deferred_args(*args):
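"""Wrap each pandas object in a DeferredFrame backed by a
ConstantExpression. The empty slice arg[0:0] serves as the proxy,
carrying the schema (dtypes, columns, index) without any data."""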
return [
frame_base.DeferredFrame.wrap(
expressions.ConstantExpression(arg, arg[0:0])) for arg in args
]
class _AbstractFrameTest(unittest.TestCase):
"""Test sub-class with utilities for verifying DataFrame operations."""
def _run_error_test(
self, func, *args, construction_time=True, distributed=True):
"""Verify that func(*args) raises the same exception in pandas and in Beam.
Note that by default this only checks for exceptions that the Beam DataFrame
API raises during expression generation (i.e. construction time).
Exceptions raised while the pipeline is executing are less helpful, but
are sometimes unavoidable (e.g. data validation exceptions); to check for
those, pass construction_time=False."""
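# Illustrative usage (mirrors test_set_index below):
#   self._run_error_test(lambda df: df.set_index('bad'), df)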
deferred_args = _get_deferred_args(*args)
# Get expected error
try:
expected = func(*args)
except Exception as e:
expected_error = e
else:
raise AssertionError(
"Expected an error, but executing with pandas successfully "
f"returned:\n{expected}")
# Get actual error
if construction_time:
try:
_ = func(*deferred_args)._expr
except Exception as e:
actual = e
else:
raise AssertionError(
f"Expected an error:\n{expected_error}\nbut Beam successfully "
f"generated an expression.")
else: # not construction_time
# Check for an error raised during pipeline execution
expr = func(*deferred_args)._expr
session_type = (
expressions.PartitioningSession
if distributed else expressions.Session)
try:
result = session_type({}).evaluate(expr)
except Exception as e:
actual = e
else:
raise AssertionError(
f"Expected an error:\n{expected_error}\nbut Beam successfully "
f"Computed the result:\n{result}.")
# Verify
if (not isinstance(actual, type(expected_error)) or
str(expected_error) not in str(actual)):
raise AssertionError(
f'Expected {expected_error!r} to be raised, but got {actual!r}'
) from actual
def _run_inplace_test(self, func, arg, **kwargs):
"""Verify an inplace operation performed by func.
Checks that func performs the same inplace operation on arg, in pandas and
in Beam."""
def wrapper(df):
df = df.copy()
func(df)
return df
self._run_test(wrapper, arg, **kwargs)
def _run_test(
self,
func,
*args,
distributed=True,
nonparallel=False,
check_proxy=True,
lenient_dtype_check=False):
"""Verify that func(*args) produces the same result in pandas and in Beam.
Args:
distributed (bool): Whether or not to use PartitioningSession to
simulate parallel execution.
nonparallel (bool): Whether or not this function contains a
non-parallelizable operation. If True, the expression will be
generated twice, once outside of an allow_non_parallel_operations
block (to verify NonParallelOperation is raised), and again inside
of an allow_non_parallel_operations block to actually generate an
expression to verify.
check_proxy (bool): Whether or not to check that the proxy of the
generated expression matches the actual result, defaults to True.
This option should NOT be set to False in tests added for new
operations if at all possible. Instead make sure the new operation
produces the correct proxy. This flag only exists as an escape hatch
until existing failures can be addressed
(https://github.com/apache/beam/issues/20926).
lenient_dtype_check (bool): Whether to check only that numeric columns
remain numeric between actual and proxy, i.e. verify that they are
at least int64 or float64, without requiring exactly the same dtype.
This may need to be set to True for some non-deferred operations,
where the dtype of the values in the proxy is not known ahead of
time, causing int64 to float64 coercion issues.
"""
# Compute expected value
expected = func(*args)
# Compute actual value
deferred_args = _get_deferred_args(*args)
if nonparallel:
# First run outside an allow_non_parallel_operations block to confirm
# this raises as expected
with self.assertRaises(expressions.NonParallelOperation) as raised:
func(*deferred_args)
if raised.exception.msg.startswith(
"Encountered non-parallelizable form of"):
raise AssertionError(
"Default NonParallelOperation raised, please specify a reason in "
"the Singleton() partitioning requirement for this operation."
) from raised.exception
# Re-run inside an allow_non_parallel_operations block to get an
# expression to verify
with beam.dataframe.allow_non_parallel_operations():
expr = func(*deferred_args)._expr
else:
expr = func(*deferred_args)._expr
# Compute the result of the generated expression
session_type = (
expressions.PartitioningSession if distributed else expressions.Session)
actual = session_type({}).evaluate(expr)
# Verify
if isinstance(expected, pd.core.generic.NDFrame):
if distributed:
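# Row order is not deterministic under PartitioningSession, so put both
# results into a canonical order before comparing: sort by index when it
# is unique, otherwise sort by values.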
if expected.index.is_unique:
expected = expected.sort_index()
actual = actual.sort_index()
elif isinstance(expected, pd.Series):
expected = expected.sort_values()
actual = actual.sort_values()
else:
expected = expected.sort_values(list(expected.columns))
actual = actual.sort_values(list(actual.columns))
if isinstance(expected, pd.Series):
if lenient_dtype_check:
pd.testing.assert_series_equal(
expected.astype('Float64'), actual.astype('Float64'))
else:
pd.testing.assert_series_equal(expected, actual)
elif isinstance(expected, pd.DataFrame):
if lenient_dtype_check:
pd.testing.assert_frame_equal(
expected.astype('Float64'), actual.astype('Float64'))
else:
pd.testing.assert_frame_equal(expected, actual)
else:
raise ValueError(
f"Expected value is a {type(expected)},"
"not a Series or DataFrame.")
else:
# Expectation is not a pandas object
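# NaN != NaN, so equality cannot be used for NaN expectations, and
# np.isclose tolerates small float differences (e.g. from summation
# order under distributed evaluation).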
if isinstance(expected, float):
if np.isnan(expected):
cmp = np.isnan
else:
cmp = lambda x: np.isclose(expected, x)
else:
cmp = lambda x: x == expected
self.assertTrue(
cmp(actual), 'Expected:\n\n%r\n\nActual:\n\n%r' % (expected, actual))
if check_proxy:
# Verify that the actual result agrees with the proxy
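# (The proxy is the zero-row pandas object Beam propagates at
# construction time to describe each expression's schema.)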
proxy = expr.proxy()
if type(actual) in (np.float32, np.float64):
self.assertTrue(type(actual) == type(proxy) or np.isnan(proxy))
else:
self.assertEqual(type(actual), type(proxy))
if isinstance(expected, pd.core.generic.NDFrame):
if isinstance(expected, pd.Series):
if lenient_dtype_check:
self.assertEqual(
actual.astype('Float64').dtype, proxy.astype('Float64').dtype)
else:
self.assertEqual(actual.dtype, proxy.dtype)
self.assertEqual(actual.name, proxy.name)
elif isinstance(expected, pd.DataFrame):
if lenient_dtype_check:
pd.testing.assert_series_equal(
actual.astype('Float64').dtypes, proxy.astype('Float64').dtypes)
else:
pd.testing.assert_series_equal(actual.dtypes, proxy.dtypes)
else:
raise ValueError(
f"Expected value is a {type(expected)},"
"not a Series or DataFrame.")
self.assertEqual(actual.index.names, proxy.index.names)
for i in range(actual.index.nlevels):
if lenient_dtype_check:
self.assertEqual(
actual.astype('Float64').index.get_level_values(i).dtype,
proxy.astype('Float64').index.get_level_values(i).dtype)
else:
self.assertEqual(
actual.index.get_level_values(i).dtype,
proxy.index.get_level_values(i).dtype)
class DeferredFrameTest(_AbstractFrameTest):
"""Miscellaneous tessts for DataFrame operations."""
def test_series_arithmetic(self):
a = pd.Series([1, 2, 3])
b = pd.Series([100, 200, 300])
self._run_test(lambda a, b: a - 2 * b, a, b)
self._run_test(lambda a, b: a.subtract(2).multiply(b).divide(a), a, b)
def test_dataframe_arithmetic(self):
df = pd.DataFrame({'a': [1, 2, 3], 'b': [100, 200, 300]})
df2 = pd.DataFrame({'a': [3000, 1000, 2000], 'b': [7, 11, 13]})
self._run_test(lambda df, df2: df - 2 * df2, df, df2)
self._run_test(
lambda df, df2: df.subtract(2).multiply(df2).divide(df), df, df2)
@unittest.skipIf(PD_VERSION < (1, 3), "dropna=False is new in pandas 1.3")
def test_value_counts_dropna_false(self):
df = pd.DataFrame({
'first_name': ['John', 'Anne', 'John', 'Beth'],
'middle_name': ['Smith', pd.NA, pd.NA, 'Louise']
})
# TODO(https://github.com/apache/beam/issues/21014): Remove this
# assertRaises when the underlying bug in
# https://github.com/pandas-dev/pandas/issues/36470 is fixed.
with self.assertRaises(NotImplementedError):
self._run_test(lambda df: df.value_counts(dropna=False), df)
def test_get_column(self):
df = pd.DataFrame({
'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'],
'Speed': [380., 370., 24., 26.]
})
self._run_test(lambda df: df['Animal'], df)
self._run_test(lambda df: df.Speed, df)
self._run_test(lambda df: df.get('Animal'), df)
self._run_test(lambda df: df.get('FOO', df.Animal), df)
def test_series_xs(self):
# pandas doctests only verify DataFrame.xs; here we verify Series.xs as well
d = {
'num_legs': [4, 4, 2, 2],
'num_wings': [0, 0, 2, 2],
'class': ['mammal', 'mammal', 'mammal', 'bird'],
'animal': ['cat', 'dog', 'bat', 'penguin'],
'locomotion': ['walks', 'walks', 'flies', 'walks']
}
df = pd.DataFrame(data=d)
df = df.set_index(['class', 'animal', 'locomotion'])
self._run_test(lambda df: df.num_legs.xs('mammal'), df)
self._run_test(lambda df: df.num_legs.xs(('mammal', 'dog')), df)
self._run_test(lambda df: df.num_legs.xs('cat', level=1), df)
self._run_test(
lambda df: df.num_legs.xs(('bird', 'walks'), level=[0, 'locomotion']),
df)
def test_dataframe_xs(self):
# Test cases reported in BEAM-13421
df = pd.DataFrame(
np.array([
['state', 'day1', 12],
['state', 'day1', 1],
['state', 'day2', 14],
['county', 'day1', 9],
]),
columns=['provider', 'time', 'value'])
self._run_test(lambda df: df.xs('state'), df.set_index(['provider']))
self._run_test(
lambda df: df.xs('state'), df.set_index(['provider', 'time']))
def test_set_column(self):
def new_column(df):
df['NewCol'] = df['Speed']
df = pd.DataFrame({
'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'],
'Speed': [380., 370., 24., 26.]
})
self._run_inplace_test(new_column, df)
def test_set_column_from_index(self):
def new_column(df):
df['NewCol'] = df.index
df = pd.DataFrame({
'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'],
'Speed': [380., 370., 24., 26.]
})
self._run_inplace_test(new_column, df)
def test_tz_with_utc_zone_set_explicitly(self):
test = """
>>> s = pd.Series(["1/1/2020 10:00:00+00:00", "2/1/2020 11:00:00+03:00"])
>>> s = pd.to_datetime(s, utc=True)
>>> s
0 2020-01-01 10:00:00+00:00
1 2020-02-01 08:00:00+00:00
dtype: datetime64[ns, UTC]
>>> s.dt.tz
datetime.timezone.utc
"""
teststring(test)
def test_tz_localize_ambiguous_series(self):
# This replicates a tz_localize doctest:
# s.tz_localize('CET', ambiguous=np.array([True, True, False]))
# But using a DeferredSeries instead of a np array
s = pd.Series(
range(3),
index=pd.DatetimeIndex([
'2018-10-28 01:20:00', '2018-10-28 02:36:00', '2018-10-28 03:46:00'
]))
ambiguous = pd.Series([True, True, False], index=s.index)
self._run_test(
lambda s, ambiguous: s.tz_localize('CET', ambiguous=ambiguous),
s,
ambiguous)
def test_tz_convert(self):
# Verify Series.tz_convert on a tz-aware index
# (converting Europe/Berlin to America/Los_Angeles)
s = pd.Series(
range(3),
index=pd.DatetimeIndex([
'2018-10-27 01:20:00', '2018-10-27 02:36:00', '2018-10-27 03:46:00'
],
tz='Europe/Berlin'))
self._run_test(lambda s: s.tz_convert('America/Los_Angeles'), s)
def test_sort_index_columns(self):
df = pd.DataFrame({
'c': range(10),
'a': range(10),
'b': range(10),
np.nan: range(10),
})
self._run_test(lambda df: df.sort_index(axis=1), df)
self._run_test(lambda df: df.sort_index(axis=1, ascending=False), df)
self._run_test(lambda df: df.sort_index(axis=1, na_position='first'), df)
def test_where_callable_args(self):
df = pd.DataFrame(
np.arange(10, dtype=np.int64).reshape(-1, 2), columns=['A', 'B'])
self._run_test(
lambda df: df.where(lambda df: df % 2 == 0, lambda df: df * 10), df)
def test_where_concrete_args(self):
df = pd.DataFrame(
np.arange(10, dtype=np.int64).reshape(-1, 2), columns=['A', 'B'])
self._run_test(
lambda df: df.where(
df % 2 == 0, pd.Series({
'A': 123, 'B': 456
}), axis=1),
df)
def test_combine_dataframe(self):
df = pd.DataFrame({'A': [0, 0], 'B': [4, 4]})
df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
take_smaller = lambda s1, s2: s1 if s1.sum() < s2.sum() else s2
self._run_test(
lambda df, df2: df.combine(df2, take_smaller),
df,
df2,
nonparallel=True)
def test_combine_dataframe_fill(self):
df1 = pd.DataFrame({'A': [0, 0], 'B': [None, 4]})
df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
take_smaller = lambda s1, s2: s1 if s1.sum() < s2.sum() else s2
self._run_test(
lambda df1, df2: df1.combine(df2, take_smaller, fill_value=-5),
df1,
df2,
nonparallel=True)
def test_combine_Series(self):
s1 = pd.Series({'falcon': 330.0, 'eagle': 160.0})
s2 = pd.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
self._run_test(
lambda s1, s2: s1.combine(s2, max),
s1,
s2,
nonparallel=True,
check_proxy=False)
def test_combine_first_dataframe(self):
df1 = pd.DataFrame({'A': [None, 0], 'B': [None, 4]})
df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
self._run_test(lambda df1, df2: df1.combine_first(df2), df1, df2)
def test_combine_first_series(self):
s1 = pd.Series([1, np.nan])
s2 = pd.Series([3, 4])
self._run_test(lambda s1, s2: s1.combine_first(s2), s1, s2)
def test_add_prefix(self):
df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [3, 4, 5, 6]})
s = pd.Series([1, 2, 3, 4])
self._run_test(lambda df: df.add_prefix('col_'), df)
self._run_test(lambda s: s.add_prefix('col_'), s)
def test_add_suffix(self):
df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [3, 4, 5, 6]})
s = pd.Series([1, 2, 3, 4])
self._run_test(lambda df: df.add_suffix('_col'), df)
self._run_test(lambda s: s.add_suffix('_col'), s)
def test_set_index(self):
df = pd.DataFrame({
# [19, 18, ..]
'index1': reversed(range(20)),
# [15, 16, .., 0, 1, .., 13, 14]
'index2': np.roll(range(20), 5),
# ['', 'a', 'bb', ...]
'values': [chr(ord('a') + i) * i for i in range(20)],
})
self._run_test(lambda df: df.set_index(['index1', 'index2']), df)
self._run_test(lambda df: df.set_index(['index1', 'index2'], drop=True), df)
self._run_test(lambda df: df.set_index('values'), df)
self._run_error_test(lambda df: df.set_index('bad'), df)
self._run_error_test(
lambda df: df.set_index(['index2', 'bad', 'really_bad']), df)
def test_set_axis(self):
df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]}, index=['X', 'Y', 'Z'])
self._run_test(lambda df: df.set_axis(['I', 'II'], axis='columns'), df)
self._run_test(lambda df: df.set_axis([0, 1], axis=1), df)
self._run_inplace_test(
lambda df: df.set_axis(['i', 'ii'], axis='columns'), df)
with self.assertRaises(NotImplementedError):
self._run_test(lambda df: df.set_axis(['a', 'b', 'c'], axis='index'), df)
self._run_test(lambda df: df.set_axis([0, 1, 2], axis=0), df)
def test_series_set_axis(self):
s = pd.Series(list(range(3)), index=['X', 'Y', 'Z'])
with self.assertRaises(NotImplementedError):
self._run_test(lambda s: s.set_axis(['a', 'b', 'c']), s)
self._run_test(lambda s: s.set_axis([1, 2, 3]), s)
def test_series_drop_ignore_errors(self):
midx = pd.MultiIndex(
levels=[['lama', 'cow', 'falcon'], ['speed', 'weight', 'length']],
codes=[[0, 0, 0, 1, 1, 1, 2, 2, 2], [0, 1, 2, 0, 1, 2, 0, 1, 2]])
s = pd.Series([45, 200, 1.2, 30, 250, 1.5, 320, 1, 0.3], index=midx)
# drop() requires singleton partitioning unless errors are ignored
# Add some additional tests here to make sure the implementation works in
# non-singleton partitioning.
self._run_test(lambda s: s.drop('lama', level=0, errors='ignore'), s)
self._run_test(lambda s: s.drop(('cow', 'speed'), errors='ignore'), s)
self._run_test(lambda s: s.drop('falcon', level=0, errors='ignore'), s)
def test_dataframe_drop_ignore_errors(self):
midx = pd.MultiIndex(
levels=[['lama', 'cow', 'falcon'], ['speed', 'weight', 'length']],
codes=[[0, 0, 0, 1, 1, 1, 2, 2, 2], [0, 1, 2, 0, 1, 2, 0, 1, 2]])
df = pd.DataFrame(
index=midx,
columns=['big', 'small'],
data=[[45, 30], [200, 100], [1.5, 1], [30, 20], [250, 150], [1.5, 0.8],
[320, 250], [1, 0.8], [0.3, 0.2]])
# drop() requires singleton partitioning unless errors are ignored
# Add some additional tests here to make sure the implementation works in
# non-singleton partitioning.
self._run_test(
lambda df: df.drop(index='lama', level=0, errors='ignore'), df)
self._run_test(
lambda df: df.drop(index=('cow', 'speed'), errors='ignore'), df)
self._run_test(
lambda df: df.drop(index='falcon', level=0, errors='ignore'), df)
self._run_test(
lambda df: df.drop(index='cow', columns='small', errors='ignore'), df)
def test_merge(self):
# This is from the pandas doctests, but fails due to re-indexing being
# order-sensitive.
df1 = pd.DataFrame({
'lkey': ['foo', 'bar', 'baz', 'foo'], 'value': [1, 2, 3, 5]
})
df2 = pd.DataFrame({
'rkey': ['foo', 'bar', 'baz', 'foo'], 'value': [5, 6, 7, 8]
})
self._run_test(
lambda df1, df2: df1.merge(df2, left_on='lkey', right_on='rkey').rename(
index=lambda x: '*'),
df1,
df2,
nonparallel=True,
check_proxy=False)
self._run_test(
lambda df1, df2: df1.merge(
df2, left_on='lkey', right_on='rkey', suffixes=('_left', '_right')).
rename(index=lambda x: '*'),
df1,
df2,
nonparallel=True,
check_proxy=False)
def test_merge_left_join(self):
# This is from the pandas doctests, but fails due to re-indexing being
# order-sensitive.
df1 = pd.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2]})
df2 = pd.DataFrame({'a': ['foo', 'baz'], 'c': [3, 4]})
self._run_test(
lambda df1, df2: df1.merge(df2, how='left', on='a').rename(
index=lambda x: '*'),
df1,
df2,
nonparallel=True,
check_proxy=False)
def test_merge_on_index(self):
# This is from the pandas doctests, but fails due to re-indexing being
# order-sensitive.
df1 = pd.DataFrame({
'lkey': ['foo', 'bar', 'baz', 'foo'], 'value': [1, 2, 3, 5]
}).set_index('lkey')
df2 = pd.DataFrame({
'rkey': ['foo', 'bar', 'baz', 'foo'], 'value': [5, 6, 7, 8]
}).set_index('rkey')
self._run_test(
lambda df1, df2: df1.merge(df2, left_index=True, right_index=True),
df1,
df2,
check_proxy=False)
def test_merge_same_key(self):
df1 = pd.DataFrame({
'key': ['foo', 'bar', 'baz', 'foo'], 'value': [1, 2, 3, 5]
})
df2 = pd.DataFrame({
'key': ['foo', 'bar', 'baz', 'foo'], 'value': [5, 6, 7, 8]
})
self._run_test(
lambda df1, df2: df1.merge(df2, on='key').rename(index=lambda x: '*'),
df1,
df2,
nonparallel=True,
check_proxy=False)
self._run_test(
lambda df1, df2: df1.merge(df2, on='key', suffixes=('_left', '_right')).
rename(index=lambda x: '*'),
df1,
df2,
nonparallel=True,
check_proxy=False)
def test_merge_same_key_doctest(self):
df1 = pd.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2]})
df2 = pd.DataFrame({'a': ['foo', 'baz'], 'c': [3, 4]})
self._run_test(
lambda df1, df2: df1.merge(df2, how='left', on='a').rename(
index=lambda x: '*'),
df1,
df2,
nonparallel=True,
check_proxy=False)
# Test without specifying 'on'
self._run_test(
lambda df1, df2: df1.merge(df2, how='left').rename(index=lambda x: '*'),
df1,
df2,
nonparallel=True,
check_proxy=False)
def test_merge_same_key_suffix_collision(self):
df1 = pd.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2], 'a_lsuffix': [5, 6]})
df2 = pd.DataFrame({'a': ['foo', 'baz'], 'c': [3, 4], 'a_rsuffix': [7, 8]})
self._run_test(
lambda df1, df2: df1.merge(
df2, how='left', on='a', suffixes=('_lsuffix', '_rsuffix')).rename(
index=lambda x: '*'),
df1,
df2,
nonparallel=True,
check_proxy=False)
# Test without specifying 'on'
self._run_test(
lambda df1, df2: df1.merge(
df2, how='left', suffixes=('_lsuffix', '_rsuffix')).rename(
index=lambda x: '*'),
df1,
df2,
nonparallel=True,
check_proxy=False)
def test_swaplevel(self):
df = pd.DataFrame(
{"Grade": ["A", "B", "A", "C"]},
index=[
["Final exam", "Final exam", "Coursework", "Coursework"],
["History", "Geography", "History", "Geography"],
["January", "February", "March", "April"],
])
self._run_test(lambda df: df.swaplevel(), df)
def test_value_counts_with_nans(self):
# similar to doctests that verify value_counts, but include nan values to
# make sure we handle them correctly.
df = pd.DataFrame({
'num_legs': [2, 4, 4, 6, np.nan, np.nan],
'num_wings': [2, 0, 0, 0, np.nan, 2]
},
index=['falcon', 'dog', 'cat', 'ant', 'car', 'plane'])
self._run_test(lambda df: df.value_counts(), df)
self._run_test(lambda df: df.value_counts(normalize=True), df)
# Ensure we don't drop rows due to nan values in unused columns.
self._run_test(lambda df: df.value_counts('num_wings'), df)
if PD_VERSION >= (1, 3):
# dropna=False is new in pandas 1.3
# TODO(https://github.com/apache/beam/issues/21014): Remove this
# assertRaises when the underlying bug in
# https://github.com/pandas-dev/pandas/issues/36470 is fixed.
with self.assertRaises(NotImplementedError):
self._run_test(lambda df: df.value_counts(dropna=False), df)
# Test the defaults.
self._run_test(lambda df: df.num_wings.value_counts(), df)
self._run_test(lambda df: df.num_wings.value_counts(normalize=True), df)
self._run_test(lambda df: df.num_wings.value_counts(dropna=False), df)
# Test the combination interactions.
for normalize in (True, False):
for dropna in (True, False):
self._run_test(
lambda df, dropna=dropna, normalize=normalize: df.num_wings.
value_counts(dropna=dropna, normalize=normalize),
df)
def test_value_counts_does_not_support_sort(self):
df = pd.DataFrame({
'num_legs': [2, 4, 4, 6, np.nan, np.nan],
'num_wings': [2, 0, 0, 0, np.nan, 2]
},
index=['falcon', 'dog', 'cat', 'ant', 'car', 'plane'])
with self.assertRaisesRegex(frame_base.WontImplementError,
r"value_counts\(sort\=True\)"):
self._run_test(lambda df: df.value_counts(sort=True), df)
with self.assertRaisesRegex(frame_base.WontImplementError,
r"value_counts\(sort\=True\)"):
self._run_test(lambda df: df.num_wings.value_counts(sort=True), df)
def test_series_getitem(self):
s = pd.Series([x**2 for x in range(10)])
self._run_test(lambda s: s[...], s)
self._run_test(lambda s: s[:], s)
self._run_test(lambda s: s[s < 10], s)
self._run_test(lambda s: s[lambda s: s < 10], s)
s.index = s.index.map(float)
self._run_test(lambda s: s[1.5:6], s)
def test_series_truncate(self):
s = pd.Series(['a', 'b', 'c', 'd', 'e', 'f'])
self._run_test(lambda s: s.truncate(before=1, after=3), s)
def test_dataframe_truncate(self):
df = pd.DataFrame({
'C': list('abcde'), 'B': list('fghij'), 'A': list('klmno')
},
index=[1, 2, 3, 4, 5])
self._run_test(lambda df: df.truncate(before=1, after=3), df)
self._run_test(lambda df: df.truncate(before='A', after='B', axis=1), df)
self._run_test(lambda df: df['A'].truncate(before=2, after=4), df)
@parameterized.expand([
(pd.Series(range(10)), ), # unique
(pd.Series(list(range(100)) + [0]), ), # non-unique int
(pd.Series(list(range(100)) + [0]) / 100, ), # non-unique flt
(pd.Series(['a', 'b', 'c', 'd']), ), # unique str
(pd.Series(['a', 'b', 'a', 'c', 'd']), ), # non-unique str
])
def test_series_is_unique(self, series):
self._run_test(lambda s: s.is_unique, series)
@parameterized.expand([
(pd.Series(range(10)), ), # False
(pd.Series([1, 2, np.nan, 3, np.nan]), ), # True
(pd.Series(['a', 'b', 'c', 'd', 'e']), ), # False
(pd.Series(['a', 'b', None, 'c', None]), ), # True
])
def test_series_hasnans(self, series):
self._run_test(lambda s: s.hasnans, series)
def test_dataframe_getitem(self):
df = pd.DataFrame({'A': [x**2 for x in range(6)], 'B': list('abcdef')})
self._run_test(lambda df: df['A'], df)
self._run_test(lambda df: df[['A', 'B']], df)
self._run_test(lambda df: df[:], df)
self._run_test(lambda df: df[df.A < 10], df)
df.index = df.index.map(float)
self._run_test(lambda df: df[1.5:4], df)
def test_loc(self):
dates = pd.date_range('1/1/2000', periods=8)
# TODO(https://github.com/apache/beam/issues/20765):
# We do not preserve the freq attribute on a DateTime index
dates.freq = None
df = pd.DataFrame(
np.arange(32).reshape((8, 4)),
index=dates,
columns=['A', 'B', 'C', 'D'])
self._run_test(lambda df: df.loc[:], df)
self._run_test(lambda df: df.loc[:, 'A'], df)
self._run_test(lambda df: df.loc[:dates[3]], df)
self._run_test(lambda df: df.loc[df.A > 10], df)
self._run_test(lambda df: df.loc[lambda df: df.A > 10], df)
self._run_test(lambda df: df.C.loc[df.A > 10], df)
self._run_test(lambda df, s: df.loc[s.loc[1:3]], df, pd.Series(dates))
@unittest.skipIf(PD_VERSION >= (2, 0), 'append removed in Pandas 2.0')
def test_append_sort(self):
# yapf: disable
df1 = pd.DataFrame({'int': [1, 2, 3], 'str': ['a', 'b', 'c']},
columns=['int', 'str'],
index=[1, 3, 5])
df2 = pd.DataFrame({'int': [4, 5, 6], 'str': ['d', 'e', 'f']},
columns=['str', 'int'],
index=[2, 4, 6])
# yapf: enable
self._run_test(lambda df1, df2: df1.append(df2, sort=True), df1, df2)
self._run_test(lambda df1, df2: df1.append(df2, sort=False), df1, df2)
self._run_test(lambda df1, df2: df2.append(df1, sort=True), df1, df2)
self._run_test(lambda df1, df2: df2.append(df1, sort=False), df1, df2)
def test_smallest_largest(self):
df = pd.DataFrame({'A': [1, 1, 2, 2], 'B': [2, 3, 5, 7]})
self._run_test(lambda df: df.nlargest(1, 'A', keep='all'), df)
self._run_test(lambda df: df.nsmallest(3, 'A', keep='all'), df)
self._run_test(lambda df: df.nlargest(3, ['A', 'B'], keep='all'), df)
def test_series_cov_corr(self):
for s in [pd.Series([1, 2, 3]),
pd.Series(range(100)),
pd.Series([x**3 for x in range(-50, 50)])]:
self._run_test(lambda s: s.std(), s)
self._run_test(lambda s: s.var(), s)
self._run_test(lambda s: s.corr(s), s)
self._run_test(lambda s: s.corr(s + 1), s)
self._run_test(lambda s: s.corr(s * s), s)
self._run_test(lambda s: s.cov(s * s), s)
self._run_test(lambda s: s.skew(), s)
self._run_test(lambda s: s.kurtosis(), s)
self._run_test(lambda s: s.kurt(), s)
def test_dataframe_cov_corr(self):
df = pd.DataFrame(np.random.randn(20, 3), columns=['a', 'b', 'c'])
df.loc[df.index[:5], 'a'] = np.nan
df.loc[df.index[5:10], 'b'] = np.nan
self._run_test(lambda df: df.corr(), df)
self._run_test(lambda df: df.cov(), df)
self._run_test(lambda df: df.corr(min_periods=12), df)
self._run_test(lambda df: df.cov(min_periods=12), df)
self._run_test(lambda df: df.corrwith(df.a), df)
self._run_test(lambda df: df[['a', 'b']].corrwith(df[['b', 'c']]), df)
df2 = pd.DataFrame(np.random.randn(20, 3), columns=['a', 'b', 'c'])
self._run_test(
lambda df, df2: df.corrwith(df2, axis=1), df, df2, check_proxy=False)
def test_corrwith_bad_axis(self):
df = pd.DataFrame({'a': range(3), 'b': range(3, 6), 'c': range(6, 9)})
self._run_error_test(lambda df: df.corrwith(df.a, axis=2), df)
self._run_error_test(lambda df: df.corrwith(df, axis=5), df)
@unittest.skipIf(PD_VERSION < (1, 2), "na_action added in pandas 1.2.0")
@pytest.mark.filterwarnings(
"ignore:The default of observed=False is deprecated:FutureWarning")
def test_applymap_na_action(self):
# Replicates a doctest for na_action which is incompatible with
# doctest framework
df = pd.DataFrame([[pd.NA, 2.12], [3.356, 4.567]])
self._run_test(
lambda df: df.applymap(lambda x: len(str(x)), na_action='ignore'),
df,
# TODO: generate proxy using naive type inference on fn
check_proxy=False)
@unittest.skipIf(PD_VERSION < (2, 1), "map added in 2.1.0")
def test_map_na_action(self):
# Replicates a doctest for na_action which is incompatible with
# doctest framework
df = pd.DataFrame([[pd.NA, 2.12], [3.356, 4.567]])
self._run_test(
lambda df: df.map(lambda x: len(str(x)), na_action='ignore'),
df,
# TODO: generate proxy using naive type inference on fn
check_proxy=False)
def test_dataframe_eval_query(self):
df = pd.DataFrame(np.random.randn(20, 3), columns=['a', 'b', 'c'])
self._run_test(lambda df: df.eval('foo = a + b - c'), df)
self._run_test(lambda df: df.query('a > b + c'), df)
self._run_inplace_test(lambda df: df.eval('foo = a + b - c'), df)
# Verify that attempting to access locals raises a useful error
deferred_df = frame_base.DeferredFrame.wrap(
expressions.ConstantExpression(df, df[0:0]))
self.assertRaises(
NotImplementedError, lambda: deferred_df.eval('foo = a + @b - c'))
self.assertRaises(
NotImplementedError, lambda: deferred_df.query('a > @b + c'))
def test_index_name_assignment(self):
df = pd.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2]})
df = df.set_index(['a', 'b'], drop=False)
def change_index_names(df):
df.index.names = ['A', None]
self._run_inplace_test(change_index_names, df)
def test_quantile(self):
df = pd.DataFrame(
np.array([[1, 1], [2, 10], [3, 100], [4, 100]]), columns=['a', 'b'])
self._run_test(
lambda df: df.quantile(0.1, axis='columns'), df, check_proxy=False)
with self.assertRaisesRegex(frame_base.WontImplementError,
r"df\.quantile\(q=0\.1, axis='columns'\)"):
self._run_test(lambda df: df.quantile([0.1, 0.5], axis='columns'), df)
def test_dataframe_melt(self):
df = pd.DataFrame({
'A': {
0: 'a', 1: 'b', 2: 'c'
},
'B': {
0: 1, 1: 3, 2: 5
},
'C': {
0: 2, 1: 4, 2: 6
}
})
self._run_test(
lambda df: df.melt(id_vars=['A'], value_vars=['B'], ignore_index=False),
df)
self._run_test(
lambda df: df.melt(
id_vars=['A'], value_vars=['B', 'C'], ignore_index=False),
df)
self._run_test(
lambda df: df.melt(
id_vars=['A'], value_vars=['B'], var_name='myVarname', value_name=
'myValname', ignore_index=False),
df)
self._run_test(
lambda df: df.melt(
id_vars=['A'], value_vars=['B', 'C'], ignore_index=False),
df)
df.columns = [list('ABC'), list('DEF')]
self._run_test(
lambda df: df.melt(
col_level=0, id_vars=['A'], value_vars=['B'], ignore_index=False),
df)
self._run_test(
lambda df: df.melt(
id_vars=[('A', 'D')], value_vars=[('B', 'E')], ignore_index=False),
df)
def test_fillna_columns(self):
df = pd.DataFrame(
[[np.nan, 2, np.nan, 0], [3, 4, np.nan, 1], [np.nan, np.nan, np.nan, 5],
[np.nan, 3, np.nan, 4], [3, np.nan, np.nan, 4]],
columns=list('ABCD'))
self._run_test(lambda df: df.fillna(method='ffill', axis='columns'), df)
self._run_test(
lambda df: df.fillna(method='ffill', axis='columns', limit=1), df)
self._run_test(
lambda df: df.fillna(method='bfill', axis='columns', limit=1), df)
# Intended behavior is unclear here. See
# https://github.com/pandas-dev/pandas/issues/40989
# self._run_test(lambda df: df.fillna(axis='columns', value=100,
# limit=2), df)
def test_dataframe_fillna_dataframe_as_value(self):
df = pd.DataFrame([[np.nan, 2, np.nan, 0], [3, 4, np.nan, 1],
[np.nan, np.nan, np.nan, 5], [np.nan, 3, np.nan, 4]],
columns=list("ABCD"))
df2 = pd.DataFrame(np.zeros((4, 4)), columns=list("ABCE"))
self._run_test(lambda df, df2: df.fillna(df2), df, df2)
def test_dataframe_fillna_series_as_value(self):
df = pd.DataFrame([[np.nan, 2, np.nan, 0], [3, 4, np.nan, 1],
[np.nan, np.nan, np.nan, 5], [np.nan, 3, np.nan, 4]],
columns=list("ABCD"))
s = pd.Series(range(4), index=list("ABCE"))
self._run_test(lambda df, s: df.fillna(s), df, s)
def test_series_fillna_series_as_value(self):
df = pd.DataFrame([[np.nan, 2, np.nan, 0], [3, 4, np.nan, 1],
[np.nan, np.nan, np.nan, 5], [np.nan, 3, np.nan, 4]],
columns=list("ABCD"))
df2 = pd.DataFrame(np.zeros((4, 4)), columns=list("ABCE"))
self._run_test(lambda df, df2: df.A.fillna(df2.A), df, df2)
def test_dataframe_column_fillna_constant_as_value(self):
from apache_beam.dataframe import convert
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
with beam.Pipeline(None) as p:
pcoll = (
p | beam.Create([1.0, np.nan, -1.0]) | beam.Select(x=lambda x: x))
df = convert.to_dataframe(pcoll)
df_new = df['x'].fillna(0)
assert_that(convert.to_pcollection(df_new), equal_to([1.0, 0.0, -1.0]))
@unittest.skipIf(PD_VERSION >= (2, 0), 'append removed in Pandas 2.0')
def test_append_verify_integrity(self):
df1 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(10))
df2 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(9, 19))
self._run_error_test(
lambda s1, s2: s1.append(s2, verify_integrity=True),
df1['A'],
df2['A'],
construction_time=False)
self._run_error_test(
lambda df1, df2: df1.append(df2, verify_integrity=True),
df1,
df2,
construction_time=False)
def test_categorical_groupby(self):
df = pd.DataFrame({'A': np.arange(6), 'B': list('aabbca')})
df['B'] = df['B'].astype(pd.CategoricalDtype(list('cab')))
df = df.set_index('B')
# TODO(BEAM-11190): These aggregations can be done in index partitions, but
# it will require a little more complex logic
self._run_test(
lambda df: df.groupby(level=0, observed=False).sum(),
df,
nonparallel=True)
self._run_test(
lambda df: df.groupby(level=0, observed=False).mean(),
df,
nonparallel=True)
def test_astype_categorical(self):
df = pd.DataFrame({'A': np.arange(6), 'B': list('aabbca')})
categorical_dtype = pd.CategoricalDtype(df.B.unique())
self._run_test(lambda df: df.B.astype(categorical_dtype), df)
@unittest.skipIf(
PD_VERSION < (1, 2), "DataFrame.unstack not supported in pandas <1.2.x")
def test_astype_categorical_with_unstack(self):
df = pd.DataFrame({
'index1': ['one', 'one', 'two', 'two'],
'index2': ['a', 'b', 'a', 'b'],
'data': np.arange(1.0, 5.0),
})
def with_categorical_index(df):
df.index1 = df.index1.astype(pd.CategoricalDtype(['one', 'two']))
df.index2 = df.index2.astype(pd.CategoricalDtype(['a', 'b']))
df.set_index(['index1', 'index2'], drop=True)
return df
self._run_test(
lambda df: with_categorical_index(df).unstack(level=-1),
df,
check_proxy=False)
def test_dataframe_sum_nonnumeric_raises(self):
# Attempting a numeric aggregation with the str column present should
# raise, and suggest the numeric_only argument
with self.assertRaisesRegex(frame_base.WontImplementError, 'numeric_only'):
self._run_test(lambda df: df.sum(), GROUPBY_DF)
# numeric_only=True should work
self._run_test(lambda df: df.sum(numeric_only=True), GROUPBY_DF)
# projecting only numeric columns should too
self._run_test(lambda df: df[['foo', 'bar']].sum(), GROUPBY_DF)
def test_insert(self):
df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
self._run_inplace_test(lambda df: df.insert(1, 'C', df.A * 2), df)
self._run_inplace_test(
lambda df: df.insert(0, 'foo', pd.Series([8], index=[1])),
df,
check_proxy=False)
self._run_inplace_test(lambda df: df.insert(2, 'bar', value='q'), df)
def test_insert_does_not_support_list_value(self):
df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
with self.assertRaisesRegex(frame_base.WontImplementError,
r"insert\(value=list\)"):
self._run_inplace_test(lambda df: df.insert(1, 'C', [7, 8, 9]), df)
def test_drop_duplicates(self):
df = pd.DataFrame({
'brand': ['Yum Yum', 'Yum Yum', 'Indomie', 'Indomie', 'Indomie'],
'style': ['cup', 'cup', 'cup', 'pack', 'pack'],
'rating': [4, 4, 3.5, 15, 5]
})
self._run_test(lambda df: df.drop_duplicates(keep=False), df)
self._run_test(
lambda df: df.drop_duplicates(subset=['brand'], keep=False), df)
self._run_test(
lambda df: df.drop_duplicates(subset=['brand', 'style'], keep=False),
df)
@parameterized.expand([
(
lambda base: base.from_dict({
'col_1': [3, 2, 1, 0], 'col_2': ['a', 'b', 'c', 'd']
}), ),
(
lambda base: base.from_dict({
'row_1': [3, 2, 1, 0], 'row_2': ['a', 'b', 'c', 'd']
}, orient='index'), ),
(
lambda base: base.from_records(
np.array([(3, 'a'), (2, 'b'), (1, 'c'),
(0, 'd')], dtype=[('col_1', 'i4'),
('col_2', 'U1')])), ),
])
def test_create_methods(self, func):
expected = func(pd.DataFrame)
deferred_df = func(frames.DeferredDataFrame)
actual = expressions.Session({}).evaluate(deferred_df._expr)
pd.testing.assert_frame_equal(actual, expected)
def test_replace(self):
# verify a replace() doctest case that doesn't quite work in Beam as it uses
# the default method='pad'
df = pd.DataFrame({'A': ['bat', 'foo', 'bait'], 'B': ['abc', 'bar', 'xyz']})
self._run_test(
lambda df: df.replace(
regex={
r'^ba.$': 'new', 'foo': 'xyz'
}, method=None),
df)
def test_sample_columns(self):
df = pd.DataFrame({
'brand': ['Yum Yum', 'Yum Yum', 'Indomie', 'Indomie', 'Indomie'],
'style': ['cup', 'cup', 'cup', 'pack', 'pack'],
'rating': [4, 4, 3.5, 15, 5]
})
self._run_test(lambda df: df.sample(axis=1, n=2, random_state=1), df)
self._run_error_test(lambda df: df.sample(axis=1, n=10, random_state=2), df)
self._run_test(
lambda df: df.sample(axis=1, n=10, random_state=3, replace=True), df)
def test_cat(self):
# Replicate the doctests from CategoricalAccessor
# These tests don't translate into pandas_doctests_test.py because it
# tries to use astype("category") in Beam, which makes a non-deferred
# column type.
s = pd.Series(list("abbccc")).astype("category")
self._run_test(lambda s: s.cat.rename_categories(list("cba")), s)
self._run_test(lambda s: s.cat.reorder_categories(list("cba")), s)
self._run_test(lambda s: s.cat.add_categories(["d", "e"]), s)
self._run_test(lambda s: s.cat.remove_categories(["a", "c"]), s)
self._run_test(lambda s: s.cat.set_categories(list("abcde")), s)
self._run_test(lambda s: s.cat.as_ordered(), s)
self._run_test(lambda s: s.cat.as_unordered(), s)
self._run_test(lambda s: s.cat.codes, s)
@parameterized.expand(frames.ELEMENTWISE_DATETIME_PROPERTIES)
def test_dt_property(self, prop_name):
# Generate a series with a lot of unique timestamps
s = pd.Series(
pd.date_range('1/1/2000', periods=100, freq='m') +
pd.timedelta_range(start='0 days', end='70 days', periods=100))
self._run_test(lambda s: getattr(s.dt, prop_name), s)
@parameterized.expand([
('month_name', {}),
('day_name', {}),
('normalize', {}),
(
'strftime',
{
'date_format': '%B %d, %Y, %r'
},
),
('tz_convert', {
'tz': 'Europe/Berlin'
}),
])
def test_dt_method(self, op, kwargs):
# Generate a series with a lot of unique timestamps
s = pd.Series(
pd.date_range(
'1/1/2000', periods=100, freq='m', tz='America/Los_Angeles') +
pd.timedelta_range(start='0 days', end='70 days', periods=100))
self._run_test(lambda s: getattr(s.dt, op)(**kwargs), s)
def test_dt_tz_localize_ambiguous_series(self):
# This replicates a dt.tz_localize doctest:
# s.tz_localize('CET', ambiguous=np.array([True, True, False]))
# But using a DeferredSeries instead of a np array
s = pd.to_datetime(
pd.Series([
'2018-10-28 01:20:00', '2018-10-28 02:36:00', '2018-10-28 03:46:00'
]))
ambiguous = pd.Series([True, True, False], index=s.index)
self._run_test(
lambda s, ambiguous: s.dt.tz_localize('CET', ambiguous=ambiguous),
s,
ambiguous)
def test_dt_tz_localize_nonexistent(self):
# This replicates dt.tz_localize doctests that exercise `nonexistent`.
# However they specify ambiguous='NaT' because the default,
# ambiguous='infer', is not supported.
s = pd.to_datetime(
pd.Series(['2015-03-29 02:30:00', '2015-03-29 03:30:00']))
self._run_test(
lambda s: s.dt.tz_localize(
'Europe/Warsaw', ambiguous='NaT', nonexistent='shift_forward'),
s)
self._run_test(
lambda s: s.dt.tz_localize(
'Europe/Warsaw', ambiguous='NaT', nonexistent='shift_backward'),
s)
self._run_test(
lambda s: s.dt.tz_localize(
'Europe/Warsaw', ambiguous='NaT', nonexistent=pd.Timedelta('1H')),
s)
def test_compare_series(self):
s1 = pd.Series(["a", "b", "c", "d", "e"])
s2 = pd.Series(["a", "a", "c", "b", "e"])
self._run_test(lambda s1, s2: s1.compare(s2), s1, s2)
self._run_test(lambda s1, s2: s1.compare(s2, align_axis=0), s1, s2)
self._run_test(lambda s1, s2: s1.compare(s2, keep_shape=True), s1, s2)
self._run_test(
lambda s1, s2: s1.compare(s2, keep_shape=True, keep_equal=True), s1, s2)
def test_compare_dataframe(self):
df1 = pd.DataFrame(
{
"col1": ["a", "a", "b", "b", "a"],
"col2": [1.0, 2.0, 3.0, np.nan, 5.0],
"col3": [1.0, 2.0, 3.0, 4.0, 5.0]
},
columns=["col1", "col2", "col3"],
)
df2 = df1.copy()
df2.loc[0, 'col1'] = 'c'
df2.loc[2, 'col3'] = 4.0
# Skipped because keep_shape=False won't be implemented
with self.assertRaisesRegex(
frame_base.WontImplementError,
r"compare\(align_axis\=1, keep_shape\=False\) is not allowed"):
self._run_test(lambda df1, df2: df1.compare(df2), df1, df2)
self._run_test(
lambda df1, df2: df1.compare(df2, align_axis=0),
df1,
df2,
check_proxy=False)
self._run_test(lambda df1, df2: df1.compare(df2, keep_shape=True), df1, df2)
self._run_test(
lambda df1, df2: df1.compare(df2, align_axis=0, keep_shape=True),
df1,
df2)
self._run_test(
lambda df1, df2: df1.compare(df2, keep_shape=True, keep_equal=True),
df1,
df2)
self._run_test(
lambda df1, df2: df1.compare(
df2, align_axis=0, keep_shape=True, keep_equal=True),
df1,
df2)
def test_idxmin(self):
df = pd.DataFrame({
'consumption': [10.51, 103.11, 55.48],
'co2_emissions': [37.2, 19.66, 1712]
},
index=['Pork', 'Wheat Products', 'Beef'])
df2 = df.copy()
df2.loc['Pork', 'co2_emissions'] = None
df2.loc['Wheat Products', 'consumption'] = None
df2.loc['Beef', 'co2_emissions'] = None
df3 = pd.DataFrame({
'consumption': [1.1, 2.2, 3.3], 'co2_emissions': [3.3, 2.2, 1.1]
},
index=[0, 1, 2])
s = pd.Series(data=[4, 3, None, 1], index=['A', 'B', 'C', 'D'])
s2 = pd.Series(data=[1, 2, 3], index=[1, 2, 3])
self._run_test(lambda df: df.idxmin(), df)
self._run_test(lambda df: df.idxmin(skipna=False), df)
self._run_test(lambda df: df.idxmin(axis=1), df)
self._run_test(lambda df: df.idxmin(axis=1, skipna=False), df)
self._run_test(lambda df2: df2.idxmin(), df2)
self._run_test(lambda df2: df2.idxmin(axis=1), df2)
self._run_test(lambda df2: df2.idxmin(skipna=False), df2, check_proxy=False)
self._run_test(
lambda df2: df2.idxmin(axis=1, skipna=False), df2, check_proxy=False)
self._run_test(lambda df3: df3.idxmin(), df3)
self._run_test(lambda df3: df3.idxmin(axis=1), df3)
self._run_test(lambda df3: df3.idxmin(skipna=False), df3)
self._run_test(lambda df3: df3.idxmin(axis=1, skipna=False), df3)
self._run_test(lambda s: s.idxmin(), s)
self._run_test(lambda s: s.idxmin(skipna=False), s, check_proxy=False)
self._run_test(lambda s2: s2.idxmin(), s2)
self._run_test(lambda s2: s2.idxmin(skipna=False), s2)
def test_idxmax(self):
df = pd.DataFrame({
'consumption': [10.51, 103.11, 55.48],
'co2_emissions': [37.2, 19.66, 1712]
},
index=['Pork', 'Wheat Products', 'Beef'])
df2 = df.copy()
df2.loc['Pork', 'co2_emissions'] = None
df2.loc['Wheat Products', 'consumption'] = None
df2.loc['Beef', 'co2_emissions'] = None
df3 = pd.DataFrame({
'consumption': [1.1, 2.2, 3.3], 'co2_emissions': [3.3, 2.2, 1.1]
},
index=[0, 1, 2])
s = pd.Series(data=[1, None, 4, 1], index=['A', 'B', 'C', 'D'])
s2 = pd.Series(data=[1, 2, 3], index=[1, 2, 3])
self._run_test(lambda df: df.idxmax(), df)
self._run_test(lambda df: df.idxmax(skipna=False), df)
self._run_test(lambda df: df.idxmax(axis=1), df)
self._run_test(lambda df: df.idxmax(axis=1, skipna=False), df)
self._run_test(lambda df2: df2.idxmax(), df2)
self._run_test(lambda df2: df2.idxmax(axis=1), df2)
self._run_test(
lambda df2: df2.idxmax(axis=1, skipna=False), df2, check_proxy=False)
self._run_test(lambda df2: df2.idxmax(skipna=False), df2, check_proxy=False)
self._run_test(lambda df3: df3.idxmax(), df3)
self._run_test(lambda df3: df3.idxmax(axis=1), df3)
self._run_test(lambda df3: df3.idxmax(skipna=False), df3)
self._run_test(lambda df3: df3.idxmax(axis=1, skipna=False), df3)
self._run_test(lambda s: s.idxmax(), s)
self._run_test(lambda s: s.idxmax(skipna=False), s, check_proxy=False)
self._run_test(lambda s2: s2.idxmax(), s2)
self._run_test(lambda s2: s2.idxmax(skipna=False), s2)
def test_pipe(self):
def df_times(df, column, times):
df[column] = df[column] * times
return df
def df_times_shuffled(column, times, df):
return df_times(df, column, times)
def s_times(s, times):
return s * times
def s_times_shuffled(times, s):
return s_times(s, times)
df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]}, index=[0, 1, 2])
s = pd.Series([1, 2, 3, 4, 5], index=[0, 1, 2, 3, 4])
self._run_inplace_test(lambda df: df.pipe(df_times, 'A', 2), df)
self._run_inplace_test(
lambda df: df.pipe((df_times_shuffled, 'df'), 'A', 2), df)
self._run_test(lambda s: s.pipe(s_times, 2), s)
self._run_test(lambda s: s.pipe((s_times_shuffled, 's'), 2), s)
def test_unstack_pandas_series_not_multiindex(self):
# Pandas should throw a ValueError when unstack is performed
# on a Series without a MultiIndex
s = pd.Series([1, 2, 3, 4], index=['one', 'two', 'three', 'four'])
with self.assertRaises((AttributeError, ValueError)):
self._run_test(lambda s: s.unstack(), s)
def test_unstack_non_categorical_index(self):
index = pd.MultiIndex.from_tuples([('one', 'a'), ('one', 'b'), ('two', 'a'),
('two', 'b')])
index = index.set_levels(
index.levels[0].astype(pd.CategoricalDtype(['one', 'two'])), level=0)
s = pd.Series(np.arange(1.0, 5.0), index=index)
with self.assertRaisesRegex(
frame_base.WontImplementError,
r"unstack\(\) is only supported on DataFrames if"):
self._run_test(lambda s: s.unstack(level=-1), s)
def _unstack_get_categorical_index(self):
index = pd.MultiIndex.from_tuples([('one', 'a'), ('one', 'b'), ('two', 'a'),
('two', 'b')])
index = index.set_levels(
index.levels[0].astype(pd.CategoricalDtype(['one', 'two'])), level=0)
index = index.set_levels(
index.levels[1].astype(pd.CategoricalDtype(['a', 'b'])), level=1)
return index
def test_unstack_pandas_example1(self):
index = self._unstack_get_categorical_index()
s = pd.Series(np.arange(1.0, 5.0), index=index)
self._run_test(lambda s: s.unstack(level=-1), s)
def test_unstack_pandas_example2(self):
index = self._unstack_get_categorical_index()
s = pd.Series(np.arange(1.0, 5.0), index=index)
self._run_test(lambda s: s.unstack(level=0), s)
@unittest.skipIf(
sys.version_info >= (3, 12) and PD_VERSION < (2, 3),
'https://github.com/pandas-dev/pandas/issues/58604')
def test_unstack_pandas_example3(self):
index = self._unstack_get_categorical_index()
s = pd.Series(np.arange(1.0, 5.0), index=index)
df = s.unstack(level=0)
if PD_VERSION < (1, 2):
with self.assertRaisesRegex(
frame_base.WontImplementError,
r"unstack\(\) is not supported when using pandas < 1.2.0"):
self._run_test(lambda df: df.unstack(), df)
else:
self._run_test(lambda df: df.unstack(), df)
@unittest.skipIf(
PD_VERSION < (1, 4),
"Cannot set dtype of index to boolean for pandas<1.4")
def test_unstack_bool(self):
index = pd.MultiIndex.from_tuples([(True, 'a'), (True, 'b'), (False, 'a'),
(False, 'b')])
index = index.set_levels(index.levels[0].astype('boolean'), level=0)
index = index.set_levels(
index.levels[1].astype(pd.CategoricalDtype(['a', 'b'])), level=1)
s = pd.Series(np.arange(1.0, 5.0), index=index)
self._run_test(lambda s: s.unstack(level=0), s)
def test_unstack_series_multiple_index_levels(self):
tuples = list(
zip(
*[
["bar", "bar", "bar", "bar", "baz", "baz", "baz", "baz"],
["one", "one", "two", "two", "one", "one", "two", "two"],
["A", "B", "A", "B", "A", "B", "A", "B"],
]))
index = pd.MultiIndex.from_tuples(
tuples, names=["first", "second", "third"])
index = index.set_levels(
index.levels[0].astype(pd.CategoricalDtype(['bar', 'baz'])), level=0)
index = index.set_levels(
index.levels[1].astype(pd.CategoricalDtype(['one', 'two'])), level=1)
index = index.set_levels(
index.levels[2].astype(pd.CategoricalDtype(['A', 'B'])), level=2)
df = pd.Series(np.random.randn(8), index=index)
self._run_test(lambda df: df.unstack(level=['first', 'third']), df)
def test_unstack_series_multiple_index_and_column_levels(self):
columns = pd.MultiIndex.from_tuples(
[
("A", "cat", "long"),
("B", "cat", "long"),
("A", "dog", "short"),
("B", "dog", "short"),
],
names=["exp", "animal", "hair_length"],
)
index = pd.MultiIndex.from_product(
[['one', 'two'], ['a', 'b'], ['bar', 'baz']],
names=["first", "second", "third"])
index = index.set_levels(
index.levels[0].astype(pd.CategoricalDtype(['one', 'two'])), level=0)
index = index.set_levels(
index.levels[1].astype(pd.CategoricalDtype(['a', 'b'])), level=1)
index = index.set_levels(
index.levels[2].astype(pd.CategoricalDtype(['bar', 'baz'])), level=2)
df = pd.DataFrame(np.random.randn(8, 4), index=index, columns=columns)
df = df.stack(level=["animal", "hair_length"])
self._run_test(lambda df: df.unstack(level=['second', 'third']), df)
self._run_test(lambda df: df.unstack(level=['second']), df)
def test_pivot_non_categorical(self):
df = pd.DataFrame({
'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
'baz': [1, 2, 3, 4, 5, 6],
'zoo': ['x', 'y', 'z', 'q', 'w', 't']
})
with self.assertRaisesRegex(
frame_base.WontImplementError,
r"pivot\(\) of non-categorical type is not supported"):
self._run_test(
lambda df: df.pivot(index='foo', columns='bar', values='baz'), df)
def test_pivot_pandas_example1(self):
# Simple test 1
df = pd.DataFrame({
'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
'baz': [1, 2, 3, 4, 5, 6],
'zoo': ['x', 'y', 'z', 'q', 'w', 't']
})
df['bar'] = df['bar'].astype(
pd.CategoricalDtype(categories=['A', 'B', 'C']))
self._run_test(
lambda df: df.pivot(index='foo', columns='bar', values='baz'), df)
self._run_test(
lambda df: df.pivot(index=['foo'], columns='bar', values='baz'), df)
def test_pivot_pandas_example3(self):
# Multiple values
df = pd.DataFrame({
'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
'baz': [1, 2, 3, 4, 5, 6],
'zoo': ['x', 'y', 'z', 'q', 'w', 't']
})
df['bar'] = df['bar'].astype(
pd.CategoricalDtype(categories=['A', 'B', 'C']))
self._run_test(
lambda df: df.pivot(index='foo', columns='bar', values=['baz', 'zoo']),
df)
self._run_test(
lambda df: df.pivot(
index='foo', columns=['bar'], values=['baz', 'zoo']),
df)
def test_pivot_pandas_example4(self):
# Multiple columns
df = pd.DataFrame({
"lev1": [1, 1, 1, 2, 2, 2],
"lev2": [1, 1, 2, 1, 1, 2],
"lev3": [1, 2, 1, 2, 1, 2],
"lev4": [1, 2, 3, 4, 5, 6],
"values": [0, 1, 2, 3, 4, 5]
})
df['lev2'] = df['lev2'].astype(pd.CategoricalDtype(categories=[1, 2]))
df['lev3'] = df['lev3'].astype(pd.CategoricalDtype(categories=[1, 2]))
df['values'] = df['values'].astype('Int64')
self._run_test(
lambda df: df.pivot(
index="lev1", columns=["lev2", "lev3"], values="values"),
df)
def test_pivot_pandas_example5(self):
# Multiple index
df = pd.DataFrame({
"lev1": [1, 1, 1, 2, 2, 2],
"lev2": [1, 1, 2, 1, 1, 2],
"lev3": [1, 2, 1, 2, 1, 2],
"lev4": [1, 2, 3, 4, 5, 6],
"values": [0, 1, 2, 3, 4, 5]
})
df['lev3'] = df['lev3'].astype(pd.CategoricalDtype(categories=[1, 2]))
# Cast to nullable Int64 because Beam doesn't do the correct conversion to
# float64
df['values'] = df['values'].astype('Int64')
if PD_VERSION < (1, 4):
with self.assertRaisesRegex(
frame_base.WontImplementError,
r"pivot\(\) is not supported when pandas<1.4 and index is a Multi"):
self._run_test(
lambda df: df.pivot(
index=["lev1", "lev2"], columns=["lev3"], values="values"),
df)
else:
self._run_test(
lambda df: df.pivot(
index=["lev1", "lev2"], columns=["lev3"], values="values"),
df)
def test_pivot_pandas_example6(self):
# Value error when there are duplicates
df = pd.DataFrame({
"foo": ['one', 'one', 'two', 'two'],
"bar": ['A', 'A', 'B', 'C'],
"baz": [1, 2, 3, 4]
})
df['bar'] = df['bar'].astype(
pd.CategoricalDtype(categories=['A', 'B', 'C']))
self._run_error_test(
lambda df: df.pivot(index='foo', columns='bar', values='baz'),
df,
construction_time=False)
def test_pivot_no_index_provided_on_single_level_index(self):
# Multiple columns, no index value provided
df = pd.DataFrame({
"lev1": [1, 1, 1, 2, 2, 2],
"lev2": [1, 1, 2, 1, 1, 2],
"lev3": [1, 2, 1, 2, 1, 2],
"lev4": [1, 2, 3, 4, 5, 6],
"values": [0, 1, 2, 3, 4, 5]
})
df['lev2'] = df['lev2'].astype(pd.CategoricalDtype(categories=[1, 2]))
df['lev3'] = df['lev3'].astype(pd.CategoricalDtype(categories=[1, 2]))
df['values'] = df['values'].astype('Int64')
self._run_test(
lambda df: df.pivot(columns=["lev2", "lev3"], values="values"), df)
def test_pivot_no_index_provided_on_multiindex(self):
# Multiple columns, no index value provided
tuples = list(
zip(
*[
["bar", "bar", "bar", "baz", "baz", "baz"],
[
"one",
"two",
"three",
"one",
"two",
"three",
],
]))
index = pd.MultiIndex.from_tuples(tuples, names=["first", "second"])
df = pd.DataFrame({
"lev1": [1, 1, 1, 2, 2, 2],
"lev2": [1, 1, 2, 1, 1, 2],
"lev3": [1, 2, 1, 2, 1, 2],
"lev4": [1, 2, 3, 4, 5, 6],
"values": [0, 1, 2, 3, 4, 5]
},
index=index)
df['lev2'] = df['lev2'].astype(pd.CategoricalDtype(categories=[1, 2]))
df['lev3'] = df['lev3'].astype(pd.CategoricalDtype(categories=[1, 2]))
df['values'] = df['values'].astype('float64')
df['lev1'] = df['lev1'].astype('int64')
df['lev4'] = df['lev4'].astype('int64')
if PD_VERSION < (1, 4):
with self.assertRaisesRegex(
frame_base.WontImplementError,
r"pivot\(\) is not supported when pandas<1.4 and index is a Multi"):
self._run_test(lambda df: df.pivot(columns=["lev2", "lev3"]), df)
else:
self._run_test(
lambda df: df.pivot(columns=["lev2", "lev3"]),
df,
lenient_dtype_check=True)
# pandas doesn't support kurtosis on GroupBys:
# https://github.com/pandas-dev/pandas/issues/40139
ALL_GROUPING_AGGREGATIONS = sorted(
set(frames.ALL_AGGREGATIONS) - set(('kurt', 'kurtosis')))
AGGREGATIONS_WHERE_NUMERIC_ONLY_DEFAULTS_TO_TRUE_IN_PANDAS_1 = set(
frames.ALL_AGGREGATIONS) - set((
'nunique',
'size',
'count',
'idxmin',
'idxmax',
'mode',
'rank',
'all',
'any',
'describe'))
def numeric_only_kwargs_for_pandas_2(agg_type: str) -> dict[str, bool]:
"""Get proper arguments for numeric_only.
Behavior for numeric_only in these methods changed in Pandas 2 to default
to False instead of True, so explicitly make it True in Pandas 2."""
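# e.g. under pandas >= 2.0, numeric_only_kwargs_for_pandas_2('sum') returns
# {'numeric_only': True}; under pandas 1.x it returns {}.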
if PD_VERSION >= (2, 0) and (
agg_type in AGGREGATIONS_WHERE_NUMERIC_ONLY_DEFAULTS_TO_TRUE_IN_PANDAS_1):
return {'numeric_only': True}
else:
return {}
class GroupByTest(_AbstractFrameTest):
"""Tests for DataFrame/Series GroupBy operations."""
@staticmethod
def median_sum_fn(x):
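"""Return the median of x.foo + x.bar; a user fn for the apply() tests."""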
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message="Mean of empty slice")
return (x.foo + x.bar).median()
@parameterized.expand(ALL_GROUPING_AGGREGATIONS)
def test_groupby_agg(self, agg_type):
if agg_type == 'describe' and PD_VERSION < (1, 2):
self.skipTest(
"https://github.com/apache/beam/issues/20967: proxy generation of "
"DataFrameGroupBy.describe fails in pandas < 1.2")
kwargs = numeric_only_kwargs_for_pandas_2(agg_type)
self._run_test(
lambda df: df.groupby('group').agg(agg_type, **kwargs),
GROUPBY_DF,
check_proxy=False)
@parameterized.expand(ALL_GROUPING_AGGREGATIONS)
def test_groupby_with_filter(self, agg_type):
if agg_type == 'describe' and PD_VERSION < (1, 2):
self.skipTest(
"https://github.com/apache/beam/issues/20967: proxy generation of "
"DataFrameGroupBy.describe fails in pandas < 1.2")
kwargs = numeric_only_kwargs_for_pandas_2(agg_type)
self._run_test(
lambda df: getattr(df[df.foo > 30].groupby('group'), agg_type)
(**kwargs),
GROUPBY_DF,
check_proxy=False)
@parameterized.expand(ALL_GROUPING_AGGREGATIONS)
def test_groupby(self, agg_type):
if agg_type == 'describe' and PD_VERSION < (1, 2):
self.skipTest(
"https://github.com/apache/beam/issues/20967: proxy generation of "
"DataFrameGroupBy.describe fails in pandas < 1.2")
kwargs = numeric_only_kwargs_for_pandas_2(agg_type)
self._run_test(
lambda df: getattr(df.groupby('group'), agg_type)(**kwargs),
GROUPBY_DF,
check_proxy=False)
@parameterized.expand(ALL_GROUPING_AGGREGATIONS)
def test_groupby_series(self, agg_type):
if agg_type == 'describe' and PD_VERSION < (1, 2):
self.skipTest(
"https://github.com/apache/beam/issues/20967: proxy generation of "
"DataFrameGroupBy.describe fails in pandas < 1.2")
kwargs = numeric_only_kwargs_for_pandas_2(agg_type)
self._run_test(
lambda df: getattr(df[df.foo > 40].groupby(df.group), agg_type)
(**kwargs),
GROUPBY_DF,
check_proxy=False)
def test_groupby_user_guide(self):
# Example from https://pandas.pydata.org/docs/user_guide/groupby.html
arrays = [['bar', 'bar', 'baz', 'baz', 'foo', 'foo', 'qux', 'qux'],
['one', 'two', 'one', 'two', 'one', 'two', 'one', 'two']]
index = pd.MultiIndex.from_arrays(arrays, names=['first', 'second'])
df = pd.DataFrame({
'A': [1, 1, 1, 1, 2, 2, 3, 3], 'B': np.arange(8)
},
index=index)
self._run_test(lambda df: df.groupby(['second', 'A']).sum(), df)
@parameterized.expand(ALL_GROUPING_AGGREGATIONS)
def test_groupby_project_series(self, agg_type):
df = GROUPBY_DF
if agg_type == 'describe':
self.skipTest(
"https://github.com/apache/beam/issues/20967: proxy generation of "
"SeriesGroupBy.describe fails")
if agg_type in ('corr', 'cov'):
self.skipTest(
"https://github.com/apache/beam/issues/20895: "
"SeriesGroupBy.{corr, cov} do not raise the expected error.")
kwargs = numeric_only_kwargs_for_pandas_2(agg_type)
self._run_test(
lambda df: getattr(df.groupby('group').foo, agg_type)(**kwargs), df)
self._run_test(
lambda df: getattr(df.groupby('group').bar, agg_type)(**kwargs), df)
self._run_test(
lambda df: getattr(df.groupby('group')['foo'], agg_type)(**kwargs), df)
self._run_test(
lambda df: getattr(df.groupby('group')['bar'], agg_type)(**kwargs), df)
@parameterized.expand(ALL_GROUPING_AGGREGATIONS)
def test_groupby_project_dataframe(self, agg_type):
if agg_type == 'describe' and PD_VERSION < (1, 2):
self.skipTest(
"https://github.com/apache/beam/issues/20967: proxy generation of "
"DataFrameGroupBy.describe fails in pandas < 1.2")
kwargs = numeric_only_kwargs_for_pandas_2(agg_type)
self._run_test(
lambda df: getattr(df.groupby('group')[['bar', 'baz']], agg_type)
(**kwargs),
GROUPBY_DF,
check_proxy=False)
def test_groupby_errors_bad_projection(self):
df = GROUPBY_DF
# non-existent projection column
self._run_error_test(
lambda df: df.groupby('group')[['bar', 'baz']].bar.median(), df)
self._run_error_test(lambda df: df.groupby('group')[['bad']].median(), df)
self._run_error_test(lambda df: df.groupby('group').bad.median(), df)
self._run_error_test(
lambda df: df.groupby('group')[['bar', 'baz']].bar.sum(), df)
self._run_error_test(lambda df: df.groupby('group')[['bat']].sum(), df)
self._run_error_test(lambda df: df.groupby('group').bat.sum(), df)
def test_groupby_errors_non_existent_label(self):
df = GROUPBY_DF
# non-existent grouping label
self._run_error_test(
lambda df: df.groupby(['really_bad', 'foo', 'bad']).foo.sum(), df)
self._run_error_test(lambda df: df.groupby('bad').foo.sum(), df)
def test_groupby_callable(self):
df = GROUPBY_DF
kwargs = numeric_only_kwargs_for_pandas_2('sum')
self._run_test(lambda df: df.groupby(lambda x: x % 2).foo.sum(**kwargs), df)
kwargs = numeric_only_kwargs_for_pandas_2('median')
self._run_test(lambda df: df.groupby(lambda x: x % 5).median(**kwargs), df)
def test_groupby_apply(self):
df = GROUPBY_DF
# Note this is the same as DataFrameGroupBy.describe. Using it here is
# just a convenient way to test apply() with a user fn that returns a Series.
describe = lambda df: df.describe()
self._run_test(lambda df: df.groupby('group').foo.apply(describe), df)
self._run_test(
lambda df: df.groupby('group')[['foo', 'bar']].apply(describe), df)
self._run_test(lambda df: df.groupby('group').apply(self.median_sum_fn), df)
self._run_test(
lambda df: df.set_index('group').foo.groupby(level=0).apply(describe),
df)
self._run_test(lambda df: df.groupby(level=0).apply(self.median_sum_fn), df)
self._run_test(lambda df: df.groupby(lambda x: x % 3).apply(describe), df)
self._run_test(
lambda df: df.bar.groupby(lambda x: x % 3).apply(describe), df)
self._run_test(
lambda df: df.set_index(['str', 'group', 'bool']).groupby(
level='group').apply(self.median_sum_fn),
df)
def test_groupby_apply_preserves_column_order(self):
df = GROUPBY_DF
self._run_test(
lambda df: df[['foo', 'group', 'bar']].groupby(
'group', group_keys=False).apply(lambda x: x),
df)
def test_groupby_transform(self):
df = pd.DataFrame({
"Date": [
"2015-05-08",
"2015-05-07",
"2015-05-06",
"2015-05-05",
"2015-05-08",
"2015-05-07",
"2015-05-06",
"2015-05-05"
],
"Data": [5, 8, 6, 1, 50, 100, 60, 120],
})
self._run_test(lambda df: df.groupby('Date')['Data'].transform(np.sum), df)
self._run_test(
lambda df: df.groupby('Date')['Data'].transform(
lambda x: (x - x.mean()) / x.std()),
df)
def test_groupby_pipe(self):
df = GROUPBY_DF
kwargs = numeric_only_kwargs_for_pandas_2('sum')
self._run_test(
lambda df: df.groupby('group').pipe(lambda x: x.sum(**kwargs)), df)
self._run_test(
lambda df: df.groupby('group')['bool'].pipe(lambda x: x.any()), df)
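# pipe() also accepts a (callable, keyword) tuple; in the test below the
# GroupBy object is passed as the keyword argument 'x' and False is
# forwarded positionally as 'a'.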
self._run_test(
lambda df: df.groupby(['group', 'foo']).pipe(
(lambda a, x: x.sum(numeric_only=a), 'x'), False),
df,
check_proxy=False)
def test_groupby_apply_modified_index(self):
df = GROUPBY_DF
# If the apply fn modifies the index, then the output will include the
# grouped index.
self._run_test(
lambda df: df.groupby('group').apply(
lambda x: x[x.foo > x.foo.median()]),
df)
@unittest.skip('https://github.com/apache/beam/issues/20762')
def test_groupby_aggregate_grouped_column(self):
df = pd.DataFrame({
'group': ['a' if i % 5 == 0 or i % 3 == 0 else 'b' for i in range(100)],
'foo': [None if i % 11 == 0 else i for i in range(100)],
'bar': [None if i % 7 == 0 else 99 - i for i in range(100)],
'baz': [None if i % 13 == 0 else i * 2 for i in range(100)],
})
self._run_test(lambda df: df.groupby('group').group.count(), df)
self._run_test(lambda df: df.groupby('group')[['group', 'bar']].count(), df)
self._run_test(
lambda df: df.groupby('group')[['group', 'bar']].apply(
lambda x: x.describe()),
df)
@parameterized.expand((x, ) for x in [
0,
[1],
3,
[0, 3],
[2, 1],
['foo', 0],
[1, 'str'],
[3, 0, 2, 1], ])
def test_groupby_level_agg(self, level):
df = GROUPBY_DF.set_index(['group', 'foo', 'bar', 'str'], drop=False)
self._run_test(lambda df: df.groupby(level=level).bar.max(), df)
self._run_test(
lambda df: df.groupby(level=level).sum(numeric_only=True), df)
self._run_test(
lambda df: df.groupby(level=level).apply(self.median_sum_fn), df)
@unittest.skipIf(PD_VERSION < (1, 1), "dropna added in pandas 1.1.0")
def test_groupby_count_na(self):
# Verify we can do a groupby.count() that doesn't drop NaN values
self._run_test(
lambda df: df.groupby('foo', dropna=True).bar.count(), GROUPBY_DF)
self._run_test(
lambda df: df.groupby('foo', dropna=False).bar.count(), GROUPBY_DF)
def test_groupby_sum_min_count(self):
df = pd.DataFrame({
'good': [1, 2, 3, np.nan],
'bad': [np.nan, np.nan, np.nan, 4],
'group': ['a', 'b', 'a', 'b']
})
self._run_test(lambda df: df.groupby('group').sum(min_count=2), df)
@unittest.skipIf(
PD_VERSION >= (2, 0), "dtypes on groups is deprecated in Pandas 2.")
def test_groupby_dtypes(self):
self._run_test(
lambda df: df.groupby('group').dtypes, GROUPBY_DF, check_proxy=False)
self._run_test(
lambda df: df.groupby(level=0).dtypes, GROUPBY_DF, check_proxy=False)
@parameterized.expand(ALL_GROUPING_AGGREGATIONS)
def test_dataframe_groupby_series(self, agg_type):
if agg_type == 'describe' and PD_VERSION < (1, 2):
self.skipTest(
"https://github.com/apache/beam/issues/20967: proxy generation of "
"DataFrameGroupBy.describe fails in pandas < 1.2")
def agg(df, group_by):
kwargs = numeric_only_kwargs_for_pandas_2(agg_type)
return df[df.foo > 40].groupby(group_by).agg(agg_type, **kwargs)
self._run_test(lambda df: agg(df, df.group), GROUPBY_DF, check_proxy=False)
self._run_test(
lambda df: agg(df, df.foo % 3), GROUPBY_DF, check_proxy=False)
@parameterized.expand(ALL_GROUPING_AGGREGATIONS)
def test_series_groupby_series(self, agg_type):
if agg_type == 'describe':
self.skipTest(
"https://github.com/apache/beam/issues/20967: proxy generation of "
"SeriesGroupBy.describe fails")
if agg_type in ('corr', 'cov'):
self.skipTest(
"https://github.com/apache/beam/issues/20895: "
"SeriesGroupBy.{corr, cov} do not raise the expected error.")
self._run_test(
lambda df: df[df.foo < 40].bar.groupby(df.group).agg(agg_type),
GROUPBY_DF)
self._run_test(
lambda df: df[df.foo < 40].bar.groupby(df.foo % 3).agg(agg_type),
GROUPBY_DF)
def test_groupby_series_apply(self):
df = GROUPBY_DF
# Note this is the same as DataFrameGroupBy.describe. Using it here is
# just a convenient way to test apply() with a user fn that returns a Series.
describe = lambda df: df.describe()
self._run_test(lambda df: df.groupby(df.group).foo.apply(describe), df)
self._run_test(
lambda df: df.groupby(df.group)[['foo', 'bar']].apply(describe), df)
self._run_test(
lambda df: df.groupby(df.group).apply(self.median_sum_fn), df)
def test_groupby_multiindex_keep_nans(self):
# Due to https://github.com/pandas-dev/pandas/issues/36470
# groupby(dropna=False) doesn't work with multiple columns
with self.assertRaisesRegex(NotImplementedError,
"https://github.com/apache/beam/issues/21014"):
self._run_test(
lambda df: df.groupby(['foo', 'bar'], dropna=False).sum(), GROUPBY_DF)
NONPARALLEL_METHODS = ['quantile', 'describe', 'median', 'sem']
# mad was removed in pandas 2
if PD_VERSION < (2, 0):
NONPARALLEL_METHODS.append('mad')
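# The Beam implementations of these aggregations need the full dataset in a
# single partition (e.g. an exact median cannot be combined from
# per-partition medians), so the tests below run them with nonparallel=True.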
class AggregationTest(_AbstractFrameTest):
"""Tests for global aggregation methods on DataFrame/Series."""
# corr, cov on Series require an `other` argument
@parameterized.expand(
sorted(set(frames.ALL_AGGREGATIONS) - set(['corr', 'cov'])))
def test_series_agg(self, agg_method):
s = pd.Series(list(range(16)))
nonparallel = agg_method in NONPARALLEL_METHODS
# TODO(https://github.com/apache/beam/issues/20926): max and min produce
# the wrong proxy
check_proxy = agg_method not in ('max', 'min')
self._run_test(
lambda s: s.agg(agg_method),
s,
nonparallel=nonparallel,
check_proxy=check_proxy)
# corr, cov on Series require an `other` argument
# Series.size is a property
@parameterized.expand(
sorted(set(frames.ALL_AGGREGATIONS) - set(['corr', 'cov', 'size'])))
def test_series_agg_method(self, agg_method):
s = pd.Series(list(range(16)))
nonparallel = agg_method in NONPARALLEL_METHODS
# TODO(https://github.com/apache/beam/issues/20926): max and min produce
# the wrong proxy
check_proxy = agg_method not in ('max', 'min')
self._run_test(
lambda s: getattr(s, agg_method)(),
s,
nonparallel=nonparallel,
check_proxy=check_proxy)
@parameterized.expand(frames.ALL_AGGREGATIONS)
def test_dataframe_agg(self, agg_method):
df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [2, 3, 5, 7]})
nonparallel = agg_method in NONPARALLEL_METHODS
# TODO(https://github.com/apache/beam/issues/20926): max and min produce
# the wrong proxy
check_proxy = agg_method not in ('max', 'min')
self._run_test(
lambda df: df.agg(agg_method),
df,
nonparallel=nonparallel,
check_proxy=check_proxy)
# DataFrame.size is a property
@parameterized.expand(sorted(set(frames.ALL_AGGREGATIONS) - set(['size'])))
def test_dataframe_agg_method(self, agg_method):
df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [2, 3, 5, 7]})
nonparallel = agg_method in NONPARALLEL_METHODS
# TODO(https://github.com/apache/beam/issues/20926): max and min produce
# the wrong proxy
check_proxy = agg_method not in ('max', 'min')
self._run_test(
lambda df: getattr(df, agg_method)(),
df,
nonparallel=nonparallel,
check_proxy=check_proxy)
def test_series_agg_modes(self):
s = pd.Series(list(range(16)))
self._run_test(lambda s: s.agg('sum'), s)
self._run_test(lambda s: s.agg(['sum']), s)
self._run_test(lambda s: s.agg(['sum', 'mean']), s)
self._run_test(lambda s: s.agg(['mean']), s)
self._run_test(lambda s: s.agg('mean'), s)
def test_dataframe_agg_modes(self):
df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [2, 3, 5, 7]})
self._run_test(lambda df: df.agg('sum'), df)
self._run_test(lambda df: df.agg(['sum', 'mean']), df)
self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'sum'}), df)
self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'mean'}), df)
self._run_test(lambda df: df.agg({'A': ['sum', 'mean']}), df)
self._run_test(lambda df: df.agg({'A': ['sum', 'mean'], 'B': 'min'}), df)
@unittest.skipIf(PD_VERSION >= (2, 0), "level argument removed in Pandas 2")
def test_series_agg_level(self):
self._run_test(
lambda df: df.set_index(['group', 'foo']).bar.count(level=0),
GROUPBY_DF)
self._run_test(
lambda df: df.set_index(['group', 'foo']).bar.max(level=0), GROUPBY_DF)
self._run_test(
lambda df: df.set_index(['group', 'foo']).bar.median(level=0),
GROUPBY_DF)
self._run_test(
lambda df: df.set_index(['foo', 'group']).bar.count(level=1),
GROUPBY_DF)
self._run_test(
lambda df: df.set_index(['group', 'foo']).bar.max(level=1), GROUPBY_DF)
self._run_test(
lambda df: df.set_index(['group', 'foo']).bar.max(level='foo'),
GROUPBY_DF)
self._run_test(
lambda df: df.set_index(['group', 'foo']).bar.median(level=1),
GROUPBY_DF)
@unittest.skipIf(PD_VERSION >= (2, 0), "level argument removed in Pandas 2")
def test_dataframe_agg_level(self):
self._run_test(
lambda df: df.set_index(['group', 'foo']).count(level=0), GROUPBY_DF)
self._run_test(
lambda df: df.set_index(['group', 'foo']).max(
level=0, numeric_only=False),
GROUPBY_DF,
check_proxy=False)
# The pandas implementation doesn't respect the numeric_only argument here
# (https://github.com/pandas-dev/pandas/issues/40788); it
# always acts as if numeric_only=True. Our implementation respects it, so we
# need to make it explicit.
self._run_test(
lambda df: df.set_index(['group', 'foo']).sum(
level=0, numeric_only=True),
GROUPBY_DF)
self._run_test(
lambda df: df.set_index(['group', 'foo'])[['bar']].count(level=1),
GROUPBY_DF)
self._run_test(
lambda df: df.set_index(['group', 'foo']).count(level=1), GROUPBY_DF)
self._run_test(
lambda df: df.set_index(['group', 'foo']).max(
level=1, numeric_only=False),
GROUPBY_DF,
check_proxy=False)
# sum with str columns is order-sensitive
self._run_test(
lambda df: df.set_index(['group', 'foo']).sum(
level=1, numeric_only=True),
GROUPBY_DF)
self._run_test(
lambda df: df.set_index(['group', 'foo']).median(
level=0, numeric_only=True),
GROUPBY_DF)
self._run_test(
lambda df: df.drop('str', axis=1).set_index(['foo', 'group']).median(
level=1, numeric_only=True),
GROUPBY_DF)
@unittest.skipIf(PD_VERSION >= (2, 0), "level argument removed in Pandas 2")
def test_series_agg_multifunc_level(self):
# level= is ignored for multiple agg fns
self._run_test(
lambda df: df.set_index(['group', 'foo']).bar.agg(['min', 'max'], level=
0),
GROUPBY_DF)
def test_series_mean_skipna(self):
df = pd.DataFrame({
'one': [i if i % 8 == 0 else np.nan for i in range(8)],
'two': [i if i % 4 == 0 else np.nan for i in range(8)],
'three': [i if i % 2 == 0 else np.nan for i in range(8)],
})
self._run_test(lambda df: df.one.mean(skipna=False), df)
self._run_test(lambda df: df.two.mean(skipna=False), df)
self._run_test(lambda df: df.three.mean(skipna=False), df)
self._run_test(lambda df: df.one.mean(skipna=True), df)
self._run_test(lambda df: df.two.mean(skipna=True), df)
self._run_test(lambda df: df.three.mean(skipna=True), df)
@unittest.skipIf(PD_VERSION >= (2, 0), "level argument removed in Pandas 2")
def test_dataframe_agg_multifunc_level(self):
# level= is ignored for multiple agg fns
self._run_test(
lambda df: df.set_index(['group', 'foo']).agg(['min', 'max'], level=0),
GROUPBY_DF,
check_proxy=False)
@parameterized.expand([(True, ), (False, )])
@unittest.skipIf(
PD_VERSION < (1, 2),
"pandas 1.1.0 produces different dtypes for these examples")
def test_dataframe_agg_numeric_only(self, numeric_only):
# Note that other aggregation functions can fail on this input with
# numeric_only={False,None}. These are the only ones that actually work for
# the string inputs.
self._run_test(
lambda df: df.max(numeric_only=numeric_only),
GROUPBY_DF,
check_proxy=False)
self._run_test(
lambda df: df.min(numeric_only=numeric_only),
GROUPBY_DF,
check_proxy=False)
@unittest.skip(
"pandas implementation doesn't respect numeric_only= with "
"level= (https://github.com/pandas-dev/pandas/issues/40788)")
def test_dataframe_agg_level_numeric_only(self):
self._run_test(
lambda df: df.set_index('foo').sum(level=0, numeric_only=True),
GROUPBY_DF)
self._run_test(
lambda df: df.set_index('foo').max(level=0, numeric_only=True),
GROUPBY_DF)
self._run_test(
lambda df: df.set_index('foo').mean(level=0, numeric_only=True),
GROUPBY_DF)
self._run_test(
lambda df: df.set_index('foo').median(level=0, numeric_only=True),
GROUPBY_DF)
def test_dataframe_agg_bool_only(self):
df = pd.DataFrame({
'all': [True for i in range(10)],
'any': [i % 3 == 0 for i in range(10)],
'int': range(10)
})
self._run_test(lambda df: df.all(), df)
self._run_test(lambda df: df.any(), df)
self._run_test(lambda df: df.all(bool_only=True), df)
self._run_test(lambda df: df.any(bool_only=True), df)
@unittest.skip(
"pandas doesn't implement bool_only= with level= "
"(https://github.com/pandas-dev/pandas/blob/"
"v1.2.3/pandas/core/generic.py#L10573)")
def test_dataframe_agg_level_bool_only(self):
df = pd.DataFrame({
'all': [True for i in range(10)],
'any': [i % 3 == 0 for i in range(10)],
'int': range(10)
})
self._run_test(lambda df: df.set_index('int', drop=False).all(level=0), df)
self._run_test(lambda df: df.set_index('int', drop=False).any(level=0), df)
self._run_test(
lambda df: df.set_index('int', drop=False).all(level=0, bool_only=True),
df)
self._run_test(
lambda df: df.set_index('int', drop=False).any(level=0, bool_only=True),
df)
def test_series_agg_np_size(self):
self._run_test(
lambda df: df.set_index(['group', 'foo']).agg(np.size),
GROUPBY_DF,
check_proxy=False)
def test_df_agg_invalid_kwarg_raises(self):
self._run_error_test(lambda df: df.agg('mean', bool_only=True), GROUPBY_DF)
self._run_error_test(
lambda df: df.agg('any', numeric_only=True), GROUPBY_DF)
self._run_error_test(
lambda df: df.agg('median', min_count=3, numeric_only=True), GROUPBY_DF)
def test_series_agg_method_invalid_kwarg_raises(self):
self._run_error_test(lambda df: df.foo.median(min_count=3), GROUPBY_DF)
self._run_error_test(
lambda df: df.foo.agg('median', min_count=3), GROUPBY_DF)
@unittest.skipIf(
PD_VERSION < (1, 3),
(
"DataFrame.agg raises a different exception from the "
"aggregation methods. Fixed in "
"https://github.com/pandas-dev/pandas/pull/40543."))
def test_df_agg_method_invalid_kwarg_raises(self):
self._run_error_test(lambda df: df.mean(bool_only=True), GROUPBY_DF)
self._run_error_test(lambda df: df.any(numeric_only=True), GROUPBY_DF)
self._run_error_test(
lambda df: df.median(min_count=3, numeric_only=True), GROUPBY_DF)
@unittest.skipIf(PD_VERSION >= (2, 0), "level argument removed in Pandas 2")
def test_agg_min_count(self):
df = pd.DataFrame({
'good': [1, 2, 3, np.nan],
'bad': [np.nan, np.nan, np.nan, 4],
},
index=['a', 'b', 'a', 'b'])
self._run_test(lambda df: df.sum(level=0, min_count=2), df)
self._run_test(lambda df: df.sum(min_count=3), df, nonparallel=True)
self._run_test(lambda df: df.sum(min_count=1), df, nonparallel=True)
self._run_test(lambda df: df.good.sum(min_count=2), df, nonparallel=True)
self._run_test(lambda df: df.bad.sum(min_count=2), df, nonparallel=True)
def test_series_agg_std(self):
s = pd.Series(range(10))
self._run_test(lambda s: s.agg('std'), s)
self._run_test(lambda s: s.agg('var'), s)
self._run_test(lambda s: s.agg(['std', 'sum']), s)
self._run_test(lambda s: s.agg(['var']), s)
def test_std_all_na(self):
s = pd.Series([np.nan] * 10)
self._run_test(lambda s: s.agg('std'), s)
self._run_test(lambda s: s.std(), s)
def test_df_agg_operations_on_columns(self):
self._run_test(
lambda df: df.groupby('group').agg(
mean_foo=('foo', lambda x: np.mean(x)),
median_bar=('bar', lambda x: np.median(x)),
sum_baz=('baz', 'sum'),
count_bool=('bool', 'count'), ),
GROUPBY_DF)
def test_std_mostly_na_with_ddof(self):
df = pd.DataFrame({
'one': [i if i % 8 == 0 else np.nan for i in range(8)],
'two': [i if i % 4 == 0 else np.nan for i in range(8)],
'three': [i if i % 2 == 0 else np.nan for i in range(8)],
},
index=pd.MultiIndex.from_arrays(
[list(range(8)), list(reversed(range(8)))],
names=['forward', None]))
self._run_test(lambda df: df.std(), df) # ddof=1
self._run_test(lambda df: df.std(ddof=0), df)
self._run_test(lambda df: df.std(ddof=2), df)
self._run_test(lambda df: df.std(ddof=3), df)
self._run_test(lambda df: df.std(ddof=4), df)
def test_dataframe_std(self):
self._run_test(lambda df: df.std(numeric_only=True), GROUPBY_DF)
self._run_test(lambda df: df.var(numeric_only=True), GROUPBY_DF)
def test_dataframe_mode(self):
self._run_test(
lambda df: df.mode(), GROUPBY_DF, nonparallel=True, check_proxy=False)
self._run_test(
lambda df: df.mode(numeric_only=True),
GROUPBY_DF,
nonparallel=True,
check_proxy=False)
self._run_test(
lambda df: df.mode(dropna=True, numeric_only=True),
GROUPBY_DF,
nonparallel=True,
check_proxy=False)
def test_series_mode(self):
self._run_test(lambda df: df.foo.mode(), GROUPBY_DF, nonparallel=True)
self._run_test(
lambda df: df.baz.mode(dropna=True), GROUPBY_DF, nonparallel=True)
class BeamSpecificTest(unittest.TestCase):
"""Tests for functionality that's specific to the Beam DataFrame API.
These features don't exist in pandas so we must verify them independently."""
def assert_frame_data_equivalent(
self, actual, expected, check_column_subset=False, extra_col_value=0):
"""Verify that actual is the same as expected, ignoring the index and order
of the data.
Note: In order to perform non-deferred column operations in Beam, we have
to enumerate all possible categories of data, even if they are ultimately
unobserved. The default Pandas implementation, on the other hand, does not
produce unobserved columns. This means that when conducting tests, we need
to account for the fact that the Beam result may be a superset of the
Pandas result.
If ``check_column_subset`` is `True`, we verify that all of the columns in
the DataFrame returned by the Pandas implementation are contained in the
DataFrame created by the Beam implementation.
We also check that any columns that exist in the Beam implementation but
not in the Pandas implementation are equal to ``extra_col_value``, to
ensure that they were not erroneously populated.
"""
if check_column_subset:
if isinstance(expected, pd.DataFrame):
expected_cols = set(expected.columns)
actual_cols = set(actual.columns)
# Verify that the expected columns are a subset of the actual columns
if not expected_cols.issubset(actual_cols):
raise AssertionError(
f"Expected columns:\n{expected.columns}\n is not a "
f"subset of {actual.columns}.")
# Verify that columns that don't exist in expected
# but do in actual are all equal to `extra_col_value` (default of 0)
extra_columns = actual_cols - expected_cols
if extra_columns:
actual_extra_only = actual[list(extra_columns)]
if np.isnan(extra_col_value):
extra_cols_all_match = actual_extra_only.isna().all().all()
else:
extra_cols_all_match = actual_extra_only.eq(
extra_col_value).all().all()
if not extra_cols_all_match:
raise AssertionError(
f"Extra columns:{extra_columns}\n should all "
f"be {extra_col_value}, but got \n{actual_extra_only}.")
# Filtering actual to contain only columns in expected
actual = actual[expected.columns]
def sort_and_drop_index(df):
if isinstance(df, pd.Series):
df = df.sort_values()
elif isinstance(df, pd.DataFrame):
df = df.sort_values(by=list(df.columns))
return df.reset_index(drop=True)
actual = sort_and_drop_index(actual)
expected = sort_and_drop_index(expected)
if isinstance(expected, pd.Series):
pd.testing.assert_series_equal(actual, expected)
elif isinstance(expected, pd.DataFrame):
pd.testing.assert_frame_equal(actual, expected)
def _evaluate(self, func, *args, distributed=True):
deferred_args = [
frame_base.DeferredFrame.wrap(
expressions.ConstantExpression(arg, arg[0:0])) for arg in args
]
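# PartitioningSession evaluates the expression with the inputs split into
# multiple partitions, roughly simulating distributed execution; Session
# evaluates everything in a single partition.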
session_type = (
expressions.PartitioningSession if distributed else expressions.Session)
return session_type({}).evaluate(func(*deferred_args)._expr)
def test_drop_duplicates_keep_any(self):
df = pd.DataFrame({
'brand': ['Yum Yum', 'Yum Yum', 'Indomie', 'Indomie', 'Indomie'],
'style': ['cup', 'cup', 'cup', 'pack', 'pack'],
'rating': [4, 4, 3.5, 15, 5]
})
result = self._evaluate(lambda df: df.drop_duplicates(keep='any'), df)
# Verify that the result is the same as conventional drop_duplicates
self.assert_frame_data_equivalent(result, df.drop_duplicates())
def test_drop_duplicates_keep_any_subset(self):
df = pd.DataFrame({
'brand': ['Yum Yum', 'Yum Yum', 'Indomie', 'Indomie', 'Indomie'],
'style': ['cup', 'cup', 'cup', 'pack', 'pack'],
'rating': [4, 4, 3.5, 15, 5]
})
result = self._evaluate(
lambda df: df.drop_duplicates(keep='any', subset=['brand']), df)
# Series.unique is a method reference (always truthy); use is_unique to
# actually assert that the surviving brands are unique.
self.assertTrue(result.brand.is_unique)
self.assert_frame_data_equivalent(
result.brand, df.drop_duplicates(subset=['brand']).brand)
def test_series_drop_duplicates_keep_any(self):
df = pd.DataFrame({
'brand': ['Yum Yum', 'Yum Yum', 'Indomie', 'Indomie', 'Indomie'],
'style': ['cup', 'cup', 'cup', 'pack', 'pack'],
'rating': [4, 4, 3.5, 15, 5]
})
result = self._evaluate(lambda df: df.brand.drop_duplicates(keep='any'), df)
self.assert_frame_data_equivalent(result, df.brand.drop_duplicates())
def test_duplicated_keep_any(self):
df = pd.DataFrame({
'brand': ['Yum Yum', 'Yum Yum', 'Indomie', 'Indomie', 'Indomie'],
'style': ['cup', 'cup', 'cup', 'pack', 'pack'],
'rating': [4, 4, 3.5, 15, 5]
})
result = self._evaluate(lambda df: df.duplicated(keep='any'), df)
# Verify that the result is the same as conventional duplicated
self.assert_frame_data_equivalent(result, df.duplicated())
def test_get_dummies_not_categoricaldtype(self):
# Should not work because the series does not have a CategoricalDtype
with self.assertRaisesRegex(
frame_base.WontImplementError,
r"get_dummies\(\) of non-categorical type is not supported"):
s = pd.Series(['a ,b', 'a', 'a, d'])
self._evaluate(lambda s: s.str.get_dummies(','), s)
# bool series do not work because they do not have a CategoricalDtype
with self.assertRaisesRegex(
frame_base.WontImplementError,
r"get_dummies\(\) of non-categorical type is not supported"):
s = pd.Series([True, False, False, True])
self._evaluate(lambda s: s.str.get_dummies(), s)
def test_get_dummies_comma_separator(self):
s = pd.Series(['a ,b', 'a', 'a, d', 'c'])
s = s.astype(pd.CategoricalDtype(categories=['a ,b', 'c', 'b', 'a,d']))
result = self._evaluate(lambda s: s.str.get_dummies(','), s)
self.assert_frame_data_equivalent(
result, s.str.get_dummies(','), check_column_subset=True)
def test_get_dummies_pandas_doc_example1(self):
s = pd.Series(['a|b', 'a', 'a|c'])
s = s.astype(pd.CategoricalDtype(categories=['a|b', 'a', 'a|c']))
result = self._evaluate(lambda s: s.str.get_dummies(), s)
self.assert_frame_data_equivalent(
result, s.str.get_dummies(), check_column_subset=True)
def test_get_dummies_pandas_doc_example2(self):
# Should still work even though np.nan is not considered a category,
# because we automatically create a nan column
s = pd.Series(['a|b', np.nan, 'a|c'])
s = s.astype(pd.CategoricalDtype(categories=['a|b', 'a|c']))
result = self._evaluate(lambda s: s.str.get_dummies(), s)
self.assert_frame_data_equivalent(
result, s.str.get_dummies(), check_column_subset=True)
def test_get_dummies_pass_nan_as_category(self):
# Explicitly pass 'nan' as a category
s = pd.Series(['a|b', 'b|c', 'a|c', 'c', 'd'])
s = s.astype(pd.CategoricalDtype(categories=['a', 'b', 'c', 'nan']))
result = self._evaluate(lambda s: s.str.get_dummies(), s)
self.assert_frame_data_equivalent(
result, s.str.get_dummies(), check_column_subset=True)
def test_get_dummies_bools_casted_to_string(self):
s = pd.Series([True, False, False, True]).astype('str')
s = s.astype(pd.CategoricalDtype(categories=['True', 'False']))
result = self._evaluate(lambda s: s.str.get_dummies(), s)
self.assert_frame_data_equivalent(
result, s.str.get_dummies(), check_column_subset=True)
def test_nsmallest_any(self):
df = pd.DataFrame({
'population': [
59000000,
65000000,
434000,
434000,
434000,
337000,
337000,
11300,
11300
],
'GDP': [1937894, 2583560, 12011, 4520, 12128, 17036, 182, 38, 311],
'alpha-2': ["IT", "FR", "MT", "MV", "BN", "IS", "NR", "TV", "AI"]
},
index=[
"Italy",
"France",
"Malta",
"Maldives",
"Brunei",
"Iceland",
"Nauru",
"Tuvalu",
"Anguilla"
])
result = self._evaluate(
lambda df: df.population.nsmallest(3, keep='any'), df)
# keep='any' should produce the same result as keep='first',
# but not necessarily with the same index
self.assert_frame_data_equivalent(result, df.population.nsmallest(3))
def test_nlargest_any(self):
df = pd.DataFrame({
'population': [
59000000,
65000000,
434000,
434000,
434000,
337000,
337000,
11300,
11300
],
'GDP': [1937894, 2583560, 12011, 4520, 12128, 17036, 182, 38, 311],
'alpha-2': ["IT", "FR", "MT", "MV", "BN", "IS", "NR", "TV", "AI"]
},
index=[
"Italy",
"France",
"Malta",
"Maldives",
"Brunei",
"Iceland",
"Nauru",
"Tuvalu",
"Anguilla"
])
result = self._evaluate(
lambda df: df.population.nlargest(3, keep='any'), df)
# keep='any' should produce the same result as keep='first',
# but not necessarily with the same index
self.assert_frame_data_equivalent(result, df.population.nlargest(3))
def test_pivot_pandas_example2(self):
# Second pivot example from the pandas documentation
df = pd.DataFrame({
'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
'baz': [1, 2, 3, 4, 5, 6],
'zoo': ['x', 'y', 'z', 'q', 'w', 't']
})
df['bar'] = df['bar'].astype(
pd.CategoricalDtype(categories=['A', 'B', 'C']))
result = self._evaluate(lambda df: df.pivot(index='foo', columns='bar'), df)
# When there are multiple values, dtypes default to object.
# Thus, we need to convert to numeric with pd.to_numeric.
self.assert_frame_data_equivalent(
result['baz'].apply(pd.to_numeric),
df.pivot(index='foo', columns='bar')['baz'])
def test_sample(self):
df = pd.DataFrame({
'population': [
59000000,
65000000,
434000,
434000,
434000,
337000,
337000,
11300,
11300
],
'GDP': [1937894, 2583560, 12011, 4520, 12128, 17036, 182, 38, 311],
'alpha-2': ["IT", "FR", "MT", "MV", "BN", "IS", "NR", "TV", "AI"]
},
index=[
"Italy",
"France",
"Malta",
"Maldives",
"Brunei",
"Iceland",
"Nauru",
"Tuvalu",
"Anguilla"
])
result = self._evaluate(lambda df: df.sample(n=3), df)
self.assertEqual(len(result), 3)
series_result = self._evaluate(lambda df: df.GDP.sample(n=3), df)
self.assertEqual(len(series_result), 3)
self.assertEqual(series_result.name, "GDP")
def test_sample_with_weights(self):
df = pd.DataFrame({
'population': [
59000000,
65000000,
434000,
434000,
434000,
337000,
337000,
11300,
11300
],
'GDP': [1937894, 2583560, 12011, 4520, 12128, 17036, 182, 38, 311],
'alpha-2': ["IT", "FR", "MT", "MV", "BN", "IS", "NR", "TV", "AI"]
},
index=[
"Italy",
"France",
"Malta",
"Maldives",
"Brunei",
"Iceland",
"Nauru",
"Tuvalu",
"Anguilla"
])
weights = pd.Series([0, 0, 0, 0, 0, 0, 0, 1, 1], index=df.index)
result = self._evaluate(
lambda df, weights: df.sample(n=2, weights=weights), df, weights)
self.assertEqual(len(result), 2)
self.assertEqual(set(result.index), set(["Tuvalu", "Anguilla"]))
series_result = self._evaluate(
lambda df, weights: df.GDP.sample(n=2, weights=weights), df, weights)
self.assertEqual(len(series_result), 2)
self.assertEqual(series_result.name, "GDP")
self.assertEqual(set(series_result.index), set(["Tuvalu", "Anguilla"]))
def test_sample_with_missing_weights(self):
df = pd.DataFrame({
'population': [
59000000,
65000000,
434000,
434000,
434000,
337000,
337000,
11300,
11300
],
'GDP': [1937894, 2583560, 12011, 4520, 12128, 17036, 182, 38, 311],
'alpha-2': ["IT", "FR", "MT", "MV", "BN", "IS", "NR", "TV", "AI"]
},
index=[
"Italy",
"France",
"Malta",
"Maldives",
"Brunei",
"Iceland",
"Nauru",
"Tuvalu",
"Anguilla"
])
# Missing weights are treated as 0
weights = pd.Series([.1, .01, np.nan, 0],
index=["Nauru", "Iceland", "Anguilla", "Italy"])
result = self._evaluate(
lambda df, weights: df.sample(n=2, weights=weights), df, weights)
self.assertEqual(len(result), 2)
self.assertEqual(set(result.index), set(["Nauru", "Iceland"]))
series_result = self._evaluate(
lambda df, weights: df.GDP.sample(n=2, weights=weights), df, weights)
self.assertEqual(len(series_result), 2)
self.assertEqual(series_result.name, "GDP")
self.assertEqual(set(series_result.index), set(["Nauru", "Iceland"]))
def test_sample_with_weights_distribution(self):
target_prob = 0.25
num_samples = 100
num_targets = 200
num_other_elements = 10000
target_weight = target_prob / num_targets
other_weight = (1 - target_prob) / num_other_elements
self.assertTrue(target_weight > other_weight * 10, "weights too close")
result = self._evaluate(
lambda s, weights: s.sample(n=num_samples, weights=weights).sum(),
# The first elements are 1, the rest are all 0. This means that when
# we sum all the sampled elements (above), the result should be the
# number of times the first elements (aka targets) were sampled.
pd.Series([1] * num_targets + [0] * num_other_elements),
pd.Series([target_weight] * num_targets +
[other_weight] * num_other_elements))
# With the above constants, the probability of violating this invariant
# (as computed using the Bernoulli distribution) is about 0.0012%.
expected = num_samples * target_prob
self.assertTrue(expected / 3 < result < expected * 2, (expected, result))
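# Sanity check of the arithmetic above: expected == 100 * 0.25 == 25, so
# the assertion requires 8.33 < result < 50.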
def test_split_pandas_examples_no_expand(self):
# If expand=False (the default), there is no need to cast the dtype to
# CategoricalDtype.
s = pd.Series([
"this is a regular sentence",
"https://docs.python.org/3/tutorial/index.html",
np.nan
])
result = self._evaluate(lambda s: s.str.split(), s)
self.assert_frame_data_equivalent(result, s.str.split())
result = self._evaluate(lambda s: s.str.rsplit(), s)
self.assert_frame_data_equivalent(result, s.str.rsplit())
result = self._evaluate(lambda s: s.str.split(n=2), s)
self.assert_frame_data_equivalent(result, s.str.split(n=2))
result = self._evaluate(lambda s: s.str.rsplit(n=2), s)
self.assert_frame_data_equivalent(result, s.str.rsplit(n=2))
result = self._evaluate(lambda s: s.str.split(pat="/"), s)
self.assert_frame_data_equivalent(result, s.str.split(pat="/"))
def test_split_pandas_examples_expand_not_categorical(self):
# When expand=True, an exception is raised because the series is not
# categorical
s = pd.Series([
"this is a regular sentence",
"https://docs.python.org/3/tutorial/index.html",
np.nan
])
with self.assertRaisesRegex(
frame_base.WontImplementError,
r"split\(\) of non-categorical type is not supported"):
self._evaluate(lambda s: s.str.split(expand=True), s)
with self.assertRaisesRegex(
frame_base.WontImplementError,
r"rsplit\(\) of non-categorical type is not supported"):
self._evaluate(lambda s: s.str.rsplit(expand=True), s)
def test_split_pandas_examples_expand_pat_is_string_literal1(self):
# expand=True, with the pattern treated as a string literal
s = pd.Series([
"this is a regular sentence",
"https://docs.python.org/3/tutorial/index.html",
np.nan
])
s = s.astype(
pd.CategoricalDtype(
categories=[
'this is a regular sentence',
'https://docs.python.org/3/tutorial/index.html'
]))
result = self._evaluate(lambda s: s.str.split(expand=True), s)
self.assert_frame_data_equivalent(result, s.str.split(expand=True))
result = self._evaluate(lambda s: s.str.rsplit("/", n=1, expand=True), s)
self.assert_frame_data_equivalent(
result, s.str.rsplit("/", n=1, expand=True))
@unittest.skipIf(PD_VERSION < (1, 4), "regex arg is new in pandas 1.4")
def test_split_pandas_examples_expand_pat_is_string_literal2(self):
# When regex is None (the default), pat is treated as a string literal if
# len(pat) == 1
s = pd.Series(['foojpgbar.jpg']).astype('category')
s = s.astype(pd.CategoricalDtype(categories=["foojpgbar.jpg"]))
result = self._evaluate(lambda s: s.str.split(r".", expand=True), s)
self.assert_frame_data_equivalent(result, s.str.split(r".", expand=True))
# When regex=False, pat is interpreted as the string itself
result = self._evaluate(
lambda s: s.str.split(r"\.jpg", regex=False, expand=True), s)
self.assert_frame_data_equivalent(
result, s.str.split(r"\.jpg", regex=False, expand=True))
@unittest.skipIf(PD_VERSION < (1, 4), "regex arg is new in pandas 1.4")
def test_split_pandas_examples_expand_pat_is_regex(self):
# When regex is None (the default), pat is compiled as a regex if
# len(pat) != 1
s = pd.Series(["foo and bar plus baz"])
s = s.astype(pd.CategoricalDtype(categories=["foo and bar plus baz"]))
result = self._evaluate(lambda s: s.str.split(r"and|plus", expand=True), s)
self.assert_frame_data_equivalent(
result, s.str.split(r"and|plus", expand=True))
s = pd.Series(['foojpgbar.jpg']).astype('category')
s = s.astype(pd.CategoricalDtype(categories=["foojpgbar.jpg"]))
result = self._evaluate(lambda s: s.str.split(r"\.jpg", expand=True), s)
self.assert_frame_data_equivalent(
result, s.str.split(r"\.jpg", expand=True))
# When regex=True, pat is interpreted as a regex
result = self._evaluate(
lambda s: s.str.split(r"\.jpg", regex=True, expand=True), s)
self.assert_frame_data_equivalent(
result, s.str.split(r"\.jpg", regex=True, expand=True))
# A compiled regex can be passed as pat
result = self._evaluate(
lambda s: s.str.split(re.compile(r"\.jpg"), expand=True), s)
self.assert_frame_data_equivalent(
result, s.str.split(re.compile(r"\.jpg"), expand=True))
@unittest.skipIf(PD_VERSION < (1, 4), "regex arg is new in pandas 1.4")
def test_split_pat_is_regex(self):
# regex=True, but expand=False
s = pd.Series(['foojpgbar.jpg']).astype('category')
s = s.astype(pd.CategoricalDtype(categories=["foojpgbar.jpg"]))
result = self._evaluate(
lambda s: s.str.split(r"\.jpg", regex=True, expand=False), s)
self.assert_frame_data_equivalent(
result, s.str.split(r"\.jpg", regex=True, expand=False))
def test_astype_categorical_rejected(self):
df = pd.DataFrame({'A': np.arange(6), 'B': list('aabbca')})
with self.assertRaisesRegex(frame_base.WontImplementError,
r"astype\(dtype='category'\)"):
self._evaluate(lambda df: df.B.astype('category'), df)
class AllowNonParallelTest(unittest.TestCase):
def _use_non_parallel_operation(self):
_ = frame_base.DeferredFrame.wrap(
expressions.PlaceholderExpression(pd.Series([1, 2, 3]))).replace(
'a', 'b', limit=1)
def test_disallow_non_parallel(self):
with self.assertRaises(expressions.NonParallelOperation):
self._use_non_parallel_operation()
def test_allow_non_parallel_in_context(self):
with beam.dataframe.allow_non_parallel_operations():
self._use_non_parallel_operation()
def test_allow_non_parallel_nesting(self):
# disallowed
with beam.dataframe.allow_non_parallel_operations():
# allowed
self._use_non_parallel_operation()
with beam.dataframe.allow_non_parallel_operations(False):
# disallowed again
with self.assertRaises(expressions.NonParallelOperation):
self._use_non_parallel_operation()
# allowed
self._use_non_parallel_operation()
# disallowed
with self.assertRaises(expressions.NonParallelOperation):
self._use_non_parallel_operation()
class ConstructionTimeTest(unittest.TestCase):
"""Tests for operations that can be executed eagerly."""
DF = pd.DataFrame({
'str_col': ['foo', 'bar'] * 3,
'int_col': [1, 2] * 3,
'flt_col': [1.1, 2.2] * 3,
'cat_col': pd.Series(list('aabbca'), dtype="category"),
'datetime_col': pd.Series(
pd.date_range(
'1/1/2000', periods=6, freq='m', tz='America/Los_Angeles'))
})
DEFERRED_DF = frame_base.DeferredFrame.wrap(
expressions.PlaceholderExpression(DF.iloc[:0]))
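# The deferred frame wraps an empty proxy (DF.iloc[:0]), so metadata such
# as DEFERRED_DF.int_col.dtype is available immediately, without running a
# pipeline; _run_test below relies on this.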
def _run_test(self, fn):
expected = fn(self.DF)
actual = fn(self.DEFERRED_DF)
if isinstance(expected, pd.Index):
pd.testing.assert_index_equal(expected, actual)
elif isinstance(expected, pd.Series):
pd.testing.assert_series_equal(expected, actual)
elif isinstance(expected, pd.DataFrame):
pd.testing.assert_frame_equal(expected, actual)
else:
self.assertEqual(expected, actual)
@parameterized.expand(DF.columns)
def test_series_name(self, col_name):
self._run_test(lambda df: df[col_name].name)
@parameterized.expand(DF.columns)
def test_series_dtype(self, col_name):
self._run_test(lambda df: df[col_name].dtype)
self._run_test(lambda df: df[col_name].dtypes)
def test_dataframe_columns(self):
self._run_test(lambda df: list(df.columns))
def test_dataframe_dtypes(self):
self._run_test(lambda df: list(df.dtypes))
def test_categories(self):
self._run_test(lambda df: df.cat_col.cat.categories)
def test_categorical_ordered(self):
self._run_test(lambda df: df.cat_col.cat.ordered)
def test_groupby_ndim(self):
self._run_test(lambda df: df.groupby('int_col').ndim)
def test_groupby_project_ndim(self):
self._run_test(lambda df: df.groupby('int_col').flt_col.ndim)
self._run_test(
lambda df: df.groupby('int_col')[['flt_col', 'str_col']].ndim)
def test_get_column_default_None(self):
# .get just returns default_value=None at construction time if the column
# doesn't exist
self._run_test(lambda df: df.get('FOO'))
def test_datetime_tz(self):
self._run_test(lambda df: df.datetime_col.dt.tz)
class DocstringTest(unittest.TestCase):
@parameterized.expand([
(frames.DeferredDataFrame, pd.DataFrame),
(frames.DeferredSeries, pd.Series),
#(frames._DeferredIndex, pd.Index),
(frames._DeferredStringMethods, pd.Series.str),
(
frames._DeferredCategoricalMethods,
pd.core.arrays.categorical.CategoricalAccessor),
(frames.DeferredGroupBy, pd.core.groupby.generic.DataFrameGroupBy),
(frames._DeferredGroupByCols, pd.core.groupby.generic.DataFrameGroupBy),
(
frames._DeferredDatetimeMethods,
pd.core.indexes.accessors.DatetimeProperties),
])
def test_docs_defined(self, beam_type, pd_type):
beam_attrs = set(dir(beam_type))
pd_attrs = set(dir(pd_type))
docstring_required = sorted([
attr for attr in beam_attrs.intersection(pd_attrs)
if getattr(pd_type, attr).__doc__ and not attr.startswith('_')
])
docstring_missing = [
attr for attr in docstring_required
if not getattr(beam_type, attr).__doc__
]
self.assertTrue(
len(docstring_missing) == 0,
f'{beam_type.__name__} is missing a docstring for '
f'{len(docstring_missing)}/{len(docstring_required)} '
f'({len(docstring_missing)/len(docstring_required):%}) '
f'operations:\n{docstring_missing}')
class ReprTest(unittest.TestCase):
def test_basic_dataframe(self):
df = frame_base.DeferredFrame.wrap(
expressions.ConstantExpression(GROUPBY_DF))
self.assertEqual(
repr(df),
(
"DeferredDataFrame(columns=['group', 'foo', 'bar', 'baz', 'bool', "
"'str'], index=<unnamed>)"))
def test_dataframe_with_named_index(self):
df = frame_base.DeferredFrame.wrap(
expressions.ConstantExpression(GROUPBY_DF.set_index('group')))
self.assertEqual(
repr(df),
(
"DeferredDataFrame(columns=['foo', 'bar', 'baz', 'bool', 'str'], "
"index='group')"))
def test_dataframe_with_partial_named_index(self):
df = frame_base.DeferredFrame.wrap(
expressions.ConstantExpression(
GROUPBY_DF.set_index([GROUPBY_DF.index, 'group'])))
self.assertEqual(
repr(df),
(
"DeferredDataFrame(columns=['foo', 'bar', 'baz', 'bool', 'str'], "
"indexes=[<unnamed>, 'group'])"))
def test_dataframe_with_named_multi_index(self):
df = frame_base.DeferredFrame.wrap(
expressions.ConstantExpression(GROUPBY_DF.set_index(['str', 'group'])))
self.assertEqual(
repr(df),
(
"DeferredDataFrame(columns=['foo', 'bar', 'baz', 'bool'], "
"indexes=['str', 'group'])"))
def test_dataframe_with_multiple_column_levels(self):
df = pd.DataFrame({
'foofoofoo': ['one', 'one', 'one', 'two', 'two', 'two'],
'barbar': ['A', 'B', 'C', 'A', 'B', 'C'],
'bazzy': [1, 2, 3, 4, 5, 6],
'zoop': ['x', 'y', 'z', 'q', 'w', 't']
})
df = df.pivot(index='foofoofoo', columns='barbar')
df = frame_base.DeferredFrame.wrap(expressions.ConstantExpression(df))
self.assertEqual(
repr(df),
(
"DeferredDataFrame(columns=[('bazzy', 'A'), ('bazzy', 'B'), "
"('bazzy', 'C'), ('zoop', 'A'), ('zoop', 'B'), ('zoop', 'C')], "
"index='foofoofoo')"))
def test_dataframe_with_multiple_column_and_multiple_index_levels(self):
df = pd.DataFrame({
'foofoofoo': ['one', 'one', 'one', 'two', 'two', 'two'],
'barbar': ['A', 'B', 'C', 'A', 'B', 'C'],
'bazzy': [1, 2, 3, 4, 5, 6],
'zoop': ['x', 'y', 'z', 'q', 'w', 't']
})
df = df.pivot(index='foofoofoo', columns='barbar')
df.index = [['a', 'b'], df.index]
# pandas repr displays this:
# bazzy zoop
# barbar A B C A B C
# foofoofoo
# a one 1 2 3 x y z
# b two 4 5 6 q w t
df = frame_base.DeferredFrame.wrap(expressions.ConstantExpression(df))
self.assertEqual(
repr(df),
(
"DeferredDataFrame(columns=[('bazzy', 'A'), ('bazzy', 'B'), "
"('bazzy', 'C'), ('zoop', 'A'), ('zoop', 'B'), ('zoop', 'C')], "
"indexes=[<unnamed>, 'foofoofoo'])"))
def test_basic_series(self):
df = frame_base.DeferredFrame.wrap(
expressions.ConstantExpression(GROUPBY_DF['bool']))
self.assertEqual(
repr(df), "DeferredSeries(name='bool', dtype=bool, index=<unnamed>)")
def test_series_with_named_index(self):
df = frame_base.DeferredFrame.wrap(
expressions.ConstantExpression(GROUPBY_DF.set_index('group')['str']))
self.assertEqual(
repr(df), "DeferredSeries(name='str', dtype=object, index='group')")
def test_series_with_partial_named_index(self):
df = frame_base.DeferredFrame.wrap(
expressions.ConstantExpression(
GROUPBY_DF.set_index([GROUPBY_DF.index, 'group'])['bar']))
self.assertEqual(
repr(df),
(
"DeferredSeries(name='bar', dtype=float64, "
"indexes=[<unnamed>, 'group'])"))
def test_series_with_named_multi_index(self):
df = frame_base.DeferredFrame.wrap(
expressions.ConstantExpression(
GROUPBY_DF.set_index(['str', 'group'])['baz']))
self.assertEqual(
repr(df),
"DeferredSeries(name='baz', dtype=float64, indexes=['str', 'group'])")
@unittest.skipIf(
not ie.current_env().is_interactive_ready,
'[interactive] dependency is not installed.')
@isolated_env
class InteractiveDataFrameTest(unittest.TestCase):
def test_collect_merged_dataframes(self):
p = beam.Pipeline(InteractiveRunner())
pcoll_1 = (
p
| 'Create data 1' >> beam.Create([(1, 'a'), (2, 'b'), (3, 'c'),
(4, 'd')])
|
'To rows 1' >> beam.Select(col_1=lambda x: x[0], col_2=lambda x: x[1]))
df_1 = to_dataframe(pcoll_1)
pcoll_2 = (
p
| 'Create data 2' >> beam.Create([(5, 'e'), (6, 'f'), (7, 'g'),
(8, 'h')])
|
'To rows 2' >> beam.Select(col_3=lambda x: x[0], col_4=lambda x: x[1]))
df_2 = to_dataframe(pcoll_2)
df_merged = df_1.merge(df_2, left_index=True, right_index=True)
pd_df = ib.collect(df_merged).sort_values(by='col_1')
self.assertEqual(pd_df.shape, (4, 4))
self.assertEqual(list(pd_df['col_1']), [1, 2, 3, 4])
self.assertEqual(list(pd_df['col_2']), ['a', 'b', 'c', 'd'])
self.assertEqual(list(pd_df['col_3']), [5, 6, 7, 8])
self.assertEqual(list(pd_df['col_4']), ['e', 'f', 'g', 'h'])
if __name__ == '__main__':
unittest.main()