# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Common helper module for CI/CD Steps
"""
import asyncio
import logging
import os
import urllib.parse
from pathlib import PurePath
from typing import List, Optional, Dict
from api.v1 import api_pb2
import pydantic
import yaml
from api.v1.api_pb2 import (
SDK_UNSPECIFIED,
STATUS_UNSPECIFIED,
Sdk,
STATUS_VALIDATING,
STATUS_PREPARING,
STATUS_COMPILING,
STATUS_EXECUTING,
PRECOMPILED_OBJECT_TYPE_UNIT_TEST,
PRECOMPILED_OBJECT_TYPE_KATA,
PRECOMPILED_OBJECT_TYPE_UNSPECIFIED,
PRECOMPILED_OBJECT_TYPE_EXAMPLE,
PrecompiledObjectType,
)
from config import Config, TagFields, PrecompiledExampleType
from grpc_client import GRPCClient
from constants import BEAM_ROOT_DIR_ENV_VAR_KEY
from models import Example, Tag, SdkEnum, Dataset
def _check_no_nested(subdirs: List[str]):
"""
Check there're no nested subdirs
Sort alphabetically and compare the pairs of adjacent items
using pathlib.PurePath: we don't want fs calls in this check
"""
sorted_subdirs = sorted(PurePath(s) for s in subdirs)
for dir1, dir2 in zip(sorted_subdirs, sorted_subdirs[1:]):
if dir1 in [dir2, *dir2.parents]:
raise ValueError(f"{dir2} is a subdirectory of {dir1}")
def find_examples(root_dir: str, subdirs: List[str], sdk: SdkEnum) -> List[Example]:
"""
Find and return beam examples.
Search through all child files of the given subdirectories for files with a beam tag:
Beam-playground:
name: NameOfExample
description: Description of NameOfExample.
multifile: false
default_example: false
context_line: 10
categories:
- category-1
- category-2
pipeline_options: --inputFile your_file --outputFile your_output_file
complexity: MEDIUM
tags:
- example
If some example contains a beam tag with an incorrect format, raise an error.
Args:
root_dir: project root dir
subdirs: sub-directories in which to search for examples.
sdk: the SDK for which to find examples.
Returns:
List of Examples.
"""
has_errors = False
examples = []
_check_no_nested(subdirs)
for subdir in subdirs:
subdir = os.path.join(root_dir, subdir)
logging.info("subdir: %s", subdir)
for root, _, files in os.walk(subdir):
for filename in files:
filepath = os.path.join(root, filename)
try:
try:
example = _load_example(
filename=filename, filepath=filepath, sdk=sdk
)
if example is not None:
examples.append(example)
except pydantic.ValidationError as err:
if len(err.errors()) > 1:
raise
if err.errors()[0]["msg"] == "multifile is True but no files defined":
logging.warning("incomplete multifile example ignored %s", filepath)
continue
raise
except Exception:
logging.exception("error loading example at %s", filepath)
has_errors = True
if has_errors:
raise ValueError(
"Some of the beam examples contain beam playground tag with "
"an incorrect format"
)
return examples
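# A minimal call sketch (hypothetical subdirs; SdkEnum members are defined in
# models.py and SdkEnum.JAVA is assumed here):
#
#   examples = find_examples(
#       root_dir=os.getenv(BEAM_ROOT_DIR_ENV_VAR_KEY, "../.."),
#       subdirs=["examples/java", "learning/katas/java"],
#       sdk=SdkEnum.JAVA,
#   )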
def get_tag(filepath: PurePath) -> Optional[Tag]:
"""
Parse the file at filepath and find the beam tag.
Args:
filepath: path of the file
Returns:
If the file contains a tag, returns a Tag object.
If the file doesn't contain a tag, returns None.
"""
with open(filepath, encoding="utf-8") as parsed_file:
lines = parsed_file.readlines()
line_start: Optional[int] = None
line_finish: Optional[int] = None
tag_prefix: str = ""
for idx, line in enumerate(lines):
if line_start is None and line.endswith(Config.BEAM_PLAYGROUND_TITLE):
line_start = idx
prefix_len = len(line) - len(Config.BEAM_PLAYGROUND_TITLE)
tag_prefix = line[:prefix_len]
elif line_start is not None and not line.startswith(tag_prefix):
line_finish = idx
break
if line_start is None or line_finish is None:
return None
embedded_yaml_content = "".join(
line[len(tag_prefix) :] for line in lines[line_start:line_finish]
)
yml = yaml.load(embedded_yaml_content, Loader=yaml.SafeLoader)
try:
return Tag(
filepath=str(filepath),
line_start=line_start,
line_finish=line_finish,
**yml[Config.BEAM_PLAYGROUND],
)
except pydantic.ValidationError as err:
if len(err.errors()) == 1 and err.errors()[0]["msg"] == "multifile is True but no files defined":
logging.warning("incomplete multifile example ignored %s", filepath)
return None
raise
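# Example of the kind of comment block get_tag() looks for (hypothetical Java
# source; the exact title string comes from Config.BEAM_PLAYGROUND_TITLE). The
# prefix before the title ("// " here) is detected from the title line, every
# following tag line must start with that prefix, and it is stripped before the
# YAML is parsed:
#
#   // Beam-playground:
#   //   name: WordCountExample
#   //   description: An example that counts words.
#   //   complexity: MEDIUM
#   //   categories:
#   //     - Combiners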
def _load_example(filename, filepath, sdk: SdkEnum) -> Optional[Example]:
"""
Check whether the file at filepath is a beam example and, if so, load it.
Args:
filename: name of the file.
filepath: path to the file.
sdk: the SDK for which to find examples.
Returns:
If the file is an example, return Example object
If it's not, return None
In case of error, raise Exception
"""
logging.debug("inspecting file %s", filepath)
extension = filepath.split(os.extsep)[-1]
if extension == Config.SDK_TO_EXTENSION[sdk]:
logging.debug("sdk %s matched extension %s", api_pb2.Sdk.Name(sdk), extension)
tag = get_tag(filepath)
if tag is not None:
logging.debug("playground-beam tag found")
return _get_example(filepath, filename, tag, sdk)
return None
# Ensure load_supported_categories is called only once,
# to make testing easier
_load_supported_categories = False
def load_supported_categories(categories_path: str):
"""
Load the list of supported categories from the categories_path file
into the Tag model config
Args:
categories_path: path to the file with categories.
"""
global _load_supported_categories
if _load_supported_categories:
return
with open(categories_path, encoding="utf-8") as supported_categories:
yaml_object = yaml.load(supported_categories.read(), Loader=yaml.SafeLoader)
Tag.Config.supported_categories = yaml_object[TagFields.categories]
_load_supported_categories = True
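# A sketch of the expected categories file, assuming TagFields.categories is the
# literal key "categories" (the real file lives elsewhere in the repository):
#
#   categories:
#     - Combiners
#     - Core Transforms
#     - IO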
def _get_content(filepath: str, tag_start_line: int, tag_finish_line: int) -> str:
"""Return the file content with the tag block (lines[tag_start_line:tag_finish_line]) removed."""
with open(filepath, encoding="utf-8") as parsed_file:
lines = parsed_file.readlines()
lines = lines[:tag_start_line] + lines[tag_finish_line:]
return "".join(lines)
def _get_url_vcs(filepath: str) -> str:
"""
Construct VCS URL from example's filepath
"""
root_dir = os.getenv(BEAM_ROOT_DIR_ENV_VAR_KEY, "../..")
rel_path = os.path.relpath(filepath, root_dir)
url_vcs = "{}/{}".format(Config.URL_VCS_PREFIX, urllib.parse.quote(rel_path))
return url_vcs
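# Worked example (hypothetical paths): with BEAM_ROOT_DIR unset, root_dir falls
# back to "../..". For filepath "../../examples/java/WordCount.java" the
# rel_path is "examples/java/WordCount.java", so the result is
# Config.URL_VCS_PREFIX + "/examples/java/WordCount.java" (URL-quoted).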
def _get_example(filepath: str, filename: str, tag: Tag, sdk: int) -> Example:
"""
Return an Example by filepath and filename.
Args:
filepath: path of the example's file.
filename: name of the example's file.
tag: tag of the example.
sdk: SDK of the example.
Returns:
Parsed Example object.
"""
# Calculate context line with tag removed. Note: context_line is 1-based, line_start and line_finish are 0-based.
context_line = tag.context_line if tag.context_line <= tag.line_start else tag.context_line - (tag.line_finish - tag.line_start)
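# Worked example: if the tag occupies 0-based lines 5..14 (line_start=5,
# line_finish=15, so _get_content removes 10 lines) and the 1-based
# context_line is 20, the adjusted context line is 20 - (15 - 5) = 10;
# a context_line of 4 (above the tag) is kept unchanged.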
return Example(
sdk=SdkEnum(sdk),
tag=tag,
filepath=filepath,
status=STATUS_UNSPECIFIED,
type=_get_object_type(filename, filepath),
code=_get_content(filepath, tag.line_start, tag.line_finish),
url_vcs=_get_url_vcs(filepath), # type: ignore
context_line=context_line,
)
async def update_example_status(example: Example, client: GRPCClient):
"""
Receive the status for the example and update example.status and pipeline_id.
Use client to send requests to the backend:
1. Start code processing.
2. Ping the backend while status is STATUS_VALIDATING/
STATUS_PREPARING/STATUS_COMPILING/STATUS_EXECUTING
Update example.status with resulting status.
Args:
example: beam example for processing and updating status and pipeline_id.
client: client to send requests to the server.
"""
datasets: List[api_pb2.Dataset] = []
for emulator in example.tag.emulators:
dataset: Dataset = example.tag.datasets[emulator.topic.source_dataset]
datasets.append(
api_pb2.Dataset(
type=api_pb2.EmulatorType.Value(
f"EMULATOR_TYPE_{emulator.type.upper()}"
),
options={"topic": emulator.topic.id},
dataset_path=dataset.file_name,
)
)
files: List[api_pb2.SnippetFile] = [
api_pb2.SnippetFile(name=example.filepath, content=example.code, is_main=True)
]
for file in example.tag.files:
files.append(
api_pb2.SnippetFile(name=file.name, content=file.content, is_main=False)
)
pipeline_id = await client.run_code(
example.code, example.sdk, example.tag.pipeline_options, datasets, files=files,
)
example.pipeline_id = pipeline_id
status = await client.check_status(pipeline_id)
while status in [
STATUS_VALIDATING,
STATUS_PREPARING,
STATUS_COMPILING,
STATUS_EXECUTING,
]:
await asyncio.sleep(Config.PAUSE_DELAY)
status = await client.check_status(pipeline_id)
example.status = status
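# Minimal driving sketch (assumes an already-connected GRPCClient instance,
# whose construction is outside this module), to be run from an async context:
#
#   async def _update_all(examples: List[Example], client: GRPCClient):
#       await asyncio.gather(
#           *(update_example_status(example, client) for example in examples)
#       )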
def _get_object_type(filename, filepath):
"""
Get the type of an object based on its filename/filepath
Args:
filename: object's filename
filepath: object's filepath
Returns: type of the object (example, kata, unit-test)
"""
filename_no_ext = (os.path.splitext(filename)[0]).lower()
if filename_no_ext.endswith(PrecompiledExampleType.test_ends):
object_type = PRECOMPILED_OBJECT_TYPE_UNIT_TEST
elif PrecompiledExampleType.katas in filepath.split(os.sep):
object_type = PRECOMPILED_OBJECT_TYPE_KATA
elif PrecompiledExampleType.examples in filepath.split(os.sep):
object_type = PRECOMPILED_OBJECT_TYPE_EXAMPLE
else:
object_type = PRECOMPILED_OBJECT_TYPE_UNSPECIFIED
return object_type
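# Illustrative classification (hypothetical paths; the exact suffixes and folder
# names come from PrecompiledExampleType in config.py):
#
#   "wordcount_test.py" inside "examples/..."   -> PRECOMPILED_OBJECT_TYPE_UNIT_TEST
#   "task.py" inside ".../katas/..."            -> PRECOMPILED_OBJECT_TYPE_KATA
#   "wordcount.py" inside ".../examples/..."    -> PRECOMPILED_OBJECT_TYPE_EXAMPLE
#   anything else                               -> PRECOMPILED_OBJECT_TYPE_UNSPECIFIED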
class DuplicatesError(Exception):
pass
class ConflictingDatasetsError(Exception):
pass
def validate_examples_for_duplicates_by_name(examples: List[Example]):
"""
Validate examples for duplicates by example name to avoid duplicates in the Cloud Datastore
:param examples: examples from the repository for saving to the Cloud Datastore
"""
duplicates: Dict[str, Example] = {}
for example in examples:
if example.tag.name not in duplicates:
duplicates[example.tag.name] = example
else:
err_msg = f"Examples have duplicate names.\nDuplicates: \n - path #1: {duplicates[example.tag.name].filepath} \n - path #2: {example.filepath}"
logging.error(err_msg)
raise DuplicatesError(err_msg)
def validate_examples_for_conflicting_datasets(examples: List[Example]):
"""
Validate examples for conflicting datasets to avoid conflicts in the Cloud Datastore
:param examples: examples from the repository for saving to the Cloud Datastore
"""
datasets: Dict[str, Dataset] = {}
for example in examples:
for k, v in example.tag.datasets.items():
if k not in datasets:
datasets[k] = v
elif datasets[k].file_name != v.file_name or \
datasets[k].format != v.format or \
datasets[k].location != v.location:
err_msg = f"Examples have conflicting datasets.\n" \
f"Conflicts: \n" \
f" - file_name #1: {datasets[k].file_name} \n" \
f" - format #1: {datasets[k].format} \n" \
f" - location #1: {datasets[k].location} \n" \
f" - file_name #2: {v.file_name}\n" \
f" - format #2: {v.format}\n" \
f" - location #2: {v.location}\n" \
f"Dataset name: {k}"
logging.error(err_msg)
raise ConflictingDatasetsError(err_msg)