blob: a198e6c99c017fafad1596607ea94a46c1bf3177 [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
import importlib
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List
from importlib_resources import files
from pyflink.common import TypeInformation
from pyflink.datastream import DataStream, KeySelector, StreamExecutionEnvironment
from pyflink.table import Schema, StreamTableEnvironment, Table
from flink_agents.api.agent import Agent
from flink_agents.api.configuration import Configuration
from flink_agents.api.resource import (
ResourceDescriptor,
ResourceType,
SerializableResource,
)
class AgentBuilder(ABC):
"""Builder for integrating agent with input and output."""
@abstractmethod
def apply(self, agent: Agent) -> "AgentBuilder":
"""Set agent of AgentBuilder.
Parameters
----------
agent : Agent
The agent user defined to run in execution environment.
"""
@abstractmethod
def to_list(self) -> List[Dict[str, Any]]:
"""Get output list of agent execution.
The element in the list is a dict like {'key': output}.
Returns:
-------
list
Outputs of agent execution.
"""
@abstractmethod
def to_datastream(self) -> DataStream:
"""Get output datastream of agent execution.
Returns:
-------
DataStream
Output datastream of agent execution.
"""
# TODO: auto generate output_type.
@abstractmethod
def to_table(self, schema: Schema, output_type: TypeInformation) -> Table:
"""Get output table of agent execution.
Parameters
----------
schema : Schema
Indicate schema of the output table.
output_type : TypeInformation
Indicate schema corresponding type information.
Returns:
-------
Table
Output table of agent execution.
"""
class AgentsExecutionEnvironment(ABC):
"""Base class for agent execution environment."""
_resources: Dict[ResourceType, Dict[str, Any]]
def __init__(self) -> None:
"""Init method."""
self._actions = {}
self._resources = {}
for type in ResourceType:
self._resources[type] = {}
@property
def resources(self) -> Dict[ResourceType, Dict[str, Any]]:
"""Get registered resources."""
return self._resources
@staticmethod
def get_execution_environment(
env: StreamExecutionEnvironment | None = None,
t_env: StreamTableEnvironment | None = None,
**kwargs: Dict[str, Any],
) -> "AgentsExecutionEnvironment":
"""Get agents execution environment.
Currently, user can run flink agents in ide using LocalExecutionEnvironment or
RemoteExecutionEnvironment. To distinguish which environment to use, when run
flink agents with pyflink datastream/table, user should pass flink
StreamExecutionEnvironment when get AgentsExecutionEnvironment.
Returns:
-------
AgentsExecutionEnvironment
Environment for agent execution.
"""
if env is None:
return importlib.import_module(
"flink_agents.runtime.local_execution_environment"
).create_instance(env=env, t_env=t_env, **kwargs)
else:
for path in files("flink_agents.lib").iterdir():
env.add_jars(f"file://{path}")
return importlib.import_module(
"flink_agents.runtime.remote_execution_environment"
).create_instance(env=env, t_env=t_env, **kwargs)
@abstractmethod
def get_config(self, path: str | None = None) -> Configuration:
"""Get the writable configuration for flink agents.
Returns:
-------
WritableConfiguration
The configuration for flink agents.
"""
@abstractmethod
def from_list(self, input: List[Dict[str, Any]]) -> AgentBuilder:
"""Set input for agents. Used for local execution.
Parameters
----------
input : list
Receive a list as input. The element in the list should be a dict like
{'key': Any, 'value': Any} or {'value': Any} , extra field will be ignored.
Returns:
-------
AgentBuilder
A new builder to build an agent for specific input.
"""
@abstractmethod
def from_datastream(
self, input: DataStream, key_selector: KeySelector | Callable | None = None
) -> AgentBuilder:
"""Set input for agents. Used for remote execution.
Parameters
----------
input : DataStream
Receive a DataStream as input.
key_selector : KeySelector
Extract key from each input record.
Returns:
-------
AgentBuilder
A new builder to build an agent for specific input.
"""
@abstractmethod
def from_table(
self,
input: Table,
key_selector: KeySelector | Callable | None = None,
) -> AgentBuilder:
"""Set input for agents. Used for remote execution.
Parameters
----------
input : Table
Receive a Table as input.
t_env: StreamTableEnvironment
table environment supports convert Table to/from DataStream.
key_selector : KeySelector
Extract key from each input record.
Returns:
-------
AgentBuilder
A new builder to build an agent for specific input.
"""
@abstractmethod
def execute(self) -> None:
"""Execute agent individually."""
def add_resource(
self, name: str, instance: SerializableResource | ResourceDescriptor
) -> "AgentsExecutionEnvironment":
"""Register resource to agent execution environment.
Parameters
----------
name : str
The name of the prompt, should be unique in the same Agent.
instance: SerializableResource | ResourceDescriptor
The serializable resource instance, or the descriptor of resource.
Returns:
-------
AgentsExecutionEnvironment
The environment to register the resource.
"""
if isinstance(instance, SerializableResource):
resource_type = instance.resource_type()
elif isinstance(instance, ResourceDescriptor):
resource_type = instance.clazz.resource_type()
else:
err_msg = f"Unexpected resource {instance}"
raise TypeError(err_msg)
if name in self._resources[resource_type]:
msg = f"{resource_type.value} {name} already defined"
raise ValueError(msg)
self._resources[resource_type][name] = instance
return self