| # | 
 | # Licensed to the Apache Software Foundation (ASF) under one or more | 
 | # contributor license agreements.  See the NOTICE file distributed with | 
 | # this work for additional information regarding copyright ownership. | 
 | # The ASF licenses this file to You under the Apache License, Version 2.0 | 
 | # (the "License"); you may not use this file except in compliance with | 
 | # the License.  You may obtain a copy of the License at | 
 | # | 
 | #    http://www.apache.org/licenses/LICENSE-2.0 | 
 | # | 
 | # Unless required by applicable law or agreed to in writing, software | 
 | # distributed under the License is distributed on an "AS IS" BASIS, | 
 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
 | # See the License for the specific language governing permissions and | 
 | # limitations under the License. | 
 | # | 
 |  | 
 |  | 
 | fixtures: | 
 |   # Fixture of type temp_bigtable_table; the pipelines in this file refer to it | 
 |   # through the "{BT_INSTANCE}" placeholder in their `instance` setting. | 
 |   - name: BT_INSTANCE | 
 |     type: 'apache_beam.yaml.integration_tests.temp_bigtable_table' | 
 |     config: {project: 'apache-beam-testing'} | 
 |   # Temporary GCS directory. A distributed filesystem is needed so that data | 
 |   # can be read and written from inside a container. | 
 |   - name: TEMP_DIR | 
 |     type: 'apache_beam.yaml.integration_tests.gcs_temp_dir' | 
 |     config: {bucket: 'gs://temp-storage-for-end-to-end-tests/temp-it'} | 
 |  | 
 |   # Tests for BigTable YAML IO | 
 |  | 
 | pipelines: | 
 |   # Write pipeline: builds two 'SetCell' mutations for row 'row1' (columns cq1 | 
 |   # and cq2 of family cf1), converts the string fields to bytes and writes the | 
 |   # result to 'test-table' in the fixture-provided instance. | 
 |   - pipeline: | 
 |       type: chain | 
 |       transforms: | 
 |         - type: Create | 
 |           config: | 
 |             elements: | 
 |               - key: 'row1' | 
 |                 type: 'SetCell' | 
 |                 family_name: "cf1" | 
 |                 column_qualifier: "cq1" | 
 |                 value: "value1" | 
 |                 timestamp_micros: 5000 | 
 |               - key: 'row1' | 
 |                 type: 'SetCell' | 
 |                 family_name: "cf1" | 
 |                 column_qualifier: "cq2" | 
 |                 value: "value2" | 
 |                 timestamp_micros: 1000 | 
 |  | 
 |         - type: LogForTesting | 
 |         - type: MapToFields | 
 |           name: ConvertStringsToBytes | 
 |           config: | 
 |             language: python | 
 |             # Only the fields that are Strings in YAML but need to be Bytes in Java | 
 |             # (key, column_qualifier, value) go through a callable. 'type' and | 
 |             # 'timestamp_micros' are already of the correct type (String, Long) | 
 |             # and are passed through; 'start_timestamp_micros' and | 
 |             # 'end_timestamp_micros' are optional and not used here. | 
 |             fields: | 
 |               # Needed for 'SetCell' and 'DeleteFromColumn' mutations. | 
 |               key: | 
 |                 callable: | | 
 |                   def convert_to_bytes(row): | 
 |                     return bytes(row.key, 'utf-8') if "key" in row._fields else None | 
 |               type: type | 
 |               family_name: family_name | 
 |               column_qualifier: | 
 |                 callable: | | 
 |                   def convert_to_bytes(row): | 
 |                     return bytes(row.column_qualifier, 'utf-8') if 'column_qualifier' in row._fields else None | 
 |               value: | 
 |                 callable: | | 
 |                   def convert_to_bytes(row): | 
 |                     return bytes(row.value, 'utf-8') if 'value' in row._fields else None | 
 |               timestamp_micros: timestamp_micros | 
 |  | 
 |         - type: WriteToBigTable | 
 |           config: | 
 |             project: 'apache-beam-testing' | 
 |             instance: "{BT_INSTANCE}" | 
 |             table: 'test-table' | 
 |   # Read pipeline (no `flatten` option set): reads 'test-table', decodes the | 
 |   # bytes fields (key, column_qualifier, cell values) to UTF-8 strings and | 
 |   # asserts that the two cells for row 'row1' are present. | 
 |   - pipeline: | 
 |       type: chain | 
 |       transforms: | 
 |         - type: ReadFromBigTable | 
 |           config: | 
 |             project: 'apache-beam-testing' | 
 |             instance: "{BT_INSTANCE}" | 
 |             table: 'test-table' | 
 |         - type: MapToFields | 
 |           config: | 
 |             language: python | 
 |             fields: | 
 |               key: | 
 |                 callable: | | 
 |                   def convert_to_string(row): | 
 |                     return row.key.decode("utf-8") if "key" in row._fields else None | 
 |               family_name: | 
 |                 family_name | 
 |               column_qualifier: | 
 |                 callable: | | 
 |                   def convert_to_string(row): | 
 |                     return row.column_qualifier.decode("utf-8") if "column_qualifier" in row._fields else None | 
 |               # Rebuild each (value, timestamp) cell as a Row whose value is a | 
 |               # decoded string, so it can be compared with the YAML literals below. | 
 |               cells: | 
 |                 callable: | | 
 |                   def convert_to_string(row): | 
 |                     decoded_cells = [] | 
 |                     for (value, timestamp) in row.cells: | 
 |                       value_str = value.decode("utf-8") | 
 |                       decoded_cells.append(beam.Row(value=value_str, timestamp_micros=timestamp)) | 
 |                     return decoded_cells | 
 |         - type: AssertEqual | 
 |           config: | 
 |             elements: | 
 |               # Note the space after `cells:`; a value adjacent to the colon is only | 
 |               # permitted by the YAML spec when the key is quoted (JSON-like). | 
 |               - { key: 'row1', | 
 |                   family_name: "cf1", | 
 |                   column_qualifier: "cq1", | 
 |                   cells: [{ | 
 |                     value: "value1", | 
 |                     timestamp_micros: 5000 } ] } | 
 |               - { key: 'row1', | 
 |                   family_name: "cf1", | 
 |                   column_qualifier: "cq2", | 
 |                   cells: [{ | 
 |                     value: "value2", | 
 |                     timestamp_micros: 1000 } ] } | 
 |         - type: LogForTesting | 
 |  | 
 |   # Read pipeline with `flatten: False`: reads 'test-table', decodes the row | 
 |   # key to a UTF-8 string and passes `column_families` through unchanged. | 
 |   # There is no AssertEqual step yet; see the TODO that follows (issue #35790). | 
 |   - pipeline: | 
 |       type: chain | 
 |       transforms: | 
 |         - type: ReadFromBigTable | 
 |           config: | 
 |             project: 'apache-beam-testing' | 
 |             instance: "{BT_INSTANCE}" | 
 |             table: 'test-table' | 
 |             flatten: False | 
 |         - type: MapToFields | 
 |           config: | 
 |             language: python | 
 |             fields: | 
 |               # Decodes bytes -> str, hence the name convert_to_string (the same | 
 |               # callable name is used for the key in the preceding read pipeline). | 
 |               key: | 
 |                 callable: | | 
 |                   def convert_to_string(row): | 
 |                     return row.key.decode("utf-8") if "key" in row._fields else None | 
 |  | 
 |               column_families: | 
 |                 column_families | 
 | #        TODO(issue #35790): uncomment this assert once the issue is fixed. | 
 | #        - type: AssertEqual | 
 | #          config: | 
 | #            elements: | 
 | #              - {key: 'row1', | 
 | #                # Use explicit map syntax to match the actual output | 
 | #                 column_families: { | 
 | #                   cf1: { | 
 | #                     cq1: [ | 
 | #                       { value: "value1", timestamp_micros: 5000 } | 
 | #                     ], | 
 | #                     cq2: [ | 
 | #                       { value: "value2", timestamp_micros: 1000 } | 
 | #                     ] | 
 | #                   } | 
 | #                 } | 
 | #              } | 
 |         #                - {'key': 'row1', | 
 |         #                   column_families: {cf1: {cq2: | 
 |         #                                             [BeamSchema_3281a0ae_fe85_474b_9030_86fbed58833a(value=b'value2', timestamp_micros=1000)], 'cq1': [BeamSchema_3281a0ae_fe85_474b_9030_86fbed58833a(value=b'value1', timestamp_micros=5000)]}}} | 
 |  | 
 |  | 
 | #        - type: LogForTesting | 
 |  | 
 |  |