blob: 0c7311e0f77b52faa4348feefe0df3f475630700 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os.path
import pathlib
from enum import Enum, IntEnum
from typing import List, Optional, Dict
from api.v1 import api_pb2
from pydantic import BaseModel, Extra, Field, validator, root_validator, HttpUrl
from config import RepoProps
# Difficulty levels a snippet can declare; this is the type of `Tag.complexity`.
# Members are `str` subclasses whose values equal the member names.
class ComplexityEnum(str, Enum):
    BASIC = "BASIC"
    MEDIUM = "MEDIUM"
    ADVANCED = "ADVANCED"
# File formats a dataset may be stored in.
# The value is used as the file extension when `Tag`'s `datasets` validator
# builds `Dataset.file_name`.
class DatasetFormat(str, Enum):
    JSON = "json"
    AVRO = "avro"
# Where a dataset is stored. Only `local` is defined here; `Tag`'s `datasets`
# validator looks local datasets up under `RepoProps.REPO_DATASETS_PATH`.
class DatasetLocation(str, Enum):
    LOCAL = "local"
# One entry of the `datasets` mapping of a `Tag`.
class Dataset(BaseModel):
    format: DatasetFormat
    location: DatasetLocation
    # Overwritten by `Tag`'s `datasets` validator with "<dataset id>.<format>",
    # so a value supplied here does not survive validation of the enclosing Tag.
    file_name: str = ""
# Topic of an emulator.
class Topic(BaseModel):
    id: str
    # Must equal one of the keys of `Tag.datasets`; enforced by
    # `Tag.dataset_defined`.
    source_dataset: str
# Kinds of emulators that can be declared; only `kafka` is defined.
class EmulatorType(str, Enum):
    KAFKA = "kafka"
# One entry of the `emulators` list of a `Tag`: an emulator type plus the
# topic that ties it to a dataset.
class Emulator(BaseModel):
    type: EmulatorType
    topic: Topic
# An additional file belonging to a snippet (an entry of `Tag.files`).
class ImportFile(BaseModel):
    # Non-empty file name; `Tag.check_files` resolves it relative to the
    # directory of `Tag.filepath`.
    name: str = Field(..., min_length=1)
    # NOTE(review): presumably the line to scroll to in this file, like
    # `Tag.context_line` -- confirm; note the default here is 0, not 1.
    context_line: int = 0
    # Replaced with the file's text by `Tag.check_files` during validation.
    content: str = ""
