[example][java] Add multiple agent integration example (#239)
diff --git a/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java b/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java
new file mode 100644
index 0000000..4732bd0
--- /dev/null
+++ b/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.examples;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.AgentsExecutionEnvironment;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.examples.agents.CustomTypesAndResources;
+import org.apache.flink.agents.examples.agents.ProductSuggestionAgent;
+import org.apache.flink.agents.examples.agents.ReviewAnalysisAgent;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.agents.examples.agents.CustomTypesAndResources.ProductReviewAnalysisRes;
+import static org.apache.flink.agents.examples.agents.CustomTypesAndResources.ProductReviewSummary;
+import static org.apache.flink.streaming.api.windowing.time.Time.minutes;
+
+/**
+ * Java example demonstrating multiple workflow agents for product improvement suggestion.
+ *
+ * <p>This example demonstrates a multi-stage streaming pipeline using Flink Agents:
+ *
+ * <ol>
+ * <li>Reads product reviews from a source as a streaming source.
+ * <li>Uses an LLM agent to analyze each review and extract score and unsatisfied reasons.
+ * <li>Aggregates the analysis results in 1-minute tumbling windows, producing score distributions
+ * and collecting all unsatisfied reasons.
+ * <li>Uses another LLM agent to generate product improvement suggestions based on the aggregated
+ * analysis.
+ * <li>Prints the final suggestions to stdout.
+ * </ol>
+ */
+public class WorkflowMultipleAgentExample {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ /**
+ * ProcessWindowFunction to aggregate score distribution and dislike reasons.
+ *
+ * <p>This class aggregates multiple ProductReviewAnalysisRes elements within a window,
+ * calculating score distribution percentages and collecting all unsatisfied reasons.
+ */
+ static class AggregateScoreDistributionAndDislikeReasons
+ extends ProcessWindowFunction<ProductReviewAnalysisRes, String, String, TimeWindow> {
+
+ @Override
+ public void process(
+ String key,
+ Context context,
+ Iterable<ProductReviewAnalysisRes> elements,
+ Collector<String> out)
+ throws JsonProcessingException {
+
+ // Initialize rating counts for scores 1-5
+ int[] ratingCounts = new int[5];
+ List<String> reasonList = new ArrayList<>();
+
+ // Process each element in the window
+ for (CustomTypesAndResources.ProductReviewAnalysisRes element : elements) {
+ int rating = element.getScore();
+ if (rating >= 1 && rating <= 5) {
+ ratingCounts[rating - 1]++;
+ }
+ reasonList.addAll(element.getReasons());
+ }
+
+ // Calculate total and percentages
+ int total = 0;
+ for (int count : ratingCounts) {
+ total += count;
+ }
+
+ // Calculate percentages and format them
+ List<String> formattedPercentages = new ArrayList<>();
+ if (total > 0) {
+ for (int count : ratingCounts) {
+ double percentage = Math.round((count * 100.0 / total) * 10.0) / 10.0;
+ formattedPercentages.add(String.format("%.1f%%", percentage));
+ }
+ } else {
+ // If no ratings, set all to 0%
+ for (int i = 0; i < 5; i++) {
+ formattedPercentages.add("0.0%");
+ }
+ }
+
+ // Create and emit the aggregated result
+ ProductReviewSummary summary =
+ new ProductReviewSummary(key, formattedPercentages, reasonList);
+ out.collect(MAPPER.writeValueAsString(summary));
+ }
+ }
+
+ /** Runs the example pipeline. */
+ public static void main(String[] args) throws Exception {
+ // Set up the Flink streaming environment and the Agents execution environment.
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ AgentsExecutionEnvironment agentsEnv =
+ AgentsExecutionEnvironment.getExecutionEnvironment(env);
+
+ // Add Ollama chat model connection to be used by the ReviewAnalysisAgent
+ // and ProductSuggestionAgent.
+ agentsEnv.addResource(
+ "ollama_server",
+ ResourceType.CHAT_MODEL,
+ CustomTypesAndResources.OLLAMA_SERVER_DESCRIPTOR);
+
+ // Read product reviews from input_data.txt file as a streaming source.
+ // Each element represents a ProductReview.
+ DataStream<String> productReviewStream =
+ env.fromSource(
+ FileSource.forRecordStreamFormat(
+ new TextLineInputFormat(),
+ new Path(
+ Objects.requireNonNull(
+ WorkflowSingleAgentExample.class
+ .getClassLoader()
+ .getResource(
+ "input_data.txt"))
+ .getPath()))
+ .build(),
+ WatermarkStrategy.noWatermarks(),
+ "streaming-agent-example");
+
+ // Use the ReviewAnalysisAgent (LLM) to analyze each review.
+ // The agent extracts the review score and unsatisfied reasons.
+ DataStream<Object> reviewAnalysisResStream =
+ agentsEnv
+ .fromDataStream(productReviewStream)
+ .apply(new ReviewAnalysisAgent())
+ .toDataStream();
+
+ // Aggregate the analysis results in 1-minute tumbling windows.
+ // This produces a score distribution and collects all unsatisfied reasons for
+ // each
+ // product.
+ DataStream<String> aggregatedAnalysisResStream =
+ reviewAnalysisResStream
+ .map(element -> (ProductReviewAnalysisRes) element)
+ .keyBy(ProductReviewAnalysisRes::getId)
+ .window(TumblingProcessingTimeWindows.of(minutes(1)))
+ .process(new AggregateScoreDistributionAndDislikeReasons());
+
+ // Use the ProductSuggestionAgent (LLM) to generate product improvement
+ // suggestions
+ // based on the aggregated analysis results.
+ DataStream<Object> productSuggestionResStream =
+ agentsEnv
+ .fromDataStream(aggregatedAnalysisResStream)
+ .apply(new ProductSuggestionAgent())
+ .toDataStream();
+
+ // Print the final product improvement suggestions to stdout.
+ productSuggestionResStream.print();
+
+ // Execute the pipeline.
+ agentsEnv.execute();
+ }
+}
diff --git a/examples/src/main/java/org/apache/flink/agents/examples/agents/CustomTypesAndResources.java b/examples/src/main/java/org/apache/flink/agents/examples/agents/CustomTypesAndResources.java
index bece6db..f4587c1 100644
--- a/examples/src/main/java/org/apache/flink/agents/examples/agents/CustomTypesAndResources.java
+++ b/examples/src/main/java/org/apache/flink/agents/examples/agents/CustomTypesAndResources.java
@@ -61,6 +61,37 @@
new ChatMessage(MessageRole.SYSTEM, REVIEW_ANALYSIS_SYSTEM_PROMPT_STR),
new ChatMessage(MessageRole.USER, "\"input\":\n" + "{input}")));
+ // Prompt for review analysis react agent
+ public static final Prompt REVIEW_ANALYSIS_REACT_PROMPT =
+ new Prompt(
+ Arrays.asList(
+ new ChatMessage(MessageRole.SYSTEM, REVIEW_ANALYSIS_SYSTEM_PROMPT_STR),
+ new ChatMessage(
+ MessageRole.USER, "\"id\": {id},\n" + "\"review\": {review}")));
+
+ // Prompt for product suggestion agent
+ public static final String PRODUCT_SUGGESTION_PROMPT_STR =
+ "Based on the rating distribution and user dissatisfaction reasons, generate three actionable suggestions for product improvement.\n\n"
+ + "Input format:\n"
+ + "{\n"
+ + " \"id\": \"1\",\n"
+ + " \"score_histogram\": [\"10%\", \"20%\", \"10%\", \"15%\", \"45%\"],\n"
+ + " \"unsatisfied_reasons\": [\"reason1\", \"reason2\", \"reason3\"]\n"
+ + "}\n\n"
+ + "Ensure that your response can be parsed by Java JSON, use the following format as an example:\n"
+ + "{\n"
+ + " \"suggestion_list\": [\n"
+ + " \"suggestion1\",\n"
+ + " \"suggestion2\",\n"
+ + " \"suggestion3\"\n"
+ + " ]\n"
+ + "}\n\n"
+ + "input:\n"
+ + "{input}";
+
+ public static final Prompt PRODUCT_SUGGESTION_PROMPT =
+ new Prompt(PRODUCT_SUGGESTION_PROMPT_STR);
+
/**
* Tool for notifying the shipping manager when product received a negative review due to
* shipping damage.
@@ -146,4 +177,80 @@
"ProductReviewAnalysisRes{id='%s', score=%d, reasons=%s}", id, score, reasons);
}
}
+
+ /** Aggregates multiple reviews and insights using LLM for a product. */
+ @JsonSerialize
+ @JsonDeserialize
+ public static class ProductReviewSummary {
+ private final String id;
+ private final List<String> scoreHist;
+ private final List<String> unsatisfiedReasons;
+
+ @JsonCreator
+ public ProductReviewSummary(
+ @JsonProperty("id") String id,
+ @JsonProperty("scoreHist") List<String> scoreHist,
+ @JsonProperty("unsatisfiedReasons") List<String> unsatisfiedReasons) {
+ this.id = id;
+ this.scoreHist = scoreHist;
+ this.unsatisfiedReasons = unsatisfiedReasons;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public List<String> getScoreHist() {
+ return scoreHist;
+ }
+
+ public List<String> getUnsatisfiedReasons() {
+ return unsatisfiedReasons;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "ProductReviewSummary{id='%s', scoreHist=%s, unsatisfiedReasons=%s}",
+ id, scoreHist, unsatisfiedReasons);
+ }
+ }
+
+ /** Provides a summary of review data including suggestions for improvement. */
+ @JsonSerialize
+ @JsonDeserialize
+ public static class ProductSuggestion {
+ private final String id;
+ private final List<String> scoreHist;
+ private final List<String> suggestions;
+
+ @JsonCreator
+ public ProductSuggestion(
+ @JsonProperty("id") String id,
+ @JsonProperty("scoreHist") List<String> scoreHist,
+ @JsonProperty("suggestions") List<String> suggestions) {
+ this.id = id;
+ this.scoreHist = scoreHist;
+ this.suggestions = suggestions;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public List<String> getScoreHist() {
+ return scoreHist;
+ }
+
+ public List<String> getSuggestions() {
+ return suggestions;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "ProductSuggestion{id='%s', scoreHist=%s, suggestions=%s}",
+ id, scoreHist, suggestions);
+ }
+ }
}
diff --git a/examples/src/main/java/org/apache/flink/agents/examples/agents/ProductSuggestionAgent.java b/examples/src/main/java/org/apache/flink/agents/examples/agents/ProductSuggestionAgent.java
new file mode 100644
index 0000000..6487a90
--- /dev/null
+++ b/examples/src/main/java/org/apache/flink/agents/examples/agents/ProductSuggestionAgent.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.examples.agents;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Agent;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.api.annotation.Action;
+import org.apache.flink.agents.api.annotation.ChatModelConnection;
+import org.apache.flink.agents.api.annotation.ChatModelSetup;
+import org.apache.flink.agents.api.annotation.Prompt;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.api.event.ChatRequestEvent;
+import org.apache.flink.agents.api.event.ChatResponseEvent;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.examples.agents.CustomTypesAndResources.ProductReviewSummary;
+import org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelConnection;
+import org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelSetup;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.agents.examples.agents.CustomTypesAndResources.PRODUCT_SUGGESTION_PROMPT;
+
+/**
+ * An agent that uses a large language model (LLM) to generate actionable product improvement
+ * suggestions from aggregated product review data.
+ *
+ * <p>This agent receives a summary of product reviews, including a rating distribution and a list
+ * of user dissatisfaction reasons, and produces concrete suggestions for product enhancement. It
+ * handles prompt construction, LLM interaction, and output parsing.
+ */
+public class ProductSuggestionAgent extends Agent {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+ private static final String ID = "id";
+ private static final String SCORE_HIST = "score_hist";
+
+ @ChatModelConnection
+ public static ResourceDescriptor ollamaChatModelConnection() {
+ return ResourceDescriptor.Builder.newBuilder(OllamaChatModelConnection.class.getName())
+ .addInitialArgument("endpoint", "http://localhost:11434")
+ .build();
+ }
+
+ @ChatModelSetup
+ public static ResourceDescriptor generateSuggestionModel() {
+ return ResourceDescriptor.Builder.newBuilder(OllamaChatModelSetup.class.getName())
+ .addInitialArgument("connection", "ollamaChatModelConnection")
+ .addInitialArgument("model", "qwen3:8b")
+ .addInitialArgument("extract_reasoning", "true")
+ .addInitialArgument("prompt", "productSuggestionPrompt")
+ .build();
+ }
+
+ @Prompt
+ public static org.apache.flink.agents.api.prompt.Prompt productSuggestionPrompt() {
+ return PRODUCT_SUGGESTION_PROMPT;
+ }
+
+ /** Process input event. */
+ @Action(listenEvents = {InputEvent.class})
+ public static void processInput(InputEvent event, RunnerContext ctx) throws Exception {
+ String input = (String) event.getInput();
+
+ ProductReviewSummary summary = MAPPER.readValue(input, ProductReviewSummary.class);
+
+ ctx.getShortTermMemory().set(ID, summary.getId());
+ ctx.getShortTermMemory().set(SCORE_HIST, summary.getScoreHist());
+
+ String content =
+ String.format(
+ "{\n\"id\": %s,\n\"score_histogram\": %s,\n\"unsatisfied_reasons\": %s\n}",
+ summary.getId(), summary.getScoreHist(), summary.getUnsatisfiedReasons());
+
+ ChatMessage msg = new ChatMessage(MessageRole.USER, "", Map.of("input", content));
+
+ ctx.sendEvent(new ChatRequestEvent("generateSuggestionModel", List.of(msg)));
+ }
+
+ /** Process chat response event. */
+ @Action(listenEvents = {ChatResponseEvent.class})
+ public static void processChatResponse(ChatResponseEvent event, RunnerContext ctx)
+ throws Exception {
+ JsonNode jsonNode = MAPPER.readTree(event.getResponse().getContent());
+ JsonNode suggestionsNode = jsonNode.findValue("suggestion_list");
+ List<String> suggestions = new ArrayList<>();
+ if (suggestionsNode.isArray()) {
+ for (JsonNode node : suggestionsNode) {
+ suggestions.add(node.asText());
+ }
+ }
+
+ ctx.sendEvent(
+ new OutputEvent(
+ new CustomTypesAndResources.ProductSuggestion(
+ ctx.getShortTermMemory().get(ID).getValue().toString(),
+ (List<String>) ctx.getShortTermMemory().get(SCORE_HIST).getValue(),
+ suggestions)));
+ }
+}