blob: 44d32a25896b8808d4e09501acfa277156c07ee6 [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
from abc import ABC
from typing import Any, Dict
try:
from typing import override
except ImportError:
from typing_extensions import override
from uuid import UUID, uuid4
from pydantic import BaseModel, Field, model_validator
from pydantic_core import PydanticSerializationError
from pyflink.common import Row
class Event(BaseModel, ABC, extra="allow"):
    """Base class for all event types in the system.

    Events accept extra properties beyond the declared fields, but every
    property must be JSON serializable: natively, as a ``BaseModel``
    instance, or as a ``Row`` (handled by the serialization fallback below).
    Serializability is checked when the event is created and again on every
    attribute assignment.

    Attributes:
    ----------
    id : UUID
        Unique identifier for the event, automatically generated using uuid4.
    """

    id: UUID = Field(default_factory=uuid4)

    @staticmethod
    def __serialize_unknown(field: Any) -> Dict[str, Any]:
        """Serialize a value that pydantic cannot handle on its own.

        Only ``Row`` objects are supported; they are converted to a dict with
        the keys ``"type"`` and ``"values"``.

        Raises:
        ------
        PydanticSerializationError
            If ``field`` is not a ``Row``.
        """
        if isinstance(field, Row):
            return {"type": "Row", "values": field._values}
        err_msg = f"Unable to serialize unknown type: {field.__class__}"
        raise PydanticSerializationError(err_msg)

    @override
    def model_dump_json(self, **kwargs: Any) -> str:
        """Dump the event to JSON, using the Row-aware fallback by default.

        A ``fallback`` passed explicitly by the caller takes precedence over
        the default one.
        """
        kwargs.setdefault("fallback", self.__serialize_unknown)
        return super().model_dump_json(**kwargs)

    @model_validator(mode="after")
    def validate_extra(self) -> "Event":
        """Ensure the fields given at construction time are serializable."""
        # Serializing is the check itself: it raises if any value is unsupported.
        self.model_dump_json()
        return self

    def __setattr__(self, name: str, value: Any) -> None:
        """Set an attribute and verify the event is still serializable.

        If serialization fails, the assignment is rolled back (the previous
        value is restored, or the newly added attribute is removed) before
        the error is re-raised, so a rejected value never stays on the event.
        """
        missing = object()
        previous = getattr(self, name, missing)
        super().__setattr__(name, value)
        try:
            # Ensure added property can be serialized.
            self.model_dump_json()
        except Exception:
            # Undo the assignment so the event does not stay in a state that
            # can no longer be serialized.
            if previous is missing:
                super().__delattr__(name)
            else:
                super().__setattr__(name, previous)
            raise
class InputEvent(Event):
    """Framework-generated event that carries a piece of input data
    arriving at the agent.

    Attributes:
    ----------
    input : Any
        The input data that arrived at the agent.
    """

    input: Any
class OutputEvent(Event):
    """Event that represents a result produced by the agent.

    Actions emit output data by generating an OutputEvent.

    Attributes:
    ----------
    output : Any
        The output result returned by the agent.
    """

    output: Any