| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| fixtures: |
| - name: TEMP_BOOTSTAP_SERVER |
| type: "apache_beam.yaml.integration_tests.temp_kafka_server" |
| |
| pipelines: |
| # Kafka write pipeline |
| - pipeline: |
| type: chain |
| transforms: |
| - type: Create |
| config: |
| elements: |
| - {value: 123} |
| - {value: 456} |
| - {value: 789} |
| - type: MapToFields |
| config: |
| language: python |
| fields: |
| value: |
| callable: | |
| lambda row: str(row.value).encode('utf-8') |
| output_type: bytes |
| - type: WriteToKafka |
| config: |
| format: "RAW" |
| topic: "silly_topic" |
| bootstrap_servers: "{TEMP_BOOTSTAP_SERVER}" |
| producer_config_updates: |
| linger.ms: "0" |
| |
| # TODO(#35272): Figure out why this times out when running on github. |
| # Locally runs fine. |
| # Kafka read pipeline |
| # Need a separate read pipeline to make sure the write pipeline is flushed |
| # - pipeline: |
| # type: chain |
| # transforms: |
| # - type: ReadFromKafka |
| # config: |
| # format: "RAW" |
| # topic: "silly_topic" |
| # bootstrap_servers: "{TEMP_BOOTSTAP_SERVER}" |
| # consumer_config: |
| # auto.offset.reset: "earliest" |
| # group.id: "yaml-kafka-test-group" |
| # max_read_time_seconds: 10 # will read forever if not set |
| # - type: MapToFields |
| # config: |
| # language: python |
| # fields: |
| # value: |
| # callable: | |
| # # Kafka RAW format reads messages as bytes in the 'payload' field of a Row |
| # lambda row: row.payload.decode('utf-8') |
| # output_type: string |
| # - type: AssertEqual |
| # config: |
| # elements: |
| # - {value: "123"} |
| # - {value: "456"} |
| # - {value: "789"} |
| |
| options: |
| streaming: true |
| |
| # TODO: Error handling hard to trigger upon initial investigations. Need to |
| # investigate more. |