| # coding: utf-8 |
| |
| """ |
| Airflow API |
| |
| Airflow API. All endpoints located under ``/api/v2`` can be used safely, are stable and backward compatible. Endpoints located under ``/ui`` are dedicated to the UI and are subject to breaking changes depending on the needs of the frontend. Users should not rely on those but use the public ones instead. |
| |
| The version of the OpenAPI document: 2 |
| Generated by OpenAPI Generator (https://openapi-generator.tech) |
| |
| Do not edit the class manually. |
| """ # noqa: E501 |
| |
| |
| from __future__ import annotations |
| import pprint |
| import re # noqa: F401 |
| import json |
| |
| from datetime import datetime |
| from pydantic import BaseModel, ConfigDict, Field, StrictBool, StrictFloat, StrictInt, StrictStr |
| from typing import Any, ClassVar, Dict, List, Optional, Union |
| from airflow_client.client.models.time_delta import TimeDelta |
| from typing import Optional, Set |
| from typing_extensions import Self |
| |
class TaskResponse(BaseModel):
    """
    Task serializer for responses.

    ``depends_on_past``, ``extra_links``, ``retry_exponential_backoff`` and
    ``wait_for_downstream`` are required; every other field is optional and
    defaults to ``None``.
    """ # noqa: E501
    class_ref: Optional[Dict[str, Any]] = None
    depends_on_past: StrictBool
    doc_md: Optional[StrictStr] = None
    downstream_task_ids: Optional[List[StrictStr]] = None
    end_date: Optional[datetime] = None
    execution_timeout: Optional[TimeDelta] = None
    extra_links: List[StrictStr] = Field(description="Extract and return extra_links.")
    is_mapped: Optional[StrictBool] = None
    operator_name: Optional[StrictStr] = None
    owner: Optional[StrictStr] = None
    params: Optional[Dict[str, Any]] = None
    pool: Optional[StrictStr] = None
    pool_slots: Optional[Union[StrictFloat, StrictInt]] = None
    priority_weight: Optional[Union[StrictFloat, StrictInt]] = None
    queue: Optional[StrictStr] = None
    retries: Optional[Union[StrictFloat, StrictInt]] = None
    retry_delay: Optional[TimeDelta] = None
    retry_exponential_backoff: StrictBool
    start_date: Optional[datetime] = None
    task_display_name: Optional[StrictStr] = None
    task_id: Optional[StrictStr] = None
    template_fields: Optional[List[StrictStr]] = None
    trigger_rule: Optional[StrictStr] = None
    ui_color: Optional[StrictStr] = None
    ui_fgcolor: Optional[StrictStr] = None
    wait_for_downstream: StrictBool
    weight_rule: Optional[StrictStr] = None
    # Names of every declared property; `from_dict` reads exactly these keys.
    __properties: ClassVar[List[str]] = [
        "class_ref",
        "depends_on_past",
        "doc_md",
        "downstream_task_ids",
        "end_date",
        "execution_timeout",
        "extra_links",
        "is_mapped",
        "operator_name",
        "owner",
        "params",
        "pool",
        "pool_slots",
        "priority_weight",
        "queue",
        "retries",
        "retry_delay",
        "retry_exponential_backoff",
        "start_date",
        "task_display_name",
        "task_id",
        "template_fields",
        "trigger_rule",
        "ui_color",
        "ui_fgcolor",
        "wait_for_downstream",
        "weight_rule",
    ]

    model_config = ConfigDict(
        populate_by_name=True,
        validate_assignment=True,
        protected_namespaces=(),
    )

    def to_str(self) -> str:
        """Returns the string representation of the model using alias"""
        return pprint.pformat(self.model_dump(by_alias=True))

    def to_json(self) -> str:
        """Returns the JSON representation of the model using alias.

        The payload is built from `to_dict()`, so fields whose value is
        `None` are omitted. `datetime` values (`start_date`, `end_date`) are
        written as ISO 8601 strings.

        :raises TypeError: if the model holds a value that is neither JSON
            native nor a `datetime` (for example inside `params`).
        """
        def _encode(value: Any) -> Any:
            # `to_dict()` dumps in pydantic's python mode, which keeps
            # datetime objects as-is; `json.dumps` cannot encode those
            # natively and would raise TypeError without this hook.
            if isinstance(value, datetime):
                return value.isoformat()
            raise TypeError(
                f"Object of type {type(value).__name__} is not JSON serializable"
            )

        # TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead
        return json.dumps(self.to_dict(), default=_encode)

    @classmethod
    def from_json(cls, json_str: str) -> Optional[Self]:
        """Create an instance of TaskResponse from a JSON string.

        :param json_str: JSON document describing the task.
        :return: the validated model, or `None` when the document is JSON `null`.
        """
        return cls.from_dict(json.loads(json_str))

    def to_dict(self) -> Dict[str, Any]:
        """Return the dictionary representation of the model using alias.

        This has the following differences from calling pydantic's
        `self.model_dump(by_alias=True)`:

        * Fields whose value is `None` are left out of the output dict.
        * `execution_timeout` and `retry_delay` are serialized by calling
          their own `to_dict()`.
        """
        excluded_fields: Set[str] = set()

        _dict = self.model_dump(
            by_alias=True,
            exclude=excluded_fields,
            exclude_none=True,
        )
        # override the default output from pydantic by calling `to_dict()` of execution_timeout
        if self.execution_timeout is not None:
            _dict['execution_timeout'] = self.execution_timeout.to_dict()
        # override the default output from pydantic by calling `to_dict()` of retry_delay
        if self.retry_delay is not None:
            _dict['retry_delay'] = self.retry_delay.to_dict()
        return _dict

    @classmethod
    def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]:
        """Create an instance of TaskResponse from a dict.

        :param obj: mapping of property names to values. Keys that are not
            declared properties are ignored; declared properties missing from
            `obj` are passed to validation as `None`.
        :return: the validated model, or `None` when `obj` is `None`.
        """
        if obj is None:
            return None

        # Anything that is not a plain dict is handed to pydantic unchanged.
        if not isinstance(obj, dict):
            return cls.model_validate(obj)

        values: Dict[str, Any] = {name: obj.get(name) for name in cls.__properties}
        # Nested TimeDelta payloads are built through TimeDelta.from_dict.
        for nested in ("execution_timeout", "retry_delay"):
            if values[nested] is not None:
                values[nested] = TimeDelta.from_dict(values[nested])
        return cls.model_validate(values)
| |
| |