# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Workflow display name shown in the GitHub Actions UI.
name: 'StressTests Java KafkaIO'

# Triggers: a weekly schedule ('0 14 * * 0' = Sundays at 14:00; GitHub
# evaluates cron schedules in UTC) plus manual runs via workflow_dispatch.
on:
  schedule:
    - cron: '0 14 * * 0'
  workflow_dispatch: {}

# Scope the GITHUB_TOKEN explicitly instead of inheriting the repository's
# default token permissions. Every scope is read-only except `actions`
# (write) and `id-token` (none).
permissions:
  actions: write
  checks: read
  contents: read
  deployments: read
  discussions: read
  id-token: none
  issues: read
  packages: read
  pages: read
  pull-requests: read
  repository-projects: read
  security-events: read
  statuses: read

# Runs that compute the same group key are serialized; because
# cancel-in-progress is enabled, a newly queued run cancels the in-flight
# run of the same group.
concurrency:
  group: "${{ github.workflow }} @ ${{ github.event.issue.number || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}"
  cancel-in-progress: true

# Secrets exported as environment variables to every step: the Develocity
# (Gradle Enterprise) access key and cache credentials, and InfluxDB
# credentials (presumably read by the stress test when it reports metrics
# to InfluxDB; confirm).
env:
  DEVELOCITY_ACCESS_KEY: '${{ secrets.DEVELOCITY_ACCESS_KEY }}'
  GRADLE_ENTERPRISE_CACHE_USERNAME: '${{ secrets.GE_CACHE_USERNAME }}'
  GRADLE_ENTERPRISE_CACHE_PASSWORD: '${{ secrets.GE_CACHE_PASSWORD }}'
  INFLUXDB_USER: '${{ secrets.INFLUXDB_USER }}'
  INFLUXDB_USER_PASSWORD: '${{ secrets.INFLUXDB_USER_PASSWORD }}'

