#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# To run the pipelines locally:
#   python -m apache_beam.examples.sinks.test_write_bounded
#
# This file contains multiple examples of writing a bounded PCollection to
# files.
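# All output files land under the __output_batch__/ directory, relative to
# the working directory in which the script is run.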
import argparse
import json
import logging

import pyarrow

import apache_beam as beam
from apache_beam.io.fileio import WriteToFiles
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.runners.runner import PipelineResult
from apache_beam.transforms.util import LogElements


def run(argv=None, save_main_session=True) -> PipelineResult:
  """Main entry point; defines and runs the bounded-write example pipelines."""
parser = argparse.ArgumentParser()
_, pipeline_args = parser.parse_known_args(argv)
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
p = beam.Pipeline(options=pipeline_options)
  output = (
      p
      | beam.Create([{'age': 10}, {'age': 20}, {'age': 30}])
      | LogElements(
          prefix='before write ', with_window=False, level=logging.INFO))
  # TextIO
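  # WriteToText writes each element as a line in one or more sharded text
  # files. Like other Python file-based sinks, it returns a PCollection of
  # the paths of the files it wrote, which is what gets logged below. In the
  # shard name template, each run of 'S' is replaced by the zero-padded shard
  # number and each run of 'N' by the total shard count; other characters
  # (like the '-U-' here) pass through into the file name unchanged.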
  output2 = output | 'Write to text' >> WriteToText(
      file_path_prefix="__output_batch__/output_WriteToText",
      file_name_suffix=".txt",
      shard_name_template='-U-SSSSS-of-NNNNN')
_ = output2 | 'LogElements after WriteToText' >> LogElements(
prefix='after WriteToText ', with_window=False, level=logging.INFO)
  # FileIO
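  # WriteToFiles (the newer fileio API) is given already-serialized records:
  # the elements are converted to JSON strings first because the default sink
  # writes each element out as a line of text. Its output, logged below,
  # describes each file that was written.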
output3 = (
output | 'Serialize' >> beam.Map(json.dumps)
| 'Write to files' >>
WriteToFiles(path="__output_batch__/output_WriteToFiles"))
_ = output3 | 'LogElements after WriteToFiles' >> LogElements(
prefix='after WriteToFiles ', with_window=False, level=logging.INFO)
  # ParquetIO
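  # WriteToParquet requires an explicit pyarrow schema, and each input dict
  # must match it. As with WriteToText, the returned PCollection (logged
  # below) contains the paths of the written files.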
output4 = output | 'Write' >> beam.io.WriteToParquet(
file_path_prefix="__output_batch__/output_parquet",
schema=pyarrow.schema([('age', pyarrow.int64())]))
_ = output4 | 'LogElements after WriteToParquet' >> LogElements(
prefix='after WriteToParquet ', with_window=False, level=logging.INFO)
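  # A second Parquet write showing two tuning knobs: record_batch_size caps
  # the number of records buffered into each Arrow record batch, and
  # num_shards=0 leaves the number of output shards up to the runner.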
_ = output | 'Write parquet' >> beam.io.WriteToParquet(
file_path_prefix="__output_batch__/output_WriteToParquet",
schema=pyarrow.schema([('age', pyarrow.int64())]),
record_batch_size=10,
num_shards=0)
# Execute the pipeline and return the result.
result = p.run()
result.wait_until_finish()
return result


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()