blob: 5793fd111a6c3974cc04b508a67b851a04a92712 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
fixtures:
  # Fixture created by `temp_kafka_server`. Pipelines in this file reference it
  # by name as "{TEMP_BOOTSTAP_SERVER}" for their `bootstrap_servers` setting.
  # NOTE: the name is spelled BOOTSTAP (sic); if it is ever renamed, every
  # "{TEMP_BOOTSTAP_SERVER}" reference must be renamed with it.
  - name: TEMP_BOOTSTAP_SERVER
    type: "apache_beam.yaml.integration_tests.temp_kafka_server"
pipelines:
  # Kafka write pipeline: creates three rows, converts each value to UTF-8
  # bytes, and writes them to the "silly_topic" topic in RAW format.
  - pipeline:
      type: chain
      transforms:
        - type: Create
          config:
            elements:
              - {value: 123}
              - {value: 456}
              - {value: 789}
        - type: MapToFields
          config:
            language: python
            fields:
              value:
                # Stringify the integer, then encode it so the field matches
                # the declared `bytes` output type.
                callable: |
                  lambda row: str(row.value).encode('utf-8')
                output_type: bytes
        - type: WriteToKafka
          config:
            format: "RAW"
            topic: "silly_topic"
            # Placeholder for the TEMP_BOOTSTAP_SERVER fixture declared under
            # `fixtures`.
            bootstrap_servers: "{TEMP_BOOTSTAP_SERVER}"
            producer_config_updates:
              # Kafka producer setting: do not wait to batch records before
              # sending.
              linger.ms: "0"

  # TODO(#35272): Figure out why this times out when running on GitHub.
  # It runs fine locally.
  # Kafka read pipeline
  # A separate read pipeline is needed to make sure the write pipeline is
  # flushed.
  # - pipeline:
  #     type: chain
  #     transforms:
  #       - type: ReadFromKafka
  #         config:
  #           format: "RAW"
  #           topic: "silly_topic"
  #           bootstrap_servers: "{TEMP_BOOTSTAP_SERVER}"
  #           consumer_config:
  #             auto.offset.reset: "earliest"
  #             group.id: "yaml-kafka-test-group"
  #           max_read_time_seconds: 10  # will read forever if not set
  #       - type: MapToFields
  #         config:
  #           language: python
  #           fields:
  #             value:
  #               callable: |
  #                 # Kafka RAW format reads messages as bytes in the 'payload' field of a Row
  #                 lambda row: row.payload.decode('utf-8')
  #               output_type: string
  #       - type: AssertEqual
  #         config:
  #           elements:
  #             - {value: "123"}
  #             - {value: "456"}
  #             - {value: "789"}
    # NOTE(review): while the read pipeline above is commented out, these
    # options attach to the write pipeline entry. If the read pipeline is
    # re-enabled they will belong to it instead -- confirm which is intended.
    options:
      streaming: true
# TODO: Error handling is hard to trigger based on initial investigation; this
# needs further investigation.