jobs:
  # Deploys a Kafka + Zookeeper cluster into a per-run Kubernetes namespace,
  # resolves externally reachable broker addresses, creates the `beam` topic
  # and runs the large KafkaIO stress test against that cluster.
  beam_StressTests_Java_KafkaIO:
    # NOTE(review): the comment-phrase clause can only match on an
    # issue_comment event, and the workflow's `on:` block does not subscribe
    # to issue_comment. As written, only workflow_dispatch runs and scheduled
    # runs on apache/beam pass this condition.
    if: |
      github.event_name == 'workflow_dispatch' ||
      (github.event_name == 'schedule' && github.repository == 'apache/beam') ||
      github.event.comment.body == 'Run Stress Tests Java KafkaIO'
    runs-on: [self-hosted, ubuntu-20.04, highmem]
    timeout-minutes: 720
    name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
    strategy:
      matrix:
        job_name: ["beam_StressTests_Java_KafkaIO"]
        job_phrase: ["Run Stress Tests Java KafkaIO"]
    steps:
      - uses: actions/checkout@v4
      - name: Setup repository
        uses: ./.github/actions/setup-action
        with:
          comment_phrase: ${{ matrix.job_phrase }}
          github_token: ${{ secrets.GITHUB_TOKEN }}
          github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
      - name: Setup environment
        uses: ./.github/actions/setup-environment-action
      - name: Set k8s access
        uses: ./.github/actions/setup-k8s-access
        with:
          cluster_name: kafka-workflows
          # Namespace is unique per run (includes run_id). Later kubectl calls
          # pass no -n, so presumably this action points the current context
          # at this namespace; confirm in the action definition.
          k8s_namespace: ${{ matrix.job_name }}-${{ github.run_id }}
          cluster_zone: us-central1
      - name: Install Kafka
        id: install_kafka
        run: |
          echo "Deploying Kafka cluster using existing .test-infra/kubernetes/kafka-cluster configuration..."
          kubectl apply -R -f "${{ github.workspace }}/.test-infra/kubernetes/kafka-cluster/"

          # `kubectl wait` with a label selector errors out when no pod matches
          # yet, so give the controllers a fixed head start to create the pods.
          echo "Waiting for Kafka cluster to be ready..."
          sleep 180

          # Log the current pod state for debugging.
          echo "Checking pod status..."
          kubectl get pods -l app=kafka
          kubectl get pods -l app=zookeeper

          # Wait for every pod matching each label (as of now) to become Ready.
          # Deliberately best-effort: a timeout is logged, not fatal.
          echo "Waiting for Kafka pods to be ready..."
          kubectl wait --for=condition=ready pod -l app=kafka --timeout=300s || echo "Kafka pods not ready, continuing anyway"

          echo "Waiting for Zookeeper pods to be ready..."
          kubectl wait --for=condition=ready pod -l app=zookeeper --timeout=300s || echo "Zookeeper pods not ready, continuing anyway"

      - name: Set up Kafka brokers
        id: set_brokers
        # Outputs, for each broker N in 0..2:
        #   KAFKA_SERVICE_BROKER_N        host (LoadBalancer IP or node InternalIP)
        #   KAFKA_SERVICE_BROKER_PORTS_N  port matching that host
        # plus KAFKA_BOOTSTRAP_SERVERS = "host0:port0,host1:port1,host2:port2".
        run: |
          echo "Setting up Kafka brokers for existing cluster configuration..."
          bootstrap_servers=""

          for INDEX in {0..2}; do
            echo "Setting up broker ${INDEX}..."

            # Prefer the external LoadBalancer IP of the per-broker service.
            LB_IP=$(kubectl get svc "outside-${INDEX}" -o jsonpath='{.status.loadBalancer.ingress[0].ip}' 2>/dev/null || echo "")

            if [ -n "$LB_IP" ] && [ "$LB_IP" != "null" ]; then
              echo "Using LoadBalancer IP: $LB_IP"
              BROKER_HOST=$LB_IP
              # A LoadBalancer listens on the Service port.
              PORT=$(kubectl get svc "outside-${INDEX}" -o jsonpath='{.spec.ports[0].port}' 2>/dev/null || echo "")
            else
              echo "LoadBalancer IP not available, using NodePort approach..."
              # Use the first node's internal IP.
              BROKER_HOST=$(kubectl get nodes -o jsonpath='{.items[0].status.addresses[?(@.type=="InternalIP")].address}')
              # On a node IP the Service is exposed on its nodePort, which is
              # not necessarily equal to the Service port.
              PORT=$(kubectl get svc "outside-${INDEX}" -o jsonpath='{.spec.ports[0].nodePort}' 2>/dev/null || echo "")
            fi
            # Fall back to the broker's default external listener port.
            PORT=${PORT:-9094}

            echo "KAFKA_SERVICE_BROKER_${INDEX}=${BROKER_HOST}" >> "$GITHUB_OUTPUT"
            echo "KAFKA_SERVICE_BROKER_PORTS_${INDEX}=${PORT}" >> "$GITHUB_OUTPUT"

            echo "Broker ${INDEX}: ${BROKER_HOST}:${PORT}"
            # Append "host:port", comma-separated, preserving broker order.
            bootstrap_servers="${bootstrap_servers:+${bootstrap_servers},}${BROKER_HOST}:${PORT}"
          done

          echo "KAFKA_BOOTSTRAP_SERVERS=${bootstrap_servers}" >> "$GITHUB_OUTPUT"
          echo "Bootstrap servers: ${bootstrap_servers}"

      - name: Create Kafka topic
        id: create_topic
        run: |
          echo "Creating Kafka topic 'beam'..."

          # Use the first Kafka pod. If none exists, topic creation is skipped
          # (the step succeeds) rather than failing the job.
          KAFKA_POD=$(kubectl get pods -l app=kafka -o jsonpath='{.items[0].metadata.name}' 2>/dev/null || echo "")

          if [ -z "$KAFKA_POD" ]; then
            echo "No Kafka pods found, skipping topic creation"
            exit 0
          fi

          echo "Using Kafka pod: $KAFKA_POD"

          # Extra grace period before exec-ing into the broker container.
          echo "Waiting for pod to be fully operational..."
          sleep 60

          # Best-effort: --create fails if the topic already exists, which is
          # logged rather than treated as an error.
          echo "Creating topic 'beam'..."
          kubectl exec "$KAFKA_POD" -c broker -- /opt/kafka/bin/kafka-topics.sh --create --topic beam --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 || echo "Topic may already exist"

          # Log the topic list for verification (also best-effort).
          echo "Verifying topic creation..."
          kubectl exec "$KAFKA_POD" -c broker -- /opt/kafka/bin/kafka-topics.sh --list --zookeeper zookeeper:2181 || echo "Could not list topics"
      - name: Run Kafka StressTest Large
        uses: ./.github/actions/gradle-command-self-hosted-action
        with:
          # Folded scalar (>-): the lines below are joined with single spaces
          # into one command line with no trailing newline.
          gradle-command: >-
            :it:kafka:KafkaStressTestLarge --info
            -DbootstrapServers="${{ steps.set_brokers.outputs.KAFKA_BOOTSTRAP_SERVERS }}"
            -DinfluxHost="http://10.128.0.96:8086"
            -DinfluxDatabase="beam_test_metrics"
            -DinfluxMeasurement="java_stress_test_kafka"