# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
name: Run Python YAML RC Validation
on:
  workflow_dispatch:
    inputs:
      RELEASE_VER:
        description: 'Beam Release Version (e.g., 2.64.0)'
        required: true
        default: '2.64.0'
      RC_NUM:
        description: 'Release Candidate number (e.g., 1)'
        required: true
        default: '1'
      # APACHE_CONTENTS_REPO is not needed for the Python-only YAML test
      # CLEANUP_BQ_RESOURCES is not needed because this workflow uses GCS
# This allows a subsequently queued workflow run to interrupt previous runs
concurrency:
  group: '${{ github.workflow }} @ ${{ github.event.inputs.RELEASE_VER }}-${{ github.event.inputs.RC_NUM }}'
  cancel-in-progress: true
# Setting explicit permissions for the action
permissions:
  actions: write
  pull-requests: write # Potentially needed by the setup action
  checks: write
  contents: read # Needed to check out the code
  deployments: read
  id-token: write # Required for GCP Workload Identity Federation
  issues: write
  discussions: read
  packages: read
  pages: read
  repository-projects: read
  security-events: read
  statuses: read
env: # Workflow-level env vars
  GCP_PROJECT_ID: 'apache-beam-testing'
jobs:
  run_python_yaml_rc_validation:
    name: Run Python YAML RC Validation (${{ github.event.inputs.RELEASE_VER }} RC${{ github.event.inputs.RC_NUM }})
    runs-on: [self-hosted, ubuntu-20.04, main]
    timeout-minutes: 60 # Reduced timeout as the job runs for ~20 mins + setup/validation
    env: # Job-level env vars
      DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
      GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
      GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
      RUN_ID_SUFFIX: ${{ github.run_id }}_${{ github.run_attempt }}
      GCE_REGION: 'us-central1'
      RELEASE_VERSION: ${{ github.event.inputs.RELEASE_VER }}
      RC_NUM: ${{ github.event.inputs.RC_NUM }}
      # Define the base bucket and unique folder prefix directly here
      GCS_UNIQUE_FOLDER_PREFIX: gs://rc-validation-migration-tests/yaml_rc_validation/${{ github.event.inputs.RELEASE_VER }}_RC${{ github.event.inputs.RC_NUM }}_${{ github.run_id }}_${{ github.run_attempt }}
      # Temp, staging, and output locations will be constructed in the steps using the prefix above
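      # e.g. <prefix>/temp, <prefix>/staging, and <prefix>/output/out.json-*-of-*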
      RC_TAG: "v${{ github.event.inputs.RELEASE_VER }}-RC${{ github.event.inputs.RC_NUM }}"
      PYTHON_VERSION: '3.12' # Or adjust if needed
      BEAM_PYTHON_SDK_TAR_GZ: apache_beam-${{ github.event.inputs.RELEASE_VER }}.tar.gz
      BEAM_SOURCE_ZIP: apache-beam-${{ github.event.inputs.RELEASE_VER }}-source-release.zip
      APACHE_DIST_URL_BASE: https://dist.apache.org/repos/dist/dev/beam/${{ github.event.inputs.RELEASE_VER }}
      YAML_PIPELINE_FILE: t1_2.yaml
      SUBMISSION_TIMEOUT_SECONDS: 120 # Timeout for the Python submission script itself
    steps:
      - name: Checkout code at RC tag
        uses: actions/checkout@v4
        with:
          ref: ${{ env.RC_TAG }}
      - name: Setup environment
        uses: ./.github/actions/setup-environment-action
        with:
          java-version: 11 # Keep Java setup for now, might be needed by gcloud/Dataflow
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ env.PYTHON_VERSION }}
      - name: Install Dependencies
        run: |
          sudo apt-get update --yes
          sudo apt-get install -y wget unzip coreutils procps grep sed
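          # Later steps rely on these packages: ps (procps) to manage the background submitter,
          # sleep and sha512sum (coreutils), and grep -P to parse the Dataflow submission log.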
        shell: bash
      - name: Set up Cloud SDK
        uses: google-github-actions/setup-gcloud@v3
      - name: Download RC Artifacts
        run: |
          echo "Downloading from ${{ env.APACHE_DIST_URL_BASE }}"
          wget ${{ env.APACHE_DIST_URL_BASE }}/python/${{ env.BEAM_PYTHON_SDK_TAR_GZ }}
          wget ${{ env.APACHE_DIST_URL_BASE }}/python/${{ env.BEAM_PYTHON_SDK_TAR_GZ }}.sha512
          # Source zip not strictly needed if installing from the tar.gz, but kept for consistency/potential future use
          wget ${{ env.APACHE_DIST_URL_BASE }}/${{ env.BEAM_SOURCE_ZIP }}
          wget ${{ env.APACHE_DIST_URL_BASE }}/${{ env.BEAM_SOURCE_ZIP }}.sha512
        shell: bash
      - name: Verify Hashes
        run: |
          echo "Verifying sha512 checksums..."
          sha512sum -c ${{ env.BEAM_PYTHON_SDK_TAR_GZ }}.sha512
          sha512sum -c ${{ env.BEAM_SOURCE_ZIP }}.sha512
        shell: bash
      - name: Setup Python Virtual Environment
        run: |
          echo "Setting up Python virtual environment..."
          python -m venv beam_env
          source beam_env/bin/activate
          pip install --upgrade pip setuptools wheel
          echo "Virtual environment ready."
        shell: bash
      - name: Install Python SDK with [gcp, yaml] extras
        run: |
          echo "Installing Python SDK: ${{ env.BEAM_PYTHON_SDK_TAR_GZ }} with [gcp,yaml] extras"
          source beam_env/bin/activate
          # Install from the downloaded tar.gz
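          # pip accepts extras appended to a local sdist path, so Beam itself is installed from the
          # RC tarball while the [gcp,yaml] dependencies are resolved normally.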
pip install "${{ env.BEAM_PYTHON_SDK_TAR_GZ }}[gcp,yaml]"
echo "SDK installed."
pip freeze # Log installed packages
shell: bash
      - name: Create YAML Pipeline File
        run: |
          echo "Creating YAML pipeline file: ${{ env.YAML_PIPELINE_FILE }}"
          cat <<EOF > ${{ env.YAML_PIPELINE_FILE }}
          pipeline:
            type: chain
            transforms:
              - type: ReadFromPubSub
                config:
                  topic: projects/pubsub-public-data/topics/taxirides-realtime
                  format: json
                  schema:
                    type: object
                    properties:
                      ride_id: {type: string}
              - type: WriteToJson
                config:
                  # Construct the output path directly here
                  path: "${{ env.GCS_UNIQUE_FOLDER_PREFIX }}/output/out.json"
                  num_shards: 100
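                # Window the streaming write into fixed 30-second windows so output files are emitted periodically while the job runs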
                windowing:
                  type: fixed
                  size: 30s
          options:
            streaming: true
          EOF
          echo "YAML file created:"
          cat ${{ env.YAML_PIPELINE_FILE }}
        shell: bash
      - name: Run YAML Pipeline (Dataflow Runner), Wait, Extract ID, Cleanup Submitter
        id: submit_yaml_df
        run: |
          echo "Running YAML Pipeline with DataflowRunner in Background..."
          source beam_env/bin/activate
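          # Submit in the background and capture all output; the submitter for a streaming job may keep
          # running, so the step waits a bounded time and then parses the log for the Dataflow job ID.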
          python -m apache_beam.yaml.main \
            --yaml_pipeline_file=${{ env.YAML_PIPELINE_FILE }} \
            --runner DataflowRunner \
            --region=${{ env.GCE_REGION }} \
            --project=${{ env.GCP_PROJECT_ID }} \
            --temp_location ${{ env.GCS_UNIQUE_FOLDER_PREFIX }}/temp \
            --staging_location ${{ env.GCS_UNIQUE_FOLDER_PREFIX }}/staging \
            > yaml_dataflow_submit.log 2>&1 &
          YAML_DF_PID=$!
          echo "YAML Pipeline (Dataflow Runner) submission process started in background with PID: ${YAML_DF_PID}"
          echo ${YAML_DF_PID} > yaml_dataflow_submit.pid
          echo "Waiting up to ${{ env.SUBMISSION_TIMEOUT_SECONDS }} seconds for the Dataflow job submission process (PID: ${YAML_DF_PID}) to complete..."
          sleep ${{ env.SUBMISSION_TIMEOUT_SECONDS }}
          echo "Proceeding with Job ID extraction..."
          # Try extracting the Job ID using common patterns from Dataflow submission logs
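          # The exact log line can vary, so several patterns are tried: an explicit "Dataflow Job ID:" line,
          # a job_id=... attribute, or an "id:" field from the returned job description.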
          JOB_ID=$(grep -oP 'Dataflow Job ID: \K\S+' yaml_dataflow_submit.log || grep -oP "job_id='?\K[^' >]+" yaml_dataflow_submit.log || grep -oP "id: '?\"?\K[^'\" >]+" yaml_dataflow_submit.log | head -n 1)
          if [[ -n "$JOB_ID" ]]; then
            echo "Extracted YAML Dataflow Job ID: $JOB_ID"
            echo "$JOB_ID" > yaml_dataflow_jobid.txt
          else
            echo "ERROR: Could not extract YAML Dataflow Job ID after the ${{ env.SUBMISSION_TIMEOUT_SECONDS }}s wait. Log content:"
            echo "--- YAML Dataflow submission log START ---"
            cat yaml_dataflow_submit.log || echo "Log file not found."
            echo "--- YAML Dataflow submission log END ---"
            # Fail the step: the Job ID is required by the wait, cancel, and validation steps below
            exit 1
          fi
          # Check whether the submission process is still running and kill it if necessary
          if [ -f yaml_dataflow_submit.pid ] && ps -p $YAML_DF_PID > /dev/null; then
            echo "Submission process (PID: $YAML_DF_PID) is still running after ${{ env.SUBMISSION_TIMEOUT_SECONDS }}s. Attempting to kill it."
            kill -9 $YAML_DF_PID || echo "Failed to kill process $YAML_DF_PID."
          else
            echo "Submission process (PID: $YAML_DF_PID) has already finished, or the PID file is missing."
          fi
          # Clean up the PID file regardless
          if [ -f yaml_dataflow_submit.pid ]; then
            rm yaml_dataflow_submit.pid
          fi
          echo "YAML Pipeline (Dataflow Runner) submission step finished processing."
        shell: bash
      - name: Wait for Job to Run
        run: |
          if [ ! -f yaml_dataflow_jobid.txt ]; then
            echo "Skipping wait as the Job ID was not extracted."
            exit 0 # Allow cleanup to proceed
          fi
          JOB_ID=$(cat yaml_dataflow_jobid.txt)
          echo "Waiting 20 minutes for Dataflow job $JOB_ID to run..."
          sleep 1200 # 20 minutes = 1200 seconds
          echo "Wait finished."
        shell: bash
      - name: Cancel YAML Dataflow Job
        if: always() # Run even if wait or previous steps failed, to attempt cleanup
        run: |
          if [ -f yaml_dataflow_jobid.txt ]; then
            JOB_ID=$(cat yaml_dataflow_jobid.txt)
            if [[ -n "$JOB_ID" ]]; then
              echo "Attempting to cancel YAML Dataflow job: $JOB_ID in region ${{ env.GCE_REGION }}"
              gcloud dataflow jobs cancel "$JOB_ID" --region=${{ env.GCE_REGION }} --project=${{ env.GCP_PROJECT_ID }} || echo "Failed to cancel YAML Dataflow job $JOB_ID (it may have finished or already been cancelled)."
            else
              echo "YAML Dataflow Job ID file exists but is empty."
            fi
            # Keep the jobid file for the validation step; it is removed in the final cleanup
          else
            echo "yaml_dataflow_jobid.txt not found, cannot cancel the job (it might have failed before ID extraction)."
          fi
        shell: bash
      - name: Validate GCS Output
        run: |
          if [ ! -f yaml_dataflow_jobid.txt ]; then
            echo "Skipping GCS validation as Job ID was not extracted (job likely failed early)."
            exit 0 # Allow cleanup to proceed
          fi
          # Construct the output path pattern directly here
          OUTPUT_PATTERN="${{ env.GCS_UNIQUE_FOLDER_PREFIX }}/output/out.json-*-of-*"
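          # Beam's sharded file sinks typically name output files like out.json-00000-of-00100, which this wildcard pattern matches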
echo "Validating GCS output files exist matching pattern: ${OUTPUT_PATTERN}"
# Wait a bit for cancellation to finalize and files to potentially appear fully
sleep 60
# Check if any files matching the pattern exist within the unique output folder.
echo "Checking for files matching pattern: ${OUTPUT_PATTERN}"
if gsutil ls "${OUTPUT_PATTERN}" > /dev/null 2>&1; then
echo "SUCCESS: Found output files matching pattern in GCS."
gsutil ls "${OUTPUT_PATTERN}" # List found files
else
echo "ERROR: No output files found matching pattern '${OUTPUT_PATTERN}' in GCS bucket."
exit 1
fi
shell: bash
      # ================== Cleanup ==================
      - name: Cleanup GCS Temp/Staging and Local Files
        if: always()
        run: |
          echo "Deleting unique run folder in GCS: ${{ env.GCS_UNIQUE_FOLDER_PREFIX }}"
          # Delete the entire unique folder for this run, including temp, staging, and output
          gsutil -m rm -r "${{ env.GCS_UNIQUE_FOLDER_PREFIX }}" || echo "Failed to delete unique run folder ${{ env.GCS_UNIQUE_FOLDER_PREFIX }} in GCS. Manual cleanup might be required."
          echo "Removing local log, yaml, and jobid files..."
          rm -f yaml_dataflow_submit.log ${{ env.YAML_PIPELINE_FILE }} yaml_dataflow_jobid.txt
        shell: bash