| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| name: Run Python YAML RC Validation |
| |
| on: |
| workflow_dispatch: |
| inputs: |
| RELEASE_VER: |
| description: 'Beam Release Version (e.g., 2.64.0)' |
| required: true |
| default: '2.64.0' |
| RC_NUM: |
| description: 'Release Candidate number (e.g., 1)' |
| required: true |
| default: '1' |
      # APACHE_CONTENTS_REPO is not needed for this Python-only YAML test
      # CLEANUP_BQ_RESOURCES is not needed because the test uses GCS rather than BigQuery
| |
| # This allows a subsequently queued workflow run to interrupt previous runs |
| concurrency: |
| group: '${{ github.workflow }} @ ${{ github.event.inputs.RELEASE_VER }}-${{ github.event.inputs.RC_NUM }}' |
| cancel-in-progress: true |
| |
| # Setting explicit permissions for the action |
| permissions: |
| actions: write |
  pull-requests: write # May be needed by the setup-environment action
| checks: write |
  contents: read # Needed to check out the code
| deployments: read |
| id-token: write # Required for GCP Workload Identity Federation |
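  # NOTE: no explicit google-github-actions/auth step follows; the self-hosted
  # runner is assumed to already provide GCP credentials to gcloud/gsutil.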
| issues: write |
| discussions: read |
| packages: read |
| pages: read |
| repository-projects: read |
| security-events: read |
| statuses: read |
| |
env: # Workflow-level env vars
| GCP_PROJECT_ID: 'apache-beam-testing' |
| |
| jobs: |
| run_python_yaml_rc_validation: |
| name: Run Python YAML RC Validation (${{ github.event.inputs.RELEASE_VER }} RC${{ github.event.inputs.RC_NUM }}) |
| runs-on: [self-hosted, ubuntu-20.04, main] |
    timeout-minutes: 60 # The job itself runs for ~20 minutes, plus setup/validation overhead
| env: # Job-level env vars |
| DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }} |
| GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }} |
| GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }} |
| RUN_ID_SUFFIX: ${{ github.run_id }}_${{ github.run_attempt }} |
| GCE_REGION: 'us-central1' |
| RELEASE_VERSION: ${{ github.event.inputs.RELEASE_VER }} |
| RC_NUM: ${{ github.event.inputs.RC_NUM }} |
| # Define the base bucket and unique folder prefix directly here |
| GCS_UNIQUE_FOLDER_PREFIX: gs://rc-validation-migration-tests/yaml_rc_validation/${{ github.event.inputs.RELEASE_VER }}_RC${{ github.event.inputs.RC_NUM }}_${{ github.run_id }}_${{ github.run_attempt }} |
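      # Illustrative resolved value (with the default inputs):
      #   gs://rc-validation-migration-tests/yaml_rc_validation/2.64.0_RC1_<run_id>_<run_attempt>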
| # Temp, Staging, and Output locations will be constructed in the steps using the prefix above |
| RC_TAG: "v${{github.event.inputs.RELEASE_VER}}-RC${{github.event.inputs.RC_NUM}}" |
| PYTHON_VERSION: '3.12' # Or adjust if needed |
| BEAM_PYTHON_SDK_TAR_GZ: apache_beam-${{ github.event.inputs.RELEASE_VER }}.tar.gz |
| BEAM_SOURCE_ZIP: apache-beam-${{ github.event.inputs.RELEASE_VER }}-source-release.zip |
| APACHE_DIST_URL_BASE: https://dist.apache.org/repos/dist/dev/beam/${{ github.event.inputs.RELEASE_VER }} |
| YAML_PIPELINE_FILE: t1_2.yaml |
| SUBMISSION_TIMEOUT_SECONDS: 120 # Timeout for the python submission script itself |
| |
| steps: |
| - name: Checkout code at RC tag |
| uses: actions/checkout@v4 |
| with: |
| ref: ${{ env.RC_TAG }} |
| |
| - name: Setup environment |
| uses: ./.github/actions/setup-environment-action |
| with: |
          java-version: 11 # Keep Java setup for now; it may be needed by gcloud/Dataflow
| |
| - name: Setup Python |
| uses: actions/setup-python@v5 |
| with: |
| python-version: ${{ env.PYTHON_VERSION }} |
| |
| - name: Install Dependencies |
| run: | |
| sudo apt-get update --yes |
| sudo apt-get install -y wget unzip coreutils procps grep sed |
| shell: bash |
| |
| - name: Set up Cloud SDK |
| uses: google-github-actions/setup-gcloud@v3 |
| |
| - name: Download RC Artifacts |
| run: | |
| echo "Downloading from ${{ env.APACHE_DIST_URL_BASE }}" |
| wget ${{ env.APACHE_DIST_URL_BASE }}/python/${{ env.BEAM_PYTHON_SDK_TAR_GZ }} |
| wget ${{ env.APACHE_DIST_URL_BASE }}/python/${{ env.BEAM_PYTHON_SDK_TAR_GZ }}.sha512 |
          # The source zip is not strictly needed when installing from the tar.gz,
          # but it is kept for consistency and potential future use
| wget ${{ env.APACHE_DIST_URL_BASE }}/${{ env.BEAM_SOURCE_ZIP }} |
| wget ${{ env.APACHE_DIST_URL_BASE }}/${{ env.BEAM_SOURCE_ZIP }}.sha512 |
| shell: bash |
| |
| - name: Verify Hashes |
| run: | |
| echo "Verifying sha512 checksums..." |
| sha512sum -c ${{ env.BEAM_PYTHON_SDK_TAR_GZ }}.sha512 |
| sha512sum -c ${{ env.BEAM_SOURCE_ZIP }}.sha512 |
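          # PGP signature verification could be added here as well; a hedged
          # sketch (assumes .asc files are published next to the artifacts and
          # that the Beam release KEYS file is imported first):
          #   wget https://downloads.apache.org/beam/KEYS && gpg --import KEYS
          #   wget ${{ env.APACHE_DIST_URL_BASE }}/python/${{ env.BEAM_PYTHON_SDK_TAR_GZ }}.asc
          #   gpg --verify ${{ env.BEAM_PYTHON_SDK_TAR_GZ }}.asc ${{ env.BEAM_PYTHON_SDK_TAR_GZ }}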
| shell: bash |
| |
| - name: Setup Python Virtual Environment |
| run: | |
| echo "Setting up Python virtual environment..." |
| python -m venv beam_env |
| source beam_env/bin/activate |
| pip install --upgrade pip setuptools wheel |
| echo "Virtual environment ready." |
| shell: bash |
| |
| - name: Install Python SDK with [gcp, yaml] extras |
| run: | |
| echo "Installing Python SDK: ${{ env.BEAM_PYTHON_SDK_TAR_GZ }} with [gcp,yaml] extras" |
| source beam_env/bin/activate |
| # Install from the downloaded tar.gz |
| pip install "${{ env.BEAM_PYTHON_SDK_TAR_GZ }}[gcp,yaml]" |
| echo "SDK installed." |
| pip freeze # Log installed packages |
| shell: bash |
| |
| - name: Create YAML Pipeline File |
| run: | |
| echo "Creating YAML pipeline file: ${{ env.YAML_PIPELINE_FILE }}" |
| cat <<EOF > ${{ env.YAML_PIPELINE_FILE }} |
| pipeline: |
| type: chain |
| transforms: |
| - type: ReadFromPubSub |
| config: |
| topic: projects/pubsub-public-data/topics/taxirides-realtime |
| format: json |
| schema: |
| type: object |
| properties: |
| ride_id: {type: string} |
| - type: WriteToJson |
| config: |
| # Construct the output path directly here |
| path: "${{ env.GCS_UNIQUE_FOLDER_PREFIX }}/output/out.json" |
| num_shards: 100 |
| windowing: |
| type: fixed |
| size: 30s |
| options: |
| streaming: true |
| EOF |
| echo "YAML file created:" |
| cat ${{ env.YAML_PIPELINE_FILE }} |
| shell: bash |
| |
| - name: Run YAML Pipeline (Dataflow Runner), Wait, Extract ID, Cleanup Submitter |
| id: submit_yaml_df |
| run: | |
| echo "Running YAML Pipeline with DataflowRunner in Background..." |
| source beam_env/bin/activate |
| python -m apache_beam.yaml.main \ |
| --yaml_pipeline_file=${{ env.YAML_PIPELINE_FILE }} \ |
| --runner DataflowRunner \ |
| --region=${{ env.GCE_REGION }} \ |
| --project=${{ env.GCP_PROJECT_ID }} \ |
| --temp_location ${{ env.GCS_UNIQUE_FOLDER_PREFIX }}/temp \ |
| --staging_location ${{ env.GCS_UNIQUE_FOLDER_PREFIX }}/staging \ |
| > yaml_dataflow_submit.log 2>&1 & |
| |
| YAML_DF_PID=$! |
| echo "YAML Pipeline (Dataflow Runner) submission process started in background with PID: ${YAML_DF_PID}" |
| echo ${YAML_DF_PID} > yaml_dataflow_submit.pid |
| |
| echo "Waiting up to ${{ env.SUBMISSION_TIMEOUT_SECONDS }} seconds for Dataflow job submission process (PID: ${YAML_DF_PID}) to potentially complete..." |
| sleep ${{ env.SUBMISSION_TIMEOUT_SECONDS }} |
| |
| echo "Proceeding with Job ID extraction..." |
| # Try extracting Job ID using common patterns from Dataflow submission logs |
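          # Illustrative examples of lines these patterns are meant to match
          # (exact wording varies by SDK version):
          #   Dataflow Job ID: 2025-01-01_00_00_00-1234567890123456789
          #   ... job_id='2025-01-01_00_00_00-1234567890123456789'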
          # Group the alternatives so that 'head -n 1' applies to whichever grep matches.
          JOB_ID=$({ grep -oP 'Dataflow Job ID: \K\S+' yaml_dataflow_submit.log || grep -oP "job_id='?\K[^' >]+" yaml_dataflow_submit.log || grep -oP "id: '?\"?\K[^'\" >]+" yaml_dataflow_submit.log; } | head -n 1)
| |
| if [[ -n "$JOB_ID" ]]; then |
| echo "Extracted YAML Dataflow Job ID: $JOB_ID" |
| echo "$JOB_ID" > yaml_dataflow_jobid.txt |
| else |
| echo "ERROR: Could not extract YAML Dataflow Job ID after ${{ env.SUBMISSION_TIMEOUT_SECONDS }}s wait. Log content:" |
| echo "--- YAML Dataflow submission log START ---" |
| cat yaml_dataflow_submit.log || echo "Log file not found." |
| echo "--- YAML Dataflow submission log END ---" |
| # Exit the step with failure if job ID is crucial and not found |
| exit 1 |
| fi |
| |
| # Check if the submission process is still running and kill it if necessary |
| if [ -f yaml_dataflow_submit.pid ] && ps -p $YAML_DF_PID > /dev/null; then |
| echo "Submission process (PID: $YAML_DF_PID) is still running after ${{ env.SUBMISSION_TIMEOUT_SECONDS }}s. Attempting to kill it." |
| kill -9 $YAML_DF_PID || echo "Failed to kill process $YAML_DF_PID." |
| else |
| echo "Submission process (PID: $YAML_DF_PID) has already finished or PID file is missing." |
| fi |
| # Clean up PID file regardless |
| if [ -f yaml_dataflow_submit.pid ]; then |
| rm yaml_dataflow_submit.pid |
| fi |
| |
| echo "YAML Pipeline (Dataflow Runner) submission step finished processing." |
| shell: bash |
| |
| - name: Wait for Job to Run |
| run: | |
| if [ ! -f yaml_dataflow_jobid.txt ]; then |
| echo "Skipping wait as Job ID was not extracted." |
| exit 0 # Allow cleanup to proceed |
| fi |
| JOB_ID=$(cat yaml_dataflow_jobid.txt) |
| echo "Waiting for 20 minutes for Dataflow job $JOB_ID to run..." |
| sleep 1200 # 20 minutes = 1200 seconds |
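          # The fixed sleep keeps this step simple. As a possible alternative,
          # the job state could be polled instead (sketch; 'gcloud dataflow jobs
          # describe' supports --format='value(currentState)'):
          #   gcloud dataflow jobs describe "$JOB_ID" --region=${{ env.GCE_REGION }} --project=${{ env.GCP_PROJECT_ID }} --format='value(currentState)'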
| echo "Wait finished." |
| shell: bash |
| |
| - name: Cancel YAML Dataflow Job |
| if: always() # Run even if wait failed or previous steps failed, to attempt cleanup |
| run: | |
| if [ -f yaml_dataflow_jobid.txt ]; then |
| JOB_ID=$(cat yaml_dataflow_jobid.txt) |
| if [[ -n "$JOB_ID" ]]; then |
| echo "Attempting to cancel YAML Dataflow job: $JOB_ID in region ${{ env.GCE_REGION }}" |
| gcloud dataflow jobs cancel "$JOB_ID" --region=${{ env.GCE_REGION }} --project=${{ env.GCP_PROJECT_ID }} || echo "Failed to cancel YAML Dataflow job $JOB_ID (maybe it finished or was already cancelled)." |
| else |
| echo "YAML Dataflow Job ID file exists but is empty." |
| fi |
| # Keep jobid file for validation step, remove in final cleanup |
| else |
| echo "yaml_dataflow_jobid.txt not found, cannot cancel job (it might have failed before ID extraction)." |
| fi |
| shell: bash |
| |
| - name: Validate GCS Output |
| run: | |
| if [ ! -f yaml_dataflow_jobid.txt ]; then |
| echo "Skipping GCS validation as Job ID was not extracted (job likely failed early)." |
| exit 0 # Allow cleanup to proceed |
| fi |
| # Construct the output path pattern directly here |
| OUTPUT_PATTERN="${{ env.GCS_UNIQUE_FOLDER_PREFIX }}/output/out.json-*-of-*" |
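          # With num_shards: 100, individual shard names are expected to look
          # like out.json-00000-of-00100 (illustrative), which this glob matches.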
| echo "Validating GCS output files exist matching pattern: ${OUTPUT_PATTERN}" |
| # Wait a bit for cancellation to finalize and files to potentially appear fully |
| sleep 60 |
| # Check if any files matching the pattern exist within the unique output folder. |
| echo "Checking for files matching pattern: ${OUTPUT_PATTERN}" |
| if gsutil ls "${OUTPUT_PATTERN}" > /dev/null 2>&1; then |
| echo "SUCCESS: Found output files matching pattern in GCS." |
| gsutil ls "${OUTPUT_PATTERN}" # List found files |
| else |
| echo "ERROR: No output files found matching pattern '${OUTPUT_PATTERN}' in GCS bucket." |
| exit 1 |
| fi |
| shell: bash |
| |
| # ================== Cleanup ================== |
| - name: Cleanup GCS Temp/Staging and Local Files |
| if: always() |
| run: | |
| echo "Deleting unique run folder in GCS: ${{ env.GCS_UNIQUE_FOLDER_PREFIX }}" |
| # Delete the entire unique folder for this run, including temp, staging, and output |
| gsutil -m rm -r "${{ env.GCS_UNIQUE_FOLDER_PREFIX }}" || echo "Failed to delete unique run folder ${{ env.GCS_UNIQUE_FOLDER_PREFIX }} in GCS. Manual cleanup might be required." |
| |
| echo "Removing local log, yaml, and jobid files..." |
| rm -f yaml_dataflow_submit.log ${{ env.YAML_PIPELINE_FILE }} yaml_dataflow_jobid.txt |
| shell: bash |