[example][java] Add multiple agent integration example (#239)

diff --git a/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java b/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java
new file mode 100644
index 0000000..4732bd0
--- /dev/null
+++ b/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.examples;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.AgentsExecutionEnvironment;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.examples.agents.CustomTypesAndResources;
+import org.apache.flink.agents.examples.agents.ProductSuggestionAgent;
+import org.apache.flink.agents.examples.agents.ReviewAnalysisAgent;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.agents.examples.agents.CustomTypesAndResources.ProductReviewAnalysisRes;
+import static org.apache.flink.agents.examples.agents.CustomTypesAndResources.ProductReviewSummary;
+import static org.apache.flink.streaming.api.windowing.time.Time.minutes;
+
+/**
+ * Java example demonstrating multiple workflow agents for product improvement suggestion.
+ *
+ * <p>This example demonstrates a multi-stage streaming pipeline using Flink Agents:
+ *
+ * <ol>
+ *   <li>Reads product reviews from a source as a streaming source.
+ *   <li>Uses an LLM agent to analyze each review and extract score and unsatisfied reasons.
+ *   <li>Aggregates the analysis results in 1-minute tumbling windows, producing score distributions
+ *       and collecting all unsatisfied reasons.
+ *   <li>Uses another LLM agent to generate product improvement suggestions based on the aggregated
+ *       analysis.
+ *   <li>Prints the final suggestions to stdout.
+ * </ol>
+ */
+public class WorkflowMultipleAgentExample {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    /**
+     * ProcessWindowFunction to aggregate score distribution and dislike reasons.
+     *
+     * <p>This class aggregates multiple ProductReviewAnalysisRes elements within a window,
+     * calculating score distribution percentages and collecting all unsatisfied reasons.
+     */
+    static class AggregateScoreDistributionAndDislikeReasons
+            extends ProcessWindowFunction<ProductReviewAnalysisRes, String, String, TimeWindow> {
+
+        @Override
+        public void process(
+                String key,
+                Context context,
+                Iterable<ProductReviewAnalysisRes> elements,
+                Collector<String> out)
+                throws JsonProcessingException {
+
+            // Initialize rating counts for scores 1-5
+            int[] ratingCounts = new int[5];
+            List<String> reasonList = new ArrayList<>();
+
+            // Process each element in the window
+            for (CustomTypesAndResources.ProductReviewAnalysisRes element : elements) {
+                int rating = element.getScore();
+                if (rating >= 1 && rating <= 5) {
+                    ratingCounts[rating - 1]++;
+                }
+                reasonList.addAll(element.getReasons());
+            }
+
+            // Calculate total and percentages
+            int total = 0;
+            for (int count : ratingCounts) {
+                total += count;
+            }
+
+            // Calculate percentages and format them
+            List<String> formattedPercentages = new ArrayList<>();
+            if (total > 0) {
+                for (int count : ratingCounts) {
+                    double percentage = Math.round((count * 100.0 / total) * 10.0) / 10.0;
+                    formattedPercentages.add(String.format("%.1f%%", percentage));
+                }
+            } else {
+                // If no ratings, set all to 0%
+                for (int i = 0; i < 5; i++) {
+                    formattedPercentages.add("0.0%");
+                }
+            }
+
+            // Create and emit the aggregated result
+            ProductReviewSummary summary =
+                    new ProductReviewSummary(key, formattedPercentages, reasonList);
+            out.collect(MAPPER.writeValueAsString(summary));
+        }
+    }
+
+    /** Runs the example pipeline. */
+    public static void main(String[] args) throws Exception {
+        // Set up the Flink streaming environment and the Agents execution environment.
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        AgentsExecutionEnvironment agentsEnv =
+                AgentsExecutionEnvironment.getExecutionEnvironment(env);
+
+        // Add Ollama chat model connection to be used by the ReviewAnalysisAgent
+        // and ProductSuggestionAgent.
+        agentsEnv.addResource(
+                "ollama_server",
+                ResourceType.CHAT_MODEL,
+                CustomTypesAndResources.OLLAMA_SERVER_DESCRIPTOR);
+
+        // Read product reviews from input_data.txt file as a streaming source.
+        // Each element represents a ProductReview.
+        DataStream<String> productReviewStream =
+                env.fromSource(
+                        FileSource.forRecordStreamFormat(
+                                        new TextLineInputFormat(),
+                                        new Path(
+                                                Objects.requireNonNull(
+                                                                WorkflowSingleAgentExample.class
+                                                                        .getClassLoader()
+                                                                        .getResource(
+                                                                                "input_data.txt"))
+                                                        .getPath()))
+                                .build(),
+                        WatermarkStrategy.noWatermarks(),
+                        "streaming-agent-example");
+
+        // Use the ReviewAnalysisAgent (LLM) to analyze each review.
+        // The agent extracts the review score and unsatisfied reasons.
+        DataStream<Object> reviewAnalysisResStream =
+                agentsEnv
+                        .fromDataStream(productReviewStream)
+                        .apply(new ReviewAnalysisAgent())
+                        .toDataStream();
+
+        // Aggregate the analysis results in 1-minute tumbling windows.
+        // This produces a score distribution and collects all unsatisfied reasons for
+        // each
+        // product.
+        DataStream<String> aggregatedAnalysisResStream =
+                reviewAnalysisResStream
+                        .map(element -> (ProductReviewAnalysisRes) element)
+                        .keyBy(ProductReviewAnalysisRes::getId)
+                        .window(TumblingProcessingTimeWindows.of(minutes(1)))
+                        .process(new AggregateScoreDistributionAndDislikeReasons());
+
+        // Use the ProductSuggestionAgent (LLM) to generate product improvement
+        // suggestions
+        // based on the aggregated analysis results.
+        DataStream<Object> productSuggestionResStream =
+                agentsEnv
+                        .fromDataStream(aggregatedAnalysisResStream)
+                        .apply(new ProductSuggestionAgent())
+                        .toDataStream();
+
+        // Print the final product improvement suggestions to stdout.
+        productSuggestionResStream.print();
+
+        // Execute the pipeline.
+        agentsEnv.execute();
+    }
+}
diff --git a/examples/src/main/java/org/apache/flink/agents/examples/agents/CustomTypesAndResources.java b/examples/src/main/java/org/apache/flink/agents/examples/agents/CustomTypesAndResources.java
index bece6db..f4587c1 100644
--- a/examples/src/main/java/org/apache/flink/agents/examples/agents/CustomTypesAndResources.java
+++ b/examples/src/main/java/org/apache/flink/agents/examples/agents/CustomTypesAndResources.java
@@ -61,6 +61,37 @@
                             new ChatMessage(MessageRole.SYSTEM, REVIEW_ANALYSIS_SYSTEM_PROMPT_STR),
                             new ChatMessage(MessageRole.USER, "\"input\":\n" + "{input}")));
 