class Tag(BaseModel):
    """
    Tag represents the beam-playground embedded yaml content
    """

    # These parameters are parsed from YAML and are required:
    categories: List[str] = []
    """
    Titles of categories to list this snippet in. Non-existent categories will be created.
    """

    complexity: ComplexityEnum
    """
    How hard is the snippet to understand.
    """

    context_line: int = 1
    """
    The line number to scroll to when the snippet is loaded.
    This applies after the metadata block is removed from the file, so discount for those lines.
    Integer, 1-based.
    """

    description: str
    """
    Description text to show on mouse over.
    """

    name: str = Field(..., min_length=1)
    """
    Name to show in the dropdown and to compose snippet ID from.
    """

    # These parameters are parsed from YAML and are optional:
    datasets: Dict[str, Dataset] = {}
    """
    Datasets which will be used by emulators. Example:
    datasets:
      CountWords:
        location: local
        format: json
    """

    default_example: bool = False
    """
    Whether this is the default example in its SDK.
    If multiple snippets set this to `true` the behavior is undefined.
    """

    emulators: List[Emulator] = []
    """
    List of emulators to start during pipeline execution. Currently only `kafka` type is supported.
    Example:
    emulators:
      - type: kafka
        topic:
          id: dataset
          source_dataset: CountWords
    """

    always_run: bool = False
    """
    The example output will not be cached and playground will always run its code when user clicks 'Run' if this is set to 'True'
    """

    never_run: bool = False
    """
    If 'True', it will not be possible to run this example from Playground UI. It will be read-only.
    """

    multifile: bool = False
    """
    Whether this is a file of a multi-file example.
    """

    pipeline_options: str = ""
    """
    Pipeline options. Example:
    pipeline_options: --name1 value1 --name2 value2
    """

    tags: List[str] = []
    """
    Tags by which this snippet can be found in the dropdown.
    """

    url_notebook: Optional[HttpUrl] = None
    """
    The URL of the Colab notebook that is based on this snippet.
    """

    # These parameters are NOT parsed from YAML but are added at construction:
    filepath: str = Field(..., min_length=1)
    files: List[ImportFile] = []
    line_start: int
    line_finish: int

    class Config:
        # Categories accepted by `category_supported`. Empty by default, so it
        # has to be filled in by the caller before tags with categories are built.
        supported_categories = []
        # Unknown keys in the YAML block are rejected instead of being ignored.
        extra = Extra.forbid

    @root_validator(skip_on_failure=True)
    def lines_order(cls, values):
        """
        Check that `line_start`/`line_finish` form a non-empty, non-negative
        range and that `context_line` does not fall inside
        (`line_start`, `line_finish`].

        Raises ValueError (reported by pydantic as a validation error) when
        the ordering is violated.
        """
        line_start = values["line_start"]
        line_finish = values["line_finish"]
        context_line = values["context_line"]

        range_is_valid = 0 <= line_start < line_finish
        context_outside_range = (
            context_line <= line_start or context_line > line_finish
        )
        # Raise explicitly instead of using `assert`: assertions are stripped
        # when Python runs with -O, which would silently disable this check.
        if not (range_is_valid and context_outside_range):
            raise ValueError(f"line ordering error: {values}")
        return values

    @root_validator(skip_on_failure=True)
    def datasets_with_emulators(cls, values):
        """
        Reject a tag that declares datasets but no emulators to consume them.
        """
        if values.get("datasets") and not values.get("emulators"):
            raise ValueError("datasets w/o emulators")
        return values

    @validator("emulators", each_item=True)
    def dataset_defined(cls, v, values, **kwargs):
        """
        Check that the dataset referenced by an emulator's topic is one of the
        keys of `datasets`.
        """
        # `datasets` is absent from `values` when its own validation failed.
        if "datasets" not in values:
            raise ValueError("datasets not defined")
        if v.topic.source_dataset not in values["datasets"]:
            raise ValueError(
                f"Emulator topic {v.topic.id} has undefined dataset {v.topic.source_dataset}"
            )
        return v

    @validator("datasets")
    def dataset_file_name(cls, datasets):
        """
        Fill in `file_name` of every dataset as "<dataset id>.<format>" and,
        for local datasets, check that the file exists under
        `RepoProps.REPO_DATASETS_PATH`.

        Raises FileNotFoundError (carrying the missing path) when a local
        dataset file does not exist.
        """
        for dataset_id, dataset in datasets.items():
            # Use `.value` explicitly so the extension does not depend on how
            # the running Python version formats mixed-in enums in f-strings
            # (that formatting changed in Python 3.11).
            dataset.file_name = f"{dataset_id}.{dataset.format.value}"
            if dataset.location == DatasetLocation.LOCAL:
                dataset_path = os.path.join(
                    RepoProps.REPO_DATASETS_PATH, dataset.file_name
                )
                if not os.path.isfile(dataset_path):
                    logging.error(
                        "File not found at the specified path: %s", dataset_path
                    )
                    raise FileNotFoundError(dataset_path)
        return datasets

    @validator("categories", each_item=True)
    def category_supported(cls, v, values, config, **kwargs):
        """
        Check that a category is listed in `Config.supported_categories`.

        NOTE(review): the `categories` field description says non-existent
        categories will be created, while this validator rejects them --
        confirm which one is intended.
        """
        if v not in config.supported_categories:
            raise ValueError(f"Category {v} not in {config.supported_categories}")
        return v

    @root_validator
    def multifile_files(cls, values):
        """
        Require at least one entry in `files` when `multifile` is set.

        Runs even if field validation failed, hence the `.get` lookups.
        """
        if values.get('multifile', False) and not values.get('files', []):
            raise ValueError('multifile is True but no files defined')
        return values

    @validator("filepath")
    def check_filepath_exists(cls, v: str):
        """
        Check that `filepath` points to an existing file.

        Raises FileNotFoundError when it does not.
        """
        if not os.path.isfile(v):
            logging.error("Example file not found: %s", v)
            raise FileNotFoundError(v)
        return v

    @validator("files", each_item=True)
    def check_files(cls, v: ImportFile, values):
        """
        Resolve an import file relative to the directory of `filepath`, check
        that it exists and load its text into `v.content`.

        Raises FileNotFoundError when the file does not exist, and ValueError
        when `filepath` itself did not pass validation.
        """
        # `filepath` is absent from `values` when its own validation failed;
        # report that as a validation error rather than a raw KeyError.
        if "filepath" not in values:
            raise ValueError("filepath not defined")
        local_path = os.path.join(os.path.dirname(values["filepath"]), v.name)
        if not os.path.isfile(local_path):
            logging.error("Import file not found: %s", local_path)
            raise FileNotFoundError(local_path)
        # Context manager so the file handle is closed deterministically.
        with open(local_path) as import_file:
            v.content = import_file.read()
        return v
# SDKs known to this module. Each member takes its integer value from the
# corresponding `SDK_*` constant of the generated `api_pb2` module.
class SdkEnum(IntEnum):
    JAVA = api_pb2.SDK_JAVA
    GO = api_pb2.SDK_GO
    PYTHON = api_pb2.SDK_PYTHON
    SCIO = api_pb2.SDK_SCIO
def StringToSdkEnum(s: str) -> SdkEnum:
    """
    Convert the name of an `api_pb2.Sdk` value into the matching `SdkEnum` member.

    The name (presumably of the form "SDK_JAVA" -- confirm against the proto
    definition) is resolved to its number with `api_pb2.Sdk.Value`, and that
    number is then looked up in `SdkEnum`. Errors raised by either lookup
    propagate to the caller.
    """
    return SdkEnum(api_pb2.Sdk.Value(s))
class Example(BaseModel):
    """
    Class which contains all information about beam example
    """
    # SDK the example belongs to.
    sdk: SdkEnum
    # Metadata parsed from the example's embedded beam-playground YAML block.
    tag: Tag
    # Non-empty path of the example file.
    filepath: str = Field(..., min_length=1)
    # Non-empty source text of the example.
    code: str = Field(..., min_length=1)
    # NOTE(review): presumably the link to the file in version control -- confirm.
    url_vcs: HttpUrl
    # Defaults to the "unspecified" precompiled object type from api_pb2.
    type: int = api_pb2.PRECOMPILED_OBJECT_TYPE_UNSPECIFIED
    # Must be strictly positive.
    context_line: int = Field(..., gt=0)
    # Defaults to the "unspecified" status from api_pb2.
    status: int = api_pb2.STATUS_UNSPECIFIED
    # NOTE(review): the fields below all default to empty strings; presumably
    # they are filled in once the example has been run -- confirm with callers.
    logs: str = ""
    pipeline_id: str = ""
    output: str = ""
    compile_output: str = ""
    graph: str = ""