Test throttle() EIP DSL method #2628
diff --git a/integration-test-groups/foundation/eip/src/main/java/org/apache/camel/quarkus/eip/it/EipRoutes.java b/integration-test-groups/foundation/eip/src/main/java/org/apache/camel/quarkus/eip/it/EipRoutes.java
index 0637615..c5ffef8 100644
--- a/integration-test-groups/foundation/eip/src/main/java/org/apache/camel/quarkus/eip/it/EipRoutes.java
+++ b/integration-test-groups/foundation/eip/src/main/java/org/apache/camel/quarkus/eip/it/EipRoutes.java
@@ -27,6 +27,9 @@
 
 public class EipRoutes extends RouteBuilder {
 
+    public static final int THROTTLE_PERIOD = 500;
+    public static final int THROTTLE_MAXIMUM_REQUEST_COUNT = 2;
+
     @Override
     public void configure() {
         from("direct:claimCheckByHeader")
@@ -113,6 +116,10 @@
                 .threads(2)
                 .setBody(e -> "Hello from thread " + Thread.currentThread().getName());
 
+        from("direct:throttle")
+                .throttle(THROTTLE_MAXIMUM_REQUEST_COUNT).timePeriodMillis(THROTTLE_PERIOD).rejectExecution(true)
+                .to("mock:throttle");
+
     }
 
     @Produces
diff --git a/integration-test-groups/foundation/eip/src/test/java/org/apache/camel/quarkus/eip/it/EipTest.java b/integration-test-groups/foundation/eip/src/test/java/org/apache/camel/quarkus/eip/it/EipTest.java
index 2aea7d6..a41634d 100644
--- a/integration-test-groups/foundation/eip/src/test/java/org/apache/camel/quarkus/eip/it/EipTest.java
+++ b/integration-test-groups/foundation/eip/src/test/java/org/apache/camel/quarkus/eip/it/EipTest.java
@@ -17,9 +17,12 @@
 package org.apache.camel.quarkus.eip.it;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import io.quarkus.test.junit.QuarkusTest;
 import io.restassured.RestAssured;
@@ -434,4 +437,37 @@
 
     }
 
+    @Test
+    public void throttle() {
+        final int durationMs = EipRoutes.THROTTLE_PERIOD * 4;
+        LOG.infof("About to sent messages for %d ms", durationMs);
+        final long deadline = System.currentTimeMillis() + (durationMs);
+        int i = 0;
+        final Map<Integer, AtomicInteger> statusCounts = new HashMap<>();
+        statusCounts.put(200, new AtomicInteger());
+        statusCounts.put(500, new AtomicInteger()); // the counter for the rejected requests
+        while (System.currentTimeMillis() < deadline) {
+            /* Send messages for 500 ms */
+            final int status = RestAssured.given()
+                    .contentType(ContentType.TEXT)
+                    .body("message-" + i++)
+                    .post("/eip/route/throttle")
+                    .then()
+                    .extract().statusCode();
+            statusCounts.get(status).incrementAndGet();
+        }
+        int successCount = statusCounts.get(200).get();
+        int rejectedCount = statusCounts.get(500).get();
+        LOG.infof("Sent %d messages, sucessful %d, rejected %d", i, successCount, rejectedCount);
+        Assertions.assertThat(rejectedCount).isGreaterThan(0); // assert that some were rejected
+        String[] samples = RestAssured.get("/eip/mock/throttle/" + successCount + "+/5000/body")
+                .then()
+                .statusCode(200)
+                .extract()
+                .body().asString().split(",");
+        LOG.infof("%d messages passed the route", samples.length);
+        Assertions.assertThat(samples.length).isEqualTo(successCount);
+        Assertions.assertThat(successCount)
+                .isLessThanOrEqualTo(EipRoutes.THROTTLE_PERIOD * EipRoutes.THROTTLE_MAXIMUM_REQUEST_COUNT);
+    }
 }