+    // Prompt for review analysis react agent
+    public static final Prompt REVIEW_ANALYSIS_REACT_PROMPT =
+            new Prompt(
+                    Arrays.asList(
+                            new ChatMessage(MessageRole.SYSTEM, REVIEW_ANALYSIS_SYSTEM_PROMPT_STR),
+                            new ChatMessage(
+                                    MessageRole.USER, "\"id\": {id},\n" + "\"review\": {review}")));
+
+    // Prompt for product suggestion agent
+    public static final String PRODUCT_SUGGESTION_PROMPT_STR =
+            "Based on the rating distribution and user dissatisfaction reasons, generate three actionable suggestions for product improvement.\n\n"
+                    + "Input format:\n"
+                    + "{\n"
+                    + "    \"id\": \"1\",\n"
+                    + "    \"score_histogram\": [\"10%\", \"20%\", \"10%\", \"15%\", \"45%\"],\n"
+                    + "    \"unsatisfied_reasons\": [\"reason1\", \"reason2\", \"reason3\"]\n"
+                    + "}\n\n"
+                    + "Ensure that your response can be parsed by Java JSON, use the following format as an example:\n"
+                    + "{\n"
+                    + "    \"suggestion_list\": [\n"
+                    + "        \"suggestion1\",\n"
+                    + "        \"suggestion2\",\n"
+                    + "        \"suggestion3\"\n"
+                    + "    ]\n"
+                    + "}\n\n"
+                    + "input:\n"
+                    + "{input}";
+
+    public static final Prompt PRODUCT_SUGGESTION_PROMPT =
+            new Prompt(PRODUCT_SUGGESTION_PROMPT_STR);
+
     /**
      * Tool for notifying the shipping manager when product received a negative review due to
      * shipping damage.
@@ -146,4 +177,80 @@
                     "ProductReviewAnalysisRes{id='%s', score=%d, reasons=%s}", id, score, reasons);
         }
     }
+
+    /** Aggregates multiple reviews and insights using LLM for a product. */
+    @JsonSerialize
+    @JsonDeserialize
+    public static class ProductReviewSummary {
+        private final String id;
+        private final List<String> scoreHist;
+        private final List<String> unsatisfiedReasons;
+
+        @JsonCreator
+        public ProductReviewSummary(
+                @JsonProperty("id") String id,
+                @JsonProperty("scoreHist") List<String> scoreHist,
+                @JsonProperty("unsatisfiedReasons") List<String> unsatisfiedReasons) {
+            this.id = id;
+            this.scoreHist = scoreHist;
+            this.unsatisfiedReasons = unsatisfiedReasons;
+        }
+
+        public String getId() {
+            return id;
+        }
+
+        public List<String> getScoreHist() {
+            return scoreHist;
+        }
+
+        public List<String> getUnsatisfiedReasons() {
+            return unsatisfiedReasons;
+        }
+
+        @Override
+        public String toString() {
+            return String.format(
+                    "ProductReviewSummary{id='%s', scoreHist=%s, unsatisfiedReasons=%s}",
+                    id, scoreHist, unsatisfiedReasons);
+        }
+    }
+
+    /** Provides a summary of review data including suggestions for improvement. */
+    @JsonSerialize
+    @JsonDeserialize
+    public static class ProductSuggestion {
+        private final String id;
+        private final List<String> scoreHist;
+        private final List<String> suggestions;
+
+        @JsonCreator
+        public ProductSuggestion(
+                @JsonProperty("id") String id,
+                @JsonProperty("scoreHist") List<String> scoreHist,
+                @JsonProperty("suggestions") List<String> suggestions) {
+            this.id = id;
+            this.scoreHist = scoreHist;
+            this.suggestions = suggestions;
+        }
+
+        public String getId() {
+            return id;
+        }
+
+        public List<String> getScoreHist() {
+            return scoreHist;
+        }
+
+        public List<String> getSuggestions() {
+            return suggestions;
+        }
+
+        @Override
+        public String toString() {
+            return String.format(
+                    "ProductSuggestion{id='%s', scoreHist=%s, suggestions=%s}",
+                    id, scoreHist, suggestions);
+        }
+    }
 }
