# blob: 0576ee748d79be9b177546e16ed3e0cea5d86820 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: StressTests Java KafkaIO
# Triggers: a cron schedule plus manual dispatch from the Actions UI/API.
on:
  schedule:
    # minute=0 hour=14 day-of-week=0 (Sunday); GitHub evaluates cron in UTC.
    - cron: "0 14 * * 0"
  workflow_dispatch:
# Set explicit permissions for this workflow to avoid the default permissions, which are `write-all` in the case of a pull_request_target event.
permissions:
  # The only scope granted write access.
  actions: write
  # Explicitly disabled scope.
  id-token: none
  # Everything else is read-only.
  pull-requests: read
  checks: read
  contents: read
  deployments: read
  issues: read
  discussions: read
  packages: read
  pages: read
  repository-projects: read
  security-events: read
  statuses: read
# A newly queued run in the same concurrency group cancels the run that is
# still in progress.
concurrency:
  # Folded scalar: the lines below are joined with single spaces, producing
  # exactly the same group string as the one-line form.
  group: >-
    ${{ github.workflow }} @
    ${{ github.event.issue.number || github.sha || github.head_ref || github.ref }}-${{
    github.event.schedule || github.event.comment.id || github.event.sender.login }}
  cancel-in-progress: true
# Workflow-level environment, populated from repository secrets and visible
# to every step of every job.
env:
  DEVELOCITY_ACCESS_KEY: '${{ secrets.DEVELOCITY_ACCESS_KEY }}'
  GRADLE_ENTERPRISE_CACHE_USERNAME: '${{ secrets.GE_CACHE_USERNAME }}'
  GRADLE_ENTERPRISE_CACHE_PASSWORD: '${{ secrets.GE_CACHE_PASSWORD }}'
  INFLUXDB_USER: '${{ secrets.INFLUXDB_USER }}'
  INFLUXDB_USER_PASSWORD: '${{ secrets.INFLUXDB_USER_PASSWORD }}'
