blob: 82298f5def0979587822f9676a66a0d7197ec3cb [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Tests for interactive utilities without explicitly using InteractiveRunner.
"""
# pytype: skip-file
import importlib
import sys
import unittest
from collections import defaultdict
from typing import NamedTuple
import pandas as pd
import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe
from apache_beam.runners.direct import direct_runner
from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.runners.interactive.testing.mock_env import isolated_env
from apache_beam.transforms.window import GlobalWindow
from apache_beam.utils.windowed_value import PaneInfo
from apache_beam.utils.windowed_value import PaneInfoTiming
def print_with_message(msg):
  """Build a pass-through callable that prints every element it is given.

  Args:
    msg: Value printed in front of each element.

  Returns:
    A one-argument function that prints `msg` followed by the element and
    then returns that element unchanged.
  """
  def printer(elem):
    # Echo first, then hand the very same object back so the callable can sit
    # between two processing steps without altering the data.
    print(msg, elem)
    return elem

  return printer
class Record(NamedTuple):
  # Row type for the input data of the grouped-index and multi-index
  # DataFrame tests in this file.
  name: str  # Grouping key (together with height) in the multi-index test.
  age: int  # The remaining numeric column, i.e. the one that gets averaged.
  height: int  # Grouping key in both DataFrame aggregation tests.
# Number of times each element has passed through `cause_side_effect`.
_side_effects = defaultdict(int)


def cause_side_effect(elem):
  """Record one more occurrence of `elem` and return it unchanged.

  The counter is reached through the imported module object instead of this
  function's own globals.
  NOTE(review): presumably so that a serialized copy of this function still
  updates the module's single counter -- confirm.
  """
  importlib.import_module(__name__)._side_effects[elem] += 1
  return elem


def count_side_effects(elem):
  """Return how many times `cause_side_effect` has been called with `elem`."""
  counts = importlib.import_module(__name__)._side_effects
  return counts[elem]


def clear_side_effect():
  """Forget every occurrence recorded so far."""
  importlib.import_module(__name__)._side_effects.clear()
@isolated_env
class NonInteractiveRunnerTest(unittest.TestCase):
  # Exercises ib.collect() on pipelines that were created with ordinary
  # runners. The module-level side-effect counters are used to tell a real
  # execution of a transform apart from a result served from the cache.
  #
  # Comments are used instead of docstrings on the test methods so that the
  # descriptions reported by unittest stay as they are.

  @unittest.skipIf(sys.platform == "win32", "[BEAM-10627]")
  def test_basic(self):
    # Collect, re-collect, extend and finally force-recompute a PCollection.
    clear_side_effect()
    lowercase = {'a', 'b', 'c'}
    uppercase = {'A', 'B', 'C'}
    # This test relies on the pipeline cache being populated. Prism doesn't
    # consistently populate this cache, so FnApiRunner is requested by name.
    pipeline = beam.Pipeline('FnApiRunner')

    # Initial collection runs the pipeline.
    letters = (
        pipeline
        | beam.Create(['a', 'b', 'c'])
        | beam.Map(cause_side_effect))
    first = ib.collect(letters)
    self.assertEqual(set(first[0]), lowercase)
    self.assertEqual(count_side_effects('a'), 1)

    # Collecting the PCollection again uses the cache.
    second = ib.collect(letters)
    self.assertEqual(set(second[0]), lowercase)
    self.assertEqual(count_side_effects('a'), 1)

    # Using the PCollection uses the cache.
    shouted = letters | beam.Map(str.upper)
    third = ib.collect(shouted)
    self.assertEqual(set(third[0]), uppercase)
    self.assertEqual(count_side_effects('a'), 1)

    # Force re-computation: the upstream Map runs a second time.
    recomputed = ib.collect(shouted, force_compute=True)
    self.assertEqual(set(recomputed[0]), uppercase)
    self.assertEqual(count_side_effects('a'), 2)

  @unittest.skipIf(sys.platform == "win32", "[BEAM-10627]")
  def test_multiple_collect(self):
    # Several PCollections are collected in one call; each side-effecting Map
    # must have run exactly once no matter how often its output is requested.
    clear_side_effect()
    pipeline = beam.Pipeline(direct_runner.DirectRunner())

    # Initial collection runs the pipeline.
    branch_a = (
        pipeline
        | 'A' >> beam.Create(['a'])
        | 'As' >> beam.Map(cause_side_effect))
    branch_b = (
        pipeline
        | 'B' >> beam.Create(['b'])
        | 'Bs' >> beam.Map(cause_side_effect))
    got_a, got_b = ib.collect(branch_a, branch_b)
    self.assertEqual(set(got_a[0]), {'a'})
    self.assertEqual(set(got_b[0]), {'b'})
    self.assertEqual(count_side_effects('a'), 1)
    self.assertEqual(count_side_effects('b'), 1)

    # Collecting the PCollection again uses the cache.
    got_a, got_b = ib.collect(branch_a, branch_b)
    self.assertEqual(set(got_a[0]), {'a'})
    self.assertEqual(set(got_b[0]), {'b'})
    self.assertEqual(count_side_effects('a'), 1)
    self.assertEqual(count_side_effects('b'), 1)

    # Using the PCollection uses the cache.
    # NOTE(review): the two unlabeled doubling lambdas in this test are kept
    # as separate lambdas on separate source lines; their default transform
    # labels presumably depend on source position -- confirm before merging
    # them into one shared function.
    doubled_a = (
        branch_a
        | beam.Map(lambda x: 2 * x)
        | 'AAs' >> beam.Map(cause_side_effect))
    got_a, got_b, got_aa = ib.collect(branch_a, branch_b, doubled_a)
    self.assertEqual(set(got_a[0]), {'a'})
    self.assertEqual(set(got_b[0]), {'b'})
    self.assertEqual(set(got_aa[0]), {'aa'})
    self.assertEqual(count_side_effects('a'), 1)
    self.assertEqual(count_side_effects('b'), 1)
    self.assertEqual(count_side_effects('aa'), 1)

    # Duplicates are only computed once.
    doubled_b = (
        branch_b
        | beam.Map(lambda x: 2 * x)
        | 'BBs' >> beam.Map(cause_side_effect))
    got_aa, got_aa_again, got_bb, got_bb_again = ib.collect(
        doubled_a, doubled_a, doubled_b, doubled_b)
    self.assertEqual(set(got_aa[0]), {'aa'})
    self.assertEqual(set(got_aa_again[0]), {'aa'})
    self.assertEqual(set(got_bb[0]), {'bb'})
    self.assertEqual(set(got_bb_again[0]), {'bb'})
    self.assertEqual(count_side_effects('aa'), 1)
    self.assertEqual(count_side_effects('bb'), 1)

  @unittest.skipIf(sys.platform == "win32", "[BEAM-10627]")
  def test_wordcount(self):
    # Runs a small word count, then collects it again with window info and
    # compares against a frame built from the first result.
    class WordExtractingDoFn(beam.DoFn):
      def process(self, element):
        # Whitespace-separated words of the stripped line.
        return element.strip().split()

    pipeline = beam.Pipeline(runner=direct_runner.DirectRunner())

    # Count the occurrences of each word.
    counts = (
        pipeline
        | beam.Create(['to be or not to be that is the question'])
        | 'split' >> beam.ParDo(WordExtractingDoFn())
        | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
        | 'group' >> beam.GroupByKey()
        | 'count' >> beam.Map(lambda wordones: (wordones[0], sum(wordones[1]))))

    actual = ib.collect(counts)
    expected_pairs = {
        ('to', 2),
        ('be', 2),
        ('or', 1),
        ('not', 1),
        ('that', 1),
        ('is', 1),
        ('the', 1),
        ('question', 1),
    }
    self.assertSetEqual(set(zip(actual[0], actual[1])), expected_pairs)

    # Truncate the precision to millis because the window coder uses millis
    # as units then gets upcast to micros.
    end_of_window = (GlobalWindow().max_timestamp().micros // 1000) * 1000

    df_counts = ib.collect(counts, include_window_info=True, n=10)
    row_count = len(actual)
    df_expected = pd.DataFrame({
        0: list(actual[0]),
        1: list(actual[1]),
        'event_time': [end_of_window] * row_count,
        'windows': [[GlobalWindow()]] * row_count,
        'pane_info':
            [PaneInfo(True, True, PaneInfoTiming.ON_TIME, 0, 0)] * row_count,
    })
    pd.testing.assert_frame_equal(df_expected, df_counts)

  @unittest.skipIf(sys.platform == "win32", "[BEAM-10627]")
  def test_dataframes(self):
    # A deferred DataFrame built from Rows is collected as a pandas frame.
    pipeline = beam.Pipeline(runner=direct_runner.DirectRunner())
    # NOTE(review): the name `data` is kept on purpose; to_dataframe may
    # derive its default transform label from the caller's variable name --
    # confirm before renaming.
    data = (
        pipeline
        | beam.Create([1, 2, 3])
        | beam.Map(lambda x: beam.Row(square=x * x, cube=x * x * x)))
    df = to_dataframe(data)

    df_expected = pd.DataFrame({'square': [1, 4, 9], 'cube': [1, 8, 27]})
    df_actual = ib.collect(df, n=10).reset_index(drop=True)
    pd.testing.assert_frame_equal(df_expected, df_actual)

  @unittest.skipIf(sys.platform == "win32", "[BEAM-10627]")
  def test_dataframes_with_grouped_index(self):
    # The same aggregation is applied to a deferred and to a pandas frame;
    # both must yield the same result, indexed by the grouping column.
    def aggregate(df):
      return df.groupby('height').mean(numeric_only=True)

    p = beam.Pipeline(runner=direct_runner.DirectRunner())
    data = [
        Record('a', 20, 170),
        Record('a', 30, 170),
        Record('b', 22, 180),
        Record('c', 18, 150),
    ]

    # The PCollection is passed inline (not bound to a local) exactly as
    # before, so whatever to_dataframe sees in this frame is unchanged.
    deferred_df = aggregate(to_dataframe(p | beam.Create(data)))
    df_expected = aggregate(pd.DataFrame(data))
    pd.testing.assert_frame_equal(df_expected, ib.collect(deferred_df, n=10))

  @unittest.skipIf(sys.platform == "win32", "[BEAM-10627]")
  def test_dataframes_with_multi_index(self):
    # Like the grouped-index test, but grouping on two columns.
    def aggregate(df):
      return df.groupby(['name', 'height']).mean()

    p = beam.Pipeline(runner=direct_runner.DirectRunner())
    data = [
        Record('a', 20, 170),
        Record('a', 30, 170),
        Record('b', 22, 180),
        Record('c', 18, 150),
    ]

    deferred_df = aggregate(to_dataframe(p | beam.Create(data)))

    # The expected frame gets its 'name' column converted to pandas' string
    # dtype before aggregating.
    df_input = pd.DataFrame(data)
    df_input['name'] = df_input['name'].astype(pd.StringDtype())
    df_expected = aggregate(df_input)

    pd.testing.assert_frame_equal(df_expected, ib.collect(deferred_df, n=10))

  @unittest.skipIf(sys.platform == "win32", "[BEAM-10627]")
  def test_dataframes_same_cell_twice(self):
    # Two different columns of the same deferred DataFrame are collected one
    # after the other.
    pipeline = beam.Pipeline(runner=direct_runner.DirectRunner())
    # NOTE(review): the name `data` is kept on purpose; to_dataframe may
    # derive its default transform label from the caller's variable name --
    # confirm before renaming.
    data = (
        pipeline
        | beam.Create([1, 2, 3])
        | beam.Map(lambda x: beam.Row(square=x * x, cube=x * x * x)))
    df = to_dataframe(data)

    df_expected = pd.DataFrame({'square': [1, 4, 9], 'cube': [1, 8, 27]})
    for column in ('square', 'cube'):
      pd.testing.assert_series_equal(
          df_expected[column],
          ib.collect(df[column], n=10).reset_index(drop=True))

  @unittest.skipIf(sys.platform == "win32", "[BEAM-10627]")
  def test_new_runner_and_options(self):
    # A later collect() may be given another runner and options; the earlier
    # result is still reused and the new runner is invoked exactly once.
    class MyRunner(beam.runners.PipelineRunner):
      # Number of run_pipeline calls, kept on the class.
      run_count = 0

      @classmethod
      def run_pipeline(cls, pipeline, options):
        # The options handed to collect() must be the ones arriving here.
        assert options._all_options['my_option'] == 123
        cls.run_count += 1
        return direct_runner.DirectRunner().run_pipeline(pipeline, options)

    clear_side_effect()
    pipeline = beam.Pipeline(direct_runner.DirectRunner())

    # Initial collection runs the pipeline.
    letters = (
        pipeline
        | beam.Create(['a', 'b', 'c'])
        | beam.Map(cause_side_effect))
    first = ib.collect(letters)
    self.assertEqual(set(first[0]), {'a', 'b', 'c'})
    self.assertEqual(count_side_effects('a'), 1)

    # Using the PCollection uses the cache with a different runner and options.
    shouted = letters | beam.Map(str.upper)
    second = ib.collect(
        shouted,
        runner=MyRunner(),
        options=beam.options.pipeline_options.PipelineOptions(my_option=123))
    self.assertEqual(set(second[0]), {'A', 'B', 'C'})
    self.assertEqual(count_side_effects('a'), 1)
    self.assertEqual(MyRunner.run_count, 1)
if __name__ == '__main__':
unittest.main()