blob: 78d3b71a2f254a70b7ed571d217c590eeb956776 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
import os
from pathlib import Path
from typing import List
from tqdm.asyncio import tqdm
from api.v1.api_pb2 import Sdk, SDK_PYTHON, SDK_JAVA
from api.v1.api_pb2 import (
STATUS_COMPILE_ERROR,
STATUS_ERROR,
STATUS_RUN_ERROR,
STATUS_RUN_TIMEOUT,
STATUS_VALIDATION_ERROR,
STATUS_PREPARATION_ERROR,
)
from config import Origin, Config
from grpc_client import GRPCClient
from helper import update_example_status
from models import Example, SdkEnum
class VerifyException(Exception):
    """Raised when verification of the Playground examples fails.

    ``Verifier`` raises it when at least one example ends in an error status
    or when the number of default examples is not exactly one.
    """
class Verifier:
    """Run examples and verify the results, enrich examples with produced artifacts.

    Runnable examples are processed through a ``GRPCClient`` and enriched
    with the compile output, run output, logs and (for Java/Python) graph
    returned by the backend.  Examples tagged ``never_run`` are not executed;
    their artifacts are read from files stored next to the example file.
    """

    # Names of the artifact files looked up next to a ``never_run`` example.
    LOGS_FILENAME = "logs.log"
    OUTPUT_FILENAME = "output.log"
    GRAPH_FILENAME = "graph.log"
    COMPILE_OUTPUT_FILENAME = "compile_output.log"

    _sdk: SdkEnum
    _origin: Origin

    def __init__(self, sdk: SdkEnum, origin: Origin):
        """
        Args:
            sdk: SDK the verified examples belong to.
            origin: origin of the examples; the default-example count is only
                checked for ``Origin.PG_EXAMPLES``.
        """
        self._sdk = sdk
        self._origin = origin

    def run_verify(self, examples: List[Example]):
        """
        Run beam examples, collect their artifacts and verify their statuses.

        Blocks until every example has been processed.  The work is driven by
        ``asyncio.run``, so this method must not be called from a running
        event loop.

        Args:
            examples: beam examples to run and verify; they are updated in place.

        Raises:
            VerifyException: if any example ended in an error status or the
                number of default examples is wrong.
        """
        logging.info("Start of executing Playground examples ...")
        asyncio.run(self._run_and_verify(examples))
        logging.info("Finish of executing Playground examples")

    @staticmethod
    def _resolve_concurrency(default: int) -> int:
        """
        Return how many examples may be processed at the same time.

        The ``BEAM_CONCURRENCY`` environment variable overrides ``default``
        when it holds a positive integer.  Any other value is ignored with a
        warning: a limit of 0 would make every task wait on the semaphore
        forever, and a negative one is rejected by ``asyncio.Semaphore``.

        Args:
            default: limit to use when no valid override is set.

        Returns:
            The effective limit, never lower than 1.
        """
        concurrency = default
        raw_override = os.environ.get("BEAM_CONCURRENCY")
        if raw_override is not None:
            try:
                override = int(raw_override)
            except ValueError:
                override = None
            if override is not None and override > 0:
                concurrency = override
                logging.info("override default concurrency: %d", concurrency)
            else:
                logging.warning(
                    "ignoring invalid BEAM_CONCURRENCY value %r", raw_override
                )
        # Guard against a non-positive limit passed in by the caller as well.
        return max(1, concurrency)

    async def _get_statuses(
        self,
        client: GRPCClient,
        examples: List[Example],
        concurrency: int = 10
    ):
        """
        Receive status and update example.status and example.pipeline_id for
        each example, then populate the example's artifacts.

        Examples tagged ``never_run`` are not sent to the backend; their
        artifacts are read from the repository instead.

        Args:
            client: gRPC client used to talk to the backend.
            examples: beam examples for processing and updating statuses and
                pipeline_id values.
            concurrency: maximum number of examples processed at once; can be
                overridden with the ``BEAM_CONCURRENCY`` environment variable.
        """
        semaphore = asyncio.Semaphore(self._resolve_concurrency(concurrency))

        async def _semaphored_task(example):
            # The semaphore caps the number of examples in flight.
            async with semaphore:
                await update_example_status(example, client)
                await self._populate_fields(example, client)

        runnable = []
        for example in examples:
            if example.tag.never_run:
                logging.info("skipping non runnable example %s", example.filepath)
                # Not executed, but still enriched from files kept in the repo.
                await self._populate_fields(example, client)
            else:
                runnable.append(example)

        # Coroutines are created only here so that none is left un-awaited if
        # populating a never_run example fails above.
        if runnable:
            await tqdm.gather(*(_semaphored_task(example) for example in runnable))

    async def _populate_fields(self, example: Example, client: GRPCClient):
        """
        Populate fields of the example reading them from the backend or from the repository.

        Args:
            example: beam example that should be verified
            client: gRPC client used when the fields come from the backend.
        """
        if example.tag.never_run:
            logging.info("populating example fields from provided files %s", example.filepath)
            self._populate_from_repo(example)
        else:
            await self._populate_from_runner(example, client)

    def _populate_from_repo(self, example: Example):
        """
        Populate fields of the example reading them from the repository.

        Each artifact is read from a file located in the same folder as the
        example file.  A missing file leaves the corresponding field untouched.

        Args:
            example: beam example that should be verified
        """
        example_folder = Path(example.filepath).parent
        artifacts = (
            ("logs", self.LOGS_FILENAME),
            ("graph", self.GRAPH_FILENAME),
            ("output", self.OUTPUT_FILENAME),
            ("compile_output", self.COMPILE_OUTPUT_FILENAME),
        )
        for field_name, filename in artifacts:
            try:
                # Explicit encoding keeps the result independent of the locale.
                content = (example_folder / filename).read_text(encoding="utf-8")
            except FileNotFoundError:
                continue
            setattr(example, field_name, content)

    async def _populate_from_runner(self, example: Example, client: GRPCClient):
        """
        Populate fields of the example reading them from the backend.

        Args:
            example: beam example whose ``pipeline_id`` is already set.
            client: gRPC client used to fetch the artifacts.

        Raises:
            RuntimeError: if any artifact cannot be fetched; the original
                exception is chained as the cause.
        """
        try:
            example.compile_output = await client.get_compile_output(
                example.pipeline_id
            )
            example.output = await client.get_run_output(example.pipeline_id, example.filepath)
            example.logs = await client.get_log(example.pipeline_id, example.filepath)
            # The graph is only requested for these two SDKs.
            if example.sdk in (SDK_JAVA, SDK_PYTHON):
                example.graph = await client.get_graph(
                    example.pipeline_id, example.filepath
                )
        except Exception as e:
            logging.error(example.url_vcs)
            logging.error(example.compile_output)
            raise RuntimeError(f"error in {example.tag.name}") from e

    async def _run_and_verify(self, examples: List[Example]):
        """
        Run beam examples and keep their output.

        Call the backend to start code processing for the examples.
        Then receive code output.

        Args:
            examples: beam examples that should be run
        """
        async with GRPCClient() as client:
            await self._get_statuses(
                client, examples
            )  # run examples code and wait until all are executed
            await self._verify_examples(client, examples, self._origin)

    async def _log_example_error(self, client: GRPCClient, example: Example):
        """
        Log why an example with an error status failed.

        For compile and run errors the details are fetched from the backend
        and included in the message.

        Args:
            client: gRPC client used to fetch error details.
            example: beam example whose status is an error status.
        """
        status = example.status
        if status == STATUS_VALIDATION_ERROR:
            logging.error("Example: %s has validation error", example.filepath)
        elif status == STATUS_PREPARATION_ERROR:
            logging.error("Example: %s has preparation error", example.filepath)
        elif status == STATUS_ERROR:
            logging.error(
                "Example: %s has error during setup run builder", example.filepath
            )
        elif status == STATUS_RUN_TIMEOUT:
            logging.error("Example: %s failed because of timeout", example.filepath)
        elif status == STATUS_COMPILE_ERROR:
            err = await client.get_compile_output(example.pipeline_id)
            logging.error(
                "Example: %s has compilation error: %s", example.filepath, err
            )
        elif status == STATUS_RUN_ERROR:
            err = await client.get_run_error(example.pipeline_id)
            logging.error(
                "Example: %s has execution error: %s", example.filepath, err
            )
        else:
            # An error status without a dedicated message must still name the
            # example, otherwise the CI step fails without any explanation.
            logging.error(
                "Example: %s failed with status %s", example.filepath, status
            )

    @staticmethod
    def _check_default_examples(default_examples: List[Example]):
        """
        Ensure exactly one example is marked as the default one.

        Args:
            default_examples: examples whose ``tag.default_example`` is true.

        Raises:
            VerifyException: if there is no default example or more than one.
        """
        if not default_examples:
            logging.error("Default example not found")
            raise VerifyException(
                "CI step failed due to finding an incorrect number "
                "of default examples. Default example not found"
            )
        if len(default_examples) > 1:
            logging.error("Many default examples found")
            logging.error("Examples where the default_example field is true:")
            for example in default_examples:
                logging.error(example.filepath)
            raise VerifyException(
                "CI step failed due to finding an incorrect number "
                "of default examples. Many default examples found"
            )

    async def _verify_examples(
        self, client: GRPCClient, examples: List[Example], origin: Origin
    ):
        """
        Verify statuses of beam examples and the number of found default examples.

        Check example.status for each examples. If the status of the example is:
        - STATUS_VALIDATION_ERROR/STATUS_PREPARATION_ERROR
          /STATUS_ERROR/STATUS_RUN_TIMEOUT: log error
        - STATUS_COMPILE_ERROR: get logs using GetCompileOutput request and
          log them with error.
        - STATUS_RUN_ERROR: get logs using GetRunError request and
          log them with error.
        - any other status listed in Config.ERROR_STATUSES: log the status.

        Args:
            client: gRPC client used to fetch error details.
            examples: beam examples that should be verified
            origin: origin of the examples; the default-example count is only
                checked for Origin.PG_EXAMPLES.

        Raises:
            VerifyException: if the number of default examples is wrong or at
                least one example has an error status.
        """
        # Failing examples still count as default examples.
        default_examples = [
            example for example in examples if example.tag.default_example
        ]

        failed_count = 0
        for example in examples:
            if example.status in Config.ERROR_STATUSES:
                failed_count += 1
                await self._log_example_error(client, example)

        logging.info(
            "Number of verified Playground examples: %s / %s",
            len(examples) - failed_count,
            len(examples),
        )
        logging.info(
            "Number of Playground examples with some error: %s / %s",
            failed_count,
            len(examples),
        )

        if origin == Origin.PG_EXAMPLES:
            self._check_default_examples(default_examples)

        if failed_count:
            raise VerifyException("CI step failed due to errors in the examples")