jobs:
  # Deploys a Kafka cluster into Kubernetes, resolves the broker endpoints,
  # creates the `beam` topic and then runs the large Kafka stress test.
  beam_StressTests_Java_KafkaIO:
    # Run on manual dispatch, on schedule events (apache/beam only), or when
    # the event carries a comment whose body equals the trigger phrase.
    if: |
      github.event_name == 'workflow_dispatch' ||
      (github.event_name == 'schedule' && github.repository == 'apache/beam') ||
      github.event.comment.body == 'Run Stress Tests Java KafkaIO'
    runs-on: [self-hosted, ubuntu-20.04, highmem]
    # 720 minutes = 12 hours.
    timeout-minutes: 720
    name: '${{ matrix.job_name }} (${{ matrix.job_phrase }})'
    strategy:
      matrix:
        job_name: ["beam_StressTests_Java_KafkaIO"]
        job_phrase: ["Run Stress Tests Java KafkaIO"]
    steps:
      - uses: actions/checkout@v4

      - name: Setup repository
        uses: ./.github/actions/setup-action
        with:
          comment_phrase: ${{ matrix.job_phrase }}
          github_token: ${{ secrets.GITHUB_TOKEN }}
          github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})

      - name: Setup environment
        uses: ./.github/actions/setup-environment-action

      - name: Set k8s access
        uses: ./.github/actions/setup-k8s-access
        with:
          cluster_name: kafka-workflows
          # Namespace is unique per run (job name + run id).
          k8s_namespace: ${{ matrix.job_name }}-${{ github.run_id }}
          cluster_zone: us-central1

      - name: Install Kafka
        id: install_kafka
        run: |
          echo "Deploying Kafka cluster using existing .test-infra/kubernetes/kafka-cluster configuration..."
          # -R applies every manifest under the directory recursively.
          # GITHUB_WORKSPACE is the runner-provided checkout path; quoting it
          # keeps the path a single shell word.
          kubectl apply -R -f "${GITHUB_WORKSPACE}/.test-infra/kubernetes/kafka-cluster/"
          # Wait for pods to be created and ready
          echo "Waiting for Kafka cluster to be ready..."
          sleep 180
          # Check pod status
          echo "Checking pod status..."
          kubectl get pods -l app=kafka
          kubectl get pods -l app=zookeeper
          # Readiness waits are deliberately best-effort: a timeout is logged
          # and the job continues instead of failing here.
          echo "Waiting for Kafka pods to be ready..."
          kubectl wait --for=condition=ready pod -l app=kafka --timeout=300s || echo "Kafka pods not ready, continuing anyway"
          # Wait for Zookeeper to be ready
          echo "Waiting for Zookeeper pods to be ready..."
          kubectl wait --for=condition=ready pod -l app=zookeeper --timeout=300s || echo "Zookeeper pods not ready, continuing anyway"

      # Publishes KAFKA_SERVICE_BROKER_<n> and KAFKA_SERVICE_BROKER_PORTS_<n>
      # (n = 0..2) as step outputs for the Gradle invocation below.
      - name: Set up Kafka brokers
        id: set_brokers
        run: |
          echo "Setting up Kafka brokers for existing cluster configuration..."
          for INDEX in {0..2}; do
            echo "Setting up broker ${INDEX}..."
            # Try to get LoadBalancer IP of the per-broker service
            LB_IP=$(kubectl get svc "outside-${INDEX}" -o jsonpath='{.status.loadBalancer.ingress[0].ip}' 2>/dev/null || echo "")
            if [ -n "$LB_IP" ] && [ "$LB_IP" != "null" ]; then
              echo "Using LoadBalancer IP: $LB_IP"
              BROKER_HOST=$LB_IP
            else
              echo "LoadBalancer IP not available, using NodePort approach..."
              # Fall back to the first node's internal IP
              BROKER_HOST=$(kubectl get nodes -o jsonpath='{.items[0].status.addresses[?(@.type=="InternalIP")].address}')
            fi
            # Fail fast: an empty host would yield an unusable bootstrap
            # server entry and a much later, harder-to-diagnose test failure.
            if [ -z "$BROKER_HOST" ]; then
              echo "::error::Could not determine an address for Kafka broker ${INDEX}"
              exit 1
            fi
            # NOTE(review): with a node IP the reachable port is normally the
            # service's nodePort; this still reads .spec.ports[0].port as
            # before - confirm the two match in the cluster manifests.
            BROKER_PORT=$(kubectl get svc "outside-${INDEX}" -o jsonpath='{.spec.ports[0].port}' 2>/dev/null || echo "9094")
            # Presumably kubectl can succeed with empty output when the field
            # is missing; apply the same default in that case.
            BROKER_PORT=${BROKER_PORT:-9094}
            {
              echo "KAFKA_SERVICE_BROKER_${INDEX}=${BROKER_HOST}"
              echo "KAFKA_SERVICE_BROKER_PORTS_${INDEX}=${BROKER_PORT}"
            } >> "$GITHUB_OUTPUT"
            echo "Broker ${INDEX}: ${BROKER_HOST}:${BROKER_PORT}"
          done

      - name: Create Kafka topic
        id: create_topic
        run: |
          echo "Creating Kafka topic 'beam'..."
          # Get the first available Kafka pod
          KAFKA_POD=$(kubectl get pods -l app=kafka -o jsonpath='{.items[0].metadata.name}' 2>/dev/null || echo "")
          # No pod found: topic creation is skipped and the step still succeeds.
          if [ -z "$KAFKA_POD" ]; then
            echo "No Kafka pods found, skipping topic creation"
            exit 0
          fi
          echo "Using Kafka pod: $KAFKA_POD"
          # Wait a bit more for the pod to be fully operational
          echo "Waiting for pod to be fully operational..."
          sleep 60
          # Create the topic using the correct container and path; a failure
          # is tolerated and only logged.
          echo "Creating topic 'beam'..."
          kubectl exec "$KAFKA_POD" -c broker -- /opt/kafka/bin/kafka-topics.sh --create --topic beam --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 || echo "Topic may already exist"
          # Verify topic was created
          echo "Verifying topic creation..."
          kubectl exec "$KAFKA_POD" -c broker -- /opt/kafka/bin/kafka-topics.sh --list --zookeeper zookeeper:2181 || echo "Could not list topics"

      - name: run Kafka StressTest Large
        uses: ./.github/actions/gradle-command-self-hosted-action
        with:
          # Folded scalar: lines are joined with single spaces, giving the
          # same one-line command. The bootstrap server list contains no
          # spaces between entries, so it must stay on a single line.
          gradle-command: >-
            :it:kafka:KafkaStressTestLarge --info
            -DbootstrapServers="${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_0 }}:${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_PORTS_0 }},${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_1 }}:${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_PORTS_1 }},${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_2 }}:${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_PORTS_2 }}"
            -DinfluxHost="http://10.128.0.96:8086"
            -DinfluxDatabase="beam_test_metrics"
            -DinfluxMeasurement="java_stress_test_kafka"