#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


fixtures:
# Temporary Bigtable resource; the pipelines below reference it as
# "{BT_INSTANCE}" in their `instance` config.
- name: BT_INSTANCE
  type: "apache_beam.yaml.integration_tests.temp_bigtable_table"
  config: {project: "apache-beam-testing"}
# GCS-backed temp directory: a distributed filesystem is needed so the
# pipeline can read and write it from inside a container.
# NOTE(review): TEMP_DIR is not referenced by the pipelines in this file —
# confirm it is still needed.
- name: TEMP_DIR
  type: "apache_beam.yaml.integration_tests.gcs_temp_dir"
  config: {bucket: "gs://temp-storage-for-end-to-end-tests/temp-it"}

# Tests for BigTable YAML IO.

pipelines:
# Write two SetCell mutations for row 'row1' (family cf1, qualifiers cq1/cq2).
- pipeline:
    type: chain
    transforms:
      - type: Create
        config:
          elements:
            - {key: 'row1', type: 'SetCell', family_name: "cf1",
               column_qualifier: "cq1", value: "value1", timestamp_micros: 5000}
            - {key: 'row1', type: 'SetCell', family_name: "cf1",
               column_qualifier: "cq2", value: "value2", timestamp_micros: 1000}

      - type: LogForTesting
      # YAML can only produce strings, but key, column_qualifier and value
      # must be bytes on the Java side, so only those three are encoded here.
      # 'type' and 'timestamp_micros' already have the right types (String,
      # Long); 'start_timestamp_micros'/'end_timestamp_micros' are optional
      # and not produced by this pipeline.
      - type: MapToFields
        name: ConvertStringsToBytes
        config:
          language: python
          fields:
            # Fields used by 'SetCell' and 'DeleteFromColumn' mutations.
            key:
              callable: |
                def encode_key(row):
                    if "key" not in row._fields:
                        return None
                    return bytes(row.key, 'utf-8')
            type: type
            family_name: family_name
            column_qualifier:
              callable: |
                def encode_column_qualifier(row):
                    if 'column_qualifier' not in row._fields:
                        return None
                    return bytes(row.column_qualifier, 'utf-8')
            value:
              callable: |
                def encode_value(row):
                    if 'value' not in row._fields:
                        return None
                    return bytes(row.value, 'utf-8')
            timestamp_micros: timestamp_micros

      - type: WriteToBigTable
        config:
          project: 'apache-beam-testing'
          instance: "{BT_INSTANCE}"
          table: 'test-table'
# Read the table back without setting 'flatten'; each output row carries
# key, family_name, column_qualifier and cells (see AssertEqual below).
- pipeline:
    type: chain
    transforms:
      - type: ReadFromBigTable
        config:
          project: 'apache-beam-testing'
          instance: "{BT_INSTANCE}"
          table: 'test-table'
      # Decode the bytes fields back to strings so they can be compared with
      # the plain YAML literals in AssertEqual.
      - type: MapToFields
        config:
          language: python
          fields:
            key:
              callable: |
                def decode_key(row):
                    if "key" not in row._fields:
                        return None
                    return row.key.decode("utf-8")
            family_name: family_name
            column_qualifier:
              callable: |
                def decode_column_qualifier(row):
                    if "column_qualifier" not in row._fields:
                        return None
                    return row.column_qualifier.decode("utf-8")
            cells:
              callable: |
                def decode_cells(row):
                    # Each cell is unpacked positionally as (value, timestamp).
                    return [
                        beam.Row(value=value.decode("utf-8"), timestamp_micros=timestamp)
                        for value, timestamp in row.cells
                    ]
      - type: AssertEqual
        config:
          elements:
            - {key: 'row1', family_name: "cf1", column_qualifier: "cq1",
               cells: [{value: "value1", timestamp_micros: 5000}]}
            - {key: 'row1', family_name: "cf1", column_qualifier: "cq2",
               cells: [{value: "value2", timestamp_micros: 1000}]}
      - type: LogForTesting

# Read the table back with flatten: False; per the (disabled) expected output
# below, cells are grouped per key under column_families -> qualifier.
- pipeline:
    type: chain
    transforms:
      - type: ReadFromBigTable
        config:
          project: 'apache-beam-testing'
          instance: "{BT_INSTANCE}"
          table: 'test-table'
          flatten: False
      - type: MapToFields
        config:
          language: python
          fields:
            key:
              callable: |
                def convert_to_string(row):
                    return row.key.decode("utf-8") if "key" in row._fields else None
            # Passed through unchanged, so cell values inside stay as bytes.
            column_families: column_families
      # TODO: issue #35790, once fixed we can uncomment this assert.
      # NOTE(review): column_families is not decoded above. At the time this
      # was written, the actual output held bytes values (e.g. value=b'value1')
      # wrapped in generated BeamSchema_* rows, not the strings expected
      # below. Confirm whether the expectation or the mapping needs adjusting
      # when re-enabling.
      # - type: AssertEqual
      #   config:
      #     elements:
      #       # Use explicit map syntax to match the actual output.
      #       - {key: 'row1',
      #          column_families: {
      #            cf1: {
      #              cq1: [{value: "value1", timestamp_micros: 5000}],
      #              cq2: [{value: "value2", timestamp_micros: 1000}]
      #            }
      #          }}
      # - type: LogForTesting
