| ################################################################################ |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| ################################################################################# |
| from abc import ABC |
| from typing import Any, Dict |
| |
| try: |
| from typing import override |
| except ImportError: |
| from typing_extensions import override |
| from uuid import UUID, uuid4 |
| |
| from pydantic import BaseModel, Field, model_validator |
| from pydantic_core import PydanticSerializationError |
| from pyflink.common import Row |
| |
| |
| class Event(BaseModel, ABC, extra="allow"): |
| """Base class for all event types in the system. Event allow extra properties, but |
| these properties are required isinstance of BaseModel, or json serializable. |
| |
| Attributes: |
| ---------- |
| id : UUID |
| Unique identifier for the event, automatically generated using uuid4. |
| """ |
| |
| id: UUID = Field(default_factory=uuid4) |
| |
| @staticmethod |
| def __serialize_unknown(field: Any) -> Dict[str, Any]: |
| """Handle serialization of unknown types, specifically Row objects.""" |
| if isinstance(field, Row): |
| return {"type": "Row", "values": field._values} |
| else: |
| err_msg = f"Unable to serialize unknown type: {field.__class__}" |
| raise PydanticSerializationError(err_msg) |
| |
| @override |
| def model_dump_json(self, **kwargs: Any) -> str: |
| """Override model_dump_json to handle Row objects using fallback.""" |
| # Set fallback if not provided in kwargs |
| if "fallback" not in kwargs: |
| kwargs["fallback"] = self.__serialize_unknown |
| return super().model_dump_json(**kwargs) |
| |
| @model_validator(mode="after") |
| def validate_extra(self) -> "Event": |
| """Ensure init fields is serializable.""" |
| self.model_dump_json() |
| return self |
| |
| def __setattr__(self, name: str, value: Any) -> None: |
| super().__setattr__(name, value) |
| # Ensure added property can be serialized. |
| self.model_dump_json() |
| |
| |
| class InputEvent(Event): |
| """Event generated by the framework, carrying an input data that |
| arrives at the agent. |
| """ |
| |
| input: Any |
| |
| |
| class OutputEvent(Event): |
| """Event representing a result from agent. By generating an OutputEvent, |
| actions can emit output data. |
| |
| Attributes: |
| ---------- |
| output : Any |
| The output result returned by the agent. |
| """ |
| |
| output: Any |