# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# A pipeline that both writes to and reads from the same Kafka topic.
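#
# TOPIC, BOOTSTRAP_SERVERS, USERNAME, and PASSWORD are Jinja template
# placeholders that are substituted when the pipeline is launched. As a rough
# sketch (exact flag names can vary between Beam releases, and the file name
# here is only an example), an invocation might look like:
#
#   python -m apache_beam.yaml.main \
#     --yaml_pipeline_file=kafka.yaml \
#     --jinja_variables='{"TOPIC": "quickstart-events",
#         "BOOTSTRAP_SERVERS": "localhost:9092",
#         "USERNAME": "user", "PASSWORD": "secret"}'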

pipeline:
  transforms:
    - type: ReadFromText
      name: ReadFromGCS
      config:
        path: gs://dataflow-samples/shakespeare/kinglear.txt

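    # ReadFromText emits one row per line of the file, with a single string
    # field named `line`. Encode each line as UTF-8 bytes in a `value` field
    # so the records can be written to Kafka using the RAW format below.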
    - type: MapToFields
      name: BuildKafkaRecords
      input: ReadFromGCS
      config:
        language: python
        fields:
          value:
            callable: |
              def func(row):
                return row.line.encode('utf-8')
            output_type: bytes

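    # With format RAW, each input row is expected to carry a single bytes
    # field whose contents become the Kafka message payload. The SASL settings
    # below assume a cluster that accepts SASL/PLAIN over a plaintext
    # listener; adjust security.protocol and sasl.mechanism to match your
    # brokers.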
    - type: WriteToKafka
      name: SendRecordsToKafka
      input: BuildKafkaRecords
      config:
        format: "RAW"
        topic: "{{ TOPIC }}"
        bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
        producer_config_updates:
          sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \
            username={{ USERNAME }} \
            password={{ PASSWORD }};"
          security.protocol: "SASL_PLAINTEXT"
          sasl.mechanism: "PLAIN"

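    # Read the same topic back. `auto_offset_reset_config: earliest` tells the
    # consumer to start from the earliest available offset when it has no
    # committed offset, so the records written above are also read back. With
    # format RAW, each message arrives as a row with a single bytes field
    # named `payload`.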
    - type: ReadFromKafka
      name: ReadFromMyTopic
      config:
        format: "RAW"
        topic: "{{ TOPIC }}"
        bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
        auto_offset_reset_config: earliest
        consumer_config:
          sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \
            username={{ USERNAME }} \
            password={{ PASSWORD }};"
          security.protocol: "SASL_PLAINTEXT"
          sasl.mechanism: "PLAIN"

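    # Decode the raw Kafka payload bytes back into a UTF-8 string, exposed as
    # a new field named `text`.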
    - type: MapToFields
      name: ParseKafkaRecords
      input: ReadFromMyTopic
      config:
        language: python
        fields:
          text:
            callable: |
              def func(row):
                # Kafka RAW format reads messages as bytes
                # in the 'payload' field of a Row
                return row.payload.decode('utf-8')

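    # Log the decoded records so the output can be inspected against the
    # expected rows listed at the bottom of this file.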
    - type: LogForTesting
      input: ParseKafkaRecords

# Since the pipeline both writes to and reads from the same Kafka topic, the
# write branch is expected to publish rows whose bytes `value` field becomes
# the Kafka message payload, and the read branch to consume those byte
# messages and decode them into strings in the new `text` field.
# Expected:
#  Row(value=b'Fool\tThou shouldst not have been old till thou hadst')
#  Row(value=b'\tbeen wise.')
#  Row(value=b'KING LEAR\tNothing will come of nothing: speak again.')
#  Row(value=b'\tNever, never, never, never, never!')
#  Row(text='Fool\tThou shouldst not have been old till thou hadst')
#  Row(text='\tbeen wise.')
#  Row(text='KING LEAR\tNothing will come of nothing: speak again.')
#  Row(text='\tNever, never, never, never, never!')