| # coding: utf-8 |
| |
| """ |
| Airflow API |
| |
| Airflow API. All endpoints located under ``/api/v2`` can be used safely, are stable and backward compatible. Endpoints located under ``/ui`` are dedicated to the UI and are subject to breaking changes depending on the needs of the frontend. Users should not rely on those but use the public ones instead. |
| |
| The version of the OpenAPI document: 2 |
| Generated by OpenAPI Generator (https://openapi-generator.tech) |
| |
| Do not edit the class manually. |
| """ # noqa: E501 |
| |
| |
| from __future__ import annotations |
| import json |
| import pprint |
| from pydantic import BaseModel, ConfigDict, Field, StrictStr, ValidationError, field_validator |
| from typing import Any, List, Optional |
| from airflow_client.client.models.bulk_create_action_variable_body import BulkCreateActionVariableBody |
| from airflow_client.client.models.bulk_delete_action_variable_body import BulkDeleteActionVariableBody |
| from airflow_client.client.models.bulk_update_action_variable_body import BulkUpdateActionVariableBody |
| from pydantic import StrictStr, Field |
| from typing import Union, List, Set, Optional, Dict |
| from typing_extensions import Literal, Self |
| |
# Names of the schemas accepted by the BulkBodyVariableBodyActionsInner oneOf wrapper.
BULKBODYVARIABLEBODYACTIONSINNER_ONE_OF_SCHEMAS = [
    "BulkCreateActionVariableBody",
    "BulkDeleteActionVariableBody",
    "BulkUpdateActionVariableBody",
]
| |
class BulkBodyVariableBodyActionsInner(BaseModel):
    """
    BulkBodyVariableBodyActionsInner

    oneOf wrapper that holds exactly one of ``BulkCreateActionVariableBody``,
    ``BulkUpdateActionVariableBody`` or ``BulkDeleteActionVariableBody`` in
    ``actual_instance`` and delegates serialization to that instance.
    """
    # data type: BulkCreateActionVariableBody
    oneof_schema_1_validator: Optional[BulkCreateActionVariableBody] = None
    # data type: BulkUpdateActionVariableBody
    oneof_schema_2_validator: Optional[BulkUpdateActionVariableBody] = None
    # data type: BulkDeleteActionVariableBody
    oneof_schema_3_validator: Optional[BulkDeleteActionVariableBody] = None
    # the wrapped value; checked by `actual_instance_must_validate_oneof`
    actual_instance: Optional[Union[BulkCreateActionVariableBody, BulkDeleteActionVariableBody, BulkUpdateActionVariableBody]] = None
    # names of the schemas this wrapper accepts
    one_of_schemas: Set[str] = { "BulkCreateActionVariableBody", "BulkDeleteActionVariableBody", "BulkUpdateActionVariableBody" }

    model_config = ConfigDict(
        validate_assignment=True,
        protected_namespaces=(),
    )

    def __init__(self, *args, **kwargs) -> None:
        """Build the wrapper from one positional value or from keyword fields.

        A single positional argument is used as ``actual_instance``; it cannot
        be combined with keyword arguments.

        Raises:
            ValueError: if more than one positional argument is given, or if
                a positional argument is mixed with keyword arguments.
        """
        if not args:
            super().__init__(**kwargs)
            return
        if len(args) > 1:
            raise ValueError("If a position argument is used, only 1 is allowed to set `actual_instance`")
        if kwargs:
            raise ValueError("If a position argument is used, keyword arguments cannot be used.")
        super().__init__(actual_instance=args[0])

    @field_validator('actual_instance')
    def actual_instance_must_validate_oneof(cls, v):
        """Accept ``v`` only if it is an instance of exactly one oneOf schema.

        Returns:
            ``v`` unchanged when exactly one schema matches.

        Raises:
            ValueError: if ``v`` matches none of the schemas, or more than one.
        """
        # Names are spelled out (rather than read from __name__) so the error
        # text stays exactly as before.
        candidates = (
            ("BulkCreateActionVariableBody", BulkCreateActionVariableBody),
            ("BulkUpdateActionVariableBody", BulkUpdateActionVariableBody),
            ("BulkDeleteActionVariableBody", BulkDeleteActionVariableBody),
        )
        error_messages = []
        match = 0
        for schema_name, schema_type in candidates:
            if isinstance(v, schema_type):
                match += 1
            else:
                error_messages.append(f"Error! Input type `{type(v)}` is not `{schema_name}`")

        if match > 1:
            # more than 1 match
            raise ValueError("Multiple matches found when setting `actual_instance` in BulkBodyVariableBodyActionsInner with oneOf schemas: BulkCreateActionVariableBody, BulkDeleteActionVariableBody, BulkUpdateActionVariableBody. Details: " + ", ".join(error_messages))
        if match == 0:
            # no match
            raise ValueError("No match found when setting `actual_instance` in BulkBodyVariableBodyActionsInner with oneOf schemas: BulkCreateActionVariableBody, BulkDeleteActionVariableBody, BulkUpdateActionVariableBody. Details: " + ", ".join(error_messages))
        return v

    @classmethod
    def from_dict(cls, obj: Union[str, Dict[str, Any]]) -> Self:
        """Build the wrapper from ``obj`` by encoding it as JSON and calling ``from_json``."""
        return cls.from_json(json.dumps(obj))

    @classmethod
    def from_json(cls, json_str: str) -> Self:
        """Returns the object represented by the json string

        Each oneOf schema is asked, in turn, to deserialize ``json_str``;
        exactly one of them must succeed.

        Raises:
            ValueError: if no schema, or more than one schema, accepts the
                JSON string.
        """
        instance = cls.model_construct()
        error_messages = []
        match = 0

        # Try every schema (no early exit) so that ambiguous payloads are detected.
        for schema_type in (
            BulkCreateActionVariableBody,
            BulkUpdateActionVariableBody,
            BulkDeleteActionVariableBody,
        ):
            try:
                instance.actual_instance = schema_type.from_json(json_str)
                match += 1
            except (ValidationError, ValueError) as e:
                error_messages.append(str(e))

        if match > 1:
            # more than 1 match
            raise ValueError("Multiple matches found when deserializing the JSON string into BulkBodyVariableBodyActionsInner with oneOf schemas: BulkCreateActionVariableBody, BulkDeleteActionVariableBody, BulkUpdateActionVariableBody. Details: " + ", ".join(error_messages))
        if match == 0:
            # no match
            raise ValueError("No match found when deserializing the JSON string into BulkBodyVariableBodyActionsInner with oneOf schemas: BulkCreateActionVariableBody, BulkDeleteActionVariableBody, BulkUpdateActionVariableBody. Details: " + ", ".join(error_messages))
        return instance

    def to_json(self) -> str:
        """Returns the JSON representation of the actual instance

        Returns the string ``"null"`` when no instance is set.
        """
        if self.actual_instance is None:
            return "null"

        serializer = getattr(self.actual_instance, "to_json", None)
        if callable(serializer):
            return serializer()
        # no `to_json` on the wrapped value: encode it directly
        return json.dumps(self.actual_instance)

    def to_dict(self) -> Optional[Union[Dict[str, Any], BulkCreateActionVariableBody, BulkDeleteActionVariableBody, BulkUpdateActionVariableBody]]:
        """Returns the dict representation of the actual instance

        Returns ``None`` when no instance is set.
        """
        if self.actual_instance is None:
            return None

        converter = getattr(self.actual_instance, "to_dict", None)
        if callable(converter):
            return converter()
        # primitive type
        return self.actual_instance

    def to_str(self) -> str:
        """Returns the string representation of the actual instance"""
        return pprint.pformat(self.model_dump())
| |
| |