|  | # | 
|  | # Licensed to the Apache Software Foundation (ASF) under one or more | 
|  | # contributor license agreements.  See the NOTICE file distributed with | 
|  | # this work for additional information regarding copyright ownership. | 
|  | # The ASF licenses this file to You under the Apache License, Version 2.0 | 
|  | # (the "License"); you may not use this file except in compliance with | 
|  | # the License.  You may obtain a copy of the License at | 
|  | # | 
|  | #    http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, software | 
|  | # distributed under the License is distributed on an "AS IS" BASIS, | 
|  | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | # See the License for the specific language governing permissions and | 
|  | # limitations under the License. | 
|  | # | 
|  |  | 
|  | """``PTransforms`` for reading from and writing to Avro files. | 
|  |  | 
|  | Provides two read ``PTransform``s, ``ReadFromAvro`` and ``ReadAllFromAvro``, |
|  | that produce a ``PCollection`` of records. |
|  | Each record of this ``PCollection`` will contain a single record read from | 
|  | an Avro file. Records that are of simple types will be mapped into | 
|  | corresponding Python types. Records that are of Avro type 'RECORD' will be | 
|  | mapped to Python dictionaries that comply with the schema contained in the | 
|  | Avro file that contains those records. In this case, keys of each dictionary | 
|  | will contain the corresponding field names and will be of type ``string`` | 
|  | while the values of the dictionary will be of the type defined in the | 
|  | corresponding Avro schema. | 
|  |  | 
|  | For example, if the schema of the Avro file is the following. |
|  | {"namespace": "example.avro","type": "record","name": "User","fields": | 
|  | [{"name": "name", "type": "string"}, | 
|  | {"name": "favorite_number",  "type": ["int", "null"]}, | 
|  | {"name": "favorite_color", "type": ["string", "null"]}]} | 
|  |  | 
|  | Then records generated by read transforms will be dictionaries of the | 
|  | following form. | 
|  | {'name': 'Alyssa', 'favorite_number': 256, 'favorite_color': None}. |
|  |  | 
|  | Additionally, this module provides a write ``PTransform`` ``WriteToAvro`` | 
|  | that can be used to write a given ``PCollection`` of Python objects to an | 
|  | Avro file. | 
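|  |  |
|  | For example, a minimal pipeline that reads Avro records and writes them back |
|  | out could look like the following (a sketch only, assuming |
|  | ``import apache_beam as beam``; the file paths and the ``SCHEMA`` dict are |
|  | placeholders): |
|  |  |
|  | with beam.Pipeline() as p: |
|  |   records = p | beam.io.ReadFromAvro('/path/to/input*.avro') |
|  |   records | beam.io.WriteToAvro('/path/to/output', schema=SCHEMA) |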
|  | """ | 
|  | # pytype: skip-file | 
|  | import os | 
|  | from functools import partial | 
|  | from typing import Any | 
|  | from typing import Callable | 
|  | from typing import Dict | 
|  | from typing import List | 
|  | from typing import Union | 
|  |  | 
|  | import fastavro | 
|  | from fastavro.read import block_reader | 
|  | from fastavro.write import Writer | 
|  |  | 
|  | import apache_beam as beam | 
|  | from apache_beam.io import filebasedsink | 
|  | from apache_beam.io import filebasedsource | 
|  | from apache_beam.io import iobase | 
|  | from apache_beam.io.filesystem import CompressionTypes | 
|  | from apache_beam.io.filesystems import FileSystems | 
|  | from apache_beam.io.iobase import Read | 
|  | from apache_beam.portability.api import schema_pb2 | 
|  | from apache_beam.transforms import PTransform | 
|  | from apache_beam.typehints import schemas | 
|  |  | 
|  | __all__ = [ | 
|  | 'ReadFromAvro', | 
|  | 'ReadAllFromAvro', | 
|  | 'ReadAllFromAvroContinuously', | 
|  | 'WriteToAvro' | 
|  | ] | 
|  |  | 
|  |  | 
|  | class ReadFromAvro(PTransform): | 
|  | """A :class:`~apache_beam.transforms.ptransform.PTransform` for reading avro | 
|  | files.""" | 
|  | def __init__( | 
|  | self, | 
|  | file_pattern=None, | 
|  | min_bundle_size=0, | 
|  | validate=True, | 
|  | use_fastavro=True, | 
|  | as_rows=False): | 
|  | """Initializes :class:`ReadFromAvro`. | 
|  |  | 
|  | Uses source :class:`~apache_beam.io.avroio._FastAvroSource` to read a set of |
|  | Avro files defined by a given file pattern. |
|  |  | 
|  | If ``/mypath/myavrofiles*`` is a file-pattern that points to a set of Avro | 
|  | files, a :class:`~apache_beam.pvalue.PCollection` for the records in | 
|  | these Avro files can be created in the following manner. | 
|  |  | 
|  | .. testcode:: | 
|  |  | 
|  | with beam.Pipeline() as p: | 
|  | records = p | 'Read' >> beam.io.ReadFromAvro('/mypath/myavrofiles*') | 
|  |  | 
|  | .. NOTE: We're not actually interested in this error; but if we get here, | 
|  | it means that the way of calling this transform hasn't changed. | 
|  |  | 
|  | .. testoutput:: | 
|  | :hide: | 
|  |  | 
|  | Traceback (most recent call last): | 
|  | ... | 
|  | OSError: No files found based on the file pattern | 
|  |  | 
|  | Each record of this :class:`~apache_beam.pvalue.PCollection` will contain | 
|  | a single record read from a source. Records that are of simple types will be | 
|  | mapped into corresponding Python types. Records that are of Avro type | 
|  | ``RECORD`` will be mapped to Python dictionaries that comply with the schema | 
|  | contained in the Avro file that contains those records. In this case, keys | 
|  | of each dictionary will contain the corresponding field names and will be of | 
|  | type :class:`str` while the values of the dictionary will be of the type | 
|  | defined in the corresponding Avro schema. | 
|  |  | 
|  | For example, if the schema of the Avro file is the following. :: |
|  |  | 
|  | { | 
|  | "namespace": "example.avro", | 
|  | "type": "record", | 
|  | "name": "User", | 
|  | "fields": [ | 
|  |  | 
|  | {"name": "name", | 
|  | "type": "string"}, | 
|  |  | 
|  | {"name": "favorite_number", | 
|  | "type": ["int", "null"]}, | 
|  |  | 
|  | {"name": "favorite_color", | 
|  | "type": ["string", "null"]} | 
|  |  | 
|  | ] | 
|  | } | 
|  |  | 
|  | Then records generated by :class:`~apache_beam.io.avroio._FastAvroSource` |
|  | will be dictionaries of the following form. :: |
|  |  | 
|  | {'name': 'Alyssa', 'favorite_number': 256, 'favorite_color': None}. |
|  |  | 
|  | Args: | 
|  | file_pattern (str): the file glob to read | 
|  | min_bundle_size (int): the minimum size in bytes, to be considered when | 
|  | splitting the input into bundles. | 
|  | validate (bool): flag to verify that the files exist during the pipeline | 
|  | creation time. | 
|  | use_fastavro (bool): This flag is left for API backwards compatibility | 
|  | and no longer has an effect.  Do not use. | 
|  | as_rows (bool): Whether to return a schema'd PCollection of Beam rows. | 
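|  |  |
|  | With ``as_rows=True`` the transform infers a Beam schema from the Avro |
|  | schema of a matched file and emits schema-aware Beam rows instead of |
|  | dictionaries, for example (a sketch; the path is a placeholder): :: |
|  |  |
|  | with beam.Pipeline() as p: |
|  |   rows = p | beam.io.ReadFromAvro('/mypath/myavrofiles*', as_rows=True) |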
|  | """ | 
|  | super().__init__() | 
|  | self._source = _FastAvroSource( | 
|  | file_pattern, min_bundle_size, validate=validate) | 
|  | if as_rows: | 
|  | path = FileSystems.match([file_pattern], [1])[0].metadata_list[0].path | 
|  | with FileSystems.open(path) as fin: | 
|  | avro_schema = fastavro.reader(fin).writer_schema | 
|  | beam_schema = avro_schema_to_beam_schema(avro_schema) | 
|  | self._post_process = avro_dict_to_beam_row(avro_schema, beam_schema) | 
|  | else: | 
|  | self._post_process = None | 
|  |  | 
|  | def expand(self, pvalue): | 
|  | records = pvalue.pipeline | Read(self._source) | 
|  | if self._post_process: | 
|  | return records | beam.Map(self._post_process) | 
|  | else: | 
|  | return records | 
|  |  | 
|  | def display_data(self): | 
|  | return {'source_dd': self._source} | 
|  |  | 
|  |  | 
|  | class ReadAllFromAvro(PTransform): | 
|  | """A ``PTransform`` for reading ``PCollection`` of Avro files. | 
|  |  | 
|  | Uses source '_AvroSource' to read a ``PCollection`` of Avro files or file | 
|  | patterns and produce a ``PCollection`` of Avro records. | 
|  |  | 
|  | This implementation is only tested with batch pipeline. In streaming, | 
|  | reading may happen with delay due to the limitation in ReShuffle involved. | 
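|  |  |
|  | For example (a sketch; the file patterns are placeholders): :: |
|  |  |
|  | with beam.Pipeline() as p: |
|  |   records = ( |
|  |       p |
|  |       | beam.Create(['/path/a/*.avro', '/path/b/*.avro']) |
|  |       | beam.io.ReadAllFromAvro()) |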
|  | """ | 
|  |  | 
|  | DEFAULT_DESIRED_BUNDLE_SIZE = 64 * 1024 * 1024  # 64MB | 
|  |  | 
|  | def __init__( | 
|  | self, | 
|  | min_bundle_size=0, | 
|  | desired_bundle_size=DEFAULT_DESIRED_BUNDLE_SIZE, | 
|  | use_fastavro=True, | 
|  | with_filename=False, | 
|  | label='ReadAllFiles'): | 
|  | """Initializes ``ReadAllFromAvro``. | 
|  |  | 
|  | Args: | 
|  | min_bundle_size: the minimum size in bytes, to be considered when | 
|  | splitting the input into bundles. | 
|  | desired_bundle_size: the desired size in bytes, to be considered when | 
|  | splitting the input into bundles. | 
|  | use_fastavro (bool): This flag is left for API backwards compatibility | 
|  | and no longer has an effect. Do not use. | 
|  | with_filename: If True, returns key-value pairs with the file name as the |
|  | key and the record read from the file as the value. If False, it returns |
|  | only the records. |
|  | """ | 
|  | source_from_file = partial(_FastAvroSource, min_bundle_size=min_bundle_size) | 
|  | self._read_all_files = filebasedsource.ReadAllFiles( | 
|  | True, | 
|  | CompressionTypes.AUTO, | 
|  | desired_bundle_size, | 
|  | min_bundle_size, | 
|  | source_from_file, | 
|  | with_filename) | 
|  |  | 
|  | self.label = label | 
|  |  | 
|  | def expand(self, pvalue): | 
|  | return pvalue | self.label >> self._read_all_files | 
|  |  | 
|  |  | 
|  | class ReadAllFromAvroContinuously(ReadAllFromAvro): | 
|  | """A ``PTransform`` for reading avro files in given file patterns. | 
|  | This PTransform acts as a Source and produces continuously a ``PCollection`` | 
|  | of Avro records. | 
|  |  | 
|  | For more details, see ``ReadAllFromAvro`` for avro parsing settings; | 
|  | see ``apache_beam.io.fileio.MatchContinuously`` for watching settings. | 
|  |  | 
|  | ReadAllFromAvroContinuously is experimental. No backwards-compatibility |
|  | guarantees. Due to a limitation of Reshuffle, the current implementation |
|  | does not scale. |
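|  |  |
|  | For example (a sketch; the pattern and interval are placeholders, and the |
|  | pipeline must run in streaming mode): :: |
|  |  |
|  | records = p | beam.io.ReadAllFromAvroContinuously( |
|  |     '/path/to/*.avro', interval=60) |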
|  | """ | 
|  | _ARGS_FOR_MATCH = ( | 
|  | 'interval', | 
|  | 'has_deduplication', | 
|  | 'start_timestamp', | 
|  | 'stop_timestamp', | 
|  | 'match_updated_files', | 
|  | 'apply_windowing') | 
|  | _ARGS_FOR_READ = ( | 
|  | 'min_bundle_size', 'desired_bundle_size', 'use_fastavro', 'with_filename') | 
|  |  | 
|  | def __init__(self, file_pattern, label='ReadAllFilesContinuously', **kwargs): | 
|  | """Initialize the ``ReadAllFromAvroContinuously`` transform. | 
|  |  | 
|  | Accepts the constructor arguments of both :class:`ReadAllFromAvro` and |
|  | :class:`~apache_beam.io.fileio.MatchContinuously`. |
|  | """ | 
|  | kwargs_for_match = { | 
|  | k: v | 
|  | for (k, v) in kwargs.items() if k in self._ARGS_FOR_MATCH | 
|  | } | 
|  | kwargs_for_read = { | 
|  | k: v | 
|  | for (k, v) in kwargs.items() if k in self._ARGS_FOR_READ | 
|  | } | 
|  | kwargs_additional = { |
|  | k: v | 
|  | for (k, v) in kwargs.items() | 
|  | if k not in self._ARGS_FOR_MATCH and k not in self._ARGS_FOR_READ | 
|  | } | 
|  | super().__init__(label=label, **kwargs_for_read, **kwargs_additional) |
|  | self._file_pattern = file_pattern | 
|  | self._kwargs_for_match = kwargs_for_match | 
|  |  | 
|  | def expand(self, pbegin): | 
|  | # Importing locally to prevent circular dependency issues. | 
|  | from apache_beam.io.fileio import MatchContinuously | 
|  |  | 
|  | # TODO(BEAM-14497) always reshuffle once gbk always trigger works. | 
|  | return ( | 
|  | pbegin | 
|  | | MatchContinuously(self._file_pattern, **self._kwargs_for_match) | 
|  | | 'ReadAllFiles' >> self._read_all_files._disable_reshuffle()) | 
|  |  | 
|  |  | 
|  | class _AvroUtils(object): | 
|  | @staticmethod | 
|  | def advance_file_past_next_sync_marker(f, sync_marker): | 
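|  | """Reads from file ``f`` in chunks until the next Avro sync marker is |
|  | found, leaving ``f`` positioned immediately after that marker. |
|  | Returns True if a marker was found before the end of the file. |
|  | """ |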
|  | buf_size = 10000 | 
|  |  | 
|  | data = f.read(buf_size) | 
|  | while data: | 
|  | pos = data.find(sync_marker) | 
|  | if pos >= 0: | 
|  | # Adjusting the current position to the ending position of the sync | 
|  | # marker. | 
|  | backtrack = len(data) - pos - len(sync_marker) | 
|  | f.seek(-1 * backtrack, os.SEEK_CUR) | 
|  | return True | 
|  | else: | 
|  | if f.tell() >= len(sync_marker): | 
|  | # Backtracking in case we partially read the sync marker during the | 
|  | # previous read. We only have to backtrack if there are at least | 
|  | # len(sync_marker) bytes before current position. We only have to | 
|  | # backtrack (len(sync_marker) - 1) bytes. | 
|  | f.seek(-1 * (len(sync_marker) - 1), os.SEEK_CUR) | 
|  | data = f.read(buf_size) | 
|  |  | 
|  |  | 
|  | class _FastAvroSource(filebasedsource.FileBasedSource): | 
|  | """A source for reading Avro files using the `fastavro` library. | 
|  |  | 
|  | ``_FastAvroSource`` is implemented using the file-based source framework | 
|  | available in module 'filebasedsource'. Hence please refer to module | 
|  | 'filebasedsource' to fully understand how this source implements operations | 
|  | common to all file-based sources such as file-pattern expansion and splitting | 
|  | into bundles for parallel processing. | 
|  |  | 
|  | TODO: remove ``_AvroSource`` in favor of using ``_FastAvroSource`` | 
|  | everywhere once it has been more widely tested | 
|  | """ | 
|  | def read_records(self, file_name, range_tracker): | 
|  | next_block_start = -1 | 
|  |  | 
|  | def split_points_unclaimed(stop_position): | 
|  | if next_block_start >= stop_position: | 
|  | # Next block starts at or after the suggested stop position. Hence | 
|  | # there will not be split points to be claimed for the range ending at | 
|  | # suggested stop position. | 
|  | return 0 | 
|  |  | 
|  | return iobase.RangeTracker.SPLIT_POINTS_UNKNOWN | 
|  |  | 
|  | range_tracker.set_split_points_unclaimed_callback(split_points_unclaimed) | 
|  |  | 
|  | start_offset = range_tracker.start_position() | 
|  | if start_offset is None: | 
|  | start_offset = 0 | 
|  |  | 
|  | with self.open_file(file_name) as f: | 
|  | blocks = block_reader(f) | 
|  | sync_marker = blocks._header['sync'] | 
|  |  | 
|  | # We have to start at current position if previous bundle ended at the | 
|  | # end of a sync marker. | 
|  | start_offset = max(0, start_offset - len(sync_marker)) | 
|  | f.seek(start_offset) | 
|  | _AvroUtils.advance_file_past_next_sync_marker(f, sync_marker) | 
|  |  | 
|  | next_block_start = f.tell() | 
|  |  | 
|  | while range_tracker.try_claim(next_block_start): | 
|  | block = next(blocks) | 
|  | next_block_start = block.offset + block.size | 
|  | for record in block: | 
|  | yield record | 
|  |  | 
|  |  | 
|  | _create_avro_source = _FastAvroSource | 
|  |  | 
|  |  | 
|  | class WriteToAvro(beam.transforms.PTransform): | 
|  | """A ``PTransform`` for writing avro files.""" | 
|  | def __init__( | 
|  | self, | 
|  | file_path_prefix, | 
|  | schema=None, | 
|  | codec='deflate', | 
|  | file_name_suffix='', | 
|  | num_shards=0, | 
|  | shard_name_template=None, | 
|  | mime_type='application/x-avro', | 
|  | use_fastavro=True): | 
|  | """Initialize a WriteToAvro transform. | 
|  |  | 
|  | Args: | 
|  | file_path_prefix: The file path to write to. The files written will begin | 
|  | with this prefix, followed by a shard identifier (see num_shards), and | 
|  | end in a common extension, if given by file_name_suffix. In most cases, | 
|  | only this argument is specified and num_shards, shard_name_template, and | 
|  | file_name_suffix use default values. | 
|  | schema: The schema to use (dict). | 
|  | codec: The codec to use for block-level compression. Any string supported | 
|  | by the Avro specification is accepted (for example 'null'). | 
|  | file_name_suffix: Suffix for the files written. | 
|  | num_shards: The number of files (shards) used for output. If not set, the | 
|  | service will decide on the optimal number of shards. | 
|  | Constraining the number of shards is likely to reduce | 
|  | the performance of a pipeline.  Setting this value is not recommended | 
|  | unless you require a specific number of output files. | 
|  | shard_name_template: A template string containing placeholders for | 
|  | the shard number and shard count. When constructing a filename for a | 
|  | particular shard number, the upper-case letters 'S' and 'N' are | 
|  | replaced with the 0-padded shard number and shard count respectively. | 
|  | This argument can be '' in which case it behaves as if num_shards was | 
|  | set to 1 and only one file will be generated. The default pattern used | 
|  | is '-SSSSS-of-NNNNN' if None is passed as the shard_name_template. | 
|  | mime_type: The MIME type to use for the produced files, if the filesystem | 
|  | supports specifying MIME types. | 
|  | use_fastavro (bool): This flag is left for API backwards compatibility | 
|  | and no longer has an effect. Do not use. | 
|  |  | 
|  | Returns: | 
|  | A WriteToAvro transform usable for writing. | 
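|  |  |
|  | For example (a sketch; the schema, records and output path are |
|  | placeholders): :: |
|  |  |
|  | schema = { |
|  |     'name': 'User', 'type': 'record', |
|  |     'fields': [{'name': 'name', 'type': 'string'}]} |
|  | with beam.Pipeline() as p: |
|  |   ( |
|  |       p |
|  |       | beam.Create([{'name': 'Alyssa'}]) |
|  |       | beam.io.WriteToAvro('/path/to/output', schema=schema)) |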
|  | """ | 
|  | self._schema = schema | 
|  | self._sink_provider = lambda avro_schema: _create_avro_sink( | 
|  | file_path_prefix, | 
|  | avro_schema, | 
|  | codec, | 
|  | file_name_suffix, | 
|  | num_shards, | 
|  | shard_name_template, | 
|  | mime_type) | 
|  |  | 
|  | def expand(self, pcoll): | 
|  | if self._schema: | 
|  | avro_schema = self._schema | 
|  | records = pcoll | 
|  | else: | 
|  | try: | 
|  | beam_schema = schemas.schema_from_element_type(pcoll.element_type) | 
|  | except TypeError as exn: | 
|  | raise ValueError( | 
|  | "An explicit schema is required to write non-schema'd PCollections." | 
|  | ) from exn | 
|  | avro_schema = beam_schema_to_avro_schema(beam_schema) | 
|  | records = pcoll | beam.Map( | 
|  | beam_row_to_avro_dict(avro_schema, beam_schema)) | 
|  | self._sink = self._sink_provider(avro_schema) | 
|  | return records | beam.io.iobase.Write(self._sink) | 
|  |  | 
|  | def display_data(self): | 
|  | return {'sink_dd': self._sink} | 
|  |  | 
|  |  | 
|  | def _create_avro_sink( | 
|  | file_path_prefix, | 
|  | schema, | 
|  | codec, | 
|  | file_name_suffix, | 
|  | num_shards, | 
|  | shard_name_template, | 
|  | mime_type): | 
|  | if "class 'avro.schema" in str(type(schema)): | 
|  | raise ValueError( | 
|  | 'You are using Avro IO with fastavro (default with Beam on ' | 
|  | 'Python 3), but supplying a schema parsed by avro-python3. ' | 
|  | 'Please change the schema to a dict.') | 
|  | return _FastAvroSink( | 
|  | file_path_prefix, | 
|  | schema, | 
|  | codec, | 
|  | file_name_suffix, | 
|  | num_shards, | 
|  | shard_name_template, | 
|  | mime_type) | 
|  |  | 
|  |  | 
|  | class _BaseAvroSink(filebasedsink.FileBasedSink): | 
|  | """A base for a sink for avro files. """ | 
|  | def __init__( | 
|  | self, | 
|  | file_path_prefix, | 
|  | schema, | 
|  | codec, | 
|  | file_name_suffix, | 
|  | num_shards, | 
|  | shard_name_template, | 
|  | mime_type): | 
|  | super().__init__( | 
|  | file_path_prefix, | 
|  | file_name_suffix=file_name_suffix, | 
|  | num_shards=num_shards, | 
|  | shard_name_template=shard_name_template, | 
|  | coder=None, | 
|  | mime_type=mime_type, | 
|  | # Compression happens at the block level using the supplied codec, and | 
|  | # not at the file level. | 
|  | compression_type=CompressionTypes.UNCOMPRESSED) | 
|  | self._schema = schema | 
|  | self._codec = codec | 
|  |  | 
|  | def display_data(self): | 
|  | res = super().display_data() | 
|  | res['codec'] = str(self._codec) | 
|  | res['schema'] = str(self._schema) | 
|  | return res | 
|  |  | 
|  |  | 
|  | class _FastAvroSink(_BaseAvroSink): | 
|  | """A sink for avro files using FastAvro. """ | 
|  | def __init__( | 
|  | self, | 
|  | file_path_prefix, | 
|  | schema, | 
|  | codec, | 
|  | file_name_suffix, | 
|  | num_shards, | 
|  | shard_name_template, | 
|  | mime_type): | 
|  | super().__init__( | 
|  | file_path_prefix, | 
|  | schema, | 
|  | codec, | 
|  | file_name_suffix, | 
|  | num_shards, | 
|  | shard_name_template, | 
|  | mime_type) | 
|  | self.file_handle = None | 
|  |  | 
|  | def open(self, temp_path): | 
|  | self.file_handle = super().open(temp_path) | 
|  | return Writer(self.file_handle, self._schema, self._codec) | 
|  |  | 
|  | def write_record(self, writer, value): | 
|  | writer.write(value) | 
|  |  | 
|  | def close(self, writer): | 
|  | writer.flush() | 
|  | self.file_handle.close() | 
|  |  | 
|  |  | 
|  | AVRO_PRIMITIVES_TO_BEAM_PRIMITIVES = { | 
|  | 'boolean': schema_pb2.BOOLEAN, | 
|  | 'int': schema_pb2.INT32, | 
|  | 'long': schema_pb2.INT64, | 
|  | 'float': schema_pb2.FLOAT, | 
|  | 'double': schema_pb2.DOUBLE, | 
|  | 'bytes': schema_pb2.BYTES, | 
|  | 'string': schema_pb2.STRING, | 
|  | } | 
|  |  | 
|  | BEAM_PRIMITIVES_TO_AVRO_PRIMITIVES = { | 
|  | v: k | 
|  | for k, v in AVRO_PRIMITIVES_TO_BEAM_PRIMITIVES.items() | 
|  | } | 
|  |  | 
|  | _AvroSchemaType = Union[str, List, Dict] | 
|  |  | 
|  |  | 
|  | def avro_type_to_beam_type(avro_type: _AvroSchemaType) -> schema_pb2.FieldType: | 
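|  | """Maps an Avro type (a primitive name, a union list, or a schema dict) to |
|  | the corresponding Beam ``FieldType``. Unions map to ``Any``; ``fixed`` and |
|  | ``enum`` map to strings; ``array``, ``map`` and ``record`` map to the |
|  | corresponding Beam array, map and row types. |
|  | """ |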
|  | if isinstance(avro_type, str): | 
|  | return avro_type_to_beam_type({'type': avro_type}) | 
|  | elif isinstance(avro_type, list): | 
|  | # Union type | 
|  | return schemas.typing_to_runner_api(Any) | 
|  | type_name = avro_type['type'] | 
|  | if type_name in AVRO_PRIMITIVES_TO_BEAM_PRIMITIVES: | 
|  | return schema_pb2.FieldType( | 
|  | atomic_type=AVRO_PRIMITIVES_TO_BEAM_PRIMITIVES[type_name]) | 
|  | elif type_name in ('fixed', 'enum'): | 
|  | return schema_pb2.FieldType(atomic_type=schema_pb2.STRING) | 
|  | elif type_name == 'array': | 
|  | return schema_pb2.FieldType( | 
|  | array_type=schema_pb2.ArrayType( | 
|  | element_type=avro_type_to_beam_type(avro_type['items']))) | 
|  | elif type_name == 'map': | 
|  | return schema_pb2.FieldType( | 
|  | map_type=schema_pb2.MapType( | 
|  | key_type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING), | 
|  | value_type=avro_type_to_beam_type(avro_type['values']))) | 
|  | elif type_name == 'record': | 
|  | return schema_pb2.FieldType( | 
|  | row_type=schema_pb2.RowType( | 
|  | schema=schema_pb2.Schema( | 
|  | fields=[ | 
|  | schemas.schema_field( | 
|  | f['name'], avro_type_to_beam_type(f['type'])) | 
|  | for f in avro_type['fields'] | 
|  | ]))) | 
|  | else: | 
|  | raise ValueError(f'Unable to convert {avro_type} to a Beam schema.') | 
|  |  | 
|  |  | 
|  | def avro_schema_to_beam_schema( | 
|  | avro_schema: _AvroSchemaType) -> schema_pb2.Schema: | 
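|  | """Converts an Avro schema to a Beam ``Schema``. A top-level ``record`` |
|  | schema is converted field by field; any other Avro type is wrapped in a |
|  | single-field schema whose field is named 'record'. |
|  | """ |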
|  | beam_type = avro_type_to_beam_type(avro_schema) | 
|  | if isinstance(avro_schema, dict) and avro_schema['type'] == 'record': | 
|  | return beam_type.row_type.schema | 
|  | else: | 
|  | return schema_pb2.Schema(fields=[schemas.schema_field('record', beam_type)]) | 
|  |  | 
|  |  | 
|  | def avro_dict_to_beam_row( | 
|  | avro_schema: _AvroSchemaType, | 
|  | beam_schema: schema_pb2.Schema) -> Callable[[Any], Any]: | 
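|  | """Returns a converter from Avro records (as dicts) to Beam rows typed |
|  | with ``beam_schema``. |
|  | """ |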
|  | if isinstance(avro_schema, str): | 
|  | return avro_dict_to_beam_row({'type': avro_schema}, beam_schema) |
|  | if avro_schema['type'] == 'record': | 
|  | to_row = avro_value_to_beam_value( | 
|  | schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema))) | 
|  | else: | 
|  |  | 
|  | def to_row(record): | 
|  | return beam.Row(record=record) | 
|  |  | 
|  | return beam.typehints.with_output_types( | 
|  | schemas.named_tuple_from_schema(beam_schema))( | 
|  | to_row) | 
|  |  | 
|  |  | 
|  | def avro_value_to_beam_value( | 
|  | beam_type: schema_pb2.FieldType) -> Callable[[Any], Any]: | 
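|  | """Returns a converter from an Avro value to the Beam value described by |
|  | the given Beam ``FieldType``. |
|  | """ |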
|  | type_info = beam_type.WhichOneof("type_info") | 
|  | if type_info == "atomic_type": | 
|  | return lambda value: value | 
|  | elif type_info == "array_type": | 
|  | element_converter = avro_value_to_beam_value( | 
|  | beam_type.array_type.element_type) | 
|  | return lambda value: [element_converter(e) for e in value] | 
|  | elif type_info == "iterable_type": | 
|  | element_converter = avro_value_to_beam_value( | 
|  | beam_type.iterable_type.element_type) | 
|  | return lambda value: [element_converter(e) for e in value] | 
|  | elif type_info == "map_type": | 
|  | if beam_type.map_type.key_type.atomic_type != schema_pb2.STRING: | 
|  | raise TypeError( | 
|  | f'Only strings allowed as map keys when converting from Avro, ' |
|  | f'found {beam_type}') | 
|  | value_converter = avro_value_to_beam_value(beam_type.map_type.value_type) | 
|  | return lambda value: {k: value_converter(v) for (k, v) in value.items()} | 
|  | elif type_info == "row_type": | 
|  | converters = { | 
|  | field.name: avro_value_to_beam_value(field.type) | 
|  | for field in beam_type.row_type.schema.fields | 
|  | } | 
|  | return lambda value: beam.Row( | 
|  | ** | 
|  | {name: convert(value[name]) | 
|  | for (name, convert) in converters.items()}) | 
|  | elif type_info == "logical_type": | 
|  | return lambda value: value | 
|  | else: | 
|  | raise ValueError(f"Unrecognized type_info: {type_info!r}") | 
|  |  | 
|  |  | 
|  | def beam_schema_to_avro_schema( | 
|  | beam_schema: schema_pb2.Schema) -> _AvroSchemaType: | 
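|  | """Converts a Beam ``Schema`` to the equivalent Avro record schema, |
|  | represented as a dict. |
|  | """ |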
|  | return beam_type_to_avro_type( | 
|  | schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema))) | 
|  |  | 
|  |  | 
|  | def beam_type_to_avro_type(beam_type: schema_pb2.FieldType) -> _AvroSchemaType: | 
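|  | """Maps a Beam ``FieldType`` to the equivalent Avro type, represented as a |
|  | dict. |
|  | """ |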
|  | type_info = beam_type.WhichOneof("type_info") | 
|  | if type_info == "atomic_type": | 
|  | return {'type': BEAM_PRIMITIVES_TO_AVRO_PRIMITIVES[beam_type.atomic_type]} | 
|  | elif type_info == "array_type": | 
|  | return { | 
|  | 'type': 'array', | 
|  | 'items': beam_type_to_avro_type(beam_type.array_type.element_type) | 
|  | } | 
|  | elif type_info == "iterable_type": | 
|  | return { | 
|  | 'type': 'array', | 
|  | 'items': beam_type_to_avro_type(beam_type.iterable_type.element_type) | 
|  | } | 
|  | elif type_info == "map_type": | 
|  | if beam_type.map_type.key_type.atomic_type != schema_pb2.STRING: | 
|  | raise TypeError( | 
|  | f'Only strings allowed as map keys when converting to Avro, ' |
|  | f'found {beam_type}') | 
|  | return { | 
|  | 'type': 'map', | 
|  | 'values': beam_type_to_avro_type(beam_type.map_type.value_type) |
|  | } | 
|  | elif type_info == "row_type": | 
|  | return { | 
|  | 'type': 'record', | 
|  | 'name': beam_type.row_type.schema.id, | 
|  | 'fields': [{ | 
|  | 'name': field.name, 'type': beam_type_to_avro_type(field.type) | 
|  | } for field in beam_type.row_type.schema.fields], | 
|  | } | 
|  | else: | 
|  | raise ValueError(f"Unconvertale type: {beam_type}") | 
|  |  | 
|  |  | 
|  | def beam_row_to_avro_dict( | 
|  | avro_schema: _AvroSchemaType, beam_schema: schema_pb2.Schema): | 
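|  | """Returns a converter from Beam rows to Avro records (as dicts) suitable |
|  | for writing with ``WriteToAvro``. |
|  | """ |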
|  | if isinstance(avro_schema, str): | 
|  | return beam_row_to_avro_dict({'type': avro_schema}, beam_schema) | 
|  | if avro_schema['type'] == 'record': | 
|  | return beam_value_to_avro_value( | 
|  | schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema))) | 
|  | else: | 
|  | convert = beam_value_to_avro_value(beam_schema.fields[0].type) |
|  | return lambda row: convert(row[0]) | 
|  |  | 
|  |  | 
|  | def beam_value_to_avro_value( | 
|  | beam_type: schema_pb2.FieldType) -> Callable[[Any], Any]: | 
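|  | """Returns a converter from a Beam value to the Avro value for the given |
|  | Beam ``FieldType``. |
|  | """ |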
|  | type_info = beam_type.WhichOneof("type_info") | 
|  | if type_info == "atomic_type": | 
|  | return lambda value: value | 
|  | elif type_info == "array_type": | 
|  | element_converter = beam_value_to_avro_value( |
|  | beam_type.array_type.element_type) |
|  | return lambda value: [element_converter(e) for e in value] |
|  | elif type_info == "iterable_type": |
|  | element_converter = beam_value_to_avro_value( |
|  | beam_type.iterable_type.element_type) |
|  | return lambda value: [element_converter(e) for e in value] |
|  | elif type_info == "map_type": |
|  | if beam_type.map_type.key_type.atomic_type != schema_pb2.STRING: |
|  | raise TypeError( |
|  | f'Only strings allowed as map keys when converting to Avro, ' |
|  | f'found {beam_type}') |
|  | value_converter = beam_value_to_avro_value(beam_type.map_type.value_type) |
|  | return lambda value: {k: value_converter(v) for (k, v) in value.items()} |
|  | elif type_info == "row_type": |
|  | converters = { |
|  | field.name: beam_value_to_avro_value(field.type) |
|  | for field in beam_type.row_type.schema.fields | 
|  | } | 
|  | return lambda value: { | 
|  | name: convert(getattr(value, name)) | 
|  | for (name, convert) in converters.items() | 
|  | } | 
|  | elif type_info == "logical_type": | 
|  | return lambda value: value | 
|  | else: | 
|  | raise ValueError(f"Unrecognized type_info: {type_info!r}") |