diff --git a/examples/src/main/java/org/apache/flink/agents/examples/agents/ProductSuggestionAgent.java b/examples/src/main/java/org/apache/flink/agents/examples/agents/ProductSuggestionAgent.java
new file mode 100644
index 0000000..6487a90
--- /dev/null
+++ b/examples/src/main/java/org/apache/flink/agents/examples/agents/ProductSuggestionAgent.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.examples.agents;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Agent;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.api.annotation.Action;
+import org.apache.flink.agents.api.annotation.ChatModelConnection;
+import org.apache.flink.agents.api.annotation.ChatModelSetup;
+import org.apache.flink.agents.api.annotation.Prompt;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.api.event.ChatRequestEvent;
+import org.apache.flink.agents.api.event.ChatResponseEvent;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.examples.agents.CustomTypesAndResources.ProductReviewSummary;
+import org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelConnection;
+import org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelSetup;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.agents.examples.agents.CustomTypesAndResources.PRODUCT_SUGGESTION_PROMPT;
+
+/**
+ * An agent that uses a large language model (LLM) to generate actionable product improvement
+ * suggestions from aggregated product review data.
+ *
+ * <p>This agent receives a summary of product reviews, including a rating distribution and a list
+ * of user dissatisfaction reasons, and produces concrete suggestions for product enhancement. It
+ * handles prompt construction, LLM interaction, and output parsing.
+ */
+public class ProductSuggestionAgent extends Agent {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+    private static final String ID = "id";
+    private static final String SCORE_HIST = "score_hist";
+
+    @ChatModelConnection
+    public static ResourceDescriptor ollamaChatModelConnection() {
+        return ResourceDescriptor.Builder.newBuilder(OllamaChatModelConnection.class.getName())
+                .addInitialArgument("endpoint", "http://localhost:11434")
+                .build();
+    }
+
+    @ChatModelSetup
+    public static ResourceDescriptor generateSuggestionModel() {
+        return ResourceDescriptor.Builder.newBuilder(OllamaChatModelSetup.class.getName())
+                .addInitialArgument("connection", "ollamaChatModelConnection")
+                .addInitialArgument("model", "qwen3:8b")
+                .addInitialArgument("extract_reasoning", "true")
+                .addInitialArgument("prompt", "productSuggestionPrompt")
+                .build();
+    }
+
+    @Prompt
+    public static org.apache.flink.agents.api.prompt.Prompt productSuggestionPrompt() {
+        return PRODUCT_SUGGESTION_PROMPT;
+    }
+
+    /** Process input event. */
+    @Action(listenEvents = {InputEvent.class})
+    public static void processInput(InputEvent event, RunnerContext ctx) throws Exception {
+        String input = (String) event.getInput();
+
+        ProductReviewSummary summary = MAPPER.readValue(input, ProductReviewSummary.class);
+
+        ctx.getShortTermMemory().set(ID, summary.getId());
+        ctx.getShortTermMemory().set(SCORE_HIST, summary.getScoreHist());
+
+        String content =
+                String.format(
+                        "{\n\"id\": %s,\n\"score_histogram\": %s,\n\"unsatisfied_reasons\": %s\n}",
+                        summary.getId(), summary.getScoreHist(), summary.getUnsatisfiedReasons());
+
+        ChatMessage msg = new ChatMessage(MessageRole.USER, "", Map.of("input", content));
+
+        ctx.sendEvent(new ChatRequestEvent("generateSuggestionModel", List.of(msg)));
+    }
+
+    /** Process chat response event. */
+    @Action(listenEvents = {ChatResponseEvent.class})
+    public static void processChatResponse(ChatResponseEvent event, RunnerContext ctx)
+            throws Exception {
+        JsonNode jsonNode = MAPPER.readTree(event.getResponse().getContent());
+        JsonNode suggestionsNode = jsonNode.findValue("suggestion_list");
+        List<String> suggestions = new ArrayList<>();
+        if (suggestionsNode.isArray()) {
+            for (JsonNode node : suggestionsNode) {
+                suggestions.add(node.asText());
+            }
+        }
+
+        ctx.sendEvent(
+                new OutputEvent(
+                        new CustomTypesAndResources.ProductSuggestion(
+                                ctx.getShortTermMemory().get(ID).getValue().toString(),
+                                (List<String>) ctx.getShortTermMemory().get(SCORE_HIST).getValue(),
+                                suggestions)));
+    }
+}