#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
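"""Tests for the Beam YAML snippets embedded in the programming guide.

The [START ...]/[END ...] markers delimit snippets that are excerpted into
the documentation; running them here keeps the published examples working.
"""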
import logging
import unittest

import apache_beam as beam
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.yaml.readme_test import create_test_method
from apache_beam.yaml.yaml_transform import YamlTransform
DATA = [
    beam.Row(animal='cat', weight=1),
    beam.Row(animal='cat', weight=5),
    beam.Row(animal='dog', weight=10),
]


class ProgrammingGuideTest(unittest.TestCase):
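  # The two class-level tests below are generated by create_test_method from
  # apache_beam.yaml.readme_test, which builds a test that parses and (given
  # 'RUN') executes the embedded YAML pipeline.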
  test_pipelines_constructing_reading = create_test_method(
      'RUN',
      'test_pipelines_constructing_reading',
      '''
      # [START pipelines_constructing_reading]
      pipeline:
        source:
          type: ReadFromText
          config:
            path: ...
      # [END pipelines_constructing_reading]
        transforms: []
      ''')

  test_create_pcollection = create_test_method(
      'RUN',
      'test_create_pcollection',
      '''
      # [START create_pcollection]
      pipeline:
        transforms:
          - type: Create
            config:
              elements:
                - A
                - B
                - ...
      # [END create_pcollection]
        transforms: []
      ''')

  def test_group_by(self):
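    """Groups rows by animal; the 'group' combine gathers each animal's
    weights into a list."""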
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      elements = p | beam.Create(DATA)
      result = elements | YamlTransform(
          '''
          # [START group_by]
          type: Combine
          config:
            group_by: animal
            combine:
              weight: group
          # [END group_by]
          ''')
      assert_that(
          result | beam.Map(lambda x: beam.Row(**x._asdict())),
          equal_to([
              beam.Row(animal='cat', weight=[1, 5]),
              beam.Row(animal='dog', weight=[10]),
          ]))

  def test_co_group_by(self):
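    """Expresses a CoGroupByKey-style join in YAML: both inputs are mapped
    onto a common schema, then combined per name with 'concat'."""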
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      result = p | YamlTransform(
          '''
          type: composite
          transforms:
            # [START cogroupbykey_inputs]
            - type: Create
              name: CreateEmails
              config:
                elements:
                  - { name: "amy", email: "amy@example.com" }
                  - { name: "carl", email: "carl@example.com" }
                  - { name: "julia", email: "julia@example.com" }
                  - { name: "carl", email: "carl@email.com" }
            - type: Create
              name: CreatePhones
              config:
                elements:
                  - { name: "amy", phone: "111-222-3333" }
                  - { name: "james", phone: "222-333-4444" }
                  - { name: "amy", phone: "333-444-5555" }
                  - { name: "carl", phone: "444-555-6666" }
            # [END cogroupbykey_inputs]
            # [START cogroupbykey]
            - type: MapToFields
              name: PrepareEmails
              input: CreateEmails
              config:
                language: python
                fields:
                  name: name
                  email: "[email]"
                  phone: "[]"
            - type: MapToFields
              name: PreparePhones
              input: CreatePhones
              config:
                language: python
                fields:
                  name: name
                  email: "[]"
                  phone: "[phone]"
            - type: Combine
              name: CoGroupBy
              input: [PrepareEmails, PreparePhones]
              config:
                group_by: [name]
                combine:
                  email: concat
                  phone: concat
            - type: MapToFields
              name: FormatResults
              input: CoGroupBy
              config:
                language: python
                fields:
                  formatted:
                    "'%s; %s; %s' % (name, sorted(email), sorted(phone))"
            # [END cogroupbykey]
          output: FormatResults
          ''')
      assert_that(
          result | beam.Map(lambda x: x.formatted),
          equal_to([
              # [START cogroupbykey_formatted_outputs]
              "amy; ['amy@example.com']; ['111-222-3333', '333-444-5555']",
              "carl; ['carl@email.com', 'carl@example.com']; ['444-555-6666']",
              "james; []; ['222-333-4444']",
              "julia; ['julia@example.com']; []",
              # [END cogroupbykey_formatted_outputs]
          ]))

  def test_combine_ref(self):
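    """Combines with a CombineFn referenced by fully qualified name;
    TopCombineFn with n=2 keeps the two largest weights per animal."""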
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      elements = p | beam.Create(DATA)
      result = elements | YamlTransform(
          '''
          # [START combine_simple_py]
          type: Combine
          config:
            language: python
            group_by: animal
            combine:
              biggest:
                fn:
                  type: 'apache_beam.transforms.combiners.TopCombineFn'
                  config:
                    n: 2
                value: weight
          # [END combine_simple_py]
          ''')
      assert_that(
          result | beam.Map(lambda x: beam.Row(**x._asdict())),
          equal_to([
              beam.Row(animal='cat', biggest=[5, 1]),
              beam.Row(animal='dog', biggest=[10]),
          ]))

  def test_combine_globally(self):
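    """An empty group_by list combines globally, summing all weights."""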
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      elements = p | beam.Create(DATA)
      result = elements | YamlTransform(
          '''
          # [START combine_global_sum]
          type: Combine
          config:
            group_by: []
            combine:
              weight: sum
          # [END combine_global_sum]
          ''')
      assert_that(
          result | beam.Map(lambda x: beam.Row(**x._asdict())),
          equal_to([
              beam.Row(weight=16),
          ]))

  def test_combine_per_key(self):
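    """Computes several combines per key: total and mean weight per animal."""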
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      elements = p | beam.Create(DATA)
      result = elements | YamlTransform(
          '''
          # [START combine_per_key]
          type: Combine
          config:
            group_by: [animal]
            combine:
              total_weight:
                fn: sum
                value: weight
              average_weight:
                fn: mean
                value: weight
          # [END combine_per_key]
          ''')
      assert_that(
          result | beam.Map(lambda x: beam.Row(**x._asdict())),
          equal_to([
              beam.Row(animal='cat', total_weight=6, average_weight=3),
              beam.Row(animal='dog', total_weight=10, average_weight=10),
          ]))

  def test_flatten(self):
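    """Flatten merges the two created PCollections into one."""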
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      result = p | YamlTransform(
          '''
          type: composite
          transforms:
            - type: Create
              name: SomeProducingTransform
              config:
                elements: [a, b, c]
            - type: Create
              name: AnotherProducingTransform
              config:
                elements: [d, e, f]
            # [START model_multiple_pcollections_flatten]
            - type: Flatten
              input: [SomeProducingTransform, AnotherProducingTransform]
            # [END model_multiple_pcollections_flatten]
          output: Flatten
          ''')
      assert_that(
          result | beam.Map(lambda x: x.element),
          equal_to(['a', 'b', 'c', 'd', 'e', 'f']))

  def test_explode(self):
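    """Splits each line into a list of words, then Explode emits one row
    per word."""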
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      result = p | YamlTransform(
          '''
          type: composite
          transforms:
            - type: Create
              name: SomeProducingTransform
              config:
                elements:
                  - {line: "a aa"}
                  - {line: "b"}
            # [START model_multiple_output_dofn]
            - type: MapToFields
              input: SomeProducingTransform
              config:
                language: python
                fields:
                  word: "line.split()"
            - type: Explode
              input: MapToFields
              config:
                fields: word
            # [END model_multiple_output_dofn]
          output: Explode
          ''')
      assert_that(
          result | beam.Map(lambda x: x.word), equal_to(['a', 'aa', 'b']))

  def test_schema_output_type(self):
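    """Declares an explicit output type for a computed field; hex() yields
    strings, so the field is marked as type string."""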
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      elements = p | beam.Create(DATA)
      result = elements | YamlTransform(
          '''
          # [START schema_output_type]
          type: MapToFields
          config:
            language: python
            fields:
              new_field:
                expression: "hex(weight)"
                output_type: { "type": "string" }
          # [END schema_output_type]
          ''')
      assert_that(
          result | beam.Map(lambda x: x.new_field),
          equal_to(['0x1', '0x5', '0xa']))

  def test_fixed_windows(self):
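    """Applies 60-second fixed windows; the result is discarded, so this
    only exercises that the windowing snippet applies cleanly."""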
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      elements = p | beam.Create(DATA)
      _ = elements | YamlTransform(
          '''
          # [START setting_fixed_windows]
          type: WindowInto
          windowing:
            type: fixed
            size: 60s
          # [END setting_fixed_windows]
          ''')

  def test_sliding_windows(self):
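    """Applies 5-minute sliding windows emitted every 30 seconds."""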
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      elements = p | beam.Create(DATA)
      _ = elements | YamlTransform(
          '''
          # [START setting_sliding_windows]
          type: WindowInto
          windowing:
            type: sliding
            size: 5m
            period: 30s
          # [END setting_sliding_windows]
          ''')

  def test_session_windows(self):
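    """Applies session windows with a 60-second gap."""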
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      elements = p | beam.Create(DATA)
      _ = elements | YamlTransform(
          '''
          # [START setting_session_windows]
          type: WindowInto
          windowing:
            type: sessions
            gap: 60s
          # [END setting_session_windows]
          ''')

  def test_global_windows(self):
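    """Re-windows the elements into a single global window."""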
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      elements = p | beam.Create(DATA)
      _ = elements | YamlTransform(
          '''
          # [START setting_global_window]
          type: WindowInto
          windowing:
            type: global
          # [END setting_global_window]
          ''')

  def test_assign_timestamps(self):
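    """Derives element timestamps from a date string field via a Python
    callable embedded in the YAML."""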
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      elements = p | beam.Create([
          beam.Row(external_timestamp_field='2024-01-01', value=1),
          beam.Row(external_timestamp_field='2024-01-01', value=2),
          beam.Row(external_timestamp_field='2024-01-02', value=10),
      ])
      _ = elements | YamlTransform(
          '''
          # [START setting_timestamp]
          type: AssignTimestamps
          config:
            language: python
            timestamp:
              callable: |
                import datetime
                def extract_timestamp(x):
                  raw = datetime.datetime.strptime(
                      x.external_timestamp_field, "%Y-%m-%d")
                  return raw.astimezone(datetime.timezone.utc)
          # [END setting_timestamp]
          ''')

  def test_partition(self):
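    """Partitions rows into outputs named '0' through '10' by the string
    value of percentile // 10."""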
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      elements = p | beam.Create([
          beam.Row(percentile=1),
          beam.Row(percentile=20),
          beam.Row(percentile=90),
      ])
      _ = elements | YamlTransform(
          '''
          # [START model_multiple_pcollections_partition]
          type: Partition
          config:
            by: str(percentile // 10)
            language: python
            outputs: ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10"]
          # [END model_multiple_pcollections_partition]
          ''')


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()