# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Common helper module for CI/CD Steps
"""
import asyncio
import logging
import os
import urllib.parse
from pathlib import PurePath
from typing import List, Optional, Dict
from api.v1 import api_pb2

import pydantic
import yaml

from api.v1.api_pb2 import (
    SDK_UNSPECIFIED,
    STATUS_UNSPECIFIED,
    Sdk,
    STATUS_VALIDATING,
    STATUS_PREPARING,
    STATUS_COMPILING,
    STATUS_EXECUTING,
    PRECOMPILED_OBJECT_TYPE_UNIT_TEST,
    PRECOMPILED_OBJECT_TYPE_KATA,
    PRECOMPILED_OBJECT_TYPE_UNSPECIFIED,
    PRECOMPILED_OBJECT_TYPE_EXAMPLE,
    PrecompiledObjectType,
)
from config import Config, TagFields, PrecompiledExampleType
from grpc_client import GRPCClient
from constants import BEAM_ROOT_DIR_ENV_VAR_KEY

from models import Example, Tag, SdkEnum, Dataset


def _check_no_nested(subdirs: List[str]):
    """
    Check that there are no nested subdirs.

    Sort the paths alphabetically and compare each pair of adjacent items
    using pathlib.PurePath: we don't want fs calls in this check.
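
    Illustrative example (hypothetical paths):
        _check_no_nested(["examples/java", "katas/java"])   # OK
        _check_no_nested(["examples", "examples/java"])     # raises ValueError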
| """ |
| sorted_subdirs = sorted(PurePath(s) for s in subdirs) |
| for dir1, dir2 in zip(sorted_subdirs, sorted_subdirs[1:]): |
| if dir1 in [dir2, *dir2.parents]: |
| raise ValueError(f"{dir2} is a subdirectory of {dir1}") |
| |
| |
def find_examples(root_dir: str, subdirs: List[str], sdk: SdkEnum) -> List[Example]:
    """
    Find and return Beam examples.

    Search through all child files of the given subdirectories for files
    with a Beam playground tag:

        Beam-playground:
          name: NameOfExample
          description: Description of NameOfExample.
          multifile: false
          default_example: false
          context_line: 10
          categories:
            - category-1
            - category-2
          pipeline_options: --inputFile your_file --outputFile your_output_file
          complexity: MEDIUM
          tags:
            - example

    If an example contains a Beam playground tag in an incorrect format,
    raise an error.

    Args:
        root_dir: project root dir.
        subdirs: sub-directories in which to search for examples.
        sdk: SDK for which to find examples.

    Returns:
        List of Examples.
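
    Illustrative usage (the paths and the SdkEnum member are assumptions):
        examples = find_examples(
            root_dir="/path/to/beam",
            subdirs=["examples/java", "learning/katas/java"],
            sdk=SdkEnum.JAVA,
        )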
| """ |
| has_errors = False |
| examples = [] |
| _check_no_nested(subdirs) |
| for subdir in subdirs: |
| subdir = os.path.join(root_dir, subdir) |
| logging.info("subdir: %s", subdir) |
| for root, _, files in os.walk(subdir): |
| for filename in files: |
| filepath = os.path.join(root, filename) |
| try: |
| try: |
| example = _load_example( |
| filename=filename, filepath=filepath, sdk=sdk |
| ) |
| if example is not None: |
| examples.append(example) |
| except pydantic.ValidationError as err: |
| if len(err.errors()) > 1: |
| raise |
| if err.errors()[0]["msg"] == "multifile is True but no files defined": |
| logging.warning("incomplete multifile example ignored %s", filepath) |
| continue |
| raise |
| except Exception: |
| logging.exception("error loading example at %s", filepath) |
| has_errors = True |
| if has_errors: |
| raise ValueError( |
| "Some of the beam examples contain beam playground tag with " |
| "an incorrect format" |
| ) |
| return examples |
| |
| |
| def get_tag(filepath: PurePath) -> Optional[Tag]: |
| """ |
| Parse file by filepath and find beam tag |
| |
| Args: |
| filepath: path of the file |
| |
| Returns: |
| If file contains tag, returns Tag object |
| If file doesn't contain tag, returns None |
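
    Illustrative example: for a Java file whose comment block contains

        // Beam-playground:
        //   name: MyExample
        //   ...

    the comment prefix ("// " here) is inferred from the title line and
    stripped from the following lines before the embedded YAML is parsed.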
| """ |
| with open(filepath, encoding="utf-8") as parsed_file: |
| lines = parsed_file.readlines() |
| |
| line_start: Optional[int] = None |
| line_finish: Optional[int] = None |
| tag_prefix: Optional[str] = "" |
| for idx, line in enumerate(lines): |
| if line_start is None and line.endswith(Config.BEAM_PLAYGROUND_TITLE): |
| line_start = idx |
| prefix_len = len(line) - len(Config.BEAM_PLAYGROUND_TITLE) |
| tag_prefix = line[:prefix_len] |
| elif line_start and not line.startswith(tag_prefix): |
| line_finish = idx |
| break |
| |
| if not line_start or not line_finish: |
| return None |
| |
| embdedded_yaml_content = "".join( |
| line[len(tag_prefix) :] for line in lines[line_start:line_finish] |
| ) |
| yml = yaml.load(embdedded_yaml_content, Loader=yaml.SafeLoader) |
| |
| try: |
| return Tag( |
| filepath=str(filepath), |
| line_start=line_start, |
| line_finish=line_finish, |
| **yml[Config.BEAM_PLAYGROUND], |
| ) |
| except pydantic.ValidationError as err: |
| if len(err.errors()) == 1 and err.errors()[0]["msg"] == "multifile is True but no files defined": |
| logging.warning("incomplete multifile example ignored %s", filepath) |
| return None |
| raise |
| |
def _load_example(filename, filepath, sdk: SdkEnum) -> Optional[Example]:
    """
    Check whether the file at filepath is a Beam example and, if so, load it.

    Args:
        filename: name of the file.
        filepath: path to the file.
        sdk: SDK for which to find examples.

    Returns:
        If the file is an example, return an Example object.
        If it's not, return None.
        In case of an error, raise an Exception.
    """
    logging.debug("inspecting file %s", filepath)
    extension = filepath.split(os.extsep)[-1]
    if extension == Config.SDK_TO_EXTENSION[sdk]:
        logging.debug("sdk %s matched extension %s", api_pb2.Sdk.Name(sdk), extension)
        tag = get_tag(filepath)
        if tag is not None:
            logging.debug("playground-beam tag found")
            return _get_example(filepath, filename, tag, sdk)
    return None


# Ensure load_supported_categories does its work only once
# to make testing easier
_load_supported_categories = False


def load_supported_categories(categories_path: str):
    """
    Load the list of supported categories from the categories_path file
    into the Tag model config.

    Args:
        categories_path: path to the file with categories.
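
    The file is expected to be YAML with the list of category names stored
    under the key named by TagFields.categories; an illustrative shape,
    assuming that key is literally "categories":

        categories:
          - Combiners
          - Side Inputs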
| """ |
| global _load_supported_categories |
| if _load_supported_categories: |
| return |
| with open(categories_path, encoding="utf-8") as supported_categories: |
| yaml_object = yaml.load(supported_categories.read(), Loader=yaml.SafeLoader) |
| |
| Tag.Config.supported_categories = yaml_object[TagFields.categories] |
| _load_supported_categories = True |
| |
| |
| def _get_content(filepath: str, tag_start_line: int, tag_finish_line) -> str: |
| with open(filepath, encoding="utf-8") as parsed_file: |
| lines = parsed_file.readlines() |
| lines = lines[:tag_start_line] + lines[tag_finish_line:] |
| return "".join(lines) |
| |
| |
| def _get_url_vcs(filepath: str) -> str: |
| """ |
| Construct VCS URL from example's filepath |
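
    For illustration (the actual prefix comes from Config.URL_VCS_PREFIX):
    if the prefix were "https://github.com/apache/beam/blob/master" and the
    file lived at "<root>/examples/foo.py", the result would be
    "https://github.com/apache/beam/blob/master/examples/foo.py".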
| """ |
| root_dir = os.getenv(BEAM_ROOT_DIR_ENV_VAR_KEY, "../..") |
| rel_path = os.path.relpath(filepath, root_dir) |
| url_vcs = "{}/{}".format(Config.URL_VCS_PREFIX, urllib.parse.quote(rel_path)) |
| return url_vcs |
| |
| |
| def _get_example(filepath: str, filename: str, tag: Tag, sdk: int) -> Example: |
| """ |
| Return an Example by filepath and filename. |
| |
| Args: |
| filepath: path of the example's file. |
| filename: name of the example's file. |
| tag: tag of the example. |
| |
| Returns: |
| Parsed Example object. |
| """ |
| |
| # Calculate context line with tag removed. Note: context_line is 1-based, line_start and line_finish are 0-based. |
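    # Illustrative arithmetic: if the tag occupies 0-based lines 3..10
    # (line_start=3, line_finish=10) and context_line is 20 (1-based),
    # removing the 10 - 3 = 7 tag lines shifts it to 20 - 7 = 13; a
    # context_line of 2 (above the tag) is left unchanged.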
    context_line = (
        tag.context_line
        if tag.context_line <= tag.line_start
        else tag.context_line - (tag.line_finish - tag.line_start)
    )

    return Example(
        sdk=SdkEnum(sdk),
        tag=tag,
        filepath=filepath,
        status=STATUS_UNSPECIFIED,
        type=_get_object_type(filename, filepath),
        code=_get_content(filepath, tag.line_start, tag.line_finish),
        url_vcs=_get_url_vcs(filepath),  # type: ignore
        context_line=context_line,
    )


async def update_example_status(example: Example, client: GRPCClient):
    """
    Receive the status for an example and update example.status and pipeline_id.

    Use the client to send requests to the backend:
    1. Start code processing.
    2. Poll the backend while the status is STATUS_VALIDATING/
       STATUS_PREPARING/STATUS_COMPILING/STATUS_EXECUTING.
    Update example.status with the resulting status.

    Args:
        example: beam example for processing and updating status and pipeline_id.
        client: client to send requests to the server.
    """
    datasets: List[api_pb2.Dataset] = []
    for emulator in example.tag.emulators:
        dataset: Dataset = example.tag.datasets[emulator.topic.source_dataset]

        datasets.append(
            api_pb2.Dataset(
                type=api_pb2.EmulatorType.Value(
                    f"EMULATOR_TYPE_{emulator.type.upper()}"
                ),
                options={"topic": emulator.topic.id},
                dataset_path=dataset.file_name,
            )
        )
    files: List[api_pb2.SnippetFile] = [
        api_pb2.SnippetFile(name=example.filepath, content=example.code, is_main=True)
    ]
    for file in example.tag.files:
        files.append(
            api_pb2.SnippetFile(name=file.name, content=file.content, is_main=False)
        )

    pipeline_id = await client.run_code(
        example.code, example.sdk, example.tag.pipeline_options, datasets, files=files,
    )
    example.pipeline_id = pipeline_id
    status = await client.check_status(pipeline_id)
    while status in [
        STATUS_VALIDATING,
        STATUS_PREPARING,
        STATUS_COMPILING,
        STATUS_EXECUTING,
    ]:
        await asyncio.sleep(Config.PAUSE_DELAY)
        status = await client.check_status(pipeline_id)
    example.status = status


def _get_object_type(filename, filepath):
    """
    Get the type of an object based on its filename/filepath.

    Args:
        filename: object's filename.
        filepath: object's filepath.

    Returns:
        Type of the object (example, kata, unit-test).
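
    Illustrative cases (the suffix and directory values live in
    PrecompiledExampleType, so the exact names here are assumptions):
    a file named "word_count_test.java" would be a unit test, a file under
    a ".../katas/..." path a kata, and one under ".../examples/..." an example.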
| """ |
| filename_no_ext = (os.path.splitext(filename)[0]).lower() |
| if filename_no_ext.endswith(PrecompiledExampleType.test_ends): |
| object_type = PRECOMPILED_OBJECT_TYPE_UNIT_TEST |
| elif PrecompiledExampleType.katas in filepath.split(os.sep): |
| object_type = PRECOMPILED_OBJECT_TYPE_KATA |
| elif PrecompiledExampleType.examples in filepath.split(os.sep): |
| object_type = PRECOMPILED_OBJECT_TYPE_EXAMPLE |
| else: |
| object_type = PRECOMPILED_OBJECT_TYPE_UNSPECIFIED |
| return object_type |
| |
| |
| class DuplicatesError(Exception): |
| pass |
| |
| |
| class ConflictingDatasetsError(Exception): |
| pass |
| |
| |
| def validate_examples_for_duplicates_by_name(examples: List[Example]): |
| """ |
| Validate examples for duplicates by example name to avoid duplicates in the Cloud Datastore |
| :param examples: examples from the repository for saving to the Cloud Datastore |
| """ |
| duplicates: Dict[str, Example] = {} |
| for example in examples: |
| if example.tag.name not in duplicates.keys(): |
| duplicates[example.tag.name] = example |
| else: |
| err_msg = f"Examples have duplicate names.\nDuplicates: \n - path #1: {duplicates[example.tag.name].filepath} \n - path #2: {example.filepath}" |
| logging.error(err_msg) |
| raise DuplicatesError(err_msg) |
| |
| |
| def validate_examples_for_conflicting_datasets(examples: List[Example]): |
| """ |
| Validate examples for conflicting datasets to avoid conflicts in the Cloud Datastore |
| :param examples: examples from the repository for saving to the Cloud Datastore |
| """ |
| datasets: Dict[str, Dataset] = {} |
| for example in examples: |
| for k, v in example.tag.datasets.items(): |
| if k not in datasets: |
| datasets[k] = v |
| elif datasets[k].file_name != v.file_name or \ |
| datasets[k].format != v.format or \ |
| datasets[k].location != v.location: |
| err_msg = f"Examples have conflicting datasets.\n" \ |
| f"Conflicts: \n" \ |
| f" - file_name #1: {datasets[k].file_name} \n" \ |
| f" - format #1: {datasets[k].format} \n" \ |
| f" - location #1: {datasets[k].location} \n" \ |
| f" - file_name #2: {v.file_name}\n" \ |
| f" - format #2: {v.format}\n" \ |
| f" - location #2: {v.location}\n" \ |
| f"Dataset name: {k}" |
| logging.error(err_msg) |
| raise ConflictingDatasetsError(err_msg) |