Test throttle() EIP DSL method #2628
diff --git a/integration-test-groups/foundation/eip/src/main/java/org/apache/camel/quarkus/eip/it/EipRoutes.java b/integration-test-groups/foundation/eip/src/main/java/org/apache/camel/quarkus/eip/it/EipRoutes.java
index 0637615..c5ffef8 100644
--- a/integration-test-groups/foundation/eip/src/main/java/org/apache/camel/quarkus/eip/it/EipRoutes.java
+++ b/integration-test-groups/foundation/eip/src/main/java/org/apache/camel/quarkus/eip/it/EipRoutes.java
@@ -27,6 +27,9 @@
public class EipRoutes extends RouteBuilder {
+ public static final int THROTTLE_PERIOD = 500;
+ public static final int THROTTLE_MAXIMUM_REQUEST_COUNT = 2;
+
@Override
public void configure() {
from("direct:claimCheckByHeader")
@@ -113,6 +116,10 @@
.threads(2)
.setBody(e -> "Hello from thread " + Thread.currentThread().getName());
+ from("direct:throttle")
+ .throttle(THROTTLE_MAXIMUM_REQUEST_COUNT).timePeriodMillis(THROTTLE_PERIOD).rejectExecution(true)
+ .to("mock:throttle");
+
}
@Produces
diff --git a/integration-test-groups/foundation/eip/src/test/java/org/apache/camel/quarkus/eip/it/EipTest.java b/integration-test-groups/foundation/eip/src/test/java/org/apache/camel/quarkus/eip/it/EipTest.java
index 2aea7d6..a41634d 100644
--- a/integration-test-groups/foundation/eip/src/test/java/org/apache/camel/quarkus/eip/it/EipTest.java
+++ b/integration-test-groups/foundation/eip/src/test/java/org/apache/camel/quarkus/eip/it/EipTest.java
@@ -17,9 +17,12 @@
package org.apache.camel.quarkus.eip.it;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
@@ -434,4 +437,37 @@
}
+ @Test
+ public void throttle() {
+ final int durationMs = EipRoutes.THROTTLE_PERIOD * 4;
+ LOG.infof("About to sent messages for %d ms", durationMs);
+ final long deadline = System.currentTimeMillis() + (durationMs);
+ int i = 0;
+ final Map<Integer, AtomicInteger> statusCounts = new HashMap<>();
+ statusCounts.put(200, new AtomicInteger());
+ statusCounts.put(500, new AtomicInteger()); // the counter for the rejected requests
+ while (System.currentTimeMillis() < deadline) {
+ /* Send messages for 500 ms */
+ final int status = RestAssured.given()
+ .contentType(ContentType.TEXT)
+ .body("message-" + i++)
+ .post("/eip/route/throttle")
+ .then()
+ .extract().statusCode();
+ statusCounts.get(status).incrementAndGet();
+ }
+ int successCount = statusCounts.get(200).get();
+ int rejectedCount = statusCounts.get(500).get();
+ LOG.infof("Sent %d messages, sucessful %d, rejected %d", i, successCount, rejectedCount);
+ Assertions.assertThat(rejectedCount).isGreaterThan(0); // assert that some were rejected
+ String[] samples = RestAssured.get("/eip/mock/throttle/" + successCount + "+/5000/body")
+ .then()
+ .statusCode(200)
+ .extract()
+ .body().asString().split(",");
+ LOG.infof("%d messages passed the route", samples.length);
+ Assertions.assertThat(samples.length).isEqualTo(successCount);
+ Assertions.assertThat(successCount)
+ .isLessThanOrEqualTo(EipRoutes.THROTTLE_PERIOD * EipRoutes.THROTTLE_MAXIMUM_REQUEST_COUNT);
+ }
}