| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| @ENABLE_KAFKA |
| Feature: Sending data to using Kafka streaming platform using PublishKafka |
| In order to send data to a Kafka stream |
| As a user of MiNiFi |
| I need to have PublishKafka processor |
| |
| Background: |
| Given the content of "/tmp/output" is monitored |
| |
| Scenario Outline: A MiNiFi instance transfers data to a kafka broker |
| Given a GetFile processor with the "Input Directory" property set to "/tmp/input" |
| And a file with the content "test" is present in "/tmp/input" |
| And a UpdateAttribute processor |
| And these processor properties are set: |
| | processor name | property name | property value | |
| | UpdateAttribute | kafka_require_num_acks | 1 | |
| | UpdateAttribute | kafka_message_key | unique_message_key_123 | |
| And a PublishKafka processor set up to communicate with a kafka broker instance |
| And these processor properties are set: |
| | processor name | property name | property value | |
| | PublishKafka | Topic Name | test | |
| | PublishKafka | Delivery Guarantee | ${kafka_require_num_acks} | |
| | PublishKafka | Request Timeout | 12 s | |
| | PublishKafka | Message Timeout | 13 s | |
| | PublishKafka | Known Brokers | kafka-broker-${feature_id}:${literal(9000):plus(92)} | |
| | PublishKafka | Client Name | client_no_${literal(6):multiply(7)} | |
| | PublishKafka | Kafka Key | ${kafka_message_key} | |
| | PublishKafka | retry.backoff.ms | ${literal(2):multiply(25):multiply(3)} | |
| | PublishKafka | Message Key Field | kafka.key | |
| | PublishKafka | Compress Codec | <compress_codec> | |
| And a PutFile processor with the "Directory" property set to "/tmp/output" |
| And the "success" relationship of the GetFile processor is connected to the UpdateAttribute |
| And the "success" relationship of the UpdateAttribute processor is connected to the PublishKafka |
| And the "success" relationship of the PublishKafka processor is connected to the PutFile |
| And the "failure" relationship of the PublishKafka processor is connected to the PublishKafka |
| |
| And a kafka broker is set up in correspondence with the PublishKafka |
| |
| When both instances start up |
| Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds |
| And the Minifi logs contain the following message: " is 'test'" in less than 60 seconds |
| And the Minifi logs contain the following message: "PublishKafka: request.required.acks [1]" in less than 10 seconds |
| And the Minifi logs contain the following message: "PublishKafka: request.timeout.ms [12000]" in less than 10 seconds |
| And the Minifi logs contain the following message: "PublishKafka: message.timeout.ms [13000]" in less than 10 seconds |
| And the Minifi logs contain the following message: "PublishKafka: bootstrap.servers [kafka-broker-${feature_id}:9092]" in less than 10 seconds |
| And the Minifi logs contain the following message: "PublishKafka: client.id [client_no_42]" in less than 10 seconds |
| And the Minifi logs contain the following message: "PublishKafka: Message Key [unique_message_key_123]" in less than 10 seconds |
| And the Minifi logs contain the following message: "PublishKafka: DynamicProperty: [retry.backoff.ms] -> [150]" in less than 10 seconds |
| And the Minifi logs contain the following message: "The Message Key Field property is set. This property is DEPRECATED and has no effect; please use Kafka Key instead." in less than 10 seconds |
| |
| Examples: Compression formats |
| | compress_codec | |
| | none | |
| | snappy | |
| | gzip | |
| | lz4 | |
| | zstd | |
| |
| Scenario: PublishKafka sends flowfiles to failure when the broker is not available |
| Given a GetFile processor with the "Input Directory" property set to "/tmp/input" |
| And a file with the content "no broker" is present in "/tmp/input" |
| And a PublishKafka processor set up to communicate with a kafka broker instance |
| And a PutFile processor with the "Directory" property set to "/tmp/output" |
| And the "success" relationship of the GetFile processor is connected to the PublishKafka |
| And the "failure" relationship of the PublishKafka processor is connected to the PutFile |
| |
| When the MiNiFi instance starts up |
| Then a flowfile with the content "no broker" is placed in the monitored directory in less than 60 seconds |
| |
| Scenario: PublishKafka sends can use SSL connect with security properties |
| Given a GetFile processor with the "Input Directory" property set to "/tmp/input" |
| And a file with the content "test" is present in "/tmp/input" |
| And a PublishKafka processor set up to communicate with a kafka broker instance |
| And these processor properties are set: |
| | processor name | property name | property value | |
| | PublishKafka | Client Name | LMN | |
| | PublishKafka | Known Brokers | kafka-broker-${feature_id}:9093 | |
| | PublishKafka | Topic Name | test | |
| | PublishKafka | Batch Size | 10 | |
| | PublishKafka | Compress Codec | none | |
| | PublishKafka | Delivery Guarantee | 1 | |
| | PublishKafka | Request Timeout | 10 sec | |
| | PublishKafka | Message Timeout | 12 sec | |
| | PublishKafka | Security CA | /tmp/resources/root_ca.crt | |
| | PublishKafka | Security Cert | /tmp/resources/minifi_client.crt | |
| | PublishKafka | Security Private Key | /tmp/resources/minifi_client.key | |
| | PublishKafka | Security Protocol | ssl | |
| And a PutFile processor with the "Directory" property set to "/tmp/output" |
| And the "success" relationship of the GetFile processor is connected to the PublishKafka |
| And the "success" relationship of the PublishKafka processor is connected to the PutFile |
| And the "failure" relationship of the PublishKafka processor is connected to the PublishKafka |
| |
| And a kafka broker is set up in correspondence with the PublishKafka |
| |
| When both instances start up |
| Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds |
| |
| # We fallback to the flowfile's uuid as message key if the Kafka Key property is not set |
| And the Minifi logs match the following regex: "PublishKafka: Message Key \[[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}\]" in less than 10 seconds |
| |
| Scenario: A MiNiFi instance transfers data to a kafka broker through SASL Plain security protocol |
| Given a GetFile processor with the "Input Directory" property set to "/tmp/input" |
| And a file with the content "test" is present in "/tmp/input" |
| And a PublishKafka processor set up to communicate with a kafka broker instance |
| And these processor properties are set: |
| | processor name | property name | property value | |
| | PublishKafka | Topic Name | test | |
| | PublishKafka | Request Timeout | 10 sec | |
| | PublishKafka | Message Timeout | 12 sec | |
| | PublishKafka | Known Brokers | kafka-broker-${feature_id}:9094 | |
| | PublishKafka | Client Name | LMN | |
| | PublishKafka | Security Protocol | sasl_plaintext | |
| | PublishKafka | SASL Mechanism | PLAIN | |
| | PublishKafka | Username | alice | |
| | PublishKafka | Password | alice-secret | |
| And a PutFile processor with the "Directory" property set to "/tmp/output" |
| And the "success" relationship of the GetFile processor is connected to the PublishKafka |
| And the "success" relationship of the PublishKafka processor is connected to the PutFile |
| And the "failure" relationship of the PublishKafka processor is connected to the PublishKafka |
| |
| And a kafka broker is set up in correspondence with the PublishKafka |
| |
| When both instances start up |
| Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds |
| |
| Scenario: PublishKafka sends can use SASL SSL connect with security properties |
| Given a GetFile processor with the "Input Directory" property set to "/tmp/input" |
| And a file with the content "test" is present in "/tmp/input" |
| And a PublishKafka processor set up to communicate with a kafka broker instance |
| And these processor properties are set: |
| | processor name | property name | property value | |
| | PublishKafka | Client Name | LMN | |
| | PublishKafka | Known Brokers | kafka-broker-${feature_id}:9095 | |
| | PublishKafka | Topic Name | test | |
| | PublishKafka | Batch Size | 10 | |
| | PublishKafka | Compress Codec | none | |
| | PublishKafka | Delivery Guarantee | 1 | |
| | PublishKafka | Request Timeout | 10 sec | |
| | PublishKafka | Message Timeout | 12 sec | |
| | PublishKafka | Security CA | /tmp/resources/root_ca.crt | |
| | PublishKafka | Security Cert | /tmp/resources/minifi_client.crt | |
| | PublishKafka | Security Private Key | /tmp/resources/minifi_client.key | |
| | PublishKafka | Security Protocol | sasl_ssl | |
| | PublishKafka | SASL Mechanism | PLAIN | |
| | PublishKafka | Username | alice | |
| | PublishKafka | Password | alice-secret | |
| And a PutFile processor with the "Directory" property set to "/tmp/output" |
| And the "success" relationship of the GetFile processor is connected to the PublishKafka |
| And the "success" relationship of the PublishKafka processor is connected to the PutFile |
| And the "failure" relationship of the PublishKafka processor is connected to the PublishKafka |
| |
| And a kafka broker is set up in correspondence with the PublishKafka |
| |
| When both instances start up |
| Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds |
| |
| Scenario: PublishKafka sends can use SASL SSL connect with SSL Context |
| Given a GetFile processor with the "Input Directory" property set to "/tmp/input" |
| And a file with the content "test" is present in "/tmp/input" |
| And a PublishKafka processor set up to communicate with a kafka broker instance |
| And these processor properties are set: |
| | processor name | property name | property value | |
| | PublishKafka | Client Name | LMN | |
| | PublishKafka | Known Brokers | kafka-broker-${feature_id}:9095 | |
| | PublishKafka | Topic Name | test | |
| | PublishKafka | Batch Size | 10 | |
| | PublishKafka | Compress Codec | none | |
| | PublishKafka | Delivery Guarantee | 1 | |
| | PublishKafka | Request Timeout | 10 sec | |
| | PublishKafka | Message Timeout | 12 sec | |
| | PublishKafka | Security Protocol | sasl_ssl | |
| | PublishKafka | SASL Mechanism | PLAIN | |
| | PublishKafka | Username | alice | |
| | PublishKafka | Password | alice-secret | |
| And a PutFile processor with the "Directory" property set to "/tmp/output" |
| And an ssl context service is set up for PublishKafka |
| And the "success" relationship of the GetFile processor is connected to the PublishKafka |
| And the "success" relationship of the PublishKafka processor is connected to the PutFile |
| And the "failure" relationship of the PublishKafka processor is connected to the PublishKafka |
| |
| And a kafka broker is set up in correspondence with the PublishKafka |
| |
| When both instances start up |
| Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds |
| |
| Scenario: PublishKafka sends can use SSL connect with SSL Context Service |
| Given a GetFile processor with the "Input Directory" property set to "/tmp/input" |
| And a file with the content "test" is present in "/tmp/input" |
| And a PublishKafka processor set up to communicate with a kafka broker instance |
| And these processor properties are set: |
| | processor name | property name | property value | |
| | PublishKafka | Client Name | LMN | |
| | PublishKafka | Known Brokers | kafka-broker-${feature_id}:9093 | |
| | PublishKafka | Topic Name | test | |
| | PublishKafka | Batch Size | 10 | |
| | PublishKafka | Compress Codec | none | |
| | PublishKafka | Delivery Guarantee | 1 | |
| | PublishKafka | Request Timeout | 10 sec | |
| | PublishKafka | Message Timeout | 12 sec | |
| | PublishKafka | Security Protocol | ssl | |
| And a PutFile processor with the "Directory" property set to "/tmp/output" |
| And an ssl context service is set up for PublishKafka |
| And the "success" relationship of the GetFile processor is connected to the PublishKafka |
| And the "success" relationship of the PublishKafka processor is connected to the PutFile |
| And the "failure" relationship of the PublishKafka processor is connected to the PublishKafka |
| |
| And a kafka broker is set up in correspondence with the PublishKafka |
| |
| When both instances start up |
| Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds |
| |
| # We fallback to the flowfile's uuid as message key if the Kafka Key property is not set |
| And the Minifi logs match the following regex: "PublishKafka: Message Key \[[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}\]" in less than 10 seconds |