blob: 2f97b83c6e92b1bcfe2c9eded91ba64118b4dd8c [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
fixtures:
# Bigtable resource provided by the temp_bigtable_table test fixture; the
# pipelines below refer to it through the "{BT_INSTANCE}" placeholder in the
# `instance` field of ReadFromBigTable / WriteToBigTable.
- name: BT_INSTANCE
  type: 'apache_beam.yaml.integration_tests.temp_bigtable_table'
  config:
    project: 'apache-beam-testing'
# Temporary GCS directory. A distributed filesystem is needed so data can be
# read and written from inside a container.
# NOTE(review): none of the pipelines visible in this file reference
# "{TEMP_DIR}" -- confirm whether this fixture is still needed.
- name: TEMP_DIR
  type: 'apache_beam.yaml.integration_tests.gcs_temp_dir'
  config:
    bucket: 'gs://temp-storage-for-end-to-end-tests/temp-it'
# Tests for Bigtable YAML IO. The first pipeline writes two cells to the
# temporary table; the pipelines after it read them back and check them.
pipelines:
# Write two SetCell mutations for 'row1' (columns cf1:cq1 and cf1:cq2).
- pipeline:
    type: chain
    transforms:
    - type: Create
      config:
        elements:
        - key: 'row1'
          type: 'SetCell'
          family_name: "cf1"
          column_qualifier: "cq1"
          value: "value1"
          timestamp_micros: 5000
        - key: 'row1'
          type: 'SetCell'
          family_name: "cf1"
          column_qualifier: "cq2"
          value: "value2"
          timestamp_micros: 1000
    - type: LogForTesting
    # Only fields that are strings in YAML but must be bytes on the Java side
    # are converted here. 'type', 'timestamp_micros', 'start_timestamp_micros'
    # and 'end_timestamp_micros' already have the correct type (String, Long)
    # or are optional, so they are passed through unchanged.
    - type: MapToFields
      name: ConvertStringsToBytes
      config:
        language: python
        fields:
          # For 'SetCell' and 'DeleteFromColumn' mutations.
          key:
            callable: |
              def convert_to_bytes(row):
                if 'key' not in row._fields:
                  return None
                return row.key.encode('utf-8')
          type: type
          family_name: family_name
          column_qualifier:
            callable: |
              def convert_to_bytes(row):
                if 'column_qualifier' not in row._fields:
                  return None
                return row.column_qualifier.encode('utf-8')
          value:
            callable: |
              def convert_to_bytes(row):
                if 'value' not in row._fields:
                  return None
                return row.value.encode('utf-8')
          timestamp_micros: timestamp_micros
    - type: WriteToBigTable
      config:
        project: 'apache-beam-testing'
        instance: "{BT_INSTANCE}"
        table: 'test-table'
# Read the table back with the default ReadFromBigTable settings. Each element
# carries key, family_name, column_qualifier and a list of cells; bytes fields
# are decoded to strings before comparing against the expected elements.
- pipeline:
    type: chain
    transforms:
    - type: ReadFromBigTable
      config:
        project: 'apache-beam-testing'
        instance: "{BT_INSTANCE}"
        table: 'test-table'
    - type: MapToFields
      name: ConvertBytesToStrings
      config:
        language: python
        fields:
          key:
            callable: |
              def convert_to_string(row):
                if "key" not in row._fields:
                  return None
                return row.key.decode("utf-8")
          family_name: family_name
          column_qualifier:
            callable: |
              def convert_to_string(row):
                if "column_qualifier" not in row._fields:
                  return None
                return row.column_qualifier.decode("utf-8")
          cells:
            callable: |
              def convert_to_string(row):
                # Read cell fields by name rather than by tuple position so
                # the conversion does not depend on the cell schema's field
                # order.
                return [
                    beam.Row(value=cell.value.decode("utf-8"),
                             timestamp_micros=cell.timestamp_micros)
                    for cell in row.cells
                ]
    - type: AssertEqual
      config:
        elements:
        - key: 'row1'
          family_name: "cf1"
          column_qualifier: "cq1"
          cells:
          - value: "value1"
            timestamp_micros: 5000
        - key: 'row1'
          family_name: "cf1"
          column_qualifier: "cq2"
          cells:
          - value: "value2"
            timestamp_micros: 1000
    - type: LogForTesting
# Read the table back with flatten disabled. Each element carries the row key
# and a nested column_families map (family -> qualifier -> list of cells).
- pipeline:
    type: chain
    transforms:
    - type: ReadFromBigTable
      config:
        project: 'apache-beam-testing'
        instance: "{BT_INSTANCE}"
        table: 'test-table'
        flatten: false
    - type: MapToFields
      config:
        language: python
        fields:
          key:
            callable: |
              def convert_to_string(row):
                if "key" not in row._fields:
                  return None
                return row.key.decode("utf-8")
          column_families: column_families
    # TODO: issue #35790, once fixed we can uncomment the AssertEqual and
    # LogForTesting transforms at the end of this comment.
    #
    # For reference, the output observed while investigating the issue was
    # (Python repr; the cell values are still bytes because column_families
    # is passed through without conversion):
    #   {'key': 'row1',
    #    column_families: {cf1: {cq2:
    #      [BeamSchema_3281a0ae_fe85_474b_9030_86fbed58833a(value=b'value2', timestamp_micros=1000)],
    #      'cq1': [BeamSchema_3281a0ae_fe85_474b_9030_86fbed58833a(value=b'value1', timestamp_micros=5000)]}}}
    #
    # - type: AssertEqual
    #   config:
    #     elements:
    #     - {key: 'row1',
    #        # Use explicit map syntax to match the actual output
    #        column_families: {
    #          cf1: {
    #            cq1: [
    #              {value: "value1", timestamp_micros: 5000}
    #            ],
    #            cq2: [
    #              {value: "value2", timestamp_micros: 1000}
    #            ]
    #          }
    #        }
    #       }
    # - type: LogForTesting